apply_changes_from_snapshot
This functionality is in Public Preview.
The apply_changes_from_snapshot function uses DLT change data capture (CDC) functionality to process source data from database snapshots. See How is CDC implemented with the APPLY CHANGES FROM SNAPSHOT API?.
You must have a target streaming table for this operation. You can optionally specify the columns and their types for your target table. When specifying the columns and their types for the apply_changes_from_snapshot() target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.
To create the required target table, you can use the create_streaming_table() function.
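For example, a target table for SCD type 2 output with an integer snapshot version might be declared as follows. This is a sketch: the table name, data columns, and types are illustrative assumptions, not part of the API.

```python
import dlt

# Hypothetical SCD type 2 target table; the data columns (userId, name, city)
# are assumptions. __START_AT and __END_AT use INT to match an integer
# snapshot version.
dlt.create_streaming_table(
    name = "target",
    schema = """
      userId INT,
      name STRING,
      city STRING,
      __START_AT INT,
      __END_AT INT
    """
)
```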
Syntax
```python
import dlt

dlt.apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)
```
For APPLY CHANGES FROM SNAPSHOT processing, the default behavior is to insert a new row when a matching record with the same key(s) does not exist in the target. If a matching record does exist, it is updated only if any of the values in the row have changed. Rows with keys present in the target but no longer present in the source are deleted.
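The insert/update/delete semantics above can be sketched in plain Python, with dictionaries standing in for the target table and the snapshot. This is an illustration of the behavior, not the DLT implementation:

```python
def diff_snapshot(target: dict, snapshot: dict) -> dict:
    """Compute the actions APPLY CHANGES FROM SNAPSHOT-style processing
    would take. Both arguments map key -> row values; returns action per key."""
    actions = {}
    for key, row in snapshot.items():
        if key not in target:
            actions[key] = "insert"    # new key: insert the row
        elif target[key] != row:
            actions[key] = "update"    # existing key, changed values: update
        # unchanged rows produce no action
    for key in target:
        if key not in snapshot:
            actions[key] = "delete"    # key no longer in the snapshot: delete
    return actions

target   = {1: "Alice", 2: "Bob",   3: "Carol"}
snapshot = {1: "Alice", 2: "Bobby", 4: "Dave"}
print(diff_snapshot(target, snapshot))
# {2: 'update', 4: 'insert', 3: 'delete'}
```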
To learn more about CDC processing with snapshots, see The APPLY CHANGES APIs: Simplify change data capture with DLT. For examples of using the apply_changes_from_snapshot() function, see the periodic snapshot ingestion and historical snapshot ingestion examples.
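For the periodic pattern, source can simply name a table or view that is re-snapshotted on each update. A minimal sketch, in which the table and key names are assumptions:

```python
import dlt

# Target table for the processed history.
dlt.create_streaming_table("customers_history")

dlt.apply_changes_from_snapshot(
    target = "customers_history",
    source = "customers_snapshot",   # assumed table holding the latest snapshot
    keys = ["customer_id"],          # assumed primary key column
    stored_as_scd_type = 2
)
```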
Parameters
Parameter | Type | Description |
---|---|---|
target | str | Required. The name of the table to be updated. You can use the create_streaming_table() function to create the target table before executing the apply_changes_from_snapshot() function. |
source | str or lambda function | Required. Either the name of a table or view to snapshot periodically or a Python lambda function that returns the snapshot DataFrame to be processed and the snapshot version. See Implement the source argument. |
keys | list | Required. The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. You can specify either a list of strings or a list of Spark SQL col() functions. Arguments to col() functions cannot include qualifiers: for example, you can use col("userId") but not col("source.userId"). |
stored_as_scd_type | str or int | Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. The default is SCD type 1. |
track_history_column_list, track_history_except_column_list | list | A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to track, or use track_history_except_column_list to specify the columns to exclude from tracking. The default is to include all columns. Arguments to these parameters are mutually exclusive. |
Implement the source argument
The apply_changes_from_snapshot() function includes the source argument. For processing historical snapshots, the source argument is expected to be a Python lambda function that returns two values to the apply_changes_from_snapshot() function: a Python DataFrame containing the snapshot data to be processed and a snapshot version.
The following is the signature of the lambda function:
```
lambda Any => Optional[(DataFrame, Any)]
```
- The argument to the lambda function is the most recently processed snapshot version.
- The return value of the lambda function is None or a tuple of two values:
  - The first value of the tuple is a DataFrame containing the snapshot to be processed.
  - The second value of the tuple is the snapshot version that represents the logical order of the snapshot.
An example that implements and calls the lambda function:
```python
from typing import Optional, Tuple

import dlt
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
    # On the first run, no snapshot has been processed yet.
    if latest_snapshot_version is None:
        return (spark.read.load("filename.csv"), 1)
    else:
        # No newer snapshot is available; end the update.
        return None

dlt.apply_changes_from_snapshot(
    # ...
    source = next_snapshot_and_version,
    # ...
)
```
The DLT runtime performs the following steps each time the pipeline that contains the apply_changes_from_snapshot() function is triggered:
1. Runs the next_snapshot_and_version function to load the next snapshot DataFrame and the corresponding snapshot version.
2. If no DataFrame is returned, the run is terminated and the pipeline update is marked as complete.
3. Detects the changes in the new snapshot and incrementally applies them to the target table.
4. Returns to step #1 to load the next snapshot and its version.
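The loop above can be simulated in plain Python, with lists standing in for Spark DataFrames and an in-memory dict of versioned snapshots standing in for snapshot storage. This is a sketch of the runtime's control flow, not the actual DLT implementation:

```python
from typing import Optional, Tuple

# Ordered historical snapshots keyed by version; lists stand in for DataFrames.
snapshots = {1: ["row-a"], 2: ["row-a", "row-b"], 3: ["row-b"]}

def next_snapshot_and_version(latest: Optional[int]) -> Optional[Tuple[list, int]]:
    version = 1 if latest is None else latest + 1
    if version in snapshots:
        return (snapshots[version], version)
    return None  # no more snapshots: the update is complete

def run_update() -> list:
    """Mimic the DLT runtime loop: pull snapshots until the lambda returns None."""
    processed = []
    latest = None
    while True:
        result = next_snapshot_and_version(latest)   # step 1: load next snapshot
        if result is None:                           # step 2: terminate the run
            break
        frame, version = result
        processed.append(version)                    # step 3 stand-in: apply changes
        latest = version                             # step 4: loop with new version
    return processed

print(run_update())  # [1, 2, 3]
```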