PostgreSQLからデータを取り込む
プレビュー
LakeFlow ConnectのPostgreSQLコネクタはパブリック プレビュー段階です。 パブリック プレビューに登録するには、Databricks アカウント チームにお問い合わせください。
このページではLakeFlow Connectを使用してPostgreSQLからデータを取り込み、 Databricksにロードする方法について説明します。 PostgreSQLコネクタは、 Azure ExpressRoute、 AWS Direct Connect、または VPN ネットワークを使用するAWS RDS PostgreSQL 、Aurora PostgreSQL 、 Amazon EC2 、 Azure Database for PostgreSQL 、 Azure仮想マシン、 GCPクラウドSQL for PostgreSQL 、およびPostgreSQLデータベースをサポートします。
始める前に
インジェスト ゲートウェイとインジェスト パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースで Unity Catalog が有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの要件を参照してください。
-
接続を作成する場合: メタストアに対する
CREATE CONNECTION権限があります。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合は、このページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、API ベースのパイプライン オーサリングを使用する場合は、このページのステップを完了する前に、カタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する予定の場合: 接続に対して
USE CONNECTION権限またはALL PRIVILEGES権限があります。 -
ターゲット カタログに対する
USE CATALOG権限があります。 -
既存のスキーマに対する
USE SCHEMA、CREATE TABLE、およびCREATE VOLUME権限、またはターゲット カタログに対するCREATE SCHEMA権限があります。 -
プライマリ PostgreSQL インスタンスにアクセスできます。論理レプリケーションはプライマリ インスタンスでのみサポートされ、読み取りレプリカではサポートされません。
-
クラスターを作成するための無制限の権限、またはカスタム ポリシー (API のみ)。ゲートウェイのカスタム ポリシーは、次の要件を満たす必要があります。
-
ファミリー: ジョブコンピュート
-
ポリシー ファミリを上書きします:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
Databricks では、ゲートウェイのパフォーマンスに影響を与えないため、インジェスト ゲートウェイには可能な限り小さいワーカー ノードを指定することをお勧めします。次のコンピュート ポリシーにより、 Databricksワークロードのニーズに合わせて取り込みゲートウェイをスケーリングできます。 ソース データベースから効率的かつパフォーマンスの高いデータ抽出を可能にするには、最小要件として 8 コアが必要です。
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "n2-highmem-64"
},
"node_type_id": {
"type": "fixed",
"value": "n2-standard-4"
}
}クラスターポリシーの詳細については、 「コンピュート ポリシーの選択」を参照してください。
-
PostgreSQL から取り込むには、ソースのセットアップも完了する必要があります。
オプション1: Databricks UI
PostgreSQL の UI サポートは近日中に提供される予定です。現時点では、オプション 2 で説明されているノートブックまたは CLI ワークフローを使用します。
管理者ユーザーは、Databricks UI で接続とパイプラインを同時に作成できます。これは、管理された取り込みパイプラインを作成する最も簡単な方法です。
-
Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
-
[データの追加] ページの [Databricks コネクタ] で、 [PostgreSQL] をクリックします。
取り込みウィザードが開きます。
-
ウィザードの 「取り込みゲートウェイ」 ページで、ゲートウェイの一意の名前を入力します。
-
取り込みデータをステージングするためのカタログとスキーマを選択し、 「次へ」 をクリックします。
-
[取り込みパイプライン] ページで、パイプラインの一意の名前を入力します。
-
宛先カタログ では、取り込んだデータを保存するカタログを選択します。
-
ソース データにアクセスするために必要な資格情報を保存するUnity Catalog接続を選択します。
ソースへの既存の接続がない場合は、 「接続の作成」を クリックし、 「Databricks への取り込み用に PostgreSQL を構成する」から取得した認証詳細を入力します。メタストアに対する
CREATE CONNECTION権限が必要です。 -
パイプラインの作成および続行 をクリックします。
-
[ソース] ページで、取り込むテーブルを選択します。
-
必要に応じて、デフォルトの履歴追跡設定を変更します。詳細については、 「履歴追跡の有効化 ( SCDタイプ 2)」を参照してください。
-
次へ をクリックします。
-
宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。
既存のスキーマを使用しない場合は、 「スキーマの作成」 をクリックします。親カタログに対する
USE CATALOGおよびCREATE SCHEMA権限が必要です。 -
保存して続行 をクリックします。
-
データベース設定 ページで、取り込み元となる各データベースのレプリケーション スロット名とパブリケーション名を入力します。これらは、Databricks への取り込み用に PostgreSQL を構成する際に作成されました。
-
次へ をクリックします。
-
(オプション) 設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。
-
(オプション) パイプライン操作の成功または失敗に関する電子メール通知を設定します。
-
パイプラインの保存と実行 をクリックします。
オプション2: その他のインターフェース
Databricks Asset Bundles、 Databricks APIs 、 Databricks SDK、またはDatabricks CLIを使用して取り込む前に、既存のUnity Catalog接続にアクセスできる必要があります。 手順については、 「管理対象取り込みソースへの接続」を参照してください。
ステージングカタログとスキーマを作成する
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング カタログをフォーリンカタログにすることはできません。
- CLI
export CONNECTION_NAME="my_postgresql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_postgresql_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="postgresql-instance.example.com"
export DB_PORT="5432"
export DB_DATABASE="your_database"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "POSTGRESQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"database": "'"$DB_DATABASE"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
ゲートウェイと取り込み パイプラインを作成する
インジェスチョン ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、Unity Catalog ステージング ボリュームに保存します。ゲートウェイを継続的なパイプラインとして実行する必要があります。これは、PostgreSQL にとって、Write-Ahead Log (WAL) の肥大化を防ぎ、レプリケーション スロットに未使用の変更が蓄積されないようにするために重要です。
取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。
- Databricks Asset Bundles
- Notebook
- CLI
このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/postgresql_pipeline.yml)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/postgresql_job.yml)。
以下は
resources/postgresql_pipeline.ymlファイルの例です。YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: postgresql-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <postgresql-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_postgresql:
name: postgresql-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
source_type: POSTGRESQL
objects:
# Modify this with your tables!
- table:
# Ingest the table public.orders to dest_catalog.dest_schema.orders.
source_catalog: your_database
source_schema: public
source_table: orders
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: your_database
source_schema: public
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
source_configurations:
- catalog:
source_catalog: your_database
postgres:
slot_config:
slot_name: db_slot
publication_name: db_pub
target: ${var.dest_schema}
catalog: ${var.dest_catalog}以下は
resources/postgresql_job.ymlファイルの例です。YAMLresources:
jobs:
postgresql_dab_job:
name: postgresql_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_postgresql.id} - パイプライン定義ファイル (
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
次のノートブックのConfigurationセルを、ソース接続、ターゲット カタログ、ターゲット スキーマ、およびソースから取り込むテーブルで更新します。
ゲートウェイと取り込み パイプラインを作成する
ゲートウェイを作成するには:
gateway_json=$(cat <<EOF
{
"name": "$GATEWAY_PIPELINE_NAME",
"gateway_definition": {
"connection_name": "$CONNECTION_NAME",
"gateway_storage_catalog": "$STAGING_CATALOG",
"gateway_storage_schema": "$STAGING_SCHEMA",
"gateway_storage_name": "$GATEWAY_PIPELINE_NAME"
}
}
EOF
)
output=$(databricks pipelines create --json "$gateway_json")
echo $output
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
取り込みパイプラインを作成するには:
pipeline_json=$(cat <<EOF
{
"name": "$INGESTION_PIPELINE_NAME",
"ingestion_definition": {
"ingestion_gateway_id": "$GATEWAY_PIPELINE_ID",
"source_type": "POSTGRESQL",
"objects": [
{
# Modify this with your tables!
"table": {
# Ingest the table public.orders to dest_catalog.dest_schema.orders.
"source_catalog": "your_database",
"source_schema": "public",
"source_table": "table",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA",
"destination_table": "<YOUR_DATABRICKS_TABLE>"
}
},
{
"schema": {
# Ingest all tables in the public schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
"source_catalog": "your_database",
"source_schema": "public",
"destination_catalog": "$TARGET_CATALOG",
"destination_schema": "$TARGET_SCHEMA"
}
}
],
"source_configurations": [
{
"catalog": {
"source_catalog": "your_database",
"postgres": {
"slot_config": {
"slot_name": "db_slot", # Slot created during source setup
"publication_name": "db_pub" # Publication created during source setup
}
}
}
}
]
}
}
EOF
)
databricks pipelines create --json "$pipeline_json"
Databricks CLI v0.276.0 以降が必要です。
パイプラインを開始、スケジュール、アラートを設定する
パイプラインのアラートの開始、スケジュール設定、および設定については、 「一般的なパイプライン メンテナンス タスク」を参照してください。
データ取り込みの成功を確認する
パイプラインの詳細ページのリスト ビューには、データが取り込まれるときに処理されるレコードの数が表示されます。これらの数字は自動的に更新されます。

Upserted records列とDeleted records列はデフォルトでは表示されません。列の設定をクリックすると有効になりますボタンをクリックして選択します。