Use Delta Lake change data feed on Databricks
Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.
Change data feed works in tandem with table history to provide change information. Because cloning a Delta table creates a separate history, the change data feed on cloned tables doesn't match that of the original table.
Incrementally process change data
Databricks recommends using change data feed in combination with Structured Streaming to incrementally process changes from Delta tables. You must use Structured Streaming for Databricks to automatically track versions for your table's change data feed.
Lakeflow Declarative Pipelines provides functionality for easy propagation of change data and storing results as SCD (slowly changing dimension) type 1 or type 2 tables. See The AUTO CDC APIs: Simplify change data capture with Lakeflow Declarative Pipelines.
To read the change data feed from a table, you must enable change data feed on that table. See Enable change data feed.
Set the option readChangeFeed to true when configuring a stream against a table to read the change data feed, as shown in the following syntax example:
- Python
- Scala
(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)
spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
By default, the stream returns the latest snapshot of the table when the stream first starts as an INSERT and future changes as change data.
Change data commits as part of the Delta Lake transaction, and becomes available at the same time the new data commits to the table.
You can optionally specify a starting version. See Should I specify a starting version?.
Change data feed also supports batch execution, which requires specifying a starting version. See Read changes in batch queries.
Options like rate limits (maxFilesPerTrigger, maxBytesPerTrigger) and excludeRegex are also supported when reading change data.
Rate limiting can be atomic for versions other than the starting snapshot version. That is, the entire commit version will be rate limited or the entire commit will be returned.
Should I specify a starting version?
You can optionally specify a starting version if you want to ignore changes that happened before a particular version. You can specify a version using a timestamp or the version ID number recorded in the Delta transaction log.
A starting version is required for batch reads, and many batch patterns can benefit from setting an optional ending version.
When you're configuring Structured Streaming workloads involving change data feed, it's important to understand how specifying a starting version impacts processing.
Many streaming workloads, especially new data processing pipelines, benefit from the default behavior. With the default behavior, the first batch is processed when the stream first records all existing records in the table as INSERT operations in the change data feed.
If your target table already contains all the records with appropriate changes up to a certain point, specify a starting version to avoid processing the source table state as INSERT events.
The following example syntax recovering from a streaming failure in which the checkpoint was corrupted. In this example, assume the following conditions:
- Change data feed was enabled on the source table at table creation.
- The target downstream table has processed all changes up to and including version 75.
- Version history for the source table is available for versions 70 and above.
- Python
- Scala
(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)
spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
In this example, you must also specify a new checkpoint location.
If you specify a starting version, the stream fails to start from a new checkpoint if the starting version is no longer present in the table history. Delta Lake cleans up historic versions automatically, meaning that all specified starting versions are eventually deleted.
See Can I use change data feed to replay the entire history of a table?.
Read changes in batch queries
You can use batch query syntax to read all changes starting from a particular version or to read changes within a specified range of versions.
You specify a version as an integer and a timestamps as a string in the format yyyy-MM-dd[ HH:mm:ss[.SSS]].
The start and end versions are inclusive in the queries. To read the changes from a particular start version to the latest version of the table, specify only the starting version.
If you provide a version lower or timestamp older than one that has recorded change events—that is, when the change data feed was enabled—an error is thrown indicating that the change data feed was not enabled.
The following syntax examples demonstrate using starting and ending version options with batch reads:
- SQL
- Python
- Scala
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")
// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")
By default, if a user passes in a version or timestamp exceeding the last commit on a table, the error timestampGreaterThanLatestCommit is thrown. In Databricks Runtime 11.3 LTS and above, change data feed can handle the out of range version case if the user sets the following configuration to true:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
If you provide a start version greater than the last commit on a table or a start timestamp newer than the last commit on a table, then when the preceding configuration is enabled, an empty read result is returned.
If you provide an end version greater than the last commit on a table or an end timestamp newer than the last commit on a table, then when the preceding configuration is enabled in batch read mode, all changes between the start version and the last commit are be returned.
What is the schema for the change data feed?
When you read from the change data feed for a table, the schema for the latest table version is used.
Most schema change and evolution operations are fully supported. Table with column mapping enabled do not support all use cases and demonstrate different behavior. See Change data feed limitations for tables with column mapping enabled.
In addition to the data columns from the schema of the Delta table, change data feed contains metadata columns that identify the type of change event:
| Column name | Type | Values | 
|---|---|---|
| 
 | String | 
 | 
