メインコンテンツまでスキップ

Microsoft Dynamics 365 取り込み用のデータ ソースを構成する

備考

プレビュー

この機能は パブリック プレビュー段階です。

LakeFlow Connectを使用してDatabricksに取り込むためのデータ ソースとしてMicrosoft Dynamics 365 を設定する方法を学びます。

コネクタがソース データにアクセスする方法については、 「コネクタは D365 データにどのようにアクセスしますか?」を参照してください。 。 サポートされている Dataverse アプリケーションの一覧については、 「どの Dynamics 365 アプリケーションがサポートされていますか?」を参照してください。

前提条件

D365 データ ソースを設定する前に、以下が必要です。

  • リソースを作成する権限を持つアクティブな Azure サブスクリプション。
  • 管理者アクセス権を持つ Microsoft Dynamics 365 環境。
  • D365 インスタンスに関連付けられた Dataverse 環境。
  • Databricks のワークスペース管理者またはメタストア管理者の権限。
  • Dataverse 環境で Azure Synapse Link を作成および構成するためのアクセス許可。
  • ADLS Gen2 ストレージ アカウント (またはそれを作成するためのアクセス許可)。
  • Microsoft Entra ID アプリケーションを作成および構成するための権限。
  • Dataverse API v9.2 以降。
  • Azure Storage REST API バージョン 2021-08-06。
  • Azure Synapse Link for Dataverse バージョン 1.0 以降。

仮想エンティティまたは直接テーブルを構成する(オプション)

仮想エンティティと直接テーブルにより、データをコピーせずに、Dataverse 以外のソース (D365 Finance & Operations など) のデータを Dataverse で使用できるようになります。非 Dataverse ソースの場合は、Azure Synapse Link を設定する前に、仮想エンティティまたは直接テーブルを構成する必要があります。

仮想エンティティを構成するには:

  1. Power Apps で、 環境 ページに移動し、 Dynamics 365 アプリ をクリックします。

  2. Dataverse 内の仮想エンティティとして F&O エンティティをリンクするには、 Finance and Operations 仮想エンティティ ソリューションをインストールします。

  3. Dataverse と F&O アプリケーションの間でサービス間 (S2S) 認証を設定します。 これにより、Dataverse はアプリケーションと通信できるようになります。詳細については、Microsoft のドキュメントを参照してください。

  4. Dataverse と F&O アプリケーションの間でサービス間 (S2S) 認証をセットアップします。 これにより、Dataverse はアプリケーションと通信できるようになります。詳細については、Microsoft のドキュメント「Dataverse 仮想エンティティの構成」を参照してください。

  5. 取り込む仮想エンティティごとに、 [詳細プロパティ][変更の追跡] を有効にします。

  6. デフォルトでは、F&O 仮想エンティティ ソリューションは、Dataverse テーブルのリストにいくつかの仮想エンティティをデフォルトで公開します。ただし、追加のエンティティを手動で公開することは可能です。

    1. Dataverse 環境の 「詳細設定」 ページに移動します。
    2. 詳細検索にアクセスするには、右上のフィルター アイコンをクリックします。
    3. ドロップダウン メニューから [利用可能な財務および運用エンティティ] を選択し、 [結果] をクリックします。
    4. 公開する仮想エンティティを選択します。
    5. エンティティ管理 ページで、 「表示」を 「True」 に切り替えて、 「保存して閉じる」 をクリックします。

これで、名前がmserp_で始まるエンティティが、Dataverse テーブルのリストに表示されるようになります。

重要

仮想エンティティと直接テーブルは、Azure Synapse Link に表示される前に、適切に構成および同期されている必要があります。利用可能になるまでに少し時間がかかります。

このステップでは、 Synapse Link for Dataverse からAzureデータレイクを使用して、取り込むテーブルを選択します。

注記

Synapse Link for Dataverse to Azureデータレイクは、以前はAzure Data Lake Storage Gen2へのデータのエクスポートとして知られていたサービスに代わるものです。 名前にもかかわらず、この機能は Azure Synapse Analytics を使用したり依存したりしません。これは、Dataverse から ADLS Gen 2 への継続的なエクスポート サービスです。

注記

デフォルトでは、Microsoft は現在、Synapse Link プロファイルごとに最大 1,000 個のテーブルをサポートしています。アプリケーションで複数のテーブルが選択されている場合は、複数のプロファイルを作成する必要があります。

  1. Power Apps ポータルで、 [分析] をクリックし、 [Azure Synapse にリンク] をクリックします

  2. [新しいリンク] をクリックします。Dataverse は、同じテナントからアクティブなサブスクリプションを自動的に入力します。ドロップダウンから適切なサブスクリプションを選択します。

  3. [Azure Synapse Analytics ワークスペースに接続する] チェックボックスをオンにしないでください。(コネクタは現在 Parquet をサポートしていないため、データは CSV 形式で保存する必要があります。)

