Change data capture with Delta Live Tables
Note
This article describes how to update tables in your Delta Live Tables pipeline based on changes in source data. To learn how to record and query row-level change information for Delta tables, see Change data feed.
Preview
Delta Live Tables support for SCD type 2 is in Public Preview.
You can use change data capture (CDC) in Delta Live Tables to update tables based on changes in source data. CDC is supported in the Delta Live Tables SQL and Python interfaces. Delta Live Tables supports updating tables with slowly changing dimensions (SCD) type 1 and type 2:
- Use SCD type 1 to update records directly. History is not retained for records that are updated.
- Use SCD type 2 to retain the history of all updates to records.
To represent the effective period of a change, SCD type 2 stores every change with the generated `__START_AT` and `__END_AT` columns. Delta Live Tables uses the column specified by `SEQUENCE BY` in SQL or `sequence_by` in Python to generate the `__START_AT` and `__END_AT` columns.
Note
The data type of the `__START_AT` and `__END_AT` columns is the same as the data type of the specified `SEQUENCE BY` field.
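For example, using the sample data shown later in this article: the record for `userId = 125` is inserted at `sequenceNum = 2` and updated at `sequenceNum = 6`, so the SCD type 2 target closes the first version of the record with `__END_AT = 6` and stores the current version with `__START_AT = 6` and `__END_AT = null`.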
SQL
Use the `APPLY CHANGES INTO` statement to use Delta Live Tables CDC functionality:
```sql
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
```
| Clause | Description |
| --- | --- |
| `KEYS` | The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. This clause is required. |
| `WHERE` | A condition applied to both source and target to trigger optimizations such as partition pruning. This condition cannot be used to drop source rows; all CDC rows in the source must satisfy this condition or an error is thrown. This clause is optional and should be used when your processing requires specific optimizations. |
| `IGNORE NULL UPDATES` | Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and `IGNORE NULL UPDATES` is specified, columns with a `null` retain their existing values in the target. This clause is optional. The default is to overwrite existing columns with `null` values. |
| `APPLY AS DELETE WHEN` | Specifies when a CDC event should be treated as a `DELETE` rather than an upsert. This clause is optional. |
| `APPLY AS TRUNCATE WHEN` | Specifies when a CDC event should be treated as a full table `TRUNCATE`. The `APPLY AS TRUNCATE WHEN` clause is supported only for SCD type 1. This clause is optional. |
| `SEQUENCE BY` | The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. This clause is required. |
| `COLUMNS` | Specifies a subset of columns to include in the target table. You can either specify the complete list of columns to include (`COLUMNS (userId, name, city)`) or specify a list of columns to exclude (`COLUMNS * EXCEPT (operation, sequenceNum)`). This clause is optional. The default is to include all columns in the target table when the `COLUMNS` clause is not specified. |
| `STORED AS` | Whether to store records as SCD type 1 or SCD type 2. This clause is optional. The default is SCD type 1. |
The default behavior for `INSERT` and `UPDATE` events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s), or insert a new row when a matching record does not exist in the target table. Handling for `DELETE` events can be specified with the `APPLY AS DELETE WHEN` condition.
Python
Use the `apply_changes()` function in the Python API to use Delta Live Tables CDC functionality. The Delta Live Tables Python CDC interface also provides the `create_streaming_live_table()` function. You can use this function to create the target table required by the `apply_changes()` function. See the example queries.
Apply changes function
```python
apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>
)
```
| Argument | Description |
| --- | --- |
| `target` | Type: `str`. The name of the table to be updated. You can use the `create_streaming_live_table()` function to create the target table before executing the `apply_changes()` function. This parameter is required. |
| `source` | Type: `str`. The data source containing CDC records. This parameter is required. |
| `keys` | Type: `list`. The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. You can specify either a list of strings (`keys = ["userId"]`) or a list of Spark SQL `col()` functions (`keys = [col("userId")]`). Arguments to `col()` functions cannot include qualifiers. This parameter is required. |
| `sequence_by` | Type: `str` or `col()`. The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. You can specify either a string (`sequence_by = "sequenceNum"`) or a Spark SQL `col()` function (`sequence_by = col("sequenceNum")`). Arguments to `col()` functions cannot include qualifiers. This parameter is required. |
| `ignore_null_updates` | Type: `bool`. Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and `ignore_null_updates` is `True`, columns with a `null` retain their existing values in the target. This parameter is optional. The default is `False`. |
| `apply_as_deletes` | Type: `str` or `expr()`. Specifies when a CDC event should be treated as a `DELETE` rather than an upsert. You can specify either a string (`apply_as_deletes = "operation = 'DELETE'"`) or a Spark SQL `expr()` function (`apply_as_deletes = expr("operation = 'DELETE'")`). This parameter is optional. |
| `apply_as_truncates` | Type: `str` or `expr()`. Specifies when a CDC event should be treated as a full table `TRUNCATE`. The `apply_as_truncates` parameter is supported only for SCD type 1. You can specify either a string (`apply_as_truncates = "operation = 'TRUNCATE'"`) or a Spark SQL `expr()` function (`apply_as_truncates = expr("operation = 'TRUNCATE'")`). This parameter is optional. |
| `column_list`, `except_column_list` | Type: `list`. A subset of columns to include in the target table. Use `column_list` to specify the complete list of columns to include; use `except_column_list` to specify the columns to exclude. You can specify either a list of strings or a list of Spark SQL `col()` functions. Arguments to `col()` functions cannot include qualifiers. This parameter is optional. The default is to include all columns in the target table when no `column_list` or `except_column_list` argument is passed. |
| `stored_as_scd_type` | Type: `str` or `int`. Whether to store records as SCD type 1 or SCD type 2. Set to `1` for SCD type 1 or `2` for SCD type 2. This parameter is optional. The default is SCD type 1. |
The default behavior for `INSERT` and `UPDATE` events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s), or insert a new row when a matching record does not exist in the target table. Handling for `DELETE` events can be specified with the `apply_as_deletes` argument.
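For example, `ignore_null_updates` controls how partial updates are merged into the target. The following is a minimal sketch, assuming a hypothetical source table `cdc_data.user_patches` whose CDC events carry `null` for columns that did not change:

```python
import dlt
from pyspark.sql.functions import col

# Hypothetical source of partial updates: unchanged columns arrive as null.
@dlt.view
def user_patches():
  return spark.readStream.format("delta").table("cdc_data.user_patches")

dlt.create_streaming_live_table("users_latest")

dlt.apply_changes(
  target = "users_latest",
  source = "user_patches",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  # With ignore_null_updates = True, an event that matches an existing row
  # and has city = null keeps the target's existing city value instead of
  # overwriting it with null.
  ignore_null_updates = True,
  stored_as_scd_type = 1
)
```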
Create a target table for output records
Use the `create_streaming_live_table()` function to create a target table for the `apply_changes()` output records.
Note
The `create_target_table()` function is deprecated. Databricks recommends updating existing code to use the `create_streaming_live_table()` function.
```python
create_streaming_live_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition"
)
```
| Argument | Description |
| --- | --- |
| `name` | Type: `str`. The table name. This parameter is required. |
| `comment` | Type: `str`. An optional description for the table. |
| `spark_conf` | Type: `dict`. An optional list of Spark configurations for the execution of this query. |
| `table_properties` | Type: `dict`. An optional list of table properties for the table. |
| `partition_cols` | Type: `list`. An optional list of one or more columns to use for partitioning the table. |
| `path` | Type: `str`. An optional storage location for table data. If not set, the system will default to the pipeline storage location. |
| `schema` | Type: `str` or `StructType`. An optional schema definition for the table. Schemas can be defined as a SQL DDL string or with a Python `StructType`. |
When specifying the schema of the `apply_changes` target table, you must also include the `__START_AT` and `__END_AT` columns with the same data type as the `sequence_by` field. For example, if your target table has the columns `key` (`STRING`), `value` (`STRING`), and `sequencing` (`LONG`):
```python
from pyspark.sql.types import StructType, StructField, StringType, LongType

create_streaming_live_table(
  name = "target",
  comment = "Target for CDC ingestion.",
  partition_cols=["value"],
  path="$tablePath",
  schema=
    StructType(
      [
        StructField('key', StringType()),
        StructField('value', StringType()),
        StructField('sequencing', LongType()),
        StructField('__START_AT', LongType()),
        StructField('__END_AT', LongType())
      ]
    )
)
```
Note
- You must ensure that a target table is created before you execute the `APPLY CHANGES INTO` query or `apply_changes` function. See the example queries.
- Metrics for the target table, such as number of output rows, are not available.
- SCD type 2 updates will add a history row for every input row, even if no columns have changed.
- The target of the `APPLY CHANGES INTO` query or `apply_changes` function cannot be used as a source for a streaming live table. A table that reads from the target of an `APPLY CHANGES INTO` query or `apply_changes` function must be a live table.
- Expectations are not supported in an `APPLY CHANGES INTO` query or `apply_changes()` function. To use expectations for the source or target dataset:
  - Add expectations on source data by defining an intermediate table with the required expectations, and use this dataset as the source for the target table (see the sketch after this note).
  - Add expectations on target data with a downstream table that reads input data from the target table.
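The following is a minimal sketch of the first pattern, reusing the `cdc_data.users` source from the examples below. The view name `validated_users` and the expectation shown are illustrative assumptions, not part of the API:

```python
import dlt
from pyspark.sql.functions import col

# Hypothetical intermediate view: the expectation is applied here because
# expectations are not supported directly on an apply_changes target.
@dlt.view
@dlt.expect_or_drop("valid_user_id", "userId IS NOT NULL")
def validated_users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

# apply_changes reads from the validated view instead of the raw source.
dlt.apply_changes(
  target = "target",
  source = "validated_users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  stored_as_scd_type = 1
)
```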
Table properties
The following table properties are added to control the behavior of tombstone management for `DELETE` events:

| Table property | Description |
| --- | --- |
| `pipelines.cdc.tombstoneGCThresholdInSeconds` | Set this value to match the highest expected interval between out-of-order data. |
| `pipelines.cdc.tombstoneGCFrequencyInSeconds` | Controls how frequently tombstones are checked for cleanup. Default value: 5 minutes. |
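These properties can be set when creating the target table. A minimal sketch, assuming a source that can deliver events up to one hour out of order; the specific values are illustrative, not recommendations:

```python
import dlt

# Hypothetical tuning: keep DELETE tombstones for one hour to absorb
# out-of-order events, and check for expired tombstones every ten minutes.
dlt.create_streaming_live_table(
  name = "target",
  table_properties = {
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "3600",
    "pipelines.cdc.tombstoneGCFrequencyInSeconds": "600"
  }
)
```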
Examples
These examples demonstrate Delta Live Tables SCD type 1 and type 2 queries that update target tables based on source events that:

- Create new user records.
- Delete a user record.
- Update user records. In the SCD type 1 example, the last `UPDATE` operations arrive late and are dropped from the target table, demonstrating the handling of out-of-order events.
The following are the input records for these examples:
| userId | name | city | operation | sequenceNum |
| --- | --- | --- | --- | --- |
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
After running the SCD type 1 example, the target table contains the following records:
| userId | name | city |
| --- | --- | --- |
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lily | Cancun |
The following input records include an additional record with the `TRUNCATE` operation and can be used with the SCD type 1 example code:
| userId | name | city | operation | sequenceNum |
| --- | --- | --- | --- | --- |
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
| null | null | null | TRUNCATE | 3 |
After running the SCD type 1 example with the additional `TRUNCATE` record, records `124` and `126` are truncated because of the `TRUNCATE` operation at `sequenceNum=3`, and the target table contains the following record:
| userId | name | city |
| --- | --- | --- |
| 125 | Mercedes | Guadalajara |
After running the SCD type 2 example, the target table contains the following records:
| userId | name | city | __START_AT | __END_AT |
| --- | --- | --- | --- | --- |
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | null |
| 126 | Lily | Cancun | 2 | null |
Generate test data
To create the test records for this example:
1. Go to your Databricks landing page and select Create a notebook, or click Create in the sidebar and select Notebook from the menu. The Create Notebook dialog appears.
2. In the Create Notebook dialog, give your notebook a name, for example, Generate test CDC records. Select SQL from the Default Language drop-down menu.
3. If there are running clusters, the Cluster drop-down displays. Select the cluster you want to attach the notebook to. You can also create a new cluster to attach to after you create the notebook.
4. Click Create.
5. Copy the following query and paste it into the first cell of the new notebook:
```sql
CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null, null, "TRUNCATE", 3)
);
```
To run the notebook and populate the test records, open the cell actions menu at the far right of the cell and select Run Cell, or press shift+enter.
Create and run the SCD type 1 example pipeline
1. Go to your Databricks landing page and select Create a notebook, or click Create in the sidebar and select Notebook from the menu. The Create Notebook dialog appears.
2. In the Create Notebook dialog, give your notebook a name, for example, DLT CDC example. Select Python or SQL from the Default Language drop-down menu based on your preferred language. You can leave Cluster set to the default value. The Delta Live Tables runtime creates a cluster before it runs your pipeline.
3. Click Create.
4. Copy the Python or SQL query and paste it into the first cell of the notebook.
5. Create a new pipeline and add the notebook in the Notebook Libraries field. To publish the output of the pipeline processing, you can optionally enter a database name in the Target field.
6. Start the pipeline. If you configured the Target value, you can view and validate the results of the query.
Example queries
```python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)
```
```sql
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;
```
Create and run the SCD type 2 example pipeline
1. Go to your Databricks landing page and select Create a notebook, or click Create in the sidebar and select Notebook from the menu. The Create Notebook dialog appears.
2. In the Create Notebook dialog, give your notebook a name, for example, DLT CDC example. Select Python or SQL from the Default Language drop-down menu based on your preferred language. You can leave Cluster set to the default value. The Delta Live Tables runtime creates a cluster before it runs your pipeline.
3. Click Create.
4. Copy the Python or SQL query and paste it into the first cell of the notebook.
5. Create a new pipeline and add the notebook in the Notebook Libraries field. To publish the output of the pipeline processing, you can optionally enter a database name in the Target field.
6. Start the pipeline. If you configured the Target value, you can view and validate the results of the query.
Example queries
```python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)
```
```sql
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;
```
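Because current rows in the SCD type 2 output have `__END_AT = null` (see the example output above), a downstream table can filter for the latest version of each record. The following is a minimal sketch; the table name `current_users` is an illustrative assumption:

```python
import dlt

# Per the note above, a table reading from an apply_changes target must be a
# live table, so this uses dlt.read() rather than a streaming read.
@dlt.table
def current_users():
  return dlt.read("target").where("__END_AT IS NULL")
```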