Notebook Workflows

The %run command allows you to include another notebook within a notebook. This command lets you concatenate various notebooks that represent key ETL steps, Spark analysis steps, or ad-hoc exploration. However, it lacks the ability to build more complex data pipelines.

Notebook workflows are a complement to %run because they let you return values from a notebook. This allows you to easily build complex workflows and pipelines with dependencies. You can properly parameterize runs (for example, get a list of files in a directory and pass the names to another notebook – something that’s not possible with %run) and also create if/then/else workflows based on return values. Notebook workflows allow you to call other notebooks via relative paths.
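For example, a minimal Python sketch of such a parameterized, conditional run, using the dbutils.notebook.run API described in the API section below (the directory, the ProcessFileNotebook name, and the "OK" return convention are hypothetical):

# A sketch of a parameterized workflow: list the files in a directory and pass
# each file path to another notebook, then branch on the value that notebook returns.
# The directory, ProcessFileNotebook, and the "OK" return convention are hypothetical.
for f in dbutils.fs.ls("/mnt/raw/incoming"):
  result = dbutils.notebook.run("ProcessFileNotebook", 600, {"path": f.path})
  if result == "OK":
    print("Processed " + f.path)
  else:
    print("Failed to process " + f.path)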

You implement notebook workflows with dbutils.notebook methods. These methods, like all of the dbutils APIs, are available only in Scala and Python. However, you can use dbutils.notebook.run to invoke an R notebook.
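For example, the call below from a Python notebook runs an R notebook; the notebook name is a placeholder:

# The caller must be Scala or Python, but the notebook being run may be written in R.
status = dbutils.notebook.run("AnRNotebook", 60)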

API

The methods available in the dbutils.notebook API to build notebook workflows are: run and exit. Both parameters and return values must be strings.

run(path: String, timeoutSeconds: int, arguments: Map): String

Run a notebook and return its exit value. The method starts an ephemeral job that runs immediately. The timeoutSeconds parameter controls the timeout of the run (0 means no timeout): the call to run throws an exception if it doesn’t finish within the specified time. The arguments parameter sets widget values of the target notebook. Specifically, if the notebook you are running has a widget named A, and you pass a key-value pair ("A": "B") as part of the arguments parameter to the run() call, then retrieving the value of widget A will return "B". You can find the instructions for creating and working with widgets in the Widgets topic.

Note

If Databricks is down for more than 10 minutes, the notebook run fails regardless of timeoutSeconds.

Usage

Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Example

Suppose you have a notebook named workflows with a widget named foo that prints the widget’s value:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

Running dbutils.notebook.run("workflows", 60, {"foo": "bar"}) produces the following result:

[Image: NotebookWorkflowWithWidget]

The widget had the value you passed in through the workflow, "bar", rather than the default.

exit(value: String): void

Exit a notebook with a value. If you call a notebook using the run method, this is the value returned.

dbutils.notebook.exit("returnValue")

Note

Calling dbutils.notebook.exit in a job causes the notebook to complete successfully. If you want to cause the job to fail, throw an exception.
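For example, a minimal Python sketch; the error_count check stands in for a hypothetical condition computed earlier in the notebook:

# Raising an exception marks the notebook job as failed; exiting marks it successful.
if error_count > 0:  # hypothetical condition computed earlier in the notebook
  raise Exception("Data import failed for %d records" % error_count)

dbutils.notebook.exit("OK")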

Example

In the following example, you pass arguments to DataImportNotebook and run different notebooks (DataCleaningNotebook or ErrorHandlingNotebook) based on the result from DataImportNotebook.

[Image: NotebookWorkflow]
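The caller's control flow might look like the following Python sketch; the widget argument, timeouts, and the "OK" return convention are assumptions for illustration:

# Run DataImportNotebook with an argument, then choose the next notebook
# based on the value it returns via dbutils.notebook.exit.
result = dbutils.notebook.run("DataImportNotebook", 3600, {"source": "dataset"})

if result == "OK":
  dbutils.notebook.run("DataCleaningNotebook", 3600)
else:
  dbutils.notebook.run("ErrorHandlingNotebook", 3600, {"error": result})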

When the notebook workflow runs, you see a link to the running notebook:

[Image: NotebookWorkflowRun]

Click the notebook link Notebook job #xxxx to view the details of the run:

[Image: NotebookWorkflowRunResult]

Pass structured data

This section illustrates how to pass structured data between notebooks.

Scala
// Example 1 - returning data through temporary tables.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary table.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// The caller needs its own ObjectMapper (the same Jackson setup as above) to deserialize the result
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
Python
%python

# Example 1 - returning data through temporary tables.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary table.

## In callee notebook
sqlContext.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(spark.table(global_temp_db + "." + returned_table))
%python

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
sqlContext.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))
%python

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Handle errors

This section illustrates how to handle errors in notebook workflows.

Scala
// Errors in workflows throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
Python
%python

# Errors in workflows throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print "Retrying error", e
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Run multiple notebooks concurrently

You can run multiple notebooks at the same time by using standard Scala and Python constructs such as Threads (Scala, Python) and Futures (Scala, Python). The advanced notebook workflow notebooks demonstrate how to use these constructs. The notebooks are in Scala, but you could easily write the equivalent in Python; a minimal Python sketch follows the steps below. To run the example:

  1. Download the notebook archive.
  2. Import the archive into a workspace.
  3. Run the Concurrent Notebooks notebook.
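If you just want the general shape, here is a minimal Python sketch using a standard thread pool; the notebook names and the timeout are placeholders:

# Run several notebooks concurrently; each dbutils.notebook.run call
# starts its own ephemeral notebook job on the cluster.
from concurrent.futures import ThreadPoolExecutor

notebooks = ["NotebookA", "NotebookB", "NotebookC"]  # placeholder paths

with ThreadPoolExecutor(max_workers=3) as executor:
  results = list(executor.map(lambda nb: dbutils.notebook.run(nb, 1200), notebooks))

print(results)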