Query Watchdog

New in version 2.1.0-db3.

See Databricks Runtime Release Notes for more information.

Handling Disruptive Queries in Spark SQL

One challenge with any interactive data workflow is handling large queries that generate too many output rows. These queries can be extremely slow, saturate cluster resources, and make it difficult for others to share the same cluster.

Let’s take a look at an example. A SQL analyst in my organization is just getting ramped up on Databricks and on the data we have. She’s running some ad hoc queries to get familiar with the data layout in our just-in-time data warehouse. We use a shared autoscaling cluster on Databricks, which makes it easy for multiple users to work on a single cluster at the same time. For this example, let’s imagine that we have two tables that each have a million rows.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

These tables are totally manageable in Apache Spark. However, you’ll notice that each one includes the same blank join key (" ") in every row. These blank join keys are far more prevalent than any other value in our tables. Our analyst, who is just getting started and trying to understand the data, joins these two tables on their keys. She doesn’t realize that this produces one trillion output rows (1,000,000 × 1,000,000), all of them on a single executor (the one that gets the " " key). In this case there is only one problematic join key; other times there may be many more.

Not only is this far more output than she wants, it is also going to make it hard for anyone else to use this cluster.
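The size of that join can be checked with a little arithmetic. The sketch below is a plain Scala illustration, not Spark or Databricks code: `JoinSizeEstimate` and `joinOutputRows` are hypothetical names, and the per-key row counts are written out by hand instead of being read from the tables.

```scala
// Hypothetical helper (not part of Spark or Databricks): estimates how many
// rows an inner equi-join produces from per-key row counts on each side.
object JoinSizeEstimate {
  // For every key on the left, multiply its row count by the matching
  // row count on the right, then add the products up.
  def joinOutputRows(left: Map[String, Long], right: Map[String, Long]): BigInt =
    left.iterator.map { case (key, leftCount) =>
      BigInt(leftCount) * BigInt(right.getOrElse(key, 0L))
    }.sum

  def main(args: Array[String]): Unit = {
    // Both example tables hold 1,000,000 rows that all share one join key.
    val tableX = Map(" " -> 1000000L)
    val tableY = Map(" " -> 1000000L)
    println(joinOutputRows(tableX, tableY)) // prints 1000000000000
  }
}
```

Because every row carries the same key, the whole product lands in a single key group, which is why one task ends up doing all of the work.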

%sql

SELECT
  id, count(*)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id
-- you'll want to cancel this query.

This query would cause problems regardless of the system you are using. Luckily, we have Databricks, and I, as an administrator, can set properties on the clusters used by our analysts to avoid this problem. I will set two properties: the first turns on the Query Watchdog, and the second sets an output ratio threshold. Given x input rows to a Spark task, the Watchdog kills the query if that task produces more than 1,000x as many output rows. We can change this threshold at runtime according to how strictly we’d like to control the kinds of queries our cluster will tolerate (as we do in the code snippet below).

Tip

The output ratio is completely customizable. We recommend starting lower and seeing what threshold works well for you and your team. The range of 1,000 to 10,000 is likely a good starting point.

val maximumOutputRowRatio = 1000L

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", maximumOutputRowRatio)

%sql

SELECT
  join_key,
  sum(id),
  count(*)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY join_key

This relieves me, as an administrator, from having to constantly monitor usage on shared clusters. The Watchdog does that for me by letting me specify a threshold for these queries. It also prevents users from running queries that may never complete because of data skew.

These two properties are enough for most use cases, but if we’d like to control things further, I can set another two properties. The first, minTimeSecs, specifies the minimum time a given task in a query must run before the query can be cancelled. We can set this to a higher value if we’d like to give the task a chance to produce a large number of rows. The second, minOutputRows, sets a minimum number of output rows for a task in that query. For instance, we can set this to ten million if we want to stop a query only after one of its tasks has produced ten million rows. Anything less and the query would still succeed, even if the output ratio was exceeded.

spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
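
To make the interaction between the three thresholds concrete, here is a minimal model of the rule as described above. It is an illustration only: `WatchdogSketch`, `Limits`, `TaskStats`, and `shouldCancel` are hypothetical names, and the exact comparisons (for example `>` versus `>=`) are assumptions, not the actual Databricks implementation.

```scala
// Illustrative model only. The names and the exact comparison operators are
// assumptions, not the Databricks implementation.
object WatchdogSketch {
  final case class Limits(outputRatioThreshold: Long, minOutputRows: Long, minTimeSecs: Long)
  final case class TaskStats(inputRows: Long, outputRows: Long, runtimeSecs: Long)

  // A task triggers cancellation only when all three conditions hold:
  // the output ratio is exceeded, enough rows were produced, and the task
  // has run for at least the minimum time.
  def shouldCancel(task: TaskStats, limits: Limits): Boolean = {
    val ratioExceeded =
      BigInt(task.outputRows) > BigInt(task.inputRows) * BigInt(limits.outputRatioThreshold)
    ratioExceeded &&
      task.outputRows >= limits.minOutputRows &&
      task.runtimeSecs >= limits.minTimeSecs
  }

  def main(args: Array[String]): Unit = {
    // Same values as the settings used in this section.
    val limits = Limits(outputRatioThreshold = 1000L, minOutputRows = 100000L, minTimeSecs = 10L)

    // Ratio exceeded, plenty of rows, long-running: cancelled.
    println(shouldCancel(TaskStats(inputRows = 1000L, outputRows = 50000000L, runtimeSecs = 30L), limits)) // true

    // Ratio exceeded, but fewer than minOutputRows rows: allowed to finish.
    println(shouldCancel(TaskStats(inputRows = 10L, outputRows = 50000L, runtimeSecs = 30L), limits)) // false
  }
}
```

The second call shows why the extra properties matter: a small task can exceed the ratio without being worth cancelling.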

When is this a good choice?

This is a great choice for ad hoc analytics clusters where SQL analysts and data scientists share a cluster and an administrator needs to make sure that queries “play nicely” with one another. If you set these properties in a notebook, be aware that they do not persist across cluster restarts. If you would like them to apply to all users of a given cluster, we recommend setting them in the cluster configuration at cluster launch time.
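
As a rough illustration of the cluster-level setting, and assuming the cluster's Spark configuration accepts one space-separated key and value per line, the two main properties might be entered as shown here. The property names and values are the ones used earlier in this section; where exactly you enter them depends on your workspace.

```text
spark.databricks.queryWatchdog.enabled true
spark.databricks.queryWatchdog.outputRatioThreshold 1000
```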

When is this a bad choice?

This configuration can be helpful for production ETL workloads, although that depends on your use case. In general, we do not advise eagerly cancelling queries in an ETL scenario because there typically isn’t a human in the loop to correct the error. We recommend avoiding this tool outside of ad hoc analytics clusters.