Pular para o conteúdo principal

Tutorial: Crie um ETL pipeline usando a captura de dados de alterações (CDC) com o DLT

Aprenda a criar e implantar um ETL (extract, transform, and load) pipeline com captura de dados de alterações (CDC) (CDC) usando DLT para obtenção de dados e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados dos sistemas de origem, transformar esses dados com base nos requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros, e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.

Neste tutorial, o senhor usará os dados de uma tabela customers em um banco de dados MySQL para:

  • Extraia as alterações de um banco de dados transacional usando o Debezium ou qualquer outra ferramenta e salve-as em um armazenamento de objetos na nuvem (pasta S3, ADLS, GCS). O senhor deixará de configurar um sistema CDC externo para simplificar o tutorial.
  • Use o Auto Loader para carregar de forma incremental as mensagens do armazenamento de objetos na nuvem e armazenar as mensagens brutas na tabela customers_cdc. O Auto Loader inferirá o esquema e tratará da evolução do esquema.
  • Adicione um view customers_cdc_clean para verificar a qualidade dos dados usando as expectativas. Por exemplo, o endereço id nunca deve ser null, pois o senhor o utilizará para executar suas operações de upsert.
  • Execute o APPLY CHANGES INTO (fazendo os upserts) nos dados limpos do CDC para aplicar as alterações na tabela final customers
  • Mostre como o site DLT pode criar um tipo 2 dimensões que mudam lentamente (SCD) (SCD2) para manter o controle de todas as alterações.

O objetivo é ingerir os dados brutos em tempo quase real e criar uma tabela para sua equipe de analistas, garantindo a qualidade dos dados.

O site tutorial usa a arquitetura medallion lakehouse, na qual ingere dados brutos por meio da camada bronze, limpa e valida os dados com a camada prata e aplica modelagem dimensional e agregação usando a camada ouro. Veja O que é a arquitetura medallion lakehouse? para mais informações.

O fluxo que você implementará tem a seguinte aparência:

DLT pipeline com CDC

Para obter mais informações DLT sobre, Auto Loader CDC DLTe, consulte, O que Auto Loaderé?, e O que é captura de dados de alterações (CDC) (CDC)?

Requisitos

Para completar este tutorial, você deve atender aos seguintes requisitos:

captura de dados de alterações (CDC) em um site ETL pipeline

A captura de dados de alterações (CDC) (CDC) é o processo que captura as alterações nos registros feitos em um banco de dados transacional (por exemplo, MySQL ou PostgreSQL) ou em um data warehouse. CDC captura operações como exclusão, inclusão e atualização de dados, normalmente como uma transmissão para rematerializar a tabela em sistemas externos. O CDC permite o carregamento incremental e elimina a necessidade de atualização do carregamento em massa.

nota

Para simplificar o tutorial, pule a configuração de um sistema CDC externo. O senhor pode considerá-lo em funcionamento e salvar os dados do CDC como arquivos JSON em um armazenamento de blob (S3, ADLS, GCS).

Capturando o CDC

Há uma variedade de ferramentas do CDC disponíveis. Uma das soluções líderes em código aberto é o Debezium, mas existem outras implementações que simplificam a fonte de dados, como Fivetran, Qlik Replicate, Streamset, Talend, Oracle GoldenGate e AWS DMS.

Neste tutorial, o senhor usa os dados do CDC de um sistema externo, como o Debezium ou o DMS. O Debezium captura todas as linhas alteradas. Normalmente, ele envia o histórico de alterações de dados para Kafka logs ou os salva como um arquivo.

O senhor deve ingerir as informações CDC da tabela customers (formatoJSON ), verificar se estão corretas e, em seguida, materializar a tabela de clientes no lakehouse.

Entrada do CDC do Debezium

Para cada alteração, o senhor receberá uma mensagem JSON contendo todos os campos da linha que está sendo atualizada (id, firstname, lastname, email, address). Além disso, o senhor terá informações extras de metadados, incluindo:

  • operation: Um código de operações, normalmente (DELETE, APPEND, UPDATE).
  • operation_date: A data e o carimbo de data/hora do registro de cada ação de operações.

Ferramentas como o Debezium podem produzir resultados mais avançados, como o valor da linha antes da alteração, mas este tutorial as omite para simplificar.

Etapa 0: Configuração dos dados do tutorial

