scala> val tokens = Seq(
     |   (0, "playing"),
     |   (1, "with"),
     |   (2, "BroadcastHashJoinExec")
     | ).toDF("id", "token")
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760
scala> val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
BroadcastHashJoinExec Binary Physical Operator
BroadcastHashJoinExec is a binary physical operator that supports code generation (aka codegen).
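BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to an equi-join logical plan (one that ExtractEquiJoinKeys can destructure) of type INNER, CROSS, LEFT OUTER, LEFT SEMI or LEFT ANTI whose right side can be broadcast. The right side can be broadcast when its estimated size does not exceed spark.sql.autoBroadcastJoinThreshold, which defaults to 10485760 bytes (10 MB) as the session above shows.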
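The selection rule above can be sketched in plain Scala. This is a minimal, hypothetical model and not Spark's JoinSelection code. The JoinType objects and canBroadcastRight are illustrative names, the estimated size is passed in directly instead of being read from plan statistics, and broadcast hints are ignored.

```scala
// Hypothetical model of the "can the right side be broadcast?" rule.
// None of these types are Spark's own; they only mirror the text above.
sealed trait JoinType
case object Inner extends JoinType
case object Cross extends JoinType
case object LeftOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

object BroadcastDecision {
  // Default value of spark.sql.autoBroadcastJoinThreshold: 10485760 bytes (10 MB).
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // Join types for which the right side may act as the broadcast (build) side.
  private val canBuildRightTypes: Set[JoinType] =
    Set(Inner, Cross, LeftOuter, LeftSemi, LeftAnti)

  // The right side qualifies only if the join type allows it and its estimated
  // size fits under the threshold. A negative threshold (e.g. -1) therefore
  // disables broadcasting, since sizes are never negative.
  def canBroadcastRight(
      joinType: JoinType,
      rightSizeInBytes: Long,
      threshold: Long = DefaultThreshold): Boolean =
    canBuildRightTypes.contains(joinType) &&
      rightSizeInBytes >= 0 &&
      rightSizeInBytes <= threshold

  def main(args: Array[String]): Unit = {
    println(canBroadcastRight(Inner, 1024L))
    println(canBroadcastRight(FullOuter, 1024L))
    println(canBroadcastRight(Inner, 20L * 1024 * 1024))
  }
}
```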
BroadcastHashJoinExec requires its two child physical operators to satisfy BroadcastDistribution (with HashedRelationBroadcastMode) on the build side and UnspecifiedDistribution on the other side. Depending on the build side, that is either (left, right) or the reverse.
Name | Description |
---|---|
numOutputRows | Number of output rows |
Note: The prefix for variable names of BroadcastHashJoinExec operators in CodegenSupport-generated code is bhj.
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator inputadapter_input;
/* 009 */ private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */ private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */ private UnsafeRow bhj_result;
/* 013 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...
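Without the codegen boilerplate, the generated bhj_* code roughly does two things. It builds a hashed relation from the broadcast side once, then probes it for every streamed row and increments bhj_numOutputRows for each match. Below is a minimal plain-Scala sketch of that build/probe loop. It assumes an inner join with BuildRight and an Int key widened to Long, as the cast(... as bigint) in the plan suggests. Row, Joined, Counter and the method names are hypothetical, and an immutable Map stands in for LongHashedRelation.

```scala
// Hypothetical sketch of a broadcast hash join (inner, BuildRight).
// Not Spark code: a Map replaces LongHashedRelation, Counter replaces SQLMetric.
object BroadcastHashJoinSketch {
  final case class Row(id: Int, token: String)
  final case class Joined(id: Int, leftToken: String, rightToken: String)

  // Stand-in for the numOutputRows SQL metric (bhj_numOutputRows).
  final class Counter {
    private var total: Long = 0L
    def add(n: Long): Unit = total += n
    def value: Long = total
  }

  // Build phase: hash the build (right) side once, keyed by the join key
  // widened to Long.
  def buildRelation(buildSide: Seq[Row]): Map[Long, Seq[Row]] =
    buildSide.groupBy(_.id.toLong)

  // Probe phase: stream the left side and look up each row's key; every
  // match produces one output row and bumps the metric.
  def innerJoin(
      streamed: Iterator[Row],
      relation: Map[Long, Seq[Row]],
      numOutputRows: Counter): Iterator[Joined] =
    streamed.flatMap { left =>
      relation.getOrElse(left.id.toLong, Seq.empty[Row]).iterator.map { right =>
        numOutputRows.add(1)
        Joined(left.id, left.token, right.token)
      }
    }

  def main(args: Array[String]): Unit = {
    val tokens = Seq(Row(0, "playing"), Row(1, "with"), Row(2, "BroadcastHashJoinExec"))
    val metric = new Counter
    val result = innerJoin(tokens.iterator, buildRelation(tokens), metric).toList
    result.foreach(println)
    println(s"numOutputRows = ${metric.value}")
  }
}
```

The probe side stays a lazy Iterator, mirroring how the streamed child is consumed row by row. Only the build side is materialized.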
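BuildSide | Left Child | Right Child |
---|---|---|
BuildLeft | BroadcastDistribution using HashedRelationBroadcastMode broadcast mode per buildKeys | UnspecifiedDistribution |
BuildRight | UnspecifiedDistribution | BroadcastDistribution using HashedRelationBroadcastMode broadcast mode per buildKeys |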
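The build-side distribution requirements can be read as a match on the build side. The sketch below uses hypothetical stand-in types named after Spark's. Strings replace key expressions, and a tuple replaces the Seq[Distribution] that Spark's requiredChildDistribution returns. It only shows which child receives the broadcast requirement.

```scala
// Hypothetical stand-ins for Spark's BuildSide and Distribution types.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
final case class HashedRelationBroadcastMode(buildKeys: Seq[String])
final case class BroadcastDistribution(mode: HashedRelationBroadcastMode) extends Distribution

object RequiredDistribution {
  // Returns (left child requirement, right child requirement): the build side
  // must be broadcast as a hashed relation over the build keys, while the
  // streamed side has no distribution requirement.
  def requiredChildDistribution(
      buildSide: BuildSide,
      buildKeys: Seq[String]): (Distribution, Distribution) = {
    val mode = HashedRelationBroadcastMode(buildKeys)
    buildSide match {
      case BuildLeft  => (BroadcastDistribution(mode), UnspecifiedDistribution)
      case BuildRight => (UnspecifiedDistribution, BroadcastDistribution(mode))
    }
  }

  def main(args: Array[String]): Unit =
    println(requiredChildDistribution(BuildRight, Seq("id")))
}
```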
Creating BroadcastHashJoinExec Instance
BroadcastHashJoinExec takes the following when created:
- Left join key expressions
- Right join key expressions
- Join type
- Build side (BuildLeft or BuildRight)
- Optional join condition expression
- Left physical operator
- Right physical operator
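Below is a hypothetical, simplified mirror of the constructor parameters, including the join type and build side that decide which child is broadcast. Strings and a PlanStub class replace Spark's Expression, JoinType and SparkPlan types, so this illustrates the idea rather than reproducing Spark's class. The derived buildKeys and buildPlan follow the rule that the build side supplies the broadcast hashed relation.

```scala
// Hypothetical stand-ins; Spark's class uses Expression, JoinType, BuildSide and SparkPlan.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

final case class PlanStub(name: String)

final case class BroadcastHashJoinSpec(
    leftKeys: Seq[String],
    rightKeys: Seq[String],
    joinType: String,
    buildSide: BuildSide,
    condition: Option[String],
    left: PlanStub,
    right: PlanStub) {

  // Keys of the side that is hashed and broadcast.
  def buildKeys: Seq[String] = buildSide match {
    case BuildLeft  => leftKeys
    case BuildRight => rightKeys
  }

  // Keys of the side that is streamed and probes the hashed relation.
  def streamedKeys: Seq[String] = buildSide match {
    case BuildLeft  => rightKeys
    case BuildRight => leftKeys
  }

  // Child operator that gets broadcast.
  def buildPlan: PlanStub = buildSide match {
    case BuildLeft  => left
    case BuildRight => right
  }
}

object BroadcastHashJoinSpecDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors the plan above: id#15 = id#20, Inner, BuildRight, no extra condition.
    val join = BroadcastHashJoinSpec(
      leftKeys = Seq("id#15"),
      rightKeys = Seq("id#20"),
      joinType = "Inner",
      buildSide = BuildRight,
      condition = None,
      left = PlanStub("LocalTableScan [id#15, token#16]"),
      right = PlanStub("BroadcastExchange"))
    println(join.buildKeys)
    println(join.buildPlan)
  }
}
```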