メインコンテンツまでスキップ

Databricks への取り込み用に PostgreSQL を構成する

備考

プレビュー

LakeFlow ConnectのPostgreSQLコネクタはパブリック プレビュー段階です。 パブリック プレビューに登録するには、Databricks アカウント チームにお問い合わせください。

このページでは、 LakeFlow Connectを使用してPostgreSQLからDatabricksに取り込むためのソース セットアップ タスクについて説明します。

設定および取り込み時に使用される資格情報

PostgreSQL データ取り込みでは、2つの異なる段階で2つの異なる資格情報セットを使用します。どの資格情報をどこで使用すべきかを把握することで、セットアップ時の認証エラーと権限エラーを防ぎます。

ステージ

使用する資格情報

理由

ソース設定(このページ)

ソースデータベースに直接接続されているPostgreSQL 管理者、スーパーユーザー、またはテーブル所有者(例: psql またはクラウドプロバイダーの管理コンソールを介して)。

レプリケーションユーザーの作成、権限の付与、およびパブリケーションの作成には、レプリケーションユーザーに付与されていないスーパーユーザー権限またはテーブル所有者権限が必要です。レプリケーションスロットはレプリケーションユーザー自身によって作成されるため、データベースに接続した管理者がそのロールに切り替えて作成します。特権の完全なリストについては、PostgreSQL データベースユーザーの要件を参照してください。

接続と取り込みパイプライン

