SQL Server で組み込みの CDC を有効にする

プレビュー

LakeFlow Connect はゲート パブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、 で組み込み チェンジデータキャプチャ () を有効にする方法について説明します。CDCSQL ServerDatabricks へのインジェストには、変更追跡または CDC が必要です。 選択するオプションのガイダンスについては、「 変更の追跡とチェンジデータキャプチャ」を参照してください。

ソース データベースの組み込み CDC を有効にする

CDC のソース データベースを有効にするには、Azure SQL で次のストアド プロシージャを実行し、 <database-name>の値を置き換えます。 CDCを有効にするデータベースにログインする必要があります。

EXEC sys.sp_cdc_enable_db

Amazon RDS for SQL Server のデータベースで CDC を有効にするには、次のコマンドを実行します。

EXEC msdb.dbo.rds_cdc_enable_db '<database-name>'

詳細については、 ドキュメントの「 データベースのチェンジデータキャプチャを有効にする SQL Server」を参照してください。

ソース テーブルで組み込み CDC を有効にする

ソース テーブルで CDC を有効にするには、Azure SQL で次のストアド プロシージャを実行します。

EXEC sys.sp_cdc_enable_table
@source_schema = N'MySchema',
@source_name   = N'MyTable',
@role_name     = NULL,
@supports_net_changes = 1
  • source_schemasource_namerole_nameの値を置き換えます。

  • @support_net_changes テーブルにプライマリキーがある場合にのみ、 1 の値をサポートします。

詳細については、 ドキュメントの「 テーブルのチェンジデータキャプチャを有効にする SQL Server」を参照してください。

CDC スキーマに対する SELECT の付与

ソース設定で説明されている権限に加えて、データベース・ユーザーには、CDC が有効になったときに作成される変更テーブルを含むスキーマcdcに対する SELECT 権限が必要です。

GRANT SELECT ON SCHEMA::cdc to [cdc-username];

DDL キャプチャとスキーマの進化を設定する

SQL Server コネクタは、レプリケートされたデータベース オブジェクトのデータ定義言語 (DDL) を追跡し、関連するテーブル スキーマの変更を宛先テーブルに適用したり、フル スキーマ レプリケーションの場合は新しいテーブルを追加したりできます。

DDL キャプチャを実行するには、追加のデータベース オブジェクトのセットアップが必要です (内部テーブル、ストアド プロシージャ、トリガーなど)。 これを行うには、次の 2 つの方法があります。

  • 自動: Databricks へのインジェスト用に構成されたデータベース ユーザーに追加の権限を付与する必要があります。

  • 手動: 読み取り専用ユーザーが DDL キャプチャとスキーマ進化を実行する場合は、これを選択します。 データベース・オブジェクトを手動で作成する必要があります。

DDL キャプチャの自動セットアップ

データベース・ユーザーは、取り込むテーブルを含むすべてのデータベースの db_owner ・グループに属している必要があります。 DDL の変更が発生したときにキャプチャ インスタンスを管理するには、db_ownerが必要です。

データベース・ユーザーを db_owner グループに追加するには、次のコマンドを実行します。

ALTER ROLE db_owner ADD MEMBER [CdcUserName]

DDL キャプチャの手動セットアップ

