Workday レポートの取り込み
このページでは、Workday レポートを取り込み、Databricks を使用してLakeflowコネクト に読み込む方法について説明します。
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
- 
ワークスペースでUnity Catalogが有効になっている必要があります。 
- 
サーバレス コンピュートは、ワークスペースで有効にする必要があります。 サーバレス コンピュートの要件を参照してください。 
- 
新しい接続を作成する予定の場合: メタストアに対する CREATE CONNECTION権限が必要です。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。 
- 
既存の接続を使用する予定の場合: 接続オブジェクトに対する USE CONNECTION権限またはALL PRIVILEGESが必要です。
- 
ターゲット・カタログに対する USE CATALOG権限が必要です。
- 
既存のスキーマに対する USE SCHEMA権限とCREATE TABLE権限、またはターゲット・カタログに対するCREATE SCHEMA権限が必要です。
Workday から取り込むには、 ソースのセットアップを完了する必要があります。
オプション 1: Databricks UI
- 
Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。 
- 
[ データの追加 ] ページの [Databricks コネクタ ] で、[ Workday レポート] をクリックします。 インジェスト ウィザードが開きます。 
- 
ウィザードの インジェスト パイプライン ページで、パイプラインの一意の名前を入力します。 
- 
[イベントログの場所 ] で、パイプラインイベントログを保存するカタログとスキーマを選択します。 
- 
ソース データへのアクセスに必要な資格情報を格納する Unity Catalog 接続を選択します。 ソースへの既存の接続がない場合は、[ 接続の作成 ] をクリックし、 ソース設定から取得した認証の詳細を入力します。メタストアに対する CREATE CONNECTION権限が必要です。
- 
パイプラインの作成および続行 をクリックします。 
- 
[レポート ] ページで、[ レポートの追加 ] をクリックし、レポートの URL を入力します。取り込むレポートごとに繰り返し、[ 次へ ] をクリックします。 
- 
宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。 既存のスキーマを使用しない場合は、[ スキーマの作成 ] をクリックします。親カタログに対する USE CATALOG権限とCREATE SCHEMA権限が必要です。
- 
パイプラインを保存と続行 をクリックします。 
- 
(オプション) 設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。 
- 
(オプション)パイプライン操作の成功または失敗のEメール 通知を設定します。 
- 
パイプラインの保存と実行 をクリックします。 
オプション 2: Databricks アセット バンドル
このセクションでは、Databricks Asset Bundle を使用してインジェスト パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「アセットバンドルDatabricks」を参照してください。
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
- include_columns: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。
- exclude_columns: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
また、レポートの URL(source_url)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。
- 
Workday への Unity Catalog 接続が存在することを確認します。接続を作成する手順については、「 管理されたインジェスト ソースに接続する」を参照してください。 
- 
Databricks CLI を使用して新しいバンドルを作成します。 Bashdatabricks bundle init
- 
バンドルに 2 つの新しいリソース ファイルを追加します。 - パイプライン定義ファイル (resources/workday_pipeline.yml)。
- データ取り込みの頻度を制御するワークフロー ファイル (resources/workday_job.yml)。
 次に、 resources/workday_pipeline.ymlファイルの例を示します。YAMLvariables:
 dest_catalog:
 default: main
 dest_schema:
 default: ingest_destination_schema
 # The main pipeline for workday_dab
 resources:
 pipelines:
 pipeline_workday:
 name: workday_pipeline
 catalog: ${var.dest_catalog}
 schema: ${var.dest_schema}
 ingestion_definition:
 connection_name: <workday-connection>
 objects:
 # An array of objects to ingest from Workday. This example
 # ingests a sample report about all active employees. The Employee_ID key is used as
 # the primary key for the report.
 - report:
 source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
 destination_catalog: ${var.dest_catalog}
 destination_schema: ${var.dest_schema}
 destination_table: All_Active_Employees_Data
 table_configuration:
 primary_keys:
 - Employee_ID
 include_columns: # This can be exclude_columns instead
 - <column_a>
 - <column_b>
 - <column_c>次に、 resources/workday_job.ymlファイルの例を示します。YAMLresources:
 jobs:
 workday_dab_job:
 name: workday_dab_job
 trigger:
 # Run this job every day, exactly one day from the last run
 # See https://docs.databricks.com/api/workspace/jobs/create#trigger
 periodic:
 interval: 1
 unit: DAYS
 email_notifications:
 on_failure:
 - <email-address>
 tasks:
 - task_key: refresh_pipeline
 pipeline_task:
 pipeline_id: ${resources.pipelines.pipeline_workday.id}
