A Spark 2.0.0 Cluster Takes a Long Time to Append Data

If you find that a cluster running Spark 2.0.0 takes a long time to append data to an existing dataset, and in particular that all Spark jobs have finished but your command has not, it is because the driver node is moving the output files of tasks from the job temporary directory to the final destination one by one, which is slow with S3. To resolve this issue, set mapreduce.fileoutputcommitter.algorithm.version to 2. Note that this issue does not affect overwriting a dataset or writing data to a new location.
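
For example, an append like the following can appear to hang even after all Spark jobs complete. This is only an illustrative sketch: df and the S3 path are hypothetical placeholders.

    // Hypothetical Scala example. All Spark jobs for this write can finish
    // while the command itself keeps running, because the driver is still
    // moving files from the job temporary directory to the final destination
    // one by one.
    df.write
      .mode("append")
      .parquet("s3a://my-bucket/path/to/existing/dataset")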

Note

Starting from Spark 2.0.1-db1, the default value of mapreduce.fileoutputcommitter.algorithm.version is 2 in Databricks. If you are using Spark 2.0.0 and are experiencing this slowness issue, set this config manually.

How to confirm if I am experiencing this issue?

You can confirm that you are experiencing this issue by checking the following:

  1. All of your Spark jobs have finished, but your cell has not. The progress bar should look like this:
     [progress bar image: progress-bar.png]
  2. The thread dump of the driver (you can find it on the Executors page of the Spark UI) shows a thread spending a long time inside the commitJob method of the FileOutputCommitter class.

How to set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2?

You can set this config by using any of the following methods:

  1. When you launch your cluster, you can put spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 in the Spark config.
  2. In your notebook, you can run %sql set mapreduce.fileoutputcommitter.algorithm.version=2 or spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2") (spark is a SparkSession object provided with Databricks notebooks).
  3. When you write data using the Dataset API, you can set it in the write option, for example dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2") (see the sketch after this list).
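
As a quick reference, here is a minimal Scala sketch combining the second and third methods above. It assumes the spark SparkSession provided with Databricks notebooks, a DataFrame named df, and a hypothetical S3 path:

    // Method 2: set the config for the whole session.
    spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

    // Method 3: set it for a single write only.
    df.write
      .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
      .mode("append")
      .parquet("s3a://my-bucket/path/to/existing/dataset")  // hypothetical path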

What is the cause?

When Spark appends data to an existing dataset, it uses FileOutputCommitter to manage staging output files and final output files. The behavior of FileOutputCommitter has a direct impact on the performance of jobs that write data.

A FileOutputCommitter has two methods, commitTask and commitJob. Apache Spark 2.0 and higher versions use Apache Hadoop 2, which uses the value of mapreduce.fileoutputcommitter.algorithm.version to control how commitTask and commitJob work. In Hadoop 2, the default value of mapreduce.fileoutputcommitter.algorithm.version is 1. With this version, commitTask moves data generated by a task from the task temporary directory to the job temporary directory, and when all tasks complete, commitJob moves the data from the job temporary directory to the final destination [1]. Because the driver is doing the work of commitJob, this operation can take a long time on S3, and a user may think that the cell is "hanging". However, when the value of mapreduce.fileoutputcommitter.algorithm.version is 2, commitTask moves data generated by a task directly to the final destination, and commitJob is basically a no-op.
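
The difference between the two versions can be summarized with the following simplified Scala sketch. It is illustrative only, not the actual Hadoop implementation; the paths and function names are made up:

    import java.nio.file.{Files, Path, StandardCopyOption}

    // Version 1: each task stages its output in the job temporary directory;
    // the driver then moves every file to the final destination in commitJob.
    // On S3 each "move" is a copy plus a delete, so this step is slow.
    def commitTaskV1(taskOutput: Path, jobTmp: Path): Unit =
      Files.move(taskOutput, jobTmp.resolve(taskOutput.getFileName),
        StandardCopyOption.REPLACE_EXISTING)

    def commitJobV1(jobTmp: Path, finalDest: Path): Unit =
      Files.list(jobTmp).forEach { f =>
        Files.move(f, finalDest.resolve(f.getFileName),
          StandardCopyOption.REPLACE_EXISTING)
      }

    // Version 2: each task moves its output directly to the final destination,
    // spreading the work across executors; commitJob has nothing left to do.
    def commitTaskV2(taskOutput: Path, finalDest: Path): Unit =
      Files.move(taskOutput, finalDest.resolve(taskOutput.getFileName),
        StandardCopyOption.REPLACE_EXISTING)

    def commitJobV2(): Unit = ()  // effectively a no-op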

[1] https://github.com/apache/hadoop/blob/branch-2.7.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java#L360-L364