ShuffleExchange Unary Physical Operator

ShuffleExchange is a unary physical operator (i.e. it has exactly one child physical operator) that performs a shuffle.

ShuffleExchange corresponds to the Repartition (with shuffle enabled) and RepartitionByExpression logical operators, as translated by the BasicOperators execution planning strategy.

Note
ShuffleExchange shows as Exchange in physical plans.
// Uses Repartition logical operator
// ShuffleExchange with RoundRobinPartitioning
val q1 = spark.range(6).repartition(2)
scala> q1.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(2)
+- *Range (0, 6, step=1, splits=Some(8))

// Uses RepartitionByExpression logical operator
// ShuffleExchange with HashPartitioning
val q2 = spark.range(6).repartition(2, 'id % 2)
scala> q2.explain
== Physical Plan ==
Exchange hashpartitioning((id#38L % 2), 2)
+- *Range (0, 6, step=1, splits=Some(8))
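To make the translation concrete, here is a minimal plain-Scala sketch of how the BasicOperators strategy picks a physical operator for the two logical operators. All the case classes are hypothetical, simplified stand-ins (not Spark's API), child operators are left out, and RepartitionByExpression is assumed to always use HashPartitioning (as in the q2 example). The sketch also assumes that Repartition with shuffle disabled (what Dataset.coalesce produces) is planned as CoalesceExec rather than ShuffleExchange.

```scala
// Hypothetical, simplified stand-ins for Spark's operators (not Spark's API)
sealed trait Partitioning
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
case class HashPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning

sealed trait LogicalPlan
case class Repartition(numPartitions: Int, shuffle: Boolean) extends LogicalPlan
case class RepartitionByExpression(expressions: Seq[String], numPartitions: Int) extends LogicalPlan

sealed trait PhysicalPlan
case class ShuffleExchange(partitioning: Partitioning) extends PhysicalPlan
case class CoalesceExec(numPartitions: Int) extends PhysicalPlan

// Mimics the two relevant cases of the BasicOperators strategy (child operators omitted)
def plan(logical: LogicalPlan): PhysicalPlan = logical match {
  case Repartition(n, shuffle) =>
    // Only a shuffling Repartition becomes a ShuffleExchange
    if (shuffle) ShuffleExchange(RoundRobinPartitioning(n)) else CoalesceExec(n)
  case RepartitionByExpression(exprs, n) =>
    ShuffleExchange(HashPartitioning(exprs, n))
}

// Roughly q1 and q2 from the examples above
println(plan(Repartition(2, shuffle = true)))
println(plan(RepartitionByExpression(Seq("id % 2"), 2)))
```

In this model, the partitioning scheme is the only thing that distinguishes the two ShuffleExchange instances, which matches the two Exchange lines in the physical plans above.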

When created, ShuffleExchange takes a Partitioning, a single child physical operator and an optional ExchangeCoordinator.

Table 1. ShuffleExchange SQLMetrics (in alphabetical order)

Name        Description
dataSize    data size

Figure 1. ShuffleExchange in web UI (Details for Query)

nodeName is Exchange, followed by (coordinator id: [coordinator-hash-code]) when the optional ExchangeCoordinator is defined.

outputPartitioning is the Partitioning that ShuffleExchange was created with.
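A minimal sketch of nodeName and outputPartitioning follows. Partitioning, ExchangeCoordinator and ShuffleExchangeSketch are hypothetical stand-ins rather than Spark's classes, the child operator is omitted, and the coordinator hash code is assumed to be System.identityHashCode of the coordinator.

```scala
// Hypothetical stand-ins, not Spark's classes
case class Partitioning(description: String)
class ExchangeCoordinator

// Only the constructor parameters that nodeName and outputPartitioning need (no child)
case class ShuffleExchangeSketch(
    newPartitioning: Partitioning,
    coordinator: Option[ExchangeCoordinator]) {

  // "Exchange", plus the coordinator's identity hash code when there is a coordinator
  def nodeName: String = {
    val extraInfo = coordinator match {
      case Some(c) => s"(coordinator id: ${System.identityHashCode(c)})"
      case None    => ""
    }
    s"Exchange$extraInfo"
  }

  // The output partitioning is the partitioning given at creation time
  def outputPartitioning: Partitioning = newPartitioning
}

val exchange = ShuffleExchangeSketch(Partitioning("RoundRobinPartitioning(2)"), coordinator = None)
println(exchange.nodeName) // Exchange
```

Without a coordinator the name is just Exchange, which is what the physical plans in the examples above print.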

While preparing for execution (doPrepare), ShuffleExchange registers itself with the ExchangeCoordinator, if one was specified.
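The registration step can be sketched as follows. Exchange, ExchangeCoordinator and ShuffleExchangeSketch are hypothetical stand-ins whose coordinator only records what registers with it; the registerExchange name is modeled on Spark's sources, but the signature here is simplified.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins, not Spark's classes
trait Exchange

class ExchangeCoordinator {
  private val exchanges = ArrayBuffer.empty[Exchange]
  // Simplified: just remembers every exchange that registers itself
  def registerExchange(exchange: Exchange): Unit = exchanges += exchange
  def registered: Seq[Exchange] = exchanges.toSeq
}

class ShuffleExchangeSketch(coordinator: Option[ExchangeCoordinator]) extends Exchange {
  // Register with the coordinator if one was given; do nothing otherwise
  def doPrepare(): Unit = coordinator.foreach(_.registerExchange(this))
}

val coordinator = new ExchangeCoordinator
val exchange = new ShuffleExchangeSketch(Some(coordinator))
exchange.doPrepare()
println(coordinator.registered.size) // 1
```

Using Option.foreach keeps the "if available" branch to a single expression: with None, doPrepare is a no-op.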

When executed (doExecute), ShuffleExchange computes a ShuffledRowRDD and caches it, so that later executions reuse it instead of repeating a possibly expensive computation.

Table 2. ShuffleExchange’s Internal Registries and Counters (in alphabetical order)

Name                Description
cachedShuffleRDD    ShuffledRowRDD that is cached after ShuffleExchange has been executed.

Executing ShuffleExchange (and Creating ShuffledRowRDD with Internal Binary Rows Using Optional ExchangeCoordinator) — doExecute Method

doExecute(): RDD[InternalRow]

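doExecute returns the cached ShuffledRowRDD if there is one. Otherwise, it creates a new ShuffledRowRDD and caches it (as cachedShuffleRDD).

How the new ShuffledRowRDD is created depends on whether the optional ExchangeCoordinator was specified.

If an ExchangeCoordinator was specified, doExecute requests it for a ShuffledRowRDD.

Otherwise (with no ExchangeCoordinator specified), doExecute calls prepareShuffleDependency to create a ShuffleDependency and then preparePostShuffleRDD to create a ShuffledRowRDD from it.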
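A minimal plain-Scala sketch of the caching and branching in doExecute follows. ShuffledRowRDD, ShuffleDependency and ExchangeCoordinator are hypothetical placeholders (there is no Spark dependency), postShuffleRDD takes no arguments here unlike Spark's, and the preparations counter exists only to show that the shuffle dependency is prepared once.

```scala
// Hypothetical placeholders for Spark's ShuffledRowRDD and ShuffleDependency
case class ShuffledRowRDD(source: String)
case class ShuffleDependency(id: Int)

// Hypothetical stand-in; Spark's postShuffleRDD takes the exchange operator as an argument
class ExchangeCoordinator {
  def postShuffleRDD(): ShuffledRowRDD = ShuffledRowRDD("coordinator")
}

class ShuffleExchangeSketch(coordinator: Option[ExchangeCoordinator]) {
  // For illustration only: how many times a ShuffleDependency was prepared
  var preparations = 0
  // Mirrors the cachedShuffleRDD internal registry (null until the first execution)
  private var cachedShuffleRDD: ShuffledRowRDD = null

  private def prepareShuffleDependency(): ShuffleDependency = {
    preparations += 1
    ShuffleDependency(preparations)
  }

  private def preparePostShuffleRDD(dependency: ShuffleDependency): ShuffledRowRDD =
    ShuffledRowRDD(s"dependency ${dependency.id}")

  def doExecute(): ShuffledRowRDD = {
    // Reuse the ShuffledRowRDD if this operator was executed before
    if (cachedShuffleRDD == null) {
      cachedShuffleRDD = coordinator match {
        case Some(c) => c.postShuffleRDD()
        case None    => preparePostShuffleRDD(prepareShuffleDependency())
      }
    }
    cachedShuffleRDD
  }
}

val exchange = new ShuffleExchangeSketch(coordinator = None)
exchange.doExecute()
exchange.doExecute()
println(exchange.preparations) // 1
```

The null-checked var mirrors the cachedShuffleRDD registry in Table 2; in this simplified model a lazy val would give the same one-time computation.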

Note
doExecute is part of the SparkPlan Contract to produce the result of a structured query as an RDD of internal binary rows.

preparePostShuffleRDD Method

Caution
FIXME

prepareShuffleDependency Internal Method

prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow]
Caution
FIXME

prepareShuffleDependency Helper Method

prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency creates a ShuffleDependency for the input rdd, partitioned according to newPartitioning and using the given serializer.
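What the ShuffleDependency carries can be sketched without Spark. In Spark's sources, the dependency is built over (partition id, row) pairs, so the shuffle only has to route each row to the precomputed id. This is a simplified model under stated assumptions: rows are Long values, the Partitioning types are hypothetical stand-ins, hash partitioning uses hashCode with a non-negative modulo instead of Spark's Murmur3 hash, round-robin starts at partition 0 rather than at a random one, and groupBy stands in for the actual shuffle write and read.

```scala
// Hypothetical, simplified partitionings over Long rows (not Spark's classes)
sealed trait Partitioning { def numPartitions: Int }
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
case class HashPartitioning(key: Long => Int, numPartitions: Int) extends Partitioning

// Target partition id of a row; position is the row's index in its input partition
def partitionId(partitioning: Partitioning, row: Long, position: Int): Int =
  partitioning match {
    // Spark uses a Murmur3 hash here; hashCode keeps the sketch dependency-free
    case HashPartitioning(key, n) => Math.floorMod(key(row).hashCode, n)
    // Spark starts at a random partition; this sketch starts at 0
    case RoundRobinPartitioning(n) => position % n
  }

// Map side: pair every row with its precomputed partition id
def keyByPartitionId(rows: Seq[Long], partitioning: Partitioning): Seq[(Int, Long)] =
  rows.zipWithIndex.map { case (row, i) => (partitionId(partitioning, row, i), row) }

// Reduce side: each output partition receives the rows keyed with its id
def shuffle(rows: Seq[Long], partitioning: Partitioning): Map[Int, Seq[Long]] =
  keyByPartitionId(rows, partitioning)
    .groupBy { case (id, _) => id }
    .map { case (id, pairs) => id -> pairs.map { case (_, row) => row } }

val rows = 0L until 6L
println(shuffle(rows, RoundRobinPartitioning(2)))
```

Computing the id on the map side keeps the routing step trivial: the partitioner only needs to read the id back from the key.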

Note
prepareShuffleDependency is used when ShuffleExchange prepares a ShuffleDependency (as part of…FIXME), and when the CollectLimitExec and TakeOrderedAndProjectExec physical operators are executed.
