spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT 'id IN (list#2481 [])
: +- 'Project ['id]
: +- 'UnresolvedRelation `tblA_NoNull`
+- 'UnresolvedRelation `table_withNull`
== Analyzed Logical Plan ==
id: bigint
Project [id#2482L]
+- Filter NOT id#2482L IN (list#2481 [])
: +- Project [id#2483L]
: +- SubqueryAlias `default`.`tbla_nonull`
: +- Relation[id#2483L] parquet
+- SubqueryAlias `default`.`table_withnull`
+- Relation[id#2482L] parquet
== Optimized Logical Plan ==
Join LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- Relation[id#2482L] parquet
+- Relation[id#2483L] parquet
== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
+- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
// The NOT IN predicate must be null-aware: if the subquery returned any null,
// NOT IN could never evaluate to true. Spark therefore plans it as a LeftAnti
// join with the condition ((id = id) || isnull(id = id)), which is not an
// equi-join, and the only strategy that can execute it is
// BroadcastNestedLoopJoin -- regardless of spark.sql.autoBroadcastJoinThreshold.
// The query can be rewritten as a NOT EXISTS, whose correlated equality
// predicate is planned as a regular equi-join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT exists#2486 []
: +- 'Project [unresolvedalias(1, None)]
: +- 'Filter ('table_withNull.id = 'tblA_NoNull.id)
: +- 'UnresolvedRelation `tblA_NoNull`
+- 'UnresolvedRelation `table_withNull`
== Analyzed Logical Plan ==
id: bigint
Project [id#2482L]
+- Filter NOT exists#2486 [id#2482L]
: +- Project [1 AS 1#2490]
: +- Filter (outer(id#2482L) = id#2483L)
: +- SubqueryAlias `default`.`tbla_nonull`
: +- Relation[id#2483L] parquet
+- SubqueryAlias `default`.`table_withnull`
+- Relation[id#2482L] parquet
== Optimized Logical Plan ==
Join LeftAnti, (id#2482L = id#2483L)
:- Relation[id#2482L] parquet
+- Filter isnotnull(id#2483L)
+- Relation[id#2483L] parquet
== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
: +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
+- *(2) Project [id#2483L]
+- *(2) Filter isnotnull(id#2483L)
+- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
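// The same rewrite can also be expressed through the DataFrame API: a left anti
// join on a plain equality condition produces the same SortMergeJoin plan
// (a minimal sketch, assuming the two tables above):
val withNull = spark.table("table_withNull")
val noNull = spark.table("tblA_NoNull")
withNull.join(noNull, withNull("id") === noNull("id"), "left_anti").explain(true)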
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").collect()
res4: Array[org.apache.spark.sql.Row] = Array([45], [47], [28], [33], [32], [43], [26], [29], [35], [20], [40], [27], [39], [17], [15], [41], [13], [34], [31], [16], [18], [11], [49], [30], [42], [12], [48], [38], [19], [14], [25], [36], [23], [24], [21], [46], [37], [22], [44], [10])
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").collect()
res6: Array[org.apache.spark.sql.Row] = Array([26], [29], [19], [22], [34], [32], [43], [31], [39], [25], [null], [27], [17], [41], [28], [33], [10], [48], [44], [37], [12], [11], [49], [35], [13], [36], [18], [14], [21], [15], [38], [30], [42], [23], [46], [20], [40], [16], [45], [47], [24])
sql("select * from table_withNull where not exists (select 1 from tbl_notinwithnulls where table_withNull.id = tbl_notinwithnulls.id)").collect()
res7: Array[org.apache.spark.sql.Row] = Array([26], [29], [19], [22], [34], [32], [43], [31], [39], [25], [null], [27], [17], [41], [28], [33], [10], [48], [44], [37], [12], [11], [49], [35], [13], [36], [18], [14], [21], [15], [38], [30], [42], [23], [46], [20], [40], [16], [45], [47], [24])
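// To verify programmatically which strategy a query uses, walk the executed
// plan (a hypothetical helper, not part of the original notebook):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec

def usesBroadcastNestedLoopJoin(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.collect {
    case j: BroadcastNestedLoopJoinExec => j
  }.nonEmpty

usesBroadcastNestedLoopJoin(sql("select * from table_withNull where id not in (select id from tblA_NoNull)"))  // true
usesBroadcastNestedLoopJoin(sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)"))  // false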
This example notebook explains why a broadcast still happens even after you disable broadcasting by setting spark.sql.autoBroadcastJoinThreshold to -1. Because the null-aware NOT IN predicate can only be executed as a BroadcastNestedLoopJoin, Apache Spark attempts to broadcast the bigger table anyway and fails with a broadcast error if it does not fit in memory. This is not a bug; the cells above review the expected behavior and demonstrate a mitigation: rewrite the NOT IN subquery as a NOT EXISTS, which is planned as a regular equi-join.
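For reference, the tables themselves are never defined in this notebook. A setup along the following lines reproduces the results shown above; the exact data is an assumption inferred from the output, not taken from the original.

// Hypothetical setup (assumption: the original table definitions are not shown).
// table_withNull holds ids 10..49 plus one NULL; the other two tables hold a
// disjoint range so that the NOT IN result above is non-empty.
sql("create table table_withNull using parquet as select id from range(10, 50) union all select cast(null as bigint) as id")
sql("create table tblA_NoNull using parquet as select id from range(0, 10)")
sql("create table tbl_notinwithnulls using parquet as select id from range(0, 10) union all select cast(null as bigint) as id")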