Ingest data from Salesforce

Preview

LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.

This article describes how to ingest data from Salesforce and load it into Databricks using LakeFlow Connect. The resulting ingestion pipeline is governed by Unity Catalog and is powered by serverless compute and Delta Live Tables.

The Salesforce ingestion connector supports the following source:

  • Salesforce Sales Cloud

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace is enabled for Unity Catalog.

  • Serverless compute is enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.

  • To create a connection: You have CREATE CONNECTION on the metastore.

    To use an existing connection: You have USE CONNECTION or ALL PRIVILEGES on the connection object.

  • You have USE CATALOG on the target catalog.

  • You have USE SCHEMA and CREATE TABLE on an existing schema, or CREATE SCHEMA on the target catalog.

  • (Recommended) Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.
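
The Unity Catalog privileges listed above can be granted with SQL GRANT statements. The following minimal sketch only builds the statement strings so that an admin can review them before running each one (for example, with spark.sql in a notebook). The function name build_grants and the example principal, catalog, and schema names are hypothetical, and the sketch assumes that the principal creates a new connection and writes to an existing schema.

```python
from typing import List


def build_grants(principal: str, catalog: str, schema: str) -> List[str]:
    """Return GRANT statements for the privileges listed in this section.

    Assumption: the principal creates a new connection and writes into an
    existing schema. Adjust the list if you reuse a connection or need
    CREATE SCHEMA on the catalog instead.
    """
    p = f"`{principal}`"
    return [
        f"GRANT CREATE CONNECTION ON METASTORE TO {p}",
        f"GRANT USE CATALOG ON CATALOG `{catalog}` TO {p}",
        f"GRANT USE SCHEMA ON SCHEMA `{catalog}`.`{schema}` TO {p}",
        f"GRANT CREATE TABLE ON SCHEMA `{catalog}`.`{schema}` TO {p}",
    ]


# Hypothetical principal and destination names, for illustration only.
statements = build_grants("ingest-user@example.com", "main", "salesforce")
for statement in statements:
    print(statement)
```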

Create a Salesforce connection

Permissions required: CREATE CONNECTION on the metastore. Contact a metastore admin to grant this.

To use an existing connection instead, you need USE CONNECTION or ALL PRIVILEGES on the connection.

To create a Salesforce connection, do the following:

  1. In the Databricks workspace, click Catalog > External locations > Connections > Create connection.

  2. For Connection name, specify a unique name for the Salesforce connection.

  3. For Connection type, click Salesforce.

  4. Set Auth type to OAuth.

  5. If you’re ingesting from a Salesforce sandbox account, set Is sandbox to true.

  6. Click Log in with Salesforce.

  7. If you’re ingesting from a Salesforce sandbox, click Use Custom Domain. Provide the sandbox URL, and then proceed to login. Databricks recommends logging in as a Salesforce user that’s dedicated to Databricks ingestion.

  8. After returning to the Create Connection page, click Create.

Create an ingestion pipeline

This section describes how to create the ingestion pipeline. By default, each ingested table is written to a streaming table in the destination with the same name in all lowercase, unless you explicitly rename it.

  1. Generate a personal access token.

  2. Paste the following code into a Python notebook cell:

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. Paste the following code into a second notebook cell:

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
        'Authorization': 'Bearer {}'.format(api_token),
        'Content-Type': 'application/json'
    }


    def check_response(response):
        # Print the JSON payload on success, or the error details otherwise.
        if response.status_code == 200:
            print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
        else:
            print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")


    # DO NOT MODIFY
    # These are the API helper functions used in the following steps.
    def create_pipeline(pipeline_definition: str):
        response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
        check_response(response)


    def edit_pipeline(id: str, pipeline_definition: str):
        response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
        check_response(response)


    def delete_pipeline(id: str):
        response = requests.delete(url=f"{api_url}/{id}", headers=headers)
        check_response(response)


    def get_pipeline(id: str):
        response = requests.get(url=f"{api_url}/{id}", headers=headers)
        check_response(response)


    def list_pipeline(filter: str = ""):
        body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
        response = requests.get(url=api_url, headers=headers, data=body)
        check_response(response)
    
    
  4. Paste the following code into a third notebook cell:

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    
    pipeline_spec = """
    {
      "name": "<pipeline-name>",
      "ingestion_definition": {
        "connection_name": "<connection-name>",
        "objects": [
          {
            "table": {
              "source_schema": "objects",
              "source_table": "<source-table1>",
              "destination_catalog": "<databricks-catalog>",
              "destination_schema": "<databricks-schema>",
              "destination_table": "<databricks-table>"
            }
          },
          {
            "table": {
              "source_schema": "objects",
              "source_table": "<source-table2>",
              "destination_catalog": "<databricks-catalog>",
              "destination_schema": "<databricks-schema>",
              "destination_table": "<databricks-table>"
            }
          }
        ]
      }
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. In the first cell, replace the placeholder with your personal access token and run the cell. Then run the second cell without modifying it.

  6. Modify the third cell with the details of your pipeline, such as the tables you want to ingest and their destinations.

  7. Run the third cell. This runs create_pipeline.

    1. You can run list_pipeline to show the pipeline id and its details.

    2. You can run edit_pipeline to edit the pipeline definition.

    3. You can run delete_pipeline to delete the pipeline.
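
Hand-editing the JSON template is error-prone when you ingest many objects. As a minimal sketch, the definition could instead be generated from a list of source tables. The function name build_pipeline_spec, its parameters, and the example names are hypothetical. The sketch sets destination_table explicitly to the lowercase source name, which mirrors the default described at the start of this section, and accepts optional renames.

```python
import json
from typing import Dict, List, Optional


def build_pipeline_spec(
    name: str,
    connection_name: str,
    source_tables: List[str],
    catalog: str,
    schema: str,
    renames: Optional[Dict[str, str]] = None,
) -> str:
    """Build a pipeline definition in the same shape as the template above."""
    renames = renames or {}
    objects = []
    for source in source_tables:
        objects.append({
            "table": {
                "source_schema": "objects",
                "source_table": source,
                "destination_catalog": catalog,
                "destination_schema": schema,
                # Default to the lowercase source name unless renamed.
                "destination_table": renames.get(source, source.lower()),
            }
        })
    spec = {
        "name": name,
        "ingestion_definition": {
            "connection_name": connection_name,
            "objects": objects,
        },
    }
    return json.dumps(spec, indent=2)


# Hypothetical names, for illustration only.
generated_spec = build_pipeline_spec(
    name="salesforce-ingest",
    connection_name="my-salesforce-connection",
    source_tables=["Account", "Opportunity"],
    catalog="main",
    schema="salesforce",
    renames={"Opportunity": "opps"},
)
print(generated_spec)
```

In the notebook above, the generated string could be passed to create_pipeline in place of the hand-written pipeline_spec.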

Alternatively, you can manage the pipeline with the Databricks CLI. To create the pipeline:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

To update the pipeline:

databricks pipelines update --json "<pipeline-definition | json-file-path>"

To get the pipeline definition:

databricks pipelines get "<pipeline-id>"

To delete the pipeline:

databricks pipelines delete "<pipeline-id>"

For more information, you can run:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Start, schedule, and set alerts on your pipeline

  1. After the pipeline has been created, revisit the Databricks workspace, and then click Delta Live Tables.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, run the pipeline by clicking Start. You can schedule the pipeline by clicking Schedule.

  4. To set alerts on the pipeline, click Schedule, click More options, and then add a notification.

  5. After ingestion completes, you can query your tables.
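
The steps above start the pipeline from the UI. As a sketch of a programmatic alternative, the Pipelines REST API exposes an endpoint for starting an update (POST /api/2.0/pipelines/{pipeline_id}/updates). Whether this endpoint is the recommended route during the gated preview is not covered by this article, so treat that as an assumption. The function name build_start_update_request and the example workspace URL and pipeline ID are hypothetical. The sketch only builds the request; sending it is left as a commented-out final step.

```python
import json
import urllib.request


def build_start_update_request(
    workspace_url: str,
    pipeline_id: str,
    api_token: str,
    full_refresh: bool = False,
) -> urllib.request.Request:
    """Build (but do not send) a request that starts a pipeline update."""
    url = f"{workspace_url}/api/2.0/pipelines/{pipeline_id}/updates"
    body = json.dumps({"full_refresh": full_refresh}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Hypothetical workspace URL and pipeline ID, for illustration only.
request = build_start_update_request(
    "https://example.cloud.databricks.com",
    "1234-abcd",
    "<personal-access-token>",
)
print(request.get_method(), request.full_url)

# To send the request, uncomment the following lines:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```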

Note

When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.
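
To make the note above concrete, the following is a purely conceptual illustration of joining the two views on a record key. It is not the connector's implementation. The function name join_views, the choice of Id as the join key, and the field names are assumptions made for the example.

```python
from typing import Dict, List


def join_views(
    incremental_rows: List[Dict],
    formula_rows: List[Dict],
    key: str = "Id",
) -> List[Dict]:
    """Combine non-formula fields with formula fields for each record."""
    # Index the formula-field snapshot by record key for quick lookup.
    formulas = {row[key]: row for row in formula_rows}
    joined = []
    for row in incremental_rows:
        merged = dict(row)
        # Add formula fields when the snapshot has a matching record.
        merged.update(formulas.get(row[key], {}))
        joined.append(merged)
    return joined


# Hypothetical rows: Amount is a regular field, Discounted_Amount__c a formula field.
incremental = [{"Id": "001", "Amount": 100}, {"Id": "002", "Amount": 250}]
formula = [{"Id": "001", "Discounted_Amount__c": 90.0}]
joined = join_views(incremental, formula)
print(joined)
```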