Transactional writes to cloud storage with DBIO

The Databricks DBIO package provides transactional writes to cloud storage for Apache Spark jobs. This solves a number of performance and correctness issues that come when Spark is used in a cloud-native setting (for example, writing directly to storage services like S3).


When DBIO transactional commit is enabled, metadata files starting with _started_<id> and _committed_<id> will accompany data files created by AS jobs. Generally you shouldn’t alter these files directly. Rather, use the VACUUM command.

Enable and disable transactional writes

DBIO transactional commit is enabled by default in Databricks Runtime 3.0 and above. You can enable it manually in Databricks Runtime 2.1.1-db4 using the following:


To revert to the legacy Hadoop write protocol, run:

SET spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol

Compatibility with external systems and older Spark versions

Spark versions 2.1-db2+, 2.0-db4+, and 1.6-db2+ have the necessary read-side support for compatibility with DBIO commit. These Spark versions ignore uncommitted output files unconditionally when reads are done through Spark APIs.

Older Spark versions that don’t have this read-side support can still read data as before, but their reads are not transactional and therefore may observe files that are uncommitted or in progress.

Clean up uncommitted files

To clean up uncommitted files left over from Spark jobs, use the VACUUM command to remove them. Normally VACUUM happens automatically after Spark jobs complete, but you can also run it manually if a job is aborted.

For example, VACUUM ... RETAIN 1 HOUR removes uncommitted files older than one hour.


  • Avoid vacuuming with a horizon of less than one hour. It can cause data inconsistency.
  • You cannot use VACUUM directly on cloud storage. To vacuum storage, you must mount it to DBFS and run VACUUM on the mounted directory.

Also see Vacuum.

-- recursively vacuum an output path
VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]

-- vacuum all partitions of a catalog table
// recursively vacuum an output path
spark.sql("VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]")

// vacuum all partitions of a catalog table
spark.sql("VACUUM tableName [RETAIN <N> HOURS]")