Pular para o conteúdo principal

Tutorial: Crie um ETL pipeline usando a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow

Aprenda a criar e implantar um ETL (extract, transform, and load) pipeline com captura de dados de alterações (CDC) (CDC) usando LakeFlow Declarative pipeline for data orquestração e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados dos sistemas de origem, transformar esses dados com base nos requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros, e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.

Neste tutorial, o senhor usará os dados de uma tabela customers em um banco de dados MySQL para:

  • Extraia as alterações de um banco de dados transacional usando Debezium ou qualquer outra ferramenta e salve-as em um armazenamento de objetos cloud (pasta S3 , ADLS, GCS). Neste tutorial, você pulará a configuração de um sistema CDC externo e, em vez disso, gerará alguns dados falsos para simplificar o tutorial.
  • Use Auto Loader para carregar incrementalmente as mensagens do armazenamento de objetos cloud e armazenar as mensagens brutas na tabela customers_cdc . O Auto Loader infere o esquema e manipula a evolução do esquema.
  • Adicione uma view customers_cdc_clean para verificar a qualidade dos dados usando expectativas. Por exemplo, o id nunca deve ser null pois ele é usado para executar suas operações de upsert.
  • Execute o AUTO CDC ... INTO (fazendo os upserts) nos dados limpos do CDC para aplicar as alterações na tabela final customers
  • Mostre como o LakeFlow Declarative pipeline pode criar um tipo 2 dimensões que mudam lentamente (SCD) (SCD2) para manter o controle de todas as alterações.

O objetivo é ingerir os dados brutos em tempo quase real e criar uma tabela para sua equipe de analistas, garantindo a qualidade dos dados.

O site tutorial usa a arquitetura medallion lakehouse, na qual ingere dados brutos por meio da camada bronze, limpa e valida os dados com a camada prata e aplica modelagem dimensional e agregação usando a camada ouro. Veja O que é a arquitetura medallion lakehouse? para mais informações.

O fluxo implementado se parece com isto:

Pipeline declarativo LakeFlow com CDC

Para obter mais informações sobre LakeFlow Declarative pipeline,, Auto Loader e, consulte CDC LakeFlow Declarative pipeline, What is?,Auto Loadere What is captura de dados de alterações (CDC) (CDC)?

Requisitos

Para completar este tutorial, você deve atender aos seguintes requisitos:

captura de dados de alterações (CDC) em um site ETL pipeline

A captura de dados de alterações (CDC) (CDC) é o processo que captura as alterações nos registros feitos em um banco de dados transacional (por exemplo, MySQL ou PostgreSQL) ou em um data warehouse. CDC captura operações como exclusão, inclusão e atualização de dados, normalmente como uma transmissão para rematerializar a tabela em sistemas externos. O CDC permite o carregamento incremental e elimina a necessidade de atualização do carregamento em massa.

nota

Para simplificar o tutorial, pule a configuração de um sistema CDC externo. Considere executá-lo e salvar os dados do CDC como arquivos JSON em um armazenamento de blobs (S3, ADLS, GCS). Este tutorial usa a biblioteca faker para gerar os dados usados no tutorial.

Capturando o CDC

Há uma variedade de ferramentas do CDC disponíveis. Uma das soluções líderes em código aberto é o Debezium, mas existem outras implementações que simplificam a fonte de dados, como Fivetran, Qlik Replicate, Streamset, Talend, Oracle GoldenGate e AWS DMS.

Neste tutorial, o senhor usa os dados do CDC de um sistema externo, como o Debezium ou o DMS. O Debezium captura todas as linhas alteradas. Normalmente, ele envia o histórico de alterações de dados para Kafka logs ou os salva como um arquivo.

O senhor deve ingerir as informações CDC da tabela customers (formatoJSON ), verificar se estão corretas e, em seguida, materializar a tabela de clientes no lakehouse.

Entrada do CDC do Debezium

Para cada alteração, você recebe uma mensagem JSON contendo todos os campos da linha que está sendo atualizada (id, firstname, lastname, email, address). A mensagem também inclui metadados adicionais:

  • operation: Um código de operações, normalmente (DELETE, APPEND, UPDATE).
  • operation_date: A data e o carimbo de data/hora do registro de cada ação de operações.

