Create a multi-destination pipeline
Applies to: API-based pipeline authoring
Managed ingestion connectors in LakeFlow Connect can write to multiple destination catalogs and schemas from a single pipeline. This page shows examples of ingesting multiple objects into different schemas and of ingesting one object into multiple target tables.
Example: Ingest two objects into different schemas
The example pipeline definitions in this section show how to ingest two objects into different schemas, depending on your pipeline authoring interface and source system.
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_url: <project-1-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - table:
              source_url: <project-2-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline specification that you can use in a notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-1-id>",
          "source_schema": "<property-1-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-2-id>",
          "source_schema": "<property-2-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use in a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_url": "<project-1-id>",
                "source_schema": "<property-1-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_url": "<project-2-id>",
                "source_schema": "<property-2-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following is an example Python pipeline specification that you can use in a notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use in a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema-1>",
                "source_table": "<source-table-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema-2>",
                "source_table": "<source-table-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in a bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following are example ingestion gateway and ingestion pipeline specifications that you can use in a Python notebook:
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}
ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<destination-catalog-1>",
          "destination_schema": "<destination-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>"
        }
      }
    ]
  }
}
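Because the ingestion pipeline must reference the gateway pipeline's ID, the two specifications above are created in order. The following is a hedged sketch (an assumption, not part of the original example) that creates both pipelines through the POST /api/2.0/pipelines endpoint and copies the returned gateway pipeline_id into ingestion_pipeline_spec. The workspace_url and api_token variables are hypothetical placeholders.
import requests

# Hypothetical placeholders for the workspace URL and an access token.
workspace_url = "<workspace-url>"
api_token = "<personal-access-token>"
headers = {"Authorization": f"Bearer {api_token}"}

def create_pipeline(spec: dict) -> str:
    # POST /api/2.0/pipelines returns the new pipeline's ID in the response body.
    response = requests.post(
        url=f"https://{workspace_url}/api/2.0/pipelines",
        headers=headers,
        json=spec,
    )
    response.raise_for_status()
    return response.json()["pipeline_id"]

# Create the gateway first, then wire its ID into the ingestion pipeline spec.
gateway_pipeline_id = create_pipeline(gateway_pipeline_spec)
ingestion_pipeline_spec["ingestion_definition"]["ingestion_gateway_id"] = gateway_pipeline_id
print(create_pipeline(ingestion_pipeline_spec))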
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<gateway-name>"'",
"gateway_definition": {
  "connection_id": "'"<connection-id>"'",
  "gateway_storage_catalog": "'"<staging-catalog>"'",
  "gateway_storage_schema": "'"<staging-schema>"'",
  "gateway_storage_name": "'"<gateway-name>"'"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<pipeline-name>"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"<gateway-id>"'",
  "objects": [
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-1>"'",
        "destination_schema": "'"<destination-schema-1>"'"
        }},
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<destination-schema-2>"'"
        }}
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - report:
              source_url: <report-url-2>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline specification that you can use in a notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use in a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Example: Ingest one object three times
The following example pipeline definitions show how to ingest an object into three different destination tables. In these examples, the third target table is renamed to distinguish it from the object that is ingested twice into the same target schema (duplicates aren't supported). Renaming a table in the pipeline makes it an API-only pipeline, and you can no longer edit the pipeline in the UI.
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1>	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_url: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1>	# Location of first copy
          - table:
              source_url: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_url: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline specification that you can use in a notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use in a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_url": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_url": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_url": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1>	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1>	# Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline specification that you can use in a notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use in a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in a bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema-name>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog-1> # Location of the pipeline event log
      schema: <destination-schema-1>	# Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-1> # Location of first copy
              destination_schema: <destination-schema-1>	# Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of second copy
              destination_schema: <destination-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of third copy
              destination_schema: <destination-schema-2> # Location of third copy
              destination_table: <custom-destination-table-name> # Table rename
The following are example ingestion gateway and ingestion pipeline specifications that you can use in a Python notebook:
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}
ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-1>",
          "destination_schema": "<destination-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>",
          "destination_table": "<custom-destination-table-name>"
        }
      }
    ]
  }
}
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<gateway-name>"'",
"gateway_definition": {
  "connection_id": "'"<connection-id>"'",
  "gateway_storage_catalog": "'"<staging-catalog>"'",
  "gateway_storage_schema": "'"<staging-schema>"'",
  "gateway_storage_name": "'"<gateway-name>"'"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<pipeline-name>"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"<gateway-id>"'",
  "objects": [
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-1>"'",
        "destination_schema": "'"<target-schema-1>"'"
        }},
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<target-schema-2>"'"
        }},
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<target-schema-2>"'",
        "destination_table": "<custom-destination-table-name>"
        }}
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in a bundle:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1>	# Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1>	# Location of first copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline specification that you can use in a notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use in a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}