| 
 | Long | The Delta log or table version containing the change. | 
| 
 | Timestamp | The timestamp associated when the commit was created. | 
(1) preimage is the value before the update, postimage is the value after the update.
You cannot enable change data feed on a table if the schema contains columns with the same names as these added columns. Rename columns in the table to resolve this conflict before trying to enable change data feed.
Enable change data feed
You can only read the change data feed for enabled tables. You must explicitly enable the change data feed option using one of the following methods:
- 
New table: Set the table property delta.enableChangeDataFeed = truein theCREATE TABLEcommand.SQLCREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
- 
Existing table: Set the table property delta.enableChangeDataFeed = truein theALTER TABLEcommand.SQLALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
- 
All new tables: SQLset spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Only changes made after you enable the change data feed are recorded. Past changes to a table are not captured.
How are data changes recorded?
Delta Lake attempts to record data changes in the most efficient manner and might leverage other Delta Lake features enabled on a table to optimize the way data changes are represented in storage.
Enabling change data feed might cause a small increase in storage costs for a table. This is because data changes might be recorded in separate files. Some operations, such as insert-only operations and full-partition deletions, do not generate change data files because Databricks can efficiently compute the change data feed directly from the transaction log.
Any files that record data changes follow the retention policy of the table. Change data files are deleted when the VACUUM command runs, and changes reconstructed from the transaction log following the checkpoint retention policy.
You should not attempt to reconstruct the change data feed by directly querying files that record data changes. Always use Delta Lake APIs when working with the change data feed.
Can I use change data feed to replay the entire history of a table?
Change data feed is not intended to serve as a permanent record of all changes to a table. Change data feed only records changes that occur after it's enabled.
Change data feed and Delta Lake allow you to always reconstruct a full snapshot of a source table, meaning you can start a new streaming read against a table with change data feed enabled and capture the current version of that table and all changes that occur after.
You must treat records in the change data feed as transient and only accessible for a specified retention window. The Delta transaction log removes table versions and their corresponding change data feed versions at regular intervals. When a version is removed from the transaction log, you can no longer read the change data feed for that version.
If your use case requires maintaining a permanent history of all changes to a table, you should use incremental logic to write records from the change data feed to a new table. The following code example demonstrates using trigger.AvailableNow, which leverages the incremental processing of Structured Streaming but processes available data as a batch workload. You can schedule this workload asynchronously with your main processing pipelines to create a backup of change data feed for auditing purposes or full replayability.
- Python
- Scala
(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)
spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")
Change data feed limitations for tables with column mapping enabled
With column mapping enabled on a Delta table, you can drop or rename columns in the table without rewriting data files for existing data. With column mapping enabled, change data feed has limitations after performing non-additive schema changes such as renaming or dropping a column, changing data type, or nullability changes.
- You cannot read change data feed for a transaction or range in which a non-additive schema change occurs using batch semantics.
- In Databricks Runtime 12.2 LTS and below, tables with column mapping enabled that have experienced non-additive schema changes do not support streaming reads on change data feed. See Streaming with column mapping and schema changes.
- In Databricks Runtime 11.3 LTS and below, you cannot read change data feed for tables with column mapping enabled that have experienced column renaming or dropping.
In Databricks Runtime 12.2 LTS and above, you can perform batch reads on change data feed for tables with column mapping enabled that have experienced non-additive schema changes. Instead of using the schema of the latest version of the table, read operations use the schema of the end version of the table specified in the query. Queries still fail if the version range specified spans a non-additive schema change.