Pular para o conteúdo principal

ETL no Databricks SQL

Ao lidar com grandes quantidades de dados, você precisa de um pipeline que possa processar apenas os registros novos e alterados, em vez de reprocessar todo o dataset. Isso se chama ETL incremental. No Databricks SQL, você pode criar um pipeline ETL incremental usando tabelas de transmissão e visualizações materializadas, sem escrever código procedural ou realizar atualizações manuais programadas.

Este tutorial explica um padrão comum: o acompanhamento do produto muda ao longo do tempo. Você cria uma tabela de origem, captura eventos de alteração, constrói uma tabela de dimensão que preserva todo o histórico de cada produto e adiciona uma camada de relatório agregado por cima.

O recurso key neste tutorial é AUTO CDC. Num armazém tradicional, você escreveria instruções MERGE INTO complexas para conciliar eventos de inserção, atualização e exclusão em uma tabela de destino. Essa abordagem é propensa a erros, especialmente quando os eventos chegam fora de ordem. AUTO CDC trata disso para você. Você declara a key de negócio, a coluna de sequenciamento e se deseja SCD Tipo 1 (somente o valor mais recente) ou SCD Tipo 2 (histórico completo), e Databricks aplica automaticamente a lógica merge correta. Para obter uma visão geral do CDC, consulte APIs do AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline.

Ao final deste tutorial, você terá:

  1. Criei uma tabela de origem que rastreia alterações com o Feed de Dados de Alteração.
  2. Analisei os dados brutos de alterações para compreender a transmissão de eventos CDC .
  3. Utilizou-se AUTO CDC para construir uma tabela de dimensão SCD Tipo 2 a partir desses eventos.
  4. Os eventos de exclusão foram processados incrementalmente ao longo do pipeline.
  5. Criou-se uma view materializada que mantém incrementalmente um relatório agregado.
  6. Configurado SCHEDULE REFRESH EVERY 1 DAY para que as alterações se propaguem automaticamente pelo pipeline.

Requisitos

Para completar este tutorial, você deve atender aos seguintes requisitos:

Passo 1: Configure seu catálogo e esquema

Abra o editorDatabricks SQL e defina seu catálogo e esquema de trabalho. Você precisa ter permissão para USE o catálogo e o esquema que você selecionar:

SQL
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;

Etapa 2: Criar uma tabela de origem e carregar dados

Crie uma tabela products com a opção Usar feed de dados de alteração do Delta Lake no Databricks (CDF) ativada. O CDF é um recurso Delta Lake que registra cada inserção, atualização e exclusão como um log de alterações consultável. Isso é semelhante a uma transmissão CDC de um sistema de origem transacional, exceto que as alterações são capturadas diretamente na tabela Delta , em vez de em um log externo. Você usa o CDF aqui para gerar os eventos de mudança que o pipeline subsequente irá consumir.

  1. Crie a tabela e carregue os registros iniciais:

    SQL
    CREATE OR REPLACE TABLE products (
    product_id INT,
    product_name STRING,
    category STRING,
    warehouse STRING
    )
    TBLPROPERTIES (delta.enableChangeDataFeed = true);

    INSERT INTO products VALUES
    (1, 'Spoon', 'Cutlery', 'Seattle'),
    (2, 'Fork', 'Cutlery', 'Portland'),
    (3, 'Knife', 'Cutlery', 'Denver'),
    (4, 'Chair', 'Furniture', 'Austin'),
    (5, 'Table', 'Furniture', 'Chicago'),
    (6, 'Lamp', 'Lighting', 'Boston'),
    (7, 'Mug', 'Kitchenware', 'Seattle'),
    (8, 'Plate', 'Kitchenware', 'Atlanta'),
    (9, 'Bowl', 'Kitchenware', 'Dallas'),
    (10, 'Glass', 'Kitchenware', 'Phoenix');
  2. Simule alterações a montante, incluindo novos produtos, mudança de armazém e reatribuição de categoria:

    SQL
    INSERT INTO products VALUES
    (11, 'Napkin', 'Dining', 'San Francisco'),
    (12, 'Coaster', 'Dining', 'New York');

    UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1;
    UPDATE products SET category = 'Dining' WHERE product_id = 2;

o passo 3: Consultar o feed de dados alterados

Antes de construir o pipeline downstream, é útil analisar os eventos de mudança brutos para que você possa entender o que AUTO CDC irá processar. A função table_changes() lê o log do CDF e retorna todas as operações capturadas juntamente com as colunas de metadados:

SQL
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;

Por exemplo, a Colher tem três eventos: um insert (Seattle), um update_preimage (Seattle) e um update_postimage (Los Angeles).

Note que uma única mudança lógica (por exemplo, mover a colher para um armazém diferente) produz múltiplos eventos: uma pré-imagem e uma pós-imagem. Num data warehouse tradicional, você escreveria uma instrução MERGE para reconciliar todos esses eventos em uma tabela de destino, lidando com inserções, atualizações e exclusões com lógica separada e garantindo que os eventos sejam aplicados na ordem correta. Esta é exatamente a complexidade que AUTO CDC elimina no próximo passo.

