
The AUTO CDC APIs: Simplify change data capture with pipelines

Lakeflow Spark Declarative Pipelines simplifies change data capture (CDC) with the AUTO CDC and AUTO CDC FROM SNAPSHOT APIs. These APIs automate the complexity of computing slowly changing dimensions (SCD) Type 1 and Type 2 from either a CDC feed or database snapshots. To learn more about these concepts, see Change data capture and snapshots.

note

The AUTO CDC APIs replace the APPLY CHANGES APIs and have the same syntax. The APPLY CHANGES APIs are still available, but Databricks recommends using the AUTO CDC APIs in their place.

The API you use depends on the source of your change data:

  • AUTO CDC: Use this when the source database has a CDC feed enabled. AUTO CDC processes changes from a change data feed (CDF). It is supported in both the pipeline SQL and Python interfaces.
  • AUTO CDC FROM SNAPSHOT: Use this when CDC is not enabled on the source database and only snapshots are available. This API compares snapshots to determine changes and then processes them. It is supported only in the Python interface.

Both APIs support updating tables using SCD Type 1 and Type 2:

  • Use SCD Type 1 to update records directly. History is not retained for updated records.
  • Use SCD Type 2 to retain a history of records, either on all updates or on updates to a specified set of columns.

The AUTO CDC APIs are not supported by Apache Spark Declarative Pipelines.

For syntax and other references, see AUTO CDC INTO (pipelines), create_auto_cdc_flow, and create_auto_cdc_from_snapshot_flow.

note

This page describes how to update tables in your pipelines based on changes in source data. To learn how to record and query row-level change information for Delta tables, see Use Delta Lake change data feed on Databricks.

Requirements

To use the CDC APIs, your pipeline must be configured to use the SDP Pro or Advanced editions.

How AUTO CDC works

To perform CDC processing with AUTO CDC, create a streaming table and then use the AUTO CDC ... INTO statement in SQL or the create_auto_cdc_flow() function in Python to specify the source, keys, and sequencing for the change feed. For an explanation of how sequencing and SCD logic work, see Change data capture and snapshots. See the AUTO CDC examples.

For initial hydration from a source with a change feed, use AUTO CDC with a once flow and then continue processing the change feed. See Replicate an external RDBMS table using AUTO CDC.

For syntax details, see AUTO CDC INTO (pipelines) or create_auto_cdc_flow.

How AUTO CDC FROM SNAPSHOT works

AUTO CDC FROM SNAPSHOT determines changes in source data by comparing in-order snapshots. It is supported only in the Python pipeline interface. You can read snapshots from a Delta table, cloud storage files, or JDBC directly.

To perform CDC processing with AUTO CDC FROM SNAPSHOT, create a streaming table and then use the create_auto_cdc_from_snapshot_flow() function to specify the snapshot, keys, and other arguments. For details on the two ingestion patterns and when to use each, see Snapshot processing patterns. See the AUTO CDC FROM SNAPSHOT examples.

For syntax details, see create_auto_cdc_from_snapshot_flow.

Use multiple columns for sequencing

To sequence by multiple columns (for example, a timestamp and an ID to break ties), use a STRUCT to combine them. The API orders by the first field first, and in the event of a tie, considers the second field, and so on.

SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
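In the Python interface, passing a struct plays the same role (for example, `sequence_by = struct("timestamp_col", "id_col")` with `struct` from `pyspark.sql.functions`). The ordering is lexicographic: compare the first field, then the second on a tie, and so on, exactly like Python tuple comparison. The following pure-Python sketch (with made-up field names, not the API itself) illustrates which event wins:

```python
# Hypothetical out-of-order events; ts is a timestamp, id breaks ties.
events = [
    {"city": "Tijuana",  "ts": 10, "id": 2},
    {"city": "Mexicali", "ts": 10, "id": 1},  # same ts, lower id: superseded
    {"city": "Oaxaca",   "ts": 9,  "id": 9},  # older ts: superseded regardless of id
]

def winning_event(events):
    # STRUCT(ts, id) ordering compares ts first, then id,
    # which is exactly Python tuple comparison.
    return max(events, key=lambda e: (e["ts"], e["id"]))
```

Here `winning_event(events)` returns the Tijuana record: it has the newest timestamp, and the higher `id` wins the tie at `ts = 10`.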

AUTO CDC examples

The following examples demonstrate SCD Type 1 and Type 2 processing using a change data feed source. The sample data creates new user records, deletes a user record, and updates user records. In the SCD Type 1 example, the last UPDATE operations arrive late and are dropped from the target table, demonstrating out-of-order event handling.

The following are the input records used in these examples. This data is created by running the query in the Create sample data section.

| userId | name | city | operation | sequenceNum |
|--------|------|------|-----------|-------------|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |

If you uncomment the final row in the sample data generation query, it inserts the following record, which truncates (clears) the table at sequenceNum=3:

| userId | name | city | operation | sequenceNum |
|--------|------|------|-----------|-------------|
| null | null | null | TRUNCATE | 3 |

note

All the following examples include options to specify both DELETE and TRUNCATE operations, but each is optional.

Create sample data

Run the following statements to create a sample dataset. This code is not intended to be run as part of a pipeline definition. Run it from the explorations folder of your pipeline, rather than the transformations folder.

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
    -- Initial load.
    (124, "Raul", "Oaxaca", "INSERT", 1),
    (123, "Isabel", "Monterrey", "INSERT", 1),
    -- New users.
    (125, "Mercedes", "Tijuana", "INSERT", 2),
    (126, "Lily", "Cancun", "INSERT", 2),
    -- Isabel is removed from the system and Mercedes moved to Guadalajara.
    (123, null, null, "DELETE", 6),
    (125, "Mercedes", "Guadalajara", "UPDATE", 6),
    -- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
    (125, "Mercedes", "Mexicali", "UPDATE", 5),
    (123, "Isabel", "Chihuahua", "UPDATE", 5)
    -- Uncomment to test TRUNCATE.
    -- ,(null, null, null, "TRUNCATE", 3)
);

Process SCD Type 1 updates

SCD Type 1 keeps only the latest version of each record. The following example reads from the change data feed created above and applies changes to a streaming table target. To run this code, see Develop Lakeflow Spark Declarative Pipelines.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
    return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
    target = "users_current",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)

After running the SCD Type 1 example, the target table contains the following records:

| userId | name | city |
|--------|------|------|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lily | Cancun |

User 123 (Isabel) was deleted and does not appear. User 125 (Mercedes) shows only the latest city (Guadalajara) because SCD Type 1 overwrites previous values. The earlier UPDATE at sequenceNum=5 was dropped because a later update at sequenceNum=6 arrived.
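This out-of-order handling can be reproduced with a small pure-Python sketch. It simulates only the SCD Type 1 semantics (events older than one already applied for a key are dropped; DELETE removes the row) and ignores TRUNCATE; it is not the AUTO CDC implementation:

```python
def apply_scd1(events):
    """Simulate SCD Type 1: keep only the latest version of each key,
    dropping any event with a sequence number at or below one already applied."""
    state = {}      # userId -> current row
    last_seq = {}   # userId -> highest sequenceNum applied so far
    for e in events:
        k, seq = e["userId"], e["sequenceNum"]
        if seq <= last_seq.get(k, 0):
            continue  # late-arriving event: drop it
        last_seq[k] = seq
        if e["operation"] == "DELETE":
            state.pop(k, None)
        else:  # INSERT or UPDATE overwrite the row in place
            state[k] = {"name": e["name"], "city": e["city"]}
    return state

# The sample change feed, in arrival order:
events = [
    {"userId": 124, "name": "Raul", "city": "Oaxaca", "operation": "INSERT", "sequenceNum": 1},
    {"userId": 123, "name": "Isabel", "city": "Monterrey", "operation": "INSERT", "sequenceNum": 1},
    {"userId": 125, "name": "Mercedes", "city": "Tijuana", "operation": "INSERT", "sequenceNum": 2},
    {"userId": 126, "name": "Lily", "city": "Cancun", "operation": "INSERT", "sequenceNum": 2},
    {"userId": 123, "name": None, "city": None, "operation": "DELETE", "sequenceNum": 6},
    {"userId": 125, "name": "Mercedes", "city": "Guadalajara", "operation": "UPDATE", "sequenceNum": 6},
    {"userId": 125, "name": "Mercedes", "city": "Mexicali", "operation": "UPDATE", "sequenceNum": 5},
    {"userId": 123, "name": "Isabel", "city": "Chihuahua", "operation": "UPDATE", "sequenceNum": 5},
]
```

Replaying these events yields exactly the target table above: user 123 stays deleted (the late UPDATE at sequenceNum=5 is dropped) and user 125 keeps Guadalajara.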

After running the example with the TRUNCATE record uncommented, the table is cleared at sequenceNum=3. This means that records 124 and 126 are not in the table, and the final target table contains only the following record:

| userId | name | city |
|--------|------|------|
| 125 | Mercedes | Guadalajara |

Process SCD Type 2 updates

SCD Type 2 preserves a complete history of changes by creating new rows for each version of a record, with __START_AT and __END_AT columns indicating when each version was active.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
    return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
    target = "users_history",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = "2"
)

After running the SCD Type 2 example, the target table contains the following records:

| userId | name | city | __START_AT | __END_AT |
|--------|------|------|------------|----------|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | null |
| 126 | Lily | Cancun | 2 | null |

The table preserves complete history. User 123 has two versions (ended at sequence 6 when deleted). User 125 has three versions showing city changes. Records with __END_AT = null are currently active.
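The versioning semantics can be sketched in pure Python, using user 125's events from the sample feed. This sketch replays events in sequence order for simplicity (the real API also reconciles out-of-order arrivals) and is not the AUTO CDC implementation:

```python
def apply_scd2(events):
    """Simulate SCD Type 2: each change closes the active version
    (__END_AT = seq) and opens a new one (__START_AT = seq)."""
    history = {}  # userId -> list of [(name, city), start, end]
    for e in sorted(events, key=lambda e: e["sequenceNum"]):
        k, seq = e["userId"], e["sequenceNum"]
        versions = history.setdefault(k, [])
        current = versions[-1] if versions and versions[-1][2] is None else None
        if e["operation"] == "DELETE":
            if current:
                current[2] = seq  # close the active version, add no new row
            continue
        value = (e["name"], e["city"])
        if current:
            if current[0] == value:
                continue  # nothing changed: no new version
            current[2] = seq
        versions.append([value, seq, None])
    return history

# User 125's events, in arrival order (note sequenceNum 5 arrives after 6):
events_125 = [
    {"userId": 125, "name": "Mercedes", "city": "Tijuana", "operation": "INSERT", "sequenceNum": 2},
    {"userId": 125, "name": "Mercedes", "city": "Guadalajara", "operation": "UPDATE", "sequenceNum": 6},
    {"userId": 125, "name": "Mercedes", "city": "Mexicali", "operation": "UPDATE", "sequenceNum": 5},
]
```

Running this produces the same three rows for user 125 as the table above: Tijuana (2, 5), Mexicali (5, 6), and the active Guadalajara row (6, null).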

Track a column subset with SCD Type 2

By default, SCD Type 2 creates a new version whenever any column value changes. You can specify a subset of columns to track, so that changes to other columns update the current version in place rather than generating a new history record.

The following example excludes the city column from history tracking:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
    return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
    target = "users_history",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = "2",
    track_history_except_column_list = ["city"]
)

Because city changes are not tracked, city updates overwrite the current row instead of creating a new version. The target table contains the following records:

| userId | name | city | __START_AT | __END_AT |
|--------|------|------|------------|----------|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Guadalajara | 2 | null |
| 126 | Lily | Cancun | 2 | null |
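The column-subset behavior can be sketched in pure Python using user 123's events. Here the `untracked` parameter stands in for track_history_except_column_list; this simulates the semantics only and is not the AUTO CDC implementation:

```python
def apply_scd2_track_subset(events, untracked=("city",)):
    """Simulate SCD Type 2 with a tracked column subset: changes limited to
    untracked columns update the active version in place instead of
    creating a new history row."""
    history = {}  # userId -> list of [row, start, end]
    for e in sorted(events, key=lambda e: e["sequenceNum"]):
        k, seq = e["userId"], e["sequenceNum"]
        versions = history.setdefault(k, [])
        current = versions[-1] if versions and versions[-1][2] is None else None
        if e["operation"] == "DELETE":
            if current:
                current[2] = seq  # close the active version
            continue
        row = {"name": e["name"], "city": e["city"]}
        tracked = {c: v for c, v in row.items() if c not in untracked}
        if current and {c: v for c, v in current[0].items() if c not in untracked} == tracked:
            current[0] = row  # only untracked columns changed: overwrite in place
        else:
            if current:
                current[2] = seq
            versions.append([row, seq, None])
    return history

# User 123's events: an insert, a city-only update, then a delete.
events_123 = [
    {"userId": 123, "name": "Isabel", "city": "Monterrey", "operation": "INSERT", "sequenceNum": 1},
    {"userId": 123, "name": "Isabel", "city": "Chihuahua", "operation": "UPDATE", "sequenceNum": 5},
    {"userId": 123, "name": None, "city": None, "operation": "DELETE", "sequenceNum": 6},
]
```

The city change at sequenceNum=5 overwrites the row opened at sequenceNum=1, so user 123 ends with a single version (Chihuahua, 1, 6), matching the table above.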

AUTO CDC FROM SNAPSHOT examples

The following sections provide examples of using AUTO CDC FROM SNAPSHOT to process snapshots into SCD Type 1 or Type 2 target tables. For background on when to use this API, see Change data capture and snapshots.

Example: Process snapshots using pipeline ingestion time

Use this approach when snapshots arrive regularly and in order and you can rely on the pipeline run timestamp for versioning. A new snapshot is ingested with each pipeline update.

You can read snapshots from multiple source types, including Delta tables, cloud storage files, and JDBC connections.

Step 1: Create sample data

Create a table containing snapshot data. Run the following code from a notebook or Databricks SQL in the explorations folder of your pipeline:

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
  userId INT,
  city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (1, 'Oaxaca'),
  (2, 'Monterrey'),
  (3, 'Tijuana');

Step 2: Run AUTO CDC FROM SNAPSHOT

To run the code in this step, see Develop Lakeflow Spark Declarative Pipelines.

Choose a source type for the snapshot view (the sample creation code generates a Delta table):

Option A: Read from a Delta table

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
    return spark.read.table("main.cdc_tutorial.snapshot")

Option B: Read from cloud storage

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
    return spark.read.format("csv").option("header", True).load("<snapshot-path>")

Option C: Read from JDBC (classic compute only)

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
    return (spark.read
        .format("jdbc")
        .option("url", "<jdbc-url>")
        .option("dbtable", "<table-name>")
        .option("user", "<username>")
        .option("password", "<password>")
        .load()
    )

All options: write to the target

Then add the target table and flow:

Python
dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
    target = "target",
    source = "source",
    keys = ["userId"],
    stored_as_scd_type = 2
)

After the first pipeline run, all records are inserted as active rows:

| userId | city | __START_AT | __END_AT |
|--------|------|------------|----------|
| 1 | Oaxaca | 0 | null |
| 2 | Monterrey | 0 | null |
| 3 | Tijuana | 0 | null |

note

To use SCD Type 1 instead and keep only the current state, set stored_as_scd_type=1. In this case, the target table does not include __START_AT and __END_AT columns.

Step 3: Simulate a new snapshot and rerun

Update the source table to simulate a new snapshot arriving (run this code from a notebook or SQL file in the explorations folder of your pipeline):

SQL
TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (2, 'Carmel'),
  (3, 'Los Angeles'),
  (4, 'Death Valley'),
  (6, 'Kings Canyon');

Run the pipeline again. AUTO CDC FROM SNAPSHOT compares the new snapshot to the previous one and detects that user 1 was deleted, users 2 and 3 were updated, and users 4 and 6 were inserted. It derives a change feed from these differences and then applies AUTO CDC processing to produce the output table.
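The comparison step can be sketched in plain Python: key both snapshots by userId and classify each key as deleted, updated, or inserted. This mirrors only the change-detection idea, not the API's internals:

```python
def diff_snapshots(previous, current):
    """Classify keys by comparing two snapshots keyed by userId."""
    deletes = sorted(set(previous) - set(current))            # only in the old snapshot
    inserts = sorted(set(current) - set(previous))            # only in the new snapshot
    updates = sorted(k for k in set(previous) & set(current)  # in both, but changed
                     if previous[k] != current[k])
    return {"DELETE": deletes, "UPDATE": updates, "INSERT": inserts}

# The two snapshots from this tutorial, keyed by userId:
previous = {1: "Oaxaca", 2: "Monterrey", 3: "Tijuana"}
current = {2: "Carmel", 3: "Los Angeles", 4: "Death Valley", 6: "Kings Canyon"}
```

Diffing these snapshots classifies user 1 as a DELETE, users 2 and 3 as UPDATEs, and users 4 and 6 as INSERTs, matching the changes described above.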

After the second run with SCD Type 2, the target table contains the following records:

| userId | city | __START_AT | __END_AT |
|--------|------|------------|----------|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | null |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | null |
| 4 | Death Valley | 1 | null |
| 6 | Kings Canyon | 1 | null |

User 1 was ended (deleted). Users 2 and 3 each have two versions showing their city changes. Users 4 and 6 were newly inserted.

After the second run with SCD Type 1, the target table shows only the current state:

| userId | city |
|--------|------|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Death Valley |
| 6 | Kings Canyon |

Example: Process snapshots using version functions

Use this approach when you need explicit control over snapshot ordering. For example, use this approach when multiple snapshots arrive at the same time, or snapshots arrive out of order. You write a function that specifies which snapshot to process next and its version number. The API processes snapshots in ascending version order:

  • If multiple snapshots are in storage, they are all processed in order.
  • If a snapshot arrives out of order (for example, snapshot_3 arrives after snapshot_4), it is skipped.
  • If there are no new snapshots, the function returns None and no processing occurs.
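These rules amount to "pick the smallest available version greater than the last processed one." A minimal pure-Python sketch of that selection logic (the function name is ours, not part of the API):

```python
def pick_next_version(available_versions, latest_processed):
    """Return the next snapshot version to process, or None if caught up.
    Versions at or below the latest processed one are skipped, which is
    why an out-of-order snapshot is never picked up."""
    candidates = sorted(v for v in available_versions
                        if latest_processed is None or v > latest_processed)
    return candidates[0] if candidates else None
```

For example, with versions 1, 2, and 4 in storage and version 2 already processed, the function returns 4; if version 3 then arrives late, it is never selected.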

Step 1: Prepare snapshot files

Create CSV files containing snapshot data and add them to a volume or cloud storage location. Name the files chronologically (for example, snapshot_1.csv, snapshot_2.csv).

Each file should contain columns for userId and city. For example:

snapshot_1.csv:

| userId | city |
|--------|------|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |

snapshot_2.csv:

| userId | city |
|--------|------|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Death Valley |

Step 2: Run AUTO CDC FROM SNAPSHOT with a version function

Create a new notebook and paste the following pipeline code. To run it, see Develop Lakeflow Spark Declarative Pipelines.

Python
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
    snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/"  # or the location you created the sample data

    files = dbutils.fs.ls(snapshot_dir)
    snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

    snapshot_versions = []
    for filename in snapshot_files:
        try:
            version = int(filename.replace("snapshot_", "").replace(".csv", ""))
            snapshot_versions.append(version)
        except ValueError:
            continue

    snapshot_versions.sort()

    if latest_snapshot_version is None:
        if snapshot_versions:
            next_version = snapshot_versions[0]
        else:
            return None
    else:
        next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
        if next_versions:
            next_version = next_versions[0]
        else:
            return None

    snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
    df = spark.read.format("csv").option("header", True).load(snapshot_path)
    return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
    target = "main.cdc_tutorial.target_versioned",
    source = next_snapshot_and_version,
    keys = ["userId"],
    stored_as_scd_type = 2
)
note

To use SCD Type 1 instead, set stored_as_scd_type=1.

After processing snapshot_1.csv, the target table contains the following records:

| userId | city | __START_AT | __END_AT |
|--------|------|------------|----------|
| 1 | Oaxaca | 1 | null |
| 2 | Monterrey | 1 | null |
| 3 | Tijuana | 1 | null |

After processing snapshot_2.csv, the target table contains the following records:

| userId | city | __START_AT | __END_AT |
|--------|------|------------|----------|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | null |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | null |
| 4 | Death Valley | 2 | null |

note

Remember that, for SCD Type 1, the table looks exactly like the most recent snapshot. The difference is that downstream queries can use the change feed to process only the changed records.

Step 3: Add new snapshots

Add a new CSV file to the storage location with modified data (for example, changed city values, new rows, or removed rows). Then run the pipeline again to process the new snapshot.

Limitations

  • The sequencing column must be a sortable data type. NULL sequencing values are not supported.
  • AUTO CDC FROM SNAPSHOT is supported only in the Python pipeline interface; the SQL interface is not supported.

Additional resources