Ingest data from Salesforce

Preview

LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.

This article describes how to ingest data from Salesforce and load it into Databricks using LakeFlow Connect. The resulting ingestion pipeline is governed by Unity Catalog and is powered by serverless compute and Delta Live Tables.

The Salesforce ingestion connector supports the following source:

  • Salesforce Sales Cloud

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace is enabled for Unity Catalog.

  • Serverless compute is enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.

  • To create a connection: You have CREATE CONNECTION on the metastore.

    To use an existing connection: You have USE CONNECTION or ALL PRIVILEGES on the connection object.

  • USE CATALOG on the target catalog.

  • USE SCHEMA and CREATE TABLE on an existing schema, or CREATE SCHEMA on the target catalog. (See the example grants after this list.)
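
If you're missing any of these privileges, ask an admin to grant them. The following is a minimal sketch of the corresponding grants, run from a notebook by a principal who is allowed to grant each privilege; the connection, catalog, schema, and user names are hypothetical placeholders.

    # Hypothetical object and principal names; replace them with your own.
    # CREATE CONNECTION must be granted by a metastore admin.
    spark.sql("GRANT CREATE CONNECTION ON METASTORE TO `someone@example.com`")

    # Privileges for reusing an existing connection and writing to the destination.
    spark.sql("GRANT USE CONNECTION ON CONNECTION my_salesforce_connection TO `someone@example.com`")
    spark.sql("GRANT USE CATALOG ON CATALOG my_catalog TO `someone@example.com`")
    spark.sql("GRANT USE SCHEMA, CREATE TABLE ON SCHEMA my_catalog.my_schema TO `someone@example.com`")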

Create a Salesforce connection

Permissions required: CREATE CONNECTION on the metastore. Contact a metastore admin to grant this.

To use an existing connection instead, you need USE CONNECTION or ALL PRIVILEGES on the connection.

To create a Salesforce connection, do the following:

  1. In the Databricks workspace, click Catalog > External locations > Connections > Create connection.

  2. For Connection name, specify a unique name for the Salesforce connection.

  3. For Connection type, click Salesforce.

  4. Set Auth type to OAuth and fill out the following fields:

    • Set the client ID to the consumer key that you retrieved from Salesforce.

    • Set the client secret to the consumer secret that you retrieved from Salesforce.

    • Set the OAuth scope to the literal string api refresh_token.

  5. If you’re ingesting from a Salesforce sandbox account, set Is sandbox to true.

  6. Click Log in with Salesforce.

    Note

    Test connection only verifies that the host is reachable. It does not verify that your user credentials are correct.

  7. Log in with your Salesforce user account. When you log in and authorize, a refresh_token is created automatically.

  8. (Optional) If you’re using a Salesforce sandbox, click Use Custom Domain, provide the sandbox URL, and then proceed to log in.

  9. After returning to the Create Connection page, click Create.

Create a Delta Live Tables pipeline

This step describes how to create the ingestion pipeline. By default, each ingested table corresponds to a streaming table with the same name (but all lowercase) in the destination, unless you explicitly rename it. For example, the Salesforce Account object is written to a streaming table named account.

  1. Generate a personal access token.

  2. Paste the following code into a Python notebook cell:

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. Paste the following code into a second notebook cell:

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    # DO NOT MODIFY
    # These are the API helper functions to be used.
    def create_pipeline(pipeline_definition: str):
        response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
        check_response(response)


    def edit_pipeline(id: str, pipeline_definition: str):
        response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
        check_response(response)


    def delete_pipeline(id: str):
        response = requests.delete(url=f"{api_url}/{id}", headers=headers)
        check_response(response)


    def get_pipeline(id: str):
        response = requests.get(url=f"{api_url}/{id}", headers=headers)
        check_response(response)


    def list_pipeline(filter: str = ""):
        body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
        response = requests.get(url=api_url, headers=headers, data=body)
        check_response(response)
    
  4. Paste the following code into a third notebook cell:

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "table": {
             "source_schema": "objects",
             "source_table": "<YOUR_FIRST_TABLE>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>"
             }
          },
          {
             "table": {
             "source_schema": "objects",
             "source_table": "YOUR_SECOND_TABLE",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>"
             }
           }
         ]
       }
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. Update the first cell with your personal access token, and then run it.

  6. Run the second cell without modifying it.

  7. Modify the third cell with the details of your pipeline: for example, the tables that you want to ingest and their destinations. Then run the cell, which calls create_pipeline.

    1. You can run list_pipeline to show the pipeline ID and its details.

    2. You can run edit_pipeline to edit the pipeline definition, as shown in the sketch after this list.

    3. You can run delete_pipeline to delete the pipeline.
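
    The following is a minimal sketch of how you might call these helpers after the pipeline is created. The filter and pipeline ID are hypothetical placeholders; copy the real ID from the create_pipeline or list_pipeline output.

    # List managed ingestion pipelines whose name contains "salesforce".
    list_pipeline("name LIKE '%salesforce%'")

    # Inspect a single pipeline by its ID (hypothetical placeholder).
    pipeline_id = "<YOUR_PIPELINE_ID>"
    get_pipeline(pipeline_id)

    # Edit the pipeline by sending an updated definition with the same JSON
    # shape as pipeline_spec, for example after adding or removing a table.
    edit_pipeline(pipeline_id, pipeline_spec)

    # Delete the pipeline when you no longer need it.
    # delete_pipeline(pipeline_id)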

Alternatively, you can use the Databricks CLI to manage the pipeline.

To create the pipeline:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

To update the pipeline:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

To get the pipeline definition:

databricks pipelines get "<pipeline-id>"

To delete the pipeline:

databricks pipelines delete "<pipeline-id>"

For more information, you can run:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Start, schedule, and set alerts on your pipeline

  1. After the pipeline has been created, revisit the Databricks workspace, and then click Delta Live Tables.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, run the pipeline by clicking Start. You can schedule the pipeline by clicking Schedule.

  4. To set alerts on the pipeline, click Schedule, click More options, and then add a notification.

  5. After ingestion completes, you can query your tables.
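
    For example, to preview an ingested table from a notebook (a sketch; the catalog, schema, and table names are placeholders to replace with your own):

    # Hypothetical destination names; the Salesforce Account object lands in a
    # streaming table named "account" by default.
    df = spark.table("<YOUR_DATABRICKS_CATALOG>.<YOUR_DATABRICKS_SCHEMA>.account")
    display(df.limit(10))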

Note

When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.