テーブルにリキッドクラスタリングを使用する
リキッドクラスタリングは、テーブルのパーティション分割と ZORDER
を置き換えて、データレイアウトの決定を簡素化し、クエリのパフォーマンスを最適化します。 既存のデータを書き換えることなくクラスタリングキーを再定義できる柔軟性を備えているため、データレイアウトを時間の経過とともに分析ニーズに合わせて進化させることができます。リキッドクラスタリングは、ストリーミングテーブルとマテリアライズドビューの両方に適用されます。
リキッドクラスタリングは、 Delta Lake では一般提供され、 Apache Icebergではパブリック プレビューとして提供されます。
リキッドクラスタリングが有効になっているすべての Delta Lake テーブルでは、 Databricks 15.2 以降 Databricks Runtime を使用することをお勧めします。 制限付きパブリック プレビュー サポートは、Databricks Runtime 13.3 LTS 以降で利用できます。行レベルのコンカレンシーは、Databricks Runtime 13.3 LTS 以降でサポートされており、Databricks Runtime 14.2 以降では、削除ベクトルが有効になっているすべてのテーブルで一般公開されています。「Databricks での分離レベルと書き込みの競合」を参照してください。
リキッドクラスタリングが有効になっているすべての Apache Iceberg テーブルには、 Databricks Runtime 16.4 LTS 以上が必要です。
リキッドクラスタリングは何に使用されますか?
Databricks 、ストリーミングテーブル (STs) とマテリアライズドビュー (MVs) の両方を含むすべての新しいテーブルに対してリキッドクラスタリングを推奨しています。 クラスタリングの恩恵を受けるシナリオの例を次に示します。
- カーディナリティの高い列によってフィルタリングされることが多いテーブル。
- データの分散に大きな偏りがあるテーブル。
- 急速に増大し、メンテナンスとチューニングが必要となるテーブル。
- 並列書き込み要件のあるテーブル。
- 時間の経過と共に変化するアクセスパターンを持つテーブル。
- 一般的なパーティションキーでは、パーティションが多すぎたり少なすぎたりするテーブル。
リキッドクラスタリングを有効にする
リキッドクラスタリングは、既存のテーブルで、またはテーブルの作成中に有効にできます。 クラスタリングはパーティション分割や ZORDER
と互換性がなく、Databricks を使用してテーブル内のデータのすべてのレイアウトおよび最適化操作を管理する必要があります。 リキッドクラスタリングを有効にした後、通常どおり OPTIMIZE
ジョブを実行してデータを段階的にクラスタリングします。 クラスタリングをトリガーする方法を参照してください。
リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントにCLUSTER BY
フレーズを追加します。
Databricks Runtime14.2 以降では、DataFrameAPIs または で と DeltaTable を使用して、API PythonScalaテーブルのリキッドクラスタリングを有効にできます。Delta Lake
- SQL
- Python
- Scala
-- Create an empty Delta table
CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
Apache Icebergの場合、マネージド Iceberg テーブルでリキッドクラスタリングを有効にするときに、削除ベクトルと行 ID を明示的に無効にする必要があります。
# Create an empty Delta table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
// Create an empty Delta table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Databricks Runtime 16.0 以降では、次の例のように、構造化ストリーミング の書き込み を使用してリキッドクラスタリングを有効にしたテーブルを作成できます。
:::
- SQL
- Python
- Scala
CREATE TABLE table1 (
col0 STRING,
col1 DATE,
col2 BIGINT
)
CLUSTER BY (col0, col1)
TBLPROPERTIES (
'clusterByAuto' = 'true'
);
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
リキッドクラスタリングを有効にして作成されたDeltaテーブルは、作成時に多数のDeltaテーブル機能が有効になっておりDeltaライターバージョン7とリーダーバージョン3を使用しています。これらの機能の一部の有効化を上書きできます。「デフォルトの機能の有効化を上書きする(オプション)」を参照してください。
テーブル プロトコル バージョンはダウングレードできず、クラスタリングが有効になっているテーブルは、有効な Delta リーダー プロトコル テーブル機能をすべてサポートしていない Delta Lake クライアントでは読み取れません。Delta Lake 機能の互換性とプロトコルを参照してください。
次の構文を使用して、既存のパーティション分割されていない Delta テーブルでリキッドクラスタリングを有効にします。
-- Alter an existing table
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
Apache Icebergの場合、既存のマネージドIcebergテーブルでリキッドクラスタリングを有効にするときは、削除ベクトルと行 ID を明示的に無効にする必要があります。
デフォルトの動作では、以前に書き込まれたデータにクラスタリングは適用されません。すべてのレコードに対して再クラスタリングを強制するには、 OPTIMIZE FULL
を使用する必要があります。「すべてのレコードの再クラスタリングの強制」を参照してください。
クラスタリング・キーを削除するには、次の構文を使用します。
ALTER TABLE table_name CLUSTER BY NONE;
自動リキッドクラスタリング
Databricks Runtime 15.4 LTS 以降では、Unity Catalog管理Deltaテーブルに対して自動リキッドクラスタリングを有効にできます。自動リキッドクラスタリングを有効にすると、Databricks はクラスタリングキーをインテリジェントに選択してクエリのパフォーマンスを最適化します。自動リキッドクラスタリングを有効にするには、 CLUSTER BY AUTO
節を使用します。
有効にすると、自動キー選択操作と自動クラスタリング操作はメンテナンス操作として非同期に実行され、テーブルに対して予測的最適化が有効になっている必要があります。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。
クラスタリング キーを特定するために、Databricks はテーブルの履歴クエリ ワークロードを分析し、最適な候補列を特定します。クラスタリング キーは、データ スキップの改善による予測されるコスト削減がデータ クラスタリング コストを上回る場合に変更されます。
データのクエリ方法が時間の経過と共に変化したり、クエリのパフォーマンスがデータ分布の変化を示している場合、自動リキッドクラスタリングはパフォーマンスを最適化するために新しいキーを選択します。
自動リキッドクラスタリングによってキーが選択されなかった場合、その理由として次のことが考えられます。
- テーブルが小さすぎてリキッドクラスタリングの恩恵を受けられません。
- テーブルにはすでに適切なクラスタリングスキームがあります。たとえば、以前に一度適用された適切なキーがある場合や、時系列で挿入され、タイムスタンプでクエリされたデータなど、特定のクエリ パターンに対して挿入順序が既に良好に実行されている場合などです。
- テーブルには頻繁なクエリはありません。
- Databricks Runtime 15.4 LTS 以降を使用していない。
自動リキッドクラスタリングは、データやクエリの特性に関係なく、すべての Unity Catalog マネージドテーブルに適用できます。 これらの機能により、データ使用パターンに基づいてデータ レイアウトがインテリジェントに最適化され、クラスタリング キーを選択することがコストメリットがあるかどうかはヒューリスティックによって決定されます。
自動クラスタリングが有効になっているテーブルは、リキッドクラスタリングをサポートするすべてのバージョンから Databricks Runtime 読み書きできます。 ただし、インテリジェントなキー選択は、Databricks Runtime 15.4 LTS で導入されたメタデータに依存します。Databricks Runtime 15.4 LTS 以降を使用して、自動的に選択されたキーがすべてのワークロードにメリットをもたらし、新しいキーを選択するときにこれらのワークロードが考慮されるようにします。
自動リキッドクラスタリングを有効または無効にする
新規または既存のテーブルで自動リキッドクラスタリングを有効または無効にするには、次の構文を使用します。
- SQL
- Python
-- Create an empty table.
CREATE OR REPLACE TABLE table1(column01 int, column02 string) CLUSTER BY AUTO;
-- Enable automatic liquid clustering on an existing table,
-- including tables that previously had manually specified keys.
ALTER TABLE table1 CLUSTER BY AUTO;
-- Disable automatic liquid clustering on an existing table.
ALTER TABLE table1 CLUSTER BY NONE;
-- Disable automatic liquid clustering by setting the clustering keys
-- to chosen clustering columns or new columns.
ALTER TABLE table1 CLUSTER BY (column01, column02);
CLUSTER BY AUTO
を指定せずにCREATE OR REPLACE table_name
を実行し、テーブルがすでに存在し、自動リキッドクラスタリングが有効になっている場合、テーブルのAUTO
設定とクラスタリング列(適用されている場合)は、テーブルを置き換えるときに保持されます。また、予測的最適化では、このテーブルの履歴クエリワークロードも保持され、最適なクラスタリングキーが特定されます。
df = spark.read.table("table1")
df.write
.format("delta")
.option(“clusterByAuto”, “true”)
.saveAsTable(...)
# To set clustering columns and auto, which serves as a way to give a hint
# for the initial selection.
df.write
.format("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option(“clusterByAuto”, “true”)
.saveAsTable(...)
# Using DataFrameWriterV2
df.writeTo(...).using("delta")
.option(“clusterByAuto”, “true”)
.create()
# To set clustering columns and auto, which serves as a way to give a hint
# for the initial selection.
df.writeTo(...).using("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option(“clusterByAuto”, “true”)
.create()
# Similar syntax can also be used to set clusterByAuto for streaming tables.
spark.readStream.table("source_table")
.writeStream
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
# Or to specify a hint for the clustering columns by specifying both auto and columns together
spark.readStream.table("source_table")
.writeStream
.clusterBy("column1", "column2")
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Python API は、Databricks Runtime 16.4 以降で使用できます。
.clusterBy
を .option('clusterByAuto', 'true)
と一緒に使用すると、次のようになります。
- これにより初めて自動リキッドクラスタリングを設定する場合、常に手動入力が尊重され、
.clusterBy
のクラスタリング列が設定されます。 - これがすでに自動リキッドクラスタリング付きのテーブルであれば、
.clusterBy
を使ったヒントを一度受け入れることができます。 たとえば、.clusterBy
で指定されたカラムは、テーブルにクラスタリングカラムがまだ設定されていない場合にのみ設定されます (ユーザーまたは自動リキッドクラスタリングによって)。
Python を使用できるのは、テーブルを作成または置換するときだけです。SQL を使用して、既存のテーブルの clusterByAuto
ステータスを変更します。
自動クラスタリングが有効になっているかどうかを確認する
テーブルで自動リキッドクラスタリングが有効になっているかどうかを確認するには、DESCRIBE TABLE
または SHOW TBLPROPERTIES
を使用します。
自動リキッドクラスタリングが有効な場合、 clusterByAuto
プロパティは true
に設定されます。 clusteringColumns
プロパティは、自動または手動で選択された現在のクラスタリング列を示します。
制限
自動リキッドクラスタリングは Apache Icebergではご利用いただけません。
デフォルトの機能の有効化を上書きする(オプション)
リキッドクラスタリングが有効な場合にDeltaテーブル機能を有効にするデフォルトの動作を上書きできます。これにより、それらのテーブル機能に関連するリーダーとライターのプロトコルがアップグレードされなくなります。次の手順を完了するには、既存のテーブルが必要です。
-
ALTER TABLE
を使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。SQLALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
-
次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。
SQLALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。
Delta機能 | Runtimeの互換性 | 有効化設定を上書きするためのプロパティ | 無効化によるリキッドクラスタリングへの影響 |
---|---|---|---|
削除ベクトル | 読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。 |
| 行レベルの同時実行性は無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。 行レベルの同時実行性との書き込みの競合を参照してください。 |
行追跡 | 書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。 |
| 行レベルの同時実行性は無効になり、トランザクションとクラスタリング操作が競合する可能性が高くなります。 行レベルの同時実行性との書き込みの競合を参照してください。 |
チェックポイントV2 | 読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。 |
| リキッドクラスタリングの動作には影響ありません。 |
クラスタリング キーの選択
Databricks では、サポートされているテーブルに対して自動リキッドクラスタリングを推奨しています。 自動リキッドクラスタリングを参照してください。
Databricks では、クエリ フィルターで最も頻繁に使用される列に基づいてクラスタリング キーを選択することをお勧めします。 クラスタリング・キーは、任意の順序で定義できます。 2 つの列の相関性が高い場合は、そのうちの 1 つだけをクラスタリング キーとして含める必要があります。
最大 4 つのクラスタリングキーを指定できます。 小規模なテーブル (10 TB 未満) では、クラスタリングキーの数が多い (4 つなど) と、1 つのカラムでフィルタリングするときのパフォーマンスが、より少ないクラスタリングキー (2 つなど) と比較して低下する可能性があります。 ただし、テーブル・サイズが大きくなると、単一カラム・クエリにより多くのクラスタリング・キーを使用した場合のパフォーマンスの違いはごくわずかになります。
指定できるのは、統計がクラスタリング・キーとして収集される列のみです。 デフォルトでは、 Delta テーブルの最初の32列に統計が収集されます。 Delta 統計カラムの指定を参照してください。
クラスタリングでは、クラスタリングキーとして次のデータ型がサポートされています。
- 日付
- Timestamp
- TimestampNTZ(Databricks Runtime 14.3 LTS以降が必要)
- 文字列
- Integer
- Long
- Short
- Float
- Double
- Decimal
- Byte
既存のテーブルを変換する場合は、次の推奨事項を考慮してください。
現在のデータ最適化手法 | クラスタリングキーに関する推奨事項 |
---|---|
Hiveスタイルのパーティション分割 | パーティション列をクラスタリングキーとして使用します。 |
Z-orderのインデックス作成 |
|
Hiveスタイルのパーティション分割とZ-order | パーティション列と |
カーディナリティを減らすためのジェネレーテッドカラム(タイムスタンプの日付など) | 元の列をクラスタリング キーとして使用し、生成された列は作成しないでください。 |
クラスタ化されたテーブルにデータを書き込む
クラスタ化されたDeltaテーブルに書き込むには、リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。クラスター化された Iceberg テーブルに書き込むには、Unity Catalog の Iceberg REST Catalog API を使用できます。Databricks では、Databricks Runtime 13.3 LTS 以降を使用する必要があります。
書き込み時にクラスター化する操作には、以下のものがあります。
INSERT INTO
オペレーションCTAS
およびRTAS
ステートメントCOPY INTO
(Parquet由来)spark.write.mode("append")
構造化ストリーミングの書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。 追加の制限が適用されます。 制限事項を参照してください。
書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDeltaテーブルよりも低くなります。
クラスタリング列の数 | Unity Catalogマネージドテーブルのしきい値サイズ | 他のDeltaテーブルのしきい値サイズ |
---|---|---|
1 | 64 MB | 256 MB |
2 | 256 MB | 1 GB |
3 | 512 MB | 2 GB |
4 | 1 GB | 4 GB |
すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZE
を頻繁に実行することをお勧めします。
クラスタリングをトリガーする方法
予測的最適化 enabled tables の OPTIMIZE コマンドを自動的に実行します。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。予測的最適化を使用する場合、 Databricks スケジュールされた OPTIMIZE ジョブを無効にすることをお勧めします。
クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS 以降を使用する必要があります。テーブルで OPTIMIZE
コマンドを使用します。
OPTIMIZE table_name;
リキッドクラスタリングはインクリメンタルであり、クラスタリングが必要なデータに対応するために、必要に応じてデータのみが書き換えられます。クラスタリング・キーがクラスタリングされるデータと一致しないデータ・ファイルは、再書き込みされません。
予測的最適化を使用していない場合、 Databricks では、通常の OPTIMIZE
ジョブをデータのクラスタリングにスケジュールすることをお勧めします。 更新や挿入が多いテーブルの場合、Databricks では 1 時間または 2 時間ごとに OPTIMIZE
ジョブをスケジュールすることをお勧めします。リキッドクラスタリングは増分であるため、クラスタ化されたテーブルのほとんどの OPTIMIZE
ジョブは迅速に実行されます。
すべてのレコードの再クラスタリングを強制する
Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。
OPTIMIZE table_name FULL;
OPTIMIZE FULL
を実行すると、必要に応じてすべての既存のデータが再クラスター化されます。指定したキーで以前にクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。
クラスタリングを初めて有効にするとき、またはクラスタリング キーを変更するときに OPTIMIZE FULL
を実行します。以前に OPTIMIZE FULL
を実行したことがあり、クラスタリング キーに変更がない場合は、OPTIMIZE
と同じようにOPTIMIZE FULL
実行できます。このシナリオでは、 OPTIMIZE
は増分アプローチを使用し、以前に圧縮されていないファイルのみを書き換えます。常に OPTIMIZE FULL
を使用して、データ レイアウトが現在のクラスタリング キーを反映していることを確認します。
クラスタ化テーブルからのデータの読み取り
クラスター化された Delta テーブル内のデータを読み取るには、削除ベクトルの読み取りをサポートする任意の Delta Lake クライアントを使用します。Iceberg REST Catalog API を使用すると、クラスター化された Iceberg テーブル内のデータを読み取ることができます。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
クラスタリング キーの変更
次の例のように、ALTER TABLE
コマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
クラスタリングキーを変更すると、後続のOPTIMIZE
操作と書き込み操作では新しいクラスタリングアプローチが使用されますが、既存のデータは書き換えられません。
次の例のように、キーをNONE
に設定してクラスタリングをオフにすることもできます。
ALTER TABLE table_name CLUSTER BY NONE;
クラスタリングキーをNONE
に設定した場合、既にクラスタリング化されたデータは書き換わりませんが、その後のOPTIMIZE
操作でクラスタリングキーが使用されなくなります。
外部エンジンからのリキッドクラスタリングの使用
外部の Iceberg エンジンから、管理された Iceberg テーブルでリキッドクラスタリングを有効にできます。リキッドクラスタリングを有効にするには、テーブルの作成時にパーティション列を指定します。Unity Catalog は、パーティションをクラスタリング キーとして解釈します。たとえば、OSS Spark で次のコマンドを実行します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;
リキッドクラスタリングを無効にすることができます。
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;
クラスタリングキーは、Iceberg partition evolutionを使用して変更できます。
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;
バケット変換を使用してパーティションを指定すると、Unity Catalog は式を削除し、列をクラスタリング キーとして使用します。
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));
テーブルのクラスタ化方法を見る
次の例のように、DESCRIBE
コマンドを使用してテーブルのクラスタリングキーを確認できます。
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
リキッドクラスタリングを使用するテーブルの互換性
Databricks Runtime 14.1以降でリキッドクラスタリングを有効にして作成されたテーブルでは、デフォルトでv2チェックポイントが使用されます。Databricks Runtime 13.3 LTS以降では、v2チェックポイントを使用してテーブルを読み書きできます。
v2 チェックポイントを無効にし、 Databricks Runtime 12.2 LTS 以降でリキッドクラスタリングを使用してテーブルを読み取るためにテーブルプロトコルをダウングレードできます。 Delta Lake テーブル機能の削除およびテーブル プロトコルのダウングレードを参照してください。
制限
次の制限があります。
- Databricks Runtime 15.1以前において、書き込み時のクラスタリングでは、フィルター、結合、または集計を含むソースクエリがサポートされません。
- 構造化ストリーミングワークロードはクラスタリングオンライトをサポートしていません。
- Databricks Runtime 15.4 LTS 以前では、構造化ストリーミング書き込みを使用してリキッドクラスタリングが有効になっているテーブルを作成することはできません。構造化ストリーミングを使用して、リキッドクラスタリングが有効になっている既存のテーブルにデータを書き込むことができます。
- Iceberg テーブルでは削除ベクトルと行追跡がサポートされていないため、マネージド Iceberg テーブルでは行レベルの同時実行性はサポートされていません。