Table Utility Commands

Delta tables support a number of utility commands.

Vacuum

You can remove files no longer referenced by a Delta table and are older than the retention threshold by running the vacuum command on the table. vacuum is not triggered automatically. The default retention threshold for the files is 7 days.

Important

The ability to time travel back to a version older than the retention period is lost after running vacuum.

SQL

VACUUM '/data/events' -- vacuum files not required by versions older than the default retention period

VACUUM delta.`/data/events/`

VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable

VACUUM eventsTable DRY RUN           -- do dry run to get the list of files to be deleted

See Vacuum a Delta table for details.

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

Python

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        # vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     # vacuum files not required by versions more than 100 hours old

See the Delta Lake API Reference for details.

Warning

We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If VACUUM cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when VACUUM deletes files that have not yet been committed.

Delta Lake has a safety check to prevent you from running a dangerous VACUUM command. If you are certain that there are no operations being performed on this table that take longer than the retention interval you plan to specify, you can turn off this safety check by setting the Apache Spark configuration property spark.databricks.delta.retentionDurationCheck.enabled to false. You must choose an interval that is longer than the longest running concurrent transaction and the longest period that any stream can lag behind the most recent update to the table.

History

You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default table history is retained for 30 days.

SQL

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only
DESCRIBE HISTORY eventsTable

See Describe History for details.

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

Python

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation

See the Delta Lake API Reference for details.

The returned data has the following structure.

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      5|2019-07-29 14:07:47|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          4|          null|        false|
|      4|2019-07-29 14:07:41|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          3|          null|        false|
|      3|2019-07-29 14:07:29|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          2|          null|        false|
|      2|2019-07-29 14:06:56|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          1|          null|        false|
|      1|2019-07-29 14:04:31|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-07-29 14:01:40|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

Note

This information is not available if you write into a Delta table using the following methods:

Convert to Delta

Convert an existing Parquet table to a Delta table in-place. This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema by reading the footers of all Parquet files. If your data is partitioned, you must specify the schema of the partition columns.

SQL

-- Convert unpartitioned parquet table at path 'path/to/table'
CONVERT TO DELTA parquet.`path/to/table`

-- Convert partitioned parquet table at path 'path/to/table' and partitioned by integer column named 'part'
CONVERT TO DELTA parquet.`path/to/table` PARTITIONED BY (part int)

See the Convert To Delta (Delta Lake on Databricks) command for details.

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

// Convert unpartitioned parquet table at path '/path/to/table'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

// Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

See the Delta Lake API Reference for more details.

Note

Any file not tracked by Delta Lake is invisible and can be deleted when you run vacuum. You should avoid updating or appending data files during the conversion process. After the table is converted, make sure all writes go through Delta Lake.

Convert Delta table to a Parquet table

You can easily convert a Delta table back to a Parquet table by the following steps:

  1. If you have performed Delta Lake operations that can change the data files (for example, Delete or Merge), then first run vacuum with retention of 0 hours to delete all data files that do not belong to the latest version of the table.
  2. Delete the _delta_log directory in the table directory.