メインコンテンツまでスキップ

Workday レポートの取り込み

このページでは、Workday レポートを取り込み、Databricks を使用してLakeflowコネクト に読み込む方法について説明します。

始める前に

インジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースでUnity Catalogが有効になっている必要があります。

  • サーバレス コンピュートがワークスペースで有効になっている必要があります。 Enable サーバレス コンピュートを参照してください。

  • 新しい接続を作成する予定の場合: メタストアに対する CREATE CONNECTION 権限が必要です。

    コネクタが UI ベースのパイプラインオーサリングをサポートしている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。ただし、API ベースのパイプラインオーサリングを使用する場合は、このページの手順を完了する前に、Catalog Explorer で接続を作成する必要があります。「管理された取り込みソースに接続する」を参照してください

  • 既存の接続を使用する予定の場合: 接続オブジェクトに対する USE CONNECTION 権限または ALL PRIVILEGES が必要です。

  • ターゲット・カタログに対する USE CATALOG 権限が必要です。

  • 既存のスキーマに対する USE SCHEMA 権限と CREATE TABLE 権限、またはターゲット・カタログに対する CREATE SCHEMA 権限が必要です。

Workday から取り込むには、 ソースのセットアップを完了する必要があります。

ネットワークを構成する

サーバレス エグレス コントロールが有効になっている場合は、レポート URL のホスト名を許可リストに登録します。たとえば、レポート URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json のホスト名は https://ww1.workday.comです。サーバレス egress 制御については、Manage network ポリシーを参照してください。

オプション 1: Databricks UI

  1. Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。

  2. [ データの追加 ] ページの [Databricks コネクタ ] で、[ Workday レポート] をクリックします。

    インジェスト ウィザードが開きます。

  3. ウィザードの インジェスト パイプライン ページで、パイプラインの一意の名前を入力します。

  4. [イベントログの場所 ] で、パイプラインイベントログを保存するカタログとスキーマを選択します。

  5. ソース データへのアクセスに必要な資格情報を格納する Unity Catalog 接続を選択します。

    ソースへの既存の接続がない場合は、[ 接続の作成 ] をクリックし、 ソース設定から取得した認証の詳細を入力します。メタストアに対する CREATE CONNECTION 権限が必要です。

  6. パイプラインの作成および続行 をクリックします。

  7. [レポート ] ページで、[ レポートの追加 ] をクリックし、レポートの URL を入力します。取り込むレポートごとに繰り返し、[ 次へ ] をクリックします。

  8. 宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。

    既存のスキーマを使用しない場合は、[ スキーマの作成 ] をクリックします。親カタログに対する USE CATALOG 権限と CREATE SCHEMA 権限が必要です。

  9. パイプラインを保存と続行 をクリックします。

  10. (オプション) 設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。

  11. (オプション)パイプライン操作の成功または失敗のEメール 通知を設定します。

  12. パイプラインの保存と実行 をクリックします。

オプション 2: Databricks アセット バンドル

このセクションでは、Databricks Asset Bundle を使用してインジェスト パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「アセットバンドルDatabricks」を参照してください。

パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。

  • include_columns: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。
  • exclude_columns: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。

また、レポートの URL(source_url)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。

  1. Workday への Unity Catalog 接続が存在することを確認します。接続を作成する手順については、「 管理されたインジェスト ソースに接続する」を参照してください。

  2. Databricks CLI を使用して新しいバンドルを作成します。

    Bash
    databricks bundle init
  3. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル (resources/workday_pipeline.yml)。
    • データ取り込みの頻度を制御するワークフロー ファイル (resources/workday_job.yml)。

    次に、 resources/workday_pipeline.yml ファイルの例を示します。

    YAML
    variables:
    dest_catalog:
    default: main
    dest_schema:
    default: ingest_destination_schema

    # The main pipeline for workday_dab
    resources:
    pipelines:
    pipeline_workday:
    name: workday_pipeline
    catalog: ${var.dest_catalog}
    schema: ${var.dest_schema}
    ingestion_definition:
    connection_name: <workday-connection>
    objects:
    # An array of objects to ingest from Workday. This example
    # ingests a sample report about all active employees. The Employee_ID key is used as
    # the primary key for the report.
    - report:
    source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    destination_table: All_Active_Employees_Data
    table_configuration:
    primary_keys:
    - Employee_ID
    include_columns: # This can be exclude_columns instead
    - <column_a>
    - <column_b>
    - <column_c>

    次に、 resources/workday_job.yml ファイルの例を示します。

    YAML
    resources:
    jobs:
    workday_dab_job:
    name: workday_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_workday.id}
  4. Databricks CLI を使用してパイプラインをデプロイします。

    Bash
    databricks bundle deploy