Primeiro, o senhor deve criar um novo Notebook e instalar os arquivos de demonstração usados neste tutorial em seu workspace.

  1. Clique em Novo no canto superior esquerdo.

  2. Clique em Notebook .

  3. Altere o título do Notebook de sem título Notebook <date and time> para DLT tutorial setup.

  4. Ao lado do título do Notebook na parte superior, defina o idioma do Notebook default para Python .

  5. Para gerar o dataset usado no tutorial, digite o seguinte código na primeira célula e pressione Shift + Enter para executar o código:

    Python
    # You can change the catalog, schema, dbName, and db. If you do so, you must also
    # change the names in the rest of the tutorial.
    catalog = "main"
    schema = dbName = db = "dbdemos_dlt_cdc"
    volume_name = "raw_data"

    spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
    volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"

    try:
    dbutils.fs.ls(volume_folder+"/customers")
    except:
    print(f"folder doesn't exists, generating the data under {volume_folder}...")
    from pyspark.sql import functions as F
    from faker import Faker
    from collections import OrderedDict
    import uuid
    fake = Faker()
    import random

    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
    fake_address = F.udf(fake.address)
    operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
    fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
    fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

    df = spark.range(0, 100000).repartition(100)
    df = df.withColumn("id", fake_id())
    df = df.withColumn("firstname", fake_firstname())
    df = df.withColumn("lastname", fake_lastname())
    df = df.withColumn("email", fake_email())
    df = df.withColumn("address", fake_address())
    df = df.withColumn("operation", fake_operation())
    df_customers = df.withColumn("operation_date", fake_date())
    df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
  6. Para visualizar os dados usados neste tutorial, digite o código na próxima célula e pressione Shift + Enter para executar o código:

    Python
    display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))

Etapa 1: Criar um pipeline

Primeiro, o senhor criará um pipeline de ETL no DLT. DLT cria o pipeline resolvendo as dependências definidas no Notebook ou em arquivos (chamados de código-fonte ) usando a sintaxe DLT. Cada arquivo de código-fonte pode conter apenas um idioma, mas o senhor pode adicionar Notebook ou arquivos específicos de vários idiomas no site pipeline. Para saber mais, consulte DLT

important

Deixe o campo Código-fonte em branco para criar e configurar automaticamente um Notebook para criação de código-fonte.

Este tutorial usa serverless compute e Unity Catalog. Para todas as opções de configuração que não forem especificadas, use as configurações do site default. Se o serverless compute não estiver habilitado ou não for compatível com o seu workspace, o senhor poderá concluir o tutorial conforme escrito usando as configurações do default compute . Se o senhor usar as configurações do default compute , deverá selecionar manualmente Unity Catalog em Storage options (Opções de armazenamento) na seção Destination (Destino ) da interface de usuário Create pipeline .

Para criar um novo pipeline de ETL no DLT, siga estas etapas:

  1. Na barra lateral, clique em pipeline .
  2. Clique em Create pipeline e ETL pipeline .
  3. Em nome do pipeline , digite um nome exclusivo pipeline.
  4. Marque a caixa de seleção sem servidor .
  5. Selecione Triggered (Acionado ) no modo pipeline . Isso executará os fluxos de transmissão usando o acionador AvailableNow, que processa todos os dados existentes e, em seguida, encerra a transmissão.
  6. No Destination , para configurar um local do Unity Catalog onde as tabelas são publicadas, selecione um catálogo existente e escreva um novo nome em Schema para criar um novo esquema em seu catálogo.
  7. Clique em Criar .

A interface do usuário do pipeline é exibida para o novo pipeline.

Um Notebook de código-fonte em branco é criado e configurado automaticamente para o site pipeline. O Notebook é criado em um novo diretório no seu diretório de usuário. O nome do novo diretório e arquivo corresponde ao nome do seu pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline.

  1. Um link para acessar esse Notebook está abaixo do campo Código-fonte no painel de detalhes do pipeline . Clique no link para abrir o Notebook antes de prosseguir para a próxima etapa.
  2. Clique em Connect (Conectar ) no canto superior direito para abrir o menu de configuração do site compute.
  3. Passe o mouse sobre o nome do pipeline que o senhor criou na Etapa 1.
  4. Clique em Conectar .
  5. Ao lado do título do Notebook na parte superior, selecione o idioma do Notebook default (Python ou SQL).
important

O Notebook só pode conter uma única linguagem de programação. Não misture os códigos Python e SQL no Notebook de código-fonte pipeline.

Ao desenvolver um pipeline de DLT, o senhor pode escolher entre Python ou SQL. Este tutorial inclui exemplos para ambas as linguagens. Com base em sua escolha de idioma, verifique se o idioma do notebook default foi selecionado.

Para saber mais sobre o suporte do Notebook para DLT pipeline desenvolvimento de código, consulte Desenvolver e depurar o pipeline ETL com um Notebook em DLT.

Etapa 2: ingerir dados de forma incremental com o Auto Loader

