Ingest data from Salesforce

This page describes how to ingest data from Salesforce and load it into Databricks using Lakeflow Connect.

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace is enabled for Unity Catalog.
  • Serverless compute is enabled for your workspace. See Enable serverless compute.
  • If you plan to create a connection: You have CREATE CONNECTION privileges on the metastore.
  • If you plan to use an existing connection: You have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.
  • You have USE CATALOG privileges on the target catalog.
  • You have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.

To ingest from Salesforce, the following is recommended:

  • Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.

Create an ingestion pipeline

Permissions required: USE CONNECTION or ALL PRIVILEGES on a connection.

This step describes how to create the ingestion pipeline. Each ingested table is written to a streaming table with the same name (but all lowercase).

  1. In the sidebar of the Databricks workspace, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click Salesforce.

    The Salesforce ingestion wizard opens.

  3. On the Pipeline page of the wizard, enter a unique name for the ingestion pipeline.

  4. In the Destination catalog dropdown, select a catalog. Ingested data and event logs will be written to this catalog.

  5. Select the Unity Catalog connection that stores the credentials required to access Salesforce data.

    If there are no Salesforce connections, click Create connection. You must have the CREATE CONNECTION privilege on the metastore.

  6. Click Create pipeline and continue.

  7. On the Source page, select the tables to ingest, and then click Next.

    If you select All tables, the Salesforce ingestion connector writes all existing and future tables in the source schema to the destination schema. There is a maximum of 250 objects per pipeline.

  8. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don't want to use an existing schema, click Create schema. You must have the USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  9. Click Save pipeline and continue.

  10. (Optional) On the Settings page, click Create schedule, and then set the frequency at which to refresh the destination tables.

  11. (Optional) Set email notifications for pipeline operation success or failure.

  12. Click Save and run pipeline.

Example JSON pipeline definition

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"table": {

"source_schema": "<source-schema>",

"source_table": "<source-table>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}
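
For comparison with the API examples later on this page, the same single-table definition can also be written in bundle-style YAML. The following is a minimal sketch, not a definitive definition: the catalog, schema, connection, object, and column names are placeholders, and it assumes that table_configuration and include_columns map to YAML the same way as the other fields shown in the examples below.

YAML
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog # Location of the pipeline event log
      schema: my_schema # Location of the pipeline event log
      ingestion_definition:
        connection_name: my_salesforce_connection
        objects:
          - table:
              source_schema: objects
              source_table: Account
              destination_catalog: my_catalog # Location of this table
              destination_schema: my_schema # Location of this table
              table_configuration:
                include_columns: # Ingest only these columns
                  - Id
                  - Name
                  - Industry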

Start, schedule, and set alerts on your pipeline

You can create a schedule for the pipeline on the pipeline details page.

  1. After the pipeline has been created, in the sidebar of the Databricks workspace, click Pipelines.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, you can schedule the pipeline by clicking Schedule.

  4. To set notifications on the pipeline, click Settings, and then add a notification.

For each schedule that you add to a pipeline, Lakeflow Connect automatically creates a job. The ingestion pipeline is a task within the job. You can optionally add more tasks to the job.
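
If you manage the pipeline as code rather than through the UI, the job that wraps the pipeline can be declared in the same bundle-style YAML as the examples below. This is a minimal sketch under assumptions: the job name, cron expression, notification address, and the ${resources.pipelines.pipeline_sfdc.id} reference are illustrative and presume the ingestion pipeline is defined in the same bundle as pipeline_sfdc.

YAML
resources:
  jobs:
    salesforce_ingestion_job:
      name: salesforce_ingestion_job
      schedule:
        quartz_cron_expression: "0 0 6 * * ?" # Run daily at 06:00
        timezone_id: UTC
      email_notifications:
        on_failure:
          - someone@example.com # Alert on pipeline failure
      tasks:
        - task_key: ingest_salesforce
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_sfdc.id} # Reference to the ingestion pipeline resource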

note

When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.

Example: Ingest two Salesforce objects into separate schemas

The example pipeline definition in this section ingests two Salesforce objects into separate schemas. Multi-destination pipeline support is API-only.

YAML
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: AccountShare
              destination_catalog: my_catalog_1 # Location of this table
              destination_schema: my_schema_1 # Location of this table
          - table:
              source_schema: objects
              source_table: AccountPartner
              destination_catalog: my_catalog_2 # Location of this table
              destination_schema: my_schema_2 # Location of this table

Example: Ingest one Salesforce object three times

The example pipeline definition in this section ingests a Salesforce object into three different destination tables. Multi-destination pipeline support is API-only.

You can optionally rename a table that you ingest. If you rename a table in your pipeline, it becomes an API-only pipeline, and you can no longer edit the pipeline in the UI.

YAML
resources:
  pipelines:
    pipeline_sfdc:
      name: salesforce_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <salesforce-connection>
        objects:
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_1 # Location of first copy
              destination_schema: my_schema_1 # Location of first copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2 # Location of second copy
              destination_schema: my_schema_2 # Location of second copy
          - table:
              source_schema: objects
              source_table: Order
              destination_catalog: my_catalog_2 # Location of third copy, renamed
              destination_schema: my_schema_2 # Location of third copy, renamed
              destination_table: order_duplicate # Table rename

Additional resources