オプション 3: Databricks ノートブック

パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。

  • include_columns: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。
  • exclude_columns: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。

また、レポートの URL(source_url)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。

  1. Workday への Unity Catalog 接続が存在することを確認します。接続を作成する手順については、「 管理されたインジェスト ソースに接続する」を参照してください。

  2. 個人用アクセス トークンを生成します。

  3. 次のコードを Python ノートブックのセルに貼り付けて、 <personal-access-token> 値を変更します。

    Python
    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
  4. 次のコードを 2 番目のノートブック セルに貼り付けます。

    Python
    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json

    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"

    headers = {
    'Authorization': 'Bearer {}'.format(api_token),
    'Content-Type': 'application/json'
    }

    def check_response(response):
    if response.status_code == 200:
    print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
    else:
    print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)

    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)

    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
  5. 次のコードを 3 番目のノートブック セルに貼り付け、パイプラインの仕様を反映するように変更します。

    Python
    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.

    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTON_NAME>",
    "objects": [
    {
    "report": {
    "source_url": "<YOUR_REPORT_URL>",
    "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
    "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
    "destination_table": "<YOUR_DATABRICKS_TABLE>",
    "table_configuration": {
    "primary_keys": ["<PRIMARY_KEY>"]
    }
    }
    }, {
    "report": {
    "source_url": "<YOUR_SECOND_REPORT_URL>",
    "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
    "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
    "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
    "table_configuration": {
    "primary_keys": ["<PRIMARY_KEY>"],
    "scd_type": "SCD_TYPE_2",
    "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
    }
    }
    }
    ]
    }
    }
    """

    create_pipeline(pipeline_spec)
  6. 最初のノートブック セルを個人用アクセス トークンで実行します。

  7. 2 番目のノートブック セルを実行します。

  8. パイプラインの詳細で 3 番目のノートブック セルを実行します。これは create_pipeline実行されます。

    • list_pipeline パイプライン ID とその詳細を返します。
    • edit_pipeline パイプライン定義を編集できます。
    • delete_pipeline パイプラインを削除します。

オプション 4: Databricks CLI

パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。

  • include_columns: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。
  • exclude_columns: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。

また、レポートの URL(source_url)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。

  1. Workday への Unity Catalog 接続が存在することを確認します。接続を作成する手順については、「 管理されたインジェスト ソースに接続する」を参照してください。
  2. 次のコマンドを実行して、パイプラインを作成します。
databricks pipelines create --json "<pipeline-definition OR json-file-path>"

パイプライン定義テンプレート

JSON パイプライン定義テンプレートを次に示します。

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"report": {

"source_url": "<report-url>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"primary_keys": ["<primary-key>"],

"scd_type": "SCD_TYPE_2",

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}

パイプラインの開始、スケジュール設定、アラートの設定

パイプラインの詳細ページでパイプラインのスケジュールを作成できます。

  1. パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。

パイプラインに追加するスケジュールごとに、 Lakeflowコネクト によってそのジョブが自動的に作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。オプションで、ジョブにタスクを追加できます。

例: 2 つの Workday レポートを別々のスキーマに取り込む

このセクションのパイプライン定義の例では、2 つの Workday レポートを別々のスキーマに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。

YAML
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>

追加のリソース