メインコンテンツまでスキップ

ベスト プラクティス: Delta Lake

この記事では、Delta Lakeを使用する際のベストプラクティスについて説明します。

Databricks では、予測的最適化を使用することをお勧めします。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。

同じ場所にあるテーブルを削除して再作成する場合は、常に CREATE OR REPLACE TABLE ステートメントを使用する必要があります。 「Delta テーブルの削除または置換」を参照してください。

従来の Delta 構成の削除

Databricks では、新しい Databricks Runtime バージョンにアップグレードするときに、Spark 構成とテーブル プロパティから最も明示的なレガシ Delta 構成を削除することをお勧めします。 従来の構成では、Databricks によって導入された新しい最適化とデフォルト値が移行されたワークロードに適用されない可能性があります。

リキッドクラスタリングを使用してデータスキップを最適化

Databricks 、パーティション分割、 Z-Order、またはその他のデータ編成戦略ではなく、リキッドクラスタリングを使用して、データスキップのデータレイアウトを最適化することをお勧めします。 「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。

コンパクトファイル

予測的最適化 自動的に実行 OPTIMIZEVACUUM コマンド on Unity Catalog マネージドテーブル. Unity Catalog マネージドテーブルの予測的最適化を参照してください。

Databricks では、小さなファイルを圧縮するために OPTIMIZE コマンドを頻繁に実行することをお勧めします。

注記

この操作では、古いファイルは削除されません。 これらを削除するには、vacuum コマンドを実行します。

テーブルの内容やスキーマを置き換える

場合によっては、Deltaテーブルを置き換える必要があるかもしれません。例えば:

  • テーブル内のデータが間違っていることに気づき、内容を置き換えたいと考えています。
  • テーブル全体を書き換えて、互換性のないスキーマ変更(列タイプの変更など)を実行したいと考えています。

Deltaテーブルのディレクトリ全体を削除して、同じパスに新しいテーブルを作成することもできますが、次の理由から お勧めできません

  • ディレクトリの削除は効率的ではありません。非常に大きなファイルが含まれるディレクトリの削除には、数時間、場合によっては数日かかる場合があります。

  • 削除されたファイルの内容はすべて失われます。間違ったテーブルを削除すると回復が難しくなります。

  • ディレクトリの削除はアトミックではありません。テーブルを削除している間、テーブルを読み取る同時クエリーが失敗したり、テーブルの一部が表示されたりする可能性があります。

  • S3では、結果的にのみ整合性があるため、潜在的な整合性の問題が発生する可能性があります。

テーブル スキーマを変更する必要がない場合は、Delta テーブルからデータ を削除して 新しいデータを挿入するか、テーブル を更新して 正しくない値を修正できます。

テーブルスキーマを変更する場合は、テーブル全体をアトミックに置き換えることができます。例えば:

Python
dataframe.write \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("<your-table>") # Managed table

dataframe.write \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.option("path", "<your-table-path>") \
.saveAsTable("<your-table>") # External table

このアプローチには複数の利点があります:

  • テーブルの上書きは、ディレクトリを再帰的にリストアップしたり、ファイルを削除したりする必要がないため、はるかに高速です。

  • 古いバージョンのテーブルがまだ存在します。 間違ったテーブルを削除した場合、タイムトラベルを使用して古いデータを簡単に取得できます。 「Delta Lake テーブル履歴の操作」を参照してください。

  • これはアトミックなオペレーションです。同時実行クエリーは、テーブルを削除している間もテーブルを読み取ることができます。

  • Delta Lake ACIDトランザクション保証のため、テーブルの上書きが失敗した場合、テーブルは以前の状態になります。

  • ファイルを削除しないため、S3では一貫性の問題に直面することはありません。

また、テーブルを上書きした後でストレージコストを節約するために古いファイルを削除したい場合は、 vacuum を使用してそれらを削除できます。 ファイルの削除に最適化されており、通常はディレクトリ全体を削除するよりも高速です。

Spark キャッシング

Databricks では、次の理由から Spark キャッシュの使用はお勧めしません。

  • キャッシュされたDataFrameの上に追加された追加のフィルターによって発生する可能性のあるデータスキップにより、データが失われます。
  • キャッシュされるデータは、テーブルが異なる識別子を使用してアクセスされた場合、更新されない可能性があります。

