Ingerir dados do Salesforce
Prévia
O LakeFlow Connect está em um Public Preview fechado. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .
Este artigo descreve como ingerir dados do Salesforce e carregá-los em Databricks usando o LakeFlow Connect. A ingestão resultante pipeline é governada por Unity Catalog e é alimentada por serverless compute e Delta Live Tables.
O conector de ingestão do Salesforce é compatível com a seguinte fonte:
Salesforce ventas cloud
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
Seu workspace está habilitado para Unity Catalog.
O compute sem servidor está habilitado para Notebook, fluxo de trabalho e Delta Live Tables. Consulte Ativar serverless compute .
Para criar uma conexão: Você tem
CREATE CONNECTION
na metastore.Para usar uma conexão existente: Você tem
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão.USE CATALOG
no catálogo de destino.USE SCHEMA
eCREATE TABLE
em um esquema existente ouCREATE SCHEMA
no catálogo de destino.
Criar uma conexão com o Salesforce
Permissões necessárias: CREATE CONNECTION
no metastore. Entre em contato com um administrador do metastore para conceder isso.
Para usar uma conexão existente, o senhor precisa de USE CONNECTION
ou ALL PRIVILEGES
na conexão.
Para criar uma conexão com o Salesforce, faça o seguinte:
No site Databricks workspace, clique em Catalog > External locations > Connections > Create connection.
Para Connection name (Nome da conexão), especifique um nome exclusivo para a conexão do Salesforce.
Para Tipo de conexão, clique em Salesforce.
Defina o tipo de autenticação como OAuth e preencha os campos a seguir:
Defina o ID do cliente como o consumidor key que o senhor recuperou do Salesforce.
Defina o segredo do cliente como o segredo do consumidor que o senhor recuperou do Salesforce.
Defina o escopo de OAuth para as strings literais
api refresh_token
.
Se o senhor estiver ingerindo a partir de um Salesforce sandbox account, defina Is sandbox como
true
.Clique em fazer login com o Salesforce.
Observação
Testar a conexão para verificar se o host está acessível. Ele não testa as credenciais do usuário quanto aos valores corretos de nome de usuário e senha.
Faça login com seu usuário do Salesforce account. Quando o senhor acessar log in e autorizar, um
refresh_token
será criado automaticamente.(Opcional) Se estiver usando uma sandbox do Salesforce, clique em Use Custom Domain (Usar domínio personalizado), forneça o URL da sandbox e prossiga para o login.
Depois de retornar à página Create Connection (Criar conexão ), clique em Create (Criar).
Criar um pipeline do Delta Live Tables
Este passo descreve como criar a ingestão pipeline. Cada tabela ingerida corresponde a uma tabela de transmissão com o mesmo nome (mas tudo em letras minúsculas) no destino por default, a menos que o senhor a renomeie explicitamente.
Cole o código a seguir em uma célula do Python Notebook:
# SHOULD MODIFY # This step sets up a PAT to make API calls to the Databricks service. api_token = "<personal-access-token>"
Cole o código a seguir em uma segunda célula do Notebook:
# DO NOT MODIFY # This step sets up a connection to make API calls to the Databricks service. import requests import json notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() workspace_url = notebook_context.apiUrl().get() api_url = f"{workspace_url}/api/2.0/pipelines" headers = { 'Authorization': 'Bearer {}'.format(api_token), 'Content-Type': 'application/json' } def check_response(response): if response.status_code == 200: print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False))) else: print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}") # DO NOT MODIFY # These are API definition to be used. def create_pipeline(pipeline_definition: str): response = requests.post(url=api_url, headers=headers, data=pipeline_definition) check_response(response) def edit_pipeline(id: str, pipeline_definition: str): response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition) check_response(response) def delete_pipeline(id: str): response = requests.delete(url=f"{api_url}/{id}", headers=headers) check_response(response) def get_pipeline(id: str): response = requests.get(url=f"{api_url}/{id}", headers=headers) check_response(response) def list_pipeline(filter: str = ""): body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}""" response = requests.get(url=api_url, headers=headers, data=body) check_response(response)
Cole o código a seguir em uma terceira célula do Notebook:
# SHOULD MODIFY # Update this notebook to configure your ingestion pipeline. pipeline_spec = """ { "name": "<YOUR_PIPELINE_NAME>", "ingestion_definition": { "connection_name": "<YOUR_CONNECTON_NAME>", "objects": [ { "table": { "source_schema": "objects", "source_table": "<YOUR_FIRST_TABLE>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_TABLE>" } }, { "table": { "source_schema": "objects", "source_table": "YOUR_SECOND_TABLE", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>" } } ] } } """ create_pipeline(pipeline_spec)
executar a primeira célula do Notebook sem modificá-la.
Modifique a segunda célula do padrão Notebook com os detalhes de seu pipeline. Por exemplo, as tabelas que o senhor deseja ingerir e os destinos.
executar a segunda célula do Notebook padrão. Essa execução
create_pipeline
.O senhor pode executar list_pipeline para mostrar o pipeline id e seus detalhes
O senhor pode executar edit_pipeline para editar a definição do site pipeline.
O senhor pode executar delete_pipeline para excluir o pipeline.
Para criar o pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Para atualizar o pipeline:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
Para obter a definição do pipeline:
databricks pipelines get "<pipeline-id>"
Para excluir o pipeline:
databricks pipelines delete "<pipeline-id>"
Para obter mais informações, o senhor pode executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
começar, programar e definir alertas em seu pipeline
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em Delta Live Tables.
O novo pipeline aparece na lista pipeline.
Para acessar view os detalhes de pipeline, clique no nome pipeline.
Na página de detalhes do pipeline, execute o pipeline clicando em começar. O senhor pode programar o pipeline clicando em programar.
Para definir o alerta no site pipeline, clique em programar, clique em Mais opções e, em seguida, adicione uma notificação.
Após a conclusão da ingestão, o senhor pode consultar suas tabelas.
Observação
Na execução do pipeline, o senhor poderá ver duas visualizações de origem para uma determinada tabela. Um view contém o Snapshot para campos de fórmula. O outro view contém os pulls de dados incrementais para campos sem fórmula. Essas visualizações são unidas na tabela de destino.