注記

このコネクタを使用するには、別の Synapse Link プロファイルに既にリンクされている既存のストレージ アカウントに Dataverse テーブルを追加することはできません。新しい Synapse Link プロファイルを作成するには、リンクされていない Azure サブスクリプションにアクセスできる必要があります。

  1. Synapse リンク作成ページ で、 [詳細設定] をクリックします。次に、 「詳細構成設定を表示」 を切り替えます。

  2. フォルダー構造の増分更新を有効にするを 切り替えて、必要な Synapse Link 更新間隔を設定します。最短は5分です。この間隔は、この Synapse Link に含まれるすべてのテーブルに適用されます。( Databricksパイプラインのスケジュールは別のステップで設定します。)

  3. 同期するテーブルを選択します。 追加のみパーティション 設定はデフォルトのままにします。

    • Dataverse ネイティブ アプリから取り込む場合は、 Dataverse セクションから関連する Dataverse テーブルを直接選択します。
    • F&O から取り込む場合は、 D365 Finance & Operations セクションから直接テーブルを選択するか、 Dataverse セクションから仮想エンティティ (プレフィックスmserp_ ) を選択できます。仮想エンティティの詳細については、ステップ 1 を参照してください。
  4. 保存 をクリックします。

  5. 変更を保存すると、 Synapse Link の 初期同期が開始されます。

注記

F&O ユーザーの場合、数百ギガバイトの大きなテーブルでは、この初期同期に数時間かかることがあります。ただし、エンティティの初期同期に時間がかかりすぎる場合は、F&O アプリを使用してテーブルにインデックスを作成することで同期を高速化できます。

  • F&O 環境でインデックスを作成するテーブルに移動します。
  • テーブルの拡張機能を作成します。
  • テーブル拡張内で、新しいインデックスを定義します。
  • インデックスに含めるフィールドを追加します。(これにより、これらのフィールドに基づくデータベース検索が高速化されます。)
  • 変更を保存し、F&O 環境に展開します。

取り込み用のEntra IDアプリケーションを作成する

このステップでは、 Databricksへの取り込みをサポートするUnity Catalog接続を作成するために必要な Entra ID 情報を収集します。

  1. Entra ID テナントの テナント ID を収集します ( portal.azure.com >> Microsoft Entra ID >> 概要タブ >> テナント ID 、右側のパネルにリストされています)。

  2. Azure Synapse Link を作成すると、Azure Synapse によって、選択したテーブルを同期するための ADLS コンテナーが作成されます。Synapse Link の 管理ページ にアクセスして、 ADLS コンテナー名を 見つけます。

  3. ADLS コンテナーのアクセス資格情報を収集します。

    1. Microsoft Entra ID アプリをまだお持ちでない場合は作成してください
    2. クライアントシークレット を収集します。
    3. アプリ ID を収集します ( portal.azure.com >> Microsoft Entra ID >> 管理 >> アプリの登録 )。
  4. まだ行っていない場合は、Entra ID アプリに ADLS コンテナーへのアクセスを許可します。

注記

Entra ID アプリケーションが各 Synapse Link プロファイルに関連付けられた ADLS コンテナーにアクセスできることを確認します。複数の環境またはアプリケーションからデータを取り込む場合は、関連するすべてのコンテナーに対するロールがアプリケーションに割り当てられていることを確認します。

  1. Azure Storage アカウントに移動し、コンテナーまたはストレージ アカウントを選択します。 (Databricks では、最小限の権限を維持するためにコンテナー レベルを推奨しています。)
  2. [アクセス制御 (IAM)] をクリックし、 [ロールの割り当ての追加] をクリックします
  3. ストレージ BLOB コントリビューター → 読み取り/書き込み/削除アクセス ロールを選択します。(組織でこれが許可されていない場合は、Databricks アカウント チームにお問い合わせください。)
  4. 「次へ」 をクリックし、 「メンバーを選択」をクリックします
  5. ユーザー、グループ、またはサービスプリンシパル を選択し、 アプリの登録を検索します 。 (検索結果にアプリが表示されない場合は、検索バーにそのオブジェクト ID を明示的に入力し、 Enter キー を押します)。
  6. [レビュー + 割り当て] をクリックします。
  7. 権限が正しく構成されていることを確認するには、コンテナの アクセス制御を 確認します。

Dynamics 365 パイプラインを作成する

