Ingest Workday reports

Preview

LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.

This article describes how to ingest Workday reports and load them into Databricks using LakeFlow Connect. The resulting ingestion pipeline is governed by Unity Catalog and is powered by serverless compute and Delta Live Tables.

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace is enabled for Unity Catalog.

  • Serverless compute is enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.

  • To create a connection: You have CREATE CONNECTION on the metastore.

    To use an existing connection: You have USE CONNECTION or ALL PRIVILEGES on the connection object.

  • USE CATALOG on the target catalog.

  • USE SCHEMA and CREATE TABLE on an existing schema or CREATE SCHEMA on the target catalog.

Configure Workday reports for ingestion

See Configure Workday reports for ingestion.

Create a Workday connection

Permissions required: CREATE CONNECTION on the metastore.

To create a Workday connection, do the following:

  1. In your Databricks workspace, click Catalog > External locations > Connections > Create connection.

  2. For Connection name, enter a unique name for the Workday connection.

  3. For Connection type, select Workday Reports.

  4. For Auth type, select OAuth Refresh Token and then enter the Client ID, Client secret, and Refresh token that you generated during source setup.

  5. On the Create Connection page, click Create.

Create an ingestion pipeline

This step describes how to set up the ingestion pipeline. Each ingested table gets a corresponding streaming table with the same name (but all lowercase) in the destination unless you’ve explicitly renamed it.

The following steps describe how to deploy an ingestion pipeline using Databricks Asset Bundles (DABs). You can also create the pipeline from a notebook by calling the Pipelines API, or with the Databricks CLI, as described later in this section. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
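
For example, a minimal databricks.yml at the root of the bundle declares the bundle name and the target workspaces it can deploy to. The following is only a sketch: the bundle name, target names, and workspace host URLs are placeholders to adapt to your own environment.

bundle:
  name: workday_dab

include:
  - resources/*.yml

targets:
  dev:
    mode: development
    default: true
    workspace:
      host: https://<your-dev-workspace>.cloud.databricks.com
  prod:
    mode: production
    workspace:
      host: https://<your-prod-workspace>.cloud.databricks.com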

  1. Create a new bundle using the Databricks CLI:

    databricks bundle init
    
  2. Add two new resource files to the bundle:

    • A pipeline definition file (resources/workday_pipeline.yml).

    • A workflow file that controls the frequency of data ingestion (resources/workday_job.yml).

    The following is an example resources/workday_pipeline.yml file:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for workday_dab
    resources:
      pipelines:
        pipeline_workday:
          name: workday_pipeline
          channel: PREVIEW
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <workday-connection>
            objects:
              # An array of objects to ingest from Workday. This example
              # ingests a sample report about all active employees. The Employee_ID key is used as
              # the primary key for the report.
              - report:
                  source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  destination_table: All_Active_Employees_Data
                  table_configuration:
                     primary_keys:
                        - Employee_ID
    

    The following is an example resources/workday_job.yml file:

    resources:
      jobs:
        workday_dab_job:
          name: workday_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_workday.id}
    
  3. Deploy the pipeline using the Databricks CLI:

    databricks bundle deploy
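
    Optionally, after the bundle is deployed, you can trigger the ingestion job on demand instead of waiting for the daily trigger. This minimal sketch assumes the workday_dab_job resource key from the example job file above; add -t <target> if your bundle defines multiple targets:

    databricks bundle run workday_dab_job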
    
Alternatively, you can create the pipeline from a notebook by calling the Pipelines REST API directly:

  1. Generate a personal access token.

  2. Paste the following code into a Python notebook cell, modifying the <personal-access-token> value:

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. Paste the following code into a second notebook cell:

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    # DO NOT MODIFY
    # These are the API definitions to be used.
    def create_pipeline(pipeline_definition: str):
       response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
       check_response(response)


    def edit_pipeline(id: str, pipeline_definition: str):
       response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
       check_response(response)


    def delete_pipeline(id: str):
       response = requests.delete(url=f"{api_url}/{id}", headers=headers)
       check_response(response)


    def get_pipeline(id: str):
       response = requests.get(url=f"{api_url}/{id}", headers=headers)
       check_response(response)


    def list_pipeline(filter: str = ""):
       body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
       response = requests.get(url=api_url, headers=headers, data=body)
       check_response(response)
    
  4. Paste the following code into a third notebook cell, modifying it to reflect your pipeline specifications:

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "report": {
             "source_url": "<YOUR_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"]
                }
             }
          }, {
             "report": {
             "source_url": "<YOUR_SECOND_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"],
                   "scd_type": "SCD_TYPE_2"
                }
             }
          }
       ]
    },
    "channel": "PREVIEW"
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. Run the first notebook cell with your personal access token.

  6. Run the second notebook cell.

  7. Run the third notebook cell with your pipeline details. This runs create_pipeline. The other helper functions defined in the second cell are also available; see the sketch after this list.

    • list_pipeline returns the pipeline ID and its details.

    • edit_pipeline allows you to edit the pipeline definition.

    • delete_pipeline deletes the pipeline.
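
    For example, the following sketch uses the helper functions defined in the second cell to locate the pipeline, inspect it, and apply an updated spec. The filter string and pipeline ID are placeholders, and the LIKE filter is an assumption about the Pipelines API list filter syntax:

    # Sketch: find the pipeline, review its definition, then update it.
    list_pipeline("name LIKE '%workday%'")
    get_pipeline("<your-pipeline-id>")
    edit_pipeline("<your-pipeline-id>", pipeline_spec)  # pass the full, updated pipeline spec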

You can also create and manage the pipeline with the Databricks CLI.

To create the pipeline:

databricks pipelines create --json "<pipeline_definition OR json file path>"

To edit the pipeline:

databricks pipelines update --json "<pipeline_definition OR json file path>"

To get the pipeline definition:

databricks pipelines get "<your_pipeline_id>"

To delete the pipeline:

databricks pipelines delete "<your_pipeline_id>"
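
For example, assuming the pipeline definition is saved in a local file named pipeline_spec.json (the file name is a placeholder), you could create the pipeline and then fetch its definition by ID. Depending on your CLI version, the file path may need to be prefixed with @:

databricks pipelines create --json @pipeline_spec.json
databricks pipelines get "<your_pipeline_id>"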

For more information, you can always run:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Start, schedule, and set alerts on your pipeline

  1. After the pipeline has been created, return to the Databricks workspace, and then click Delta Live Tables.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, run the pipeline by clicking Start. You can schedule the pipeline by clicking Schedule.

  4. To set alerts on the pipeline, click Schedule, click More options, and then add a notification.

  5. After ingestion completes, you can query your tables, as shown in the example that follows.
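
    For example, with the destination names from the bundle sample earlier in this article (catalog main, schema ingest_destination_schema, report All_Active_Employees_Data), the streaming table name is lowercased. A minimal notebook sketch, assuming those names:

    # Sketch: preview the ingested report; adjust the three-level name to your catalog and schema.
    df = spark.table("main.ingest_destination_schema.all_active_employees_data")
    display(df.limit(10))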