Manter o pipeline de ingestão PostgreSQL
Visualização
O conector PostgreSQL para LakeFlow Connect está em versão prévia pública. Entre em contato com a equipe da sua account Databricks para se inscrever na Prévia Pública.
Esta página descreve as operações em andamento para a manutenção do pipeline de ingestão PostgreSQL .
Manutenção geral pipeline
A tarefa de manutenção pipeline nesta seção se aplica a todos os conectores de gerenciamento no LakeFlow Connect.
Para tarefas gerais de manutenção pipeline , consulte Tarefas comuns de manutenção pipeline.
Remova os arquivos de preparação não utilizados.
Para pipelines de ingestão criados após 6 de janeiro de 2025, os dados de armazenamento temporário de volume são automaticamente agendados para exclusão após 25 dias e removidos fisicamente após 30 dias. Um pipeline de ingestão que não seja concluído com sucesso em 25 dias ou mais pode resultar em lacunas de dados nas tabelas de destino. Para evitar lacunas, acione uma refresh completa das tabelas de destino.
Para pipelines de ingestão criados antes de 6 de janeiro de 2025, entre em contato com o Suporte Databricks para solicitar a ativação manual do gerenciamento automático de retenção para dados de preparação CDC .
Os seguintes dados são limpos automaticamente:
- Arquivos de dados do CDC
- Arquivos Snapshot
- dados da tabela de preparação
Manutenção pipeline específica para conectores
As tarefas de manutenção pipeline nesta seção são específicas para o conector PostgreSQL .
Adicionar novas tabelas à replicação
Para adicionar novas tabelas a um fluxo de replicação existente:
-
Conceda os privilégios necessários ao usuário de replicação. Para obter uma lista completa dos privilégios necessários, consulte Requisitos de usuário do banco de dados PostgreSQL.
-
Defina a identidade da réplica para as novas tabelas com base em sua estrutura. Consulte a seção "Definir identidade de réplica para tabelas" para obter orientações sobre como escolher a configuração de identidade de réplica correta.
-
Adicione as tabelas à publicação:
SQLALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table; -
Atualize a configuração do pipeline de ingestão para incluir as novas tabelas. Você pode fazer isso através da interface Databricks ou atualizando o
ingestion_definitionno seu pacote Databricks ativo Bundles ou comando CLI . -
Reinicie o gateway de ingestão para descobrir as novas tabelas. O gateway verifica periodicamente a existência de novas tabelas, mas reiniciá-lo acelera o processo de descoberta.
Limpar slots de replicação
Ao excluir um pipeline de ingestão, **o slot de replicação não é removido automaticamente do banco de dados PostgreSQL de origem**. Slots de replicação não utilizados podem causar o acúmulo de arquivos de log de gravação antecipada (WAL), potencialmente preenchendo o espaço em disco no banco de dados de origem.
Para listar todos os slots de replicação:
SELECT slot_name, slot_type, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
Para remover um slot de replicação que não seja mais necessário:
SELECT pg_drop_replication_slot('slot_name');
Limpar acompanhamento DDL inline
Se você desativar o acompanhamento DDL embutido, execute os passos abaixo para cada banco de dados para limpar os objetos criados pelo script de auditoria.
-
Remova os gatilhos de evento:
SQLDROP EVENT TRIGGER IF EXISTS lakeflow_ddl_trigger CASCADE; -
Remova a tabela de auditoria da publicação:
SQLALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit; -
Eliminar a tabela de auditoria:
SQLDROP TABLE IF EXISTS public.lakeflow_ddl_audit CASCADE;
Slots de replicação do monitor
Monitore o status dos slots de replicação para garantir que estejam ativos e consumindo dados WAL:
SELECT slot_name,
active,
wal_status,
active_pid,
restart_lsn,
confirmed_flush_lsn,
pg_current_wal_lsn() AS current_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';
Valores elevados de atraso na replicação podem indicar um dos seguintes problemas:
- O gateway de ingestão não está acompanhando as alterações no banco de dados de origem.
- O gateway de ingestão foi interrompido por um período prolongado.
- Problemas de conectividade de rede entre o gateway e o banco de dados de origem.
Se um slot de replicação estiver inativo (active = false) e você tiver confirmado que o pipeline correspondente não é mais necessário, remova o slot de replicação para liberar o recurso. Consulte Limpar slots de replicação.
Monitorar o uso do disco WAL
Monitore o uso do log de gravação antecipada (WAL) para evitar problemas de espaço em disco:
SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();
Para verificar a retenção de WAL para um slot de replicação específico:
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';
Se max_slot_wal_keep_size estiver configurado corretamente durante a configuração da origem (conforme recomendado em Limitar a retenção de WAL para slots de replicação), os slots de replicação inativos não causarão crescimento ilimitado do WAL. O slot será invalidado quando o limite for atingido, evitando falhas no banco de dados.
Se o uso de disco WAL estiver alto, execute os seguintes passos do sistema operacional:
-
Verifique se o gateway de ingestão está em execução contínua.
-
Verifique os logs do gateway em busca de erros que possam estar impedindo o consumo de dados WAL.
-
Considere definir
max_slot_wal_keep_sizepara limitar a retenção do WAL (PostgreSQL 13 ou superior):SQLALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();
A configuração max_slot_wal_keep_size pode invalidar os slots de replicação se o limite de retenção do WAL for excedido, exigindo uma refresh completa de todas as tabelas.
Reinicie o gateway de ingestão.
Para diminuir a carga no banco de dados de origem, o gateway de ingestão verifica periodicamente a existência de novas tabelas. Pode levar até 6 horas para o gateway descobrir novas tabelas. Se você quiser acelerar esse processo, reinicie o gateway.
Além disso, reinicie o gateway nas seguintes situações:
- Você fez alterações de configuração no banco de dados de origem.
- O gateway está apresentando erros ou problemas de desempenho.
Atualizar publicações
Caso precise modificar quais tabelas estão incluídas na replicação:
-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;
-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;
-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';
Após atualizar a publicação, reinicie o gateway de ingestão para aplicar as alterações.