Pular para o conteúdo principal

Replicar uma tabela RDBMS externa usando AUTO CDC

Esta página orienta o senhor sobre como replicar uma tabela de um sistema de gerenciamento de banco de dados relacional externo (RDBMS) para Databricks usando o pipeline declarativo AUTO CDC API em LakeFlow. Você aprenderá:

  • Padrões comuns para configurar as fontes.
  • Como realizar uma cópia completa única do fluxo de uso de dados existente em once.
  • Como ingerir continuamente novas alterações usando um fluxo change.

Esse padrão é ideal para criar tabelas de dimensões que mudam lentamente (SCD) (SCD) ou manter uma tabela de destino em sincronia com um sistema externo de registro.

Antes de começar

Este guia pressupõe que o senhor tenha acesso ao seguinte conjunto de dados da sua fonte:

  • Um Snapshot completo da tabela de origem no armazenamento cloud. Esse site dataset é usado para a carga inicial.
  • Um feed de alterações contínuas, preenchido no mesmo local de armazenamento cloud (por exemplo, usando Debezium, Kafka ou CDC com base em log). Esse feed é a entrada para o processo AUTO CDC em andamento.

Configurar a visualização da fonte

Primeiro, defina duas visualizações de origem para preencher a tabela de destino rdbms_orders a partir de um caminho de armazenamento cloud orders_snapshot_path. Ambos são construídos como visualização de transmissão sobre dados brutos no armazenamento cloud. O uso da visualização proporciona maior eficiência porque os dados não precisam ser gravados antes de serem usados no processo AUTO CDC.

  • A primeira fonte view é um Snapshot completo (full_orders_snapshot)
  • O segundo é um feed de mudanças contínuas (rdbms_orders_change_feed).

Os exemplos deste guia usam o armazenamento cloud como fonte, mas o senhor pode usar qualquer fonte compatível com as tabelas de transmissão.

full_orders_snapshot()

Esse passo cria um pipeline declarativo LakeFlow view que lê o Snapshot completo inicial dos dados dos pedidos.

O exemplo Python a seguir:

  • Usa o site spark.readStream com Auto Loader (format("cloudFiles"))
  • read.json de um diretório definido por orders_snapshot_path
  • Define includeExistingFiles como true para garantir que os dados históricos já presentes no caminho sejam processados.
  • Define inferColumnTypes como true para inferir o esquema automaticamente
  • Retorna todas as colunas com .select("\*")
Python
@dlt.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)

rdbms_orders_change_feed()

Esse passo cria um segundo pipeline declarativo LakeFlow view que lê dados de alterações incrementais (por exemplo, de CDC logs ou tabelas de alterações). Ele lê em orders_cdc_path e pressupõe que os arquivos JSON no estilo CDC são inseridos nesse caminho regularmente.

Python
@dlt.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

Hidratação inicial (uma vez que flui)

Agora que as fontes estão configuradas, a lógica do AUTO CDC mescla ambas as fontes em uma tabela de transmissão de destino. Primeiro, use um fluxo único AUTO CDC com ONCE=TRUE para copiar o conteúdo completo da tabela RDBMS em uma tabela de transmissão. Isso prepara a tabela de destino com dados históricos sem reproduzi-los em atualizações futuras.

Python
# Step 1: Create the target streaming table

create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = lit(0), # constant sequence since this is a static snapshot
stored_as_scd_type = "1"
)

O fluxo once é executado apenas uma vez. Os novos arquivos adicionados a full_orders_snapshot após a criação do pipeline são ignorados.

important

Realizar um refresh completo na tabela de transmissão rdbms_orders re-execução do fluxo once. Se os dados iniciais do Snapshot no armazenamento cloud tiverem sido removidos, isso resultará em perda de dados.

Alimentação de mudança contínua (fluxo de mudança)

Após o carregamento inicial do Snapshot, use outro fluxo AUTO CDC para ingerir continuamente as alterações do feed CDC do RDBMS. Isso mantém sua tabela rdbms_orders atualizada com inserções, atualizações e exclusões.

Python
# Step 3: Change Flow — Ingest ongoing CDC stream from source system

autocdc(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "change_seq",
stored_as_scd_type = "1",
ignore_null_updates = True,
apply_as_deletes = "op = 'DELETE'"
)

Considerações

Idempotência de preenchimento

Um fluxo once só volta a ser executado quando a tabela de destino é totalmente atualizada.

Vários fluxos

O senhor pode usar vários fluxos de alteração para merge em correções, dados de chegada tardia ou feeds alternativos, mas todos devem compartilhar um esquema e uma chave.

refresh completo

Um refresh completo na tabela de transmissão rdbms_orders re-execução do fluxo once. Isso pode levar à perda de dados se o local de armazenamento inicial do cloud tiver eliminado os dados do Snapshot inicial.

Ordem de execução do fluxo

A ordem de execução do fluxo não importa. O resultado final é o mesmo.

Recurso adicional