Ingerir dados do PostgreSQL
Visualização
O conector PostgreSQL para LakeFlow Connect está em versão prévia pública. Entre em contato com a equipe da sua account Databricks para se inscrever na Prévia Pública.
Esta página descreve como ingerir dados do PostgreSQL e carregá-los no Databricks usando LakeFlow Connect. O conector PostgreSQL oferece suporte AWS RDS PostgreSQL, Aurora PostgreSQL, Amazon EC2, Banco de Dados Azure para PostgreSQL, máquinas virtuais Azure , Cloud SQL para PostgreSQL GCP e bancos de dados PostgreSQL on-premises usando Azure ExpressRoute, AWS Direct Connect ou redes VPN.
Antes de começar
Para criar um gateway de ingestão e um pipeline de ingestão, você deve atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
compute sem servidor está habilitado para seu workspace. Consulte os requisitos compute sem servidor.
-
Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTIONno metastore.Se o seu conector suportar a criação pipeline baseada em interface de usuário, você poderá criar a conexão e o pipeline simultaneamente, concluindo os passos desta página. No entanto, se você usar a criação pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir os passos desta página. Consulte Conectar para gerenciar fontes de ingestão.
-
Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTIONouALL PRIVILEGESna conexão. -
Você tem privilégios
USE CATALOGno catálogo de destino. -
Você tem privilégios
USE SCHEMA,CREATE TABLEeCREATE VOLUMEem um esquema existente ou privilégiosCREATE SCHEMAno catálogo de destino. -
Você tem acesso a uma instância primária do PostgreSQL. A replicação lógica só é suportada em instâncias primárias e não em réplicas de leitura.
-
Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:
-
Família: Computação Job
-
Substituições de família de políticas:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
Databricks recomenda especificar os menores nós worker possíveis para os gateways de ingestão, pois isso não afeta o desempenho do gateway. A seguinte política compute permite que Databricks aumente a capacidade do gateway de ingestão para atender às necessidades da sua carga de trabalho. O requisito mínimo é de 8 núcleos para permitir a extração de dados eficiente e de alto desempenho do seu banco de dados de origem.
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "n2-highmem-64"
},
"node_type_id": {
"type": "fixed",
"value": "n2-standard-4"
}
}Para obter mais informações sobre política de cluster, consulte Selecionar uma política compute.
-
Para importar dados do PostgreSQL, você também precisa concluir a configuração da fonte.
Opção 1: Interface do usuário do Databricks
O suporte de interface de usuário para PostgreSQL estará disponível em breve. Por enquanto, utilize o fluxo de trabalho do Notebook ou CLI descrito na Opção 2.
Os usuários administradores podem criar uma conexão e um pipeline simultaneamente na interface do usuário do Databricks. Esta é a maneira mais simples de criar um pipeline de ingestão gerencial.
-
Na barra lateral do workspace do Databricks , clique em ingestão de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique em PostgreSQL .
O assistente de ingestão é aberto.
-
Na página do gateway de ingestão do assistente, insira um nome exclusivo para o gateway.
-
Selecione um catálogo e um esquema para armazenar os dados de ingestão e clique em Avançar .
-
Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.
-
Em Catálogo de destino , selecione um catálogo para armazenar os dados ingeridos.
-
Selecione a conexão Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.
Caso não existam conexões com a fonte, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração do PostgreSQL para ingestão no Databricks. Você deve ter privilégios
CREATE CONNECTIONno metastore. -
Clique em Criar pipeline e continue .
-
Na página Origem , selecione as tabelas que deseja importar.
-
Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).
-
Clique em Avançar .
-
Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.
Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios
USE CATALOGeCREATE SCHEMAno catálogo pai. -
Clique em Salvar e continuar .
-
Na página Configuração do Banco de Dados , insira o nome do slot de replicação e o nome da publicação para cada banco de dados do qual você deseja ingerir dados. Esses arquivos foram criados durante a configuração do PostgreSQL para ingestão no Databricks.
-
Clique em Avançar .
-
(Opcional) Na página Configurações , clique em Criar programar . Defina a frequência de refresh das tabelas de destino.
-
(Opcional) Configure notificações email para operações pipeline bem-sucedidas ou com falha.
-
Clique em Salvar e pipelinede execução .
Opção 2: Interfaces programáticas
Antes de realizar a ingestão de dados usando Databricks Ativo Bundles, Databricks APIs, Databricks SDKs, Databricks CLI ou Terraform, você precisa ter acesso a uma conexão existente Unity Catalog . Para obter instruções, consulte Conectar-se às fontes de ingestão de gerenciamento.
Crie o catálogo e o esquema de preparação.
O catálogo e o esquema de preparação podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de encenação não pode ser um catálogo estrangeiro.
- CLI
export CONNECTION_NAME="my_postgresql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_postgresql_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="postgresql-instance.example.com"
export DB_PORT="5432"
export DB_DATABASE="your_database"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "POSTGRESQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"database": "'"$DB_DATABASE"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Crie o gateway e o pipeline de ingestão.
O gateway de ingestão extrai dados de snapshot e de alterações do banco de dados de origem e os armazena no volume de preparação Unity Catalog . Você deve executar o gateway como um pipeline contínuo. Isso é fundamental para PostgreSQL evitar o inchaço do log de escrita antecipada (WAL) e garantir que os slots de replicação não acumulem alterações não consumidas.
O pipeline de ingestão aplica os dados de snapshot e de alteração do volume de preparação às tabelas de transmissão de destino.
Databricks Asset Bundles
Você pode implantar um pipeline de ingestão usando Databricks ativo Bundles. Os pacotes podem conter definições YAML de Job e tarefa, são gerenciados usando a CLI Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, teste e produção). Para mais informações, consulte PacotesDatabricks ativos.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init -
Adicione dois novos arquivos de recursos ao pacote:
- Um arquivo de definição de pipeline (
resources/postgresql_pipeline.yml). - Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (
resources/postgresql_job.yml).
Segue abaixo um exemplo de arquivo
resources/postgresql_pipeline.yml:YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: postgresql-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <postgresql-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_postgresql:
name: postgresql-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
source_type: POSTGRESQL
objects:
# Modify this with your tables!
- table:
# Ingest the table public.orders to dest_catalog.dest_schema.orders.
source_catalog: your_database
source_schema: public
source_table: orders
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: your_database
source_schema: public
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
source_configurations:
- catalog:
source_catalog: your_database
postgres:
slot_config:
slot_name: db_slot
publication_name: db_pub
target: ${var.dest_schema}
catalog: ${var.dest_catalog}Segue abaixo um exemplo de arquivo
resources/postgresql_job.yml:YAMLresources:
jobs:
postgresql_dab_job:
name: postgresql_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_postgresql.id} - Um arquivo de definição de pipeline (
-
Implante o pipeline usando a CLI Databricks :
Bashdatabricks bundle deploy
Notebook Databricks
Atualize a célula Configuration no seguinte Notebook com a conexão de origem, catálogo de destino, esquema de destino e tabelas a serem ingeridas da origem.
Create gateway and ingestion pipeline
CLI do Databricks
Para criar o gateway:
gateway_json=$(cat <<EOF
{
"name": "$GATEWAY_PIPELINE_NAME",
"gateway_definition": {
"connection_name": "$CONNECTION_NAME",
"gateway_storage_catalog": "$STAGING_CATALOG",
"gateway_storage_schema": "$STAGING_SCHEMA",
"gateway_storage_name": "$GATEWAY_PIPELINE_NAME"
}
}
EOF
)
output=$(databricks pipelines create --json "$gateway_json")
echo $output
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
Para criar o pipeline de ingestão:
pipeline_json=$(cat <<EOF
{
"name": "$INGESTION_PIPELINE_NAME",
"ingestion_definition": {
"ingestion_gateway_id": "$GATEWAY_PIPELINE_ID",
"source_type": "POSTGRESQL",
"objects": [
{
# Modify this with your tables!
"table": {
# Ingest the table public.orders to dest_catalog.dest_schema.orders.
"source_catalog": "your_database",
"source_schema": "public",
"source_table": "table",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA",
"destination_table": "<YOUR_DATABRICKS_TABLE>"
}
},
{
"schema": {
# Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
"source_catalog": "your_database",
"source_schema": "public",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA"
}
}
],
"source_configurations": [
{
"catalog": {
"source_catalog": "your_database",
"postgres": {
"slot_config": {
"slot_name": "db_slot", # Slot created during source setup
"publication_name": "db_pub" # Publication created during source setup
}
}
}
}
]
}
}
EOF
)
databricks pipelines create --json "$pipeline_json"
Requer o Databricks CLI versão 0.276.0 ou posterior.
Terraform
Você pode usar Terraform para implantar e gerenciar um pipeline de ingestão PostgreSQL . Para um framework de exemplo completo, incluindo configurações Terraform para criação de gateways e pipeline de ingestão, consulte os exemplos Terraform LakeFlow Connect catalogados no GitHub.
começar, programar e definir alerta em seu pipeline
Para obter informações sobre como iniciar, programar e configurar alertas em seu pipeline, consulte Tarefa comum de manutenção pipeline.
Verificar se a ingestão de dados foi bem-sucedida
A view em lista na página de detalhes pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números refresh automaticamente.

As colunas Upserted records e Deleted records não são exibidas por default. Você pode habilitá-las clicando na configuração das colunas. botão e selecionando-os.