Pular para o conteúdo principal

Crie um pipeline de CDC integrado para MySQL

info

Beta

Este recurso está em Beta. Os administradores de workspace podem controlar o acesso a este recurso na página **Pré-visualizações**. Consulte Gerenciar prévias do Databricks.

Um pipeline CDC integrado ingere dados de alteração do MySQL para o Databricks usando um único pipeline. Ao contrário da arquitetura padrão baseada em gateway, um pipeline CDC integrado realiza a execução das etapas de extração e aplicação em uma atualização de pipeline. A arquitetura padrão requer um gateway de ingestão e um pipeline de ingestão separados.

Quando usar o conector CDC integrado

A tabela a seguir compara pipelines CDC integrados com a arquitetura padrão baseada em gateway:

Recurso

CDC padrão (baseado em gateway)

CDC integrado

Número de pipelines

Dois (gateway de ingestão e pipeline de ingestão)

Um (pipeline unificado)

Configuração

Crie um gateway e, em seguida, crie um pipeline de ingestão que faz referência ao ID do gateway

Crie um pipeline único que faça referência a uma conexão do Unity Catalog

Modo gateway

O gateway fica em execução continuamente como um processo separado de longa duração.

A extração está incorporada em cada atualização de pipeline programada

Referência de conexão

ingestion_gateway_id

connection_name (uma conexão do Unity Catalog)

Tipo de conector

Comportamento default implícito do CDC

Explícito: connector_type: CDC

Volume de preparação

Gerenciado internamente pelo gateway

Criado automaticamente no esquema de destino ou configurado via data_staging_options

Modo do pipeline

Contínuo

Acionado apenas

Compute

Clássico para o gateway, serverless para o pipeline de ingestão gerenciado

Apenas compute clássico. Serverless não é compatível.

Refresh completo automático

Não é compatível com fluxos existentes do MySQL baseados em gateway

Suportado

Máximo de tabelas

250 por pipeline

250 por pipeline

SCD Tipo 2

Não suportado

Não suportado

Autenticação

Nome de usuário/senha

Nome de usuário/senha

Para a configuração do banco de dados de origem, consulte Configurar o MySQL para ingestão no Databricks. A mesma configuração de origem aplica-se a ambas as arquiteturas.

Como é a execução de um pipeline de CDC integrado

Cada atualização de pipeline realiza a execução de dois estágios em sequência:

  1. **Extração.** O pipeline se conecta ao banco de dados de origem usando a conexão do Unity Catalog. Na primeira execução ou em um refresh completo, ele captura um Snapshot inicial. Em execuções subsequentes, ele captura mudanças incrementais (inserções, atualizações e exclusões) usando o log binário (binlog). O pipeline grava a extração de dados em um volume de preparo do Unity Catalog.
  2. Aplicação. O pipeline lê do volume de preparação e aplica alterações às tabelas de transmissão de destino no Unity Catalog. As operações de merge usam as chaves primárias configuradas e o tipo de SCD. O pipeline garante a semântica exatamente uma vez.

Durante o período Beta, cada atualização de pipeline tem um runtime máximo de aproximadamente 30 minutos. Se a fonte tiver mais alterações do que uma única atualização pode processar, a próxima atualização agendada continua de onde a anterior parou. Para ingerir dados de forma recorrente, programe o pipeline usando uma tarefa do Lakeflow Jobs.

