```scala
// Start spark-shell with broadcast hash join disabled, i.e. spark.sql.autoBroadcastJoinThreshold=-1
// ./bin/spark-shell -c spark.sql.autoBroadcastJoinThreshold=-1

// Mind the data types so ShuffledHashJoinExec is not selected
val dataset = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "SortMergeJoinExec")
).toDF("id", "token")

// all data types are orderable
scala> dataset.printSchema
root
 |-- id: integer (nullable = false)
 |-- token: string (nullable = true)

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = -1

// self-join on the id column
val q = dataset.join(dataset, Seq("id"), "inner")

scala> q.explain
== Physical Plan ==
*Project [id#27, token#28, token#6]
+- *SortMergeJoin [id#27], [id#5], Inner
   :- *Sort [id#27 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#27, 200)
   :     +- LocalTableScan [id#27, token#28]
   +- *Sort [id#5 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#5, token#6], Exchange hashpartitioning(id#27, 200)
```
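The Exchange hashpartitioning(id, 200) and Sort operators beneath SortMergeJoin in the plan above prepare both inputs so that rows with equal join keys land in the same partition, sorted by key. The following is a rough plain-Scala sketch of that preparation, assuming Int keys and using the key's hashCode with floorMod as the partitioning function. Spark itself hashes with a Murmur3-based function, so real partition assignments differ; ShuffleSortSketch and shuffleAndSort are hypothetical names.

```scala
// Hypothetical, simplified model of the "Exchange hashpartitioning + Sort" stages
// that feed a sort-merge join. Not Spark's implementation.
object ShuffleSortSketch {
  // Assign a key to one of numPartitions buckets; equal keys from both
  // join sides always map to the same partition index.
  def partitionOf(key: Int, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // "Exchange": bucket rows by partition; "Sort": order each bucket by key ascending.
  def shuffleAndSort[V](rows: Seq[(Int, V)], numPartitions: Int): Vector[Vector[(Int, V)]] =
    Vector.tabulate(numPartitions) { p =>
      rows.filter { case (k, _) => partitionOf(k, numPartitions) == p }
        .sortBy(_._1)
        .toVector
    }

  def main(args: Array[String]): Unit = {
    val left  = Seq(2 -> "SortMergeJoinExec", 0 -> "playing", 1 -> "with")
    val right = Seq(1 -> "with", 2 -> "SortMergeJoinExec", 0 -> "playing")
    val l = shuffleAndSort(left, 2)
    val r = shuffleAndSort(right, 2)
    // Each partition pair can now be merge-joined independently of the others.
    l.zip(r).zipWithIndex.foreach { case ((lp, rp), i) =>
      println(s"partition $i: left=${lp.map(_._1)} right=${rp.map(_._1)}")
    }
  }
}
```

Because partitioning depends only on the key, the merge step never has to look across partitions, which is what lets each task join its own pair of sorted partitions.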
SortMergeJoinExec Binary Physical Operator
SortMergeJoinExec is a binary physical operator that supports code generation (aka whole-stage codegen).

SortMergeJoinExec is created exclusively for joins whose left join keys are orderable, i.e. can be ordered (sorted).
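Orderability matters because a sort-merge join walks two key-sorted inputs in lockstep. The sketch below shows that general technique for an inner equi-join in plain Scala, assuming both inputs are already sorted by an Int key. It is an illustration of the algorithm, not Spark's code; SortMergeJoinSketch and innerJoin are hypothetical names. The run of equal right-side keys it collects loosely corresponds to the buffered matches Spark keeps (the smj_matches field in the generated code appears to serve that purpose).

```scala
// Minimal sort-merge inner equi-join over two inputs already sorted by key.
// Illustrative sketch using plain Scala collections.
object SortMergeJoinSketch {
  def innerJoin[L, R](left: IndexedSeq[(Int, L)], right: IndexedSeq[(Int, R)]): Vector[(Int, L, R)] = {
    val out = Vector.newBuilder[(Int, L, R)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1        // left key is smaller: advance left
      else if (lk > rk) j += 1   // right key is smaller: advance right
      else {
        // Keys match: find the run of rows with this key on the right side...
        val runStart = j
        var runEnd = j
        while (runEnd < right.length && right(runEnd)._1 == lk) runEnd += 1
        // ...and pair every left row with the same key against the whole run.
        while (i < left.length && left(i)._1 == lk) {
          var k = runStart
          while (k < runEnd) { out += ((lk, left(i)._2, right(k)._2)); k += 1 }
          i += 1
        }
        j = runEnd
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val left  = Vector(0 -> "playing", 1 -> "with", 2 -> "SortMergeJoinExec")
    val right = Vector(1 -> "a", 1 -> "b", 2 -> "c", 3 -> "d")
    innerJoin(left, right).foreach(println)
    // (1,with,a)
    // (1,with,b)
    // (2,SortMergeJoinExec,c)
  }
}
```

Each input is scanned once, so after sorting the merge costs time linear in the input sizes plus the number of output rows.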
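Note: A join key is orderable when it is of one of the following data types:

- NullType
- AtomicType (e.g. numeric, string, binary, boolean, date and timestamp types)
- StructType whose fields are all orderable
- ArrayType whose element type is orderable
- UserDefinedType whose underlying SQL type is orderable

Therefore, a join key is not orderable when it is of any other data type, e.g. MapType.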
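The orderability rule is recursive, which a small sketch makes explicit. In Spark 2.x the real check is RowOrdering.isOrderable over Spark's DataType classes; the sketch below instead uses a hypothetical mini type model (DType, IntT, StructT and the rest are invented for illustration, with IntT and StringT standing in for all atomic types) so that it runs without Spark.

```scala
// Hypothetical mini model of Spark SQL data types, for illustration only.
// These are NOT Spark's classes.
sealed trait DType
case object NullT extends DType
case object IntT extends DType      // stands in for any atomic type
case object StringT extends DType   // stands in for any atomic type
final case class StructT(fields: Seq[DType]) extends DType
final case class ArrayT(element: DType) extends DType
final case class UdtT(sqlType: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType

object Orderability {
  // Mirrors the rule in the note: atomic and null types are orderable,
  // containers are orderable only if their contents are, maps never are.
  def isOrderable(dt: DType): Boolean = dt match {
    case NullT | IntT | StringT => true
    case StructT(fields)        => fields.forall(isOrderable)
    case ArrayT(element)        => isOrderable(element)
    case UdtT(sqlType)          => isOrderable(sqlType)
    case MapT(_, _)             => false
  }

  // Every left join key must be orderable for a sort-merge join.
  def allOrderable(keyTypes: Seq[DType]): Boolean = keyTypes.forall(isOrderable)

  def main(args: Array[String]): Unit = {
    println(isOrderable(StructT(Seq(IntT, ArrayT(StringT)))))   // true
    println(isOrderable(ArrayT(MapT(IntT, StringT))))           // false
  }
}
```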
SortMergeJoinExec's performance metrics:

Key | Name (in web UI) |
---|---|
numOutputRows | number of output rows |
Note: SortMergeJoinExec uses smj as the prefix of its variable names in CodegenSupport-generated code (e.g. smj_leftInput, smj_matches).
```scala
scala> q.queryExecution.debug.codegen
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 ==
*Project [id#5, token#6, token#11]
+- *SortMergeJoin [id#5], [id#10], Inner
   :- *Sort [id#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#5, 200)
   :     +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10 ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#10, token#11], Exchange hashpartitioning(id#5, 200)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator smj_leftInput;
/* 009 */   private scala.collection.Iterator smj_rightInput;
/* 010 */   private InternalRow smj_leftRow;
/* 011 */   private InternalRow smj_rightRow;
/* 012 */   private int smj_value2;
/* 013 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 014 */   private int smj_value3;
/* 015 */   private int smj_value4;
/* 016 */   private UTF8String smj_value5;
/* 017 */   private boolean smj_isNull2;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/* 019 */   private UnsafeRow smj_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
...
```
Note: SortMergeJoinExec is chosen by the JoinSelection execution planning strategy when neither BroadcastHashJoinExec nor ShuffledHashJoinExec meets its requirements.
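The fallback order described in the note can be sketched as a small decision function. This is a deliberately simplified, hypothetical model of JoinSelection for equi-joins in the Spark 2.x era: the flags in JoinFacts are invented inputs rather than Spark APIs, and the real strategy checks more conditions (join type, relative side sizes, hints) than shown here. The only configuration names used, spark.sql.autoBroadcastJoinThreshold and spark.sql.join.preferSortMergeJoin (true by default), are real Spark settings.

```scala
// Simplified, hypothetical model of the order in which JoinSelection
// considers equi-join operators. Flags are plain inputs, not Spark APIs.
object JoinChoiceSketch {
  sealed trait Choice
  case object BroadcastHashJoin extends Choice
  case object ShuffledHashJoin extends Choice
  case object SortMergeJoin extends Choice

  final case class JoinFacts(
    canBroadcast: Boolean,          // a side fits under spark.sql.autoBroadcastJoinThreshold
    preferSortMergeJoin: Boolean,   // spark.sql.join.preferSortMergeJoin
    hashBuildViable: Boolean,       // a side is small enough for a local hash map
    keysOrderable: Boolean)         // all left join keys are orderable

  def choose(f: JoinFacts): Choice =
    if (f.canBroadcast) BroadcastHashJoin
    else if ((!f.preferSortMergeJoin && f.hashBuildViable) || !f.keysOrderable) ShuffledHashJoin
    else SortMergeJoin

  def main(args: Array[String]): Unit = {
    // Like the spark-shell example: broadcast disabled, default preference, int keys.
    println(choose(JoinFacts(canBroadcast = false, preferSortMergeJoin = true,
      hashBuildViable = true, keysOrderable = true)))  // SortMergeJoin
  }
}
```

Under this model, disabling broadcast joins and keeping orderable keys is enough to land on SortMergeJoin, which matches the setup of the spark-shell example.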
doExecute Method

Caution: FIXME
Creating SortMergeJoinExec Instance

SortMergeJoinExec takes the following when created:

- Left join key expressions
- Right join key expressions
- Join type
- Optional join condition expression
- Left physical operator
- Right physical operator