Adaptive query execution

Adaptive query execution (AQE) is query re-optimization that occurs during query execution.

The motivation for runtime re-optimization is that Databricks has the most up-to-date, accurate statistics at the end of a shuffle or broadcast exchange (referred to as a query stage in AQE). As a result, Databricks can opt for a better physical strategy, pick an optimal post-shuffle partition size and number, or perform optimizations that used to require hints, for example, skew join handling.

This can be very useful when statistics collection is not turned on or when statistics are stale. It is also useful in places where statically derived statistics are inaccurate, such as in the middle of a complicated query, or after the occurrence of data skew.

Capabilities

In Databricks Runtime 7.3 LTS, AQE is enabled by default. It has four major features:

  • Dynamically changes sort merge join into broadcast hash join.
  • Dynamically coalesces partitions (combining small partitions into reasonably sized partitions) after shuffle exchange. Very small tasks have worse I/O throughput and tend to suffer more from scheduling overhead and task setup overhead. Combining small tasks saves resources and improves cluster throughput.
  • Dynamically handles skew in sort merge join and shuffle hash join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks.
  • Dynamically detects and propagates empty relations.
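
The coalescing idea can be illustrated with a minimal pure-Python sketch (not Spark's implementation; the function name `coalesce_partitions` and the greedy strategy are assumptions for illustration, and the 64 MB target mirrors the advisory partition size discussed later):

```python
# Illustrative sketch of post-shuffle partition coalescing, not Spark's
# actual implementation. Adjacent small partitions are merged until the
# combined size would exceed an advisory target size.

def coalesce_partitions(sizes, target_bytes):
    """Greedily merge adjacent shuffle partitions into groups whose
    total size stays close to, but not above, target_bytes.
    Returns a list of (start_index, end_index_exclusive) groups."""
    groups = []
    start, acc = 0, 0
    for i, size in enumerate(sizes):
        if acc > 0 and acc + size > target_bytes:
            groups.append((start, i))
            start, acc = i, 0
        acc += size
    groups.append((start, len(sizes)))
    return groups

# Ten tiny 8 MB shuffle partitions coalesced toward a 64 MB advisory size:
mb = 1024 * 1024
print(coalesce_partitions([8 * mb] * 10, 64 * mb))
# -> [(0, 8), (8, 10)]
```

Ten 8 MB partitions become two tasks instead of ten, which is the effect AQE is after: fewer, better-sized tasks with less scheduling and setup overhead.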

Application

AQE applies to all queries that:

  • Are non-streaming
  • Contain at least one exchange (usually when there’s a join, aggregate, or window), one sub-query, or both.

Not all AQE-applied queries are necessarily re-optimized: the re-optimization might or might not produce a query plan different from the one statically compiled. Refer to the next section to determine whether a query’s plan has been changed by AQE.

Query plans

This section discusses how you can examine query plans in different ways.

Spark UI

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query. Before the query runs or when it is running, the isFinalPlan flag of the corresponding AdaptiveSparkPlan node shows as false; after the query execution completes, the isFinalPlan flag changes to true.

Evolving plan

The query plan diagram evolves as the execution progresses and reflects the most current plan that is being executed. Nodes that have already been executed (for which metrics are available) will not change, but those that haven’t can change over time as a result of re-optimizations.

The following is a query plan diagram example:

Query plan diagram

DataFrame.explain()

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query. Before the query runs or when it is running, the isFinalPlan flag of the corresponding AdaptiveSparkPlan node shows as false; after the query execution completes, the isFinalPlan flag changes to true.

Current and initial plan

Under each AdaptiveSparkPlan node there will be both the initial plan (the plan before applying any AQE optimizations) and the current or the final plan, depending on whether the execution has completed. The current plan will evolve as the execution progresses.

Runtime statistics

Each shuffle and broadcast stage contains data statistics.

Before the stage runs or while it is running, the statistics are compile-time estimates and the isRuntime flag is false, for example: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false)

After the stage execution completes, the statistics are those collected at runtime and the isRuntime flag becomes true, for example: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

The following is a DataFrame.explain example:

  • Before the execution

    Before execution
  • During the execution

    During execution
  • After the execution

    After execution

SQL EXPLAIN

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query.

No current plan

As SQL EXPLAIN does not execute the query, the current plan is always the same as the initial plan and does not reflect what would eventually get executed by AQE.

The following is a SQL explain example:

SQL explain

Effectiveness

The query plan will change if one or more AQE optimizations take effect. The effect of these optimizations is demonstrated by the difference between the current/final plan and the initial plan, and by specific plan nodes appearing in the current/final plan:

  • Dynamically change sort merge join into broadcast hash join: different physical join nodes between the current/final plan and the initial plan

    Join strategy string
  • Dynamically coalesce partitions: node CustomShuffleReader with property Coalesced

    Custom shuffle reader
    Custom shuffle reader string
  • Dynamically handle skew join: node SortMergeJoin with the field isSkew set to true.

    Skew join plan
    Skew join string
  • Dynamically detect and propagate empty relations: part (or all) of the plan is replaced by a LocalTableScan node with the relation field empty.

    Local table scan
    Local table scan string

Configuration

In the properties in this section, replace <prefix> with spark.sql.adaptive, and <db_prefix> with spark.databricks.adaptive.
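
For example, with the prefixes substituted, these properties can be set on a SparkSession (a sketch, assuming an existing session object named spark; the values shown are the defaults from the tables below):

```python
# Sketch: setting AQE properties with the prefixes substituted.
# Assumes an existing SparkSession named `spark`; values are the defaults.
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", "30MB")
```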

Enable and disable adaptive query execution

Property Default Description
spark.databricks.optimizer.adaptive.enabled true Whether to enable or disable adaptive query execution.

Dynamically change sort merge join into broadcast hash join

Property Default Description
<db_prefix>.autoBroadcastJoinThreshold 30MB The threshold to trigger switching to broadcast join at runtime.

Dynamically coalesce partitions

Property Default Description
<prefix>.coalescePartitions.enabled true Whether to enable or disable partition coalescing.
<prefix>.advisoryPartitionSizeInBytes 64MB The target size after coalescing. The coalesced partition sizes will be close to but no bigger than this target size.
<prefix>.coalescePartitions.minPartitionSize 1MB The minimum size of partitions after coalescing. The coalesced partition sizes will be no smaller than this size.
<prefix>.coalescePartitions.minPartitionNum 2x no. of cluster cores The minimum number of partitions after coalescing. Not recommended, because setting it explicitly overrides <prefix>.coalescePartitions.minPartitionSize.

Dynamically handle skew join

Property Default Description
<prefix>.skewJoin.enabled true Whether to enable or disable skew join handling.
<prefix>.skewJoin.skewedPartitionFactor 5 A factor that, when multiplied by the median partition size, contributes to determining whether a partition is skewed.
<prefix>.skewJoin.skewedPartitionThresholdInBytes 256MB A threshold that contributes to determining whether a partition is skewed.

A partition is considered skewed when both (partition size > skewedPartitionFactor * median partition size) and (partition size > skewedPartitionThresholdInBytes) are true.
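
The two conditions can be expressed directly in plain Python (illustrative only; is_skewed is a hypothetical helper, with defaults taken from the table above):

```python
# Sketch of the skew test described above: a partition is skewed only
# when it exceeds BOTH factor * median partition size AND the absolute
# byte threshold. Defaults mirror skewedPartitionFactor (5) and
# skewedPartitionThresholdInBytes (256 MB).
from statistics import median

def is_skewed(partition_size, all_sizes,
              factor=5, threshold_bytes=256 * 1024 * 1024):
    med = median(all_sizes)
    return partition_size > factor * med and partition_size > threshold_bytes

mb = 1024 * 1024
sizes = [50 * mb] * 9 + [400 * mb]
print(is_skewed(400 * mb, sizes))  # 400 MB > 5 * 50 MB and > 256 MB: skewed
print(is_skewed(50 * mb, sizes))   # not skewed
```

Note that both conditions must hold: a partition that is large relative to the median but still under the 256 MB absolute threshold is not treated as skewed.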

Dynamically detect and propagate empty relations

Property Default Description
<db_prefix>.emptyRelationPropagation.enabled true Whether to enable or disable dynamic empty relation propagation.

Frequently asked questions (FAQs)

Why didn’t AQE change the shuffle partition number despite the partition coalescing already being enabled?

AQE does not change the initial partition number. It is recommended that you set a reasonably high value for the shuffle partition number and let AQE coalesce small partitions based on the output data size at each stage of the query.

If you see spilling in your jobs, you can try:

  • Increasing the shuffle partition number config: spark.sql.shuffle.partitions
  • Enabling auto optimized shuffle by setting <db_prefix>.autoOptimizeShuffle.enabled to true

Why didn’t AQE broadcast a small join table?

If the size of the relation expected to be broadcast falls under the broadcast threshold (<db_prefix>.autoBroadcastJoinThreshold) but is still not broadcast:

  • Check the join type. Broadcast is not supported for certain join types, for example, the left relation of a LEFT OUTER JOIN cannot be broadcast.
  • The relation might also contain many empty partitions, in which case the majority of tasks can finish quickly with sort merge join, or the join can potentially be optimized with skew join handling. AQE avoids changing such sort merge joins to broadcast hash joins when the percentage of non-empty partitions is lower than <prefix>.nonEmptyPartitionRatioForBroadcastJoin.
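
A minimal pure-Python sketch of this decision (hypothetical helper name; the 30 MB threshold mirrors <db_prefix>.autoBroadcastJoinThreshold above, and the 0.2 non-empty ratio default is an assumption for illustration):

```python
# Hypothetical sketch of the decision described above, not Spark source:
# switch a sort merge join to broadcast hash join only if the relation
# fits under the broadcast threshold AND enough of its shuffle
# partitions are non-empty.

def can_switch_to_broadcast(relation_bytes, partition_sizes,
                            broadcast_threshold=30 * 1024 * 1024,
                            non_empty_ratio=0.2):
    if relation_bytes > broadcast_threshold:
        return False
    non_empty = sum(1 for s in partition_sizes if s > 0)
    return non_empty / len(partition_sizes) >= non_empty_ratio

mb = 1024 * 1024
# Small relation, but 95 of 100 partitions empty -> keep sort merge join.
print(can_switch_to_broadcast(10 * mb, [0] * 95 + [2 * mb] * 5))
# Small relation with all partitions non-empty -> eligible for broadcast.
print(can_switch_to_broadcast(10 * mb, [1 * mb] * 10))
```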

Should I still use a broadcast join strategy hint with AQE enabled?

Yes. A statically planned broadcast join is usually more performant than one dynamically planned by AQE, because AQE might not switch to broadcast join until after performing the shuffle for both sides of the join (by which time the actual relation sizes are obtained). So using a broadcast hint can still be a good choice if you know your query well. AQE respects query hints the same way static optimization does, but can still apply dynamic optimizations that are not affected by the hints.

What is the difference between skew join hint and AQE skew join optimization? Which one should I use?

It is recommended to rely on AQE skew join handling rather than use the skew join hint, because AQE skew join is completely automatic and in general performs better than the hint counterpart.

Why didn’t AQE adjust my join ordering automatically?

Dynamic join reordering is not part of AQE as of Databricks Runtime 7.3 LTS.

Why didn’t AQE detect my data skew?

There are two size conditions that must be satisfied for AQE to detect a partition as a skewed partition:

  • The partition size is larger than the <prefix>.skewJoin.skewedPartitionThresholdInBytes (default 256MB)
  • The partition size is larger than the median size of all partitions times the skewed partition factor <prefix>.skewJoin.skewedPartitionFactor (default 5)

In addition, skew handling support is limited for certain join types, for example, in LEFT OUTER JOIN, only skew on the left side can be optimized.

Legacy

The term “Adaptive Execution” has existed since Spark 1.6, but the new AQE in Spark 3.0 is fundamentally different. In terms of functionality, Spark 1.6 does only the “dynamically coalesce partitions” part. In terms of technical architecture, the new AQE is a framework of dynamic planning and replanning of queries based on runtime stats, which supports a variety of optimizations such as the ones we have described in this article and can be extended to enable more potential optimizations.