Requisitos

  • O workspace está habilitado para o Unity Catalog.

  • Se você planeja criar uma conexão: Você tem privilégios CREATE CONNECTION no metastore. Consulte Gerenciar privilégios no Unity Catalog.

    Se o conector suportar a autoria de pipeline baseada em UI, pode criar a conexão e o pipeline ao mesmo tempo, concluindo os passos nesta página. No entanto, se usar a autoria de pipeline baseada em API, deve criar a conexão no Catalog Explorer antes de concluir os passos nesta página. Consulte Conectar-se a fontes de ingestão gerenciadas.

  • Se você planeja usar uma conexão existente: Você tem privilégios USE CONNECTION ou ALL PRIVILEGES na conexão.

  • Você tem privilégios USE CATALOG no catálogo de destino.

  • É preciso ter os privilégios USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou os privilégios CREATE SCHEMA no catálogo de destino.

  • Seu workspace deve ter o recurso de conector CDC integrado habilitado. Entre em contato com sua equipe de conta da Databricks.

  • Você tem acesso à instância **primária** do MySQL. O conector CDC integrado não oferece suporte a réplicas de leitura.

  • Habilite o registro em log binário no banco de dados de origem com binlog_format=ROW e binlog_row_image=FULL.

  • A configuração da origem MySQL foi concluída. Consulte Configurar MySQL para ingestão no Databricks.

  • Você tem as seguintes permissões:

    • CREATE CONNECTION no metastore (se estiver criando uma nova conexão do Unity Catalog), ou USE CONNECTION em uma conexão existente.
    • USE CATALOG no catálogo de destino.
    • USE SCHEMA e CREATE TABLE no esquema de destino.
    • CREATE VOLUME no esquema de destino, ou no esquema especificado em data_staging_options. Um volume de staging é necessário mesmo que data_staging_options não esteja definido, porque o pipeline autocria um no esquema de destino.

Requisitos de compute

Pipelines CDC integrados para MySQL exigem compute clássico. Compute serverless não é compatível.

  • Compute clássico : O plano de compute é executado na VPC ou VNet do seu workspace Databricks e deve acessar sua instância MySQL pela rede. Os caminhos de rede suportados incluem emparelhamento de VPC ou VNet, endpoints públicos e, para MySQL on-premises, AWS Direct Connect, Azure ExpressRoute ou VPN.

Para compute clássico, use permissões de criação irrestrita de clusters ou uma política de cluster personalizada com cluster_type fixo em dlt e runtime_engine fixo em STANDARD. A Databricks recomenda pelo menos 8 núcleos para uma extração eficiente.

Criar uma conexão do Unity Catalog com o MySQL

Criar uma conexão Unity Catalog ao MySQL antes de criar um pipeline. Consulte Criar uma conexão MySQL.

Criar um pipeline CDC integrado

Crie pipelines de CDC integrados usando a API, a CLI do Databricks, notebooks ou Pacotes de Automação Declarativos. A criação na interface do usuário ainda não está disponível.

importante

Todas as solicitações de criação de pipeline devem incluir "channel": "PREVIEW".

Defina o recurso de pipeline em um arquivo de pacote (por exemplo, resources/integrated_cdc_pipeline.yml):

YAML
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'

resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_schema: 'my_database'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'

Para a execução do pipeline em um programar, defina um Job (por exemplo, resources/integrated_cdc_job.yml) que aciona o pipeline. Como cada estágio de extração tem uma execução de pelo menos 10 minutos, um intervalo de 60 minutos ou mais é um bom ponto de partida:

YAML
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'

Implante o pacote com a CLI do Databricks:

Shell
databricks bundle deploy
databricks bundle run integrated_cdc_job

Para obter mais informações, consulte O que são Pacotes de Automação Declarativa?.

Programar atualizações recorrentes

Pipelines de CDC integrados funcionam apenas em modo de execução acionada. Para ingerir dados em um agendamento recorrente, crie uma tarefa do Lakeflow Jobs que execute o pipeline. Cada atualização é executada por aproximadamente 30 minutos e pode não concluir o processamento das pendências completas de alterações em uma única atualização. Programe pipelines em intervalos de 60 minutos ou com mais frequência para que as atualizações subsequentes sejam sincronizadas. Se um gatilho é acionado enquanto uma atualização anterior ainda está em execução, a nova atualização é enfileirada.

Referência de configuração

Parâmetros do pipeline

Parâmetro

Tipo

Descrição

name

string

Um nome para o pipeline.

pipeline_type

string

Deve ser MANAGED_INGESTION.

