アダプティブ クエリ実行
アダプティブ クエリ実行 (AQE) は、クエリの実行中に発生するクエリの再最適化です。
ランタイムの再最適化の動機は、Databricks がシャッフルおよびブロードキャスト交換 (AQE ではクエリステージと呼ばれます) の終了時に最新の正確な統計を持っていることです。 その結果、Databricks は、より優れた物理戦略を選択したり、最適なシャッフル後のパーティション サイズと数を選択したり、スキュー結合処理など、ヒントが必要だった最適化を行ったりすることができます。
これは、統計収集が有効になっていない場合や、統計が古い場合に非常に便利です。 また、複雑なクエリの最中やデータスキューが発生した後など、静的に導き出された統計が不正確な場所でも役に立ちます。
資格
AQE はデフォルトで有効になっています。 これには 4 つの主要な機能があります。
- ソート・マージ結合をブロードキャスト・ハッシュ結合に動的に変更します。
- シャッフル交換後にパーティションを動的に合体します (小さなパーティションを適度なサイズのパーティションに結合します)。 非常に小さなタスクは I/O スループットが悪くなり、スケジュールのオーバーヘッドとタスク設定のオーバーヘッドに悩まされる傾向があります。 小さなタスクを組み合わせると、リソースが節約され、クラスターのスループットが向上します。
- ソート、マージ結合、シャッフルハッシュ結合でスキューを動的に処理し、スキューされたタスクをほぼ均等なサイズのタスクに分割(および必要に応じてレプリケート)します。
- 空のリレーションを動的に検出して伝播します。
アプリケーション
AQE は、次のすべてのクエリに適用されます。
- 非ストリーミング
- 少なくとも 1 つのエクスチェンジ (通常は、結合、集計、またはウィンドウがある場合)、1 つのサブクエリ、またはその両方が含まれます。
AQE が適用されたすべてのクエリが必ずしも再最適化されるわけではありません。 再最適化では、静的にコンパイルされたクエリ プランとは異なるクエリ プランが作成される場合とされない場合があります。 クエリのプランが AQE によって変更されたかどうかを確認するには、次の「 クエリ プラン」を参照してください。
クエリ プラン
このセクションでは、クエリ プランをさまざまな方法で調べる方法について説明します。
このセクションの内容:
Spark UI
AdaptiveSparkPlan
ノード
AQE 適用クエリには、通常は各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan
ノードが含まれます。クエリが実行される前または実行中に、対応するAdaptiveSparkPlan
ノードのisFinalPlan
フラグは次のように表示されますfalse
。クエリの実行が完了すると、isFinalPlan
フラグが true.
進化する計画
クエリ プラン図は、実行の進行に伴って進化し、実行中の最新のプランを反映します。 すでに実行されているノード(メトリクスが利用可能なノード)は変更されませんが、実行されていないノードは、再最適化の結果として時間の経過とともに変化する可能性があります。
クエリ プラン図の例を次に示します。
DataFrame.explain()
AdaptiveSparkPlan
ノード
AQE 適用クエリには、通常は各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan
ノードが含まれます。 クエリが実行される前または実行中に、対応するAdaptiveSparkPlan
ノードのisFinalPlan
フラグは次のように表示されますfalse
。クエリの実行が完了すると、isFinalPlan
フラグが true
に変わります。
現在の計画と初期計画
各 AdaptiveSparkPlan
ノードの下には、実行が完了したかどうかに応じて、初期計画(AQE最適化を適用する前の計画)と現在または最終計画の両方があります。 現在の計画は、実行が進むにつれて進化します。
Runtime 統計
各シャッフル・ステージとブロードキャスト・ステージには、データ統計が含まれています。
ステージの実行前またはステージの実行中は、統計はコンパイル時の推定値であり、フラグ isRuntime
は false
です (例: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
ステージの実行が完了すると、統計はランタイムで収集された統計になり、フラグ isRuntime
は true
になります。たとえば、次のようになります。 Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
次に、 DataFrame.explain
例を示します。
-
実行前
-
実行中
-
実行後
SQL EXPLAIN
AdaptiveSparkPlan
ノード
AQE 適用クエリには、通常は各メイン クエリまたはサブクエリのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。
現在の計画はありません
SQL EXPLAIN
はクエリを実行しないので、現在の計画は常に当初の計画と同じで、AQEが最終的に実行するものを反映していません。
次に、SQL の説明例を示します。
有効性
クエリ プランは、1 つ以上の AQE 最適化が有効になると変更されます。 これらの AQE 最適化の効果は、現在のプランと最終プラン、および現在のプランと最終プランの初期プランと特定のプラン ノードの違いによって示されます。
-
ソートマージ結合をブロードキャストハッシュ結合に動的に変更する:現在/最終プランと初期プランの間で異なる物理結合ノード
-
パーティションの動的合体: プロパティを持つノード
CustomShuffleReader
Coalesced
-
スキュー結合を動的に処理します: フィールド
isSkew
が true のノードSortMergeJoin
。 -
空のリレーションを動的に検出して伝播します。プランの一部(または全体)は、リレーションフィールドが空のノードlocalTableScanに置き換えられます。
構成
このセクションの内容:
- アダプティブ クエリ実行の有効化と無効化
- 自動最適化シャッフルを有効にする
- ソートマージ結合をブロードキャストハッシュ結合に動的に変更
- パーティションを動的に結合します
- スキュー結合を動的に処理する
- 空のリレーションを動的に検出して伝播します
アダプティブ クエリ実行の有効化と無効化
属性 |
---|
spark.databricks.optimizer.adaptive.enabled タイプ: |
自動最適化シャッフルを有効にする
属性 |
---|
spark.sql.shuffle.partitions タイプ: |
ソートマージ結合をブロードキャストハッシュ結合に動的に変更
属性 |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold タイプ: |
パーティションの動的合体
属性 |
---|
spark.sql.adaptive.coalescePartitions.enabled タイプ: |
spark.sql.adaptive.advisoryPartitionSizeInBytes タイプ: |
spark.sql.adaptive.coalescePartitions.minPartitionSize タイプ: |
spark.sql.adaptive.coalescePartitions.minPartitionNum タイプ: |
スキュー結合を動的に処理する
属性 |
---|
spark.sql.adaptive.skewJoin.enabled タイプ: |
spark.sql.adaptive.skewJoin.skewedPartitionFactor タイプ: |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes タイプ: |
パーティションは、 (partition size > skewedPartitionFactor * median partition size)
と (partition size > skewedPartitionThresholdInBytes)
の両方が true
されている場合に歪んでいると見なされます。
空のリレーションを動的に検出して伝播
属性 |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled タイプ: |
よくある質問(FAQ)
このセクションの内容:
- AQE が小さな結合テーブルをブロードキャストしなかったのはなぜですか?
- AQE を有効にしたブロードキャスト参加戦略のヒントを引き続き使用する必要がありますか?
- スキュー・ジョイン・ヒントとAQEスキュー・ジョイン最適化の違いは何ですか? どれを使うべきですか?
- AQE が結合順序を自動的に調整しなかったのはなぜですか?
- なぜAQEは私のデータスキューを検出しなかったのですか?
AQE が小さな結合テーブルをブロードキャストしなかったのはなぜですか?
ブロードキャストされる予定のリレーションのサイズがこのしきい値を下回っているにもかかわらず、まだブロードキャストされていない場合:
- 結合タイプを確認します。 ブロードキャストは、特定の結合タイプではサポートされていません (たとえば、
LEFT OUTER JOIN
の左リレーションはブロードキャストできません)。 - また、リレーションに空のパーティションが多数含まれている場合もあり、その場合、タスクの大部分はソートマージ結合で迅速に終了するか、スキュー結合処理で最適化できる可能性があります。 AQE は、空でないパーティションの割合が
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
より小さい場合、このようなソート マージ結合をブロードキャスト ハッシュ結合に変更することを回避します。
AQEが有効になっている場合でも、ブロードキャスト参加戦略のヒントを使うべきですか?
はい。 静的に計画されたブロードキャスト結合は、AQE によって動的に計画された結合よりもパフォーマンスが高くなります。これは、結合の両側でシャッフルを実行する (実際のリレーション サイズが取得される) まで AQE がブロードキャスト結合に切り替わらない場合があるためです。 したがって、クエリをよく理解していれば、ブロードキャストヒントを使用することは依然として良い選択です。 AQE は、静的最適化と同じようにクエリ ヒントを尊重しますが、ヒントの影響を受けない動的最適化を適用できます。
スキュー結合ヒントとAQEスキュー結合最適化の違いは何ですか? どちらを使用すればよいですか?
AQE スキュー結合は完全に自動化されており、一般にヒントの対応物よりも優れたパフォーマンスを発揮するため、スキュー結合ヒントを使用するのではなく、AQE スキュー結合処理に依存することをお勧めします。
AQE が結合順序を自動的に調整しなかったのはなぜですか?
動的結合の並べ替えはAQEの一部ではありません。
なぜAQEは私のデータスキューを検出しなかったのですか?
AQE がパーティションを偏ったパーティションとして検出するために満たす必要があるサイズ条件は 2 つあります。
- パーティション サイズが
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
より大きい (デフォルトは 256MB) - パーティション・サイズが、すべてのパーティションの中央値に偏ったパーティション係数
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(デフォルトは 5) を掛けた値より大きい
また、スキュー処理のサポートは、特定のジョイン・タイプ(たとえば、 LEFT OUTER JOIN
では、左側のスキューのみを最適化できます)に限定されています。
遺産
「アダプティブ実行」という用語は Spark 1.6 から存在していましたが、Spark 3.0 の新しい AQE は根本的に異なります。 機能面では、Spark 1.6 は "パーティションを動的に合体する" 部分のみを行います。 技術的なアーキテクチャの観点から見ると、新しい AQE は、ランタイム統計に基づくクエリの動的計画と再計画のフレームワークであり、この記事で説明したようなさまざまな最適化をサポートし、より潜在的な最適化を可能にするために拡張できます。