対話型ワークフローでの大規模なクエリの処理
インタラクティブなデータワークフローの課題は、大規模なクエリの処理です。 これには、生成される出力行が多すぎるクエリ、多数の外部パーティションをフェッチするクエリ、または非常に大きなデータ セットに対するコンピュートが含まれます。 これらのクエリは、非常に遅く、コンピュート リソースを飽和させ、他のユーザーが同じコンピュートを共有するのを困難にする可能性があります。
Query Watchdogは、大規模なクエリの最も一般的な原因を調べ、しきい値を超えたクエリを終了することにより、クエリによるコンピュート リソースの独占を防ぐプロセスです。 この記事では、Query Watchdog を有効にして構成する方法について説明します。
Query Watchdog は、UIを使用して作成したすべての万能コンピュートに対して有効です。
破壊的なクエリの例
アナリストは、ジャストインタイムのデータウェアハウスでいくつかのアドホッククエリを実行しています。 アナリストは、複数のユーザーが同時に 1 つのコンピュートを簡単に使用できるように、共有のオートスケール コンピュートを使用します。 それぞれ 100 万行を持つ 2 つのテーブルがあるとします。
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_x")
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_y")
これらのテーブル サイズは、Apache Spark で管理できます。 ただし、それぞれに join_key
列が含まれ、すべての行に空の文字列が含まれています。 これは、データが完全にクリーンでない場合や、一部のキーが他のキーよりも一般的である大幅なデータスキューがある場合に発生する可能性があります。 これらの空のジョイン キーは、他のどの値よりもはるかに一般的です。
次のコードでは、アナリストはこれら 2 つのテーブルをキーで結合し、 1 兆件の結果を 出力し、これらすべてが 1 つのエグゼキューター ( " "
キーを取得するエグゼキューター) で生成されます。
SELECT
id, count(id)
FROM
(SELECT
x.id
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key)
GROUP BY id
このクエリは実行されているようです。 しかし、データについて知らなくても、アナリストはジョブの実行中に「1つのタスク」しか残っていないことに気づきます。 クエリは決して終了せず、アナリストはイライラし、なぜそれが機能しなかったのか混乱します。
この場合、問題のあるジョイン・キーは 1 つだけです。 また、もっとたくさんある場合もあります。
Query Watchdog の有効化と構成
Query Watchdogを有効にして設定するには、以下のステップが必要です。
spark.databricks.queryWatchdog.enabled
でウォッチドッグを有効にします。- タスクランタイムを
spark.databricks.queryWatchdog.minTimeSecs
で設定します。 - 出力を
spark.databricks.queryWatchdog.minOutputRows
で表示します。 - 出力比率を
spark.databricks.queryWatchdog.outputRatioThreshold
で設定します。
クエリが入力行の数に対して出力行を多く作成しないようにするには、 Query Watchdog を有効にし、出力行の最大数を入力行の数の倍数として構成できます。 この例では、1000 (デフォルト) の比率を使用します。
spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)
後者の構成では、特定のタスクが入力行数の 1000 倍を超えて生成してはならないと宣言されています。
出力比は完全にカスタマイズ可能です。 まずは低くして、自分とチームにとってどのしきい値が効果的かを確認することをお勧めします。 1,000 から 10,000 の範囲が出発点として適しています。
Query Watchdog、ユーザーが完了しないジョブのコンピュート リソースを独占するのを防ぐだけでなく、完了しなかったクエリを高速に失敗することで時間を節約できます。たとえば、次のクエリは、比率を超えているため、数分後に失敗します。
SELECT
z.id
join_key,
sum(z.id),
count(z.id)
FROM
(SELECT
x.id,
y.join_key
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key) z
GROUP BY join_key, z.id
次のように表示されます。
通常は、Query Watchdog を有効にし、出力/入力しきい値の比率を設定するだけで十分ですが、 spark.databricks.queryWatchdog.minTimeSecs
と spark.databricks.queryWatchdog.minOutputRows
の 2 つの追加プロパティを設定するオプションもあります。 これらのプロパティは、クエリ内の特定のタスクがキャンセルされるまでに実行する必要がある最小時間と、そのクエリ内のタスクの出力行の最小数を指定します。
たとえば、タスクごとに大量の行を生成する機会を与える場合は、 minTimeSecs
を高い値に設定できます。 同様に、 spark.databricks.queryWatchdog.minOutputRows
を 1,000 万に設定できるのは、そのクエリのタスクで 1,000 万行が生成された後にのみクエリを停止する場合です。 それより小さい値を超えると、出力/入力比率を超えた場合でも、クエリは成功します。
spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
ノートブックで Query Watchdog を構成すると、コンピュートの再起動後も構成は保持されません。 コンピュートのすべてのユーザーに対して Query Watchdog を設定する場合は、 コンピュート設定を使用することをお勧めします。
非常に大きなデータセットに対するクエリの検出
別の一般的な大規模なクエリでは、大きなテーブル/データセットから大量のデータをスキャンする場合があります。 スキャン操作が長時間続き、コンピュート リソースが飽和状態になる場合があります (大きな Hive テーブルのメタデータを読み取るだけでも、かなりの時間がかかる場合があります)。 大きな Hive テーブルからフェッチするパーティションが多すぎるのを防ぐために、 maxHivePartitions
を設定できます。 同様に、非常に大きなデータセットに対するクエリを制限するように maxQueryTasks
を設定することもできます。
spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)
Query Watchdogはいつ有効にすべきですか?
Query Watchdog は、 SQL アナリストと data scientists が特定のコンピュートを共有し、管理者がクエリが互いに "うまく機能" することを確認する必要があるアドホック アナリティクス コンピュートで有効にする必要があります。
Query Watchdogはいつ無効にする必要がありますか?
一般に、ETLシナリオで使用されるクエリを熱心にキャンセルすることはお勧めしません。なぜなら、通常、ループ内にエラーを修正する人間がいないためです。 アドホック アナリティクス コンピュートを除くすべての Query Watchdog を無効にすることをお勧めします。