Ferramentas como o Debezium podem produzir resultados mais avançados, como o valor da linha antes da alteração, mas este tutorial as omite para simplificar.

o passo 1: Criar um pipeline

Crie um novo pipeline ETL no pipeline declarativo LakeFlow para consultar seu CDC fonte de dados e gerar tabelas em seu workspace.

  1. No seu workspace, clique em Ícone de mais. Novo no canto superior esquerdo.

  2. Clique em pipelineETL .

  3. Altere o título do pipeline para Pipelines with CDC tutorial ou um nome de sua preferência.

  4. Abaixo do título, escolha um catálogo e esquema para os quais você tenha permissões de gravação.

    Este catálogo e esquema são usados por default, se você não especificar um catálogo ou esquema em seu código. Seu código pode gravar em qualquer catálogo ou esquema especificando o caminho completo. Este tutorial usa o padrão que você especificar aqui.

  5. Em Opções avançadas , selecione começar com um arquivo vazio .

  6. Escolha uma pasta para seu código. Você pode selecionar Procurar para navegar pela lista de pastas no workspace. Você pode escolher qualquer pasta para a qual tenha permissões de gravação.

    Para usar o controle de versão, selecione uma pasta Git. Se você precisar criar uma nova pasta, selecione Ícone de mais. botão.

  7. Escolha Python ou SQL para a linguagem do seu arquivo, com base na linguagem que você deseja usar para o tutorial.

  8. Clique em Selecionar para criar o pipeline com essas configurações e abrir o Editor LakeFlow Pipelines .

Agora você tem um pipeline em branco com um catálogo e esquema default . Em seguida, configure os dados de amostra para importar no tutorial.

o passo 2: Crie os dados de amostra para importar neste tutorial

Este passo não é necessário se você estiver importando seus próprios dados de uma fonte existente. Para este tutorial, gere dados falsos como exemplo para o tutorial. Crie um Notebook para executar o script de geração de dados Python . Este código só precisa ser executado uma vez para gerar os dados de amostra, então crie-o dentro da pasta explorations do pipeline, que não é executada como parte de uma atualização pipeline .

nota

Este código usa o Faker para gerar os dados de amostra do CDC. O Faker está disponível para instalação automática, então o tutorial usa %pip install faker. Você também pode definir uma dependência no faker para o Notebook. Consulte Adicionar dependências ao Notebook.

  1. No Editor LakeFlow Pipelines , na barra lateral do navegador ativo à esquerda do editor, clique em Ícone de mais. Adicione e escolha Exploração .

  2. Dê um nome , como Setup data, selecione Python . Você pode deixar a pasta de destino default , que é uma nova pasta explorations .

  3. Clique em Criar . Isso cria um Notebook na nova pasta.

  4. Digite o seguinte código na primeira célula. Você deve alterar a definição de <my_catalog> e <my_schema> para corresponder ao catálogo e esquema default selecionados no procedimento anterior:

    Python
    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"

    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`{volume_name}`')
    volume_folder = f"/Volumes/{catalog}/{db}/raw_data"

    try:
    dbutils.fs.ls(volume_folder+"/customers")
    except:
    print(f"folder doesn't exist, generating the data under {volume_folder}...")
    from pyspark.sql import functions as F
    from faker import Faker
    from collections import OrderedDict
    import uuid
    fake = Faker()
    import random

    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
    fake_address = F.udf(fake.address)
    operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
    fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
    fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

    df = spark.range(0, 100000).repartition(100)
    df = df.withColumn("id", fake_id())
    df = df.withColumn("firstname", fake_firstname())
    df = df.withColumn("lastname", fake_lastname())
    df = df.withColumn("email", fake_email())
    df = df.withColumn("address", fake_address())
    df = df.withColumn("operation", fake_operation())
    df_customers = df.withColumn("operation_date", fake_date())
    df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
  5. Para gerar o dataset utilizado no tutorial, digite Shift + Enter para executar o código:

  6. Opcional. Para visualizar os dados usados neste tutorial, insira o seguinte código na próxima célula e execute o código. Você precisa atualizar o catálogo e o esquema para corresponder ao caminho do código anterior.

    Python
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"

    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))

