Create a pipeline with multiple destinations
Applies to: API-based pipeline authoring and UI-based pipeline authoring
Using the managed ingestion connectors in LakeFlow Connect, you can write to multiple destination catalogs and schemas from a single pipeline. This page provides examples of how to ingest multiple objects into different schemas and how to ingest one object into multiple destination tables.
Example: ingest two objects into different schemas
The example pipeline definitions in this section show how to ingest two objects into different schemas, depending on the pipeline authoring interface and the source system.
Google Analytics 4
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_url: <project-1-id>
              source_schema: <property-1-name>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - table:
              source_url: <project-2-id>
              source_schema: <property-2-name>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-1-id>",
          "source_schema": "<property-1-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-2-id>",
          "source_schema": "<property-2-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the Databricks CLI:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_url": "<project-1-id>",
                "source_schema": "<property-1-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_url": "<project-2-id>",
                "source_schema": "<property-2-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the Databricks CLI:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema-1>",
                "source_table": "<source-table-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema-2>",
                "source_table": "<source-table-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>"
    }
}

ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "SQLSERVER",
        "objects": [
            {
                "table": {
                    "source_schema": "<source-schema-1>",
                    "source_table": "<source-table-1>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>"
                }
            },
            {
                "table": {
                    "source_schema": "<source-schema-2>",
                    "source_table": "<source-table-2>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>"
                }
            }
        ]
    }
}
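These two specs must be created in order: the gateway pipeline first, then the ingestion pipeline, which references the gateway's pipeline ID. The following is a minimal sketch of wiring them together, assuming the databricks-sdk package and filled-in placeholders; the same pattern applies to the SQL Server example later on this page:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Create the gateway first and capture its pipeline ID.
gateway = w.api_client.do("POST", "/api/2.0/pipelines", body=gateway_pipeline_spec)

# Point the ingestion pipeline at the gateway that was just created.
ingestion_pipeline_spec["ingestion_definition"]["ingestion_gateway_id"] = gateway["pipeline_id"]

ingestion = w.api_client.do("POST", "/api/2.0/pipelines", body=ingestion_pipeline_spec)
print(f"Gateway: {gateway['pipeline_id']}, ingestion pipeline: {ingestion['pipeline_id']}")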
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<gateway-name>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<staging-catalog>",
    "gateway_storage_schema": "<staging-schema>",
    "gateway_storage_name": "<gateway-name>"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-id>",
    "objects": [
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-1>",
          "destination_schema": "<destination-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>"
        }
      }
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - report:
              source_url: <report-url-2>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the Databricks CLI:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Example: ingest one object three times
The following example pipeline definitions show how to ingest one object into three different destination tables. In each example, the third destination table is renamed to differentiate an object that is ingested twice into the same destination schema (duplicates aren't supported). If you rename a table in the pipeline, it becomes an API-only pipeline, and you can no longer edit the pipeline in the UI.
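Because two copies in the same destination schema must not share a table name, it can help to validate a spec's objects list before submitting it. The following is a minimal, hypothetical helper under that assumption; the function name and the fallback logic are illustrative, not part of the API:

def assert_unique_destinations(objects: list[dict]) -> None:
    """Fail fast if two entries resolve to the same destination table.

    Assumes entries shaped like {"table": {...}} or {"report": {...}},
    as in the specs on this page. When destination_table is omitted,
    the destination name is derived from the source, so two copies in
    the same catalog and schema collide unless one is renamed.
    """
    seen = set()
    for obj in objects:
        spec = obj.get("table") or obj.get("report") or {}
        destination = (
            spec.get("destination_catalog"),
            spec.get("destination_schema"),
            # Fall back to the source name when no rename is given.
            spec.get("destination_table")
            or spec.get("source_table")
            or spec.get("source_url"),
        )
        if destination in seen:
            raise ValueError(f"Duplicate destination: {destination}")
        seen.add(destination)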
Google Analytics 4
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_url: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_url: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_url: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
pipeline_spec = """
{
"name": "<pipeline>",
"ingestion_definition": {
"connection_name": "<connection>",
"objects": [
{
"table": {
"source_catalog": "<project-id>",
"source_schema": "<property-name>",
"destination_catalog": "<target-catalog-1>",
"destination_schema": "<target-schema-1>",
},
"table": {
"source_catalog": "<project-id>",
"source_schema": "<property-name>",
"destination_catalog": "<target-catalog-2>",
"destination_schema": "<target-schema-2>",
},
"table": {
"source_catalog": "<project-id>",
"source_schema": "<property-name>",
"destination_catalog": "<target-catalog-2>",
"destination_schema": "<target-schema-2>",
"destination_table": "<custom-target-table-name>",
},
}
]
}
}
"""
The following is an example JSON pipeline definition that you can use with the Databricks CLI:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_url": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_url": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_url": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the Databricks CLI:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema-name>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog-1> # Location of the pipeline event log
      schema: <destination-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-1> # Location of first copy
              destination_schema: <destination-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of second copy
              destination_schema: <destination-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of third copy
              destination_schema: <destination-schema-2> # Location of third copy
              destination_table: <custom-destination-table-name> # Table rename
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>"
    }
}

ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "SQLSERVER",
        "objects": [
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>"
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>"
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                    "destination_table": "<custom-destination-table-name>"
                }
            }
        ]
    }
}
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<gateway-name>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<staging-catalog>",
    "gateway_storage_schema": "<staging-schema>",
    "gateway_storage_name": "<gateway-name>"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-id>",
    "objects": [
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-1>",
          "destination_schema": "<destination-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>",
          "destination_table": "<custom-destination-table-name>"
        }
      }
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the Databricks CLI:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}