Pular para o conteúdo principal

Ingira dados do Salesforce

Esta página descreve como ingerir dados do Salesforce e carregá-los em Databricks usando LakeFlow Connect.

Antes de começar

Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:

  • Seu workspace deve estar habilitado para o Unity Catalog.

  • O compute sem servidor deve estar habilitado para o seu workspace. Consulte os requisitos do compute sem servidor.

  • Se você planeja criar uma nova conexão: você deve ter privilégios CREATE CONNECTION na metastore.

    Se o conector oferecer suporte à criação pipeline baseada em interface de usuário, um administrador poderá criar a conexão e o pipeline ao mesmo tempo, concluindo os passos nesta página. No entanto, se os usuários que criam o pipeline usarem a criação pipeline baseada em API ou não forem usuários administradores, um administrador deverá primeiro criar a conexão no Catalog Explorer. Veja Conectar às fontes de ingestão de gerenciar.

  • Se você planeja usar uma conexão existente: você deve ter privilégios USE CONNECTION ou ALL PRIVILEGES no objeto de conexão.

  • Você deve ter privilégios USE CATALOG no catálogo de destino.

  • Você deve ter privilégios USE SCHEMA e CREATE TABLE em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

  • O Salesforce aplica restrições de uso aos aplicativos conectados. As permissões na tabela a seguir são necessárias para uma primeira autenticação bem-sucedida. Se você não tiver essas permissões, o Salesforce bloqueará a conexão e exigirá que um administrador instale o aplicativo conectado do Databricks.

Condição

Permissão necessária

O Controle de Acesso à API está habilitado.

Customize Application e Modify All Data ou Manage Connected Apps

O Controle de Acesso à API não está habilitado.

Approve Uninstalled Connected Apps

Para obter informações básicas, consulte Preparar-se para alterações nas restrições de uso de aplicativos conectados na documentação do Salesforce.

Para ingerir do Salesforce, o seguinte é recomendado:

  • Crie um usuário do Salesforce que a Databricks possa usar para recuperar dados. Certifique-se de que o usuário tenha acesso à API e a todos os objetos que o senhor planeja ingerir.

Criar um pipeline de ingestão

Permissões necessárias: USE CONNECTION em uma conexão.

Esta etapa descreve como criar o pipeline de ingestão. Cada tabela ingerida é gravada em uma tabela de transmissão com o mesmo nome (mas tudo em letras minúsculas).

info

Beta

Você pode filtrar linhas durante a ingestão para melhorar o desempenho e reduzir a duplicação de dados. Consulte Selecionar linhas para ingestão.

  1. Na barra lateral do site Databricks workspace, clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em Salesforce .

    O assistente de ingestão do Salesforce é aberto.

  3. Na página do pipeline do assistente, insira um nome exclusivo para a ingestão pipeline.

  4. No catálogo de destino dropdown, selecione um catálogo. Os dados ingeridos e o evento logs serão gravados nesse catálogo.

  5. Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados do Salesforce.

    Se não houver conexões com o Salesforce, clique em Criar conexão . Você deve ter o privilégio CREATE CONNECTION na metastore.

  6. Clique em Create pipeline (Criar pipeline) e continue .

  7. Na página Origem , selecione as tabelas a serem ingeridas e clique em Avançar .

    Se você selecionar Todas as tabelas , o conector de ingestão do Salesforce grava todas as tabelas existentes e futuras no esquema de origem no esquema de destino. Há um máximo de 250 objetos por pipeline.

  8. Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.

    Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter os privilégios USE CATALOG e CREATE SCHEMA no catálogo principal.

  9. Clique em Save pipeline (Salvar pipeline) e continue .

  10. (Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino.

  11. (Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.

  12. Clique em Save e execute pipeline .

Exemplo de definição de pipeline JSON

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"table": {

"source_schema": "<source-schema>",

"source_table": "<source-table>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}

começar, programar e definir alertas em seu pipeline

O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.

  1. Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .

    O novo pipeline aparece na lista pipeline.

  2. Para acessar view os detalhes de pipeline, clique no nome pipeline.

  3. Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .

  4. Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.

Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.

Exemplo: Ingerir campos de fórmula incrementalmente

info

Beta

Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.

Por default, os campos de fórmula são ingeridos usando o Snapshot completo em cada execução pipeline . No entanto, você pode habilitar a ingestão incremental para campos de fórmula criando um pipeline com o sinalizador pipelines.enableSalesforceFormulaFieldsMVComputation: true.

Para obter mais informações, consulte Ingerir campos de fórmula do Salesforce de forma incremental.

Python
import json

pipeline_config = {
"configuration": {
"pipelines.enableSalesforceFormulaFieldsMVComputation": "true"
},
"name": "salesforce_formula_fields_ingestion",
"ingestion_definition": {
"connection_name": "my_sf_connection",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "my_source_table",
"destination_table": "my_source_table_destination",
"destination_catalog": "my_catalog",
"destination_schema": "my_schema"
}
}
]
}
}

# Create the pipeline
create_pipeline(json.dumps(pipeline_config))

Substitua os seguintes valores:

  • my_sf_connection: Seu nome de conexão do Salesforce.
  • my_source_table: O nome do seu objeto Salesforce (por exemplo, Account).
  • my_source_table_destination: O nome da sua tabela de destino.
  • my_catalog: O nome do seu catálogo de destino.
  • my_schema: O nome do seu esquema.

Exemplo: ingerir dois objetos do Salesforce em esquemas separados

A definição de pipeline de exemplo nesta seção ingere dois objetos do Salesforce em esquemas separados.

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table

Exemplo: ingerir um objeto do Salesforce três vezes

A definição de pipeline de exemplo nesta seção ingere um objeto do Salesforce em três tabelas de destino diferentes. Opcionalmente, você pode atribuir um novo nome às tabelas ingeridas para diferenciá-las quando várias tabelas forem ingeridas no mesmo esquema de destino (duplicatas não são suportadas).

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename

Recurso adicional