Create multi-destination pipelines
Applies to: API-based pipeline authoring
Using managed ingestion connectors in Lakeflow Connect, you can write to multiple destination catalogs and schemas from one pipeline. This page gives examples of how to ingest multiple objects into different schemas and how to ingest one object into multiple target tables.
Example: Ingest two objects into different schemas
The example pipeline definitions in this section show how to ingest two objects into different schemas, depending on the pipeline creation interface and the source system.
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-1-id>
              source_schema: <property-1-name>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - table:
              source_catalog: <project-2-id>
              source_schema: <property-2-name>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
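After you add this definition to your bundle, you can typically validate and deploy it from the bundle root with `databricks bundle validate` and `databricks bundle deploy`.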
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-1-id>",
          "source_schema": "<property-1-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-2-id>",
          "source_schema": "<property-2-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-1-id>",
                "source_schema": "<property-1-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-2-id>",
                "source_schema": "<property-2-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
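Note that this JSON nests the pipeline under a bundle-style `resources` wrapper. If you create the pipeline directly with a command such as `databricks pipelines create --json`, you generally pass only the pipeline object itself (the value under `pipeline_ga4`), not the outer wrapper.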
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema-1>",
                "source_table": "<source-table-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema-2>",
                "source_table": "<source-table-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - report:
              source_url: <report-url-2>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Example: Ingest one object three times
The following example pipeline definitions show how to ingest one object into three different destination tables. The third copy is given a custom table name because you can't write the same object to the same target schema twice under its default name (duplicate destinations aren't supported). If you rename a table in your pipeline, the pipeline becomes API-only, and you can no longer edit it in the UI.
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with CLI commands:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}