-- Set up a dedicated schema for the demo and start from a clean slate
-- by removing any tables left behind by a previous run.
CREATE SCHEMA IF NOT EXISTS aqe_demo_db;
USE aqe_demo_db;
DROP TABLE IF EXISTS sales;
DROP TABLE IF EXISTS items;
-- Build the "items" table: 30 million rows with item ids 0..29,999,999
-- and a random integer price in the range [0, 999].
CREATE TABLE items
USING PARQUET
AS
SELECT
    id                          AS i_item_id,
    CAST(rand() * 1000 AS INT)  AS i_price
FROM range(30000000);
-- Build the "sales" table: 1 billion rows with a deliberately skewed join key.
-- Roughly 80% of the rows reference the item with id 100; the rest pick an
-- item id uniformly at random from [0, 29,999,999]. Quantities fall in [0, 99],
-- and sale dates fall within the 360 days ending today.
CREATE TABLE sales
USING PARQUET
AS
SELECT
    CASE
        WHEN rand() < 0.8 THEN 100
        ELSE CAST(rand() * 30000000 AS INT)
    END                                                  AS s_item_id,
    CAST(rand() * 100 AS INT)                            AS s_quantity,
    DATE_SUB(current_date(), CAST(rand() * 360 AS INT))  AS s_date
FROM range(1000000000);
-- Output: OK
-- Total sales amount per sale date, restricted to items priced below 10.
-- Static planning cannot know how selective the price filter is, so the
-- initial plan picks a sort-merge join. In reality the filtered "items" side
-- is very small, so the query could use a broadcast hash join instead.
-- This static EXPLAIN shows the initial plan, with the sort-merge join.
EXPLAIN FORMATTED
SELECT
    s.s_date,
    SUM(s.s_quantity * i.i_price) AS total_sales
FROM sales AS s
INNER JOIN items AS i
    ON s.s_item_id = i.i_item_id
WHERE i.i_price < 10
GROUP BY s.s_date
ORDER BY total_sales DESC;
Adaptive Query Execution Demo
Adaptive Query Execution (AQE) is a form of query re-optimization that happens during query execution, based on runtime statistics. AQE in Spark 3.0 includes three main features: