Concurrent Notebooks (Scala)


This notebook illustrates various ways of running notebooks concurrently. To see the printed output from the child notebooks, click the Notebook job #nnn links.

A caveat: outputs from the notebooks cannot interact with each other. If you're writing to files or tables, parameterize the output paths via widgets. This is similar to two users running the same notebook on the same cluster at the same time: the runs should share no state.
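One way to follow this advice is to hand each concurrent run its own output path as a parameter. The sketch below builds one parameter map per run. The widget name output_path, the base directory, and the run IDs are hypothetical, and the child notebook is assumed to read the value with dbutils.widgets.get("output_path").

```scala
object OutputPaths {
  // Hypothetical widget name; a child notebook would read it with
  // dbutils.widgets.get("output_path").
  val OutputWidget = "output_path"

  // One parameter map per concurrent run, each with its own output path,
  // so no two children write to the same location.
  def runParams(baseDir: String, runIds: Seq[String]): Seq[Map[String, String]] =
    runIds.map(id => Map(OutputWidget -> s"$baseDir/$id"))
}
```

Each resulting map could then be passed as the parameter map of one child run.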

The parallel-notebooks notebook defines helper functions as if they were in a library; %run makes its definitions available in this notebook.

%run "./parallel-notebooks"
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.control.NonFatal
defined class NotebookData
parallelNotebooks: (notebooks: Seq[NotebookData])scala.concurrent.Future[Seq[String]]
parallelNotebook: (notebook: NotebookData)scala.concurrent.Future[String]
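The contents of parallel-notebooks are not shown here. As a rough sketch only, a library with the signatures printed above could look like the following. It assumes a fixed-size thread pool caps concurrency and that failures are turned into "ERROR: ..." strings, which is what the first command's output suggests. ParallelNotebooksSketch, maxConcurrent, and runNotebook are hypothetical; runNotebook stands in for dbutils.notebook.run, which only exists on Databricks.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Same shape as the NotebookData printed above: path, timeout in seconds, parameters.
case class NotebookData(path: String, timeout: Int,
                        parameters: Map[String, String] = Map.empty[String, String])

object ParallelNotebooksSketch {
  // Hypothetical cap on how many child notebooks run at once.
  val maxConcurrent = 2

  // Daemon threads so the JVM can exit when this sketch finishes.
  private val daemonThreads = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  }

  // Fixed-size pool: at most maxConcurrent Future bodies run at the same time.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(maxConcurrent, daemonThreads))

  // Hypothetical stand-in for dbutils.notebook.run(path, timeout, parameters):
  // it fails any run whose timeout is under 2 seconds, mimicking the intentional timeout.
  def runNotebook(nb: NotebookData): String = {
    Thread.sleep(100) // simulate work
    if (nb.timeout < 2) throw new RuntimeException("TIMEDOUT")
    "SUCCESS"
  }

  def parallelNotebook(notebook: NotebookData): Future[String] =
    Future(runNotebook(notebook)).recover {
      // Report a failure as a string so one bad child does not fail the whole batch.
      case NonFatal(e) => s"ERROR: $e"
    }

  def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[String]] =
    Future.sequence(notebooks.map(parallelNotebook))
}
```

With maxConcurrent = 2, a batch of five notebooks runs at most two children at a time; the remaining tasks wait in the pool's queue until a thread frees up.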

The remaining commands run notebooks concurrently in various ways, using these functions or calling dbutils.notebook.run directly.

The first version builds a sequence of NotebookData entries (notebook path, timeout in seconds, and parameters) and passes it to the parallelNotebooks function defined in the parallel-notebooks notebook. parallelNotebooks avoids overwhelming the driver by limiting the number of threads that run at once.

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
 
val notebooks = Seq(
  NotebookData("testing", 15),
  NotebookData("testing-2", 15, Map("Hello" -> "yes")),
  NotebookData("testing-2", 15, Map("Hello" -> "else")),
  NotebookData("testing-2", 15, Map("Hello" -> "lots of notebooks")),
  NotebookData("testing-2", 1, Map("Hello" -> "parallel")) // fails due to timeout, intentionally
)
 
val res = parallelNotebooks(notebooks)
 
Await.result(res, 30 seconds) // this is a blocking call.
res.value
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
notebooks: Seq[NotebookData] = List(NotebookData(testing,15,Map()), NotebookData(testing-2,15,Map(Hello -> yes)), NotebookData(testing-2,15,Map(Hello -> else)), NotebookData(testing-2,15,Map(Hello -> lots of notebooks)), NotebookData(testing-2,1,Map(Hello -> parallel)))
res: scala.concurrent.Future[Seq[String]] = Success(List(SUCCESS, SUCCESS, SUCCESS, SUCCESS, ERROR: com.databricks.NotebookExecutionException: TIMEDOUT))
res0: Option[scala.util.Try[Seq[String]]] = Some(Success(List(SUCCESS, SUCCESS, SUCCESS, SUCCESS, ERROR: com.databricks.NotebookExecutionException: TIMEDOUT)))
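The first command waits with Await.result and then reads res.value. The sketch below uses a Promise in place of a notebook run to show the difference: value never blocks and is None until the Future completes, while Await.result blocks for at most the given duration and throws a TimeoutException if the Future is still running. AwaitVsValue is a hypothetical name.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object AwaitVsValue {
  // A Promise lets the example decide exactly when the Future completes.
  private val p = Promise[String]()
  val f: Future[String] = p.future

  // Before completion, .value is None; reading it never blocks.
  val before: Option[Try[String]] = f.value

  // Await.result blocks for at most the given duration, then throws TimeoutException.
  val timedOut: Boolean =
    try { Await.result(f, 50.millis); false }
    catch { case _: TimeoutException => true }

  p.success("SUCCESS")

  // Once completed, Await.result returns at once and .value holds Some(Success(...)).
  val result: String = Await.result(f, 1.second)
  val after: Option[Try[String]] = f.value
}
```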

This next version calls parallelNotebook once per notebook. It takes the same parameters as above, but you submit each notebook individually and then combine the resulting Futures yourself (here with Future.sequence) into a single Future that you can wait on.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
/*
Note: this is a bit more manual but achieves the same result as the first version.
*/
val n1 = parallelNotebook(NotebookData("testing", 15))
val n2 = parallelNotebook(NotebookData("testing-2", 15, Map("Hello" -> "yes")))
val res = Future.sequence(List(n1, n2)) 

Await.result(res, 30 minutes) // this blocks
res.value
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
n1: scala.concurrent.Future[String] = Success(SUCCESS)
n2: scala.concurrent.Future[String] = Success(SUCCESS)
res: scala.concurrent.Future[List[String]] = Success(List(SUCCESS, SUCCESS))
res1: Option[scala.util.Try[List[String]]] = Some(Success(List(SUCCESS, SUCCESS)))
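When you combine Futures yourself with Future.sequence, the combined Future fails if any of its inputs fails, and Await.result then rethrows that exception. The sketch below contrasts that with recovering each Future first, which keeps the other results. SequenceDemo and run are hypothetical; run fails on purpose for the name "bad".

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

object SequenceDemo {
  // Hypothetical child run: "bad" fails, anything else succeeds.
  def run(name: String): Future[String] = Future {
    if (name == "bad") throw new IllegalStateException("TIMEDOUT")
    "SUCCESS"
  }

  // Plain Future.sequence: the combined Future fails if any input fails.
  def strict(): Future[List[String]] =
    Future.sequence(List(run("good"), run("bad")))

  // Recovering each Future first keeps the successful results.
  def tolerant(): Future[List[String]] =
    Future.sequence(List(run("good"), run("bad")).map(_.recover {
      case NonFatal(e) => s"ERROR: ${e.getMessage}"
    }))
}
```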

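This last version is the most manual: you define each Future yourself and call dbutils.notebook.run inside it. Each Future body runs on a separate thread, so the code captures the notebook context with dbutils.notebook.getContext() and sets it with dbutils.notebook.setContext(ctx) inside every Future before running the child notebook.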

val ctx = dbutils.notebook.getContext()
/*
Note: This version illustrates the structure built into the preceding versions.
*/

val myHappyNotebookTown1 = Future {
  dbutils.notebook.setContext(ctx)
  println("Starting First --")
  val x = dbutils.notebook.run("testing", 15)
  println("Finished First -- ")
  x
}

val myHappyNotebookTown2 = Future {
  dbutils.notebook.setContext(ctx)
  println("Starting Second --")
  val x = dbutils.notebook.run("testing-2", 15, Map("Hello" -> "yes"))
  println("Finished Second -- ")
  x
}

val res = Future.sequence(List(myHappyNotebookTown1, myHappyNotebookTown2))

Await.result(res, 30 minutes) // this blocks
res.value
ctx: com.databricks.backend.common.rpc.CommandContext = CommandContext(...)
myHappyNotebookTown1: scala.concurrent.Future[String] = Success(SUCCESS)
myHappyNotebookTown2: scala.concurrent.Future[String] = Success(SUCCESS)
res: scala.concurrent.Future[List[String]] = Success(List(SUCCESS, SUCCESS))
res2: Option[scala.util.Try[List[String]]] = Some(Success(List(SUCCESS, SUCCESS)))
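The capture-and-reinstall step in the last version can be illustrated without Databricks. The sketch below assumes the context behaves like a per-thread value and models it with a plain ThreadLocal; ContextDemo, getContext, setContext, and runChild are hypothetical stand-ins, not the dbutils API. Clearing the value in a finally block is a choice of this sketch, so reused pool threads do not keep a stale context.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object ContextDemo {
  // Hypothetical stand-in for the notebook context: one value per thread.
  private val current = new ThreadLocal[String]

  def getContext(): String = current.get()
  def setContext(ctx: String): Unit = current.set(ctx)

  // Hypothetical stand-in for dbutils.notebook.run: needs a context on the thread it runs on.
  def runChild(path: String): String = {
    val ctx = getContext()
    require(ctx != null, s"no context on thread ${Thread.currentThread.getName}")
    s"$path ran with $ctx"
  }

  def launch(path: String, propagate: Boolean): Future[String] = {
    val ctx = getContext() // read on the calling thread, before the Future starts
    Future {
      if (propagate) setContext(ctx) // reinstall it on the pool thread
      try runChild(path)
      finally current.remove() // sketch choice: no stale context on reused pool threads
    }
  }
}
```

Without the setContext call inside the Future, runChild finds no context on the pool thread and the Future fails.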

The three examples differ mainly in how much they handle automatically for you (the first version, for instance, also limits how many threads run at once); all of them can be used to achieve the same end result.