Seq((0, "aa"), (0, "bb"))
  .toDF("id", "token")
  .createOrReplaceTempView("left")
Seq(("aa", 0.99), ("bb", 0.57))
  .toDF("token", "prob")
  .createOrReplaceTempView("right")
scala> spark.catalog.listTables.filter('name.like("left") or 'name.like("right")).show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
| left|    null|       null|TEMPORARY|       true|
|right|    null|       null|TEMPORARY|       true|
+-----+--------+-----------+---------+-----------+
val query = """
  | EXPLAIN COST
  | SELECT /*+ BROADCAST (right) */ *
  | FROM left, right
  | WHERE left.token = right.token
  | """.stripMargin // drop the leading margin characters before the query is parsed
val cost = sql(query).as[String].collect()(0)
scala> println(cost)
== Parsed Logical Plan ==
'Hint BROADCAST, [right]
+- 'Project [*]
   +- 'Filter ('left.token = 'right.token)
      +- 'Join Inner
         :- 'UnresolvedRelation `left`
         +- 'UnresolvedRelation `right`
== Analyzed Logical Plan ==
id: int, token: string, token: string, prob: double
Project [id#184, token#185, token#195, prob#196]
+- Filter (token#185 = token#195)
   +- Join Inner
      :- SubqueryAlias left
      :  +- Project [_1#181 AS id#184, _2#182 AS token#185]
      :     +- LocalRelation [_1#181, _2#182]
      +- BroadcastHint
         +- SubqueryAlias right
            +- Project [_1#192 AS token#195, _2#193 AS prob#196]
               +- LocalRelation [_1#192, _2#193]
== Optimized Logical Plan ==
Join Inner, (token#185 = token#195), Statistics(sizeInBytes=2.6 KB, isBroadcastable=false)
:- Project [_1#181 AS id#184, _2#182 AS token#185], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
:  +- Filter isnotnull(_2#182), Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
:     +- LocalRelation [_1#181, _2#182], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
+- BroadcastHint, Statistics(sizeInBytes=56.0 B, isBroadcastable=true)
   +- Project [_1#192 AS token#195, _2#193 AS prob#196], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
      +- Filter isnotnull(_1#192), Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
         +- LocalRelation [_1#192, _2#193], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
== Physical Plan ==
*BroadcastHashJoin [token#185], [token#195], Inner, BuildRight
:- *Project [_1#181 AS id#184, _2#182 AS token#185]
:  +- *Filter isnotnull(_2#182)
:     +- LocalTableScan [_1#181, _2#182]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *Project [_1#192 AS token#195, _2#193 AS prob#196]
      +- *Filter isnotnull(_1#192)
         +- LocalTableScan [_1#192, _2#193]
BroadcastHint Unary Logical Operator

BroadcastHint is a unary logical operator that acts as a hint for…FIXME

BroadcastHint is added to a logical plan when:

- Analyzer resolves broadcast hints, i.e. BROADCAST, BROADCASTJOIN and MAPJOIN hints in SQL queries (see the example)
- broadcast function is used (see the example)
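
The three hint names listed above can be compared side by side. The following is a minimal sketch, not part of the original examples: the object name `BroadcastHintNames` and its helper methods are hypothetical, it assumes a local Spark installation on the classpath, and it assumes that setting `spark.sql.autoBroadcastJoinThreshold` to `-1` disables size-based broadcast so that only the hint can lead to a broadcast join.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, for illustration only
object BroadcastHintNames {

  def session(): SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("broadcast-hint-names")
    // Assumption: -1 turns off automatic (size-based) broadcast joins
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()

  // Same two temporary views as in the examples on this page
  def registerViews(spark: SparkSession): Unit = {
    import spark.implicits._
    Seq((0, "aa"), (0, "bb")).toDF("id", "token").createOrReplaceTempView("left")
    Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").createOrReplaceTempView("right")
  }

  // True if the physical plan of the hinted query mentions a broadcast hash join
  def usesBroadcastJoin(spark: SparkSession, hint: String): Boolean = {
    val query =
      s"""SELECT /*+ $hint (right) */ *
         |FROM left, right
         |WHERE left.token = right.token""".stripMargin
    spark.sql(query).queryExecution.executedPlan.toString.contains("BroadcastHashJoin")
  }

  def main(args: Array[String]): Unit = {
    val spark = session()
    registerViews(spark)
    Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN").foreach { hint =>
      println(s"$hint -> broadcast join: ${usesBroadcastJoin(spark, hint)}")
    }
    spark.stop()
  }
}
```

If the three names are indeed treated alike by the Spark version in use, each line should report a broadcast join.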
BroadcastHint and SQL’s Hints
BroadcastHint and broadcast function
val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]
scala> println(left.join(broadcast(right), "token").queryExecution.toStringWithStats)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(token))
:- Project [_1#123 AS id#126, _2#124 AS token#127]
:  +- LocalRelation [_1#123, _2#124]
+- BroadcastHint
   +- Project [_1#136 AS token#139, _2#137 AS prob#140]
      +- LocalRelation [_1#136, _2#137]
== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#127, id#126, prob#140]
+- Join Inner, (token#127 = token#139)
   :- Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- LocalRelation [_1#123, _2#124]
   +- BroadcastHint
      +- Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- LocalRelation [_1#136, _2#137]
== Optimized Logical Plan ==
Project [token#127, id#126, prob#140], Statistics(sizeInBytes=1792.0 B, isBroadcastable=false)
+- Join Inner, (token#127 = token#139), Statistics(sizeInBytes=2.6 KB, isBroadcastable=false)
   :- Project [_1#123 AS id#126, _2#124 AS token#127], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
   :  +- Filter isnotnull(_2#124), Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
   :     +- LocalRelation [_1#123, _2#124], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
   +- BroadcastHint, Statistics(sizeInBytes=56.0 B, isBroadcastable=true)
      +- Project [_1#136 AS token#139, _2#137 AS prob#140], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
         +- Filter isnotnull(_1#136), Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
            +- LocalRelation [_1#136, _2#137], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
== Physical Plan ==
*Project [token#127, id#126, prob#140]
+- *BroadcastHashJoin [token#127], [token#139], Inner, BuildRight
   :- *Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- *Filter isnotnull(_2#124)
   :     +- LocalTableScan [_1#123, _2#124]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- *Filter isnotnull(_1#136)
            +- LocalTableScan [_1#136, _2#137]
computeStats Method
computeStats(conf: CatalystConf): Statistics
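computeStats marks the operator as broadcastable, i.e. it returns statistics with the isBroadcastable flag enabled. In the optimized logical plans above, BroadcastHint is the only node that reports isBroadcastable=true, while its sizeInBytes matches that of its child.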
Note: computeStats is a part of LogicalPlan Contract.
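
The effect of computeStats can be sketched without Spark. The types below (`Stats`, `Plan`, `Relation`, `Hint`) are simplified, hypothetical stand-ins for Spark's Statistics and logical operators, not the library's classes; the sketch only assumes what the plans above show, namely that the hint node keeps its child's size and enables the flag.

```scala
// Hypothetical, simplified stand-in for Spark's Statistics
final case class Stats(sizeInBytes: BigInt, isBroadcastable: Boolean = false)

// Hypothetical, simplified stand-in for a logical operator
sealed trait Plan {
  def computeStats: Stats
}

// A leaf operator that only knows its size; the flag is off by default
final case class Relation(sizeInBytes: BigInt) extends Plan {
  def computeStats: Stats = Stats(sizeInBytes)
}

// A unary operator modelled after the hint: child's statistics, flag enabled
final case class Hint(child: Plan) extends Plan {
  def computeStats: Stats = child.computeStats.copy(isBroadcastable = true)
}

object ComputeStatsSketch {
  def main(args: Array[String]): Unit = {
    val right = Relation(56)
    println(right.computeStats)       // prints Stats(56,false)
    println(Hint(right).computeStats) // prints Stats(56,true)
  }
}
```

Copying the child's statistics rather than building new ones keeps the size estimate intact, so only the broadcast decision changes.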