APPLY CHANGES INTO (DLT)
Use the APPLY CHANGES INTO statement to use DLT change data capture (CDC) functionality. This statement creates a flow that reads changes from a CDC source and applies them to a streaming target.
- To learn about CDC, see What is change data capture (CDC)?.
- For more details about using APPLY CHANGES with CDC, see The APPLY CHANGES APIs: Simplify change data capture with DLT.
Syntax
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
You define data quality constraints for an APPLY CHANGES target using the same CONSTRAINT clause as non-APPLY CHANGES queries. See Manage data quality with pipeline expectations.
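For instance, an expectation can be declared on the target streaming table before changes are applied into it. This is a minimal sketch; the names used (target, valid_user_id, cdc_data.users, userId, sequenceNum) are illustrative:

```sql
-- Sketch: declare the target with an expectation, then apply changes into it.
CREATE OR REFRESH STREAMING TABLE target (
  CONSTRAINT valid_user_id EXPECT (userId IS NOT NULL) ON VIOLATION DROP ROW
);

APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum;
```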
The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s), or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.
You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of an SCD type 2 target table, you must also include the __START_AT and __END_AT columns with the same data type as the SEQUENCE BY field.
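As a sketch: if the SEQUENCE BY column is a BIGINT, an SCD type 2 target with an explicit schema must declare __START_AT and __END_AT as BIGINT as well. All column names other than __START_AT and __END_AT below are hypothetical:

```sql
-- Sketch: explicit schema for an SCD type 2 target.
-- __START_AT and __END_AT must match the SEQUENCE BY column's type (BIGINT here).
CREATE OR REFRESH STREAMING TABLE target (
  userId INT,
  name STRING,
  city STRING,
  __START_AT BIGINT,
  __END_AT BIGINT
);
```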
Parameters
- source: The source of the data. The source must be a streaming source. Use the STREAM keyword to apply streaming semantics when reading from the source. If the read encounters a change or deletion to an existing record, an error is thrown, so it is safest to read from static or append-only sources. To ingest data that has change commits, you can use Python and the skipChangeCommits option to handle errors. For more information on streaming data, see Transform data with pipelines.
- KEYS: The column or combination of columns that uniquely identifies a row in the source data. The values in these columns are used to identify which CDC events apply to specific records in the target table. To define a combination of columns, use a comma-separated list of columns. This clause is required.
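A composite key is written as a comma-separated list. A minimal sketch, with hypothetical table and column names:

```sql
-- Sketch: a two-column composite key uniquely identifying each source row.
APPLY CHANGES INTO target
FROM stream(cdc_data.orders)
KEYS (customerId, orderId)
SEQUENCE BY sequenceNum;
```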
- IGNORE NULL UPDATES: Allows ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null value retain their existing values in the target. This also applies to nested columns with a null value. This clause is optional. The default is to overwrite existing columns with null values.
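As a sketch (with hypothetical names), an update event that carries only userId and city then leaves the existing name value in place rather than nulling it out:

```sql
-- Sketch: partial updates keep existing target values for columns arriving as null.
APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
IGNORE NULL UPDATES
SEQUENCE BY sequenceNum;
```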
- APPLY AS DELETE WHEN: Specifies when a CDC event should be treated as a DELETE rather than an upsert. For SCD type 2 sources, to handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view that filters out these tombstones is created in the metastore. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property. This clause is optional.
- APPLY AS TRUNCATE WHEN: Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality. The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1; SCD type 2 does not support the truncate operation. This clause is optional.
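A minimal sketch, assuming the source carries an operation column whose value marks truncate events (names are illustrative):

```sql
-- Sketch: treat events flagged as TRUNCATE as a full-table truncate (SCD type 1 only).
APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS TRUNCATE WHEN operation = "TRUNCATE"
SEQUENCE BY sequenceNum;
```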
- SEQUENCE BY: The column name specifying the logical order of CDC events in the source data. DLT uses this sequencing to handle change events that arrive out of order. If multiple columns are needed for sequencing, use a STRUCT expression: events are ordered by the first struct field, then by the second field to break ties, and so on. Specified columns must be of sortable data types. This clause is required.
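For example, a multi-column ordering can be expressed with a STRUCT expression. Here commit_ts and log_offset are hypothetical columns, with log_offset breaking ties within the same commit_ts:

```sql
-- Sketch: order events by commit_ts first, then by log_offset to break ties.
APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY STRUCT(commit_ts, log_offset);
```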
- COLUMNS: Specifies a subset of columns to include in the target table. You can either:
  - Specify the complete list of columns to include: COLUMNS (userId, name, city).
  - Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum).
  This clause is optional. The default is to include all columns in the target table when the COLUMNS clause is not specified.
- STORED AS: Whether to store records as SCD type 1 or SCD type 2. This clause is optional. The default is SCD type 1.
- TRACK HISTORY ON: Specifies a subset of output columns that generate history records when there are any changes to those specified columns. You can either:
  - Specify the complete list of columns to track: COLUMNS (userId, name, city).
  - Specify a list of columns to exclude from tracking: COLUMNS * EXCEPT (operation, sequenceNum).
  This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to TRACK HISTORY ON *.
Examples
-- Create a streaming table, then use apply changes into to populate it:
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city)