Configure data source for Microsoft Dynamics 365 ingestion
This feature is in Public Preview.
This page describes how to set up Microsoft Dynamics 365 (D365) as a data source for ingestion into Databricks using Lakeflow Connect. The Dynamics 365 connector uses Azure Synapse Link for Dataverse to export data to Azure Data Lake Storage (ADLS) Gen2, which Databricks then ingests.
Prerequisites
Before you configure the D365 data source, you must have:
- An active Azure subscription with permissions to create resources.
- A Microsoft Dynamics 365 environment with administrator access.
- A Dataverse environment associated with your D365 instance.
- Workspace administrator or metastore administrator permissions in Databricks.
- Permissions to create and configure Azure Synapse Link in your Dataverse environment.
- An ADLS Gen2 storage account (or permissions to create one).
- Permissions to create and configure Microsoft Entra ID applications.
- Dataverse API v9.2 or later.
- Azure Storage REST API version 2021-08-06.
- Azure Synapse Link for Dataverse version 1.0 or later.
Which Dynamics 365 applications are supported?
The Dynamics 365 connector supports two categories of applications:
Dataverse-native applications: Apps such as the following store data directly in Dataverse and don't require virtual entities or direct tables:
- Dynamics 365 Sales
- Dynamics 365 Customer Service
- Dynamics 365 Marketing
- Dynamics 365 Field Service
Non-Dataverse-native applications: Apps such as the following require either virtual entities or direct tables to expose data in Dataverse:
- Dynamics 365 Finance & Operations (F&O)
Step 1: Configure virtual entities or direct tables (optional)
Virtual entities and direct tables make data from non-Dataverse sources (like D365 Finance & Operations) available in Dataverse without copying the data. For non-Dataverse sources, you must configure virtual entities or direct tables before setting up Azure Synapse Link.
To configure virtual entities:
- In Power Apps, go to the Environments page, then click Dynamics 365 apps.
- To link F&O entities as virtual entities in Dataverse, install the Finance and Operations Virtual Entity solution.
- Set up Service-to-Service (S2S) authorization between Dataverse and your F&O application. This lets Dataverse communicate with your application. For details, see the Microsoft documentation.
- For every virtual entity that you wish to ingest, enable Track Changes under Advanced Properties.
- By default, the F&O Virtual Entity solution exposes some virtual entities in the list of Dataverse tables. However, you can expose additional entities manually:
- Go to your Dataverse environment's Advanced Settings page.
- Click the filter icon in the top right to access the advanced search.
- Select Available Finance and Operations Entities from the drop-down menu, then click Results.
- Select the virtual entity that you want to expose.
- On the Entity Admin page, toggle Visible to True, then click Save and Close.
You can now see the entity in the list of Dataverse tables with a name that starts with mserp_.
Virtual entities and direct tables must be properly configured and synchronized before they appear in Azure Synapse Link. Allow some time for them to become available.
Step 2: Configure Azure Synapse Link
In this step, you'll use the Synapse Link for Dataverse to Azure Data Lake feature to choose the tables that you want to ingest.
Synapse Link for Dataverse to Azure Data Lake replaces the service formerly known as Export data to Azure Data Lake Storage Gen2. Despite the naming, this feature doesn't use or depend on Azure Synapse Analytics; it is a continuous export service from Dataverse to ADLS Gen2.
Microsoft currently supports up to 1,000 tables per Synapse Link profile. If you need to ingest more tables than that, create multiple profiles.
- In the Power Apps Portal, click Analyze, then Link To Azure Synapse.
- Click New Link. Dataverse automatically populates your active subscriptions from the same tenant. Select the appropriate subscription from the dropdown.
- Don't select the Connect to your Azure Synapse Analytics Workspace checkbox. (The data must land in CSV because the connector doesn't currently support Parquet.)
Note: To use this connector, you can't add Dataverse tables to an existing storage account that's already linked to a different Synapse Link profile. You must have access to an unlinked Azure subscription so that you can create a new Synapse Link profile.
- On the Synapse Link creation page, click Advanced. Then, toggle Show Advanced Configuration Settings.
- Toggle Enable Incremental Update Folder Structure, and set the desired Synapse Link update interval. The minimum is 5 minutes. This interval applies to all tables included in this Synapse Link. (You'll set a schedule for your Databricks pipeline in a separate step.)
- Select the tables that you want to sync, leaving the Append only and Partition settings at their defaults.
  - If you're ingesting from a Dataverse-native app, select the relevant Dataverse tables directly from the Dataverse section.
  - If you're ingesting from F&O, select either direct tables from the D365 Finance & Operations section or virtual entities from the Dataverse section (prefix mserp_). For more information about virtual entities, see Step 1.
- Click Save.
- After you save your changes, the initial Synapse Link sync begins.
Note: For F&O users, this initial sync can take hours for large tables with hundreds of gigabytes. If the initial sync for an entity takes too long, you can speed it up by creating an index on the table via the F&O app:
  - Navigate to the table that you want to index in the F&O environment.
  - Create an extension for the table.
  - Within the table extension, define a new index.
  - Add the fields that you want to include in the index. (This speeds up database searches based on these fields.)
  - Save and deploy the changes to your F&O environment.
Step 3: Create an Entra ID application for ingestion
In this step, you'll collect the Entra ID information needed to create a Unity Catalog connection that supports ingestion into Databricks.
- Collect the tenant ID of your Entra ID tenant (portal.azure.com >> Microsoft Entra ID >> Overview tab >> Tenant ID, listed on the right-hand panel).
- When you create an Azure Synapse Link, Azure Synapse creates an ADLS container for syncing the selected tables. Locate the ADLS container name by visiting the Synapse Link's Admin Page.
- Collect the access credentials for the ADLS container.
  - Create a Microsoft Entra ID App, if you don't have one already.
  - Collect the client secret.
  - Collect the App ID (portal.azure.com >> Microsoft Entra ID >> Manage >> App Registrations).
- Grant the Entra ID App access to the ADLS container, if you haven't already.
Note: Ensure that your Entra ID application has access to the ADLS containers associated with each Synapse Link profile. If you're ingesting data from multiple environments or applications, confirm that the application has role assignments on all relevant containers.
  - Go to Azure Storage Accounts and select your container or storage account. (Databricks recommends granting access at the container level to maintain least privilege.)
  - Click Access Control (IAM), then Add role assignment.
  - Select the Storage Blob Data Contributor role (Read/Write/Delete access). If your organization doesn't allow this, reach out to your Databricks account team.
  - Click Next, then Select Members.
  - Choose User, group, or service principal, then search for your App Registration. (If the app isn't present in the search results, you can explicitly enter its object ID in the search bar, then press Enter.)
  - Click Review + Assign.
- To confirm that the permissions are configured correctly, check your container's Access Control, or run a quick read test like the sketch after this list.
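If you prefer to validate the role assignment programmatically, the following is a minimal Python sketch that authenticates as the Entra ID app and lists the contents of the Synapse Link container. It assumes the azure-identity and azure-storage-file-datalake packages are installed; all values in angle brackets are placeholders for the IDs, secret, storage account, and container that you collected above.

# Minimal access check for the Entra ID app (sketch; placeholder values).
# Requires: pip install azure-identity azure-storage-file-datalake
from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient

credential = ClientSecretCredential(
    tenant_id="<YOUR_TENANT_ID>",
    client_id="<YOUR_APP_CLIENT_ID>",
    client_secret="<YOUR_APP_CLIENT_SECRET>",
)

# Connect to the ADLS Gen2 account that Synapse Link writes to.
service = DataLakeServiceClient(
    account_url="https://<YOUR_STORAGE_ACCOUNT>.dfs.core.windows.net",
    credential=credential,
)

# List the top-level paths in the Synapse Link container.
# If this succeeds, the role assignment is working.
container = service.get_file_system_client("<YOUR_ADLS_CONTAINER_NAME>")
for path in container.get_paths(recursive=False):
    print(path.name)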
Step 4: Create a Dynamics 365 pipeline
Option A: use the UI
- In the left-hand menu, click New, then Add or upload data.
- On the Add data page, click the Dynamics 365 tile.
- Follow the instructions in the wizard from there.
Option B: use the API
B1. Create a Dynamics 365 connection
In this step, you'll create a Unity Catalog connection to securely store your D365 credentials and begin ingestion into Databricks.
- In your workspace, click Catalog >> External Data >> Connections >> Create Connection.
- Provide a unique Connection name, then select Dynamics 365 as the Connection type.
- Enter the Client secret and Client ID of the Entra ID App created in the previous step. Don't modify the scope. Click Next.
- Enter the Azure Storage Account Name, Tenant ID, and ADLS container name, then click Create Connection.
- Make note of the connection name.
B2. Create a Dynamics 365 pipeline
In this step, you'll set up the ingestion pipeline. Each ingested table gets a corresponding streaming table with the same name in the destination.
There are two options for creating the ingestion pipeline:
- Use a notebook
- Use the Databricks command-line interface (CLI)
Both approaches make API calls to a Databricks service that creates the pipeline.
If you prefer to use a notebook:
- Copy the notebook template in the Appendix.
- Run the first cell of the notebook without modifying it.
- Modify the second cell of the notebook with the details of your pipeline (for example, the table that you want to ingest from, where you want to store the data, etc.).
- Run the second cell of the template notebook; this runs create_pipeline.
- You can run list_pipeline to show the pipeline ID and its details.
- You can run edit_pipeline to edit the pipeline definition.
- You can run delete_pipeline to delete the pipeline. (A short usage sketch follows this list.)
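For example, after both cells have run, you can use the helpers defined in Cell 1 to look up the new pipeline and trigger a manual update. The filter string and pipeline ID below are placeholders:

# Assumes Cell 1 (helper functions) and Cell 2 (create_pipeline) have already been run.
list_pipeline("name LIKE '<YOUR_PIPELINE_NAME>'")         # prints the pipeline ID and its details
start_pipeline("<your_pipeline_id>")                      # triggers an incremental update
start_pipeline("<your_pipeline_id>", full_refresh=True)   # re-ingests the selected tables from scratch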
If you prefer to use the Databricks CLI:
To create the pipeline:
databricks pipelines create --json "<pipeline_definition OR json file path>"
To edit the pipeline:
databricks pipelines update --json "<pipeline_definition OR json file path>"
To get the pipeline definition:
databricks pipelines get "<your_pipeline_id>"
To delete the pipeline:
databricks pipelines delete "<your_pipeline_id>"
For more information, you can always run:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
B3. Start, schedule, and set alerts on your pipeline
After the pipeline is created, revisit the Databricks workspace.
- In the left-hand panel, click Pipelines. Your new pipeline should appear on this list.
- To view the pipeline details, click on the pipeline's name.
- On the pipeline details page, you can immediately run your pipeline by clicking Start. You can also schedule the pipeline by clicking Schedule. For details on scheduling, see Pipeline task for jobs.
You can also set notifications on your pipeline by adding them to your schedule(s).
- From the pipeline details page, click Schedule. Then, select one of your schedules.
- You'll land on the job that powers the pipeline's schedule. In the right-hand panel, set up any desired notifications under Job notifications.
Finally, you can add notifications for any given schedule:
- To create a new schedule, click Schedule.
- To add a notification to that schedule, click More options.
Step 5: Configure additional features (optional)
The connector offers additional features, such as SCD type 2 for history tracking, column-level selection and deselection, and Databricks Asset Bundles for CI/CD. See the Appendix for examples.
Appendix
Notebook template
Cell 1
Do not modify this cell.
# DO NOT MODIFY
# This sets up the API utils for creating managed ingestion pipelines in Databricks.
import requests
import json

notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"

headers = {
    'Authorization': 'Bearer {}'.format(api_token),
    'Content-Type': 'application/json'
}

def check_response(response):
    if response.status_code == 200:
        print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
    else:
        print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)

def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)

def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

def list_pipeline(filter: str):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)

def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

def start_pipeline(id: str, full_refresh: bool=False):
    body = f"""
    {{
      "full_refresh": {str(full_refresh).lower()},
      "validate_only": false,
      "cause": "API_CALL"
    }}
    """
    response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
    check_response(response)

def stop_pipeline(id: str):
    print("cannot stop pipeline")
Cell 2
Don't modify the PREVIEW channel in the code below.
If you want to ingest all the tables synced by your Azure Synapse Link, use the schema-level spec. (Note, however, that Databricks doesn't recommend adding more than 250 tables per pipeline.) If you only want to ingest specific tables, use the table-level spec.
Ensure that the source table name matches the table name that appears in the Name column on the Synapse Link Manage Page.
# Option A: schema-level spec
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "schema": {
          "source_schema": "objects",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
        }
      }
    ]
  },
  "channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)

# Option B: table-level spec
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "objects",
          "source_table": "<YOUR_F_AND_O_TABLE_NAME>",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
        }
      }
    ]
  },
  "channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
Example: SCD type 2
By default, the API uses SCD type 1. This means that it overwrites the data in the destination if it's edited in the source. If you prefer to preserve historical data and use SCD type 2, then specify that in the configuration. For example:
# Schema-level spec with SCD type 2
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "schema": {
          "source_schema": "objects",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
          "table_configuration": {
            "scd_type": "SCD_TYPE_2"
          }
        }
      }
    ]
  },
  "channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)

# Table-level spec with SCD type 2
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "objects",
          "source_table": "<YOUR_F_AND_O_TABLE_NAME>",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
          "table_configuration": {
            "scd_type": "SCD_TYPE_2"
          }
        }
      }
    ]
  },
  "channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
Example: Column-level selection and deselection
By default, the API ingests all the columns in your selected table. However, you can choose to include or exclude specific columns. For example:
# Table spec with included columns.
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "objects",
          "source_table": "<YOUR_F_AND_O_TABLE_NAME>",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
          "table_configuration": {
            "include_columns": ["<COLUMN_A>", "<COLUMN_B>", "<COLUMN_C>"]
          }
        }
      }
    ]
  },
  "channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
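If you'd rather exclude specific columns and ingest everything else, the table configuration also accepts an exclude_columns list. The sketch below mirrors the include example above; the column names are placeholders.

# Table spec with excluded columns (sketch; uses exclude_columns in place of include_columns).
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "objects",
          "source_table": "<YOUR_F_AND_O_TABLE_NAME>",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
          "table_configuration": {
            "exclude_columns": ["<COLUMN_A>", "<COLUMN_B>"]
          }
        }
      }
    ]
  },
  "channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)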