Ingerir dados do PostgreSQL
Visualização
O conector PostgreSQL para LakeFlow Connect está em versão prévia pública. Entre em contato com a equipe da sua account Databricks para se inscrever na Prévia Pública.
Esta página descreve como ingerir dados do PostgreSQL e carregá-los no Databricks usando LakeFlow Connect. O conector PostgreSQL oferece suporte AWS RDS PostgreSQL, Aurora PostgreSQL, Amazon EC2, Banco de Dados Azure para PostgreSQL, máquinas virtuais Azure , Cloud SQL para PostgreSQL GCP e bancos de dados PostgreSQL on-premises usando Azure ExpressRoute, AWS Direct Connect ou redes VPN.
Antes de começar
Para criar um gateway de ingestão e um pipeline de ingestão, você deve atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
compute sem servidor está habilitado para seu workspace. Consulte os requisitos compute sem servidor.
-
Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTIONno metastore.Se o seu conector suportar a criação pipeline baseada em interface de usuário, você poderá criar a conexão e o pipeline simultaneamente, concluindo os passos desta página. No entanto, se você usar a criação pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir os passos desta página. Consulte Conectar para gerenciar fontes de ingestão.
-
Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTIONouALL PRIVILEGESna conexão. -
Você tem privilégios
USE CATALOGno catálogo de destino. -
Você tem privilégios
USE SCHEMA,CREATE TABLEeCREATE VOLUMEem um esquema existente ou privilégiosCREATE SCHEMAno catálogo de destino. -
Você tem acesso a uma instância primária do PostgreSQL. A replicação lógica só é suportada em instâncias primárias e não em réplicas de leitura.
-
Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:
-
Família: Computação Job
-
Substituições de família de políticas:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
Databricks recomenda especificar os menores nós worker possíveis para os gateways de ingestão, pois isso não afeta o desempenho do gateway. A seguinte política compute permite que Databricks aumente a capacidade do gateway de ingestão para atender às necessidades da sua carga de trabalho. O requisito mínimo é de 8 núcleos para permitir a extração de dados eficiente e de alto desempenho do seu banco de dados de origem.
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "n2-highmem-64"
},
"node_type_id": {
"type": "fixed",
"value": "n2-standard-4"
}
}Para obter mais informações sobre política de cluster, consulte Selecionar uma política compute.
-
Para importar dados do PostgreSQL, você também precisa concluir a configuração da fonte.
Opção 1: Interface do usuário do Databricks
O suporte de interface de usuário para PostgreSQL estará disponível em breve. Por enquanto, utilize o fluxo de trabalho do Notebook ou CLI descrito na Opção 2.
Os usuários administradores podem criar uma conexão e um pipeline simultaneamente na interface do usuário do Databricks. Esta é a maneira mais simples de criar um pipeline de ingestão gerencial.
-
Na barra lateral do workspace do Databricks , clique em ingestão de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique em PostgreSQL .
O assistente de ingestão é aberto.
-
Na página do gateway de ingestão do assistente, insira um nome exclusivo para o gateway.
-
Selecione um catálogo e um esquema para armazenar os dados de ingestão e clique em Avançar .
-
Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.
-
Em Catálogo de destino , selecione um catálogo para armazenar os dados ingeridos.
-
Selecione a conexão Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.
Caso não existam conexões com a fonte, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração do PostgreSQL para ingestão no Databricks. Você deve ter privilégios
CREATE CONNECTIONno metastore. -
Clique em Criar pipeline e continue .
-
Na página Origem , selecione as tabelas que deseja importar.
-
Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).
-
Clique em Avançar .
-
Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.
Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios
USE CATALOGeCREATE SCHEMAno catálogo pai. -
Clique em Salvar e continuar .
-
Na página Configuração do Banco de Dados , insira o nome do slot de replicação e o nome da publicação para cada banco de dados do qual você deseja ingerir dados. Esses arquivos foram criados durante a configuração do PostgreSQL para ingestão no Databricks.
-
Clique em Avançar .
-
(Opcional) Na página Configurações , clique em Criar programar . Defina a frequência de refresh das tabelas de destino.
-
(Opcional) Configure notificações email para operações pipeline bem-sucedidas ou com falha.
-
Clique em Salvar e pipelinede execução .
Opção 2: Outras interfaces
Antes de realizar a ingestão de dados usando Databricks Ativo Bundles, Databricks APIs, Databricks SDKs ou Databricks CLI, você precisa ter acesso a uma conexão existente Unity Catalog . Para obter instruções, consulte Conectar-se às fontes de ingestão de gerenciamento.
Crie o catálogo e o esquema de preparação.
O catálogo e o esquema de preparação podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de encenação não pode ser um catálogo estrangeiro.
- CLI
export CONNECTION_NAME="my_postgresql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_postgresql_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="postgresql-instance.example.com"
export DB_PORT="5432"
export DB_DATABASE="your_database"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "POSTGRESQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"database": "'"$DB_DATABASE"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Crie o gateway e o pipeline de ingestão.
O gateway de ingestão extrai dados de snapshot e de alterações do banco de dados de origem e os armazena no volume de preparação Unity Catalog . Você deve executar o gateway como um pipeline contínuo. Isso é fundamental para PostgreSQL evitar o inchaço do log de escrita antecipada (WAL) e garantir que os slots de replicação não acumulem alterações não consumidas.
O pipeline de ingestão aplica os dados de snapshot e de alteração do volume de preparação às tabelas de transmissão de destino.
- Databricks Asset Bundles
- Notebook
- CLI
Esta tab descreve como implantar um pipeline de ingestão usando Databricks Ativo Bundles. Os pacotes podem conter definições YAML de Job e tarefa, são gerenciados usando a CLI Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, teste e produção). Para mais informações, consulte PacotesDatabricks ativos.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init -
Adicione dois novos arquivos de recursos ao pacote:
- Um arquivo de definição de pipeline (
resources/postgresql_pipeline.yml). - Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (
resources/postgresql_job.yml).
Segue abaixo um exemplo de arquivo
resources/postgresql_pipeline.yml:YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: postgresql-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <postgresql-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_postgresql:
name: postgresql-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
source_type: POSTGRESQL
objects:
# Modify this with your tables!
- table:
# Ingest the table public.orders to dest_catalog.dest_schema.orders.
source_catalog: your_database
source_schema: public
source_table: orders
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: your_database
source_schema: public
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
source_configurations:
- catalog:
source_catalog: your_database
postgres:
slot_config:
slot_name: db_slot
publication_name: db_pub
target: ${var.dest_schema}
catalog: ${var.dest_catalog}Segue abaixo um exemplo de arquivo
resources/postgresql_job.yml:YAMLresources:
jobs:
postgresql_dab_job:
name: postgresql_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_postgresql.id} - Um arquivo de definição de pipeline (
-
Implante o pipeline usando a CLI Databricks :
Bashdatabricks bundle deploy
Atualize a célula Configuration no seguinte Notebook com a conexão de origem, catálogo de destino, esquema de destino e tabelas a serem ingeridas da origem.
Criar gateway e pipeline de ingestão
Para criar o gateway:
gateway_json=$(cat <<EOF
{
"name": "$GATEWAY_PIPELINE_NAME",
"gateway_definition": {
"connection_name": "$CONNECTION_NAME",
"gateway_storage_catalog": "$STAGING_CATALOG",
"gateway_storage_schema": "$STAGING_SCHEMA",
"gateway_storage_name": "$GATEWAY_PIPELINE_NAME"
}
}
EOF
)
output=$(databricks pipelines create --json "$gateway_json")
echo $output
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
Para criar o pipeline de ingestão:
pipeline_json=$(cat <<EOF
{
"name": "$INGESTION_PIPELINE_NAME",
"ingestion_definition": {
"ingestion_gateway_id": "$GATEWAY_PIPELINE_ID",
"source_type": "POSTGRESQL",
"objects": [
{
# Modify this with your tables!
"table": {
# Ingest the table public.orders to dest_catalog.dest_schema.orders.
"source_catalog": "your_database",
"source_schema": "public",
"source_table": "table",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA",
"destination_table": "<YOUR_DATABRICKS_TABLE>"
}
},
{
"schema": {
# Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
"source_catalog": "your_database",
"source_schema": "public",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA"
}
}
],
"source_configurations": [
{
"catalog": {
"source_catalog": "your_database",
"postgres": {
"slot_config": {
"slot_name": "db_slot", # Slot created during source setup
"publication_name": "db_pub" # Publication created during source setup
}
}
}
}
]
}
}
EOF
)
databricks pipelines create --json "$pipeline_json"
Requer o Databricks CLI versão 0.276.0 ou posterior.
começar, programar e definir alerta em seu pipeline
Para obter informações sobre como iniciar, programar e configurar alertas em seu pipeline, consulte Tarefa comum de manutenção pipeline.
Verificar se a ingestão de dados foi bem-sucedida
A view em lista na página de detalhes pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números refresh automaticamente.

As colunas Upserted records e Deleted records não são exibidas por default. Você pode habilitá-las clicando na configuração das colunas. botão e selecionando-os.