Ingest Workday reports

This page describes how to ingest Workday reports and load them into Databricks using Lakeflow Connect.

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace must be enabled for Unity Catalog.

  • Serverless compute must be enabled for your workspace. See Enable serverless compute.

  • If you plan to create a new connection: You must have CREATE CONNECTION privileges on the metastore.

    If your connector supports UI-based pipeline authoring, you can create the connection and the pipeline at the same time by completing the steps on this page. However, if you use API-based pipeline authoring, you must create the connection in Catalog Explorer before you complete the steps on this page. See Connect to managed ingestion sources.

  • If you plan to use an existing connection: You must have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.

  • You must have USE CATALOG privileges on the target catalog.

  • You must have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.

To ingest from Workday, you must complete the source setup.

Configure networking

If you have serverless egress control enabled, allowlist the host names of your report URLs. For example, the report URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json has the host name ww1.workday.com. See Manage network policies for serverless egress control.
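
If you're not sure which part of the URL to allowlist, you can extract the host name programmatically. The following is a minimal sketch using Python's standard library; the report URL is a placeholder.

Python
from urllib.parse import urlparse

# Placeholder report URL; substitute your own tenant and report name.
report_url = "https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json"

# The host name to allowlist is the network location, without the scheme or path.
print(urlparse(report_url).hostname)  # ww1.workday.com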

Option 1: Databricks UI

  1. In the sidebar of the Databricks workspace, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click Workday Reports.

    The ingestion wizard opens.

  3. On the Ingestion pipeline page of the wizard, enter a unique name for the pipeline.

  4. For Event log location, select a catalog and schema to store the pipeline event log.

  5. Select the Unity Catalog connection that stores the credentials required to access the source data.

    If there are no existing connections to the source, click Create connection and enter the authentication details you obtained from the source setup. You must have CREATE CONNECTION privileges on the metastore.

  6. Click Create pipeline and continue.

  7. On the Report page, click Add report and enter the report URL. Repeat for each report that you want to ingest, then click Next.

  8. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don't want to use an existing schema, click Create schema. You must have USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  9. Click Save pipeline and continue.

  10. (Optional) On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.

  11. (Optional) Set email notifications for pipeline operation success or failure.

  12. Click Save and run pipeline.

Option 2: Databricks Asset Bundles

This section describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.

You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:

  • include_columns: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
  • exclude_columns: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To exclude the future columns, you'll have to add them to the list.

You can also specify prompts in the report URL (source_url), which allows you to ingest filtered reports.
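
For example, a report prompt can be appended to the report URL as a query parameter. The following is a minimal sketch for building such a URL; the host, tenant, report name, and the Employee_Type prompt are hypothetical and depend on how your report and its prompts are configured.

Python
from urllib.parse import urlencode

# Hypothetical base report URL and prompt values; adjust for your tenant and report.
base_url = "https://<workday-host>/ccx/service/<tenant>/<reportName>"
params = {"Employee_Type": "Regular", "format": "json"}  # prompt names are report-specific

source_url = f"{base_url}?{urlencode(params)}"
print(source_url)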

  1. Confirm that a Unity Catalog connection to Workday exists. For steps to create a connection, see Connect to managed ingestion sources.

  2. Create a new bundle using the Databricks CLI:

    Bash
    databricks bundle init
  3. Add two new resource files to the bundle:

    • A pipeline definition file (resources/workday_pipeline.yml).
    • A workflow file that controls the frequency of data ingestion (resources/workday_job.yml).

    The following is an example resources/workday_pipeline.yml file:

    YAML
    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema

    # The main pipeline for workday_dab
    resources:
      pipelines:
        pipeline_workday:
          name: workday_pipeline
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <workday-connection>
            objects:
              # An array of objects to ingest from Workday. This example
              # ingests a sample report about all active employees. The Employee_ID key
              # is used as the primary key for the report.
              - report:
                  source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  destination_table: All_Active_Employees_Data
                  table_configuration:
                    primary_keys:
                      - Employee_ID
                    include_columns: # This can be exclude_columns instead
                      - <column_a>
                      - <column_b>
                      - <column_c>

    The following is an example resources/workday_job.yml file:

    YAML
    resources:
      jobs:
        workday_dab_job:
          name: workday_dab_job

          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS

          email_notifications:
            on_failure:
              - <email-address>

          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_workday.id}
  4. Deploy the pipeline using the Databricks CLI:

    Bash
    databricks bundle deploy

Option 3: Databricks notebook

You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:

  • include_columns: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
  • exclude_columns: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To exclude the future columns, you'll have to add them to the list.

You can also specify prompts in the report URL (source_url), which allows you to ingest filtered reports.

  1. Confirm that a Unity Catalog connection to Workday exists. For steps to create a connection, see Connect to managed ingestion sources.

  2. Generate a personal access token.

  3. Paste the following code into a Python notebook cell, modifying the <personal-access-token> value:

    Python
    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
  4. Paste the following code into a second notebook cell:

    Python
    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json

    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"

    headers = {
        'Authorization': 'Bearer {}'.format(api_token),
        'Content-Type': 'application/json'
    }

    def check_response(response):
        if response.status_code == 200:
            print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
        else:
            print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

    # DO NOT MODIFY
    # These are the API definitions to be used.
    def create_pipeline(pipeline_definition: str):
        response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
        check_response(response)

    def edit_pipeline(id: str, pipeline_definition: str):
        response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
        check_response(response)

    def delete_pipeline(id: str):
        response = requests.delete(url=f"{api_url}/{id}", headers=headers)
        check_response(response)

    def get_pipeline(id: str):
        response = requests.get(url=f"{api_url}/{id}", headers=headers)
        check_response(response)

    def list_pipeline(filter: str = ""):
        body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
        response = requests.get(url=api_url, headers=headers, data=body)
        check_response(response)
  5. Paste the following code into a third notebook cell, modifying to reflect your pipeline specifications:

    Python
    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.

    pipeline_spec = """
    {
      "name": "<YOUR_PIPELINE_NAME>",
      "ingestion_definition": {
        "connection_name": "<YOUR_CONNECTION_NAME>",
        "objects": [
          {
            "report": {
              "source_url": "<YOUR_REPORT_URL>",
              "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
              "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
              "destination_table": "<YOUR_DATABRICKS_TABLE>",
              "table_configuration": {
                "primary_keys": ["<PRIMARY_KEY>"]
              }
            }
          },
          {
            "report": {
              "source_url": "<YOUR_SECOND_REPORT_URL>",
              "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
              "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
              "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
              "table_configuration": {
                "primary_keys": ["<PRIMARY_KEY>"],
                "scd_type": "SCD_TYPE_2",
                "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
              }
            }
          }
        ]
      }
    }
    """

    create_pipeline(pipeline_spec)
  6. Run the first notebook cell with your personal access token.

  7. Run the second notebook cell.

  8. Run the third notebook cell with your pipeline details. This runs create_pipeline.

    • list_pipeline returns the pipeline ID and its details.
    • edit_pipeline allows you to edit the pipeline definition.
    • delete_pipeline deletes the pipeline.
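
    For example, after the pipeline is created, you can call the other helpers defined in the second cell. The following is a minimal sketch; the pipeline ID is a placeholder that you copy from the output of create_pipeline or list_pipeline.

    Python
    # Example usage of the helpers defined above.
    list_pipeline()                     # prints pipelines in the workspace
    get_pipeline("<pipeline-id>")       # prints the definition of one pipeline
    # delete_pipeline("<pipeline-id>")  # uncomment to delete the pipeline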

Option 4: Databricks CLI

You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:

  • include_columns: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
  • exclude_columns: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To exclude the future columns, you'll have to add them to the list.

You can also specify prompts in the report URL (source_url), which allows you to ingest filtered reports.

  1. Confirm that a Unity Catalog connection to Workday exists. For steps to create a connection, see Connect to managed ingestion sources.
  2. Run the following command to create the pipeline:
    Bash
    databricks pipelines create --json "<pipeline-definition OR json-file-path>"

Pipeline definition template

The following is a JSON pipeline definition template:

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"report": {

"source_url": "<report-url>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"primary_keys": ["<primary-key>"],

"scd_type": "SCD_TYPE_2",

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}

Start, schedule, and set alerts on your pipeline

You can create a schedule for the pipeline on the pipeline details page.

  1. After the pipeline has been created, return to the Databricks workspace, and then click Pipelines.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, you can schedule the pipeline by clicking Schedule.

  4. To set notifications on the pipeline, click Settings, and then add a notification.

For each schedule that you add to a pipeline, Lakeflow Connect automatically creates a job for it. The ingestion pipeline runs as a task within that job. You can optionally add more tasks to the job.
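
Scheduling through the UI is usually enough, but if you want to create the job programmatically, the following is a minimal sketch that mirrors the resources/workday_job.yml example above and calls the Jobs API (POST /api/2.1/jobs/create) with the requests library. The workspace URL, access token, pipeline ID, email address, and job name are placeholders, not values defined elsewhere in this guide.

Python
import requests

# Placeholders: substitute your workspace URL, a personal access token,
# and the pipeline ID returned when you created the ingestion pipeline.
workspace_url = "https://<workspace-instance>"
api_token = "<personal-access-token>"
pipeline_id = "<pipeline-id>"

job_spec = {
    "name": "workday_ingestion_job",
    "trigger": {"periodic": {"interval": 1, "unit": "DAYS"}},  # run once per day
    "email_notifications": {"on_failure": ["<email-address>"]},
    "tasks": [
        {
            "task_key": "refresh_pipeline",
            "pipeline_task": {"pipeline_id": pipeline_id},
        }
    ],
}

response = requests.post(
    url=f"{workspace_url}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {api_token}"},
    json=job_spec,
)
print(response.json())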

Example: Ingest two Workday reports into separate schemas

The example pipeline definition in this section ingests two Workday reports into separate schemas. Multi-destination pipeline support is API-only.

YAML
resources:
  pipelines:
    pipeline_workday:
      name: workday_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <workday-connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: my_catalog_1
              destination_schema: my_schema_1
              destination_table: my_table_1
              table_configuration:
                primary_keys:
                  - <primary_key_column>
          - report:
              source_url: <report-url-2>
              destination_catalog: my_catalog_2
              destination_schema: my_schema_2
              destination_table: my_table_2
              table_configuration:
                primary_keys:
                  - <primary_key_column>

Additional resources