Crie um pipeline de CDC integrado para MySQL
Beta
Este recurso está em Beta. Os administradores de workspace podem controlar o acesso a este recurso na página **Pré-visualizações**. Consulte Gerenciar prévias do Databricks.
Um pipeline CDC integrado ingere dados de alteração do MySQL para o Databricks usando um único pipeline. Ao contrário da arquitetura padrão baseada em gateway, um pipeline CDC integrado realiza a execução das etapas de extração e aplicação em uma atualização de pipeline. A arquitetura padrão requer um gateway de ingestão e um pipeline de ingestão separados.
Quando usar o conector CDC integrado
A tabela a seguir compara pipelines CDC integrados com a arquitetura padrão baseada em gateway:
Recurso | CDC padrão (baseado em gateway) | CDC integrado |
|---|---|---|
Número de pipelines | Dois (gateway de ingestão e pipeline de ingestão) | Um (pipeline unificado) |
Configuração | Crie um gateway e, em seguida, crie um pipeline de ingestão que faz referência ao ID do gateway | Crie um pipeline único que faça referência a uma conexão do Unity Catalog |
Modo gateway | O gateway fica em execução continuamente como um processo separado de longa duração. | A extração está incorporada em cada atualização de pipeline programada |
Referência de conexão |
|
|
Tipo de conector | Comportamento default implícito do CDC | Explícito: |
Volume de preparação | Gerenciado internamente pelo gateway | Criado automaticamente no esquema de destino ou configurado via |
Modo do pipeline | Contínuo | Acionado apenas |
Compute | Clássico para o gateway, serverless para o pipeline de ingestão gerenciado | Apenas compute clássico. Serverless não é compatível. |
Refresh completo automático | Não é compatível com fluxos existentes do MySQL baseados em gateway | Suportado |
Máximo de tabelas | 250 por pipeline | 250 por pipeline |
SCD Tipo 2 | Não suportado | Não suportado |
Autenticação | Nome de usuário/senha | Nome de usuário/senha |
Para a configuração do banco de dados de origem, consulte Configurar o MySQL para ingestão no Databricks. A mesma configuração de origem aplica-se a ambas as arquiteturas.
Como é a execução de um pipeline de CDC integrado
Cada atualização de pipeline realiza a execução de dois estágios em sequência:
- **Extração.** O pipeline se conecta ao banco de dados de origem usando a conexão do Unity Catalog. Na primeira execução ou em um refresh completo, ele captura um Snapshot inicial. Em execuções subsequentes, ele captura mudanças incrementais (inserções, atualizações e exclusões) usando o log binário (binlog). O pipeline grava a extração de dados em um volume de preparo do Unity Catalog.
- Aplicação. O pipeline lê do volume de preparação e aplica alterações às tabelas de transmissão de destino no Unity Catalog. As operações de merge usam as chaves primárias configuradas e o tipo de SCD. O pipeline garante a semântica exatamente uma vez.
Durante o período Beta, cada atualização de pipeline tem um runtime máximo de aproximadamente 30 minutos. Se a fonte tiver mais alterações do que uma única atualização pode processar, a próxima atualização agendada continua de onde a anterior parou. Para ingerir dados de forma recorrente, programe o pipeline usando uma tarefa do Lakeflow Jobs.
Requisitos
-
O workspace está habilitado para o Unity Catalog.
-
Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTIONno metastore. Consulte Gerenciar privilégios no Unity Catalog.Se o conector suportar a autoria de pipeline baseada em UI, pode criar a conexão e o pipeline ao mesmo tempo, concluindo os passos nesta página. No entanto, se usar a autoria de pipeline baseada em API, deve criar a conexão no Catalog Explorer antes de concluir os passos nesta página. Consulte Conectar-se a fontes de ingestão gerenciadas.
-
Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTIONouALL PRIVILEGESna conexão. -
Você tem privilégios
USE CATALOGno catálogo de destino. -
É preciso ter os privilégios
USE SCHEMA,CREATE TABLEeCREATE VOLUMEem um esquema existente ou os privilégiosCREATE SCHEMAno catálogo de destino. -
Seu workspace deve ter o recurso de conector CDC integrado habilitado. Entre em contato com sua equipe de conta da Databricks.
-
Você tem acesso à instância **primária** do MySQL. O conector CDC integrado não oferece suporte a réplicas de leitura.
-
Habilite o registro em log binário no banco de dados de origem com
binlog_format=ROWebinlog_row_image=FULL. -
A configuração da origem MySQL foi concluída. Consulte Configurar MySQL para ingestão no Databricks.
-
Você tem as seguintes permissões:
CREATE CONNECTIONno metastore (se estiver criando uma nova conexão do Unity Catalog), ouUSE CONNECTIONem uma conexão existente.USE CATALOGno catálogo de destino.USE SCHEMAeCREATE TABLEno esquema de destino.CREATE VOLUMEno esquema de destino, ou no esquema especificado emdata_staging_options. Um volume de staging é necessário mesmo quedata_staging_optionsnão esteja definido, porque o pipeline autocria um no esquema de destino.
Requisitos de compute
Pipelines CDC integrados para MySQL exigem compute clássico. Compute serverless não é compatível.
- Compute clássico : O plano de compute é executado na VPC ou VNet do seu workspace Databricks e deve acessar sua instância MySQL pela rede. Os caminhos de rede suportados incluem emparelhamento de VPC ou VNet, endpoints públicos e, para MySQL on-premises, AWS Direct Connect, Azure ExpressRoute ou VPN.
Para compute clássico, use permissões de criação irrestrita de clusters ou uma política de cluster personalizada com cluster_type fixo em dlt e runtime_engine fixo em STANDARD. A Databricks recomenda pelo menos 8 núcleos para uma extração eficiente.
Criar uma conexão do Unity Catalog com o MySQL
Criar uma conexão Unity Catalog ao MySQL antes de criar um pipeline. Consulte Criar uma conexão MySQL.
Criar um pipeline CDC integrado
Crie pipelines de CDC integrados usando a API, a CLI do Databricks, notebooks ou Pacotes de Automação Declarativos. A criação na interface do usuário ainda não está disponível.
Todas as solicitações de criação de pipeline devem incluir "channel": "PREVIEW".
- Declarative Automation Bundles
- Databricks notebook
- Databricks CLI
- REST API
Defina o recurso de pipeline em um arquivo de pacote (por exemplo, resources/integrated_cdc_pipeline.yml):
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'
resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_schema: 'my_database'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'
Para a execução do pipeline em um programar, defina um Job (por exemplo, resources/integrated_cdc_job.yml) que aciona o pipeline. Como cada estágio de extração tem uma execução de pelo menos 10 minutos, um intervalo de 60 minutos ou mais é um bom ponto de partida:
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'
Implante o pacote com a CLI do Databricks:
databricks bundle deploy
databricks bundle run integrated_cdc_job
Para obter mais informações, consulte O que são Pacotes de Automação Declarativa?.
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
pipeline = w.pipelines.create(
name="<pipeline-name>",
pipeline_type="MANAGED_INGESTION",
channel="PREVIEW",
serverless=False,
catalog="<destination-catalog>",
schema="<destination-schema>",
ingestion_definition={
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_schema": "<source-database>",
"source_table": "<source-table>",
}
}
],
},
)
print(f"Pipeline created: {pipeline.pipeline_id}")
databricks pipelines create --json '{
"name": "<pipeline-name>",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_schema": "<source-database>",
"source_table": "<source-table>"
}
}
]
}
}'
O exemplo a seguir replica duas tabelas de um banco de dados MySQL. Ambos herdam o destino de nível superior main.ingestion. "serverless": false é obrigatório para pipelines de CDC integrados do MySQL.
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-mysql-connection",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_schema": "my_database",
"source_table": "customers",
"table_configuration": {
"primary_keys": ["customer_id"],
"scd_type": "SCD_TYPE_1"
}
}
},
{
"table": {
"source_schema": "my_database",
"source_table": "orders",
"table_configuration": {
"primary_keys": ["order_id"],
"scd_type": "SCD_TYPE_1"
}
}
}
],
"data_staging_options": {
"catalog_name": "main",
"schema_name": "ingestion_staging"
}
}
}
Para replicar todas as tabelas em um banco de dados de origem, use um objeto schema em vez de objetos table individuais:
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-schema-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-mysql-connection",
"connector_type": "CDC",
"objects": [
{
"schema": {
"source_schema": "my_database",
"destination_catalog": "main",
"destination_schema": "ingestion"
}
}
]
}
}
Para começar uma atualização de pipeline:
POST /api/2.0/pipelines/<pipeline-id>/updates
{
"full_refresh": false
}
Programar atualizações recorrentes
Pipelines de CDC integrados funcionam apenas em modo de execução acionada. Para ingerir dados em um agendamento recorrente, crie uma tarefa do Lakeflow Jobs que execute o pipeline. Cada atualização é executada por aproximadamente 30 minutos e pode não concluir o processamento das pendências completas de alterações em uma única atualização. Programe pipelines em intervalos de 60 minutos ou com mais frequência para que as atualizações subsequentes sejam sincronizadas. Se um gatilho é acionado enquanto uma atualização anterior ainda está em execução, a nova atualização é enfileirada.
Referência de configuração
Parâmetros do pipeline
Parâmetro | Tipo | Descrição |
|---|---|---|
| string | Um nome para o pipeline. |
| string | Deve ser |
| string | Deve ser |
| Booleana | Deve ser |
| string | O catálogo de destino default. Usado quando |
| string | O esquema de destino default. Usado quando um |
| string | A conexão Unity Catalog ao banco de dados de origem. |
| string | Deve ser |
| matriz | A lista de tabelas ou esquemas para ingerir. |
| objeto | Opcional. O catálogo e o esquema onde o pipeline cria o volume de preparação. Usa o esquema de destino do pipeline por default. |
Especificação de tabela
Parâmetro | Obrigatório | Descrição |
|---|---|---|
| Sim | O nome do banco de dados MySQL de origem. |
| Sim | O nome da tabela de origem. |
| Não | O catálogo de destino. Usa como default o |
| Não | Esquema de destino. default para |
| Não | O nome da tabela de destino. O default é |
Configuração da tabela
Parâmetro | Padrão | Descrição |
|---|---|---|
| Autodetectado | As colunas que identificam cada linha. Autodetectado a partir da key primária da fonte, se não especificado. |
|
|
|
| Autodetectado | As colunas usadas para ordenar eventos de CDC. Detectado automaticamente com base no mecanismo CDC de origem, se não for especificado. |
| Desativada | Configura o refresh automático completo quando operações DDL não suportadas são detectadas. Consulte política de refresh automático completo. |
Para mapeamentos de tipo de dados do MySQL, consulte referência do conector MySQL. Pipelines CDC integrados suportam o alargamento automático de tipo: quando um tipo de coluna de origem é alargado (por exemplo, INT para BIGINT), a tabela de destino se adapta automaticamente.
Monitorar o pipeline
Depois de criar e começar um pipeline do CDC integrado, monitore seu status usando o seguinte:
-
**UI do Databricks.** Abra o pipeline na seção **Pipelines** para view o status da atualização, métricas de ingestão por tabela e linhagem.
-
API REST.
TextGET /api/2.0/pipelines/<pipeline-id> -
API de Eventos.
TextGET /api/2.0/pipelines/<pipeline-id>/events
A primeira execução do pipeline executa um Snapshot completo de todas as tabelas selecionadas, o que pode levar mais tempo do que as atualizações incrementais. Para tabelas grandes, o Snapshot inicial pode exigir múltiplas atualizações programadas para ser concluído. Cada atualização subsequente retoma de onde a anterior parou.
Para verificar a ingestão:
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;
-- View recent changes (SCD Type 1 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;
Para o comportamento de refresh completo e refresh automático completo, consulte Fazer refresh completo de tabelas de destino.
Pipelines de CDC integrados permitem o autoscale vertical por default. Se uma atualização de pipeline falhar devido a uma condição de falta de memória, a próxima atualização provisiona automaticamente um driver maior. Para substituir este comportamento, utilize uma política de cluster personalizada.
Limitações
- Beta. O conector CDC integrado requer habilitação em nível de workspace. Entre em contato com sua equipe de conta da Databricks.
- Somente modo acionado. Pipelines CDC integrados não dão suporte à execução contínua (sempre ativa). Programe pipelines usando uma tarefa de Jobs do Lakeflow.
- **Criação somente por API.** A criação de pipelines está disponível por meio da API REST, da CLI do Databricks, de Notebooks e de Bundles de Automação Declarativa. A criação por UI ainda não é suportada.
- O canal deve ser
PREVIEW. As especificações do pipeline devem incluir"channel": "PREVIEW". - **Tipo de conexão e conector são imutáveis.**
connection_nameeconnector_typenão podem ser alterados depois que o pipeline for criado. Para alterar a origem, crie um novo pipeline. - Máximo de 250 tabelas por pipeline.
- Somente instâncias primárias. O conector CDC integrado não oferece suporte a réplicas de leitura. Conecte-se à instância primária do MySQL.
- O SCD Tipo 2 não é suportado.
- Tabelas sem chaves primárias. O pipeline trata todas as colunas não LOB como uma chave composta. Linhas duplicadas podem ser agrupadas em uma única linha.
- O Snapshot inicial pode abranger várias atualizações. Para tabelas grandes, o Snapshot inicial pode não terminar em uma única atualização. As atualizações programadas subsequentes continuam de onde a atualização anterior parou.
- Cada atualização tem a duração de aproximadamente 30 minutos de execução. Durante o Beta, o pipeline não processa necessariamente todo o backlog de alterações em uma única atualização. As atualizações agendadas subsequentes retomam o processamento de onde a atualização anterior parou. Não é possível configurar este runtime durante o Beta.
- A limpeza do binlog requer um refresh completo. Se o log binário do MySQL for limpo antes que o pipeline processe as alterações, execute um refresh completo nas tabelas afetadas. O pipeline detecta essa condição e apresenta um erro no log de eventos.
- **Compute serverless não é compatível.** Pipelines de CDC integrados do MySQL exigem compute clássico.
Solução de problemas
Alguns códigos de erro usam o prefixo INGESTION_GATEWAY_. Esta é uma convenção de nomenclatura legada e não indica que um gateway de ingestão separado seja necessário.
Erro | Causa | Resolução |
|---|---|---|
| O pipeline não está no Mode de Publicação Direta. | O Direct Publishing Mode é definido automaticamente para pipelines CDC integrados. Se você vir este erro, recrie o pipeline. |
| O log binário não está habilitado ou | Habilite o log binário com |
| A tabela de origem especificada não existe ou foi descartada. | Verifique se a tabela existe e se o usuário da conexão tem acesso. |
| O esquema de origem não existe. | Verifique se o esquema existe no banco de dados de origem. |
| O tipo de banco de dados de origem não é compatível. | O conector CDC integrado suporta MySQL, SQL Server e Oracle. |
| A especificação da tabela está faltando | Adicionar |
| O sinalizador de recurso do workspace não está ativado. | Entre em contato com sua equipe de account da Databricks para habilitar o conector CDC integrado em seu workspace. |
Se você encontrar um problema não abordado aqui:
- Analise o registro de eventos do pipeline na interface de usuário do Databricks ou por meio de
GET /api/2.0/pipelines/<pipeline-id>/events. - Teste a conexão do Unity Catalog do Catalog Explorer para confirmar que a origem é acessível.
- Confirme se o log binário está habilitado no banco de dados de origem com
binlog_format=ROWebinlog_row_image=FULL. - Verifique se o usuário do banco de dados tem as permissões MySQL listadas em Conceder privilégios de usuário MySQL.
- Verifique se a especificação do seu pipeline inclui
"channel": "PREVIEW".