
create_auto_cdc_from_snapshot_flow

Preview

This functionality is in Public Preview.

The create_auto_cdc_from_snapshot_flow function creates a flow that uses Lakeflow Declarative Pipelines change data capture (CDC) functionality to process source data from database snapshots. See How is CDC implemented with the AUTO CDC FROM SNAPSHOT API?.

note

This function replaces the previous function apply_changes_from_snapshot(). The two functions have the same signature. Databricks recommends updating to use the new name.

important

You must have a target streaming table for this operation. You can optionally specify the columns and their types for your target table. When specifying the columns and their types for the create_auto_cdc_from_snapshot_flow() target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.

To create the required target table, you can use the create_streaming_table() function.
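For example, a target table declared with an explicit schema might look like the following. This is a minimal sketch: the table name and business columns are illustrative assumptions, and it assumes an integer snapshot version, so the __START_AT and __END_AT columns are declared as BIGINT and the schema is passed as a SQL DDL string.

Python
import dlt

# Minimal sketch: declare the target streaming table before the flow runs.
# The table name and column names are illustrative assumptions.
dlt.create_streaming_table(
  name = "customers",
  comment = "Target table for AUTO CDC FROM SNAPSHOT processing",
  schema = """
    customer_id BIGINT,
    name STRING,
    city STRING,
    __START_AT BIGINT,
    __END_AT BIGINT
  """
)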

Syntax

Python
import dlt

dlt.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

note

For AUTO CDC FROM SNAPSHOT processing, the default behavior is to insert a new row when a matching record with the same key(s) does not exist in the target. If a matching record does exist, it is updated only if any of the values in the row have changed. Rows with keys present in the target but no longer present in the source are deleted.

To learn more about CDC processing with snapshots, see The AUTO CDC APIs: Simplify change data capture with Lakeflow Declarative Pipelines. For examples of using the create_auto_cdc_from_snapshot_flow() function, see the periodic snapshot ingestion and historical snapshot ingestion examples.
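For instance, the simplest form of the function passes an existing table or view name as the source, which the runtime snapshots on each pipeline update. The following is a minimal sketch; the source table, target name, and key column are illustrative assumptions.

Python
import dlt

dlt.create_streaming_table(name = "customers")

dlt.create_auto_cdc_from_snapshot_flow(
  target = "customers",
  # An existing table or view that is snapshotted on each pipeline update.
  source = "my_catalog.my_schema.customer_snapshots",
  keys = ["customer_id"],
  stored_as_scd_type = 1
)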

Parameters

target

Type: str

Required. The name of the table to be updated. You can use the create_streaming_table() function to create the target table before executing the create_auto_cdc_from_snapshot_flow() function.

source

Type: str or lambda function

Required. Either the name of a table or view to snapshot periodically, or a Python lambda function that returns the snapshot DataFrame to be processed and the snapshot version. See Implement the source argument.

keys

Type: list

Required. The column or combination of columns that uniquely identifies a row in the source data. This is used to identify which CDC events apply to specific records in the target table. You can specify either:

  • A list of strings: ["userId", "orderId"]
  • A list of Spark SQL col() functions: [col("userId"), col("orderId")]

Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId").

stored_as_scd_type

Type: str or int

Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. The default is SCD type 1.

track_history_column_list or track_history_except_column_list

Type: list

A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to track, or track_history_except_column_list to specify the columns to exclude from tracking. You can declare either value as a list of strings or as Spark SQL col() functions:

  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")]

Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId"). When neither track_history_column_list nor track_history_except_column_list is passed to the function, the default is to include all columns in the target table. See the sketch after this table for an example.
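As an illustration, the following sketch combines SCD type 2 storage with track_history_except_column_list so that changes to operational columns do not create new history records. The table, key, and column names are assumptions, not part of the API.

Python
import dlt
from pyspark.sql.functions import col

dlt.create_auto_cdc_from_snapshot_flow(
  target = "customers_history",
  source = "my_catalog.my_schema.customer_snapshots",
  keys = [col("customer_id")],
  stored_as_scd_type = 2,
  # Track history for every column except these operational fields.
  track_history_except_column_list = ["operation", "sequenceNum"]
)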

Implement the source argument

The create_auto_cdc_from_snapshot_flow() function includes the source argument. For processing historical snapshots, the source argument is expected to be a Python lambda function that returns two values to the create_auto_cdc_from_snapshot_flow() function: a Python DataFrame containing the snapshot data to be processed and a snapshot version.

The following is the signature of the lambda function:

Python
lambda Any => Optional[(DataFrame, Any)]
  • The argument to the lambda function is the most recently processed snapshot version.
  • The return value of the lambda function is either None or a tuple of two values: the first value is a DataFrame containing the snapshot to be processed, and the second value is the snapshot version that represents the logical order of the snapshot.

An example that implements and calls the lambda function:

Python
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  if latest_snapshot_version is None:
    # First call: return the initial snapshot and its version.
    return (spark.read.load("filename.csv"), 1)
  else:
    # No further snapshots: end processing for this update.
    return None

dlt.create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

The Lakeflow Declarative Pipelines runtime performs the following steps each time the pipeline that contains the create_auto_cdc_from_snapshot_flow() function is triggered:

  1. Runs the next_snapshot_and_version function to load the next snapshot DataFrame and the corresponding snapshot version.
  2. If no DataFrame is returned, the run is terminated and the pipeline update is marked as complete.
  3. Detects the changes in the new snapshot and incrementally applies them to the target table.
  4. Returns to step #1 to load the next snapshot and its version.
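To illustrate this loop, the following sketch returns a sequence of historical snapshots, one per call, and signals completion by returning None. The snapshot paths, version numbering, and Parquet format are illustrative assumptions; spark refers to the SparkSession available in pipeline source code.

Python
from typing import Optional, Tuple
from pyspark.sql import DataFrame
import dlt

# Illustrative mapping of snapshot versions to storage paths.
snapshot_paths = {
  1: "/Volumes/my_catalog/my_schema/snapshots/2024-01-01",
  2: "/Volumes/my_catalog/my_schema/snapshots/2024-01-02",
  3: "/Volumes/my_catalog/my_schema/snapshots/2024-01-03",
}

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  # Start at version 1, then advance one version per call.
  next_version = 1 if latest_snapshot_version is None else latest_snapshot_version + 1
  path = snapshot_paths.get(next_version)
  if path is None:
    # No more snapshots: the pipeline update completes.
    return None
  return (spark.read.format("parquet").load(path), next_version)

dlt.create_streaming_table(name = "customers")

dlt.create_auto_cdc_from_snapshot_flow(
  target = "customers",
  source = next_snapshot_and_version,
  keys = ["customer_id"],
  stored_as_scd_type = 2
)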