Databricks Delta

Note

Databricks Delta is in Private Preview. Contact your account manager or go to https://databricks.com/product/databricks-delta to request access.

What is Databricks Delta?

Databricks Delta is a transactional storage layer designed specifically to harness the power of Apache Spark and Databricks DBFS. The core abstraction of Databricks Delta is the Databricks Delta table, an optimized Spark table that stores your data as Parquet files in DBFS and maintains a transaction log that efficiently tracks changes to the table.

You can read and write data stored in Databricks Delta using the same familiar Apache Spark SQL batch and streaming APIs that you use to work with Hive tables or DBFS directories. However, through the addition of the transaction log and other enhancements, Databricks Delta provides the following functionality:

  • ACID transactions - Multiple writers can simultaneously modify a dataset and see consistent views.
  • DELETES/UPDATES/UPSERTS - Writers can modify a dataset without interfering with jobs reading the dataset.
  • Automatic file management - Speeds up data access by organizing data into large files that can be read efficiently.
  • Statistics and data skipping - Speeds up reads by 10-100x by tracking statistics about the data in each file and avoiding reading irrelevant information.

Supported runtime versions

Databricks Delta requires Databricks Runtime 4.1 and above. Tables created using Databricks Runtime versions lower than 4.1 must be upgraded. To upgrade an existing table, first upgrade all jobs that are writing to the table. Then run:

%scala com.databricks.delta.Delta.upgradeTableProtocol("<path-to-table>" or "<tableName>")

Case study: multi-hop pipelines

A common use case is to store both the intermediate and final results of a multi-stage pipeline in Databricks Delta. Typically these pipelines begin with relatively raw, unprocessed records and become more selective and refined as processing occurs.

The phases of processing are grouped into categories by their level of refinement:

  • Bronze - Raw events with very little transformation. Often contains a “firehose” of records from many different parts of the organization.
  • Silver - Events are cleaned and normalized, sometimes joined with dimension information.
  • Gold - Filtered down and sometimes aggregated, usually related to one particular business objective.
  • Platinum - High-level summaries of key business metrics.

Several features of Databricks Delta make building and running this type of pipeline significantly easier, cheaper, and more effective than using traditional tools (a minimal sketch of one streaming hop follows this list):

  • Retain large histories of data - Databricks Delta can efficiently store years' worth of data at any stage of the pipeline. Having a long history allows you to fix mistakes by reprocessing old data and to ask new questions about historical events.
  • Automatically update with streaming - Streaming can efficiently read from one table and write the results to another, reducing the complexity and management overhead for each hop.
  • Query intermediate results - Each step of the process is materialized as an actual table, and can optionally be optimized for fast read performance. If an ad-hoc query results in new insights, it can easily be turned into a materialized table by switching execution to streaming.
  • Share intermediate results - When multiple downstream jobs depend on a given computation, a Databricks Delta table is a natural forking point. Each downstream transformation can run against the efficient, columnar-encoded table.
  • Backfill and correct with batch - Databricks Delta supports a wide range of batch operations that you can use to backfill and correct mistakes transactionally without interrupting any new streaming data that is arriving.
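
As an illustration only, one hop of such a pipeline, reading a bronze table as a stream, cleaning it, and continuously appending to a silver table, might look like the following sketch. The paths /delta/bronze and /delta/silver, the checkpoint location, and the eventType column are hypothetical:

# Read the raw (bronze) Databricks Delta table as a stream.
bronze = spark.readStream.format("delta").load("/delta/bronze")

# Keep only well-formed events; eventType is a hypothetical column.
silver = bronze.where("eventType IS NOT NULL")

# Continuously append the refined records to the silver table.
(silver.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/silver/_checkpoint")
  .start("/delta/silver"))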

Quick start example: importing JSON data

This quick start example demonstrates the basics of working with Databricks Delta: it shows how to build a pipeline that reads JSON data into a Databricks Delta table and optimizes the table for fast reads.

Create a table

Create a table from a dataset. You can use existing Spark SQL code and simply change the format from parquet, csv, json, and so on, to delta.

  • Using DataFrames:

    events = spark.read.json("/data/events")
    events.write.format("delta").save("/delta/events")
    
  • Using SQL:

    CREATE TABLE events
    USING delta
    AS SELECT *
    FROM json.`/data/events/`
    

These operations create a new table using the schema that was inferred from the JSON data. For the full set of options available when creating a new Databricks Delta table, see Create a table and Write to a table.

Read a table

Once data is stored in Databricks Delta, you can access it by specifying either the path on DBFS (for example, dbfs:/delta/events) or the table name:

  • Using DataFrames:

    events = spark.read.format("delta").load("/delta/events")
    

    or

    events = spark.table("events")
    
  • Using SQL:

    SELECT * FROM delta.`/delta/events`
    

    or

    SELECT * FROM events
    

Append data to a table

As new events arrive, you can atomically append them to the table:

  • Using DataFrames:

    (newEvents.write
      .format("delta")
      .mode("append")
      .save("/delta/events"))
    

    or

    newEvents.write.format("delta").mode("append").saveAsTable("events")
    
  • Using SQL:

    INSERT INTO events VALUES(...)
    

    or

    INSERT INTO events SELECT * FROM newEvents
    

Stream data into a table

You can also use Structured Streaming to stream new data automatically into the table as it arrives:

events = spark.readStream.json("/data/events")
(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json")
  .start("/delta/events"))

For more information about Databricks Delta integration with Structured Streaming, see Table Streaming Reads and Writes.

Optimize a table

Once you have been streaming for a while, you will likely have accumulated many small files in the table. If you want to improve the speed of read queries, you can use OPTIMIZE to compact the small files into larger ones:

OPTIMIZE events

or

OPTIMIZE delta.`/delta/events`

You can also specify columns that frequently appear in query predicates for your workload, and Databricks Delta uses this information to cluster related records together:

OPTIMIZE events ZORDER BY eventType, city

For the full set of options available when running OPTIMIZE, see Optimizing Performance and Cost.

Clean up snapshots

Databricks Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually you should clean up old snapshots. You can do this by running the VACUUM command:

VACUUM events

You control the age of the latest retained snapshot by using the RETAIN <N> HOURS option:

VACUUM events RETAIN 24 HOURS

For details on how to use VACUUM effectively, see Garbage collection.

Port existing workloads to Databricks Delta

When you port existing workloads to Databricks Delta, you should be aware of the following simplifications and differences compared with the data sources provided by Apache Spark and Apache Hive.

Databricks Delta handles the following operations automatically, which you should never perform manually:

  • REFRESH TABLE - Databricks Delta tables always return the most up-to-date information, so there is no need to manually call REFRESH TABLE after changes.
  • Add or remove partitions - Databricks Delta automatically tracks the set of partitions present in a table and updates the list as data is added or removed. As a result, there is no need to manually run ALTER TABLE [ADD|DROP] PARTITION or MSCK.
  • Load a single partition quickly - As an optimization, users sometimes directly load only the partition of data they are interested in, for example, spark.read.parquet("/data/date=2017-01-01"). With Databricks Delta this is unnecessary, since it can quickly scan the file list in the transaction log to find the relevant files. If you are interested in a single partition, specify it with a WHERE clause instead, for example, spark.read.format("delta").load("/data").where("date = '2017-01-01'").

When you port an existing application to Databricks Delta, you should avoid the following operations, which bypass the transaction log:

  • Manually modify data - Databricks Delta uses the transaction log to atomically commit changes to the table. Because the log is the source of truth, files that are written out but not added to the transaction log are not read by Spark. Similarly, even if you manually delete a file, a pointer to the file is still present in the transaction log. Instead of manually modifying files stored in a Databricks Delta table, always use the DML commands described in this guide (a brief example follows this list).
  • External readers - The data stored in Databricks Delta is encoded as Parquet files. However, before you read from a Databricks Delta table using an external tool, you must clean up stale copies of the data (for example, by running VACUUM). Otherwise, the other query engine might read duplicate values.
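
As a brief, hedged illustration of the DML path (the events table and eventType column are reused from the quick start; the filter values are made up), a correction runs as an atomic, logged operation rather than as a manual file edit:

# Delete bad records transactionally; the change is committed to the transaction log.
spark.sql("DELETE FROM events WHERE eventType = 'corrupted'")

# Fix records in place, again as a single atomic commit.
spark.sql("UPDATE events SET eventType = 'click' WHERE eventType = 'clck'")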

Limitations

Unsupported features:

Warning

Writes to a single table must originate from a single cluster. There is experimental support for writes from multiple clusters in the same workspace. Contact Databricks support if you are interested in trying this feature.

  • INSERT INTO [OVERWRITE] with static partitions.
  • Subqueries in the WHERE conditions of UPDATE and DELETE.
  • Bucketing.
  • Specifying a schema when reading from a table. A command such as spark.read.format("delta").schema(df.schema).load(path) will fail.

Unsupported DDLs (compared with Parquet tables):

  • ANALYZE TABLE PARTITION
  • ALTER TABLE [ADD|DROP] PARTITION
  • ALTER TABLE SET LOCATION
  • ALTER TABLE RECOVER PARTITIONS
  • ALTER TABLE SET SERDEPROPERTIES
  • CREATE TABLE LIKE
  • INSERT OVERWRITE DIRECTORY
  • LOAD DATA
  • TRUNCATE

Frequently asked questions

How does Databricks Delta compare to Hive SerDe tables?

There are several Hive SerDe configuration parameters that Databricks Delta always manages on your behalf; you should never specify them manually:

  • ROW FORMAT
  • SERDE
  • OUTPUTFORMAT and INPUTFORMAT
  • COMPRESSION
  • STORED AS

Does Databricks Delta support multi-table transactions?

Databricks Delta does not support multi-table transactions or foreign keys. Databricks Delta supports transactions at the table level.

Does Databricks Delta support writes or reads using the Spark Streaming DStream API?

Databricks Delta does not support the DStream API. We recommend Structured Streaming instead.

Why isn’t OPTIMIZE automatic?

The OPTIMIZE operation starts many Spark jobs in order to optimize file sizes via compaction (and optionally perform Z-Ordering). Since much of what OPTIMIZE does is compact small files, you must first accumulate many small files before the operation has an effect. Therefore, the OPTIMIZE operation is not run automatically.

Moreover, running OPTIMIZE, especially with ZORDER, is an expensive operation in time and resources. If Databricks ran OPTIMIZE automatically or waited to write out data in batches, it would remove the ability to run low-latency Databricks Delta streams (where a Databricks Delta table is the source). Many customers have Databricks Delta tables that are never optimized because they only stream data from these tables, obviating the query benefits that OPTIMIZE would provide.

Lastly, Databricks Delta automatically collects statistics about the files that are written to the table (whether through an OPTIMIZE operation or not). This means that reads from Databricks Delta tables leverage this information whether or not the table or a partition has had the OPTIMIZE operation run on it.

How often should I run OPTIMIZE?

There is a trade-off between cost and performance when you run OPTIMIZE. Run OPTIMIZE more often if you want better end-user query performance (necessarily at a higher cost because of the resources used); run it less often if you want to reduce cost.

We recommend that you start by running OPTIMIZE on a daily basis (preferably at night when spot prices are low), and then adjust the frequency from there.
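
For example, a nightly maintenance job might simply run the two commands shown in the quick start; the Z-Order columns and the retention window below are illustrative, not recommendations:

# Compact small files and cluster records by columns that appear in query predicates.
spark.sql("OPTIMIZE events ZORDER BY eventType, city")

# Remove snapshots older than the retention window (here, 7 days).
spark.sql("VACUUM events RETAIN 168 HOURS")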