Ingira dados do Salesforce
Este artigo descreve como ingerir dados do Salesforce e carregá-los em Databricks usando LakeFlow Connect. A ingestão resultante pipeline é governada por Unity Catalog e é alimentada por serverless compute e DLT.
O conector de ingestão do Salesforce é compatível com a seguinte fonte:
- Salesforce Sales Cloud
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
O compute sem servidor está habilitado para o seu workspace. Consulte Ativar serverless compute .
-
Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTION
na metastore.Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão. -
Você tem privilégios
USE CATALOG
no catálogo de destino. -
Você tem privilégios
USE SCHEMA
eCREATE TABLE
em um esquema existente ou privilégiosCREATE SCHEMA
no catálogo de destino.
Para ingerir a partir do Salesforce ventas Cloud, recomenda-se o seguinte:
- Crie um usuário do Salesforce que a Databricks possa usar para recuperar dados. Certifique-se de que o usuário tenha acesso à API e a todos os objetos que o senhor planeja ingerir.
Crie uma conexão com o Salesforce
Permissões necessárias: CREATE CONNECTION
na metastore. Entre em contato com um administrador da metastore para conceder isso.
Se quiser criar um pipeline de ingestão usando uma conexão existente, pule para a seção a seguir. Você precisa de USE CONNECTION
ou ALL PRIVILEGES
na conexão.
Para criar uma conexão com o Salesforce, faça o seguinte:
-
No site Databricks workspace, clique em Catalog > External locations > Connections > Create connection .
-
Em Nome da conexão , especifique um nome exclusivo para a conexão do Salesforce.
-
Em Tipo de conexão , clique em Salesforce .
-
Se o senhor estiver ingerindo de um Salesforce sandbox account, defina Is sandbox para
true
. -
Clique em fazer login com o Salesforce .
-
Se estiver fazendo a ingestão a partir de uma sandbox do Salesforce, clique em Usar domínio personalizado . Forneça o URL sandbox e, em seguida, prossiga para log in. A Databricks recomenda fazer login como um usuário do Salesforce dedicado à ingestão da Databricks.
-
Depois de retornar à página Criar conexão , clique em Criar .
Criar um pipeline de ingestão
Permissões necessárias: USE CONNECTION
ou ALL PRIVILEGES
em uma conexão.
Esta etapa descreve como criar o pipeline de ingestão. Cada tabela ingerida corresponde a uma tabela de transmissão com o mesmo nome (mas tudo em letras minúsculas) no destino por default, a menos que o senhor a renomeie explicitamente.
- Databricks UI
- Databricks Asset Bundles
- Databricks CLI
-
Na barra lateral do site Databricks workspace, clique em ingestão de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique em Salesforce .
O assistente de ingestão do Salesforce é aberto.
-
Na página do pipeline do assistente, insira um nome exclusivo para a ingestão pipeline.
-
No catálogo de destino dropdown, selecione um catálogo. Os dados ingeridos e o evento logs serão gravados nesse catálogo.
-
Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados do Salesforce.
Se não houver conexões com o Salesforce, clique em Criar conexão . Você deve ter o privilégio
CREATE CONNECTION
na metastore. -
Clique em Create pipeline (Criar pipeline) e continue .
-
Na página Origem , selecione as tabelas a serem ingeridas e clique em Avançar .
Se o senhor selecionar Todas as tabelas , o conector de ingestão do Salesforce gravará todas as tabelas existentes e futuras no esquema de origem em Unity Catalog gerenciar tabelas.
-
Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.
Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter os privilégios
USE CATALOG
eCREATE SCHEMA
no catálogo principal. -
Clique em Save pipeline (Salvar pipeline) e continue .
-
Na página Settings (Configurações ), clique em Create programar . Defina a frequência para refresh as tabelas de destino.
-
Opcionalmente, defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.
-
Clique em Save e execute pipeline .
Este tab descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init
-
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (
resources/sfdc_pipeline.yml
). - Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (
resources/sfdc_job.yml
).
Veja a seguir um exemplo de arquivo
resources/sfdc_pipeline.yml
:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for sfdc_dab
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <salesforce-connection>
objects:
# An array of objects to ingest from Salesforce. This example
# ingests the AccountShare, AccountPartner, and ApexPage objects.
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
table_configuration:
include_columns: # This can be exclude_columns instead
- <column_a>
- <column_b>
- <column_c>
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: ApexPage
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}Veja a seguir um exemplo de arquivo
resources/sfdc_job.yml
:YAMLresources:
jobs:
sfdc_dab_job:
name: sfdc_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sfdc.id} - Um arquivo de definição de pipeline (
-
implantado o pipeline usando o Databricks CLI:
Bashdatabricks bundle deploy
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Para criar o pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Para atualizar o pipeline:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
Para obter a definição do pipeline:
databricks pipelines get "<pipeline-id>"
Para excluir o pipeline:
databricks pipelines delete "<pipeline-id>"
Para obter mais informações, o senhor pode executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Exemplo de definição de pipeline JSON:
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
começar, programar e definir alertas em seu pipeline
-
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .
O novo pipeline aparece na lista pipeline.
-
Para acessar view os detalhes de pipeline, clique no nome pipeline.
-
Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .
-
Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.
Na execução do pipeline, o senhor poderá ver duas visualizações de origem para uma determinada tabela. Um view contém o Snapshot para campos de fórmula. O outro view contém os pulls de dados incrementais para campos sem fórmula. Essas visualizações são unidas na tabela de destino.
Exemplo: ingerir dois objetos do Salesforce em esquemas separados
O exemplo de definição de pipeline nesta seção faz a ingestão de dois objetos do Salesforce em esquemas separados. O suporte a pipeline de vários destinos é somente de API.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
Exemplo: ingerir um objeto do Salesforce três vezes
O exemplo de definição de pipeline nesta seção ingere um objeto do Salesforce em três tabelas de destino diferentes. O suporte a pipeline de vários destinos é somente de API.
Opcionalmente, você pode renomear uma tabela que você ingere. Se o senhor renomear uma tabela no pipeline, ela se tornará um pipeline somente de API e não será mais possível editar o pipeline na interface do usuário.
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename