メインコンテンツまでスキップ

Databricks への取り込み用に PostgreSQL を構成する

備考

プレビュー

LakeFlow ConnectのPostgreSQLコネクタはパブリック プレビュー段階です。 パブリック プレビューに登録するには、Databricks アカウント チームにお問い合わせください。

このページでは、 LakeFlow Connectを使用してPostgreSQLからDatabricksに取り込むためのソース セットアップ タスクについて説明します。

チェンジデータキャプチャ用の論理レプリケーション

PostgreSQL コネクタは論理レプリケーションを使用してソース テーブルの変更を追跡します。論理レプリケーションにより、コネクタは、トリガーやソース データベースでの大きなオーバーヘッドを必要とせずに、データの変更 (挿入、更新、削除) をキャプチャできます。

LakeFlow PostgreSQL論理レプリケーションには以下が必要です。

  1. LakeFlow Connect PostgreSQLバージョン 13 以降からのデータ レプリケーションをサポートしています。

  2. 論理レプリケーション用にデータベースを構成します。

    AWS RDS と Aurora の場合は、 rds.logical_replication1に設定します。

  3. 複製するすべてのテーブルを含むパブリケーションを作成します。

  4. レプリケートされるカタログごとにレプリケーション スロットを作成します。

注記

レプリケーション スロットを作成する前にパブリケーションを作成する必要があります。

論理レプリケーションの詳細については、 PostgreSQL Web サイトの論理レプリケーションのドキュメントを参照してください。

ソース設定タスクの概要

Databricks にデータを取り込む前に、PostgreSQL で次のタスクを完了します。

  1. PostgreSQL 13以上を検証する

  2. ネットワーク アクセスを構成する (セキュリティ グループ、ファイアウォール ルール、または VPN)

  3. 論理レプリケーションを構成します。

  4. オプション: スキーマ変更の自動検出のためにインライン DDL トラッキングを構成します。インライン DDL トラッキングを選択する場合は、Databricks サポートにお問い合わせください。

重要

複数の PostgreSQL データベースからレプリケートする予定の場合は、データベースごとに個別のパブリケーション スロットとレプリケーション スロットを作成する必要があります。インライン DDL 追跡スクリプト (使用されている場合) も各データベースで実行する必要があります。

論理レプリケーションを構成する

PostgreSQL で論理レプリケーションを有効にするには、データベース設定を構成し、必要なオブジェクトをセットアップします。

WALレベルを論理に設定する

論理レプリケーションには、先行書き込みログ (WAL) を構成する必要があります。この設定では通常、データベースの再起動が必要です。

  1. 現在のwal_level設定を確認してください:

    SQL
    SHOW wal_level;
  2. 値がlogicalでない場合は、サーバー構成でwal_level = logicalを設定し、PostgreSQL サービスを再起動してください。

レプリケーションユーザーを作成する

レプリケーション権限を持つ Databricks 取り込み専用のユーザーを作成します。

SQL
CREATE USER databricks_replication WITH PASSWORD 'your_secure_password';
GRANT CONNECT ON DATABASE your_database TO databricks_replication;
ALTER USER databricks_replication WITH REPLICATION;

詳細な権限要件については、 「PostgreSQL データベース ユーザー要件」を参照してください。

テーブルのレプリカIDを設定する

複製するテーブルごとに、レプリカ ID を構成します。正しい設定はテーブル構造によって異なります。

表構造

必要なレプリカID

コマンド

テーブルには主キーがあり、TOAST 可能な列 (たとえば、大きな値を持つTEXTBYTEAVARCHAR(n) ) は含まれていません

DEFAULT

テーブルには主キーがありますが、大きな可変長(TOASTable)列が含まれています

FULL

テーブルに主キーがありません

FULL

レプリカ ID 設定の詳細については、 PostgreSQLドキュメントの「レプリカ ID」を参照してください。

出版物を作成する

レプリケートするテーブルを含む各データベースにパブリケーションを作成します。

SQL
-- Create a publication for specific tables
CREATE PUBLICATION databricks_publication FOR TABLE schema_name.table1, schema_name.table2;

-- Or create a publication for all tables in a database
CREATE PUBLICATION databricks_publication FOR ALL TABLES;
注記

レプリケートする各 PostgreSQL データベースごとに個別のパブリケーションを作成する必要があります。

レプリケーションスロットを構成する

レプリケーション スロットを作成する前に、次のサーバーを構成します。

レプリケーションスロットのWAL保持を制限する

疑問 : max_slot_wal_keep_size

max_slot_wal_keep_size-1 (デフォルト値) に 設定しないことをお勧めします 。これは、遅延または非アクティブなレプリケーション スロットによる保持により、無制限の WAL 膨張が可能になるためです。ワークロードに応じて、この問題を有限の値に設定します。

max_slot_wal_keep_sizeについて詳しくは、 PostgreSQL公式ドキュメントをご覧ください。

注記

一部のマネージド クラウド プロバイダーは、この問題の変更を許可せず、代わりに組み込みスロット モニタリングと自動クリーンアップに依存しています。 運用アラートを設定する前に、プラットフォームの動作を確認してください。

詳細については、以下を参照してください。

レプリケーションスロットの容量を構成する

疑問 : max_replication_slots

