Replicate an external RDBMS table using AUTO CDC
This page walks you through how to replicate a table from an external relational database management system (RDBMS) into Databricks using the AUTO CDC API in Lakeflow Declarative Pipelines. You'll learn:
- Common patterns for setting up the sources.
- How to perform a one-time full copy of the existing data using a once flow.
- How to continuously ingest new changes using a change flow.
This pattern is ideal for building slowly changing dimension (SCD) tables or keeping a target table in sync with an external system of record.
Before you begin
This guide assumes that you have access to the following datasets from your source:
- A full snapshot of the source table in cloud storage. This dataset is used for the initial load.
- A continuous change feed, also delivered to cloud storage (for example, using Debezium, Kafka, or log-based CDC). This feed is the input for the ongoing AUTO CDC process.
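For illustration, the change feed consumed later in this guide is assumed to contain one JSON object per change event, carrying a primary key (order_id), a monotonically increasing sequence value (change_seq), and an operation indicator (op). Those three field names match the later examples; the payload columns (customer_id, amount) are purely illustrative:

{"order_id": 1001, "customer_id": 42, "amount": 19.99, "op": "UPDATE", "change_seq": 1024}
{"order_id": 1002, "customer_id": 7, "amount": 5.00, "op": "DELETE", "change_seq": 1025}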
Set up source views
First, define two source views that populate the rdbms_orders target table from cloud storage. Both are built as streaming views over raw data in cloud storage. Using views is more efficient because the data doesn't have to be written out before it is used in the AUTO CDC process.
- The first source view is a full snapshot (full_orders_snapshot), read from orders_snapshot_path.
- The second is a continuous change feed (rdbms_orders_change_feed), read from orders_cdc_path.
The examples in this guide use cloud storage as the source, but you can use any source supported by streaming tables.
full_orders_snapshot()
This step creates a Lakeflow Declarative Pipelines view that reads the initial full snapshot of the orders data.
The following Python example:
- Uses spark.readStream with Auto Loader (format("cloudFiles"))
- Reads JSON files from a directory defined by orders_snapshot_path
- Sets includeExistingFiles to true to ensure historical data already present in the path is processed
- Sets inferColumnTypes to true to infer the schema automatically
- Returns all columns with .select("*")
import dlt

@dlt.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )
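The orders_snapshot_path used here (and the orders_cdc_path used later) is not defined by the view itself. One common approach, an assumption rather than the only option, is to set the paths as pipeline configuration parameters and read them at the top of the source file:

# Read the storage paths from pipeline configuration.
# The keys are assumptions; set them under the pipeline's Configuration settings.
orders_snapshot_path = spark.conf.get("orders_snapshot_path")
orders_cdc_path = spark.conf.get("orders_cdc_path")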
The following SQL example passes the Auto Loader options to read_files as named arguments. The value of orders_snapshot_path must be available to the query, for example as a pipeline parameter that is interpolated into ${orders_snapshot_path}.
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files(
  "${orders_snapshot_path}",
  format => "json",
  includeExistingFiles => true,
  inferColumnTypes => true
);
rdbms_orders_change_feed()
This step creates a second Lakeflow Declarative Pipelines view that reads incremental change data (for example, from CDC logs or change tables). It reads from orders_cdc_path
and assumes that CDC-style JSON files are dropped into this path regularly.
The following Python example mirrors the snapshot view, but reads from orders_cdc_path:
@dlt.view()
def rdbms_orders_change_feed():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_cdc_path)
    )
In the following SQL example, ${orders_cdc_path} is interpolated from a value set in your pipeline settings, or substituted explicitly in your code.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files(
  "${orders_cdc_path}",
  format => "json",
  includeExistingFiles => true,
  inferColumnTypes => true
);
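As noted earlier, cloud storage isn't the only possible source. For example, if the change events arrived on a Kafka topic instead of files, the change-feed view could be defined along the lines of the following sketch. The broker address, topic name, and payload schema are assumptions you would replace with your own.

from pyspark.sql.functions import col, from_json

# Hypothetical schema of the CDC payload; adjust to match your feed.
cdc_schema = "order_id BIGINT, op STRING, change_seq BIGINT"

@dlt.view()
def rdbms_orders_change_feed():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical broker
        .option("subscribe", "orders_cdc")                  # hypothetical topic
        .load()
        # Kafka delivers the payload as bytes in the value column; parse it as JSON.
        .select(from_json(col("value").cast("string"), cdc_schema).alias("r"))
        .select("r.*")
    )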
Initial hydration (once flow)
Now that the sources are set up, AUTO CDC logic merges both of them into a target streaming table. First, use a one-time AUTO CDC flow (once = True in Python, ONCE in SQL) to copy the full contents of the RDBMS table into a streaming table. This prepares the target table with historical data without replaying that data in future updates.
The following Python example creates the target streaming table and defines the once flow; the SQL equivalent follows.
from pyspark.sql.functions import lit

# Step 1: Create the target streaming table
dlt.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dlt.create_auto_cdc_flow(
    flow_name = "initial_load_orders",
    once = True,                      # one-time load
    target = "rdbms_orders",
    source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
    keys = ["order_id"],
    sequence_by = lit(0),             # constant sequence since this is a static snapshot
    stored_as_scd_type = "1"
)
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
AUTO CDC ONCE INTO rdbms_orders
FROM full_orders_snapshot
KEYS (order_id)
SEQUENCE BY 0
STORED AS SCD TYPE 1;
The once flow only runs one time. New files that are added to full_orders_snapshot after pipeline creation are ignored.

Performing a full refresh on the rdbms_orders streaming table re-runs the once flow. If the initial snapshot data in cloud storage has been removed, this results in data loss.
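If you want to guard against an accidental full refresh re-running the once flow, one option is to disallow resets on the target table when you create it. This is a minimal sketch based on the pipelines.reset.allowed table property; it is an optional safeguard, not something the AUTO CDC API requires.

# Block full refresh on the target so the once flow can't be re-run against
# snapshot data that may no longer exist in cloud storage.
dlt.create_streaming_table(
    "rdbms_orders",
    table_properties = {"pipelines.reset.allowed": "false"}
)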
Continuous change feed (change flow)
After the initial snapshot load, use another AUTO CDC flow to continuously ingest changes from the RDBMS's CDC feed. This keeps your rdbms_orders table up to date with inserts, updates, and deletes.
The following Python example defines the change flow; the SQL equivalent follows.
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dlt.create_auto_cdc_flow(
    flow_name = "orders_incremental_cdc",
    target = "rdbms_orders",
    source = "rdbms_orders_change_feed",  # e.g., ingested from Kafka or Debezium
    keys = ["order_id"],
    sequence_by = "change_seq",
    stored_as_scd_type = "1",
    ignore_null_updates = True,
    apply_as_deletes = "op = 'DELETE'"
)
-- Step 3: Continuous CDC ingestion
AUTO CDC INTO rdbms_orders
FROM rdbms_orders_change_feed
KEYS (order_id)
IGNORE NULL UPDATES
APPLY AS DELETE WHEN op = 'DELETE'
SEQUENCE BY change_seq
STORED AS SCD TYPE 1;
Considerations
Consideration | Details
---|---
Backfill idempotency | A once flow runs only one time per target. Re-running the pipeline doesn't reprocess the snapshot, and new files added to the snapshot path after pipeline creation are ignored unless you trigger a full refresh.
Multiple flows | You can use multiple change flows to merge in corrections, late-arriving data, or alternative feeds, but all must share a schema and keys (see the sketch after this table).
Full refresh | A full refresh on the rdbms_orders streaming table re-runs the once flow. If the initial snapshot data has been removed from cloud storage, this results in data loss.
Flow execution order | The order of flow execution doesn't matter. The end result is the same.
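As a sketch of the multiple-flows pattern, a second change flow targeting the same table might look like the following. The corrections feed is a hypothetical source view; the keys, sequence column, and SCD type must match the flows defined earlier.

# A second change flow into the same target, e.g. for late-arriving corrections.
# rdbms_orders_corrections_feed is a hypothetical view with the same schema
# as rdbms_orders_change_feed.
dlt.create_auto_cdc_flow(
    flow_name = "orders_corrections",
    target = "rdbms_orders",
    source = "rdbms_orders_corrections_feed",
    keys = ["order_id"],
    sequence_by = "change_seq",
    stored_as_scd_type = "1"
)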
Additional resources
- Fully-managed SQL Server connector in Lakeflow Connect