Pular para o conteúdo principal

APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline

O pipeline declarativo LakeFlow Spark simplifica a captura de dados de alterações (CDC) (CDC) com as APIs AUTO CDC e AUTO CDC FROM SNAPSHOT . Essas APIs automatizam a complexidade de calcular dimensões que mudam lentamente (SCD) (SCD) Tipo 1 e Tipo 2 a partir de um feed CDC ou Snapshot de banco de dados. Para saber mais sobre esses conceitos, consulte captura de dados de alterações (CDC) e Snapshot.

nota

As APIs AUTO CDC substituem as APIs APPLY CHANGES e têm a mesma sintaxe. As APIs APPLY CHANGES ainda estão disponíveis, mas a Databricks recomenda o uso das APIs AUTO CDC em seu lugar.

A API que você utiliza depende da origem dos seus dados de alteração:

  • AUTO CDC : Use isso quando o banco de dados de origem tiver um feed CDC ativado. AUTO CDC processa alterações de um feed de dados de alteração (CDF). É compatível com as interfaces SQL e Python do pipeline.
  • AUTO CDC FROM SNAPSHOT : Use esta opção quando CDC não estiver habilitado no banco de dados de origem e apenas os Snapshots estiverem disponíveis. Esta API compara o Snapshot para determinar as alterações e, em seguida, as processa. Só é suportado na interface Python.

Ambas as APIs suportam a atualização de tabelas usando SCD Tipo 1 e Tipo 2:

  • Utilize o SCD Tipo 1 para atualizar registros diretamente. não é mantido para fins de atualização de registros.
  • Use o SCD tipo 2 para manter um histórico de registros, em todas as atualizações ou em atualizações de um conjunto específico de colunas.

As APIs AUTO CDC não são suportadas pelo pipeline declarativo Apache Spark .

Para sintaxe e outras referências, consulte AUTO CDC INTO (pipeline), create_auto_cdc_flow e create_auto_cdc_from_snapshot_flow.

nota

Esta página descreve como atualizar tabelas em seu pipeline com base em alterações nos dados de origem. Para aprender como registrar e consultar informações de alterações em nível de linha para tabelas Delta , consulte Usar o feed de dados de alterações Delta Lake no Databricks.

Requisitos

Para usar as APIs CDC , seu pipeline deve ser configurado para usar o SDPserverless ou as edições SDP Pro ou Advanced .

Como funciona o AUTO CDC

Para realizar o processamento CDC com AUTO CDC, crie uma tabela de transmissão e use a instrução AUTO CDC ... INTO em SQL ou a função create_auto_cdc_flow() em Python para especificar a origem, a chave e a sequência para o feed de alterações. Para obter uma explicação de como funcionam o sequenciamento e a lógica SCD , consulte captura de dados de alterações (CDC) e Snapshot. Veja os exemplos do AUTO CDC.

Para hidratação inicial a partir de uma fonte com alimentação de mudança, use AUTO CDC com um fluxo once e continue processando a alimentação de mudança. Consulte Replicar uma tabela RDBMS externa usando o AUTO CDC.

Para detalhes de sintaxe, consulte AUTO CDC INTO (pipeline) ou create_auto_cdc_flow.

Como funciona o AUTO CDC do Snapshot

AUTO CDC FROM SNAPSHOT Determina as alterações nos dados de origem comparando os Snapshots em ordem. Só é suportado na interface de pipeline do Python. Você pode ler o Snapshot diretamente de uma tabela Delta , arquivos de armazenamento cloud ou JDBC .

Para realizar o processamento CDC com AUTO CDC FROM SNAPSHOT, crie uma tabela de transmissão e use a função create_auto_cdc_from_snapshot_flow() para especificar o Snapshot, a chave e outros argumentos. Para obter detalhes sobre os dois padrões de ingestão e quando usar cada um, consulte Padrões de processamentoSnapshot. Veja os exemplos de Auto CDC FROM Snapshot.

Para detalhes de sintaxe, consulte create_auto_cdc_from_snapshot_flow.

Use várias colunas para sequenciamento

Para sequenciar por várias colunas (por exemplo, um carimbo de data/hora e um ID para desempatar), use STRUCT para combiná-las. A API ordena os resultados pelo primeiro campo e, em caso de empate, considera o segundo campo, e assim por diante.

SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)

Exemplos de AUTO CDC

Os exemplos a seguir demonstram o processamento de SCD Tipo 1 e Tipo 2 usando uma fonte de dados de alteração. Os dados de exemplo criam novos registros de usuário, excluem registros de usuário e atualizam registros de usuário. No exemplo SCD Tipo 1, as últimas operações UPDATE chegam atrasadas e são descartadas da tabela de destino, demonstrando o tratamento de eventos fora de ordem.

