BroadcastHint Unary Logical Operator

BroadcastHint is a unary logical operator that acts as a hint for…​FIXME

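BroadcastHint is added to a logical plan when the Analyzer resolves a BROADCAST hint in a SQL query, or when the broadcast function is applied to a Dataset. Both cases are shown in the sections below.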

BroadcastHint and SQL’s Hints

Seq((0, "aa"), (0, "bb"))
  .toDF("id", "token")
  .createOrReplaceTempView("left")

Seq(("aa", 0.99), ("bb", 0.57))
  .toDF("token", "prob")
  .createOrReplaceTempView("right")

scala> spark.catalog.listTables.filter('name.like("left") or 'name.like("right")).show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
| left|    null|       null|TEMPORARY|       true|
|right|    null|       null|TEMPORARY|       true|
+-----+--------+-----------+---------+-----------+

// stripMargin removes the leading "|" margins so the string is valid SQL
val query = """
     | EXPLAIN COST
     | SELECT /*+ BROADCAST (right) */ *
     | FROM left, right
     | WHERE left.token = right.token
     | """.stripMargin

val cost = sql(query).as[String].collect()(0)

scala> println(cost)
== Parsed Logical Plan ==
'Hint BROADCAST, [right]
+- 'Project [*]
   +- 'Filter ('left.token = 'right.token)
      +- 'Join Inner
         :- 'UnresolvedRelation `left`
         +- 'UnresolvedRelation `right`

== Analyzed Logical Plan ==
id: int, token: string, token: string, prob: double
Project [id#184, token#185, token#195, prob#196]
+- Filter (token#185 = token#195)
   +- Join Inner
      :- SubqueryAlias left
      :  +- Project [_1#181 AS id#184, _2#182 AS token#185]
      :     +- LocalRelation [_1#181, _2#182]
      +- BroadcastHint
         +- SubqueryAlias right
            +- Project [_1#192 AS token#195, _2#193 AS prob#196]
               +- LocalRelation [_1#192, _2#193]

== Optimized Logical Plan ==
Join Inner, (token#185 = token#195), Statistics(sizeInBytes=2.6 KB, isBroadcastable=false)
:- Project [_1#181 AS id#184, _2#182 AS token#185], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
:  +- Filter isnotnull(_2#182), Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
:     +- LocalRelation [_1#181, _2#182], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
+- BroadcastHint, Statistics(sizeInBytes=56.0 B, isBroadcastable=true)
   +- Project [_1#192 AS token#195, _2#193 AS prob#196], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
      +- Filter isnotnull(_1#192), Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
         +- LocalRelation [_1#192, _2#193], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)

== Physical Plan ==
*BroadcastHashJoin [token#185], [token#195], Inner, BuildRight
:- *Project [_1#181 AS id#184, _2#182 AS token#185]
:  +- *Filter isnotnull(_2#182)
:     +- LocalTableScan [_1#181, _2#182]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *Project [_1#192 AS token#195, _2#193 AS prob#196]
      +- *Filter isnotnull(_1#192)
         +- LocalTableScan [_1#192, _2#193]

BroadcastHint and broadcast function

val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]

scala> println(left.join(broadcast(right), "token").queryExecution.toStringWithStats)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(token))
:- Project [_1#123 AS id#126, _2#124 AS token#127]
:  +- LocalRelation [_1#123, _2#124]
+- BroadcastHint
   +- Project [_1#136 AS token#139, _2#137 AS prob#140]
      +- LocalRelation [_1#136, _2#137]

== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#127, id#126, prob#140]
+- Join Inner, (token#127 = token#139)
   :- Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- LocalRelation [_1#123, _2#124]
   +- BroadcastHint
      +- Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- LocalRelation [_1#136, _2#137]

== Optimized Logical Plan ==
Project [token#127, id#126, prob#140], Statistics(sizeInBytes=1792.0 B, isBroadcastable=false)
+- Join Inner, (token#127 = token#139), Statistics(sizeInBytes=2.6 KB, isBroadcastable=false)
   :- Project [_1#123 AS id#126, _2#124 AS token#127], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
   :  +- Filter isnotnull(_2#124), Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
   :     +- LocalRelation [_1#123, _2#124], Statistics(sizeInBytes=48.0 B, isBroadcastable=false)
   +- BroadcastHint, Statistics(sizeInBytes=56.0 B, isBroadcastable=true)
      +- Project [_1#136 AS token#139, _2#137 AS prob#140], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
         +- Filter isnotnull(_1#136), Statistics(sizeInBytes=56.0 B, isBroadcastable=false)
            +- LocalRelation [_1#136, _2#137], Statistics(sizeInBytes=56.0 B, isBroadcastable=false)

== Physical Plan ==
*Project [token#127, id#126, prob#140]
+- *BroadcastHashJoin [token#127], [token#139], Inner, BuildRight
   :- *Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- *Filter isnotnull(_2#124)
   :     +- LocalTableScan [_1#123, _2#124]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- *Filter isnotnull(_1#136)
            +- LocalTableScan [_1#136, _2#137]
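
Why a side flagged with isBroadcastable=true ends up under a BroadcastHashJoin can be illustrated with a simplified model of the planner's decision. This is a sketch under assumptions, not Spark's own code: canBroadcast is a hypothetical helper, and the rule it encodes (a join side may be broadcast when it is flagged as broadcastable, or when its estimated size is within spark.sql.autoBroadcastJoinThreshold, 10 MB by default) is assumed here rather than taken from the text above.

```scala
object BroadcastDecisionSketch {
  // Default value of spark.sql.autoBroadcastJoinThreshold (10 MB), in bytes.
  val defaultThreshold: Long = 10L * 1024 * 1024

  // Hypothetical helper (an assumption for illustration, not a Spark API):
  // a join side qualifies for broadcasting if it carries the flag set by
  // BroadcastHint, or if its estimated size fits under the threshold.
  def canBroadcast(
      sizeInBytes: BigInt,
      isBroadcastable: Boolean,
      threshold: Long = defaultThreshold): Boolean =
    isBroadcastable || (sizeInBytes >= 0 && sizeInBytes <= threshold)

  def main(args: Array[String]): Unit = {
    val oneTerabyte = BigInt(1) << 40

    // A 56-byte relation without the hint.
    println(canBroadcast(56, isBroadcastable = false))
    // A 1 TB relation without the hint.
    println(canBroadcast(oneTerabyte, isBroadcastable = false))
    // The same 1 TB relation with the hint applied.
    println(canBroadcast(oneTerabyte, isBroadcastable = true))
  }
}
```

Under this assumed rule the 56-byte right side in the example above would qualify for broadcasting even without the hint. The hint changes the outcome only for relations whose estimated size exceeds the threshold.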

computeStats Method

computeStats(conf: CatalystConf): Statistics

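computeStats takes the statistics of the child operator and enables the isBroadcastable flag, i.e. it marks the BroadcastHint operator itself as broadcastable while leaving sizeInBytes unchanged (compare the BroadcastHint and Project lines in the optimized logical plans above).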

Note
computeStats is part of the LogicalPlan Contract.
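
The effect of computeStats can be sketched with a small model. The Statistics, Relation and BroadcastHint classes below are simplified, hypothetical stand-ins and not Spark's own classes (the conf parameter is left out, for instance). The sketch assumes only what the optimized plans above show: the hint reports its child's sizeInBytes with isBroadcastable switched on.

```scala
// Simplified, hypothetical stand-ins for Spark's Statistics and logical operators.
final case class Statistics(sizeInBytes: BigInt, isBroadcastable: Boolean = false)

sealed trait Plan {
  def computeStats: Statistics
}

// A leaf operator that reports a fixed size and is not broadcastable by default.
final case class Relation(sizeInBytes: BigInt) extends Plan {
  def computeStats: Statistics = Statistics(sizeInBytes)
}

// The hint keeps the child's statistics and only flips the flag.
final case class BroadcastHint(child: Plan) extends Plan {
  def computeStats: Statistics = child.computeStats.copy(isBroadcastable = true)
}

object BroadcastHintSketch {
  def main(args: Array[String]): Unit = {
    val right = Relation(sizeInBytes = 56)
    val hinted = BroadcastHint(right)

    println(right.computeStats)  // flag off on the bare relation
    println(hinted.computeStats) // same size, flag on
  }
}
```

Because the child's statistics are copied rather than recomputed, the size estimate is preserved and only the flag differs between the hinted and the unhinted operator.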
