Ingest data from Salesforce
Preview
LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.
This article describes how to ingest data from Salesforce and load it into Databricks using LakeFlow Connect. The resulting ingestion pipeline is governed by Unity Catalog and is powered by serverless compute and Delta Live Tables.
The Salesforce ingestion connector supports the following source:
Salesforce Sales Cloud
Before you begin
To create an ingestion pipeline, you must meet the following requirements:
Your workspace is enabled for Unity Catalog.
Serverless compute is enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.
To create a connection: You have
CREATE CONNECTION
on the metastore.To use an existing connection: You have
USE CONNECTION
orALL PRIVILEGES
on the connection object.USE CATALOG
on the target catalog.USE SCHEMA
andCREATE TABLE
on an existing schema orCREATE SCHEMA
on the target catalog.(Recommended) Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.
Create a Salesforce connection
Permissions required: CREATE CONNECTION
on the metastore. Contact a metastore admin to grant this.
To use an existing connection instead, you need USE CONNECTION
or ALL PRIVILEGES
on the connection.
To create a Salesforce connection, do the following:
In the Databricks workspace, click Catalog > External locations > Connections > Create connection.
For Connection name, specify a unique name for the Salesforce connection.
For Connection type, click Salesforce.
Set Auth type to OAuth.
If you’re ingesting from a Salesforce sandbox account, set Is sandbox to
true
.Click Log in with Salesforce.
If you’re ingesting from a Salesforce sandbox, click Use Custom Domain. Provide the sandbox URL, and then proceed to login. Databricks recommends logging in as a Salesforce user that’s dedicated to Databricks ingestion.
After returning to the Create Connection page, click Create.
Create an ingestion pipeline
This step describes how to create the ingestion pipeline. Each ingested table corresponds to a streaming table with the same name (but all lowercase) in the destination by default, unless you explicitly rename it.
Paste the following code into a Python notebook cell:
# SHOULD MODIFY # This step sets up a PAT to make API calls to the Databricks service. api_token = "<personal-access-token>"
Paste the following code into a second notebook cell:
# DO NOT MODIFY # This step sets up a connection to make API calls to the Databricks service. import requests import json notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() workspace_url = notebook_context.apiUrl().get() api_url = f"{workspace_url}/api/2.0/pipelines" headers = { 'Authorization': 'Bearer {}'.format(api_token), 'Content-Type': 'application/json' } def check_response(response): if response.status_code == 200: print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False))) else: print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}") # DO NOT MODIFY # These are API definition to be used. def create_pipeline(pipeline_definition: str): response = requests.post(url=api_url, headers=headers, data=pipeline_definition) check_response(response) def edit_pipeline(id: str, pipeline_definition: str): response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition) check_response(response) def delete_pipeline(id: str): response = requests.delete(url=f"{api_url}/{id}", headers=headers) check_response(response) def get_pipeline(id: str): response = requests.get(url=f"{api_url}/{id}", headers=headers) check_response(response) def list_pipeline(filter: str = ""): body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}""" response = requests.get(url=api_url, headers=headers, data=body) check_response(response)
Paste the following code into a third notebook cell:
# SHOULD MODIFY # Update this notebook to configure your ingestion pipeline. pipeline_spec = """ { "name": "<pipeline-name>", "ingestion_definition": { "connection_name": "<connection-name>", "objects": [ { "table": { "source_schema": "objects", "source_table": "<source-table1>", "destination_catalog": "<databricks-catalog>", "destination_schema": "<databricks-schema>", "destination_table": "<databricks-table>" } }, { "table": { "source_schema": "objects", "source_table": "<source-table2>", "destination_catalog": "<databricks-catalog>", "destination_schema": "<databricks-schema>", "destination_table": "<databricks-table>" } } ] } } """ create_pipeline(pipeline_spec)
Run the first notebook cell without modifying it.
Modify the second cell of the template notebook with the details of your pipeline. For example, the tables you want to ingest and the destinations.
Run the second cell of the template notebook. This runs
create_pipeline
.You can run list_pipeline to show the pipeline id and its details.
You can run edit_pipeline to edit the pipeline definition.
You can run delete_pipeline to delete the pipeline.
To create the pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
To update the pipeline:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
To get the pipeline definition:
databricks pipelines get "<pipeline-id>"
To delete the pipeline:
databricks pipelines delete "<pipeline-id>"
For more information, you can run:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Start, schedule, and set alerts on your pipeline
After the pipeline has been created, revisit the Databricks workspace, and then click Delta Live Tables.
The new pipeline appears in the pipeline list.
To view the pipeline details, click the pipeline name.
On the pipeline details page, run the pipeline by clicking Start. You can schedule the pipeline by clicking Schedule.
To set alerts on the pipeline, click Schedule, click More options, and then add a notification.
After ingestion completes, you can query your tables.
Note
When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.