A seguir, estão os registros de entrada usados nestes exemplos. Esses dados são criados executando a consulta na seção Criar dados de exemplo .

userId

name

city

operation

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

Se você remover o comentário da última linha na consulta de geração de dados de exemplo, será inserido o seguinte registro que especifica o truncamento da tabela (limpeza da tabela) em sequenceNum=3:

userId

name

city

operation

sequenceNum

null

null

null

TRUNCATE

3

nota

Todos os exemplos a seguir incluem opções para especificar as operações DELETE e TRUNCATE , mas cada uma é opcional.

Criar dados de exemplo

Execute as seguintes instruções para criar um dataset de exemplo. Este código não se destina a ser executado como parte de uma definição pipeline . execute-o a partir da pasta de exploração do seu pipeline, em vez da pasta de transformações.

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);

Atualizações do Processo SCD Tipo 1

O SCD Tipo 1 armazena apenas a versão mais recente de cada registro. O exemplo a seguir lê os dados de alteração do feed criado acima e aplica as alterações a uma tabela de transmissão de destino. Desenvolva o pipeline declarativo LakeFlow Spark para executar este código.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

Após executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:

userId

name

city

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

O usuário 123 (Isabel) foi excluído e não aparece mais. O usuário 125 (Mercedes) mostra apenas a cidade mais recente (Guadalajara) porque o SCD Tipo 1 sobrescreve os valores anteriores. O UPDATE anterior em sequenceNum=5 foi removido porque uma atualização posterior em sequenceNum=6 chegou.

Após executar o exemplo com o registro TRUNCATE descomentado, a tabela é limpa em sequenceNum=3. Isso significa que os registros 124 e 126 não estão na tabela, e a tabela de destino final contém apenas o seguinte registro:

userId

name

city

125

Mercedes

Guadalajara

Atualizações do Processo SCD Tipo 2

SCD Tipo 2 preserva um histórico completo de alterações, criando novas linhas para cada versão de um registro, com colunas __START_AT e __END_AT indicando quando cada versão estava ativa.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)

Após executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

A mesa preserva toda a história. O usuário 123 possui duas versões (terminada na sequência 6 quando excluída). O usuário 125 possui três versões que mostram alterações na cidade. Registros com __END_AT = null estão atualmente ativos.

Rastreie um subconjunto de colunas com SCD Tipo 2.

Por default, SCD Tipo 2 cria uma nova versão sempre que o valor de qualquer coluna é alterado. Você pode especificar um subconjunto de colunas para rastrear, de forma que as alterações em outras colunas atualizem a versão atual no mesmo local, em vez de gerar um novo registro no histórico.

O exemplo a seguir exclui a coluna city da história acompanhamento:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)

Como as alterações city não são rastreadas, as atualizações de cidade sobrescrevem a linha atual em vez de criar uma nova versão. A tabela de destino contém os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

CDC AUTOMÁTICO DE exemplos de instantâneos

As seções a seguir fornecem exemplos de uso de AUTO CDC FROM SNAPSHOT para processar Snapshot em tabelas de destino SCD Tipo 1 ou Tipo 2. Para obter informações sobre quando usar esta API, consulte captura de dados de alterações (CDC) e Snapshot.

Exemplo: Captura instantânea do processo usando o tempo de ingestão pipeline

Utilize essa abordagem quando os snapshots chegarem regularmente e em ordem, e você puder confiar no timestamp de execução pipeline para o versionamento. Um novo Snapshot é incorporado a cada atualização pipeline .

Você pode ler Snapshots de vários tipos de origem, incluindo tabelas Delta , arquivos de armazenamento cloud e conexões JDBC .

o passo 1: Criar dados de exemplo

Crie uma tabela contendo dados de instantâneo. Execute o seguinte código a partir de um Notebook ou Databricks SQL na pasta explorations do seu pipeline:

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');

o passo 2: execução AUTO CDC FROM Snapshot

Desenvolva o pipeline declarativo LakeFlow Spark para executar o código nesta etapa.

Escolha um tipo de origem para a view de instantâneo (o código de criação de exemplo gera uma tabela Delta ):

Opção A: Ler de uma tabela Delta

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")

Opção B: Ler do armazenamento cloud

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")

Opção C: Ler do JDBC (somente compute clássica)

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)

Todas as opções, escreva para o destino

Em seguida, adicione a tabela e o fluxo de destino:

Python
dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)

Após a primeira execução do pipeline, todos os registros são inseridos como linhas ativas:

userId

city

__START_AT

