Bucketing Example (Scala)
%sql
create database if not exists demo;
use demo;
OK
import org.apache.spark.sql.functions._
// ~16 million rows in 16 input partitions; rand(12) is seeded for reproducibility
def base = spark.range(1, 16000000, 1, 16).select($"id" as "key", rand(12) as "value")
import org.apache.spark.sql.functions._
base: org.apache.spark.sql.DataFrame
// Write non-bucketed table
base.write.format("parquet").saveAsTable("unbucketed")
// Write bucketed table
base.write.format("parquet").bucketBy(16, "key").sortBy("value").saveAsTable("bucketed")
val t1 = spark.table("unbucketed")
val t2 = spark.table("bucketed")
val t3 = spark.table("bucketed")
t1: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
t2: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
t3: org.apache.spark.sql.DataFrame = [key: bigint, value: double]
// Unbucketed - bucketed join. Both sides need to be repartitioned.
t1.join(t2, Seq("key")).explain()
== Physical Plan ==
*(5) Project [key#150L, value#151, value#155]
+- *(5) SortMergeJoin [key#150L], [key#154L], Inner
   :- *(2) Sort [key#150L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#150L, 200)
   :     +- *(1) Project [key#150L, value#151]
   :        +- *(1) Filter isnotnull(key#150L)
   :           +- *(1) FileScan parquet demo.unbucketed[key#150L,value#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/unbucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(4) Sort [key#154L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#154L, 200)
         +- *(3) Project [key#154L, value#155]
            +- *(3) Filter isnotnull(key#154L)
               +- *(3) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
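Both Exchange nodes hash-partition into 200 partitions. That number comes from the spark.sql.shuffle.partitions default, not from either table, which is easy to confirm:
// The 200 in hashpartitioning(..., 200) is the default shuffle parallelism.
println(spark.conf.get("spark.sql.shuffle.partitions"))  // "200" unless overridden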
// Unbucketed - bucketed join. Unbucketed side is correctly repartitioned, and only one shuffle is needed.
t1.repartition(16, $"key").join(t2, Seq("key")).explain()
== Physical Plan ==
*(4) Project [key#150L, value#151, value#155]
+- *(4) SortMergeJoin [key#150L], [key#154L], Inner
   :- *(2) Sort [key#150L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#150L, 16)
   :     +- *(1) Project [key#150L, value#151]
   :        +- *(1) Filter isnotnull(key#150L)
   :           +- *(1) FileScan parquet demo.unbucketed[key#150L,value#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/unbucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(3) Sort [key#154L ASC NULLS FIRST], false, 0
      +- *(3) Project [key#154L, value#155]
         +- *(3) Filter isnotnull(key#154L)
            +- *(3) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
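The explicit repartition into 16 partitions matches the table's bucket count, so the bucketed side's on-disk layout already satisfies the join's distribution requirement and only the unbucketed side is shuffled. Rather than hard-coding 16, the bucket count can be looked up from the catalog; the sketch below uses spark.sessionState, which is an internal (non-stable) API:
import org.apache.spark.sql.catalyst.TableIdentifier
// Read the bucket count from the table metadata instead of hard-coding it.
val numBuckets = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("bucketed", Some("demo")))
  .bucketSpec.map(_.numBuckets).getOrElse(16)
t1.repartition(numBuckets, $"key").join(t2, Seq("key")).explain()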
// Unbucketed - bucketed join. Unbucketed side is incorrectly repartitioned (200 post-shuffle partitions, not the table's 16 buckets), and two shuffles are needed.
t1.repartition($"key").join(t2, Seq("key")).explain()
== Physical Plan ==
*(5) Project [key#150L, value#151, value#155]
+- *(5) SortMergeJoin [key#150L], [key#154L], Inner
   :- *(2) Sort [key#150L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#150L, 200)
   :     +- *(1) Project [key#150L, value#151]
   :        +- *(1) Filter isnotnull(key#150L)
   :           +- *(1) FileScan parquet demo.unbucketed[key#150L,value#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/unbucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(4) Sort [key#154L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#154L, 200)
         +- *(3) Project [key#154L, value#155]
            +- *(3) Filter isnotnull(key#154L)
               +- *(3) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
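With no explicit count, repartition($"key") falls back to spark.sql.shuffle.partitions (200), which does not line up with the 16 buckets, so the bucketed side is shuffled a second time. Presumably, aligning that default with the bucket count would restore the single-shuffle plan; a sketch, not verified here:
// Align the default shuffle parallelism with the bucket count, then retry.
spark.conf.set("spark.sql.shuffle.partitions", "16")
t1.repartition($"key").join(t2, Seq("key")).explain()
spark.conf.set("spark.sql.shuffle.partitions", "200")  // restore the default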
// Bucketed - bucketed join. Both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(3) Project [key#154L, value#155, value#197]
+- *(3) SortMergeJoin [key#154L], [key#196L], Inner
   :- *(1) Sort [key#154L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [key#154L, value#155]
   :     +- *(1) Filter isnotnull(key#154L)
   :        +- *(1) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(2) Sort [key#196L ASC NULLS FIRST], false, 0
      +- *(2) Project [key#196L, value#197]
         +- *(2) Filter isnotnull(key#196L)
            +- *(2) FileScan parquet demo.bucketed[key#196L,value#197] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
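No Exchange appears on either side; the remaining Sort nodes are there because the buckets were sorted by value rather than by the join key. As a sanity check, bucketing-aware scans can be switched off with the spark.sql.sources.bucketing.enabled flag, after which the same join should plan shuffles on both sides again (hedged sketch):
// Disable bucketing-aware scans and re-run the join; Exchange nodes
// should reappear on both sides, as in the fully unbucketed case.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")
t3.join(t2, Seq("key")).explain()
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")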