インタラクティブなワークフローで大規模なクエリを処理する

インタラクティブなデータ ワークフローの課題は、大規模なクエリの処理です。 これには、多すぎる出力行を生成するクエリ、多数の外部パーティションをフェッチするクエリ、または非常に大規模なデータ セットのコンピュートが含まれます。 これらのクエリは非常に遅く、コンピュートのリソースを飽和させ、他の人が同じコンピュートを共有するのを困難にする可能性があります。

Query Watchdogは、大規模なクエリの最も一般的な原因を調べ、しきい値を超えたクエリを終了することにより、クエリによるコンピュート リソースの独占を防ぐプロセスです。 この記事では、Query Watchdog を有効にして構成する方法について説明します。

重要

Query Watchdog は、UI を使用して作成されたすべての汎用コンピュートに対して有効です。

破壊的なクエリの例

アナリストは、ジャストインタイム データウェアハウスでいくつかのアドホック クエリを実行しています。 アナリストは、複数のユーザーが 1 つのコンピュートを同時に簡単に使用できるようにする共有オートスケール コンピュートを使用します。 それぞれに 100 万行のテーブルが 2 つあるとします。

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

これらのテーブル サイズは Apache Spark で管理できます。 ただし、各行には空の文字列を含むjoin_key列が含まれています。 これは、データが完全にクリーンでない場合や、一部のキーが他のキーよりも普及しているような重大なデータスキューがある場合に発生する可能性があります。 これらの空の結合キーは、他のどの値よりもはるかに一般的です。

次のコードでは、アナリストはこれら 2 つのテーブルをキーで結合し、 1 兆の結果の出力を生成します。これらはすべて 1 つのエグゼキューター ( " "キーを取得するエグゼキューター) で生成されます。

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

このクエリは実行中です。 しかし、データについての知識がなければ、アナリストはジョブの実行中にタスクが 1 つ「だけ」残っていることがわかります。 クエリは決して終了しないため、アナリストはなぜ機能しなかったのかイライラし、混乱することになります。

この場合、問題のある結合キーは 1 つだけです。 また、もっと多くの場合もあるかもしれません。

Query Watchdogの有効化と設定

Query Watchdog を有効にして設定するには、次のステップが必要です。

  • ウォッチドッグを spark.databricks.queryWatchdog.enabledで有効にします。

  • spark.databricks.queryWatchdog.minTimeSecsを使用してタスク ランタイムを構成します。

  • 出力を spark.databricks.queryWatchdog.minOutputRowsで表示します。

  • spark.databricks.queryWatchdog.outputRatioThresholdで出力比を設定します。

クエリが入力行数に対して作成する出力行が多すぎないようにするには、クエリウォッチドッグを有効にし、出力行の最大数を入力行数の倍数として構成します。 この例では、比率 1000 (デフォルト) を使用します。

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

後者の構成は、特定のタスクが 1000 倍を超える入力行を生成しないことを宣言します。

ヒント

出力比は完全にカスタマイズ可能です。 低い位置から始めて、自分とチームにとってどのしきい値が効果的かを確認することをお勧めします。 1,000 から 10,000 の範囲が適切な出発点です。

クエリ ウォッチドッグは、ユーザーが完了しないジョブのコンピュート リソースを独占するのを防ぐだけでなく、完了しないはずのクエリを高速に失敗させることで時間を節約します。 たとえば、次のクエリは比率を超えているため、数分後に失敗します。

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

表示される内容は次のとおりです。

クエリウォッチドッグ

通常は、Query Watchdog を有効にし、出力/入力しきい値の比率を設定するだけで十分ですが、 spark.databricks.queryWatchdog.minTimeSecsspark.databricks.queryWatchdog.minOutputRowsの 2 つの追加プロパティを設定するオプションもあります。 これらのプロパティは、クエリ内の特定のタスクをキャンセルするまでに実行する必要がある最小時間と、そのクエリ内のタスクの最小出力行数を指定します。

たとえば、タスクごとに大量の行を生成する機会を与えたい場合は、 minTimeSecsをより高い値に設定できます。 同様に、クエリ内のタスクが 1,000 万行を生成した後でのみクエリを停止する場合は、 spark.databricks.queryWatchdog.minOutputRowsを 1,000 万に設定できます。 それより小さい場合、出力/入力比を超えた場合でも、クエリは成功します。

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

ヒント

ノートブックで Query Watchdog を構成した場合、その構成はコンピュートの再起動後は保持されません。 コンピュートのすべてのユーザーに対して Query Watchdog を構成する場合は、 コンピュート構成を使用することをお勧めします。

非常に大規模なデータセットに対するクエリを検出する

別の典型的な大規模クエリでは、大きなテーブル/データセットから大量のデータをスキャンする場合があります。 スキャン操作は長時間継続し、コンピュート リソースを飽和させる可能性があります (大きな Hive テーブルのメタデータを読み取るだけでも、かなりの時間がかかる場合があります)。 maxHivePartitionsを設定すると、大きな Hive テーブルから大量のパーティションが取得されないようにすることができます。 同様に、 maxQueryTasksを設定して、非常に大規模なデータセットに対するクエリを制限することもできます。

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Query Watchdogはいつ有効にするべきか

クエリ ウォッチドッグは、SQL アナリストとデータ サイエンティストが特定のコンピュートを共有するアドホック アナリティクス コンピュートに対して有効にする必要があり、管理者はクエリが相互に「適切に動作する」ことを確認する必要があります。

Query Watchdogはいつ無効にするべきか

一般に、ETL シナリオで使用されるクエリを積極的にキャンセルすることはお勧めしません。通常、エラーを修正するループに人間が関与していないためです。 アドホック アナリティクス コンピュートを除くすべてのクエリ ウォッチドッグを無効にすることをお勧めします。