Workday レポートの取り込み
この記事では、Workday レポートを取り込み、Databricks を使用してLakeFlow Connect に読み込む方法について説明します。結果として得られるインジェスト パイプラインは Unity Catalog によって制御され、サーバレス コンピュートと DLTによって駆動されます。
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの有効化を参照してください。
-
接続を作成する予定の場合: メタストアに対する
CREATE CONNECTION
権限があります。既存の接続を使用する予定の場合: 接続オブジェクトに対する
USE CONNECTION
権限またはALL PRIVILEGES
があります。 -
ターゲット・カタログに対する
USE CATALOG
権限があります。 -
既存のスキーマに対する
USE SCHEMA
権限とCREATE TABLE
権限、またはターゲット カタログに対するCREATE SCHEMA
権限を持っている。
Workday から取り込むには、「 取り込み用の Workday レポートを構成する」を参照してください。
ネットワークを構成する
サーバレス エグレス コントロールが有効になっている場合は、レポート URL のホスト名を許可リストに登録します。たとえば、レポート URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json
のホスト名は https://ww1.workday.com
です。サーバレス egress 制御のネットワーク ポリシーの管理を参照してください。
Workday 接続を作成する
必要なアクセス許可: メタストア CREATE CONNECTION
。
Workday 接続を作成するには、次の手順を実行します。
- Databricksワークスペースで、[ カタログ] > [外部ロケーション] > [接続] > [接続の作成 ] をクリックします。
- [ Connection name ] に、Workday 接続の一意の名前を入力します。
- [接続の種類 ] で [ Workday レポート] を選択します。
- [Auth type] で [ OAuth 更新 トークン] を選択し、 ソース セットアップ 中に生成した[Client ID]、[ Client secret]、および[更新トークン] を入力します。
- [ 接続の作成 ] ページで、[ 作成 ] をクリックします。
インジェスト パイプラインを作成する
この手順では、インジェスト パイプラインを設定する方法について説明します。取り込まれた各テーブルは、明示的に名前を変更していない限り、宛先に同じ名前 (ただしすべて小文字) の対応するストリーミングテーブルを取得します。
- Databricks Asset Bundles
- Notebook
- CLI
このタブでは、Databricks Asset Bundle を使用してインジェスト パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「アセットバンドルDatabricks」を参照してください。
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
また、レポートの URL(source_url
)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init
-
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/workday_pipeline.yml
)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/workday_job.yml
)。
次に、
resources/workday_pipeline.yml
ファイルの例を示します。YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for workday_dab
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <workday-connection>
objects:
# An array of objects to ingest from Workday. This example
# ingests a sample report about all active employees. The Employee_ID key is used as
# the primary key for the report.
- report:
source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: All_Active_Employees_Data
table_configuration:
primary_keys:
- Employee_ID
include_columns: # This can be exclude_columns instead
- <column_a>
- <column_b>
- <column_c>次に、
resources/workday_job.yml
ファイルの例を示します。YAMLresources:
jobs:
workday_dab_job:
name: workday_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_workday.id} - パイプライン定義ファイル (
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
また、レポートの URL(source_url
)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。
-
個人用アクセス トークンを生成します。
-
次のコードを Python ノートブックのセルに貼り付けて、
<personal-access-token>
値を変更します。Python# SHOULD MODIFY
# This step sets up a PAT to make API calls to the Databricks service.
api_token = "<personal-access-token>" -
次のコードを 2 番目のノートブック セルに貼り付けます。
Python# DO NOT MODIFY
# This step sets up a connection to make API calls to the Databricks service.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
# DO NOT MODIFY
# These are API definition to be used.
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str = ""):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response) -
次のコードを 3 番目のノートブック セルに貼り付け、パイプラインの仕様を反映するように変更します。
Python# SHOULD MODIFY
# Update this notebook to configure your ingestion pipeline.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"report": {
"source_url": "<YOUR_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"]
}
}
}, {
"report": {
"source_url": "<YOUR_SECOND_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column_a>", "<column_b>", "<column_c>"]
}
}
}
]
}
}
"""
create_pipeline(pipeline_spec) -
最初のノートブック セルを個人用アクセス トークンで実行します。
-
2 番目のノートブック セルを実行します。
-
パイプラインの詳細で 3 番目のノートブック セルを実行します。これは
create_pipeline
実行されます。list_pipeline
パイプライン ID とその詳細を返します。edit_pipeline
パイプライン定義を編集できます。delete_pipeline
パイプラインを削除します。
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
また、レポートの URL(source_url
)でプロンプトを指定して、フィルタリングされたレポートを取り込むこともできます。
パイプラインを作成するには:
databricks pipelines create --json "<pipeline_definition OR json file path>"
パイプラインを編集するには:
databricks pipelines update --json "<<pipeline_definition OR json file path>"
パイプライン定義を取得するには:
databricks pipelines get "<your_pipeline_id>"
パイプラインを削除するには:
databricks pipelines delete "<your_pipeline_id>"
詳細については、いつでも実行できます。
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
JSON パイプライン定義の例:
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"report": {
"source_url": "<report-url>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"primary_keys": ["<primary-key>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
パイプラインの開始、スケジュール設定、アラートの設定
-
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
-
パイプラインの詳細を表示するには、パイプライン名をクリックします。
-
パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。
-
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。
例: 2 つの Workday レポートを別々のスキーマに取り込む
このセクションのパイプライン定義の例では、2 つの Workday レポートを別々のスキーマに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>