- パイプライン定義ファイル (
- 
Databricks CLI を使用してパイプラインをデプロイします。 Bashdatabricks bundle deploy
オプション 3: Databricks ノートブック
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
- include_columns: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。
- exclude_columns: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
また、レポートの URL(source_url)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。
- 
Workday への Unity Catalog 接続が存在することを確認します。接続を作成する手順については、「 管理されたインジェスト ソースに接続する」を参照してください。 
- 
個人用アクセス トークンを生成します。 
- 
次のコードを Python ノートブックのセルに貼り付けて、 <personal-access-token>値を変更します。Python# SHOULD MODIFY
 # This step sets up a PAT to make API calls to the Databricks service.
 api_token = "<personal-access-token>"
- 
次のコードを 2 番目のノートブック セルに貼り付けます。 Python# DO NOT MODIFY
 # This step sets up a connection to make API calls to the Databricks service.
 import requests
 import json
 notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
 workspace_url = notebook_context.apiUrl().get()
 api_url = f"{workspace_url}/api/2.0/pipelines"
 headers = {
 'Authorization': 'Bearer {}'.format(api_token),
 'Content-Type': 'application/json'
 }
 def check_response(response):
 if response.status_code == 200:
 print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
 else:
 print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
 # DO NOT MODIFY
 # These are API definition to be used.
 def create_pipeline(pipeline_definition: str):
 response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
 check_response(response)
 def edit_pipeline(id: str, pipeline_definition: str):
 response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
 check_response(response)
 def delete_pipeline(id: str):
 response = requests.delete(url=f"{api_url}/{id}", headers=headers)
 check_response(response)
 def get_pipeline(id: str):
 response = requests.get(url=f"{api_url}/{id}", headers=headers)
 check_response(response)
 def list_pipeline(filter: str = ""):
 body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
 response = requests.get(url=api_url, headers=headers, data=body)
 check_response(response)
- 
次のコードを 3 番目のノートブック セルに貼り付け、パイプラインの仕様を反映するように変更します。 Python# SHOULD MODIFY
 # Update this notebook to configure your ingestion pipeline.
 pipeline_spec = """
 {
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
 "connection_name": "<YOUR_CONNECTON_NAME>",
 "objects": [
 {
 "report": {
 "source_url": "<YOUR_REPORT_URL>",
 "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
 "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
 "destination_table": "<YOUR_DATABRICKS_TABLE>",
 "table_configuration": {
 "primary_keys": ["<PRIMARY_KEY>"]
 }
 }
 }, {
 "report": {
 "source_url": "<YOUR_SECOND_REPORT_URL>",
 "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
 "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
 "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
 "table_configuration": {
 "primary_keys": ["<PRIMARY_KEY>"],
 "scd_type": "SCD_TYPE_2",
 "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
 }
 }
 }
 ]
 }
 }
 """
 create_pipeline(pipeline_spec)
- 
最初のノートブック セルを個人用アクセス トークンで実行します。 
- 
2 番目のノートブック セルを実行します。 
- 
パイプラインの詳細で 3 番目のノートブック セルを実行します。これは create_pipeline実行されます。- list_pipelineパイプライン ID とその詳細を返します。
- edit_pipelineパイプライン定義を編集できます。
- delete_pipelineパイプラインを削除します。
 
オプション 4: Databricks CLI
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
- include_columns: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。
- exclude_columns: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
また、レポートの URL(source_url)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。
- Workday への Unity Catalog 接続が存在することを確認します。接続を作成する手順については、「 管理されたインジェスト ソースに接続する」を参照してください。
- 次のコマンドを実行して、パイプラインを作成します。
databricks pipelines create --json "<pipeline-definition OR json-file-path>"
パイプライン定義テンプレート
JSON パイプライン定義テンプレートを次に示します。
"ingestion_definition": {
     "connection_name": "<connection-name>",
     "objects": [
       {
         "report": {
           "source_url": "<report-url>",
           "destination_catalog": "<destination-catalog>",
           "destination_schema": "<destination-schema>",
           "table_configuration": {
              "primary_keys": ["<primary-key>"],
              "scd_type": "SCD_TYPE_2",
              "include_columns": ["<column-a>", "<column-b>", "<column-c>"]
           }
         }
       }
     ]
 }
パイプラインの開始、スケジュール設定、アラートの設定
パイプラインの詳細ページでパイプラインのスケジュールを作成できます。
- 
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。 新しいパイプラインがパイプライン リストに表示されます。 
- 
パイプラインの詳細を表示するには、パイプライン名をクリックします。 
- 
パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。 
- 
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。 
パイプラインに追加するスケジュールごとに、 Lakeflowコネクト によってそのジョブが自動的に作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。オプションで、ジョブにタスクを追加できます。
例: 2 つの Workday レポートを別々のスキーマに取り込む
このセクションのパイプライン定義の例では、2 つの Workday レポートを別々のスキーマに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。
resources:
  pipelines:
    pipeline_workday:
      name: workday_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <workday-connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: my_catalog_1
              destination_schema: my_schema_1
              destination_table: my_table_1
              table_configuration:
                primary_keys:
                  - <primary_key_column>
          - report:
              source_url: <report-url-2>
              destination_catalog: my_catalog_2
              destination_schema: my_schema_2
              destination_table: my_table_2
              table_configuration:
                primary_keys:
                  - <primary_key_column>