BroadcastHashJoinExec Binary Physical Operator

BroadcastHashJoinExec is a binary physical operator that supports code generation (aka codegen).

BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to a logical join that ExtractEquiJoinKeys can destructure (i.e. an INNER, CROSS, LEFT OUTER, LEFT SEMI or LEFT ANTI equi-join) and whose right side can be broadcast.

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760

val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
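
The size threshold above is not the only way to get a broadcast join. The following is a minimal sketch, assuming a spark-shell session (where spark is predefined) and the tokens dataset from the example above. It turns the automatic size-based selection off and then requests a broadcast explicitly with the broadcast function. The names noHint and hinted are only illustrative.

```scala
// Assumes spark-shell with `spark` and `tokens` (defined above) in scope.
import org.apache.spark.sql.functions.broadcast

// -1 disables the size-based automatic broadcast selection
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// No hint: the planner can no longer choose a broadcast join based on size alone
val noHint = tokens.join(tokens, Seq("id"), "inner")
noHint.explain

// Explicit hint: ask for the right side to be broadcast regardless of the threshold
val hinted = tokens.join(broadcast(tokens), Seq("id"), "inner")
hinted.explain

// Restore the value shown earlier (10485760 bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")
```

Comparing the two explain outputs should show which join operator the planner picked in each case.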

BroadcastHashJoinExec requires that the output of one child matches BroadcastDistribution (with HashedRelationBroadcastMode) and the output of the other matches UnspecifiedDistribution. Which child gets which distribution depends on the build side (see Table 2).
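
The HashedRelationBroadcastMode requirement reflects how a broadcast hash join works in general: the build side is hashed by join key once, and every row of the other (streamed) side probes that hash table. The following is a conceptual sketch only. It uses plain Scala collections rather than Spark's HashedRelation, and BroadcastHashJoinSketch, Row and innerJoinBuildRight are hypothetical names introduced for illustration.

```scala
// Conceptual sketch: plain Scala, not Spark's actual implementation.
object BroadcastHashJoinSketch {
  type Row = (Int, String)

  // Inner equi-join on the Int key, with the right side as the build side
  def innerJoinBuildRight(streamed: Seq[Row], build: Seq[Row]): Seq[(Int, String, String)] = {
    // Build phase: hash the (small) build side by join key, once
    val hashed: Map[Int, Seq[String]] =
      build.groupBy(_._1).map { case (key, rows) => key -> rows.map(_._2) }

    // Probe phase: look up the key of every streamed row in the hash table
    for {
      (id, leftToken) <- streamed
      rightToken      <- hashed.getOrElse(id, Seq.empty)
    } yield (id, leftToken, rightToken)
  }

  def main(args: Array[String]): Unit = {
    val tokens = Seq((0, "playing"), (1, "with"), (2, "BroadcastHashJoinExec"))
    innerJoinBuildRight(tokens, tokens).foreach(println)
  }
}
```

In Spark the hashed build side is what gets broadcast, so the streamed side can be joined partition by partition without shuffling it.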

Table 1. BroadcastHashJoinExec’s SQLMetrics
Name            Description
numOutputRows   Number of output rows
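
The metric can also be read programmatically. The following is a minimal sketch, assuming a Spark 2.x spark-shell session (as in the examples on this page, without adaptive query execution wrapping the plan) and the query q defined above. The names joinOp and numOutputRows are only illustrative.

```scala
// Assumes spark-shell (Spark 2.x) with `q` from the example above in scope.
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// Execute the query so that the metric gets updated
q.collect()

// Find the join operator in the executed physical plan
val joinOp = q.queryExecution.executedPlan.collect {
  case j: BroadcastHashJoinExec => j
}.head

// SQLMetrics are available by name on every physical operator
val numOutputRows = joinOp.metrics("numOutputRows").value
println(s"numOutputRows = $numOutputRows")
```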

Figure 1. BroadcastHashJoinExec in web UI (Details for Query)
Note
The prefix for variable names for BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj.
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...
Table 2. BroadcastHashJoinExec’s Required Child Output Distributions
BuildSide    Left Child                  Right Child
BuildLeft    BroadcastDistribution <1>   UnspecifiedDistribution
BuildRight   UnspecifiedDistribution     BroadcastDistribution <1>

<1> BroadcastDistribution uses HashedRelationBroadcastMode broadcast mode per buildKeys
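
The required distributions can be inspected on a planned query. The following is a minimal sketch, assuming a Spark 2.x spark-shell session (without adaptive query execution wrapping the plan) and the query q defined earlier on this page. The name bhjExec is only illustrative.

```scala
// Assumes spark-shell (Spark 2.x) with `q` from the earlier example in scope.
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// Find the join operator in the executed physical plan
val bhjExec = q.queryExecution.executedPlan.collect {
  case j: BroadcastHashJoinExec => j
}.head

// Which side is built (and broadcast)
println(bhjExec.buildSide)

// One required distribution per child, in left-then-right order
bhjExec.requiredChildDistribution.foreach(println)
```

For the BuildRight plan shown earlier, the two printed distributions should line up with the BuildRight row of the table.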

Creating BroadcastHashJoinExec Instance

BroadcastHashJoinExec takes the following when created:
