Transactional Writes to Cloud Storage with DBIO

The Databricks DBIO package provides transactional writes to cloud storage for Spark jobs. This solves a number of performance and correctness issues that arise when Spark is used in a cloud-native setting (for example, writing directly to storage services such as S3).

DBIO transactional commit is enabled by default in Spark 2.2+. In Spark 2.1, you can enable it manually by running:

%sql SET spark.sql.sources.commitProtocolClass=com.databricks.io.CommitProtocol

To revert to the legacy Hadoop write protocol, run:

%sql SET spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
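
You can also set the same configuration from a notebook cell without the SQL magic. The snippet below is a minimal Scala sketch that uses the standard spark.conf.set API with the configuration key and class names shown above; spark refers to the active SparkSession.

// enable DBIO transactional commit for the current session
spark.conf.set("spark.sql.sources.commitProtocolClass", "com.databricks.io.CommitProtocol")

// revert to the legacy Hadoop write protocol
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")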

Tip

When DBIO transactional commit is enabled, metadata files starting with _started_<id> and _committed_<id> accompany data files created by Spark jobs. Generally you should not alter these files directly; instead, use the VACUUM command described below.
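
To see these markers alongside your data, you can list an output directory after a write. The following is a minimal Scala sketch; the output path is a hypothetical example, and dbutils is the Databricks notebook utility.

// write a small dataset, then list the directory to see the
// _started_<id> and _committed_<id> marker files next to the data files
spark.range(100).write.mode("overwrite").parquet("/tmp/dbio-example")
dbutils.fs.ls("/tmp/dbio-example").foreach(f => println(f.name))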

Compatibility with external systems and older Spark versions

Spark versions 2.1-db2+, 2.0-db4+, and 1.6-db2+ have the read-side support needed for compatibility with DBIO commit: these versions unconditionally ignore uncommitted output files when reads go through Spark APIs.

Older Spark versions that don’t have this read-side support can still read data as before, but their reads are not transactional and therefore may observe files that are uncommitted or in progress.

Clean up uncommitted files

To remove uncommitted files left over from Spark jobs, use the VACUUM command. VACUUM normally runs automatically after a Spark job completes, but you can also run it manually if a job is aborted.

For example, VACUUM ... RETAIN 1 HOUR removes uncommitted files older than one hour.

Important

  • Avoid vacuuming with a horizon of less than one hour; doing so can cause data inconsistency.
  • You cannot use VACUUM directly on cloud storage. To VACUUM storage, mount it to DBFS and run VACUUM on the mounted directory (see the sketch at the end of this section).

SQL:

-- recursively vacuum an output path
%sql VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]

-- vacuum all partitions of a catalog table
%sql VACUUM tableName [RETAIN <N> HOURS]

Other languages:

// recursively vacuum an output path
spark.sql("VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]")

// vacuum all partitions of a catalog table
spark.sql("VACUUM tableName [RETAIN <N> HOURS]")