Pular para o conteúdo principal

Ingira dados do Salesforce

Esta página descreve como ingerir dados do Salesforce e carregá-los em Databricks usando LakeFlow Connect.

Antes de começar

Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:

  • Seu workspace está habilitado para Unity Catalog.
  • O compute sem servidor está habilitado para o seu workspace. Consulte Ativar serverless compute .
  • Se você planeja criar uma conexão: Você tem privilégios CREATE CONNECTION na metastore.
  • Se você planeja usar uma conexão existente: Você tem privilégios USE CONNECTION ou ALL PRIVILEGES no objeto de conexão.
  • Você tem privilégios USE CATALOG no catálogo de destino.
  • Você tem privilégios USE SCHEMA e CREATE TABLE em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

Para ingerir do Salesforce, o seguinte é recomendado:

  • Crie um usuário do Salesforce que a Databricks possa usar para recuperar dados. Certifique-se de que o usuário tenha acesso à API e a todos os objetos que o senhor planeja ingerir.

Criar um pipeline de ingestão

Permissões necessárias: USE CONNECTION ou ALL PRIVILEGES em uma conexão.

Esta etapa descreve como criar o pipeline de ingestão. Cada tabela ingerida é gravada em uma tabela de transmissão com o mesmo nome (mas tudo em letras minúsculas).

  1. Na barra lateral do site Databricks workspace, clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em Salesforce .

    O assistente de ingestão do Salesforce é aberto.

  3. Na página do pipeline do assistente, insira um nome exclusivo para a ingestão pipeline.

  4. No catálogo de destino dropdown, selecione um catálogo. Os dados ingeridos e o evento logs serão gravados nesse catálogo.

  5. Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados do Salesforce.

    Se não houver conexões com o Salesforce, clique em Criar conexão . Você deve ter o privilégio CREATE CONNECTION na metastore.

  6. Clique em Create pipeline (Criar pipeline) e continue .

  7. Na página Origem , selecione as tabelas a serem ingeridas e clique em Avançar .

    Se você selecionar Todas as tabelas , o conector de ingestão do Salesforce grava todas as tabelas existentes e futuras no esquema de origem no esquema de destino. Há um máximo de 250 objetos por pipeline.

  8. Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.

    Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter os privilégios USE CATALOG e CREATE SCHEMA no catálogo principal.

  9. Clique em Save pipeline (Salvar pipeline) e continue .

  10. (Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino.

  11. (Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.

  12. Clique em Save e execute pipeline .

Exemplo de definição de pipeline JSON

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"table": {

"source_schema": "<source-schema>",

"source_table": "<source-table>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}

começar, programar e definir alertas em seu pipeline

O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.

  1. Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .

    O novo pipeline aparece na lista pipeline.

  2. Para acessar view os detalhes de pipeline, clique no nome pipeline.

  3. Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .

  4. Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.

Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.

nota

Na execução do pipeline, o senhor poderá ver duas visualizações de origem para uma determinada tabela. Um view contém o Snapshot para campos de fórmula. O outro view contém os pulls de dados incrementais para campos sem fórmula. Essas visualizações são unidas na tabela de destino.

Exemplo: ingerir dois objetos do Salesforce em esquemas separados

O exemplo de definição de pipeline nesta seção faz a ingestão de dois objetos do Salesforce em esquemas separados. O suporte a pipeline de vários destinos é somente de API.

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table

Exemplo: ingerir um objeto do Salesforce três vezes

O exemplo de definição de pipeline nesta seção ingere um objeto do Salesforce em três tabelas de destino diferentes. O suporte a pipeline de vários destinos é somente de API.

Opcionalmente, você pode renomear uma tabela que você ingere. Se o senhor renomear uma tabela no pipeline, ela se tornará um pipeline somente de API e não será mais possível editar o pipeline na interface do usuário.

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename

Recurso adicional