Transactional Writes to Cloud Storage with DBIO¶
The Databricks DBIO package provides transactional writes to cloud storage for Spark jobs. This solves a number of performance and correctness issues that arise when Spark is used in a cloud-native setting (for example, writing directly to storage services like S3).
DBIO transactional commit is enabled by default in Spark 2.2+. You can enable it manually in Spark 2.1 using the following:
%sql SET spark.sql.sources.commitProtocolClass=com.databricks.io.CommitProtocol
To revert to the legacy Hadoop write protocol, run:
%sql SET spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
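The same setting can also be applied from Scala code via the session configuration instead of a `%sql` cell. A minimal sketch, assuming an active `SparkSession` named `spark` (as in Databricks notebooks):

```scala
// Enable the DBIO commit protocol on Spark 2.1 (default on 2.2+)
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "com.databricks.io.CommitProtocol")

// To revert to the legacy Hadoop write protocol:
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
```

Either form takes effect for subsequent writes in the same session.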
When DBIO transactional commit is enabled, metadata files starting with _committed_&lt;id&gt; will accompany data files created by Spark jobs. Generally you shouldn't alter these files directly. Rather, use the VACUUM command described below.
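To see these metadata files alongside the data files, you can list an output directory from a Databricks notebook. A minimal sketch, assuming the notebook-provided `dbutils` handle and a hypothetical output path:

```scala
// List DBIO commit metadata files in a (hypothetical) output directory.
// `dbutils.fs.ls` is available in Databricks notebooks.
val files = dbutils.fs.ls("/path/to/output/directory")

// Metadata files created by the DBIO commit protocol start with "_committed_"
val commitMarkers = files.filter(_.name.startsWith("_committed_"))
commitMarkers.foreach(f => println(f.path))
```

These marker files are what read-side support consults to skip uncommitted output; do not delete them by hand.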
Compatibility with external systems and older Spark versions¶
Spark versions 1.6-db2 and above have the necessary read-side support for compatibility with DBIO commit. These Spark versions unconditionally ignore uncommitted output files when reads are done through Spark APIs.
Older Spark versions that lack this read-side support can still read data as before, but their reads are not transactional and may therefore observe files that are uncommitted or in progress.
Clean up uncommitted files¶
To clean up uncommitted files left over from Spark jobs, use the VACUUM command to remove them. Normally VACUUM runs automatically after a Spark job completes, but you can also run it manually if a job is aborted. For example, VACUUM ... RETAIN 1 HOUR removes uncommitted files older than one hour.
- Avoid vacuuming with a horizon of less than one hour. It can cause data inconsistency.
- You cannot use VACUUM directly on cloud storage. To VACUUM storage, you must mount it to DBFS and run VACUUM on the mounted directory.
%sql
-- recursively vacuum an output path
VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]

-- vacuums all partitions of a catalog table
VACUUM tableName [RETAIN <N> HOURS]
// recursively vacuum an output path
spark.sql("VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]")

// vacuums all partitions of a catalog table
spark.sql("VACUUM tableName [RETAIN <N> HOURS]")