Delta Lake と Apache Spark の Parquet の違い

Delta Lakeは以下のオペレーションを自動的に処理します。これらのオペレーションを手動で実行しないでください:

  • REFRESH TABLE :Deltaテーブルは常に最新の情報を返すので、変更後に手動でREFRESH TABLEを呼び出す必要はありません。
  • パーティションの追加と削除 :Delta Lakeは、テーブルにあるパーティションのセットを自動的に追跡し、データが追加または削除されるとリストを更新します。そのため、ALTER TABLE [ADD|DROP] PARTITIONまたはMSCKを実行する必要はありません。
  • 単一のパーティションをロードする :パーティションを直接読み取る必要はありません。たとえば、spark.read.format("parquet").load("/data/date=2017-01-01")を実行する必要はありません。代わりに、データをスキップするには、spark.read.table("<table-name>").where("date = '2017-01-01'")などのWHERE句を使用します。
  • データファイルを手動で変更しないでください :Delta Lakeはトランザクションログを使用して、テーブルへの変更をアトミックにコミットします。データの損失やテーブルの破損につながる可能性があるため、Deltaテーブル内のParquetデータファイルを直接変更、追加、削除しないでください。

Delta Lake マージのパフォーマンスを向上させる

マージにかかる時間を短縮するには、次の方法を使用します。

  • 一致の検索スペースを縮小 します:デフォルトでは、mergeオペレーションはソーステーブルのマッチを見つけるためにDeltaテーブル全体を検索します。mergeを高速化する一つの方法は、マッチ条件に既知の制約を追加して探索空間を縮小することです。たとえば、countrydateでパーティション化されたテーブルがあり、mergeを使用して最終日と特定の国の情報を更新します。以下の条件を追加すると、関連するパーティションでのみマッチを検索するため、クエリーがより高速になります:

    SQL
    events.date = current_date() AND events.country = 'USA'

    さらに、このクエリは、他の並列操作との競合の可能性も減らします。 詳細については、「 Databricks での分離レベルと書き込みの競合 」を参照してください。

  • コンパクト ファイル : データが多数の小さなファイルに保存されている場合、データを読み取って一致を検索するのが遅くなる可能性があります。 小さなファイルを大きなファイルに圧縮して、読み取りスループットを向上させることができます。 詳細については、「 データ ファイルのレイアウトの最適化 」を参照してください。

  • 書き込み用のシャッフルパーティションを制御するmergeオペレーションはデータを複数回シャッフルして、更新されたデータを計算して書き込みます。シャッフルに使用されるタスクの数は、Sparkセッション構成spark.sql.shuffle.partitionsによって制御されます。このパラメーターを設定すると、並列処理が制御されるだけでなく、出力ファイルの数も決まります。値を増やすと並列処理が増加しますが、より多くの小さなデータファイルが生成されます。

  • 最適化された書き込みを有効にする : パーティション分割されたテーブルの場合、 merge はシャッフルパーティションの数よりもはるかに多くの小さなファイルを生成できます。 これは、すべてのシャッフル タスクが複数のパーティションに複数のファイルを書き込む可能性があり、パフォーマンスのボトルネックになる可能性があるためです。 最適化された書き込みを有効にすることで、ファイルの数を減らすことができます。 「Databricks 上の Delta Lake の最適化された書き込み」を参照してください。

  • テーブル内のファイル サイズを調整する : Databricks は、Delta テーブルでファイルを書き換える頻繁な merge 操作があるかどうかを自動的に検出し、将来のさらなるファイル書き換えを見越して書き換えられたファイルのサイズを縮小することを選択できます。 詳細については、 ファイルサイズのチューニング に関するセクションを参照してください。

  • 低シャッフル マージ : 低シャッフル マージはMERGE の最適化された実装を提供し、最も一般的なワークロードでより優れたパフォーマンスを提供します。 さらに、変更されていないデータに対する Z-Ordering など、既存のデータ レイアウトの最適化が保持されます。

データの最新性の管理

