Use Delta Lake change data feed on Databricks

Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.

Important

Change data feed works in tandem with table history to provide change information. Because cloning a Delta table creates a separate history, the change data feed on cloned tables doesn’t match that of the original table.

Incrementally process change data

Databricks recommends using change data feed in combination with Structured Streaming to incrementally process changes from Delta tables. You must use Structured Streaming for Databricks to automatically track versions for your table’s change data feed.

Note

Delta Live Tables provides functionality for easy propagation of change data and storing results as SCD (slowly changing dimension) type 1 or type 2 tables. See APPLY CHANGES API: Simplify change data capture in Delta Live Tables.

To read the change data feed from a table, you must enable change data feed on that table. See Enable change data feed.

Set the option readChangeFeed to true when configuring a stream against a table to read the change data feed, as shown in the following syntax example:

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

By default, when the stream first starts it returns the latest snapshot of the table as INSERT events, and then returns future changes as change data.

Change data commits as part of the Delta Lake transaction, and becomes available at the same time the new data commits to the table.

You can optionally specify a starting version. See Should I specify a starting version?.

Change data feed also supports batch execution, which requires specifying a starting version. See Read changes in batch queries.

Options like rate limits (maxFilesPerTrigger, maxBytesPerTrigger) and excludeRegex are also supported when reading change data.

Rate limiting is applied atomically for versions other than the starting snapshot version. That is, a commit version is either rate limited in its entirety or returned in its entirety.
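
The following is a minimal sketch of applying rate limits to a change data feed stream; the table name and option values are illustrative rather than recommendations:

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("maxFilesPerTrigger", 100)   # cap the number of files read per micro-batch
  .option("maxBytesPerTrigger", "1g")  # soft cap on the bytes read per micro-batch
  .table("myDeltaTable")
)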

Should I specify a starting version?

You can optionally specify a starting version if you want to ignore changes that happened before a particular version. You can specify a version using a timestamp or the version ID number recorded in the Delta transaction log.

Note

A starting version is required for batch reads, and many batch patterns can benefit from setting an optional ending version.

When you’re configuring Structured Streaming workloads involving change data feed, it’s important to understand how specifying a starting version impacts processing.

Many streaming workloads, especially new data processing pipelines, benefit from the default behavior. With the default behavior, the first batch that the stream processes records all existing records in the table as INSERT operations in the change data feed.

If your target table already contains all the records with appropriate changes up to a certain point, specify a starting version to avoid processing the source table state as INSERT events.

The following example syntax demonstrates recovering from a streaming failure in which the checkpoint was corrupted. In this example, assume the following conditions:

  1. Change data feed was enabled on the source table at table creation.

  2. The target downstream table has processed all changes up to and including version 75.

  3. Version history for the source table is available for versions 70 and above.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

In this example, you must also specify a new checkpoint location.
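
As a sketch, the write side of the recovered stream might look like the following; the checkpoint path and target table name are placeholders for your own values:

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/new_checkpoint")  # must be a new, unused checkpoint location
  .toTable("target_table")
)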

Important

If you specify a starting version, the stream fails to start from a new checkpoint if the starting version is no longer present in the table history. Delta Lake cleans up historic versions automatically, meaning that all specified starting versions are eventually deleted.

See Can I use change data feed to replay the entire history of a table?.

Read changes in batch queries

You can use batch query syntax to read all changes starting from a particular version or to read changes within a specified range of versions.

You specify a version as an integer and a timestamp as a string in the format yyyy-MM-dd[ HH:mm:ss[.SSS]].

The start and end versions are inclusive in the queries. To read the changes from a particular start version to the latest version of the table, specify only the starting version.

If you provide a version lower than, or a timestamp older than, the earliest version that has recorded change events (that is, a version before change data feed was enabled), an error is thrown indicating that change data feed was not enabled.

The following syntax examples demonstrate using starting and ending version options with batch reads:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Note

By default, if a user passes in a version or timestamp exceeding the last commit on a table, the error timestampGreaterThanLatestCommit is thrown. In Databricks Runtime 11.3 LTS and above, change data feed can handle the out-of-range version case if the user sets the following configuration to true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

If you provide a start version greater than the last commit on a table or a start timestamp newer than the last commit on a table, then when the preceding configuration is enabled, an empty read result is returned.

If you provide an end version greater than the last commit on a table or an end timestamp newer than the last commit on a table, then when the preceding configuration is enabled in batch read mode, all changes between the start version and the last commit are returned.
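
The following sketch shows the same behavior from Python; the configuration key is the one shown above, while the table name and the out-of-range version number are illustrative:

# Allow out-of-range versions and timestamps to return empty results instead of failing
# (Databricks Runtime 11.3 LTS and above).
spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", "true")

# Assuming the latest commit of myDeltaTable is below version 9999, this batch read
# returns an empty result rather than throwing timestampGreaterThanLatestCommit.
df = (spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 9999)
  .table("myDeltaTable")
)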

What is the schema for the change data feed?

When you read from the change data feed for a table, the schema for the latest table version is used.

Note

Most schema change and evolution operations are fully supported. Tables with column mapping enabled do not support all use cases and demonstrate different behavior. See Change data feed limitations for tables with column mapping enabled.

In addition to the data columns from the schema of the Delta table, change data feed contains metadata columns that identify the type of change event:

Column name          Type        Values
_change_type         String      insert, update_preimage, update_postimage, delete (1)
_commit_version      Long        The Delta log or table version containing the change.
_commit_timestamp    Timestamp   The timestamp associated with the commit.

(1) preimage is the value before the update, postimage is the value after the update.
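
For example, the following sketch (with an illustrative table name, and assuming change data feed was enabled at table creation) reads the feed as a batch, drops the pre-update images, and inspects the metadata columns:

from pyspark.sql import functions as F

changes = (spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")
  # drop the pre-update image so each update appears once with its final values
  .filter(F.col("_change_type") != "update_preimage")
)
changes.select("_change_type", "_commit_version", "_commit_timestamp").show()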

Note

You cannot enable change data feed on a table if the schema contains columns with the same names as these added columns. Rename columns in the table to resolve this conflict before trying to enable change data feed.

Enable change data feed

You can only read the change data feed for enabled tables. You must explicitly enable the change data feed option using one of the following methods:

  • New table: Set the table property delta.enableChangeDataFeed = true in the CREATE TABLE command.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Existing table: Set the table property delta.enableChangeDataFeed = true in the ALTER TABLE command.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • All new tables:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Important

Only changes made after you enable the change data feed are recorded. Past changes to a table are not captured.
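
To confirm the property took effect on an existing table, you can inspect its table properties; the following is a sketch with an illustrative table name:

# Returns the value "true" once change data feed is enabled on the table.
spark.sql("SHOW TBLPROPERTIES myDeltaTable ('delta.enableChangeDataFeed')").show(truncate=False)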

Change data storage

Enabling change data feed causes a small increase in storage costs for a table. Change data records are generated as the query runs, and are generally much smaller than the total size of rewritten files.

Databricks records change data for UPDATE, DELETE, and MERGE operations in the _change_data folder under the table directory. Some operations, such as insert-only operations and full-partition deletions, do not generate data in the _change_data directory because Databricks can efficiently compute the change data feed directly from the transaction log.

All reads against data files in the _change_data folder should go through supported Delta Lake APIs.

The files in the _change_data folder follow the retention policy of the table. Change data feed data is deleted when the VACUUM command runs.
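
If downstream consumers need more time to read change data before VACUUM removes it, one option is to lengthen the table's retention settings. The following is a sketch, not a recommendation; the table name and the 30-day interval are illustrative:

# delta.deletedFileRetentionDuration controls how long removed data files (including
# files under _change_data) remain available before VACUUM can delete them;
# delta.logRetentionDuration controls how long transaction log history is kept.
spark.sql("""
  ALTER TABLE myDeltaTable SET TBLPROPERTIES (
    'delta.deletedFileRetentionDuration' = 'interval 30 days',
    'delta.logRetentionDuration' = 'interval 30 days'
  )
""")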

Can I use change data feed to replay the entire history of a table?

Change data feed is not intended to serve as a permanent record of all changes to a table. Change data feed only records changes that occur after it’s enabled.

Change data feed and Delta Lake allow you to always reconstruct a full snapshot of a source table, meaning you can start a new streaming read against a table with change data feed enabled and capture the current version of that table and all changes that occur after.

You must treat records in the change data feed as transient and only accessible for a specified retention window. The Delta transaction log removes table versions and their corresponding change data feed versions at regular intervals. When a version is removed from the transaction log, you can no longer read the change data feed for that version.

If your use case requires maintaining a permanent history of all changes to a table, you should use incremental logic to write records from the change data feed to a new table. The following code example demonstrates using trigger.AvailableNow, which leverages the incremental processing of Structured Streaming but processes available data as a batch workload. You can schedule this workload asynchronously with your main processing pipelines to create a backup of change data feed for auditing purposes or full replayability.

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Change data feed limitations for tables with column mapping enabled

With column mapping enabled on a Delta table, you can drop or rename columns in the table without rewriting data files for existing data. With column mapping enabled, change data feed has limitations after non-additive schema changes such as renaming or dropping a column, changing a data type, or changing nullability.

Important

  • You cannot read change data feed for a transaction or range in which a non-additive schema change occurs using batch semantics.

  • In Databricks Runtime 12.2 LTS and below, tables with column mapping enabled that have experienced non-additive schema changes do not support streaming reads on change data feed. See Streaming with column mapping and schema changes.

  • In Databricks Runtime 11.3 LTS and below, you cannot read change data feed for tables with column mapping enabled that have experienced column renaming or dropping.

In Databricks Runtime 12.2 LTS and above, you can perform batch reads on change data feed for tables with column mapping enabled that have experienced non-additive schema changes. Instead of using the schema of the latest version of the table, read operations use the schema of the end version of the table specified in the query. Queries still fail if the version range specified spans a non-additive schema change.
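
As a sketch of this behavior, the following batch read assumes (hypothetically) that the only non-additive schema change on the table happened after version 10; because the requested range ends at version 10, the read uses the schema as of that version and succeeds:

(spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)   # the version range must not span a non-additive schema change
  .table("myDeltaTable")
)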