Maintain PostgreSQL ingestion pipelines

Preview

The PostgreSQL connector for Lakeflow Connect is in Public Preview. Reach out to your Databricks account team to enroll in the Public Preview.

This page describes the ongoing operations for maintaining PostgreSQL ingestion pipelines.

General pipeline maintenance

The pipeline maintenance tasks in this section apply to all managed connectors in Lakeflow Connect.

For general pipeline maintenance tasks, see Common pipeline maintenance tasks.

Remove unused staging files

For ingestion pipelines that you create after January 6, 2025, volume staging data is automatically scheduled for deletion after 25 days and physically removed after 30 days. An ingestion pipeline that has not completed successfully for 25 days or longer might result in data gaps in the destination tables. To avoid gaps, trigger a full refresh of the target tables.

For ingestion pipelines created before January 6, 2025, contact Databricks Support to request manual enablement of automatic retention management for staging CDC data.

The following data is automatically cleaned up:

  • CDC data files
  • Snapshot files
  • Staging table data

Connector-specific pipeline maintenance

The pipeline maintenance tasks in this section are specific to the PostgreSQL connector.

Add new tables to replication

To add new tables to an existing replication flow:

  1. Grant the necessary privileges to the replication user. For a complete list of required privileges, see PostgreSQL database user requirements.

  2. Set the replica identity for the new tables based on their structure. See Set replica identity for tables for guidance on choosing the correct replica identity setting.

  3. Add the tables to the publication:

    SQL
    ALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table;
  4. Update the ingestion pipeline configuration to include the new tables. You can do this through the Databricks UI or by updating the ingestion_definition in your Declarative Automation Bundles configuration or CLI command.

  5. Restart the ingestion gateway to discover the new tables. The gateway periodically checks for new tables, but restarting the gateway speeds up the discovery process.
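
For step 2, replica identity is set per table with ALTER TABLE. The following is a hedged sketch, not a definitive recipe: the table and schema names are placeholders, and the right setting depends on whether each table has a primary key.

SQL
-- Tables with a primary key can keep the default replica identity,
-- which logs the primary key columns for UPDATE and DELETE events.
ALTER TABLE schema_name.new_table REPLICA IDENTITY DEFAULT;

-- Tables without a primary key or suitable unique index typically need FULL,
-- which logs all columns (at the cost of larger WAL records).
ALTER TABLE schema_name.keyless_table REPLICA IDENTITY FULL;

-- Verify the setting: 'd' = default, 'f' = full, 'i' = index, 'n' = nothing.
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'new_table';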

Clean up replication slots

When you delete an ingestion pipeline, the replication slot is not automatically removed from the source PostgreSQL database.

important

Unused replication slots can cause Write-Ahead Log (WAL) files to accumulate, potentially filling up disk space on the source database.

To list all replication slots:

SQL
SELECT slot_name,
slot_type,
active,
restart_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

To drop a replication slot that is no longer needed, you must be connected as a user with the REPLICATION privilege. If you are connected as a superuser or admin, switch to the replication user first:

SQL
SET ROLE databricks_replication;
SELECT pg_drop_replication_slot('databricks_slot');
RESET ROLE;

Clean up inline DDL tracking

If you disable inline DDL tracking, run the steps below for each database to clean up objects created by the audit script.

  1. Drop the event triggers:

    SQL
    DROP EVENT TRIGGER IF EXISTS lakeflow_ddl_audit_trigger_1_0 CASCADE;
    DROP EVENT TRIGGER IF EXISTS lakeflow_drop_ddl_audit_trigger_1_0 CASCADE;
  2. Remove the audit table from the publication:

    SQL
    ALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit_table_1_0;
  3. Drop the audit functions:

    SQL
    DROP FUNCTION IF EXISTS public.lakeflow_ddl_audit_function_1_0() CASCADE;
    DROP FUNCTION IF EXISTS public.lakeflow_drop_ddl_audit_function_1_0() CASCADE;
  4. Drop the audit table:

    SQL
    DROP TABLE IF EXISTS public.lakeflow_ddl_audit_table_1_0 CASCADE;

Monitor replication slots

Monitor the status of replication slots to ensure they are active and consuming WAL data:

SQL
SELECT slot_name,
active,
wal_status,
active_pid,
restart_lsn,
confirmed_flush_lsn,
pg_current_wal_lsn() AS current_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';

Large replication lag values can indicate one of the following issues:

  • The ingestion gateway is not keeping up with changes in the source database.
  • The ingestion gateway has been stopped for an extended period.
  • There are network connectivity issues between the gateway and the source database.

If a replication slot is inactive (active = false) and you have confirmed that the corresponding pipeline is no longer needed, drop the replication slot to free up resources. See Clean up replication slots.

Monitor WAL disk usage

Monitor Write-Ahead Log (WAL) disk usage to prevent disk space issues:

SQL
SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();

To check WAL retention for a specific replication slot:

SQL
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';
note

If max_slot_wal_keep_size is properly configured during source setup (as recommended in Limit WAL retention for replication slots), inactive replication slots will not cause unbounded WAL growth. The slot will be invalidated when the limit is reached, preventing database crashes.
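
To confirm whether a slot has been invalidated by this limit, check the wal_status column of pg_replication_slots (PostgreSQL 13 or above). A slot reported as lost can no longer be used and its tables require a full refresh; safe_wal_size shows how much more WAL can be written before the slot is at risk.

SQL
SELECT slot_name,
active,
wal_status,
pg_size_pretty(safe_wal_size) AS safe_wal_size
FROM pg_replication_slots;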

If WAL disk usage is high, perform the following steps:

  1. Verify that the ingestion gateway is running continuously.

  2. Check the gateway logs for errors that might be preventing it from consuming WAL data.

  3. Consider setting max_slot_wal_keep_size to limit WAL retention (PostgreSQL 13 or above):

    SQL
    ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
    SELECT pg_reload_conf();
    warning

    Setting max_slot_wal_keep_size can cause replication slots to be invalidated if the WAL retention limit is exceeded, requiring a full refresh of all tables.

Restart the ingestion gateway

To decrease the load on the source database, the ingestion gateway only checks periodically for new tables. It might take up to 6 hours for the gateway to discover new tables. If you want to speed up this process, restart the gateway.

Additionally, restart the gateway in the following situations:

  • You have made configuration changes to the source database.
  • The gateway is experiencing errors or performance issues.

Update publications

If you need to modify which tables are included in replication:

SQL
-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;

-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;

-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';

After updating the publication, restart the ingestion gateway to apply the changes.
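On PostgreSQL 15 or above, a publication can also track every table in a schema, which avoids per-table ALTER statements when tables are added frequently. A hedged sketch, assuming schema_name is a placeholder for your schema:

SQL
-- Add every current and future table in a schema to the publication (PostgreSQL 15+).
ALTER PUBLICATION databricks_publication ADD TABLES IN SCHEMA schema_name;

Note that the ingestion gateway still needs to discover any newly created tables, and the pipeline configuration must include them before they are ingested.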

Remove all Lakeflow Connect objects from the source database

If you no longer need to replicate from a PostgreSQL database, run the following steps to remove all objects created during source setup. Run these commands as a superuser or the table owner, except where noted.

  1. If inline DDL tracking was configured, clean it up first. See Clean up inline DDL tracking.

  2. Drop the replication slot. This requires the REPLICATION privilege:

    SQL
    SET ROLE databricks_replication;
    SELECT pg_drop_replication_slot('databricks_slot');
    RESET ROLE;
  3. Drop the publication:

    SQL
    DROP PUBLICATION IF EXISTS databricks_publication;
  4. Revoke privileges and remove the replication user:

    SQL
    REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA schema_name FROM databricks_replication;
    REVOKE USAGE ON SCHEMA schema_name FROM databricks_replication;
    REVOKE CONNECT ON DATABASE your_database FROM databricks_replication;
    DROP USER IF EXISTS databricks_replication;
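
After completing the steps above, you can verify that no Lakeflow Connect objects remain. This is a hedged check that assumes the default object names used throughout this page; substitute your own slot, publication, and user names if they differ.

SQL
-- Each query should return zero rows after cleanup.
SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'databricks_slot';
SELECT pubname FROM pg_publication WHERE pubname = 'databricks_publication';
SELECT rolname FROM pg_roles WHERE rolname = 'databricks_replication';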