Crie um pipeline de ingestão MySQL.
Visualização
O conector MySQL está em versão prévia pública. Entre em contato com a equipe da sua account Databricks para solicitar acesso.
Aprenda como importar dados do MySQL para Databricks usando LakeFlow Connect. O conector MySQL oferece suporte Amazon RDS para MySQL, Aurora MySQL, Banco de Dados Azure para MySQL, Google Cloud SQL para MySQL e MySQL em execução no EC2.
Antes de começar
Para criar um gateway de ingestão e um pipeline de ingestão, você deve atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
compute sem servidor está habilitado para seu workspace. Consulte os requisitos compute sem servidor.
-
Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTIONno metastore.Se o seu conector suportar a criação pipeline baseada em interface de usuário, você poderá criar a conexão e o pipeline simultaneamente, concluindo os passos desta página. No entanto, se você usar a criação pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir os passos desta página. Consulte Conectar para gerenciar fontes de ingestão.
-
Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTIONouALL PRIVILEGESna conexão. -
Você tem privilégios
USE CATALOGno catálogo de destino. -
Você tem privilégios
USE SCHEMA,CREATE TABLEeCREATE VOLUMEem um esquema existente ou privilégiosCREATE SCHEMAno catálogo de destino. -
Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:
-
Família: Computação Job
-
Substituições de família de políticas:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
A seguinte política compute permite que Databricks aumente a capacidade do gateway de ingestão para atender às necessidades da sua carga de trabalho. O requisito mínimo é de 4 núcleos. No entanto, para um melhor desempenho na extração de snapshots, Databricks recomenda o uso de instâncias maiores com mais memória e núcleos de CPU.
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "n2-highmem-8"
},
"node_type_id": {
"type": "fixed",
"value": "n2-standard-4"
}
}
Para obter mais informações sobre política de cluster, consulte Selecionar uma política compute.
-
Para importar dados do MySQL, você também precisa concluir a configuração da fonte.
Opção 1: Interface do usuário do Databricks
Os usuários administradores podem criar uma conexão e um pipeline simultaneamente na interface do usuário. Esta é a maneira mais simples de criar um pipeline de ingestão gerencial.
-
Na barra lateral do workspace do Databricks , clique em ingestão de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique em MySQL . O assistente de ingestão é aberto.
-
Na página do gateway de ingestão do assistente, insira um nome exclusivo para o gateway.
-
Selecione um catálogo e um esquema para os dados de ingestão de preparação e clique em Avançar .
-
Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.
-
Em Catálogo de destino , selecione um catálogo para armazenar os dados ingeridos.
-
Selecione a conexão Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.
Caso não existam conexões com a origem, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração da origem. Você deve ter privilégios
CREATE CONNECTIONno metastore.
O botão Testar Conexão pode falhar para usuários do MySQL que utilizam autenticação sha256_password ou caching_sha2_password . Essa é uma limitação conhecida. Você ainda pode prosseguir com a criação da conexão.
-
Clique em Criar pipeline e continue .
-
Na página Origem , selecione os bancos de dados e tabelas que deseja importar.
-
Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).
-
Clique em Avançar .
-
Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.
Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios
USE CATALOGeCREATE SCHEMAno catálogo pai. -
Clique em Salvar e continuar .
-
(Opcional) Na página Configurações , clique em Criar programar . Defina a frequência de refresh das tabelas de destino.
-
(Opcional) Configure notificações email para operações pipeline bem-sucedidas ou com falha.
-
Clique em Salvar e pipelinede execução .
Opção 2: Interfaces programáticas
Antes de realizar a ingestão de dados usando Databricks Ativo Bundles, Databricks APIs, Databricks SDKs, Databricks CLI ou Terraform, você precisa ter acesso a uma conexão existente Unity Catalog . Para obter instruções, consulte Conectar-se às fontes de ingestão de gerenciamento.
Crie o catálogo e o esquema de preparação.
O catálogo e o esquema de preparação podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de encenação não pode ser um catálogo estrangeiro.
export CONNECTION_NAME="my_mysql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_mysql_connector"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="mysql-instance.region.rds.amazonaws.com"
export DB_PORT="3306"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "MYSQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Crie o gateway e o pipeline de ingestão.
O gateway de ingestão extrai dados de snapshot e de alterações do banco de dados de origem e os armazena em um volume de preparação Unity Catalog . Você deve executar o gateway como um pipeline contínuo. Isso permite a implementação de políticas de retenção de binlog no banco de dados de origem.
O pipeline de ingestão aplica os dados de snapshot e de alteração do volume de preparação às tabelas de transmissão de destino.
Databricks Asset Bundles
Esta tab descreve como implantar um pipeline de ingestão usando Databricks Ativo Bundles. Os pacotes podem conter definições YAML de Job e tarefa, são gerenciados usando a CLI Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, teste e produção). Para mais informações, consulte PacotesDatabricks ativos.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init -
Adicione dois novos arquivos de recursos ao pacote:
- Um arquivo de definição de pipeline (
resources/mysql_pipeline.yml). - Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (
resources/mysql_job.yml).
Segue abaixo um exemplo de arquivo
resources/mysql_pipeline.yml:YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: mysql-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <mysql-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_mysql:
name: mysql-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table mydb.customers to dest_catalog.dest_schema.customers
source_schema: public
source_table: customers
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the mydb.sales schema to dest_catalog.dest_schema
# The destination table name will be the same as it is on the source
source_schema: sales
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}Segue abaixo um exemplo de arquivo
resources/mysql_job.yml:YAMLresources:
jobs:
mysql_dab_job:
name: mysql_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_mysql.id} - Um arquivo de definição de pipeline (
-
Implante o pipeline usando a CLI Databricks :
Bashdatabricks bundle deploy
Notebook Databricks
Atualize a célula Configuration no seguinte Notebook com a conexão de origem, catálogo de destino, esquema de destino e tabelas a serem ingeridas da origem.
Create gateway and ingestion pipeline
CLI do Databricks
Para criar o gateway:
export GATEWAY_PIPELINE_NAME="mysql-gateway"
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
Para criar o pipeline de ingestão:
export INGESTION_PIPELINE_NAME="mysql-ingestion-pipeline"
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_schema": "public",
"source_table": "customers",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "customers"
}},
{"schema": {
"source_schema": "sales",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
Terraform
Você pode usar Terraform para implantar e gerenciar o pipeline de ingestão MySQL . Para um framework de exemplo completo, incluindo configurações Terraform para criação de gateways e pipeline de ingestão, consulte os exemplos Terraform LakeFlow Connect catalogados no GitHub.