Change data capture with Delta Live Tables

Preview

Delta Live Tables support for SCD type 2 is in Public Preview.

You can use change data capture (CDC) in Delta Live Tables to update tables based on changes in source data. CDC is supported in the Delta Live Tables SQL and Python interfaces. Delta Live Tables supports updating tables with slowly changing dimensions (SCD) type 1 and type 2:

  • Use SCD type 1 to update records directly. History is not retained for records that are updated.

  • Use SCD type 2 to retain a history of records, either on all updates or on updates to a specified set of columns.

For syntax and other references, see:

  • Change data capture with SQL in Delta Live Tables

  • Change data capture with Python in Delta Live Tables

Note

This article describes how to update tables in your Delta Live Tables pipeline based on changes in source data. To learn how to record and query row-level change information for Delta tables, see Use Delta Lake change data feed on Databricks.

How is CDC implemented with Delta Live Tables?

You must specify a column in the source data on which to sequence records, which Delta Live Tables interprets as a monotonically increasing representation of the proper ordering of the source data. Delta Live Tables automatically handles data that arrives out of order. For SCD Type 2 changes, Delta Live Tables propagates the appropriate sequencing values to the __START_AT and __END_AT columns of the target table. There should be at most one distinct update per key at each sequencing value, and NULL sequencing values are unsupported.

To perform CDC processing with Delta Live Tables, you first create a streaming table, and then use an APPLY CHANGES INTO statement to specify the source, keys, and sequencing for the change feed. To create the target streaming table, use the CREATE OR REFRESH STREAMING TABLE statement in SQL or the create_streaming_table() function in Python. To create the statement defining the CDC processing, use the APPLY CHANGES statement in SQL or the apply_changes() function in Python. For syntax details, see Change data capture with SQL in Delta Live Tables or Change data capture with Python in Delta Live Tables.
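
As a minimal Python sketch of this pattern (the cdc_target, cdc_source, and id names here are placeholders, not names used in the examples later in this article):

import dlt
from pyspark.sql.functions import col

# Create the target streaming table.
dlt.create_streaming_table("cdc_target")

# Define the CDC processing: source dataset, key columns, and sequencing column.
dlt.apply_changes(
  target = "cdc_target",             # the streaming table created above
  source = "cdc_source",             # the dataset containing the change feed
  keys = ["id"],                     # column(s) that uniquely identify a source row
  sequence_by = col("sequenceNum"),  # logical ordering of the source changes
  stored_as_scd_type = 1             # use 2 to retain history
)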

What data objects are used for Delta Live Tables CDC processing?

When you declare the target table, two data structures are created in the Hive metastore:

  • A view using the name assigned to the target table.

  • An internal backing table used by Delta Live Tables to manage CDC processing. This table is named by prepending __apply_changes_storage_ to the target table name.

For example, if you declare a target table named dlt_cdc_target, you will see a view named dlt_cdc_target and a table named __apply_changes_storage_dlt_cdc_target in the metastore. Creating a view allows Delta Live Tables to filter out the extra information (for example, tombstones and versions) that is required to handle out-of-order data. To view the processed data, query the target view. You can also query the raw data in the __apply_changes_storage_ table to see deleted records and extra version columns. If you add data manually to the table, the records are assumed to come before other changes because the version columns are missing.
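
For example, with the dlt_cdc_target table above, you could compare the processed and raw records from a notebook attached to the same metastore (a sketch, assuming you have read access to both objects):

# Processed records: query the target view.
spark.table("dlt_cdc_target").show()

# Raw records, including tombstones and the internal version columns.
spark.table("__apply_changes_storage_dlt_cdc_target").show()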

Limitations

  • Metrics for the target table, such as number of output rows, are not available.

  • SCD type 2 updates will add a history row for every input row, even if no columns have changed.

  • The target of the APPLY CHANGES INTO query or apply_changes function cannot be used as a source for a streaming table. A table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.

  • Expectations are not supported in an APPLY CHANGES INTO query or apply_changes() function. To use expectations for the source or target dataset:

    • Add expectations on source data by defining an intermediate dataset with the required expectations and use this dataset as the source for the target table (see the sketch following this list).

    • Add expectations on target data with a downstream table that reads input data from the target table.
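
The following Python sketch shows the first approach, using the cdc_data.users source from the examples below; the users_validated view and valid_sequence expectation names are illustrative, not part of those examples:

import dlt
from pyspark.sql.functions import col, expr

# Intermediate dataset that validates the raw change feed.
# NULL sequencing values are unsupported, so drop those records here.
@dlt.view
@dlt.expect_or_drop("valid_sequence", "sequenceNum IS NOT NULL")
def users_validated():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

# The validated view, not the raw feed, is the source for the CDC processing.
dlt.apply_changes(
  target = "target",
  source = "users_validated",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)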

SCD type 1 and SCD type 2 on Databricks

The following sections provide examples that demonstrate Delta Live Tables SCD type 1 and type 2 queries that update target tables based on source events that:

  1. Create new user records.

  2. Delete a user record.

  3. Update user records. In the SCD type 1 example, the last UPDATE operations arrive late and are dropped from the target table, demonstrating the handling of out-of-order events.

All of the following examples assume familiarity with configuring and updating Delta Live Tables pipelines. See Tutorial: Run your first Delta Live Tables pipeline.

In order to run these examples, you must begin by creating a sample dataset. See Generate test data.

The following are the input records for these examples:

userId  name      city         operation  sequenceNum
124     Raul      Oaxaca       INSERT     1
123     Isabel    Monterrey    INSERT     1
125     Mercedes  Tijuana      INSERT     2
126     Lily      Cancun       INSERT     2
123     null      null         DELETE     6
125     Mercedes  Guadalajara  UPDATE     6
125     Mercedes  Mexicali     UPDATE     5
123     Isabel    Chihuahua    UPDATE     5

If you uncomment the final row in the example data, it inserts the following record, which specifies where records should be truncated:

userId  name  city  operation  sequenceNum
null    null  null  TRUNCATE   3

Note

The SCD type 1 example includes options to specify both DELETE and TRUNCATE operations, and the SCD type 2 examples include the DELETE option; each of these clauses is optional.

Process SCD type 1 updates

The following code example demonstrates processing SCD type 1 updates:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

After running the SCD type 1 example, the target table contains the following records:

userId  name      city
124     Raul      Oaxaca
125     Mercedes  Guadalajara
126     Lily      Cancun

After running the SCD type 1 example with the additional TRUNCATE record, records 124 and 126 are truncated because of the TRUNCATE operation at sequenceNum=3, and the target table contains the following record:

userId  name      city
125     Mercedes  Guadalajara

Process SCD type 2 updates

The following code example demonstrates processing SCD type 2 updates:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

After running the SCD type 2 example, the target table contains the following records:

userId  name      city         __START_AT  __END_AT
123     Isabel    Monterrey    1           5
123     Isabel    Chihuahua    5           6
124     Raul      Oaxaca       1           null
125     Mercedes  Tijuana      2           5
125     Mercedes  Mexicali     5           6
125     Mercedes  Guadalajara  6           null
126     Lily      Cancun       2           null

Track history for only specified columns with SCD type 2

SCD type 2 supports specifying a subset of output columns to be tracked for history. Changes to other columns are updated in place rather than generating new history records.

To use track history in Delta Live Tables SCD type 2, you must explicitly enable the feature in your pipeline by adding the following configuration to your Delta Live Tables pipeline settings:

{
  "configuration": {
    "pipelines.enableTrackHistory": "true"
  }
}

If pipelines.enableTrackHistory is not set or is set to false, SCD type 2 queries use the default behavior of generating a history record for every input row.

The following example demonstrates using track history with SCD type 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city);

After running the SCD type 2 with track history example, the target table contains the following records:

userId  name      city         __START_AT  __END_AT
123     Isabel    Chihuahua    1           6
124     Raul      Oaxaca       1           null
125     Mercedes  Guadalajara  2           null
126     Lily      Cancun       2           null

Generate test data

The following code generates an example dataset for use with the example queries in this article. Assuming that you have the proper credentials to create a new schema and a new table, you can run these statements in either a notebook or Databricks SQL. The following code is not intended to be run as part of a Delta Live Tables pipeline:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moves to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The batch above, at sequenceNum 6, will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);