Event Log Replay (Scala)


Enter the path to the cluster event logs in the event_log_path field.

Run the notebook to replay the Apache Spark UI events that are recorded in the logs.

Note: If you are storing event logs on DBFS, the event log path will be similar to this example: dbfs:/cluster-logs/<cluster-name>/eventlog/<cluster-name-cluster-ip>/<log-id>/
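
Before you run the replay, it can help to confirm that the path actually contains event log files. This is a minimal sanity check, not part of the original notebook; the path is a placeholder, so substitute your own values before running:

%scala
// Hypothetical placeholder path; fill in your cluster's values before running.
val candidatePath = "dbfs:/cluster-logs/<cluster-name>/eventlog/<cluster-name-cluster-ip>/<log-id>/"
display(dbutils.fs.ls(candidatePath).filter(_.name.startsWith("eventlog")))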

%scala
// Create a text widget for the event log path, then read its value.
dbutils.widgets.removeAll()
dbutils.widgets.text("event_log_path", "", "event_log_path")

val eventLogPath = dbutils.widgets.get("event_log_path")

%scala
// This code must run in its own cell: declaring it inside the org.apache.spark.util
// package gives it access to the private[spark] JsonProtocol object used below.
package org.apache.spark.util

import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue

/**
 * Visibility shim: re-exposes the private[spark] JsonProtocol.sparkEventFromJson
 * so that notebook code outside the Spark packages can call it.
 */
object PublicJsonProtocol {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(json)
  }
}
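
Before replaying real logs, you can sanity-check the shim on a single hand-written line. This is a sketch, not part of the original notebook: it assumes a Spark version whose JsonProtocol still accepts a json4s JValue, and the JSON below is a hypothetical minimal event.

%scala
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.util.PublicJsonProtocol

// Hypothetical minimal event line; real event logs hold one JSON object per line.
val sampleLine = """{"Event":"SparkListenerUnpersistRDD","RDD ID":0}"""
println(PublicJsonProtocol.sparkEventFromJson(parse(sampleLine)))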

%scala
import java.io.InputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

import scala.collection.JavaConverters._

import org.json4s.jackson.JsonMethods._
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException

import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.spark.util.PublicJsonProtocol
 
 
/**
 * Replays all of the events that a cluster went through, for cases where the Spark History
 * Server fails to load the Spark UI. Run this on a cluster that has not yet executed any
 * Spark commands, so the replayed events are not mixed with events from other workloads.
 */
def replaySparkEvents(pathToEventLogs: String): Unit = {
  val eventLogFiles = dbutils.fs.ls(pathToEventLogs).filter(_.name.startsWith("eventlog")).map(_.path)
  require(eventLogFiles.nonEmpty, "No event logs found at this path")
  // The listing sorts alphabetically, so the active "eventlog" file comes first even though
  // it holds the newest events. Move it to the end so the files replay in chronological order.
  val inOrder = eventLogFiles.tail ++ Seq(eventLogFiles.head)

  // Keep a handle on every stream we open so the finally block below can close them all.
  val openedStreams = scala.collection.mutable.ArrayBuffer[InputStream]()
  val lines = inOrder.iterator.flatMap { file =>
    val path = new Path(file)
    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
    val fileStream = fs.open(path)
    // Rolled-over event log files are gzip-compressed; decompress them on the fly.
    val stream = if (file.endsWith(".gz")) {
      new GZIPInputStream(fileStream)
    } else {
      fileStream
    }
    openedStreams += stream
    println(s"Processing file: $file")
    IOUtils.readLines(stream, StandardCharsets.UTF_8).asScala
  }
  
  val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
  val unrecognizedProperties = new scala.collection.mutable.HashSet[String]
 
  // Post the replayed events to the active listener bus so the cluster's Spark UI is rebuilt.
  val listenerBus = sc.listenerBus
  var currentLine: String = null
 
  try {
    while (lines.hasNext) {
      try {
        val entry = lines.next()
        currentLine = entry
        listenerBus.post(PublicJsonProtocol.sparkEventFromJson(parse(currentLine)))
      } catch {
        case e: java.lang.ClassNotFoundException =>
          // Ignore unknown events and continue parsing the rest of the event log.
          // To avoid spam, the warning is printed only once per unknown event type.
          if (!unrecognizedEvents.contains(e.getMessage)) {
            println(s"Drop unrecognized event: ${e.getMessage}")
            unrecognizedEvents.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case e: UnrecognizedPropertyException =>
          // Ignore unrecognized properties and continue parsing the rest of the event log.
          // To avoid spam, the warning is printed only once per unrecognized property.
          if (!unrecognizedProperties.contains(e.getMessage)) {
            println(s"Drop unrecognized property: ${e.getMessage}")
            unrecognizedProperties.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case jpe: JsonParseException =>
          // A parse failure is only tolerated on the last line, which may be truncated if the
          // file did not finish writing cleanly. The last entry is not guaranteed to be the
          // final line of the log, but treating it that way gives a best-effort replay.
          if (lines.hasNext) {
            throw jpe
          } else {
            println("Got JsonParseException from log file. The file might not have finished writing cleanly.")
          }
      }
    }
  } finally {
    // Close every stream that was opened while iterating over the event log files.
    openedStreams.foreach(IOUtils.closeQuietly)
  }
}
replaySparkEvents(eventLogPath)
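
Once the replay completes, open the Spark UI on the current cluster to browse the reconstructed jobs, stages, and tasks. If you prefer to skip the widget, the function can also be called directly; the path below is a hypothetical placeholder:

%scala
// Hypothetical direct invocation; substitute your own event log directory.
replaySparkEvents("dbfs:/cluster-logs/<cluster-name>/eventlog/<cluster-name-cluster-ip>/<log-id>/")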