APPLY CHANGES API: Simplify change data capture in Delta Live Tables
Delta Live Tables simplifies change data capture (CDC) with the `APPLY CHANGES` API. Previously, the `MERGE INTO` statement was commonly used for processing CDC records on Databricks. However, `MERGE INTO` can produce incorrect results because of out-of-sequence records, or require complex logic to re-order them. By automatically handling out-of-sequence records, the `APPLY CHANGES` API in Delta Live Tables ensures correct processing of CDC records and removes the need to develop complex logic for handling them.
The `APPLY CHANGES` API is supported in the Delta Live Tables SQL and Python interfaces, including support for updating tables with SCD type 1 and type 2:

- Use SCD type 1 to update records directly. History is not retained for records that are updated.
- Use SCD type 2 to retain a history of records, either on all updates or on updates to a specified set of columns.
For syntax and other references, see Change data capture with SQL in Delta Live Tables and Change data capture with Python in Delta Live Tables.
Note
This article describes how to update tables in your Delta Live Tables pipeline based on changes in source data. To learn how to record and query row-level change information for Delta tables, see Use Delta Lake change data feed on Databricks.
How is CDC implemented with Delta Live Tables?
You must specify a column in the source data on which to sequence records, which Delta Live Tables interprets as a monotonically increasing representation of the proper ordering of the source data. Delta Live Tables automatically handles data that arrives out of order. For SCD type 2 changes, Delta Live Tables propagates the appropriate sequencing values to the `__START_AT` and `__END_AT` columns of the target table. There should be one distinct update per key at each sequencing value, and NULL sequencing values are unsupported.
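As a mental model, the SCD type 1 sequencing semantics can be sketched in plain Python. This is an illustration only, not the Delta Live Tables implementation: for each key, the change event with the highest sequencing value wins, regardless of arrival order.

```python
# Illustrative sketch of SCD type 1 sequencing semantics (not the actual
# Delta Live Tables implementation): for each key, keep only the change
# event with the highest sequencing value, regardless of arrival order.
def scd1_latest(events):
    latest = {}
    for event in events:
        key = event["userId"]
        if key not in latest or event["sequenceNum"] > latest[key]["sequenceNum"]:
            latest[key] = event
    return latest

# An UPDATE at sequence 5 arrives after the UPDATE at sequence 6,
# but the sequencing column, not arrival order, decides the final state.
events = [
    {"userId": 125, "city": "Guadalajara", "sequenceNum": 6},
    {"userId": 125, "city": "Mexicali", "sequenceNum": 5},  # late arrival
]
print(scd1_latest(events)[125]["city"])  # Guadalajara
```

This is why `MERGE INTO` pipelines need explicit re-ordering logic while `APPLY CHANGES` does not: the sequencing column alone determines the outcome.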
To perform CDC processing with Delta Live Tables, you first create a streaming table, and then use an `APPLY CHANGES INTO` statement to specify the source, keys, and sequencing for the change feed. To create the target streaming table, use the `CREATE OR REFRESH STREAMING TABLE` statement in SQL or the `create_streaming_table()` function in Python. To create the statement defining the CDC processing, use the `APPLY CHANGES` statement in SQL or the `apply_changes()` function in Python. For syntax details, see Change data capture with SQL in Delta Live Tables or Change data capture with Python in Delta Live Tables.
What data objects are used for Delta Live Tables CDC processing?
When you declare the target table in the Hive metastore, two data structures are created:

- A view using the name assigned to the target table.
- An internal backing table used by Delta Live Tables to manage CDC processing. This table is named by prepending `__apply_changes_storage_` to the target table name.

For example, if you declare a target table named `dlt_cdc_target`, you will see a view named `dlt_cdc_target` and a table named `__apply_changes_storage_dlt_cdc_target` in the metastore. Creating a view allows Delta Live Tables to filter out the extra information (for example, tombstones and versions) required to handle out-of-order data. To view the processed data, query the target view. Because the schema of the `__apply_changes_storage_` table might change to support future features or enhancements, you should not query the table for production use. If you add data manually to the table, the records are assumed to come before other changes because the version columns are missing.

If a pipeline publishes to Unity Catalog, the internal backing tables are not accessible to users.
Get data about records processed by a Delta Live Tables CDC query
The following metrics are captured by `apply changes` queries:

- `num_upserted_rows`: The number of output rows upserted into the dataset during an update.
- `num_deleted_rows`: The number of existing output rows deleted from the dataset during an update.

The `num_output_rows` metric, which is output for non-CDC flows, is not captured for `apply changes` queries.
Limitations
The target of the `APPLY CHANGES INTO` query or `apply_changes` function cannot be used as a source for a streaming table. A table that reads from the target of an `APPLY CHANGES INTO` query or `apply_changes` function must be a materialized view.
SCD type 1 and SCD type 2 on Databricks
The following sections provide examples that demonstrate Delta Live Tables SCD type 1 and type 2 queries that update target tables based on source events that:

- Create new user records.
- Delete a user record.
- Update user records. In the SCD type 1 example, the last `UPDATE` operations arrive late and are dropped from the target table, demonstrating the handling of out-of-order events.
The following examples assume familiarity with configuring and updating Delta Live Tables pipelines. See Tutorial: Run your first Delta Live Tables pipeline.
To run these examples, you must begin by creating a sample dataset. See Generate test data.
The following are the input records for these examples:
| userId | name | city | operation | sequenceNum |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
If you uncomment the final row in the example data, it will insert the following record that specifies where records should be truncated:
| userId | name | city | operation | sequenceNum |
|---|---|---|---|---|
| null | null | null | TRUNCATE | 3 |
Note
All the following examples include options to specify both `DELETE` and `TRUNCATE` operations, but each of these is optional.
Process SCD type 1 updates
The following code example demonstrates processing SCD type 1 updates:
```python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)
```
```sql
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;
```
After running the SCD type 1 example, the target table contains the following records:
| userId | name | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lily | Cancun |
After running the SCD type 1 example with the additional `TRUNCATE` record, records `124` and `126` are truncated because of the `TRUNCATE` operation at `sequenceNum=3`, and the target table contains the following record:
| userId | name | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
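The interaction between `TRUNCATE` and later upserts can be sketched in plain Python. This reflects the semantics assumed from the result above, not the Delta Live Tables implementation: a `TRUNCATE` at sequence value `s` removes keys whose most recent change precedes `s`, while keys changed again after `s` are re-created by the later upserts.

```python
# Illustrative sketch (assumed semantics, not the DLT implementation):
# a TRUNCATE at sequence value `truncate_seq` removes every key whose
# most recent change happened before that point; keys changed again
# after the truncate are re-created by the later upserts.
def keys_surviving_truncate(events, truncate_seq):
    latest = {}
    for event in events:
        key = event["userId"]
        latest[key] = max(latest.get(key, 0), event["sequenceNum"])
    return sorted(key for key, seq in latest.items() if seq > truncate_seq)

events = [
    {"userId": 124, "sequenceNum": 1}, {"userId": 123, "sequenceNum": 1},
    {"userId": 125, "sequenceNum": 2}, {"userId": 126, "sequenceNum": 2},
    {"userId": 123, "sequenceNum": 6}, {"userId": 125, "sequenceNum": 6},
    {"userId": 125, "sequenceNum": 5}, {"userId": 123, "sequenceNum": 5},
]
# 124 and 126 have no changes after sequence 3, so the TRUNCATE removes them.
print(keys_surviving_truncate(events, 3))  # [123, 125]
```

User 123 survives the truncate but is then removed by the `DELETE` at sequence 6, which is why the final table contains only user 125.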
Process SCD type 2 updates
The following code example demonstrates processing SCD type 2 updates:
```python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)
```
```sql
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;
```
After running the SCD type 2 example, the target table contains the following records:
| userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | null |
| 126 | Lily | Cancun | 2 | null |
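The `__START_AT`/`__END_AT` assignment above can be sketched in plain Python. This is an illustration of the semantics, not the Delta Live Tables implementation: events for each key are ordered by sequence value, each history row starts at its own sequence value and ends at the next event's, and a `DELETE` closes history without producing a row of its own.

```python
# Illustrative sketch of SCD type 2 sequencing semantics (not the actual
# DLT implementation): order each key's events by sequence value; each
# row's __START_AT is its own sequence value and its __END_AT is the next
# event's (None if it is still current). A DELETE closes the previous
# row's history without adding a new row.
def scd2_history(events):
    by_key = {}
    for event in events:
        by_key.setdefault(event["userId"], []).append(event)
    rows = []
    for key, evs in sorted(by_key.items()):
        evs.sort(key=lambda e: e["sequenceNum"])
        for i, event in enumerate(evs):
            if event["operation"] == "DELETE":
                continue
            end = evs[i + 1]["sequenceNum"] if i + 1 < len(evs) else None
            rows.append((key, event["city"], event["sequenceNum"], end))
    return rows

# User 125's events, including the out-of-order UPDATE at sequence 5.
events = [
    {"userId": 125, "city": "Tijuana", "operation": "INSERT", "sequenceNum": 2},
    {"userId": 125, "city": "Guadalajara", "operation": "UPDATE", "sequenceNum": 6},
    {"userId": 125, "city": "Mexicali", "operation": "UPDATE", "sequenceNum": 5},
]
print(scd2_history(events))
# [(125, 'Tijuana', 2, 5), (125, 'Mexicali', 5, 6), (125, 'Guadalajara', 6, None)]
```

This reproduces the three history rows for user 125 in the table above, including the row for the late-arriving Mexicali update.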
An SCD type 2 query can also specify a subset of output columns to be tracked for history in the target table. Changes to other columns are updated in place rather than generating new history records. The following example demonstrates excluding the `city` column from tracking:
```python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)
```
```sql
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city);
```
After running this example without the additional `TRUNCATE` record, the target table contains the following records:
| userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Guadalajara | 2 | null |
| 126 | Lily | Cancun | 2 | null |
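The track-history behavior can be sketched in plain Python for a single key's events. This reflects the behavior assumed from the result above, not the Delta Live Tables implementation: a change opens a new history row only when a tracked column differs, while changes confined to untracked columns (here, `city`) are applied in place to the current row, preserving its `__START_AT`.

```python
# Illustrative sketch of TRACK HISTORY semantics for one key's events
# (assumed behavior, not the DLT implementation): only changes to tracked
# columns open a new history row; changes confined to untracked columns
# update the current row in place, preserving its __START_AT.
def scd2_track_history(events, untracked=("city",)):
    events = sorted(events, key=lambda e: e["sequenceNum"])
    rows, current = [], None
    for event in events:
        if event["operation"] == "DELETE":
            if current:
                current["__END_AT"] = event["sequenceNum"]
                rows.append(current)
                current = None
            continue
        if current and all(current[k] == v for k, v in event.items()
                           if k in current and k not in untracked):
            # Only untracked columns changed: update the current row in place.
            for col in untracked:
                current[col] = event[col]
        else:
            if current:
                current["__END_AT"] = event["sequenceNum"]
                rows.append(current)
            current = {"name": event["name"], "city": event["city"],
                       "__START_AT": event["sequenceNum"], "__END_AT": None}
    if current:
        rows.append(current)
    return rows

# User 123's events: only city changes between sequence 1 and 5, so the
# update collapses into one history row instead of opening a new one.
events = [
    {"name": "Isabel", "city": "Monterrey", "operation": "INSERT", "sequenceNum": 1},
    {"name": "Isabel", "city": "Chihuahua", "operation": "UPDATE", "sequenceNum": 5},
    {"name": None, "city": None, "operation": "DELETE", "sequenceNum": 6},
]
print(scd2_track_history(events))
# [{'name': 'Isabel', 'city': 'Chihuahua', '__START_AT': 1, '__END_AT': 6}]
```

Compare with the plain SCD type 2 output: without `TRACK HISTORY`, user 123 produced two history rows; with `city` excluded from tracking, the same events produce a single row starting at sequence 1.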
Generate test data
The following code generates the example dataset used by the queries in this tutorial. Assuming you have the proper credentials to create a new schema and a new table, you can run these statements in a notebook or with Databricks SQL. The following code is not intended to be run as part of a Delta Live Tables pipeline:
```sql
CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul", "Oaxaca", "INSERT", 1),
  (123, "Isabel", "Monterrey", "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana", "INSERT", 2),
  (126, "Lily", "Cancun", "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null, null, "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The batch above at sequenceNum 6 is the final state.
  (125, "Mercedes", "Mexicali", "UPDATE", 5),
  (123, "Isabel", "Chihuahua", "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null, null, "TRUNCATE", 3)
);
```
Add, change, or delete data in a target streaming table
If your pipeline publishes tables to Unity Catalog, you can use data manipulation language (DML) statements, including insert, update, delete, and merge statements, to modify the target streaming tables created by APPLY CHANGES INTO
statements.
Note
- DML statements that modify the table schema of a streaming table are not supported. Ensure that your DML statements do not attempt to evolve the table schema.
- DML statements that update a streaming table can be run only in a shared Unity Catalog cluster or a SQL warehouse using Databricks Runtime 13.3 LTS and above.
Because streaming requires append-only data sources, if your processing requires streaming from a source streaming table with changes (for example, from DML statements), set the `skipChangeCommits` flag when reading the source streaming table. When `skipChangeCommits` is set, transactions that delete or modify records on the source table are ignored. If your processing does not require a streaming table, you can use a materialized view (which does not have the append-only restriction) as the target table.
Because Delta Live Tables uses a specified `SEQUENCE BY` column and propagates appropriate sequencing values to the `__START_AT` and `__END_AT` columns of the target table (for SCD type 2), you must ensure that DML statements use valid values for these columns to maintain the proper ordering of records. See How is CDC implemented with Delta Live Tables?.
For more information about using DML statements with streaming tables, see Add, change, or delete data in a streaming table.
The following example inserts an active record with a start sequence of 5:
```sql
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
```