Optimize performance with file management

To improve query speed, Delta Lake on Databricks can optimize the layout of data stored in cloud storage. It supports two layout algorithms: bin-packing and Z-Ordering.

This article describes how to run the optimization commands, how the two layout algorithms work, and how to clean up stale table snapshots.

Compaction (bin-packing)

Delta Lake on Databricks can improve the speed of read queries from a table by coalescing small files into larger ones. You trigger compaction by running the OPTIMIZE command:

OPTIMIZE delta.`/data/events`

or

OPTIMIZE events

If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using WHERE:

OPTIMIZE events WHERE date >= '2017-01-01'

Note

  • Bin-packing optimization is idempotent, meaning that if it is run twice on the same dataset, the second run has no effect.
  • Bin-packing aims to produce evenly-balanced data files with respect to their size on disk, but not necessarily number of tuples per file. However, the two measures are most often correlated.

Readers of Delta tables use snapshot isolation, which means that they are not interrupted when OPTIMIZE removes unnecessary files from the transaction log. OPTIMIZE makes no data-related changes to the table, so a read before and after an OPTIMIZE returns the same results. Performing OPTIMIZE on a table that is a streaming source does not affect any current or future streams that treat this table as a source. OPTIMIZE returns the file statistics (min, max, total, and so on) for the files removed and the files added by the operation. The OPTIMIZE statistics also include the Z-Ordering statistics, the number of batches, and the number of partitions optimized.

Note

Available in Databricks Runtime 6.0 and above.

You can also compact small files automatically using Auto Optimize.

Data skipping

Data skipping information is collected automatically when you write data into a Delta table. Delta Lake on Databricks takes advantage of this information (minimum and maximum values) at query time to provide faster queries. You do not need to configure data skipping; the feature is activated whenever applicable. However, its effectiveness depends on the layout of your data. For best results, apply Z-Ordering.

For an example of the benefits of Delta Lake on Databricks data skipping and Z-Ordering, see the notebooks in Optimization examples. By default Delta Lake on Databricks collects statistics on the first 32 columns defined in your table schema. You can change this value using the table property dataSkippingNumIndexedCols. Adding more columns to collect statistics on adds additional overhead when you write files.

Collecting statistics on long strings is an expensive operation. To avoid collecting statistics on long strings, you can either configure the table property dataSkippingNumIndexedCols so that columns containing long strings fall outside the indexed columns, or move columns containing long strings to a position in the schema beyond the first dataSkippingNumIndexedCols columns using ALTER TABLE CHANGE COLUMN.
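For example, here is a minimal sketch assuming a hypothetical events table with a long free-text column named notes and another column named ingest_id that already sits beyond the indexed range; it also assumes the fully qualified property name delta.dataSkippingNumIndexedCols:

-- Collect statistics on only the first 5 columns of the table
ALTER TABLE events SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '5')

-- Move the long-string column after a column outside the indexed range,
-- so statistics are no longer collected for it
ALTER TABLE events CHANGE COLUMN notes notes STRING AFTER ingest_id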

For the purposes of collecting statistics, each field within a nested column is considered as an individual column.

You can read more about this topic in the blog post Processing Petabytes of Data in Seconds with Databricks Delta.

Z-Ordering (multi-dimensional clustering)

Z-Ordering is a technique to colocate related information in the same set of files. This co-locality is automatically used by Delta Lake on Databricks data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause:

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

If you expect a column to be commonly used in query predicates and if that column has high cardinality (that is, a large number of distinct values), then use ZORDER BY.

You can specify multiple columns for ZORDER BY as a comma-separated list. However, the effectiveness of the locality drops with each additional column. Z-Ordering on columns that do not have statistics collected on them is ineffective and a waste of resources, because data skipping requires column-local statistics such as min, max, and count. You can configure statistics collection on certain columns by reordering columns in the schema or by increasing the number of columns to collect statistics on. See the Data skipping section for more details.
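For example, the following sketch Z-Orders the events table from the earlier example by two columns, eventType and a hypothetical deviceId column:

OPTIMIZE events
WHERE date >= '2021-01-01'
ZORDER BY (eventType, deviceId)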

Note

  • Z-Ordering is not idempotent but aims to be an incremental operation. The time it takes for Z-Ordering is not guaranteed to reduce over multiple runs. However, if no new data was added to a partition that was just Z-Ordered, another Z-Ordering of that partition will not have any effect.

  • Z-Ordering aims to produce evenly-balanced data files with respect to the number of tuples, but not necessarily data size on disk. The two measures are most often correlated, but there can be situations when that is not the case, leading to skew in optimize task times.

    For example, if you ZORDER BY date and your most recent records are all much wider (for example longer arrays or string values) than the ones in the past, it is expected that the OPTIMIZE job’s task durations will be skewed, as well as the resulting file sizes. This is, however, only a problem for the OPTIMIZE command itself; it should not have any negative impact on subsequent queries.

Tune file size

This section describes how to tune the size of files in Delta tables.

Set a target size

Note

Available in Databricks Runtime 8.2 and above.

If you want to tune the size of files in your Delta table, set the table property delta.targetFileSize to the desired size. If this property is set, all data layout optimization operations (for example, OPTIMIZE with bin-packing or Z-Ordering, Auto Compaction, and Optimized Writes) will make a best-effort attempt to generate files of the specified size.

Table property: delta.targetFileSize

  • Type: size in bytes or higher units
  • Description: the target file size. For example, 104857600 (bytes) or 100mb.
  • Default value: none

For existing tables, you can set and unset properties using the SQL command ALTER TABLE SET TBLPROPERTIES. You can also set these properties automatically when creating new tables using Spark session configurations. See Table properties for details.
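As a minimal sketch, assuming an existing Delta table named events:

-- Ask data layout operations to target roughly 100 MB files
ALTER TABLE events SET TBLPROPERTIES ('delta.targetFileSize' = '100mb')

-- Remove the override and return to the default behavior
ALTER TABLE events UNSET TBLPROPERTIES ('delta.targetFileSize')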

Autotune based on workload

Note

Available in Databricks Runtime 8.2 and above.

To minimize the need for manual tuning, Databricks can automatically tune the file size of Delta tables based on workloads operating on the table. Databricks can automatically detect if a Delta table has frequent MERGE operations that rewrite files and may choose to reduce the size of rewritten files in anticipation of further file rewrites in the future. For example, when executing a MERGE operation, if 9 out of the last 10 operations on the table were also MERGEs, then the Optimized Writes and Auto Compaction used by MERGE (if enabled) generate smaller files than they would otherwise. This helps reduce the duration of future MERGE operations.

Autotune is activated after a few rewrite operations have occurred. However, if you anticipate that a Delta table will experience frequent MERGE, UPDATE, or DELETE operations and want this tuning immediately, you can explicitly tune file sizes for rewrites by setting the table property delta.tuneFileSizesForRewrites (see the sketch after the property description below). Set this property to true to always use lower file sizes for all data layout optimization operations on the table. Set it to false to never tune to lower file sizes, that is, to prevent auto-detection from being activated.

Table property: delta.tuneFileSizesForRewrites

  • Type: Boolean
  • Description: whether to tune file sizes for data layout optimization.
  • Default value: none

For existing tables, you can set and unset properties using the SQL command ALTER TABLE SET TBLPROPERTIES. You can also set these properties automatically when creating new tables using Spark session configurations. See Table properties for details.
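For example, a sketch that opts a hypothetical table into rewrite tuning, either on an existing table or at creation time:

-- Opt an existing table into lower file sizes for rewrite-heavy workloads
ALTER TABLE events SET TBLPROPERTIES ('delta.tuneFileSizesForRewrites' = 'true')

-- Or declare the property when creating a new table
CREATE TABLE events_new (id BIGINT, date DATE, eventType STRING)
USING DELTA
TBLPROPERTIES ('delta.tuneFileSizesForRewrites' = 'true')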

Autotune based on table size

Note

Available in Databricks Runtime 8.4 and above.

To minimize the need for manual tuning, Databricks automatically tunes the file size of Delta tables based on the size of the table. Databricks will use smaller file sizes for smaller tables and larger file sizes for larger tables so that the number of files in the table does not grow too large. Databricks does not autotune tables that you have tuned with a specific target size or based on a workload with frequent rewrites.

The target file size is based on the current size of the Delta table. For tables smaller than 2.56TB, the autotuned target file size is 256MB. For tables with a size between 2.56TB and 10TB, the target size will grow linearly from 256MB to 1GB. For tables larger than 10TB, the target file size is 1GB.

Note

When the target file size for a table grows, existing files are not re-optimized into larger files by the OPTIMIZE command. A large table can therefore always have some files that are smaller than the target size. If it is required to optimize those smaller files into larger files as well, you can configure a fixed target file size for the table using the delta.targetFileSize table property.

When a table is written incrementally, the target file sizes and file counts will be approximately as follows based on table size. The file counts in this table are only an example. The actual results will be different depending on many factors.

Table size    Target file size    Approximate number of files in table
10GB          256MB               40
1TB           256MB               4096
2.56TB        256MB               10240
3TB           307MB               12108
5TB           512MB               17339
7TB           716MB               20784
10TB          1GB                 24437
20TB          1GB                 34437
50TB          1GB                 64437
100TB         1GB                 114437

Improve interactive query performance

Delta Engine offers a few additional mechanisms to improve query performance.

Manage data recency

At the beginning of each query, Delta tables auto-update to the latest version of the table. This process can be observed in notebooks when the command status reports: Updating the Delta table's state. However, when running historical analysis on a table, you may not necessarily need up-to-the-last-minute data, especially for tables where streaming data is being ingested frequently. In these cases, queries can be run on stale snapshots of your Delta table. This approach can lower the latency of getting results from queries.

You can configure how stale your table data can be by setting the Spark session configuration spark.databricks.delta.stalenessLimit with a time string value, for example, 1h, 15m, or 1d for 1 hour, 15 minutes, and 1 day respectively. This configuration is session specific, so it won't affect other users accessing this table from other notebooks, jobs, or BI tools. In addition, this setting doesn't prevent your table from updating; it only prevents a query from having to wait for the table to update. The update still occurs in the background and shares resources fairly across the cluster. If the staleness limit is exceeded, the query blocks on the table state update.
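For example, a minimal sketch that allows queries in the current session to run against a snapshot that is up to one hour old:

-- Session-level setting; queries may read a table state up to 1 hour stale
SET spark.databricks.delta.stalenessLimit=1h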

Enhanced checkpoints for low-latency queries

Delta Lake writes checkpoints as an aggregate state of a Delta table at an optimized frequency. These checkpoints serve as the starting point to compute the latest state of the table. Without checkpoints, Delta Lake would have to read a large collection of JSON files (“delta” files) representing commits to the transaction log to compute the state of a table. In addition, the column-level statistics Delta Lake uses to perform data skipping are stored in the checkpoint.

Important

Delta Lake checkpoints are different from Structured Streaming checkpoints.

In Databricks Runtime 7.2 and below, column-level statistics are stored in Delta Lake checkpoints as a JSON column.

In Databricks Runtime 7.3 LTS and above, column-level statistics are stored as a struct. The struct format makes Delta Lake reads much faster, because:

  • Delta Lake doesn’t perform expensive JSON parsing to obtain column-level statistics.
  • Parquet column pruning capabilities significantly reduce the I/O required to read the statistics for a column.

The struct format enables a collection of optimizations that reduce the overhead of Delta Lake read operations from seconds to tens of milliseconds, which significantly reduces the latency for short queries.

Manage column-level statistics in checkpoints

You manage how statistics are written in checkpoints using the table properties delta.checkpoint.writeStatsAsJson and delta.checkpoint.writeStatsAsStruct. If both table properties are false, Delta Lake cannot perform data skipping.

In Databricks Runtime 7.3 LTS and above:

  • Batch writes write statistics in both JSON and struct format. delta.checkpoint.writeStatsAsJson is true.
  • delta.checkpoint.writeStatsAsStruct is undefined by default.
  • Readers use the struct column when available and otherwise fall back to using the JSON column.

For streaming writes:

  • Databricks Runtime 7.5 and above: write statistics in both JSON format and struct format.
  • Databricks Runtime 7.3 LTS and 7.4: write statistics in only JSON format (to minimize the impact of checkpoints on write latency). To also write the struct format, see Trade-offs with statistics in checkpoints.

In Databricks Runtime 7.2 and below, readers only use the JSON column. Therefore, if delta.checkpoint.writeStatsAsJson is false, such readers cannot perform data skipping.

Important

Enhanced checkpoints do not break compatibility with open source Delta Lake readers. However, setting delta.checkpoint.writeStatsAsJson to false may have implications on proprietary Delta Lake readers. Contact your vendors to learn more about performance implications.

Trade-offs with statistics in checkpoints

Because writing statistics in a checkpoint has a cost (usually less than a minute, even for large tables), there is a trade-off between the time taken to write a checkpoint and compatibility with Databricks Runtime 7.2 and below. If you are able to upgrade all of your workloads to Databricks Runtime 7.3 LTS or above, you can reduce the cost of writing a checkpoint by disabling the legacy JSON statistics. The trade-off is summarized in the following list.

If data skipping is not useful in your application, you can set both properties to false, and no statistics are collected or written. We do not recommend this configuration.

  • writeStatsAsJson = false, writeStatsAsStruct = false: no data skipping.
  • writeStatsAsJson = false, writeStatsAsStruct = true: faster queries on Databricks Runtime 7.3 LTS and above; slightly slower checkpoints; no data skipping in readers on Databricks Runtime 7.2 and below.
  • writeStatsAsJson = true, writeStatsAsStruct = false: data skipping in readers on Databricks Runtime 7.2 and below; slower queries on Databricks Runtime 7.3 LTS and above (statistics must be parsed from JSON).
  • writeStatsAsJson = true, writeStatsAsStruct = true: faster queries on Databricks Runtime 7.3 LTS and above; maintains compatibility with readers on Databricks Runtime 7.2 and below; checkpoints have the highest latency (order of seconds).

Enable enhanced checkpoints for Structured Streaming queries

If your Structured Streaming workloads don’t have low latency requirements (sub-minute latencies), you can enable enhanced checkpoints by running the following SQL command:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

If you do not use Databricks Runtime 7.2 or below to query your data, you can also improve the checkpoint write latency by setting the following table properties:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Disable writes from clusters that write checkpoints without the stats struct

Writers in Databricks Runtime 7.2 and below write checkpoints without the stats struct, which prevents optimizations for readers on Databricks Runtime 7.3 LTS and above.

To block clusters running Databricks Runtime 7.2 and below from writing to a Delta table, you can upgrade the Delta table using the upgradeTableProtocol method:

Python:

from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table")  # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Scala:

import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Warning

Applying the upgradeTableProtocol method prevents clusters running Databricks Runtime 7.2 and below from writing to your table and this change is irreversible. We recommend upgrading your tables only after you are committed to the new format. You can try out these optimizations by creating a shallow CLONE of your tables using Databricks Runtime 7.3 LTS.

Once you upgrade the table writer version, writers must obey your settings for 'delta.checkpoint.writeStatsAsStruct' and 'delta.checkpoint.writeStatsAsJson'.

The following table summarizes how to take advantage of enhanced checkpoints in various versions of Databricks Runtime, table protocol versions, and writer types.

Without protocol upgrade:

  • Databricks Runtime 7.2 and below writer: no improvement for Databricks Runtime 7.2 and below readers; no improvement for Databricks Runtime 7.3 LTS and above readers.
  • Databricks Runtime 7.3 LTS and above batch writer: no improvement for Databricks Runtime 7.2 and below readers; improved by default for Databricks Runtime 7.3 LTS and above readers.
  • Databricks Runtime 7.3 LTS and above streaming writer: no improvement for Databricks Runtime 7.2 and below readers; opt-in by table property (1) for Databricks Runtime 7.3 LTS and above readers.

With protocol upgrade:

  • Databricks Runtime 7.2 and below writer: cannot write to the table.
  • Databricks Runtime 7.3 LTS and above batch writer: no improvement for Databricks Runtime 7.2 and below readers; improved by default for Databricks Runtime 7.3 LTS and above readers.
  • Databricks Runtime 7.3 LTS and above streaming writer: no improvement for Databricks Runtime 7.2 and below readers; opt-in by table property (1) for Databricks Runtime 7.3 LTS and above readers.

(1) Set the table property 'delta.checkpoint.writeStatsAsStruct' = 'true'

Disable writes from clusters using old checkpoint formats

Writers on Databricks Runtime 7.2 and below can write checkpoints in the old format, which prevents optimizations for readers on Databricks Runtime 7.3 LTS and above. To block clusters running Databricks Runtime 7.2 and below from writing to a Delta table, you can upgrade the Delta table using the upgradeTableProtocol method:

Python:

from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table")  # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Scala:

import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Warning

Applying the upgradeTableProtocol method prevents clusters running Databricks Runtime 7.2 and below from writing to your table. The change is irreversible. Therefore, we recommend upgrading your tables only after you are committed to the new format. You can try out these optimizations by creating a shallow CLONE of your tables using Databricks Runtime 7.3 LTS.

Frequently asked questions (FAQ)

Why isn’t OPTIMIZE automatic?

The OPTIMIZE operation starts up many Spark jobs in order to optimize the file sizing via compaction (and optionally perform Z-Ordering). Since much of what OPTIMIZE does is compact small files, you must first accumulate many small files before this operation has an effect. Therefore, the OPTIMIZE operation is not run automatically.

Moreover, running OPTIMIZE, especially with ZORDER, is an expensive operation in time and resources. If Databricks ran OPTIMIZE automatically or waited to write out data in batches, it would remove the ability to run low-latency Delta Lake streams (where a Delta table is the source). Many customers have Delta tables that are never optimized because they only stream data from these tables, obviating the query benefits that OPTIMIZE would provide.

Lastly, Delta Lake automatically collects statistics about the files that are written to the table (whether through an OPTIMIZE operation or not). This means that reads from Delta tables leverage this information whether or not the table or a partition has had the OPTIMIZE operation run on it.

How often should I run OPTIMIZE?

When you choose how often to run OPTIMIZE, there is a trade-off between performance and cost. You should run OPTIMIZE more often if you want better end-user query performance (necessarily at a higher cost because of resource usage). You should run it less often if you want to optimize cost.

We recommend you start by running OPTIMIZE on a daily basis (preferably at night when spot prices are low). Then modify your job from there.

What’s the best instance type to run OPTIMIZE (bin-packing and Z-Ordering) on?

Both operations are CPU-intensive, performing large amounts of Parquet decoding and encoding.

For these workloads we recommend the c5d series.