Workday レポートの取り込み

プレビュー

LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、LakeFlow Connect を使用して Workday レポートを取り込み、Databricks にロードする方法について説明します。 結果として得られる取り込みパイプラインはUnity Catalogによって管理され、サーバレス コンピュートとDelta Live Tablesによって強化されます。

始める前に

インジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースは Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。

  • 接続を作成するには: メタストアに CREATE CONNECTION があります。

    既存の接続を使用するには: 接続オブジェクトに USE CONNECTION または ALL PRIVILEGES があります。

  • USE CATALOG ターゲットカタログ上。

  • USE SCHEMA ターゲットカタログ上の既存のスキーマまたはCREATE SCHEMACREATE TABLE

インジェストのための Workday レポートの構成

インジェスト用の Workday レポートの構成」を参照してください。

Workday 接続を作成する

必要なアクセス許可: メタストア CREATE CONNECTION

Workday 接続を作成するには、次の手順を実行します。

  1. Databricksワークスペースで、 [カタログ] > [外部ロケーション] > [接続] > [接続の作成] をクリックします。

  2. [ Connection name] に、Workday 接続の一意の名前を入力します。

  3. [接続の種類] で [Workday レポート] を選択します。

  4. [Auth type ] で [OAuth 更新 トークン ] を選択し、 ソース セットアップ 中に生成した [Client ID ]、[ Client secret ]、および [更新トークン ] を入力します。

  5. [ 接続の作成 ] ページで、[ 作成] をクリックします。

注:

接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。

Delta Live Tablesパイプラインの作成

このステップでは、インジェスト パイプラインのセットアップ方法を説明します。 取り込まれた各テーブルは、明示的に名前を変更しない限り、宛先に同じ名前 (ただしすべて小文字) の対応するストリーミング テーブルを取得します。

  1. 個人アクセストークンを生成します。

  2. 次のコードを Python ノートブックのセルに貼り付けて、 <personal-access-token>値を変更します。

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. 次のコードをノートブックの 2 番目のセルに貼り付けます。

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  4. 次のコードを 3 番目のノートブック セルに貼り付け、パイプラインの仕様を反映するように変更します。

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "report": {
             "source_url": "<YOUR_REPORT_URL>,
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>",
             "table_configuration": {
                   "primary_keys": [<PRIMARY_KEY>]
                }
             }
          }, {
             "report": {
             "source_url": "<YOUR_SECOND_REPORT_URL>,
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
             "table_configuration": {
                   "primary_keys": [<PRIMARY_KEY>],
                   "scd_type": "SCD_TYPE_2"
                }
             }
          }
       ]
    },
    "channel": "CURRENT"
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. 最初のノートブックセルをあなたの個人的なアクセスアドレスで実行します。

  6. 2 番目のノートブック セルを実行します。

  7. パイプラインの詳細を使用して、3 番目のノートブック セルを実行します。 これはcreate_pipelineを実行します。

    • list_pipeline パイプライン ID とその詳細を返します。

    • edit_pipeline パイプライン定義を編集できます。

    • delete_pipeline パイプラインを削除します。

パイプラインを作成するには:

databricks pipelines create --json "<pipeline_definition OR json file path>"

パイプラインを編集するには:

databricks pipelines update --json "<<pipeline_definition OR json file path>"

パイプライン定義を取得するには:

databricks pipelines get "<your_pipeline_id>"

パイプラインを削除するには:

databricks pipelines delete "<your_pipeline_id>"

さらに詳しい情報が必要な場合は、いつでも以下を実行してください。

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

パイプラインを開始、スケジュール、アラートを設定する

  1. パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。

  4. パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。

  5. インジェストが完了したら、テーブルに対してクエリを実行できます。