ソースのセットアップ中に作成する専用のレプリケーションユーザー(例:databricks_replication

取り込みゲートウェイは、変更を読み取るレプリケーションユーザーとしてPostgreSQLに認証します。Unity Catalog接続を作成する際に、これらの認証情報を入力します。PostgreSQL接続を作成を参照してください。

注記

ソース設定タスクは管理者として実行しますが、取り込みパイプラインは管理者認証情報を使用しません。レプリケーションユーザーの資格情報のみがUnity Catalog接続に格納されます。

チェンジデータキャプチャ用の論理レプリケーション

PostgreSQL コネクタは論理レプリケーションを使用してソース テーブルの変更を追跡します。論理レプリケーションにより、コネクタは、トリガーやソース データベースでの大きなオーバーヘッドを必要とせずに、データの変更 (挿入、更新、削除) をキャプチャできます。

LakeFlow PostgreSQL論理レプリケーションには以下が必要です。

  1. LakeFlow Connect PostgreSQLバージョン 13 以降からのデータ レプリケーションをサポートしています。

  2. 論理レプリケーション用にデータベースを構成します。

    AWS RDS と Aurora の場合は、 rds.logical_replication1に設定します。

  3. 複製するすべてのテーブルを含むパブリケーションを作成します。

  4. レプリケートされるカタログごとにレプリケーション スロットを作成します。

注記

レプリケーション スロットを作成する前にパブリケーションを作成する必要があります。

論理レプリケーションの詳細については、 PostgreSQL Web サイトの論理レプリケーションのドキュメントを参照してください。

ソース設定タスクの概要

Databricks にデータを取り込む前に、PostgreSQL で次のタスクを完了します。

  1. PostgreSQL 13以上を検証する

  2. ネットワーク アクセスを構成する (セキュリティ グループ、ファイアウォール ルール、または VPN)

  3. 論理レプリケーションを構成します。

  4. オプション: スキーマ変更の自動検出のためにインライン DDL トラッキングを構成します。インライン DDL トラッキングを選択する場合は、Databricks サポートにお問い合わせください。

重要

複数の PostgreSQL データベースからレプリケートする予定の場合は、データベースごとに個別のパブリケーション スロットとレプリケーション スロットを作成する必要があります。インライン DDL 追跡スクリプト (使用されている場合) も各データベースで実行する必要があります。

論理レプリケーションを構成する

PostgreSQL で論理レプリケーションを有効にするには、データベース設定を構成し、必要なオブジェクトをセットアップします。

WALレベルを論理に設定する

論理レプリケーションには、先行書き込みログ (WAL) を構成する必要があります。この設定では通常、データベースの再起動が必要です。

  1. 現在のwal_level設定を確認してください:

    SQL
    SHOW wal_level;
  2. 値がlogicalでない場合は、サーバー構成でwal_level = logicalを設定し、PostgreSQL サービスを再起動してください。

レプリケーションユーザーを作成する

レプリケーション権限を持つ Databricks 取り込み専用のユーザーを作成します。

SQL
CREATE USER databricks_replication WITH PASSWORD 'your_secure_password';
GRANT CONNECT ON DATABASE your_database TO databricks_replication;
GRANT USAGE ON SCHEMA schema_name TO databricks_replication;
GRANT SELECT ON TABLE schema_name.table_name TO databricks_replication;
ALTER USER databricks_replication WITH REPLICATION;

詳細な権限要件については、 「PostgreSQL データベース ユーザー要件」を参照してください。

テーブルのレプリカIDを設定する

複製するテーブルごとに、レプリカ ID を構成します。正しい設定はテーブル構造によって異なります。

表構造

必要なレプリカID

コマンド

テーブルには主キーがあり、TOAST 可能な列 (たとえば、大きな値を持つTEXTBYTEAVARCHAR(n) ) は含まれていません

DEFAULT

SQL
ALTER TABLE schema_name.table_name REPLICA IDENTITY DEFAULT;

テーブルには主キーがありますが、大きな可変長(TOASTable)列が含まれています

FULL

SQL
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;

テーブルに主キーがありません

FULL

SQL
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;

レプリカ ID 設定の詳細については、 PostgreSQLドキュメントの「レプリカ ID」を参照してください。

出版物を作成する

複製したいテーブルを含むパブリケーションを、各データベースに作成します。このコマンドをテーブル所有者またはスーパーユーザーとして実行してください。

SQL
-- Create a publication for specific tables
CREATE PUBLICATION databricks_publication FOR TABLE schema_name.table1, schema_name.table2;

-- Or create a publication for all tables in a database
CREATE PUBLICATION databricks_publication FOR ALL TABLES;
注記
  • レプリケートする各 PostgreSQL データベースごとに個別のパブリケーションを作成する必要があります。
  • CREATE PUBLICATION ... FOR TABLE 記載されているテーブルの所有権が必要です。FOR ALL TABLESはスーパーユーザー権限が必要です。このコマンドは、レプリケーションユーザーとしてではなく、テーブル所有者またはデータベースのスーパーユーザーとして実行してください。
  • 不要なネットワークトラフィックを削減するため、レプリケーションに必要のないテーブルをパブリケーションに追加しないようにしてください。

レプリケーションスロットを構成する

レプリケーション スロットを作成する前に、次のサーバーを構成します。

レプリケーションスロットのWAL保持を制限する

疑問 : max_slot_wal_keep_size

max_slot_wal_keep_size-1 (デフォルト値) に 設定しないことをお勧めします 。これは、遅延または非アクティブなレプリケーション スロットによる保持により、無制限の WAL 膨張が可能になるためです。ワークロードに応じて、この問題を有限の値に設定します。

max_slot_wal_keep_sizeについて詳しくは、 PostgreSQL公式ドキュメントをご覧ください。

注記

一部のマネージド クラウド プロバイダーは、この問題の変更を許可せず、代わりに組み込みスロット モニタリングと自動クリーンアップに依存しています。 運用アラートを設定する前に、プラットフォームの動作を確認してください。

詳細については、以下を参照してください。

レプリケーションスロットの容量を構成する

疑問 : max_replication_slots

複製される各 PostgreSQL データベースには、論理レプリケーション スロットが 1 つ必要です。この String には、少なくとも複製されるデータベースの数と、既存のレプリケーションのニーズを設定します。

WAL送信者を設定する

疑問 : max_wal_senders

この問題は、WAL データをサブスクライバにストリーミングするブロードキャスト WAL 送信側プロセスの最大数を定義します。 ほとんどの場合、効率的で一貫性のあるデータ レプリケーションを確保するには、レプリケーション スロットごとに 1 つの WAL 送信プロセスを用意する必要があります。

他の既存の使用状況を考慮して、 max_wal_senders少なくとも使用中のレプリケーション スロットの数と同じになるように構成します。運用の柔軟性を確保するため、少し高めに設定することをお勧めします。

レプリケーションスロットを作成する

Databricksの取り込みゲートウェイが変更を追跡するために使用するレプリケーションスロットを、各データベースに作成します。レプリケーションスロットは、 REPLICATION権限を持つユーザーによって作成されなければなりません。スーパーユーザーまたは管理者として接続している場合は、まずレプリケーションユーザーに切り替えてください。

SQL
SET ROLE databricks_replication;

-- Databricks supports only the pgoutput plugin for replication slots
SELECT pg_create_logical_replication_slot('databricks_slot', 'pgoutput');

-- Switch back to the admin or table owner role for subsequent steps
RESET ROLE;
重要
  • レプリケーション スロットは、コネクタによって消費されるまで WAL データを保持します。WAL の保持を制限し、無制限の WAL の増加を防ぐために、 max_slot_wal_keep_size問題を構成します。 詳細については、「レプリケーション スロットの構成」を参照してください。
  • インジェスト パイプラインを削除する場合は、関連付けられているレプリケーション スロットを手動で削除する必要があります。「レプリケーション スロットのクリーンアップ」を参照してください。

オプション: インラインDDLトラッキングを構成する

インライン DDL トラッキングは、コネクタがソース データベースからのスキーマの変更を自動的に検出して適用できるようにするオプションの機能です。この機能はデフォルトで無効になっています。

警告

インライン DDL 追跡は現在プレビュー段階であり、ワークスペースで有効にするには Databricks サポートに連絡する必要があります。

どのスキーマ変更が自動的に処理され、どのスキーマ変更に完全な更新が必要かについては、 「マネージド コネクタはスキーマ進化をどのように処理しますか?」を参照してください。 そしてスキーマの進化

インラインDDLトラッキングを設定する

ワークスペースでインライン DDL 追跡が有効になっている場合は、 各PostgreSQLデータベースで 次のステップを完了します。

  1. lakeflow_pg_ddl_change_tracking.sqlスクリプトをダウンロードして実行します。

    SQL
    \i lakeflow_pg_ddl_change_tracking.sql

    このスクリプトは、 publicスキーマ内に以下のオブジェクトを作成します。オブジェクト名には、スクリプトのバージョンを追跡するバージョン接尾辞(現在は_1_0 )が含まれています。

    • 監査テーブルpublic.lakeflow_ddl_audit_table_1_0 — キャプチャされた DDL イベントを格納します。
    • イベントトリガー関数public.lakeflow_ddl_audit_function_1_0ALTER TABLEイベント用)およびpublic.lakeflow_drop_ddl_audit_function_1_0DROP TABLEイベント用)。
    • イベントトリガーlakeflow_ddl_audit_trigger_1_0ddl_command_endで発生)およびlakeflow_drop_ddl_audit_trigger_1_0sql_dropで発生)。
  2. トリガーと監査テーブルが正常に作成されたことを確認します。

    SQL
    -- Check for the DDL audit table
    SELECT * FROM pg_tables WHERE tablename LIKE 'lakeflow_ddl_audit_table%';

    -- Check for the event triggers
    SELECT * FROM pg_event_trigger WHERE evtname LIKE 'lakeflow%';

    監査テーブルlakeflow_ddl_audit_table_1_0と2つのイベントトリガー( lakeflow_ddl_audit_trigger_1_0lakeflow_drop_ddl_audit_trigger_1_0 )が表示されるはずです。

  3. DDL監査テーブルをパブリケーションに追加してください。このコマンドは、レプリケーションユーザーとしてではなく、パブリケーションの所有者として実行する必要があります。

    SQL
    ALTER PUBLICATION databricks_publication ADD TABLE public.lakeflow_ddl_audit_table_1_0;

