テーブルにリキッドクラスタリングを使用する
リキッドクラスタリングは、テーブルのパーティション分割とZORDERを置き換えるデータレイアウト最適化手法です。クラスタリングキーに基づいてデータを自動的に整理することで、テーブル管理を簡素化し、クエリのパフォーマンスを最適化します。
従来のパーティショニングとは異なり、既存のデータを書き換えることなくクラスタリングキーを再定義できます。これにより、データレイアウトを変化する分析ニーズに合わせて進化させることができます。リキッドクラスタリングは、ストリーミングテーブルとマテリアライズドビューの両方に適用されます。
リキッドクラスタリングは、Delta Lakeテーブルでは一般提供されており、マネージド Apache Iceberg テーブルではパブリックプレビューで提供されています。Delta Lakeテーブルでは、Databricks Runtime 15.2以降でGAサポートが一般提供されています。Databricks では、最高のパフォーマンスを実現するために、最新の Databricks Runtime を使用することを推奨しています。マネージド Apache Iceberg テーブルには、Databricks Runtime 16.4 LTS 以降が必要です。
マネージド Apache Iceberg v3 テーブルは、削除ベクトル、行追跡、行レベルの同時実行、および自動リキッドクラスタリングもサポートします。Databricks Runtime 18.0以上が必要です。Apache Iceberg v3 機能を使用を参照してください。
リキッドクラスタリングの使用時期
Databricks では、ストリーミングテーブルとマテリアライズドビューを含むすべての新しいテーブルにリキッドクラスタリングを推奨しています。次のシナリオは、クラスタリングの恩恵を特に受けます。
- カーディナリティの高い列でフィルター処理するクエリ
- データ スキューが大きいテーブル。
- 急速に増大し、メンテナンスとチューニングが必要となるテーブル。
- 並列書き込み要件のあるテーブル。
- 多様または変化するアクセスパターンを持つテーブル。
- 一般的なパーティションキーでは、多すぎる、または少なすぎるパーティションから結果が返される可能性があるテーブル。
リキッドクラスタリングを有効にする
リキッドクラスタリングは、既存のパーティション分割されていないテーブルに対して、またはテーブルの作成時に有効にすることができます。クラスタリングはパーティション分割や ZORDERと互換性がありません。Databricks は、テーブル内のデータのすべてのレイアウト操作と最適化操作をプラットフォームに管理させることをお勧めします。リキッドクラスタリングを有効にした後、OPTIMIZEジョブを実行してデータを増分的にクラスター化します。クラスタリングをトリガーする方法を参照してください。
クラスタリングに対応したテーブルを作成する
リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントに CLUSTER BY フレーズを追加します。Databricks Runtime 14.2以降では、PythonまたはScalaでDataFrame APIとDeltaTable APIを使用してリキッドクラスタリングを有効にできます。
- SQL
- Python
- Scala
クラスタリングを使用した空のテーブルを作成するには:
CREATE TABLE table1 (col0 INT, col1 STRING) CLUSTER BY (col0);
既存のデータからクラスタリングを使用してテーブルを作成するには、CLUSTER BY をテーブル名の後に配置する必要があり、SELECT 句には配置しません。
CREATE TABLE table2 CLUSTER BY (col0)
AS SELECT * FROM table1;
クラスター構成を含めてテーブル構造をコピーするには:
CREATE TABLE table3 LIKE table1;
クラスター化された空のテーブルを作成するには、DeltaTable API を使用します。
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
既存のデータフレームからテーブルを作成するには:
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
DataFrameWriterV2 API を使用してテーブルを作成するには (Databricks Runtime 14.2 以降で利用可能):
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
クラスター化された空のテーブルを作成するには、DeltaTable API を使用します。
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
既存のデータフレームからテーブルを作成するには:
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
DataFrameWriterV2 API を使用してテーブルを作成するには (Databricks Runtime 14.2 以降で利用可能):
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
DataFrame APIs を使用してクラスタリングキーを設定する場合、テーブル作成時、または overwrite モード (CREATE OR REPLACE TABLE オペレーションなど) を使用している場合にのみ、クラスタリングカラムを指定できます。append モードを使用している場合、クラスタリング キーを変更することはできません。
既存のテーブルでクラスタリングキーを変更しながらデータを追加するには、データ書き込み操作とは別にSQL ALTER TABLEコマンドを使用してクラスタリング設定を変更します。クラスタリング キーの変更を参照してください。
Databricks Runtime 16.0 以降では、構造化ストリーミングの書き込みを使用して、リキッドクラスタリングが有効なテーブルを作成できます。Databricks は、次の例に示すように、最高のパフォーマンスを得るために Databricks Runtime 16.4 以降を使用することをお勧めします。
- SQL
- Python
- Scala
CREATE TABLE table1 (
col0 STRING,
col1 DATE,
col2 BIGINT
)
CLUSTER BY (col0, col1);
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
リキッドクラスタリングが有効になっているDelta Lakeテーブルは、Deltaライターバージョン7およびリーダーバージョン3を使用します。これらのプロトコルをサポートしないDeltaクライアントは、これらのテーブルを読み取れません。テーブル プロトコル バージョンはダウングレードできません。Delta Lake 機能の互換性とプロトコルを参照してください。
デフォルトの機能有効化 (削除ベクトルなど) をオーバーライドするには、 デフォルトの機能有効化 (オプション) をオーバーライドするを参照してください。
既存のテーブルで有効にする
既存のパーティション化されていないDelta Lakeテーブルでリキッドクラスタリングを有効にするには、以下を実行してください。
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
マネージドApache Icebergテーブルについては、次の点をご検討ください。
- v2仕様のテーブルの場合、既存のテーブルでリキッドクラスタリングを有効にする際は、明示的に削除ベクトルと行追跡をオフにする必要があります。
- v3 仕様のテーブルでは、削除ベクトルと行追跡がサポートされているため、これらの機能を無効にする必要はありません。Apache Iceberg v3 機能を使用を参照してください。
デフォルトの動作では、以前に書き込まれたデータにクラスタリングは適用されません。再クラスタリングを強制するには、OPTIMIZE FULL または OPTIMIZE FULL WHERE <predicate> を使用してください。「再クラスタリングの強制」を参照してください。
パーティション化されたテーブルをリキッドクラスタリングに変換する
Databricks Runtime 18.1 以降では、既存のパーティション分割された Delta Lake テーブルをリキッドクラスタリングに変換するには、ALTER TABLE ステートメントで REPLACE PARTITIONED BY WITH CLUSTER BY を使用します。変換はリーダーとライターのダウンタイムを最小限に抑え、外部テーブルとマネージドテーブルの両方をサポートします。変換後、テーブルは Databricks Runtime 13.3 LTS 以上での読み取りをサポートします。
マネージド Iceberg テーブルの場合、これらのテーブルはパーティション定義をリキッドクラスタリングキーとして使用するため、変換は不要です。変換コマンドの実行中にエラーが発生します。
パーティション分割されたテーブルをリキッドクラスタリングに変換することの利点は次のとおりです:
- データスキップ効率の低下や過剰なパーティション分割に起因するテーブルのパフォーマンス向上。
CLUSTER BY AUTOを使用すると、頻繁にクエリパターンが変化するテーブルのパフォーマンスが自動的に向上します。- クラスタリング・カラムは柔軟で変更が簡単です、一方、パーティションは固定されており、変更が困難です。
- リキッドクラスタリングが適用されたテーブルでは行レベルの同時実行が可能になるため、書き込みの競合が軽減されます。行レベルの同時実行性を参照してください。
構文
ALTER TABLE <table_name>
REPLACE PARTITIONED BY WITH CLUSTER BY [( <clustering_columns> ) | AUTO]
CLUSTER BY句は以下のオプションに対応しています:
( <clustering_columns> ):新しいクラスタリング列を指定します。Databricks は、新しいクラスタリング列を元のパーティション列と似たものに保つことを推奨します。非常に異なる列を使用すると、最初のOPTIMIZE実行で大規模な再クラスタリング操作がトリガーされます。AUTO: 現在のパーティション列を初期クラスタリング列として使用し、予測的最適化により時間とともに適応させます。Unity Catalog マネージドテーブルでのみ利用可能です。自動リキッドクラスタリングを参照してください。- オプション未指定 :現在のパーティション列を新しいクラスタリング列として使用します。
パーティション分割されたテーブルから移行する際のクラスタリングキーの選択については、「パーティション分割またはZ-Orderからの移行」を参照してください。
例
元のパーティションとは異なる列でクラスター化する場合(たとえば、(year, month, day) でパーティション分割されたテーブルの場合)、次の操作を実行します。
ALTER TABLE t1 REPLACE PARTITIONED BY WITH CLUSTER BY (day, id);
OPTIMIZE t1;
クラスタリング列の変更を利用するには、OPTIMIZEを実行する必要があります。
自動リキッドクラスタリングを使用し、現在のパーティション列から開始するには、次の操作を行います。
ALTER TABLE t2 REPLACE PARTITIONED BY WITH CLUSTER BY AUTO;
現在のパーティション列をクラスタリング列として維持するには、次の手順を実行します:
ALTER TABLE t3 REPLACE PARTITIONED BY WITH CLUSTER BY;
変換中の並列読み取りと書き込みの処理
変換後、Databricks Runtime 13.3 LTS 以降は読み取りと書き込みでサポートされています。Databricksでは、変換中にテーブルへの読み書きを行うワークロードに対して、Databricks Runtime 15.4 LTS 以上をお勧めします。
変換中の並列の読み取りおよび書き込みワークロードの処理方法については、以下の表を参照してください。
ワークロードタイプ | 変換中に読み込みます | 変換中の書き込み |
|---|---|---|
バッチ | ダウンタイムはありません。すべての Databricks Runtime バージョンは、変換中にテーブルを読み取ることができます。 | Databricks Runtime 15.4 以降でダウンタイムなし。Databricks Runtime 15.3 およびそれ以前のバージョンでは、Databricks は変換する前にワークロードを停止し、変換が完了した後でワークロードを再開することを推奨しています。 |
ストリーミング | (スキーマトラッキングと列マッピング): コミットを失うことなくストリームを再起動します。 スキーマトラッキングと列マッピングなしで :ストリームは例外を発生させます。新しいチェックポイントの場所と開始バージョンで再起動します。コミットは失われません。 | コミットを一切失うことなくストリームを再開します。 |
変換の検証またはロールバック
変換を確認するには、DESCRIBE EXTENDED を実行して、新しいクラスタリング列を確認します。DESCRIBE HISTORYを実行して、一連のREORG操作、UPGRADE PROTOCOL操作、およびREPLACE PARTITIONED BY WITH CLUSTER BY操作を確認できます。
変換をロールバックするには、RESTOREを使用して以前のバージョンに戻します。または、REPLACE TABLE ... PARTITIONED BY (...) AS SELECT * FROM ...を使用してテーブルを書き換えることができます。
RESTORE を使用してロールバックするには、次のコマンドを実行します。
ALTER TABLE my_table CLUSTER BY NONE;
ALTER TABLE my_table UNSET TBLPROPERTIES ('delta.liquid.hierarchicalClusteringColumns');
RESTORE TABLE my_table TO VERSION AS OF <version_number_before_conversion>;
See RESTORE.
タイムスタンプ列によってパーティション分割されたテーブルを変換する
タイムスタンプ列 (timestamp_col) でパーティション分割されたテーブル (t1) を変換し、そのタイムスタンプ列をクラスターキーとして使用するには、追加の設定を行う必要があります。
SET spark.databricks.delta.liquidConversion.statsGeneration.enabled = false;
ALTER TABLE t1 REPLACE PARTITIONED BY WITH CLUSTER BY (timestamp_col, id);
ANALYZE TABLE t1 COMPUTE DELTA STATISTICS;
これらの構成なしにタイムスタンプパーティション列をクラスタリング列に変換しようとすると、コマンドはエラーを返します:
ALTER TABLE REPLACE PARTITIONED BY WITH CLUSTER BY cannot auto-generate stats on table with column event_ts due to unsupported type: timestamp. Disable stats auto-generation by setting 'spark.databricks.delta.liquidConversion.statsGeneration.enabled' to 'false' and retry the command again. SQLSTATE: 42000
変換制限
REPLACE PARTITIONED BY WITH CLUSTER BY変換コマンドには次の制限が適用されます。
- LakeFlow Spark宣言型パイプラインのパイプラインから作成されたストリーミングテーブルおよびマテリアライズドビューはサポートされていません。リキッドクラスタリングを使用するには、パイプライン定義を
PARTITIONED BYの代わりにCLUSTER BYを使用するように更新する必要があります。 - Delta Sharingでパーティションフィルタリングを使用しているテーブルはサポートされていません。Delta Sharing のパーティションフィルタリングに関する情報については、共有するテーブルパーティションの指定をご覧ください。
クラスタリング キーの削除
クラスタリング・キーを削除するには、次の構文を使用します。
ALTER TABLE table_name CLUSTER BY NONE;
クラスタリング キーの選択
Databricks では、サポートされているテーブルに対して、クエリパターンに基づいてクラスタリングキーをインテリジェントに選択する自動リキッドクラスタリングの利用を推奨しています。自動リキッドクラスタリングを参照してください。
キー選択ガイドライン
手動でクラスタリング キーを指定する場合は、クエリ フィルターで最も頻繁に使用される列に基づいて列を選択してください。クラスタリングキーは任意の順序で定義できます。2 つの列の相関性が高い場合は、そのうちの 1 つだけをクラスタリング キーとして含める必要があります。
最大4つのクラスタリングキーを指定できます。小規模なテーブル (10 TB 未満) では、クラスタリングキーの数が多い場合、1つのカラムでフィルタリングするときにパフォーマンスが低下する可能性があります。例えば、4つのキーでフィルタリングすると、2つのキーでフィルタリングするよりもパフォーマンスが低下します。ただし、テーブル・サイズが大きくなると、このパフォーマンスの違いは単一カラム・クエリではごくわずかになります。
クラスタリングキーは、統計が収集される列である必要があります。デフォルトでは、Delta Lake テーブルの最初の32列に統計が収集されます。統計列の指定を参照してください。
サポートされているデータ型
クラスタリングでは、クラスタリングキーとしてこれらのデータ型がサポートされています。
- Date
- タイムスタンプ
- TimestampNTZ(Databricks Runtime 14.3 LTS以降)
- String
- Integer、Long、Short、バイト
- フロート、ダブル、デシマル
StructFieldで、ドット表記(例:CLUSTER BY (struct_col.field))を使用してクラスター化できます。ネストされた構造体フィールドは、CLUSTER BY (struct_col.nested.field) のように、任意の深さまでサポートされています。フィールドのデータ型は、前述のリストにあるサポートされている型のいずれかである必要があります。
以下のものではクラスタリングできません:
StructType、MapTypeなどの複合型、またはArrayTypeMapTypeとArrayTypeの要素、map_col['key']、array_col[0]、またはmap_col.keyなど。
パーティション分割またはZ-orderからの移行
Databricks では、REPLACE PARTITIONED BY WITH CLUSTER BY コマンドで自動変換を使用することをお勧めします。パーティション分割されたテーブルをリキッドクラスタリングに変換するを参照してください。
既存のテーブルを変換する場合は、以下の推奨事項を考慮してください。
現在のデータ最適化手法 | クラスタリングキーに関する推奨事項 |
|---|---|
Hiveスタイルのパーティション分割 | パーティション列をクラスタリングキーとして使用します。 |
Z-orderのインデックス作成 |
|
Hiveスタイルのパーティション分割とZ-order | パーティション列と |
カーディナリティを減らすためのジェネレーテッドカラム(タイムスタンプの日付など) | 元の列をクラスタリング キーとして使用し、生成された列は作成しないでください。 |
自動リキッドクラスタリング
Databricks Runtime 15.4 LTS 以降では、Unity Catalog マネージド Delta Lake テーブルの自動リキッドクラスタリングを有効にできます。Unity Catalog マネージド Apache Iceberg v3 テーブルの場合、自動リキッドクラスタリングには Databricks Runtime 18.0 以降が必要です。自動リキッドクラスタリングを使用すると、Databricks は CLUSTER BY AUTO 節を使用して、クエリのパフォーマンスを最適化するクラスタリングキーをインテリジェントに選択できます。
自動リキッドクラスタリングは、LakeFlow Spark宣言型パイプラインやスタンドアロンのパイプラインを含むマテリアライズドビューとストリーミングテーブルでもサポートされています。CLUSTER BY AUTOをパイプラインまたはSQLの定義で指定してください。
自動リキッドクラスタリングの仕組み
自動リキッドクラスタリングは、使用パターンに基づいてインテリジェントな最適化を提供します:
- 予測的最適化が必須です :自動キー選択操作とクラスタリング操作は、メンテナンス操作として非同期に実行されます。Unity Catalog マネージドテーブルの予測的最適化を参照してください。
- クエリワークロードを分析 : Databricks はテーブルの履歴クエリワークロードを分析し、クラスタリングに最適な候補列を特定します。
- 変化への対応 : クエリパターンやデータ分布が時間の経過とともに変化した場合、自動リキッドクラスタリングはパフォーマンスを最適化するために新しいキーを選択します。
- コストアウェア選択 : Databricks は、データ スキップの改善による予測されるコスト削減がデータ クラスタリング コストを上回る場合にのみ、クラスタリング キーを変更します。
自動リキッドクラスタリングは、以下の理由によりキーを選択しない場合があります。
- テーブルが小さすぎてリキッドクラスタリングの恩恵を受けられません。
- テーブルには、以前の手動キー、またはクエリパターンに一致する自然な挿入順序のいずれかにより、すでに適切なクラスタリングスキームがあります。
- テーブルにはよくあるクエリがありません。
- Databricks Runtime 15.4 LTS以降を使用していません。
自動リキッドクラスタリングは、データやクエリの特性に関係なく、すべての Unity Catalog マネージドテーブルに適用できます。クラスタリング キーを選択することがコストメリットがあるかどうかはヒューリスティックによって決定されます。
Databricks Runtime バージョンの互換性
自動クラスタリングが有効になっているテーブルは、リキッドクラスタリングをサポートするすべてのバージョンから Databricks Runtime 読み書きできます。 ただし、インテリジェントなキー選択は、Databricks Runtime 15.4 LTS で導入されたメタデータに依存します。
Databricks Runtime 15.4 LTS 以降を使用して、自動的に選択されたキーがすべてのワークロードにメリットをもたらし、新しいキーを選択するときにこれらのワークロードが考慮されるようにします。
自動リキッドクラスタリングを有効にするか、オフにする
- SQL
- Python
自動リキッドクラスタリングテーブルを作成するには:
CREATE OR REPLACE TABLE table1 (column01 int, column02 string) CLUSTER BY AUTO;
手動で指定されたキーを持つテーブルを含む既存のテーブルで自動リキッドクラスタリングを有効にするには:
ALTER TABLE table1 CLUSTER BY AUTO;
キー選択の初期クラスタリング列のヒントを設定するには、クラスタリングキーを設定し、その後、自動クラスタリングを有効にします。
ALTER TABLE table1 CLUSTER BY (c1, c2);
ALTER TABLE table1 CLUSTER BY AUTO;
または、Python API を使用して、1回の操作でヒントを設定できます。
自動リキッドクラスタリングをオフにするには:
ALTER TABLE table1 CLUSTER BY NONE;
自動リキッドクラスタリングをオフにして、クラスタリング列を指定するには、
ALTER TABLE table1 CLUSTER BY (column01, column02);
既存のテーブルで自動リキッドクラスタリングが有効になっている場合、CLUSTER BY AUTO を指定せずに CREATE OR REPLACE table_name を実行すると、自動クラスタリングがオフになり、クラスタリング列は保持されません。自動リキッドクラスタリングと以前に選択された列を維持するには、replaceステートメントにCLUSTER BY AUTOを含めてください。CLUSTER BY AUTOでは、予測的最適化はテーブルの履歴クエリワークロードを使用して、最適なクラスタリングキーを特定します。
Python APIは、Databricks Runtime 16.4以降で使用できます。Python は、テーブルを作成または置き換える場合にのみ使用できます。SQL を使用して、既存のテーブルの clusterByAuto の状態を変更します。
自動リキッドクラスタリングを使用してテーブルを作成するには、DataFrameWriterを使用します。
df = spark.read.table("table1")
df.write
.format("delta")
.option("clusterByAuto", "true")
.saveAsTable(...)
キー選択用の初期クラスタリング列のヒントをDataFrameWriterを使用して設定するには:
df.write
.format("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.saveAsTable(...)
自動リキッドクラスタリングを使用してテーブルを作成するには、DataFrameWriterV2を使用します。
df.writeTo(...).using("delta")
.option("clusterByAuto", "true")
.create()
キー選択用の初期クラスタリング列のヒントをDataFrameWriterV2を使用して設定するには:
df.writeTo(...).using("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option("clusterByAuto", "true")
.create()
自動リキッドクラスタリングが有効なストリーミングテーブルを作成するには:
spark.readStream.table("source_table")
.writeStream
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
ストリーミングテーブルでキー選択の初期クラスタリング列ヒントを設定するには:
spark.readStream.table("source_table")
.writeStream
.clusterBy("column1", "column2")
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
.clusterBy をクラスターキーの選択ヒントとして .option('clusterByAuto', 'true') と併用する場合、動作は次のとおりです。
- これが初めて自動リキッドクラスタリングを設定する場合、クラスタリング列は
.clusterByで指定された列に設定されます。 - これが自動リキッドクラスタリングが有効になっている既存のテーブルの場合は、
.clusterByヒントは一度だけ受け入れられます。たとえば、.clusterByで指定されたカラムは、テーブルにクラスタリングカラムが設定されていない場合にのみ設定されます。
DataFrame APIs を使用する場合、clusterByAuto オプションは、overwrite モードを使用している場合にのみ設定できます。appendモードを使用する場合、clusterByAutoを設定できません。この制限は、クラスタリング列を手動で設定する場合と同じです。クラスタリング設定は、overwriteモードを使用して、テーブルの作成時または置換操作時にのみ構成できます。
回避策として、既存のテーブルのclusterByAutoステータスをデータを追加しながら変更したい場合は、SQL ALTER TABLEコマンドを使用して、データ書き込み操作とは別にクラスタリング構成を変更してください。
自動クラスタリングが有効になっているかどうかを確認する
テーブルで自動リキッドクラスタリングが有効になっているかどうかを確認するには、DESCRIBE TABLE または SHOW TBLPROPERTIESを使用します。
自動リキッドクラスタリングが有効な場合、 clusterByAuto プロパティは trueに設定されます。 clusteringColumns プロパティは、自動または手動で選択された現在のクラスタリング列を示します。
制限事項:
自動リキッドクラスタリングは、マネージド Apache Iceberg v2 テーブルには利用できません。Databricks Runtime 18.0 以降では、マネージド Apache Iceberg v3 テーブルがサポートされています。
クラスター化されたテーブルへのデータの書き込み
クラスタリングされたDelta Lakeテーブルに書き込むには、リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。クラスター化されたIcebergテーブルに書き込むには、Unity CatalogのIceberg REST Catalog APIを使用できます。Databricksでは、Databricks Runtime 13.3 LTS以降を使用する必要があります。
書き込み時にクラスター化をサポートする操作
書き込み時にクラスター化する操作には、以下のものがあります。
INSERT INTOオペレーションCTASおよびRTASステートメントCOPY INTO(Parquet由来)spark.write.mode("append")
クラスターのサイズしきい値
書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDelta Lakeテーブルよりも低くなります。
クラスタリング列の数 | Unity Catalogマネージドテーブルのしきい値サイズ | 他のDelta Lakeテーブルのしきい値サイズ |
|---|---|---|
1 | 64 MB | 256 MB |
2 | 256 MB | 1 GB |
3 | 512 MB | 2 GB |
4 | 1 GB | 4 GB |
すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZEを頻繁に実行することをお勧めします。
ストリーミング ワークロード
構造化ストリーミングワークロードは、Spark設定 spark.databricks.delta.liquid.eagerClustering.streaming.enabled を true に設定すると、クラスタリングオンライトをサポートしています。これらのワークロードに対するクラスタリングは、過去5回のストリーミング更新のうち少なくとも1つが、上記の表に記載されているサイズしきい値を超えた場合にのみトリガーされます。
クラスタリングをトリガーする方法
予測的最適化は、有効なテーブルに対してOPTIMIZEコマンドを自動的に実行します。Unity Catalog マネージドテーブルの予測的最適化を参照してください。予測的最適化を使用する場合、Databricksでは、スケジュールされたOPTIMIZEジョブを無効にすることをお勧めします。
クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS 以降を使用する必要があります。Databricks は、大規模なテーブルでの OPTIMIZE パフォーマンス向上のために、Databricks Runtime 17.2 以降を推奨します。テーブルに対してOPTIMIZEコマンドを使用します。
OPTIMIZE table_name;
リキッドクラスタリングは インクリメンタル であり、クラスタリングする必要があるデータに対応するために、OPTIMIZE は必要な場合にのみデータを書き換えます。OPTIMIZEは、クラスター化されるデータに一致しないクラスタリングキーを持つデータファイルを書き換えません。「再クラスタリングの強制」を参照してください。
予測的最適化を使用していない場合は、Databricksでは、通常のOPTIMIZEジョブをクラスタリングデータにスケジュールすることをお勧めします。多くの更新または挿入が発生しているテーブルの場合、Databricks では、1時間または2時間ごとにOPTIMIZEジョブをスケジュールすることをお勧めします。リキッドクラスタリングは増分であるため、クラスター化されたテーブルのほとんどのOPTIMIZEジョブは迅速に実行されます。
再クラスタリングを強制する
Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。
OPTIMIZE table_name FULL;
OPTIMIZE FULLを実行すると、すべての既存データは必要に応じて再クラスタリングされます。指定されたキーで以前にクラスタリング化されていない大規模なテーブルの場合、この操作には数時間かかる可能性があります。
クラスタリングを初めて有効にするとき、またはクラスタリング キーを変更するときに OPTIMIZE FULL を実行します。以前に OPTIMIZE FULL を実行したことがあり、クラスタリング キーに変更がない場合は、OPTIMIZEと同じようにOPTIMIZE FULL実行できます。このシナリオでは、OPTIMIZE は増分アプローチを使用し、以前に圧縮されていないファイルのみを書き換えます。常に OPTIMIZE FULL を使用して、データレイアウトが現在のクラスタリングキーを反映していることを確認します。
部分的な再クラスタリング
Databricks Runtime 18.1 以降では、OPTIMIZE FULL WHERE <predicate> を使用して一部のレコードの再クラスタリングを強制できます。範囲のいずれかの部分が述語と重複する場合、ファイルが含まれます。See パラメーター.
OPTIMIZE events FULL WHERE event_date >= '2025-01-01';
クラスター化されたテーブルからデータを読み取る
クラスター化されたDelta Lakeテーブルのデータは、削除ベクトルの読み取りをサポートする任意のDelta Lakeクライアントを使用して読み取ることができます。Iceberg REST Catalog API を使用して、クラスター化された Iceberg テーブル内のデータを読み取ることができます。リキッドクラスタリングは、クラスタリングキーでフィルタリングする際の自動データスキッピングによって、クエリパフォーマンスを向上させます。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
クラスタリング キーの管理
テーブルのクラスター化方法を確認する
次の例のように、DESCRIBEコマンドを使用してテーブルのクラスタリングキーを確認できます。
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
クラスタリング キーの変更
次の例のように、ALTER TABLEコマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
クラスタリングキーを変更すると、後続のOPTIMIZEおよび書き込み操作では新しいクラスタリングアプローチが使用されますが、既存のデータは書き換えられません。更新されたクラスタリングキーを使用して既存のデータを書き換えるには、「再クラスタリングの強制」を参照してください。
次の例のように、キーをNONEに設定してクラスタリングをオフにすることもできます。
ALTER TABLE table_name CLUSTER BY NONE;
クラスターキーをNONEに設定した場合、クラスタリングされたデータは書き換えられませんが、その後のOPTIMIZE操作でクラスタリングキーが使用されなくなります。
外部エンジンからリキッドクラスタリングを使用する
外部の Iceberg エンジンから、マネージド Iceberg テーブルでリキッドクラスタリングを有効にできます。リキッドクラスタリングを有効にするには、テーブルを作成する際にパーティション列を指定します。Unity Catalogはパーティションをクラスタリングキーとして解釈します。例えば、OSS Spark で以下のコマンドを実行します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;
リキッドクラスタリングをオフにするには:
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;
Icebergパーティションの進化を使用してクラスタリングキーを変更するには:
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;
バケット変換を使用してパーティションを指定すると、Unity Catalog はその式を破棄し、その列をクラスタリングキーとして使用します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));
リキッドクラスタリングを使用するテーブルの互換性
リキッドクラスタリングは、読み取りと書き込みに特定のDatabricks Runtimeバージョンを必要とするDelta Lakeテーブル機能を使用します。Databricks Runtime 14.1 以降でリキッドクラスタリングを有効にして作成されたテーブルでは、デフォルトで checkpoint V2 が使用されます。Databricks Runtime 13.3 LTS 以降では、v2 チェックポイントを使用してテーブルを読み書きできます。チェックポイントV2を参照してください。
Databricks Runtime 12.2 LTSから13.2をご利用のお客様には、Checkpoint V2を無効にし、テーブルプロトコルをダウングレードしてください。クラシックへのダウングレードを参照してください。
デフォルトの機能有効化設定を上書きする(オプション)
リキッドクラスタリング有効化時に、デフォルトのDelta Lakeテーブル機能の有効化を上書きできます。これにより、それらのテーブル機能に関連するリーダーとライターのプロトコルがアップグレードされなくなります。次のステップを完了するには、既存のテーブルが必要です。
-
ALTER TABLEを使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。SQLALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false); -
次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。
SQLALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。
Delta機能 | Runtimeの互換性 | 有効化設定を上書きするためのプロパティ | オフにした場合のリキッドクラスタリングへの影響 |
|---|---|---|---|
削除ベクトル | 読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。 |
| 削除ベクトルを無効にすると、行レベルの同時実行性が無効になり、トランザクションとクラスタリング操作が競合しやすくなります。行レベルの同時実行性を参照してください。
|
行トラッキング | 書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。 |
| 行追跡を無効にすると、行レベルの同時実行も無効になり、トランザクションとクラスタリング操作が競合しやすくなります。行レベルの同時実行性を参照してください。 |
チェックポイントV2 | 読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。 |
| リキッドクラスタリングの動作には影響ありません。チェックポイントV2を参照してください。 |
制限事項:
- Databricks Runtime 15.1以前 において、書き込み時のクラスタリングでは、フィルター、結合、または集計を含むソースクエリがサポートされません。
- Databricks Runtime 15.4 LTS 以前 : 構造化ストリーミング書き込みを使用してリキッドクラスタリングが有効になっているテーブルを作成することはできません。構造化ストリーミングを使用して、リキッドクラスタリングが有効になっている既存のテーブルにデータを書き込むことができます。
- Apache Iceberg v2 :Managed Apache Iceberg v2 テーブルでは、削除ベクトルと行追跡がサポートされていないため、行レベルの同時実行性はサポートされません。
- 行レベルのコンカレンシーは、管理対象のApache Iceberg v3テーブルでサポートされています。v3仕様が削除ベクトルと行追跡をサポートしているためです。Apache Iceberg v3 機能を使用を参照してください。