メインコンテンツまでスキップ

アダプティブ クエリ実行

アダプティブ クエリ実行 (AQE) は、クエリの実行中に発生するクエリの再最適化です。

ランタイムの再最適化の動機は、Databricks がシャッフルおよびブロードキャスト交換 (AQE ではクエリステージと呼ばれます) の終了時に最新の正確な統計を持っていることです。 その結果、Databricks は、より優れた物理戦略を選択したり、最適なシャッフル後のパーティション サイズと数を選択したり、スキュー結合処理など、ヒントが必要だった最適化を行ったりすることができます。

これは、統計収集が有効になっていない場合や、統計が古い場合に非常に便利です。 また、複雑なクエリの最中やデータスキューが発生した後など、静的に導き出された統計が不正確な場所でも役に立ちます。

資格

AQE はデフォルトで有効になっています。 これには 4 つの主要な機能があります。

  • ソート・マージ結合をブロードキャスト・ハッシュ結合に動的に変更します。
  • シャッフル交換後にパーティションを動的に合体します (小さなパーティションを適度なサイズのパーティションに結合します)。 非常に小さなタスクは I/O スループットが悪くなり、スケジュールのオーバーヘッドとタスク設定のオーバーヘッドに悩まされる傾向があります。 小さなタスクを組み合わせると、リソースが節約され、クラスターのスループットが向上します。
  • ソート、マージ結合、シャッフルハッシュ結合でスキューを動的に処理し、スキューされたタスクをほぼ均等なサイズのタスクに分割(および必要に応じてレプリケート)します。
  • 空のリレーションを動的に検出して伝播します。

アプリケーション

AQE は、次のすべてのクエリに適用されます。

  • 非ストリーミング
  • 少なくとも 1 つのエクスチェンジ (通常は、結合、集計、またはウィンドウがある場合)、1 つのサブクエリ、またはその両方が含まれます。

AQE が適用されたすべてのクエリが必ずしも再最適化されるわけではありません。 再最適化では、静的にコンパイルされたクエリ プランとは異なるクエリ プランが作成される場合とされない場合があります。 クエリのプランが AQE によって変更されたかどうかを確認するには、次の「 クエリ プラン」を参照してください。

クエリ プラン

このセクションでは、クエリ プランをさまざまな方法で調べる方法について説明します。

このセクションの内容:

Spark UI

AdaptiveSparkPlan ノード

AQE 適用クエリには、通常は各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。クエリが実行される前または実行中に、対応するAdaptiveSparkPlanノードのisFinalPlanフラグは次のように表示されますfalse。クエリの実行が完了すると、isFinalPlan フラグが true.

進化する計画

クエリ プラン図は、実行の進行に伴って進化し、実行中の最新のプランを反映します。 すでに実行されているノード(メトリクスが利用可能なノード)は変更されませんが、実行されていないノードは、再最適化の結果として時間の経過とともに変化する可能性があります。

クエリ プラン図の例を次に示します。

クエリ プラン図

DataFrame.explain()

AdaptiveSparkPlan ノード

AQE 適用クエリには、通常は各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。 クエリが実行される前または実行中に、対応するAdaptiveSparkPlanノードのisFinalPlanフラグは次のように表示されますfalse。クエリの実行が完了すると、isFinalPlan フラグが trueに変わります。

現在の計画と初期計画

AdaptiveSparkPlan ノードの下には、実行が完了したかどうかに応じて、初期計画(AQE最適化を適用する前の計画)と現在または最終計画の両方があります。 現在の計画は、実行が進むにつれて進化します。

Runtime 統計

各シャッフル・ステージとブロードキャスト・ステージには、データ統計が含まれています。

