Habilite a integração CDC no SQL Server
Prévia
O LakeFlow Connect está em um Public Preview fechado. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .
Este artigo descreve como habilitar a captura integrada de dados de alterações (CDC) (CDC) em SQL Server. É necessário um acompanhamento de mudança ou o site CDC para a ingestão em Databricks. Para obter orientação sobre qual opção escolher, consulte Change acompanhamento vs. captura de dados de alterações (CDC).
Habilitar a integração CDC para o banco de dados de origem
Para habilitar o banco de dados de origem para CDC, execute o seguinte procedimento armazenado em Azure SQL, substituindo o valor de <database-name>
. O senhor deve estar conectado ao banco de dados que deseja habilitar para CDC.
EXEC sys.sp_cdc_enable_db
Para ativar o CDC em um banco de dados no Amazon RDS para SQL Server, execute o seguinte:
EXEC msdb.dbo.rds_cdc_enable_db '<database-name>'
Para obter mais informações, consulte Ativar captura de dados de alterações (CDC) para um banco de dados na documentação SQL Server.
Habilitar a integração CDC na tabela de origem
Para ativar o CDC na tabela de origem, execute o seguinte procedimento armazenado em Azure SQL:
EXEC sys.sp_cdc_enable_table
@source_schema = N'MySchema',
@source_name = N'MyTable',
@role_name = NULL,
@supports_net_changes = 1
Substitua os valores por
source_schema
,source_name
erole_name
.@support_net_changes
só aceita um valor de1
se a tabela tiver um primário key.
Para obter mais informações, consulte Ativar captura de dados de alterações (CDC) para uma tabela na documentação SQL Server.
Conceder SELECT no esquema do CDC
Além dos privilégios descritos na configuração da fonte, o usuário do banco de dados precisa do privilégio SELECT
no esquema cdc
que contém as tabelas de alteração criadas quando o CDC é ativado.
GRANT SELECT ON SCHEMA::cdc to [cdc-username];
Configurar a captura de DDL e a evolução do esquema
O conector do SQL Server pode rastrear a linguagem de definição de dados (DDL) em objetos de banco de dados replicados e aplicar alterações relevantes no esquema da tabela às tabelas de destino ou adicionar novas tabelas no caso de replicação de esquema completo.
Para realizar a captura DDL, é necessária uma configuração adicional do objeto de banco de dados (por exemplo, tabelas internas, procedimentos armazenados e gatilhos). Há duas maneiras de fazer isso:
Automático: Requer privilégios adicionais a serem concedidos ao usuário do banco de dados configurado para ingestão no Databricks.
Manual: Escolha essa opção se quiser que um usuário somente leitura execute a captura de DDL e a evolução do esquema. Requer a criação manual de objetos de banco de dados.
Configuração automática para captura DDL
O usuário do banco de dados deve estar no grupo db_owner
para que todos os bancos de dados que contenham tabelas sejam ingeridos. db_owner
é necessário para gerenciar as instâncias de captura à medida que ocorrem alterações de DDL.
Para adicionar o usuário do banco de dados ao grupo db_owner
, execute o seguinte comando:
ALTER ROLE db_owner ADD MEMBER [CdcUserName]
Configuração manual para captura DDL
Se quiser executar a captura DDL e a evolução do esquema usando um usuário somente leitura, crie os seguintes objetos manualmente como um usuário com privilégios db_owner
:
Crie o procedimento armazenado
dbo.refreshCaptureInstance_1
:CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max) AS SET NOCOUNT ON BEGIN TRAN DECLARE @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max), @columnList nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max), @captureInstanceCount int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max); SET @captureInstanceCount = (SELECT COUNT(*) FROM cdc.change_tables WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName)); IF (@captureInstanceCount = 2) BEGIN SET @oldCaptureInstanceName = (SELECT oldCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceName = (SELECT newCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']'; SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']'; SET @minLSN = (SELECT committedCursor FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName); IF @minLSN IS NULL OR @minLSN = '' BEGIN SET @minLSN = '0x00000000000000000000' END SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); print @columnList print @columnListValues DECLARE @mergeStmt NVARCHAR(MAX); SET @mergeStmt = ' MERGE ' + @newCaptureInstanceFullPath + ' AS target USING ' + @oldCaptureInstanceFullPath + ' AS source ON source.__$start_lsn = target.__$start_lsn AND source.__$seqval = target.__$seqval AND source.__$operation = target.__$operation WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');'; EXEC (@mergeStmt); END COMMIT TRAN
Crie o procedimento armazenado
dbo.mergeCaptureInstance_1
:CREATE PROCEDURE dbo.mergeCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max) AS SET NOCOUNT ON BEGIN TRAN DECLARE @newCaptureInstanceFullPath nvarchar(max), @oldCaptureInstanceFullPath nvarchar(max), @updateStmt nvarchar(max), @columnList nvarchar(max), @columnListValues nvarchar(max), @oldCaptureInstanceName nvarchar(max), @newCaptureInstanceName nvarchar(max), @captureInstanceCount int, @captureInstanceTracker nvarchar(max), @minLSN varchar(max); SET @captureInstanceCount = (SELECT COUNT(*) FROM cdc.change_tables WHERE source_object_id = OBJECT_ID(@schemaName + '.' + @tableName)); IF (@captureInstanceCount = 2) BEGIN SET @oldCaptureInstanceName = (SELECT oldCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceName = (SELECT newCaptureInstance FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName) + '_CT'; SET @newCaptureInstanceFullPath = '[cdc].[' + @newCaptureInstanceName + ']'; SET @oldCaptureInstanceFullPath = '[cdc].[' + @oldCaptureInstanceName + ']'; SET @minLSN = (SELECT committedCursor FROM dbo.captureInstanceTracker_1 WHERE schemaName = @schemaName AND tableName = @tableName); IF @minLSN IS NULL OR @minLSN = '' BEGIN SET @minLSN = '0x00000000000000000000' END SET @columnList = (SELECT STUFF((SELECT ',[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); SET @columnListValues = (SELECT STUFF((SELECT ',source.[' + A.COLUMN_NAME + ']' FROM INFORMATION_SCHEMA.COLUMNS A join INFORMATION_SCHEMA.COLUMNS B on A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA='cdc' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA='cdc' for xml path('')), 1,1,'')); print @columnList print @columnListValues DECLARE @mergeStmt NVARCHAR(MAX); SET @mergeStmt = ' MERGE ' + @newCaptureInstanceFullPath + ' AS target USING ' + @oldCaptureInstanceFullPath + ' AS source ON source.__$start_lsn = target.__$start_lsn AND source.__$seqval = target.__$seqval AND source.__$operation = target.__$operation WHEN NOT MATCHED AND source.__$start_lsn > ' + @minLSN + ' THEN INSERT (' + @columnList + ') VALUES (' + @columnListValues + ');'; EXEC (@mergeStmt); END COMMIT TRAN
Crie o procedimento armazenado
dbo.disableOldCaptureInstance_1
:CREATE PROCEDURE dbo.disableOldCaptureInstance_1 @schemaName varchar(max), @tableName varchar(max) WITH EXECUTE AS SELF AS SET NOCOUNT ON Declare @oldCaptureInstance nvarchar(max); BEGIN TRAN SET @oldCaptureInstance = (SELECT oldCaptureInstance from dbo.captureInstanceTracker_1 WHERE schemaName=@schemaName AND tableName=@tableName); IF @oldCaptureInstance IS NOT NULL BEGIN EXEC sys.sp_cdc_disable_table @source_schema= @schemaName, @source_name= @tableName, @capture_instance=@OldCaptureInstance; UPDATE dbo.captureInstanceTracker_1 SET oldCaptureInstance=NULL WHERE schemaName=@schemaName AND tableName=@tableName; END COMMIT TRAN
Crie a tabela
dbo.captureInstanceTracker
:CREATE TABLE dbo.captureInstanceTracker_1 (oldCaptureInstance VARCHAR(max), newCaptureInstance VARCHAR(MAX), schemaName VARCHAR(100) not null, tableName VARCHAR(255) not null, committedCursor VARCHAR(max), triggerReinit bit PRIMARY KEY(schemaName, tableName));
Crie o gatilho
alterTableTrigger_1
:CREATE TRIGGER alterTableTrigger_1 on database for alter_table AS SET NOCOUNT ON BEGIN DECLARE @IsCdcEnabledDBLevel bit,@IsCdcEnabledTableLevel bit, @isColumnAdd nvarchar(max), @DbName nvarchar(max),@EventData XML, @SchemaName nvarchar(max), @TableName nvarchar(max); SET @DbName = DB_NAME(); SET @EventData = EVENTDATA(); SET @SchemaName = @EventData.value('(/EVENT_INSTANCE/SchemaName)[1]', 'NVARCHAR(255)'); SET @TableName = @EventData.value('(/EVENT_INSTANCE/ObjectName)[1]', 'NVARCHAR(255)'); SET @isColumnAdd = @EventData.value('(/EVENT_INSTANCE/AlterTableActionList/Create)[1]', 'NVARCHAR(255)'); SET @IsCdcEnabledDBLevel = (SELECT is_cdc_enabled FROM sys.databases WHERE name=@DbName); SET @IsCdcEnabledTableLevel = (SELECT is_tracked_by_cdc from sys.tables where schema_id=schema_id(@SchemaName) and name = @TableName); IF (@IsCdcEnabledDBLevel = 1 AND @IsCdcEnabledTableLevel=1 AND @isColumnAdd is not null) BEGIN EXECUTE dbo.refreshCaptureInstance_1 @SchemaName, @TableName; END END
Privilégios de usuário somente de leitura para captura de DDL
As seguintes permissões são necessárias para que usuários somente leitura executem a captura de alterações DDL com tabelas integradas CDC:
GRANT VIEW DEFINITION ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT VIEW DEFINITION TO [CdcUserName];
GRANT VIEW DATABASE PERFORMANCE STATE TO [CdcUserName];
GRANT UPDATE ON object::dbo.captureInstanceTracker_1 TO [CdcUserName];
GRANT EXECUTE ON schema :: dbo TO [CdcUserName];
GRANT EXECUTE ON object::dbo.mergeCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.disableOldCaptureInstance_1 TO [CdcUserName];
GRANT EXECUTE ON object::dbo.refreshCaptureInstance_1 TO [CdcUserName];