Notebook Workflows

Overview

The Run a notebook from another notebook command (%run) allows you to include another Databricks notebook within the current notebook. This works well for concatenating notebooks that represent key ETL steps, Spark analysis steps, or ad hoc exploration, but it does not let you use the Databricks platform to build more complex data pipelines.

Notebook workflows complement Run a notebook from another notebook because they support return values from a notebook. This lets you build complex workflows and pipelines with dependencies easily: you can properly parameterize runs (for example, get a list of files in a directory and pass the names to another notebook, something that is not possible today with %run) and create if/then/else workflows based on return values. Notebook workflows are available in Scala, Python, and R and require Spark 1.4 or later. Notebook workflows support calling other notebooks via relative paths, and both parameters and return values must be strings.

Go to Advanced usage - returning structured data (Scala) to learn how to return structured data from the API.

Usage and APIs

The following APIs are available in dbutils.notebook to build pipelines using Notebook workflows: run and exit. To get basic information about them, you can run dbutils.notebook.help().

  • run(path: String, timeoutSeconds: int, arguments: Map): String

This method runs a notebook and returns its exit value (it kicks off an ephemeral job that runs immediately). The timeoutSeconds parameter controls the timeout of the run (0 means no timeout): the call to run throws an exception if it does not finish within the specified time.

Note

Currently, if the Databricks web application is down for more than 10 minutes, the notebook run will fail regardless of timeoutSeconds.

Here are example usages in Scala and Python.

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

The arguments parameter sets widget values of the target notebook. This allows you to build simple workflows that call the same notebook with different options each time. Specifically, if the notebook you are running has a widget named A, and you pass a key-value pair ("A": "B") as part of the arguments parameter to the run() call, then retrieving the value of widget A will return "B". You can find the instructions for creating and working with widgets on the Widgets documentation page.

For example, suppose we have a notebook named workflows with a single widget named foo that prints its value.

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

Then running dbutils.notebook.run("workflows", 60, {"foo": "bar"}) will produce the following result:

[Image: NotebookWorkflowWithWidget]

As you can see, the widget had the value we passed in rather than the default.
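Building on this, a caller notebook can compute argument values at run time, for example by listing a directory with dbutils.fs.ls and passing each file path to another notebook. The sketch below is illustrative only: the process-file notebook and its path widget are assumptions, not part of this example.

# A minimal sketch, assuming a notebook named "process-file" that defines a "path" widget.
files = dbutils.fs.ls("/tmp/input")
for f in files:
  # Run the notebook once per file, passing the file path as the widget value.
  result = dbutils.notebook.run("process-file", 60, {"path": f.path})
  print(result)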

  • exit(value: String): void

This method lets you exit a notebook with a value. If you call the notebook using the run method, this is the value that run returns.

dbutils.notebook.exit("returnValue")

Note

Calling dbutils.notebook.exit in a job will cause the notebook to complete successfully. If you want to cause the job to fail, throw an exception instead.
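For example, in Python you might raise instead of calling exit when validation fails. This is a minimal sketch; the validation check and message are illustrative.

# Raising an exception fails the job; dbutils.notebook.exit completes it successfully.
bad_records = 0  # replace with a real validation result
if bad_records > 0:
  raise Exception("Validation failed: %d bad records" % bad_records)
dbutils.notebook.exit("OK")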

Workflow example

In the following example, users can pass arguments to DataImportNotebook and run different notebooks (DataCleaningNotebook or ErrorHandlingNotebook) based on the result from DataImportNotebook.

[Image: NotebookWorkflow]
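A minimal Python sketch of this pattern is shown below; the "source" argument and the "OK" status value are assumptions for illustration, not part of the example notebooks.

# Run the import step and branch on its (string) return value.
status = dbutils.notebook.run("DataImportNotebook", 3600, {"source": "/tmp/input"})

if status == "OK":
  dbutils.notebook.run("DataCleaningNotebook", 3600)
else:
  dbutils.notebook.run("ErrorHandlingNotebook", 3600, {"error": status})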

When the notebook workflow runs, a link to the running notebook is displayed, as shown in the screenshot below.

[Image: NotebookWorkflowRun]

Click the notebook link (Notebook job #1298 in this example) to view the details of the run, as shown below.

[Image: NotebookWorkflowRunResult]

Advanced usage - returning structured data (Scala)

// Example 1 - returning data through temporary tables.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary table.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */
// Note: the caller notebook needs the same Jackson imports and jsonMapper definition as above.
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

Advanced usage - error handling (Scala)

// Errors in workflows throw a WorkflowException. For now, WorkflowException provides a simple
// message of why the run failed. We are evaluating a more comprehensive errors API - please
// file/upvote a feature request at feedback.databricks.com.
import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Advanced usage - returning structured data (Python)

%python

# Example 1 - returning data through temporary tables.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary table.

## In callee notebook
sqlContext.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(spark.table(global_temp_db + "." + returned_table))

%python

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
sqlContext.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

%python

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Advanced usage - error handling (Python)

%python

# Errors in workflows throw a WorkflowException. For now, WorkflowException provides a simple
# message of why the run failed. We are evaluating a more comprehensive errors API - please
# file/upvote a feature request at feedback.databricks.com.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print "Retrying error", e
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Advanced usage - running multiple notebooks concurrently

Import this Databricks archive to run the demo above. Place the notebooks in the same folder so that they can call one another by relative path. The provided code is in Scala, but you could easily write the same in Python.

It is possible to run multiple notebooks at the same time by using standard Scala and Python constructs such as Threads (scala, python) and Futures (scala, python). If you require better API support for concurrency, please file/upvote a feature request at feedback.databricks.com.
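As a hedged Python sketch of the same idea, a caller notebook could fan runs out over a thread pool; the notebook path, arguments, and timeout below are placeholders.

# A minimal sketch: run the same notebook concurrently for several argument sets using a thread pool.
from concurrent.futures import ThreadPoolExecutor

notebook = "LOCATION_OF_CALLEE_NOTEBOOK"
batches = [{"batch": "1"}, {"batch": "2"}, {"batch": "3"}]

with ThreadPoolExecutor(max_workers=3) as pool:
  futures = [pool.submit(dbutils.notebook.run, notebook, 600, args) for args in batches]
  results = [f.result() for f in futures]  # raises if any run failed

print(results)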