Transactional Writes to Cloud Storage with DBIO

New in version 2.1.0-db4.

Databricks’ DBIO package provides transactional writes to cloud storage for Spark jobs. This solves a number of performance and correctness issues that arise when Spark is used in cloud-native settings (for example, writing directly to storage services such as S3).

DBIO transactional commit will be enabled by default starting with Spark 2.2. It can also be manually enabled in Spark 2.1 as follows:

%sql SET spark.sql.sources.commitProtocolClass=com.databricks.io.CommitProtocol
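The same session-level setting can also be applied programmatically. The following is a minimal Scala sketch, assuming a Databricks notebook where the spark SparkSession object is predefined:

// Enable the DBIO commit protocol for the current Spark session.
// Equivalent to the %sql SET command above.
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "com.databricks.io.CommitProtocol")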

To revert to the legacy Hadoop write protocol, run:

%sql SET spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol

Tip

When DBIO transactional commit is enabled, metadata files of the form _started_<id> and _committed_<id> accompany the data files created by Spark jobs. In general you shouldn’t alter these files directly; instead, use the VACUUM command described later on this page.
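For illustration, a minimal Scala sketch for inspecting an output directory is shown below. It assumes a Databricks notebook where dbutils is available; the path and ids are placeholders:

// List a DBIO-committed output directory (illustrative sketch).
// Expect _started_<id> and _committed_<id> metadata files alongside part-* data files.
dbutils.fs.ls("/path/to/output/directory").foreach(f => println(f.name))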

Compatibility with External Systems and Older Spark Versions

Spark versions 2.1-db2+, 2.0-db4+, and 1.6-db2+ have the read-side support required for compatibility with DBIO commit. These versions unconditionally ignore uncommitted output files when reads go through Spark APIs.

Older Spark versions that lack this read-side support can still read data as before, but their reads are not transactional and may therefore observe files that are uncommitted or in progress.

To clean up uncommitted files left over from Spark jobs, DBIO also provides a VACUUM command. Normally VACUUM runs automatically after a Spark job completes, but it can also be run manually if a job is aborted.

For example, VACUUM ... RETAIN 1 HOUR removes uncommitted files older than one hour.

Warning

Do not vacuum with a horizon of less than one hour, as this may cause data inconsistency.

Usage in SQL:

-- recursively vacuum an output path
%sql VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]

-- vacuum all partitions of a catalog table
%sql VACUUM tableName [RETAIN <N> HOURS]

In other languages:

// recursively vacuum an output path
spark.sql("VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]")

// vacuum all partitions of a catalog table
spark.sql("VACUUM tableName [RETAIN <N> HOURS]")