ステージの実行前またはステージの実行中は、統計はコンパイル時の推定値であり、フラグ isRuntimefalseです (例: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

ステージの実行が完了すると、統計はランタイムで収集された統計になり、フラグ isRuntimetrueになります。たとえば、次のようになります。 Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

次に、 DataFrame.explain 例を示します。

  • 実行前

    実行前

  • 実行中

    実行中

  • 実行後

    実行後

SQL EXPLAIN

AdaptiveSparkPlan ノード

AQE 適用クエリには、通常は各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。

現在の計画はありません

SQL EXPLAIN はクエリを実行しないので、現在の計画は常に当初の計画と同じで、AQEが最終的に実行するものを反映していません。

次に、SQL の説明例を示します。

SQLの説明

有効性

クエリ プランは、1 つ以上の AQE 最適化が有効になると変更されます。 これらの AQE 最適化の効果は、現在のプランと最終プラン、および現在のプランと最終プランの初期プランと特定のプラン ノードの違いによって示されます。

  • ソートマージ結合をブロードキャストハッシュ結合に動的に変更する:現在/最終プランと初期プランの間で異なる物理結合ノード

    ジョイン戦略文字列

  • パーティションの動的合体: プロパティを持つノードCustomShuffleReader Coalesced

    カスタムシャッフルリーダー

    カスタムシャッフルリーダー文字列

  • スキュー結合を動的に処理します: フィールドisSkewが true のノードSortMergeJoin

    スキュー・ジョイン計画

    スキュー結合文字列

  • 空のリレーションを動的に検出して伝播します。プランの一部(または全体)は、リレーションフィールドが空のノードlocalTableScanに置き換えられます。

    ローカル テーブル スキャン

    ローカル テーブル スキャン文字列

構成

このセクションの内容:

アダプティブ クエリ実行の有効化と無効化

属性

spark.databricks.optimizer.adaptive.enabled タイプ: Boolean アダプティブ クエリの実行を有効にするか無効にするか。 デフォルト値 true

自動最適化シャッフルを有効にする

属性

spark.sql.shuffle.partitions タイプ: Integer 結合または集計のためにデータをシャッフルするときに使用するパーティションのデフォルト数。 値を auto に設定すると、自動最適化シャッフルが有効になり、クエリ プランとクエリ入力データ サイズに基づいてこの数が自動的に決定されます。 注: 構造化ストリーミングの場合、同じチェックポイントの場所からクエリを再起動する間でこの構成を変更することはできません。 デフォルト値: 200

ソートマージ結合をブロードキャストハッシュ結合に動的に変更

属性

spark.databricks.adaptive.autoBroadcastJoinThreshold タイプ: Byte String ランタイムでブロードキャスト参加への切り替えをトリガーするしきい値。 デフォルト値 30MB

パーティションの動的合体

属性

spark.sql.adaptive.coalescePartitions.enabled タイプ: Boolean パーティションの合体を有効にするか無効にするか。 デフォルト値 true

spark.sql.adaptive.advisoryPartitionSizeInBytes タイプ: Byte String 合体後の目標サイズ。 結合されたパーティション サイズは、このターゲット サイズに近くなりますが、それより大きくなることはありません。 デフォルト値 64MB

spark.sql.adaptive.coalescePartitions.minPartitionSize タイプ: Byte String 結合後のパーティションの最小サイズ。 結合されたパーティションのサイズは、このサイズより小さくなることはありません。 デフォルト値 1MB

spark.sql.adaptive.coalescePartitions.minPartitionNum タイプ: Integer 合体後のパーティションの最小数。 設定は明示的に上書きされるため、推奨されません spark.sql.adaptive.coalescePartitions.minPartitionSize. デフォルト value: 2x no.クラスター コアの数

スキュー結合を動的に処理する

属性

spark.sql.adaptive.skewJoin.enabled タイプ: Boolean スキュー・ジョイン処理を有効にするか無効にするか。 デフォルト値 true

spark.sql.adaptive.skewJoin.skewedPartitionFactor タイプ: Integer パーティション サイズの中央値を掛けると、パーティションが歪んでいるかどうかを判断するのに役立つ要因。 デフォルト値 5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes タイプ: Byte String パーティションが歪んでいるかどうかを判断するのに役立つしきい値。 デフォルト値 256MB

パーティションは、 (partition size > skewedPartitionFactor * median partition size)(partition size > skewedPartitionThresholdInBytes) の両方が trueされている場合に歪んでいると見なされます。

空のリレーションを動的に検出して伝播

属性

spark.databricks.adaptive.emptyRelationPropagation.enabled タイプ: Boolean 動的な空のリレーションの伝播を有効にするか無効にするか。 デフォルト値 true

よくある質問(FAQ)

このセクションの内容:

AQE が小さな結合テーブルをブロードキャストしなかったのはなぜですか?

ブロードキャストされる予定のリレーションのサイズがこのしきい値を下回っているにもかかわらず、まだブロードキャストされていない場合:

  • 結合タイプを確認します。 ブロードキャストは、特定の結合タイプではサポートされていません (たとえば、 LEFT OUTER JOIN の左リレーションはブロードキャストできません)。
  • また、リレーションに空のパーティションが多数含まれている場合もあり、その場合、タスクの大部分はソートマージ結合で迅速に終了するか、スキュー結合処理で最適化できる可能性があります。 AQE は、空でないパーティションの割合が spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoinより小さい場合、このようなソート マージ結合をブロードキャスト ハッシュ結合に変更することを回避します。

AQEが有効になっている場合でも、ブロードキャスト参加戦略のヒントを使うべきですか?

はい。 静的に計画されたブロードキャスト結合は、AQE によって動的に計画された結合よりもパフォーマンスが高くなります。これは、結合の両側でシャッフルを実行する (実際のリレーション サイズが取得される) まで AQE がブロードキャスト結合に切り替わらない場合があるためです。 したがって、クエリをよく理解していれば、ブロードキャストヒントを使用することは依然として良い選択です。 AQE は、静的最適化と同じようにクエリ ヒントを尊重しますが、ヒントの影響を受けない動的最適化を適用できます。

スキュー結合ヒントとAQEスキュー結合最適化の違いは何ですか? どちらを使用すればよいですか?

AQE スキュー結合は完全に自動化されており、一般にヒントの対応物よりも優れたパフォーマンスを発揮するため、スキュー結合ヒントを使用するのではなく、AQE スキュー結合処理に依存することをお勧めします。

AQE が結合順序を自動的に調整しなかったのはなぜですか?

動的結合の並べ替えはAQEの一部ではありません。

なぜAQEは私のデータスキューを検出しなかったのですか?

AQE がパーティションを偏ったパーティションとして検出するために満たす必要があるサイズ条件は 2 つあります。

  • パーティション サイズが spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes より大きい (デフォルトは 256MB)
  • パーティション・サイズが、すべてのパーティションの中央値に偏ったパーティション係数 spark.sql.adaptive.skewJoin.skewedPartitionFactor (デフォルトは 5) を掛けた値より大きい

また、スキュー処理のサポートは、特定のジョイン・タイプ(たとえば、 LEFT OUTER JOINでは、左側のスキューのみを最適化できます)に限定されています。

遺産

「アダプティブ実行」という用語は Spark 1.6 から存在していましたが、Spark 3.0 の新しい AQE は根本的に異なります。 機能面では、Spark 1.6 は "パーティションを動的に合体する" 部分のみを行います。 技術的なアーキテクチャの観点から見ると、新しい AQE は、ランタイム統計に基づくクエリの動的計画と再計画のフレームワークであり、この記事で説明したようなさまざまな最適化をサポートし、より潜在的な最適化を可能にするために拡張できます。