Delta Lake quickstart

The Delta Lake quickstart provides an overview of the basics of working with Delta Lake. It shows how to build a pipeline that reads data into a Delta table, and how to modify the table, read the table, display the table history, and optimize the table.

You can run the example Python, R, and Scala code in this article from within a notebook attached to a Databricks cluster. You can also run the SQL code in this article from within a query associated with a SQL endpoint in Databricks SQL.

For existing Databricks notebooks that demonstrate these features, see Introductory notebooks.

Create a table

To create a Delta table, you can use existing Apache Spark SQL code and change the write format from parquet, csv, json, and so on, to delta.

For all file types, you read the files into a DataFrame using the corresponding input format (for example, parquet, csv, or json) and then write the data out in Delta format. In this code example, the input files are already in Delta format and are located in Databricks datasets. The code saves the data in Delta format to the Databricks File System (DBFS) at the location specified by save_path.

Python:

# Define the input and output formats and paths and the table name.
read_format = 'delta'
write_format = 'delta'
load_path = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'
save_path = '/mnt/delta/people-10m'
table_name = 'default.people10m'

# Load the data from its source.
people = spark \
  .read \
  .format(read_format) \
  .load(load_path)

# Write the data to its target.
people.write \
  .format(write_format) \
  .save(save_path)

# Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

R:

library(SparkR)
sparkR.session()

# Define the input and output formats and paths and the table name.
read_format = "delta"
write_format = "delta"
load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
save_path = "/mnt/delta/people-10m/"
table_name = "default.people10m"

# Load the data from its source.
people = read.df(load_path, source = read_format)

# Write the data to its target.
write.df(people, source = write_format, path = save_path)

# Create the table.
sql(paste("CREATE TABLE ", table_name, " USING DELTA LOCATION '", save_path, "'", sep = ""))

Scala:

// Define the input and output formats and paths and the table name.
val read_format = "delta"
val write_format = "delta"
val load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val save_path = "/mnt/delta/people-10m"
val table_name = "default.people10m"

// Load the data from its source.
val people = spark
  .read
  .format(read_format)
  .load(load_path)

// Write the data to its target.
people.write
  .format(write_format)
  .save(save_path)

// Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

SQL:

-- The path for LOCATION must already exist
-- and must be in Delta format.

CREATE TABLE default.people10m
USING DELTA
LOCATION '/mnt/delta/people-10m'

These operations create a new unmanaged table using the schema that was inferred from the data. For information about available options when you create a Delta table, see Create a table and Write to a table.

If your source files are in Parquet format, you can use the CONVERT TO DELTA statement to convert files in place to create an unmanaged table:

CONVERT TO DELTA parquet.`/mnt/delta/people-10m`
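
If you prefer to stay in Python, the same conversion is available through the Delta Lake Python API. The following is a minimal sketch, assuming the delta package that ships with Databricks Runtime:

from delta.tables import DeltaTable

# Convert the Parquet files at the given path to Delta format, in place.
DeltaTable.convertToDelta(spark, "parquet.`/mnt/delta/people-10m`")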

Partition data

To speed up queries that have predicates involving the partition columns, you can partition data. The following code example is similar to the one in Create a table, but this example partitions the data.

Python:

# Define the input and output formats and paths and the table name.
read_format = 'delta'
write_format = 'delta'
load_path = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'
partition_by = 'gender'
save_path = '/mnt/delta/people-10m'
table_name = 'default.people10m'

# Load the data from its source.
people = spark \
  .read \
  .format(read_format) \
  .load(load_path)

# Write the data to its target.
people.write \
  .partitionBy(partition_by) \
  .format(write_format) \
  .save(save_path)

# Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

If you already ran the Python code example in Create a table, you must first delete the existing table and the saved data:

# Define the table name and the output path.
table_name = 'default.people10m'
save_path = '/mnt/delta/people-10m'

# Delete the table.
spark.sql("DROP TABLE " + table_name)

# Delete the saved data.
dbutils.fs.rm(save_path, True)

R:

library(SparkR)
sparkR.session()

# Define the input and output formats and paths and the table name.
read_format = "delta"
write_format = "delta"
load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
partition_by = "gender"
save_path = "/mnt/delta/people-10m/"
table_name = "default.people10m"

# Load the data from its source.
people = read.df(load_path, source = read_format)

# Write the data to its target.
write.df(people, source = write_format, partitionBy = partition_by, path = save_path)

# Create the table.
sql(paste("CREATE TABLE ", table_name, " USING DELTA LOCATION '", save_path, "'", sep = ""))

If you already ran the R code example in Create a table, you must first delete the existing table and the saved data:

library(SparkR)
sparkR.session()

# Define the table name and the output path.
table_name = "default.people10m"
save_path = "/mnt/delta/people-10m"

# Delete the table.
sql(paste("DROP TABLE ", table_name, sep = ""))

# Delete the saved data.
dbutils.fs.rm(save_path, TRUE)

Scala:

// Define the input and output formats and paths and the table name.
val read_format = "delta"
val write_format = "delta"
val load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val partition_by = "gender"
val save_path = "/mnt/delta/people-10m"
val table_name = "default.people10m"

// Load the data from its source.
val people = spark
  .read
  .format(read_format)
  .load(load_path)

// Write the data to its target.
people.write
  .partitionBy(partition_by)
  .format(write_format)
  .save(save_path)

// Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

If you already ran the Scala code example in Create a table, you must first delete the existing table and the saved data:

// Define the table name and the output path.
val table_name = "default.people10m"
val save_path = "/mnt/delta/people-10m"

// Delete the table.
spark.sql("DROP TABLE " + table_name)

// Delete the saved data.
dbutils.fs.rm(save_path, true)

To partition data when you create a Delta table using SQL, specify the PARTITIONED BY columns.

SQL:

-- The path in LOCATION must already exist
-- and must be in Delta format.

CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)
LOCATION '/mnt/delta/people-10m'

If you already ran the SQL code example in Create a table, you must first delete the existing table:

DROP TABLE default.people10m

Modify a table

Delta Lake supports a rich set of operations to modify tables.

Stream writes to a table

You can write data into a Delta table using Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.

The following code example starts Structured Streaming. It monitors the DBFS location specified in json_read_path, scanning for JSON files that are uploaded to that location. When Structured Streaming detects an uploaded file, it writes the data to the DBFS location specified in save_path, using the schema specified in read_schema. Structured Streaming continues to monitor for uploaded files until the streaming query is stopped. Structured Streaming uses the DBFS location specified in checkpoint_path to ensure that each uploaded file is processed only once.

Python:

# Define the schema and the input, checkpoint, and output paths.
read_schema = ("id int, " +
               "firstName string, " +
               "middleName string, " +
               "lastName string, " +
               "gender string, " +
               "birthDate timestamp, " +
               "ssn string, " +
               "salary int")
json_read_path = '/FileStore/streaming-uploads/people-10m'
checkpoint_path = '/mnt/delta/people-10m/checkpoints'
save_path = '/mnt/delta/people-10m'

people_stream = (spark \
  .readStream \
  .schema(read_schema) \
  .option('maxFilesPerTrigger', 1) \
  .option('multiline', True) \
  .json(json_read_path))

people_stream.writeStream \
  .format('delta') \
  .outputMode('append') \
  .option('checkpointLocation', checkpoint_path) \
  .start(save_path)

R:

library(SparkR)
sparkR.session()

# Define the schema and the input, checkpoint, and output paths.
read_schema = "id int, firstName string, middleName string, lastName string, gender string, birthDate timestamp, ssn string, salary int"
json_read_path = "/FileStore/streaming-uploads/people-10m"
checkpoint_path = "/mnt/delta/people-10m/checkpoints"
save_path = "/mnt/delta/people-10m"

people_stream = read.stream(
  "json",
  path = json_read_path,
  schema = read_schema,
  multiline = TRUE,
  maxFilesPerTrigger = 1
)

write.stream(
  people_stream,
  path = save_path,
  mode = "append",
  checkpointLocation = checkpoint_path
)

Scala:

// Define the schema and the input, checkpoint, and output paths.
val read_schema = ("id int, " +
                  "firstName string, " +
                  "middleName string, " +
                  "lastName string, " +
                  "gender string, " +
                  "birthDate timestamp, " +
                  "ssn string, " +
                  "salary int")
val json_read_path = "/FileStore/streaming-uploads/people-10m"
val checkpoint_path = "/mnt/delta/people-10m/checkpoints"
val save_path = "/mnt/delta/people-10m"

val people_stream = (spark
  .readStream
  .schema(read_schema)
  .option("maxFilesPerTrigger", 1)
  .option("multiline", true)
  .json(json_read_path))

people_stream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpoint_path)
  .start(save_path)

To test this behavior, upload the following conforming JSON file to the location specified in json_read_path, and then query the location specified in save_path to see the data that Structured Streaming writes.

[
  {
    "id": 10000021,
    "firstName": "Joe",
    "middleName": "Alexander",
    "lastName": "Smith",
    "gender": "M",
    "birthDate": 188712000,
    "ssn": "123-45-6789",
    "salary": 50000
  },
  {
    "id": 10000022,
    "firstName": "Mary",
    "middleName": "Jane",
    "lastName": "Doe",
    "gender": "F",
    "birthDate": "1968-10-27T04:00:00.000+000",
    "ssn": "234-56-7890",
    "salary": 75500
  }
]
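
Once the stream has picked up the file, you can read the output path to confirm that the new rows arrived. The following is a minimal check in Python, assuming the streaming query above is still running and save_path is unchanged:

# Read the Delta output written by the stream and show the newly uploaded rows.
people_delta = spark.read.format('delta').load('/mnt/delta/people-10m')
display(people_delta.where('id >= 10000021'))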

For more information about Delta Lake integration with Structured Streaming, see Table streaming reads and writes.

Batch upserts

To merge a set of updates and insertions into an existing Delta table, you use the MERGE INTO statement. For example, the following statement takes data from the source table and merges it into the target Delta table. When there is a matching row in both tables, Delta Lake updates the data column using the given expression. When there is no matching row, Delta Lake adds a new row. This operation is known as an upsert.

MERGE INTO default.people10m
USING default.people10m_upload
ON default.people10m.id = default.people10m_upload.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

If you specify *, the statement updates or inserts all columns in the target table. This assumes that the source table has the same columns as the target table; otherwise, the query throws an analysis error.

You must specify a value for every column in the target table when the merge performs an INSERT (that is, when there is no matching row in the target table). However, an UPDATE does not need to set every column.
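
As an illustration (this variant is not part of the original example; the column names follow the people10m schema used in this article), the following sketch updates only the salary column when rows match and inserts complete rows when they do not. It is written as a spark.sql call so that it can run from a Python notebook cell:

# Hypothetical variant: update a single column on match, insert full rows otherwise.
spark.sql("""
  MERGE INTO default.people10m
  USING default.people10m_upload
  ON default.people10m.id = default.people10m_upload.id
  WHEN MATCHED THEN
    UPDATE SET salary = default.people10m_upload.salary
  WHEN NOT MATCHED THEN
    INSERT (id, firstName, middleName, lastName, gender, birthDate, ssn, salary)
    VALUES (default.people10m_upload.id, default.people10m_upload.firstName,
            default.people10m_upload.middleName, default.people10m_upload.lastName,
            default.people10m_upload.gender, default.people10m_upload.birthDate,
            default.people10m_upload.ssn, default.people10m_upload.salary)
""")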

To test the preceding example, create the source table as follows:

CREATE TABLE default.people10m_upload (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA

To test the WHEN MATCHED clause, fill the source table with the following rows, and then run the preceding MERGE INTO statement. Because both tables have rows that match the ON clause, the target table’s matching rows are updated.

INSERT INTO default.people10m_upload VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000)

To see the results, query the table.

SELECT * FROM default.people10m WHERE id BETWEEN 9999998 AND 10000000 SORT BY id ASC

To test the WHEN NOT MATCHED clause, fill the source table with the following rows, and then run the preceding MERGE INTO statement. Because the target table does not have the following rows, these rows are added to the target table.

INSERT INTO default.people10m_upload VALUES
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+0000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+0000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+0000', '567-89-0123', 89900)

To see the results, query the table.

SELECT * FROM default.people10m WHERE id BETWEEN 20000001 AND 20000003 SORT BY id ASC

To run any of the preceding SQL statements in Python, R, or Scala, pass the statement as a string argument to the spark.sql function in Python or Scala, or to the sql function in R.
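
For example, a minimal sketch in Python that re-runs one of the results queries from above:

# Pass the SQL statement as a string to spark.sql and display the returned DataFrame.
df = spark.sql("SELECT * FROM default.people10m WHERE id BETWEEN 9999998 AND 10000000 SORT BY id ASC")
display(df)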

Read a table

You access data in Delta tables either by specifying the path on DBFS ("/mnt/delta/people-10m") or the table name ("default.people10m"):

Python:

people = spark.read.format('delta').load('/mnt/delta/people-10m')

display(people)

or

people = spark.table('default.people10m')

display(people)

R:

library(SparkR)
sparkR.session()

people = read.df(path = "/mnt/delta/people-10m", source = "delta")

display(people)

or

library(SparkR)
sparkR.session()

people = tableToDF("default.people10m")

display(people)

Scala:

val people = spark.read.format("delta").load("/mnt/delta/people-10m")

display(people)

or

val people = spark.table("default.people10m")

display(people)

SQL:

SELECT * FROM delta.`/mnt/delta/people-10m`

or

SELECT * FROM default.people10m

Display table history

To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.
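
For example, to display the history of the table created earlier from a Python notebook cell:

# Each row describes one write to the table: version, timestamp, operation, and other details.
display(spark.sql('DESCRIBE HISTORY default.people10m'))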

Query an earlier version of the table (time travel)

Delta Lake time travel allows you to query an older snapshot of a Delta table.

To query an older version of a table, specify a version or timestamp in a SELECT statement. For example, to query version 0 from the table history, use:

Python:

spark.sql('SELECT * FROM default.people10m VERSION AS OF 0')

or

spark.sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")

R:

library(SparkR)
sparkR.session()

sql("SELECT * FROM default.people10m VERSION AS OF 0")

or

library(SparkR)
sparkR.session()

sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")
spark.sql("SELECT * FROM default.people10m VERSION AS OF 0")

or

spark.sql("SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'")

SQL:

SELECT * FROM default.people10m VERSION AS OF 0

or

SELECT * FROM default.people10m TIMESTAMP AS OF '2019-01-29 00:37:58'

For timestamps, only date or timestamp strings are accepted, for example, "2019-01-01" and "2019-01-01'T'00:00:00.000Z".

Note

Because version 1 is at timestamp '2019-01-29 00:38:10', to query version 0 you can use any timestamp in the range '2019-01-29 00:37:58' to '2019-01-29 00:38:09' inclusive.

DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table, for example in Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').load('/mnt/delta/people-10m')

display(df1)

or

df2 = spark.read.format('delta').option('versionAsOf', 2).load('/mnt/delta/people-10m')

display(df2)

For details, see Query an older snapshot of a table (time travel).

Optimize a table

Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:

spark.sql("OPTIMIZE delta.`/mnt/delta/people-10m`")

or

spark.sql('OPTIMIZE default.people10m')

R:

library(SparkR)
sparkR.session()

sql("OPTIMIZE delta.`/mnt/delta/people-10m`")

or

library(SparkR)
sparkR.session()

sql("OPTIMIZE default.people10m")
spark.sql("OPTIMIZE delta.`/mnt/delta/people-10m`")

or

spark.sql("OPTIMIZE default.people10m")

SQL:

OPTIMIZE delta.`/mnt/delta/people-10m`

or

OPTIMIZE default.people10m

Z-order by columns

To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. This co-locality is automatically used by Delta Lake data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by gender, run:

spark.sql("OPTIMIZE delta.`/mnt/delta/people-10m` ZORDER BY (gender)")

or

spark.sql('OPTIMIZE default.people10m ZORDER BY (gender)')

R:

library(SparkR)
sparkR.session()

sql("OPTIMIZE delta.`/mnt/delta/people-10m` ZORDER BY (gender)")

or

library(SparkR)
sparkR.session()

sql("OPTIMIZE default.people10m ZORDER BY (gender)")
spark.sql("OPTIMIZE delta.`/mnt/delta/people-10m` ZORDER BY (gender)")

or

spark.sql("OPTIMIZE default.people10m ZORDER BY (gender)")

SQL:

OPTIMIZE delta.`/mnt/delta/people-10m`
  ZORDER BY (gender)

or

OPTIMIZE default.people10m
  ZORDER BY (gender)

For the full set of options available when running OPTIMIZE, see Compaction (bin-packing).

Clean up snapshots

Delta Lake provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. To do so, run the VACUUM command:

Python:

spark.sql('VACUUM default.people10m')

R:

library(SparkR)
sparkR.session()

sql("VACUUM default.people10m")

Scala:

spark.sql("VACUUM default.people10m")

SQL:

VACUUM default.people10m

You control how long historical snapshots are retained by using the RETAIN <N> HOURS option:

Python:

spark.sql('VACUUM default.people10m RETAIN 24 HOURS')

R:

library(SparkR)
sparkR.session()

sql("VACUUM default.people10m RETAIN 24 HOURS")

Scala:

spark.sql("VACUUM default.people10m RETAIN 24 HOURS")

SQL:

VACUUM default.people10m RETAIN 24 HOURS

For details on using VACUUM effectively, see Remove files no longer referenced by a Delta table.