Habilite a integração CDC no SQL Server

Prévia

O LakeFlow Connect está em um Public Preview fechado. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .

Este artigo descreve como habilitar a captura integrada de dados de alterações (CDC) (CDC) em SQL Server. É necessário um acompanhamento de mudança ou o site CDC para a ingestão em Databricks. Para obter orientação sobre qual opção escolher, consulte Change acompanhamento vs. captura de dados de alterações (CDC).

Habilitar a integração CDC para o banco de dados de origem

Para habilitar o banco de dados de origem para CDC, execute o seguinte procedimento armazenado em Azure SQL, substituindo o valor de <database-name>. O senhor deve estar conectado ao banco de dados que deseja habilitar para CDC.

EXEC sys.sp_cdc_enable_db

Para ativar o CDC em um banco de dados no Amazon RDS para SQL Server, execute o seguinte:

EXEC msdb.dbo.rds_cdc_enable_db '<database-name>'

Para obter mais informações, consulte Ativar captura de dados de alterações (CDC) para um banco de dados na documentação SQL Server.

Habilitar a integração CDC na tabela de origem

Para ativar o CDC na tabela de origem, execute o seguinte procedimento armazenado em Azure SQL:

EXEC sys.sp_cdc_enable_table
@source_schema = N'MySchema',
@source_name   = N'MyTable',
@role_name     = NULL,
@supports_net_changes = 1
  • Substitua os valores por source_schema, source_name e role_name.

  • @support_net_changes só aceita um valor de 1 se a tabela tiver um primário key.

Para obter mais informações, consulte Ativar captura de dados de alterações (CDC) para uma tabela na documentação SQL Server.

Conceder SELECT no esquema do CDC

Além dos privilégios descritos na configuração da fonte, o usuário do banco de dados precisa do privilégio SELECT no esquema cdc que contém as tabelas de alteração criadas quando o CDC é ativado.

GRANT SELECT ON SCHEMA::cdc to [cdc-username];

Configurar a captura de DDL e a evolução do esquema

O conector do SQL Server pode rastrear a linguagem de definição de dados (DDL) em objetos de banco de dados replicados e aplicar alterações relevantes no esquema da tabela às tabelas de destino ou adicionar novas tabelas no caso de replicação de esquema completo.

Para realizar a captura DDL, é necessária uma configuração adicional do objeto de banco de dados (por exemplo, tabelas internas, procedimentos armazenados e gatilhos). Há duas maneiras de fazer isso:

  • Automático: Requer privilégios adicionais a serem concedidos ao usuário do banco de dados configurado para ingestão no Databricks.

  • Manual: Escolha essa opção se quiser que um usuário somente leitura execute a captura de DDL e a evolução do esquema. Requer a criação manual de objetos de banco de dados.

Configuração automática para captura DDL

O usuário do banco de dados deve estar no grupo db_owner para que todos os bancos de dados que contenham tabelas sejam ingeridos. db_owner é necessário para gerenciar as instâncias de captura à medida que ocorrem alterações de DDL.

Para adicionar o usuário do banco de dados ao grupo db_owner, execute o seguinte comando:

ALTER ROLE db_owner ADD MEMBER [CdcUserName]

Configuração manual para captura DDL

