メインコンテンツまでスキップ

Microsoft Dynamics 365 取り込みパイプラインを作成する

備考

プレビュー

この機能は パブリック プレビュー段階です。

このページでは、 LakeFlow Connectを使用してMicrosoft Dynamics 365 からDelta Lakeテーブルにデータを同期する管理された取り込みパイプラインを作成する方法について説明します。

サポートされているインターフェース

以下を使用して D365 取り込みパイプラインを作成できます。

  • Databricksノートブック
  • Databricks CLI
  • Databricksアセットバンドル
注記

現時点では、Dynamics 365 コネクタは UI ベースのパイプラインの作成をサポートしていません。上記のいずれかのプログラム インターフェイスを使用します。

要件

パイプラインを作成する前に、次の前提条件を完了してください。

  • Azure Synapse Link とMicrosoft Entra ID 認証を使用してD365 データ ソースを構成します
  • カタログ エクスプローラーで D365 用のUnity Catalog接続を作成します
  • Dataverse 環境の URL または ID ( source_schema ) を識別します。
  • 取り込む D365 テーブルの論理名 ( source_table値) を識別します。
  • SCD タイプ(タイプ 1 またはタイプ 2) を決定します。
  • ターゲットの Unity Catalog カタログとスキーマを作成または識別します。
ヒント

テーブルの論理名を見つけるには、Dataverse API または Power Apps メーカー ポータルを使用します。Power Apps で、 テーブル に移動し、 論理名 列を表示します。

オプション 1: Databricksノートブック

次のノートブック コードを使用して、D365 取り込みパイプラインを作成します。

Python
# Import the ingestion API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import IngestionPipelineDefinition

# Initialize the workspace client
w = WorkspaceClient()

# Define the pipeline configuration
pipeline_config = IngestionPipelineDefinition(
# Required: Use PREVIEW channel
channel="PREVIEW",

# Your Unity Catalog connection name
connection_name="d365_connection",

# Dataverse environment URL or ID
source_schema="https://yourorg.crm.dynamics.com",

# List of D365 tables to ingest (logical names)
source_table=[
"account", # Accounts
"contact", # Contacts
"opportunity", # Opportunities
"salesorder" # Sales Orders
],

# Target location in Unity Catalog
destination_catalog="main",
destination_schema="d365_data",

# History tracking: SCD_TYPE_1 (history tracking off) or SCD_TYPE_2 (history tracking on)
scd_type="SCD_TYPE_2"
)

# Create the pipeline
pipeline = w.pipelines.create(
name="d365_sales_ingestion",
ingestion_definition=pipeline_config
)

print(f"Pipeline created with ID: {pipeline.pipeline_id}")

複数のスキーマの取り込み

複数の Dataverse 環境からテーブルを取り込むには、個別のパイプラインを作成します。

Python
# Pipeline for production environment
prod_pipeline = w.pipelines.create(
name="d365_prod_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://prod.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_prod",
scd_type="SCD_TYPE_2"
)
)

# Pipeline for test environment
test_pipeline = w.pipelines.create(
name="d365_test_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://test.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_test",
scd_type="SCD_TYPE_2"
)
)

特定の列を選択する

ソース テーブルから特定の列のみを取り込むには、列選択を使用します。

Python
pipeline_config = IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://yourorg.crm.dynamics.com",
source_table=["account"],
destination_catalog="main",
destination_schema="d365_data",
scd_type="SCD_TYPE_1",

# Specify columns to include
table_configuration={
"account": {
"columns": [
"accountid",
"name",
"accountnumber",
"emailaddress1",
"telephone1",
"address1_city",
"address1_stateorprovince"
]
}
}
)

オプション 2: Databricks CLI

Databricks CLI を使用して、構成ファイルからパイプラインを作成します。

d365-pipeline.jsonという名前のファイルを作成します:

JSON
{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": ["account", "contact", "opportunity", "salesorder"],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}

CLI を使用してパイプラインを作成します。

Bash
databricks pipelines create --json @d365-pipeline.json

オプション3: Databricksアセットバンドル

Databricks アセット バンドルを使用して、D365 パイプラインをコードとして管理します。

databricks.ymlという名前のファイルを作成します:

YAML
resources:
pipelines:
d365_sales_ingestion:
name: 'd365_sales_ingestion'
ingestion_definition:
channel: 'PREVIEW'
connection_name: 'd365_connection'
source_schema: 'https://yourorg.crm.dynamics.com'
source_table:
- 'account'
- 'contact'
- 'opportunity'
- 'salesorder'
destination_catalog: 'main'
destination_schema: 'd365_data'
scd_type: 'SCD_TYPE_2'

バンドルをデプロイします。

Bash
databricks bundle deploy

オプション4: Databricks APIs

パイプラインAPI使用して、D365 インジェスト パイプラインを作成します。

APIリクエストの例:

Bash
curl -X POST \
https://<workspace-url>/api/2.0/pipelines \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": [
"account",
"contact",
"opportunity",
"salesorder"
],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}'

パイプラインの作成を確認する

パイプラインを作成した後:

  1. ワークスペースの 「ジョブとパイプライン」 に移動します。
  2. 名前でパイプラインを見つけます。
  3. 詳細を表示するにはパイプラインを選択します。
  4. 初期取り込みを実行するには、 [開始] を選択します。
  5. パイプラインの実行を監視し、パイプラインがターゲット スキーマにテーブルを作成することを確認します。

取り込んだデータを確認するには:

SQL
-- Check the account table
SELECT * FROM main.d365_data.account LIMIT 10;

-- Verify record counts
SELECT COUNT(*) FROM main.d365_data.account;
注記

最初のパイプライン実行では、選択されたすべてのテーブルの完全更新が実行されます。後続の実行では、Azure Synapse Link の変更ログのVersionNumberカーソルに基づいて増分インジェストが使用されます。

次のステップ

パイプラインを作成したら、次の操作を行います。