The Delta Lake quickstart provides an overview of the basics of working with Delta Lake. The quickstart shows how to load data into a Delta table, modify the table, read the table, display table history, and optimize the table.
You can run the example Python, R, Scala, and SQL code in this article from within a notebook attached to a Databricks cluster. You can also run the SQL code in this article from within a query associated with a SQL warehouse in Databricks SQL.
For existing Databricks notebooks that demonstrate these features, see Introductory notebooks.
Some of the following code examples use a two-level namespace notation consisting of a schema (also called a database) and a table or view (for example, default.people10m). To use these examples with Unity Catalog, replace the two-level namespace with Unity Catalog three-level namespace notation consisting of a catalog, schema, and table or view (for example, main.default.people10m).
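For illustration, here is a minimal Python sketch of the two notations; main is a placeholder catalog name, so substitute a catalog that exists in your workspace:

# Two-level namespace: schema (database) and table.
df = spark.read.table("default.people10m")

# Unity Catalog three-level namespace: catalog, schema, and table.
# "main" is a placeholder; use a catalog that exists in your workspace.
df = spark.read.table("main.default.people10m")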
All tables created on Databricks use Delta Lake by default.
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
saveAsTable(
  df = df,
  tableName = table_name
)
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable(table_name)
DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
The preceding operations create a new managed table by using the schema that was inferred from the data. For information about available options when you create a Delta table, see Create a table and Write to a table.
For managed tables, Databricks determines the location for the data. To get the location, you can use the DESCRIBE DETAIL statement, for example:
display(spark.sql('DESCRIBE DETAIL people_10m'))
display(sql("DESCRIBE DETAIL people_10m"))
display(spark.sql("DESCRIBE DETAIL people_10m"))
DESCRIBE DETAIL people_10m;
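For example, to pull just the storage location out of this result in Python, you can select the location column, which DESCRIBE DETAIL returns along with the rest of the table metadata (a minimal sketch):

# DESCRIBE DETAIL returns one row of table metadata; select only the location column.
location = spark.sql("DESCRIBE DETAIL people_10m").select("location").first()[0]
print(location)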
To merge a set of updates and insertions into an existing Delta table, you use the MERGE INTO statement. For example, the following statement takes data from the source table and merges it into the target Delta table. When there is a matching row in both tables, Delta Lake updates the data column using the given expression. When there is no matching row, Delta Lake adds a new row. This operation is known as an upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+0000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+0000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+0000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
If you specify *, this updates or inserts all columns in the target table. This assumes that the source table has the same columns as the target table; otherwise the query throws an analysis error.
You must specify a value for every column in your table when you perform an INSERT operation (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
To see the results, query the table.
SELECT * FROM people_10m WHERE id >= 9999998
You access data in Delta tables by specifying the path on DBFS ("/tmp/delta/people-10m") or the table name ("people_10m"):
people_df = spark.read.table(table_name)
display(people_df)
people_df = tableToDF(table_name)
display(people_df)
val people_df = spark.read.table(table_name)
display(people_df)
SELECT * FROM people_10m
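The preceding examples read the table by name. To read by path instead, you can load the Delta files directly; the following Python sketch assumes the table's data lives at the /tmp/delta/people-10m path mentioned above:

# Read the Delta table directly from its storage path rather than by name.
people_df = spark.read.format("delta").load("/tmp/delta/people-10m")
display(people_df)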
To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.
DESCRIBE HISTORY people_10m
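If you are working in Python rather than SQL, you can run the same statement through spark.sql, for example:

# Each row describes one write to the table: version, timestamp, operation, user, and so on.
display(spark.sql("DESCRIBE HISTORY people_10m"))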
Delta Lake time travel allows you to query an older snapshot of a Delta table.
To query an older version of a table, specify a version or timestamp in a SELECT statement. For example, to query version 0 from the history above, use:
SELECT * FROM people_10m VERSION AS OF 0
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
For timestamps, only date or timestamp strings are accepted, for example, "2019-01-01" and "2019-01-01T00:00:00.000Z".
DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table, for example in Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
For details, see Query an older snapshot of a table (time travel).
Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:
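OPTIMIZE people_10m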
To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. This co-locality is automatically used by Delta Lake data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by gender, run:
OPTIMIZE people_10m ZORDER BY (gender)
For the full set of options available when running OPTIMIZE, see Compaction (bin-packing).
Delta Lake provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command:
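VACUUM people_10m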
For details on using VACUUM effectively, see Remove files no longer referenced by a Delta table.