Enable built-in CDC in SQL Server

Preview

LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.

This article describes how to enable built-in change data capture (CDC) in SQL Server. Either change tracking or CDC is required for ingestion into Databricks. For guidance on which option to choose, see Change tracking vs. change data capture.

Enable built-in CDC for the source database

To enable the source database for CDC, run the following stored procedure in Azure SQL. You must be connected to the database that you want to enable for CDC. The procedure takes no database name argument.

EXEC sys.sp_cdc_enable_db

To enable CDC in a database in Amazon RDS for SQL Server, run the following, replacing <database-name> with the name of the database:

EXEC msdb.dbo.rds_cdc_enable_db '<database-name>'

For more information, see Enable change data capture for a database in the SQL Server documentation.
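To confirm that CDC is enabled for the database, you can run a query like the following while connected to it. This is an optional check, not part of the required setup. It reads the is_cdc_enabled column of the sys.databases catalog view, where a value of 1 means CDC is enabled.

```sql
-- Optional check: run while connected to the database you enabled for CDC.
-- is_cdc_enabled = 1 means database-level CDC is turned on.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();
```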

Enable built-in CDC on the source table

To enable CDC on the source table, run the following stored procedure in Azure SQL:

EXEC sys.sp_cdc_enable_table
@source_schema = N'MySchema',
@source_name   = N'MyTable',
@role_name     = NULL,
@supports_net_changes = 1
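  • Replace the values for @source_schema, @source_name, and @role_name. Setting @role_name to NULL means that no gating role is used to control access to the change data.

  • @supports_net_changes can be set to 1 only if the table has a primary key or you specify a unique index with the @index_name parameter.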

For more information, see Enable change data capture for a table in the SQL Server documentation.
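As an optional check, you can list the capture instances for the table. The sketch below reuses the placeholder names MySchema and MyTable from the example above. sys.sp_cdc_help_change_data_capture and the cdc.change_tables table are standard SQL Server CDC objects.

```sql
-- Optional check. Replace MySchema and MyTable with your values.
-- Returns the capture instance configuration for one source table.
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = N'MySchema',
    @source_name   = N'MyTable';

-- Lists every capture instance in the current database.
SELECT capture_instance,
       OBJECT_SCHEMA_NAME(source_object_id) AS source_schema,
       OBJECT_NAME(source_object_id)        AS source_table,
       supports_net_changes
FROM cdc.change_tables;
```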

Grant SELECT on the CDC schema

In addition to the privileges described in the source setup, the database user needs the SELECT privilege on the cdc schema, which contains the change tables that SQL Server creates when you enable CDC.

GRANT SELECT ON SCHEMA::cdc TO [CdcUserName];
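To check the grant, you can impersonate the ingestion user and test its effective permission. This minimal sketch assumes that you run it as a user allowed to impersonate the ingestion user (for example, a db_owner member) and that CdcUserName stands for the same user you granted access to. HAS_PERMS_BY_NAME returns 1 when the permission is in effect and 0 when it is not.

```sql
-- Optional check. CdcUserName is a placeholder for the ingestion user.
EXECUTE AS USER = 'CdcUserName';
SELECT HAS_PERMS_BY_NAME('cdc', 'SCHEMA', 'SELECT') AS can_select_cdc_schema;
REVERT;  -- Return to your own security context.
```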

Set up DDL capture and schema evolution

The SQL Server connector can capture data definition language (DDL) changes on replicated database objects. It applies relevant table schema changes to the destination tables and, when you replicate a full schema, adds new tables to the destination.

DDL capture requires additional database objects, such as internal tables, stored procedures, and triggers. You can set them up in one of two ways:

  • Automatic: Requires additional privileges to be granted to the database user configured for ingestion into Databricks.

  • Manual: Choose this if you want a read-only user to perform DDL capture and schema evolution. Requires manual creation of database objects.

Automatic setup for DDL capture

The database user must be a member of the db_owner role in every database that contains tables to be ingested. The db_owner role is required to manage capture instances as DDL changes occur.

To add the database user to the db_owner role, run the following command:

ALTER ROLE db_owner ADD MEMBER [CdcUserName];
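To confirm the membership, you can run the following query in each database that contains tables to be ingested. CdcUserName is a placeholder for the ingestion user. IS_ROLEMEMBER returns 1 if the user is a member of the role, 0 if it is not, and NULL if the role or user is not valid or you cannot view the membership.

```sql
-- Optional check: run in each database that contains tables to be ingested.
SELECT IS_ROLEMEMBER('db_owner', 'CdcUserName') AS is_db_owner;
```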

Manual setup for DDL capture

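To perform DDL capture and schema evolution with a read-only user, create the following objects manually while connected as a user with db_owner privileges. Because the trigger is scoped to the database, create the objects in each database that contains tables to be ingested: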

  1. Create the dbo.refreshCaptureInstance_1 stored procedure:

  2. Create the dbo.mergeCaptureInstance_1 stored procedure:

    CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max)
    AS
    SET NOCOUNT ON
    
    BEGIN TRAN
    
    DECLARE
        @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max),
        @columnList                 nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max),
        @captureInstanceCount       int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max);
    
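    -- Merge only when the table has two capture instances (old and new); SQL Server allows at most two per table.
    SET @captureInstanceCount = (SELECT COUNT(*)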
                                FROM cdc.change_tables
                                WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName));
    IF (@captureInstanceCount = 2)
    BEGIN
        -- Change tables are named cdc.<capture_instance>_CT.
        SET @oldCaptureInstanceName = (SELECT oldCaptureInstance
                                        FROM dbo.captureInstanceTracker_1
                                        WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
        SET @newCaptureInstanceName = (SELECT newCaptureInstance
                                        FROM dbo.captureInstanceTracker_1
                                        WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
    
        SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']';
        SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']';
        -- Copy only rows whose LSN is greater than committedCursor, or all rows if it is not set.
        SET @minLSN = (SELECT committedCursor
                        FROM dbo.captureInstanceTracker_1
                        WHERE schemaName = @schemaName AND tableName = @tableName);
        IF @minLSN IS NULL OR @minLSN = ''
        BEGIN
            SET @minLSN = '0x00000000000000000000'
        END
    
        -- Build column lists from columns that have the same name and data type in both change tables.
        SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                        A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                        AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
        SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                                A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                                AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
    
        print @columnList
        print @columnListValues
    
        -- Insert rows from the old change table that are not yet present in the new change table.
        DECLARE @mergeStmt NVARCHAR(MAX);
        SET @mergeStmt = '
        MERGE ' + @newCaptureInstanceFullPath + ' AS target
        USING ' + @oldCaptureInstanceFullPath + ' AS source
        ON source.__$start_lsn = target.__$start_lsn
            AND source.__$seqval = target.__$seqval
            AND source.__$operation = target.__$operation
        WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN
        INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');';
    
        EXEC (@mergeStmt);
    END
    COMMIT TRAN
    
  3. Create the dbo.disableOldCaptureInstance_1 stored procedure:

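    -- EXECUTE AS SELF runs the procedure as the user who created it (a db_owner member), because sys.sp_cdc_disable_table requires db_owner membership.
    CREATE PROCEDURE dbo.disableOldCaptureInstance_1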
        @schemaName varchar(max),
        @tableName varchar(max) WITH EXECUTE AS SELF
    AS
        SET NOCOUNT ON
    
        Declare @oldCaptureInstance nvarchar(max);
        BEGIN TRAN
          SET @oldCaptureInstance = (SELECT oldCaptureInstance from dbo.captureInstanceTracker_1 WHERE schemaName=@schemaName AND tableName=@tableName);
          IF @oldCaptureInstance IS NOT NULL
            BEGIN
              EXEC sys.sp_cdc_disable_table
                    @source_schema= @schemaName,
                    @source_name= @tableName,
                    @capture_instance=@oldCaptureInstance;
    
              UPDATE dbo.captureInstanceTracker_1 SET oldCaptureInstance=NULL WHERE schemaName=@schemaName AND tableName=@tableName;
            END
    
        COMMIT TRAN
    
  4. Create the dbo.captureInstanceTracker_1 table:

    CREATE TABLE dbo.captureInstanceTracker_1
        (oldCaptureInstance VARCHAR(max),
        newCaptureInstance VARCHAR(MAX),
        schemaName VARCHAR(100) not null,
        tableName VARCHAR(255) not null,
        committedCursor VARCHAR(max),
        triggerReinit bit,
        PRIMARY KEY(schemaName, tableName));
    
  5. Create the trigger alterTableTrigger_1:

    CREATE TRIGGER alterTableTrigger_1 on database for alter_table
    AS
        SET NOCOUNT ON
        BEGIN
          DECLARE @IsCdcEnabledDBLevel bit,@IsCdcEnabledTableLevel bit, @isColumnAdd nvarchar(max),
              @DbName nvarchar(max),@EventData XML, @SchemaName nvarchar(max), @TableName nvarchar(max);
    
          SET @DbName = DB_NAME();
          SET @EventData = EVENTDATA();
          SET @SchemaName = @EventData.value('(/EVENT_INSTANCE/SchemaName)[1]',  'NVARCHAR(255)');
          SET @TableName = @EventData.value('(/EVENT_INSTANCE/ObjectName)[1]',  'NVARCHAR(255)');
    
          SET @isColumnAdd = @EventData.value('(/EVENT_INSTANCE/AlterTableActionList/Create)[1]', 'NVARCHAR(255)');
          SET @IsCdcEnabledDBLevel = (SELECT is_cdc_enabled FROM sys.databases WHERE name=@DbName);
          SET @IsCdcEnabledTableLevel = (SELECT is_tracked_by_cdc from sys.tables where schema_id=schema_id(@SchemaName) and name = @TableName);
          IF (@IsCdcEnabledDBLevel = 1 AND @IsCdcEnabledTableLevel=1 AND @isColumnAdd is not null)
            BEGIN
              EXECUTE dbo.refreshCaptureInstance_1 @SchemaName, @TableName;
            END
        END
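After you create the objects, you can confirm that they exist in the current database. This is an optional check. Database-level DDL triggers such as alterTableTrigger_1 are not schema-scoped, so they are listed in sys.triggers rather than sys.objects.

```sql
-- Optional check: procedures and the tracker table from the manual setup steps.
SELECT SCHEMA_NAME(schema_id) AS schema_name, name, type_desc
FROM sys.objects
WHERE name IN (N'refreshCaptureInstance_1', N'mergeCaptureInstance_1',
               N'disableOldCaptureInstance_1', N'captureInstanceTracker_1');

-- Database-level DDL triggers do not appear in sys.objects.
SELECT name, is_disabled
FROM sys.triggers
WHERE parent_class_desc = N'DATABASE'
  AND name = N'alterTableTrigger_1';
```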
    

Read-only user privileges for DDL capture

A read-only user needs the following permissions to perform DDL capture with built-in CDC when you use the manual setup:

GRANT VIEW DEFINITION ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION TO [CdcUserName];
GRANT VIEW DATABASE PERFORMANCE STATE TO [CdcUserName];
GRANT UPDATE ON object::dbo.captureInstanceTracker_1 TO [CdcUserName];
GRANT EXECUTE ON SCHEMA::dbo TO [CdcUserName];
GRANT EXECUTE ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
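To spot-check the grants, you can impersonate the read-only user and test a few of its effective permissions. This minimal sketch assumes that CdcUserName is the read-only ingestion user and that you run it as a user allowed to impersonate that user. Each column returns 1 when the permission is in effect.

```sql
-- Optional check. CdcUserName is a placeholder for the read-only ingestion user.
EXECUTE AS USER = 'CdcUserName';
SELECT
    HAS_PERMS_BY_NAME('dbo.refreshCaptureInstance_1',    'OBJECT', 'EXECUTE') AS can_exec_refresh,
    HAS_PERMS_BY_NAME('dbo.mergeCaptureInstance_1',      'OBJECT', 'EXECUTE') AS can_exec_merge,
    HAS_PERMS_BY_NAME('dbo.disableOldCaptureInstance_1', 'OBJECT', 'EXECUTE') AS can_exec_disable,
    HAS_PERMS_BY_NAME('dbo.captureInstanceTracker_1',    'OBJECT', 'UPDATE')  AS can_update_tracker,
    HAS_PERMS_BY_NAME(DB_NAME(), 'DATABASE', 'VIEW DEFINITION')               AS can_view_definition;
REVERT;  -- Return to your own security context.
```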