Pular para o conteúdo principal

Crie um pipeline de ingestão MySQL.

info

Visualização

O conector MySQL está em versão prévia pública. Entre em contato com a equipe da sua account Databricks para solicitar acesso.

Aprenda como importar dados do MySQL para Databricks usando LakeFlow Connect. O conector MySQL oferece suporte Amazon RDS para MySQL, Aurora MySQL, Banco de Dados Azure para MySQL, Google Cloud SQL para MySQL e MySQL em execução no EC2.

Antes de começar

Para criar um gateway de ingestão e um pipeline de ingestão, você deve atender aos seguintes requisitos:

  • Seu workspace está habilitado para Unity Catalog.

  • compute sem servidor está habilitado para seu workspace. Consulte os requisitos compute sem servidor.

  • Se você planeja criar uma conexão: Você tem privilégios CREATE CONNECTION no metastore.

    Se o seu conector suportar a criação pipeline baseada em interface de usuário, você poderá criar a conexão e o pipeline simultaneamente, concluindo os passos desta página. No entanto, se você usar a criação pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir os passos desta página. Consulte Conectar para gerenciar fontes de ingestão.

  • Se você planeja usar uma conexão existente: Você tem privilégios USE CONNECTION ou ALL PRIVILEGES na conexão.

  • Você tem privilégios USE CATALOG no catálogo de destino.

  • Você tem privilégios USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

  • Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:

    • Família: Computação Job

    • Substituições de família de políticas:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • A seguinte política compute permite que Databricks aumente a capacidade do gateway de ingestão para atender às necessidades da sua carga de trabalho. O requisito mínimo é de 4 núcleos. No entanto, para um melhor desempenho na extração de snapshots, Databricks recomenda o uso de instâncias maiores com mais memória e núcleos de CPU.

      Python
      {
      "driver_node_type_id": {
      "type": "fixed",
      "value": "r5n.2xlarge"
      },
      "node_type_id": {
      "type": "fixed",
      "value": "m5n.large"
      }
      }

    Para obter mais informações sobre política de cluster, consulte Selecionar uma política compute.

Para importar dados do MySQL, você também precisa concluir a configuração da fonte.

Opção 1: Interface do usuário do Databricks

Os usuários administradores podem criar uma conexão e um pipeline simultaneamente na interface do usuário. Esta é a maneira mais simples de criar um pipeline de ingestão gerencial.

  1. Na barra lateral do workspace do Databricks , clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em MySQL . O assistente de ingestão é aberto.

  3. Na página do gateway de ingestão do assistente, insira um nome exclusivo para o gateway.

  4. Selecione um catálogo e um esquema para os dados de ingestão de preparação e clique em Avançar .

  5. Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.

  6. Em Catálogo de destino , selecione um catálogo para armazenar os dados ingeridos.

  7. Selecione a conexão Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.

    Caso não existam conexões com a origem, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração da origem. Você deve ter privilégios CREATE CONNECTION no metastore.

nota

O botão Testar Conexão pode falhar para usuários do MySQL que utilizam autenticação sha256_password ou caching_sha2_password . Essa é uma limitação conhecida. Você ainda pode prosseguir com a criação da conexão.

  1. Clique em Criar pipeline e continue .

  2. Na página Origem , selecione os bancos de dados e tabelas que deseja importar.

  3. Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).

  4. Clique em Avançar .

  5. Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.

    Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOG e CREATE SCHEMA no catálogo pai.

  6. Clique em Salvar e continuar .

  7. (Opcional) Na página Configurações , clique em Criar programar . Defina a frequência de refresh das tabelas de destino.

  8. (Opcional) Configure notificações email para operações pipeline bem-sucedidas ou com falha.

  9. Clique em Salvar e pipelinede execução .

Opção 2: Interfaces programáticas

Antes de realizar a ingestão de dados usando Databricks Ativo Bundles, Databricks APIs, Databricks SDKs ou Databricks CLI, você precisa ter acesso a uma conexão existente Unity Catalog . Para obter instruções, consulte Conectar-se às fontes de ingestão de gerenciamento.

Crie o catálogo e o esquema de preparação.

O catálogo e o esquema de preparação podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de encenação não pode ser um catálogo estrangeiro.

Bash
export CONNECTION_NAME="my_mysql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_mysql_connector"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="mysql-instance.region.rds.amazonaws.com"
export DB_PORT="3306"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"

output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "MYSQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Crie o gateway e o pipeline de ingestão.

O gateway de ingestão extrai dados de snapshot e de alterações do banco de dados de origem e os armazena em um volume de preparação Unity Catalog . Você deve executar o gateway como um pipeline contínuo. Isso permite a implementação de políticas de retenção de binlog no banco de dados de origem.

O pipeline de ingestão aplica os dados de snapshot e de alteração do volume de preparação às tabelas de transmissão de destino.

Esta tab descreve como implantar um pipeline de ingestão usando Databricks Ativo Bundles. Os pacotes podem conter definições YAML de Job e tarefa, são gerenciados usando a CLI Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, teste e produção). Para mais informações, consulte PacotesDatabricks ativos.

  1. Crie um novo pacote usando a CLI do Databricks:

    Bash
    databricks bundle init
  2. Adicione dois novos arquivos de recursos ao pacote:

    • Um arquivo de definição de pipeline (resources/mysql_pipeline.yml).
    • Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (resources/mysql_job.yml).

    Segue abaixo um exemplo de arquivo resources/mysql_pipeline.yml :

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: mysql-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <mysql-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    pipeline_mysql:
    name: mysql-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table mydb.customers to dest_catalog.dest_schema.customers
    source_schema: public
    source_table: customers
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the mydb.sales schema to dest_catalog.dest_schema
    # The destination table name will be the same as it is on the source
    source_schema: sales
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    Segue abaixo um exemplo de arquivo resources/mysql_job.yml :

    YAML
    resources:
    jobs:
    mysql_dab_job:
    name: mysql_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_mysql.id}
  3. Implante o pipeline usando a CLI Databricks :

    Bash
    databricks bundle deploy

Recursos adicionais