Deltaテーブルにリキッドクラスタリングを使用する

Delta Lakeリキッドクラスタリングは、テーブルのパーティション分割とZORDERを置き換えて、データレイアウトの決定を簡素化し、クエリーのパフォーマンスを最適化します。リキッドクラスタリングは、既存のデータを書き換えることなくクラスタリングキーを再定義する柔軟性を提供し、時間の経過とともに分析のニーズに応じてデータレイアウトを進化させることができます。

重要

Databricksでは、リキッドクラスタリングが有効なすべてのテーブルに対して、Databricks Runtime 15.2以降を使用することをお勧めします。制限付きのパブリックプレビューサポートは、Databricks Runtime 13.3 LTS以降で使用できます。

注:

Databricks Runtime 13.3LTS以降では、リキッドクラスタリングが有効になっているテーブルで行レベルの同時実行がサポートされます。行レベルの同時実行は、削除ベクトルが有効なすべてのテーブルに対し、Databricks Runtime 14.2以降で一般公開されています。「Databricksの分離レベルと書き込み競合」を参照してください。

リキッドクラスタリングは何に使用されますか?

Databricksでは、すべての新しいDeltaテーブルにリキッドクラスタリングを推奨しています。クラスターの恩恵を受けるシナリオの例を次に示します。

  • カーディナリティの高い列によってフィルタリングされることが多いテーブル。

  • データの分散に大きな偏りがあるテーブル。

  • 急速に増大し、メンテナンスとチューニングが必要となるテーブル。

  • 並列書き込み要件のあるテーブル。

  • 時間の経過と共に変化するアクセスパターンを持つテーブル。

  • 一般的なパーティションキーでは、パーティションが多すぎたり少なすぎたりするテーブル。

リキッドクラスタリングを有効にする

リキッドクラスタリングは、既存のテーブルに対して、またはテーブルの作成時に有効にすることができます。クラスタリングはパーティション分割やZORDERと互換性がなく、Databricksを使用してテーブル内のデータのすべてのレイアウト操作と最適化操作を管理する必要があります。リキッドクラスタリングを有効にすると、OPTIMIZEジョブを通常どおり実行することでデータを増分的にクラスター化できます。「クラスタリングをトリガーする方法」を参照してください。

リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントにCLUSTER BYフレーズを追加します。

注:

Databricks Runtime 14.2以降では、PythonまたはScalaでDataFrame APIとDeltaTable APIを使用してリキッドクラスタリングを有効にできます。

-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);

-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)  -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;

-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
# Create an empty table
(DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute())

# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
// Create an empty table
DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute()

// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

Databricks Runtime 16.0 以降では、次の例のように、構造化ストリーミング write を使用してリキッドクラスタリングを有効にしたテーブルを作成できます。

(spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")
)
spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")

警告

リキッドクラスタリングを有効にして作成されたテーブルでは、作成時に多数のDeltaテーブル機能が有効になっており、Deltaライターバージョン7およびリーダーバージョン3が使用されます。これらの機能の一部の有効化設定を上書きできます。「デフォルトの機能有効化設定を上書きする(オプション)」を参照してください。

テーブルプロトコルのバージョンはダウングレードできません。また、有効なDeltaリーダープロトコルテーブル機能のすべてをサポートしていないDelta Lakeクライアントからは、クラスタリングが有効になっているテーブルを読み取ることができません。「DatabricksでDelta Lake機能の互換性を管理する方法」を参照してください。

次の構文を使用すると、既存のパーティション化されていないDeltaテーブルでリキッドクラスタリングを有効にできます。

ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

重要

デフォルトの動作では、以前に書き込まれたデータにクラスタリングは適用されません。 すべてのレコードに対して再クラスタリングを強制するには、 OPTIMIZE FULLを使用する必要があります。 「すべてのレコードの再クラスタリングの強制」を参照してください。

デフォルトの機能有効化設定を上書きする(オプション)

リキッドクラスタリングが有効な場合にDeltaテーブル機能を有効にするデフォルトの動作を上書きできます。これにより、それらのテーブル機能に関連するリーダーとライターのプロトコルがアップグレードされなくなります。次の手順を完了するには、既存のテーブルが必要です。

  1. ALTER TABLEを使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。

    ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
    
  2. 次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。

    ALTER TABLE <table_name>
    CLUSTER BY (<clustering_columns>)
    

次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。

Delta機能

Runtimeの互換性

有効化設定を上書きするためのプロパティ

無効化によるリキッドクラスタリングへの影響

削除ベクトル

読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。

'delta.enableDeletionVectors' = false

行レベルの同時実行が無効になるため、トランザクションとクラスタリング操作が競合しやすくなります。「行レベルの同時実行との書き込みの競合」を参照してください。

DELETEMERGEUPDATEコマンドの実行速度が遅くなる可能性があります。

行追跡

書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。

'delta.enableRowTracking' = false

行レベルの同時実行が無効になるため、トランザクションとクラスタリング操作が競合しやすくなります。「行レベルの同時実行との書き込みの競合」を参照してください。

チェックポイントV2

読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。

'delta.checkpointPolicy' = 'classic'

リキッドクラスタリングの動作には影響ありません。

クラスタリングキーの選択

Databricksでは、一般的に使用されるクエリーフィルターに基づいてクラスタリングキーを選択することを推奨しています。クラスタリングキーは任意の順序で定義できます。2つの列が相関している場合は、そのうちの1つだけをクラスタリングキーとして追加する必要があります。

クラスタリングキーとして指定できるのは最大4列までです。クラスタリングキー用に収集された統計情報を持つ列のみを指定できます。デフォルトでは、Deltaテーブルの最初の32列に統計情報が収集されています。「Delta統計列の指定」を参照してください。

