Pular para o conteúdo principal

Ingerir dados do PostgreSQL

info

Visualização

O conector PostgreSQL para LakeFlow Connect está em versão prévia pública. Entre em contato com a equipe da sua account Databricks para se inscrever na Prévia Pública.

Esta página descreve como ingerir dados do PostgreSQL e carregá-los no Databricks usando LakeFlow Connect. O conector PostgreSQL oferece suporte AWS RDS PostgreSQL, Aurora PostgreSQL, Amazon EC2, Banco de Dados Azure para PostgreSQL, máquinas virtuais Azure , Cloud SQL para PostgreSQL GCP e bancos de dados PostgreSQL on-premises usando Azure ExpressRoute, AWS Direct Connect ou redes VPN.

Antes de começar

Para criar um gateway de ingestão e um pipeline de ingestão, você deve atender aos seguintes requisitos:

  • Seu workspace está habilitado para Unity Catalog.

  • compute sem servidor está habilitado para seu workspace. Consulte os requisitos compute sem servidor.

  • Se você planeja criar uma conexão: Você tem privilégios CREATE CONNECTION no metastore.

    Se o seu conector suportar a criação pipeline baseada em interface de usuário, você poderá criar a conexão e o pipeline simultaneamente, concluindo os passos desta página. No entanto, se você usar a criação pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir os passos desta página. Consulte Conectar para gerenciar fontes de ingestão.

  • Se você planeja usar uma conexão existente: Você tem privilégios USE CONNECTION ou ALL PRIVILEGES na conexão.

  • Você tem privilégios USE CATALOG no catálogo de destino.

  • Você tem privilégios USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

  • Você tem acesso a uma instância primária do PostgreSQL. A replicação lógica só é suportada em instâncias primárias e não em réplicas de leitura.

  • Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:

    • Família: Computação Job

    • Substituições de família de políticas:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • Databricks recomenda especificar os menores nós worker possíveis para os gateways de ingestão, pois isso não afeta o desempenho do gateway. A seguinte política compute permite que Databricks aumente a capacidade do gateway de ingestão para atender às necessidades da sua carga de trabalho. O requisito mínimo é de 8 núcleos para permitir a extração de dados eficiente e de alto desempenho do seu banco de dados de origem.

      Python
      {
      "driver_node_type_id": {
      "type": "fixed",
      "value": "r5n.16xlarge"
      },
      "node_type_id": {
      "type": "fixed",
      "value": "m5n.large"
      }
      }

      Para obter mais informações sobre política de cluster, consulte Selecionar uma política compute.

Para importar dados do PostgreSQL, você também precisa concluir a configuração da fonte.

Opção 1: Interface do usuário do Databricks

nota

O suporte de interface de usuário para PostgreSQL estará disponível em breve. Por enquanto, utilize o fluxo de trabalho do Notebook ou CLI descrito na Opção 2.

Os usuários administradores podem criar uma conexão e um pipeline simultaneamente na interface do usuário do Databricks. Esta é a maneira mais simples de criar um pipeline de ingestão gerencial.

  1. Na barra lateral do workspace do Databricks , clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em PostgreSQL .

    O assistente de ingestão é aberto.

  3. Na página do gateway de ingestão do assistente, insira um nome exclusivo para o gateway.

  4. Selecione um catálogo e um esquema para armazenar os dados de ingestão e clique em Avançar .

  5. Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.

  6. Em Catálogo de destino , selecione um catálogo para armazenar os dados ingeridos.

  7. Selecione a conexão Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.

    Caso não existam conexões com a fonte, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração do PostgreSQL para ingestão no Databricks. Você deve ter privilégios CREATE CONNECTION no metastore.

  8. Clique em Criar pipeline e continue .

  9. Na página Origem , selecione as tabelas que deseja importar.

  10. Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).

  11. Clique em Avançar .

  12. Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.

    Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOG e CREATE SCHEMA no catálogo pai.

  13. Clique em Salvar e continuar .

  14. Na página Configuração do Banco de Dados , insira o nome do slot de replicação e o nome da publicação para cada banco de dados do qual você deseja ingerir dados. Esses arquivos foram criados durante a configuração do PostgreSQL para ingestão no Databricks.

  15. Clique em Avançar .

  16. (Opcional) Na página Configurações , clique em Criar programar . Defina a frequência de refresh das tabelas de destino.

  17. (Opcional) Configure notificações email para operações pipeline bem-sucedidas ou com falha.

  18. Clique em Salvar e pipelinede execução .

Opção 2: Outras interfaces

Antes de realizar a ingestão de dados usando Databricks Ativo Bundles, Databricks APIs, Databricks SDKs ou Databricks CLI, você precisa ter acesso a uma conexão existente Unity Catalog . Para obter instruções, consulte Conectar-se às fontes de ingestão de gerenciamento.

Crie o catálogo e o esquema de preparação.

O catálogo e o esquema de preparação podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de encenação não pode ser um catálogo estrangeiro.

Bash
export CONNECTION_NAME="my_postgresql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_postgresql_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="postgresql-instance.example.com"
export DB_PORT="5432"
export DB_DATABASE="your_database"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"

output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "POSTGRESQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"database": "'"$DB_DATABASE"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Crie o gateway e o pipeline de ingestão.

O gateway de ingestão extrai dados de snapshot e de alterações do banco de dados de origem e os armazena no volume de preparação Unity Catalog . Você deve executar o gateway como um pipeline contínuo. Isso é fundamental para PostgreSQL evitar o inchaço do log de escrita antecipada (WAL) e garantir que os slots de replicação não acumulem alterações não consumidas.

O pipeline de ingestão aplica os dados de snapshot e de alteração do volume de preparação às tabelas de transmissão de destino.

Esta tab descreve como implantar um pipeline de ingestão usando Databricks Ativo Bundles. Os pacotes podem conter definições YAML de Job e tarefa, são gerenciados usando a CLI Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, teste e produção). Para mais informações, consulte PacotesDatabricks ativos.

  1. Crie um novo pacote usando a CLI do Databricks:

    Bash
    databricks bundle init
  2. Adicione dois novos arquivos de recursos ao pacote:

    • Um arquivo de definição de pipeline (resources/postgresql_pipeline.yml).
    • Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (resources/postgresql_job.yml).

    Segue abaixo um exemplo de arquivo resources/postgresql_pipeline.yml :

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: postgresql-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <postgresql-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    pipeline_postgresql:
    name: postgresql-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    source_type: POSTGRESQL
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table public.orders to dest_catalog.dest_schema.orders.
    source_catalog: your_database
    source_schema: public
    source_table: orders
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
    # table name will be the same as it is on the source.
    source_catalog: your_database
    source_schema: public
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    source_configurations:
    - catalog:
    source_catalog: your_database
    postgres:
    slot_config:
    slot_name: db_slot
    publication_name: db_pub
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    Segue abaixo um exemplo de arquivo resources/postgresql_job.yml :

    YAML
    resources:
    jobs:
    postgresql_dab_job:
    name: postgresql_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_postgresql.id}
  3. Implante o pipeline usando a CLI Databricks :

    Bash
    databricks bundle deploy

começar, programar e definir alerta em seu pipeline

Para obter informações sobre como iniciar, programar e configurar alertas em seu pipeline, consulte Tarefa comum de manutenção pipeline.

Verificar se a ingestão de dados foi bem-sucedida

A view em lista na página de detalhes pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números refresh automaticamente.

Verificar replicação

As colunas Upserted records e Deleted records não são exibidas por default. Você pode habilitá-las clicando na configuração das colunas.Ícone de configuração de colunas botão e selecionando-os.

Recursos adicionais