Ingerir dados do Salesforce

Prévia

O LakeFlow Connect está em um Public Preview fechado. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .

Este artigo descreve como ingerir dados do Salesforce e carregá-los em Databricks usando o LakeFlow Connect. A ingestão resultante pipeline é governada por Unity Catalog e é alimentada por serverless compute e Delta Live Tables.

O conector de ingestão do Salesforce é compatível com a seguinte fonte:

  • Salesforce ventas cloud

Antes de começar

Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:

  • Seu workspace está habilitado para Unity Catalog.

  • O compute sem servidor está habilitado para Notebook, fluxo de trabalho e Delta Live Tables. Consulte Ativar serverless compute .

  • Para criar uma conexão: Você tem CREATE CONNECTION na metastore.

    Para usar uma conexão existente: Você tem USE CONNECTION ou ALL PRIVILEGES no objeto de conexão.

  • USE CATALOG no catálogo de destino.

  • USE SCHEMA e CREATE TABLE em um esquema existente ou CREATE SCHEMA no catálogo de destino.

Criar uma conexão com o Salesforce

Permissões necessárias: CREATE CONNECTION no metastore. Entre em contato com um administrador do metastore para conceder isso.

Para usar uma conexão existente, o senhor precisa de USE CONNECTION ou ALL PRIVILEGES na conexão.

Para criar uma conexão com o Salesforce, faça o seguinte:

  1. No site Databricks workspace, clique em Catalog > External locations > Connections > Create connection.

  2. Para Connection name (Nome da conexão), especifique um nome exclusivo para a conexão do Salesforce.

  3. Para Tipo de conexão, clique em Salesforce.

  4. Defina o tipo de autenticação como OAuth e preencha os campos a seguir:

    • Defina o ID do cliente como o consumidor key que o senhor recuperou do Salesforce.

    • Defina o segredo do cliente como o segredo do consumidor que o senhor recuperou do Salesforce.

    • Defina o escopo de OAuth para as strings literais api  refresh_token.

  5. Se o senhor estiver ingerindo a partir de um Salesforce sandbox account, defina Is sandbox como true.

  6. Clique em fazer login com o Salesforce.

    Observação

    Testar a conexão para verificar se o host está acessível. Ele não testa as credenciais do usuário quanto aos valores corretos de nome de usuário e senha.

    Login no Salesforce
  7. Faça login com seu usuário do Salesforce account. Quando o senhor acessar log in e autorizar, um refresh_token será criado automaticamente.

  8. (Opcional) Se estiver usando uma sandbox do Salesforce, clique em Use Custom Domain (Usar domínio personalizado), forneça o URL da sandbox e prossiga para o login.

    Usar botão de domínio personalizado
    Digite o URL da sandbox
  9. Depois de retornar à página Create Connection (Criar conexão ), clique em Create (Criar).

Criar um pipeline do Delta Live Tables

Este passo descreve como criar a ingestão pipeline. Cada tabela ingerida corresponde a uma tabela de transmissão com o mesmo nome (mas tudo em letras minúsculas) no destino por default, a menos que o senhor a renomeie explicitamente.

  1. Gerar um site pessoal access token.

  2. Cole o código a seguir em uma célula do Python Notebook:

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. Cole o código a seguir em uma segunda célula do Notebook:

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  4. Cole o código a seguir em uma terceira célula do Notebook:

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "table": {
             "source_schema": "objects",
             "source_table": "<YOUR_FIRST_TABLE>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>"
             }
          },
          {
             "table": {
             "source_schema": "objects",
             "source_table": "YOUR_SECOND_TABLE",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>"
             }
           }
         ]
       }
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. executar a primeira célula do Notebook sem modificá-la.

  6. Modifique a segunda célula do padrão Notebook com os detalhes de seu pipeline. Por exemplo, as tabelas que o senhor deseja ingerir e os destinos.

  7. executar a segunda célula do Notebook padrão. Essa execução create_pipeline.

    1. O senhor pode executar list_pipeline para mostrar o pipeline id e seus detalhes

    2. O senhor pode executar edit_pipeline para editar a definição do site pipeline.

    3. O senhor pode executar delete_pipeline para excluir o pipeline.

Para criar o pipeline:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Para atualizar o pipeline:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

Para obter a definição do pipeline:

databricks pipelines get "<pipeline-id>"

Para excluir o pipeline:

databricks pipelines delete "<pipeline-id>"

Para obter mais informações, o senhor pode executar:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

começar, programar e definir alertas em seu pipeline

  1. Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em Delta Live Tables.

    O novo pipeline aparece na lista pipeline.

  2. Para acessar view os detalhes de pipeline, clique no nome pipeline.

  3. Na página de detalhes do pipeline, execute o pipeline clicando em começar. O senhor pode programar o pipeline clicando em programar.

  4. Para definir o alerta no site pipeline, clique em programar, clique em Mais opções e, em seguida, adicione uma notificação.

  5. Após a conclusão da ingestão, o senhor pode consultar suas tabelas.

Observação

Na execução do pipeline, o senhor poderá ver duas visualizações de origem para uma determinada tabela. Um view contém o Snapshot para campos de fórmula. O outro view contém os pulls de dados incrementais para campos sem fórmula. Essas visualizações são unidas na tabela de destino.