Tutorial: Crie um ETL pipeline usando a captura de dados de alterações (CDC) com o DLT
Aprenda a criar e implantar um ETL (extract, transform, and load) pipeline com captura de dados de alterações (CDC) (CDC) usando DLT para obtenção de dados e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados dos sistemas de origem, transformar esses dados com base nos requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros, e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, o senhor usará os dados de uma tabela customers
em um banco de dados MySQL para:
- Extraia as alterações de um banco de dados transacional usando o Debezium ou qualquer outra ferramenta e salve-as em um armazenamento de objetos na nuvem (pasta S3, ADLS, GCS). O senhor deixará de configurar um sistema CDC externo para simplificar o tutorial.
- Use o Auto Loader para carregar de forma incremental as mensagens do armazenamento de objetos na nuvem e armazenar as mensagens brutas na tabela
customers_cdc
. O Auto Loader inferirá o esquema e tratará da evolução do esquema. - Adicione um view
customers_cdc_clean
para verificar a qualidade dos dados usando as expectativas. Por exemplo, o endereçoid
nunca deve sernull
, pois o senhor o utilizará para executar suas operações de upsert. - Execute o
APPLY CHANGES INTO
(fazendo os upserts) nos dados limpos do CDC para aplicar as alterações na tabela finalcustomers
- Mostre como o site DLT pode criar um tipo 2 dimensões que mudam lentamente (SCD) (SCD2) para manter o controle de todas as alterações.
O objetivo é ingerir os dados brutos em tempo quase real e criar uma tabela para sua equipe de analistas, garantindo a qualidade dos dados.
O site tutorial usa a arquitetura medallion lakehouse, na qual ingere dados brutos por meio da camada bronze, limpa e valida os dados com a camada prata e aplica modelagem dimensional e agregação usando a camada ouro. Veja O que é a arquitetura medallion lakehouse? para mais informações.
O fluxo que você implementará tem a seguinte aparência:
Para obter mais informações DLT sobre, Auto Loader CDC DLTe, consulte, O que Auto Loaderé?, e O que é captura de dados de alterações (CDC) (CDC)?
Requisitos
Para completar este tutorial, você deve atender aos seguintes requisitos:
- O senhor pode entrar em um site Databricks workspace.
- O senhor tem Unity Catalog habilitado para o seu workspace.
- O senhor tem serverless compute habilitado para o seu account. O pipeline DLT sem servidor não está disponível em todas as regiões workspace. Veja recurso com disponibilidade regional limitada para as regiões disponíveis.
- Ter permissão para criar um recurso compute ou acesso a um recurso compute.
- Tenha permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGES
ouUSE CATALOG
eCREATE SCHEMA
. - Tenha permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGES
ouUSE SCHEMA
eCREATE VOLUME
.
captura de dados de alterações (CDC) em um site ETL pipeline
A captura de dados de alterações (CDC) (CDC) é o processo que captura as alterações nos registros feitos em um banco de dados transacional (por exemplo, MySQL ou PostgreSQL) ou em um data warehouse. CDC captura operações como exclusão, inclusão e atualização de dados, normalmente como uma transmissão para rematerializar a tabela em sistemas externos. O CDC permite o carregamento incremental e elimina a necessidade de atualização do carregamento em massa.
Para simplificar o tutorial, pule a configuração de um sistema CDC externo. O senhor pode considerá-lo em funcionamento e salvar os dados do CDC como arquivos JSON em um armazenamento de blob (S3, ADLS, GCS).
Capturando o CDC
Há uma variedade de ferramentas do CDC disponíveis. Uma das soluções líderes em código aberto é o Debezium, mas existem outras implementações que simplificam a fonte de dados, como Fivetran, Qlik Replicate, Streamset, Talend, Oracle GoldenGate e AWS DMS.
Neste tutorial, o senhor usa os dados do CDC de um sistema externo, como o Debezium ou o DMS. O Debezium captura todas as linhas alteradas. Normalmente, ele envia o histórico de alterações de dados para Kafka logs ou os salva como um arquivo.
O senhor deve ingerir as informações CDC da tabela customers
(formatoJSON ), verificar se estão corretas e, em seguida, materializar a tabela de clientes no lakehouse.
Entrada do CDC do Debezium
Para cada alteração, o senhor receberá uma mensagem JSON contendo todos os campos da linha que está sendo atualizada (id
, firstname
, lastname
, email
, address
). Além disso, o senhor terá informações extras de metadados, incluindo:
operation
: Um código de operações, normalmente (DELETE
,APPEND
,UPDATE
).operation_date
: A data e o carimbo de data/hora do registro de cada ação de operações.
Ferramentas como o Debezium podem produzir resultados mais avançados, como o valor da linha antes da alteração, mas este tutorial as omite para simplificar.
Etapa 0: Configuração dos dados do tutorial
Primeiro, o senhor deve criar um novo Notebook e instalar os arquivos de demonstração usados neste tutorial em seu workspace.
-
Clique em Novo no canto superior esquerdo.
-
Clique em Notebook .
-
Altere o título do Notebook de sem título Notebook <date and time> para DLT tutorial setup.
-
Ao lado do título do Notebook na parte superior, defina o idioma do Notebook default para Python .
-
Para gerar o dataset usado no tutorial, digite o seguinte código na primeira célula e pressione Shift + Enter para executar o código:
Python# You can change the catalog, schema, dbName, and db. If you do so, you must also
# change the names in the rest of the tutorial.
catalog = "main"
schema = dbName = db = "dbdemos_dlt_cdc"
volume_name = "raw_data"
spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
spark.sql(f'USE CATALOG `{catalog}`')
spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
spark.sql(f'USE SCHEMA `{schema}`')
spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"
try:
dbutils.fs.ls(volume_folder+"/customers")
except:
print(f"folder doesn't exists, generating the data under {volume_folder}...")
from pyspark.sql import functions as F
from faker import Faker
from collections import OrderedDict
import uuid
fake = Faker()
import random
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)
fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
fake_address = F.udf(fake.address)
operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
df = spark.range(0, 100000).repartition(100)
df = df.withColumn("id", fake_id())
df = df.withColumn("firstname", fake_firstname())
df = df.withColumn("lastname", fake_lastname())
df = df.withColumn("email", fake_email())
df = df.withColumn("address", fake_address())
df = df.withColumn("operation", fake_operation())
df_customers = df.withColumn("operation_date", fake_date())
df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers") -
Para visualizar os dados usados neste tutorial, digite o código na próxima célula e pressione Shift + Enter para executar o código:
Pythondisplay(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
Etapa 1: Criar um pipeline
Primeiro, o senhor criará um pipeline de ETL no DLT. DLT cria o pipeline resolvendo as dependências definidas no Notebook ou em arquivos (chamados de código-fonte ) usando a sintaxe DLT. Cada arquivo de código-fonte pode conter apenas um idioma, mas o senhor pode adicionar Notebook ou arquivos específicos de vários idiomas no site pipeline. Para saber mais, consulte DLT
Deixe o campo Código-fonte em branco para criar e configurar automaticamente um Notebook para criação de código-fonte.
Este tutorial usa serverless compute e Unity Catalog. Para todas as opções de configuração que não forem especificadas, use as configurações do site default. Se o serverless compute não estiver habilitado ou não for compatível com o seu workspace, o senhor poderá concluir o tutorial conforme escrito usando as configurações do default compute . Se o senhor usar as configurações do default compute , deverá selecionar manualmente Unity Catalog em Storage options (Opções de armazenamento) na seção Destination (Destino ) da interface de usuário Create pipeline .
Para criar um novo pipeline de ETL no DLT, siga estas etapas:
- Na barra lateral, clique em pipeline .
- Clique em Create pipeline e ETL pipeline .
- Em nome do pipeline , digite um nome exclusivo pipeline.
- Marque a caixa de seleção sem servidor .
- Selecione Triggered (Acionado ) no modo pipeline . Isso executará os fluxos de transmissão usando o acionador AvailableNow, que processa todos os dados existentes e, em seguida, encerra a transmissão.
- No Destination , para configurar um local do Unity Catalog onde as tabelas são publicadas, selecione um catálogo existente e escreva um novo nome em Schema para criar um novo esquema em seu catálogo.
- Clique em Criar .
A interface do usuário do pipeline é exibida para o novo pipeline.
Um Notebook de código-fonte em branco é criado e configurado automaticamente para o site pipeline. O Notebook é criado em um novo diretório no seu diretório de usuário. O nome do novo diretório e arquivo corresponde ao nome do seu pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline
.
- Um link para acessar esse Notebook está abaixo do campo Código-fonte no painel de detalhes do pipeline . Clique no link para abrir o Notebook antes de prosseguir para a próxima etapa.
- Clique em Connect (Conectar ) no canto superior direito para abrir o menu de configuração do site compute.
- Passe o mouse sobre o nome do pipeline que o senhor criou na Etapa 1.
- Clique em Conectar .
- Ao lado do título do Notebook na parte superior, selecione o idioma do Notebook default (Python ou SQL).
O Notebook só pode conter uma única linguagem de programação. Não misture os códigos Python e SQL no Notebook de código-fonte pipeline.
Ao desenvolver um pipeline de DLT, o senhor pode escolher entre Python ou SQL. Este tutorial inclui exemplos para ambas as linguagens. Com base em sua escolha de idioma, verifique se o idioma do notebook default foi selecionado.
Para saber mais sobre o suporte do Notebook para DLT pipeline desenvolvimento de código, consulte Desenvolver e depurar o pipeline ETL com um Notebook em DLT.
Etapa 2: ingerir dados de forma incremental com o Auto Loader
A primeira etapa é ingerir os dados brutos do armazenamento em nuvem em uma camada de bronze.
Isso pode ser um desafio por vários motivos, pois você deve:
- Operar em escala, potencialmente ingerindo milhões de arquivos pequenos.
- Inferir o esquema e o tipo JSON.
- Tratar registros ruins com esquema JSON incorreto.
- Cuidar da evolução do esquema (por exemplo, uma nova coluna na tabela de clientes).
Auto Loader simplificam essa ingestão, incluindo a inferência de esquemas e a evolução do esquema, ao mesmo tempo em que são dimensionados para milhões de arquivos recebidos. O Auto Loader está disponível em Python usando cloudFiles
e em SQL usando SELECT * FROM STREAM read_files(...)
e pode ser usado com vários formatos (JSON, CSV, Apache Avro, etc.):
Definir a tabela como uma tabela de transmissão garantirá que o senhor consuma apenas os novos dados recebidos. Se o senhor não a definir como uma tabela de transmissão, fará a varredura e a ingestão de todos os dados disponíveis. Consulte as tabelas de transmissão para obter mais informações.
-
Para ingerir o uso de dados de entrada Auto Loader, copie e cole o código a seguir na primeira célula do Notebook. O senhor pode usar Python ou SQL, dependendo do idioma default do Notebook que escolheu na etapa anterior.
aba :::tab-item[Python]
Pythonfrom dlt import *
from pyspark.sql.functions import *
# Create the target bronze table
dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
# Create an Append Flow to ingest the raw data into the bronze table
@append_flow(
target = "customers_cdc_bronze",
name = "customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
):::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
CREATE FLOW customers_bronze_ingest_flow AS
INSERT INTO customers_cdc_bronze BY NAME
SELECT *
FROM STREAM read_files(
"/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
format => "json",
inferColumnTypes => "true"
)::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 3: Limpeza e expectativas para monitorar a qualidade dos dados
Depois que a camada de bronze for definida, você criará as camadas prateadas adicionando expectativas para controlar a qualidade dos dados, verificando as seguintes condições:
- O ID nunca deve ser
null
. - O tipo CDC operações deve ser válido.
- O site
json
deve ter sido lido adequadamente pelo Auto Loader.
A linha será descartada se uma dessas condições não for respeitada.
Para obter mais informações, consulte gerenciar a qualidade dos dados com pipeline expectations.
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Para criar uma camada prateada com uma tabela limpa e impor restrições, copie e cole o código a seguir na nova célula do Notebook.
aba :::tab-item[Python]
Pythondlt.create_streaming_table(
name = "customers_cdc_clean",
expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
)
@append_flow(
target = "customers_cdc_clean",
name = "customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
return (
dlt.read_stream("customers_cdc_bronze")
.select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
):::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
)
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
CREATE FLOW customers_cdc_clean_flow AS
INSERT INTO customers_cdc_clean BY NAME
SELECT * FROM STREAM customers_cdc_bronze;::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 4: Materializar a tabela de clientes com alterações de aplicação
A tabela customers
conterá o view mais atualizado e será uma réplica da tabela original.
Isso não é trivial de implementar manualmente. Você deve considerar coisas como a desduplicação de dados para manter a linha mais recente.
No entanto, o site DLT resolve esses desafios com as operações de alteração de aplicativos .
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Para processar o CDC uso de dados aplicar as alterações em DLT, copie e cole o código a seguir na nova célula do Notebook.
aba :::tab-item[Python]
Pythondlt.create_streaming_table(name="customers", comment="Clean, materialized customers")
dlt.apply_changes(
target="customers", # The customer table being materialized
source="customers_cdc_clean", # the incoming CDC
keys=["id"], # what we'll be using to match the rows to upsert
sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition
except_column_list=["operation", "operation_date", "_rescued_data"],
):::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers;
APPLY CHANGES INTO customers
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 1;::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 5: dimensões que mudam lentamente (SCD) do tipo 2 (SCD2)
Muitas vezes, é necessário criar uma tabela para acompanhar todas as alterações resultantes de APPEND
, UPDATE
e DELETE
:
- história: O senhor deseja manter um histórico de todas as alterações em sua tabela.
- Rastreabilidade: O senhor deseja ver quais operações ocorreram.
SCD2 com DLT
O Delta é compatível com o fluxo de dados de alteração (CDF), e o site table_change
pode consultar a modificação da tabela em SQL e Python. No entanto, o principal caso de uso do CDF é capturar alterações em um pipeline e não criar um view completo das alterações da tabela desde o início.
As coisas ficam especialmente complexas de implementar se você tiver eventos fora de ordem. Se o senhor precisar sequenciar as alterações por um carimbo de data/hora e receber uma modificação que ocorreu no passado, deverá acrescentar uma nova entrada na tabela SCD e atualizar as entradas anteriores.
A DLT elimina essa complexidade e permite que o senhor crie uma tabela separada contendo todas as modificações desde o início do tempo. Essa tabela pode então ser usada em escala, com partições específicas/colunas de fronteira, se necessário. Os campos fora de ordem serão tratados imediatamente com base no _sequence_by
Para criar uma tabela SCD2, devemos usar o APPLY CHANGES
com a opção extra: STORED AS SCD TYPE 2
em SQL ou stored_as_scd_type="2"
em Python.
O senhor também pode limitar as colunas que o recurso rastreia usando a opção: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Copie e cole o código a seguir na nova célula do Notebook.
aba :::tab-item[Python]
Python# create the table
dlt.create_streaming_table(
name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
)
# store all changes as SCD2
dlt.apply_changes(
target="customers_history",
source="customers_cdc_clean",
keys=["id"],
sequence_by=col("operation_date"),
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_date", "_rescued_data"],
stored_as_scd_type="2",
) # Enable SCD2 and store individual updates:::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers_history;
APPLY CHANGES INTO customers_history
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 2;::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 6: Crie um site materializado view que rastreie quem mais alterou suas informações
A tabela customers_history
contém todas as alterações históricas que um usuário fez em suas informações. Agora, o senhor criará um view materializado simples na camada de ouro que mantém o controle de quem mais alterou suas informações. Isso poderia ser usado para análise de detecção de fraude ou recomendações de usuários em um cenário real. Além disso, a aplicação de alterações com o SCD2 já removeu as duplicatas para nós, então podemos contar diretamente as linhas por ID de usuário.
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Copie e cole o código a seguir na nova célula do Notebook.
aba :::tab-item[Python]
Python@dlt.table(
name = "customers_history_agg",
comment = "Aggregated customer history"
)
def customers_history_agg():
return (
dlt.read("customers_history")
.groupBy("id")
.agg(
count("address").alias("address_count"),
count("email").alias("email_count"),
count("firstname").alias("firstname_count"),
count("lastname").alias("lastname_count")
)
):::
:::tab-item[sql]
SQLCREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
SELECT
id,
count("address") as address_count,
count("email") AS email_count,
count("firstname") AS firstname_count,
count("lastname") AS lastname_count
FROM customers_history
GROUP BY id::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 7: Criar um trabalho para executar o DLT pipeline
Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão de dados, processamento e análise usando um Databricks Job.
- Em seu site workspace, clique em
fluxo de trabalho na barra lateral e clique em Create Job .
- Na caixa de título da tarefa, substitua New Job pelo nome do <date and time> trabalho. Por exemplo,
CDC customers workflow
. - Em Nome da tarefa , insira um nome para a primeira tarefa, por exemplo,
ETL_customers_data
. - Em Type , selecione pipeline .
- No pipeline , selecione o DLT pipeline que o senhor criou na etapa 1.
- Clique em Criar .
- Para executar o fluxo de trabalho, clique em executar Now . Para acessar view os detalhes da execução , clique em tab. Clique na tarefa para acessar view detalhes da execução da tarefa.
- Para view os resultados quando o fluxo de trabalho for concluído, clique em Go to the latest successful execution (Ir para a última execução bem-sucedida ) ou no Começar time (Hora de início ) para a execução do trabalho. A página Saída é exibida e exibe os resultados da consulta.
Consulte monitoramento e observabilidade para Databricks Jobs para obter mais informações sobre a execução de trabalhos.
Etapa 8: Programar o trabalho DLT pipeline
Para executar o ETL pipeline em um programador, siga estas etapas:
- Clique em
fluxo de trabalho na barra lateral.
- Na coluna Name (Nome ), clique no nome do trabalho. O painel lateral aparece como os detalhes do siteJob .
- Clique em Add trigger (Adicionar acionador ) no painel Programar & Triggers (Acionadores ) e selecione Scheduled (Programado ) em Trigger type (Tipo de acionador) .
- Especifique o período, a hora inicial e o fuso horário.
- Clique em Salvar .