Workday レポートの取り込み
プレビュー
LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、LakeFlow Connect を使用して Workday レポートを取り込み、Databricks にロードする方法について説明します。 結果として得られる取り込みパイプラインはUnity Catalogによって管理され、サーバレス コンピュートとDelta Live Tablesによって強化されます。
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
ワークスペースは Unity Catalog に対して有効になっています。
サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。
接続を作成するには: メタストアに
CREATE CONNECTION
があります。既存の接続を使用するには: 接続オブジェクトに
USE CONNECTION
またはALL PRIVILEGES
があります。USE CATALOG
ターゲットカタログ上。USE SCHEMA
ターゲットカタログ上の既存のスキーマまたはCREATE SCHEMA
CREATE TABLE
。
インジェストのための Workday レポートの構成
「 インジェスト用の Workday レポートの構成」を参照してください。
Workday 接続を作成する
必要なアクセス許可: メタストア CREATE CONNECTION
。
Workday 接続を作成するには、次の手順を実行します。
Databricksワークスペースで、 [カタログ] > [外部ロケーション] > [接続] > [接続の作成] をクリックします。
[ Connection name] に、Workday 接続の一意の名前を入力します。
[接続の種類] で [Workday レポート] を選択します。
[Auth type ] で [OAuth 更新 トークン ] を選択し、 ソース セットアップ 中に生成した [Client ID ]、[ Client secret ]、および [更新トークン ] を入力します。
[ 接続の作成 ] ページで、[ 作成] をクリックします。
注:
接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。
Delta Live Tablesパイプラインの作成
このステップでは、インジェスト パイプラインのセットアップ方法を説明します。 取り込まれた各テーブルは、明示的に名前を変更しない限り、宛先に同じ名前 (ただしすべて小文字) の対応するストリーミング テーブルを取得します。
個人アクセストークンを生成します。
次のコードを Python ノートブックのセルに貼り付けて、
<personal-access-token>
値を変更します。# SHOULD MODIFY # This step sets up a PAT to make API calls to the Databricks service. api_token = "<personal-access-token>"
次のコードをノートブックの 2 番目のセルに貼り付けます。
# DO NOT MODIFY # This step sets up a connection to make API calls to the Databricks service. import requests import json notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() workspace_url = notebook_context.apiUrl().get() api_url = f"{workspace_url}/api/2.0/pipelines" headers = { 'Authorization': 'Bearer {}'.format(api_token), 'Content-Type': 'application/json' } def check_response(response): if response.status_code == 200: print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False))) else: print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}") # DO NOT MODIFY # These are API definition to be used. def create_pipeline(pipeline_definition: str): response = requests.post(url=api_url, headers=headers, data=pipeline_definition) check_response(response) def edit_pipeline(id: str, pipeline_definition: str): response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition) check_response(response) def delete_pipeline(id: str): response = requests.delete(url=f"{api_url}/{id}", headers=headers) check_response(response) def get_pipeline(id: str): response = requests.get(url=f"{api_url}/{id}", headers=headers) check_response(response) def list_pipeline(filter: str = ""): body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}""" response = requests.get(url=api_url, headers=headers, data=body) check_response(response)
次のコードを 3 番目のノートブック セルに貼り付け、パイプラインの仕様を反映するように変更します。
# SHOULD MODIFY # Update this notebook to configure your ingestion pipeline. pipeline_spec = """ { "name": "<YOUR_PIPELINE_NAME>", "ingestion_definition": { "connection_name": "<YOUR_CONNECTON_NAME>", "objects": [ { "report": { "source_url": "<YOUR_REPORT_URL>, "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_TABLE>", "table_configuration": { "primary_keys": [<PRIMARY_KEY>] } } }, { "report": { "source_url": "<YOUR_SECOND_REPORT_URL>, "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>", "table_configuration": { "primary_keys": [<PRIMARY_KEY>], "scd_type": "SCD_TYPE_2" } } } ] }, "channel": "CURRENT" } """ create_pipeline(pipeline_spec)
最初のノートブックセルをあなたの個人的なアクセスアドレスで実行します。
2 番目のノートブック セルを実行します。
パイプラインの詳細を使用して、3 番目のノートブック セルを実行します。 これは
create_pipeline
を実行します。list_pipeline
パイプライン ID とその詳細を返します。edit_pipeline
パイプライン定義を編集できます。delete_pipeline
パイプラインを削除します。
パイプラインを作成するには:
databricks pipelines create --json "<pipeline_definition OR json file path>"
パイプラインを編集するには:
databricks pipelines update --json "<<pipeline_definition OR json file path>"
パイプライン定義を取得するには:
databricks pipelines get "<your_pipeline_id>"
パイプラインを削除するには:
databricks pipelines delete "<your_pipeline_id>"
さらに詳しい情報が必要な場合は、いつでも以下を実行してください。
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
パイプラインを開始、スケジュール、アラートを設定する
パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
パイプラインの詳細を表示するには、パイプライン名をクリックします。
パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。
パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。
インジェストが完了したら、テーブルに対してクエリを実行できます。