BroadcastNestedLoopJoin example (Scala)

This example notebook explains why a broadcast can still happen even if you disable broadcasting by setting spark.sql.autoBroadcastJoinThreshold to -1. Apache Spark attempts to broadcast the bigger table and fails with a broadcast error. This is not a bug. We review the expected behavior and provide a mitigation option for this issue.

Start by creating two tables: one with null values, table_withNull, and the other without null values, tblA_NoNull.

sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")

We attempt to disable broadcasting by setting spark.sql.autoBroadcastJoinThreshold to -1 and then run a query that has a sub-query with an in clause. If you review the query plan, BroadcastNestedLoopJoin is the last possible fallback in this situation. It appears even after attempting to disable the broadcast.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT 'id IN (list#2481 [])
   :  +- 'Project ['id]
   :     +- 'UnresolvedRelation `tblA_NoNull`
   +- 'UnresolvedRelation `table_withNull`

== Analyzed Logical Plan ==
id: bigint
Project [id#2482L]
+- Filter NOT id#2482L IN (list#2481 [])
   :  +- Project [id#2483L]
   :     +- SubqueryAlias `default`.`tbla_nonull`
   :        +- Relation[id#2483L] parquet
   +- SubqueryAlias `default`.`table_withnull`
      +- Relation[id#2482L] parquet

== Optimized Logical Plan ==
Join LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- Relation[id#2482L] parquet
+- Relation[id#2483L] parquet

== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
   +- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

If the data being processed is large enough, this results in broadcast errors when Spark attempts to broadcast the table. You can resolve the issue by rewriting the query with not exists instead of in.

// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT exists#2486 []
   :  +- 'Project [unresolvedalias(1, None)]
   :     +- 'Filter ('table_withNull.id = 'tblA_NoNull.id)
   :        +- 'UnresolvedRelation `tblA_NoNull`
   +- 'UnresolvedRelation `table_withNull`

== Analyzed Logical Plan ==
id: bigint
Project [id#2482L]
+- Filter NOT exists#2486 [id#2482L]
   :  +- Project [1 AS 1#2490]
   :     +- Filter (outer(id#2482L) = id#2483L)
   :        +- SubqueryAlias `default`.`tbla_nonull`
   :           +- Relation[id#2483L] parquet
   +- SubqueryAlias `default`.`table_withnull`
      +- Relation[id#2482L] parquet

== Optimized Logical Plan ==
Join LeftAnti, (id#2482L = id#2483L)
:- Relation[id#2482L] parquet
+- Filter isnotnull(id#2483L)
   +- Relation[id#2483L] parquet

== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
:     +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
      +- *(2) Project [id#2483L]
         +- *(2) Filter isnotnull(id#2483L)
            +- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
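The same rewrite can also be expressed with the DataFrame API as a left anti join. This is only a sketch using the two tables created above; it should plan as a regular anti join (sort-merge or hash) rather than a BroadcastNestedLoopJoin, for the same reason as the not exists query:

// Sketch: NOT EXISTS-style rewrite using the DataFrame API.
// A left anti join keeps rows of the left table with no match on the right;
// a null id never matches the equality condition, so no null-aware join is required.
val withNull = spark.table("table_withNull")
val noNull = spark.table("tblA_NoNull")

withNull
  .join(noNull, withNull("id") === noNull("id"), "left_anti")
  .explain(true)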

Spark doesn't do this rewrite automatically, because not in and not exists have slightly different semantics for null handling. Let's illustrate this with an example. Create a new table with null values, tbl_notinwithnulls.

sql("SELECT id FROM RANGE(10) UNION SELECT NULL").write.mode("overwrite").saveAsTable("tbl_notinwithnulls")

With tblA_NoNull containing the values 0..9, we get the values 10..49 back from the table_withNull table:

sql("select * from table_withNull where id not in (select id from tblA_NoNull)").collect()
res4: Array[org.apache.spark.sql.Row] = Array([45], [47], [28], [33], [32], [43], [26], [29], [35], [20], [40], [27], [39], [17], [15], [41], [13], [34], [31], [16], [18], [11], [49], [30], [42], [12], [48], [38], [19], [14], [25], [36], [23], [24], [21], [46], [37], [22], [44], [10])

But when there is a NULL in the tbl_notinwithnulls table, we get nothing back. In SQL, not in means that if there is any null value among the not in values, the result is empty. This is why the query can only be executed with BroadcastNestedLoopJoin: all the not in values must be known in order to ensure there is no null value in the set.

sql("select * from table_withNull where id not in (select id from tbl_notinwithnulls)").collect()
res5: Array[org.apache.spark.sql.Row] = Array()
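To see why, you can evaluate a not in predicate with a literal null directly (a minimal illustration): under SQL's three-valued logic, the comparison with null is unknown rather than false, so the filter never accepts a row.

// Illustration: NOT IN over a list containing NULL evaluates to NULL (unknown),
// never to true, so every row is filtered out.
sql("SELECT 5 NOT IN (1, 2, NULL) AS result").show()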

With not exists, the null doesn't matter, since the correlated subquery is an equality match (effectively an inner join) and a null never satisfies the equality:

Note: not exists also returns the null from table_withNull, because it is also true that there is no element in tblA_NoNull where null = tblA_NoNull.id.

sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").collect()
res6: Array[org.apache.spark.sql.Row] = Array([26], [29], [19], [22], [34], [32], [43], [31], [39], [25], [null], [27], [17], [41], [28], [33], [10], [48], [44], [37], [12], [11], [49], [35], [13], [36], [18], [14], [21], [15], [38], [30], [42], [23], [46], [20], [40], [16], [45], [47], [24])

Likewise with tbl_notinwithnulls, there is no element where null = tbl_notinwithnulls.id.

sql("select * from table_withNull where not exists (select 1 from tbl_notinwithnulls where table_withNull.id = tbl_notinwithnulls.id)").collect()
res7: Array[org.apache.spark.sql.Row] = Array([26], [29], [19], [22], [34], [32], [43], [31], [39], [25], [null], [27], [17], [41], [28], [33], [10], [48], [44], [37], [12], [11], [49], [35], [13], [36], [18], [14], [21], [15], [38], [30], [42], [23], [46], [20], [40], [16], [45], [47], [24])
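For intuition, you can check the correlated predicate for the null row directly (a small sketch): the comparison null = id never evaluates to true, so the subquery is empty and not exists holds for that row.

// The subquery for the NULL outer row matches nothing, so NOT EXISTS is true for it.
sql("SELECT * FROM tbl_notinwithnulls WHERE NULL = id").collect()
// returns an empty Array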

The differences between not in and the not exists rewrite are:

  • with not exists, null values from table_withNull are returned, while with not in they are not returned.
  • with not exists, null values in the subquery's table (here, tbl_notinwithnulls) don't matter, while with not in they cause the query to return an empty result.

As a result, this cannot be rewritten automatically. It would not be an issue if we could guarantee that there are no null values, but without statistics you cannot determine this in advance. Even with statistics, in a complex query, it would be hard to determine. Spark currently doesn't attempt to perform such an optimization.

Most users don't care about null values and would be satisfied with the not exists semantics, but they use not in because it is easier to write.