Delta Lake liquid clustering is available in Public Preview in Databricks Runtime 13.2 and above.
Delta Lake liquid clustering replaces table partitioning and ZORDER to simplify data layout decisions and optimize query performance. Liquid clustering provides flexibility to redefine clustering keys without rewriting existing data, allowing data layout to evolve alongside analytic needs over time.
Databricks supports enhanced concurrency for Delta tables with liquid clustering enabled. See Row-level concurrency on Databricks.
Databricks Runtime 13.2 and above is required to write or OPTIMIZE Delta tables with liquid clustering enabled.
Databricks recommends liquid clustering for all new Delta tables. The following are examples of scenarios that benefit from clustering:
Tables often filtered by high cardinality columns.
Tables with significant skew in data distribution.
Tables that grow quickly and require maintenance and tuning effort.
Tables with concurrent write requirements.
Tables with access patterns that change over time.
Tables where a typical partition key could leave the table with too many or too few partitions.
You must enable Delta Lake liquid clustering when first creating a table. Clustering is not compatible with partitioning or ZORDER, and requires that the Databricks client manages all layout and optimization operations for data in your table. Once enabled, run OPTIMIZE jobs as normal to incrementally cluster data. See How to trigger clustering.
To enable liquid clustering, add the CLUSTER BY phrase to a table creation statement, as in the examples below:
-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) USING DELTA CLUSTER BY (col0);

-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)  -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;

-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
Tables created with liquid clustering enabled have numerous Delta table features enabled at creation and use Delta writer version 7 and reader version 3. Table protocol versions cannot be downgraded, and tables with clustering enabled are not readable by Delta Lake clients that do not support all enabled Delta reader protocol table features. See How does Databricks manage Delta Lake feature compatibility?.
Databricks recommends choosing clustering keys based on commonly used query filters. Clustering keys can be defined in any order. If two columns are correlated, you only need to add one of them as a clustering key.
If you’re converting an existing table, consider the following recommendations:
| Current data optimization technique | Recommendation for clustering keys |
| --- | --- |
| Hive-style partitioning | Use partition columns as clustering keys. |
| Hive-style partitioning and Z-order | Use both partition columns and ZORDER BY columns as clustering keys. |
| Generated columns to reduce cardinality (for example, date for a timestamp) | Use the original column as a clustering key, and don’t create a generated column. |
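As a sketch of the generated-column recommendation, the statements below cluster a table directly on its timestamp column instead of adding a generated date column. The table name `events`, its columns, and the literal values are hypothetical and chosen only for illustration.

```sql
-- Hypothetical table: cluster on the original timestamp column
-- rather than on a generated date column derived from it.
CREATE TABLE events (
  event_id BIGINT,
  user_id  BIGINT,
  event_ts TIMESTAMP,
  payload  STRING
) USING DELTA
CLUSTER BY (event_ts, user_id);

-- Queries then filter on the clustering keys themselves.
SELECT *
FROM events
WHERE event_ts >= TIMESTAMP '2023-07-01 00:00:00'
  AND event_ts <  TIMESTAMP '2023-07-02 00:00:00'
  AND user_id = 42;
```

Because clustering keys can be defined in any order, listing `user_id` first would describe the same layout choice.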
Databricks provides row-level concurrency for clustered tables that can reduce the number of conflicts between concurrent write operations.
To reduce the likelihood of row-level conflicts, you can filter concurrent queries, as in the following example:
INSERT INTO table_name REPLACE WHERE column_name = "some_value" SELECT * FROM staging_table;
You must use a Delta writer client that supports all Delta write protocol table features used by liquid clustering. On Databricks, you must use Databricks Runtime 13.2 and above.
Most operations do not automatically cluster data on write. Operations that cluster on write include the following:
COPY INTO from Parquet format
Clustering on write is a best effort application, and is not applied in the following situations:
If a write operation exceeds 512GB of data.
If the SELECT subquery contains a transformation, filter, or join.
If projected columns are not the same as the source table.
Because not all operations apply liquid clustering, Databricks recommends frequently running OPTIMIZE to ensure that all data is efficiently clustered.
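A minimal sketch of this pattern, assuming hypothetical `sales_clustered` and `sales_staging` tables with identical schemas and a hypothetical `region` column: a write whose SELECT subquery contains a filter is followed by an explicit OPTIMIZE, so the new rows are clustered even though clustering on write was skipped.

```sql
-- The filter in the SELECT subquery is one of the situations in which
-- clustering on write is not applied, so these rows land unclustered.
INSERT INTO sales_clustered
SELECT * FROM sales_staging
WHERE region = 'EMEA';

-- Incrementally cluster whatever the write left unclustered.
OPTIMIZE sales_clustered;
```

Running OPTIMIZE after such writes, or on a schedule, keeps the table clustered regardless of which write paths produced the data.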
To trigger clustering, you must use Databricks Runtime 13.2 or above. Use the OPTIMIZE command on your table, as in the following example:
OPTIMIZE table_name;
Liquid clustering is incremental, meaning that data is only rewritten as necessary to accommodate data that needs to be clustered. Data files with clustering keys that do not match data to be clustered are not rewritten.
For best performance, Databricks recommends scheduling regular OPTIMIZE jobs to cluster data. For tables experiencing many updates or inserts, Databricks recommends scheduling an OPTIMIZE job every one or two hours. Because liquid clustering is incremental, most OPTIMIZE jobs for clustered tables run quickly.
You can read data in a clustered table using any Delta Lake client that supports reading deletion vectors. For best query results, include clustering keys in your query filters, as in the following example:
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
You can change clustering keys for a table at any time by running an ALTER TABLE command, as in the following example:
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
When you change clustering keys, subsequent OPTIMIZE and write operations use the new clustering approach, but existing data is not rewritten.
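The sequence below sketches how a key change might be followed through in practice, reusing the placeholder names `table_name`, `new_column1`, and `new_column2` from the example above. It only combines commands already described in this article.

```sql
-- Redefine the clustering keys; existing data files are not rewritten.
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

-- Subsequent OPTIMIZE runs cluster data using the new keys.
OPTIMIZE table_name;

-- Inspect the table to confirm which clustering keys are now in effect.
DESCRIBE DETAIL table_name;
```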
You can also turn off clustering by setting the keys to NONE, as in the following example:
ALTER TABLE table_name CLUSTER BY NONE;
Setting cluster keys to NONE does not rewrite data that has already been clustered, but prevents future OPTIMIZE operations from using clustering keys.
You can use DESCRIBE commands to see the clustering keys for a table, as in the following examples:
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
The following limitations exist:
You can only specify columns with statistics collected for clustering keys. By default, the first 32 columns in a Delta table have statistics collected.
You can specify up to 4 columns as clustering keys.
Structured Streaming workloads do not support clustering-on-write.