Databricks への取り込み用に PostgreSQL を構成する
プレビュー
LakeFlow ConnectのPostgreSQLコネクタはパブリック プレビュー段階です。 パブリック プレビューに登録するには、Databricks アカウント チームにお問い合わせください。
このページでは、 LakeFlow Connectを使用してPostgreSQLからDatabricksに取り込むためのソース セットアップ タスクについて説明します。
チェンジデータキャプチャ用の論理レプリケーション
PostgreSQL コネクタは論理レプリケーションを使用してソース テーブルの変更を追跡します。論理レプリケーションにより、コネクタは、トリガーやソース データベースでの大きなオーバーヘッドを必要とせずに、データの変更 (挿入、更新、削除) をキャプチャできます。
LakeFlow PostgreSQL論理レプリケーションには以下が必要です。
-
LakeFlow Connect PostgreSQLバージョン 13 以降からのデータ レプリケーションをサポートしています。
-
論理レプリケーション用にデータベースを構成します。
PostgreSQL問題
wal_levelはlogicalに設定する必要があります。 -
複製するすべてのテーブルを含むパブリケーションを作成します。
-
レプリケートされるカタログごとにレプリケーション スロットを作成します。
レプリケーション スロットを作成する前にパブリケーションを作成する必要があります。
論理レプリケーションの詳細については、 PostgreSQL Web サイトの論理レプリケーションのドキュメントを参照してください。
ソース設定タスクの概要
Databricks にデータを取り込む前に、PostgreSQL で次のタスクを完了します。
-
PostgreSQL 13以上を検証する
-
ネットワーク アクセスを構成する (セキュリティ グループ、ファイアウォール ルール、または VPN)
-
論理レプリケーションを構成します。
-
論理レプリケーションを有効にする (
wal_level = logical) -
必要な権限を持つレプリケーション ユーザーを作成します。PostgreSQLデータベースのユーザー要件を参照してください
-
テーブルのレプリカ ID を設定します。テーブルのレプリカ ID の設定を参照してください
-
パブリケーションとレプリケーションスロットの作成
-
-
オプション: スキーマ変更の自動検出のためにインライン DDL トラッキングを構成します。インライン DDL トラッキングを選択する場合は、Databricks サポートにお問い合わせください。
複数の PostgreSQL データベースからレプリケートする予定の場合は、データベースごとに個別のパブリケーション スロットとレプリケーション スロットを作成する必要があります。インライン DDL 追跡スクリプト (使用されている場合) も各データベースで実行する必要があります。
論理レプリケーションを構成する
PostgreSQL で論理レプリケーションを有効にするには、データベース設定を構成し、必要なオブジェクトをセットアップします。
WALレベルを論理に設定する
論理レプリケーションには、先行書き込みログ (WAL) を構成する必要があります。この設定では通常、データベースの再起動が必要です。
-
現在の
wal_level設定を確認してください:SQLSHOW wal_level; -
値が
logicalでない場合は、サーバー構成でwal_level = logicalを設定し、PostgreSQL サービスを再起動してください。
レプリケーションユーザーを作成する
レプリケーション権限を持つ Databricks 取り込み専用のユーザーを作成します。
CREATE USER databricks_replication WITH PASSWORD 'your_secure_password';
GRANT CONNECT ON DATABASE your_database TO databricks_replication;
ALTER USER databricks_replication WITH REPLICATION;
詳細な権限要件については、 「PostgreSQL データベース ユーザー要件」を参照してください。
テーブルのレプリカIDを設定する
複製するテーブルごとに、レプリカ ID を構成します。正しい設定はテーブル構造によって異なります。
表構造 | 必要なレプリカID | コマンド |
|---|---|---|
テーブルには主キーがあり、TOAST 可能な列 (たとえば、大きな値を持つ |
| |
テーブルには主キーがありますが、大きな可変長(TOASTable)列が含まれています |
| |
テーブルに主キーがありません |
|
レプリカ ID 設定の詳細については、 PostgreSQLドキュメントの「レプリカ ID」を参照してください。
出版物を作成する
レプリケートするテーブルを含む各データベースにパブリケーションを作成します。
-- Create a publication for specific tables
CREATE PUBLICATION databricks_publication FOR TABLE schema_name.table1, schema_name.table2;
-- Or create a publication for all tables in a database
CREATE PUBLICATION databricks_publication FOR ALL TABLES;
レプリケートする各 PostgreSQL データベースごとに個別のパブリケーションを作成する必要があります。
レプリケーションスロットを構成する
レプリケーション スロットを作成する前に、次のサーバーを構成します。
レプリケーションスロットのWAL保持を制限する
疑問 : max_slot_wal_keep_size
max_slot_wal_keep_sizeを-1 (デフォルト値) に 設定しないことをお勧めします 。これは、遅延または非アクティブなレプリケーション スロットによる保持により、無制限の WAL 膨張が可能になるためです。ワークロードに応じて、この問題を有限の値に設定します。
max_slot_wal_keep_sizeについて詳しくは、 PostgreSQL公式ドキュメントをご覧ください。
一部のマネージド クラウド プロバイダーは、この問題の変更を許可せず、代わりに組み込みスロット モニタリングと自動クリーンアップに依存しています。 運用アラートを設定する前に、プラットフォームの動作を確認してください。
詳細については、以下を参照してください。
レプリケーションスロットの容量を構成する
疑問 : max_replication_slots
複製される各 PostgreSQL データベースには、論理レプリケーション スロットが 1 つ必要です。この String には、少なくとも複製されるデータベースの数と、既存のレプリケーションのニーズを設定します。
WAL送信者を設定する
疑問 : max_wal_senders
この問題は、WAL データをサブスクライバにストリーミングするブロードキャスト WAL 送信側プロセスの最大数を定義します。 ほとんどの場合、効率的で一貫性のあるデータ レプリケーションを確保するには、レプリケーション スロットごとに 1 つの WAL 送信プロセスを用意する必要があります。
他の既存の使用状況を考慮して、 max_wal_senders少なくとも使用中のレプリケーション スロットの数と同じになるように構成します。運用の柔軟性を確保するため、少し高めに設定することをお勧めします。
レプリケーションスロットを作成する
Databricks インジェスト ゲートウェイが変更を追跡するために使用するレプリケーション スロットを各データベースに作成します。
-- Create a replication slot with the pgoutput plugin
SELECT pg_create_logical_replication_slot('databricks_slot', 'pgoutput');
- レプリケーション スロットは、コネクタによって消費されるまで WAL データを保持します。WAL の保持を制限し、無制限の WAL の増加を防ぐために、
max_slot_wal_keep_size問題を構成します。 詳細については、「レプリケーション スロットの構成」を参照してください。 - インジェスト パイプラインを削除する場合は、関連付けられているレプリケーション スロットを手動で削除する必要があります。「レプリケーション スロットのクリーンアップ」を参照してください。
オプション: インラインDDLトラッキングを構成する
インライン DDL トラッキングは、コネクタがソース データベースからのスキーマの変更を自動的に検出して適用できるようにするオプションの機能です。この機能はデフォルトで無効になっています。
インライン DDL 追跡は現在プレビュー段階であり、ワークスペースで有効にするには Databricks サポートに連絡する必要があります。
どのスキーマ変更が自動的に処理され、どのスキーマ変更に完全な更新が必要かについては、 「マネージド コネクタはスキーマ進化をどのように処理しますか?」を参照してください。 そしてスキーマの進化。
インラインDDLトラッキングを設定する
ワークスペースでインライン DDL 追跡が有効になっている場合は、 各PostgreSQLデータベースで 次のステップを完了します。
-
lakeflow_pg_ddl_change_tracking.sqlスクリプトをダウンロードして実行します。
SQL\i lakeflow_pg_ddl_change_tracking.sql -
トリガーと監査テーブルが正常に作成されたことを確認します。
SQL-- Check for the DDL audit table
SELECT * FROM pg_tables WHERE tablename = 'lakeflow_ddl_audit';
-- Check for the event triggers
SELECT * FROM pg_event_trigger WHERE evtname LIKE 'lakeflow%'; -
DDL 監査テーブルをパブリケーションに追加します。
SQLALTER PUBLICATION databricks_publication ADD TABLE public.lakeflow_ddl_audit;
クラウド固有の構成に関する注意事項
AWS RDS と Aurora
-
問題グループで
rds.logical_replication問題が1に設定されていることを確認してください。 -
Databricks ワークスペースからの接続を許可するようにセキュリティ グループを構成します。
-
レプリケーション ユーザーには
rds_replicationロールが必要です:SQLGRANT rds_replication TO databricks_replication;
PostgreSQL 用 Azure データベース
- Azure ポータルまたは CLI を使用して、サーバー パラメーターで論理レプリケーションを有効にします。
- Databricks ワークスペースからの接続を許可するようにファイアウォール ルールを構成します。
- フレキシブル サーバーでは、論理レプリケーションがサポートされています。単一サーバーの場合は、サポートされている層を使用していることを確認してください。
GCPクラウドSQL for PostgreSQL
- インスタンス設定で
cloudsql.logical_decodingフラグを有効にします。 - Databricks ワークスペースからの接続を許可するように承認済みネットワークを構成します。
- pglogical 拡張機能を使用する場合は、
cloudsql.enable_pglogicalフラグがonに設定されていることを確認してください。
構成を確認する
セットアップ タスクを完了したら、論理レプリケーションが適切に構成されていることを確認します。
-
wal_levelがlogicalに設定されていることを確認します。SQLSHOW wal_level; -
レプリケーション ユーザーに
replication権限があることを確認します。SQLSELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'databricks_replication'; -
出版物が存在することを確認します。
SQLSELECT * FROM pg_publication WHERE pubname = 'databricks_publication'; -
レプリケーション スロットが存在することを確認します。
SQLSELECT slot_name, slot_type, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name = 'databricks_slot'; -
テーブルのレプリカ ID を確認します。
SQLSELECT schemaname, tablename, relreplident
FROM pg_tables t
JOIN pg_class c ON t.tablename = c.relname
WHERE schemaname = 'your_schema';FULLレプリカIDの場合、
relreplident列にはf表示されます。
次のステップ
ソースのセットアップが完了したら、PostgreSQL からデータを取り込むための取り込みゲートウェイとパイプラインを作成できます。「PostgreSQL からデータを取り込む」を参照してください。