BroadcastExchangeExec Unary Operator for Broadcasting Joins

BroadcastExchangeExec is a physical operator (with one child physical operator) that broadcasts rows (of a relation) to worker nodes.

BroadcastExchangeExec is created exclusively when the EnsureRequirements physical query plan optimization ensures BroadcastDistribution of the input data of a physical operator (which can be either a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec operator).

```scala
val t1 = spark.range(5)
val t2 = spark.range(5)
val q = t1.join(t2).where(t1("id") === t2("id"))

scala> q.explain
== Physical Plan ==
*BroadcastHashJoin [id#19L], [id#22L], Inner, BuildRight
:- *Range (0, 5, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 5, step=1, splits=Some(8))
```
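To get hold of the operator itself (and not just the BroadcastExchange line in the plan output), the executed physical plan can be traversed. A small sketch, assuming the q query from the example above:

```scala
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

// Collect all BroadcastExchangeExec operators from the executed physical plan of q
val broadcastExchanges = q.queryExecution.executedPlan.collect {
  case e: BroadcastExchangeExec => e
}
assert(broadcastExchanges.size == 1)
```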
BroadcastExchangeExec uses the following performance metrics:

Name | Description |
---|---|
time to broadcast (ms) | time to broadcast the transformed rows (the broadcastTime metric) |
time to build (ms) | time for the BroadcastMode to transform the collected rows (the buildTime metric) |
time to collect (ms) | time for the child operator's executeCollect (the collectTime metric) |
data size (bytes) | size of the collected rows (the dataSize metric) |
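These metrics are also available programmatically through the operator's metrics map. Continuing the sketch above (the internal keys, e.g. broadcastTime, are taken from the description further below and may differ across Spark versions):

```scala
// Print the internal metric keys together with the names shown in the web UI
broadcastExchanges.head.metrics.foreach { case (key, metric) =>
  println(s"$key -> ${metric.name.getOrElse(key)}")
}
```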
BroadcastExchangeExec uses the BroadcastPartitioning partitioning scheme (with the input BroadcastMode).
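Continuing the sketch above, the partitioning scheme can be inspected directly; the BroadcastMode it carries is the one shown in the plan (HashedRelationBroadcastMode in the example at the top):

```scala
// outputPartitioning of a BroadcastExchangeExec is BroadcastPartitioning(mode)
broadcastExchanges.head.outputPartitioning
```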
Creating BroadcastExchangeExec Instance

BroadcastExchangeExec takes the following when created:

- BroadcastMode
- Child physical operator
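EnsureRequirements normally inserts the operator for you, but for illustration a BroadcastExchangeExec can also be constructed by hand from a BroadcastMode and a child physical plan. A sketch (IdentityBroadcastMode is only used here to keep the example self-contained):

```scala
import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

// Child physical operator taken from an already-planned query
val child = spark.range(5).queryExecution.executedPlan

// IdentityBroadcastMode broadcasts the collected rows as-is
// (it is the mode used for BroadcastNestedLoopJoinExec)
val exchange = BroadcastExchangeExec(IdentityBroadcastMode, child)
```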
Preparing Asynchronous Broadcast (with Rows) — doPrepare Method

```scala
doPrepare(): Unit
```

doPrepare "materializes" the internal lazily-once-initialized asynchronous broadcast (the relationFuture internal attribute described below).
NOTE: doPrepare is a part of the SparkPlan Contract to prepare a physical operator for execution.
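doPrepare itself is protected; the public entry point is SparkPlan's prepare(), which (the first time it is called) ends up in doPrepare and so kicks off the asynchronous broadcast. A minimal sketch, continuing the hypothetical exchange from the earlier example:

```scala
// prepare() is idempotent; the first call triggers doPrepare() and hence
// starts the asynchronous broadcast (relationFuture) on the driver.
exchange.prepare()
```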
Broadcasting Rows — doExecuteBroadcast Method

```scala
def doExecuteBroadcast[T](): broadcast.Broadcast[T]
```

doExecuteBroadcast waits until the rows are broadcast (i.e. until relationFuture completes) and returns the broadcast variable.
NOTE: doExecuteBroadcast waits up to spark.sql.broadcastTimeout (defaults to 5 minutes).
NOTE: doExecuteBroadcast is a part of the SparkPlan Contract to return the result of a structured query as a broadcast variable.
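Like doPrepare, doExecuteBroadcast is not called directly; SparkPlan's public executeBroadcast method wraps it. A sketch, again continuing the hypothetical exchange from above (the timeout setting is optional and only shown for completeness):

```scala
// spark.sql.broadcastTimeout is in seconds; raise it if the broadcast may take
// longer than the default 5 minutes.
spark.conf.set("spark.sql.broadcastTimeout", 600)

// executeBroadcast eventually calls doExecuteBroadcast and blocks until the
// broadcast variable is available. The broadcast value depends on the
// BroadcastMode (e.g. a HashedRelation for HashedRelationBroadcastMode,
// the rows themselves for IdentityBroadcastMode).
val broadcasted = exchange.executeBroadcast[Any]()
broadcasted.value
```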
Lazily-Once-Initialized Asynchronously-Broadcast relationFuture Internal Attribute

```scala
relationFuture: Future[broadcast.Broadcast[Any]]
```
When "materialized" (aka executed), relationFuture finds the current execution id and sets it on the Future's thread.

relationFuture requests the child physical operator to executeCollect, recording the time taken in the collectTime metric and the size of the collected data in the dataSize metric.
NOTE: relationFuture accepts a relation with at most 512 million rows and at most 8GB in size, and reports a SparkException if either limit is exceeded.
relationFuture then requests the input BroadcastMode to transform the collected internal rows and records the time taken in the buildTime metric.

relationFuture requests the current SparkContext to broadcast the transformed internal rows and records the time taken in the broadcastTime metric.

In the end, relationFuture posts a SparkListenerDriverAccumUpdates event (with the execution id and the metrics) and returns the broadcast internal rows.
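The sequence above can be re-enacted by hand in a simplified form. The sketch below uses the child and the IdentityBroadcastMode from the earlier examples and skips the metrics, the size checks and the SparkListenerDriverAccumUpdates event, so treat it as an illustration of the data flow rather than the actual implementation:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode

// 1. Collect the child's rows on the driver (timed as "time to collect (ms)")
val input: Array[InternalRow] = child.executeCollect()

// 2. Let the BroadcastMode transform the rows (timed as "time to build (ms)")
val relation = IdentityBroadcastMode.transform(input)

// 3. Broadcast the transformed rows to the executors (timed as "time to broadcast (ms)")
val broadcastedRelation = spark.sparkContext.broadcast(relation)
```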
In case of an OutOfMemoryError, relationFuture reports another OutOfMemoryError with the following message:

```text
Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
```
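The first workaround from the message can be applied at runtime; the driver memory, by contrast, is a static setting that has to be given at launch time:

```scala
// Disable automatic broadcast joins so the planner falls back to a
// non-broadcast strategy (e.g. a sort-merge join).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// spark.driver.memory cannot be changed at runtime; set it on submission instead:
//   spark-submit --driver-memory 8g ...
```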
NOTE: relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the thread-name prefix broadcast-exchange and a maximum of 128 threads).
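Spark builds that pool through an internal helper, so the following is only an approximation of the setup described in the note (a bounded, cached pool wrapped in an ExecutionContext), not the actual code:

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext

// A cached pool capped at 128 threads; idle threads are reclaimed after 60 seconds.
val pool = new ThreadPoolExecutor(
  128, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())
pool.allowCoreThreadTimeOut(true)

// Wrap it in an ExecutionContext so Futures (such as relationFuture) can run on it.
val broadcastExchangeContext = ExecutionContext.fromExecutorService(pool)
```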