Handling large queries in interactive workflows
A challenge with interactive data workflows is handling large queries. This includes queries that generate too many output rows, fetch many external partitions, or compute on extremely large data sets. These queries can be extremely slow, saturate cluster resources, and make it difficult for others to share the same cluster.
Query Watchdog is a process that prevents queries from monopolizing cluster resources by examining the most common causes of large queries and terminating queries that pass a threshold. This article describes how to enable and configure Query Watchdog.
Query Watchdog is enabled for all all-purpose clusters created using the UI.
Example of a disruptive query
An analyst is performing some ad hoc queries in a just-in-time data warehouse. The analyst uses a shared autoscaling cluster that makes it easy for multiple users to use a single cluster at the same time. Suppose there are two tables that each have a million rows.
import org.apache.spark.sql.functions._ spark.conf.set("spark.sql.shuffle.partitions", 10) spark.range(1000000) .withColumn("join_key", lit(" ")) .createOrReplaceTempView("table_x") spark.range(1000000) .withColumn("join_key", lit(" ")) .createOrReplaceTempView("table_y")
These table sizes are manageable in Apache Spark. However, they each include a
join_key column with an empty string in every row. This can happen if the data is not perfectly clean or if there is significant data skew where some keys are more prevalent than others. These empty join keys are far more prevalent than any other value.
In the following code, the analyst is joining these two tables on their keys, which produces output of one trillion results, and all of these are produced on a single executor (the executor that gets the
" " key):
SELECT id, count(id) FROM (SELECT x.id FROM table_x x JOIN table_y y on x.join_key = y.join_key) GROUP BY id
This query appears to be running. But without knowing about the data, the analyst sees that there’s “only” a single task left over the course of executing the job. The query never finishes, leaving the analyst frustrated and confused about why it did not work.
In this case there is only one problematic join key. Other times there may be many more.
Enable and configure Query Watchdog
To enable and configure Query Watchdog, the following steps are required.
Enable Watchdog with
Configure the task runtime with
Display output with
Configure the output ratio with
To a prevent a query from creating too many output rows for the number of input rows, you can enable Query Watchdog and configure the maximum number of output rows as a multiple of the number of input rows. In this example we use a ratio of 1000 (the default).
spark.conf.set("spark.databricks.queryWatchdog.enabled", true) spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)
The latter configuration declares that any given task should never produce more than 1000 times the number of input rows.
The output ratio is completely customizable. We recommend starting lower and seeing what threshold works well for you and your team. A range of 1,000 to 10,000 is a good starting point.
Not only does Query Watchdog prevent users from monopolizing cluster resources for jobs that will never complete, it also saves time by fast-failing a query that would have never completed. For example, the following query will fail after several minutes because it exceeds the ratio.
SELECT z.id join_key, sum(z.id), count(z.id) FROM (SELECT x.id, y.join_key FROM table_x x JOIN table_y y on x.join_key = y.join_key) z GROUP BY join_key, z.id
Here’s what you would see:
It’s usually enough to enable Query Watchdog and set the output/input threshold ratio, but you also have the option to set two additional properties:
spark.databricks.queryWatchdog.minOutputRows. These properties specify the minimum time a given task in a query must run before cancelling it and the minimum number of output rows for a task in that query.
For example, you can set
minTimeSecs to a higher value if you want to give it a chance to produce a large number of rows per task. Likewise, you can set
spark.databricks.queryWatchdog.minOutputRows to ten million if you want to stop a query only after a task in that query has produced ten million rows. Anything less and the query succeeds, even if the output/input ratio was exceeded.
spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L) spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
If you configure Query Watchdog in a notebook, the configuration does not persist across cluster restarts. If you want to configure Query Watchdog for all users of a cluster, we recommend that you use a cluster configuration.
Detect query on extremely large dataset
Another typical large query may scan a large amount of data from big tables/datasets. The scan operation may last for a long time and saturate cluster resources (even reading metadata of a big Hive table can take a significant amount of time). You can set
maxHivePartitions to prevent fetching too many partitions from a big Hive table. Similarly, you can also set
maxQueryTasks to limit queries on an extremely large dataset.
spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000) spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)
When should you enable Query Watchdog?
Query Watchdog should be enabled for ad hoc analytics clusters where SQL analysts and data scientists are sharing a given cluster and an administrator needs to make sure that queries “play nicely” with one another.