クラウド固有の構成に関する注意事項

AWS RDS と Aurora

  • 問題グループでrds.logical_replication問題が1に設定されていることを確認してください。

  • Databricks ワークスペースからの接続を許可するようにセキュリティ グループを構成します。

  • レプリケーション ユーザーにはrds_replicationロールが必要です:

    SQL
    GRANT rds_replication TO databricks_replication;

PostgreSQL 用 Azure データベース

  • Azure ポータルまたは CLI を使用して、サーバー パラメーターで論理レプリケーションを有効にします。
  • Databricks ワークスペースからの接続を許可するようにファイアウォール ルールを構成します。
  • フレキシブル サーバーでは、論理レプリケーションがサポートされています。単一サーバーの場合は、サポートされている層を使用していることを確認してください。

GCPクラウドSQL for PostgreSQL

  • インスタンス設定でcloudsql.logical_decodingフラグを有効にします。
  • Databricks ワークスペースからの接続を許可するように承認済みネットワークを構成します。
  • pglogical 拡張機能を使用する場合は、 cloudsql.enable_pglogicalフラグがonに設定されていることを確認してください。

構成を確認する

セットアップ タスクを完了したら、論理レプリケーションが適切に構成されていることを確認します。

  1. wal_levellogicalに設定されていることを確認します。

    SQL
    SHOW wal_level;
  2. レプリケーション ユーザーにreplication権限があることを確認します。

    SQL
    SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'databricks_replication';
  3. レプリケーションユーザーがテーブルに対するSELECT権限を持っていることを確認してください。schema_name.table_name複製するスキーマとテーブルに置き換えてください (例: public.my_table ):

    SQL
    SELECT has_table_privilege('databricks_replication', 'schema_name.table_name', 'SELECT');
  4. 出版物が存在することを確認します。

    SQL
    SELECT * FROM pg_publication WHERE pubname = 'databricks_publication';
  5. レプリケーション スロットが存在することを確認します。

    SQL
    SELECT slot_name, slot_type, active, restart_lsn
    FROM pg_replication_slots
    WHERE slot_name = 'databricks_slot';
  6. テーブルのレプリカ ID を確認します。

    SQL
    SELECT schemaname, tablename, relreplident
    FROM pg_tables t
    JOIN pg_class c ON t.tablename = c.relname
    WHERE schemaname = 'your_schema';

    relreplident列には、デフォルトのレプリカID(主キーを使用)の場合はd 、完全なレプリカID(主キーのないテーブルまたはTOASTable列を持つテーブルに必要)の場合はf表示される必要があります。

次のステップ

ソースのセットアップが完了したら、PostgreSQL からデータを取り込むための取り込みゲートウェイとパイプラインを作成できます。「PostgreSQL からデータを取り込む」を参照してください。