Tutorial: Crie um ETL pipeline usando a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow
Aprenda a criar e implantar um ETL (extract, transform, and load) pipeline com captura de dados de alterações (CDC) (CDC) usando LakeFlow Declarative pipeline for data orquestração e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados dos sistemas de origem, transformar esses dados com base nos requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros, e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, o senhor usará os dados de uma tabela customers
em um banco de dados MySQL para:
- Extraia as alterações de um banco de dados transacional usando o Debezium ou qualquer outra ferramenta e salve-as em um armazenamento de objetos na nuvem (pasta S3, ADLS, GCS). O senhor deixará de configurar um sistema CDC externo para simplificar o tutorial.
- Use o Auto Loader para carregar de forma incremental as mensagens do armazenamento de objetos na nuvem e armazenar as mensagens brutas na tabela
customers_cdc
. O Auto Loader inferirá o esquema e tratará da evolução do esquema. - Adicione um view
customers_cdc_clean
para verificar a qualidade dos dados usando as expectativas. Por exemplo, o endereçoid
nunca deve sernull
, pois o senhor o utilizará para executar suas operações de upsert. - Execute o
AUTO CDC ... INTO
(fazendo os upserts) nos dados limpos do CDC para aplicar as alterações na tabela finalcustomers
- Mostre como o LakeFlow Declarative pipeline pode criar um tipo 2 dimensões que mudam lentamente (SCD) (SCD2) para manter o controle de todas as alterações.
O objetivo é ingerir os dados brutos em tempo quase real e criar uma tabela para sua equipe de analistas, garantindo a qualidade dos dados.
O site tutorial usa a arquitetura medallion lakehouse, na qual ingere dados brutos por meio da camada bronze, limpa e valida os dados com a camada prata e aplica modelagem dimensional e agregação usando a camada ouro. Veja O que é a arquitetura medallion lakehouse? para mais informações.
O fluxo que você implementará tem a seguinte aparência:
Para obter mais informações sobre LakeFlow Declarative pipeline,, Auto Loader e, consulte CDC LakeFlow Declarative pipeline, What is?,Auto Loadere What is captura de dados de alterações (CDC) (CDC)?
Requisitos
Para completar este tutorial, você deve atender aos seguintes requisitos:
- O senhor pode entrar em um site Databricks workspace.
- O senhor tem Unity Catalog habilitado para o seu workspace.
- O senhor tem serverless compute habilitado para o seu account. sem servidor LakeFlow O pipeline declarativo não está disponível em todas as regiões workspace. Veja recurso com disponibilidade regional limitada para as regiões disponíveis.
- Ter permissão para criar um recurso compute ou acesso a um recurso compute.
- Tenha permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGES
ouUSE CATALOG
eCREATE SCHEMA
. - Tenha permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGES
ouUSE SCHEMA
eCREATE VOLUME
.
captura de dados de alterações (CDC) em um site ETL pipeline
A captura de dados de alterações (CDC) (CDC) é o processo que captura as alterações nos registros feitos em um banco de dados transacional (por exemplo, MySQL ou PostgreSQL) ou em um data warehouse. CDC captura operações como exclusão, inclusão e atualização de dados, normalmente como uma transmissão para rematerializar a tabela em sistemas externos. O CDC permite o carregamento incremental e elimina a necessidade de atualização do carregamento em massa.
Para simplificar o tutorial, pule a configuração de um sistema CDC externo. O senhor pode considerá-lo em funcionamento e salvar os dados do CDC como arquivos JSON em um armazenamento de blob (S3, ADLS, GCS).
Capturando o CDC
Há uma variedade de ferramentas do CDC disponíveis. Uma das soluções líderes em código aberto é o Debezium, mas existem outras implementações que simplificam a fonte de dados, como Fivetran, Qlik Replicate, Streamset, Talend, Oracle GoldenGate e AWS DMS.
Neste tutorial, o senhor usa os dados do CDC de um sistema externo, como o Debezium ou o DMS. O Debezium captura todas as linhas alteradas. Normalmente, ele envia o histórico de alterações de dados para Kafka logs ou os salva como um arquivo.
O senhor deve ingerir as informações CDC da tabela customers
(formatoJSON ), verificar se estão corretas e, em seguida, materializar a tabela de clientes no lakehouse.
Entrada do CDC do Debezium
Para cada alteração, o senhor receberá uma mensagem JSON contendo todos os campos da linha que está sendo atualizada (id
, firstname
, lastname
, email
, address
). Além disso, o senhor terá informações extras de metadados, incluindo:
operation
: Um código de operações, normalmente (DELETE
,APPEND
,UPDATE
).operation_date
: A data e o carimbo de data/hora do registro de cada ação de operações.
Ferramentas como o Debezium podem produzir resultados mais avançados, como o valor da linha antes da alteração, mas este tutorial as omite para simplificar.
Etapa 0: Configuração dos dados do tutorial
Primeiro, o senhor deve criar um novo Notebook e instalar os arquivos de demonstração usados neste tutorial em seu workspace.
-
Clique em Novo no canto superior esquerdo.
-
Clique em Notebook .
-
Altere o título do Notebook de sem título Notebook <date and time> para pipeline tutorial setup.
-
Ao lado do título do Notebook na parte superior, defina o idioma do Notebook default para Python .
-
Para gerar o dataset usado no tutorial, digite o seguinte código na primeira célula e pressione Shift + Enter para executar o código:
Python# You can change the catalog, schema, dbName, and db. If you do so, you must also
# change the names in the rest of the tutorial.
catalog = "main"
schema = dbName = db = "dbdemos_dlt_cdc"
volume_name = "raw_data"
spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
spark.sql(f'USE CATALOG `{catalog}`')
spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
spark.sql(f'USE SCHEMA `{schema}`')
spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"
try:
dbutils.fs.ls(volume_folder+"/customers")
except:
print(f"folder doesn't exists, generating the data under {volume_folder}...")
from pyspark.sql import functions as F
from faker import Faker
from collections import OrderedDict
import uuid
fake = Faker()
import random
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)
fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
fake_address = F.udf(fake.address)
operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
df = spark.range(0, 100000).repartition(100)
df = df.withColumn("id", fake_id())
df = df.withColumn("firstname", fake_firstname())
df = df.withColumn("lastname", fake_lastname())
df = df.withColumn("email", fake_email())
df = df.withColumn("address", fake_address())
df = df.withColumn("operation", fake_operation())
df_customers = df.withColumn("operation_date", fake_date())
df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers") -
Para visualizar os dados usados neste tutorial, digite o código na próxima célula e pressione Shift + Enter para executar o código:
Pythondisplay(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
Etapa 1: Criar um pipeline
Primeiro, o senhor criará um ETL pipeline em LakeFlow Declarative pipeline. LakeFlow O pipeline declarativo cria o pipeline resolvendo as dependências definidas no Notebook ou nos arquivos (chamados de código-fonte ) usando a sintaxe do pipeline declarativo LakeFlow. Cada arquivo de código-fonte pode conter apenas um idioma, mas o senhor pode adicionar Notebook ou arquivos específicos de vários idiomas no site pipeline. Para saber mais, consulte LakeFlow Declarative pipeline
Deixe o campo Código-fonte em branco para criar e configurar automaticamente um Notebook para criação de código-fonte.
Este tutorial usa serverless compute e Unity Catalog. Para todas as opções de configuração que não forem especificadas, use as configurações do site default. Se o serverless compute não estiver habilitado ou não for compatível com o seu workspace, o senhor poderá concluir o tutorial conforme escrito usando as configurações do default compute . Se o senhor usar as configurações do default compute , deverá selecionar manualmente Unity Catalog em Storage options (Opções de armazenamento) na seção Destination (Destino ) da interface de usuário Create pipeline .
Para criar um novo ETL pipeline no pipeline LakeFlow Declarative, siga estas etapas:
- Na barra lateral, clique em pipeline .
- Clique em Create pipeline e ETL pipeline .
- Em nome do pipeline , digite um nome exclusivo pipeline.
- Marque a caixa de seleção sem servidor .
- Selecione Triggered (Acionado ) no modo pipeline . Isso executará os fluxos de transmissão usando o acionador AvailableNow, que processa todos os dados existentes e, em seguida, encerra a transmissão.
- No Destination , para configurar um local do Unity Catalog onde as tabelas são publicadas, selecione um catálogo existente e escreva um novo nome em Schema para criar um novo esquema em seu catálogo.
- Clique em Criar .
A interface do usuário do pipeline é exibida para o novo pipeline.
Um Notebook de código-fonte em branco é criado e configurado automaticamente para o site pipeline. O Notebook é criado em um novo diretório no seu diretório de usuário. O nome do novo diretório e arquivo corresponde ao nome do seu pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline
.
- Um link para acessar esse Notebook está abaixo do campo Código-fonte no painel de detalhes do pipeline . Clique no link para abrir o Notebook antes de prosseguir para a próxima etapa.
- Clique em Connect (Conectar ) no canto superior direito para abrir o menu de configuração do site compute.
- Passe o mouse sobre o nome do pipeline que o senhor criou na Etapa 1.
- Clique em Conectar .
- Ao lado do título do Notebook na parte superior, selecione o idioma do Notebook default (Python ou SQL).
O Notebook só pode conter uma única linguagem de programação. Não misture os códigos Python e SQL no Notebook de código-fonte pipeline.
Ao desenvolver o pipeline declarativo LakeFlow, o senhor pode escolher entre Python ou SQL. Este tutorial inclui exemplos para ambas as linguagens. Com base em sua escolha de idioma, verifique se o idioma do notebook default foi selecionado.
Para saber mais sobre o suporte do Notebook para o desenvolvimento do código do pipeline LakeFlow Declarative, consulte Desenvolver e depurar o pipeline ETL com um Notebook no pipeline LakeFlow Declarative.
Etapa 2: ingerir dados de forma incremental com o Auto Loader
A primeira etapa é ingerir os dados brutos do armazenamento em nuvem em uma camada de bronze.
Isso pode ser um desafio por vários motivos, pois você deve:
- Operar em escala, potencialmente ingerindo milhões de arquivos pequenos.
- Inferir o esquema e o tipo JSON.
- Tratar registros ruins com esquema JSON incorreto.
- Cuidar da evolução do esquema (por exemplo, uma nova coluna na tabela de clientes).
Auto Loader simplificam essa ingestão, incluindo a inferência de esquemas e a evolução do esquema, ao mesmo tempo em que são dimensionados para milhões de arquivos recebidos. O Auto Loader está disponível em Python usando cloudFiles
e em SQL usando SELECT * FROM STREAM read_files(...)
e pode ser usado com vários formatos (JSON, CSV, Apache Avro, etc.):
Definir a tabela como uma tabela de transmissão garantirá que o senhor consuma apenas os novos dados recebidos. Se o senhor não a definir como uma tabela de transmissão, fará a varredura e a ingestão de todos os dados disponíveis. Consulte as tabelas de transmissão para obter mais informações.
-
Para ingerir o uso de dados de entrada Auto Loader, copie e cole o código a seguir na primeira célula do Notebook. O senhor pode usar Python ou SQL, dependendo do idioma default do Notebook que escolheu na etapa anterior.
aba :::tab-item[Python]
Pythonfrom dlt import *
from pyspark.sql.functions import *
# Create the target bronze table
dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
# Create an Append Flow to ingest the raw data into the bronze table
@append_flow(
target = "customers_cdc_bronze",
name = "customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
):::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
CREATE FLOW customers_bronze_ingest_flow AS
INSERT INTO customers_cdc_bronze BY NAME
SELECT *
FROM STREAM read_files(
"/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
format => "json",
inferColumnTypes => "true"
)::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 3: Limpeza e expectativas para monitorar a qualidade dos dados
Depois que a camada de bronze for definida, você criará as camadas prateadas adicionando expectativas para controlar a qualidade dos dados, verificando as seguintes condições:
- O ID nunca deve ser
null
. - O tipo CDC operações deve ser válido.
- O site
json
deve ter sido lido adequadamente pelo Auto Loader.
A linha será descartada se uma dessas condições não for respeitada.
Para obter mais informações, consulte gerenciar a qualidade dos dados com pipeline expectations.
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Para criar uma camada prateada com uma tabela limpa e impor restrições, copie e cole o código a seguir na nova célula do Notebook.
aba :::tab-item[Python]
Pythondlt.create_streaming_table(
name = "customers_cdc_clean",
expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
)
@append_flow(
target = "customers_cdc_clean",
name = "customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
return (
dlt.read_stream("customers_cdc_bronze")
.select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
):::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
)
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
CREATE FLOW customers_cdc_clean_flow AS
INSERT INTO customers_cdc_clean BY NAME
SELECT * FROM STREAM customers_cdc_bronze;::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 4: Materialização da tabela de clientes com um fluxo AUTO CDC
A tabela customers
conterá o view mais atualizado e será uma réplica da tabela original.
Isso não é trivial de implementar manualmente. Você deve considerar coisas como a desduplicação de dados para manter a linha mais recente.
No entanto, o pipeline LakeFlow Declarative resolve esses desafios com as AUTO CDC
operações.
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Para processar o CDC uso de dados
AUTO CDC
no LakeFlow Declarative pipeline, copie e cole o código a seguir na nova célula do Notebook.aba :::tab-item[Python]
Pythondlt.create_streaming_table(name="customers", comment="Clean, materialized customers")
dlt.create_auto_cdc_flow(
target="customers", # The customer table being materialized
source="customers_cdc_clean", # the incoming CDC
keys=["id"], # what we'll be using to match the rows to upsert
sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition
except_column_list=["operation", "operation_date", "_rescued_data"],
):::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers;
CREATE FLOW customers_cdc_flow
AS AUTO CDC INTO customers
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 1;::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 5: dimensões que mudam lentamente (SCD) do tipo 2 (SCD2)
Muitas vezes, é necessário criar uma tabela para acompanhar todas as alterações resultantes de APPEND
, UPDATE
e DELETE
:
- história: O senhor deseja manter um histórico de todas as alterações em sua tabela.
- Rastreabilidade: O senhor deseja ver quais operações ocorreram.
SCD2 com LakeFlow Declarative pipeline
O Delta é compatível com o fluxo de dados de alteração (CDF), e o site table_change
pode consultar a modificação da tabela em SQL e Python. No entanto, o principal caso de uso do CDF é capturar alterações em um pipeline e não criar um view completo das alterações da tabela desde o início.
As coisas ficam especialmente complexas de implementar se você tiver eventos fora de ordem. Se o senhor precisar sequenciar as alterações por um carimbo de data/hora e receber uma modificação que ocorreu no passado, deverá acrescentar uma nova entrada na tabela SCD e atualizar as entradas anteriores.
LakeFlow O pipeline declarativo elimina essa complexidade e permite que o senhor crie uma tabela separada contendo todas as modificações desde o início do tempo. Essa tabela pode então ser usada em escala, com partições específicas/colunas de fronteira, se necessário. Os campos fora de ordem serão tratados imediatamente com base no _sequence_by
Para criar uma tabela SCD2, devemos usar a opção: STORED AS SCD TYPE 2
em SQL ou stored_as_scd_type="2"
em Python.
O senhor também pode limitar as colunas que o recurso rastreia usando a opção: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Copie e cole o código a seguir na nova célula do Notebook.
aba :::tab-item[Python]
Python# create the table
dlt.create_streaming_table(
name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
)
# store all changes as SCD2
dlt.create_auto_cdc_flow(
target="customers_history",
source="customers_cdc_clean",
keys=["id"],
sequence_by=col("operation_date"),
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_date", "_rescued_data"],
stored_as_scd_type="2",
) # Enable SCD2 and store individual updates:::
:::tab-item[sql]
SQLCREATE OR REFRESH STREAMING TABLE customers_history;
CREATE FLOW cusotmers_history_cdc
AS AUTO CDC INTO
customers_history
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 2;::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 6: Crie um site materializado view que rastreie quem mais alterou suas informações
A tabela customers_history
contém todas as alterações históricas que um usuário fez em suas informações. Agora, o senhor criará um view materializado simples na camada de ouro que mantém o controle de quem mais alterou suas informações. Isso poderia ser usado para análise de detecção de fraude ou recomendações de usuários em um cenário real. Além disso, a aplicação de alterações com o SCD2 já removeu as duplicatas para nós, então podemos contar diretamente as linhas por ID de usuário.
-
Clique em Edit e Insert cell below (Editar e inserir célula abaixo) para inserir uma nova célula vazia.
-
Copie e cole o código a seguir na nova célula do Notebook.
aba :::tab-item[Python]
Python@dlt.table(
name = "customers_history_agg",
comment = "Aggregated customer history"
)
def customers_history_agg():
return (
dlt.read("customers_history")
.groupBy("id")
.agg(
count("address").alias("address_count"),
count("email").alias("email_count"),
count("firstname").alias("firstname_count"),
count("lastname").alias("lastname_count")
)
):::
:::tab-item[sql]
SQLCREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
SELECT
id,
count("address") as address_count,
count("email") AS email_count,
count("firstname") AS firstname_count,
count("lastname") AS lastname_count
FROM customers_history
GROUP BY id::: ::::
-
Clique em começar para começar uma atualização para o site conectado pipeline.
Etapa 7: Criar um trabalho para executar o ETL pipeline
Em seguida, crie um fluxo de trabalho para automatizar as etapas de ingestão de dados, processamento e análise usando um Databricks Job.
- Em seu site workspace, clique em
fluxo de trabalho na barra lateral e clique em Create Job .
- Na caixa de título da tarefa, substitua New Job pelo nome do <date and time> trabalho. Por exemplo,
CDC customers workflow
. - Em Nome da tarefa , insira um nome para a primeira tarefa, por exemplo,
ETL_customers_data
. - Em Type , selecione pipeline .
- No pipeline , selecione o site pipeline que o senhor criou na etapa 1.
- Clique em Criar .
- Para executar o fluxo de trabalho, clique em executar Now . Para acessar view os detalhes da execução , clique em tab. Clique na tarefa para acessar view detalhes da execução da tarefa.
- Para view os resultados quando o fluxo de trabalho for concluído, clique em Go to the latest successful execution (Ir para a última execução bem-sucedida ) ou no Começar time (Hora de início ) para a execução do trabalho. A página Saída é exibida e exibe os resultados da consulta.
Consulte monitoramento e observabilidade para LakeFlow Jobs para obter mais informações sobre a execução de trabalhos.
Etapa 8: programar o trabalho
Para executar o ETL pipeline em um programador, siga estas etapas:
- Clique em
fluxo de trabalho na barra lateral.
- Na coluna Name (Nome ), clique no nome do trabalho. O painel lateral aparece como os detalhes do siteJob .
- Clique em Add trigger (Adicionar acionador ) no painel Programar & Triggers (Acionadores ) e selecione Scheduled (Programado ) em Trigger type (Tipo de acionador) .
- Especifique o período, a hora inicial e o fuso horário.
- Clique em Salvar .