Databricks Delta is a next-generation unified analytics engine built on top of Apache Spark. Databricks Delta provides ACID transactions, optimized layouts and indexes, and execution engine improvements for building data pipelines to support big data use cases: batch and streaming ingests, fast interactive queries, and machine learning. Specifically, Delta offers:
- ACID transactions: Serializable isolation levels ensure that readers never see inconsistent data.
- Efficient upserts: Fine-grained updates easily handle late-arriving data and changing records.
- High-throughput streaming ingestion: Ingest high-volume data directly into query tables.
- Optimized data layout: Choose a data layout that suits your query patterns; Delta automatically manages the layout to reduce query latency.
- Schema enforcement and evolution: Automatically handles schema variations to clean bad records during ingestion.
- Data versioning and time travel: Automatically versions your data for easy rollback and lets you time travel to query earlier versions.
- Execution engine optimizations: Optimizes operations with nested data types, higher order functions, range and data skew joins.
Delta requires Databricks Runtime 4.1 or above. If you created a Delta table using a Databricks Runtime lower than 4.1, you must upgrade the table version. For details, see Table Versioning.
This quickstart provides an overview of the basics of working with Databricks Delta. It shows how to build a pipeline that reads JSON data into a Delta table, modify the table, read the table and display table history, and optimize the table.
To try out Databricks Delta, see Try Databricks.
For runnable notebooks that demonstrate these features, see Introductory Notebooks.
To create a Delta table, you can use existing Spark SQL code and change the format from a format such as json to delta. For all file types, you read the files into a DataFrame and write out in delta format:
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")
CREATE TABLE events USING delta AS SELECT * FROM json.`/data/events/`
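These operations create a new table using the schema that was inferred from the JSON data. The SQL statement creates a managed table; the Python example specifies a LOCATION, so it registers an unmanaged table over the files at that path. For the full set of options available when you create a new Delta table, see Create a table and Write to a table.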
If your source files are in Parquet format, you can use the SQL CONVERT TO DELTA statement to convert files in place to create an unmanaged table:
CONVERT TO DELTA parquet.`/delta/events`
To speed up queries that have predicates involving the partition columns, you can partition data.
To partition data when you create a Delta table, specify the partition columns in a PARTITIONED BY clause:
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING delta
PARTITIONED BY (date)
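Why partitioning helps such queries can be pictured with a small, self-contained Python model. This is only a sketch under stated assumptions: the date=.../ directory names follow the common Hive-style convention, and the helper functions and sample rows are hypothetical illustrations, not Delta internals.

```python
from collections import defaultdict

def write_partitioned(rows, partition_col):
    """Group rows into hypothetical partition directories such as 'date=2019-01-29/'."""
    layout = defaultdict(list)
    for row in rows:
        layout[f"{partition_col}={row[partition_col]}/"].append(row)
    return dict(layout)

def read_with_pruning(layout, partition_col, wanted):
    """Scan only the directories whose partition value matches the predicate."""
    scanned = [d for d in layout if d == f"{partition_col}={wanted}/"]
    rows = [row for d in scanned for row in layout[d]]
    return rows, scanned

# Hypothetical sample rows using the column names from the table definition above.
rows = [
    {"date": "2019-01-28", "eventId": "a", "eventType": "open", "data": "x"},
    {"date": "2019-01-29", "eventId": "b", "eventType": "close", "data": "y"},
    {"date": "2019-01-29", "eventId": "c", "eventType": "open", "data": "z"},
]
layout = write_partitioned(rows, "date")
matched, scanned = read_with_pruning(layout, "date", "2019-01-29")
print(scanned)       # the directories that had to be read
print(len(matched))  # the rows found for that date
```

A predicate on the partition column touches one of the two directories in this model; a predicate on any other column would still have to scan both.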
Delta supports a rich set of operations to modify tables.
You can write data into a Delta table using Structured Streaming. The Delta transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Source of data
inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema to speed up processing
jsonSchema = StructType([
    StructField("time", TimestampType(), True),
    StructField("action", StringType(), True)
])

eventsDF = (
    spark
    .readStream
    .schema(jsonSchema)  # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

# The parentheses let the chained calls span several lines
(
    eventsDF.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
    .table("events")
)
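The exactly-once guarantee described above can be pictured with a toy model in plain Python. It assumes, purely for illustration, that the table remembers the highest batch number committed by each stream and ignores replays; the ToyTable class and its method names are hypothetical and do not reproduce the actual Delta transaction log protocol.

```python
class ToyTable:
    """Minimal stand-in for a table whose log remembers the last batch per stream."""

    def __init__(self):
        self.rows = []
        self.last_batch = {}  # stream id -> highest batch id committed

    def append_batch(self, stream_id, batch_id, rows):
        """Commit a micro-batch once; a replay of the same batch is ignored."""
        if batch_id <= self.last_batch.get(stream_id, -1):
            return False  # already committed, for example a retry after a failure
        self.rows.extend(rows)
        self.last_batch[stream_id] = batch_id
        return True

table = ToyTable()
table.append_batch("etl-from-json", 0, [{"action": "Open"}])
table.append_batch("etl-from-json", 1, [{"action": "Close"}])
# A retry of batch 1 after a simulated failure must not duplicate its rows.
replayed = table.append_batch("etl-from-json", 1, [{"action": "Close"}])
print(replayed, len(table.rows))
```

In this model the retried batch is rejected, so the table ends up with each record exactly once even though the write was attempted twice.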
For more information about Delta integration with Structured Streaming, see Table Streaming Reads and Writes.
To merge a set of updates and insertions into an existing table, you use the MERGE INTO statement. For example, the following statement takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Delta updates the data column using the given expression. When there is no matching event, Delta adds a new row.
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (date, eventId, data)
You must specify a value for every column in your table when you perform an INSERT (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
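The matched and not-matched branches of the statement can be mirrored in a few lines of plain Python. This is a sketch of the semantics only, using in-memory dictionaries; the merge_into function and the sample rows are hypothetical, and nothing here reflects how Delta executes a merge.

```python
def merge_into(events, updates):
    """Toy upsert keyed on eventId: update 'data' on a match, insert otherwise."""
    merged = {row["eventId"]: dict(row) for row in events}  # copy, do not mutate input
    for upd in updates:
        key = upd["eventId"]
        if key in merged:
            # WHEN MATCHED: only the data column changes
            merged[key]["data"] = upd["data"]
        else:
            # WHEN NOT MATCHED: insert the listed columns
            merged[key] = {"date": upd["date"], "eventId": key, "data": upd["data"]}
    return list(merged.values())

# Hypothetical sample rows.
events = [
    {"date": "2019-01-28", "eventId": "a", "data": "old"},
    {"date": "2019-01-28", "eventId": "b", "data": "keep"},
]
updates = [
    {"date": "2019-01-29", "eventId": "a", "data": "new"},
    {"date": "2019-01-29", "eventId": "c", "data": "fresh"},
]
result = merge_into(events, updates)
for row in result:
    print(row)
```

Event a keeps its original date and receives the new data value, event b is untouched, and event c is added as a new row.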
You access data in Delta tables either by specifying the path on DBFS ("/delta/events") or the table name ("events"):
events = spark.read.format("delta").load("/delta/events")
events = spark.table("events")
SELECT * FROM delta.`/delta/events`
SELECT * FROM events
To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.
To query an older version of a table, specify a version or timestamp in a SELECT statement. For example, to query version 0 from the table's history, use one of the following:
SELECT * FROM events VERSION AS OF 0
SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'
Because version 1 is at timestamp '2019-01-29 00:38:10', to query version 0 you can use any timestamp in the range '2019-01-29 00:37:58' to '2019-01-29 00:38:09' inclusive.
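The rule behind that range is that a timestamp resolves to the latest version committed at or before it. The following Python sketch models the rule with the two timestamps from the example; the commit time of version 0 is inferred from the start of the range, and the version_as_of helper is a hypothetical name, not a Delta API.

```python
from bisect import bisect_right
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# Commit timestamps from the example, indexed by version number.
commit_times = [
    datetime.strptime("2019-01-29 00:37:58", FMT),  # version 0 (inferred from the range)
    datetime.strptime("2019-01-29 00:38:10", FMT),  # version 1
]

def version_as_of(timestamp):
    """Return the latest version committed at or before the given timestamp."""
    ts = datetime.strptime(timestamp, FMT)
    idx = bisect_right(commit_times, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return idx

print(version_as_of("2019-01-29 00:37:58"))  # start of the range for version 0
print(version_as_of("2019-01-29 00:38:09"))  # last second before version 1
print(version_as_of("2019-01-29 00:38:10"))  # version 1 becomes visible
```

Any timestamp before the first commit has no version to resolve to, which the sketch reports as an error.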
For details, see Query an older snapshot of a table (time travel).
Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones, for example by running OPTIMIZE events.
To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. This co-locality is automatically used by Delta data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by eventType, run:
OPTIMIZE events ZORDER BY (eventType)
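How co-locating values lets data skipping read fewer files can be shown with a toy model. The sketch below assumes each file carries min/max statistics for the column and uses a plain sort as a stand-in for Z-Ordering on a single column; the helper functions and sample values are hypothetical, and multi-column Z-Ordering is not modeled.

```python
def split_into_files(values, rows_per_file):
    """Chunk values into 'files' and record each file's min/max statistics."""
    files = []
    for i in range(0, len(values), rows_per_file):
        chunk = values[i:i + rows_per_file]
        files.append({"min": min(chunk), "max": max(chunk), "rows": chunk})
    return files

def files_to_read(files, wanted):
    """Data skipping: keep only files whose min/max range could contain the value."""
    return [f for f in files if f["min"] <= wanted <= f["max"]]

# Hypothetical eventType values in arrival order (interleaved).
event_types = ["close", "open", "view", "close", "open", "view", "close", "open", "view"]

unordered = split_into_files(event_types, rows_per_file=3)
clustered = split_into_files(sorted(event_types), rows_per_file=3)

print(len(files_to_read(unordered, "open")))  # every file's range includes "open"
print(len(files_to_read(clustered, "open")))  # only the file holding the "open" rows
```

With interleaved values every file has the same wide min/max range, so none can be skipped; after clustering, the ranges are narrow and a query for one event type reads a single file.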
For the full set of options available when running OPTIMIZE, see Optimizing Performance with File Management.
Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command. You control the age of the latest retained snapshot by using the RETAIN <N> HOURS option:
VACUUM events RETAIN 24 HOURS
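The effect of the retention window can be sketched in plain Python. The model assumes that a file is removed only when it is no longer part of the current snapshot and is older than the window; the files_to_vacuum function, file names, and timestamps are hypothetical, and the real command's bookkeeping is not reproduced here.

```python
from datetime import datetime, timedelta

def files_to_vacuum(files, live_paths, now, retain_hours):
    """Pick files that are no longer referenced and older than the retention window."""
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(
        path
        for path, modified in files.items()
        if path not in live_paths and modified < cutoff
    )

now = datetime(2019, 1, 30, 12, 0, 0)
# Hypothetical data files with their last-modified times.
files = {
    "part-0001.parquet": datetime(2019, 1, 28, 9, 0, 0),   # old and replaced
    "part-0002.parquet": datetime(2019, 1, 30, 11, 0, 0),  # replaced, but still recent
    "part-0003.parquet": datetime(2019, 1, 28, 9, 0, 0),   # old, but in the current snapshot
}
live_paths = {"part-0003.parquet"}

print(files_to_vacuum(files, live_paths, now, retain_hours=24))
```

Only the first file is selected: the second is still inside the 24-hour window, and the third is still needed to read the current version of the table.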
For details on using VACUUM effectively, see Garbage collection.
For answers to frequently asked questions, see Frequently Asked Questions (FAQ).
For reference information on Delta SQL commands, see SQL Guide.
For detailed information on Delta features and applications, see these blog posts:
- New Databricks Delta Features Simplify Data Pipelines
- Introducing Delta Time Travel for Large Scale Data Lakes
- Simplifying Change Data Capture with Databricks Delta
- Building a Real-Time Attribution Pipeline with Databricks Delta
- Processing Petabytes of Data in Seconds with Databricks Delta
- Simplifying Streaming Stock Data Analysis Using Databricks Delta
- Build a Mobile Gaming Events Data Pipeline with Databricks Delta