Criar um pipeline de CDC integrado para SQL Server
Beta
Este recurso está em Beta. Os administradores do espaço de trabalho podem controlar o acesso a esse recurso na página Pré-visualizações . Consulte Gerenciar prévias do Databricks.
Um pipeline CDC integrado ingere dados de alterações do SQL Server no Databricks usando um único pipeline. Ao contrário da arquitetura baseada em gateway padrão, que exige um gateway de ingestão e um pipeline de ingestão separados, um pipeline CDC integrado executa os estágios de extração e aplicação em uma única atualização de pipeline.
Quando usar o conector integrado de CDC
A tabela a seguir compara pipelines CDC integrados com a arquitetura baseada em gateway padrão:
Recurso | Padrão CDC (baseado em gateway) | CDC Integrado |
|---|---|---|
Número de pipelines | Dois (gateway de ingestão e pipeline de ingestão) | Um (pipeline unificado) |
Configuração | Crie um gateway, então crie um pipeline de ingestão que faz referência ao ID do gateway. | Criar um único pipeline que faça referência a uma conexão do Unity Catalog |
Modo Gateway | O gateway é executado continuamente | O pipeline incorpora a extração em cada atualização. |
Referência de conexão |
|
|
Tipo de Conector | Implícito | Explícito: |
Volume de preparação | O gateway gerencia o volume de preparação internamente. | É possível configurar o volume de teste através de |
Para a configuração do banco de dados de origem, consulte Configurar o Microsoft SQL Server para ingestão no Databricks. A mesma configuração de origem se aplica a ambas as arquiteturas.
Como um pipeline de CDC integrado é executado
Cada atualização de pipeline executa duas etapas em sequência:
- Extração. O pipeline conecta ao banco de dados de origem usando a conexão do Unity Catalog. Na primeira execução ou em um refresh completo, ele captura um Snapshot inicial. Em execuções subsequentes, ele captura alterações incrementais (inserções, atualizações e exclusões) usando o mecanismo de acompanhamento de alterações integrado do banco de dados. O pipeline grava dados extraídos em um volume de preparação do Unity Catalog.
- Aplicativo. O pipeline lê do volume de preparação e aplica alterações nas tabelas de transmissão de destino no Unity Catalog. Operações de merge usam as chaves primárias configuradas e o tipo de SCD. A pipeline garante semântica exatamente uma vez.
Durante o período Beta, cada atualização do pipeline tem um runtime máximo de aproximadamente 30 minutos. Se a origem tiver mais alterações do que uma única atualização pode processar, a próxima atualização agendada será retomada de onde a anterior parou. Para ingerir dados de forma recorrente, programe o pipeline usando uma tarefa do Lakeflow Jobs.
Requisitos
-
O Workspace está habilitado para o Unity Catalog.
-
O compute serverless está habilitado para seu workspace. Consulte Requisitos de computação serverless.
-
Para criar uma conexão: É preciso ter
CREATE CONNECTIONprivilégios no metastore. Consulte Gerenciar privilégios no Unity Catalog.Se o seu conector suporta a criação de pipeline baseada na IU, você pode criar a conexão e o pipeline ao mesmo tempo concluindo os passos desta página. No entanto, se você usa a criação de pipeline baseada em API, você deve criar a conexão no Catalog Explorer antes de concluir os passos nesta página. Consulte Conectar-se a fontes de ingestão gerenciadas.
-
Se você planeja usar uma conexão existente: você tem os privilégios
USE CONNECTIONouALL PRIVILEGESna conexão. -
O usuário possui
USE CATALOGprivilégios no catálogo de destino. -
Ter os privilégios
USE SCHEMA,CREATE TABLEeCREATE VOLUMEem um esquema existente ou os privilégiosCREATE SCHEMAno catálogo de destino. -
Seu workspace deve ter o recurso de conector CDC integrado habilitado. Entre em contato com sua equipe de conta da Databricks.
-
Há acesso à instância primária do SQL Server. O conector CDC integrado não suporta réplicas de leitura, instâncias em espera ou instâncias secundárias.
-
A configuração de origem do SQL Server foi concluída. Consulte Configurar o Microsoft SQL Server para ingestão no Databricks.
-
Você tem as seguintes permissões:
CREATE CONNECTIONno metastore (se estiver criando uma nova conexão do Unity Catalog), ouUSE CONNECTIONem uma conexão existente.USE CATALOGno catálogo de destinos.USE SCHEMAeCREATE TABLEno esquema de destino.CREATE VOLUMEno esquema de destino, ou no esquema especificado emdata_staging_options. É necessário um volume de staging mesmo quedata_staging_optionsnão esteja definido, porque o pipeline cria um automaticamente no esquema de destino.
Compute requirements
Um pipeline CDC integrado é executado em compute clássico ou serverless:
- Compute clássico. O plano de computação clássico precisa alcançar sua instância do SQL Server pela rede. Para SQL Server hospedado na nuvem (Azure SQL, Amazon RDS), permita conexões de entrada do plano de computação do Databricks. Para SQL Server on-premises, use Azure ExpressRoute, AWS Direct Connect ou VPN.
- Compute serverless. Configurar conectividade de rede serverless entre o compute serverless do Databricks e seu banco de dados de origem. Fontes on-premises exigem um caminho de rede através da saída serverless configurada (por exemplo, um gateway de trânsito ou VNet emparelhada com ExpressRoute ou VPN).
Para o compute clássico, é possível usar permissões irrestritas de criação de cluster ou uma política de cluster personalizada com cluster_type fixado em dlt, runtime_engine fixado em STANDARD, e pelo menos 8 núcleos recomendados para uma extração eficiente.
Criar uma conexão do Unity Catalog para SQL Server
Crie uma conexão do Unity Catalog ao SQL Server antes de criar um pipeline. Consulte Criar uma conexão com o SQL Server.
Criar um pipeline CDC integrado
Crie pipelines CDC integrados usando a API, a CLI do Databricks, notebooks ou Pacotes de Automação Declarativa. A criação da interface do usuário ainda não está disponível.
Todas as solicitações de criação de pipeline devem incluir "channel": "PREVIEW".
- Declarative Automation Bundles
- Databricks notebook
- Databricks CLI
- REST API
Defina o recurso de pipeline em um arquivo de pacote (por exemplo, resources/integrated_cdc_pipeline.yml):
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'
resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_catalog: 'my_database'
source_schema: 'dbo'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'
Para executar o pipeline em uma programação, defina um Job (por exemplo, resources/integrated_cdc_job.yml) que aciona o pipeline. Como cada etapa de extração executa por pelo menos 10 minutos, um intervalo de 60 minutos ou mais é um bom ponto de partida:
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'
Implante o pacote com a CLI do Databricks:
databricks bundle deploy
databricks bundle run integrated_cdc_job
Para obter mais informações, consulte O que são Pacotes de Automação Declarativa?.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
pipeline = w.pipelines.create(
name="<pipeline-name>",
pipeline_type="MANAGED_INGESTION",
channel="PREVIEW",
serverless=False,
catalog="<destination-catalog>",
schema="<destination-schema>",
ingestion_definition={
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "<source-database>",
"source_schema": "<source-schema>",
"source_table": "<source-table>",
}
}
],
},
)
print(f"Pipeline created: {pipeline.pipeline_id}")
databricks pipelines create --json '{
"name": "<pipeline-name>",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "<source-database>",
"source_schema": "<source-schema>",
"source_table": "<source-table>"
}
}
]
}
}'
O exemplo a seguir replica duas tabelas de um banco de dados do SQL Server. A tabela customers usa SCD Tipo 1, e a tabela orders usa SCD Tipo 2 (que requer SQL Server CDC na origem). Ambos herdam o destino de nível superior main.ingestion. Definir "serverless": true para executar em compute serverless.
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-sqlserver-connection",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "my_database",
"source_schema": "dbo",
"source_table": "customers",
"table_configuration": {
"primary_keys": ["customer_id"],
"scd_type": "SCD_TYPE_1"
}
}
},
{
"table": {
"source_catalog": "my_database",
"source_schema": "dbo",
"source_table": "orders",
"table_configuration": {
"primary_keys": ["order_id"],
"scd_type": "SCD_TYPE_2"
}
}
}
],
"data_staging_options": {
"catalog_name": "main",
"schema_name": "ingestion_staging"
}
}
}
Para replicar todas as tabelas em um esquema de origem, use um objeto schema em vez de objetos table individuais. O pipeline ignora tabelas sem CDC ou acompanhamento de alterações ativado na origem.
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-schema-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-sqlserver-connection",
"connector_type": "CDC",
"objects": [
{
"schema": {
"source_catalog": "my_database",
"source_schema": "dbo",
"destination_catalog": "main",
"destination_schema": "ingestion"
}
}
]
}
}
Para iniciar uma atualização do pipeline:
POST /api/2.0/pipelines/<pipeline-id>/updates
{
"full_refresh": false
}
Programar atualizações recorrentes
Pipelines de CDC integrados operam apenas no modo triggered. Para ingerir dados em um agendamento recorrente, crie uma tarefa Lakeflow Jobs que executa o pipeline. Cada atualização dura aproximadamente 30 minutos e pode não concluir o processamento de todo o backlog de alterações em uma única atualização. Programe os pipelines com frequência suficiente para que as atualizações subsequentes possam acompanhar. Um ponto de partida de 60 minutos funciona bem para a maioria das cargas de trabalho. Se um trigger é disparado enquanto uma atualização anterior ainda está em execução, a nova atualização é enfileirada.
Referência de configuração
Parâmetros do pipeline
Parâmetro | Tipo | Descrição |
|---|---|---|
| string | Um nome para o pipeline. |
| string | Deve ser |
| string | Deve ser |
| Booleana |
|
| string | O catálogo de destinos default. Usado quando um |
| string | O esquema de destino default. Usado quando um |
| string | A conexão do Unity Catalog ao banco de dados de origem. |
| string | Deve ser |
| matriz | A lista de tabelas ou esquemas para ingestão. |
| objeto | Opcional. O catálogo e o esquema onde o pipeline cria o volume de preparo. Usa por padrão o esquema de destino do pipeline. |
Especificação da tabela
Parâmetro | Obrigatório | Descrição |
|---|---|---|
| Sim | Nome da base de dados de origem. |
| Sim | O nome do esquema de origem. |
| Sim | O nome da tabela de origem. |
| Não | Catálogo de destinos. O padrão é o |
| Não | O esquema de destino. O padrão é o |
| Não | O nome da tabela de destino. O padrão é |
Configuração da tabela
Parâmetro | Padrão | Descrição |
|---|---|---|
| Detectado automaticamente | As colunas que identificam cada linha. Detectado automaticamente da chave primária de origem se não especificada. |
|
|
|
| Detectado automaticamente | As colunas usadas para ordenar eventos de CDC. Autodetectado baseado no mecanismo CDC de origem, se não especificado. |
Para mapeamentos de tipos de dados do SQL Server, consulte referência do conector do SQL Server. Os pipelines CDC integrados suportam a ampliação automática de tipo: quando um tipo de coluna de origem é ampliado (por exemplo, INT para BIGINT), a tabela de destino se adapta automaticamente.
Monitorar o pipeline
Depois de criar e iniciar um pipeline CDC integrado, monitore seu status usando o seguinte:
-
Interface do usuário do Databricks. Abra o pipeline na seção Pipelines para exibir o status de atualização, as métricas de ingestão por tabela e a linhagem.
-
API REST.
TextGET /api/2.0/pipelines/<pipeline-id> -
API de Eventos.
TextGET /api/2.0/pipelines/<pipeline-id>/events
A primeira atualização do pipeline realiza um snapshot completo de todas as tabelas selecionadas, o que pode levar mais tempo do que as atualizações incrementais. Para tabelas grandes, o snapshot inicial pode exigir várias atualizações programadas para ser concluído. Cada atualização subsequente retoma de onde a anterior parou.
Para verificar a ingestão:
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;
-- View recent changes (SCD Type 2 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;
Para comportamento de refresh completo e refresh automático completo, consulte Fazer refresh completo das tabelas de destino.
Pipelines CDC integrados têm o autoscale vertical ativado por default. Se uma atualização de pipeline falhar por causa de uma condição de falta de memória, a próxima atualização automaticamente provisiona um driver maior. Para substituir esse comportamento, utilize uma política de cluster personalizada.
Limitações
- Beta. O conector CDC integrado requer habilitação no nível do workspace. Entre em contato com sua equipe de conta da Databricks.
- Apenas modo acionado. Pipelines CDC integrados não são compatíveis com execução contínua (sempre ativa). Programar pipelines com uma tarefa de LakeFlow Jobs.
- Criação somente por API. A criação de pipelines é possível por meio da API REST, da CLI do Databricks, de notebooks e de Bundles de Automação Declarativa. A criação da interface do usuário ainda não é suportada.
- O canal deve ser
PREVIEW. As especificações do pipeline devem incluir"channel": "PREVIEW". - Tipo de conexão e de conector são imutáveis.
connection_nameeconnector_typenão podem ser alterados após a criação do pipeline. Para alterar a origem, criar um novo pipeline. - Máximo recomendado de 300 tabelas por pipeline.
- Somente instâncias primárias. O conector CDC integrado não suporta réplicas de leitura, instâncias em espera ou instâncias secundárias.
- Tabelas sem chaves primárias. O pipeline trata todas as colunas não-LOB como uma chave composta. Linhas duplicadas podem ser recolhidas em uma única linha, a menos que o SCD Tipo 2 esteja habilitado.
- O snapshot inicial pode abranger múltiplas atualizações. Para tabelas grandes, o snapshot inicial pode não concluir em uma única atualização. Atualizações programadas subsequentes retomam do ponto em que a atualização anterior parou.
- Cada atualização leva aproximadamente 30 minutos. Durante o Beta, o pipeline não processa necessariamente todo o backlog de alterações em uma única atualização. As atualizações programadas subsequentes retomam o processamento de onde a atualização anterior parou. Não é possível configurar este runtime durante o Beta.
- Log purge requires full refresh. Se o SQL Server limpar os registros de acompanhamento de alterações ou os registros de CDC antes que o pipeline os processe, realize um refresh completo nas tabelas afetadas. O pipeline detecta essa condição e apresenta um erro no log de eventos.
Solução de problemas
Alguns códigos de erro usam o prefixo INGESTION_GATEWAY_. Esta é uma convenção de nomenclatura legada e não indica que um gateway de ingestão separado seja necessário.
Erro | Causa | Resolução |
|---|---|---|
| O pipeline não está no Mode de publicação direta. | O Direct Publishing Mode é definido automaticamente para pipelines CDC integrados. Caso veja este erro, recrie o pipeline. |
| CDC ou acompanhamento de alterações não está habilitado em uma ou mais tabelas de origem. | Habilitar CDC ou acompanhamento de alterações nas tabelas afetadas. Consulte Configurar o Microsoft SQL Server para ingestão no Databricks. |
| A tabela de origem especificada não existe ou foi removida. | Verifique se a tabela existe e que o usuário da conexão tem acesso. |
| O esquema de origem não existe. | Verifique se o esquema existe no banco de dados de origem. |
| O tipo de banco de dados de origem não é suportado. | O conector CDC integrado suporta SQL Server e Oracle. |
| A especificação da tabela está ausente | Adicionar |
| O sinalizador de recurso do workspace não está ativado. | Entre em contato com sua equipe de conta da Databricks para habilitar o conector CDC integrado no seu workspace. |
Em caso de um problema não abordado aqui:
- Revise o log de eventos do pipeline na UI do Databricks ou por meio de
GET /api/2.0/pipelines/<pipeline-id>/events. - Teste a conexão do Unity Catalog do Catalog Explorer para confirmar se a origem está acessível.
- Confirme se o acompanhamento de alterações ou o CDC está habilitado no banco de dados e nas tabelas de origem.
- Verifique se o usuário do banco de dados tem as permissões do SQL Server listadas em requisitos de usuário do banco de dados do Microsoft SQL Server.
- Verifique se a especificação do seu pipeline inclui
"channel": "PREVIEW".