Delta Tables - Batch Reads and Writes

Delta supports most of the options provided by Spark SQL’s DataFrame read/write APIs.

Metadata (DDL)

Delta supports metadata definition using standard DDL.

Creating tables

Delta supports creating tables in the metastore using standard DDL.

CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION '/mnt/delta/events'

Specifying PARTITIONED BY columns

You can optionally specify PARTITIONED BY when you create a new Delta table in order to partition the data. Partitioning is used to speed up queries or DML that have predicates involving the partition columns.

Specifying LOCATION

If you want to control the location of the table, you can also specify LOCATION as a path on DBFS.

Tables created with a specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, for which no path is specified, an unmanaged table's files are not deleted when you DROP the table.
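
For example, dropping the unmanaged table defined above removes it from the metastore but leaves the files at /mnt/delta/events in place:

DROP TABLE events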

Changing the schema

When you append data to a Delta table with an updated schema, Delta automatically evolves the schema on your behalf. In future versions, you will be able to configure this behavior (for example, to reject writes that conflict with the current schema).

Any readers that read from that Delta table will see the updated schema instead of the old one.
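
For example, you can inspect the schema that readers will see by describing the table:

DESCRIBE TABLE events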

Warning

You must recreate any views that reference tables with updated schemas because of the way the Spark analyzer checks types.

Reads

You can load a Delta table as a DataFrame by specifying either its table name or path:

spark.read.format("delta").load("/delta/events")
spark.table("events")

The DataFrame returned automatically reads the most recent snapshot of the table for any query executed. You should never need to run REFRESH TABLE. Delta automatically uses partitioning and statistics to read the minimum amount of data when there are applicable predicates in the query.
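
For example, a query that filters on the partition column only needs to read the matching partitions:

SELECT * FROM events WHERE date = '2017-01-01'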

Writes

The DataFrameWriter (Scala/Java/Python) can be used to write data into a Delta table as an atomic operation. At a minimum you must specify the format "delta":

df.write.format("delta").save("/delta/events")

Partitioning

You can optionally specify partition columns when creating a new Delta table. Partitioning is used to speed up queries or DML that have predicates involving the partition columns.

A common pattern is to partition by date, for example:

df.write.format("delta").partitionBy("date").save("/delta/events")

Appending with DataFrames

By using mode "append" you can atomically add new data to an existing Delta table:

df.write.format("delta").mode("append").save("/delta/events")

Similarly you can append data to a Delta table registered in the metastore:

df.write.format("delta").mode("append").saveAsTable("events")

Overwriting Data Using DataFrames

If you want to atomically replace all of the data in a table, you can use "overwrite" mode:

df.write
  .format("delta")
  .mode("overwrite")
  .save("/delta/events")

You can selectively overwrite only the data that matches predicates over partition columns. The following command atomically replaces the month of January with the data in df:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/delta/events")

This sample code writes out the data in df, validates that it all falls within the specified partitions, and performs an atomic replacement.

Insert Into

You can use SQL to append data. For example, to insert new values:

INSERT INTO events VALUES("2017-01-01", "0", "click", "{...}")

When you use INSERT INTO, you must list the VALUES in the order of the schema of the table.

You can also insert the results of a SQL query:

INSERT INTO events SELECT * FROM json.`/data/events/new/`

You can also get the same ordering semantics when using DataFrames by running insertInto:

df.write.format("delta").mode("append").insertInto("events")

Update

UPDATE allows you to apply expressions to change the value of columns when a row matches a given predicate. For example, you can use UPDATE to fix a spelling mistake in the eventType:

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

As with deletes, update operations automatically make use of the table's partitioning when possible.
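
For example, adding a predicate on the partition column (date range illustrative) lets Delta rewrite only the files in the affected partitions:

UPDATE events SET eventType = 'click' WHERE eventType = 'clck' AND date = '2017-01-01'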

Upserts (MERGE INTO)

MERGE INTO allows you to merge a set of updates and insertions into an existing dataset. For example, the following MERGE INTO takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Delta updates the data column using the given expression. When there is no matching event, Delta adds a new row.

Here’s a worked example:

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, eventType, data)
  VALUES (updates.date, updates.eventId, updates.eventType, updates.data)

Currently you must specify a value for every column in your table when you perform an INSERT (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.

Deletes

Delta tables allow users to remove data that matches a predicate. For instance, to delete all events from before 2017, you can run the following DML.

DELETE FROM events WHERE date < '2017-01-01'

Delete operations automatically make use of the partitioning of the table when possible. This optimization means that it will be significantly faster to delete data based on partition predicates.

Validating New Data

Delta automatically validates that the schema of the DataFrame being written is compatible with the schema of the table. Columns that are present in the table but not in the DataFrame are set to null. This operation fails if there are any extra columns in the DataFrame that are not present in the table. Delta has DDL to add new columns explicitly and the ability to update the schema automatically. See Changing the schema.
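
For example, a new column could be added explicitly with DDL along these lines (the column name is hypothetical):

ALTER TABLE events ADD COLUMNS (sessionId STRING)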

If you specify other options such as partitionBy() in combination with append mode, Delta validates that they match and throws an error for any mismatch. When partitionBy is not present, appends automatically follow the partitioning of the existing data.

Views on Delta Tables

Delta supports the creation of views on top of Delta tables just like you might with a data source table. These views integrate with Structured Data Access Controls to allow for column and row level security. The core challenge when you operate with views is resolving the schemas. If you alter a Delta table’s schema, any derivative views must be recreated to account for any additions to the schema.

For instance, if you add a new column to a Delta table, you must make sure that this column is available in the appropriate views built on top of that base table.
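
As a sketch (the view name and column list are hypothetical), such a view could be created, or recreated after a schema change, with:

CREATE OR REPLACE VIEW events_view AS
  SELECT date, eventId, eventType, data FROM events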