クラスタリングでは、クラスタリングキーとして次のデータ型がサポートされています。

  • Date

  • Timestamp

  • TimestampNTZ(Databricks Runtime 14.3 LTS以降が必要)

  • String

  • Integer

  • Long

  • Short

  • Float

  • Double

  • Decimal

  • Byte

既存のテーブルを変換する場合は、以下の推奨事項を考慮してください。

現在のデータ最適化手法

クラスタリングキーに関する推奨事項

Hiveスタイルのパーティション分割

パーティション列をクラスタリングキーとして使用します。

Z-orderのインデックス作成

ZORDER BY列をクラスタリングキーとして使用します。

Hiveスタイルのパーティション分割とZ-order

パーティション列とZORDER BY列の両方をクラスタリングキーとして使用します。

カーディナリティを減らすための生成済み列(タイムスタンプの日付など)

元の列をクラスタリングキーとして使用し、生成済み列を作成しないでください。

クラスター化されたテーブルへのデータの書き込み

リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。Databricksでは、Databricks Runtime 13.3 LTS以降を使用する必要があります。

書き込み時にクラスタリングが行われる操作には、次のものがあります。

  • INSERT INTO オペレーション

  • CTAS およびRTASステートメント

  • COPY INTO (Parquet由来)

  • spark.write.mode("append")

構造化ストリーミングによる書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。追加の制限が適用されます。「制限事項」を参照してください。

書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDeltaテーブルよりも低くなります。

クラスタリング列の数

Unity Catalogマネージドテーブルのしきい値サイズ

他のDeltaテーブルのしきい値サイズ

1

64 MB

256 MB

2

256 MB

1 GB

3

512 MB

2 GB

4

1 GB

4 GB

すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZEを頻繁に実行することをお勧めします。

クラスタリングをトリガーする方法

予測的最適化は、予測的最適化が有効になっているテーブルに対してOPTIMIZEコマンドを自動的に実行する機能です。「Unity Catalog管理テーブルの予測的最適化」を参照してください。

クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS以降を使用する必要があります。次の例のように、テーブルに対してOPTIMIZEコマンドを使用します。

OPTIMIZE table_name;

リキッドクラスタリングはインクリメンタルであり、クラスタリングする必要があるデータに対応するために、必要な場合にのみデータが書き換えられます。クラスター化されるデータと一致しないクラスタリングキーを持つデータファイルは書き換えられません。

最適なパフォーマンスを得るために、Databricksでは、通常のOPTIMIZEジョブをクラスターデータにスケジュールすることをお勧めしています。また、多くの更新または挿入が発生しているテーブルの場合、Databricksでは、1時間または2時間ごとにOPTIMIZEジョブをスケジュールすることをお勧めしています。リキッドクラスタリングは増分であるため、クラスター化されたテーブルのほとんどのOPTIMIZEジョブは迅速に実行されます。

すべてのレコードの再クラスタリングを強制する

Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。

OPTIMIZE table_name FULL;

重要

OPTIMIZE FULLを実行すると、必要に応じてすべての既存のデータが再クラスター化されます。指定したキーで以前にクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。

クラスタリングを初めて有効にするとき、またはクラスタリング キーを変更するときに OPTIMIZE FULL を実行します。 以前に OPTIMIZE FULL を実行したことがあり、クラスタリング キーに変更がない場合は、OPTIMIZEと同じようにOPTIMIZE FULL実行できます。常に OPTIMIZE FULL を使用して、データ レイアウトが現在のクラスタリング キーを反映していることを確認します。

クラスター化されたテーブルからデータを読み取る

クラスター化されたテーブルのデータは、削除ベクトルの読み取りをサポートする任意のDelta Lakeクライアントを使用して読み取ることができます。最適なクエリー結果を得るには、次の例のように、クエリーフィルターにクラスタリングキーを含めます。

SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";

クラスタリングキーの変更

次の例のように、ALTER TABLEコマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。

ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

クラスタリングキーを変更すると、後続のOPTIMIZE操作と書き込み操作では新しいクラスタリングアプローチが使用されますが、既存のデータは書き換えられません。

次の例のように、キーをNONEに設定してクラスタリングをオフにすることもできます。

ALTER TABLE table_name CLUSTER BY NONE;

クラスターキーをNONEに設定した場合、既にクラスター化されたデータは書き換わりませんが、その後のOPTIMIZE操作でクラスタリングキーが使用されなくなります。

テーブルのクラスター化方法を確認する

次の例のように、DESCRIBEコマンドを使用してテーブルのクラスタリングキーを確認できます。

DESCRIBE TABLE table_name;

DESCRIBE DETAIL table_name;

リキッドクラスタリングが有効なテーブルの互換性

Databricks Runtime 14.1以降でリキッドクラスタリングを有効にして作成されたテーブルでは、デフォルトでv2チェックポイントが使用されます。Databricks Runtime 13.3 LTS以降では、v2チェックポイントを使用してテーブルを読み書きできます。

Databricks Runtime 12.2 LTS以降では、v2チェックポイントを無効にし、テーブルプロトコルをダウングレードすることで、リキッドクラスタリングが有効なテーブルを読み取ることができます。「Deltaテーブル機能の削除」を参照してください。

制限事項

次の制限があります。

  • Databricks Runtime 15.1以前において、書き込み時のクラスタリングでは、フィルター、結合、または集計を含むソースクエリがサポートされません。

  • 構造化ストリーミングワークロードはクラスタリングオンライトをサポートしていません。

  • Databricks Runtime 15.4 LTS 以前では、構造化ストリーミング書き込みを使用してリキッドクラスタリングが有効になっているテーブルを作成することはできません。構造化ストリーミングを使用して、リキッドクラスタリングが有効になっている既存のテーブルにデータを書き込むことができます。