UIを使用する

  1. 左側のメニューで、 [新規] をクリックし、 [データの追加またはアップロード] を クリックします。
  2. データの追加 ページで、 Dynamics 365 タイルをクリックします。
  3. そこからウィザードの指示に従います。

APIを使用する

Dynamics 365接続を作成する

このステップでは、 Unity Catalog接続を作成して、D365 資格情報を安全に保存し、 Databricksへの取り込みを開始します。

  1. ワークスペースで、クリックします。 データアイコン。カタログ
  2. クリックプラグアイコン。 接続し てから、 「接続」 をクリックします。
  3. 「接続を作成」 ボタンをクリックしてください。
  4. 一意の 接続名 を指定し、 接続タイプ として Dynamics 365 を選択します。
  5. 前のステップで作成した Entra ID アプリの クライアント シークレットクライアント ID を入力します。 スコープを変更しないでください。 「次へ」 をクリックします。
  6. Azure ストレージ アカウント名テナント IDADLS コンテナー名 を入力し、 [接続の作成] をクリックします。
  7. 接続名をメモします。

Dynamics 365 パイプラインを作成する

このステップでは、取り込みパイプラインを設定します。取り込まれた各テーブルは、宛先に同じ名前の対応するストリーミング テーブルを取得します。

取り込みパイプラインを作成するには、次の 2 つのオプションがあります。

  • ノートブックを使用する
  • Databricks CLIを使用する

どちらのアプローチでも、パイプラインを作成する Databricks サービスへの API 呼び出しが行われます。

ノートブック:

  1. ノートブックテンプレートをコピーします。
  2. ノートブックの最初のセルを変更せずに実行します。
  3. ノートブックの 2 番目のセルをパイプラインの詳細 (たとえば、取り込み元のテーブル、データを保存する場所など) に変更します。
  4. テンプレート ノートブックの 2 番目のセルを実行します。この実行create_pipeline .
  5. list_pipelineを実行すると、パイプライン ID とその詳細を表示できます。
  6. パイプライン定義を編集するには、 edit_pipelineを実行できます。
  7. パイプラインを削除するには、 delete_pipelineを実行できます。

CLI:

パイプラインを作成するには:

Bash
databricks pipelines create --json "<pipeline_definition OR json file path>"

パイプラインを編集するには:

Bash
databricks pipelines update --json "<<pipeline_definition OR json file path>"

パイプライン定義を取得するには:

Bash
databricks pipelines get "<your_pipeline_id>"

パイプラインを削除するには:

Bash
databricks pipelines delete "<your_pipeline_id>"

さらに詳しい情報が必要な場合は、いつでも以下を実行してください。

Bash
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

追加機能を設定する(オプション)

コネクタには、履歴追跡用の SCD タイプ 2、列レベルの選択と選択解除などの追加機能も用意されています。マネージド インジェスト パイプラインの一般的なパターンを参照してください。

ノートブックテンプレート

セル1

このセルを変更しないでください。

Python
# DO NOT MODIFY

# This sets up the API utils for creating managed ingestion pipelines in Databricks.

import requests
import json

notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"

headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}

def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)

def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)

def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)

def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)

def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)

def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)

def stop_pipeline(id: str):
print("cannot stop pipeline")

セル2

以下のコード内の PREVIEW チャンネルを変更しないでください。

Azure Synapse Link によって同期されたすべてのテーブルを取り込む場合は、スキーマ レベルの仕様を使用します。(ただし、Databricks ではパイプラインごとに 250 を超えるテーブルを追加することは推奨されていません。)特定のテーブルのみを取り込む場合は、テーブル レベルの仕様を使用します。

ソース テーブル名が、Synapse Link 管理ページの [名前] 列に表示されるテーブル名と一致していることを確認します。

Python
# Option A: schema-level spec
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "objects",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
Python
# Option B: table-level spec
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)

例: SCDタイプ2

デフォルトでは、API は SCD タイプ 1 を使用します。つまり、ソースでデータが編集されると、宛先のデータが上書きされます。履歴データを保持し、 SCDタイプ 2 を使用する場合は、構成でそれを指定します。 例えば:

Python
# Schema-level spec with SCD type 2
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "objects",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_2"
}
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
Python
# Table-level spec with SCD type 2
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_2"
}
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)

例: 列レベルの選択と選択解除

デフォルトでは、API は選択したテーブル内のすべての列を取り込みます。ただし、特定の列を含めるか除外するかを選択できます。例えば:

Python
# Table spec with included and excluded columns.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"include_columns": ["<COLUMN_A>", "<COLUMN_B>", "<COLUMN_C>"]
}
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)