Creating and managing jobs with multiple tasks

Preview

This article discusses the orchestration of multiple tasks using Databricks jobs, a feature that is in Public Preview. For information about how to create, run, and manage single-task jobs using the generally available jobs interface, see Jobs.

A job is a way to run non-interactive code in a Databricks cluster. For example, you can run an extract, transform, and load (ETL) workload interactively or on a schedule. A job consists of one or more tasks. Each task in the job contains code implementing the processing performed by that task and an environment to run the task, including parameters, libraries, and a cluster.

You can implement a task in a Databricks notebook, a Delta Live Tables pipeline, or an application written in Scala, Java, or Python. Legacy Spark Submit applications are also supported.

You can control the execution order of tasks by specifying dependencies between them. You can configure tasks to run in sequence or in parallel. The following diagram illustrates a workflow that:

  1. Ingests raw clickstream data and performs processing to sessionize the records.

  2. Ingests order data and joins it with the sessionized clickstream data to create a prepared data set for analysis.

  3. Extracts features from the prepared data.

  4. Performs tasks in parallel to persist the features and train a machine learning model.

    Example multitask workflow

You can create and run a job using the UI, the CLI, or by invoking the Jobs API. You can monitor job run results using the UI, using the CLI, by querying the API, and by configuring email alerts. This article focuses on performing job tasks using the UI. For the other methods, see Jobs CLI and Jobs API.
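
For example, you can trigger a run of an existing job from outside the UI with a short script. The following is a minimal sketch, assuming the Jobs API run-now endpoint, a personal access token in the DATABRICKS_TOKEN environment variable, and placeholder values for the workspace URL and job ID:

import os
import requests

# Placeholders: replace with your workspace URL and the ID of an existing job.
workspace_url = "https://<your-workspace>.cloud.databricks.com"
token = os.environ["DATABRICKS_TOKEN"]

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {token}"},
    json={"job_id": 123},
)
response.raise_for_status()
print(response.json())  # includes the run_id of the triggered run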

Important

  • You can create jobs only in a Data Science & Engineering workspace or a Machine Learning workspace.
  • A workspace is limited to 1000 concurrent job runs. A 429 Too Many Requests response is returned when you request a run that cannot be started immediately.
  • The number of jobs a workspace can create in an hour is limited to 5000 (includes “run now” and “runs submit”). This limit also affects jobs created by the REST API and notebook workflows.

Requirements

An administrator must enable support for jobs with multiple tasks in the Databricks admin console.

Create a job

  1. Click Jobs Icon Jobs in the sidebar.

  2. Click Create Job Button.

    The Tasks tab displays with the create task dialog.

    Create task screen
  3. Replace Add a name for your job… with your job name.

  4. In the Task name field, enter a name for the task.

  5. Specify the type of task to run. In the Type drop-down, select Notebook, JAR, Spark Submit, Python, or Pipeline.

    • Notebook: Use the file browser to find the notebook, click the notebook name, and click Confirm.

    • JAR: Specify the Main class. Use the fully qualified name of the class containing the main method, for example, org.apache.spark.examples.SparkPi. Then click Add under Dependent Libraries to add libraries required to run the task. One of these libraries must contain the main class. To learn more about JAR tasks, see JAR jobs.

    • Spark Submit: In the Parameters text box, specify the main class, the path to the library JAR, and all arguments, formatted as a JSON array of strings. The following example configures a spark-submit task to run the DFSReadWriteTest from the Apache Spark examples:

      ["--class","org.apache.spark.examples.DFSReadWriteTest","dbfs:/FileStore/libraries/spark_examples_2_12_3_1_1.jar","/dbfs/databricks-datasets/README.md","/FileStore/examples/output/"]
      

      Important

      There are several limitations for spark-submit tasks:

      • You can run spark-submit tasks only on new clusters.
      • Spark-submit does not support cluster autoscaling. To learn more about autoscaling, see Cluster autoscaling.
      • Spark-submit does not support Databricks Utilities. To use Databricks Utilities, use JAR tasks instead.
      • For more information on which parameters may be passed to a spark-submit task, see SparkSubmitTask.
    • Python: In the Path textbox, enter the URI of a Python script on DBFS or cloud storage; for example, dbfs:/FileStore/myscript.py.

    • Pipeline: In the Pipeline dropdown, choose an existing Delta Live Tables pipeline.

  6. Configure the cluster where the task runs. In the Cluster drop-down, select either New Job Cluster or Existing All-Purpose Cluster. To learn more about selecting and configuring clusters to run tasks, see Cluster configuration.

    • New Job Cluster: Click Edit in the Cluster drop-down and complete the cluster configuration.
    • Existing All-Purpose Cluster: Select an existing cluster in the Cluster drop-down. To open the cluster in a new page, click the External Link icon to the right of the cluster name and description.
  7. You can pass parameters for your task. Each task type has different requirements for formatting and passing the parameters.

    • Notebook: Click Add and specify the key and value of each parameter to pass to the task. You can override or add additional parameters when manually running a task with Run a job with different parameters. Parameters set the value of the notebook widget specified by the key of the parameter. Use task parameter variables to pass a limited set of dynamic values as part of a parameter value.
    • JAR: Use a JSON-formatted array of strings to specify parameters. These strings are passed as arguments to the main method of the main class. See Configure JAR job parameters.
    • Spark Submit: Parameters are specified as a JSON-formatted array of strings. Conforming to the Apache Spark spark-submit convention, parameters after the JAR path are passed to the main method of the main class.
    • Python: Use a JSON-formatted array of strings to specify parameters. These strings are passed as arguments that can be parsed using the argparse module in Python. (See the example script at the end of this section.)
  8. To access additional options, including Dependent Libraries, Retry Policy, and Timeouts, click Advanced Options. See Task configuration options.

  9. Click Create.

  10. To optionally set the job’s schedule, click Edit schedule in the Job details panel. See Schedule a job.

  11. To optionally allow multiple concurrent runs of the same job, click Edit concurrent runs in the Job details panel. See Maximum concurrent runs.

  12. To optionally specify email addresses to receive alerts on job events, click Edit alerts in the Job details panel. See Job alerts.

  13. To optionally control permission levels on the job, click Edit permissions in the Job details panel. See Control access to jobs.

To add another task, click Add Task Button below the task you just created.
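
The Python task type described above passes its parameters to the script as command-line arguments. The following hypothetical script shows one way to read them with argparse; the script path and parameter names are examples only:

# myscript.py: a hypothetical Python task script (for example, uploaded to
# dbfs:/FileStore/myscript.py) that reads parameters passed by the job,
# such as ["--input", "<input-path>", "--output", "<output-path>"].
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="input data path")
    parser.add_argument("--output", required=True, help="output data path")
    args = parser.parse_args()
    print(f"Reading from {args.input}, writing to {args.output}")
    # ... task logic goes here ...

if __name__ == "__main__":
    main()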

Run a job

Select a job and click the Runs tab. You can run a job immediately or schedule the job to run later.

Run a job immediately

To run the job immediately, click Run Now Button.

Tip

You can perform a test run of a job with a notebook task by clicking Run Now. If you need to make changes to the notebook, clicking Run Now again after editing the notebook will automatically run the new version of the notebook.

Run a job with different parameters

You can use Run Now with Different Parameters to re-run a job with different parameters or different values for existing parameters.

  1. Click Blue Down Caret next to Run Now and select Run Now with Different Parameters or, in the Active Runs table, click Run Now with Different Parameters. Enter the new parameters depending on the type of task.

    Run job with different parameters
    • Notebook: You can enter parameters as key-value pairs or a JSON object. You can use this dialog to set the values of widgets. (For an equivalent Jobs API call, see the sketch after these steps.)
    • JAR and spark-submit: You can enter a list of parameters or a JSON document. The provided parameters are merged with the default parameters for the triggered run. If you delete keys, the default parameters are used. You can also add task parameter variables for the run.
  2. Click Run.
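
If you trigger runs through the Jobs API instead of the UI, you can pass the same overrides in the run-now request. The following sketch assumes a notebook task with widgets named env and date; the job ID, widget names, and values are placeholders:

import os
import requests

workspace_url = "https://<your-workspace>.cloud.databricks.com"
headers = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}

# notebook_params overrides the default notebook parameters for this run only.
resp = requests.post(
    f"{workspace_url}/api/2.1/jobs/run-now",
    headers=headers,
    json={
        "job_id": 123,
        "notebook_params": {"env": "staging", "date": "2021-02-15"},
    },
)
resp.raise_for_status()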

Schedule a job

To define a schedule for the job:

  1. Click Edit schedule in the Job details panel and set the Schedule Type to Scheduled.

  2. Specify the period, starting time, and time zone. Optionally select the Show Cron Syntax checkbox to display and edit the schedule in Quartz Cron Syntax. (For the equivalent JSON setting, see the sketch after these steps.)

    Note

    • Databricks enforces a minimum interval of 10 seconds between subsequent runs triggered by the schedule of a job regardless of the seconds configuration in the cron expression.
    • You can choose a time zone that observes daylight saving time or UTC. If you select a zone that observes daylight saving time, an hourly job will be skipped or may appear to not fire for an hour or two when daylight saving time begins or ends. To run at every hour (absolute time), choose UTC.
    • The job scheduler is not intended for low latency jobs. Due to network or cloud issues, job runs may occasionally be delayed up to several minutes. In these situations, scheduled jobs will run immediately upon service availability.
  3. Click Save.
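
The schedule also appears in the job’s JSON definition (see View JSON) and can be set through the Jobs API. The following is a minimal sketch of the schedule setting for a job that runs daily at 02:00 UTC:

# Sketch of the schedule portion of a job's settings.
# Quartz cron fields: seconds, minutes, hours, day-of-month, month, day-of-week.
schedule = {
    "quartz_cron_expression": "0 0 2 * * ?",  # every day at 02:00
    "timezone_id": "UTC",
    "pause_status": "UNPAUSED",
}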

Pause and resume a job schedule

To pause a job, you can either:

  • Click Pause in the Job details panel.
  • Click Edit schedule in the Job details panel and set the Schedule Type to Manual (Paused).

To resume a paused job schedule, set the Schedule Type to Scheduled.

View jobs

Click Jobs Icon Jobs in the sidebar. The Jobs list appears, showing all defined jobs, the cluster definition, the schedule if any, and the result of the last run.

Job list

You can filter jobs in the Jobs list:

  • Using keywords.
  • Selecting only the jobs you own.
  • Selecting all jobs you have permissions to access. Access to this filter requires that Jobs access control is enabled.

By default, jobs are sorted by job name in ascending order. To sort the list of jobs, click any column header.

View job runs

  1. Click Jobs Icon Jobs in the sidebar.
  2. In the Name column, click a job name. The Runs tab shows active runs and completed runs.

Databricks maintains a history of your job runs for up to 60 days. If you need to preserve job runs for longer, Databricks recommends that you export results before they expire. For more information, see Export job run results.

View job run details

The job run details page contains information about the success or failure of each task in the job run. You can access job run details from the Runs tab for the job.

To view job run details from the Runs tab:

  1. Click the View Details link for the run in the Run column of the Completed Runs (past 60 days) table.
  2. Click on a task to view:
    • the cluster that ran the task
    • the Spark UI for the task
    • logs for the task
    • metrics for the task

Export job run results

You can export notebook run results and job run logs for all job types.

Export notebook run results

You can persist job runs by exporting their results. For notebook job runs, you can export a rendered notebook that can later be imported into your Databricks workspace.

  1. On the job detail page, click a job run name in the Run column.

  2. Click the notebook task to export.

  3. Click Export to HTML.

    Export run result

Export job run logs

You can also export the logs for your job run. To automate this process, you can set up your job to automatically deliver logs to DBFS or S3 through the Jobs API. For more information, see the NewCluster and ClusterLogConf fields in the Job Create API call.
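
For example, the cluster_log_conf portion of a new job cluster specification might look like the following sketch; the DBFS destination path is a placeholder:

# Sketch: deliver driver and executor logs for a new job cluster to DBFS.
new_cluster = {
    # ... spark_version, node_type_id, and other cluster fields ...
    "cluster_log_conf": {
        "dbfs": {"destination": "dbfs:/cluster-logs/my-job"}
    },
}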

Clone a job

You can quickly create a new job by cloning the configuration of an existing job. Cloning a job creates an identical copy of the job, except for the job ID. To clone a job:

  1. On the Runs tab, click More ….
  2. Select Clone from the dropdown menu.

View JSON

You can view the JSON definition of a job. This can be useful if you want to create a similar job using the Job API. To view the JSON definition:

  1. On the Runs tab, click More ….
  2. Select View JSON from the dropdown menu.

Delete a job

To delete a job:

  1. On the Runs tab, click More ….
  2. Select Delete from the dropdown menu.

Edit a job

Some configuration options are available on the job, and other options are available on individual tasks. For example, the maximum concurrent runs can be set on the job only, while parameters and a cluster must be defined for each task.

To change the configuration for a job:

  1. Click Jobs Icon Jobs in the sidebar.
  2. In the Name column, click the job name.

The side panel displays the Job details. You can change the maximum number of concurrent runs, alerts, and permissions.

Maximum concurrent runs

The maximum number of parallel runs for this job. Databricks skips the run if the job has already reached its maximum number of active runs when attempting to start a new run. Set this value higher than the default of 1 to perform multiple runs of the same job concurrently. This is useful, for example, if you trigger your job on a frequent schedule and want to allow consecutive runs to overlap with each other, or you want to trigger multiple runs that differ by their input parameters.
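
In the job’s JSON definition (see View JSON), this setting corresponds to the max_concurrent_runs field. A minimal sketch for a job that allows three overlapping runs:

# Sketch: job-level setting allowing up to three concurrent runs.
job_settings = {
    "max_concurrent_runs": 3,
    # ... name, schedule, tasks, and other job settings ...
}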

See Streaming tasks for tips on how to configure maximum concurrent runs for Spark Streaming jobs.

Job alerts

You can add email alerts to send in case of job failure, success, or timeout. You can also opt out of alerts for skipped job runs.

Control access to jobs

Job access control enables job owners and administrators to grant fine-grained permissions on their jobs. Job owners can choose which other users or groups can view the results of the job. Owners can also choose who can manage their job runs (Run now and Cancel run permissions).

See Jobs access control for details.

Edit a task

To edit a task, select the job containing the task, click the Tasks tab and click on the task you want to edit.

Task dependencies

You can define the order of execution of tasks in a job using the Depends on dropdown. You can set this field to one or more tasks in the job.

Edit task dependencies

Note

Depends on is not visible if the job consists of only a single task.

Configuring task dependencies creates a Directed Acyclic Graph (DAG) of task execution, a common way of representing execution order in job schedulers. For example, consider the following job consisting of four tasks:

Task dependencies example diagram
  • Task 1 is the root task and does not depend on any other task.
  • Task 2 and Task 3 depend on Task 1 completing first.
  • Finally, Task 4 depends on Task 2 and Task 3 completing successfully.

Databricks runs upstream tasks before running downstream tasks, running as many of them in parallel as possible. The following diagram illustrates the order of processing for these tasks:

Task dependencies example flow
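
In the job’s JSON definition, this DAG is expressed with the depends_on field of each task. The following sketch shows only the task keys and dependencies for the four-task example; all other task settings are omitted:

# Sketch: task ordering for the example DAG, using depends_on.
tasks = [
    {"task_key": "task_1"},
    {"task_key": "task_2", "depends_on": [{"task_key": "task_1"}]},
    {"task_key": "task_3", "depends_on": [{"task_key": "task_1"}]},
    {"task_key": "task_4", "depends_on": [{"task_key": "task_2"},
                                          {"task_key": "task_3"}]},
]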

Task configuration options

Individual tasks have the following configuration options:

Cluster

To configure the cluster where a task runs, click the Cluster dropdown. To learn more about selecting and configuring clusters to run tasks, see Cluster configuration.

Note

Each task either runs on an all-purpose cluster or in a new job cluster. You cannot use the same job cluster for more than one task in a job.

Dependent libraries

You can configure a task to install dependent libraries on the cluster before it runs. You must specify all of the task’s library dependencies to ensure they are installed before the run starts.

To add a dependent library, click Advanced options and select Add Dependent Libraries to open the Add Dependent Library chooser. Follow the recommendations in Library dependencies for specifying dependencies.

Important

If you have configured a library to automatically install on all clusters or you select an existing terminated cluster that has libraries installed, the job execution does not wait for library installation to complete. If a job requires a specific library, you should attach the library to the job in the Dependent Libraries field.

Parameter variables

You can pass templated variables into a job task as part of the task’s parameters. These variables are replaced with the appropriate values when the job task runs. You can use task parameter values to pass context about a job run, such as the run ID or the job’s start time.

When a job runs, each task parameter variable surrounded by double curly braces is replaced with its value, optionally combined with a literal string included as part of the parameter value. For example, to pass a parameter named MyJobId with a value of my-job-6 for any run of job ID 6, add the following task parameter:

{
  "MyJobId": "my-job-{{job_id}}"
}

The content between double curly braces is not evaluated as an expression. You can’t specify operations or functions within double-curly braces. Whitespace is not stripped from within double curly braces. For example, {{  job_id  }} is not equivalent to {{job_id}}.

The supported task parameter variables are:

  • {{job_id}}: The unique identifier assigned to a job. Example value: 1276862
  • {{parent_run_id}}: The unique identifier assigned to the run of a job with multiple tasks. Example value: 3447835
  • {{run_id}}: The unique identifier assigned to the run of a task. Example value: 3447843
  • {{start_date}}: The date a job run started. The format is yyyy-MM-dd in UTC timezone. Example value: 2021-02-15
  • {{start_time}}: The timestamp of the run’s start of execution after the cluster is created and ready. The format is milliseconds since UNIX epoch in UTC timezone, as returned by System.currentTimeMillis(). Example value: 1551622063030
  • {{task_retry_count}}: The number of retries that have been attempted to run a task if the first attempt fails. The value is 0 for the first attempt and increments with each retry.
  • {{task_key}}: The unique name assigned to a task that’s part of a job with multiple tasks. Example value: "clean_raw_data"

You can set these variables with any task when you Create a job, Edit a task, or Run a job with different parameters.

Timeout

The maximum completion time for a task. If the task does not complete in this time, Databricks sets its status to Timed Out. To set the timeout for the task, click Advanced options and select Edit Timeout.

Retries

A policy that determines when and how many times failed tasks are retried. By default, Databricks does not retry a failed task. To set the retries for the task, click Advanced options and select Edit Retry Policy.

Note

If you configure both Timeout and Retries, the timeout applies to each retry.

Alerts

Email alerts can be sent in case of task failure, success, or timeout. To set alerts on a task, click Advanced options and select Edit Email Notifications.

Delete a task

To delete a task, click the Tasks tab, select the task to be deleted, and click Delete Task Button.

Best practices

Cluster configuration

Cluster configuration is important when you operationalize a job. The following provides general guidance on choosing and configuring job clusters, followed by recommendations for specific job and task types.

Choose the best cluster type for your job

  • New Job Clusters are dedicated clusters created and started when you run a task and terminated immediately after the task completes. In production, Databricks recommends using new clusters so that each task runs in a fully isolated environment.
  • When you run a task on a new cluster, the task is treated as a data engineering (task) workload, subject to the task workload pricing. When you run a task on an existing all-purpose cluster, the task is treated as a data analytics (all-purpose) workload, subject to all-purpose workload pricing.
  • If you select a terminated existing cluster and the job owner has Can Restart permission, Databricks starts the cluster when the job is scheduled to run.
  • Existing all-purpose clusters work best for tasks such as updating dashboards at regular intervals.

Use a pool to reduce cluster start times

To decrease new job cluster start time, create a pool and configure the job’s cluster to use the pool.

Automatic availability zones

To take advantage of automatic availability zones (Auto-AZ), you must enable it with the Clusters API, setting aws_attributes.zone_id = "auto". See Availability zones.
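
A minimal sketch of the relevant attribute in a new cluster specification:

# Sketch: enable automatic availability zone selection (Auto-AZ).
new_cluster = {
    # ... spark_version, node_type_id, and other cluster fields ...
    "aws_attributes": {"zone_id": "auto"},
}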

Notebook jobs

Total notebook cell output (the combined output of all notebook cells) is subject to a 20MB size limit. Additionally, individual cell output is subject to an 8MB size limit. If the output of a notebook or cell exceeds the size limit, the run is canceled and marked as failed.

If you need help finding cells that are near or beyond the limit, run the notebook against an all-purpose cluster and use this notebook autosave technique.

Streaming tasks

Spark Streaming jobs should never have maximum concurrent runs set to greater than 1. Streaming jobs should be set to run using the cron expression "* * * * * ?" (every minute), and retries should never be enabled for streaming jobs.

Since a streaming task runs continuously, it should always be the final task in a job.

JAR jobs

When running a JAR job, keep in mind the following:

Output size limits

Note

Available in Databricks Runtime 6.3 and above.

Task output, such as log output emitted to stdout, is subject to a 20MB size limit. If the total output has a larger size, the run is canceled and marked as failed.

To avoid encountering this limit, you can prevent stdout from being returned from the driver to Databricks by setting the spark.databricks.driver.disableScalaOutput Spark configuration to true. By default the flag value is false. The flag controls cell output for Scala JAR jobs and Scala notebooks. If the flag is enabled, Spark does not return job execution results to the client. The flag does not affect the data that is written in the cluster’s log files. Setting this flag is recommended only for job clusters for JAR jobs, because it will disable notebook results.
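
One way to set the flag is in the Spark configuration of the job cluster that runs the JAR task, as in the following sketch:

# Sketch: disable returning Scala driver output to the client.
new_cluster = {
    # ... spark_version, node_type_id, and other cluster fields ...
    "spark_conf": {"spark.databricks.driver.disableScalaOutput": "true"},
}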

Use the shared SparkContext

Because Databricks is a managed service, some code changes may be necessary to ensure that your Apache Spark jobs run correctly. JAR job programs must use the shared SparkContext API to get the SparkContext. Because Databricks initializes the SparkContext, programs that invoke new SparkContext() will fail. To get the SparkContext, use only the shared SparkContext created by Databricks:

val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()

There are also several methods you should avoid when using the shared SparkContext.

  • Do not call SparkContext.stop().
  • Do not call System.exit(0) or sc.stop() at the end of your main program. This can cause undefined behavior.

Use try-finally blocks for job clean up

Consider a JAR that consists of two parts:

  • jobBody() which contains the main part of the job.
  • jobCleanup() which has to be executed after jobBody(), whether that function succeeded or threw an exception.

As an example, jobBody() may create tables, and you can use jobCleanup() to drop these tables.

The safe way to ensure that the clean up method is called is to put a try-finally block in the code:

try {
  jobBody()
} finally {
  jobCleanup()
}

You must not try to clean up using sys.addShutdownHook(jobCleanup) or the following code:

val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)

Due to the way the lifetime of Spark containers is managed in Databricks, the shutdown hooks are not run reliably.

Configure JAR job parameters

You pass parameters to JAR jobs with a JSON string array. For more information, see SparkJarTask. To access these parameters, inspect the String array passed into your main function.

Library dependencies

The Spark driver has certain library dependencies that you cannot override. These libraries take priority over any of your own libraries that conflict with them.

To get the full list of the driver library dependencies, run the following command inside a notebook attached to a cluster of the same Spark version (or the cluster with the driver you want to examine).

%sh
ls /databricks/jars

Manage library dependencies

A good rule of thumb when dealing with library dependencies while creating JARs for jobs is to list Spark and Hadoop as provided dependencies. In Maven, add Spark and/or Hadoop as provided dependencies, as shown in the following example.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>

In sbt, add Spark and Hadoop as provided dependencies as shown in the following example.

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"

Tip

Specify the correct Scala version for your dependencies based on the version you are running.