o passo 4: Construir uma dimensão SCD Tipo 2 com AUTO CDC

info

Beta

AUTO CDC Está em versão Beta. Requer Databricks Runtime 17.3 ou acima.

Uma tabela de transmissão processa dados de forma incremental. A cada refresh, o programa lê apenas as novas linhas desde a última execução, portanto não precisa reprocessar todo o dataset. Isso o torna ideal para fontes de alto volume ou que mudam com frequência.

AUTO CDC adiciona processamento de captura de dados de alterações (CDC) em cima de uma tabela de transmissão. Em vez de escrever uma instrução MERGE INTO que lida manualmente com inserções, atualizações e exclusões, você declara a key de negócio e a coluna de sequenciamento e deixa Databricks aplicar a lógica correta. AUTO CDC também lida com eventos fora de ordem automaticamente, o que é um problema comum ao usar MERGE INTO para lidar com eventos que chegam de sistemas distribuídos ou cargas de lotes com carimbos de data/hora sobrepostos.

A seguinte instrução cria uma tabela SCD Tipo 2 que preserva o histórico completo de versões de cada produto. Cada versão recebe carimbos de data/hora __START_AT e __END_AT . Um NULL em __END_AT marca a versão atual.

SQL
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
  • SCHEDULE REFRESH EVERY 1 DAYAtualizar a tabela diariamente, conforme programado.
  • FLOW AUTO CDC: declara isso como um fluxo do CDC. O Databricks aplica automaticamente as semânticas de inserção, atualização e exclusão.
  • KEYS (product_id): a key do negócio. Eventos com a mesma key são mesclados em linhas versionadas.
  • APPLY AS DELETE WHEN _change_type = 'delete': Encerra a versão atual quando um evento de exclusão chega. Isso permite definir a condição que identifica um evento de exclusão.
  • SEQUENCE BY _commit_timestamp: estabelece a ordem dos eventos. Lida corretamente com chegadas fora de ordem.
  • STORED AS SCD TYPE 2: mantém a história completa. AUTO CDC suporta tanto SCD Tipo 1 quanto SCD Tipo 2.

Consultar a tabela de dimensões:

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
  • Colher: duas versões. Seattle (fechado, conjunto __END_AT ) e Los Angeles (atual, __END_AT = NULL).
  • Bifurcação: duas versões. Categoria Talheres (fechada) e Categoria Refeições (atual).
  • Guardanapo e porta-copos: uma versão de cada (recém-inseridos, __END_AT = NULL).
  • Todos os outros produtos: uma versão cada (__END_AT = NULL).

o passo 5: Processar exclusões através do pipeline

Agora, simule dois produtos descontinuados, excluindo-os da tabela de origem:

SQL
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;

Esses eventos de exclusão são registrados no log do CDF, mas a tabela de transmissão ainda não os viu. Atualize a tabela de transmissão para processar os novos eventos:

SQL
REFRESH STREAMING TABLE products_history;

Consulte a tabela de dimensões para verificar se as exclusões foram aplicadas:

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;

O Bowl e o Glass agora estão fechados com __END_AT definido, marcando-os como descontinuados. Todos os demais produtos atuais permanecem inalterados. A tabela de transmissão processou apenas os novos eventos de exclusão, sem reprocessar as inserções e atualizações da refresh anterior.

o passo 6: Criar uma viewmaterializada agregada

Agora que você tem uma tabela de dimensões que se mantém atualizada com as alterações da fonte, você pode adicionar uma camada de relatórios por cima.

Uma viewmaterializada armazena os resultados de consultas pré-computadas como uma tabela física. Ao contrário de uma view regular, que reexecuta a consulta cada vez que você lê dela, uma view materializada persiste os resultados e recalcula apenas as linhas afetadas por alterações upstream a cada refresh. Isso o torna ideal para dashboards e relatórios onde o desempenho das consultas é importante.

SQL
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;

SCHEDULE REFRESH EVERY 1 DAY Significa que esta view é atualizada diariamente pelo programador. Combinando isso com o mesmo programa na tabela de transmissão, você agora tem um pipeline de três estágios, onde as alterações na tabela de origem se propagam pela dimensão e chegam ao agregado a cada ciclo refresh . Não há refresh manual para execução.

SQL
SELECT * FROM products_by_category ORDER BY active_products DESC;

o passo 7: Verificar a cascata de ponta a ponta

Para verificar toda a cascata do pipeline, faça uma alteração na tabela de origem:

SQL
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;

O filme The Knife muda-se de Denver para Seattle. Essa única alteração DML desencadeia toda a cascata do pipeline, demonstrando como os três estágios funcionam em conjunto:

  1. products Registra o evento de alteração por meio do CDF.
  2. products_history Processa o evento e adiciona uma nova versão para a Faca.
  3. products_by_category Recalcula apenas a linha de talheres afetada.

Verificar:

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;

SELECT * FROM products_by_category ORDER BY active_products DESC;

Limpar

Para limpar o recurso criado por este tutorial, use o seguinte SQL:

SQL
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;

Recursos adicionais