PostgreSQL取り込みパイプラインの維持
プレビュー
LakeFlow ConnectのPostgreSQLコネクタはパブリック プレビュー段階です。 パブリック プレビューに登録するには、Databricks アカウント チームにお問い合わせください。
このページでは、PostgreSQL 取り込みパイプラインを維持するための継続的な操作について説明します。
一般的なパイプラインメンテナンス
このセクションのパイプライン メンテナンス タスクは、 Lakeflowコネクトのすべてのマネージド コネクタに適用されます。
一般的なパイプライン メンテナンス タスクについては、 「一般的なパイプライン メンテナンス タスク」を参照してください。
未使用のステージングファイルを削除する
2025 年 1 月 6 日以降に作成する取り込みパイプラインの場合、ボリューム ステージング データは 25 日後に自動的に削除されるようにスケジュールされ、30 日後に物理的に削除されます。取り込みパイプラインが 25 日間以上正常に完了していない場合、宛先テーブルにデータ ギャップが発生する可能性があります。ギャップを回避するには、ターゲット テーブルの完全な更新をトリガーします。
2025 年 1 月 6 日より前に作成された取り込みパイプラインについては、Databricks サポートに連絡して、ステージング CDC データの自動保持管理を手動で有効にするようリクエストしてください。
次のデータは自動的にクリーンアップされます:
- CDCデータファイル
- スナップショットファイル
- ステージングテーブルデータ
コネクタ固有のパイプラインのメンテナンス
このセクションのパイプラインのメンテナンス タスクは、PostgreSQL コネクタに固有です。
レプリケーションに新しいテーブルを追加する
既存のレプリケーション フローに新しいテーブルを追加するには:
-
レプリケーション ユーザーに必要な権限を付与します。必要な権限の完全なリストについては、 「PostgreSQL データベース ユーザー要件」を参照してください。
-
新しいテーブルの構造に基づいて、レプリカ ID を設定します。正しいレプリカ ID 設定を選択するためのガイダンスについては、 「テーブルのレプリカ ID を設定する」を参照してください。
-
パブリケーションにテーブルを追加します。
SQLALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table; -
新しいテーブルを含めるように取り込みパイプライン構成を更新します。これは、Databricks UI を使用するか、Databricks Asset Bundles バンドルまたは CLI コマンドで
ingestion_definitionを更新することによって実行できます。 -
新しいテーブルを検出するには、インジェスト ゲートウェイを再起動します。ゲートウェイは定期的に新しいテーブルをチェックしますが、ゲートウェイを再起動すると検出プロセスが高速化されます。
レプリケーションスロットをクリーンアップする
インジェスト パイプラインを削除しても、** レプリケーション スロットはソース PostgreSQL データベースから自動的に削除されません **。未使用のレプリケーション スロットにより、先行書き込みログ (WAL) ファイルが蓄積され、ソース データベースのディスク領域がいっぱいになる可能性があります。
すべてのレプリケーション スロットを一覧表示するには:
SELECT slot_name, slot_type, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
不要になったレプリケーション スロットを削除するには:
SELECT pg_drop_replication_slot('slot_name');
インラインDDLトラッキングをクリーンアップする
インライン DDL 追跡を無効にする場合は、各データベースに対して以下のステップを実行して、監査スクリプトによって作成されたオブジェクトをクリーンアップします。
-
イベントトリガーをドロップします。
SQLDROP EVENT TRIGGER IF EXISTS lakeflow_ddl_trigger CASCADE; -
パブリケーションから監査テーブルを削除します。
SQLALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit; -
監査テーブルを削除します。
SQLDROP TABLE IF EXISTS public.lakeflow_ddl_audit CASCADE;
レプリケーションスロットを監視する
レプリケーション スロットのステータスを監視して、アクティブであり、WAL データを消費していることを確認します。
SELECT slot_name,
active,
wal_status,
active_pid,
restart_lsn,
confirmed_flush_lsn,
pg_current_wal_lsn() AS current_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';
レプリケーション ラグの値が大きい場合は、次のいずれかの問題を示している可能性があります。
- インジェスチョン ゲートウェイがソース データベースの変更に対応できていません。
- 取り込みゲートウェイは長期間にわたって停止されています。
- ゲートウェイとソース データベース間のネットワーク接続の問題。
レプリケーション スロットが非アクティブ ( active = false ) で、対応するパイプラインが不要になったことを確認した場合は、レプリケーション スロットをドロップしてリソースを解放します。「レプリケーション スロットのクリーンアップ」を参照してください。
WALディスク使用量を監視する
ディスク領域の問題を防ぐために、Write-Ahead Log (WAL) ディスク使用量を監視します。
SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();
特定のレプリケーション スロットの WAL 保持を確認するには:
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';
ソースのセットアップ時にmax_slot_wal_keep_sizeが適切に構成されている場合 (レプリケーション スロットの WAL 保持を制限するで推奨されているとおり)、非アクティブなレプリケーション スロットによって WAL が無制限に増加することはありません。制限に達するとスロットは無効になり、データベースのクラッシュを防ぎます。
WAL ディスクの使用率が高い場合は、次のステップを実行します。
-
インジェストゲートウェイが継続的に実行されていることを確認します。
-
ゲートウェイ ログで、WAL データの消費を妨げている可能性のあるエラーがないか確認してください。
-
WAL 保持を制限するには、
max_slot_wal_keep_sizeを設定することを検討してください (PostgreSQL 13 以降)。SQLALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();
max_slot_wal_keep_sizeを設定すると、WAL 保持制限を超えた場合にレプリケーション スロットが無効になり、すべてのテーブルの完全な更新が必要になる可能性があります。
取り込みゲートウェイを再起動します
ソース データベースの負荷を軽減するために、インジェスト ゲートウェイは新しいテーブルのみを定期的にチェックします。ゲートウェイが新しいテーブルを検出するまでに最大 6 時間かかる場合があります。このプロセスを高速化したい場合は、ゲートウェイを再起動してください。
さらに、次の状況ではゲートウェイを再起動します。
- ソース データベースの構成を変更しました。
- ゲートウェイでエラーまたはパフォーマンスの問題が発生しています。
出版物の更新
レプリケーションに含めるテーブルを変更する必要がある場合:
-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;
-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;
-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';
パブリケーションを更新した後、変更を適用するためにインジェスト ゲートウェイを再起動します。