
Use liquid clustering for tables

Liquid clustering is a data layout optimization technique that replaces table partitioning and ZORDER. It simplifies table management and optimizes query performance by automatically organizing data based on clustering keys.

Unlike traditional partitioning, you can redefine clustering keys without rewriting existing data. This allows your data layout to evolve alongside changing analytic needs. Liquid clustering applies to both streaming tables and materialized views.

important

Liquid clustering is generally available for Delta tables and in Public Preview for managed Apache Iceberg tables. For Delta tables, GA support is available with Databricks Runtime 15.2 and above. Databricks recommends using the latest Databricks Runtime for the best performance. For managed Apache Iceberg tables, Databricks Runtime 16.4 LTS and above is required.

Managed Apache Iceberg v3 tables also support deletion vectors, row tracking, row-level concurrency, and automatic liquid clustering. These capabilities require Databricks Runtime 18.0 and above. See Use Apache Iceberg v3 features.

When to use liquid clustering

Databricks recommends liquid clustering for all new tables, including streaming tables and materialized views. The following scenarios particularly benefit from clustering:

  • Queries that filter on high-cardinality columns.
  • Tables with heavy data skew.
  • Fast-growing tables that require maintenance and tuning effort.
  • Tables with concurrent write requirements.
  • Tables with varied or changing access patterns.
  • Tables where a typical partition key might return results from too many or too few partitions.

Enable liquid clustering

You can enable liquid clustering on an existing unpartitioned table or during table creation. Clustering is not compatible with partitioning or ZORDER. Databricks recommends allowing the platform to manage all layout and optimization operations for data in your table. After enabling liquid clustering, run OPTIMIZE jobs to incrementally cluster data. See How to trigger clustering.

Create tables with clustering

To enable liquid clustering, add the CLUSTER BY clause to a table creation statement, as in the examples below. In Databricks Runtime 14.2 and above, you can also use DataFrame APIs and the DeltaTable API in Python or Scala to enable liquid clustering for Delta tables.

To create an empty table with clustering:

SQL
CREATE TABLE table1 (col0 INT, col1 STRING) CLUSTER BY (col0);

To create a table from existing data with clustering (CTAS), place the CLUSTER BY clause after the table name, not in the SELECT clause:

SQL
CREATE TABLE table2 CLUSTER BY (col0)
AS SELECT * FROM table1;

To copy a table structure including its clustering configuration:

SQL
CREATE TABLE table3 LIKE table1;

important

When using DataFrame APIs to set clustering keys, you can only specify clustering columns during table creation or when using overwrite mode (such as with CREATE OR REPLACE TABLE operations). You cannot change clustering keys when using append mode.

To change clustering keys on an existing table while appending data, use SQL ALTER TABLE commands to modify the clustering configuration separately from your data write operations. See Change clustering keys.

In Databricks Runtime 16.0 and above, you can create tables with liquid clustering enabled using Structured Streaming writes. Databricks recommends Databricks Runtime 16.4 and above for the best performance. The following example defines a table clustered on two columns:

SQL
CREATE TABLE table1 (
  col0 STRING,
  col1 DATE,
  col2 BIGINT
)
CLUSTER BY (col0, col1);

warning

Delta tables with liquid clustering enabled use Delta writer version 7 and reader version 3. Delta clients that don't support these protocols cannot read these tables. You cannot downgrade table protocol versions. See Delta Lake feature compatibility and protocols.

To override default feature enablement (such as deletion vectors), see Override default feature enablement (optional).

Enable on existing tables

To enable liquid clustering on an existing unpartitioned Delta table, use the following syntax:

SQL
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

For managed Apache Iceberg tables, consider the following:

  • For tables with the v2 spec, you must explicitly turn off deletion vectors and row tracking when enabling liquid clustering on an existing table.
  • For tables with the v3 spec, turning off these features is not required because deletion vectors and row tracking are supported. See Use Apache Iceberg v3 features.

note

The default behavior does not apply clustering to previously written data. To force reclustering, use OPTIMIZE FULL or OPTIMIZE FULL WHERE <predicate>. See Force reclustering.

Convert a partitioned table to liquid clustering

In Databricks Runtime 18.1 and above, to convert an existing partitioned Delta Lake table to liquid clustering, use REPLACE PARTITIONED BY WITH CLUSTER BY in an ALTER TABLE statement. Conversion minimizes reader and writer downtime and supports both external and managed tables. After conversion, the table supports reads with Databricks Runtime 13.3 LTS and above.

note

For managed Iceberg tables, conversion is not necessary because these tables already use their partition definitions as liquid clustering keys. Running the conversion command on these tables raises an error.

The benefits of converting partitioned tables to liquid clustering include:

  • Performance improvements for tables that suffer from poor data-skipping or over-partitioning.
  • Automatic performance improvements, using CLUSTER BY AUTO, for tables with frequently changing query patterns.
  • Clustering columns are flexible and simple to alter, whereas partitioning is rigid and difficult to alter.
  • Reduced write conflicts because tables with liquid clustering allow for row-level concurrency. See Row-level concurrency.

Syntax

SQL
ALTER TABLE <table_name>
REPLACE PARTITIONED BY WITH CLUSTER BY [( <clustering_columns> ) | AUTO]

The CLUSTER BY clause supports the following options:

  • ( <clustering_columns> ): Specifies new clustering columns. Databricks recommends keeping the new clustering columns similar to the original partition columns. Using very different columns triggers a large reclustering operation on the first OPTIMIZE run.
  • AUTO: Uses the current partition columns as the initial clustering columns and lets predictive optimization adapt over time. Only available for Unity Catalog managed tables. See Automatic liquid clustering.
  • No options specified: Uses the current partition columns as the new clustering columns.

For guidance on choosing clustering keys when migrating from partitioned tables, see Migrating from partitioning or Z-order.
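As a minimal sketch, the three CLUSTER BY forms in the syntax above can be captured in a Python helper that assembles the statement text. The function `replace_partitioned_by_sql` is hypothetical and not part of any Databricks API. It only formats a string, which you would then run yourself (for example with `spark.sql(...)`), and it does not check that the table is partitioned or that the columns are valid clustering keys.

```python
from typing import Optional, Sequence


def replace_partitioned_by_sql(
    table: str,
    columns: Optional[Sequence[str]] = None,
    auto: bool = False,
) -> str:
    """Build an ALTER TABLE ... REPLACE PARTITIONED BY WITH CLUSTER BY statement.

    Hypothetical helper: it formats text only and performs no validation
    against the actual table.
    """
    if auto and columns:
        raise ValueError("Specify either explicit columns or AUTO, not both")
    stmt = f"ALTER TABLE {table} REPLACE PARTITIONED BY WITH CLUSTER BY"
    if auto:
        # AUTO: start from the current partition columns and let
        # predictive optimization adapt the keys over time.
        return f"{stmt} AUTO"
    if columns:
        # Explicit columns: very different columns trigger a large
        # reclustering on the first OPTIMIZE run.
        return f"{stmt} ({', '.join(columns)})"
    # No options: the current partition columns become the clustering columns.
    return stmt
```

For example, `replace_partitioned_by_sql("t1", ["day", "id"])` returns the statement from the first example below, without the trailing semicolon.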

Examples

To cluster on different columns than the original partitions, such as for a table partitioned on (year, month, day), do the following:

SQL
ALTER TABLE t1 REPLACE PARTITIONED BY WITH CLUSTER BY (day, id);
OPTIMIZE t1;

note

To benefit from altering clustering columns, you must run OPTIMIZE.

To use automatic liquid clustering and start with the current partition columns, do the following:

SQL
ALTER TABLE t2 REPLACE PARTITIONED BY WITH CLUSTER BY AUTO;

To keep the current partition columns as clustering columns, do the following:

SQL
ALTER TABLE t3 REPLACE PARTITIONED BY WITH CLUSTER BY;

Handle concurrent reads and writes during conversion

After conversion, Databricks Runtime 13.3 LTS and above is supported for reads and writes. Databricks recommends Databricks Runtime 15.4 LTS and above for workloads that read or write to the table during conversion.

The following shows how to handle concurrent read and write workloads during conversion:

  • Batch workloads
    • Reads during conversion: No downtime. All Databricks Runtime versions can read the table during conversion.
    • Writes during conversion: No downtime on Databricks Runtime 15.4 and above. For Databricks Runtime 15.3 and below, Databricks recommends that you pause workloads before converting and then restart workloads after the conversion completes.
  • Streaming workloads
    • Reads during conversion: With schema tracking and column mapping, restart the stream without losing any commits. Without schema tracking and column mapping, the stream raises an exception. Restart it with a new checkpoint location and start version. Commits aren't lost.
    • Writes during conversion: Restart the stream without losing any commits.

Verify or roll back a conversion

To confirm conversion, run DESCRIBE EXTENDED to see the new clustering columns. Run DESCRIBE HISTORY to see a series of REORG operations, an UPGRADE PROTOCOL operation, and a REPLACE PARTITIONED BY WITH CLUSTER BY operation.

To roll back a conversion, use RESTORE to return to the previous version. Alternatively, you can rewrite the table using REPLACE TABLE ... PARTITIONED BY (...) AS SELECT * FROM ....

To roll back using RESTORE, run the following commands:

SQL
ALTER TABLE my_table CLUSTER BY NONE;
ALTER TABLE my_table UNSET TBLPROPERTIES ('delta.liquid.hierarchicalClusteringColumns');
RESTORE TABLE my_table TO VERSION AS OF <version_number_before_conversion>;

See RESTORE.
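The rollback commands must run in the order shown. The following minimal sketch produces them for a given table. `rollback_statements` is a hypothetical helper, and `version_before_conversion` is assumed to be the version you identify from the DESCRIBE HISTORY output described above.

```python
from __future__ import annotations


def rollback_statements(table: str, version_before_conversion: int) -> list[str]:
    """Return the RESTORE-based rollback statements for a conversion, in order."""
    if version_before_conversion < 0:
        raise ValueError("Delta table versions are non-negative")
    return [
        # 1. Remove the clustering keys.
        f"ALTER TABLE {table} CLUSTER BY NONE;",
        # 2. Unset the hierarchical clustering property, as the procedure requires.
        f"ALTER TABLE {table} UNSET TBLPROPERTIES "
        "('delta.liquid.hierarchicalClusteringColumns');",
        # 3. Restore the table to the version before the conversion.
        f"RESTORE TABLE {table} TO VERSION AS OF {version_before_conversion};",
    ]
```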

Convert a table partitioned by a timestamp column

To convert a table (t1) that is partitioned by a timestamp column (timestamp_col) and use that column as a clustering key, you must disable automatic statistics generation for the conversion and then compute statistics afterward:

SQL
SET spark.databricks.delta.liquidConversion.statsGeneration.enabled = false;
ALTER TABLE t1 REPLACE PARTITIONED BY WITH CLUSTER BY (timestamp_col, id);
ANALYZE TABLE t1 COMPUTE DELTA STATISTICS;

If you try to convert a timestamp partition column to a clustering column without this configuration, the command raises an error like the following:

Text
ALTER TABLE REPLACE PARTITIONED BY WITH CLUSTER BY cannot auto-generate stats on table with column event_ts due to unsupported type: timestamp. Disable stats auto-generation by setting 'spark.databricks.delta.liquidConversion.statsGeneration.enabled' to 'false' and retry the command again. SQLSTATE: 42000

Conversion limitations

The following limitations apply to the REPLACE PARTITIONED BY WITH CLUSTER BY conversion command:

  • Streaming tables and materialized views created from a Lakeflow Spark Declarative Pipelines pipeline are not supported. To use liquid clustering, you must update the pipeline definition to use CLUSTER BY instead of PARTITIONED BY.
  • Tables that use Delta Sharing with partition filtering are not supported. For information on partition filtering for Delta Sharing, see Specify table partitions to share.

Remove clustering keys

To remove clustering keys, use the following syntax:

SQL
ALTER TABLE table_name CLUSTER BY NONE;

Choose clustering keys

tip

Databricks recommends using automatic liquid clustering for supported tables, which intelligently selects clustering keys based on your query patterns. See Automatic liquid clustering.

Key selection guidelines

When you manually specify clustering keys, choose the columns most frequently used in query filters. You can define clustering keys in any order. If two columns are highly correlated, you only need to include one of them as a clustering key.

You can specify up to four clustering keys. For smaller tables (under 10 TB), using more clustering keys can degrade performance for queries that filter on a single column. For example, a single-column filter performs worse on a table clustered on four keys than on a table clustered on two keys. However, as table size increases, this performance difference becomes negligible for single-column queries.

Clustering keys must be columns that have statistics collected. By default, the first 32 columns in a Delta table have statistics collected. See Specify statistics columns.

Supported data types

Clustering supports these data types for clustering keys:

  • Date
  • Timestamp
  • TimestampNTZ (Databricks Runtime 14.3 LTS and above)
  • String
  • Integer, Long, Short, Byte
  • Float, Double, Decimal
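The rules above (at most four keys, statistics required, supported types) can be pre-checked before you run ALTER TABLE. The sketch below is hypothetical and simplified: it assumes the schema is an ordered list of top-level `(name, type)` pairs using Spark SQL type names, models only the default of collecting statistics on the first 32 columns, and does not check the Databricks Runtime 14.3 LTS requirement for TimestampNTZ.

```python
from __future__ import annotations

import re

# Spark SQL spellings of the supported key types listed above.
SUPPORTED_KEY_TYPES = {
    "date", "timestamp", "timestamp_ntz", "string",
    "int", "integer", "bigint", "long", "smallint", "short", "tinyint", "byte",
    "float", "double", "decimal",
}
MAX_CLUSTERING_KEYS = 4


def validate_clustering_keys(
    schema: list[tuple[str, str]],
    keys: list[str],
    num_stats_columns: int = 32,
) -> list[str]:
    """Return problems found with the proposed keys (empty list if none)."""
    problems: list[str] = []
    if len(keys) > MAX_CLUSTERING_KEYS:
        problems.append(f"{len(keys)} keys specified; the maximum is {MAX_CLUSTERING_KEYS}")
    # Column names are compared case-insensitively.
    positions = {name.lower(): i for i, (name, _) in enumerate(schema)}
    types = {name.lower(): type_name for name, type_name in schema}
    for key in keys:
        pos = positions.get(key.lower())
        if pos is None:
            problems.append(f"{key}: column not found")
            continue
        type_name = types[key.lower()]
        # Strip parameters such as decimal(10,2) before comparing type names.
        base_type = re.sub(r"\(.*\)$", "", type_name.strip().lower())
        if base_type not in SUPPORTED_KEY_TYPES:
            problems.append(f"{key}: type {type_name} is not supported")
        # Default stats collection covers only the first num_stats_columns columns.
        if pos >= num_stats_columns:
            problems.append(f"{key}: no statistics (column position {pos + 1})")
    return problems
```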

Migrating from partitioning or Z-order

important

Databricks recommends converting partitioned tables automatically with the REPLACE PARTITIONED BY WITH CLUSTER BY command. See Convert a partitioned table to liquid clustering.

If you're converting an existing table, consider the following recommendations:

  • Hive-style partitioning: Use partition columns as clustering keys.
  • Z-order indexing: Use the ZORDER BY columns as clustering keys.
  • Hive-style partitioning and Z-order: Use both partition columns and ZORDER BY columns as clustering keys.
  • Generated columns to reduce cardinality (for example, date for a timestamp): Use the original column as a clustering key, and don't create a generated column.
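The recommendations above can be expressed as a small function that merges partition and ZORDER BY columns and swaps generated columns for their source columns. This is a hypothetical sketch; the `generated_from` mapping is an assumed input that you would fill in from your table definition. If more than four candidates remain, it raises an error, and you can drop correlated columns as described in Key selection guidelines.

```python
from __future__ import annotations


def suggest_clustering_keys(
    partition_cols: list[str],
    zorder_cols: list[str],
    generated_from: dict[str, str] | None = None,
) -> list[str]:
    """Derive starting clustering keys from a partitioned and/or Z-ordered layout.

    generated_from maps a generated column (for example, a date derived from
    a timestamp) to its source column, which is used instead.
    """
    generated_from = generated_from or {}
    keys: list[str] = []
    for col in [*partition_cols, *zorder_cols]:
        # Prefer the original column over a cardinality-reducing generated one.
        candidate = generated_from.get(col, col)
        if candidate not in keys:
            keys.append(candidate)
    if len(keys) > 4:
        raise ValueError(f"{len(keys)} candidate keys; liquid clustering allows at most 4")
    return keys
```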

Automatic liquid clustering

In Databricks Runtime 15.4 LTS and above, you can enable automatic liquid clustering for Unity Catalog managed Delta tables. For Unity Catalog managed Apache Iceberg v3 tables, automatic liquid clustering requires Databricks Runtime 18.0 and above. With the CLUSTER BY AUTO clause, Databricks intelligently chooses clustering keys to optimize query performance.

note

Automatic liquid clustering is also supported for materialized views and streaming tables, including Lakeflow Spark Declarative Pipelines and standalone pipelines. Specify CLUSTER BY AUTO in your pipeline or SQL definition.

How automatic liquid clustering works

Automatic liquid clustering provides intelligent optimization based on your usage patterns:

  • Requires predictive optimization: Automatic key selection and clustering operations run asynchronously as a maintenance operation. See Predictive optimization for Unity Catalog managed tables.
  • Analyzes query workload: Databricks analyzes the table's historical query workload and identifies the best candidate columns for clustering.
  • Adapts to changes: If your query patterns or data distributions change over time, automatic liquid clustering selects new keys to optimize performance.
  • Cost-aware selection: Databricks changes clustering keys only when the predicted cost savings from data skipping improvements outweigh the data clustering cost.

Automatic liquid clustering might not select keys for the following reasons:

  • The table is too small to benefit from liquid clustering.
  • The table already has an effective clustering scheme, either from previous manual keys or natural insertion order that matches query patterns.
  • The table does not have frequent queries.
  • You are not using Databricks Runtime 15.4 LTS or above.

You can apply automatic liquid clustering to all Unity Catalog managed tables, regardless of data and query characteristics. Databricks heuristics decide whether selecting clustering keys is cost-beneficial.

Databricks Runtime version compatibility

You can read or write tables with automatic clustering enabled from all Databricks Runtime versions that support liquid clustering. However, intelligent key selection relies on metadata introduced in Databricks Runtime 15.4 LTS.

Use Databricks Runtime 15.4 LTS or above to ensure that automatically selected keys benefit all of your workloads and that these workloads are considered when selecting new keys.

Enable or turn off automatic liquid clustering

To create a table with automatic liquid clustering:

SQL
CREATE OR REPLACE TABLE table1 (column01 int, column02 string) CLUSTER BY AUTO;

To enable automatic liquid clustering on an existing table, including tables with manually specified keys:

SQL
ALTER TABLE table1 CLUSTER BY AUTO;

note

SQL doesn't support initial clustering column hints when enabling automatic liquid clustering. To provide hints for initial key selection, use the Python API.

To turn off automatic liquid clustering:

SQL
ALTER TABLE table1 CLUSTER BY NONE;

To turn off automatic liquid clustering and specify clustering columns:

SQL
ALTER TABLE table1 CLUSTER BY (column01, column02);

If an existing table has automatic liquid clustering enabled, running CREATE OR REPLACE TABLE table_name without CLUSTER BY AUTO turns off automatic clustering and does not preserve clustering columns. To preserve automatic liquid clustering and any previously selected columns, include CLUSTER BY AUTO in the replace statement. With CLUSTER BY AUTO, predictive optimization uses the historical query workload for the table to identify the best clustering keys.

Check if automatic clustering is enabled

To check if a table has automatic liquid clustering enabled, use DESCRIBE TABLE or SHOW TBLPROPERTIES.

If automatic liquid clustering is enabled, the clusterByAuto property is set to true. The clusteringColumns property shows the current clustering columns that were automatically or manually selected.
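A minimal sketch of reading those two properties in Python follows. It assumes you have already collected the table properties into a dictionary of strings, and that `clusteringColumns` is a JSON array of column-path arrays such as `[["col0"],["col1"]]`; verify the exact value format in your workspace before relying on it. The function name `clustering_status` is hypothetical.

```python
from __future__ import annotations

import json


def clustering_status(props: dict[str, str]) -> tuple[bool, list[str]]:
    """Return (auto_clustering_enabled, clustering_columns) from table properties."""
    auto_enabled = props.get("clusterByAuto", "false").strip().lower() == "true"
    columns: list[str] = []
    raw = props.get("clusteringColumns")
    if raw:
        # Assumed format: each entry is a column path; nested names are dot-joined.
        columns = [".".join(path) for path in json.loads(raw)]
    return auto_enabled, columns
```

In a Databricks notebook, one way to build the dictionary is `{row.key: row.value for row in spark.sql("SHOW TBLPROPERTIES table1").collect()}`.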

Limitations

Automatic liquid clustering isn't available for managed Apache Iceberg v2 tables. It is supported for managed Apache Iceberg v3 tables in Databricks Runtime 18.0 and above.

Write data to a clustered table

To write to a clustered Delta table, you must use a Delta writer client that supports all Delta write protocol table features used by liquid clustering. To write to a clustered Iceberg table, you can use Unity Catalog's Iceberg REST Catalog API. On Databricks, you must use Databricks Runtime 13.3 LTS and above.

Operations that support clustering on write

Operations that cluster on write include the following:

  • INSERT INTO operations
  • CTAS and RTAS statements
  • COPY INTO from Parquet format
  • DataFrame writes in append mode, such as df.write.mode("append")

Size thresholds for clustering

Clustering on write only triggers when data in the transaction meets a size threshold. These thresholds vary by the number of clustering columns and are lower for Unity Catalog managed tables than other Delta tables.

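  • 1 clustering column: 64 MB for Unity Catalog managed tables, 256 MB for other Delta tables
  • 2 clustering columns: 256 MB for Unity Catalog managed tables, 1 GB for other Delta tables
  • 3 clustering columns: 512 MB for Unity Catalog managed tables, 2 GB for other Delta tables
  • 4 clustering columns: 1 GB for Unity Catalog managed tables, 4 GB for other Delta tables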

Because not all operations cluster data on write, Databricks recommends running OPTIMIZE frequently to ensure that all data is efficiently clustered.
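The size thresholds above can be encoded as a lookup for estimating whether a given write will cluster. This sketch is illustrative only: it assumes that MB and GB mean binary units (MiB and GiB) and that a transaction exactly at the threshold qualifies, neither of which this page specifies. The names are hypothetical.

```python
# Assumption: MB/GB in the thresholds are read as binary units.
MiB = 1024 ** 2
GiB = 1024 ** 3

# Number of clustering columns -> (UC managed threshold, other Delta threshold).
CLUSTER_ON_WRITE_THRESHOLDS = {
    1: (64 * MiB, 256 * MiB),
    2: (256 * MiB, 1 * GiB),
    3: (512 * MiB, 2 * GiB),
    4: (1 * GiB, 4 * GiB),
}


def write_threshold(num_keys: int, uc_managed: bool) -> int:
    """Return the minimum transaction size, in bytes, for clustering on write."""
    managed, other = CLUSTER_ON_WRITE_THRESHOLDS[num_keys]
    return managed if uc_managed else other


def clusters_on_write(bytes_written: int, num_keys: int, uc_managed: bool) -> bool:
    """True if a transaction of this size meets the threshold (assumed inclusive)."""
    return bytes_written >= write_threshold(num_keys, uc_managed)
```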

Streaming workloads

Structured Streaming workloads support clustering on write when you set the Spark config spark.databricks.delta.liquid.eagerClustering.streaming.enabled to true. Clustering for these workloads only triggers if at least one of the last five streaming updates exceeds the applicable size threshold listed above.
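The streaming rule can be modeled the same way. In this hypothetical sketch, `update_sizes` holds recent streaming update sizes in bytes (oldest first), `threshold_bytes` is the value for your table from the size thresholds, and the boolean stands in for the Spark config named in the previous paragraph.

```python
from __future__ import annotations


def streaming_clusters_on_write(
    update_sizes: list[int],
    threshold_bytes: int,
    eager_streaming_enabled: bool,
) -> bool:
    """Model whether a streaming write clusters on write."""
    if not eager_streaming_enabled:
        # Without the config, streaming writes don't cluster on write.
        return False
    # Only the last five updates count; one of them must exceed the threshold.
    return any(size > threshold_bytes for size in update_sizes[-5:])
```

To turn on the behavior itself, you can set the config in a notebook session, for example with `spark.conf.set("spark.databricks.delta.liquid.eagerClustering.streaming.enabled", "true")`.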

How to trigger clustering

Predictive optimization automatically runs OPTIMIZE commands for enabled tables. See Predictive optimization for Unity Catalog managed tables. When using predictive optimization, Databricks recommends disabling any scheduled OPTIMIZE jobs.

To trigger clustering, you must use Databricks Runtime 13.3 LTS or above. Databricks recommends Databricks Runtime 17.2 and above for faster OPTIMIZE performance on large tables. Use the OPTIMIZE command on your table:

SQL
OPTIMIZE table_name;

Liquid clustering is incremental, meaning that OPTIMIZE only rewrites data as necessary to accommodate data that needs clustering. OPTIMIZE does not rewrite data files that were clustered on keys that don't match the current clustering keys, such as files clustered before you changed the keys. See Force reclustering.

If you are not using predictive optimization, Databricks recommends scheduling regular OPTIMIZE jobs to cluster data. For tables experiencing many updates or inserts, Databricks recommends scheduling an OPTIMIZE job every one or two hours. Because liquid clustering is incremental, most OPTIMIZE jobs for clustered tables run quickly.

Force reclustering

In Databricks Runtime 16.0 and above, you can force reclustering of all records in a table with the following syntax:

SQL
OPTIMIZE table_name FULL;

important

Running OPTIMIZE FULL reclusters all existing data as necessary. For large tables that have not previously been clustered on the specified keys, this operation might take hours.

Run OPTIMIZE FULL when you enable clustering for the first time or change clustering keys. If you have previously run OPTIMIZE FULL and the clustering keys have not changed since, OPTIMIZE FULL runs the same as OPTIMIZE. In this scenario, OPTIMIZE uses an incremental approach and only rewrites files that haven't previously been compacted. Use OPTIMIZE FULL whenever you need to ensure that the data layout reflects the current clustering keys.
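The difference between incremental OPTIMIZE and OPTIMIZE FULL can be illustrated with a simplified model. This is not how Databricks selects files internally: it ignores file sizes and compaction, and tracks only which clustering keys, if any, each hypothetical `DataFile` was last clustered on.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    # Keys the file was last clustered on, or None if never clustered.
    clustered_on: tuple[str, ...] | None


def files_to_rewrite(
    files: list[DataFile], current_keys: tuple[str, ...], full: bool = False
) -> list[str]:
    """Simplified model of which files OPTIMIZE (or OPTIMIZE FULL) rewrites."""
    selected = []
    for f in files:
        if f.clustered_on is None:
            # Unclustered data is always a candidate.
            selected.append(f.path)
        elif full and f.clustered_on != current_keys:
            # Only FULL reclusters files laid out on older keys.
            selected.append(f.path)
    return selected
```

After a FULL run, every file carries the current keys, so a later FULL run selects the same files as plain OPTIMIZE, matching the behavior described above.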

Partial recluster

In Databricks Runtime 18.1 and above, you can force reclustering for a subset of records using OPTIMIZE FULL WHERE <predicate>. A file is included if any part of its range overlaps with the predicate. See Parameters.

SQL
OPTIMIZE events FULL WHERE event_date >= '2025-01-01';
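The overlap rule can be sketched as follows, assuming each file's range is the minimum and maximum `event_date` recorded in its statistics. `FileStats` and `files_for_partial_recluster` are hypothetical names. For a lower-bound predicate such as `event_date >= '2025-01-01'`, a file overlaps whenever its maximum value reaches the bound, even if most of its rows are older.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class FileStats:
    path: str
    min_event_date: date
    max_event_date: date


def files_for_partial_recluster(files: list[FileStats], lower_bound: date) -> list[str]:
    """Files whose [min, max] range overlaps the predicate event_date >= lower_bound."""
    # Any part of the range can satisfy the predicate iff the max reaches the bound.
    return [f.path for f in files if f.max_event_date >= lower_bound]
```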

Read data from a clustered table

You can read data in a clustered Delta table using any Delta Lake client that supports reading deletion vectors. Using the Iceberg REST Catalog API, you can read data in a clustered Iceberg table. Liquid clustering improves query performance through automatic data skipping when filtering on clustering keys.

SQL
SELECT * FROM table_name WHERE cluster_key_column_name = 'some_value';

Manage clustering keys

See how a table is clustered

You can use DESCRIBE commands to see the clustering keys for a table, as in the following examples:

SQL
DESCRIBE TABLE table_name;

DESCRIBE DETAIL table_name;

Change clustering keys

You can change clustering keys for a table at any time by running an ALTER TABLE command, as in the following example:

SQL
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

When you change clustering keys, subsequent OPTIMIZE and write operations use the new clustering approach, but existing data is not rewritten. To rewrite existing data with the updated clustering keys, see Force reclustering.

You can also turn off clustering by setting the keys to NONE, as in the following example:

SQL
ALTER TABLE table_name CLUSTER BY NONE;

Setting clustering keys to NONE does not rewrite already clustered data, but prevents future OPTIMIZE operations from using clustering keys.

Use liquid clustering from an external engine

You can enable liquid clustering on managed Iceberg tables from external Iceberg engines. To enable liquid clustering, specify partition columns when creating a table. Unity Catalog interprets the partitions as clustering keys. For example, run the following command in OSS Spark (the column definitions are illustrative):

SQL
CREATE OR REPLACE TABLE main.schema.icebergTable (c1 INT, c2 STRING)
PARTITIONED BY (c1);

To turn off liquid clustering, drop the partition field:

SQL
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c1;

To change clustering keys using Iceberg partition evolution:

SQL
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;

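If you specify a partition using a bucket transform, Unity Catalog drops the transform expression and uses the underlying column as a clustering key. Iceberg's Spark DDL takes the bucket count first, then the column:

SQL
CREATE OR REPLACE TABLE main.schema.icebergTable (c1 INT, c2 STRING)
PARTITIONED BY (bucket(10, c1));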
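The mapping from Iceberg partition fields to clustering keys can be sketched as below. It covers only the two cases this page describes, identity columns and the `bucket(N, col)` transform, and rejects anything else rather than guessing how Unity Catalog treats other transforms. The function name is hypothetical.

```python
from __future__ import annotations

import re

# Iceberg bucket transform as written in Spark DDL: bucket(N, col).
_BUCKET = re.compile(r"^bucket\(\s*\d+\s*,\s*([A-Za-z_][A-Za-z0-9_]*)\s*\)$", re.IGNORECASE)
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def clustering_keys_from_partition_spec(fields: list[str]) -> list[str]:
    """Map Iceberg partition fields to the clustering keys Unity Catalog would use."""
    keys: list[str] = []
    for field in fields:
        field = field.strip()
        if _IDENTIFIER.match(field):
            # Identity partition: the column itself becomes the key.
            keys.append(field)
            continue
        bucket = _BUCKET.match(field)
        if bucket:
            # Bucket transform: the expression is dropped, the column is kept.
            keys.append(bucket.group(1))
            continue
        raise ValueError(f"transform not covered by this sketch: {field}")
    return keys
```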

Compatibility for tables with liquid clustering

Liquid clustering uses Delta table features that require specific Databricks Runtime versions for reading and writing. Tables created with liquid clustering in Databricks Runtime 14.1 and above use checkpoint V2 by default. You can read and write tables with checkpoint V2 in Databricks Runtime 13.3 LTS and above. See Checkpoint V2.

To support readers using Databricks Runtime 12.2 LTS to 13.2, disable checkpoint V2 and downgrade the table protocol. See Downgrade to classic.

Override default feature enablement (optional)

You can override the Delta table features that are enabled by default when you enable liquid clustering. This prevents the reader and writer protocol upgrades associated with those table features. You must have an existing table to complete the following steps:

  1. Use ALTER TABLE to set the table property that disables one or more features. For example, to disable deletion vectors run the following:

    SQL
    ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
  2. Enable liquid clustering on the table by running the following:

    SQL
    ALTER TABLE <table_name>
    CLUSTER BY (<clustering_columns>)
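The two steps above can be generated together so that the property change always precedes CLUSTER BY. This is a hypothetical sketch: the short feature labels (`deletion_vectors`, `row_tracking`, `checkpoint_v2`) are invented for it, while the property settings are the ones this page lists for each overridable feature.

```python
from __future__ import annotations

# Property settings this page lists for each overridable feature.
OVERRIDES = {
    "deletion_vectors": "'delta.enableDeletionVectors' = false",
    "row_tracking": "'delta.enableRowTracking' = false",
    "checkpoint_v2": "'delta.checkpointPolicy' = 'classic'",
}


def override_then_cluster(table: str, keys: list[str], disable: list[str]) -> list[str]:
    """Return the statements in order: disable features, then enable clustering."""
    unknown = set(disable) - set(OVERRIDES)
    if unknown:
        raise ValueError(f"unknown features: {sorted(unknown)}")
    statements = []
    if disable:
        props = ", ".join(OVERRIDES[name] for name in disable)
        # Step 1: set the properties before enabling clustering.
        statements.append(f"ALTER TABLE {table} SET TBLPROPERTIES ({props});")
    # Step 2: enable liquid clustering on the existing table.
    statements.append(f"ALTER TABLE {table} CLUSTER BY ({', '.join(keys)});")
    return statements
```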

The following list describes the Delta features you can override and how their enablement affects compatibility with Databricks Runtime versions.

  • Deletion vectors
    • Runtime compatibility: Reads and writes require Databricks Runtime 12.2 LTS and above.
    • Property to override enablement: 'delta.enableDeletionVectors' = false
    • Impact on liquid clustering if turned off: Disabling deletion vectors disables row-level concurrency, making transactions and clustering operations more likely to conflict. See Row-level concurrency. DELETE, MERGE, and UPDATE commands might run slower.
  • Row tracking
    • Runtime compatibility: Writes require Databricks Runtime 13.3 LTS and above. Can be read from any Databricks Runtime version.
    • Property to override enablement: 'delta.enableRowTracking' = false
    • Impact on liquid clustering if turned off: Disabling row tracking disables row-level concurrency, making transactions and clustering operations more likely to conflict. See Row-level concurrency.
  • Checkpoint V2
    • Runtime compatibility: Reads and writes require Databricks Runtime 13.3 LTS and above.
    • Property to override enablement: 'delta.checkpointPolicy' = 'classic'
    • Impact on liquid clustering if turned off: No impact on liquid clustering behavior. See Checkpoint V2.
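To see which Databricks Runtime version a combination of these features requires, the compatibility information above can be folded into a lookup. This sketch covers only the three overridable features listed here, not the protocol requirements of liquid clustering itself, and the feature labels and function name are hypothetical.

```python
from __future__ import annotations

# (major, minor) Databricks Runtime minimums from the compatibility information above.
# None means the feature places no requirement on that operation.
REQUIREMENTS = {
    "deletion_vectors": {"read": (12, 2), "write": (12, 2)},
    "row_tracking": {"read": None, "write": (13, 3)},
    "checkpoint_v2": {"read": (13, 3), "write": (13, 3)},
}


def minimum_runtime(enabled: list[str], operation: str) -> tuple[int, int] | None:
    """Lowest runtime satisfying every enabled feature for 'read' or 'write'."""
    if operation not in ("read", "write"):
        raise ValueError("operation must be 'read' or 'write'")
    versions = [REQUIREMENTS[f][operation] for f in enabled if REQUIREMENTS[f][operation]]
    # Tuples compare element-wise, so max() picks the strictest requirement.
    return max(versions) if versions else None
```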

Limitations

  • Databricks Runtime 15.1 and below: Clustering on write does not support source queries that include filters, joins, or aggregations.
  • Databricks Runtime 15.4 LTS and below: You cannot create a table with liquid clustering enabled using a Structured Streaming write. You can use Structured Streaming to write data to an existing table with liquid clustering enabled.
  • Apache Iceberg v2: Row-level concurrency isn't supported on managed Apache Iceberg v2 tables because deletion vectors and row tracking aren't supported.
    • Row-level concurrency is supported on managed Apache Iceberg v3 tables because the v3 spec supports deletion vectors and row tracking. See Use Apache Iceberg v3 features.