SQL Server で組み込みの CDC を有効にする
プレビュー
LakeFlow Connect はゲート パブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、 で組み込み チェンジデータキャプチャ () を有効にする方法について説明します。CDCSQL ServerDatabricks へのインジェストには、変更追跡または CDC が必要です。 選択するオプションのガイダンスについては、「 変更の追跡とチェンジデータキャプチャ」を参照してください。
ソース データベースの組み込み CDC を有効にする
CDC のソース データベースを有効にするには、Azure SQL で次のストアド プロシージャを実行し、 <database-name>
の値を置き換えます。 CDCを有効にするデータベースにログインする必要があります。
EXEC sys.sp_cdc_enable_db
Amazon RDS for SQL Server のデータベースで CDC を有効にするには、次のコマンドを実行します。
EXEC msdb.dbo.rds_cdc_enable_db '<database-name>'
詳細については、 ドキュメントの「 データベースのチェンジデータキャプチャを有効にする SQL Server」を参照してください。
ソース テーブルで組み込み CDC を有効にする
ソース テーブルで CDC を有効にするには、Azure SQL で次のストアド プロシージャを実行します。
EXEC sys.sp_cdc_enable_table
@source_schema = N'MySchema',
@source_name = N'MyTable',
@role_name = NULL,
@supports_net_changes = 1
source_schema
、source_name
、role_name
の値を置き換えます。@support_net_changes
テーブルにプライマリキーがある場合にのみ、1
の値をサポートします。
詳細については、 ドキュメントの「 テーブルのチェンジデータキャプチャを有効にする SQL Server」を参照してください。
CDC スキーマに対する SELECT の付与
ソース設定で説明されている権限に加えて、データベース・ユーザーには、CDC が有効になったときに作成される変更テーブルを含むスキーマcdc
に対する SELECT
権限が必要です。
GRANT SELECT ON SCHEMA::cdc to [cdc-username];
DDL キャプチャとスキーマの進化を設定する
SQL Server コネクタは、レプリケートされたデータベース オブジェクトのデータ定義言語 (DDL) を追跡し、関連するテーブル スキーマの変更を宛先テーブルに適用したり、フル スキーマ レプリケーションの場合は新しいテーブルを追加したりできます。
DDL キャプチャを実行するには、追加のデータベース オブジェクトのセットアップが必要です (内部テーブル、ストアド プロシージャ、トリガーなど)。 これを行うには、次の 2 つの方法があります。
自動: Databricks へのインジェスト用に構成されたデータベース ユーザーに追加の権限を付与する必要があります。
手動: 読み取り専用ユーザーが DDL キャプチャとスキーマ進化を実行する場合は、これを選択します。 データベース・オブジェクトを手動で作成する必要があります。
DDL キャプチャの自動セットアップ
データベース・ユーザーは、取り込むテーブルを含むすべてのデータベースの db_owner
・グループに属している必要があります。 DDL の変更が発生したときにキャプチャ インスタンスを管理するには、db_owner
が必要です。
データベース・ユーザーを db_owner
グループに追加するには、次のコマンドを実行します。
ALTER ROLE db_owner ADD MEMBER [CdcUserName]
DDL キャプチャの手動セットアップ
読み取り専用ユーザーを使用して DDL キャプチャとスキーマの進化を実行する場合は、 db_owner
権限を持つユーザーとして次のオブジェクトを手動で作成します。
dbo.refreshCaptureInstance_1
ストアドプロシージャを作成します。CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max) AS SET NOCOUNT ON BEGIN TRAN DECLARE @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max), @columnList nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max), @captureInstanceCount int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max); SET @captureInstanceCount = (SELECT COUNT(*) FROM cdc.change_tables WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName)); IF (@captureInstanceCount = 2) BEGIN SET @oldCaptureInstanceName = (SELECT oldCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceName = (SELECT newCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']'; SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']'; SET @minLSN = (SELECT committedCursor FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName); IF @minLSN IS NULL OR @minLSN = '' BEGIN SET @minLSN = '0x00000000000000000000' END SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); print @columnList print @columnListValues DECLARE @mergeStmt NVARCHAR(MAX); SET @mergeStmt = ' MERGE ' + @newCaptureInstanceFullPath + ' AS target USING ' + @oldCaptureInstanceFullPath + ' AS source ON source.__$start_lsn = target.__$start_lsn AND source.__$seqval = target.__$seqval AND source.__$operation = target.__$operation WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');'; EXEC (@mergeStmt); END COMMIT TRAN
dbo.mergeCaptureInstance_1
ストアドプロシージャを作成します。CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max) AS SET NOCOUNT ON BEGIN TRAN DECLARE @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max), @columnList nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max), @captureInstanceCount int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max); SET @captureInstanceCount = (SELECT COUNT(*) FROM cdc.change_tables WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName)); IF (@captureInstanceCount = 2) BEGIN SET @oldCaptureInstanceName = (SELECT oldCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceName = (SELECT newCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']'; SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']'; SET @minLSN = (SELECT committedCursor FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName); IF @minLSN IS NULL OR @minLSN = '' BEGIN SET @minLSN = '0x00000000000000000000' END SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); print @columnList print @columnListValues DECLARE @mergeStmt NVARCHAR(MAX); SET @mergeStmt = ' MERGE ' + @newCaptureInstanceFullPath + ' AS target USING ' + @oldCaptureInstanceFullPath + ' AS source ON source.__$start_lsn = target.__$start_lsn AND source.__$seqval = target.__$seqval AND source.__$operation = target.__$operation WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');'; EXEC (@mergeStmt); END COMMIT TRAN
dbo.disableOldCaptureInstance_1
ストアドプロシージャを作成します。CREATE PROCEDURE dbo.disableOldCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max) WITH EXECUTE AS SELF AS SET NOCOUNT ON Declare @oldCaptureInstance nvarchar(max); BEGIN TRAN SET @oldCaptureInstance = (SELECT oldCaptureInstance from dbo.captureInstanceTracker_1 WHERE schemaName=@schemaName AND tableName=@tableName); IF @oldCaptureInstance IS NOT NULL BEGIN EXEC sys.sp_cdc_disable_table @source_schema= @schemaName, @source_name= @tableName, @capture_instance=@OldCaptureInstance; UPDATE dbo.captureInstanceTracker_1 SET oldCaptureInstance=NULL WHERE schemaName=@schemaName AND tableName=@tableName; END COMMIT TRAN
dbo.captureInstanceTracker
テーブルを作成します。CREATE TABLE dbo.captureInstanceTracker_1 (oldCaptureInstance VARCHAR(max), newCaptureInstance VARCHAR(MAX), schemaName VARCHAR(100) not null, tableName VARCHAR(255) not null, committedCursor VARCHAR(max), triggerReinit bit PRIMARY KEY(schemaName, tableName));
トリガー
alterTableTrigger_1
を作成します。CREATE TRIGGER alterTableTrigger_1 on database for alter_table AS SET NOCOUNT ON BEGIN DECLARE @IsCdcEnabledDBLevel bit,@IsCdcEnabledTableLevel bit, @isColumnAdd nvarchar(max), @DbName nvarchar(max),@EventData XML, @SchemaName nvarchar(max), @TableName nvarchar(max); SET @DbName = DB_NAME(); SET @EventData = EVENTDATA(); SET @SchemaName = @EventData.value('(/EVENT_INSTANCE/SchemaName)[1]', 'NVARCHAR(255)'); SET @TableName = @EventData.value('(/EVENT_INSTANCE/ObjectName)[1]', 'NVARCHAR(255)'); SET @isColumnAdd = @EventData.value('(/EVENT_INSTANCE/AlterTableActionList/Create)[1]', 'NVARCHAR(255)'); SET @IsCdcEnabledDBLevel = (SELECT is_cdc_enabled FROM sys.databases WHERE name=@DbName); SET @IsCdcEnabledTableLevel = (SELECT is_tracked_by_cdc from sys.tables where schema_id=schema_id(@SchemaName) and name = @TableName); IF (@IsCdcEnabledDBLevel = 1 AND @IsCdcEnabledTableLevel=1 AND @isColumnAdd is not null) BEGIN EXECUTE dbo.refreshCaptureInstance_1 @SchemaName, @TableName; END END
DDL キャプチャの読み取り専用ユーザー権限
読み取り専用ユーザーが組み込みの CDC テーブルを使用して DDL 変更キャプチャを実行するには、次のアクセス許可が必要です。
GRANT VIEW DEFINITION ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION TO [CdcUserName];
GRANT VIEW DATABASE PERFORMANCE STATE TO [CdcUserName];
GRANT UPDATE ON object::dbo.captureInstanceTracker_1 TO [CdcUserName];
GRANT EXECUTE ON schema :: dbo TO [CdcUserName];
GRANT EXECUTE ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];