Handling large queries in interactive workflows
A challenge with interactive data workflows is handling large queries. This includes queries that generate too many output rows, fetch many external partitions, or compute on extremely large data sets. These queries can be extremely slow, saturate compute resources, and make it difficult for others to share the same compute.
Query Watchdog is a process that prevents queries from monopolizing compute resources by examining the most common causes of large queries and terminating queries that pass a threshold. This article describes how to enable and configure Query Watchdog.
Important
Query Watchdog is enabled for all all-purpose computes created using the UI.
Example of a disruptive query
An analyst is performing some ad hoc queries in a just-in-time data warehouse. The analyst uses a shared autoscaling compute that makes it easy for multiple users to use a single compute at the same time. Suppose there are two tables that each have a million rows.
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_x")
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_y")
These table sizes are manageable in Apache Spark. However, they each include a join_key
column with an empty string in every row. This can happen if the data is not perfectly clean or if there is significant data skew where some keys are more prevalent than others. These empty join keys are far more prevalent than any other value.
In the following code, the analyst is joining these two tables on their keys, which produces output of one trillion results, and all of these are produced on a single executor (the executor that gets the " "
key):
SELECT
id, count(id)
FROM
(SELECT
x.id
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key)
GROUP BY id
This query appears to be running. But without knowing about the data, the analyst sees that there’s “only” a single task left over the course of executing the job. The query never finishes, leaving the analyst frustrated and confused about why it did not work.
In this case there is only one problematic join key. Other times there may be many more.
Enable and configure Query Watchdog
To enable and configure Query Watchdog, the following steps are required.
Enable Watchdog with
spark.databricks.queryWatchdog.enabled
.Configure the task runtime with
spark.databricks.queryWatchdog.minTimeSecs
.Display output with
spark.databricks.queryWatchdog.minOutputRows
.Configure the output ratio with
spark.databricks.queryWatchdog.outputRatioThreshold
.
To a prevent a query from creating too many output rows for the number of input rows, you can enable Query Watchdog and configure the maximum number of output rows as a multiple of the number of input rows. In this example we use a ratio of 1000 (the default).
spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)
The latter configuration declares that any given task should never produce more than 1000 times the number of input rows.
Tip
The output ratio is completely customizable. We recommend starting lower and seeing what threshold works well for you and your team. A range of 1,000 to 10,000 is a good starting point.
Not only does Query Watchdog prevent users from monopolizing compute resources for jobs that will never complete, it also saves time by fast-failing a query that would have never completed. For example, the following query will fail after several minutes because it exceeds the ratio.
SELECT
z.id
join_key,
sum(z.id),
count(z.id)
FROM
(SELECT
x.id,
y.join_key
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key) z
GROUP BY join_key, z.id
Here’s what you would see:
It’s usually enough to enable Query Watchdog and set the output/input threshold ratio, but you also have the option to set two additional properties: spark.databricks.queryWatchdog.minTimeSecs
and spark.databricks.queryWatchdog.minOutputRows
. These properties specify the minimum time a given task in a query must run before cancelling it and the minimum number of output rows for a task in that query.
For example, you can set minTimeSecs
to a higher value if you want to give it a chance to produce a large number of rows per task. Likewise, you can set spark.databricks.queryWatchdog.minOutputRows
to ten million if you want to stop a query only after a task in that query has produced ten million rows. Anything less and the query succeeds, even if the output/input ratio was exceeded.
spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
Tip
If you configure Query Watchdog in a notebook, the configuration does not persist across compute restarts. If you want to configure Query Watchdog for all users of a compute, we recommend that you use a compute configuration.
Detect query on extremely large dataset
Another typical large query may scan a large amount of data from big tables/datasets. The scan operation may last for a long time and saturate compute resources (even reading metadata of a big Hive table can take a significant amount of time). You can set maxHivePartitions
to prevent fetching too many partitions from a big Hive table. Similarly, you can also set maxQueryTasks
to limit queries on an extremely large dataset.
spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)