// Clear the output directories used by the commit-protocol comparison below.
// mode("overwrite") guarantees a clean slate even if earlier runs left data behind.
spark.range(0).write.mode("overwrite").parquet("/tmp/test-1")
spark.range(0).write.mode("overwrite").parquet("/tmp/test-2")
// Append 10m rows using Hadoop FileOutputCommitter v1 and time the job.
// v1 moves all task output into its final location serially during job commit,
// so commit time grows with the number of output files (100 partitions here).
val v1Start = System.currentTimeMillis
spark.range(10e6.toLong)
  .repartition(100)
  .write
  .mode("append")
  .option("mapreduce.fileoutputcommitter.algorithm.version", "1")
  .parquet("/tmp/test-1")
// Elapsed wall-clock time in whole seconds.
val v1Time = (System.currentTimeMillis - v1Start) / 1000
v1Start: Long = 1492719587184
v1Time: Long = 112
// Append 10m rows using Hadoop FileOutputCommitter v2 and time the job.
// v2 moves each task's output into place as tasks finish, so job commit is
// fast — but partial output becomes visible before the job succeeds.
val v2Start = System.currentTimeMillis
spark.range(10e6.toLong)
  .repartition(100)
  .write
  .mode("append")
  .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
  .parquet("/tmp/test-2")
// Elapsed wall-clock time in whole seconds.
val v2Time = (System.currentTimeMillis - v2Start) / 1000
v2Start: Long = 1492719700167
v2Time: Long = 25
// Chart the commit-time comparison between the two Hadoop algorithms.
val hadoopTimings = Seq(
  ("Hadoop Commit V1", v1Time),
  ("Hadoop Commit V2", v2Time)
)
display(hadoopTimings.toDF("algorithm", "time (s)"))
// Append more rows using v1, injecting a late task failure: the task holding
// row 9999 sleeps so its sibling tasks finish first, then throws. Try captures
// the expected SparkException so the notebook keeps running.
scala.util.Try(
  spark.range(10000)
    .repartition(7)
    .map { i =>
      if (i == 9999) {
        Thread.sleep(5000)
        throw new RuntimeException("oops!")
      }
      i
    }
    .write
    .option("mapreduce.fileoutputcommitter.algorithm.version", "1")
    .mode("append")
    .parquet("/tmp/test-1")
)
res2: scala.util.Try[Unit] = Failure(org.apache.spark.SparkException: Job aborted.)
// Append more rows using v2 with the same injected late failure. Because v2
// commits each task's files as tasks finish, the tasks that completed before
// the failure leave partial output behind in /tmp/test-2.
scala.util.Try(
  spark.range(10000)
    .repartition(7)
    .map { i =>
      if (i == 9999) {
        Thread.sleep(5000)
        throw new RuntimeException("oops!")
      }
      i
    }
    .write
    .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
    .mode("append")
    .parquet("/tmp/test-2")
)
res3: scala.util.Try[Unit] = Failure(org.apache.spark.SparkException: Job aborted.)
// Measure corruption left behind by the aborted appends: any rows beyond the
// original 10m are partial output from a failed job.
val numV1CorruptedRows = spark.read.parquet("/tmp/test-1").count() - 10000000
val numV2CorruptedRows = spark.read.parquet("/tmp/test-2").count() - 10000000
display(
  Seq(
    ("Hadoop Commit V1", numV1CorruptedRows),
    ("Hadoop Commit V2", numV2CorruptedRows)
  ).toDF("algorithm", "corrupted rows")
)
%sql
-- Enable DBIO commit for this session (you can also specify this as a Spark
-- conf when launching a cluster). DBIO commit will be enabled by default in
-- future Spark versions.
set spark.sql.sources.commitProtocolClass=com.databricks.io.CommitProtocol
-- To revert to the default protocol, run:
-- set spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
// Benchmark DBIO transactional commit with the same 10m-row append workload
// used for the Hadoop committers above.
spark.range(0).write.mode("overwrite").parquet("/tmp/test-3")
val dbioCommitStart = System.currentTimeMillis
spark.range(10e6.toLong)
  .repartition(100)
  .write
  .mode("append")
  .parquet("/tmp/test-3")
// Elapsed wall-clock time in whole seconds.
val dbioCommitTime = (System.currentTimeMillis - dbioCommitStart) / 1000
dbioCommitStart: Long = 1492722300597
dbioCommitTime: Long = 9
// Repeat the injected-failure append against the DBIO-committed table: one
// task sleeps until its siblings finish, then throws; Try swallows the
// expected job-abort exception so the notebook keeps running.
scala.util.Try(
  spark.range(10000)
    .repartition(7)
    .map { rowId =>
      if (rowId == 9999) {
        Thread.sleep(5000)
        throw new RuntimeException("oops!")
      }
      rowId
    }
    .write
    .mode("append")
    .parquet("/tmp/test-3")
)
res5: scala.util.Try[Unit] = Failure(org.apache.spark.SparkException: Job aborted.)
// Final commit-time comparison across all three protocols.
val allTimings = Seq(
  ("Hadoop Commit V1", v1Time),
  ("Hadoop Commit V2", v2Time),
  ("DBIO Transactional Commit", dbioCommitTime)
)
display(allTimings.toDF("algorithm", "time (s)"))
Let's first compare the existing Hadoop commit algorithms.
Evaluation criteria:
First, we look at performance:
Last refresh: Never