CREATE STREAMING TABLE ... FLOW AUTO CDC
Applies to: Databricks SQL
This feature is in Beta. Requires Databricks Runtime 17.3 and above.
Use the FLOW AUTO CDC clause with CREATE STREAMING TABLE to process change data capture (CDC) records from a source into a streaming table.
Previously, the MERGE INTO statement was commonly used for processing CDC records on Databricks. However, MERGE INTO can produce incorrect results because of out-of-sequence records or requires complex logic to re-order records.
AUTO CDC simplifies CDC by automatically handling out-of-order records. You specify keys to identify records, a sequence column for ordering, and whether to store results as SCD type 1 (direct updates) or SCD type 2 (history tracking).
Syntax
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified keys or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.
Parameters
-
sourceThe source for the data. The source must be a streaming source. Use the
STREAMkeyword to use streaming semantics to read from the source. If the read encounters a change or deletion to an existing record, an error is thrown. It is safest to read from static or append-only sources.For more information on streaming data, see Transform data with pipelines.
-
KEYSThe column or combination of columns that uniquely identify a row in the source data. The values in these columns are used to identify which CDC events apply to specific records in the target table.
To define a combination of columns, use a comma-separated list of columns.
This clause is required.
-
IGNORE NULL UPDATESAllows ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and
IGNORE NULL UPDATESis specified, columns with anullvalue retain their existing values in the target. This also applies to nested columns with anullvalue.This clause is optional.
The default is to overwrite existing columns with
nullvalues. -
APPLY AS DELETE WHENSpecifies when a CDC event should be treated as a
DELETErather than an upsert.For SCD type 2 sources, to handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the
pipelines.cdc.tombstoneGCThresholdInSecondstable property.This clause is optional.
-
APPLY AS TRUNCATE WHENSpecifies when a CDC event should be treated as a full table
TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.The
APPLY AS TRUNCATE WHENclause is supported only for SCD type 1. SCD type 2 does not support the truncate operation.This clause is optional.
-
SEQUENCE BYThe column name specifying the logical order of CDC events in the source data. The pipeline processing uses this sequencing to handle change events that arrive out of order.
If multiple columns are needed for sequencing, use a
STRUCTexpression: it will order by the first struct field first, then by the second field if there's a tie, and so on.Specified columns must be sortable data types.
This clause is required.
-
COLUMNSSpecifies a subset of columns to include in the target table. You can either:
- Specify the complete list of columns to include:
COLUMNS (userId, name, city). - Specify a list of columns to exclude:
COLUMNS * EXCEPT (operation, sequenceNum)
This clause is optional.
The default is to include all columns in the target table when the
COLUMNSclause is not specified. - Specify the complete list of columns to include:
-
STORED ASWhether to store records as SCD type 1 or SCD type 2.
This clause is optional.
The default is SCD type 1.
-
TRACK HISTORY ONSpecifies a subset of output columns to generate history records when there are any changes to those specified columns. You can either:
- Specify the complete list of columns to track:
COLUMNS (userId, name, city). - Specify a list of columns to be excluded from tracking:
COLUMNS * EXCEPT (operation, sequenceNum)
This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to
TRACK HISTORY ON *. - Specify the complete list of columns to track:
Examples
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);