Replicate an external RDBMS table using AUTO CDC

This page walks you through how to replicate a table from an external relational database management system (RDBMS) into Databricks using the AUTO CDC API in Lakeflow Declarative Pipelines. You’ll learn:

  • Common patterns for setting up the sources.
  • How to perform a one-time full copy of the existing data using a once flow.
  • How to continuously ingest new changes using a change flow.

This pattern is ideal for building slowly changing dimension (SCD) tables or keeping a target table in sync with an external system of record.

Before you begin

This guide assumes that you have access to the following datasets from your source:

  • A full snapshot of the source table in cloud storage. This dataset is used for the initial load.
  • A continuous change feed, populated into the same cloud storage location (for example, using Debezium, Kafka, or log-based CDC). This feed is the input for the ongoing AUTO CDC process.

Set up source views

First, define two source views that populate the rdbms_orders target table from raw data in cloud storage (the orders_snapshot_path and orders_cdc_path locations). Both are built as streaming views. Using views is more efficient because the data doesn't have to be written to an intermediate table before it is used in the AUTO CDC process.

  • The first source view is a full snapshot (full_orders_snapshot).
  • The second is a continuous change feed (rdbms_orders_change_feed).

The examples in this guide use cloud storage as the source, but you can use any source supported by streaming tables.
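
The examples also assume that orders_snapshot_path and orders_cdc_path are defined earlier in the pipeline source code. The following is a minimal sketch of one way to do this, assuming the paths are passed in as pipeline configuration values (the configuration keys shown here are placeholders):

Python
import dlt
from pyspark.sql.functions import lit  # used later by the once flow

# Hypothetical configuration keys; set them in the pipeline settings.
orders_snapshot_path = spark.conf.get("source.orders_snapshot_path")
orders_cdc_path = spark.conf.get("source.orders_cdc_path")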

full_orders_snapshot()

This step creates a Lakeflow Declarative Pipelines view that reads the initial full snapshot of the orders data.

The following Python example:

  • Uses spark.readStream with Auto Loader (format("cloudFiles"))
  • Reads JSON files from a directory defined by orders_snapshot_path
  • Sets includeExistingFiles to true to ensure historical data already present in the path is processed
  • Sets inferColumnTypes to true to infer the schema automatically
  • Returns all columns with .select("*")
Python
@dlt.view()
def full_orders_snapshot():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.includeExistingFiles", "true")
      .option("cloudFiles.inferColumnTypes", "true")
      .load(orders_snapshot_path)
      .select("*")
  )

rdbms_orders_change_feed()

This step creates a second Lakeflow Declarative Pipelines view that reads incremental change data (for example, from CDC logs or change tables). It reads from orders_cdc_path and assumes that CDC-style JSON files are dropped into this path regularly.

Python
@dlt.view()
def rdbms_orders_change_feed():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.includeExistingFiles", "true")
      .option("cloudFiles.inferColumnTypes", "true")
      .load(orders_cdc_path)
  )
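
The change flow defined later in this guide references three columns from this feed: the key (order_id), a sequencing column (change_seq), and an operation column (op). A hypothetical change record, shown here as a Python dict for illustration (any other fields are illustrative):

Python
# A hypothetical change record as it would appear (as JSON) in the CDC feed.
# order_id, change_seq, and op are the columns referenced by the change flow below.
sample_change_record = {
  "order_id": 1001,
  "change_seq": 42,
  "op": "UPDATE",
  "order_status": "SHIPPED"
}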

Initial hydration (once flow)

Now that the sources are set up, AUTO CDC logic merges both sources into a target streaming table. First, use a one-time AUTO CDC flow with once=True to copy the full contents of the RDBMS table into a streaming table. This seeds the target table with historical data without replaying that data in future updates.

Python
# Step 1: Create the target streaming table
dlt.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dlt.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = lit(0),  # constant sequence since this is a static snapshot
  stored_as_scd_type = "1"
)

The once flow runs only one time. New files added to the full_orders_snapshot source after pipeline creation are ignored.

important

Performing a full refresh on the rdbms_orders streaming table re-runs the once flow. If the initial snapshot data in cloud storage has been removed, this results in data loss.
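
If you want to guard against accidental full refreshes, one option (a sketch, using the pipelines.reset.allowed table property) is to create the target table with resets disallowed, instead of the plain call shown in Step 1:

Python
# Disallow full refresh of the target table so the once flow cannot be
# accidentally re-run and historical data cannot be lost.
dlt.create_streaming_table(
  "rdbms_orders",
  table_properties = {"pipelines.reset.allowed": "false"}
)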

Continuous change feed (change flow)

After the initial snapshot load, use another AUTO CDC flow to continuously ingest changes from the RDBMS’s CDC feed. This keeps your rdbms_orders table up to date with inserts, updates, and deletes.

Python
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dlt.create_auto_cdc_flow(
  flow_name = "orders_incremental_cdc",
  target = "rdbms_orders",
  source = "rdbms_orders_change_feed",  # e.g., ingested from Kafka or Debezium
  keys = ["order_id"],
  sequence_by = "change_seq",  # order changes by the CDC sequence column
  stored_as_scd_type = "1",
  ignore_null_updates = True,
  apply_as_deletes = "op = 'DELETE'"  # treat records with op = 'DELETE' as deletes
)

Considerations

Backfill idempotency

A once flow only re-runs when the target table is fully refreshed.

Multiple flows

You can use multiple change flows to merge in corrections, late-arriving data, or alternative feeds, but all flows must write to the same target with the same keys and a compatible schema, as in the sketch below.
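
For example, a second change flow (the flow name and source view below are hypothetical) that merges a corrections feed into the same target:

Python
# Hypothetical second change flow that merges a corrections feed into the
# same target, using the same keys and sequence column as the main flow.
dlt.create_auto_cdc_flow(
  flow_name = "orders_corrections",
  target = "rdbms_orders",
  source = "orders_corrections_feed",
  keys = ["order_id"],
  sequence_by = "change_seq",
  stored_as_scd_type = "1"
)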

Full refresh

A full refresh on the rdbms_orders streaming table re-runs the once flow. This can lead to data loss if the initial snapshot data has been removed from the cloud storage location.

Flow execution order

The order of flow execution doesn’t matter. The end result is the same.
