AQE Demo(SQL)

Adaptive Query Execution Demo

Adaptive Query Execution (AQE) re-optimizes a query while it is executing, based on runtime statistics. AQE in Spark 3.0 includes three main features:

  • Dynamically coalescing shuffle partitions
  • Dynamically switching join strategies
  • Dynamically optimizing skew joins

Enable AQE

set spark.sql.adaptive.enabled = true;
 
key                         value
spark.sql.adaptive.enabled  true

Showing all 1 rows.

-- For demo purposes only.
-- Not necessary in real-life usage.
 
set spark.sql.adaptive.coalescePartitions.minPartitionNum = 1;
 
key                                                    value
spark.sql.adaptive.coalescePartitions.minPartitionNum  1

Showing all 1 rows.

Create Tables

%scala
dbutils.fs.rm("dbfs:/user/hive/warehouse/aqe_demo_db", true)
res1: Boolean = false
CREATE DATABASE IF NOT EXISTS aqe_demo_db;
USE aqe_demo_db;
 
DROP TABLE IF EXISTS items;
DROP TABLE IF EXISTS sales;
 
-- Create "items" table.
 
CREATE TABLE items
USING parquet
AS
SELECT id AS i_item_id,
CAST(rand() * 1000 AS INT) AS i_price
FROM RANGE(30000000);
 
-- Create "sales" table with skew.
-- Item with id 100 is in 80% of all sales.
 
CREATE TABLE sales
USING parquet
AS
SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS s_item_id,
CAST(rand() * 100 AS INT) AS s_quantity,
DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS s_date
FROM RANGE(1000000000);
OK
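
The skew that the CASE WHEN expression builds into "sales" can be checked on a small sample without a cluster. The sketch below is plain Python, not Spark: the function name, the sample size, and the seed are assumptions made for illustration, and it only mirrors the logic of the s_item_id expression above.

```python
import random
from collections import Counter

def sample_sales_keys(n_rows, n_items=30_000_000, hot_key=100,
                      hot_fraction=0.8, seed=0):
    """Draw s_item_id values the way the CASE WHEN expression does.

    Hypothetical helper: with probability hot_fraction the key is hot_key,
    otherwise it is a uniformly random item id in [0, n_items).
    """
    rng = random.Random(seed)
    keys = []
    for _ in range(n_rows):
        if rng.random() < hot_fraction:
            keys.append(hot_key)
        else:
            keys.append(int(rng.random() * n_items))
    return keys

keys = sample_sales_keys(100_000)
counts = Counter(keys)
hot_share = counts[100] / len(keys)
print(f"share of rows with s_item_id = 100: {hot_share:.3f}")
```

In a shuffle keyed on s_item_id, all of those rows land in the same partition, which is the situation the skew join optimization targets.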

Dynamically Coalesce Shuffle Partitions

-- Get the sums of sales quantity grouped by sales date.
-- The grouped result is very small.
 
SELECT s_date, sum(s_quantity) AS q
FROM sales
GROUP BY s_date
ORDER BY q DESC;
 
s_date      q
2019-06-17  137763926
2019-07-27  137720126
2019-12-21  137713157
2020-03-21  137704890
2020-04-03  137698938
2019-11-30  137692178
2020-03-22  137686312
2019-11-07  137683601
2019-09-20  137681751
2019-11-06  137680960
2019-11-02  137672344
2019-11-20  137664262
2019-06-09  137662731
2019-10-09  137662549
2019-07-12  137660212
2020-05-05  137658579
2020-02-23  137655801

Showing all 360 rows.

  • The partitions after aggregation are very small: 13 KB on average, 431 KB in total (see the highlighted "shuffle bytes written" box).
  • AQE combines these small partitions into one new partition (see the highlighted "CustomShuffleReader" box).
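
The coalescing described above can be pictured as a greedy merge of adjacent shuffle partitions up to a target size. The following is a simplified Python model, not Spark's actual implementation: the function name is hypothetical, the 64 MB target is an assumption (it matches the Spark 3.0 default for spark.sql.adaptive.advisoryPartitionSizeInBytes), and the count of 33 partitions is made up so that the total lands near the 431 KB reported above.

```python
def coalesce_partitions(sizes_bytes, target_bytes):
    """Greedily merge adjacent partitions.

    Returns a list of (start, end) index ranges, end exclusive. A new group
    starts whenever adding the next partition would exceed target_bytes.
    """
    groups = []
    start = 0
    current = 0
    for i, size in enumerate(sizes_bytes):
        if i > start and current + size > target_bytes:
            groups.append((start, i))
            start = i
            current = 0
        current += size
    if sizes_bytes:
        groups.append((start, len(sizes_bytes)))
    return groups

KB = 1024
MB = 1024 * KB

# Hypothetical layout: 33 partitions of 13 KB each, about 429 KB in total.
small = [13 * KB] * 33
print(coalesce_partitions(small, 64 * MB))  # [(0, 33)]

# Larger, made-up partitions do not all fit into one group.
large = [40 * MB, 30 * MB, 10 * MB, 50 * MB]
print(coalesce_partitions(large, 64 * MB))  # [(0, 1), (1, 3), (3, 4)]
```

Because the whole shuffle output is far below the target, every partition falls into a single group, which matches the single coalesced partition in the screenshot.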

screenshot_coalesce

Dynamically Switch Join Strategies

-- Get total sales amount grouped by sales date for items with a price lower than 10.
-- The selectivity of the filter by price is not known in static planning, so the initial plan opts for sort merge join.
-- But in fact, the "items" table after filtering is very small, so the query can do a broadcast hash join instead.
 
-- Static explain shows the initial plan with sort merge join.
 
EXPLAIN FORMATTED
SELECT s_date, sum(s_quantity * i_price) AS total_sales
FROM sales
JOIN items ON s_item_id = i_item_id
WHERE i_price < 10
GROUP BY s_date
ORDER BY total_sales DESC;
 
plan
== Physical Plan ==
AdaptiveSparkPlan (18)
+- Sort (17)
   +- Exchange (16)
      +- HashAggregate (15)
         +- Exchange (14)
            +- HashAggregate (13)
               +- Project (12)
                  +- SortMergeJoin Inner (11)
                     :- Sort (5)
                     :  +- Exchange (4)
                     :     +- Project (3)
                     :        +- Filter (2)
                     :           +- Scan parquet aqe_demo_db.sales (1)
                     +- Sort ...

Showing all 1 rows.

-- The runtime join strategy is changed to broadcast hash join.
 
SELECT s_date, sum(s_quantity * i_price) AS total_sales
FROM sales
JOIN items ON s_item_id = i_item_id
WHERE i_price < 10
GROUP BY s_date
ORDER BY total_sales DESC;
 
s_date      total_sales
2020-01-15  1310664
2019-11-06  1307607
2019-09-04  1298943
2019-08-02  1291183
2019-08-12  1289616
2019-11-03  1285164
2019-12-28  1284868
2020-04-14  1284502
2019-06-27  1283611
2019-06-19  1282992
2019-05-14  1282055
2019-06-23  1280800
2019-10-18  1278619
2020-04-30  1278220
2019-11-17  1278199
2019-08-23  1278159
2019-09-27  1277851

Showing all 360 rows.

  • The "items" table is very small after filtering: 6.9 MB (see the highlighted "data size" box).
  • AQE changes the sort merge join to a broadcast hash join at runtime (see the highlighted "BroadcastHashJoin" box).
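
The switch described above comes down to comparing the size of one join side with a broadcast threshold, once with a static estimate and again with the size measured at runtime. The Python sketch below is a simplified model of that decision, not Spark code: the function name is hypothetical, the 10 MB threshold is an assumption (the open-source Spark default for spark.sql.autoBroadcastJoinThreshold; a given cluster may be configured differently), and the 200 MB static estimate is a made-up stand-in for "too large to broadcast".

```python
MB = 1024 * 1024

def choose_join(build_side_bytes, broadcast_threshold_bytes=10 * MB):
    """Pick a join strategy from the size of the smaller join side."""
    if build_side_bytes <= broadcast_threshold_bytes:
        return "BroadcastHashJoin"
    return "SortMergeJoin"

# Static planning: filter selectivity is unknown, so the estimate is large.
static_estimate = 200 * MB          # hypothetical value
# Runtime: the filtered "items" side turned out to be 6.9 MB.
runtime_size = int(6.9 * MB)

print(choose_join(static_estimate))  # SortMergeJoin
print(choose_join(runtime_size))     # BroadcastHashJoin
```

The same rule gives different answers only because the input changed: AQE re-plans after the shuffle stage has materialized and the real size is known.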

screenshot_strategy

Dynamically Optimize Skew Join