テーブルにリキッドクラスタリングを使用する
リキッドクラスタリングは、テーブルのパーティショニングとZORDERに代わるデータ レイアウト最適化手法です。 クラスタリング キーに基づいてデータを自動的に整理することで、テーブル管理を簡素化し、クエリ パフォーマンスを最適化します。
従来のパーティショニングとは異なり、既存のデータを書き換えずにクラスタリング キーを再定義できます。これにより、分析ニーズの変化に合わせてデータ レイアウトを進化させることができます。リキッドクラスタリングはストリーミングテーブルとマテリアライズドビューの両方に適用されます。
リキッドクラスタリングは、 Delta Lakeテーブルでは一般提供され、マネージドApache Icebergテーブルではパブリック プレビューで利用可能です。 Delta Lake テーブルの場合、GA サポートは Databricks Runtime 15.2 以降で利用できます。Databricks では、最高のパフォーマンスを得るために最新の Databricks Runtime を使用することをお勧めします。Apache Iceberg テーブルの場合、Databricks Runtime 16.4 LTS 以上が必要です。
リキッドクラスタリングを使用する場合
Databricksストリーミング テーブルやマテリアライズドビューを含むすべての新しいテーブルに対してリキッドクラスタリングを推奨しています。 クラスタリングは特に次のシナリオで効果を発揮します。
- 高いカーディナリティの列によって頻繁にフィルタリングされるテーブル。
- データ分布に偏りがあるテーブル。
- 急速に増大し、メンテナンスとチューニングが必要となるテーブル。
- 並列書き込み要件を持つテーブル。
- 時間の経過とともにアクセス パターンが変化するテーブル。
- 一般的なパーティションキーでは、パーティションが多すぎたり少なすぎたりするテーブル。
リキッドクラスタリングを有効にする
リキッドクラスタリングは、パーティション化されていない既存のテーブルに対して、またはテーブルの作成中に有効にできます。 クラスタリングは、パーティションまたはZORDERと互換性がありません。Databricks では、テーブル内のデータのすべてのレイアウトおよび最適化操作をプラットフォームで管理できるようにすることをお勧めします。流動クラスタリングを有効にした後、 OPTIMIZEジョブを実行してデータを段階的にクラスター化します。 クラスタリングをトリガーする方法を参照してください。
クラスタリングを使用してテーブルを作成する
流動クラスタリングを有効にするには、以下の例のように、テーブル作成ステートメントにCLUSTER BY句を追加します。 Databricks Runtime 14.2 以降では、 PythonまたはScalaのDataFrame APIsと DeltaTable API使用して、 Delta Lakeテーブルの液体クラスタリングを有効にすることができます。
- SQL
- Python
- Scala
-- Create an empty Delta table with clustering on col0
CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);
-- Create table from existing data with clustering
-- Note: CLUSTER BY must appear after table name, not in SELECT clause
CREATE TABLE table2 CLUSTER BY (col0)
AS SELECT * FROM table1;
-- Copy table structure including clustering configuration
CREATE TABLE table3 LIKE table1;
# Create an empty Delta table with clustering on col0
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0") # Single clustering key
.execute())
# Create clustered table from existing DataFrame
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# Alternative: DataFrameWriterV2 API (DBR 14.2+)
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
// Create an empty Delta table with clustering on col0
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Create clustered table from existing DataFrame
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// Alternative: DataFrameWriterV2 API (DBR 14.2+)
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Databricks Runtime 16.0 以降では、構造化ストリーミング書き込みを使用して、液体クラスタリングを有効にしたテーブルを作成できます。 Databricks では、次の例のように、最高のパフォーマンスを得るために Databricks Runtime 16.4 以上を使用することをお勧めします。
- SQL
- Python
- Scala
CREATE TABLE table1 (
col0 STRING,
col1 DATE,
col2 BIGINT
)
CLUSTER BY (col0, col1);
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
リキッドクラスタリングが有効になっているDeltaテーブルは、 Deltaライター バージョン 7 とリーダー バージョン 3 を使用します。これらのプロトコルをサポートしていないDeltaクライアントは、これらのテーブルを読み取ることができません。 テーブル プロトコルのバージョンをダウングレードすることはできません。Delta Lake 機能の互換性とプロトコルを参照してください。
安全な機能の有効化 (削除など) をオーバーライドするには、 「安全な機能の有効化 (オプション) をオーバーライドする」を参照してください。
既存のテーブルで有効にする
次の構文を使用して、既存のパーティション分割されていない Delta テーブルでリキッドクラスタリングを有効にします。
-- Alter an existing table
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
Apache Icebergの場合、既存のマネージドIcebergテーブルでリキッドクラスタリングを有効にするときは、削除ベクトルと行 ID を明示的に無効にする必要があります。
デフォルトの動作では、以前に書き込まれたデータにクラスタリングは適用されません。すべてのレコードに対して再クラスタリングを強制するには、 OPTIMIZE FULLを使用する必要があります。「すべてのレコードの再クラスタリングの強制」を参照してください。
クラスタリングキーを削除する
クラスタリング・キーを削除するには、次の構文を使用します。
ALTER TABLE table_name CLUSTER BY NONE;
クラスタリング キーの選択
Databricksサポートされているテーブルに対して自動リキッドクラスタリングを使用することをお勧めします。これは、クエリ パターンに基づいてクラスタリング キーをインテリジェントに選択します。 「自動リキッドクラスタリング」を参照してください。
主要な選択ガイドライン
クラスタリング キーを手動で指定する場合は、クエリ フィルターで最も頻繁に使用される列に基づいて列を選択します。クラスタリング キーは任意の順序で定義できます。2 つの列の相関性が高い場合は、そのうちの 1 つだけをクラスタリング キーとして含める必要があります。
最大 4 つのクラスタリング キー を指定できます。小さいテーブル (10 TB 未満) の場合、クラスタリング キーを多く使用すると、単一の列でフィルタリングするときにパフォーマンスが低下する可能性があります。たとえば、4 つのキーでフィルタリングすると、2 つのキーでフィルタリングするよりもパフォーマンスが低下します。ただし、テーブルのサイズが大きくなると、単一列のクエリではこのパフォーマンスの違いは無視できるようになります。
クラスタリング キーは、統計が収集された列である必要があります。デフォルトでは、Delta テーブルの最初の 32 列に統計が収集されます。Delta統計列の指定」を参照してください。
サポートされているデータ型
クラスタリングは、クラスタリング キーとして次のデータ タイプをサポートします。
- 日付
- Timestamp
- TimestampNTZ (Databricks Runtime 14.3 LTS 以上)
- 文字列
- 整数、長整数、短整数、バイト
- 浮動小数点数、倍精度浮動小数点数、小数点数
パーティション分割またはZ-Orderからの移行
既存のテーブルを変換する場合は、次の推奨事項を考慮してください。
現在のデータ最適化手法 | クラスタリングキーに関する推奨事項 |
|---|---|
Hiveスタイルのパーティション分割 | パーティション列をクラスタリングキーとして使用します。 |
Z-orderのインデックス作成 |
|
Hiveスタイルのパーティション分割とZ-order | パーティション列と |
カーディナリティを減らすためのジェネレーテッドカラム(タイムスタンプの日付など) | 元の列をクラスタリング キーとして使用し、生成された列は作成しないでください。 |
自動リキッドクラスタリング
Databricks Runtime 15.4 LTS以降では、 Unity Catalogで管理されるDeltaテーブルの自動リキッドクラスタリングを有効にすることができます。 自動流動クラスタリングにより、 Databricks CLUSTER BY AUTO句を使用してクエリのパフォーマンスを最適化するためのクラスタリング キーをインテリジェントに選択できるようになります。
自動リキッドクラスタリングの仕組み
自動リキッドクラスタリングは、使用パターンに基づいてインテリジェントな最適化を提供します。
- 予測的最適化が必要 : 自動キー選択とクラスタリング操作をメンテナンス操作として非同期に実行します。 Unity Catalogマネージドテーブルについては、「予測的最適化」を参照してください。
- クエリ ワークロードを分析 : Databricks はテーブルの履歴クエリ ワークロードを分析し、クラスタリングに最適な候補列を識別します。
- 変化に適応する : クエリ パターンやデータ分布が時間の経過とともに変化する場合、自動リキッドクラスタリングによって新しいキーが選択され、パフォーマンスが最適化されます。
- コストを考慮した選択 : Databricks は、データ スキップの改善によって予測されるコスト削減がデータ クラスタリング コストを上回る場合にのみ、クラスタリング キーを変更します。
自動液体クラスタリングは、次の理由によりキーを選択しない可能性があります。
- テーブルが小さすぎてリキッドクラスタリングの恩恵を受けられません。
- テーブルには、以前の手動キーまたはクエリ パターンに一致する自然な挿入順序のいずれかによる、効果的なクラスタリング スキームがすでに存在します。
- テーブルには頻繁なクエリはありません。
- Databricks Runtime 15.4 LTS 以降を使用していない。
データやクエリの特性に関係なく、すべてのUnity Catalogマネージドテーブルに自動リキッドクラスタリングを適用できます。 ヒューリスティックにより、クラスタリング キーを選択することがコスト面で有利かどうかが決定されます。
DBRバージョンの互換性
自動クラスタリングが有効になっているテーブルは、リキッドクラスタリングをサポートするすべてのバージョンから Databricks Runtime 読み書きできます。 ただし、インテリジェントなキー選択は、Databricks Runtime 15.4 LTS で導入されたメタデータに依存します。
Databricks Runtime 15.4 LTS 以降を使用して、自動的に選択されたキーがすべてのワークロードにメリットをもたらし、新しいキーを選択するときにこれらのワークロードが考慮されるようにします。
自動リキッドクラスタリングを有効または無効にする
新規または既存のテーブルで自動リキッドクラスタリングを有効または無効にするには、次の構文を使用します。
- SQL
- Python
-- Create an empty table.
CREATE OR REPLACE TABLE table1(column01 int, column02 string) CLUSTER BY AUTO;
-- Enable automatic liquid clustering on an existing table,
-- including tables that previously had manually specified keys.
ALTER TABLE table1 CLUSTER BY AUTO;
-- Disable automatic liquid clustering on an existing table.
ALTER TABLE table1 CLUSTER BY NONE;
-- Disable automatic liquid clustering by setting the clustering keys
-- to chosen clustering columns or new columns.
ALTER TABLE table1 CLUSTER BY (column01, column02);
CLUSTER BY AUTOを指定せずにCREATE OR REPLACE table_name実行し、テーブルがすでに存在し、自動液体クラスタリングが有効になっている場合、 AUTO設定は無効になり、クラスタリング列は保持されません。 自動リキッドクラスタリングと以前に選択したクラスタリング列を保持するには、replace ステートメントにCLUSTER BY AUTOを含めます。 予測的最適化を保存すると、テーブルの履歴クエリ ワークロードが維持され、最適なクラスタリング キーが特定されます。
df = spark.read.table("table1")
df.write
.format("delta")
.option(“clusterByAuto”, “true”)
.saveAsTable(...)
# Set clustering columns and auto to provide a hint for initial selection
df.write
.format("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.saveAsTable(...)
# Using DataFrameWriterV2
df.writeTo(...).using("delta")
.option("clusterByAuto", "true")
.create()
# Set clustering columns and auto to provide a hint for initial selection
df.writeTo(...).using("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.create()
# Set clusterByAuto for streaming tables
spark.readStream.table("source_table")
.writeStream
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
# Specify a hint for clustering columns with both auto and columns
spark.readStream.table("source_table")
.writeStream
.clusterBy("column1", "column2")
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Python API は、Databricks Runtime 16.4 以降で使用できます。.clusterBy .option('clusterByAuto', 'true)と一緒に使用する場合、動作は次のようになります。
- これにより自動リキッドクラスタリングが初めて設定される場合、手動入力が常に尊重され、
.clusterByのクラスタリング列が設定されます。 - これがすでに自動液体クラスタリングを備えたテーブルである場合は、
.clusterByを使用したヒントを 1 回だけ受け入れることができます。 たとえば、.clusterByで指定された列は、テーブルにクラスタリング列がまだ設定されていない場合にのみ設定されます。
Python は、テーブルを作成または置換する場合にのみ使用できます。SQL を使用して、既存のテーブルのclusterByAutoステータスを変更します。
自動クラスタリングが有効になっているかどうかを確認する
テーブルで自動リキッドクラスタリングが有効になっているかどうかを確認するには、DESCRIBE TABLE または SHOW TBLPROPERTIESを使用します。
自動リキッドクラスタリングが有効な場合、 clusterByAuto プロパティは trueに設定されます。 clusteringColumns プロパティは、自動または手動で選択された現在のクラスタリング列を示します。
制限
自動リキッドクラスタリングは Apache Icebergではご利用いただけません。
クラスタ化されたテーブルにデータを書き込む
クラスタ化されたDeltaテーブルに書き込むには、リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。クラスター化された Iceberg テーブルに書き込むには、Unity Catalog の Iceberg REST カタログ API を使用できます。Databricks では、Databricks Runtime 13.3 LTS 以降を使用する必要があります。
書き込み時のクラスタリングをサポートする操作
書き込み時にクラスター化する操作には、以下のものがあります。
INSERT INTOオペレーションCTASおよびRTASステートメントCOPY INTO(Parquet由来)spark.write.mode("append")
クラスタリングのサイズしきい値
書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDeltaテーブルよりも低くなります。
クラスタリング列の数 | Unity Catalogマネージドテーブルのしきい値サイズ | 他のDeltaテーブルのしきい値サイズ |
|---|---|---|
1 | 64 MB | 256 MB |
2 | 256 MB | 1 GB |
3 | 512 MB | 2 GB |
4 | 1 GB | 4 GB |
すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZEを頻繁に実行することをお勧めします。
ストリーミングワークロード
構造化ストリーミング ワークロードでは、Spark 構成 spark.databricks.delta.liquid.eagerClustering.streaming.enabled を trueに設定すると、書き込み時のクラスタリングがサポートされます。これらのワークロードのクラスタリングは、最後の 5 つのストリーミング更新のうち少なくとも 1 つが上の表のサイズしきい値を超えた場合にのみトリガーされます。
クラスタリングをトリガーする方法
予測的最適化 enabled tables の OPTIMIZE コマンドを自動的に実行します。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。予測的最適化を使用する場合、 Databricks スケジュールされた OPTIMIZE ジョブを無効にすることをお勧めします。
クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS 以上を使用する必要があります。Databricks では、大規模なテーブルでの OPTIMIZE パフォーマンスを高速化するために、Databricks Runtime 17.2 以上を推奨しています。テーブルでOPTIMIZEコマンドを使用します:
OPTIMIZE table_name;
リキッドクラスタリングは incremental です。つまり、 OPTIMIZEクラスタリングが必要なデータに対応するために、必要に応じてデータのみを書き換えます。 OPTIMIZEクラスタ化されているデータと一致しないクラスタリング キーを使用してデータ ファイルを書き換えません。
予測的最適化を使用していない場合、 Databricks では、通常の OPTIMIZE ジョブをデータのクラスタリングにスケジュールすることをお勧めします。 更新や挿入が多いテーブルの場合、Databricks では 1 時間または 2 時間ごとに OPTIMIZE ジョブをスケジュールすることをお勧めします。リキッドクラスタリングは増分であるため、クラスタ化されたテーブルのほとんどの OPTIMIZE ジョブは迅速に実行されます。
すべてのレコードの再クラスタリングを強制する
Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。
OPTIMIZE table_name FULL;
OPTIMIZE FULLを実行すると、必要に応じてすべての既存のデータが再クラスター化されます。指定したキーで以前にクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。
クラスタリングを初めて有効にする場合、またはクラスタリング キーを変更する場合は、 OPTIMIZE FULL を実行します。以前にOPTIMIZE FULL実行しており、クラスタリング キーに変更がなかった場合は、 OPTIMIZE FULL OPTIMIZEと同じように実行されます。 このシナリオでは、 OPTIMIZE増分アプローチを使用し、以前に圧縮されていないファイルのみを書き換えます。データ レイアウトが現在のクラスタリング キーを反映するようにするには、常にOPTIMIZE FULLを使用します。
クラスタ化テーブルからのデータの読み取り
削除ベクトルの読み取りをサポートする任意の Delta Lake クライアントを使用して、クラスター化された Delta テーブルのデータを読み取ることができます。Iceberg REST カタログ API を使用すると、クラスター化された Iceberg テーブルのデータを読み取ることができます。リキッドクラスタリングは、クラスタリング キーでフィルタリングする際の自動データ スキップを通じてクエリのパフォーマンスを向上させます。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
クラスタリングキーを管理する
テーブルがどのようにクラスタ化されているかを確認する
次の例のように、DESCRIBEコマンドを使用してテーブルのクラスタリングキーを確認できます。
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
クラスタリング キーの変更
次の例のように、ALTER TABLEコマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
クラスタリングキーを変更すると、後続のOPTIMIZE操作と書き込み操作では新しいクラスタリングアプローチが使用されますが、既存のデータは書き換えられません。
次の例のように、キーをNONEに設定してクラスタリングをオフにすることもできます。
ALTER TABLE table_name CLUSTER BY NONE;
クラスター キーをNONEに設定しても、クラスター化されたデータは書き換えられませんが、今後のOPTIMIZE操作でクラスタリング キーが使用されなくなります。
外部エンジンからリキッドクラスタリングを使用する
外部の Iceberg エンジンから、管理された Iceberg テーブルでリキッドクラスタリングを有効にできます。リキッドクラスタリングを有効にするには、テーブルの作成時にパーティション列を指定します。Unity Catalog は、パーティションをクラスタリング キーとして解釈します。たとえば、OSS Spark で次のコマンドを実行します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;
リキッドクラスタリングを無効にすることができます。
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;
クラスタリングキーは、Iceberg partition evolutionを使用して変更できます。
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;
バケット変換を使用してパーティションを指定すると、Unity Catalog は式を削除し、列をクラスタリング キーとして使用します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));
リキッドクラスタリングを使用するテーブルの互換性
リキッドクラスタリングは、読み取りと書き込みに特定のDatabricks Runtimeバージョンを必要とするDeltaテーブル機能を使用します。 Databricks Runtime 14.1 以降の流動クラスタリングで作成されたテーブルは、当然 v2 チェックポイントを使用します。 Databricks Runtime 13.3 LTS 以降では、v2 チェックポイントを使用してテーブルを読み書きできます。
v2 チェックポイントを無効にし、 Databricks Runtime 12.2 LTS 以降でリキッドクラスタリングを使用してテーブルを読み取るためにテーブルプロトコルをダウングレードできます。 Delta Lake テーブル機能の削除およびテーブル プロトコルのダウングレードを参照してください。
デフォルトの機能の有効化を上書きする(オプション)
有利Deltaテーブル機能の有効化は、リキッドクラスタリングの有効化中にオーバーライドできます。 これにより、これらのテーブル機能に関連付けられたリーダーおよびライター プロトコルがアップグレードされなくなります。次のステップを完了するには、既存のテーブルが必要です。
-
ALTER TABLEを使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。SQLALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false); -
次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。
SQLALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。
Delta機能 | Runtimeの互換性 | 有効化設定を上書きするためのプロパティ | 無効化によるリキッドクラスタリングへの影響 |
|---|---|---|---|
削除ベクトル | 読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。 |
| 削除ベクトルを無効にすると、行レベルの同時実行が無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。行レベルの同時実行による書き込み競合を参照してください。 |
行追跡 | 書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。 |
| 行トラッキングを無効にすると、行レベルの同時実行が無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。行レベルの同時実行による書き込み競合を参照してください。 |
チェックポイントV2 | 読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。 |
| リキッドクラスタリングの動作には影響ありません。 |
制限
- DBR 15.1 以下 : 書き込み時のクラスタリングでは、フィルター、結合、または集計を含むソース クエリはサポートされません。
- DBR 15.4 LTS以前 : 構造化ストリーミング書き込みを使用して、リキッドクラスタリングを有効にしたテーブルを作成することはできません。 構造化ストリーミングを使用すると、液体クラスタリングを有効にして既存のテーブルにデータを書き込むことができます。
- Apache Iceberg v2 : Iceberg テーブルでは削除ベクトルと行追跡がサポートされていないため、Apache Iceberg v2 の管理対象 Iceberg テーブルでは行レベルの同時実行はサポートされません。