Delta Lake supports most of the options provided by Apache Spark DataFrame read and write APIs for performing batch reads and writes on tables.
For information on Delta Lake SQL commands, see SQL.
Delta Lake supports creating tables in the metastore using standard DDL:
CREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING) USING DELTA
When you create a table in the metastore using Delta Lake, it creates a symlink-like pointer in the metastore to the transaction log and data that are stored on DBFS. This pointer makes it easier for other users to discover and refer to the data without having to worry about exactly where it is stored. However, the metastore is not the source of truth about what is valid in the table. That responsibility stays with Delta Lake.
You can partition data to speed up queries or DML that have predicates involving the partition columns. To partition data when you create a Delta table, specify the partition columns with PARTITIONED BY. A common pattern is to partition by date, for example:
CREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING) USING DELTA PARTITIONED BY (date) LOCATION '/delta/events'
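To make the benefit concrete, here is a minimal pure-Python sketch of partition pruning. It assumes a Hive-style directory layout such as date=2019-01-01/ under the table path; the file names and the helpers partition_values and prune_files are hypothetical. Delta Lake itself tracks partition values in its transaction log, so treat this as an illustration of the idea rather than of its implementation.

```python
from datetime import date


def partition_values(path):
    """Parse Hive-style 'column=value' directory segments from a file path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


def prune_files(paths, column, predicate):
    """Keep only files whose partition value for a date column satisfies predicate.

    Files in partitions that fail the predicate are skipped without being read,
    which is where the speedup for partition-column predicates comes from.
    """
    return [
        path
        for path in paths
        if predicate(date.fromisoformat(partition_values(path)[column]))
    ]


# Hypothetical file listing for the events table partitioned by date.
files = [
    "/delta/events/date=2019-01-01/part-00000.parquet",
    "/delta/events/date=2019-01-02/part-00000.parquet",
    "/delta/events/date=2019-01-03/part-00000.parquet",
]

# Similar in spirit to: SELECT * FROM events WHERE date >= '2019-01-02'
selected = prune_files(files, "date", lambda d: d >= date(2019, 1, 2))
print(selected)
```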
To control the location of the Delta table files, you can optionally specify the LOCATION as a path on DBFS.
Tables created with a specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when you DROP the table.
When you run CREATE TABLE with a LOCATION that already contains data stored using Delta Lake, Delta Lake does the following:
If you specify only the table name and location, for example:
CREATE TABLE events USING DELTA LOCATION '/delta/events'
the table in the Hive metastore automatically inherits the schema, partitioning, and table properties of the existing data. This functionality can be used to “import” data into the metastore.
If you specify any configuration (schema, partitioning, or table properties), Delta Lake verifies that the specification exactly matches the configuration of the existing data.
If the specified configuration does not exactly match the configuration of the data, Delta Lake throws an exception that describes the discrepancy.
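The decision described above can be summarized as a small model. This hypothetical sketch represents a table configuration as a dict with schema, partitioning, and properties keys; it compares only the parts a CREATE TABLE statement spells out, so Delta Lake's real check may be stricter.

```python
class SpecMismatchError(Exception):
    """Raised when a CREATE TABLE specification disagrees with the existing data."""


def resolve_table_config(existing, specified=None):
    """Model CREATE TABLE ... LOCATION on a path that already holds a Delta table.

    Both arguments are dicts with optional keys "schema", "partitioning",
    and "properties". This sketch compares only the keys that are specified.
    """
    if not specified:
        # Only the table name and location were given: inherit everything.
        return dict(existing)
    discrepancies = [
        f"{key}: specified {specified[key]!r}, existing {existing.get(key)!r}"
        for key in specified
        if specified[key] != existing.get(key)
    ]
    if discrepancies:
        raise SpecMismatchError("; ".join(discrepancies))
    return dict(existing)


existing = {
    "schema": [("date", "DATE"), ("eventId", "STRING")],
    "partitioning": ["date"],
    "properties": {},
}

inherited = resolve_table_config(existing)

try:
    resolve_table_config(existing, {"partitioning": ["eventId"]})
    mismatch_message = None
except SpecMismatchError as err:
    mismatch_message = str(err)

print(mismatch_message)
```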
You can load a Delta table as a DataFrame by specifying a path:
SELECT * FROM delta.`/delta/events`
Alternatively you can specify a table name:
SELECT * FROM events
The DataFrame returned automatically reads the most recent snapshot of the table for any query; you never need to run REFRESH TABLE. Delta Lake automatically uses partitioning and statistics to read the minimum amount of data when there are applicable predicates in the query.
Available in Databricks Runtime 5.3 and above.
Delta Lake time travel allows you to query an older snapshot of a Delta table. Time travel has many use cases, including:
- Re-creating analyses, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
- Writing complex temporal queries.
- Fixing mistakes in your data.
- Providing snapshot isolation for a set of queries on fast-changing tables.
This section describes the supported methods for querying older versions of tables and the related data retention concerns, and provides examples.
There are several ways to query an older version of a Delta table.
SELECT * FROM events TIMESTAMP AS OF timestamp_expression
SELECT * FROM events VERSION AS OF version
timestamp_expression can be any one of:
- '2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestamp
- cast('2018-10-18 13:36:32 CEST' as timestamp)
- '2018-10-18', that is, a date string
- current_timestamp() - interval 12 hours
- Any other expression that is or can be cast to a timestamp
version is a long value that can be obtained from the output of DESCRIBE HISTORY events.
Neither timestamp_expression nor version can be subqueries.
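TIMESTAMP AS OF ultimately selects a version. The sketch below models that lookup in plain Python, assuming the table resolves to the latest version committed at or before the requested timestamp; the history list and the version_as_of helper are hypothetical, and Delta Lake's own resolution handles details (such as time zones and retention) that this sketch ignores.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical commit history as (version, commit timestamp) pairs in version order,
# loosely mirroring the version and timestamp columns of DESCRIBE HISTORY.
history = [
    (0, datetime(2019, 1, 1, 9, 0)),
    (1, datetime(2019, 1, 1, 12, 0)),
    (2, datetime(2019, 1, 2, 8, 30)),
]


def version_as_of(history, ts):
    """Return the latest version whose commit timestamp is at or before ts."""
    timestamps = [commit_ts for _, commit_ts in history]
    index = bisect_right(timestamps, ts)
    if index == 0:
        raise ValueError(f"{ts} is earlier than the first available commit")
    return history[index - 1][0]


print(version_as_of(history, datetime(2019, 1, 1, 15, 0)))  # 1
```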
DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table.
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")
For timestamp_string, only date or timestamp strings are accepted, for example "2019-01-01" and "2019-01-01T00:00:00.000Z".
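Because the option takes a string rather than an expression, it can help to check values before calling load. Here is a hypothetical pre-flight check using only the Python standard library; it approximates the accepted forms with datetime.fromisoformat and is not Delta Lake's parser.

```python
from datetime import datetime


def is_valid_timestamp_string(value):
    """Accept date strings or ISO-8601 timestamps such as '2019-01-01T00:00:00.000Z'.

    A trailing 'Z' is rewritten as '+00:00' because older Python versions of
    fromisoformat do not accept the 'Z' suffix.
    """
    candidate = value[:-1] + "+00:00" if value.endswith("Z") else value
    try:
        datetime.fromisoformat(candidate)
    except ValueError:
        return False
    return True


for candidate in ["2019-01-01", "2019-01-01T00:00:00.000Z", "current_timestamp() - interval 12 hours"]:
    print(candidate, is_valid_timestamp_string(candidate))
```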
A common pattern is to use the latest state of the Delta table throughout the execution of a Databricks job to update downstream applications.
Because Delta tables update automatically, a DataFrame loaded from a Delta table may return different results across invocations if the underlying data is updated. By using time travel, you can fix the data returned by the DataFrame across invocations:
latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()[0][0]
df = spark.read.format("delta").option("versionAsOf", latest_version).load("/delta/events")
# Every query that stems from df uses the same snapshot
You may have a parameterized pipeline, where the input path of your pipeline is a parameter of your job. After the job runs, you may want to reproduce its output at some point in the future. In this case, you can use the @ syntax to specify the timestamp or version as part of the path or table name:
df1 = spark.read.format("delta").load("/delta/events@20190101000000000")  # table on 2019-01-01 00:00:00.000
df2 = spark.read.format("delta").load("/delta/events@v123")  # table on version 123
SELECT * FROM events@20190101000000000
SELECT * FROM events@v123
The timestamp must be in yyyyMMddHHmmssSSS format. You can obtain table versions by running DESCRIBE HISTORY <table>, and you specify a version after @ by prepending a v to the version number. For example, to query version 123 of the table events, specify events@v123.
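When a job parameter records which snapshot was read, the @ suffix can be built programmatically. at_timestamp and at_version are hypothetical helpers; at_timestamp formats the datetime it is given as-is and performs no time zone conversion.

```python
from datetime import datetime


def at_timestamp(path, ts):
    """Append ts to a table path or name in yyyyMMddHHmmssSSS form."""
    millis = ts.microsecond // 1000
    return f"{path}@{ts.strftime('%Y%m%d%H%M%S')}{millis:03d}"


def at_version(path, version):
    """Append a version to a table path or name, prefixed with 'v'."""
    return f"{path}@v{version}"


print(at_timestamp("/delta/events", datetime(2019, 1, 1)))  # /delta/events@20190101000000000
print(at_version("/delta/events", 123))  # /delta/events@v123
```

The resulting strings can then be passed to load(...) or used as table identifiers in the same way as the literal examples in this section.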
By default, Delta tables keep a commit history of 30 days. This means that you can potentially specify a version from 30 days ago. However, there are some caveats:
- All writers to the Delta table must be using Databricks Runtime 5.1 or above.
- You must not have run VACUUM on your Delta table. If you have run VACUUM, you may lose the ability to go back to a version older than the default 7-day data retention period.
You configure retention periods using the following table properties:
- delta.logRetentionDuration = "interval <interval>": Configure how long you can go back in time. Default is interval 30 days.
- delta.deletedFileRetentionDuration = "interval <interval>": Configure how long stale data files are kept around before being deleted with VACUUM. Default is interval 1 week.
For full access to 30 days of historical data, set delta.deletedFileRetentionDuration = "interval 30 days" on your table. This setting may cause your storage costs to go up.
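The interplay between the two retention properties can be summarized in a small model. This sketch assumes the property values use the "interval <n> days" or "interval <n> week(s)" form and understands only day and week units; parse_interval and time_travel_window are hypothetical helpers that approximate, rather than reproduce, Delta Lake's behavior.

```python
import re
from datetime import timedelta

# Units understood by this sketch, expressed in days.
_UNIT_DAYS = {"day": 1, "days": 1, "week": 7, "weeks": 7}


def parse_interval(value):
    """Parse strings such as 'interval 30 days' or 'interval 1 week' into a timedelta."""
    match = re.fullmatch(r"\s*interval\s+(\d+)\s+(\w+)\s*", value, flags=re.IGNORECASE)
    if not match or match.group(2).lower() not in _UNIT_DAYS:
        raise ValueError(f"unsupported interval: {value!r}")
    return timedelta(days=int(match.group(1)) * _UNIT_DAYS[match.group(2).lower()])


def time_travel_window(log_retention, deleted_file_retention, vacuumed):
    """Approximate how far back time travel can reliably go.

    Without VACUUM, the commit history bounds the window. After VACUUM has run,
    data files older than the deleted-file retention may be gone, so the
    shorter of the two durations applies.
    """
    window = parse_interval(log_retention)
    if vacuumed:
        window = min(window, parse_interval(deleted_file_retention))
    return window


print(time_travel_window("interval 30 days", "interval 1 week", vacuumed=True))  # 7 days, 0:00:00
```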
Fix accidental deletes to a table for the user 111:
INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Fix accidental incorrect updates to a table:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Query the number of new customers added over the last week:
SELECT count(distinct userId) - (SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)) FROM my_table
Using append mode (mode("append")), you can atomically add new data to an existing Delta table.
To atomically replace all of the data in a table, you can use overwrite mode (mode("overwrite")).
You can selectively overwrite only the data that matches predicates over partition columns. The following command atomically replaces the month of January with the data in df:
df.write .format("delta") .mode("overwrite") .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") .save("/delta/events")
This sample code writes out the data in df, validates that it all falls within the specified partitions, and performs an atomic replacement.
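The validate-then-replace sequence can be illustrated with an in-memory model. In this hypothetical sketch a table is a dict from partition date to rows, and the replaceWhere predicate is a closed date range; the new table is built separately and returned whole, standing in for Delta Lake's atomic commit, which this sketch does not implement.

```python
from datetime import date


def replace_where(table, new_rows, lower, upper):
    """In-memory model of an overwrite with replaceWhere on a date partition column.

    table maps a partition date to its list of rows. Every new row must fall in
    [lower, upper]; if any does not, nothing is written. The updated table is
    built as a new dict and returned whole, standing in for an atomic commit.
    """
    outside = [row for row in new_rows if not lower <= row["date"] <= upper]
    if outside:
        raise ValueError(f"{len(outside)} row(s) fall outside the replaceWhere predicate")
    # Partitions outside the predicate are kept; those inside it are replaced.
    result = {d: rows for d, rows in table.items() if not lower <= d <= upper}
    for row in new_rows:
        result.setdefault(row["date"], []).append(row)
    return result


table = {
    date(2016, 12, 31): [{"date": date(2016, 12, 31), "eventId": "a"}],
    date(2017, 1, 15): [{"date": date(2017, 1, 15), "eventId": "b"}],
}
january = [{"date": date(2017, 1, 20), "eventId": "c"}]
updated = replace_where(table, january, date(2017, 1, 1), date(2017, 1, 31))

try:
    replace_where(table, [{"date": date(2017, 2, 1), "eventId": "d"}], date(2017, 1, 1), date(2017, 1, 31))
    rejected = False
except ValueError:
    rejected = True
```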
Unlike the file APIs in Apache Spark, Delta Lake remembers and enforces the schema of a table. This means that by default overwrites do not replace the schema of an existing table.
For Delta Lake support for updating tables, see Update a table.
Delta Lake automatically validates that the schema of the DataFrame being written is compatible with the schema of the table. Delta Lake uses the following rules to determine whether a write from a DataFrame to a table is compatible:
- All DataFrame columns must exist in the target table. If there are columns in the DataFrame not present in the table, an exception is raised. Columns present in the table but not in the DataFrame are set to null.
- DataFrame column data types must match the column data types in the target table. If they don’t match, an exception is raised.
- DataFrame column names cannot differ only by case. This means that you cannot have columns such as “Foo” and “foo” defined in the same table. While you can use Spark in case-sensitive or case-insensitive (default) mode, Parquet is case-sensitive when storing and returning column information. Delta Lake is case-preserving but case-insensitive when storing the schema, and has this restriction to avoid potential mistakes, data corruption, or loss issues.
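The three rules can be expressed as a small validation function. This is a hypothetical sketch over {column name: type} dicts, not Delta Lake's code; it matches names case-insensitively to reflect the case-insensitive schema handling described above, and returns the table columns that the write would fill with null.

```python
def check_write_compatible(table_schema, df_schema):
    """Raise ValueError if the write breaks a rule; otherwise return columns set to null."""
    table_types = {name.lower(): dtype for name, dtype in table_schema.items()}
    df_names = [name.lower() for name in df_schema]
    # Column names may not differ only by case.
    if len(set(df_names)) != len(df_names):
        raise ValueError("DataFrame column names differ only by case")
    for name, dtype in df_schema.items():
        # Every DataFrame column must exist in the table.
        if name.lower() not in table_types:
            raise ValueError(f"column {name!r} does not exist in the table")
        # Data types must match exactly.
        expected = table_types[name.lower()]
        if dtype != expected:
            raise ValueError(f"column {name!r} has type {dtype}, table expects {expected}")
    # Table columns missing from the DataFrame are written as null.
    return [name for name in table_schema if name.lower() not in df_names]


def violation(table_schema, df_schema):
    """Return the error message for an incompatible write, or None if it is allowed."""
    try:
        check_write_compatible(table_schema, df_schema)
    except ValueError as err:
        return str(err)
    return None


events = {"date": "DATE", "eventId": "STRING", "eventType": "STRING", "data": "STRING"}
print(check_write_compatible(events, {"date": "DATE", "eventId": "STRING"}))
print(violation(events, {"Foo": "STRING", "foo": "STRING"}))
```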
Delta Lake on Databricks has DDL to explicitly add new columns and the ability to update the schema automatically.
If you specify other options, such as partitionBy, in combination with append mode, Delta Lake validates that they match and throws an error for any mismatch. When partitionBy is not present, appends automatically follow the partitioning of the existing data.
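A hypothetical helper makes the append rule explicit. Here, requested stands for a partitionBy argument and None means it was omitted; this mirrors the behavior described above and is not Delta Lake's code.

```python
def resolve_append_partitioning(existing, requested=None):
    """Return the partition columns an append uses.

    existing lists the table's partition columns; requested stands for a
    partitionBy(...) argument, with None meaning it was omitted.
    """
    if requested is None:
        return list(existing)  # follow the partitioning of the existing data
    if list(requested) != list(existing):
        raise ValueError(
            f"partitionBy {list(requested)} does not match existing partitioning {list(existing)}"
        )
    return list(requested)


try:
    resolve_append_partitioning(["date"], ["eventType"])
    mismatch = None
except ValueError as err:
    mismatch = str(err)

print(resolve_append_partitioning(["date"]))  # ['date']
print(mismatch)
```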
Available in Databricks Runtime 4.1 and above.
Delta Lake lets you update the schema of a table. The following types of changes are supported:
- Adding new columns (at arbitrary positions)
- Reordering existing columns
You can make these changes explicitly using DDL or implicitly using DML.
When you update a Delta table schema, streams that read from that table terminate. If you want the stream to continue you must restart it.
For recommended methods, see Structured Streaming in Production.
You can use the following DDL to explicitly change the schema of a table.
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
By default, nullability is true.
To add a column to a nested field, use:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
If the schema before running ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Adding nested columns is supported only for structs. Arrays and maps are not supported.
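To see how the nested ADD COLUMNS example rearranges the struct, here is a hypothetical pure-Python model of a schema as a list of (name, type) pairs, where a struct's type is itself such a list. The STRING types for colA and the nested fields are assumptions, since the example does not state them; non-list types are rejected to reflect that only structs accept nested columns.

```python
def add_column(fields, path, dtype, first=False, after=None):
    """Return a new field list with a column of type dtype added at dotted path."""
    head, _, rest = path.partition(".")
    names = [name for name, _ in fields]
    if rest:
        if head not in names:
            raise ValueError(f"no column named {head!r}")
        parent = fields[names.index(head)][1]
        if not isinstance(parent, list):
            raise ValueError(f"{head!r} is not a struct; only structs accept nested columns")
        updated = add_column(parent, rest, dtype, first=first, after=after)
        return [(name, updated if name == head else child) for name, child in fields]
    new_field = (head, dtype)
    if first:
        return [new_field] + list(fields)
    if after is None:
        return list(fields) + [new_field]  # no position clause: append at the end
    position = names.index(after) + 1
    return list(fields[:position]) + [new_field] + list(fields[position:])


boxes = [("colA", "STRING"), ("colB", [("field1", "STRING"), ("field2", "STRING")])]
# Models: ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
updated = add_column(boxes, "colB.nested", "STRING", after="field1")
print(updated)
```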
Change column comment or ordering
ALTER TABLE table_name CHANGE [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
To change a column in a nested field, use:
ALTER TABLE table_name CHANGE [COLUMN] col_name.nested_col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
If the schema before running ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colA
| - colB
| +-field2
| +-field1
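The ordering part of CHANGE COLUMN can be modeled in the same way. In this hypothetical sketch, a schema is a list of (name, type) pairs with structs as nested lists, and only the FIRST/AFTER repositioning is modeled; comments and type changes are not.

```python
def move_column(fields, path, first=False, after=None):
    """Return a new field list with the column at dotted path moved FIRST or AFTER a sibling."""
    head, _, rest = path.partition(".")
    names = [name for name, _ in fields]
    if head not in names:
        raise ValueError(f"no column named {head!r}")
    if rest:
        parent = fields[names.index(head)][1]
        if not isinstance(parent, list):
            raise ValueError(f"{head!r} is not a struct")
        updated = move_column(parent, rest, first=first, after=after)
        return [(name, updated if name == head else child) for name, child in fields]
    moving = fields[names.index(head)]
    remaining = [field for field in fields if field[0] != head]
    if first:
        return [moving] + remaining
    if after is None:
        return list(fields)  # no FIRST/AFTER clause: ordering is unchanged
    position = [name for name, _ in remaining].index(after) + 1
    return remaining[:position] + [moving] + remaining[position:]


boxes = [("colA", "STRING"), ("colB", [("field1", "STRING"), ("field2", "STRING")])]
# Models: ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST
reordered = move_column(boxes, "colB.field2", first=True)
print(reordered)
```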
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
When running the following DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
if the schema before is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Change column type or name
Changing a column’s type or name or dropping a column requires rewriting the table. To do this, use the overwriteSchema option:
Change a column type
spark.read.table(...) .withColumn("date", col("date").cast("date")) .write .mode("overwrite") .option("overwriteSchema", "true") .saveAsTable(...)
Change a column name
spark.read.table(...) .withColumnRenamed("date", "date_created") .write .mode("overwrite") .option("overwriteSchema", "true") .saveAsTable(...)
Delta Lake can automatically update the schema of a table as part of a DML transaction (either appending or overwriting), and make the schema compatible with the data being written.
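Columns that are present in the DataFrame but missing from the table are automatically added as part of a write transaction when either of the following is true:
- write or writeStream has .option("mergeSchema", "true")
- spark.databricks.delta.schema.autoMerge.enabled is true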
When both options are specified, the option from the DataFrameWriter takes precedence.
The added columns are appended to the end of the struct they are present in. Case is preserved when appending a new column.
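The appending behavior can be modeled in a few lines. This is a hypothetical sketch over (name, type) pairs, not Delta Lake's implementation: merge_schema adds missing top-level columns at the end while keeping their case, and merge_schema_enabled captures the precedence rule, where session_default stands for a session-level setting and writer_option for the DataFrameWriter option.

```python
def merge_schema(table_fields, df_fields):
    """Append DataFrame columns missing from the table to the end of the schema.

    Names are matched case-insensitively, and a newly added column keeps the
    case it has in the DataFrame. Nested struct fields are not handled here.
    """
    existing = {name.lower() for name, _ in table_fields}
    added = [(name, dtype) for name, dtype in df_fields if name.lower() not in existing]
    return list(table_fields) + added


def merge_schema_enabled(session_default, writer_option=None):
    """The per-write option, when present, takes precedence over the session default."""
    return session_default if writer_option is None else writer_option


table = [("date", "DATE"), ("eventId", "STRING")]
incoming = [("EventId", "STRING"), ("Country", "STRING")]
print(merge_schema(table, incoming))
```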
mergeSchema is not supported when table access control is enabled, because it elevates a request that requires MODIFY to one that requires a higher privilege.
mergeSchema cannot be used with INSERT INTO or .write.insertInto().
Because Parquet doesn’t support NullType, NullType columns are dropped from the DataFrame when writing into Delta tables, but are still stored in the schema. When a different data type is received for that column, Delta Lake merges the schema to the new data type. If Delta Lake receives a NullType for an existing column, the old schema is retained and the new column is dropped during the write.
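The NullType rules reduce to a small decision per column. Here is a hypothetical sketch that represents types as strings; treating any other type change as an error is a simplification made here, not a statement about what Delta Lake allows.

```python
NULL_TYPE = "NullType"


def columns_to_write(df_fields):
    """Parquet cannot store NullType, so such columns are left out of the data files."""
    return [name for name, dtype in df_fields if dtype != NULL_TYPE]


def merge_column_type(existing, incoming):
    """Resolve the stored type of a column when a write brings the incoming type."""
    if incoming == NULL_TYPE:
        return existing  # keep the old type; the null column is dropped from the write
    if existing == NULL_TYPE:
        return incoming  # adopt the first concrete type that arrives
    if existing != incoming:
        raise ValueError(f"cannot merge {existing} with {incoming} in this sketch")
    return existing


print(columns_to_write([("eventId", "STRING"), ("note", NULL_TYPE)]))  # ['eventId']
print(merge_column_type(NULL_TYPE, "STRING"))  # STRING
```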
NullType in streaming is not supported. Since you must set schemas when using streaming, this should be very rare.
NullType is also not accepted for complex types such as
By default, overwriting the data in a table does not overwrite the schema. When overwriting a table using mode("overwrite") without replaceWhere, you may still want to overwrite the schema of the data being written. You replace the schema and partitioning of the table by setting the overwriteSchema option to true.
Delta Lake supports the creation of views on top of Delta tables just like you might with a data source table.
These views integrate with table access control to allow for column and row level security.
The core challenge when you operate with views is resolving the schemas. If you alter a Delta table schema, you must recreate derivative views to account for any additions to the schema. For instance, if you add a new column to a Delta table, you must make sure that this column is available in the appropriate views built on top of that base table.
You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER statements. TBLPROPERTIES are stored as part of Delta table metadata. You cannot define new TBLPROPERTIES in a CREATE statement if a Delta table already exists in a given location. See table creation for more details.
In addition, to tailor behavior and performance, Delta Lake supports certain Delta table properties:
- Block deletes and modifications of a table: delta.appendOnly=true.
- Configure the number of columns for which statistics are collected: delta.dataSkippingNumIndexedCols=<number-of-columns>. This property takes effect only for new data that is written out.
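- Configure the time travel retention properties: delta.logRetentionDuration = "interval <interval>" and delta.deletedFileRetentionDuration = "interval <interval>".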
- Randomize file prefixes to avoid hot spots in S3 metadata: delta.randomizeFilePrefixes=true. For tables that require a high rate of fast read/write operations (thousands of requests per second), we strongly recommend dedicating an S3 bucket to the table (locating the table at the root of the bucket) and enabling randomized file prefixes to get the best performance.
These are the only supported delta.-prefixed table properties.
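Because this list is closed, a pre-flight check can catch misspelled delta. properties before they reach a table. The sketch below is hypothetical: the set simply restates the properties named in this section, names are compared exactly, and keys without the delta. prefix, such as your own metadata, pass through.

```python
# Restates the delta.-prefixed properties named in this section.
SUPPORTED_DELTA_PROPERTIES = {
    "delta.appendOnly",
    "delta.dataSkippingNumIndexedCols",
    "delta.logRetentionDuration",
    "delta.deletedFileRetentionDuration",
    "delta.randomizeFilePrefixes",
}


def unsupported_delta_properties(properties):
    """Return, sorted, the delta.-prefixed keys that are not in the supported set."""
    return sorted(
        key
        for key in properties
        if key.startswith("delta.") and key not in SUPPORTED_DELTA_PROPERTIES
    )


proposed = {
    "delta.appendOnly": "true",
    "delta.logRetentionDurations": "interval 30 days",  # misspelled on purpose
    "owner": "data-team",
}
print(unsupported_delta_properties(proposed))  # ['delta.logRetentionDurations']
```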
You can also set delta.-prefixed properties during the first commit to a Delta table using Spark configurations. For example, to initialize a Delta table with the property delta.appendOnly=true, set the Spark configuration spark.databricks.delta.properties.defaults.appendOnly to true:
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
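The example follows a visible naming pattern from session configuration to table property. Assuming the same spark.databricks.delta.properties.defaults. prefix applies to the other delta. properties, a hypothetical helper can show which properties a new table would pick up at its first commit; spark_conf here is a plain dict standing in for the session configuration.

```python
PREFIX = "spark.databricks.delta.properties.defaults."


def initial_table_properties(spark_conf):
    """Return the delta.* properties a new table would take from session defaults.

    spark_conf is a plain dict standing in for the session configuration.
    """
    return {
        "delta." + key[len(PREFIX):]: value
        for key, value in spark_conf.items()
        if key.startswith(PREFIX)
    }


conf = {
    "spark.databricks.delta.properties.defaults.appendOnly": "true",
    "spark.sql.shuffle.partitions": "200",
}
print(initial_table_properties(conf))  # {'delta.appendOnly': 'true'}
```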
DESCRIBE DETAIL provides information about schema, partitioning, table size, and so on. For example, it shows the current reader and writer versions of a table.
DESCRIBE HISTORY provides provenance information, including the operation, user, and so on, for each write to a table. This information is not recorded by versions of Databricks Runtime below 4.1, and tables created using these versions show this information as null. Table history is retained for 30 days.
The Data sidebar provides a visual view of this detailed table information and history for Delta tables. In addition to the table schema and sample data, you can click the History tab to see the same table history that DESCRIBE HISTORY displays.
For an example of the various Delta table metadata commands, see the end of the following notebook: