Create a multi-destination pipeline
Applies to: API-based pipeline creation
SaaS connectors
Database connectors
When you use the managed ingestion connectors in LakeFlow Connect, you can write to multiple destination catalogs and schemas from a single pipeline. You can also ingest multiple instances of the same object into the same schema. However, managed connectors don't support duplicate table names in the same destination schema, so you must specify a new name for one of the tables to differentiate them. See Name a destination table.
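For example, the following minimal Python sketch shows the shape of an objects list that ingests the same source table twice into the same destination schema. It borrows the Salesforce-style table fields used in the examples below; all placeholder names are illustrative. The second entry sets destination_table so the two destination names differ:

objects = [
    {
        "table": {
            "source_schema": "<source-schema>",
            "source_table": "<source-table>",
            "destination_catalog": "<target-catalog>",
            "destination_schema": "<target-schema>",
        }
    },
    {
        "table": {
            "source_schema": "<source-schema>",
            "source_table": "<source-table>",
            "destination_catalog": "<target-catalog>",
            "destination_schema": "<target-schema>",  # Same schema as above...
            "destination_table": "<custom-table-name>",  # ...so a distinct table name is required
        }
    },
]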
Example: Ingest two objects into different schemas
The example pipeline definitions in this section show how to ingest two objects into different schemas, depending on the pipeline authoring interface and the source system.
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-1-id>
              source_schema: <property-1-name>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - table:
              source_catalog: <project-2-id>
              source_schema: <property-2-name>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-1-id>",
          "source_schema": "<property-1-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-2-id>",
          "source_schema": "<property-2-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-1-id>",
                "source_schema": "<property-1-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-2-id>",
                "source_schema": "<property-2-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
MySQL
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_mysql:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: ${resources.pipelines.gateway.id}
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>"
    }
}

ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "MYSQL",
        "objects": [
            {
                "table": {
                    "source_schema": "<source-schema-1>",
                    "source_table": "<source-table-1>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>"
                }
            },
            {
                "table": {
                    "source_schema": "<source-schema-2>",
                    "source_table": "<source-table-2>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>"
                }
            }
        ]
    }
}
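The ingestion pipeline needs the gateway pipeline's ID for ingestion_gateway_id. The following is a minimal sketch, assuming the Databricks SDK for Python, of creating the two pipelines in order and wiring the gateway ID from the first create response into the second spec; the endpoint is the real Pipelines API, but the call pattern is an illustration:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Create the gateway first; the create response includes its pipeline_id.
gateway = w.api_client.do("POST", "/api/2.0/pipelines", body=gateway_pipeline_spec)

# Point the ingestion pipeline at the gateway that was just created.
ingestion_pipeline_spec["ingestion_definition"]["ingestion_gateway_id"] = gateway["pipeline_id"]

ingestion = w.api_client.do("POST", "/api/2.0/pipelines", body=ingestion_pipeline_spec)
print(ingestion["pipeline_id"])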
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<gateway-name>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<staging-catalog>",
    "gateway_storage_schema": "<staging-schema>",
    "gateway_storage_name": "<gateway-name>"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-id>",
    "objects": [
      {"table": {
        "source_schema": "<source-schema-1>",
        "source_table": "<source-table-1>",
        "destination_catalog": "<destination-catalog-1>",
        "destination_schema": "<destination-schema-1>"
      }},
      {"table": {
        "source_schema": "<source-schema-2>",
        "source_table": "<source-table-2>",
        "destination_catalog": "<destination-catalog-2>",
        "destination_schema": "<destination-schema-2>"
      }}
    ]
  }
}'
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema-1>",
                "source_table": "<source-table-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema-2>",
                "source_table": "<source-table-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: ${resources.pipelines.gateway.id}
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>"
    }
}

ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "SQLSERVER",
        "objects": [
            {
                "table": {
                    "source_schema": "<source-schema-1>",
                    "source_table": "<source-table-1>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>"
                }
            },
            {
                "table": {
                    "source_schema": "<source-schema-2>",
                    "source_table": "<source-table-2>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>"
                }
            }
        ]
    }
}
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<gateway-name>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<staging-catalog>",
    "gateway_storage_schema": "<staging-schema>",
    "gateway_storage_name": "<gateway-name>"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-id>",
    "objects": [
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema-1>",
        "source_table": "<source-table-1>",
        "destination_catalog": "<destination-catalog-1>",
        "destination_schema": "<destination-schema-1>"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema-2>",
        "source_table": "<source-table-2>",
        "destination_catalog": "<destination-catalog-2>",
        "destination_schema": "<destination-schema-2>"
      }}
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - report:
              source_url: <report-url-2>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Example: Ingest one object three times
The following example pipeline definitions show how to ingest one object into three different destination tables. In the example, the third destination table is given a unique name to differentiate it when the object is ingested twice into the same destination schema (duplicate table names are not supported).
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Specify destination table name
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
MySQL
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema-name>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_mysql:
      name: <pipeline-name>
      catalog: <destination-catalog-1> # Location of the pipeline event log
      schema: <destination-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: ${resources.pipelines.gateway.id}
        objects:
          - table:
              source_catalog: <source-catalog>
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-1> # Location of first copy
              destination_schema: <destination-schema-1> # Location of first copy
          - table:
              source_catalog: <source-catalog>
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of second copy
              destination_schema: <destination-schema-2> # Location of second copy
          - table:
              source_catalog: <source-catalog>
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of third copy
              destination_schema: <destination-schema-2> # Location of third copy
              destination_table: <custom-destination-table-name> # Specify destination table name
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>"
    }
}

ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "MYSQL",
        "objects": [
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>"
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>"
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                    "destination_table": "<custom-destination-table-name>"
                }
            }
        ]
    }
}
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<gateway-name>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<staging-catalog>",
    "gateway_storage_schema": "<staging-schema>",
    "gateway_storage_name": "<gateway-name>"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-id>",
    "objects": [
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "<destination-catalog-1>",
        "destination_schema": "<destination-schema-1>"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "<destination-catalog-2>",
        "destination_schema": "<destination-schema-2>"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "<destination-catalog-2>",
        "destination_schema": "<destination-schema-2>",
        "destination_table": "<custom-destination-table-name>"
      }}
    ]
  }
}'
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Specify destination table name
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema-name>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog-1> # Location of the pipeline event log
      schema: <destination-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: ${resources.pipelines.gateway.id}
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-1> # Location of first copy
              destination_schema: <destination-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of second copy
              destination_schema: <destination-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of third copy
              destination_schema: <destination-schema-2> # Location of third copy
              destination_table: <custom-destination-table-name> # Specify destination table name
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "<gateway-name>",
    "catalog": "<destination-catalog>",
    "target": "<destination-schema>",
    "gateway_definition": {
        "connection_id": "<connection-id>",
        "gateway_storage_catalog": "<destination-catalog>",
        "gateway_storage_schema": "<destination-schema>",
        "gateway_storage_name": "<destination-schema>"
    }
}

ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": "<gateway-pipeline-id>",
        "source_type": "SQLSERVER",
        "objects": [
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-1>",
                    "destination_schema": "<destination-schema-1>"
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>"
                }
            },
            {
                "table": {
                    "source_catalog": "<source-catalog>",
                    "source_schema": "<source-schema>",
                    "source_table": "<source-table>",
                    "destination_catalog": "<destination-catalog-2>",
                    "destination_schema": "<destination-schema-2>",
                    "destination_table": "<custom-destination-table-name>"
                }
            }
        ]
    }
}
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<gateway-name>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<staging-catalog>",
    "gateway_storage_schema": "<staging-schema>",
    "gateway_storage_name": "<gateway-name>"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-id>",
    "objects": [
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "<destination-catalog-1>",
        "destination_schema": "<destination-schema-1>"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "<destination-catalog-2>",
        "destination_schema": "<destination-schema-2>"
      }},
      {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "<destination-catalog-2>",
        "destination_schema": "<destination-schema-2>",
        "destination_table": "<custom-destination-table-name>"
      }}
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundles:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Specify destination table name
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with the CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}