Databricks Delta

What is Delta?

Databricks Delta is a transactional storage layer that is designed specifically to harness the power of Apache Spark and cloud storage systems supported by DBFS. Delta stores your data as parquet files in DBFS while also maintaining a transaction log that efficiently and correctly tracks which files are currently part of the table.

You can read and write data stored in Delta using the same familiar Apache Spark SQL batch and streaming APIs that you would use to work with Hive tables or DBFS directories. However, because it maintains the transaction log, Delta can provide the following additional functionality:

  • ACID Transactions - Multiple writers from different clusters can safely modify the same table without fear of corruption or inconsistency.
  • DELETES/UPDATES/UPSERTS - Transactionally modify existing data without disturbing jobs that are reading the same dataset (a sketch follows this list).
  • Data Validation - Ensure that all data meets specified invariants (for example, NOT NULL) by rejecting invalid data.
  • Automatic File Management - Automatic support for organizing data into large files that can be read efficiently.
  • Statistics and Data Skipping - Delta tracks statistics about the data in each file and avoids reading irrelevant information, speeding up reads by 10-100x.
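
As a sketch of what transactional modifications look like, the example below assumes a Delta table named events with eventId, date, and data columns, plus a staging table named updates with the same schema; these names are assumptions for illustration, and the exact DML syntax supported by your runtime version is covered by the DML commands described later in this user guide.

# Illustrative sketch only: `events`, `updates`, and their columns are assumptions.

# Transactionally delete old records without disturbing concurrent readers.
spark.sql("DELETE FROM events WHERE date < '2017-01-01'")

# Transactionally upsert new and changed records from the staging table.
spark.sql("""
  MERGE INTO events
  USING updates
  ON events.eventId = updates.eventId
  WHEN MATCHED THEN
    UPDATE SET events.data = updates.data
  WHEN NOT MATCHED THEN
    INSERT (eventId, date, data) VALUES (updates.eventId, updates.date, updates.data)
""")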

Supported Runtime Versions and Current Limitations

Delta requires Databricks Runtime 3.5 or above. Older runtime versions cannot work with Delta at all, including reading from existing Delta tables.

Attention

Databricks Delta is currently in private preview. Customers must be explicitly approved to use Delta; it is not enabled by default for any customer account. Please reach out to your account manager to request access to the private preview.

Concurrent writers from multiple clusters

Warning

Currently, writes to a given Delta table or path can occur from only one cluster at a time. If you want to write data from multiple Spark jobs to a single path or table, run those jobs on a single cluster.

Databricks Runtime 4.0 will lift this restriction and support writers from multiple clusters.

Upgrading Delta from Databricks Runtime 3.4 to 3.5

There are no backwards-incompatible changes. However, Runtime 3.5 begins automatically collecting and using data skipping statistics on your behalf. Since 3.4 has no way to collect these statistics, writing to the same tables from both 3.4 and 3.5 is discouraged. We recommend moving to 3.5 as soon as possible.

Other Notable Limitations

Delta does not currently support bucketing.

Case Study: Multi-hop Pipelines with Delta

A common use case is to use Delta to store intermediate and final results in a multi-stage pipeline. Typically these pipelines begin with relatively raw, unprocessed records and become more selective and refined as processing occurs.

The phases of processing are grouped into categories by their level of refinement:

  • Bronze - Raw events with very little transformation. Often contains a “firehose” of records from many different parts of the organization.
  • Silver - Events are cleaned and normalized, sometimes joined with dimension information.
  • Gold - Filtered down and sometimes aggregated, usually related to one particular business objective.
  • Platinum - High-level summaries of key business metrics.

Several features of Delta make building and running this type of pipeline significantly easier, cheaper, and more effective than using traditional big data tools:

  • Retain large histories of data - Delta can efficiently store years’ worth of data at any stage of the pipeline. Having a long history allows you to fix mistakes by reprocessing old data and also allows you to ask new questions about historical events.
  • Automatically update with streaming - Streaming can efficiently read from one table and write the results to another, reducing the complexity and management overhead for each hop (see the sketch after this list).
  • Query intermediate results - Each step of the process is materialized as an actual table and can optionally be optimized (using OPTIMIZE) for fast read performance. If an ad-hoc query yields new insights, it can easily be turned into a materialized table by switching execution to streaming.
  • Share intermediate results - When there are multiple downstream jobs that depend on a given computation, a Delta table is a natural forking point. Each downstream transformation can execute against the efficiently encoded columnar table.
  • Backfill and correct with batch - Mistakes are inevitable when working with big data, and there is always old information to consider. Delta supports a wide range of batch operations that you can use to backfill and correct mistakes transactionally without interrupting any new streaming data that is arriving.
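
As a minimal sketch of a single hop in such a pipeline, the example below streams records from a Bronze Delta table into a Silver Delta table. The paths /delta/bronze/events and /delta/silver/events and the clean_events function are assumptions for illustration.

# Hypothetical Bronze -> Silver hop: read one Delta table as a stream, refine the
# records, and write the results to the next Delta table.
bronze = (spark.readStream
  .format("delta")
  .load("/delta/bronze/events"))            # assumed path of the Bronze table

silver = clean_events(bronze)               # your cleaning/normalization logic

(silver.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/silver/events/_checkpoint")
  .start("/delta/silver/events"))           # assumed path of the Silver table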

Quick Start Example

This quick start example demonstrates the basics of working with Delta.

Note

Remember, Delta is not generally available to all customers and is currently only in Private Preview with select customers. Reach out to your account executive to request participation in the private preview.

Example: Importing JSON data into Delta

In this section we show how to build a pipeline that ETLs JSON data into a Delta table and optimizes the table for fast reads.

We start by creating the table using an initial collection of input data. In general, you can use existing Spark SQL code and simply change the format to “delta” (from “parquet”, “csv”, “json”, etc).

Using DataFrames:

events = spark.read.json("/data/events")
events.write.format("delta").save("/delta/events")

Using SQL:

CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`

These operations create a new table using the schema that was inferred from the JSON data. For the full set of options available when creating a new Delta table, see Writes and Creating tables.
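
For example, a minimal sketch of one such option is partitioning the table by a column when it is created; the date column and the /delta/events_by_date path below are assumptions for illustration.

# Hypothetical sketch: create a Delta table partitioned by an assumed `date` column.
events = spark.read.json("/data/events")
(events.write
  .format("delta")
  .partitionBy("date")                # assumes the input data contains a `date` column
  .save("/delta/events_by_date"))     # hypothetical path for this partitioned variant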

Once data is stored in Delta, you can access it either by specifying the path on DBFS (for example, dbfs:/delta/events) or by the table name in the metastore:

Using DataFrames:

events = spark.read.format("delta").load("/delta/events")
# or
events = spark.table("events")

Using SQL:

SELECT * FROM delta.`/delta/events`
-- or
SELECT * FROM events

For the full set of options available when reading Delta, see Reads.

As new events arrive, we can atomically append them to the table:

Using DataFrames:

(newEvents.write
  .format("delta")
  .mode("append")
  .save("/delta/events"))  # or .saveAsTable("events")

Using SQL:

INSERT INTO events VALUES(...)
-- or
INSERT INTO events SELECT * FROM newEvents

You can also use Structured Streaming to stream new data automatically into the table as it arrives:

events = spark.readStream.json("/data/events")
(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json")
  .start("/delta/events"))

For more information about Delta’s integration with structured streaming, see Delta Tables - Streaming Reads and Writes.

Once we have been streaming for a while, we will likely have accumulated many small files in the table. If we want to improve the speed of read queries, we can use OPTIMIZE to perform operations like compacting small files into larger ones:

OPTIMIZE events
-- or
OPTIMIZE '/delta/events'

You can also specify interesting columns that are often present in query predicates for your workload, and Delta will use this information to cluster together related records:

OPTIMIZE events ZORDER BY eventType, city

For the full set of options available when running OPTIMIZE, see Optimizing Performance and Cost.

Finally, it is important to understand that Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually you should clean up old snapshots. You can do this by running the VACUUM command:

VACUUM events

You can control how old a snapshot must be before its files are removed by using the RETAIN <num> HOURS option:

VACUUM events RETAIN 2 HOURS

In the extreme, you can set the retention to 0 hours to delete any files that are not currently valid in the table. You can use this approach to force the blobstore into a consistent state with the transaction log, so that external tools can read the data in the table. However, this approach causes any jobs that query an older snapshot to fail, because the files are no longer available to read.
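
As a minimal sketch of this aggressive cleanup, run from a notebook or job (the events table name follows the example above):

# Keep only the files that are part of the current version of the table.
# Jobs that query an older snapshot will fail once its files are removed.
spark.sql("VACUUM events RETAIN 0 HOURS")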

Tips for Porting Existing Workloads to Delta

If you are porting existing pipelines to Delta, you should be aware of the following simplifications and differences compared with the data sources provided by Apache Spark or Apache Hive.

Delta handles the following operations automatically. You should never do them yourself:

  • REFRESH TABLE - Delta tables always return the most up-to-date information, so there is no need to manually call REFRESH TABLE after changes.
  • Add/Remove Partitions - Delta automatically tracks the set of partitions present in a table and updates the list as data is added or removed. As a result, there is no need to manually run ALTER TABLE ... ADD/DROP PARTITION.
  • Loading a Single Partition Quickly - As an optimization, users sometimes directly load the partition of data they are interested in (for example, spark.read.parquet("/data/date=2017-01-01")). With Delta, this is unnecessary, since Delta can quickly scan the list of files to find the relevant ones, using the distributed processing power of Spark. If you are interested in a single partition, specify it using a WHERE clause (for example, spark.read.format("delta").load("/data").where("date = '2017-01-01'")).

When you port an existing application to Delta, you should avoid the following operations, which bypass the transaction log:

  • Manually Modifying Data - Delta uses the transaction log to atomically commit changes to the table. Because the log is the source of truth, files that are written out but not added to the transaction log will not be read by Spark. Similarly, if you manually delete a file, a pointer to the file will still be present in the transaction log. Instead of manually modifying files stored in a Delta table, always use the DML commands that are described in this user guide.
  • External Readers without VACUUM - We believe in open standards, and therefore the data stored in Delta is encoded as parquet files. However, when you read from a Delta table using an external tool, you must first run VACUUM to remove any stale copies of data. Otherwise, the other query engine might read duplicate values.