channel

string

Deve ser PREVIEW.

serverless

Booleana

Deve ser false para pipelines de CDC integrados do MySQL. Compute serverless não é compatível.

catalog

string

O catálogo de destino default. Usado quando destination_catalog por tabela não é especificado.

schema

string

O esquema de destino default. Usado quando um destination_schema por tabela não é especificado.

ingestion_definition.connection_name

string

A conexão Unity Catalog ao banco de dados de origem.

ingestion_definition.connector_type

string

Deve ser CDC.

ingestion_definition.objects

matriz

A lista de tabelas ou esquemas para ingerir.

ingestion_definition.data_staging_options

objeto

Opcional. O catálogo e o esquema onde o pipeline cria o volume de preparação. Usa o esquema de destino do pipeline por default.

Especificação de tabela

Parâmetro

Obrigatório

Descrição

source_schema

Sim

O nome do banco de dados MySQL de origem.

source_table

Sim

O nome da tabela de origem.

destination_catalog

Não

O catálogo de destino. Usa como default o catalog do pipeline.

destination_schema

Não

Esquema de destino. default para schema do pipeline.

destination_table

Não

O nome da tabela de destino. O default é source_table.

Configuração da tabela

Parâmetro

Padrão

Descrição

primary_keys

Autodetectado

As colunas que identificam cada linha. Autodetectado a partir da key primária da fonte, se não especificado.

scd_type

SCD_TYPE_1

SCD_TYPE_1 mantém somente a versão mais recente. SCD Tipo 2 não é suportado para pipelines CDC integrados do MySQL.

sequence_by

Autodetectado

As colunas usadas para ordenar eventos de CDC. Detectado automaticamente com base no mecanismo CDC de origem, se não for especificado.

auto_full_refresh_policy

Desativada

Configura o refresh automático completo quando operações DDL não suportadas são detectadas. Consulte política de refresh automático completo.

Para mapeamentos de tipo de dados do MySQL, consulte referência do conector MySQL. Pipelines CDC integrados suportam o alargamento automático de tipo: quando um tipo de coluna de origem é alargado (por exemplo, INT para BIGINT), a tabela de destino se adapta automaticamente.

Monitorar o pipeline

Depois de criar e começar um pipeline do CDC integrado, monitore seu status usando o seguinte:

  • **UI do Databricks.** Abra o pipeline na seção **Pipelines** para view o status da atualização, métricas de ingestão por tabela e linhagem.

  • API REST.

    Text
    GET /api/2.0/pipelines/<pipeline-id>
  • API de Eventos.

    Text
    GET /api/2.0/pipelines/<pipeline-id>/events

A primeira execução do pipeline executa um Snapshot completo de todas as tabelas selecionadas, o que pode levar mais tempo do que as atualizações incrementais. Para tabelas grandes, o Snapshot inicial pode exigir múltiplas atualizações programadas para ser concluído. Cada atualização subsequente retoma de onde a anterior parou.

Para verificar a ingestão:

SQL
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;