Isso gera um grande conjunto de dados (com dados falsos do CDC) que você pode usar no restante do tutorial. Na próxima etapa, ingerir o uso de dados Auto Loader.

o passo 3: Ingerir dados incrementalmente com Auto Loader

O próximo passo é ingerir os dados brutos do armazenamento cloud (falso) em uma camada de bronze.

Isso pode ser um desafio por vários motivos, pois você deve:

  • Operar em escala, potencialmente ingerindo milhões de arquivos pequenos.
  • Inferir o esquema e o tipo JSON.
  • Tratar registros ruins com esquema JSON incorreto.
  • Cuidar da evolução do esquema (por exemplo, uma nova coluna na tabela de clientes).

Auto Loader simplificam essa ingestão, incluindo a inferência de esquemas e a evolução do esquema, ao mesmo tempo em que são dimensionados para milhões de arquivos recebidos. O Auto Loader está disponível em Python usando cloudFiles e em SQL usando SELECT * FROM STREAM read_files(...) e pode ser usado com vários formatos (JSON, CSV, Apache Avro, etc.):

Definir a tabela como uma tabela de transmissão garante que você consuma apenas novos dados recebidos. Se você não defini-la como uma tabela de transmissão, ela verifica e ingere todos os dados disponíveis. Consulte tabelas de transmissão para mais informações.

  1. Para ingerir o CDC de entrada uso de dados Auto Loader, copie e cole o seguinte código no arquivo de código que foi criado com seu pipeline (chamado my_transformation.py). Você pode usar Python ou SQL, dependendo da linguagem escolhida ao criar o pipeline. Certifique-se de substituir <catalog> e <schema> pelos que você configurou como default para o pipeline.
Python
from dlt import *
from pyspark.sql.functions import *

# Replace with the catalog and schema name that
# you are using:
path = "/Volumes/<catalog>/<schema>/raw_data/customers"


# Create the target bronze table
dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")

# Create an Append Flow to ingest the raw data into the bronze table
@append_flow(
target = "customers_cdc_bronze",
name = "customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load(f"{path}")
)
  1. Clique Ícone de reprodução. arquivo de execução ou pipelinede execução para iniciar uma atualização para o pipeline conectado. Com apenas um arquivo de origem no seu pipeline, eles são funcionalmente equivalentes.

Quando a atualização for concluída, o editor será atualizado com informações sobre seu pipeline.

  • O gráfico de pipeline (DAG), na barra lateral à direita do seu código, mostra uma única tabela, customers_cdc_bronze.
  • Um resumo da atualização é mostrado na parte superior do navegador ativo pipeline .
  • Os detalhes da tabela gerada são mostrados no painel inferior, e você pode navegar pelos dados da tabela selecionando-a.

Estes são os dados brutos da camada de bronze importados dos dados cloud . No próximo passo, limpe os dados para criar uma tabela de camada prateada.

o passo 4: Limpeza e expectativas para rastrear a qualidade dos dados

Depois que a camada bronze for definida, crie as camadas prata adicionando expectativas para controlar a qualidade dos dados. Verifique as seguintes condições:

  • O ID nunca deve ser null.
  • O tipo CDC operações deve ser válido.
  • O site json deve ter sido lido adequadamente pelo Auto Loader.

As linhas que não atendem a essas condições são descartadas.

Para obter mais informações, consulte gerenciar a qualidade dos dados com pipeline expectations.

  1. Na barra lateral do navegador ativo pipeline , clique em Ícone de mais. Adicione então transformações .

  2. Digite um Nome e escolha uma linguagem (Python ou SQL) para o arquivo de código-fonte. Você pode misturar e combinar idiomas dentro de um pipeline, para que você possa escolher qualquer um para este ou aquele passo.

  3. Para criar uma camada prateada com uma tabela limpa e impor restrições, copie e cole o seguinte código no novo arquivo (escolha Python ou SQL com base na linguagem do arquivo).

Python
from dlt import *
from pyspark.sql.functions import *

dlt.create_streaming_table(
name = "customers_cdc_clean",
expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
)

