Tips for Running Streaming Apps inside Databricks

This guide offers some pointers for developing production-quality Spark Streaming applications in Databricks Notebooks. In particular, we will focus on the following issues you may come across while developing these applications:

  • Serialization Issues
  • Checkpoint Recovery Issues

Notebooks provide a great development environment: users can iterate on their code as quickly as it takes to run a single cell. Once the development cycle is over, the notebook can easily be turned into a production workload using “Jobs”.

This development process can become tedious, however, especially for Spark Streaming applications. The following section describes tips on how to overcome one of the most common issues users hit when developing Spark Streaming applications: NotSerializableException

Serialization Issues

When developing Spark applications, it is common to hit a stack trace like the one below:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2058)
  ...
Caused by: java.io.NotSerializableException

It’s hard to reason about what gets caught in which closure, and which class that in turn requires to be serializable (or even to understand the previous sentence!). Here are some guidelines on the best ways to avoid such NotSerializableExceptions:

  • Declare functions inside an Object as much as possible
  • If you need to use the SparkContext or SQLContext inside closures (e.g. inside foreachRDD), use SparkContext.getOrCreate() and SQLContext.getOrCreate(sparkContext) instead
  • Redefine variables provided to class constructors inside functions

Please refer to the example at the end of this notebook for specific implementation examples.
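
For instance, here is a minimal sketch of these guidelines. The ClickCounter class, ClickFunctions object, and table name are hypothetical and purely illustrative; they are not part of the example at the end.

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

object ClickFunctions {
  // Declared inside an object, so no enclosing (non-serializable) class is pulled into the closure
  def toPair(line: String): (String, Int) = (line, 1)
}

class ClickCounter(tableName: String) {
  def attach(stream: DStream[String]): Unit = {
    // Redefine the constructor parameter as a local val so the closure captures
    // only this String, not the whole ClickCounter instance
    val localTableName = tableName

    stream.map(ClickFunctions.toPair).foreachRDD { rdd =>
      // Look the SQLContext up inside the closure instead of capturing an outer reference
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      sqlContext.createDataFrame(rdd).toDF("click", "count")
        .registerTempTable(localTableName)
    }
  }
}

Note that for checkpoint recovery in a notebook, such a class and object would still need to live in a package cell, as described in the sections below.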

Checkpoint Recovery Issues

When developing production quality Spark Streaming applications, one requirement stands out: fault tolerance. Since a Spark Streaming application is long running, it is imperative that it can pick up from where it left off when a failure occurs.

Checkpointing is one of the mechanisms that make Spark Streaming fault tolerant. When checkpointing is enabled, two things happen:

  • The Directed Acyclic Graph (DAG) of all DStream transformations is serialized and stored in reliable storage (DBFS, S3)
  • If a stateful operation is being run (e.g. mapWithState, updateStateByKey), the state is serialized and stored in reliable storage (DBFS, S3) after each batch is processed (a short sketch follows the code below)

Checkpointing can be enabled in your streaming application by supplying a checkpoint directory.

val checkpointDir = ...

def creatingFunc(): StreamingContext = {
  val newSsc = ...                      // create and set up a new StreamingContext

  newSsc.checkpoint(checkpointDir)      // enable checkpointing

   ...
}

// Recreate context from checkpoints info in checkpointDir, or create a new one by calling the function.
val ssc = StreamingContext.getOrCreate(checkpointDir, creatingFunc _)
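
For the stateful operations mentioned above, the per-key state itself is what gets written to the checkpoint directory after every batch. Here is a minimal, hypothetical sketch of such an operation (a running count per word); wordStream and runningCounts are purely illustrative names:

import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: keeps a running count per word. This per-key state is what
// gets checkpointed after each batch, so counting resumes correctly after a restart.
// Assumes it is called inside creatingFunc(), after ssc.checkpoint(...) has been set.
def runningCounts(wordStream: DStream[String]): DStream[(String, Int)] = {
  wordStream
    .map(word => (word, 1))
    .updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
      Some(newValues.sum + state.getOrElse(0))
    }
}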

Make sure that anything used within DStream operations (e.g. transform, foreachRDD, ...) is serializable, so that Spark Streaming can store it for driver fault tolerance. Once your DAG is serialized and stored, your application can be restarted on a separate cluster and still work, provided there were no code changes.

This is great news for jobs scheduled with jars. You may schedule retries for your Spark Streaming job, and in case of failure, the job will be restarted and pick up from where it left off. There is a caveat, though, when working with notebooks. If your notebook job is restarted, you may come across a nasty stack trace that looks like this:

org.apache.spark.SparkException: Failed to read checkpoint from directory ...
  at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:367)
  at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:862)
  at org.apache.spark.streaming.StreamingContext$$anonfun$getActiveOrCreate$1.apply(StreamingContext.scala:838)
  at org.apache.spark.streaming.StreamingContext$$anonfun$getActiveOrCreate$1.apply(StreamingContext.scala:838)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.streaming.StreamingContext$.getActiveOrCreate(StreamingContext.scala:838)
Caused by: java.io.IOException: java.lang.ClassNotFoundException: line8c9ff88e00d34452b053d892b6d2a6d720.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$4

What is that ugly class that Spark couldn’t find?

Each notebook uses a REPL behind the scenes. Any classes and functions defined in a notebook therefore get wrapped inside anonymous REPL closures, which causes the ugly, obscure class name. For example, consider the following code.

dstream.map { x => (x, 1) }

This inline function x => (x, 1) will be compiled by the REPL into an anonymous class and function with a name like $read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$.

These anonymous classes get serialized and saved into the checkpoint files. Once the cluster is restarted, the new REPL will have a different id, producing different generated class names. As a result, the classes in the checkpoint files cannot be deserialized to recover the StreamingContext, and this causes the ClassNotFoundException.

Solution

How can you work around this issue? The solution is a special feature available in Databricks Notebooks called Package Cells. A package cell is simply a cell that starts with a package declaration:

package x.y.z

Anything inside that cell will then be compiled within the given package namespace instead of the ugly anonymous one! To fix our example, you can move the function into a separate package cell in the same notebook, with the following contents.

package example  // whatever package name you want

object MyFunctions {
  def mapFunc(x: String) = (x, 1)
}

This generates the class with the fully qualified name example.MyFunctions. Then the earlier code can be changed to the following.

dstream.map(example.MyFunctions.mapFunc)

This will allow recovery from checkpoint files. Therefore, in order to correctly recover from driver failures, you should:

  • Move all classes used in your application inside package cells
  • Define all functions in objects inside package cells

Please take a look at the following example for tips on what to look out for.

Example Spark Streaming Application

Below is a toy example that adds 1 to a stream of integers in a reliable, fault-tolerant manner and then visualizes the results. We will use all the tips and tricks from this notebook to develop and debug the application.

We will use the following receiver to generate toy data for us:

package com.databricks.example

import scala.util.Random

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._

/** This is a dummy receiver that will generate toy data for us. */
class DummySource extends Receiver[(Int, Long)](StorageLevel.MEMORY_AND_DISK_2) {

  /** Start the thread that receives data over a connection */
  def onStart() {
    new Thread("Dummy Source") { override def run() { receive() } }.start()
  }

  def onStop() {  }

  /** Periodically generate a random number from 0 to 9, and the timestamp */
  private def receive() {
    while(!isStopped()) {
      store(Iterator((Random.nextInt(10), System.currentTimeMillis)))
      Thread.sleep(1000)
    }
  }
}

Here’s how you should NOT set up your stream.

package com.databricks.example

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import com.databricks.example._

class AddOneStream(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {

  // THIS WILL CAUSE A NotSerializableException while checkpointing,
  // as it will unintentionally bring the sqlContext object and associated non-serializable REPL objects into scope
  import sqlContext.implicits._

  def creatingFunc(): StreamingContext = {

    val batchInterval = Seconds(1)
    val ssc = new StreamingContext(sc, batchInterval)
    ssc.checkpoint(cpDir)

    val stream = ssc.receiverStream(new DummySource())

    // This function WILL CAUSE A NotSerializableException on class AddOneStream while running the job,
    // as it requires `AddOneStream` to be serialized in order to be used inside DStream functions.
    def addOne(value: (Int, Long)): (Int, Long) = {
      (value._1 + 1, value._2)
    }

    stream.map(addOne).window(Minutes(1)).foreachRDD { rdd =>

      // THIS WILL CAUSE A NotSerializableException while checkpointing,
      // as it will unintentionally bring the sqlContext object and associated non-serializable REPL objects into scope
      sqlContext.createDataFrame(rdd).toDF("value", "time")
        .withColumn("date", from_unixtime($"time" / 1000))
        .registerTempTable("demo_numbers")
    }

    ssc
  }
}

import com.databricks.example._
import org.apache.spark.streaming._
val cpDir = "dbfs:/home/examples/serialization"

val addOneStream = new AddOneStream(sc, sqlContext, cpDir)
val ssc = StreamingContext.getActiveOrCreate(cpDir, addOneStream.creatingFunc _)
ssc.start()

The previous definition didn’t work for the following reasons:

  • The addOne function required AddOneStream to be serialized, but it’s not Serializable.
  • We tried to serialize sqlContext within the foreachRDD.
  • import sqlContext.implicits._ was defined within the class but accessed within the foreachRDD, which also requires AddOneStream to be serialized.

All of these problems can be worked around as follows:

package com.databricks.example2

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import com.databricks.example._

class AddOneStream(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {

  // import the functions defined in the object.
  import AddOneStream._

  def creatingFunc(): StreamingContext = {

    val batchInterval = Seconds(1)
    val ssc = new StreamingContext(sc, batchInterval)

    // Set the active SQLContext so that we can access it statically within the foreachRDD
    SQLContext.setActive(sqlContext)

    ssc.checkpoint(cpDir)

    val stream = ssc.receiverStream(new DummySource())

    stream.map(addOne).window(Minutes(1)).foreachRDD { rdd =>

      // Access the SQLContext using getOrCreate
      val _sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      _sqlContext.createDataFrame(rdd).toDF("value", "time")
        .withColumn("date", from_unixtime(col("time") / 1000)) // we could have imported _sqlContext.implicits._ and used $"time"
        .registerTempTable("demo_numbers")
    }

    ssc
  }
}

object AddOneStream {
  def addOne(value: (Int, Long)): (Int, Long) = {
    (value._1 + 1, value._2)
  }
}

import com.databricks.example2._
import org.apache.spark.streaming._

val addOneStream = new AddOneStream(sc, sqlContext, cpDir)
val ssc = StreamingContext.getActiveOrCreate(cpDir, addOneStream.creatingFunc _)
ssc.start()

Feel free to run the following cell and enjoy the satisfaction of having run a production-ready streaming application!

%sql
select * from demo_numbers