ベスト プラクティス: Delta Lake
この記事では、Delta Lakeを使用する際のベストプラクティスについて説明します。
ベスト プラクティスの概要
以下は、ほとんどの Delta Lake ワークロードに適用される一般的な推奨事項です。
- マネージドテーブル Unity Catalog を使用します。 「マネージドテーブルの操作」を参照してください。
- 予測的最適化を使用します。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。
- リキッドクラスタリングを使用します。「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。
- 同じ場所にあるテーブルを削除して再作成する場合は、常に
CREATE OR REPLACE TABLE
ステートメントを使用する必要があります。 Delta テーブルの削除または置換を参照してください。
従来の Delta 構成の削除
Databricks では、新しい Databricks Runtime バージョンにアップグレードするときに、Spark 構成とテーブル プロパティから最も明示的なレガシ Delta 構成を削除することをお勧めします。 従来の構成では、Databricks によって導入された新しい最適化とデフォルト値が移行されたワークロードに適用されない可能性があります。
コンパクトファイル
Unity Catalog マネージドテーブルにおいては、予測的最適化は自動的に OPTIMIZE
と VACUUM
コマンドを実行します。Unity Catalog マネージドテーブルの予測的最適化を参照してください。
Databricks では、小さなファイルを圧縮するために OPTIMIZE コマンドを頻繁に実行することをお勧めします。
この操作では、古いファイルは削除されません。 これらを削除するには、vacuum コマンドを実行します。
Delta Lake で Spark キャッシュを使用しない
Databricks では、次の理由から Spark キャッシュの使用はお勧めしません。
- キャッシュされた
DataFrame
の上に追加された追加のフィルターによって発生する可能性のあるデータスキップにより、データが失われます。 - キャッシュされるデータは、テーブルが異なる識別子を使用してアクセスされた場合、更新されない可能性があります。
Delta Lake と Apache Spark の Parquet の違い
Delta Lakeは以下のオペレーションを自動的に処理します。これらのオペレーションを手動で実行しないでください:
REFRESH TABLE
:Deltaテーブルは常に最新の情報を返すので、変更後に手動でREFRESH TABLE
を呼び出す必要はありません。- パーティションの追加と削除 :Delta Lakeは、テーブルにあるパーティションのセットを自動的に追跡し、データが追加または削除されるとリストを更新します。そのため、
ALTER TABLE [ADD|DROP] PARTITION
またはMSCK
を実行する必要はありません。 - 単一のパーティションをロードする :パーティションを直接読み取る必要はありません。たとえば、
spark.read.format("parquet").load("/data/date=2017-01-01")
を実行する必要はありません。代わりに、データをスキップするには、spark.read.table("<table-name>").where("date = '2017-01-01'")
などのWHERE
句を使用します。 - データファイルを手動で変更しないでください :Delta Lakeはトランザクションログを使用して、テーブルへの変更をアトミックにコミットします。データの損失やテーブルの破損につながる可能性があるため、Deltaテーブル内のParquetデータファイルを直接変更、追加、削除しないでください。
Delta Lake マージのパフォーマンスを向上させる
マージにかかる時間を短縮するには、次の方法を使用します。
-
マッチする探索空間を縮小します :デフォルトでは、
merge
オペレーションはソーステーブルのマッチを見つけるためにDeltaテーブル全体を検索します。merge
を高速化する一つの方法は、マッチ条件に既知の制約を追加して探索空間を縮小することです。たとえば、country
とdate
でパーティション化されたテーブルがあり、merge
を使用して最終日と特定の国の情報を更新します。以下の条件を追加すると、関連するパーティションでのみマッチを検索するため、クエリーがより高速になります:SQLevents.date = current_date() AND events.country = 'USA'
さらに、このクエリは、他の並列操作との競合の可能性も減らします。 詳細については、「 Databricks での分離レベルと書き込みの競合 」を参照してください。
-
ファイルをコンパクトに : データが多数の小さなファイルに保存されている場合、データを読み取って一致を検索するのが遅くなる可能性があります。 小さなファイルを大きなファイルに圧縮して、読み取りスループットを向上させることができます。 詳細については、「 データ ファイルのレイアウトの最適化 」を参照してください。
-
書き込み用のシャッフルパーティションを制御する :
merge
オペレーションはデータを複数回シャッフルして、更新されたデータを計算して書き込みます。シャッフルに使用されるタスクの数は、Sparkセッション構成spark.sql.shuffle.partitions
によって制御されます。このパラメーターを設定すると、並列処理が制御されるだけでなく、出力ファイルの数も決まります。値を増やすと並列処理が増加しますが、より多くの小さなデータファイルが生成されます。 -
最適化された書き込みを有効にする : パーティション分割されたテーブルの場合、
merge
はシャッフルパーティションの数よりもはるかに多くの小さなファイルを生成できます。 これは、すべてのシャッフル タスクが複数のパーティションに複数のファイルを書き込む可能性があり、パフォーマンスのボトルネックになる可能性があるためです。 最適化された書き込みを有効にすることで、ファイルの数を減らすことができます。 Databricks 上の Delta Lake の最適化された書き込みを参照してください。 -
テーブル内のファイル サイズを調整する : Databricks は、Delta テーブルでファイルを書き換える頻繁な
merge
操作があるかどうかを自動的に検出し、将来のさらなるファイル書き換えを見越して書き換えられたファイルのサイズを縮小することを選択できます。 詳細については、 ファイルサイズのチューニング に関するセクションを参照してください。 -
低シャッフル マージ : 低シャッフル マージは 、
MERGE
の最適化された実装を提供し、最も一般的なワークロードでより優れたパフォーマンスを提供します。 さらに、変更されていないデータに対する Z-Ordering など、既存のデータ レイアウトの最適化が保持されます。