Se quiser executar a captura DDL e a evolução do esquema usando um usuário somente leitura, crie os seguintes objetos manualmente como um usuário com privilégios db_owner:

  1. Crie o procedimento armazenado dbo.refreshCaptureInstance_1:

    CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max)
    AS
        SET NOCOUNT ON
    
        BEGIN TRAN
    
        DECLARE
            @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max),
            @columnList                 nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max),
            @captureInstanceCount       int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max);
    
        SET @captureInstanceCount = (SELECT COUNT(*)
                                    FROM cdc.change_tables
                                    WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName));
        IF (@captureInstanceCount = 2)
        BEGIN
            SET @oldCaptureInstanceName = (SELECT oldCaptureInstance
                                            FROM dbo.captureInstanceTracker_1
                                            WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
            SET @newCaptureInstanceName = (SELECT newCaptureInstance
                                            FROM dbo.captureInstanceTracker_1
                                            WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
    
            SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']';
            SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']';
            SET @minLSN = (SELECT committedCursor
                            FROM dbo.captureInstanceTracker_1
                            WHERE schemaName = @schemaName AND tableName = @tableName);
            IF @minLSN IS NULL OR @minLSN = ''
            BEGIN
                SET @minLSN = '0x00000000000000000000'
            END
    
            SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                            A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                            AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
            SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                                    A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                                    AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
    
            print @columnList
            print @columnListValues
    
            DECLARE @mergeStmt NVARCHAR(MAX);
            SET @mergeStmt = '
            MERGE ' + @newCaptureInstanceFullPath + ' AS target
            USING ' + @oldCaptureInstanceFullPath + ' AS source
            ON source.__$start_lsn = target.__$start_lsn
                AND source.__$seqval = target.__$seqval
                AND source.__$operation = target.__$operation
            WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN
            INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');';
    
            EXEC (@mergeStmt);
        END
        COMMIT TRAN
    
  2. Crie o procedimento armazenado dbo.mergeCaptureInstance_1:

    CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max)
    AS
    SET NOCOUNT ON
    
    BEGIN TRAN
    
    DECLARE
        @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max),
        @columnList                 nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max),
        @captureInstanceCount       int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max);
    
    SET @captureInstanceCount = (SELECT COUNT(*)
                                FROM cdc.change_tables
                                WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName));
    IF (@captureInstanceCount = 2)
    BEGIN
        SET @oldCaptureInstanceName = (SELECT oldCaptureInstance
                                        FROM dbo.captureInstanceTracker_1
                                        WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
        SET @newCaptureInstanceName = (SELECT newCaptureInstance
                                        FROM dbo.captureInstanceTracker_1
                                        WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT';
    
        SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']';
        SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']';
        SET @minLSN = (SELECT committedCursor
                        FROM dbo.captureInstanceTracker_1
                        WHERE schemaName = @schemaName AND tableName = @tableName);
        IF @minLSN IS NULL OR @minLSN = ''
        BEGIN
            SET @minLSN = '0x00000000000000000000'
        END
    
        SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                        A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                        AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
        SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on
                                A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc'
                                AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,''));
    
    
        print @columnList
        print @columnListValues
    
        DECLARE @mergeStmt NVARCHAR(MAX);
        SET @mergeStmt = '
        MERGE ' + @newCaptureInstanceFullPath + ' AS target
        USING ' + @oldCaptureInstanceFullPath + ' AS source
        ON source.__$start_lsn = target.__$start_lsn
            AND source.__$seqval = target.__$seqval
            AND source.__$operation = target.__$operation
        WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN
        INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');';
    
        EXEC (@mergeStmt);
    END
    COMMIT TRAN
    
  3. Crie o procedimento armazenado dbo.disableOldCaptureInstance_1:

    CREATE PROCEDURE dbo.disableOldCaptureInstance_1
        @schemaName varchar(max),
        @tableName varchar(max) WITH EXECUTE AS SELF
    AS
        SET NOCOUNT ON
    
        Declare @oldCaptureInstance nvarchar(max);
        BEGIN TRAN
          SET @oldCaptureInstance = (SELECT oldCaptureInstance from dbo.captureInstanceTracker_1 WHERE schemaName=@schemaName AND tableName=@tableName);
          IF @oldCaptureInstance IS NOT NULL
            BEGIN
              EXEC sys.sp_cdc_disable_table
                    @source_schema= @schemaName,
                    @source_name= @tableName,
                    @capture_instance=@OldCaptureInstance;
    
              UPDATE dbo.captureInstanceTracker_1 SET oldCaptureInstance=NULL WHERE schemaName=@schemaName AND tableName=@tableName;
            END
    
        COMMIT TRAN
    
  4. Crie a tabela dbo.captureInstanceTracker:

    CREATE TABLE dbo.captureInstanceTracker_1
        (oldCaptureInstance VARCHAR(max),
        newCaptureInstance VARCHAR(MAX),
        schemaName VARCHAR(100) not null,
        tableName VARCHAR(255) not null,
        committedCursor VARCHAR(max),
        triggerReinit bit
        PRIMARY KEY(schemaName, tableName));
    
  5. Crie o gatilho alterTableTrigger_1:

    CREATE TRIGGER alterTableTrigger_1 on database for alter_table
    AS
        SET NOCOUNT ON
        BEGIN
          DECLARE @IsCdcEnabledDBLevel bit,@IsCdcEnabledTableLevel bit, @isColumnAdd nvarchar(max),
              @DbName nvarchar(max),@EventData XML, @SchemaName nvarchar(max), @TableName nvarchar(max);
    
          SET @DbName = DB_NAME();
          SET @EventData = EVENTDATA();
          SET @SchemaName = @EventData.value('(/EVENT_INSTANCE/SchemaName)[1]',  'NVARCHAR(255)');
          SET @TableName = @EventData.value('(/EVENT_INSTANCE/ObjectName)[1]',  'NVARCHAR(255)');
    
          SET @isColumnAdd = @EventData.value('(/EVENT_INSTANCE/AlterTableActionList/Create)[1]', 'NVARCHAR(255)');
          SET @IsCdcEnabledDBLevel = (SELECT is_cdc_enabled FROM sys.databases WHERE name=@DbName);
          SET @IsCdcEnabledTableLevel = (SELECT is_tracked_by_cdc from sys.tables where schema_id=schema_id(@SchemaName) and name = @TableName);
          IF (@IsCdcEnabledDBLevel = 1 AND @IsCdcEnabledTableLevel=1 AND @isColumnAdd is not null)
            BEGIN
              EXECUTE dbo.refreshCaptureInstance_1 @SchemaName, @TableName;
            END
        END
    

Privilégios de usuário somente de leitura para captura de DDL

As seguintes permissões são necessárias para que usuários somente leitura executem a captura de alterações DDL com tabelas integradas CDC:

GRANT VIEW DEFINITION ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION TO [CdcUserName];
GRANT VIEW DATABASE PERFORMANCE STATE TO [CdcUserName];
GRANT UPDATE ON object::dbo.captureInstanceTracker_1 TO [CdcUserName];
GRANT EXECUTE ON schema :: dbo TO [CdcUserName];
GRANT EXECUTE ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];