BroadcastExchangeExec Unary Operator for Broadcasting Joins

BroadcastExchangeExec is a physical operator (with one child physical operator) to broadcast rows (of a relation) to worker nodes.

BroadcastExchangeExec is created exclusively when the EnsureRequirements physical query plan optimization ensures BroadcastDistribution of the input data of a physical operator (which appears to be either a BroadcastHashJoinExec or a BroadcastNestedLoopJoinExec operator).

val t1 = spark.range(5)
val t2 = spark.range(5)
val q = t1.join(t2).where(t1("id") === t2("id"))

scala> q.explain
== Physical Plan ==
*BroadcastHashJoin [id#19L], [id#22L], Inner, BuildRight
:- *Range (0, 5, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 5, step=1, splits=Some(8))

Table 1. BroadcastExchangeExec SQLMetrics (in alphabetical order)

Name            Description
broadcastTime   time to broadcast (ms)
buildTime       time to build (ms)
collectTime     time to collect (ms)
dataSize        data size (bytes)

Figure 1. BroadcastExchangeExec in web UI (Details for Query)

BroadcastExchangeExec uses BroadcastPartitioning partitioning scheme (with the input BroadcastMode).

Creating BroadcastExchangeExec Instance

BroadcastExchangeExec takes a BroadcastMode and a child physical operator when created.

Preparing Asynchronous Broadcast (with Rows) — doPrepare Method

doPrepare(): Unit

doPrepare "materializes" the internal lazily-once-initialized asynchronous broadcast.

Note
doPrepare is a part of SparkPlan Contract to prepare a physical operator for execution.
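
The "materialize on first touch" idea can be sketched in plain Scala. This is a minimal illustration, not Spark's code: the `Exchange` class, its `collect` argument and the `started` flag are hypothetical stand-ins introduced only for this example.

```scala
import scala.concurrent.{ExecutionContext, Future}

object LazyPrepareSketch {
  // Hypothetical stand-in for the operator; not Spark's actual class.
  class Exchange(collect: () => Seq[Long])(implicit ec: ExecutionContext) {
    // Flag added for illustration only, to observe when initialization happens.
    @volatile var started = false

    // Initialized at most once, on first access; the Future is submitted then.
    lazy val relationFuture: Future[Seq[Long]] = {
      started = true
      Future(collect())
    }

    // Referencing the lazy val is enough to kick off the asynchronous work.
    def doPrepare(): Unit = {
      relationFuture
      ()
    }
  }
}
```

Nothing runs until `doPrepare()` (or any other access to `relationFuture`) is called, and repeated calls reuse the same Future.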

Broadcasting Rows — doExecuteBroadcast Method

def doExecuteBroadcast[T](): broadcast.Broadcast[T]

doExecuteBroadcast waits until the rows are broadcast.

Note
doExecuteBroadcast waits at most spark.sql.broadcastTimeout (5 minutes by default).
Note
doExecuteBroadcast is a part of SparkPlan Contract to return the result of a structured query as a broadcast variable.
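
Blocking on an asynchronous result with an upper time bound can be sketched with the Scala standard library alone. The helper name `awaitBroadcast` and the `defaultTimeout` value are assumptions made for this example (the 5 minutes come from the note above); Spark's own implementation may differ in detail.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AwaitBroadcastSketch {
  // Assumed default, mirroring the 5 minutes mentioned for spark.sql.broadcastTimeout.
  val defaultTimeout: FiniteDuration = 300.seconds

  // Hypothetical helper: blocks until the future completes or the timeout elapses.
  // Await.result throws java.util.concurrent.TimeoutException when time runs out.
  def awaitBroadcast[T](relationFuture: Future[T], timeout: Duration = defaultTimeout): T =
    Await.result(relationFuture, timeout)
}
```

A caller that hits the timeout sees a `TimeoutException` rather than a partial result, which is why raising the timeout or disabling broadcast joins are the usual remedies.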

Lazily-Once-Initialized Asynchronously-Broadcast relationFuture Internal Attribute

relationFuture: Future[broadcast.Broadcast[Any]]

When "materialized" (aka executed), relationFuture finds the current execution id and sets it on the thread that runs the Future.

relationFuture requests child physical operator to executeCollect.

relationFuture records the time for executeCollect in collectTime metrics and the size of the data in dataSize metrics.

Note
relationFuture accepts a relation with up to 512 million rows and up to 8GB in size, and reports a SparkException if either limit is exceeded.
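
The two limits amount to a pair of guard checks. The sketch below is an assumption-laden illustration: `BroadcastLimitException` is a hypothetical stand-in for SparkException (so the example needs no Spark dependency), `checkLimits` is an invented name, and whether the boundary values themselves are accepted is assumed from the "up to" wording above.

```scala
object BroadcastLimitsSketch {
  val MaxRows: Long = 512000000L // 512 million rows
  val MaxBytes: Long = 8L << 30  // 8 GB expressed in bytes

  // Hypothetical stand-in for SparkException.
  final class BroadcastLimitException(message: String) extends RuntimeException(message)

  // Throws if either limit is exceeded; boundary handling is an assumption.
  def checkLimits(numRows: Long, dataSize: Long): Unit = {
    if (numRows > MaxRows) {
      throw new BroadcastLimitException(
        s"Cannot broadcast a table with more than 512 million rows: $numRows rows")
    }
    if (dataSize > MaxBytes) {
      throw new BroadcastLimitException(
        s"Cannot broadcast a table larger than 8GB: ${dataSize >> 30} GB")
    }
  }
}
```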

relationFuture requests the input BroadcastMode to transform the internal rows and records the time in buildTime metrics.

relationFuture requests the current SparkContext to broadcast the transformed internal rows and records the time in broadcastTime metrics.

In the end, relationFuture posts SparkListenerDriverAccumUpdates (with the execution id and the metrics) and returns the broadcast internal rows.
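
The collect, build and broadcast sequence, each step timed into its own metric, can be sketched without Spark. Everything here is illustrative: `timed` and `run` are hypothetical helpers, a mutable map stands in for SQLMetrics, `toSet` stands in for the BroadcastMode transformation, and the "broadcast" step simply returns the built relation.

```scala
import scala.collection.mutable

object TimedStepsSketch {
  // Runs a step and adds its elapsed wall-clock time (ms) to the named metric.
  def timed[A](metrics: mutable.Map[String, Long], name: String)(step: => A): A = {
    val start = System.nanoTime()
    try step
    finally {
      val elapsedMs = (System.nanoTime() - start) / 1000000L
      metrics(name) = metrics.getOrElse(name, 0L) + elapsedMs
    }
  }

  def run(collect: () => Seq[Long]): (Set[Long], Map[String, Long]) = {
    val metrics = mutable.Map.empty[String, Long]
    // Step 1: collect the child's rows.
    val rows = timed(metrics, "collectTime")(collect())
    // Assumption for the sketch: every row is a single 8-byte Long.
    metrics("dataSize") = rows.length * 8L
    // Step 2: build the relation (stand-in for the BroadcastMode transformation).
    val relation = timed(metrics, "buildTime")(rows.toSet)
    // Step 3: "broadcast" (stand-in; a real broadcast would ship data to executors).
    val broadcasted = timed(metrics, "broadcastTime")(relation)
    (broadcasted, metrics.toMap)
  }
}
```

Keeping the timing in one small helper makes it hard to forget a metric when a step fails, since the `finally` block records the elapsed time either way.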

In case of OutOfMemoryError, relationFuture reports another OutOfMemoryError with the following message:

Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
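
Replacing a low-level OutOfMemoryError with one that carries actionable advice can be sketched as a small wrapper. The `guarded` helper is hypothetical, and attaching the original error as the cause is a choice made for this example rather than a statement about what Spark does.

```scala
object OomRewrapSketch {
  // Message text taken from the description above.
  val Hint: String =
    "Not enough memory to build and broadcast the table to all worker nodes. " +
      "As a workaround, you can either disable broadcast by setting " +
      "spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver " +
      "memory by setting spark.driver.memory to a higher value"

  // Hypothetical wrapper: runs the body and rethrows any OutOfMemoryError
  // as a new one with the friendlier message (original kept as the cause).
  def guarded[A](body: => A): A =
    try body
    catch {
      case oom: OutOfMemoryError =>
        val friendly = new OutOfMemoryError(Hint)
        friendly.initCause(oom)
        throw friendly
    }
}
```
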
Note
relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the thread-name prefix broadcast-exchange and up to 128 threads).
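
A dedicated execution context of that shape can be approximated with java.util.concurrent directly. This is a sketch under assumptions: `newDaemonCachedPool` is a hypothetical helper written for this example, and the 60-second keep-alive and daemon threads are illustrative defaults, not values confirmed by the text above.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

object BroadcastPoolSketch {
  // Hypothetical helper: a bounded pool whose idle threads are reclaimed.
  def newDaemonCachedPool(
      prefix: String,
      maxThreads: Int,
      keepAliveSeconds: Long = 60L): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        // Name threads "<prefix>-<n>" so they are easy to spot in thread dumps.
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true)
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), factory)
    // Let core threads time out too, so an idle pool shrinks back to zero.
    pool.allowCoreThreadTimeOut(true)
    pool
  }

  val executionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(newDaemonCachedPool("broadcast-exchange", 128))
}
```

Passing `BroadcastPoolSketch.executionContext` explicitly to `Future(...)` keeps long-running broadcast work off the global execution context.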
