Pular para o conteúdo principal

Criar um pipeline de CDC integrado para SQL Server

info

Beta

Este recurso está em Beta. Os administradores do espaço de trabalho podem controlar o acesso a esse recurso na página Pré-visualizações . Consulte Gerenciar prévias do Databricks.

Um pipeline CDC integrado ingere dados de alterações do SQL Server no Databricks usando um único pipeline. Ao contrário da arquitetura baseada em gateway padrão, que exige um gateway de ingestão e um pipeline de ingestão separados, um pipeline CDC integrado executa os estágios de extração e aplicação em uma única atualização de pipeline.

Quando usar o conector integrado de CDC

A tabela a seguir compara pipelines CDC integrados com a arquitetura baseada em gateway padrão:

Recurso

Padrão CDC (baseado em gateway)

CDC Integrado

Número de pipelines

Dois (gateway de ingestão e pipeline de ingestão)

Um (pipeline unificado)

Configuração

Crie um gateway, então crie um pipeline de ingestão que faz referência ao ID do gateway.

Criar um único pipeline que faça referência a uma conexão do Unity Catalog

Modo Gateway

O gateway é executado continuamente

O pipeline incorpora a extração em cada atualização.

Referência de conexão

ingestion_gateway_id

connection_name (uma conexão do Unity Catalog)

Tipo de Conector

Implícito

Explícito: connector_type: CDC

Volume de preparação

O gateway gerencia o volume de preparação internamente.

É possível configurar o volume de teste através de data_staging_options. O pipeline é criado automaticamente se não for especificado.

Para a configuração do banco de dados de origem, consulte Configurar o Microsoft SQL Server para ingestão no Databricks. A mesma configuração de origem se aplica a ambas as arquiteturas.

Como um pipeline de CDC integrado é executado

Cada atualização de pipeline executa duas etapas em sequência:

  1. Extração. O pipeline conecta ao banco de dados de origem usando a conexão do Unity Catalog. Na primeira execução ou em um refresh completo, ele captura um Snapshot inicial. Em execuções subsequentes, ele captura alterações incrementais (inserções, atualizações e exclusões) usando o mecanismo de acompanhamento de alterações integrado do banco de dados. O pipeline grava dados extraídos em um volume de preparação do Unity Catalog.
  2. Aplicativo. O pipeline lê do volume de preparação e aplica alterações nas tabelas de transmissão de destino no Unity Catalog. Operações de merge usam as chaves primárias configuradas e o tipo de SCD. A pipeline garante semântica exatamente uma vez.

Durante o período Beta, cada atualização do pipeline tem um runtime máximo de aproximadamente 30 minutos. Se a origem tiver mais alterações do que uma única atualização pode processar, a próxima atualização agendada será retomada de onde a anterior parou. Para ingerir dados de forma recorrente, programe o pipeline usando uma tarefa do Lakeflow Jobs.

Requisitos

  • O Workspace está habilitado para o Unity Catalog.

  • O compute serverless está habilitado para seu workspace. Consulte Requisitos de computação serverless.

  • Para criar uma conexão: É preciso ter CREATE CONNECTION privilégios no metastore. Consulte Gerenciar privilégios no Unity Catalog.

    Se o seu conector suporta a criação de pipeline baseada na IU, você pode criar a conexão e o pipeline ao mesmo tempo concluindo os passos desta página. No entanto, se você usa a criação de pipeline baseada em API, você deve criar a conexão no Catalog Explorer antes de concluir os passos nesta página. Consulte Conectar-se a fontes de ingestão gerenciadas.

  • Se você planeja usar uma conexão existente: você tem os privilégios USE CONNECTION ou ALL PRIVILEGES na conexão.

  • O usuário possui USE CATALOG privilégios no catálogo de destino.

  • Ter os privilégios USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou os privilégios CREATE SCHEMA no catálogo de destino.

  • Seu workspace deve ter o recurso de conector CDC integrado habilitado. Entre em contato com sua equipe de conta da Databricks.

  • Há acesso à instância primária do SQL Server. O conector CDC integrado não suporta réplicas de leitura, instâncias em espera ou instâncias secundárias.

  • A configuração de origem do SQL Server foi concluída. Consulte Configurar o Microsoft SQL Server para ingestão no Databricks.

  • Você tem as seguintes permissões:

    • CREATE CONNECTION no metastore (se estiver criando uma nova conexão do Unity Catalog), ou USE CONNECTION em uma conexão existente.
    • USE CATALOG no catálogo de destinos.
    • USE SCHEMA e CREATE TABLE no esquema de destino.
    • CREATE VOLUME no esquema de destino, ou no esquema especificado em data_staging_options. É necessário um volume de staging mesmo que data_staging_options não esteja definido, porque o pipeline cria um automaticamente no esquema de destino.