A primeira etapa é ingerir os dados brutos do armazenamento em nuvem em uma camada de bronze.

Isso pode ser um desafio por vários motivos, pois você deve:

  • Operar em escala, potencialmente ingerindo milhões de arquivos pequenos.
  • Inferir o esquema e o tipo JSON.
  • Tratar registros ruins com esquema JSON incorreto.
  • Cuidar da evolução do esquema (por exemplo, uma nova coluna na tabela de clientes).

Auto Loader simplificam essa ingestão, incluindo a inferência de esquemas e a evolução do esquema, ao mesmo tempo em que são dimensionados para milhões de arquivos recebidos. O Auto Loader está disponível em Python usando cloudFiles e em SQL usando SELECT * FROM STREAM read_files(...) e pode ser usado com vários formatos (JSON, CSV, Apache Avro, etc.):

Definir a tabela como uma tabela de transmissão garantirá que o senhor consuma apenas os novos dados recebidos. Se o senhor não a definir como uma tabela de transmissão, fará a varredura e a ingestão de todos os dados disponíveis. Consulte as tabelas de transmissão para obter mais informações.

  1. Para ingerir o uso de dados de entrada Auto Loader, copie e cole o código a seguir na primeira célula do Notebook. O senhor pode usar Python ou SQL, dependendo do idioma default do Notebook que escolheu na etapa anterior.

    aba :::tab-item[Python]

    Python
    from dlt import *
    from pyspark.sql.functions import *

    # Create the target bronze table
    dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")

    # Create an Append Flow to ingest the raw data into the bronze table
    @append_flow(
    target = "customers_cdc_bronze",
    name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
    return (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
    )

    :::

    :::tab-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
    "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
    format => "json",
    inferColumnTypes => "true"
    )

    ::: ::::

  2. Clique em começar para começar uma atualização para o site conectado pipeline.

Etapa 3: Limpeza e expectativas para monitorar a qualidade dos dados

Depois que a camada de bronze for definida, você criará as camadas prateadas adicionando expectativas para controlar a qualidade dos dados, verificando as seguintes condições:

  • O ID nunca deve ser null.
  • O tipo CDC operações deve ser válido.
  • O site json deve ter sido lido adequadamente pelo Auto Loader.

A linha será descartada se uma dessas condições não for respeitada.

Para obter mais informações, consulte gerenciar a qualidade dos dados com pipeline expectations.

  1. Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.

  2. Para criar uma camada prateada com uma tabela limpa e impor restrições, copie e cole o código a seguir na nova célula do Notebook.

    aba :::tab-item[Python]

    Python
    dlt.create_streaming_table(
    name = "customers_cdc_clean",
    expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
    )

    @append_flow(
    target = "customers_cdc_clean",
    name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
    return (
    dlt.read_stream("customers_cdc_bronze")
    .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
    )

    :::

    :::tab-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
    CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;

    ::: ::::

  3. Clique em começar para começar uma atualização para o site conectado pipeline.

Etapa 4: Materializar a tabela de clientes com alterações de aplicação

A tabela customers conterá o view mais atualizado e será uma réplica da tabela original.

Isso não é trivial de implementar manualmente. Você deve considerar coisas como a desduplicação de dados para manter a linha mais recente.

No entanto, o site DLT resolve esses desafios com as operações de alteração de aplicativos .

  1. Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.

  2. Para processar o CDC uso de dados aplicar as alterações em DLT, copie e cole o código a seguir na nova célula do Notebook.

    aba :::tab-item[Python]

    Python
    dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")

    dlt.apply_changes(
    target="customers", # The customer table being materialized
    source="customers_cdc_clean", # the incoming CDC
    keys=["id"], # what we'll be using to match the rows to upsert
    sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition
    except_column_list=["operation", "operation_date", "_rescued_data"],
    )

    :::

    :::tab-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers;

    APPLY CHANGES INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;

    ::: ::::

  3. Clique em começar para começar uma atualização para o site conectado pipeline.

Etapa 5: dimensões que mudam lentamente (SCD) do tipo 2 (SCD2)

Muitas vezes, é necessário criar uma tabela para acompanhar todas as alterações resultantes de APPEND, UPDATE e DELETE:

  • história: O senhor deseja manter um histórico de todas as alterações em sua tabela.
  • Rastreabilidade: O senhor deseja ver quais operações ocorreram.

SCD2 com DLT

O Delta é compatível com o fluxo de dados de alteração (CDF), e o site table_change pode consultar a modificação da tabela em SQL e Python. No entanto, o principal caso de uso do CDF é capturar alterações em um pipeline e não criar um view completo das alterações da tabela desde o início.

As coisas ficam especialmente complexas de implementar se você tiver eventos fora de ordem. Se o senhor precisar sequenciar as alterações por um carimbo de data/hora e receber uma modificação que ocorreu no passado, deverá acrescentar uma nova entrada na tabela SCD e atualizar as entradas anteriores.

A DLT elimina essa complexidade e permite que o senhor crie uma tabela separada contendo todas as modificações desde o início do tempo. Essa tabela pode então ser usada em escala, com partições específicas/colunas de fronteira, se necessário. Os campos fora de ordem serão tratados imediatamente com base no _sequence_by

Para criar uma tabela SCD2, devemos usar o APPLY CHANGES com a opção extra: STORED AS SCD TYPE 2 em SQL ou stored_as_scd_type="2" em Python.

nota

O senhor também pode limitar as colunas que o recurso rastreia usando a opção: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.

  2. Copie e cole o código a seguir na nova célula do Notebook.

    aba :::tab-item[Python]

    Python
    # create the table
    dlt.create_streaming_table(
    name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )

    # store all changes as SCD2
    dlt.apply_changes(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["id"],
    sequence_by=col("operation_date"),
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "operation_date", "_rescued_data"],
    stored_as_scd_type="2",
    ) # Enable SCD2 and store individual updates

    :::

    :::tab-item[sql]

    SQL
    CREATE OR REFRESH STREAMING TABLE customers_history;

    APPLY CHANGES INTO customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;

    ::: ::::

  3. Clique em começar para começar uma atualização para o site conectado pipeline.

