Ingest data from Salesforce
This page describes how to ingest data from Salesforce and load it into Databricks using Lakeflow Connect.
Before you begin
To create an ingestion pipeline, you must meet the following requirements:
- Your workspace is enabled for Unity Catalog.
- Serverless compute is enabled for your workspace. See Enable serverless compute.
- If you plan to create a connection: You have CREATE CONNECTION privileges on the metastore.
- If you plan to use an existing connection: You have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.
- You have USE CATALOG privileges on the target catalog.
- You have USE SCHEMA and CREATE TABLE privileges on an existing schema, or CREATE SCHEMA privileges on the target catalog.
To ingest from Salesforce, the following is recommended:
- Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.
Create an ingestion pipeline
Permissions required: USE CONNECTION or ALL PRIVILEGES on a connection.
This step describes how to create the ingestion pipeline. Each ingested table is written to a streaming table with the same name (but all lowercase).
You can create the pipeline using the Databricks UI, Databricks Asset Bundles, or the Databricks CLI.
Databricks UI
1. In the sidebar of the Databricks workspace, click Data Ingestion.
2. On the Add data page, under Databricks connectors, click Salesforce.
   The Salesforce ingestion wizard opens.
3. On the Pipeline page of the wizard, enter a unique name for the ingestion pipeline.
4. In the Destination catalog dropdown, select a catalog. Ingested data and event logs will be written to this catalog.
5. Select the Unity Catalog connection that stores the credentials required to access Salesforce data.
   If there are no Salesforce connections, click Create connection. You must have the CREATE CONNECTION privilege on the metastore.
6. Click Create pipeline and continue.
7. On the Source page, select the tables to ingest, and then click Next.
   If you select All tables, the Salesforce ingestion connector writes all existing and future tables in the source schema to the destination schema. There is a maximum of 250 objects per pipeline.
8. On the Destination page, select the Unity Catalog catalog and schema to write to.
   If you don't want to use an existing schema, click Create schema. You must have the USE CATALOG and CREATE SCHEMA privileges on the parent catalog.
9. Click Save pipeline and continue.
10. (Optional) On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.
11. (Optional) Set email notifications for pipeline operation success or failure.
12. Click Save and run pipeline.
Databricks Asset Bundles
This section describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
1. Create a new bundle using the Databricks CLI:

   databricks bundle init

2. Add two new resource files to the bundle:

   - A pipeline definition file (resources/sfdc_pipeline.yml).
   - A workflow file that controls the frequency of data ingestion (resources/sfdc_job.yml).

   The following is an example resources/sfdc_pipeline.yml file:

   variables:
     dest_catalog:
       default: main
     dest_schema:
       default: ingest_destination_schema

   # The main pipeline for sfdc_dab
   resources:
     pipelines:
       pipeline_sfdc:
         name: salesforce_pipeline
         catalog: ${var.dest_catalog}
         target: ${var.dest_schema}
         ingestion_definition:
           connection_name: <salesforce-connection>
           objects:
             # An array of objects to ingest from Salesforce. This example
             # ingests the AccountShare, AccountPartner, and ApexPage objects.
             - table:
                 source_schema: objects
                 source_table: AccountShare
                 destination_catalog: ${var.dest_catalog}
                 destination_schema: ${var.dest_schema}
             - table:
                 source_schema: objects
                 source_table: AccountPartner
                 destination_catalog: ${var.dest_catalog}
                 destination_schema: ${var.dest_schema}
             - table:
                 source_schema: objects
                 source_table: ApexPage
                 destination_catalog: ${var.dest_catalog}
                 destination_schema: ${var.dest_schema}

   The following is an example resources/sfdc_job.yml file:

   resources:
     jobs:
       sfdc_dab_job:
         name: sfdc_dab_job

         trigger:
           # Run this job every day, exactly one day from the last run
           # See https://docs.databricks.com/api/workspace/jobs/create#trigger
           periodic:
             interval: 1
             unit: DAYS

         email_notifications:
           on_failure:
             - <email-address>

         tasks:
           - task_key: refresh_pipeline
             pipeline_task:
               pipeline_id: ${resources.pipelines.pipeline_sfdc.id}

3. Deploy the pipeline using the Databricks CLI:

   databricks bundle deploy
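After the bundle is deployed, you can start a run of the job from the Jobs UI or with the Databricks CLI, for example databricks bundle run sfdc_dab_job (using the job key defined in resources/sfdc_job.yml), or let the periodic trigger defined in the job run it on schedule.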
You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:
- include_columns: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
- exclude_columns: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To exclude the future columns, you'll have to add them to the list.
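For example, the following is a minimal sketch of a table entry in the resources/sfdc_pipeline.yml file above with exclude_columns added. The <column-a> and <column-b> placeholders stand in for the columns you want to omit; include_columns is configured the same way.

   - table:
       source_schema: objects
       source_table: AccountShare
       destination_catalog: ${var.dest_catalog}
       destination_schema: ${var.dest_schema}
       table_configuration:
         exclude_columns:
           - <column-a>
           - <column-b>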
Databricks CLI
To create the pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
To update the pipeline:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
To get the pipeline definition:
databricks pipelines get "<pipeline-id>"
To delete the pipeline:
databricks pipelines delete "<pipeline-id>"
For more information, you can run:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Example JSON pipeline definition
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
Start, schedule, and set alerts on your pipeline
You can create a schedule for the pipeline on the pipeline details page.
1. After the pipeline has been created, revisit the Databricks workspace, and then click Pipelines.
   The new pipeline appears in the pipeline list.
2. To view the pipeline details, click the pipeline name.
3. On the pipeline details page, you can schedule the pipeline by clicking Schedule.
4. To set notifications on the pipeline, click Settings, and then add a notification.
For each schedule that you add to a pipeline, Lakeflow Connect automatically creates a job for it. The ingestion pipeline is a task within the job. You can optionally add more tasks to the job.
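If you manage the pipeline with Databricks Asset Bundles, one way to add a task is to extend the job definition from the earlier resources/sfdc_job.yml example. The following is a minimal sketch; the post_process task and its notebook path are hypothetical placeholders, not part of the connector itself:

   resources:
     jobs:
       sfdc_dab_job:
         name: sfdc_dab_job
         tasks:
           - task_key: refresh_pipeline
             pipeline_task:
               pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
           # Hypothetical downstream task that runs after the ingestion refresh completes.
           # Compute settings are omitted; adjust to match your workspace.
           - task_key: post_process
             depends_on:
               - task_key: refresh_pipeline
             notebook_task:
               notebook_path: /Workspace/Users/<user>/post_process_salesforce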
When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.
Example: Ingest two Salesforce objects into separate schemas
The example pipeline definition in this section ingests two Salesforce objects into separate schemas. Multi-destination pipeline support is API-only.
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table
Example: Ingest one Salesforce object three times
The example pipeline definition in this section ingests a Salesforce object into three different destination tables. Multi-destination pipeline support is API-only.
You can optionally rename a table that you ingest. If you rename a table in your pipeline, it becomes an API-only pipeline, and you can no longer edit the pipeline in the UI.
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1 # Location of first copy
              destination_schema: my_schema_1 # Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2 # Location of second copy
              destination_schema: my_schema_2 # Location of second copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2 # Location of third copy, renamed
              destination_schema: my_schema_2 # Location of third copy, renamed
              destination_table: order_duplicate # Table rename