各クエリーの開始時に、Deltaテーブルはテーブルの最新バージョンに自動更新されます。このプロセスは、コマンドステータスがUpdating the Delta table's stateと報告するときにノートブックで観察できます。ただし、テーブルで履歴分析を実行する場合、特にストリーミングデータが頻繁に取り込まれるテーブルの場合は、必ずしも最新のデータが必要なわけではありません。このような場合、Deltaテーブルの古いスナップショットに対してクエリーを実行できます。このアプローチにより、クエリーから結果を取得する際の待ち時間を短縮できます。

古いデータの許容範囲を構成するには、Spark セッション構成 spark.databricks.delta.stalenessLimit1h15m などの時間文字列値 (それぞれ 1 時間または 15 分) を設定します。 この設定はセッション固有で、テーブルにアクセスする他のクライアントには影響しません。 テーブルの状態が古い制限内で更新された場合、テーブルに対するクエリは最新のテーブル更新を待たずに結果を返します。 この設定では、テーブルの更新が妨げられることはありません。古いデータが返された場合、更新はバックグラウンドで処理されます。 最後のテーブル更新が古さの制限よりも古い場合、クエリはテーブルの状態の更新が完了するまで結果を返しません。

低レイテンシのクエリのための拡張チェックポイント

Delta Lake は、チェックポイント を Delta テーブルの集計状態として最適な頻度で書き込みます。 これらのチェックポイントは、テーブルの最新の状態をコンピュートするための開始点として機能します。 チェックポイントがない場合、 Delta Lake 、トランザクション ログへのコミットを表す JSON ファイル ("デルタ" ファイル) の大規模なコレクションを読み取って、テーブルの状態をコンピュートする必要があります。 さらに、Delta Lake が データのスキップ を実行するために使用する列レベルの統計は、チェックポイントに格納されます。

important

Delta Lake チェックポイントは、構造化ストリーミング チェックポイントとは異なります。 構造化ストリーミング・チェックポイントを参照してください。

列レベルの統計は、構造体と JSON (下位互換性のため) として格納されます。 struct 形式を使用すると、次の理由により、Delta Lake の読み取りが大幅に高速になります。

  • Delta Lakeは、列レベルの統計を取得するために高価なJSON解析を実行しません。
  • Parquet列のプルーニング機能により、列の統計を読み取るために必要なI/Oが大幅に削減されます。

構造体形式を使用すると、Delta Lake読み取りオペレーションのオーバーヘッドを数秒から数十ミリ秒に削減する一連の最適化が可能になり、短いクエリーの待ち時間が大幅に短縮されます。

チェックポイントでの列レベルの統計の管理

テーブルプロパティdelta.checkpoint.writeStatsAsJsonおよびdelta.checkpoint.writeStatsAsStructを使用して、チェックポイントに統計を書き込む方法を管理します。両方のテーブルプロパティがfalseの場合、Delta Lakeはデータスキップを実行 できません

  • バッチ書き込みでは、JSON形式と構造体形式の両方で統計情報を書き込みます。delta.checkpoint.writeStatsAsJsontrueです.
  • delta.checkpoint.writeStatsAsStruct デフォルトでは未定義です。
  • リーダーは、利用可能な場合はstruct列を使用し、利用できない場合はJSON列の使用に戻ります。
important

強化されたチェックポイントによって、オープンソースのDelta Lakeリーダーとの互換性が損なわれることはありません。ただし、delta.checkpoint.writeStatsAsJsonfalseに設定すると、独自のDelta Lakeリーダーに影響が出る可能性があります。パフォーマンスへの影響の詳細については、ベンダーにお問い合わせください。

構造化ストリーミング クエリの拡張チェックポイントを有効にする

Structured Streamingワークロードに低レイテンシー要件(1分未満のレイテンシー)がない場合は、次のSQLコマンドを実行して拡張チェックポイントを有効にすることができます:

SQL
ALTER TABLE <table-name> SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

次のテーブルプロパティを設定することで、チェックポイントの書き込み遅延を改善することもできます:

SQL
ALTER TABLE <table-name> SET TBLPROPERTIES
(
'delta.checkpoint.writeStatsAsStruct' = 'true',
'delta.checkpoint.writeStatsAsJson' = 'false'
)

アプリケーションでデータスキップが役に立たない場合は、両方のプロパティをfalseに設定できます。その場合、統計は収集も書き込まれません。Databricksはこの構成を推奨していません。