Deltaテーブルにリキッドクラスタリングを使用する
Delta Lakeリキッドクラスタリングは、テーブルのパーティション分割とZORDER
を置き換えて、データレイアウトの決定を簡素化し、クエリーのパフォーマンスを最適化します。リキッドクラスタリングは、既存のデータを書き換えることなくクラスタリングキーを再定義する柔軟性を提供し、時間の経過とともに分析のニーズに応じてデータレイアウトを進化させることができます。
重要
Databricksでは、リキッドクラスタリングが有効なすべてのテーブルに対して、Databricks Runtime 15.2以降を使用することをお勧めします。制限付きのパブリックプレビューサポートは、Databricks Runtime 13.3 LTS以降で使用できます。
注:
Databricks Runtime 13.3LTS以降では、リキッドクラスタリングが有効になっているテーブルで行レベルの同時実行がサポートされます。行レベルの同時実行は、削除ベクトルが有効なすべてのテーブルに対し、Databricks Runtime 14.2以降で一般公開されています。「Databricksの分離レベルと書き込み競合」を参照してください。
リキッドクラスタリングは何に使用されますか?
Databricksでは、すべての新しいDeltaテーブルにリキッドクラスタリングを推奨しています。クラスターの恩恵を受けるシナリオの例を次に示します。
カーディナリティの高い列によってフィルタリングされることが多いテーブル。
データの分散に大きな偏りがあるテーブル。
急速に増大し、メンテナンスとチューニングが必要となるテーブル。
並列書き込み要件のあるテーブル。
時間の経過と共に変化するアクセスパターンを持つテーブル。
一般的なパーティションキーでは、パーティションが多すぎたり少なすぎたりするテーブル。
リキッドクラスタリングを有効にする
リキッドクラスタリングは、既存のテーブルに対して、またはテーブルの作成時に有効にすることができます。クラスタリングはパーティション分割やZORDER
と互換性がなく、Databricksを使用してテーブル内のデータのすべてのレイアウト操作と最適化操作を管理する必要があります。リキッドクラスタリングを有効にすると、OPTIMIZE
ジョブを通常どおり実行することでデータを増分的にクラスター化できます。「クラスタリングをトリガーする方法」を参照してください。
リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントにCLUSTER BY
フレーズを追加します。
注:
Databricks Runtime 14.2以降では、PythonまたはScalaでDataFrame APIとDeltaTable APIを使用してリキッドクラスタリングを有効にできます。
-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
# Create an empty table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
// Create an empty table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Databricks Runtime 16.0 以降では、次の例のように、構造化ストリーミング write を使用してリキッドクラスタリングを有効にしたテーブルを作成できます。
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
警告
リキッドクラスタリングを有効にして作成されたテーブルでは、作成時に多数のDeltaテーブル機能が有効になっており、Deltaライターバージョン7およびリーダーバージョン3が使用されます。これらの機能の一部の有効化設定を上書きできます。「デフォルトの機能有効化設定を上書きする(オプション)」を参照してください。
テーブルプロトコルのバージョンはダウングレードできません。また、有効なDeltaリーダープロトコルテーブル機能のすべてをサポートしていないDelta Lakeクライアントからは、クラスタリングが有効になっているテーブルを読み取ることができません。「DatabricksでDelta Lake機能の互換性を管理する方法」を参照してください。
次の構文を使用すると、既存のパーティション化されていないDeltaテーブルでリキッドクラスタリングを有効にできます。
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
重要
デフォルトの動作では、以前に書き込まれたデータにクラスタリングは適用されません。 すべてのレコードに対して再クラスタリングを強制するには、 OPTIMIZE FULL
を使用する必要があります。 「すべてのレコードの再クラスタリングの強制」を参照してください。
デフォルトの機能有効化設定を上書きする(オプション)
リキッドクラスタリングが有効な場合にDeltaテーブル機能を有効にするデフォルトの動作を上書きできます。これにより、それらのテーブル機能に関連するリーダーとライターのプロトコルがアップグレードされなくなります。次の手順を完了するには、既存のテーブルが必要です。
ALTER TABLE
を使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。
ALTER TABLE <table_name> CLUSTER BY (<clustering_columns>)
次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。
Delta機能 |
Runtimeの互換性 |
有効化設定を上書きするためのプロパティ |
無効化によるリキッドクラスタリングへの影響 |
---|---|---|---|
削除ベクトル |
読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。 |
|
行レベルの同時実行が無効になるため、トランザクションとクラスタリング操作が競合しやすくなります。「行レベルの同時実行との書き込みの競合」を参照してください。
|
行追跡 |
書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。 |
|
行レベルの同時実行が無効になるため、トランザクションとクラスタリング操作が競合しやすくなります。「行レベルの同時実行との書き込みの競合」を参照してください。 |
チェックポイントV2 |
読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。 |
|
リキッドクラスタリングの動作には影響ありません。 |
クラスタリングキーの選択
Databricksでは、一般的に使用されるクエリーフィルターに基づいてクラスタリングキーを選択することを推奨しています。クラスタリングキーは任意の順序で定義できます。2つの列が相関している場合は、そのうちの1つだけをクラスタリングキーとして追加する必要があります。
クラスタリングキーとして指定できるのは最大4列までです。クラスタリングキー用に収集された統計情報を持つ列のみを指定できます。デフォルトでは、Deltaテーブルの最初の32列に統計情報が収集されています。「Delta統計列の指定」を参照してください。
クラスタリングでは、クラスタリングキーとして次のデータ型がサポートされています。
Date
Timestamp
TimestampNTZ(Databricks Runtime 14.3 LTS以降が必要)
String
Integer
Long
Short
Float
Double
Decimal
Byte
既存のテーブルを変換する場合は、以下の推奨事項を考慮してください。
現在のデータ最適化手法 |
クラスタリングキーに関する推奨事項 |
---|---|
Hiveスタイルのパーティション分割 |
パーティション列をクラスタリングキーとして使用します。 |
Z-orderのインデックス作成 |
|
Hiveスタイルのパーティション分割とZ-order |
パーティション列と |
カーディナリティを減らすための生成済み列(タイムスタンプの日付など) |
元の列をクラスタリングキーとして使用し、生成済み列を作成しないでください。 |
クラスター化されたテーブルへのデータの書き込み
リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。Databricksでは、Databricks Runtime 13.3 LTS以降を使用する必要があります。
書き込み時にクラスタリングが行われる操作には、次のものがあります。
INSERT INTO
オペレーションCTAS
およびRTAS
ステートメントCOPY INTO
(Parquet由来)spark.write.mode("append")
構造化ストリーミングによる書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。追加の制限が適用されます。「制限事項」を参照してください。
書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDeltaテーブルよりも低くなります。
クラスタリング列の数 |
Unity Catalogマネージドテーブルのしきい値サイズ |
他のDeltaテーブルのしきい値サイズ |
---|---|---|
1 |
64 MB |
256 MB |
2 |
256 MB |
1 GB |
3 |
512 MB |
2 GB |
4 |
1 GB |
4 GB |
すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZE
を頻繁に実行することをお勧めします。
クラスタリングをトリガーする方法
予測的最適化は、予測的最適化が有効になっているテーブルに対してOPTIMIZE
コマンドを自動的に実行する機能です。「Unity Catalog管理テーブルの予測的最適化」を参照してください。
クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS以降を使用する必要があります。次の例のように、テーブルに対してOPTIMIZE
コマンドを使用します。
OPTIMIZE table_name;
リキッドクラスタリングはインクリメンタルであり、クラスタリングする必要があるデータに対応するために、必要な場合にのみデータが書き換えられます。クラスター化されるデータと一致しないクラスタリングキーを持つデータファイルは書き換えられません。
最適なパフォーマンスを得るために、Databricksでは、通常のOPTIMIZE
ジョブをクラスターデータにスケジュールすることをお勧めしています。また、多くの更新または挿入が発生しているテーブルの場合、Databricksでは、1時間または2時間ごとにOPTIMIZE
ジョブをスケジュールすることをお勧めしています。リキッドクラスタリングは増分であるため、クラスター化されたテーブルのほとんどのOPTIMIZE
ジョブは迅速に実行されます。
すべてのレコードの再クラスタリングを強制する
Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。
OPTIMIZE table_name FULL;
重要
OPTIMIZE FULL
を実行すると、必要に応じてすべての既存のデータが再クラスター化されます。指定したキーで以前にクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。
クラスタリングを初めて有効にするとき、またはクラスタリング キーを変更するときに OPTIMIZE FULL
を実行します。 以前に OPTIMIZE FULL
を実行したことがあり、クラスタリング キーに変更がない場合は、OPTIMIZE
と同じようにOPTIMIZE FULL
実行できます。常に OPTIMIZE FULL
を使用して、データ レイアウトが現在のクラスタリング キーを反映していることを確認します。
クラスター化されたテーブルからデータを読み取る
クラスター化されたテーブルのデータは、削除ベクトルの読み取りをサポートする任意のDelta Lakeクライアントを使用して読み取ることができます。最適なクエリー結果を得るには、次の例のように、クエリーフィルターにクラスタリングキーを含めます。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
クラスタリングキーの変更
次の例のように、ALTER TABLE
コマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
クラスタリングキーを変更すると、後続のOPTIMIZE
操作と書き込み操作では新しいクラスタリングアプローチが使用されますが、既存のデータは書き換えられません。
次の例のように、キーをNONE
に設定してクラスタリングをオフにすることもできます。
ALTER TABLE table_name CLUSTER BY NONE;
クラスターキーをNONE
に設定した場合、既にクラスター化されたデータは書き換わりませんが、その後のOPTIMIZE
操作でクラスタリングキーが使用されなくなります。
テーブルのクラスター化方法を確認する
次の例のように、DESCRIBE
コマンドを使用してテーブルのクラスタリングキーを確認できます。
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
リキッドクラスタリングが有効なテーブルの互換性
Databricks Runtime 14.1以降でリキッドクラスタリングを有効にして作成されたテーブルでは、デフォルトでv2チェックポイントが使用されます。Databricks Runtime 13.3 LTS以降では、v2チェックポイントを使用してテーブルを読み書きできます。
Databricks Runtime 12.2 LTS以降では、v2チェックポイントを無効にし、テーブルプロトコルをダウングレードすることで、リキッドクラスタリングが有効なテーブルを読み取ることができます。「Deltaテーブル機能の削除」を参照してください。