DBIO Transactional Commit (Scala)

Let's first compare the existing Hadoop commit algorithms.

Evaluation criteria:

  • Performance: how fast is the protocol at committing files?
  • Transactionality: job output should be made visible transactionally (i.e., all or nothing). If the job fails, readers should not observe corrupt or partial outputs.

First, we look at performance:

// Clear the output directories
spark.range(0).write.mode("overwrite").parquet("/tmp/test-1")
spark.range(0).write.mode("overwrite").parquet("/tmp/test-2")
// Append 10m rows using Hadoop FileOutputCommitter v1
val v1Start = System.currentTimeMillis
spark.range(10e6.toLong)
  .repartition(100).write.mode("append")
  .option("mapreduce.fileoutputcommitter.algorithm.version", "1")
  .parquet("/tmp/test-1")
val v1Time = (System.currentTimeMillis - v1Start) / 1000
v1Start: Long = 1492719587184
v1Time: Long = 112
// Append 10m rows using Hadoop FileOutputCommitter v2
val v2Start = System.currentTimeMillis
spark.range(10e6.toLong)
  .repartition(100).write.mode("append")
  .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
  .parquet("/tmp/test-2")
val v2Time = (System.currentTimeMillis - v2Start) / 1000
v2Start: Long = 1492719700167
v2Time: Long = 25
display(Seq(("Hadoop Commit V1", v1Time), ("Hadoop Commit V2", v2Time)).toDF("algorithm", "time (s)"))
[Bar chart: time (s) by algorithm — Hadoop Commit V1: 112 s, Hadoop Commit V2: 25 s]
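The timing gap comes from where the final rename happens: v1 renames every task's output into the destination serially at job commit (on the driver), while v2 renames each file at task commit, in parallel across the cluster. The sketch below is a toy local-filesystem model of the v1 job-commit step, not Hadoop's actual implementation; the directory names and the `jobCommitV1` helper are hypothetical, for illustration only.

```scala
import java.nio.file.{Files, Path}

// Toy model: each "task" first writes its output into a staging area.
val staging = Files.createTempDirectory("staging")
val dest    = Files.createTempDirectory("dest")

val taskFiles: Seq[Path] = (0 until 4).map { i =>
  Files.write(staging.resolve(s"part-$i"), s"data-$i".getBytes)
}

// v1: the driver renames every file into the destination at *job* commit,
// one after another -- safe, but serial in the number of output files.
def jobCommitV1(files: Seq[Path]): Unit =
  files.foreach { f =>
    Files.move(f, dest.resolve(f.getFileName))
  }

// v2 would instead perform each rename at *task* commit, so the renames
// run in parallel on the executors and job commit becomes nearly free.
jobCommitV1(taskFiles)
println(Files.list(dest).count()) // 4 files now visible in dest
```

With 100 output partitions (as in the benchmark above), 100 such renames happen back-to-back on the driver under v1, which is why its commit time grows with the file count.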

Now let's compare transactionality by simulating a persistent task failure.

// Append more rows using v1
scala.util.Try(spark.range(10000).repartition(7).map { i =>
  if (i == 9999) {
    Thread.sleep(5000)
    throw new RuntimeException("oops!")
  }
  i
}.write.option("mapreduce.fileoutputcommitter.algorithm.version", "1").mode("append").parquet("/tmp/test-1"))
res2: scala.util.Try[Unit] = Failure(org.apache.spark.SparkException: Job aborted.)
// Append more rows using v2
scala.util.Try(spark.range(10000).repartition(7).map { i =>
  if (i == 9999) {
    Thread.sleep(5000)
    throw new RuntimeException("oops!")
  }
  i
}.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2").mode("append").parquet("/tmp/test-2"))
res3: scala.util.Try[Unit] = Failure(org.apache.spark.SparkException: Job aborted.)
val numV1CorruptedRows = spark.read.parquet("/tmp/test-1").count() - 10000000
val numV2CorruptedRows = spark.read.parquet("/tmp/test-2").count() - 10000000
display(Seq(("Hadoop Commit V1", numV1CorruptedRows), ("Hadoop Commit V2", numV2CorruptedRows)).toDF("algorithm", "corrupted rows"))
[Bar chart: corrupted rows by algorithm — Hadoop Commit V1: 0, Hadoop Commit V2: several thousand]
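The reason v2 leaks rows is the same parallel task-commit rename that makes it fast: files from tasks that finished before the failure have already been published into the destination. Here is a toy local-filesystem model of that failure mode (hypothetical, not Hadoop's real code); the file names and loop are illustrative only.

```scala
import java.nio.file.Files
import scala.util.Try

// Toy model of v2: each task publishes its file at *task* commit, so
// output from tasks that finished before the failure stays visible.
val dest = Files.createTempDirectory("dest-v2")

val result = Try {
  (0 until 4).foreach { i =>
    if (i == 2) throw new RuntimeException("oops!") // a later task fails the job
    val tmp = Files.createTempFile("task", s"-$i")
    Files.write(tmp, s"data-$i".getBytes)
    // v2-style task commit: rename straight into the destination
    Files.move(tmp, dest.resolve(s"part-$i"))
  }
}

// The job as a whole failed, but two part files are already in dest,
// and a concurrent reader would see them.
println(result.isFailure)         // true
println(Files.list(dest).count()) // 2
```

Under v1 the staged files would still be sitting in a job-scoped temporary directory when the failure hits, so aborting the job simply deletes them and readers never see partial output.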

DBIO's transactional commit protocol

Can we get both performance and robustness? Yes, with DBIO transactional commit:

%sql
-- enable DBIO commit for this session (you can also specify this as a Spark Conf when launching a cluster)
-- DBIO commit will be enabled by default in future Spark versions
set spark.sql.sources.commitProtocolClass=com.databricks.io.CommitProtocol

-- to revert to default protocol run
-- %sql set spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
spark.sql.sources.commitProtocolClass = com.databricks.io.CommitProtocol
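If you prefer to stay in Scala, the same session setting can be applied without a `%sql` cell via the standard `spark.conf` session API:

```scala
// Equivalent to the %sql cell above: enable DBIO commit for this session.
// To revert, set the value back to
// org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "com.databricks.io.CommitProtocol")
```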
spark.range(0).write.mode("overwrite").parquet("/tmp/test-3")
val dbioCommitStart = System.currentTimeMillis

spark.range(10e6.toLong)
  .repartition(100).write.mode("append")
  .parquet("/tmp/test-3")

val dbioCommitTime = (System.currentTimeMillis - dbioCommitStart) / 1000
dbioCommitStart: Long = 1492722300597
dbioCommitTime: Long = 9
scala.util.Try(spark.range(10000).repartition(7).map { i =>
  if (i == 9999) {
    Thread.sleep(5000)
    throw new RuntimeException("oops!")
  }
  i
}.write.mode("append").parquet("/tmp/test-3"))
res5: scala.util.Try[Unit] = Failure(org.apache.spark.SparkException: Job aborted.)
display(Seq(("Hadoop Commit V1", v1Time), ("Hadoop Commit V2", v2Time), ("DBIO Transactional Commit", dbioCommitTime)).toDF("algorithm", "time (s)"))
[Bar chart: time (s) by algorithm — Hadoop Commit V1: 112 s, Hadoop Commit V2: 25 s, DBIO Transactional Commit: 9 s]
val dbioCommitCorruptedRows = spark.read.parquet("/tmp/test-3").count() - 10000000
display(Seq(("Hadoop Commit V1", numV1CorruptedRows), ("Hadoop Commit V2", numV2CorruptedRows), ("DBIO Transactional Commit", dbioCommitCorruptedRows)).toDF("algorithm", "corrupted rows"))
[Bar chart: corrupted rows by algorithm — Hadoop Commit V1: 0, Hadoop Commit V2: several thousand, DBIO Transactional Commit: 0]