Compute requirements

Um pipeline CDC integrado é executado em compute clássico ou serverless:

  • Compute clássico. O plano de compute deve alcançar sua instância do SQL Server pela rede. Para SQL Server hospedado na nuvem (Azure SQL, Amazon RDS), permita conexões de entrada do plano de computação do Databricks. Para SQL Server on-premises, use Azure ExpressRoute, AWS Direct Connect ou VPN.
  • Compute serverless. Configurar conectividade de rede serverless entre o compute serverless do Databricks e seu banco de dados de origem. Fontes on-premises exigem um caminho de rede através da saída serverless configurada (por exemplo, um gateway de trânsito ou VNet emparelhada com ExpressRoute ou VPN).

Para o compute clássico, é possível usar permissões irrestritas de criação de cluster ou uma política de cluster personalizada com cluster_type fixado em dlt, runtime_engine fixado em STANDARD, e pelo menos 8 núcleos recomendados para uma extração eficiente.

Criar uma conexão do Unity Catalog para SQL Server

Crie uma conexão do Unity Catalog ao SQL Server antes de criar um pipeline. Consulte Criar uma conexão com o SQL Server.

Criar um pipeline CDC integrado

Crie pipelines CDC integrados usando a API, a CLI do Databricks, notebooks ou Pacotes de Automação Declarativa. A criação da interface do usuário ainda não está disponível.

importante

Todas as solicitações de criação de pipeline devem incluir "channel": "PREVIEW".

Defina o recurso de pipeline em um arquivo de pacote (por exemplo, resources/integrated_cdc_pipeline.yml):

YAML
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'

resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_catalog: 'my_database'
source_schema: 'dbo'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'

Para executar o pipeline em uma programação, defina um Job (por exemplo, resources/integrated_cdc_job.yml) que aciona o pipeline. Como cada etapa de extração executa por pelo menos 10 minutos, um intervalo de 60 minutos ou mais é um bom ponto de partida:

YAML
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'

Implante o pacote com a CLI do Databricks:

Shell
databricks bundle deploy
databricks bundle run integrated_cdc_job

Para obter mais informações, consulte O que são Pacotes de Automação Declarativa?.

Programar atualizações recorrentes

Pipelines de CDC integrados operam apenas no modo triggered. Para ingerir dados em um agendamento recorrente, crie uma tarefa Lakeflow Jobs que executa o pipeline. Cada atualização dura aproximadamente 30 minutos e pode não concluir o processamento de todo o backlog de alterações em uma única atualização. Programe os pipelines com frequência suficiente para que as atualizações subsequentes possam acompanhar. Um ponto de partida de 60 minutos funciona bem para a maioria das cargas de trabalho. Se um trigger é disparado enquanto uma atualização anterior ainda está em execução, a nova atualização é enfileirada.

Referência de configuração

Parâmetros do pipeline

Parâmetro

Tipo

Descrição

name

string

Um nome para o pipeline.

pipeline_type

string

Deve ser MANAGED_INGESTION.

channel

string

Deve ser PREVIEW.

