Create a Microsoft Dynamics 365 ingestion pipeline
This feature is in Public Preview.
This page describes how to create a managed ingestion pipeline that syncs data from Microsoft Dynamics 365 into Delta Lake tables using Lakeflow Connect.
Supported interfaces
You can create D365 ingestion pipelines using:
- Databricks notebooks
- Databricks CLI
- Databricks Asset Bundles
- Databricks APIs
At this time, the Dynamics 365 connector doesn't support UI-based pipeline authoring. Use one of the programmatic interfaces listed above.
Requirements
Before creating a pipeline, complete the following prerequisites:
- Configure your D365 data source with Azure Synapse Link and Microsoft Entra ID authentication.
- Create a Unity Catalog connection for D365 in Catalog Explorer.
- Identify the Dataverse environment URL or ID (your source_schema).
- Identify the logical names of the D365 tables you want to ingest (your source_table values).
- Decide on your SCD type (Type 1 or Type 2).
- Create or identify a target catalog and schema in Unity Catalog.
To find table logical names, use the Dataverse API or Power Apps maker portal. In Power Apps, go to Tables and view the Logical name column.
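If you prefer to look up logical names programmatically, the following is a minimal sketch that queries Dataverse entity metadata over the Web API. It assumes you already have a Microsoft Entra ID access token with Dataverse permissions; the environment URL, the token value, and the v9.2 API path are placeholders or assumptions, not values from this page.

import requests

# Placeholders: replace with your environment URL and a valid Entra ID token
env_url = "https://yourorg.crm.dynamics.com"
access_token = "<entra-id-access-token>"

# Query entity metadata for table logical names (Dataverse Web API)
resp = requests.get(
    f"{env_url}/api/data/v9.2/EntityDefinitions",
    params={"$select": "LogicalName"},
    headers={"Authorization": f"Bearer {access_token}"},
)
resp.raise_for_status()

for entity in resp.json()["value"]:
    print(entity["LogicalName"])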
Option 1: Databricks notebook
Use the following notebook code to create a D365 ingestion pipeline:
# Import the ingestion API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import IngestionPipelineDefinition
# Initialize the workspace client
w = WorkspaceClient()
# Define the pipeline configuration
pipeline_config = IngestionPipelineDefinition(
# Required: Use PREVIEW channel
channel="PREVIEW",
# Your Unity Catalog connection name
connection_name="d365_connection",
# Dataverse environment URL or ID
source_schema="https://yourorg.crm.dynamics.com",
# List of D365 tables to ingest (logical names)
source_table=[
"account", # Accounts
"contact", # Contacts
"opportunity", # Opportunities
"salesorder" # Sales Orders
],
# Target location in Unity Catalog
destination_catalog="main",
destination_schema="d365_data",
# History tracking: SCD_TYPE_1 (history tracking off) or SCD_TYPE_2 (history tracking on)
scd_type="SCD_TYPE_2"
)
# Create the pipeline
pipeline = w.pipelines.create(
name="d365_sales_ingestion",
ingestion_definition=pipeline_config
)
print(f"Pipeline created with ID: {pipeline.pipeline_id}")
Ingesting multiple schemas
Each pipeline ingests from a single source schema (Dataverse environment). To ingest tables from multiple environments, create a separate pipeline for each:
# Pipeline for production environment
prod_pipeline = w.pipelines.create(
name="d365_prod_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://prod.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_prod",
scd_type="SCD_TYPE_2"
)
)
# Pipeline for test environment
test_pipeline = w.pipelines.create(
name="d365_test_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://test.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_test",
scd_type="SCD_TYPE_2"
)
)
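If you manage several environments, the same pattern can be driven from a small loop. This is a sketch only; the environment URLs, target schema names, and table list below are placeholders, and it reuses the w client and imports from Option 1:

# Placeholder mapping of Dataverse environments to target schemas
environments = {
    "https://prod.crm.dynamics.com": "d365_prod",
    "https://test.crm.dynamics.com": "d365_test",
}

for env_url, target_schema in environments.items():
    w.pipelines.create(
        name=f"d365_{target_schema}_ingestion",
        ingestion_definition=IngestionPipelineDefinition(
            channel="PREVIEW",
            connection_name="d365_connection",
            source_schema=env_url,
            source_table=["account", "contact"],
            destination_catalog="main",
            destination_schema=target_schema,
            scd_type="SCD_TYPE_2",
        ),
    )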
Selecting specific columns
To ingest only specific columns from a source table, list them in the table's table_configuration entry:
pipeline_config = IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://yourorg.crm.dynamics.com",
source_table=["account"],
destination_catalog="main",
destination_schema="d365_data",
scd_type="SCD_TYPE_1",
# Specify columns to include
table_configuration={
"account": {
"columns": [
"accountid",
"name",
"accountnumber",
"emailaddress1",
"telephone1",
"address1_city",
"address1_stateorprovince"
]
}
}
)
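The same pattern extends to multiple tables: each key in table_configuration is a table logical name. A sketch following the structure shown above, where the contact columns are illustrative examples rather than a recommended set:

pipeline_config = IngestionPipelineDefinition(
    channel="PREVIEW",
    connection_name="d365_connection",
    source_schema="https://yourorg.crm.dynamics.com",
    source_table=["account", "contact"],
    destination_catalog="main",
    destination_schema="d365_data",
    scd_type="SCD_TYPE_1",
    # One entry per table logical name
    table_configuration={
        "account": {"columns": ["accountid", "name", "accountnumber"]},
        "contact": {"columns": ["contactid", "fullname", "emailaddress1"]},
    }
)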
Option 2: Databricks CLI
Use the Databricks CLI to create pipelines from a configuration file.
Create a file named d365-pipeline.json:
{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": ["account", "contact", "opportunity", "salesorder"],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}
Create the pipeline using the CLI:
databricks pipelines create --json @d365-pipeline.json
Option 3: Databricks Asset Bundles
Use Databricks Asset Bundles to manage D365 pipelines as code.
Create a file named databricks.yml:
resources:
pipelines:
d365_sales_ingestion:
name: 'd365_sales_ingestion'
ingestion_definition:
channel: 'PREVIEW'
connection_name: 'd365_connection'
source_schema: 'https://yourorg.crm.dynamics.com'
source_table:
- 'account'
- 'contact'
- 'opportunity'
- 'salesorder'
destination_catalog: 'main'
destination_schema: 'd365_data'
scd_type: 'SCD_TYPE_2'
Deploy the bundle:
databricks bundle deploy
Option 4: Databricks APIs
Use the Pipelines API to create D365 ingestion pipelines.
Example API request:
curl -X POST \
https://<workspace-url>/api/2.0/pipelines \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": [
"account",
"contact",
"opportunity",
"salesorder"
],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}'
Verify pipeline creation
After creating the pipeline:
- Navigate to Jobs & Pipelines in your workspace.
- Locate your pipeline by name.
- Select the pipeline to view details.
- Select Start to run the initial ingestion.
- Monitor the pipeline run and verify that the pipeline creates tables in your target schema.
To verify the ingested data:
-- Check the account table
SELECT * FROM main.d365_data.account LIMIT 10;
-- Verify record counts
SELECT COUNT(*) FROM main.d365_data.account;
The initial pipeline run performs a full refresh of all selected tables. Subsequent runs use incremental ingestion based on the VersionNumber cursor from Azure Synapse Link changelogs.
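You can also check run status programmatically. A minimal sketch using the Python SDK, reusing the w client and pipeline object from Option 1:

# Check the overall pipeline state
info = w.pipelines.get(pipeline_id=pipeline.pipeline_id)
print(f"Pipeline state: {info.state}")

# List recent updates (runs) and their states
updates = w.pipelines.list_updates(pipeline_id=pipeline.pipeline_id)
for u in updates.updates:
    print(u.update_id, u.state)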
Next steps
After creating your pipeline:
- Schedule pipeline updates to run automatically (see the sketch after this list).
- Monitor pipeline health and set up alerts.
- Perform a full refresh if you need to reload all data.
- Review troubleshooting guidance if issues occur.
- Understand schema evolution behavior for handling source changes.
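For scheduling, one option is to trigger the pipeline from a job. The following is a minimal sketch using the Python SDK Jobs API; the job name, cron expression, and pipeline ID are placeholders, and scheduling through a job is one approach rather than the only one:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Placeholder schedule: run the pipeline daily at 02:00 UTC
w.jobs.create(
    name="d365_sales_ingestion_schedule",
    tasks=[
        jobs.Task(
            task_key="run_d365_pipeline",
            pipeline_task=jobs.PipelineTask(pipeline_id="<your-pipeline-id>"),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 2 * * ?",
        timezone_id="UTC",
    ),
)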