Ingest data from Salesforce
Learn how to ingest data from Salesforce into Databricks using Lakeflow Connect.
Requirements
- To create an ingestion pipeline, you must first meet the following requirements:
  - Your workspace must be enabled for Unity Catalog.
  - Serverless compute must be enabled for your workspace. See Serverless compute requirements.
  - If you plan to create a new connection: You must have CREATE CONNECTION privileges on the metastore. See Manage privileges in Unity Catalog.
    If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.
  - If you plan to use an existing connection: You must have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.
  - You must have USE CATALOG privileges on the target catalog.
  - You must have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.
- Salesforce applies usage restrictions to connected apps. The permissions in the following table are required for a successful first-time authentication. If you lack these permissions, Salesforce blocks the connection and requires an admin to install the Databricks connected app.

  | Condition | Required permission |
  | --- | --- |
  | API Access Control is enabled. | Customize Application and either Modify All Data or Manage Connected Apps |
  | API Access Control is not enabled. | Approve Uninstalled Connected Apps |

  For background, see Prepare for Connected App Usage Restrictions Change in the Salesforce documentation.
- To ingest from Salesforce, the following is recommended:
  - Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.
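The Unity Catalog privileges listed in the requirements can be granted with SQL `GRANT` statements. The following Python sketch only builds the statement strings; it does not run them. The function name `grant_statements` and the principal, catalog, schema, and connection names are hypothetical, and the statements should be checked against the Unity Catalog privilege reference before use.

```python
def grant_statements(principal, catalog, schema, connection=None):
    """Build GRANT statements for the privileges listed in the requirements.

    All argument values are placeholders chosen for illustration.
    """
    quoted = f"`{principal}`"
    statements = []
    if connection is None:
        # Creating a new connection requires CREATE CONNECTION on the metastore.
        statements.append(f"GRANT CREATE CONNECTION ON METASTORE TO {quoted}")
    else:
        # Reusing an existing connection requires USE CONNECTION on that connection.
        statements.append(
            f"GRANT USE CONNECTION ON CONNECTION {connection} TO {quoted}"
        )
    statements.append(f"GRANT USE CATALOG ON CATALOG {catalog} TO {quoted}")
    # This sketch covers the "existing schema" case (USE SCHEMA and CREATE TABLE).
    statements.append(
        f"GRANT USE SCHEMA, CREATE TABLE ON SCHEMA {catalog}.{schema} TO {quoted}"
    )
    return statements


for statement in grant_statements("ingest-user@example.com", "my_catalog", "my_schema"):
    print(statement + ";")
```

A user with sufficient privileges could paste the printed statements into a SQL editor. If the target schema does not exist yet, the last statement would instead need to grant CREATE SCHEMA on the catalog.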
Create an ingestion pipeline
You can filter rows during ingestion to improve performance and reduce data duplication. See Select rows to ingest.
You can create the pipeline using either the Databricks UI or Databricks Asset Bundles.
Databricks UI
- In the sidebar of the Databricks workspace, click Data Ingestion.
- On the Add data page, under Databricks connectors, click Salesforce.
- On the Connection page of the ingestion wizard, select the connection that stores your Salesforce access credentials. If you have the CREATE CONNECTION privilege on the metastore, you can click Create connection to create a new connection with the authentication details in Salesforce.
- Click Next.
- On the Ingestion setup page, enter a unique name for the pipeline.
- Select a catalog and a schema to write event logs to. If you have USE CATALOG and CREATE SCHEMA privileges on the catalog, you can click Create schema in the drop-down menu to create a new schema.
- Click Create pipeline and continue.
- On the Source page, select the tables to ingest. If you select All tables, all existing and future tables in the source schema will be ingested.
- Click Save and continue.
- On the Destination page, select a catalog and a schema to load data into. (Optional) If you have USE CATALOG and CREATE SCHEMA privileges on the catalog, you can click Create schema in the drop-down menu to create a new schema.
- Click Save and continue.
- (Optional) On the Schedules and notifications page, click Create schedule. Set the frequency to refresh the destination tables.
- (Optional) Click Add notification to set email notifications for pipeline operation success or failure, then click Save and run pipeline.
Databricks Asset Bundles
Use Declarative Automation Bundles to manage Salesforce pipelines as code. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Declarative Automation Bundles?.
- Create a new bundle using the Databricks CLI:
    databricks bundle init
- Add two new resource files to the bundle:
  - A pipeline definition file (for example, resources/sfdc_pipeline.yml). See pipeline.ingestion_definition and Examples.
  - A job definition file that controls the frequency of data ingestion (for example, resources/sfdc_job.yml).
- Deploy the pipeline using the Databricks CLI:
    databricks bundle deploy
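The deploy step above can be scripted. The following Python sketch builds the CLI invocations and, by default, only prints them. It adds a `databricks bundle validate` call before `databricks bundle deploy` and passes `--target`; both are additions that do not appear in the steps above, and the target name `dev` is hypothetical. The helper names `bundle_commands` and `run_bundle` are illustrative.

```python
import shutil
import subprocess


def bundle_commands(target=None):
    """Return the CLI invocations, in order, to validate and deploy a bundle."""
    suffix = ["--target", target] if target else []
    return [
        ["databricks", "bundle", "validate", *suffix],
        ["databricks", "bundle", "deploy", *suffix],
    ]


def run_bundle(target=None, dry_run=True):
    """Print the commands, or execute them when dry_run is False."""
    for command in bundle_commands(target):
        if dry_run:
            print("would run:", " ".join(command))
            continue
        if shutil.which("databricks") is None:
            raise RuntimeError("Databricks CLI not found on PATH")
        # check=True stops at the first failing command.
        subprocess.run(command, check=True)


# "dev" is a hypothetical target name defined in the bundle configuration.
run_bundle(target="dev")
```

Run the script from the bundle root directory and pass `dry_run=False` once the printed commands look right.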
Examples
Use these examples to configure your pipeline.
Ingest formula fields incrementally
This feature is in Beta.
By default, formula fields are ingested using full snapshots on each pipeline run. However, you can enable incremental ingestion for formula fields by setting the flag pipelines.enableSalesforceFormulaFieldsMVComputation: "true" in the configuration block of your pipeline definition.
The following pipeline definition file enables incremental formula field ingestion:
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog
      schema: my_schema
      configuration:
        pipelines.enableSalesforceFormulaFieldsMVComputation: 'true'
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Account
              destination_catalog: my_catalog
              destination_schema: my_schema
For more information, see Ingest Salesforce formula fields incrementally.
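If the pipeline definition is generated or post-processed by a script, the flag can be added programmatically. The following Python sketch assumes the YAML has already been parsed into nested dictionaries (the parsing step is not shown) and sets the flag on every pipeline in the bundle. The function name `enable_incremental_formula_fields` is hypothetical.

```python
import copy

FORMULA_FLAG = "pipelines.enableSalesforceFormulaFieldsMVComputation"


def enable_incremental_formula_fields(bundle):
    """Return a copy of a parsed bundle with the flag set on every pipeline."""
    updated = copy.deepcopy(bundle)
    pipelines = updated.get("resources", {}).get("pipelines", {})
    for pipeline in pipelines.values():
        # The example above quotes the value, so this sketch writes the
        # string "true" rather than a boolean.
        pipeline.setdefault("configuration", {})[FORMULA_FLAG] = "true"
    return updated


bundle = {
    "resources": {
        "pipelines": {
            "pipeline_sfdc": {
                "name": "salesforce_pipeline",
                "catalog": "my_catalog",
                "schema": "my_schema",
            }
        }
    }
}
updated = enable_incremental_formula_fields(bundle)
print(updated["resources"]["pipelines"]["pipeline_sfdc"]["configuration"])
```

The input dictionary is left unchanged, so the original and updated definitions can be compared before anything is written back to disk.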
Ingest two Salesforce objects into separate schemas
The following pipeline definition file ingests two Salesforce objects into separate schemas:
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table
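When many objects go to different destinations, the `objects` list can be generated from a compact mapping instead of being written by hand. The following Python sketch expands a mapping of source object to destination into the same structure as the definition above. The function name `table_objects` is hypothetical, and the output is a plain Python list that still has to be serialized into the pipeline definition file.

```python
def table_objects(destinations, source_schema="objects"):
    """Expand {source_table: (catalog, schema)} into ingestion 'objects' entries."""
    return [
        {
            "table": {
                "source_schema": source_schema,
                "source_table": source_table,
                "destination_catalog": catalog,
                "destination_schema": schema,
            }
        }
        for source_table, (catalog, schema) in destinations.items()
    ]


objects = table_objects(
    {
        "AccountShare": ("my_catalog_1", "my_schema_1"),
        "AccountPartner": ("my_catalog_2", "my_schema_2"),
    }
)
for entry in objects:
    print(entry["table"])
```

A dictionary keyed by source object can hold each object only once, so this shape does not fit the case where one object is ingested several times.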
Ingest one Salesforce object three times
The following pipeline definition file ingests a Salesforce object into three different destination tables. Two tables with the same name can't exist in the same destination schema, so when you ingest an object more than once into one schema, use destination_table to give each additional copy a unique name.
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1 # Location of first copy
              destination_schema: my_schema_1 # Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2 # Location of second copy
              destination_schema: my_schema_2 # Location of second copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2 # Location of third copy, renamed
              destination_schema: my_schema_2 # Location of third copy, renamed
              destination_table: order_duplicate # Table rename
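Because duplicate destination tables aren't supported, it can help to check a definition for name collisions before deploying it. The following Python sketch flags entries that resolve to the same catalog, schema, and table. It rests on two assumptions that are not stated above: that the destination table name defaults to the source table name when destination_table is omitted, and that table names are compared case-insensitively. The function name `duplicate_destinations` is hypothetical.

```python
from collections import Counter


def duplicate_destinations(objects):
    """Return (catalog, schema, table) keys that more than one entry targets."""
    keys = []
    for entry in objects:
        table = entry["table"]
        # Assumption: without destination_table, the source name is reused.
        name = table.get("destination_table", table["source_table"])
        keys.append(
            (table["destination_catalog"], table["destination_schema"], name.lower())
        )
    return [key for key, count in Counter(keys).items() if count > 1]


objects = [
    {"table": {"source_schema": "objects", "source_table": "Order",
               "destination_catalog": "my_catalog_1",
               "destination_schema": "my_schema_1"}},
    {"table": {"source_schema": "objects", "source_table": "Order",
               "destination_catalog": "my_catalog_2",
               "destination_schema": "my_schema_2"}},
    {"table": {"source_schema": "objects", "source_table": "Order",
               "destination_catalog": "my_catalog_2",
               "destination_schema": "my_schema_2",
               "destination_table": "order_duplicate"}},
]
print(duplicate_destinations(objects))
```

For the three entries shown, which mirror the definition above, the check finds no collisions. Dropping destination_table from the third entry would make it collide with the second.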
Bundle job definition file
The following is an example job definition file to use with Declarative Automation Bundles. The job runs every day, exactly one day from the last run.
resources:
  jobs:
    sfdc_dab_job:
      name: sfdc_dab_job
      trigger:
        periodic:
          interval: 1
          unit: DAYS
      email_notifications:
        on_failure:
          - <email-address>
      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
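The job refers to the pipeline through the `${resources.pipelines.pipeline_sfdc.id}` reference, so the key `pipeline_sfdc` must match the pipeline's key in the pipeline definition file. The following Python sketch checks that every such reference in a job definition points at a defined pipeline. It assumes both files have already been parsed into dictionaries, and the function name `unresolved_pipeline_refs` is hypothetical.

```python
import re

# Matches references of the form ${resources.pipelines.<key>.id}.
PIPELINE_REF = re.compile(r"\$\{resources\.pipelines\.([A-Za-z0-9_-]+)\.id\}")


def unresolved_pipeline_refs(job_bundle, pipeline_bundle):
    """Return pipeline keys referenced by job tasks but not defined."""
    defined = set(pipeline_bundle.get("resources", {}).get("pipelines", {}))
    missing = []
    for job in job_bundle.get("resources", {}).get("jobs", {}).values():
        for task in job.get("tasks", []):
            reference = task.get("pipeline_task", {}).get("pipeline_id", "")
            match = PIPELINE_REF.fullmatch(reference)
            if match and match.group(1) not in defined:
                missing.append(match.group(1))
    return missing


jobs = {"resources": {"jobs": {"sfdc_dab_job": {"tasks": [
    {"task_key": "refresh_pipeline",
     "pipeline_task": {"pipeline_id": "${resources.pipelines.pipeline_sfdc.id}"}},
]}}}}
pipelines = {"resources": {"pipelines": {"pipeline_sfdc": {"name": "salesforce_pipeline"}}}}
print(unresolved_pipeline_refs(jobs, pipelines))
```

An empty result means every reference resolves. Literal pipeline IDs that are not written as `${...}` references are ignored by this check.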
Common patterns
For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.
Next steps
Start, schedule, and set alerts on your pipeline. See Common pipeline maintenance tasks.