serverless

Booleana

true para compute serverless, false para compute Classic. Compute serverless exige rede serverless para seu banco de dados de origem.

catalog

string

O catálogo de destinos default. Usado quando um destination_catalog por tabela não é especificado.

schema

string

O esquema de destino default. Usado quando um destination_schema por tabela não é especificado.

ingestion_definition.connection_name

string

A conexão do Unity Catalog ao banco de dados de origem.

ingestion_definition.connector_type

string

Deve ser CDC.

ingestion_definition.objects

matriz

A lista de tabelas ou esquemas para ingestão.

ingestion_definition.data_staging_options

objeto

Opcional. O catálogo e o esquema onde o pipeline cria o volume de preparo. Usa por padrão o esquema de destino do pipeline.

Especificação da tabela

Parâmetro

Obrigatório

Descrição

source_catalog

Sim

Nome da base de dados de origem.

source_schema

Sim

O nome do esquema de origem.

source_table

Sim

O nome da tabela de origem.

destination_catalog

Não

Catálogo de destinos. O padrão é o catalog do pipeline.

destination_schema

Não

O esquema de destino. O padrão é o schema do pipeline.

destination_table

Não

O nome da tabela de destino. O padrão é source_table.

Configuração da tabela

Parâmetro

Padrão

Descrição

primary_keys

Detectado automaticamente

As colunas que identificam cada linha. Detectado automaticamente da chave primária de origem se não especificada.

scd_type

SCD_TYPE_1

SCD_TYPE_1 mantém apenas a última versão. SCD_TYPE_2 mantém o histórico completo e exige o CDC do SQL Server na origem. SCD tipo 2 não é compatível com acompanhamento de alterações.

sequence_by

Detectado automaticamente

As colunas usadas para ordenar eventos de CDC. Autodetectado baseado no mecanismo CDC de origem, se não especificado.

Para mapeamentos de tipos de dados do SQL Server, consulte referência do conector do SQL Server. Os pipelines CDC integrados suportam a ampliação automática de tipo: quando um tipo de coluna de origem é ampliado (por exemplo, INT para BIGINT), a tabela de destino se adapta automaticamente.

Monitorar o pipeline

Depois de criar e iniciar um pipeline CDC integrado, monitore seu status usando o seguinte:

  • Interface do usuário do Databricks. Abra o pipeline na seção Pipelines para exibir o status de atualização, as métricas de ingestão por tabela e a linhagem.

  • API REST.

    Text
    GET /api/2.0/pipelines/<pipeline-id>
  • API de Eventos.

    Text
    GET /api/2.0/pipelines/<pipeline-id>/events

A primeira atualização do pipeline realiza um snapshot completo de todas as tabelas selecionadas, o que pode levar mais tempo do que as atualizações incrementais. Para tabelas grandes, o snapshot inicial pode exigir várias atualizações programadas para ser concluído. Cada atualização subsequente retoma de onde a anterior parou.

Para verificar a ingestão:

SQL
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;

