Enable built-in CDC in SQL Server
Preview
LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.
This article describes how to enable built-in change data capture (CDC) in SQL Server. Either change tracking or CDC is required for ingestion into Databricks. For guidance on which option to choose, see Change tracking vs. change data capture.
Enable built-in CDC for the source database
To enable the source database for CDC, connect to the database you want to enable and run the following stored procedure in Azure SQL. sys.sp_cdc_enable_db takes no arguments. It enables CDC for the database you are currently connected to.
EXEC sys.sp_cdc_enable_db
To enable CDC in a database in Amazon RDS for SQL Server, run the following, replacing <database-name> with the name of your database:
EXEC msdb.dbo.rds_cdc_enable_db '<database-name>'
For more information, see Enable change data capture for a database in the SQL Server documentation.
Enable built-in CDC on the source table
To enable CDC on the source table, run the following stored procedure in Azure SQL:
EXEC sys.sp_cdc_enable_table
@source_schema = N'MySchema',
@source_name = N'MyTable',
@role_name = NULL,
@supports_net_changes = 1
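Replace the values for @source_schema, @source_name, and @role_name. Setting @role_name to NULL means that no gating role restricts access to the change data. @supports_net_changes supports a value of 1 only if the table has a primary key (or if you specify a unique index with @index_name).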
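Net changes need a unique key because all changes to the same row within an LSN range are collapsed into a single result, so SQL Server must know which row each change belongs to. The Python sketch below is a simplified model of that collapse, not SQL Server's implementation. Change and net_changes are hypothetical names, and LSNs are simplified to integers. Only the __$operation codes (1 = delete, 2 = insert, 3 = update before-image, 4 = update after-image) come from SQL Server change tables.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Tuple

# __$operation codes used in SQL Server change tables.
DELETE, INSERT, UPDATE_BEFORE, UPDATE_AFTER = 1, 2, 3, 4


@dataclass(frozen=True)
class Change:
    start_lsn: int       # commit LSN, simplified to an int (assumption)
    seqval: int          # order of the change within its transaction
    operation: int       # one of the __$operation codes above
    key: Any             # primary-key value of the affected source row
    row: Dict[str, Any]  # column values recorded for this change


def net_changes(changes: Iterable[Change]) -> Dict[Any, Tuple[int, Dict[str, Any]]]:
    """Collapse every change to the same key into one net change.

    Simplified model (assumption), not SQL Server's implementation. Without a
    unique key there would be no way to group the changes by row.
    """
    first_op: Dict[Any, int] = {}
    last: Dict[Any, Change] = {}
    for ch in sorted(changes, key=lambda x: (x.start_lsn, x.seqval)):
        if ch.operation == UPDATE_BEFORE:
            continue  # before-images don't affect the net result
        first_op.setdefault(ch.key, ch.operation)
        last[ch.key] = ch
    result: Dict[Any, Tuple[int, Dict[str, Any]]] = {}
    for key, ch in last.items():
        if first_op[key] == INSERT:
            if ch.operation == DELETE:
                continue  # inserted and deleted within the range: no net change
            result[key] = (INSERT, ch.row)  # new row, reported with final values
        elif ch.operation == DELETE:
            result[key] = (DELETE, ch.row)
        else:
            result[key] = (UPDATE_AFTER, ch.row)  # existing row, final values
    return result
```

In this model, an insert followed by updates nets out to a single insert with the final values. A row that is inserted and then deleted within the range produces no net change at all.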
For more information, see Enable change data capture for a table in the SQL Server documentation.
Grant SELECT on the CDC schema
In addition to the privileges described in the source setup, the database user needs the SELECT privilege on the cdc schema, which contains the change tables that are created when CDC is enabled.
GRANT SELECT ON SCHEMA::cdc to [cdc-username];
Set up DDL capture and schema evolution
The SQL Server connector can track data definition language (DDL) changes on replicated database objects. It applies the relevant schema changes to the destination tables and, when you replicate an entire schema, adds new tables to the destination.
To perform DDL capture, additional database object setup is required (for example, internal tables, stored procedures, and triggers). There are two ways to do this:
Automatic: Requires additional privileges to be granted to the database user configured for ingestion into Databricks.
Manual: Choose this if you want a read-only user to perform DDL capture and schema evolution. Requires manual creation of database objects.
Automatic setup for DDL capture
The database user must be a member of the db_owner role in every database that contains tables to be ingested. db_owner is required to manage capture instances as DDL changes occur.
To add the database user to the db_owner role, run the following command:
ALTER ROLE db_owner ADD MEMBER [CdcUserName]
Manual setup for DDL capture
If you want to perform DDL capture and schema evolution using a read-only user, create the following objects manually as a user with db_owner privileges:
Create the dbo.refreshCaptureInstance_1 stored procedure.
Create the dbo.mergeCaptureInstance_1 stored procedure:
CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max)
AS
SET NOCOUNT ON
BEGIN TRAN
DECLARE @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max), @columnList nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max), @captureInstanceCount int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max);
-- Merge only when the table has two capture instances (old and new).
SET @captureInstanceCount = (SELECT COUNT(*) FROM cdc.change_tables WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName));
IF (@captureInstanceCount = 2)
BEGIN
  SET @oldCaptureInstanceName = (SELECT oldCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
  SET @newCaptureInstanceName = (SELECT newCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
  SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']';
  SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']';
  -- Copy only changes after the committed cursor (all changes if no cursor is recorded).
  SET @minLSN = (SELECT committedCursor FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName);
  IF @minLSN IS NULL OR @minLSN = ''
  BEGIN
    SET @minLSN = '0x00000000000000000000'
  END
  -- Columns that exist in both change tables with the same name and data type.
  SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE
    WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
  SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE
    WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
  print @columnList
  print @columnListValues
  -- Insert old-instance rows that are missing from the new instance.
  DECLARE @mergeStmt NVARCHAR(MAX);
  SET @mergeStmt = ' MERGE ' + @newCaptureInstanceFullPath + ' AS target USING ' + @oldCaptureInstanceFullPath + ' AS source ON source.__$start_lsn = target.__$start_lsn AND source.__$seqval = target.__$seqval AND source.__$operation = target.__$operation WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');';
  EXEC (@mergeStmt);
END
COMMIT TRAN
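The merge procedure applies two rules, which the following Python model makes explicit. First, only columns that exist in both change tables with the same name and data type are copied. Second, a row from the old capture instance is copied only if the new instance has no row with the same __$start_lsn, __$seqval, and __$operation, and only if its __$start_lsn is greater than the committed cursor. This is a sketch under stated assumptions: rows are dicts keyed by column name, and a name-to-type dict stands in for INFORMATION_SCHEMA.COLUMNS. The helpers parse_lsn, common_columns, and merge_capture_instances are hypothetical. LSNs are modeled as 10-byte values, which compare byte by byte like binary(10) values in SQL Server.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def parse_lsn(cursor: Optional[str]) -> bytes:
    """Convert a committedCursor string such as '0x00...' into 10 LSN bytes.

    NULL or empty falls back to all zeros, as the procedure does.
    """
    if not cursor:
        cursor = "0x" + "00" * 10
    hex_digits = cursor[2:] if cursor.lower().startswith("0x") else cursor
    return bytes.fromhex(hex_digits)


def common_columns(old_cols: Dict[str, str], new_cols: Dict[str, str]) -> List[str]:
    """Columns in both change tables with the same name and data type
    (the INFORMATION_SCHEMA.COLUMNS self-join in the procedure)."""
    return [name for name, dtype in new_cols.items() if old_cols.get(name) == dtype]


def merge_capture_instances(old_rows: List[Row], new_rows: List[Row],
                            old_cols: Dict[str, str], new_cols: Dict[str, str],
                            committed_cursor: Optional[str]) -> List[Row]:
    shared = set(common_columns(old_cols, new_cols))
    min_lsn = parse_lsn(committed_cursor)

    def match_key(r: Row):
        # MERGE ... ON __$start_lsn, __$seqval, and __$operation
        return (r["__$start_lsn"], r["__$seqval"], r["__$operation"])

    existing = {match_key(r) for r in new_rows}
    merged = list(new_rows)
    for r in old_rows:
        # WHEN NOT MATCHED AND source.__$start_lsn > @minLSN THEN INSERT
        if match_key(r) not in existing and r["__$start_lsn"] > min_lsn:
            # Columns that aren't shared are left NULL (None) in the copied row.
            merged.append({c: (r[c] if c in shared else None) for c in new_cols})
    return merged
```

Matching on all three metadata columns, rather than on the LSN alone, keeps distinct changes that were committed in the same transaction from being treated as duplicates.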
Create the dbo.disableOldCaptureInstance_1 stored procedure:
CREATE PROCEDURE dbo.disableOldCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max)
WITH EXECUTE AS SELF
AS
SET NOCOUNT ON
DECLARE @oldCaptureInstance nvarchar(max);
BEGIN TRAN
SET @oldCaptureInstance = (SELECT oldCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName=@schemaName AND tableName=@tableName);
IF @oldCaptureInstance IS NOT NULL
BEGIN
  -- Drop the old capture instance, then clear it from the tracker table.
  EXEC sys.sp_cdc_disable_table @source_schema = @schemaName, @source_name = @tableName, @capture_instance = @oldCaptureInstance;
  UPDATE dbo.captureInstanceTracker_1 SET oldCaptureInstance=NULL WHERE schemaName=@schemaName AND tableName=@tableName;
END
COMMIT TRAN
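SQL Server allows at most two capture instances per source table, so the old instance has to be disabled before another capture instance can be created for the table. The procedure is declared WITH EXECUTE AS SELF, so it runs with the permissions of the user who created it, which lets a read-only ingestion user run it without holding db_owner itself. The minimal Python model below shows only the bookkeeping. TrackerRow, can_add_capture_instance, and disable_old_capture_instance are hypothetical, and removing an item from a list stands in for sys.sp_cdc_disable_table.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

MAX_CAPTURE_INSTANCES = 2  # SQL Server limit per source table

Key = Tuple[str, str]  # (schemaName, tableName)


@dataclass
class TrackerRow:
    """Hypothetical stand-in for a dbo.captureInstanceTracker_1 row."""
    old_capture_instance: Optional[str]
    new_capture_instance: Optional[str]


def can_add_capture_instance(instances: Dict[Key, List[str]], key: Key) -> bool:
    """True if the table is below the two-instance limit."""
    return len(instances.get(key, [])) < MAX_CAPTURE_INSTANCES


def disable_old_capture_instance(tracker: Dict[Key, TrackerRow],
                                 instances: Dict[Key, List[str]],
                                 schema: str, table: str) -> bool:
    """Model of dbo.disableOldCaptureInstance_1; returns True if it disabled one."""
    row = tracker.get((schema, table))
    old = row.old_capture_instance if row is not None else None
    if old is None:
        return False  # no tracker row or no old instance: nothing to do
    instances[(schema, table)].remove(old)  # stands in for sys.sp_cdc_disable_table
    row.old_capture_instance = None         # UPDATE ... SET oldCaptureInstance = NULL
    return True
```

Because the procedure checks for NULL first, calling it again after the old instance is gone is a harmless no-op.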
Create the dbo.captureInstanceTracker_1 table:
CREATE TABLE dbo.captureInstanceTracker_1 (oldCaptureInstance VARCHAR(max), newCaptureInstance VARCHAR(MAX), schemaName VARCHAR(100) not null, tableName VARCHAR(255) not null, committedCursor VARCHAR(max), triggerReinit bit, PRIMARY KEY(schemaName, tableName));
Create the alterTableTrigger_1 trigger:
CREATE TRIGGER alterTableTrigger_1 on database for alter_table
AS
SET NOCOUNT ON
BEGIN
  DECLARE @IsCdcEnabledDBLevel bit, @IsCdcEnabledTableLevel bit, @isColumnAdd nvarchar(max), @DbName nvarchar(max), @EventData XML, @SchemaName nvarchar(max), @TableName nvarchar(max);
  SET @DbName = DB_NAME();
  SET @EventData = EVENTDATA();
  SET @SchemaName = @EventData.value('(/EVENT_INSTANCE/SchemaName)[1]', 'NVARCHAR(255)');
  SET @TableName = @EventData.value('(/EVENT_INSTANCE/ObjectName)[1]', 'NVARCHAR(255)');
  SET @isColumnAdd = @EventData.value('(/EVENT_INSTANCE/AlterTableActionList/Create)[1]', 'NVARCHAR(255)');
  SET @IsCdcEnabledDBLevel = (SELECT is_cdc_enabled FROM sys.databases WHERE name=@DbName);
  SET @IsCdcEnabledTableLevel = (SELECT is_tracked_by_cdc from sys.tables where schema_id=schema_id(@SchemaName) and name = @TableName);
  IF (@IsCdcEnabledDBLevel = 1 AND @IsCdcEnabledTableLevel=1 AND @isColumnAdd is not null)
  BEGIN
    EXECUTE dbo.refreshCaptureInstance_1 @SchemaName, @TableName;
  END
END
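The trigger's decision logic can be modeled in Python. EVENTDATA() returns an XML document rooted at EVENT_INSTANCE. The trigger calls dbo.refreshCaptureInstance_1 only when three conditions hold: CDC is enabled for the database, the altered table is tracked by CDC, and the event's AlterTableActionList contains a Create element (for example, when ALTER TABLE adds a column). In this sketch, table_to_refresh is a hypothetical name, the two CDC checks are passed in as arguments instead of being read from sys.databases and sys.tables, and the sample event XML in the test is trimmed and illustrative.

```python
import xml.etree.ElementTree as ET
from typing import Optional, Set, Tuple


def table_to_refresh(event_xml: str, db_cdc_enabled: bool,
                     cdc_tables: Set[Tuple[str, str]]) -> Optional[Tuple[str, str]]:
    """Model of alterTableTrigger_1: return (schema, table) when the refresh
    procedure would run, otherwise None."""
    root = ET.fromstring(event_xml)  # root element is EVENT_INSTANCE
    schema = root.findtext("SchemaName")
    table = root.findtext("ObjectName")
    # Like the trigger, only check that a Create element exists in the action list.
    has_create = root.find("AlterTableActionList/Create") is not None
    if db_cdc_enabled and (schema, table) in cdc_tables and has_create:
        return (schema, table)
    return None
```

Dropping or altering a column doesn't produce a Create element, so in this model those events leave the capture instances unchanged.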
Read-only user privileges for DDL capture
The following permissions are required for read-only users to run DDL change capture with built-in CDC tables:
GRANT VIEW DEFINITION ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION TO [CdcUserName];
GRANT VIEW DATABASE PERFORMANCE STATE TO [CdcUserName];
GRANT UPDATE ON object::dbo.captureInstanceTracker_1 TO [CdcUserName];
GRANT EXECUTE ON schema :: dbo TO [CdcUserName];
GRANT EXECUTE ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
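If the ingestion user isn't named CdcUserName, you can generate the grants instead of editing each one by hand. The Python sketch below produces the same ten statements in the same order. It bracket-quotes the user name and doubles any ] character, the same escaping that T-SQL's QUOTENAME applies. read_only_ddl_grants and quote_name are hypothetical helpers, not part of any Databricks or SQL Server tooling.

```python
from typing import List

# Procedure order as used in the VIEW DEFINITION and EXECUTE grants above.
VIEW_DEFINITION_PROCS = ["dbo.disableOldCaptureInstance_1", "dbo.refreshCaptureInstance_1",
                         "dbo.mergeCaptureInstance_1"]
EXECUTE_PROCS = ["dbo.mergeCaptureInstance_1", "dbo.disableOldCaptureInstance_1",
                 "dbo.refreshCaptureInstance_1"]


def quote_name(name: str) -> str:
    """Bracket-quote an identifier, doubling ']' (same escaping as QUOTENAME)."""
    return "[" + name.replace("]", "]]") + "]"


def read_only_ddl_grants(user: str) -> List[str]:
    """Return the read-only DDL capture grants for the given database user."""
    u = quote_name(user)
    stmts = [f"GRANT VIEW DEFINITION ON object::{p} TO {u};" for p in VIEW_DEFINITION_PROCS]
    stmts += [
        f"GRANT VIEW DEFINITION TO {u};",
        f"GRANT VIEW DATABASE PERFORMANCE STATE TO {u};",
        f"GRANT UPDATE ON object::dbo.captureInstanceTracker_1 TO {u};",
        f"GRANT EXECUTE ON schema :: dbo TO {u};",
    ]
    stmts += [f"GRANT EXECUTE ON object::{p} TO {u};" for p in EXECUTE_PROCS]
    return stmts
```

Print the result of read_only_ddl_grants with your user name and run the statements as a user with db_owner privileges.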