@append_flow(
target = "customers_cdc_clean",
name = "customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
return (
dlt.read_stream("customers_cdc_bronze")
.select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
)
  1. Clique Ícone de reprodução. arquivo de execução ou pipelinede execução para iniciar uma atualização para o pipeline conectado.

    Como agora há dois arquivos de origem, eles não fazem a mesma coisa, mas, neste caso, a saída é a mesma.

    • execução pipeline execução de todo o seu pipeline, incluindo o código da etapa 3. Se seus dados de entrada estivessem sendo atualizados, isso puxaria todas as alterações dessa fonte para sua camada bronze. Isso não executa o código da configuração de dados ou passo, porque ele está na pasta explorations e não faz parte do código-fonte do seu pipeline.
    • arquivo de execução execução apenas do arquivo de origem atual. Nesse caso, sem que seus dados de entrada sejam atualizados, isso gera os dados de prata da tabela de bronze armazenada em cache. Seria útil executar apenas este arquivo para uma iteração mais rápida ao criar ou editar seu código pipeline .

Quando a atualização for concluída, você poderá ver que o gráfico do pipeline agora mostra duas tabelas (com a camada prateada dependendo da camada bronze) e o painel inferior mostra os detalhes de ambas as tabelas. A parte superior do navegador ativo pipeline agora mostra vários tempos de execução, mas apenas detalhes da execução mais recente.

Em seguida, crie sua versão final da camada ouro da tabela customers .

o passo 5: Materializando a tabela de clientes com um fluxo AUTO CDC

Até este ponto, as tabelas apenas passaram os dados CDC em cada passo. Agora, crie a tabela customers para conter a view mais atualizada e ser uma réplica da tabela original, não a lista de operações CDC que a criou.

Isso não é trivial de implementar manualmente. Você deve considerar coisas como a desduplicação de dados para manter a linha mais recente.

No entanto, o pipeline LakeFlow Declarative resolve esses desafios com as AUTO CDC operações.

  1. Na barra lateral do navegador ativo pipeline , clique em Ícone de mais. Adicione e transforme .

  2. Digite um Nome e escolha uma linguagem (Python ou SQL) para o novo arquivo de código-fonte. Você pode escolher qualquer idioma para este passo, mas use o código correto, abaixo.

  3. Para processar o CDC uso de dados AUTO CDC no pipeline declarativo LakeFlow , copie e cole o seguinte código no novo arquivo.

Python
from dlt import *
from pyspark.sql.functions import *

dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")

dlt.create_auto_cdc_flow(
target="customers", # The customer table being materialized
source="customers_cdc_clean", # the incoming CDC
keys=["id"], # what we'll be using to match the rows to upsert
sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition
except_column_list=["operation", "operation_date", "_rescued_data"],
)
  1. Clique Ícone de reprodução. arquivo de execução para iniciar uma atualização para o pipeline conectado.

Quando a atualização estiver concluída, você poderá ver que seu gráfico de pipeline mostra 3 tabelas, progredindo de bronze para prata e para ouro.

o passo 6: acompanhamento update história com dimensões que mudam lentamente (SCD) do tipo 2 (SCD2)

Muitas vezes, é necessário criar uma tabela para acompanhar todas as alterações resultantes de APPEND, UPDATE e DELETE:

  • história: O senhor deseja manter um histórico de todas as alterações em sua tabela.
  • Rastreabilidade: O senhor deseja ver quais operações ocorreram.

SCD2 com LakeFlow Declarative pipeline

O Delta é compatível com o fluxo de dados de alteração (CDF), e o site table_change pode consultar a modificação da tabela em SQL e Python. No entanto, o principal caso de uso do CDF é capturar alterações em um pipeline e não criar um view completo das alterações da tabela desde o início.

As coisas ficam especialmente complexas de implementar se você tiver eventos fora de ordem. Se o senhor precisar sequenciar as alterações por um carimbo de data/hora e receber uma modificação que ocorreu no passado, deverá acrescentar uma nova entrada na tabela SCD e atualizar as entradas anteriores.

O pipeline declarativo LakeFlow remove essa complexidade e permite que você crie uma tabela separada contendo todas as modificações desde o início dos tempos. Essa tabela pode então ser usada em escala, com partições específicas/colunas zorder, se necessário. Os campos fora de ordem são tratados imediatamente com base na sequência _sequence_by

Para criar uma tabela SCD2, devemos usar a opção: STORED AS SCD TYPE 2 em SQL ou stored_as_scd_type="2" em Python.

nota

O senhor também pode limitar as colunas que o recurso rastreia usando a opção: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. Na barra lateral do navegador ativo pipeline , clique em Ícone de mais. Adicione e transforme .

  2. Digite um Nome e escolha uma linguagem (Python ou SQL) para o novo arquivo de código-fonte.

  3. Copie e cole o seguinte código no novo arquivo.

Python
from dlt import *
from pyspark.sql.functions import *

# create the table
dlt.create_streaming_table(
name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
)

# store all changes as SCD2
dlt.create_auto_cdc_flow(
target="customers_history",
source="customers_cdc_clean",
keys=["id"],
sequence_by=col("operation_date"),
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_date", "_rescued_data"],
stored_as_scd_type="2",
) # Enable SCD2 and store individual updates
  1. Clique Ícone de reprodução. arquivo de execução para iniciar uma atualização para o pipeline conectado.

Quando a atualização estiver concluída, o gráfico do pipeline incluirá a nova tabela customers_history , também dependente da tabela da camada prateada, e o painel inferior mostrará os detalhes de todas as 4 tabelas.

o passo 7: Crie uma view materializada que rastreie quem mais alterou suas informações

A tabela customers_history contém todas as alterações históricas que um usuário fez em suas informações. Crie uma view materializada simples na camada ouro que monitore quem mais alterou suas informações. Isso pode ser usado para análise de detecção de fraudes ou recomendações de usuários em um cenário do mundo real. Além disso, a aplicação de alterações com o SCD2 já removeu duplicatas, então você pode contar diretamente as linhas por ID de usuário.

  1. Na barra lateral do navegador ativo pipeline , clique em Ícone de mais. Adicione e transforme .

  2. Digite um Nome e escolha uma linguagem (Python ou SQL) para o novo arquivo de código-fonte.

  3. Copie e cole o seguinte código no novo arquivo de origem.

Python
from dlt import *
from pyspark.sql.functions import *

@dlt.table(
name = "customers_history_agg",
comment = "Aggregated customer history"
)
def customers_history_agg():
return (
dlt.read("customers_history")
.groupBy("id")
.agg(
count("address").alias("address_count"),
count("email").alias("email_count"),
count("firstname").alias("firstname_count"),
count("lastname").alias("lastname_count")
)
)
  1. Clique Ícone de reprodução. arquivo de execução para iniciar uma atualização para o pipeline conectado.

Após a conclusão da atualização, há uma nova tabela no gráfico pipeline que depende da tabela customers_history , e você pode view la no painel inferior. Seu pipeline agora está completo. Você pode testá-lo executando um pipelinede execução completo. Os únicos passos que faltam são programar o pipeline para atualizar regularmente.

o passo 8: Crie um Job para executar o pipeline ETL

Em seguida, crie um fluxo de trabalho para automatizar a ingestão de dados, o processamento e a análise dos passos em seu pipeline usando um trabalho Databricks .

  1. Na parte superior do editor, escolha o botão programar .
  2. Se a caixa de diálogo do programa aparecer, escolha Adicionar programa .
  3. Isso abre a caixa de diálogo Novo programa , onde você pode criar um Job para executar seu pipeline em um programa.
  4. Opcionalmente, dê um nome ao trabalho.
  5. Por default, o programador é configurado para executar uma vez por dia. Você pode aceitar esse padrão ou definir seu próprio programa. Escolher Avançado lhe dá a opção de definir um horário específico para a execução do trabalho. Selecionar Mais opções permite que você crie notificações quando o trabalho for executado.
  6. Selecione Criar para aplicar as alterações e criar o trabalho.

Agora o Job será executado diariamente para manter seu pipeline atualizado. Você pode escolher programar novamente para view a lista de programas. Você pode gerenciar programas para seu pipeline a partir dessa caixa de diálogo, incluindo adicionar, editar ou remover programas.

Clicar no nome do programa (ou Job) leva você para a página do Job na lista Jobs & pipeline . A partir daí você pode view detalhes sobre a execução do Job, incluindo a história de execução, ou executar o Job imediatamente com o botão executar agora .

Consulte monitoramento e observabilidade para LakeFlow Jobs para obter mais informações sobre a execução de trabalhos.

Recurso adicional