リキッドクラスタリングを使用して Delta テーブル
Delta Lake リキッドクラスタリングは、テーブルのパーティション分割とZORDER
を置き換えて、データレイアウトの決定を簡素化し、クエリのパフォーマンスを最適化します。 リキッドクラスタリングは、既存のデータを書き換えることなくクラスタリングキーを再定義する柔軟性を提供し、時間の経過とともに分析ニーズに沿ってデータレイアウトを進化させることができます。 リキッドクラスタリングは、ストリーミング テーブルとマテリアライズドビューの両方に適用されます。
Databricksでは、リキッドクラスタリングが有効なすべてのテーブルに対して、Databricks Runtime 15.2以降を使用することをお勧めします。制限付きのパブリックプレビューサポートは、Databricks Runtime 13.3 LTS以降で使用できます。
リキッドクラスタリングが有効になっているテーブルは、 Databricks Runtime 13.3 LTS 以降で行レベルの同時実行性をサポートします。 行レベルのコンカレンシーは、Databricks Runtime 14.2 以降で、削除ベクトルが有効になっているすべてのテーブルで一般公開されています。 「Databricks での分離レベルと書き込みの競合」を参照してください。
リキッドクラスタリングは何に使用されますか?
Databricks では、ストリーミング テーブル (ST) とマテリアライズド ビュー (MV) の両方を含む、すべての新しい Delta テーブルに対してリキッドクラスタリングを推奨しています。 クラスタリングの恩恵を受けるシナリオの例を次に示します。
- カーディナリティの高い列によってフィルタリングされることが多いテーブル。
- データの分散に大きな偏りがあるテーブル。
- 急速に増大し、メンテナンスとチューニングが必要となるテーブル。
- 並列書き込み要件のあるテーブル。
- 時間の経過と共に変化するアクセスパターンを持つテーブル。
- 一般的なパーティションキーでは、パーティションが多すぎたり少なすぎたりするテーブル。
リキッドクラスタリングを有効にする
リキッドクラスタリングは、既存のテーブルで、またはテーブルの作成中に有効にできます。 クラスタリングはパーティション分割や ZORDER
と互換性がなく、Databricks を使用してテーブル内のデータのすべてのレイアウトおよび最適化操作を管理する必要があります。 リキッドクラスタリングを有効にした後、通常どおり OPTIMIZE
ジョブを実行してデータを段階的にクラスタリングします。 クラスタリングをトリガーする方法を参照してください。
リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントにCLUSTER BY
フレーズを追加します。
Databricks Runtime 14.2以降では、PythonまたはScalaでデータフレーム APIとDeltaTable APIを使用してリキッドクラスタリングを有効にできます。
- SQL
- Python
- Scala
-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
# Create an empty table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
// Create an empty table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Databricks Runtime 16.0 以降では、次の例のように、構造化ストリーミング write を使用してリキッドクラスタリングを有効にしたテーブルを作成できます。
- Python
- Scala
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
リキッドクラスタリングを有効にして作成されたテーブルでは、作成時に多数の Delta テーブル機能が有効になっており、Delta ライターバージョン 7 とリーダーバージョン 3 が使用されます。 これらの機能の一部の有効化を上書きできます。 「デフォルトの機能の有効化を上書きする(オプション)」を参照してください。
テーブル プロトコル バージョンはダウングレードできず、クラスタリングが有効になっているテーブルは、有効な Delta リーダー プロトコル テーブル機能をすべてサポートしていない Delta Lake クライアントでは読み取れません。「Delta Lake 機能の互換性とプロトコル」を参照してください。
次の構文を使用すると、既存のパーティション化されていないDeltaテーブルでリキッドクラスタリングを有効にできます。
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
デフォルトの動作では、以前に書き込まれたデータにクラスタリングは適用されません。 すべてのレコードに対して再クラスタリングを強制するには、 OPTIMIZE FULL
を使用する必要があります。 「すべてのレコードの再クラスタリングの強制」を参照してください。
クラスタリング・キーを削除するには、次の構文を使用します。
ALTER TABLE table_name CLUSTER BY NONE;
自動リキッドクラスタリング
プレビュー
自動リキッドクラスタリングは パブリック プレビュー段階です。
Databricks Runtime 15.4 LTS以降では、Unity Catalogマネージドテーブルの自動リキッドクラスタリングを有効にできます。自動リキッドクラスタリングを有効にすると、Databricks はクラスタリングキーをインテリジェントに選択してクエリのパフォーマンスを最適化します。 自動リキッドクラスタリングを有効にするには、 CLUSTER BY AUTO
節を使用します。
有効にすると、自動キー選択操作と自動クラスタリング操作はメンテナンス操作として非同期に実行され、テーブルに対して予測的最適化が有効になっている必要があります。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。
クラスタリング キーを特定するために、Databricks はテーブルの履歴クエリ ワークロードを分析し、最適な候補列を特定します。 クラスタリング キーは、データ スキップの改善による予測されるコスト削減がデータのクラスタリング コストを上回る場合に変更されます。
データのクエリ方法が時間の経過と共に変化したり、クエリのパフォーマンスがデータ分布の変化を示している場合、自動リキッドクラスタリングはパフォーマンスを最適化するために新しいキーを選択します。
リキッドクラスタリングをサポートするすべての Databricks Runtime バージョンから、自動クラスタリングが有効になっているテーブルの読み取りまたは書き込みが可能ですが、インテリジェントなキー選択は、 Databricks Runtime 15.4 LTSで導入されたメタデータに依存します。 Databricks Runtime 15.4 LTS 以降を使用して、自動的に選択されたキーがすべてのワークロードにメリットをもたらし、新しいキーを選択するときにこれらのワークロードが考慮されるようにします。
自動クラスタリングを有効または無効にする
自動リキッドクラスタリングが有効になっている新しいテーブルを作成するには、次の構文を使用します。
CREATE OR REPLACE TABLE table_name CLUSTER BY AUTO;
次の例に示すように、以前に手動でキーが指定されていたテーブルを含む、既存のテーブルで自動リキッドクラスタリングを有効にすることもできます。
ALTER TABLE table_name CLUSTER BY AUTO;
また、自動リキッドクラスタリングが有効になっているテーブルを変更して、手動で指定したキーを使用することもできます。
clusterByAuto
プロパティは、自動リキッドクラスタリングを有効にすると true
に設定されます。clusteringColumns
プロパティは、自動キー選択によって選択された現在のクラスタリング列を示します。DESCRIBE EXTENDED table_name
を実行すると、テーブルのプロパティの完全な一覧が表示されます。
デフォルトの機能の有効化を上書きする(オプション)
リキッドクラスタリングが有効な場合にDeltaテーブル機能を有効にするデフォルトの動作を上書きできます。これにより、それらのテーブル機能に関連するリーダーとライターのプロトコルがアップグレードされなくなります。次の手順を完了するには、既存のテーブルが必要です。
-
ALTER TABLE
を使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。SQLALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
-
次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。
SQLALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。
Delta機能 | Runtimeの互換性 | 有効化設定を上書きするためのプロパティ | 無効化によるリキッドクラスタリングへの影響 |
---|---|---|---|
削除ベクトル | 読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。 |
| 行レベルの同時実行性は無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルの同時実行性との書き込みの競合」を参照してください。 |
行追跡 | 書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。 |
| 行レベルの同時実行性は無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルの同時実行性との書き込みの競合」を参照してください。 |
チェックポイントV2 | 読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。 |
| リキッドクラスタリングの動作には影響ありません。 |
クラスタリング キーの選択
Databricks では、サポートされているテーブルに対して自動リキッドクラスタリングを推奨しています。 自動リキッドクラスタリングを参照してください。
Databricks では、クエリ フィルターで最も頻繁に使用される列に基づいてクラスタリング キーを選択することをお勧めします。 クラスタリング・キーは、任意の順序で定義できます。 2 つの列の相関性が高い場合は、そのうちの 1 つだけをクラスタリング キーとして含める必要があります。
最大 4 つのクラスタリングキーを指定できます。 小規模なテーブル (10 TB 未満) では、クラスタリングキーの数が多い (4 つなど) と、1 つのカラムでフィルタリングするときのパフォーマンスが、より少ないクラスタリングキー (2 つなど) と比較して低下する可能性があります。 ただし、テーブル・サイズが大きくなると、単一カラム・クエリにより多くのクラスタリング・キーを使用した場合のパフォーマンスの違いはごくわずかになります。
指定できるのは、統計がクラスタリング・キーとして収集される列のみです。 デフォルトでは、 Delta テーブルの最初の32列に統計が収集されます。 Delta 統計カラムの指定を参照してください。
クラスタリングでは、クラスタリングキーとして次のデータ型がサポートされています。
- 日付
- Timestamp
- TimestampNTZ(Databricks Runtime 14.3 LTS以降が必要)
- 文字列
- 整数タイプ
- Long
- Short
- Float
- Double
- DECIMALタイプ
- Byte
既存のテーブルを変換する場合は、以下の推奨事項を考慮してください。
現在のデータ最適化手法 | クラスタリングキーに関する推奨事項 |
---|---|
Hiveスタイルのパーティション分割 | パーティション列をクラスタリングキーとして使用します。 |
Z-orderのインデックス作成 |
|
Hiveスタイルのパーティション分割とZ-order | パーティション列と |
カーディナリティを減らすための生成済み列(タイムスタンプの日付など) | 元の列をクラスタリングキーとして使用し、生成済み列を作成しないでください。 |
クラスタ化されたテーブルにデータを書き込む
リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。Databricksでは、Databricks Runtime 13.3 LTS以降を使用する必要があります。
書き込み時にクラスター化する操作には、以下のものがあります。
INSERT INTO
オペレーションCTAS
およびRTAS
ステートメントCOPY INTO
(Parquet由来)spark.write.mode("append")
構造化ストリーミングの書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。 追加の制限が適用されます。 制限事項を参照してください。
書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDeltaテーブルよりも低くなります。
クラスタリング列の数 | Unity Catalogマネージドテーブルのしきい値サイズ | 他のDeltaテーブルのしきい値サイズ |
---|---|---|
1 | 64 MB | 256 MB |
2 | 256 MB | 1 GB |
3 | 512 MB | 2 GB |
4 | 1 GB | 4 GB |
すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZE
を頻繁に実行することをお勧めします。
クラスタリングをトリガーする方法
予測的最適化 enabled tables の OPTIMIZE
コマンドを自動的に実行します。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。
クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS以降を使用する必要があります。次の例のように、テーブルに対してOPTIMIZE
コマンドを使用します。
OPTIMIZE table_name;
リキッドクラスタリングはインクリメンタルであり、クラスタリングする必要があるデータに対応するために、必要な場合にのみデータが書き換えられます。クラスタリング化されるデータと一致しないクラスタリングキーを持つデータファイルは書き換えられません。
最適なパフォーマンスを得るために、Databricksでは、通常のOPTIMIZE
ジョブをクラスタリングデータにスケジュールすることをお勧めしています。また、多くの更新または挿入が発生しているテーブルの場合、Databricksでは、1時間または2時間ごとにOPTIMIZE
ジョブをスケジュールすることをお勧めしています。リキッドクラスタリングは増分であるため、クラスタリング化されたテーブルのほとんどのOPTIMIZE
ジョブは迅速に実行されます。
すべてのレコードの再クラスタリングを強制する
Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。
OPTIMIZE table_name FULL;
OPTIMIZE FULL
を実行すると、必要に応じてすべての既存のデータが再クラスター化されます。指定したキーで以前にクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。
クラスタリングを初めて有効にするとき、またはクラスタリング キーを変更するときに OPTIMIZE FULL
を実行します。以前に OPTIMIZE FULL
を実行したことがあり、クラスタリング キーに変更がない場合は、OPTIMIZE
と同じようにOPTIMIZE FULL
実行できます。このシナリオでは、 OPTIMIZE
は増分アプローチを使用し、以前に圧縮されていないファイルのみを書き換えます。常に OPTIMIZE FULL
を使用して、データ レイアウトが現在のクラスタリング キーを反映していることを確認します。
クラスタ化テーブルからのデータの読み取り
クラスタリング化されたテーブルのデータは、削除ベクトルの読み取りをサポートする任意のDelta Lakeクライアントを使用して読み取ることができます。最適なクエリー結果を得るには、次の例のように、クエリーフィルターにクラスタリングキーを含めます。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
クラスタリング キーの変更
次の例のように、ALTER TABLE
コマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
クラスタリングキーを変更すると、後続のOPTIMIZE
操作と書き込み操作では新しいクラスタリングアプローチが使用されますが、既存のデータは書き換えられません。
次の例のように、キーをNONE
に設定してクラスタリングをオフにすることもできます。
ALTER TABLE table_name CLUSTER BY NONE;
クラスタリングキーをNONE
に設定した場合、既にクラスタリング化されたデータは書き換わりませんが、その後のOPTIMIZE
操作でクラスタリングキーが使用されなくなります。
テーブルのクラスタ化方法を見る
次の例のように、DESCRIBE
コマンドを使用してテーブルのクラスタリングキーを確認できます。
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
リキッドクラスタリングを使用するテーブルの互換性
Databricks Runtime 14.1以降でリキッドクラスタリングを有効にして作成されたテーブルでは、デフォルトでv2チェックポイントが使用されます。Databricks Runtime 13.3 LTS以降では、v2チェックポイントを使用してテーブルを読み書きできます。
v2 チェックポイントを無効にし、 Databricks Runtime 12.2 LTS 以降でリキッドクラスタリングを使用してテーブルを読み取るためにテーブルプロトコルをダウングレードできます。 「Delta Lake テーブル機能の削除」および「テーブル プロトコルのダウングレード」を参照してください。
制限
次の制限があります。
- Databricks Runtime 15.1以前において、書き込み時のクラスタリングでは、フィルター、結合、または集計を含むソースクエリがサポートされません。
- 構造化ストリーミングワークロードはクラスタリングオンライトをサポートしていません。
- Databricks Runtime 15.4 LTS 以前では、構造化ストリーミング書き込みを使用してリキッドクラスタリングが有効になっているテーブルを作成することはできません。構造化ストリーミングを使用して、リキッドクラスタリングが有効になっている既存のテーブルにデータを書き込むことができます。