複製される各 PostgreSQL データベースには、論理レプリケーション スロットが 1 つ必要です。この String には、少なくとも複製されるデータベースの数と、既存のレプリケーションのニーズを設定します。

WAL送信者を設定する

疑問 : max_wal_senders

この問題は、WAL データをサブスクライバにストリーミングするブロードキャスト WAL 送信側プロセスの最大数を定義します。 ほとんどの場合、効率的で一貫性のあるデータ レプリケーションを確保するには、レプリケーション スロットごとに 1 つの WAL 送信プロセスを用意する必要があります。

他の既存の使用状況を考慮して、 max_wal_senders少なくとも使用中のレプリケーション スロットの数と同じになるように構成します。運用の柔軟性を確保するため、少し高めに設定することをお勧めします。

レプリケーションスロットを作成する

Databricks インジェスト ゲートウェイが変更を追跡するために使用するレプリケーション スロットを各データベースに作成します。

SQL
-- Create a replication slot with the pgoutput plugin
SELECT pg_create_logical_replication_slot('databricks_slot', 'pgoutput');
重要
  • レプリケーション スロットは、コネクタによって消費されるまで WAL データを保持します。WAL の保持を制限し、無制限の WAL の増加を防ぐために、 max_slot_wal_keep_size問題を構成します。 詳細については、「レプリケーション スロットの構成」を参照してください。
  • インジェスト パイプラインを削除する場合は、関連付けられているレプリケーション スロットを手動で削除する必要があります。「レプリケーション スロットのクリーンアップ」を参照してください。

オプション: インラインDDLトラッキングを構成する

インライン DDL トラッキングは、コネクタがソース データベースからのスキーマの変更を自動的に検出して適用できるようにするオプションの機能です。この機能はデフォルトで無効になっています。

警告

インライン DDL 追跡は現在プレビュー段階であり、ワークスペースで有効にするには Databricks サポートに連絡する必要があります。

どのスキーマ変更が自動的に処理され、どのスキーマ変更に完全な更新が必要かについては、 「マネージド コネクタはスキーマ進化をどのように処理しますか?」を参照してください。 そしてスキーマの進化

インラインDDLトラッキングを設定する

ワークスペースでインライン DDL 追跡が有効になっている場合は、 各PostgreSQLデータベースで 次のステップを完了します。

  1. lakeflow_pg_ddl_change_tracking.sqlスクリプトをダウンロードして実行します。

    SQL
    \i lakeflow_pg_ddl_change_tracking.sql
  2. トリガーと監査テーブルが正常に作成されたことを確認します。

    SQL
    -- Check for the DDL audit table
    SELECT * FROM pg_tables WHERE tablename = 'lakeflow_ddl_audit';

    -- Check for the event triggers
    SELECT * FROM pg_event_trigger WHERE evtname LIKE 'lakeflow%';
  3. DDL 監査テーブルをパブリケーションに追加します。

    SQL
    ALTER PUBLICATION databricks_publication ADD TABLE public.lakeflow_ddl_audit;

クラウド固有の構成に関する注意事項

AWS RDS と Aurora

  • 問題グループでrds.logical_replication問題が1に設定されていることを確認してください。

  • Databricks ワークスペースからの接続を許可するようにセキュリティ グループを構成します。

  • レプリケーション ユーザーにはrds_replicationロールが必要です:

    SQL
    GRANT rds_replication TO databricks_replication;

PostgreSQL 用 Azure データベース

  • Azure ポータルまたは CLI を使用して、サーバー パラメーターで論理レプリケーションを有効にします。
  • Databricks ワークスペースからの接続を許可するようにファイアウォール ルールを構成します。
  • フレキシブル サーバーでは、論理レプリケーションがサポートされています。単一サーバーの場合は、サポートされている層を使用していることを確認してください。

GCPクラウドSQL for PostgreSQL

  • インスタンス設定でcloudsql.logical_decodingフラグを有効にします。
  • Databricks ワークスペースからの接続を許可するように承認済みネットワークを構成します。
  • pglogical 拡張機能を使用する場合は、 cloudsql.enable_pglogicalフラグがonに設定されていることを確認してください。

構成を確認する

セットアップ タスクを完了したら、論理レプリケーションが適切に構成されていることを確認します。

  1. wal_levellogicalに設定されていることを確認します。

    SQL
    SHOW wal_level;
  2. レプリケーション ユーザーにreplication権限があることを確認します。

    SQL
    SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'databricks_replication';
  3. 出版物が存在することを確認します。

    SQL
    SELECT * FROM pg_publication WHERE pubname = 'databricks_publication';
  4. レプリケーション スロットが存在することを確認します。

    SQL
    SELECT slot_name, slot_type, active, restart_lsn
    FROM pg_replication_slots
    WHERE slot_name = 'databricks_slot';
  5. テーブルのレプリカ ID を確認します。

    SQL
    SELECT schemaname, tablename, relreplident
    FROM pg_tables t
    JOIN pg_class c ON t.tablename = c.relname
    WHERE schemaname = 'your_schema';

    FULLレプリカIDの場合、 relreplident列にはf表示されます。

次のステップ

ソースのセットアップが完了したら、PostgreSQL からデータを取り込むための取り込みゲートウェイとパイプラインを作成できます。「PostgreSQL からデータを取り込む」を参照してください。