-- View recent changes (SCD Type 1 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;

Para o comportamento de refresh completo e refresh automático completo, consulte Fazer refresh completo de tabelas de destino.

Pipelines de CDC integrados permitem o autoscale vertical por default. Se uma atualização de pipeline falhar devido a uma condição de falta de memória, a próxima atualização provisiona automaticamente um driver maior. Para substituir este comportamento, utilize uma política de cluster personalizada.

Limitações

  • Beta. O conector CDC integrado requer habilitação em nível de workspace. Entre em contato com sua equipe de conta da Databricks.
  • Somente modo acionado. Pipelines CDC integrados não dão suporte à execução contínua (sempre ativa). Programe pipelines usando uma tarefa de Jobs do Lakeflow.
  • **Criação somente por API.** A criação de pipelines está disponível por meio da API REST, da CLI do Databricks, de Notebooks e de Bundles de Automação Declarativa. A criação por UI ainda não é suportada.
  • O canal deve ser PREVIEW. As especificações do pipeline devem incluir "channel": "PREVIEW".
  • **Tipo de conexão e conector são imutáveis.** connection_name e connector_type não podem ser alterados depois que o pipeline for criado. Para alterar a origem, crie um novo pipeline.
  • Máximo de 250 tabelas por pipeline.
  • Somente instâncias primárias. O conector CDC integrado não oferece suporte a réplicas de leitura. Conecte-se à instância primária do MySQL.
  • O SCD Tipo 2 não é suportado.
  • Tabelas sem chaves primárias. O pipeline trata todas as colunas não LOB como uma chave composta. Linhas duplicadas podem ser agrupadas em uma única linha.
  • O Snapshot inicial pode abranger várias atualizações. Para tabelas grandes, o Snapshot inicial pode não terminar em uma única atualização. As atualizações programadas subsequentes continuam de onde a atualização anterior parou.
  • Cada atualização tem a duração de aproximadamente 30 minutos de execução. Durante o Beta, o pipeline não processa necessariamente todo o backlog de alterações em uma única atualização. As atualizações agendadas subsequentes retomam o processamento de onde a atualização anterior parou. Não é possível configurar este runtime durante o Beta.
  • A limpeza do binlog requer um refresh completo. Se o log binário do MySQL for limpo antes que o pipeline processe as alterações, execute um refresh completo nas tabelas afetadas. O pipeline detecta essa condição e apresenta um erro no log de eventos.
  • **Compute serverless não é compatível.** Pipelines de CDC integrados do MySQL exigem compute clássico.

Solução de problemas

nota

Alguns códigos de erro usam o prefixo INGESTION_GATEWAY_. Esta é uma convenção de nomenclatura legada e não indica que um gateway de ingestão separado seja necessário.

Erro

Causa

Resolução

NOT_IN_DEFAULT_PUBLISHING_MODE

O pipeline não está no Mode de Publicação Direta.

O Direct Publishing Mode é definido automaticamente para pipelines CDC integrados. Se você vir este erro, recrie o pipeline.

INGESTION_GATEWAY_CDC_NOT_ENABLED

O log binário não está habilitado ou binlog_format não está definido como ROW.

Habilite o log binário com binlog_format=ROW e binlog_row_image=FULL. Consulte Configurar MySQL para ingestão no Databricks.

INGESTION_GATEWAY_MISSING_TABLE_IN_SOURCE

A tabela de origem especificada não existe ou foi descartada.

Verifique se a tabela existe e se o usuário da conexão tem acesso.

INGESTION_GATEWAY_SOURCE_SCHEMA_MISSING_ENTITY

O esquema de origem não existe.

Verifique se o esquema existe no banco de dados de origem.

UNSUPPORTED_SOURCE_TYPE_FOR_CDC_CONNECTOR

O tipo de banco de dados de origem não é compatível.

O conector CDC integrado suporta MySQL, SQL Server e Oracle.

SOURCE_TABLE_REQUIRED

A especificação da tabela está faltando source_table.

Adicionar source_table a cada especificação de tabela na matriz objects.

Integrated CDC connector is disabled

O sinalizador de recurso do workspace não está ativado.

Entre em contato com sua equipe de account da Databricks para habilitar o conector CDC integrado em seu workspace.

Se você encontrar um problema não abordado aqui:

  1. Analise o registro de eventos do pipeline na interface de usuário do Databricks ou por meio de GET /api/2.0/pipelines/<pipeline-id>/events.
  2. Teste a conexão do Unity Catalog do Catalog Explorer para confirmar que a origem é acessível.
  3. Confirme se o log binário está habilitado no banco de dados de origem com binlog_format=ROW e binlog_row_image=FULL.
  4. Verifique se o usuário do banco de dados tem as permissões MySQL listadas em Conceder privilégios de usuário MySQL.
  5. Verifique se a especificação do seu pipeline inclui "channel": "PREVIEW".

Recursos adicionais