Create an integrated CDC pipeline for SQL Server
This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.
An integrated CDC pipeline ingests change data from SQL Server into Databricks using a single pipeline. Unlike the standard gateway-based architecture, which requires a separate ingestion gateway and ingestion pipeline, an integrated CDC pipeline runs both extraction and application stages in one pipeline update.
When to use the integrated CDC connector
The following table compares integrated CDC pipelines with the standard gateway-based architecture:
Feature | Standard CDC (gateway-based) | Integrated CDC |
|---|---|---|
Number of pipelines | Two (ingestion gateway and ingestion pipeline) | One (unified pipeline) |
Setup | Create a gateway, then create an ingestion pipeline that references the gateway ID | Create a single pipeline that references a Unity Catalog connection |
Gateway mode | The gateway runs continuously | The pipeline embeds extraction in each update |
Connection reference |
|
|
Connector type | Implicit | Explicit: |
Staging volume | The gateway manages the staging volume internally | You configure the staging volume through |
For source database setup, see Configure Microsoft SQL Server for ingestion into Databricks. The same source configuration applies to both architectures.
How an integrated CDC pipeline runs
Each pipeline update runs two stages in sequence:
- Extraction. The pipeline connects to the source database using the Unity Catalog connection. On the first run or a full refresh, it captures an initial snapshot. On subsequent runs, it captures incremental changes (inserts, updates, and deletes) using the database's built-in change tracking mechanism. The pipeline writes extracted data to a Unity Catalog staging volume.
- Application. The pipeline reads from the staging volume and applies changes to destination streaming tables in Unity Catalog. Merge operations use the configured primary keys and SCD type. The pipeline guarantees exactly-once semantics.
During the Beta period, each pipeline update has a maximum runtime of approximately 30 minutes. If the source has more changes than a single update can process, the next scheduled update resumes where the previous one stopped. To ingest data on a recurring basis, schedule the pipeline using a Lakeflow Jobs task.
Requirements
-
Your workspace is enabled for Unity Catalog.
-
Serverless compute is enabled for your workspace. See Serverless compute requirements.
-
If you plan to create a connection: You have
CREATE CONNECTIONprivileges on the metastore. See Manage privileges in Unity Catalog.If your connector supports UI-based pipeline authoring, you can create the connection and the pipeline at the same time by completing the steps on this page. However, if you use API-based pipeline authoring, you must create the connection in Catalog Explorer before you complete the steps on this page. See Connect to managed ingestion sources.
-
If you plan to use an existing connection: You have
USE CONNECTIONprivileges orALL PRIVILEGESon the connection. -
You have
USE CATALOGprivileges on the target catalog. -
You have
USE SCHEMA,CREATE TABLE, andCREATE VOLUMEprivileges on an existing schema orCREATE SCHEMAprivileges on the target catalog.
- Your workspace must have the integrated CDC connector feature enabled. Contact your Databricks account team.
- You have access to the primary SQL Server instance. The integrated CDC connector does not support read replicas, standby instances, or secondary instances.
- You have completed the SQL Server source setup. See Configure Microsoft SQL Server for ingestion into Databricks.
- You have the following permissions:
CREATE CONNECTIONon the metastore (if creating a new Unity Catalog connection), orUSE CONNECTIONon an existing connection.USE CATALOGon the destination catalog.USE SCHEMAandCREATE TABLEon the destination schema.CREATE VOLUMEon the destination schema, or on the schema specified indata_staging_options. A staging volume is required even ifdata_staging_optionsis not set, because the pipeline autocreates one in the destination schema.
Compute requirements
An integrated CDC pipeline runs on classic or serverless compute:
- Classic compute. The compute plane must reach your SQL Server instance over the network. For cloud-hosted SQL Server (Azure SQL, Amazon RDS), allow incoming connections from the Databricks compute plane. For on-premises SQL Server, use Azure ExpressRoute, AWS Direct Connect, or VPN.
- Serverless compute. Configure serverless network connectivity between Databricks serverless compute and your source database. On-premises sources require a network path through the configured serverless egress (for example, a transit gateway or peered VNet with ExpressRoute or VPN).
For classic compute, you can use unrestricted cluster creation permissions or a custom cluster policy with cluster_type fixed to dlt, runtime_engine fixed to STANDARD, and at least 8 cores recommended for efficient extraction.
Create a Unity Catalog connection to SQL Server
Create a Unity Catalog connection to SQL Server before creating a pipeline. See Create a SQL Server connection.
Create an integrated CDC pipeline
Create integrated CDC pipelines using the API, the Databricks CLI, notebooks, or Declarative Automation Bundles. UI creation is not yet available.
All pipeline creation requests must include "channel": "PREVIEW".
- Declarative Automation Bundles
- Databricks notebook
- Databricks CLI
- REST API
Define the pipeline resource in a bundle file (for example, resources/integrated_cdc_pipeline.yml):
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'
resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_catalog: 'my_database'
source_schema: 'dbo'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'
To run the pipeline on a schedule, define a job (for example, resources/integrated_cdc_job.yml) that triggers the pipeline. Because each extraction stage runs for at least 10 minutes, an interval of 60 minutes or longer is a good starting point:
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'
Deploy the bundle with the Databricks CLI:
databricks bundle deploy
databricks bundle run integrated_cdc_job
For more information, see What are Declarative Automation Bundles?.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
pipeline = w.pipelines.create(
name="<pipeline-name>",
pipeline_type="MANAGED_INGESTION",
channel="PREVIEW",
serverless=False,
catalog="<destination-catalog>",
schema="<destination-schema>",
ingestion_definition={
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "<source-database>",
"source_schema": "<source-schema>",
"source_table": "<source-table>",
}
}
],
},
)
print(f"Pipeline created: {pipeline.pipeline_id}")
databricks pipelines create --json '{
"name": "<pipeline-name>",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "<source-database>",
"source_schema": "<source-schema>",
"source_table": "<source-table>"
}
}
]
}
}'
The following example replicates two tables from a SQL Server database. The customers table uses SCD Type 1, and the orders table uses SCD Type 2 (which requires SQL Server CDC on the source). Both inherit the top-level destination main.ingestion. Set "serverless": true to run on serverless compute.
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-sqlserver-connection",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "my_database",
"source_schema": "dbo",
"source_table": "customers",
"table_configuration": {
"primary_keys": ["customer_id"],
"scd_type": "SCD_TYPE_1"
}
}
},
{
"table": {
"source_catalog": "my_database",
"source_schema": "dbo",
"source_table": "orders",
"table_configuration": {
"primary_keys": ["order_id"],
"scd_type": "SCD_TYPE_2"
}
}
}
],
"data_staging_options": {
"catalog_name": "main",
"schema_name": "ingestion_staging"
}
}
}
To replicate every table in a source schema, use a schema object instead of individual table objects. The pipeline skips tables without CDC or change tracking enabled on the source.
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-schema-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-sqlserver-connection",
"connector_type": "CDC",
"objects": [
{
"schema": {
"source_catalog": "my_database",
"source_schema": "dbo",
"destination_catalog": "main",
"destination_schema": "ingestion"
}
}
]
}
}
To start a pipeline update:
POST /api/2.0/pipelines/<pipeline-id>/updates
{
"full_refresh": false
}
Schedule recurring updates
Integrated CDC pipelines run in triggered mode only. To ingest data on a recurring schedule, create a Lakeflow Jobs task that runs the pipeline. Each update runs for approximately 30 minutes and might not finish processing the full change backlog in a single update. Schedule pipelines frequently enough for subsequent updates to catch up. A starting point of 60 minutes works well for most workloads. If a trigger fires while a previous update is still running, the new update is queued.
Configuration reference
Pipeline parameters
Parameter | Type | Description |
|---|---|---|
| string | A name for the pipeline. |
| string | Must be |
| string | Must be |
| Boolean |
|
| string | The default destination catalog. Used when a per-table |
| string | The default destination schema. Used when a per-table |
| string | The Unity Catalog connection to the source database. |
| string | Must be |
| array | The list of tables or schemas to ingest. |
| object | Optional. The catalog and schema where the pipeline creates the staging volume. Defaults to the pipeline's destination schema. |
Table specification
Parameter | Required | Description |
|---|---|---|
| Yes | The source database name. |
| Yes | The source schema name. |
| Yes | The source table name. |
| No | The destination catalog. Defaults to the pipeline's |
| No | The destination schema. Defaults to the pipeline's |
| No | The destination table name. Defaults to |
Table configuration
Parameter | Default | Description |
|---|---|---|
| Autodetected | The columns that identify each row. Autodetected from the source primary key if not specified. |
|
|
|
| Autodetected | The columns used to order CDC events. Autodetected based on the source CDC mechanism if not specified. |
For SQL Server data type mappings, see SQL Server connector reference. Integrated CDC pipelines support automatic type widening: when a source column type is widened (for example, INT to BIGINT), the destination table adapts automatically.
Monitor the pipeline
After you create and start an integrated CDC pipeline, monitor its status using the following:
-
Databricks UI. Open the pipeline in the Pipelines section to view update status, per-table ingestion metrics, and lineage.
-
REST API.
TextGET /api/2.0/pipelines/<pipeline-id> -
Events API.
TextGET /api/2.0/pipelines/<pipeline-id>/events
The first pipeline update performs a full snapshot of all selected tables, which can take longer than incremental updates. For large tables, the initial snapshot might require multiple scheduled updates to complete. Each subsequent update picks up where the previous one left off.
To verify ingestion:
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;
-- View recent changes (SCD Type 2 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;
For full refresh and auto full refresh behavior, see Fully refresh target tables.
Integrated CDC pipelines have vertical autoscaling enabled by default. If a pipeline update fails because of an out-of-memory condition, the next update automatically provisions a larger driver. To override this behavior, use a custom cluster policy.
Limitations
- Beta. The integrated CDC connector requires workspace-level enablement. Contact your Databricks account team.
- Triggered mode only. Integrated CDC pipelines do not support continuous (always-on) execution. Schedule pipelines using a Lakeflow Jobs task.
- API-only creation. Pipeline creation is available through the REST API, the Databricks CLI, notebooks, and Declarative Automation Bundles. UI creation is not yet supported.
- Channel must be
PREVIEW. Pipeline specs must include"channel": "PREVIEW". - Connection and connector type are immutable.
connection_nameandconnector_typecannot be changed after the pipeline is created. To change the source, create a new pipeline. - Recommended maximum of 300 tables per pipeline.
- Primary instances only. The integrated CDC connector does not support read replicas, standby instances, or secondary instances.
- Tables without primary keys. The pipeline treats all non-LOB columns as a composite key. Duplicate rows might collapse to a single row unless you enable SCD Type 2.
- Initial snapshot might span multiple updates. For large tables, the initial snapshot might not finish in a single update. Subsequent scheduled updates resume where the previous update left off.
- Each update runs for approximately 30 minutes. During Beta, the pipeline does not necessarily process the entire change backlog in a single update. Subsequent scheduled updates resume processing where the previous update left off. You cannot configure this runtime during Beta.
- Log purge requires full refresh. If SQL Server purges change tracking logs or CDC logs before the pipeline processes them, perform a full refresh on the affected tables. The pipeline detects this condition and surfaces an error in the event log.
Troubleshooting
Some error codes use the INGESTION_GATEWAY_ prefix. This is a legacy naming convention and does not indicate that a separate ingestion gateway is required.
Error | Cause | Resolution |
|---|---|---|
| The pipeline is not in Direct Publishing Mode. | Direct Publishing Mode is set automatically for integrated CDC pipelines. If you see this error, recreate the pipeline. |
| CDC or change tracking is not enabled on one or more source tables. | Enable CDC or change tracking on the affected tables. See Configure Microsoft SQL Server for ingestion into Databricks. |
| The specified source table does not exist or has been dropped. | Verify that the table exists and that the connection user has access. |
| The source schema does not exist. | Verify the schema exists in the source database. |
| The source database type is not supported. | The integrated CDC connector supports SQL Server and Oracle. |
| The table specification is missing | Add |
| The workspace feature flag is not enabled. | Contact your Databricks account team to enable the integrated CDC connector on your workspace. |
If you encounter an issue not covered here:
- Review the pipeline event log in the Databricks UI or through
GET /api/2.0/pipelines/<pipeline-id>/events. - Test the Unity Catalog connection from Catalog Explorer to confirm the source is reachable.
- Confirm that change tracking or CDC is enabled on the source database and tables.
- Verify that the database user has the SQL Server permissions listed in Microsoft SQL Server database user requirements.
- Check that your pipeline spec includes
"channel": "PREVIEW".