__END_AT

1

Oaxaca

0

null

2

Monterrey

0

null

3

Tijuana

0

null

nota

Para usar o SCD Tipo 1 e manter apenas o estado atual, defina stored_as_scd_type=1. Neste caso, a tabela de destino não inclui as colunas __START_AT e __END_AT .

o passo 3: Simule um novo Snapshot e execute novamente

Atualize a tabela de origem para simular a chegada de um novo Snapshot (execute este código a partir de um Notebook ou arquivo SQL na pasta explorations do seu pipeline):

SQL
TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');

executar o pipeline novamente. AUTO CDC FROM SNAPSHOT compara o novo Snapshot com o anterior e detecta que o usuário 1 foi excluído, os usuários 2 e 3 foram atualizados e os usuários 4 e 6 foram inseridos. Isso gera um feed de alterações e usa AUTO CDC para criar a tabela de saída.

Após a segunda execução com SCD Tipo 2, a tabela de destino contém os seguintes registros:

userId

city

__START_AT

__END_AT

1

Oaxaca

0

1

2

Monterrey

0

1

2

Carmelo

1

null

3

Tijuana

0

1

3

Los Angeles

1

null

4

Vale da Morte

1

null

6

Kings Canyon

1

null

O usuário 1 foi encerrado (excluído). Os usuários 2 e 3 têm duas versões cada, mostrando as alterações em suas cidades. Os usuários 4 e 6 foram inseridos recentemente.

Após a segunda execução com SCD Tipo 1, a tabela de destino mostra apenas o estado atual:

userId

city

2

Carmelo

3

Los Angeles

4

Vale da Morte

6

Kings Canyon

Exemplo: Captura instantânea do processo usando funções de versão

Utilize essa abordagem quando precisar de controle explícito sobre a ordem dos snapshots. Por exemplo, utilize esta abordagem quando vários Snapshots chegarem ao mesmo tempo ou quando os Snapshots chegarem fora de ordem. Você escreve uma função que especifica qual Snapshot processar em seguida e seu número de versão. A API processa os Snapshots em ordem crescente de versão:

  • Se houver vários Snapshots armazenados, todos serão processados em ordem.
  • Se um Snapshot chegar fora de ordem (por exemplo, snapshot_3 chegar depois snapshot_4), ele será ignorado.
  • Se não houver novos Snapshots, a função retorna None e nenhum processamento ocorre.

o passo 1: Preparar arquivos de instantâneo

Crie arquivos CSV contendo dados de Snapshot e adicione-os a um volume ou local de armazenamento cloud . Nomeie os arquivos cronologicamente (por exemplo, snapshot_1.csv, snapshot_2.csv).

Cada arquivo deve conter colunas para userId e city. Por exemplo:

snapshot_1.csv :

userId

city

1

Oaxaca

2

Monterrey

3

Tijuana

snapshot_2.csv :

userId

city

2

Carmelo

3

Los Angeles

4

Vale da Morte

o passo 2: execução AUTO CDC FROM Snapshot com função de versão

Crie um novo Notebook e cole o seguinte código pipeline . Em seguida, desenvolva o pipeline declarativo LakeFlow Spark.

Python
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue

snapshot_versions.sort()

if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None

snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
nota

Para usar o SCD Tipo 1 em vez disso, defina stored_as_scd_type=1.

Após o processamento snapshot_1.csv, a tabela de destino contém os seguintes registros:

userId

city

__START_AT

__END_AT

1

Oaxaca

1

null

2

Monterrey

1

null

3

Tijuana

1

null

Após o processamento snapshot_2.csv, a tabela de destino contém os seguintes registros:

userId

city

__START_AT

__END_AT

1

Oaxaca

1

2

2

Monterrey

1

2

2

Carmelo

2

null

3

Tijuana

1

2

3

Los Angeles

2

null

4

Vale da Morte

2

null

nota

Lembre-se de que, para SCD Tipo 1, a tabela é exatamente igual ao Snapshot mais recente. A diferença é que as consultas subsequentes podem usar o feed de alterações para processar apenas os registros modificados.

o passo 3: Adicionar novo instantâneo

Adicione um novo arquivo CSV ao local de armazenamento com os dados modificados (por exemplo, valores de cidade alterados, novas linhas ou linhas removidas). Em seguida, execute o pipeline novamente para processar o novo Snapshot.

Limitações

  • A coluna de sequenciamento deve ser de um tipo de dados classificável. Os valores de sequenciamento NULL não são suportados.
  • AUTO CDC FROM SNAPSHOT É suportado apenas na interface de pipeline Python; a interface SQL não é suportada.

Recurso adicional