Pular para o conteúdo principal

Replicar uma tabela RDBMS externa usando AUTO CDC

Esta página explica como replicar uma tabela de um sistema de gerenciamento de banco de dados relacional externo (RDBMS) para Databricks usando a API AUTO CDC no pipeline declarativo LakeFlow . Você aprenderá:

  • Padrões comuns para configurar as fontes.
  • Como executar uma cópia completa única do fluxo de uso de dados a once existente.
  • Como ingerir continuamente novas alterações usando um fluxo change .

Este padrão é ideal para construir tabelas de dimensões que mudam lentamente (SCD) ou manter uma tabela de destino sincronizada com um sistema de registro externo.

Antes de começar

Este guia pressupõe que você tenha acesso ao seguinte conjunto de dados da sua fonte:

  • Um instantâneo completo da tabela de origem no armazenamento cloud . Este dataset é usado para o carregamento inicial.
  • Um feed de alterações contínuas, preenchido no mesmo local de armazenamento cloud (por exemplo, usando Debezium, Kafka ou CDC baseado em log ). Este feed é a entrada para o processo AUTO CDC em andamento.

Configurar visualização de origem

Primeiro, defina duas visualizações de origem para preencher a tabela de destino rdbms_orders a partir de um caminho de armazenamento cloud orders_snapshot_path. Ambos são construídos como visualização de transmissão sobre dados brutos em armazenamento cloud . O uso da visualização proporciona maior eficiência porque os dados não precisam ser gravados antes de serem usados no processo AUTO CDC .

  • A primeira view da fonte é um Snapshot completo (full_orders_snapshot)
  • O segundo é um feed de mudança contínua (rdbms_orders_change_feed).

Os exemplos neste guia usam armazenamento cloud como fonte, mas você pode usar qualquer fonte suportada pelas tabelas de transmissão.

full_orders_snapshot()

Este passo cria uma view de pipeline declarativa LakeFlow que lê o instantâneo inicial completo dos dados dos pedidos.

O seguinte exemplo em Python:

  • Utiliza spark.readStream com Auto Loader (format("cloudFiles"))
  • arquivos read.json de um diretório definido por orders_snapshot_path
  • Define includeExistingFiles como true para garantir que os dados históricos já presentes no caminho sejam processados
  • Define inferColumnTypes como true para inferir o esquema automaticamente
  • Retorna todas as colunas com .select("\*")
Python
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)

rdbms_orders_change_feed()

Este passo cria uma segunda view de pipeline declarativa LakeFlow que lê dados de alterações incrementais (por exemplo, de logs CDC ou tabelas de alterações). Ele lê de orders_cdc_path e assume que arquivos JSON no estilo CDC são colocados nesse caminho regularmente.

Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

Hidratação inicial (fluxo único)

Agora que as fontes estão configuradas, AUTO CDC logicamente mescla ambas as fontes em uma tabela de transmissão de destino. Primeiro, use um fluxo AUTO CDC único com ONCE=TRUE para copiar o conteúdo completo da tabela RDBMS em uma tabela de transmissão. Isso prepara a tabela de destino com dados históricos sem reproduzi-los em atualizações futuras.

Python
from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

O fluxo once é executado apenas uma vez. Novos arquivos adicionados a full_orders_snapshot após a criação do pipeline são ignorados.

important

Realizando uma refresh completa na tabela de transmissão rdbms_orders re-execução do fluxo once . Se os dados iniciais do Snapshot no armazenamento cloud forem removidos, isso resultará em perda de dados.

Alimentação de mudança contínua (fluxo de mudança)

Após o carregamento inicial do Snapshot, use outro fluxo AUTO CDC para ingerir continuamente as alterações do feed CDC do RDBMS . Isso mantém sua tabela rdbms_orders atualizada com inserções, atualizações e exclusões.

Python
from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

Considerações

Idempotência de preenchimento

Um fluxo once só será reexecutado quando a tabela de destino for totalmente atualizada.

Fluxos múltiplos

Você pode usar vários fluxos de alteração para merge correções, dados recebidos tardiamente ou feeds alternativos, mas todos devem compartilhar um esquema e uma chave.

refresh completo

Uma refresh completa na tabela de transmissão rdbms_orders reexecução do fluxo once . Isso pode levar à perda de dados se o local de armazenamento cloud inicial tiver eliminado os dados iniciais do Snapshot.

Ordem de execução do fluxo

A ordem de execução do fluxo não importa. O resultado final é o mesmo.

Recurso adicional