テーブルにリキッドクラスタリングを使用する
リキッドクラスタリングは、テーブルのパーティショニングとZORDERに代わるデータ レイアウト最適化手法です。 クラスタリング キーに基づいてデータを自動的に整理することで、テーブル管理を簡素化し、クエリ パフォーマンスを最適化します。
従来のパーティショニングとは異なり、既存のデータを書き換えずにクラスタリング キーを再定義できます。これにより、分析ニーズの変化に合わせてデータ レイアウトを進化させることができます。リキッドクラスタリングはストリーミングテーブルとマテリアライズドビューの両方に適用されます。
リキッドクラスタリングは、Delta テーブルで一般提供されており、マネージド Apache Iceberg テーブルではパブリックプレビューで利用可能です。Deltaテーブルでは、GAサポートはDatabricks Runtime 15.2 以降で利用可能です。Databricks では、最高のパフォーマンスを実現するために、最新の Databricks Runtime を使用することを推奨しています。マネージド Apache Iceberg テーブルには、Databricks Runtime 16.4 LTS 以降が必要です。
マネージド Apache Iceberg v3 テーブルは、削除ベクトル、行追跡、行レベルの同時実行、および自動リキッドクラスタリングもサポートします。Databricks Runtime 18.0以上が必要です。Apache Iceberg v3 機能を使用を参照してください。
リキッドクラスタリングを使用する場合
Databricksストリーミング テーブルやマテリアライズドビューを含むすべての新しいテーブルに対してリキッドクラスタリングを推奨しています。 クラスタリングは特に次のシナリオで効果を発揮します。
- カーディナリティの高い列でフィルタリングを行うクエリ。
- データに著しい偏りがあるテーブル。
- 急速に成長するテーブルは、メンテナンスと調整に手間がかかる。
- 並列書き込み要件のあるテーブル。
- アクセスパターンが多様または変化するテーブル。
- 典型的なパーティションキーでは、パーティションの数が多すぎたり少なすぎたりする結果が返される可能性があるテーブル。
リキッドクラスタリングを有効にする
リキッドクラスタリングは、パーティション化されていない既存のテーブルに対して、またはテーブルの作成中に有効にできます。 クラスタリングは、パーティションまたはZORDERと互換性がありません。Databricks では、テーブル内のデータのすべてのレイアウトおよび最適化操作をプラットフォームで管理できるようにすることをお勧めします。流動クラスタリングを有効にした後、 OPTIMIZEジョブを実行してデータを段階的にクラスター化します。 クラスタリングをトリガーする方法を参照してください。
クラスタリングを使用してテーブルを作成する
リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントにCLUSTER BYフレーズを追加します。Databricks Runtime 14.2以降では、PythonまたはScalaでDataFrame APIsとDeltaTable APIを使用して、Deltaテーブルのリキッドクラスタリングを有効にできます。
- SQL
- Python
- Scala
クラスタリングを使用して空のテーブルを作成する:
CREATE TABLE table1 (col0 INT, col1 STRING) CLUSTER BY (col0);
既存のデータからクラスタリングを使用してテーブルを作成するには、CLUSTER BY はテーブル名の後に記述する必要があり、SELECT 句には記述しません。
CREATE TABLE table2 CLUSTER BY (col0)
AS SELECT * FROM table1;
クラスタリング構成を含め、テーブル構造をコピーするには:
CREATE TABLE table3 LIKE table1;
クラスターを使用して DeltaTable API で空のテーブルを作成するには:
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
既存のDataFrameからテーブルを作成する
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
DataFrameWriterV2 API を使用してテーブルを作成するには(Databricks Runtime 14.2 以降で利用可能):
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
クラスターを使用して DeltaTable API で空のテーブルを作成するには:
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
既存のDataFrameからテーブルを作成する
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
DataFrameWriterV2 API を使用してテーブルを作成するには(Databricks Runtime 14.2 以降で利用可能):
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
DataFrame APIs使用してクラスタリング キーを設定する場合、クラスタリング列を指定できるのは、テーブルの作成時またはoverwriteモードの使用時 ( CREATE OR REPLACE TABLE操作など) のみです。 appendモードを使用する場合、クラスタリング キーを変更することはできません。
既存のテーブルにデータを追加しながらクラスタリングキーを変更するには、SQL ALTER TABLEコマンドを使用して、データ書き込み操作とは別にクラスタリング構成を変更します。クラスタリングキーの変更を参照してください。
Databricks Runtime 16.0 以降では、構造化ストリーミング書き込みを使用して、液体クラスタリングを有効にしたテーブルを作成できます。 Databricks では、次の例のように、最高のパフォーマンスを得るために Databricks Runtime 16.4 以上を使用することをお勧めします。
- SQL
- Python
- Scala
CREATE TABLE table1 (
col0 STRING,
col1 DATE,
col2 BIGINT
)
CLUSTER BY (col0, col1);
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
リキッドクラスタリングが有効になっているDeltaテーブルは、 Deltaライター バージョン 7 とリーダー バージョン 3 を使用します。これらのプロトコルをサポートしていないDeltaクライアントは、これらのテーブルを読み取ることができません。 テーブル プロトコルのバージョンをダウングレードすることはできません。Delta Lake 機能の互換性とプロトコルを参照してください。
デフォルトの機能有効化 (削除ベクトルなど) をオーバーライドするには、 デフォルトの機能有効化 (オプション) をオーバーライドするを参照してください。
既存のテーブルで有効にする
既存のパーティション化されていないDeltaテーブルでリキッドクラスタリングを有効にするには、次を実行します。
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
マネージドApache Icebergテーブルについては、次の点をご検討ください。
- v2仕様のテーブルの場合、既存のテーブルでリキッドクラスタリングを有効にする際は、明示的に削除ベクトルと行追跡をオフにする必要があります。
- v3 仕様のテーブルでは、削除ベクトルと行追跡がサポートされているため、これらの機能を無効にする必要はありません。Apache Iceberg v3 機能を使用を参照してください。
デフォルトの動作では、既に書き込まれたデータにはクラスタリングは適用されません。再クラスタリングを強制するには、 OPTIMIZE FULLまたはOPTIMIZE FULL WHERE <predicate>を使用します。強制再クラスタリングを参照してください。
パーティション化されたテーブルをリキッドクラスタリングに変換する
Databricks Runtime 18.1 以降では、既存のパーティション分割された Delta Lake テーブルをリキッドクラスタリングに変換するには、ALTER TABLE ステートメントで REPLACE PARTITIONED BY WITH CLUSTER BY を使用します。変換はリーダーとライターのダウンタイムを最小限に抑え、外部テーブルとマネージドテーブルの両方をサポートします。変換後、テーブルは Databricks Runtime 13.3 LTS 以上での読み取りをサポートします。
マネージド Iceberg テーブルの場合、これらのテーブルはパーティション定義をリキッドクラスタリングキーとして使用するため、変換は不要です。変換コマンドの実行中にエラーが発生します。
パーティション分割されたテーブルをリキッドクラスタリングに変換することの利点は次のとおりです:
- データスキップ効率の低下や過剰なパーティション分割に起因するテーブルのパフォーマンス向上。
CLUSTER BY AUTOを使用すると、頻繁にクエリパターンが変化するテーブルのパフォーマンスが自動的に向上します。- クラスタリング・カラムは柔軟で変更が簡単です、一方、パーティションは固定されており、変更が困難です。
- リキッドクラスタリングが適用されたテーブルでは行レベルの同時実行が可能になるため、書き込みの競合が軽減されます。行レベルの同時実行性を参照してください。
構文
ALTER TABLE <table_name>
REPLACE PARTITIONED BY WITH CLUSTER BY [( <clustering_columns> ) | AUTO]
CLUSTER BY句は以下のオプションに対応しています:
( <clustering_columns> ):新しいクラスタリング列を指定します。Databricks は、新しいクラスタリング列を元のパーティション列と似たものに保つことを推奨します。非常に異なる列を使用すると、最初のOPTIMIZE実行で大規模な再クラスタリング操作がトリガーされます。AUTO: 現在のパーティション列を初期クラスタリング列として使用し、予測的最適化により時間とともに適応させます。Unity Catalog マネージドテーブルでのみ利用可能です。自動リキッドクラスタリングを参照してください。- オプション未指定 :現在のパーティション列を新しいクラスタリング列として使用します。
パーティション分割されたテーブルから移行する際のクラスタリングキーの選択については、「パーティション分割またはZ-Orderからの移行」を参照してください。
例
元のパーティションとは異なる列でクラスター化する場合(たとえば、(year, month, day) でパーティション分割されたテーブルの場合)、次の操作を実行します。
ALTER TABLE t1 REPLACE PARTITIONED BY WITH CLUSTER BY (day, id);
OPTIMIZE t1;
クラスタリング列の変更を利用するには、OPTIMIZEを実行する必要があります。
自動リキッドクラスタリングを使用し、現在のパーティション列から開始するには、次の操作を行います。
ALTER TABLE t2 REPLACE PARTITIONED BY WITH CLUSTER BY AUTO;
現在のパーティション列をクラスタリング列として維持するには、次の手順を実行します:
ALTER TABLE t3 REPLACE PARTITIONED BY WITH CLUSTER BY;
変換中の並列読み取りと書き込みの処理
変換後、Databricks Runtime 13.3 LTS 以降は読み取りと書き込みでサポートされています。Databricksでは、変換中にテーブルへの読み書きを行うワークロードに対して、Databricks Runtime 15.4 LTS 以上をお勧めします。
変換中の並列の読み取りおよび書き込みワークロードの処理方法については、以下の表を参照してください。
ワークロードタイプ | 変換中に読み込みます | 変換中の書き込み |
|---|---|---|
バッチ | ダウンタイムはありません。すべての Databricks Runtime バージョンは、変換中にテーブルを読み取ることができます。 | Databricks Runtime 15.4 以降でダウンタイムなし。Databricks Runtime 15.3 およびそれ以前のバージョンでは、Databricks は変換する前にワークロードを停止し、変換が完了した後でワークロードを再開することを推奨しています。 |
ストリーミング | (スキーマトラッキングと列マッピング): コミットを失うことなくストリームを再起動します。 スキーマトラッキングと列マッピングなしで :ストリームは例外を発生させます。新しいチェックポイントの場所と開始バージョンで再起動します。コミットは失われません。 | コミットを一切失うことなくストリームを再開します。 |
変換の検証またはロールバック
変換を確認するには、DESCRIBE EXTENDED を実行して、新しいクラスタリング列を確認します。DESCRIBE HISTORYを実行して、一連のREORG操作、UPGRADE PROTOCOL操作、およびREPLACE PARTITIONED BY WITH CLUSTER BY操作を確認できます。
変換をロールバックするには、RESTOREを使用して以前のバージョンに戻します。または、REPLACE TABLE ... PARTITIONED BY (...) AS SELECT * FROM ...を使用してテーブルを書き換えることができます。
RESTORE を使用してロールバックするには、次のコマンドを実行します。
ALTER TABLE my_table CLUSTER BY NONE;
ALTER TABLE my_table UNSET TBLPROPERTIES ('delta.liquid.hierarchicalClusteringColumns');
RESTORE TABLE my_table TO VERSION AS OF <version_number_before_conversion>;
See RESTORE.
タイムスタンプ列によってパーティション分割されたテーブルを変換する
タイムスタンプ列 (timestamp_col) でパーティション分割されたテーブル (t1) を変換し、そのタイムスタンプ列をクラスターキーとして使用するには、追加の設定を行う必要があります。
SET spark.databricks.delta.liquidConversion.statsGeneration.enabled = false;
ALTER TABLE t1 REPLACE PARTITIONED BY WITH CLUSTER BY (timestamp_col, id);
ANALYZE TABLE t1 COMPUTE DELTA STATISTICS;
これらの構成なしにタイムスタンプパーティション列をクラスタリング列に変換しようとすると、コマンドはエラーを返します:
ALTER TABLE REPLACE PARTITIONED BY WITH CLUSTER BY cannot auto-generate stats on table with column event_ts due to unsupported type: timestamp. Disable stats auto-generation by setting 'spark.databricks.delta.liquidConversion.statsGeneration.enabled' to 'false' and retry the command again. SQLSTATE: 42000
変換制限
REPLACE PARTITIONED BY WITH CLUSTER BY変換コマンドには次の制限が適用されます。
- LakeFlow Spark宣言型パイプラインのパイプラインから作成されたストリーミングテーブルおよびマテリアライズドビューはサポートされていません。リキッドクラスタリングを使用するには、パイプライン定義を
PARTITIONED BYの代わりにCLUSTER BYを使用するように更新する必要があります。 - Delta Sharingでパーティションフィルタリングを使用しているテーブルはサポートされていません。Delta Sharing のパーティションフィルタリングに関する情報については、共有するテーブルパーティションの指定をご覧ください。
クラスタリングキーを削除する
クラスタリング・キーを削除するには、次の構文を使用します。
ALTER TABLE table_name CLUSTER BY NONE;
クラスタリング キーの選択
Databricksサポートされているテーブルに対して自動リキッドクラスタリングを使用することをお勧めします。これは、クエリ パターンに基づいてクラスタリング キーをインテリジェントに選択します。 「自動リキッドクラスタリング」を参照してください。
主要な選択ガイドライン
クラスタリング キーを手動で指定する場合は、クエリ フィルターで最も頻繁に使用される列に基づいて列を選択します。クラスタリング キーは任意の順序で定義できます。2 つの列の相関性が高い場合は、そのうちの 1 つだけをクラスタリング キーとして含める必要があります。
クラスタリングキーは最大4つまで 指定できます。テーブルのサイズが小さい場合(10 TB未満)、クラスタリングキーを多く使用すると、単一の列でフィルタリングする際のパフォーマンスが低下する可能性があります。例えば、4つのキーを使ったフィルタリングは、2つのキーを使ったフィルタリングよりもパフォーマンスが劣ります。しかし、テーブルサイズが大きくなるにつれて、単一列クエリの場合、このパフォーマンスの差は無視できるほど小さくなる。
クラスタリング キーは、統計が収集された列である必要があります。デフォルトでは、Delta テーブルの最初の 32 列に統計が収集されます。「統計列の指定」を参照してください。
サポートされているデータ型
クラスタリングは、クラスタリング キーとして次のデータ タイプをサポートします。
- 日付
- Timestamp
- TimestampNTZ (Databricks Runtime 14.3 LTS 以上)
- 文字列
- 整数、長整数、短整数、バイト
- 浮動小数点数、倍精度浮動小数点数、小数点数
パーティション分割またはZ-Orderからの移行
Databricks では、REPLACE PARTITIONED BY WITH CLUSTER BY コマンドで自動変換を使用することをお勧めします。パーティション分割されたテーブルをリキッドクラスタリングに変換するを参照してください。
既存のテーブルを変換する場合は、次の推奨事項を考慮してください。
現在のデータ最適化手法 | クラスタリングキーに関する推奨事項 |
|---|---|
Hiveスタイルのパーティション分割 | パーティション列をクラスタリングキーとして使用します。 |
Z-orderのインデックス作成 |
|
Hiveスタイルのパーティション分割とZ-order | パーティション列と |
カーディナリティを減らすためのジェネレーテッドカラム(タイムスタンプの日付など) | 元の列をクラスタリング キーとして使用し、生成された列は作成しないでください。 |
自動リキッドクラスタリング
Databricks Runtime 15.4 LTS以降では、Unity CatalogマネージドDeltaテーブルの自動リキッドクラスタリングを有効にできます。Unity Catalog マネージド Apache Iceberg v3 テーブルの場合、自動リキッドクラスタリングには Databricks Runtime 18.0 以降が必要です。自動リキッドクラスタリングを使用すると、Databricks は CLUSTER BY AUTO 節を使用して、クエリのパフォーマンスを最適化するクラスタリングキーをインテリジェントに選択できます。
自動リキッドクラスタリングは、LakeFlow Spark宣言型パイプラインやスタンドアロンのパイプラインを含むマテリアライズドビューとストリーミングテーブルでもサポートされています。CLUSTER BY AUTOをパイプラインまたはSQLの定義で指定してください。
自動リキッドクラスタリングの仕組み
自動リキッドクラスタリングは、使用パターンに基づいてインテリジェントな最適化を提供します。
- 予測的最適化が必要 : 自動キー選択とクラスタリング操作をメンテナンス操作として非同期に実行します。 Unity Catalogマネージドテーブルについては、「予測的最適化」を参照してください。
- クエリ ワークロードを分析 : Databricks はテーブルの履歴クエリ ワークロードを分析し、クラスタリングに最適な候補列を識別します。
- 変化に適応する : クエリ パターンやデータ分布が時間の経過とともに変化する場合、自動リキッドクラスタリングによって新しいキーが選択され、パフォーマンスが最適化されます。
- コストを考慮した選択 : Databricks は、データ スキップの改善によって予測されるコスト削減がデータ クラスタリング コストを上回る場合にのみ、クラスタリング キーを変更します。
自動液体クラスタリングは、次の理由によりキーを選択しない可能性があります。
- テーブルが小さすぎてリキッドクラスタリングの恩恵を受けられません。
- テーブルには、以前の手動キーまたはクエリ パターンに一致する自然な挿入順序のいずれかによる、効果的なクラスタリング スキームがすでに存在します。
- テーブルには頻繁なクエリはありません。
- Databricks Runtime 15.4 LTS 以降を使用していない。
データやクエリの特性に関係なく、すべてのUnity Catalogマネージドテーブルに自動リキッドクラスタリングを適用できます。 ヒューリスティックにより、クラスタリング キーを選択することがコスト面で有利かどうかが決定されます。
Databricks Runtime バージョンの互換性
自動クラスタリングが有効になっているテーブルは、リキッドクラスタリングをサポートするすべてのバージョンから Databricks Runtime 読み書きできます。 ただし、インテリジェントなキー選択は、Databricks Runtime 15.4 LTS で導入されたメタデータに依存します。
Databricks Runtime 15.4 LTS 以降を使用して、自動的に選択されたキーがすべてのワークロードにメリットをもたらし、新しいキーを選択するときにこれらのワークロードが考慮されるようにします。
自動リキッドクラスタリングを有効または無効にする
- SQL
- Python
自動リキッドクラスタリングを使用してテーブルを作成するには:
CREATE OR REPLACE TABLE table1 (column01 int, column02 string) CLUSTER BY AUTO;
既存のテーブルで、手動で指定されたキーを持つテーブルを含め、自動リキッドクラスタリングを有効にするには:
ALTER TABLE table1 CLUSTER BY AUTO;
SQL は、自動リキッドクラスタリングを有効にする際に、初期クラスタリング列のヒントをサポートしていません。最初のキー選択のヒントを提供するには、Python APIを使用してください。
自動リキッドクラスタリングをオフにするには:
ALTER TABLE table1 CLUSTER BY NONE;
自動リキッドクラスタリングをオフにし、クラスタリングカラムを指定するには:
ALTER TABLE table1 CLUSTER BY (column01, column02);
既存のテーブルで自動リキッドクラスタリングが有効になっている場合、CLUSTER BY AUTOを指定せずにCREATE OR REPLACE table_nameを実行すると、自動クラスタリングが無効になり、クラスタリングカラムは保持されません。自動リキッドクラスタリングと以前に選択された列を保持するには、replace ステートメントに CLUSTER BY AUTO を含めます。CLUSTER BY AUTOでは、予測的最適化は、テーブルの履歴クエリワークロードを使用して最適なクラスタリングキーを特定します。
Python APIはDatabricks Runtime 16.4以降で利用できます。Pythonは、テーブルを作成または置換する際にのみ使用できます。既存のテーブルの clusterByAuto ステータスを変更するには、SQL を使用します。
自動リキッドクラスタリングを使用してテーブルを作成するには、DataFrameWriter を使用します:
df = spark.read.table("table1")
df.write
.format("delta")
.option("clusterByAuto", "true")
.saveAsTable(...)
DataFrameWriterを使用してキー選択のための初期クラスタリング列ヒントを指定するには:
df.write
.format("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.saveAsTable(...)
自動リキッドクラスタリングを使用してテーブルを作成するには、DataFrameWriterV2 を使用します:
df.writeTo(...).using("delta")
.option("clusterByAuto", "true")
.create()
DataFrameWriterV2を使用してキー選択のための初期クラスタリング列ヒントを指定するには:
df.writeTo(...).using("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.create()
自動リキッドクラスタリングが有効なストリーミングテーブルを作成するには:
spark.readStream.table("source_table")
.writeStream
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
ストリーミングテーブルでキー選択の初期クラスタリング列のヒントを指定するには:
spark.readStream.table("source_table")
.writeStream
.clusterBy("column1", "column2")
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
.clusterBy をクラスタキー選択ヒントに使用し、.option('clusterByAuto', 'true') と併用する場合、動作は次のとおりです。
- これにより初めて自動リキッドクラスタリングを設定する場合、クラスタリング列は
.clusterByで指定されたカラムに設定されます。 - 自動リキッドクラスタリングが有効になっている既存のテーブルである場合、
.clusterByヒントは一度のみ受け入れられます。たとえば、.clusterByで指定されたカラムは、テーブルにクラスタリングカラムが設定されていない場合にのみ設定されます。
When using DataFrame APIs, the clusterByAuto option can only be set when using overwrite mode. appendモードを使用している場合は、clusterByAutoを設定できません。この制限は、手動でクラスタリング列を設定する場合と同じです。クラスタリング設定は、テーブルの作成時または置換操作中にoverwriteモードでのみ設定できます。
回避策として、データの追加中に既存のテーブルのclusterByAutoステータスを変更する場合は、SQL ALTER TABLEコマンドを使用して、データ書き込み操作とは別にクラスタリング構成を変更します。
自動クラスタリングが有効になっているかどうかを確認する
テーブルで自動リキッドクラスタリングが有効になっているかどうかを確認するには、DESCRIBE TABLE または SHOW TBLPROPERTIESを使用します。
自動リキッドクラスタリングが有効な場合、 clusterByAuto プロパティは trueに設定されます。 clusteringColumns プロパティは、自動または手動で選択された現在のクラスタリング列を示します。
制限
自動リキッドクラスタリングは、マネージド Apache Iceberg v2 テーブルには利用できません。Databricks Runtime 18.0 以降では、マネージド Apache Iceberg v3 テーブルがサポートされています。
クラスタ化されたテーブルにデータを書き込む
クラスタ化されたDeltaテーブルに書き込むには、リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。クラスター化された Iceberg テーブルに書き込むには、Unity Catalog の Iceberg REST カタログ API を使用できます。Databricks では、Databricks Runtime 13.3 LTS 以降を使用する必要があります。
書き込み時のクラスタリングをサポートする操作
書き込み時にクラスター化する操作には、以下のものがあります。
INSERT INTOオペレーションCTASおよびRTASステートメントCOPY INTO(Parquet由来)spark.write.mode("append")
クラスタリングのサイズしきい値
書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDeltaテーブルよりも低くなります。
クラスタリング列の数 | Unity Catalogマネージドテーブルのしきい値サイズ | 他のDeltaテーブルのしきい値サイズ |
|---|---|---|
1 | 64 MB | 256 MB |
2 | 256 MB | 1 GB |
3 | 512 MB | 2 GB |
4 | 1 GB | 4 GB |
すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZEを頻繁に実行することをお勧めします。
ストリーミングワークロード
構造化ストリーミング ワークロードでは、Spark 構成 spark.databricks.delta.liquid.eagerClustering.streaming.enabled を trueに設定すると、書き込み時のクラスタリングがサポートされます。これらのワークロードのクラスタリングは、最後の 5 つのストリーミング更新のうち少なくとも 1 つが上の表のサイズしきい値を超えた場合にのみトリガーされます。
クラスタリングをトリガーする方法
予測的最適化 enabled tables の OPTIMIZE コマンドを自動的に実行します。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。予測的最適化を使用する場合、 Databricks スケジュールされた OPTIMIZE ジョブを無効にすることをお勧めします。
クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS 以上を使用する必要があります。Databricks では、大規模なテーブルでの OPTIMIZE パフォーマンスを高速化するために、Databricks Runtime 17.2 以上を推奨しています。テーブルでOPTIMIZEコマンドを使用します:
OPTIMIZE table_name;
リキッドクラスタリングは インクリメンタルで あり、 OPTIMIZEクラスタリングが必要なデータに対応するために必要な場合にのみデータを書き換えます。OPTIMIZE 、クラスタリング対象のデータと一致しないクラスタリングキーを使用してデータファイルを書き換えません。強制再クラスタリングを参照してください。
予測的最適化を使用していない場合、 Databricks では、通常の OPTIMIZE ジョブをデータのクラスタリングにスケジュールすることをお勧めします。 更新や挿入が多いテーブルの場合、Databricks では 1 時間または 2 時間ごとに OPTIMIZE ジョブをスケジュールすることをお勧めします。リキッドクラスタリングは増分であるため、クラスタ化されたテーブルのほとんどの OPTIMIZE ジョブは迅速に実行されます。
強制再クラスタリング
Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。
OPTIMIZE table_name FULL;
OPTIMIZE FULLを実行すると、必要に応じてすべての既存のデータが再クラスター化されます。指定したキーで以前にクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。
クラスタリングを初めて有効にする場合、またはクラスタリング キーを変更する場合は、 OPTIMIZE FULL を実行します。以前にOPTIMIZE FULL実行しており、クラスタリング キーに変更がなかった場合は、 OPTIMIZE FULL OPTIMIZEと同じように実行されます。 このシナリオでは、 OPTIMIZE増分アプローチを使用し、以前に圧縮されていないファイルのみを書き換えます。データ レイアウトが現在のクラスタリング キーを反映するようにするには、常にOPTIMIZE FULLを使用します。
部分的な再クラスター
Databricks Runtime 18.1以降では、 OPTIMIZE FULL WHERE <predicate>を使用してレコードのサブセットの再クラスタリングを強制できます。ファイルのいずれかの部分が述語と重複する場合、そのファイルはインクルードされます。See パラメーター.
OPTIMIZE events FULL WHERE event_date >= '2025-01-01';
クラスタ化テーブルからのデータの読み取り
削除ベクトルの読み取りをサポートする任意の Delta Lake クライアントを使用して、クラスター化された Delta テーブルのデータを読み取ることができます。Iceberg REST カタログ API を使用すると、クラスター化された Iceberg テーブルのデータを読み取ることができます。リキッドクラスタリングは、クラスタリング キーでフィルタリングする際の自動データ スキップを通じてクエリのパフォーマンスを向上させます。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
クラスタリングキーを管理する
テーブルがどのようにクラスタ化されているかを確認する
次の例のように、DESCRIBEコマンドを使用してテーブルのクラスタリングキーを確認できます。
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
クラスタリング キーの変更
次の例のように、ALTER TABLEコマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
クラスタリングキーを変更すると、後続のOPTIMIZEおよび書き込み操作では新しいクラスタリング手法が使用されますが、既存のデータは書き換えられません。更新されたクラスタリングキーを使用して既存のデータを書き換えるには、 「強制的な再クラスタリング」を参照してください。
次の例のように、キーをNONEに設定してクラスタリングをオフにすることもできます。
ALTER TABLE table_name CLUSTER BY NONE;
クラスター キーをNONEに設定しても、クラスター化されたデータは書き換えられませんが、今後のOPTIMIZE操作でクラスタリング キーが使用されなくなります。
外部エンジンからリキッドクラスタリングを使用する
外部の Iceberg エンジンから、管理された Iceberg テーブルでリキッドクラスタリングを有効にできます。リキッドクラスタリングを有効にするには、テーブルの作成時にパーティション列を指定します。Unity Catalog は、パーティションをクラスタリング キーとして解釈します。たとえば、OSS Spark で次のコマンドを実行します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;
リキッドクラスタリングをオフにするには:
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;
Icebergパーティション進化を使用してクラスタリングキーを変更するには:
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;
バケット変換を使用してパーティションを指定すると、Unity Catalog は式を削除し、列をクラスタリング キーとして使用します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));
リキッドクラスタリングを使用するテーブルの互換性
リキッドクラスタリングは、読み取りと書き込みに特定のDatabricks Runtimeバージョンを必要とするDeltaテーブル機能を使用します。Databricks Runtime 14.1 以降のリキッドクラスタリングで作成されたテーブルは、デフォルトで v2 チェックポイントを使用します。Databricks Runtime 13.3 LTS 以降では、V2 チェックポイントを使用してテーブルを読み書きできます。チェックポイントV2を参照してください。
Databricks Runtime 12.2 LTSから13.2を使用するリーダーをサポートするには、チェックポイントV2を無効にし、テーブルプロトコルをダウングレードしてください。クラシック版へのダウングレードを参照してください。
デフォルトの機能の有効化を上書きする(オプション)
有利Deltaテーブル機能の有効化は、リキッドクラスタリングの有効化中にオーバーライドできます。 これにより、これらのテーブル機能に関連付けられたリーダーおよびライタープロトコルのアップグレードが防止されます。次のステップを完了するには、既存のテーブルが必要です。
-
ALTER TABLEを使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。SQLALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false); -
次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。
SQLALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。
Delta機能 | Runtimeの互換性 | 有効化設定を上書きするためのプロパティ | オフにした場合の液体クラスタリングへの影響 |
|---|---|---|---|
削除ベクトル | 読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。 |
| 削除ベクトルを無効にすると、行レベルの同時実行が無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。行レベルの同時実行性を参照してください。
|
行追跡 | 書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。 |
| 行トラッキングを無効にすると、行レベルの同時実行が無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。行レベルの同時実行性を参照してください。 |
チェックポイントV2 | 読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。 |
| リキッドクラスタリングの動作には影響しません。 チェックポイントV2を参照してください。 |
制限
- Databricks Runtime 15.1 以下 : 書き込み時のクラスタリングでは、フィルター、結合、または集計を含むソース クエリはサポートされません。
- Databricks Runtime 15.4 LTS以前 : 構造化ストリーミング書き込みを使用して、リキッドクラスタリングを有効にしたテーブルを作成することはできません。 構造化ストリーミングを使用すると、液体クラスタリングを有効にして既存のテーブルにデータを書き込むことができます。
- Apache Iceberg v2 :Managed Apache Iceberg v2 テーブルでは、削除ベクトルと行追跡がサポートされていないため、行レベルの同時実行性はサポートされません。
- 行レベルのコンカレンシーは、管理対象のApache Iceberg v3テーブルでサポートされています。v3仕様が削除ベクトルと行追跡をサポートしているためです。Apache Iceberg v3 機能を使用を参照してください。