Replicar uma tabela RDBMS externa usando AUTO CDC
Esta página orienta o senhor sobre como replicar uma tabela de um sistema de gerenciamento de banco de dados relacional externo (RDBMS) para Databricks usando o pipeline declarativo AUTO CDC
API em LakeFlow. Você aprenderá:
- Padrões comuns para configurar as fontes.
- Como realizar uma cópia completa única do fluxo de uso de dados existente em
once
. - Como ingerir continuamente novas alterações usando um fluxo
change
.
Esse padrão é ideal para criar tabelas de dimensões que mudam lentamente (SCD) (SCD) ou manter uma tabela de destino em sincronia com um sistema externo de registro.
Antes de começar
Este guia pressupõe que o senhor tenha acesso ao seguinte conjunto de dados da sua fonte:
- Um Snapshot completo da tabela de origem no armazenamento cloud. Esse site dataset é usado para a carga inicial.
- Um feed de alterações contínuas, preenchido no mesmo local de armazenamento cloud (por exemplo, usando Debezium, Kafka ou CDC com base em log). Esse feed é a entrada para o processo
AUTO CDC
em andamento.
Configurar a visualização da fonte
Primeiro, defina duas visualizações de origem para preencher a tabela de destino rdbms_orders
a partir de um caminho de armazenamento cloud orders_snapshot_path
. Ambos são construídos como visualização de transmissão sobre dados brutos no armazenamento cloud. O uso da visualização proporciona maior eficiência porque os dados não precisam ser gravados antes de serem usados no processo AUTO CDC
.
- A primeira fonte view é um Snapshot completo (
full_orders_snapshot
) - O segundo é um feed de mudanças contínuas (
rdbms_orders_change_feed
).
Os exemplos deste guia usam o armazenamento cloud como fonte, mas o senhor pode usar qualquer fonte compatível com as tabelas de transmissão.
full_orders_snapshot()
Esse passo cria um pipeline declarativo LakeFlow view que lê o Snapshot completo inicial dos dados dos pedidos.
- Python
- SQL
O exemplo Python a seguir:
- Usa o site
spark.readStream
com Auto Loader (format("cloudFiles")
) - read.json de um diretório definido por
orders_snapshot_path
- Define
includeExistingFiles
comotrue
para garantir que os dados históricos já presentes no caminho sejam processados. - Define
inferColumnTypes
comotrue
para inferir o esquema automaticamente - Retorna todas as colunas com
.select("\*")
@dlt.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
O exemplo a seguir SQL passa as opções como um mapa de strings par key-value. orders_snapshot_path
deve estar disponível como uma variável SQL (por exemplo, definida usando parâmetros de pipeline ou interpolada manualmente).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Esse passo cria um segundo pipeline declarativo LakeFlow view que lê dados de alterações incrementais (por exemplo, de CDC logs ou tabelas de alterações). Ele lê em orders_cdc_path
e pressupõe que os arquivos JSON no estilo CDC são inseridos nesse caminho regularmente.
- Python
- SQL
@dlt.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
No exemplo de SQL a seguir, ${orders_cdc_path}
é uma variável e pode ser interpolada definindo um valor nas configurações do pipeline ou definindo explicitamente uma variável no código.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Hidratação inicial (uma vez que flui)
Agora que as fontes estão configuradas, a lógica do AUTO CDC
mescla ambas as fontes em uma tabela de transmissão de destino. Primeiro, use um fluxo único AUTO CDC
com ONCE=TRUE
para copiar o conteúdo completo da tabela RDBMS em uma tabela de transmissão. Isso prepara a tabela de destino com dados históricos sem reproduzi-los em atualizações futuras.
- Python
- SQL
# Step 1: Create the target streaming table
create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = lit(0), # constant sequence since this is a static snapshot
stored_as_scd_type = "1"
)
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
AUTO CDC ONCE INTO rdbms_orders
FROM full_orders_snapshot
KEYS (order_id)
SEQUENCE BY literal(0)
STORED AS SCD TYPE 1;
O fluxo once
é executado apenas uma vez. Os novos arquivos adicionados a full_orders_snapshot
após a criação do pipeline são ignorados.
Realizar um refresh completo na tabela de transmissão rdbms_orders
re-execução do fluxo once
. Se os dados iniciais do Snapshot no armazenamento cloud tiverem sido removidos, isso resultará em perda de dados.
Alimentação de mudança contínua (fluxo de mudança)
Após o carregamento inicial do Snapshot, use outro fluxo AUTO CDC
para ingerir continuamente as alterações do feed CDC do RDBMS. Isso mantém sua tabela rdbms_orders
atualizada com inserções, atualizações e exclusões.
- Python
- SQL
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
autocdc(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "change_seq",
stored_as_scd_type = "1",
ignore_null_updates = True,
apply_as_deletes = "op = 'DELETE'"
)
-- Step 3: Continuous CDC ingestion
AUTO CDC INTO rdbms_orders
FROM rdbms_orders_change_feed
KEYS (order_id)
SEQUENCE BY change_seq
STORED AS SCD TYPE 1
IGNORE NULL UPDATES
APPLY AS DELETES WHERE op = 'DELETE';
Considerações
Idempotência de preenchimento | Um fluxo |
---|---|
Vários fluxos | O senhor pode usar vários fluxos de alteração para merge em correções, dados de chegada tardia ou feeds alternativos, mas todos devem compartilhar um esquema e uma chave. |
refresh completo | Um refresh completo na tabela de transmissão |
Ordem de execução do fluxo | A ordem de execução do fluxo não importa. O resultado final é o mesmo. |
Recurso adicional
- totalmente gerenciado SQL Server connector in LakeFlow Connect