Databricks Delta is in Private Preview. Contact your account manager to request access to the Private Preview.
Databricks Delta is a transactional storage layer designed specifically to harness the power of Apache Spark and Databricks DBFS. Databricks Delta stores your data as Parquet files in DBFS and maintains a complete transaction log that efficiently tracks changes to the files.
You can read and write data stored in Delta using the same familiar Apache Spark SQL batch and streaming APIs that you use to work with Hive tables or DBFS directories. However, through the addition of the transaction log and other enhancements, Databricks Delta provides the following functionality:
- ACID Transactions - Multiple writers can simultaneously modify a dataset and see consistent views.
- DELETES/UPDATES/UPSERTS - Writers can modify a dataset without interfering with jobs reading the dataset.
- Data Validation - Ensures that data meets specified invariants (for example, NOT NULL) by rejecting invalid data.
- Automatic File Management - Speeds up data access by organizing data into large files that can be read efficiently.
- Statistics and Data Skipping - Speeds up reads by 10-100x by tracking statistics about the data in each file and avoiding reading irrelevant information.
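The statistics-and-data-skipping idea can be illustrated with a small pure-Python sketch. It assumes each data file carries min/max values for a single integer column; the FileStats class, the file names, and the files_to_read helper are hypothetical and do not reflect Databricks Delta's actual log format. A reader can skip any file whose [min, max] range cannot overlap the range requested by the query predicate.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    """Hypothetical per-file statistics for one column (not Delta's log schema)."""
    path: str
    min_value: int
    max_value: int


def files_to_read(files, lo, hi):
    """Return only the files whose [min_value, max_value] range overlaps [lo, hi].

    Every other file is skipped without being opened, because none of its
    rows can satisfy lo <= value <= hi.
    """
    return [f.path for f in files if f.max_value >= lo and f.min_value <= hi]


files = [
    FileStats("part-0000.parquet", 1, 100),
    FileStats("part-0001.parquet", 101, 200),
    FileStats("part-0002.parquet", 201, 300),
]

# A predicate such as "WHERE id BETWEEN 150 AND 160" touches only one file.
print(files_to_read(files, 150, 160))  # ['part-0001.parquet']
```

In this toy model, the speedup comes entirely from files that are never opened, so it grows with how selective the predicate is and how well values are clustered across files.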
Databricks Delta requires Databricks Runtime 4.0 and above. Databricks Delta does not work with older Databricks Runtime versions.
Concurrent writers from multiple clusters
Writes can occur only from one cluster to one Databricks Delta table or path at a given time. If you want to write data from multiple Spark jobs to a single path or table, you should do so only from a single cluster. The result of writing from multiple clusters is undefined.
Bucketing is not supported.
User-specified schemas are not supported. A command such as
A common use case is to store both the intermediate and final results of a multi-stage pipeline in Databricks Delta. Typically these pipelines begin with relatively raw, unprocessed records and become more selective and refined as processing occurs.
The phases of processing are grouped into categories by their level of refinement:
- Bronze - Raw events with very little transformation. Often contains a “firehose” of records from many different parts of the organization.
- Silver - Events are cleaned and normalized, sometimes joined with dimension information.
- Gold - Filtered down and sometimes aggregated, usually related to one particular business objective.
- Platinum - High-level summaries of key business metrics.
Several features of Databricks Delta make building and running this type of pipeline significantly easier, cheaper, and more effective than using traditional tools:
- Retain large histories of data - Databricks Delta can efficiently store years' worth of data at any stage of the pipeline. Having a long history allows you to fix mistakes by reprocessing old data and to ask new questions about historical events.
- Automatically update with streaming - Streaming can efficiently read from one table and write the results to another, reducing the complexity and management overhead for each hop.
- Query intermediate results - Each step of the process is materialized as an actual table, and can optionally be optimized for fast read performance. If an ad-hoc query results in new insights, it can easily be turned into a materialized table by switching execution to streaming.
- Share intermediate results - When multiple downstream jobs depend on a given computation, a Databricks Delta table is a natural forking point. Each downstream transformation can run against the efficiently encoded, columnar table.
- Backfill and correct with batch - Databricks Delta supports a wide range of batch operations that you can use to backfill and correct mistakes transactionally without interrupting any new streaming data that is arriving.
This quick start example demonstrates the basics of working with Databricks Delta. This section shows how to build a pipeline that reads JSON data into a Databricks Delta table and optimizes the table for fast reads.
Create a table
Create the table from a dataset. You can use existing Spark SQL code and simply change the format from json (or another source format) to delta:
events = spark.read.json("/data/events")
events.write.format("delta").save("/delta/events")
CREATE TABLE events USING delta AS SELECT * FROM json.`/data/events/`
These operations create a new table using the schema that was inferred from the JSON data. For the full set of options available when creating a new Databricks Delta table, see Create a table and Write.
Read the table
Once data is stored in Databricks Delta, you access it either by specifying its path on DBFS (for example, dbfs:/delta/events) or by using the table name in the metastore:
events = spark.read.format("delta").load("/delta/events")  # or spark.table("events")
SELECT * FROM delta.`/delta/events` -- or SELECT * FROM events
For the full set of options available when reading Databricks Delta, see Read.
Append data to the table
As new events arrive, you can atomically append them to the table:
(newEvents.write
  .format("delta")
  .mode("append")
  .save("/delta/events"))  # or .saveAsTable("events")
INSERT INTO events VALUES(...) -- or INSERT INTO events SELECT * FROM newEvents
Stream data into the table
You can also use Structured Streaming to stream new data automatically into the table as it arrives:
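# Streaming file sources need an explicit schema; here it is inferred once with a batch read.
events = spark.readStream.schema(spark.read.json("/data/events").schema).json("/data/events")
(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json")
  .start("/delta/events"))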
For more information about Databricks Delta integration with Structured Streaming, see Databricks Delta Tables - Streaming Reads and Writes.
Optimize the table
Once you have been streaming for a while, you will likely have a lot of small files in the table. To improve the speed of read queries, you can use OPTIMIZE to perform operations such as collapsing small files into larger ones:
OPTIMIZE events -- or OPTIMIZE delta.`/delta/events`
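The compaction step can be pictured as bin-packing small files into larger ones. The sketch below is a simplified model, assuming a hypothetical target output size and a plain greedy strategy chosen for illustration; it is not Databricks Delta's actual OPTIMIZE algorithm, and the plan_compaction helper is hypothetical.

```python
def plan_compaction(file_sizes, target_size):
    """Greedily pack file sizes (in bytes) into bins of at most target_size.

    Each returned bin is a list of input sizes that would be rewritten
    together as one larger file. A single file larger than target_size
    ends up alone in its own bin.
    """
    bins, current, current_total = [], [], 0
    for size in sorted(file_sizes):
        # Close the current bin if adding this file would overflow it.
        if current and current_total + size > target_size:
            bins.append(current)
            current, current_total = [], 0
        current.append(size)
        current_total += size
    if current:
        bins.append(current)
    return bins


MB = 1024 * 1024
small_files = [10 * MB] * 250  # e.g. 250 files of 10 MB left behind by streaming
plan = plan_compaction(small_files, target_size=1024 * MB)
print(len(plan))  # 3 output files instead of 250 inputs
```

Fewer, larger files mean fewer file opens and less per-file metadata to process on every read, which is where the read speedup comes from.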
You can also specify columns that often appear in query predicates for your workload, and Databricks Delta uses this information to cluster related records together:
OPTIMIZE events ZORDER BY eventType, city
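The name ZORDER refers to the Z-order (Morton) space-filling curve. The following sketch shows how such a key can be computed for two columns, assuming the column values have already been mapped to small non-negative integers (for example, by ranking the distinct eventType and city values). The z_order_key helper and the sample rows are hypothetical, and this is not a description of how Databricks Delta maps or sorts values internally.

```python
def z_order_key(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one Morton (Z-order) key.

    Bit i of x goes to position 2*i and bit i of y to position 2*i + 1, so
    rows that are close in both columns tend to get close keys.
    """
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key


# Hypothetical rows whose two columns were already mapped to small integer ids.
rows = [(3, 3), (0, 0), (1, 0), (0, 1), (2, 2), (1, 1)]
rows.sort(key=lambda r: z_order_key(*r))
print(rows)  # [(0, 0), (1, 0), (0, 1), (1, 1), (2, 2), (3, 3)]
```

Writing rows out in this order keeps each file's min/max range narrow in both columns at once, which is what lets data skipping prune files for predicates on either column.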
For the full set of options available when running OPTIMIZE, see Optimizing Performance and Cost.
Clean up snapshots
Finally, it is important to understand that Databricks Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, you should clean up old snapshots. You can do this by running the VACUUM command. You can control how long old snapshots are retained by using the RETAIN <N> HOURS option:
VACUUM events RETAIN 24 HOURS
For details on how to use VACUUM effectively, see Garbage collection.
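The retention rule behind VACUUM ... RETAIN can be sketched as follows. This is a simplified model, assuming the cleanup knows which files the current snapshot references and when every other file stopped being referenced; the files_to_vacuum helper and its arguments are hypothetical, not Databricks Delta's implementation. A file is deleted only if it is unreferenced and has been unreferenced for longer than the retention window, so readers of sufficiently recent snapshots are unaffected.

```python
from datetime import datetime, timedelta


def files_to_vacuum(all_files, live_files, removed_at, now, retain_hours):
    """Return the files a VACUUM-like cleanup could delete, sorted by name.

    all_files:   every data file present in storage
    live_files:  files referenced by the current table snapshot (never deleted)
    removed_at:  when each no-longer-referenced file was logically removed;
                 files missing from this map are conservatively kept
    """
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(
        f for f in all_files
        if f not in live_files and removed_at.get(f, now) < cutoff
    )


now = datetime(2018, 1, 2, 12, 0)
all_files = {"a.parquet", "b.parquet", "c.parquet", "d.parquet"}
live = {"d.parquet"}
removed_at = {
    "a.parquet": now - timedelta(hours=30),  # removed by an OPTIMIZE 30 hours ago
    "b.parquet": now - timedelta(hours=30),
    "c.parquet": now - timedelta(hours=2),   # removed by a later OPTIMIZE 2 hours ago
}

# Equivalent in spirit to VACUUM events RETAIN 24 HOURS.
print(files_to_vacuum(all_files, live, removed_at, now, 24))  # ['a.parquet', 'b.parquet']
```

A shorter window reclaims storage sooner but shrinks how far back a long-running query or an older snapshot can still find its files.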
If you are porting existing pipelines to Databricks Delta, you should be aware of the following simplifications and differences compared with the data sources provided by Apache Spark or Apache Hive.
Databricks Delta handles the following operations automatically, which you should never perform manually:
- REFRESH TABLE - Databricks Delta tables always return the most up-to-date information, so there is no need to manually call REFRESH TABLE after changes.
- Add/Remove Partitions - Databricks Delta automatically tracks the set of partitions that are present in a table and updates the list as data is added or removed. As a result, there is no need to manually run
ALTER TABLE, or
- Loading a Single Partition Quickly - As an optimization, users sometimes directly load the partition of data they are interested in (for example, spark.read.parquet("/data/date=2017-01-01")). With Databricks Delta, this is unnecessary, since Databricks Delta can quickly scan the list of files in the transaction log to find the relevant ones, using the distributed processing power of Spark. If you are interested in a single partition, specify it using a WHERE clause (for example, spark.read.format("delta").load("/data").where("date = '2017-01-01'")).
When you port an existing application to Databricks Delta, you should avoid the following operations, which bypass the transaction log:
- Manually Modifying Data - Databricks Delta uses the transaction log to atomically commit changes to the table. Because the log is the source of truth, files that are written out but not added to the transaction log are not read by Spark. Similarly, if you manually delete a file, a pointer to the file is still present in the transaction log. Instead of manually modifying files stored in a Databricks Delta table, always use the DML commands that are described in this user guide.
- External Readers without VACUUM - The data stored in Databricks Delta is encoded as Parquet files. However, when you read from a Databricks Delta table using an external tool, you must first run VACUUM to remove any stale copies of data. Otherwise, the other query engine might read duplicate values.
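Both rules follow from treating the transaction log as the source of truth. The toy model below is a sketch with a hypothetical in-memory log of ("add", path) and ("remove", path) actions, not Databricks Delta's actual log format; the snapshot helper and file names are also hypothetical. Replaying the log yields the files in a snapshot, so a file copied into the directory by hand never appears, a reader pinned to an earlier version still sees the pre-compaction files, and a tool that simply lists the directory sees the compacted data twice.

```python
def snapshot(log, version=None):
    """Replay commits 0..version of a toy transaction log into the set of live files."""
    commits = log if version is None else log[: version + 1]
    live = set()
    for commit in commits:
        for action, path in commit:
            if action == "add":
                live.add(path)
            elif action == "remove":
                live.discard(path)
    return live


log = [
    [("add", "part-0.parquet")],                         # version 0: initial write
    [("add", "part-1.parquet")],                         # version 1: append
    [("remove", "part-0.parquet"), ("remove", "part-1.parquet"),
     ("add", "part-2.parquet")],                         # version 2: compaction rewrites 0 and 1 into 2
]

# Files physically present in storage, including one copied in by hand.
directory = {"part-0.parquet", "part-1.parquet", "part-2.parquet", "manual.parquet"}

print(sorted(snapshot(log)))             # ['part-2.parquet']
print(sorted(snapshot(log, version=1)))  # ['part-0.parquet', 'part-1.parquet']
print(sorted(directory - snapshot(log))) # stale or untracked files a directory listing would also pick up
```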
- How does Databricks Delta compare to Hive SerDe tables?
- There are several Hive SerDe configuration parameters that Databricks Delta always manages on your behalf; you should never specify them manually:
- Does Databricks Delta support multi-table transactions?
- Databricks Delta does not support multi-table transactions and foreign keys. Databricks Delta supports transactions at the table level.
- Does Databricks Delta support writes or reads using the Spark Streaming DStream API?
- Databricks Delta does not support the DStream API. We recommend Structured Streaming.