読み取り専用ユーザーを使用して DDL キャプチャとスキーマの進化を実行する場合は、 db_owner 権限を持つユーザーとして次のオブジェクトを手動で作成します。

  1. dbo.refreshCaptureInstance_1ストアドプロシージャを作成します。

    CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max)
    AS
        SET NOCOUNT ON
    
        BEGIN TRAN
    
        DECLARE
            @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max),
            @columnList                 nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max),
            @captureInstanceCount       int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max);
    
        SET @captureInstanceCount = (SELECT COUNT(*)
                                    FROM cdc.change_tables
                                    WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName));
        IF (@captureInstanceCount = 2)
        BEGIN
            SET @oldCaptureInstanceName = (SELECT oldCaptureInstance
                                            FROM dbo.captureInstanceTracker_1
                                            WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
            SET @newCaptureInstanceName = (SELECT newCaptureInstance
                                            FROM dbo.captureInstanceTracker_1
                                            WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
    
            SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']';
            SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']';
            SET @minLSN = (SELECT committedCursor
                            FROM dbo.captureInstanceTracker_1
                            WHERE schemaName = @schemaName AND tableName = @tableName);
            IF @minLSN IS NULL OR @minLSN = ''
            BEGIN
                SET @minLSN = '0x00000000000000000000'
            END
    
            SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                            A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                            AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
            SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                                    A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                                    AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
    
            print @columnList
            print @columnListValues
    
            DECLARE @mergeStmt NVARCHAR(MAX);
            SET @mergeStmt = '
            MERGE ' + @newCaptureInstanceFullPath + ' AS target
            USING ' + @oldCaptureInstanceFullPath + ' AS source
            ON source.__$start_lsn = target.__$start_lsn
                AND source.__$seqval = target.__$seqval
                AND source.__$operation = target.__$operation
            WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN
            INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');';
    
            EXEC (@mergeStmt);
        END
        COMMIT TRAN
    
  2. dbo.mergeCaptureInstance_1ストアドプロシージャを作成します。

    CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max)
    AS
    SET NOCOUNT ON
    
    BEGIN TRAN
    
    DECLARE
        @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max),
        @columnList                 nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max),
        @captureInstanceCount       int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max);
    
    SET @captureInstanceCount = (SELECT COUNT(*)
                                FROM cdc.change_tables
                                WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName));
    IF (@captureInstanceCount = 2)
    BEGIN
        SET @oldCaptureInstanceName = (SELECT oldCaptureInstance
                                        FROM dbo.captureInstanceTracker_1
                                        WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
        SET @newCaptureInstanceName = (SELECT newCaptureInstance
                                        FROM dbo.captureInstanceTracker_1
                                        WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
    
        SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']';
        SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']';
        SET @minLSN = (SELECT committedCursor
                        FROM dbo.captureInstanceTracker_1
                        WHERE schemaName = @schemaName AND tableName = @tableName);
        IF @minLSN IS NULL OR @minLSN = ''
        BEGIN
            SET @minLSN = '0x00000000000000000000'
        END
    
        SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                        A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                        AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
        SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                                A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                                AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
    
        print @columnList
        print @columnListValues
    
        DECLARE @mergeStmt NVARCHAR(MAX);
        SET @mergeStmt = '
        MERGE ' + @newCaptureInstanceFullPath + ' AS target
        USING ' + @oldCaptureInstanceFullPath + ' AS source
        ON source.__$start_lsn = target.__$start_lsn
            AND source.__$seqval = target.__$seqval
            AND source.__$operation = target.__$operation
        WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN
        INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');';
    
        EXEC (@mergeStmt);
    END
    COMMIT TRAN
    
  3. dbo.disableOldCaptureInstance_1ストアドプロシージャを作成します。

    CREATE PROCEDURE dbo.disableOldCaptureInstance_1
        @schemaName varchar(max),
        @tableName varchar(max) WITH EXECUTE AS SELF
    AS
        SET NOCOUNT ON
    
        Declare @oldCaptureInstance nvarchar(max);
        BEGIN TRAN
          SET @oldCaptureInstance = (SELECT oldCaptureInstance from dbo.captureInstanceTracker_1 WHERE schemaName=@schemaName AND tableName=@tableName);
          IF @oldCaptureInstance IS NOT NULL
            BEGIN
              EXEC sys.sp_cdc_disable_table
                    @source_schema= @schemaName,
                    @source_name= @tableName,
                    @capture_instance=@OldCaptureInstance;
    
              UPDATE dbo.captureInstanceTracker_1 SET oldCaptureInstance=NULL WHERE schemaName=@schemaName AND tableName=@tableName;
            END
    
        COMMIT TRAN
    
  4. dbo.captureInstanceTrackerテーブルを作成します。

    CREATE TABLE dbo.captureInstanceTracker_1
        (oldCaptureInstance VARCHAR(max),
        newCaptureInstance VARCHAR(MAX),
        schemaName VARCHAR(100) not null,
        tableName VARCHAR(255) not null,
        committedCursor VARCHAR(max),
        triggerReinit bit
        PRIMARY KEY(schemaName, tableName));
    
  5. トリガー alterTableTrigger_1を作成します。

    CREATE TRIGGER alterTableTrigger_1 on database for alter_table
    AS
        SET NOCOUNT ON
        BEGIN
          DECLARE @IsCdcEnabledDBLevel bit,@IsCdcEnabledTableLevel bit, @isColumnAdd nvarchar(max),
              @DbName nvarchar(max),@EventData XML, @SchemaName nvarchar(max), @TableName nvarchar(max);
    
          SET @DbName = DB_NAME();
          SET @EventData = EVENTDATA();
          SET @SchemaName = @EventData.value('(/EVENT_INSTANCE/SchemaName)[1]',  'NVARCHAR(255)');
          SET @TableName = @EventData.value('(/EVENT_INSTANCE/ObjectName)[1]',  'NVARCHAR(255)');
    
          SET @isColumnAdd = @EventData.value('(/EVENT_INSTANCE/AlterTableActionList/Create)[1]', 'NVARCHAR(255)');
          SET @IsCdcEnabledDBLevel = (SELECT is_cdc_enabled FROM sys.databases WHERE name=@DbName);
          SET @IsCdcEnabledTableLevel = (SELECT is_tracked_by_cdc from sys.tables where schema_id=schema_id(@SchemaName) and name = @TableName);
          IF (@IsCdcEnabledDBLevel = 1 AND @IsCdcEnabledTableLevel=1 AND @isColumnAdd is not null)
            BEGIN
              EXECUTE dbo.refreshCaptureInstance_1 @SchemaName, @TableName;
            END
        END
    

DDL キャプチャの読み取り専用ユーザー権限

読み取り専用ユーザーが組み込みの CDC テーブルを使用して DDL 変更キャプチャを実行するには、次のアクセス許可が必要です。

GRANT VIEW DEFINITION ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION TO [CdcUserName];
GRANT VIEW DATABASE PERFORMANCE STATE TO [CdcUserName];
GRANT UPDATE ON object::dbo.captureInstanceTracker_1 TO [CdcUserName];
GRANT EXECUTE ON schema :: dbo TO [CdcUserName];
GRANT EXECUTE ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];