メインコンテンツまでスキップ

ベスト プラクティス: Delta Lake

この記事では、Delta Lakeを使用する際のベストプラクティスについて説明します。

ベスト プラクティスの概要

以下は、ほとんどの Delta Lake ワークロードに適用される一般的な推奨事項です。

従来の Delta 構成の削除

Databricks では、新しい Databricks Runtime バージョンにアップグレードするときに、Spark 構成とテーブル プロパティから最も明示的なレガシ Delta 構成を削除することをお勧めします。 従来の構成では、Databricks によって導入された新しい最適化とデフォルト値が移行されたワークロードに適用されない可能性があります。

コンパクトファイル

Unity Catalog マネージドテーブルにおいては、予測的最適化は自動的に OPTIMIZEVACUUM コマンドを実行します。Unity Catalog マネージドテーブルの予測的最適化を参照してください。

Databricks では、小さなファイルを圧縮するために OPTIMIZE コマンドを頻繁に実行することをお勧めします。

注記

この操作では、古いファイルは削除されません。 これらを削除するには、vacuum コマンドを実行します。

Delta Lake で Spark キャッシュを使用しない

Databricks では、次の理由から Spark キャッシュの使用はお勧めしません。

  • キャッシュされたDataFrameの上に追加された追加のフィルターによって発生する可能性のあるデータスキップにより、データが失われます。
  • キャッシュされるデータは、テーブルが異なる識別子を使用してアクセスされた場合、更新されない可能性があります。

Delta Lake と Apache Spark の Parquet の違い

Delta Lakeは以下のオペレーションを自動的に処理します。これらのオペレーションを手動で実行しないでください:

  • REFRESH TABLE :Deltaテーブルは常に最新の情報を返すので、変更後に手動でREFRESH TABLEを呼び出す必要はありません。
  • パーティションの追加と削除 :Delta Lakeは、テーブルにあるパーティションのセットを自動的に追跡し、データが追加または削除されるとリストを更新します。そのため、ALTER TABLE [ADD|DROP] PARTITIONまたはMSCKを実行する必要はありません。
  • 単一のパーティションをロードする :パーティションを直接読み取る必要はありません。たとえば、spark.read.format("parquet").load("/data/date=2017-01-01")を実行する必要はありません。代わりに、データをスキップするには、spark.read.table("<table-name>").where("date = '2017-01-01'")などのWHERE句を使用します。
  • データファイルを手動で変更しないでください :Delta Lakeはトランザクションログを使用して、テーブルへの変更をアトミックにコミットします。データの損失やテーブルの破損につながる可能性があるため、Deltaテーブル内のParquetデータファイルを直接変更、追加、削除しないでください。

Delta Lake マージのパフォーマンスを向上させる

マージにかかる時間を短縮するには、次の方法を使用します。

  • マッチする探索空間を縮小します :デフォルトでは、mergeオペレーションはソーステーブルのマッチを見つけるためにDeltaテーブル全体を検索します。mergeを高速化する一つの方法は、マッチ条件に既知の制約を追加して探索空間を縮小することです。たとえば、countrydateでパーティション化されたテーブルがあり、mergeを使用して最終日と特定の国の情報を更新します。以下の条件を追加すると、関連するパーティションでのみマッチを検索するため、クエリーがより高速になります:

    SQL
    events.date = current_date() AND events.country = 'USA'

    さらに、このクエリは、他の並列操作との競合の可能性も減らします。 詳細については、「 Databricks での分離レベルと書き込みの競合 」を参照してください。

  • ファイルをコンパクトに : データが多数の小さなファイルに保存されている場合、データを読み取って一致を検索するのが遅くなる可能性があります。 小さなファイルを大きなファイルに圧縮して、読み取りスループットを向上させることができます。 詳細については、「 データ ファイルのレイアウトの最適化 」を参照してください。

  • 書き込み用のシャッフルパーティションを制御するmergeオペレーションはデータを複数回シャッフルして、更新されたデータを計算して書き込みます。シャッフルに使用されるタスクの数は、Sparkセッション構成spark.sql.shuffle.partitionsによって制御されます。このパラメーターを設定すると、並列処理が制御されるだけでなく、出力ファイルの数も決まります。値を増やすと並列処理が増加しますが、より多くの小さなデータファイルが生成されます。

  • 最適化された書き込みを有効にする : パーティション分割されたテーブルの場合、 merge はシャッフルパーティションの数よりもはるかに多くの小さなファイルを生成できます。 これは、すべてのシャッフル タスクが複数のパーティションに複数のファイルを書き込む可能性があり、パフォーマンスのボトルネックになる可能性があるためです。 最適化された書き込みを有効にすることで、ファイルの数を減らすことができます。 Databricks 上の Delta Lake の最適化された書き込みを参照してください。

  • テーブル内のファイル サイズを調整する : Databricks は、Delta テーブルでファイルを書き換える頻繁な merge 操作があるかどうかを自動的に検出し、将来のさらなるファイル書き換えを見越して書き換えられたファイルのサイズを縮小することを選択できます。 詳細については、 ファイルサイズのチューニング に関するセクションを参照してください。

  • 低シャッフル マージ : 低シャッフル マージはMERGE の最適化された実装を提供し、最も一般的なワークロードでより優れたパフォーマンスを提供します。 さらに、変更されていないデータに対する Z-Ordering など、既存のデータ レイアウトの最適化が保持されます。