Ingest data from Salesforce

Preview

LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.

This article describes how to ingest data from Salesforce and load it into Databricks using LakeFlow Connect. The resulting ingestion pipeline is governed by Unity Catalog and is powered by serverless compute and Delta Live Tables.

The Salesforce ingestion connector supports the following source:

  • Salesforce Sales Cloud

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace is enabled for Unity Catalog.

  • Serverless compute is enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.

  • To create a connection: You have CREATE CONNECTION on the metastore.

    To use an existing connection: You have USE CONNECTION or ALL PRIVILEGES on the connection object.

  • USE CATALOG on the target catalog.

  • USE SCHEMA and CREATE TABLE on an existing schema or CREATE SCHEMA on the target catalog.

  • (Recommended) Create a Salesforce user that Databricks can use to retrieve data. Make sure that the user has API access and access to all of the objects that you plan to ingest.

Create a Salesforce connection

Permissions required: CREATE CONNECTION on the metastore. Contact a metastore admin to grant this.

If you want to create an ingestion pipeline using an existing connection, skip to the following section. You need USE CONNECTION or ALL PRIVILEGES on the connection.

To create a Salesforce connection, do the following:

  1. In the Databricks workspace, click Catalog > External locations > Connections > Create connection.

  2. For Connection name, specify a unique name for the Salesforce connection.

  3. For Connection type, click Salesforce.

  4. Set Auth type to OAuth.

  5. If you’re ingesting from a Salesforce sandbox account, set Is sandbox to true.

  6. Click Log in with Salesforce.

    Salesforce login
  7. If you’re ingesting from a Salesforce sandbox, click Use Custom Domain. Provide the sandbox URL, and then proceed to login. Databricks recommends logging in as a Salesforce user that’s dedicated to Databricks ingestion.

    Use custom domain button
    Enter sandbox URL
  8. After returning to the Create Connection page, click Create.

Create an ingestion pipeline

Permissions required: USE CONNECTION or ALL PRIVILEGES on a connection.

This step describes how to create the ingestion pipeline. Each ingested table corresponds to a streaming table with the same name (but all lowercase) in the destination by default, unless you explicitly rename it.

  1. In the sidebar of the Databricks workspace, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click Salesforce.

    The Salesforce ingestion wizard opens.

  3. On the Pipeline page of the wizard, enter a unique name for the ingestion pipeline.

  4. In the Destination catalog dropdown, select a catalog. Ingested data and event logs will be written to this catalog.

  5. Select the Unity Catalog connection that stores the credentials required to access Salesforce data.

    If there are no Salesforce connections, click Create connection. You must have the CREATE CONNECTION privilege on the metastore.

  6. Click Create pipeline and continue.

  7. On the Source page, select the Salesforce tables to ingest into Databricks, and then click Next.

    If you select a schema, the Salesforce ingestion connector writes all existing and future tables in the source schema to Unity Catalog managed tables.

  8. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don’t want to use an existing schema, click Create schema. You must have the USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  9. Click Save pipeline and continue.

  10. On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.

  11. Optionally, set email notifications for pipeline operation success or failure.

  12. Click Save and run pipeline.

This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles (DABs). Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.

  1. Create a new bundle using the Databricks CLI:

    databricks bundle init
    
  2. Add two new resource files to the bundle:

    • A pipeline definition file (resources/sfdc_pipeline.yml).

    • A workflow file that controls the frequency of data ingestion (resources/sfdc_job.yml).

    The following is an example resources/sfdc_pipeline.yml file:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: "preview"
    

    The following is an example resources/sfdc_job.yml file:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Deploy the pipeline using the Databricks CLI:

    databricks bundle deploy
    

To create the pipeline:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

To update the pipeline:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

To get the pipeline definition:

databricks pipelines get "<pipeline-id>"

To delete the pipeline:

databricks pipelines delete "<pipeline-id>"

For more information, you can run:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Start, schedule, and set alerts on your pipeline

  1. After the pipeline has been created, revisit the Databricks workspace, and then click Delta Live Tables.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, run the pipeline by clicking Start. You can schedule the pipeline by clicking Schedule.

  4. To set alerts on the pipeline, click Schedule, click More options, and then add a notification.

  5. After ingestion completes, you can query your tables.

Note

When the pipeline runs, you might see two source views for a given table. One view contains the snapshots for formula fields. The other view contains the incremental data pulls for non-formula fields. These views are joined in the destination table.