Ingira dados do Salesforce
Esta página descreve como ingerir dados do Salesforce e carregá-los em Databricks usando LakeFlow Connect.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
- Seu workspace está habilitado para Unity Catalog.
- O compute sem servidor está habilitado para o seu workspace. Consulte Ativar serverless compute .
- Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTION
na metastore. - Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão. - Você tem privilégios
USE CATALOG
no catálogo de destino. - Você tem privilégios
USE SCHEMA
eCREATE TABLE
em um esquema existente ou privilégiosCREATE SCHEMA
no catálogo de destino.
Para ingerir do Salesforce, o seguinte é recomendado:
- Crie um usuário do Salesforce que a Databricks possa usar para recuperar dados. Certifique-se de que o usuário tenha acesso à API e a todos os objetos que o senhor planeja ingerir.
Criar um pipeline de ingestão
Permissões necessárias: USE CONNECTION
ou ALL PRIVILEGES
em uma conexão.
Esta etapa descreve como criar o pipeline de ingestão. Cada tabela ingerida é gravada em uma tabela de transmissão com o mesmo nome (mas tudo em letras minúsculas).
- Databricks UI
- Databricks Asset Bundles
- Databricks CLI
-
Na barra lateral do site Databricks workspace, clique em ingestão de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique em Salesforce .
O assistente de ingestão do Salesforce é aberto.
-
Na página do pipeline do assistente, insira um nome exclusivo para a ingestão pipeline.
-
No catálogo de destino dropdown, selecione um catálogo. Os dados ingeridos e o evento logs serão gravados nesse catálogo.
-
Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados do Salesforce.
Se não houver conexões com o Salesforce, clique em Criar conexão . Você deve ter o privilégio
CREATE CONNECTION
na metastore. -
Clique em Create pipeline (Criar pipeline) e continue .
-
Na página Origem , selecione as tabelas a serem ingeridas e clique em Avançar .
Se você selecionar Todas as tabelas , o conector de ingestão do Salesforce grava todas as tabelas existentes e futuras no esquema de origem no esquema de destino. Há um máximo de 250 objetos por pipeline.
-
Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.
Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter os privilégios
USE CATALOG
eCREATE SCHEMA
no catálogo principal. -
Clique em Save pipeline (Salvar pipeline) e continue .
-
(Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino.
-
(Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.
-
Clique em Save e execute pipeline .
Este tab descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init
-
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (
resources/sfdc_pipeline.yml
). - Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (
resources/sfdc_job.yml
).
Veja a seguir um exemplo de arquivo
resources/sfdc_pipeline.yml
:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for sfdc_dab
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <salesforce-connection>
objects:
# An array of objects to ingest from Salesforce. This example
# ingests the AccountShare, AccountPartner, and ApexPage objects.
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: ApexPage
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}Veja a seguir um exemplo de arquivo
resources/sfdc_job.yml
:YAMLresources:
jobs:
sfdc_dab_job:
name: sfdc_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sfdc.id} - Um arquivo de definição de pipeline (
-
implantado o pipeline usando o Databricks CLI:
Bashdatabricks bundle deploy
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Para criar o pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Para atualizar o pipeline:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
Para obter a definição do pipeline:
databricks pipelines get "<pipeline-id>"
Para excluir o pipeline:
databricks pipelines delete "<pipeline-id>"
Para obter mais informações, o senhor pode executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Exemplo de definição de pipeline JSON
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
começar, programar e definir alertas em seu pipeline
O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.
-
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .
O novo pipeline aparece na lista pipeline.
-
Para acessar view os detalhes de pipeline, clique no nome pipeline.
-
Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .
-
Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.
Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.
Na execução do pipeline, o senhor poderá ver duas visualizações de origem para uma determinada tabela. Um view contém o Snapshot para campos de fórmula. O outro view contém os pulls de dados incrementais para campos sem fórmula. Essas visualizações são unidas na tabela de destino.
Exemplo: ingerir dois objetos do Salesforce em esquemas separados
O exemplo de definição de pipeline nesta seção faz a ingestão de dois objetos do Salesforce em esquemas separados. O suporte a pipeline de vários destinos é somente de API.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
Exemplo: ingerir um objeto do Salesforce três vezes
O exemplo de definição de pipeline nesta seção ingere um objeto do Salesforce em três tabelas de destino diferentes. O suporte a pipeline de vários destinos é somente de API.
Opcionalmente, você pode renomear uma tabela que você ingere. Se o senhor renomear uma tabela no pipeline, ela se tornará um pipeline somente de API e não será mais possível editar o pipeline na interface do usuário.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename