Create a Microsoft Dynamics 365 ingestion pipeline

Preview

This feature is in Public Preview.

This page describes how to create a managed ingestion pipeline that syncs data from Microsoft Dynamics 365 into Delta Lake tables using Lakeflow Connect.

Supported interfaces

You can create D365 ingestion pipelines using:

  • Databricks notebooks
  • Databricks CLI
  • Databricks Asset Bundles
  • Databricks APIs
note

At this time, the Dynamics 365 connector doesn't support UI-based pipeline authoring. Use one of the programmatic interfaces listed above.

Requirements

Before creating a pipeline, complete the following prerequisites:

  • Configure your D365 data source with Azure Synapse Link and Microsoft Entra ID authentication.
  • Create a Unity Catalog connection for D365 in Catalog Explorer.
  • Identify the Dataverse environment URL or ID (your source_schema).
  • Identify the logical names of D365 tables you want to ingest (your source_table values).
  • Decide on your SCD type (Type 1 or Type 2).
  • Create or identify the target catalog and schema in Unity Catalog.
tip

To find table logical names, use the Dataverse API or Power Apps maker portal. In Power Apps, go to Tables and view the Logical name column.
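
Alternatively, you can list logical names programmatically with the Dataverse Web API. The following sketch assumes the requests library, a placeholder environment URL, and a Microsoft Entra ID access token that you obtain separately; it is illustrative only and not part of the connector.

Python
# A minimal sketch: list Dataverse table logical names via the Web API.
import requests

ENV_URL = "https://yourorg.crm.dynamics.com"  # your Dataverse environment URL
ACCESS_TOKEN = "<entra-id-access-token>"      # assumption: you supply a valid token with Dataverse scope

response = requests.get(
    f"{ENV_URL}/api/data/v9.2/EntityDefinitions",
    params={"$select": "LogicalName,SchemaName"},
    headers={
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Accept": "application/json",
    },
)
response.raise_for_status()

# Print each table's logical name; use these values in source_table.
for entity in response.json()["value"]:
    print(entity["LogicalName"])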

Option 1: Databricks notebook

Use the following notebook code to create a D365 ingestion pipeline:

Python
# Import the ingestion API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import IngestionPipelineDefinition

# Initialize the workspace client
w = WorkspaceClient()

# Define the pipeline configuration
pipeline_config = IngestionPipelineDefinition(
    # Required: Use the PREVIEW channel
    channel="PREVIEW",

    # Your Unity Catalog connection name
    connection_name="d365_connection",

    # Dataverse environment URL or ID
    source_schema="https://yourorg.crm.dynamics.com",

    # List of D365 tables to ingest (logical names)
    source_table=[
        "account",      # Accounts
        "contact",      # Contacts
        "opportunity",  # Opportunities
        "salesorder",   # Sales Orders
    ],

    # Target location in Unity Catalog
    destination_catalog="main",
    destination_schema="d365_data",

    # History tracking: SCD_TYPE_1 (history tracking off) or SCD_TYPE_2 (history tracking on)
    scd_type="SCD_TYPE_2",
)

# Create the pipeline
pipeline = w.pipelines.create(
    name="d365_sales_ingestion",
    ingestion_definition=pipeline_config,
)

print(f"Pipeline created with ID: {pipeline.pipeline_id}")

Ingesting multiple schemas

To ingest tables from multiple Dataverse environments, create separate pipelines:

Python
# Pipeline for the production environment
prod_pipeline = w.pipelines.create(
    name="d365_prod_ingestion",
    ingestion_definition=IngestionPipelineDefinition(
        channel="PREVIEW",
        connection_name="d365_connection",
        source_schema="https://prod.crm.dynamics.com",
        source_table=["account", "contact"],
        destination_catalog="main",
        destination_schema="d365_prod",
        scd_type="SCD_TYPE_2",
    ),
)

# Pipeline for the test environment
test_pipeline = w.pipelines.create(
    name="d365_test_ingestion",
    ingestion_definition=IngestionPipelineDefinition(
        channel="PREVIEW",
        connection_name="d365_connection",
        source_schema="https://test.crm.dynamics.com",
        source_table=["account", "contact"],
        destination_catalog="main",
        destination_schema="d365_test",
        scd_type="SCD_TYPE_2",
    ),
)

Selecting specific columns

To ingest only specific columns from source tables, use column selection:

Python
pipeline_config = IngestionPipelineDefinition(
    channel="PREVIEW",
    connection_name="d365_connection",
    source_schema="https://yourorg.crm.dynamics.com",
    source_table=["account"],
    destination_catalog="main",
    destination_schema="d365_data",
    scd_type="SCD_TYPE_1",

    # Specify the columns to include
    table_configuration={
        "account": {
            "columns": [
                "accountid",
                "name",
                "accountnumber",
                "emailaddress1",
                "telephone1",
                "address1_city",
                "address1_stateorprovince",
            ]
        }
    },
)

Option 2: Databricks CLI

Use the Databricks CLI to create pipelines from a configuration file.

Create a file named d365-pipeline.json:

JSON
{
  "name": "d365_sales_ingestion",
  "ingestion_definition": {
    "channel": "PREVIEW",
    "connection_name": "d365_connection",
    "source_schema": "https://yourorg.crm.dynamics.com",
    "source_table": ["account", "contact", "opportunity", "salesorder"],
    "destination_catalog": "main",
    "destination_schema": "d365_data",
    "scd_type": "SCD_TYPE_2"
  }
}

Create the pipeline using the CLI:

Bash
databricks pipelines create --json @d365-pipeline.json

Option 3: Databricks Asset Bundles

Use Databricks Asset Bundles to manage D365 pipelines as code.

Create a file named databricks.yml:

YAML
resources:
  pipelines:
    d365_sales_ingestion:
      name: 'd365_sales_ingestion'
      ingestion_definition:
        channel: 'PREVIEW'
        connection_name: 'd365_connection'
        source_schema: 'https://yourorg.crm.dynamics.com'
        source_table:
          - 'account'
          - 'contact'
          - 'opportunity'
          - 'salesorder'
        destination_catalog: 'main'
        destination_schema: 'd365_data'
        scd_type: 'SCD_TYPE_2'

Deploy the bundle:

Bash
databricks bundle deploy

Option 4: Databricks APIs

Use the Pipelines API to create D365 ingestion pipelines.

Example API request:

Bash
curl -X POST \
  https://<workspace-url>/api/2.0/pipelines \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "d365_sales_ingestion",
    "ingestion_definition": {
      "channel": "PREVIEW",
      "connection_name": "d365_connection",
      "source_schema": "https://yourorg.crm.dynamics.com",
      "source_table": [
        "account",
        "contact",
        "opportunity",
        "salesorder"
      ],
      "destination_catalog": "main",
      "destination_schema": "d365_data",
      "scd_type": "SCD_TYPE_2"
    }
  }'

Verify pipeline creation

After creating the pipeline:

  1. Navigate to Jobs & Pipelines in your workspace.
  2. Locate your pipeline by name.
  3. Select the pipeline to view details.
  4. Select Start to run the initial ingestion.
  5. Monitor the run and verify that the pipeline creates tables in your target schema. (To script these steps, see the sketch after this list.)
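
If you prefer to script the start-and-monitor steps, the following sketch uses the Databricks SDK for Python, reusing w and pipeline from the notebook example above; the 30-second polling interval and the terminal states checked are assumptions, not connector requirements.

Python
import time

# Trigger an update (run) of the pipeline; reuses `w` and `pipeline` from above.
update = w.pipelines.start_update(pipeline_id=pipeline.pipeline_id)
print(f"Started update: {update.update_id}")

# Poll the update until it reaches a terminal state.
while True:
    info = w.pipelines.get_update(
        pipeline_id=pipeline.pipeline_id,
        update_id=update.update_id,
    )
    state = info.update.state.value if info.update and info.update.state else None
    print(f"Update state: {state}")
    if state in ("COMPLETED", "FAILED", "CANCELED"):
        break
    time.sleep(30)  # assumed polling interval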

To verify the ingested data:

SQL
-- Check the account table
SELECT * FROM main.d365_data.account LIMIT 10;

-- Verify record counts
SELECT COUNT(*) FROM main.d365_data.account;
note

The initial pipeline run performs a full refresh of all selected tables. Subsequent runs use incremental ingestion based on the VersionNumber cursor from Azure Synapse Link changelogs.

Next steps

After creating your pipeline: