import java.util.zip.GZIPInputStream
import scala.collection.JavaConverters._
import org.json4s.jackson.JsonMethods._
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.spark.util.PublicJsonProtocol
def replaySparkEvents(pathToEventLogs: String): Unit = {
  val eventLogFiles = dbutils.fs.ls(pathToEventLogs)
    .filter(_.name.startsWith("eventlog"))
    .map(_.path)
  require(eventLogFiles.nonEmpty, "No event logs found at this path")

  // The bare "eventlog" file sorts first but holds the newest events, so
  // move it to the end to replay the files in chronological order.
  val inOrder = eventLogFiles.tail ++ Seq(eventLogFiles.head)
  // Materialize the streams and their lines into a sequence. Deriving both
  // `lines` and `streams` from one shared iterator would exhaust it on the
  // first pass and leave the streams unclosed.
  val streamsAndLines = inOrder.map { file =>
    val path = new Path(file)
    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
    val fileStream = fs.open(path)
    // Handle both gzip-compressed and plain-text event logs.
    val stream = if (file.endsWith(".gz")) {
      new GZIPInputStream(fileStream)
    } else {
      fileStream
    }
    println(s"Processing file: $file")
    val lines = IOUtils.readLines(stream, "UTF-8").asScala
    (stream, lines)
  }
  val lines = streamsAndLines.iterator.flatMap(_._2)
  val streams = streamsAndLines.map(_._1)
  val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
  val unrecognizedProperties = new scala.collection.mutable.HashSet[String]
  val listenerBus = sc.listenerBus
  var currentLine: String = null
  try {
    while (lines.hasNext) {
      try {
        currentLine = lines.next()
        listenerBus.post(PublicJsonProtocol.sparkEventFromJson(parse(currentLine)))
      } catch {
        case e: ClassNotFoundException =>
          // Skip event types this Spark version does not know about.
          if (!unrecognizedEvents.contains(e.getMessage)) {
            println(s"Drop unrecognized event: ${e.getMessage}")
            unrecognizedEvents.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case e: UnrecognizedPropertyException =>
          // Skip JSON properties this Spark version does not recognize.
          if (!unrecognizedProperties.contains(e.getMessage)) {
            println(s"Drop unrecognized property: ${e.getMessage}")
            unrecognizedProperties.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case jpe: JsonParseException =>
          // A parse error is only tolerated on the last line, which can be
          // truncated if the log did not finish writing cleanly.
          if (lines.hasNext) {
            throw jpe
          } else {
            println("Got JsonParseException from log file. The file might not have finished writing cleanly.")
          }
      }
    }
  } finally {
    streams.foreach(IOUtils.closeQuietly)
  }
}
Enter the path to the cluster event logs in the event_log_path field.
Run the notebook to replay the Apache Spark UI events that are recorded in the logs.
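For example, assuming the notebook exposes the path through a text widget named event_log_path (the widget name here is an assumption based on the field mentioned above), a driver cell might look like this minimal sketch:

// Hypothetical driver cell: read the path from the widget and replay the events.
dbutils.widgets.text("event_log_path", "")
val eventLogPath = dbutils.widgets.get("event_log_path")
replaySparkEvents(eventLogPath)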
Note: If you are storing event logs on DBFS, the event log path will be similar to this example:
dbfs:/cluster-logs/<cluster-name>/eventlog/<cluster-name-cluster-ip>/<log-id>/
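If you are unsure of the final <log-id> segment, you can list the parent directory first. This is a sketch with placeholder path segments; replace them with your cluster's values before running:

// Sketch: list the eventlog directory to discover the <log-id> folder.
display(dbutils.fs.ls("dbfs:/cluster-logs/<cluster-name>/eventlog/"))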