Etapa 6: Crie um site materializado view que rastreie quem mais alterou suas informações

A tabela customers_history contém todas as alterações históricas que um usuário fez em suas informações. Agora, o senhor criará um view materializado simples na camada de ouro que mantém o controle de quem mais alterou suas informações. Isso poderia ser usado para análise de detecção de fraude ou recomendações de usuários em um cenário real. Além disso, a aplicação de alterações com o SCD2 já removeu as duplicatas para nós, então podemos contar diretamente as linhas por ID de usuário.

  1. Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.

  2. Copie e cole o código a seguir na nova célula do Notebook.

    aba :::tab-item[Python]

    Python
    @dlt.table(
    name = "customers_history_agg",
    comment = "Aggregated customer history"
    )
    def customers_history_agg():
    return (
    dlt.read("customers_history")
    .groupBy("id")
    .agg(
    count("address").alias("address_count"),
    count("email").alias("email_count"),
    count("firstname").alias("firstname_count"),
    count("lastname").alias("lastname_count")
    )
    )

    :::

    :::tab-item[sql]

    SQL
    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
    id,
    count("address") as address_count,
    count("email") AS email_count,
    count("firstname") AS firstname_count,
    count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id

    ::: ::::

  3. Clique em começar para começar uma atualização para o site conectado pipeline.

Etapa 7: Criar um trabalho para executar o DLT pipeline

Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão de dados, processamento e análise usando um Databricks Job.

  1. Em seu site workspace, clique em fluxo de trabalho Icon fluxo de trabalho na barra lateral e clique em Create Job .
  2. Na caixa de título da tarefa, substitua New Job pelo nome do <date and time> trabalho. Por exemplo, CDC customers workflow.
  3. Em Nome da tarefa , insira um nome para a primeira tarefa, por exemplo, ETL_customers_data.
  4. Em Type , selecione pipeline .
  5. No pipeline , selecione o DLT pipeline que o senhor criou na etapa 1.
  6. Clique em Criar .
  7. Para executar o fluxo de trabalho, clique em executar Now . Para acessar view os detalhes da execução , clique em tab. Clique na tarefa para acessar view detalhes da execução da tarefa.
  8. Para view os resultados quando o fluxo de trabalho for concluído, clique em Go to the latest successful execution (Ir para a última execução bem-sucedida ) ou no Começar time (Hora de início ) para a execução do trabalho. A página Saída é exibida e exibe os resultados da consulta.

Consulte monitoramento e observabilidade para Databricks Jobs para obter mais informações sobre a execução de trabalhos.

Etapa 8: Programar o trabalho DLT pipeline

Para executar o ETL pipeline em um programador, siga estas etapas:

  1. Clique em fluxo de trabalho Icon fluxo de trabalho na barra lateral.
  2. Na coluna Name (Nome ), clique no nome do trabalho. O painel lateral aparece como os detalhes do siteJob .
  3. Clique em Add trigger (Adicionar acionador ) no painel Programar & Triggers (Acionadores ) e selecione Scheduled (Programado ) em Trigger type (Tipo de acionador) .
  4. Especifique o período, a hora inicial e o fuso horário.
  5. Clique em Salvar .

Recurso adicional