Create a pipeline with multiple destinations
Applies to: API-based pipeline authoring
Using the managed ingestion connectors in LakeFlow Connect, you can write to multiple destination catalogs and schemas from a single pipeline. This page provides examples of how to ingest multiple objects into different schemas and how to ingest one object into multiple destination tables.
Example: Ingest two objects into different schemas
The example pipeline definitions in this section show how to ingest two objects into different schemas, depending on the pipeline authoring interface and the source system.
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-1-id>
              source_schema: <property-1-name>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - table:
              source_catalog: <project-2-id>
              source_schema: <property-2-name>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-1-id>",
          "source_schema": "<property-1-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-2-id>",
          "source_schema": "<property-2-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
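A note on the spec shape: JSON (and Python dict) objects cannot hold duplicate keys, so each `table` entry must be its own element of the `objects` array. If a single element repeats the `table` key, only the last value survives parsing and the other tables are silently dropped. A quick demonstration:

```python
import json

# One "objects" element with a repeated "table" key: the parser keeps only
# the last occurrence, so the first table is silently lost.
collapsed = json.loads('{"objects": [{"table": {"id": 1}, "table": {"id": 2}}]}')
print(len(collapsed["objects"]))        # 1
print(collapsed["objects"][0]["table"])  # {'id': 2}

# The correct shape gives each table its own element of the array.
correct = json.loads('{"objects": [{"table": {"id": 1}}, {"table": {"id": 2}}]}')
print(len(correct["objects"]))          # 2
```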
The following is an example JSON pipeline definition that you can use with a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-1-id>",
                "source_schema": "<property-1-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-2-id>",
                "source_schema": "<property-2-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema-1>",
                "source_table": "<source-table-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema-2>",
                "source_table": "<source-table-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema-1>
              source_table: <source-table-1>
              destination_catalog: <target-catalog-1> # Location of this table
              destination_schema: <target-schema-1> # Location of this table
          - table:
              source_schema: <source-schema-2>
              source_table: <source-table-2>
              destination_catalog: <target-catalog-2> # Location of this table
              destination_schema: <target-schema-2> # Location of this table
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema-1>",
          "source_table": "<source-table-1>",
          "destination_catalog": "<destination-catalog-1>",
          "destination_schema": "<destination-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema-2>",
          "source_table": "<source-table-2>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>"
        }
      }
    ]
  }
}
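The two specs are linked by the gateway's pipeline ID: create the gateway pipeline first, read its pipeline ID from the create response, and set it as `ingestion_gateway_id` before creating the ingestion pipeline. A minimal sketch of that wiring (the ID value and pipeline name below are hypothetical placeholders):

```python
# Hypothetical pipeline ID; in practice this comes from the response of the
# gateway pipeline create call.
gateway_pipeline_id = "8a3f0b2c-0000-1111-2222-333344445555"

# Skeleton of the ingestion spec; in the notebook above this would be the
# full ingestion_pipeline_spec dict with its "objects" list populated.
ingestion_pipeline_spec = {
    "pipeline_type": "MANAGED_INGESTION",
    "name": "<pipeline-name>",
    "ingestion_definition": {
        "ingestion_gateway_id": None,  # filled in below
        "source_type": "SQLSERVER",
        "objects": [],
    },
}

# Wire the gateway ID into the ingestion spec before creating the pipeline.
ingestion_pipeline_spec["ingestion_definition"]["ingestion_gateway_id"] = gateway_pipeline_id
print(ingestion_pipeline_spec["ingestion_definition"]["ingestion_gateway_id"])
```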
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<gateway-name>"'",
"gateway_definition": {
  "connection_id": "'"<connection-id>"'",
  "gateway_storage_catalog": "'"<staging-catalog>"'",
  "gateway_storage_schema": "'"<staging-schema>"'",
  "gateway_storage_name": "'"<gateway-name>"'"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<pipeline-name>"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"<gateway-id>"'",
  "objects": [
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-1>"'",
        "destination_schema": "'"<destination-schema-1>"'"
        }},
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<destination-schema-2>"'"
        }}
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: <target-catalog-1>
              destination_schema: <target-schema-1>
          - report:
              source_url: <report-url-2>
              destination_catalog: <target-catalog-2>
              destination_schema: <target-schema-2>
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url-1>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url-2>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url-1>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url-2>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            }
          ]
        }
      }
    }
  }
}
Example: Ingest one object three times
The following example pipeline definitions show how to ingest one object into three different destination tables. In the examples, the third destination table is renamed to distinguish it from an object that is ingested twice into the same destination schema (duplicates are not supported). If you rename a table in the pipeline, it becomes an API-only pipeline, and you can no longer edit the pipeline in the UI.
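To see why the rename is required, resolve each copy to its effective (catalog, schema, table) destination: the default table name is the source name, so two unrenamed copies in the same destination schema would collide. A small sketch with hypothetical names:

```python
from collections import Counter

# Hypothetical destinations for one source table ingested three times.
# Copies 2 and 3 land in the same catalog and schema, so the third copy
# needs a destination_table rename (None = keep the source table name).
destinations = [
    ("target_catalog_1", "target_schema_1", None),
    ("target_catalog_2", "target_schema_2", None),
    ("target_catalog_2", "target_schema_2", "orders_copy"),  # renamed third copy
]

source_table = "orders"
resolved = [(cat, sch, name or source_table) for cat, sch, name in destinations]
counts = Counter(resolved)
duplicates = [dest for dest, n in counts.items() if n > 1]
print(duplicates)  # [] -- the rename disambiguates the two same-schema copies
```

Dropping the rename on the third entry would make `duplicates` non-empty, which is the unsupported case described above.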
Google Analytics
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_ga4:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_catalog: <project-id>
              source_schema: <property-name>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<project-id>",
          "source_schema": "<property-name>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_ga4": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_catalog": "<project-id>",
                "source_schema": "<property-name>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
Salesforce
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_sfdc:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_sfdc": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "table": {
                "source_schema": "<source-schema>",
                "source_table": "<source-table>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}
SQL Server
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML resource file that you can use in your bundle:
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema-name>
      target: <destination-schema>
      catalog: <destination-catalog>
    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog-1> # Location of the pipeline event log
      schema: <destination-schema-1> # Location of the pipeline event log
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-1> # Location of first copy
              destination_schema: <destination-schema-1> # Location of first copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of second copy
              destination_schema: <destination-schema-2> # Location of second copy
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog-2> # Location of third copy
              destination_schema: <destination-schema-2> # Location of third copy
              destination_table: <custom-destination-table-name> # Table rename
The following are example ingestion gateway and ingestion pipeline specs that you can use in a Python notebook:
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-1>",
          "destination_schema": "<destination-schema-1>"
        }
      },
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>"
        }
      },
      {
        "table": {
          "source_catalog": "<source-catalog>",
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog-2>",
          "destination_schema": "<destination-schema-2>",
          "destination_table": "<custom-destination-table-name>"
        }
      }
    ]
  }
}
To create the ingestion gateway using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<gateway-name>"'",
"gateway_definition": {
  "connection_id": "'"<connection-id>"'",
  "gateway_storage_catalog": "'"<staging-catalog>"'",
  "gateway_storage_schema": "'"<staging-schema>"'",
  "gateway_storage_name": "'"<gateway-name>"'"
  }
}'
To create the ingestion pipeline using the Databricks CLI:
databricks pipelines create --json '{
"name": "'"<pipeline-name>"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"<gateway-id>"'",
  "objects": [
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-1>"'",
        "destination_schema": "'"<destination-schema-1>"'"
        }},
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<destination-schema-2>"'"
        }},
    {"table": {
        "source_catalog": "<source-catalog>",
        "source_schema": "<source-schema>",
        "source_table": "<source-table>",
        "destination_catalog": "'"<destination-catalog-2>"'",
        "destination_schema": "'"<destination-schema-2>"'",
        "destination_table": "<custom-destination-table-name>"
        }}
    ]
  }
}'
Workday
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
The following is an example YAML file that you can use in your bundle:
resources:
  pipelines:
    pipeline_workday:
      name: <pipeline-name>
      catalog: <target-catalog-1> # Location of the pipeline event log
      schema: <target-schema-1> # Location of the pipeline event log
      ingestion_definition:
        connection_name: <connection>
        objects:
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-1> # Location of first copy
              destination_schema: <target-schema-1> # Location of first copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of second copy
              destination_schema: <target-schema-2> # Location of second copy
          - report:
              source_url: <report-url>
              destination_catalog: <target-catalog-2> # Location of third copy
              destination_schema: <target-schema-2> # Location of third copy
              destination_table: <custom-target-table-name> # Table rename
The following is an example Python pipeline spec that you can use in your notebook:
pipeline_spec = """
{
  "name": "<pipeline>",
  "ingestion_definition": {
    "connection_name": "<connection>",
    "objects": [
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-1>",
          "destination_schema": "<target-schema-1>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>"
        }
      },
      {
        "report": {
          "source_url": "<report-url>",
          "destination_catalog": "<target-catalog-2>",
          "destination_schema": "<target-schema-2>",
          "destination_table": "<custom-target-table-name>"
        }
      }
    ]
  }
}
"""
The following is an example JSON pipeline definition that you can use with a CLI command:
{
  "resources": {
    "pipelines": {
      "pipeline_workday": {
        "name": "<pipeline>",
        "catalog": "<target-catalog-1>",
        "schema": "<target-schema-1>",
        "ingestion_definition": {
          "connection_name": "<connection>",
          "objects": [
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-1>",
                "destination_schema": "<target-schema-1>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>"
              }
            },
            {
              "report": {
                "source_url": "<report-url>",
                "destination_catalog": "<target-catalog-2>",
                "destination_schema": "<target-schema-2>",
                "destination_table": "<custom-target-table-name>"
              }
            }
          ]
        }
      }
    }
  }
}