// Unbucketed - bucketed join. Both sides need to be repartitioned.
t1.join(t2, Seq("key")).explain()
== Physical Plan ==
*(5) Project [key#150L, value#151, value#155]
+- *(5) SortMergeJoin [key#150L], [key#154L], Inner
   :- *(2) Sort [key#150L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#150L, 200)
   :     +- *(1) Project [key#150L, value#151]
   :        +- *(1) Filter isnotnull(key#150L)
   :           +- *(1) FileScan parquet demo.unbucketed[key#150L,value#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/unbucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(4) Sort [key#154L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#154L, 200)
         +- *(3) Project [key#154L, value#155]
            +- *(3) Filter isnotnull(key#154L)
               +- *(3) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
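// The tables behind t1, t2, and t3 are not defined in this section. Below is a minimal
// sketch of a setup that would produce plans like the ones above; the generated data,
// the 16-bucket count, and the column names are assumptions, not taken from the plans.
import org.apache.spark.sql.functions.rand
import spark.implicits._

spark.sql("CREATE DATABASE IF NOT EXISTS demo")

val df = spark.range(1000000L).select($"id".as("key"), (rand() * 100).as("value"))

// Plain Parquet table: no bucketing metadata, so joins on key always trigger a shuffle.
df.write.format("parquet").mode("overwrite").saveAsTable("demo.unbucketed")

// Bucketed table: 16 buckets on key, sorted within each bucket (bucketBy requires saveAsTable).
df.write.format("parquet").bucketBy(16, "key").sortBy("key").mode("overwrite").saveAsTable("demo.bucketed")

val t1 = spark.table("demo.unbucketed")
val t2 = spark.table("demo.bucketed")
val t3 = spark.table("demo.bucketed") // second handle on the bucketed table, used in the last example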
// Unbucketed - bucketed join. Unbucketed side is correctly repartitioned, and only one shuffle is needed.
t1.repartition(16, $"key").join(t2, Seq("key")).explain()
== Physical Plan ==
*(4) Project [key#150L, value#151, value#155]
+- *(4) SortMergeJoin [key#150L], [key#154L], Inner
   :- *(2) Sort [key#150L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#150L, 16)
   :     +- *(1) Project [key#150L, value#151]
   :        +- *(1) Filter isnotnull(key#150L)
   :           +- *(1) FileScan parquet demo.unbucketed[key#150L,value#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/unbucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(3) Sort [key#154L ASC NULLS FIRST], false, 0
      +- *(3) Project [key#154L, value#155]
         +- *(3) Filter isnotnull(key#154L)
            +- *(3) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
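// The explicit count of 16 matches the bucket count of demo.bucketed, which is why the
// Exchange on the bucketed side disappears. A sketch of looking the count up from the
// catalog instead of hard-coding it; the "Num Buckets" row label is what recent Spark
// versions emit from DESCRIBE FORMATTED and should be treated as an assumption here.
val numBuckets = spark.sql("DESCRIBE FORMATTED demo.bucketed")
  .filter($"col_name" === "Num Buckets")
  .head()
  .getString(1)
  .trim
  .toInt

t1.repartition(numBuckets, $"key").join(t2, Seq("key")).explain()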
// Unbucketed - bucketed join. Unbucketed side is incorrectly repartitioned, and two shuffles are needed.
t1.repartition($"key").join(t2, Seq("key")).explain()
== Physical Plan ==
*(5) Project [key#150L, value#151, value#155]
+- *(5) SortMergeJoin [key#150L], [key#154L], Inner
   :- *(2) Sort [key#150L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#150L, 200)
   :     +- *(1) Project [key#150L, value#151]
   :        +- *(1) Filter isnotnull(key#150L)
   :           +- *(1) FileScan parquet demo.unbucketed[key#150L,value#151] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/unbucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(4) Sort [key#154L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#154L, 200)
         +- *(3) Project [key#154L, value#155]
            +- *(3) Filter isnotnull(key#154L)
               +- *(3) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
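// repartition($"key") with no partition count falls back to spark.sql.shuffle.partitions
// (200 by default), which does not match the 16 buckets, so the bucketed side is shuffled
// again. A workaround sketch, assuming the bucket count really is 16: align the default
// shuffle partition count with the bucket count before joining, after which the plan
// should again contain only a single Exchange, as in the earlier example.
spark.conf.set("spark.sql.shuffle.partitions", "16")
t1.repartition($"key").join(t2, Seq("key")).explain()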
// Bucketed - bucketed join. Both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, Seq("key")).explain()
== Physical Plan ==
*(3) Project [key#154L, value#155, value#197]
+- *(3) SortMergeJoin [key#154L], [key#196L], Inner
   :- *(1) Sort [key#154L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [key#154L, value#155]
   :     +- *(1) Filter isnotnull(key#154L)
   :        +- *(1) FileScan parquet demo.bucketed[key#154L,value#155] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(2) Sort [key#196L ASC NULLS FIRST], false, 0
      +- *(2) Project [key#196L, value#197]
         +- *(2) Filter isnotnull(key#196L)
            +- *(2) FileScan parquet demo.bucketed[key#196L,value#197] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/demo.db/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
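// A quick programmatic check (a sketch, not part of the original example): with both sides
// bucketed on the same key and count, the physical plan should contain no Exchange nodes
// at all. This assumes adaptive query execution is not wrapping or rewriting the plan.
import org.apache.spark.sql.execution.exchange.Exchange

val plan = t3.join(t2, Seq("key")).queryExecution.executedPlan
val exchanges = plan.collect { case e: Exchange => e }
assert(exchanges.isEmpty, s"expected no shuffles, found ${exchanges.size}")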