APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline
O pipeline declarativo LakeFlow Spark simplifica a captura de dados de alterações (CDC) (CDC) com as APIs AUTO CDC e AUTO CDC FROM SNAPSHOT . Essas APIs automatizam a complexidade de calcular dimensões que mudam lentamente (SCD) (SCD) Tipo 1 e Tipo 2 a partir de um feed CDC ou Snapshot de banco de dados. Para saber mais sobre esses conceitos, consulte captura de dados de alterações (CDC) e Snapshot.
As APIs AUTO CDC substituem as APIs APPLY CHANGES e têm a mesma sintaxe. As APIs APPLY CHANGES ainda estão disponíveis, mas a Databricks recomenda o uso das APIs AUTO CDC em seu lugar.
A API que você utiliza depende da origem dos seus dados de alteração:
AUTO CDC: Use isso quando o banco de dados de origem tiver um feed CDC ativado.AUTO CDCprocessa alterações de um feed de dados de alteração (CDF). É compatível com as interfaces SQL e Python do pipeline.AUTO CDC FROM SNAPSHOT: Use esta opção quando CDC não estiver habilitado no banco de dados de origem e apenas os Snapshots estiverem disponíveis. Esta API compara o Snapshot para determinar as alterações e, em seguida, as processa. Só é suportado na interface Python.
Ambas as APIs suportam a atualização de tabelas usando SCD Tipo 1 e Tipo 2:
- Utilize o SCD Tipo 1 para atualizar registros diretamente. não é mantido para fins de atualização de registros.
- Use o SCD tipo 2 para manter um histórico de registros, em todas as atualizações ou em atualizações de um conjunto específico de colunas.
As APIs AUTO CDC não são suportadas pelo pipeline declarativo Apache Spark .
Para sintaxe e outras referências, consulte AUTO CDC INTO (pipeline), create_auto_cdc_flow e create_auto_cdc_from_snapshot_flow.
Esta página descreve como atualizar tabelas em seu pipeline com base em alterações nos dados de origem. Para aprender como registrar e consultar informações de alterações em nível de linha para tabelas Delta , consulte Usar o feed de dados de alterações Delta Lake no Databricks.
Requisitos
Para usar as APIs CDC , seu pipeline deve ser configurado para usar o SDPserverless ou as edições SDP Pro ou Advanced .
Como funciona o AUTO CDC
Para realizar o processamento CDC com AUTO CDC, crie uma tabela de transmissão e use a instrução AUTO CDC ... INTO em SQL ou a função create_auto_cdc_flow() em Python para especificar a origem, a chave e a sequência para o feed de alterações. Para obter uma explicação de como funcionam o sequenciamento e a lógica SCD , consulte captura de dados de alterações (CDC) e Snapshot. Veja os exemplos do AUTO CDC.
Para hidratação inicial a partir de uma fonte com alimentação de mudança, use AUTO CDC com um fluxo once e continue processando a alimentação de mudança. Consulte Replicar uma tabela RDBMS externa usando o AUTO CDC.
Para detalhes de sintaxe, consulte AUTO CDC INTO (pipeline) ou create_auto_cdc_flow.
Como funciona o AUTO CDC do Snapshot
AUTO CDC FROM SNAPSHOT Determina as alterações nos dados de origem comparando os Snapshots em ordem. Só é suportado na interface de pipeline do Python. Você pode ler o Snapshot diretamente de uma tabela Delta , arquivos de armazenamento cloud ou JDBC .
Para realizar o processamento CDC com AUTO CDC FROM SNAPSHOT, crie uma tabela de transmissão e use a função create_auto_cdc_from_snapshot_flow() para especificar o Snapshot, a chave e outros argumentos. Para obter detalhes sobre os dois padrões de ingestão e quando usar cada um, consulte Padrões de processamentoSnapshot. Veja os exemplos de Auto CDC FROM Snapshot.
Para detalhes de sintaxe, consulte create_auto_cdc_from_snapshot_flow.
Use várias colunas para sequenciamento
Para sequenciar por várias colunas (por exemplo, um carimbo de data/hora e um ID para desempatar), use STRUCT para combiná-las. A API ordena os resultados pelo primeiro campo e, em caso de empate, considera o segundo campo, e assim por diante.
- SQL
- Python
SEQUENCE BY STRUCT(timestamp_col, id_col)
sequence_by = struct("timestamp_col", "id_col")
Exemplos de AUTO CDC
Os exemplos a seguir demonstram o processamento de SCD Tipo 1 e Tipo 2 usando uma fonte de dados de alteração. Os dados de exemplo criam novos registros de usuário, excluem registros de usuário e atualizam registros de usuário. No exemplo SCD Tipo 1, as últimas operações UPDATE chegam atrasadas e são descartadas da tabela de destino, demonstrando o tratamento de eventos fora de ordem.
A seguir, estão os registros de entrada usados nestes exemplos. Esses dados são criados executando a consulta na seção Criar dados de exemplo .
userId | name | city | operation | sequenceNum |
|---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Se você remover o comentário da última linha na consulta de geração de dados de exemplo, será inserido o seguinte registro que especifica o truncamento da tabela (limpeza da tabela) em sequenceNum=3:
userId | name | city | operation | sequenceNum |
|---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Todos os exemplos a seguir incluem opções para especificar as operações DELETE e TRUNCATE , mas cada uma é opcional.
Criar dados de exemplo
Execute as seguintes instruções para criar um dataset de exemplo. Este código não se destina a ser executado como parte de uma definição pipeline . execute-o a partir da pasta de exploração do seu pipeline, em vez da pasta de transformações.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Atualizações do Processo SCD Tipo 1
O SCD Tipo 1 armazena apenas a versão mais recente de cada registro. O exemplo a seguir lê os dados de alteração do feed criado acima e aplica as alterações a uma tabela de transmissão de destino. Desenvolva o pipeline declarativo LakeFlow Spark para executar este código.
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Após executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:
userId | name | city |
|---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
O usuário 123 (Isabel) foi excluído e não aparece mais. O usuário 125 (Mercedes) mostra apenas a cidade mais recente (Guadalajara) porque o SCD Tipo 1 sobrescreve os valores anteriores. O UPDATE anterior em sequenceNum=5 foi removido porque uma atualização posterior em sequenceNum=6 chegou.
Após executar o exemplo com o registro TRUNCATE descomentado, a tabela é limpa em sequenceNum=3. Isso significa que os registros 124 e 126 não estão na tabela, e a tabela de destino final contém apenas o seguinte registro:
userId | name | city |
|---|---|---|
125 | Mercedes | Guadalajara |
Atualizações do Processo SCD Tipo 2
SCD Tipo 2 preserva um histórico completo de alterações, criando novas linhas para cada versão de um registro, com colunas __START_AT e __END_AT indicando quando cada versão estava ativa.
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Após executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:
userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
A mesa preserva toda a história. O usuário 123 possui duas versões (terminada na sequência 6 quando excluída). O usuário 125 possui três versões que mostram alterações na cidade. Registros com __END_AT = null estão atualmente ativos.
Rastreie um subconjunto de colunas com SCD Tipo 2.
Por default, SCD Tipo 2 cria uma nova versão sempre que o valor de qualquer coluna é alterado. Você pode especificar um subconjunto de colunas para rastrear, de forma que as alterações em outras colunas atualizem a versão atual no mesmo local, em vez de gerar um novo registro no histórico.
O exemplo a seguir exclui a coluna city da história acompanhamento:
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Como as alterações city não são rastreadas, as atualizações de cidade sobrescrevem a linha atual em vez de criar uma nova versão. A tabela de destino contém os seguintes registros:
userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
CDC AUTOMÁTICO DE exemplos de instantâneos
As seções a seguir fornecem exemplos de uso de AUTO CDC FROM SNAPSHOT para processar Snapshot em tabelas de destino SCD Tipo 1 ou Tipo 2. Para obter informações sobre quando usar esta API, consulte captura de dados de alterações (CDC) e Snapshot.
Exemplo: Captura instantânea do processo usando o tempo de ingestão pipeline
Utilize essa abordagem quando os snapshots chegarem regularmente e em ordem, e você puder confiar no timestamp de execução pipeline para o versionamento. Um novo Snapshot é incorporado a cada atualização pipeline .
Você pode ler Snapshots de vários tipos de origem, incluindo tabelas Delta , arquivos de armazenamento cloud e conexões JDBC .
o passo 1: Criar dados de exemplo
Crie uma tabela contendo dados de instantâneo. Execute o seguinte código a partir de um Notebook ou Databricks SQL na pasta explorations do seu pipeline:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
o passo 2: execução AUTO CDC FROM Snapshot
Desenvolva o pipeline declarativo LakeFlow Spark para executar o código nesta etapa.
Escolha um tipo de origem para a view de instantâneo (o código de criação de exemplo gera uma tabela Delta ):
Opção A: Ler de uma tabela Delta
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Opção B: Ler do armazenamento cloud
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opção C: Ler do JDBC (somente compute clássica)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Todas as opções, escreva para o destino
Em seguida, adicione a tabela e o fluxo de destino:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Após a primeira execução do pipeline, todos os registros são inseridos como linhas ativas:
userId | city | __START_AT | __END_AT |
|---|---|---|---|
1 | Oaxaca | 0 | null |
2 | Monterrey | 0 | null |
3 | Tijuana | 0 | null |
Para usar o SCD Tipo 1 e manter apenas o estado atual, defina stored_as_scd_type=1. Neste caso, a tabela de destino não inclui as colunas __START_AT e __END_AT .
o passo 3: Simule um novo Snapshot e execute novamente
Atualize a tabela de origem para simular a chegada de um novo Snapshot (execute este código a partir de um Notebook ou arquivo SQL na pasta explorations do seu pipeline):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
executar o pipeline novamente. AUTO CDC FROM SNAPSHOT compara o novo Snapshot com o anterior e detecta que o usuário 1 foi excluído, os usuários 2 e 3 foram atualizados e os usuários 4 e 6 foram inseridos. Isso gera um feed de alterações e usa AUTO CDC para criar a tabela de saída.
Após a segunda execução com SCD Tipo 2, a tabela de destino contém os seguintes registros:
userId | city | __START_AT | __END_AT |
|---|---|---|---|
1 | Oaxaca | 0 | 1 |
2 | Monterrey | 0 | 1 |
2 | Carmelo | 1 | null |
3 | Tijuana | 0 | 1 |
3 | Los Angeles | 1 | null |
4 | Vale da Morte | 1 | null |
6 | Kings Canyon | 1 | null |
O usuário 1 foi encerrado (excluído). Os usuários 2 e 3 têm duas versões cada, mostrando as alterações em suas cidades. Os usuários 4 e 6 foram inseridos recentemente.
Após a segunda execução com SCD Tipo 1, a tabela de destino mostra apenas o estado atual:
userId | city |
|---|---|
2 | Carmelo |
3 | Los Angeles |
4 | Vale da Morte |
6 | Kings Canyon |
Exemplo: Captura instantânea do processo usando funções de versão
Utilize essa abordagem quando precisar de controle explícito sobre a ordem dos snapshots. Por exemplo, utilize esta abordagem quando vários Snapshots chegarem ao mesmo tempo ou quando os Snapshots chegarem fora de ordem. Você escreve uma função que especifica qual Snapshot processar em seguida e seu número de versão. A API processa os Snapshots em ordem crescente de versão:
- Se houver vários Snapshots armazenados, todos serão processados em ordem.
- Se um Snapshot chegar fora de ordem (por exemplo,
snapshot_3chegar depoissnapshot_4), ele será ignorado. - Se não houver novos Snapshots, a função retorna
Nonee nenhum processamento ocorre.
o passo 1: Preparar arquivos de instantâneo
Crie arquivos CSV contendo dados de Snapshot e adicione-os a um volume ou local de armazenamento cloud . Nomeie os arquivos cronologicamente (por exemplo, snapshot_1.csv, snapshot_2.csv).
Cada arquivo deve conter colunas para userId e city. Por exemplo:
snapshot_1.csv :
userId | city |
|---|---|
1 | Oaxaca |
2 | Monterrey |
3 | Tijuana |
snapshot_2.csv :
userId | city |
|---|---|
2 | Carmelo |
3 | Los Angeles |
4 | Vale da Morte |
o passo 2: execução AUTO CDC FROM Snapshot com função de versão
Crie um novo Notebook e cole o seguinte código pipeline . Em seguida, desenvolva o pipeline declarativo LakeFlow Spark.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Para usar o SCD Tipo 1 em vez disso, defina stored_as_scd_type=1.
Após o processamento snapshot_1.csv, a tabela de destino contém os seguintes registros:
userId | city | __START_AT | __END_AT |
|---|---|---|---|
1 | Oaxaca | 1 | null |
2 | Monterrey | 1 | null |
3 | Tijuana | 1 | null |
Após o processamento snapshot_2.csv, a tabela de destino contém os seguintes registros:
userId | city | __START_AT | __END_AT |
|---|---|---|---|
1 | Oaxaca | 1 | 2 |
2 | Monterrey | 1 | 2 |
2 | Carmelo | 2 | null |
3 | Tijuana | 1 | 2 |
3 | Los Angeles | 2 | null |
4 | Vale da Morte | 2 | null |
Lembre-se de que, para SCD Tipo 1, a tabela é exatamente igual ao Snapshot mais recente. A diferença é que as consultas subsequentes podem usar o feed de alterações para processar apenas os registros modificados.
o passo 3: Adicionar novo instantâneo
Adicione um novo arquivo CSV ao local de armazenamento com os dados modificados (por exemplo, valores de cidade alterados, novas linhas ou linhas removidas). Em seguida, execute o pipeline novamente para processar o novo Snapshot.
Limitações
- A coluna de sequenciamento deve ser de um tipo de dados classificável. Os valores de sequenciamento
NULLnão são suportados. AUTO CDC FROM SNAPSHOTÉ suportado apenas na interface de pipeline Python; a interface SQL não é suportada.
Recurso adicional
- captura de dados de alterações (CDC) e Snapshot: Aprenda sobre conceitos CDC , Snapshot e tipos SCD .
- Replicar uma tabela RDBMS externa usando
AUTO CDC: Aprenda como realizar a hidratação inicial com um fluxooncee, em seguida, continuar o processamento de alterações. - Tópicos avançados do AUTO CDC: Aprenda sobre operações de alteração em destinos do AUTO CDC, leitura de feeds de dados de alteração e processamento de métricas.
- tutorial: Construa um pipeline ETL usando captura de dados de alterações (CDC)