Table utility commands

Delta tables support a number of utility commands.

Vacuum

You can remove files that are no longer referenced by a Delta table and that are older than the retention threshold by running the vacuum command on the table. vacuum is not triggered automatically. The default retention threshold for the files is 7 days.

Important

  • vacuum deletes only data files, not log files. Log files are deleted automatically and asynchronously after checkpoint operations. The default retention period for log files is 30 days, configurable through the delta.logRetentionPeriod property, which you set with the ALTER TABLE SET TBLPROPERTIES SQL method (see the sketch after this list). See Table properties.
  • The ability to time travel back to a version older than the retention period is lost after running vacuum.
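
For example, a minimal PySpark sketch of raising the log retention period on the eventsTable table used below (the interval value is illustrative, not a recommendation):

# Set the log retention period through a table property (value is an example)
spark.sql("ALTER TABLE eventsTable SET TBLPROPERTIES ('delta.logRetentionPeriod' = 'interval 60 days')")
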
VACUUM eventsTable   -- vacuum files not required by versions older than the default retention period

VACUUM '/data/events' -- vacuum files in path-based table

VACUUM delta.`/data/events/`

VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old

VACUUM eventsTable DRY RUN    -- do dry run to get the list of files to be deleted

See Vacuum a Delta table for syntax details.

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)  # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

deltaTable.vacuum()        # vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     # vacuum files not required by versions more than 100 hours old

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

Note

The Java API is available in Databricks Runtime 6.0 and above.

import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

deltaTable.vacuum();        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100);     // vacuum files not required by versions more than 100 hours old

See the API reference for Scala, Java, and Python syntax details.

Warning

We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.

Delta Lake has a safety check to prevent you from running a dangerous vacuum command. If you are certain that there are no operations being performed on this table that take longer than the retention interval you plan to specify, you can turn off this safety check by setting the Apache Spark configuration property spark.databricks.delta.retentionDurationCheck.enabled to false. You must choose an interval that is longer than the longest running concurrent transaction and the longest period that any stream can lag behind the most recent update to the table.
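
If you decide to disable the check, a minimal PySpark sketch (the table name and retention interval are illustrative; use this only when you are certain no longer-running operations exist):

# Disable the retention duration safety check for the current session (use with care)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Vacuum with a shorter-than-default retention (illustrative value)
spark.sql("VACUUM eventsTable RETAIN 24 HOURS")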

Describe History

You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default, table history is retained for 30 days.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

See Describe History for details.

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

Note

The Java API is available in Databricks Runtime 6.0 and above.

import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);

Dataset<Row> fullHistoryDF = deltaTable.history();       // get the full history of the table

Dataset<Row> lastOperationDF = deltaTable.history(1);    // get the last operation on the DeltaTable

See the API reference for Scala/Java/Python syntax details.

History schema

The output of this operation has the following columns.

Column Type Description
version long Table version generated by the operation.
timestamp timestamp When this version was committed.
userId string ID of the user that ran the operation.
userName string Name of the user that ran the operation.
operation string Name of the operation.
operationParameters map Parameters of the operation (for example, predicates).
job struct Details of the job that ran the operation.
notebook struct Details of notebook from which the operation was run.
clusterId string ID of the cluster on which the operation ran.
readVersion long Version of the table that was read to perform the write operation.
isolationLevel string Isolation level used for this operation.
isBlindAppend boolean Whether this operation appended data.
operationMetrics map Metrics of the operation (for example, number of rows and files modified).
userMetadata string User-defined commit metadata if it was specified.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Note

  • Operation metrics are available only when the history command and the operation in the history were run using Databricks Runtime 6.5 or above.
  • A few of the other columns are not available if you write into a Delta table using the following methods:
  • Columns added in the future will always be added after the last column.

Operation metrics keys

The operationMetrics column is a map of metric names to the values recorded by the operation.
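
To read a specific metric programmatically, a minimal PySpark sketch (the table path is hypothetical, and the key must be one of those listed for the operation in the tables below):

# Fetch the most recent commit and read a value from its operationMetrics map
from delta.tables import DeltaTable
lastCommit = DeltaTable.forPath(spark, "/data/events").history(1).collect()[0]
print(lastCommit["operation"], lastCommit["operationMetrics"].get("numOutputRows"))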

The following tables list the key definitions by operation.

Operation Metric name Description
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO    
  numFiles Number of files written.
  numOutputBytes Size in bytes of the written contents.
  numOutputRows Number of rows written.
STREAMING UPDATE    
  numAddedFiles Number of files added.
  numRemovedFiles Number of files removed.
  numOutputRows Number of rows written.
  numOutputBytes Size of write in bytes.
DELETE    
  numAddedFiles Number of files added. Not provided when partitions of the table are deleted.
  numRemovedFiles Number of files removed.
  numDeletedRows Number of rows removed. Not provided when partitions of the table are deleted.
  numCopiedRows Number of rows copied in the process of deleting files.
TRUNCATE numRemovedFiles Number of files removed.
MERGE    
  numSourceRows Number of rows in the source DataFrame.
  numTargetRowsInserted Number of rows inserted into the target table.
  numTargetRowsUpdated Number of rows updated in the target table.
  numTargetRowsDeleted Number of rows deleted in the target table.
  numTargetRowsCopied Number of target rows copied.
  numOutputRows Total number of rows written out.
  numTargetFilesAdded Number of files added to the sink(target).
  numTargetFilesRemoved Number of files removed from the sink(target).
UPDATE    
  numAddedFiles Number of files added.
  numRemovedFiles Number of files removed.
  numUpdatedRows Number of rows updated.
  numCopiedRows Number of rows just copied over in the process of updating files.
FSCK numRemovedFiles Number of files removed.
CONVERT numConvertedFiles Number of Parquet files that have been converted.
Operation Metric name Description
CLONE*    
  sourceTableSize Size in bytes of the source table at the version that’s cloned.
  sourceNumOfFiles Number of files in the source table at the version that’s cloned.
  numRemovedFiles Number of files removed from the target table if a previous Delta table was replaced.
  removedFilesSize Total size in bytes of the files removed from the target table if a previous Delta table was replaced.
  numCopiedFiles Number of files that were copied over to the new location. 0 for shallow clones.
  copiedFilesSize Total size in bytes of the files that were copied over to the new location. 0 for shallow clones.
OPTIMIZE    
  numAddedFiles Number of files added.
  numRemovedFiles Number of files removed.
  numAddedBytes Number of bytes added after the table was optimized.
  numRemovedBytes Number of bytes removed.
  minFileSize Size of the smallest file after the table was optimized.
  p25FileSize Size of the 25th percentile file after the table was optimized.
  p50FileSize Median file size after the table was optimized.
  p75FileSize Size of the 75th percentile file after the table was optimized.
  maxFileSize Size of the largest file after the table was optimized.

*Requires Databricks Runtime 7.3 or above.

Describe Detail

You can retrieve more information about the table (for example, number of files, data size) using DESCRIBE DETAIL.

DESCRIBE DETAIL '/data/events/'

DESCRIBE DETAIL delta.`/data/events/`

DESCRIBE DETAIL eventsTable

See Describe Detail for details.
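
If you want the same information as a DataFrame (for example, to read numFiles or sizeInBytes programmatically), a minimal PySpark sketch:

# DESCRIBE DETAIL returns a single-row DataFrame; collect it and read individual fields
detail = spark.sql("DESCRIBE DETAIL delta.`/data/events/`").collect()[0]
print(detail["numFiles"], detail["sizeInBytes"])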

Detail schema

The output of this operation has only one row with the following schema.

Column Type Description
format string Format of the table, that is, “delta”.
id string Unique ID of the table.
name string Name of the table as defined in the metastore.
description string Description of the table.
location string Location of the table.
createdAt timestamp When the table was created.
lastModified timestamp When the table was last modified.
partitionColumns array of strings Names of the partition columns if the table is partitioned.
numFiles long Number of files in the latest version of the table.
sizeInBytes long Size in bytes of the latest version of the table.
properties string-string map All the properties set for this table.
minReaderVersion int Minimum version of readers (according to the log protocol) that can read the table.
minWriterVersion int Minimum version of writers (according to the log protocol) that can write to the table.
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|                  id|              name|description|            location|           createdAt|       lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable|       null|file:/Users/tdas/...|2020-06-05 12:20:...|2020-06-05 12:20:20|              []|      10|      12345|        []|               1|               2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+

Generate

You can generate manifest files for Delta tables that can be used by other processing engines (that is, other than Apache Spark) to read the Delta tables. For example, to generate manifest files that can be used by Presto and Athena to read Delta tables, you can run the following:

GENERATE symlink_format_manifest FOR TABLE delta.`/mnt/events`

GENERATE symlink_format_manifest FOR TABLE eventsTable

Note

The Python API is available in Databricks Runtime 6.3 and above.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, <path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

Note

The Scala API is available in Databricks Runtime 6.3 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

Note

The Java API is available in Databricks Runtime 6.3 and above.

import io.delta.tables.*;

DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");

Convert to Delta

Convert an existing Parquet table to a Delta table in-place. This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all Parquet files. If your data is partitioned, you must specify the schema of the partition columns as a DDL-formatted string (that is, <column-name1> <type>, <column-name2> <type>, ...).

-- Convert unpartitioned parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`

-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)

See the Convert To Delta (Delta Lake on Databricks) command for details.

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *

# Convert unpartitioned parquet table at path '<path-to-table>'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

# Convert partitioned parquet table at path '<path-to-table>' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int")

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")

Note

The Java API is available in Databricks Runtime 6.0 and above.

import io.delta.tables.*;

// Convert unpartitioned parquet table at path '<path-to-table>'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`");

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
DeltaTable partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");

Note

Any file not tracked by Delta Lake is invisible and can be deleted when you run vacuum. You should avoid updating or appending data files during the conversion process. After the table is converted, make sure all writes go through Delta Lake.

Convert a Delta table to a Parquet table

You can easily convert a Delta table back to a Parquet table using the following steps (a sketch of both steps follows the list):

  1. If you have performed Delta Lake operations that can change the data files (for example, delete or merge), run vacuum with retention of 0 hours to delete all data files that do not belong to the latest version of the table.
  2. Delete the _delta_log directory in the table directory.
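
A minimal sketch of both steps (the path is hypothetical; the retention safety check is relaxed only because a 0-hour retention is intentional here, and dbutils is specific to Databricks notebooks):

from delta.tables import DeltaTable

# Step 1: vacuum with 0-hour retention so only the latest version's data files remain
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
DeltaTable.forPath(spark, "/data/events").vacuum(0)

# Step 2: remove the transaction log; what remains is a plain Parquet directory
dbutils.fs.rm("/data/events/_delta_log", True)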

Clone a Delta table

Preview

This feature is in Public Preview.

Note

Available in Databricks Runtime 7.2 and above.

You can create a copy of an existing Delta table at a specific version using the clone command. Clones can be either deep or shallow.

  • A deep clone is a clone that copies the source table data to the clone target in addition to the metadata of the existing table. Stream metadata is also cloned, so a stream that writes to the Delta table can be stopped on the source table and continued on the target of a clone from where it left off.
  • A shallow clone is a clone that does not copy the data files to the clone target. The table metadata is equivalent to the source. These clones are cheaper to create.

Any changes made to either deep or shallow clones affect only the clones themselves and not the source table.

The metadata that is cloned includes: schema, partitioning information, invariants, nullability. For deep clones only, stream and COPY INTO metadata are also cloned. Metadata that is not cloned includes the table description and user-defined commit metadata.

Important

  • Shallow clones reference data files in the source directory. If you run vacuum on the source table, clients will no longer be able to read the referenced data files and a FileNotFoundException will be thrown. In this case, running clone with replace over the shallow clone will repair the clone. If this occurs often, consider using a deep clone instead, which does not depend on the source table.
  • Deep clones do not depend on the source from which they were cloned, but are expensive to create because a deep clone copies the data as well as the metadata.
  • Cloning with replace to a target that already has a table at that path creates a Delta log if one does not exist at that path. You can clean up any existing data by running vacuum. If the existing table is a Delta table, a new commit is created on the existing Delta table that includes the new metadata and new data from the source table.
  • Cloning a table is not the same as Create Table As Select or CTAS. A clone copies the metadata of the source table in addition to the data. Cloning also has simpler syntax: you don’t need to specify partitioning, format, invariants, nullability and so on as they are taken from the source table.
  • A cloned table has an independent history from its source table. Time travel queries on a cloned table will not work with the same inputs as they work on its source table.
 CREATE TABLE delta.`/data/target/` CLONE delta.`/data/source/` -- Create a deep clone of /data/source at /data/target

 CREATE OR REPLACE TABLE db.target_table CLONE db.source_table -- Replace the target

 CREATE TABLE IF NOT EXISTS delta.`/data/target/` CLONE db.source_table -- No-op if the target table exists

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source`

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` VERSION AS OF version

 CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` TIMESTAMP AS OF timestamp_expression -- timestamp can be like "2019-01-01" or like date_sub(current_date(), 1)
 from delta.tables import *

 deltaTable = DeltaTable.forPath(spark, pathToTable)  # path-based tables, or
 deltaTable = DeltaTable.forName(spark, tableName)    # Hive metastore-based tables

 deltaTable.clone(target, isShallow, replace) # clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) # clone the source at a specific version

 # clone the source at a specific timestamp such as timestamp="2019-01-01"
 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace)
 import io.delta.tables._

 val deltaTable = DeltaTable.forPath(spark, pathToTable)  // path-based tables, or
 val deltaTable = DeltaTable.forName(spark, tableName)    // Hive metastore-based tables

 deltaTable.clone(target, isShallow, replace) // clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) // clone the source at a specific version

 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace) // clone the source at a specific timestamp
 import io.delta.tables.*;

 DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);  // path-based tables, or
 DeltaTable deltaTable = DeltaTable.forName(spark, tableName);    // Hive metastore-based tables

 deltaTable.clone(target, isShallow, replace); // clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace); // clone the source at a specific version

 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace); // clone the source at a specific timestamp

See Clone (Delta Lake on Databricks) for syntax details.

Permissions

You must configure the permissions required for CLONE separately for Databricks table ACLs and your cloud provider.

Table ACLs

You need the following permissions for both deep and shallow clones (a hypothetical grant sequence follows the list):

  • SELECT permission on the source table.
  • If you are using CLONE to create a new table, CREATE permission on the database in which you are creating the table.
  • If you are using CLONE to replace a table, you must have MODIFY permission on the table.
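
A hypothetical grant sequence for cloning db.source_table into database db (the principal and object names are illustrative; adjust them to your workspace's ACL model):

# Table-ACL grants assumed sufficient before running CREATE TABLE db.target_table ... CLONE db.source_table
spark.sql("GRANT SELECT ON db.source_table TO `someone@example.com`")
spark.sql("GRANT CREATE ON DATABASE db TO `someone@example.com`")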

Cloud permissions

If you have created a deep clone, any user that reads the deep clone must have read access to the clone’s directory. To make changes to the clone, users must have write access to the clone’s directory.

If you have created a shallow clone, any user that reads the shallow clone needs permission to read the files in the original table (since the data files remain in the source table with shallow clones) as well as the clone’s directory. To make changes to the clone, users need write access to the clone’s directory.

Clone use cases

  • Data archiving: Data may need to be kept for longer than is feasible with time travel or for disaster recovery. In these cases, you can create a deep clone to preserve the state of a table at a certain point in time for archival. Incremental archiving is also possible to keep a continually updating state of a source table for disaster recovery.

    -- Every month run
    CREATE OR REPLACE TABLE delta.`/some/archive/path` CLONE my_prod_table
    
  • Machine learning flow reproduction: When doing machine learning, you may want to archive a certain version of a table on which you trained an ML model. Future models can be tested using this archived data set.

    -- Trained model on version 15 of Delta table
    CREATE TABLE delta.`/model/dataset` CLONE entire_dataset VERSION AS OF 15
    
  • Short-term experiments on a production table: In order to test out a workflow on a production table without corrupting the table, you can easily create a shallow clone. This allows you to run arbitrary workflows on the cloned table that contains all the production data but does not affect any production workloads.

    -- Perform shallow clone
    CREATE OR REPLACE TABLE my_test SHALLOW CLONE my_prod_table;
    
    UPDATE my_test SET invalid=true WHERE user_id is null;
    -- Run a bunch of validations. Once happy:
    
    -- This should leverage the update information in the clone to prune to only
    -- changed files in the clone if possible
    MERGE INTO my_prod_table
    USING my_test
    ON my_test.user_id <=> my_prod_table.user_id
    WHEN MATCHED AND my_test.user_id is null THEN UPDATE *;
    
    DROP TABLE my_test;
    
  • Data sharing: Other business units within a single organization may want to access the same data but may not require the latest updates. Instead of giving access to the source table directly, clones with different permissions can be provided for different business units. The performance of the clone can exceed that of a simple view as well.

    -- Perform deep clone
    CREATE OR REPLACE TABLE shared_table CLONE my_prod_table;
    
    -- Grant other users access to the shared table
    GRANT SELECT ON shared_table TO `<user-name>@<user-domain>.com`;