Workday レポートの取り込み
プレビュー
Workday Reports コネクタは パブリック プレビュー段階です。
この記事では、Workday レポートを取り込み、Databricks を使用してLakeFlow Connect に読み込む方法について説明します。結果として得られるインジェスト パイプラインは Unity Catalog によって制御され、サーバレス コンピュートと DLT によって駆動されます。
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 Enable サーバレス コンピュートを参照してください。
-
接続を作成する予定の場合: メタストアに対する
CREATE CONNECTION
権限があります。既存の接続を使用する予定の場合: 接続オブジェクトに対する
USE CONNECTION
権限またはALL PRIVILEGES
があります。 -
ターゲット・カタログに対する
USE CATALOG
権限があります。 -
既存のスキーマに対する
USE SCHEMA
権限とCREATE TABLE
権限、またはターゲット カタログに対するCREATE SCHEMA
権限を持っている。
Workday から取り込むには、「 取り込み用の Workday レポートを構成する」を参照してください。
Workday 接続を作成する
必要なアクセス許可: メタストア CREATE CONNECTION
。
Workday 接続を作成するには、次の手順を実行します。
- Databricksワークスペースで、[ カタログ] > [外部ロケーション] > [接続] > [接続の作成 ] をクリックします。
- [ Connection name ] に、Workday 接続の一意の名前を入力します。
- [接続の種類 ] で [ Workday レポート] を選択します。
- [Auth type] で [ OAuth 更新 トークン] を選択し、 ソース セットアップ 中に生成した[Client ID]、[ Client secret]、および[更新トークン] を入力します。
- [ 接続の作成 ] ページで、[ 作成 ] をクリックします。
インジェスト パイプラインを作成する
この手順では、インジェスト パイプラインを設定する方法について説明します。 取り込まれた各テーブルは、明示的に名前を変更していない限り、宛先に同じ名前 (ただしすべて小文字) の対応するストリーミング テーブルを取得します。
- Databricks Asset Bundles
- Notebook
- CLI
This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles (DABs). Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
-
Create a new bundle using the Databricks CLI:
Bashdatabricks bundle init
-
Add two new resource files to the bundle:
- A pipeline definition file (
resources/workday_pipeline.yml
). - A workflow file that controls the frequency of data ingestion (
resources/workday_job.yml
).
The following is an example
resources/workday_pipeline.yml
file:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for workday_dab
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <workday-connection>
objects:
# An array of objects to ingest from Workday. This example
# ingests a sample report about all active employees. The Employee_ID key is used as
# the primary key for the report.
- report:
source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: All_Active_Employees_Data
table_configuration:
primary_keys:
- Employee_IDThe following is an example
resources/workday_job.yml
file:YAMLresources:
jobs:
workday_dab_job:
name: workday_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_workday.id} - A pipeline definition file (
-
Deploy the pipeline using the Databricks CLI:
Bashdatabricks bundle deploy
-
Generate a personal access token.
-
Paste the following code into a Python notebook cell, modifying the
<personal-access-token>
value:Python# SHOULD MODIFY
# This step sets up a PAT to make API calls to the Databricks service.
api_token = "<personal-access-token>" -
Paste the following code into a second notebook cell:
Python# DO NOT MODIFY
# This step sets up a connection to make API calls to the Databricks service.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
# DO NOT MODIFY
# These are API definition to be used.
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str = ""):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response) -
Paste the following code into a third notebook cell, modifying to reflect your pipeline specifications:
Python# SHOULD MODIFY
# Update this notebook to configure your ingestion pipeline.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"report": {
"source_url": "<YOUR_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"]
}
}
}, {
"report": {
"source_url": "<YOUR_SECOND_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"],
"scd_type": "SCD_TYPE_2"
}
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec) -
Run the first notebook cell with your personal access token.
-
Run the second notebook cell.
-
Run the third notebook cell with your pipeline details. This runs
create_pipeline
.list_pipeline
returns the pipeline ID and its details.edit_pipeline
allows you to edit the pipeline definition.delete_pipeline
deletes the pipeline.
To create the pipeline:
databricks pipelines create --json "<pipeline_definition OR json file path>"
To edit the pipeline:
databricks pipelines update --json "<<pipeline_definition OR json file path>"
To get the pipeline definition:
databricks pipelines get "<your_pipeline_id>"
To delete the pipeline:
databricks pipelines delete "<your_pipeline_id>"
For more information, you can always run:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
パイプラインの開始、スケジュール設定、アラートの設定
-
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
-
パイプラインの詳細を表示するには、パイプライン名をクリックします。
-
パイプラインの詳細ページで、[ スケジュール] をクリックしてパイプラインをスケジュールできます。
-
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。