Replicar uma tabela RDBMS externa usando AUTO CDC
Esta página explica como replicar uma tabela de um sistema de gerenciamento de banco de dados relacional externo (RDBMS) para Databricks usando a API AUTO CDC
no pipeline declarativo LakeFlow . Você aprenderá:
- Padrões comuns para configurar as fontes.
- Como executar uma cópia completa única do fluxo de uso de dados a
once
existente. - Como ingerir continuamente novas alterações usando um fluxo
change
.
Este padrão é ideal para construir tabelas de dimensões que mudam lentamente (SCD) ou manter uma tabela de destino sincronizada com um sistema de registro externo.
Antes de começar
Este guia pressupõe que você tenha acesso ao seguinte conjunto de dados da sua fonte:
- Um instantâneo completo da tabela de origem no armazenamento cloud . Este dataset é usado para o carregamento inicial.
- Um feed de alterações contínuas, preenchido no mesmo local de armazenamento cloud (por exemplo, usando Debezium, Kafka ou CDC baseado em log ). Este feed é a entrada para o processo
AUTO CDC
em andamento.
Configurar visualização de origem
Primeiro, defina duas visualizações de origem para preencher a tabela de destino rdbms_orders
a partir de um caminho de armazenamento cloud orders_snapshot_path
. Ambos são construídos como visualização de transmissão sobre dados brutos em armazenamento cloud . O uso da visualização proporciona maior eficiência porque os dados não precisam ser gravados antes de serem usados no processo AUTO CDC
.
- A primeira view da fonte é um Snapshot completo (
full_orders_snapshot
) - O segundo é um feed de mudança contínua (
rdbms_orders_change_feed
).
Os exemplos neste guia usam armazenamento cloud como fonte, mas você pode usar qualquer fonte suportada pelas tabelas de transmissão.
full_orders_snapshot()
Este passo cria uma view de pipeline declarativa LakeFlow que lê o instantâneo inicial completo dos dados dos pedidos.
- Python
- SQL
O seguinte exemplo em Python:
- Utiliza
spark.readStream
com Auto Loader (format("cloudFiles")
) - arquivos read.json de um diretório definido por
orders_snapshot_path
- Define
includeExistingFiles
comotrue
para garantir que os dados históricos já presentes no caminho sejam processados - Define
inferColumnTypes
comotrue
para inferir o esquema automaticamente - Retorna todas as colunas com
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
O exemplo SQL a seguir passa opções como um mapa de strings par key-valor. orders_snapshot_path
deve estar disponível como uma variável SQL (por exemplo, definida usando parâmetros de pipeline ou interpolada manualmente).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Este passo cria uma segunda view de pipeline declarativa LakeFlow que lê dados de alterações incrementais (por exemplo, de logs CDC ou tabelas de alterações). Ele lê de orders_cdc_path
e assume que arquivos JSON no estilo CDC são colocados nesse caminho regularmente.
- Python
- SQL
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
No exemplo SQL a seguir, ${orders_cdc_path}
é uma variável e pode ser interpolada definindo um valor nas configurações do pipeline ou definindo explicitamente uma variável no seu código.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Hidratação inicial (fluxo único)
Agora que as fontes estão configuradas, AUTO CDC
logicamente mescla ambas as fontes em uma tabela de transmissão de destino. Primeiro, use um fluxo AUTO CDC
único com ONCE=TRUE
para copiar o conteúdo completo da tabela RDBMS em uma tabela de transmissão. Isso prepara a tabela de destino com dados históricos sem reproduzi-los em atualizações futuras.
- Python
- SQL
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
O fluxo once
é executado apenas uma vez. Novos arquivos adicionados a full_orders_snapshot
após a criação do pipeline são ignorados.
Realizando uma refresh completa na tabela de transmissão rdbms_orders
re-execução do fluxo once
. Se os dados iniciais do Snapshot no armazenamento cloud forem removidos, isso resultará em perda de dados.
Alimentação de mudança contínua (fluxo de mudança)
Após o carregamento inicial do Snapshot, use outro fluxo AUTO CDC
para ingerir continuamente as alterações do feed CDC do RDBMS . Isso mantém sua tabela rdbms_orders
atualizada com inserções, atualizações e exclusões.
- Python
- SQL
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Considerações
Idempotência de preenchimento | Um fluxo |
---|---|
Fluxos múltiplos | Você pode usar vários fluxos de alteração para merge correções, dados recebidos tardiamente ou feeds alternativos, mas todos devem compartilhar um esquema e uma chave. |
refresh completo | Uma refresh completa na tabela de transmissão |
Ordem de execução do fluxo | A ordem de execução do fluxo não importa. O resultado final é o mesmo. |
Recurso adicional
- ConectorSQL Server totalmente gerenciado no LakeFlow Connect