Change data capture with Delta Live Tables

Note

This article describes how to update tables in your Delta Live Tables pipeline based on changes in source data. To learn how to record and query row-level change information for Delta tables, see Change data feed.

Preview

Delta Live Tables support for SCD type 2 is in Public Preview.

You can use change data capture (CDC) in Delta Live Tables to update tables based on changes in source data. CDC is supported in the Delta Live Tables SQL and Python interfaces. Delta Live Tables supports updating tables with slowly changing dimensions (SCD) type 1 and type 2:

  • Use SCD type 1 to update records directly. History is not retained for records that are updated.

  • Use SCD type 2 to retain the history of all updates to records.

To represent the effective period of a change, SCD type 2 stores every change with the generated __START_AT and __END_AT columns. Delta Live Tables uses the column specified by SEQUENCE BY in SQL or sequence_by in Python to generate the __START_AT and __END_AT columns.

Note

The data type of the __START_AT and __END_AT columns is the same as the data type of the specified SEQUENCE BY field.

SQL

Use the APPLY CHANGES INTO statement to use Delta Live Tables CDC functionality:

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]

Clauses

KEYS

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

This clause is required.

WHERE

A condition applied to both source and target to trigger optimizations such as partition pruning. This condition cannot be used to drop source rows; all CDC rows in the source must satisfy this condition or an error is thrown. Use the WHERE clause only when your processing requires specific optimizations.

This clause is optional.

IGNORE NULL UPDATES

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null.

This clause is optional.

The default is to overwrite existing columns with null values.

APPLY AS DELETE WHEN

Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property.

This clause is optional.

APPLY AS TRUNCATE WHEN

Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1. SCD type 2 does not support truncate.

This clause is optional.

SEQUENCE BY

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

This clause is required.

COLUMNS

Specifies a subset of columns to include in the target table. You can either:

  • Specify the complete list of columns to include: COLUMNS (userId, name, city).

  • Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum)

This clause is optional.

The default is to include all columns in the target table when the COLUMNS clause is not specified.

STORED AS

Whether to store records as SCD type 1 or SCD type 2.

This clause is optional.

The default is SCD type 1.

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.

Python

Use the apply_changes() function in the Python API to use Delta Live Tables CDC functionality. The Delta Live Tables Python CDC interface also provides the create_streaming_live_table() function. You can use this function to create the target table required by the apply_changes() function. See the example queries.

Apply changes function

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>
)

Arguments

target

Type: str

The name of the table to be updated. You can use the create_streaming_live_table() function to create the target table before executing the apply_changes() function.

This parameter is required.

source

Type: str

The data source containing CDC records.

This parameter is required.

keys

Type: list

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

You can specify either:

  • A list of strings: ["userId", "orderId"]

  • A list of Spark SQL col() functions: [col("userId"), col("orderId")]

Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId").

This parameter is required.

sequence_by

Type: str or col()

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

You can specify either:

  • A string: "sequenceNum"

  • A Spark SQL col() function: col("sequenceNum")

Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId").

This parameter is required.

ignore_null_updates

Type: bool

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values will be overwritten with null values.

This parameter is optional.

The default is False.
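For example, the following is a minimal sketch, assuming a hypothetical source view named users_partial_updates whose CDC events populate only the changed columns and leave the rest null:

import dlt
from pyspark.sql.functions import col

dlt.create_streaming_live_table("users")

dlt.apply_changes(
  target = "users",
  source = "users_partial_updates",  # hypothetical source emitting partial-column CDC events
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  # With ignore_null_updates = True, a null column in a matching CDC event keeps
  # the existing target value instead of overwriting it with null.
  ignore_null_updates = True
)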

apply_as_deletes

Type: str or expr()

Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property.

You can specify either:

  • A string: "Operation = 'DELETE'"

  • A Spark SQL expr() function: expr("Operation = 'DELETE'")

This parameter is optional.

apply_as_truncates

Type: str or expr()

Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

The apply_as_truncates parameter is supported only for SCD type 1. SCD type 2 does not support truncate.

You can specify either:

  • A string: "Operation = 'TRUNCATE'"

  • A Spark SQL expr() function: expr("Operation = 'TRUNCATE'")

This parameter is optional.

column_list and except_column_list

Type: list

A subset of columns to include in the target table. Use column_list to specify the complete list of columns to include. Use except_column_list to specify the columns to exclude. You can declare either value as a list of strings or as Spark SQL col() functions:

  • column_list = ["userId", "name", "city"].

  • column_list = [col("userId"), col("name"), col("city")]

  • except_column_list = ["operation", "sequenceNum"]

  • except_column_list = [col("operation"), col("sequenceNum")

Arguments to col() functions cannot include qualifiers. For example, you can use col("userId"), but you cannot use col("source.userId").

This parameter is optional.

The default is to include all columns in the target table when no column_list or except_column_list argument is passed to the function.

stored_as_scd_type

Type: str or int

Whether to store records as SCD type 1 or SCD type 2.

Set to 1 for SCD type 1 or 2 for SCD type 2.

This parameter is optional.

The default is SCD type 1.

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the apply_as_deletes argument.

Create a target table for output records

Use the create_streaming_live_table() function to create a target table for the apply_changes() output records.

Note

The create_target_table() function is deprecated. Databricks recommends updating existing code to use the create_streaming_live_table() function.

create_streaming_live_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition"
)

Arguments

name

Type: str

The table name.

This parameter is required.

comment

Type: str

An optional description for the table.

spark_conf

Type: dict

An optional list of Spark configurations for the execution of this query.

table_properties

Type: dict

An optional list of table properties for the table.

partition_cols

Type: array

An optional list of one or more columns to use for partitioning the table.

path

Type: str

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

schema

Type: str or StructType

An optional schema definition for the table. Schemas can be defined as a SQL DDL string, or with a Python StructType.

When specifying the schema of the apply_changes target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field. For example, if your target table has the columns key (STRING), value (STRING), and sequencing (LONG):

from pyspark.sql.types import StructType, StructField, StringType, LongType

create_streaming_live_table(
  name = "target",
  comment = "Target for CDC ingestion.",
  partition_cols=["value"],
  path="$tablePath",
  schema=
    StructType(
      [
        StructField('key', StringType()),
        StructField('value', StringType()),
        StructField('sequencing', LongType()),
        StructField('__START_AT', LongType()),
        StructField('__END_AT', LongType())
      ]
    )
)
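If the sequencing column is a timestamp rather than a number, the generated columns take the same type. The following is a minimal variant of the schema above, assuming a hypothetical TIMESTAMP sequence column named event_ts:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# __START_AT and __END_AT mirror the data type of the sequence_by column,
# so a TIMESTAMP sequence column produces TIMESTAMP start and end columns.
schema = StructType(
  [
    StructField('key', StringType()),
    StructField('value', StringType()),
    StructField('event_ts', TimestampType()),
    StructField('__START_AT', TimestampType()),
    StructField('__END_AT', TimestampType())
  ]
)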

Note

  • You must ensure that a target table is created before you execute the APPLY CHANGES INTO query or apply_changes function. See the example queries.

  • Metrics for the target table, such as number of output rows, are not available.

  • SCD type 2 updates will add a history row for every input row, even if no columns have changed.

  • The target of the APPLY CHANGES INTO query or apply_changes function cannot be used as a source for a streaming live table. A table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.

  • Expectations are not supported in an APPLY CHANGES INTO query or apply_changes() function. To use expectations for the source or target dataset (a sketch follows this list):

    • Add expectations on source data by defining an intermediate table with the required expectations and use this dataset as the source for the target table.

    • Add expectations on target data with a downstream table that reads input data from the target table.
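The following is a minimal sketch of both approaches, assuming the cdc_data.users source table used in the examples below; the dataset names users_validated, users_target, and users_checked and the expectation conditions are illustrative only:

import dlt
from pyspark.sql.functions import col, expr

# Expectations go on an intermediate view of the source data, because they cannot
# be attached to the apply_changes() target directly.
@dlt.view
@dlt.expect_or_drop("valid_userId", "userId IS NOT NULL")
def users_validated():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("users_target")

dlt.apply_changes(
  target = "users_target",
  source = "users_validated",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"]
)

# Expectations on the target data go on a downstream live table that reads
# from the apply_changes() target.
@dlt.table
@dlt.expect("valid_city", "city IS NOT NULL")
def users_checked():
  return dlt.read("users_target")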

Table properties

The following table properties are added to control the behavior of tombstone management for DELETE events:

| Table property                              | Description                                                                          |
|---------------------------------------------|--------------------------------------------------------------------------------------|
| pipelines.cdc.tombstoneGCThresholdInSeconds | Set this value to match the highest expected interval between out-of-order data.      |
| pipelines.cdc.tombstoneGCFrequencyInSeconds | Controls how frequently tombstones are checked for cleanup. Default value: 5 minutes. |
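The following is a minimal sketch of setting these properties when creating the apply_changes() target table; the values below (one day and one hour) are assumptions to adjust for your workload:

import dlt

dlt.create_streaming_live_table(
  name = "target",
  comment = "Target for CDC ingestion.",
  table_properties = {
    # Retain tombstones long enough to cover the largest expected out-of-order interval (assumed: 1 day).
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "86400",
    # Check for expired tombstones once an hour (assumed value).
    "pipelines.cdc.tombstoneGCFrequencyInSeconds": "3600"
  }
)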

Examples

These examples demonstrate Delta Live Tables SCD type 1 and type 2 queries that update target tables based on source events that:

  1. Create new user records.

  2. Delete a user record.

  3. Update user records. In the SCD type 1 example, the last UPDATE operations arrive out of order and are dropped from the target table, demonstrating the handling of out-of-order events.

The following are the input records for these examples:

| userId | name     | city        | operation | sequenceNum |
|--------|----------|-------------|-----------|-------------|
| 124    | Raul     | Oaxaca      | INSERT    | 1           |
| 123    | Isabel   | Monterrey   | INSERT    | 1           |
| 125    | Mercedes | Tijuana     | INSERT    | 2           |
| 126    | Lily     | Cancun      | INSERT    | 2           |
| 123    | null     | null        | DELETE    | 6           |
| 125    | Mercedes | Guadalajara | UPDATE    | 6           |
| 125    | Mercedes | Mexicali    | UPDATE    | 5           |
| 123    | Isabel   | Chihuahua   | UPDATE    | 5           |

After running the SCD type 1 example, the target table contains the following records:

| userId | name     | city        |
|--------|----------|-------------|
| 124    | Raul     | Oaxaca      |
| 125    | Mercedes | Guadalajara |
| 126    | Lily     | Cancun      |

The following input records include an additional record with the TRUNCATE operation and can be used with the SCD type 1 example code:

| userId | name     | city        | operation | sequenceNum |
|--------|----------|-------------|-----------|-------------|
| 124    | Raul     | Oaxaca      | INSERT    | 1           |
| 123    | Isabel   | Monterrey   | INSERT    | 1           |
| 125    | Mercedes | Tijuana     | INSERT    | 2           |
| 126    | Lily     | Cancun      | INSERT    | 2           |
| 123    | null     | null        | DELETE    | 6           |
| 125    | Mercedes | Guadalajara | UPDATE    | 6           |
| 125    | Mercedes | Mexicali    | UPDATE    | 5           |
| 123    | Isabel   | Chihuahua   | UPDATE    | 5           |
| null   | null     | null        | TRUNCATE  | 3           |

After running the SCD type 1 example with the additional TRUNCATE record, records 124 and 126 are truncated because of the TRUNCATE operation at sequenceNum=3, and the target table contains the following record:

| userId | name     | city        |
|--------|----------|-------------|
| 125    | Mercedes | Guadalajara |

After running the SCD type 2 example, the target table contains the following records:

| userId | name     | city        | __START_AT | __END_AT |
|--------|----------|-------------|------------|----------|
| 123    | Isabel   | Monterrey   | 1          | 5        |
| 123    | Isabel   | Chihuahua   | 5          | 6        |
| 124    | Raul     | Oaxaca      | 1          | null     |
| 125    | Mercedes | Tijuana     | 2          | 5        |
| 125    | Mercedes | Mexicali    | 5          | 6        |
| 125    | Mercedes | Guadalajara | 6          | null     |
| 126    | Lily     | Cancun      | 2          | null     |

Generate test data

To create the test records for this example:

  1. Go to your Databricks landing page and select Create a notebook, or click Create in the sidebar and select Notebook from the menu. The Create Notebook dialog appears.

  2. In the Create Notebook dialog, give your notebook a name; for example, Generate test CDC records. Select SQL from the Default Language drop-down menu.

  3. If there are running clusters, the Cluster drop-down displays. Select the cluster you want to attach the notebook to. You can also create a new cluster to attach to after you create the notebook.

  4. Click Create.

  5. Copy the following query and paste it into the first cell of the new notebook (a Python alternative is sketched after these steps):

    CREATE SCHEMA IF NOT EXISTS cdc_data;
    
    CREATE TABLE
      cdc_data.users
    AS SELECT
      col1 AS userId,
      col2 AS name,
      col3 AS city,
      col4 AS operation,
      col5 AS sequenceNum
    FROM (
      VALUES
      -- Initial load.
      (124, "Raul",     "Oaxaca",      "INSERT", 1),
      (123, "Isabel",   "Monterrey",   "INSERT", 1),
      -- New users.
      (125, "Mercedes", "Tijuana",     "INSERT", 2),
      (126, "Lily",     "Cancun",      "INSERT", 2),
      -- Isabel is removed from the system and Mercedes moved to Guadalajara.
      (123, null,       null,          "DELETE", 6),
      (125, "Mercedes", "Guadalajara", "UPDATE", 6),
      -- This batch of updates arrived out of order. The batch above, at sequenceNum 6, will be the final state.
      (125, "Mercedes", "Mexicali",    "UPDATE", 5),
      (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
      -- Uncomment to test TRUNCATE.
      -- ,(null, null,      null,          "TRUNCATE", 3)
    );
    
  6. To run the notebook and populate the test records, in the cell actions menu at the far right, click the run icon and select Run Cell, or press shift+enter.
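If you prefer a Python notebook, the following is a minimal sketch that writes the same records to cdc_data.users; it assumes the table does not already exist:

# Create the cdc_data.users test table from a Python notebook instead of SQL.
spark.sql("CREATE SCHEMA IF NOT EXISTS cdc_data")

columns = ["userId", "name", "city", "operation", "sequenceNum"]
rows = [
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  (123, None,       None,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5),
  # Uncomment to test TRUNCATE.
  # (None, None, None, "TRUNCATE", 3),
]

spark.createDataFrame(rows, columns).write.format("delta").saveAsTable("cdc_data.users")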

Create and run the SCD type 1 example pipeline

  1. Go to your Databricks landing page and select Create a notebook, or click Create in the sidebar and select Notebook from the menu. The Create Notebook dialog appears.

  2. In the Create Notebook dialog, give your notebook a name; for example, DLT CDC example. Select Python or SQL from the Default Language drop-down menu based on your preferred language. You can leave Cluster set to the default value. The Delta Live Tables runtime creates a cluster before it runs your pipeline.

  3. Click Create.

  4. Copy the Python or SQL query and paste it into the first cell of the notebook.

  5. Create a new pipeline and add the notebook in the Notebook Libraries field. To publish the output of the pipeline processing, you can optionally enter a database name in the Target field.

  6. Start the pipeline. If you configured the Target value, you can view and validate the results of the query (a validation sketch follows the example queries below).

Example queries

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;
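After the pipeline completes, you can inspect the published results from a notebook. The following is a minimal sketch, assuming a hypothetical database name cdc_example was entered in the Target field:

# Inspect the published SCD type 1 results; cdc_example is the hypothetical Target database.
display(spark.table("cdc_example.target").orderBy("userId"))

The same check works for the SCD type 2 example, where the __START_AT and __END_AT columns also appear in the output.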

Create and run the SCD type 2 example pipeline

  1. Go to your Databricks landing page and select Create a notebook, or click Create in the sidebar and select Notebook from the menu. The Create Notebook dialog appears.

  2. In the Create Notebook dialog, give your notebook a name; for example, DLT CDC example. Select Python or SQL from the Default Language drop-down menu based on your preferred language. You can leave Cluster set to the default value. The Delta Live Tables runtime creates a cluster before it runs your pipeline.

  3. Click Create.

  4. Copy the Python or SQL query and paste it into the first cell of the notebook.

  5. Create a new pipeline and add the notebook in the Notebook Libraries field. To publish the output of the pipeline processing, you can optionally enter a database name in the Target field.

  6. Start the pipeline. If you configured the Target value, you can view and validate the results of the query.

Example queries

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;