-- View recent changes (SCD Type 2 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;

Para comportamento de refresh completo e refresh automático completo, consulte Fazer refresh completo das tabelas de destino.

Pipelines CDC integrados têm o autoscale vertical ativado por default. Se uma atualização de pipeline falhar por causa de uma condição de falta de memória, a próxima atualização automaticamente provisiona um driver maior. Para substituir esse comportamento, utilize uma política de cluster personalizada.

Limitações

  • Beta. O conector CDC integrado requer habilitação no nível do workspace. Entre em contato com sua equipe de conta da Databricks.
  • Apenas modo acionado. Pipelines CDC integrados não são compatíveis com execução contínua (sempre ativa). Programar pipelines com uma tarefa de LakeFlow Jobs.
  • Criação somente por API. A criação de pipelines é possível por meio da API REST, da CLI do Databricks, de notebooks e de Bundles de Automação Declarativa. A criação da interface do usuário ainda não é suportada.
  • O canal deve ser PREVIEW. As especificações do pipeline devem incluir "channel": "PREVIEW".
  • Tipo de conexão e de conector são imutáveis. connection_name e connector_type não podem ser alterados após a criação do pipeline. Para alterar a origem, criar um novo pipeline.
  • Máximo recomendado de 300 tabelas por pipeline.
  • Somente instâncias primárias. O conector CDC integrado não suporta réplicas de leitura, instâncias em espera ou instâncias secundárias.
  • Tabelas sem chaves primárias. O pipeline trata todas as colunas não-LOB como uma chave composta. Linhas duplicadas podem ser recolhidas em uma única linha, a menos que o SCD Tipo 2 esteja habilitado.
  • O snapshot inicial pode abranger múltiplas atualizações. Para tabelas grandes, o snapshot inicial pode não concluir em uma única atualização. Atualizações programadas subsequentes retomam do ponto em que a atualização anterior parou.
  • Cada atualização leva aproximadamente 30 minutos. Durante o Beta, o pipeline não processa necessariamente todo o backlog de alterações em uma única atualização. As atualizações programadas subsequentes retomam o processamento de onde a atualização anterior parou. Não é possível configurar este runtime durante o Beta.
  • Log purge requires full refresh. Se o SQL Server limpar os registros de acompanhamento de alterações ou os registros de CDC antes que o pipeline os processe, realize um refresh completo nas tabelas afetadas. O pipeline detecta essa condição e apresenta um erro no log de eventos.

Solução de problemas

nota

Alguns códigos de erro usam o prefixo INGESTION_GATEWAY_. Esta é uma convenção de nomenclatura legada e não indica que um gateway de ingestão separado seja necessário.

Erro

Causa

Resolução

NOT_IN_DEFAULT_PUBLISHING_MODE

O pipeline não está no Mode de publicação direta.

O Direct Publishing Mode é definido automaticamente para pipelines CDC integrados. Caso veja este erro, recrie o pipeline.

INGESTION_GATEWAY_CDC_NOT_ENABLED

CDC ou acompanhamento de alterações não está habilitado em uma ou mais tabelas de origem.

Habilitar CDC ou acompanhamento de alterações nas tabelas afetadas. Consulte Configurar o Microsoft SQL Server para ingestão no Databricks.

INGESTION_GATEWAY_MISSING_TABLE_IN_SOURCE

A tabela de origem especificada não existe ou foi removida.

Verifique se a tabela existe e que o usuário da conexão tem acesso.

INGESTION_GATEWAY_SOURCE_SCHEMA_MISSING_ENTITY

O esquema de origem não existe.

Verifique se o esquema existe no banco de dados de origem.

UNSUPPORTED_SOURCE_TYPE_FOR_CDC_CONNECTOR

O tipo de banco de dados de origem não é suportado.

O conector CDC integrado suporta SQL Server e Oracle.

SOURCE_TABLE_REQUIRED

A especificação da tabela está ausente source_table.

Adicionar source_table a cada especificação de tabela na matriz objects.

Integrated CDC connector is disabled

O sinalizador de recurso do workspace não está ativado.

Entre em contato com sua equipe de conta da Databricks para habilitar o conector CDC integrado no seu workspace.

Em caso de um problema não abordado aqui:

  1. Revise o log de eventos do pipeline na UI do Databricks ou por meio de GET /api/2.0/pipelines/<pipeline-id>/events.
  2. Teste a conexão do Unity Catalog do Catalog Explorer para confirmar se a origem está acessível.
  3. Confirme se o acompanhamento de alterações ou o CDC está habilitado no banco de dados e nas tabelas de origem.
  4. Verifique se o usuário do banco de dados tem as permissões do SQL Server listadas em requisitos de usuário do banco de dados do Microsoft SQL Server.
  5. Verifique se a especificação do seu pipeline inclui "channel": "PREVIEW".

Recursos adicionais