Skip to main content

APPLY CHANGES INTO (DLT)

Use the APPLY CHANGES INTO statement to use DLT change data capture (CDC) functionality. This statement creates a flow to read changes from a CDC source and apply them to a streaming target.

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

You define data quality constraints for an APPLY CHANGES target using the same CONSTRAINT clause as non-APPLY CHANGES queries. See Manage data quality with pipeline expectations.

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.

important

You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. For SCD type 2 tables, when specifying the schema of the target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.

See The APPLY CHANGES APIs: Simplify change data capture with DLT.

Parameters

  • source

    The source for the data. The source must be a streaming source. Use the STREAM keyword to use streaming semantics to read from the source. If the read encounters a change or deletion to an existing record, an error is thrown. It is safest to read from static or append-only sources. To ingest data that has change commits, you can use Python and the SkipChangeCommits option to handle errors.

    For more information on streaming data, see Transform data with pipelines.

  • KEYS

    The column or combination of columns that uniquely identify a row in the source data. The values in these columns are used to identify which CDC events apply to specific records in the target table.

    To define a combination of columns, use a comma-separated list of columns.

    This clause ia required.

  • IGNORE NULL UPDATES

    Allows ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null value will retain their existing values in the target. This also applies to nested columns with a null value.

    This clause is optional.

    The default is to overwrite existing columns with null values.

  • APPLY AS DELETE WHEN

    Specifies when a CDC event should be treated as a DELETE rather than an upsert.

    For SCD type 2 sources, to handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property.

    This clause is optional.

  • APPLY AS TRUNCATE WHEN

    Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

    The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1. SCD type 2 does not support the truncate operation.

    This clause is optional.

  • SEQUENCE BY

    The column name specifying the logical order of CDC events in the source data. DLT uses this sequencing to handle change events that arrive out of order.

    If multiple columns are needed for sequencing, use a STRUCT expression: it will order by the first struct field first, then by the second field if there's a tie, and so on.

    Specified columns must be sortable data types.

    This clause is required.

  • COLUMNS

    Specifies a subset of columns to include in the target table. You can either:

    • Specify the complete list of columns to include: COLUMNS (userId, name, city).
    • Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum)

    This clause is optional.

    The default is to include all columns in the target table when the COLUMNS clause is not specified.

  • STORED AS

    Whether to store records as SCD type 1 or SCD type 2.

    This clause is optional.

    The default is SCD type 1.

  • TRACK HISTORY ON

    Specifies a subset of output columns to generate history records when there are any changes to those specified columns. You can either:

    • Specify the complete list of columns to track: COLUMNS (userId, name, city).
    • Specify a list of columns to be excluded from tracking: COLUMNS * EXCEPT (operation, sequenceNum)

    This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to TRACK HISTORY ON *.

Examples

SQL
-- Create a streaming table, then use apply changes into to populate it:
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city)