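Create multi-destination pipelines
Applies to: API-based pipeline authoring
UI-based pipeline authoring
With the managed ingestion connectors in LakeFlow Connect, you can write to multiple destination catalogs and schemas from a single pipeline. This page shows examples of how to ingest multiple objects into different schemas and how to ingest one object into multiple target tables.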
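Example: Ingest two objects into different schemas
The example pipeline definitions in this section show how to ingest two objects into different schemas, depending on the pipeline authoring interface and the source system.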
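As a rough sketch of the shape these definitions share: the `objects` list holds one entry per source object, and each entry carries its own destination catalog and schema. The helper name `build_objects` is a hypothetical illustration and not part of any Databricks API. The field names follow the table-based examples on this page, and all values are placeholders.

```python
import json


def build_objects(mappings):
    """Build an `objects` list with one {"table": {...}} entry per mapping.

    Each mapping is a tuple of
    (source_schema, source_table, destination_catalog, destination_schema).
    Hypothetical helper, for illustration only.
    """
    return [
        {
            "table": {
                "source_schema": src_schema,
                "source_table": src_table,
                "destination_catalog": dest_catalog,
                "destination_schema": dest_schema,
            }
        }
        for src_schema, src_table, dest_catalog, dest_schema in mappings
    ]


objects = build_objects([
    ("<source-schema-1>", "<source-table-1>", "<target-catalog-1>", "<target-schema-1>"),
    ("<source-schema-2>", "<source-table-2>", "<target-catalog-2>", "<target-schema-2>"),
])

pipeline_spec = {
    "name": "<pipeline>",
    "ingestion_definition": {
        "connection_name": "<connection>",
        "objects": objects,
    },
}

# Serialize to JSON to compare against the hand-written definitions.
print(json.dumps(pipeline_spec, indent=2))
```

Generating the list from a table of mappings keeps each source object paired with exactly one destination, which is easier to review than a long hand-edited definition.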
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle.
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-1-id>
              source_schema: <property-1-name>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - table:
              source_catalog: <project-2-id>
              source_schema: <property-2-name>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline specification that you can use in a notebook.
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-1-id>",
          "source_schema": "<property-1-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-2-id>",
          "source_schema": "<property-2-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
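One detail worth checking when you write a specification like this by hand: each source object needs its own entry in the `objects` array. Python's `json` module keeps only the last value when a key is repeated within a single JSON object, so two `"table"` keys inside the same braces silently collapse into one. The following sketch demonstrates this with placeholder values. It only parses JSON and does not call any Databricks API.

```python
import json

# Two "table" keys inside ONE object: the parser keeps only the last one.
collapsed = json.loads("""
{"objects": [
  {"table": {"destination_schema": "<target-schema-1>"},
   "table": {"destination_schema": "<target-schema-2>"}}
]}
""")

# One "table" key per array element: both tables survive parsing.
separate = json.loads("""
{"objects": [
  {"table": {"destination_schema": "<target-schema-1>"}},
  {"table": {"destination_schema": "<target-schema-2>"}}
]}
""")

collapsed_schemas = [o["table"]["destination_schema"] for o in collapsed["objects"]]
separate_schemas = [o["table"]["destination_schema"] for o in separate["objects"]]

print(collapsed_schemas)  # ['<target-schema-2>']
print(separate_schemas)   # ['<target-schema-1>', '<target-schema-2>']
```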
The following is an example JSON pipeline definition that you can use with CLI commands.
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-1-id>",
                "source_schema": "<property-1-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-2-id>",
                "source_schema": "<property-2-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle.
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following is an example Python pipeline specification that you can use in a notebook.
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands.
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema-1>",
                "source_table": "<source-table-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema-2>",
                "source_table": "<source-table-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in a bundle.
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following are example ingestion gateway and ingestion pipeline specifications that you can use in a Python notebook.
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>",
    },
}
ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "SQLSERVER",
        "objects": [
            {
                "table": {
                    "source_schema": "<source-schema-1>",
                    "source_table": "<source-table-1>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>",
                }
            },
            {
                "table": {
                    "source_schema": "<source-schema-2>",
                    "source_table": "<source-table-2>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                }
            },
        ],
    },
}
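The two specifications are linked: the ingestion pipeline refers to the gateway by its pipeline ID, which is known only after the gateway has been created. A minimal sketch of that ordering follows. The function `create_pipeline` is a hypothetical stand-in for whatever call you use to create a pipeline in your workspace. Here it returns a fake ID so the snippet runs offline. The field names are taken from the example above.

```python
def create_pipeline(spec):
    """Hypothetical stand-in for a real pipeline-creation call.

    Returns a fake pipeline ID so this sketch runs without a workspace.
    """
    return "fake-id-for-" + spec["name"]


gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>",
    },
}

# Step 1: create the gateway first and keep its ID.
gateway_id = create_pipeline(gateway_pipeline_spec)

# Step 2: pass that ID to the ingestion pipeline specification.
ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": gateway_id,
        "source_type": "SQLSERVER",
        "objects": [
            {
                "table": {
                    "source_schema": "<source-schema-1>",
                    "source_table": "<source-table-1>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>",
                }
            },
            {
                "table": {
                    "source_schema": "<source-schema-2>",
                    "source_table": "<source-table-2>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                }
            },
        ],
    },
}
ingestion_pipeline_id = create_pipeline(ingestion_pipeline_spec)
```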
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "'"<gateway-name>"'",
  "gateway_definition": {
    "connection_id": "'"<connection-id>"'",
    "gateway_storage_catalog": "'"<staging-catalog>"'",
    "gateway_storage_schema": "'"<staging-schema>"'",
    "gateway_storage_name": "'"<gateway-name>"'"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "'"<pipeline-name>"'",
  "ingestion_definition": {
    "ingestion_gateway_id": "'"<gateway-id>"'",
    "objects": [
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema-1>",
        "source_table": "<source-table-1>",
        "destination_catalog": "'"<destination-catalog-1>"'",
        "destination_schema": "'"<destination-schema-1>"'"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema-2>",
        "source_table": "<source-table-2>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<destination-schema-2>"'"
      }}
    ]
  }
}'
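If hand-quoting JSON inside a shell command becomes error-prone, one option is to generate the command string from a Python dictionary. The sketch below uses only the standard library (`json.dumps` and `shlex.quote`) and prints the command instead of running it. The payload mirrors the placeholder values in the CLI example above.

```python
import json
import shlex

payload = {
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-id>",
        "objects": [
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema-1>",
                    "source_table": "<source-table-1>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>",
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema-2>",
                    "source_table": "<source-table-2>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                }
            },
        ],
    },
}

# shlex.quote wraps the JSON so a POSIX shell passes it as one argument.
command = "databricks pipelines create --json " + shlex.quote(json.dumps(payload))
print(command)
```

Because the JSON is produced by `json.dumps`, problems such as trailing commas or repeated keys cannot slip into the payload.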
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle.
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - report:
              source_url: <report-url-2>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline specification that you can use in a notebook.
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands.
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
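Example: Ingest one object three times
The following example pipeline definitions show how to ingest one object into three different destination tables. In these examples, the third target table is renamed to distinguish it, because the object is ingested into the same target schema twice (duplicate destination tables aren't supported). If you rename a table in a pipeline, the pipeline becomes an API-only pipeline, and you can no longer edit it in the UI.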
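To see why the rename is needed, consider a rough check for destination collisions. The sketch below assumes that when `destination_table` is omitted, the destination table takes the source table's name. That default is an assumption made for illustration, and both helper functions are hypothetical rather than part of any Databricks API.

```python
from collections import Counter


def find_duplicate_destinations(objects):
    """Return destination (catalog, schema, table) triples used more than once.

    Assumption: a table without "destination_table" keeps its source name.
    """
    counts = Counter()
    for obj in objects:
        table = obj["table"]
        name = table.get("destination_table", table["source_table"])
        counts[(table["destination_catalog"], table["destination_schema"], name)] += 1
    return [dest for dest, n in counts.items() if n > 1]


def table_entry(dest_catalog, dest_schema, dest_table=None):
    """Build one objects entry for the same source table (placeholder values)."""
    table = {
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": dest_catalog,
        "destination_schema": dest_schema,
    }
    if dest_table is not None:
        table["destination_table"] = dest_table
    return {"table": table}


# Second and third copies collide: same catalog, schema, and table name.
without_rename = [
    table_entry("<target-catalog-1>", "<target-schema-1>"),
    table_entry("<target-catalog-2>", "<target-schema-2>"),
    table_entry("<target-catalog-2>", "<target-schema-2>"),
]

# Renaming the third copy gives every destination a unique name.
with_rename = without_rename[:2] + [
    table_entry("<target-catalog-2>", "<target-schema-2>", "<custom-target-table-name>"),
]

print(find_duplicate_destinations(without_rename))
print(find_duplicate_destinations(with_rename))  # []
```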
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle.
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline specification that you can use in a notebook.
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands.
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle.
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline specification that you can use in a notebook.
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands.
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in a bundle.
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema-name>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog-1> # Location of the pipeline event log
      schema: <destination-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-1> # Location of first copy
              destination_schema: <destination-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of second copy
              destination_schema: <destination-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of third copy
              destination_schema: <destination-schema-2> # Location of third copy
              destination_table: <custom-destination-table-name> # Table rename
The following are example ingestion gateway and ingestion pipeline specifications that you can use in a Python notebook.
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>",
    },
}
ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "SQLSERVER",
        "objects": [
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>",
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                    "destination_table": "<custom-destination-table-name>",
                }
            },
        ],
    },
}
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "'"<gateway-name>"'",
  "gateway_definition": {
    "connection_id": "'"<connection-id>"'",
    "gateway_storage_catalog": "'"<staging-catalog>"'",
    "gateway_storage_schema": "'"<staging-schema>"'",
    "gateway_storage_name": "'"<gateway-name>"'"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "'"<pipeline-name>"'",
  "ingestion_definition": {
    "ingestion_gateway_id": "'"<gateway-id>"'",
    "objects": [
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-1>"'",
        "destination_schema": "'"<destination-schema-1>"'"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<destination-schema-2>"'"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<destination-schema-2>"'",
        "destination_table": "<custom-destination-table-name>"
      }}
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle.
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline specification that you can use in a notebook.
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands.
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}