ShuffleMapStage — Intermediate Stage in Execution DAG

ShuffleMapStage (aka shuffle map stage or simply map stage) is an intermediate stage in the physical execution DAG that corresponds to a ShuffleDependency.

Note
The logical DAG or logical execution plan is the RDD lineage.

When executed, a ShuffleMapStage saves map output files that can later be fetched by reduce tasks. When all map outputs are available, the ShuffleMapStage is considered available (or ready).

Output locations can be missing, i.e. partitions have not been calculated or are lost.

ShuffleMapStage uses outputLocs and _numAvailableOutputs internal registries to track how many shuffle map outputs are available.

A ShuffleMapStage provides the input for the stages that follow it in the DAG of stages, and is also called the map side of a shuffle dependency.

A ShuffleMapStage may contain multiple pipelined operations, e.g. map and filter, before the shuffle operation.
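
Pipelining can be pictured with plain Scala iterators, without any Spark dependency. The sketch below is only an analogy under that assumption: PipeliningSketch, trace and mapSideOutput are hypothetical names, and consuming the iterator merely stands in for writing the map output.

```scala
import scala.collection.mutable.ArrayBuffer

object PipeliningSketch {
  // Records the order in which the two functions are applied.
  val trace: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // One "partition" of input records, read lazily through an iterator.
  private val partition: Iterator[Int] = Iterator(1, 2, 3, 4)

  // map and filter are chained on the same iterator, so every record passes
  // through both functions before the next record is read.
  private val pipelined: Iterator[Int] = partition
    .map { x =>
      trace += s"map($x)"
      x * 10
    }
    .filter { x =>
      trace += s"filter($x)"
      x > 20
    }

  // Consuming the iterator stands in for producing the map-side output.
  val mapSideOutput: List[Int] = pipelined.toList
}
```

Inspecting PipeliningSketch.trace shows the map and filter calls interleaved record by record, i.e. no intermediate collection is built between the two operations.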

A single ShuffleMapStage can be shared across different jobs.

Table 1. ShuffleMapStage Internal Registries and Counters
Name Description

_mapStageJobs

ActiveJobs associated with the ShuffleMapStage.

A new ActiveJob can be registered and deregistered.

The list of registered ActiveJobs is available using mapStageJobs.

outputLocs

Tracks MapStatuses for each partition.

There could be many MapStatus entries per partition due to Speculative Execution of Tasks.

When ShuffleMapStage is created, outputLocs is empty, i.e. all elements are empty lists.

The size of outputLocs is exactly the number of partitions of the RDD the stage runs on.

_numAvailableOutputs

The number of available outputs for the partitions of the ShuffleMapStage.

_numAvailableOutputs is incremented when the first MapStatus is registered for a partition (there can be more than one task per partition) and decremented when the last MapStatus is removed for a partition.

_numAvailableOutputs should not be greater than the number of partitions (and hence the number of MapStatus collections in outputLocs internal registry).
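
The initial state of the registries and the availability condition can be modelled as follows. This is a simplified sketch, not the Spark class: OutputStatus and OutputRegistrySketch are hypothetical names, and the number of available outputs is derived from outputLocs here rather than kept in a separate counter.

```scala
// Hypothetical stand-in for MapStatus: records only the executor holding the output.
final case class OutputStatus(executorId: String)

// Simplified model of the registries described in the table.
final class OutputRegistrySketch(val numPartitions: Int) {
  // One list of statuses per partition; every list starts out empty.
  val outputLocs: Array[List[OutputStatus]] =
    Array.fill[List[OutputStatus]](numPartitions)(Nil)

  // Partitions with at least one registered status (can never exceed numPartitions).
  def numAvailableOutputs: Int = outputLocs.count(_.nonEmpty)

  // The stage is available once every partition has an output.
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}
```

A freshly created new OutputRegistrySketch(3) has three empty lists and is not available; it becomes available only after each of the three partitions has at least one status.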

Creating ShuffleMapStage Instance

ShuffleMapStage takes the following when created:

  1. id identifier

  2. rdd — the RDD of ShuffleDependency

  3. numTasks — the number of tasks (that is exactly the number of partitions in the rdd)

  4. parents — the collection of parent Stages

  5. firstJobId — the ID of the ActiveJob that created it

  6. callSite — the creationSite of the RDD

  7. shuffleDep — ShuffleDependency (from the logical execution plan)

ShuffleMapStage initializes the internal registries and counters.

Note
DAGScheduler tracks the number of ShuffleMapStages created so far.
Note
ShuffleMapStage is created only when DAGScheduler creates one for a ShuffleDependency.

Registering MapStatus For Partition — addOutputLoc Method

addOutputLoc(partition: Int, status: MapStatus): Unit

addOutputLoc adds the input status to the output locations for the input partition.

addOutputLoc increments _numAvailableOutputs internal counter if the input MapStatus is the first result for the partition.

Removing MapStatus For Partition And BlockManager — removeOutputLoc Method

removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit

removeOutputLoc removes the MapStatus for the input partition and bmAddress BlockManager from the output locations.

removeOutputLoc decrements _numAvailableOutputs internal counter if the removed MapStatus was the last result for the partition.

Note
removeOutputLoc is exclusively used when a Task has failed with a FetchFailed exception.
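
The counter transitions of addOutputLoc and removeOutputLoc can be sketched with a small self-contained model. BlockLocation, MapOutput and MapOutputsSketch are hypothetical stand-ins for BlockManagerId, MapStatus and the stage; the code illustrates the behaviour described above and is not a copy of the Spark source.

```scala
// Hypothetical stand-ins for BlockManagerId and MapStatus.
final case class BlockLocation(host: String, executorId: String)
final case class MapOutput(location: BlockLocation)

final class MapOutputsSketch(numPartitions: Int) {
  private val outputLocs = Array.fill[List[MapOutput]](numPartitions)(Nil)
  private var _numAvailableOutputs = 0

  def numAvailableOutputs: Int = _numAvailableOutputs

  // Registers a status; the counter grows only for the first status of a partition.
  def addOutputLoc(partition: Int, status: MapOutput): Unit = {
    val prev = outputLocs(partition)
    outputLocs(partition) = status :: prev
    if (prev.isEmpty) _numAvailableOutputs += 1
  }

  // Drops the statuses stored at bmAddress; the counter shrinks only when
  // the partition had outputs before and has none left afterwards.
  def removeOutputLoc(partition: Int, bmAddress: BlockLocation): Unit = {
    val prev = outputLocs(partition)
    val remaining = prev.filterNot(_.location == bmAddress)
    outputLocs(partition) = remaining
    if (prev.nonEmpty && remaining.isEmpty) _numAvailableOutputs -= 1
  }
}
```

With two statuses registered for the same partition (as can happen with speculative execution), the counter stays at 1 until both have been removed.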

Finding Missing Partitions — findMissingPartitions Method

findMissingPartitions(): Seq[Int]
Note
findMissingPartitions is a part of Stage contract that returns the partitions that are missing, i.e. are yet to be computed.

Internally, findMissingPartitions uses outputLocs internal registry to find indices with empty lists of MapStatus.
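
As a minimal sketch of that lookup, assume outputLocs is passed in as a sequence with one list per partition, where each list holds hypothetical executor IDs instead of MapStatus objects. MissingPartitionsSketch is an illustrative name only.

```scala
object MissingPartitionsSketch {
  // A partition is missing when no output is registered for it,
  // i.e. when its list in outputLocs is empty.
  def findMissingPartitions(outputLocs: IndexedSeq[List[String]]): Seq[Int] =
    outputLocs.indices.filter(partition => outputLocs(partition).isEmpty)
}
```

For example, with outputs registered for partitions 0 and 2 only, the function returns the indices 1 and 3 of a four-partition stage.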

ShuffleMapStage Sharing

A ShuffleMapStage can be shared across multiple jobs if these jobs reuse the same RDDs.

When a ShuffleMapStage is submitted to DAGScheduler to execute, getShuffleMapStage is called.

scala> val rdd = sc.parallelize(0 to 5).map((_,1)).sortByKey()  (1)

scala> rdd.count  (2)

scala> rdd.count  (3)
  1. Shuffle at sortByKey()

  2. Submits a job with two stages, both of which are executed

  3. Intentionally repeats the last action, which submits a new job with two stages, one of which is shared (and skipped) because its outputs are already computed

Figure 1. Skipped Stages are already-computed ShuffleMapStages

Returning Number of Available Shuffle Map Outputs — numAvailableOutputs Method

numAvailableOutputs: Int

numAvailableOutputs returns the value of the _numAvailableOutputs internal counter.

Note
numAvailableOutputs is used exclusively when DAGScheduler submits missing tasks for ShuffleMapStage (and only to print a DEBUG message when the ShuffleMapStage is finished).

Returning Collection of Active Jobs — mapStageJobs Method

mapStageJobs: Seq[ActiveJob]

mapStageJobs returns _mapStageJobs internal registry.

Note
mapStageJobs is used exclusively when DAGScheduler is notified that a ShuffleMapTask has finished successfully (the task completed the ShuffleMapStage, so any map-stage jobs waiting on this stage are marked as finished).

Registering Job (that Computes ShuffleDependency) — addActiveJob Method

addActiveJob(job: ActiveJob): Unit

addActiveJob registers the input ActiveJob in _mapStageJobs internal registry.

Note
The ActiveJob is added as the first element in _mapStageJobs.
Note
addActiveJob is used exclusively when DAGScheduler is notified that a ShuffleDependency was submitted (and so a new ActiveJob is created to compute it).

Deregistering Job — removeActiveJob Method

removeActiveJob(job: ActiveJob): Unit

removeActiveJob removes an ActiveJob from _mapStageJobs internal registry.

Note
removeActiveJob is used exclusively when DAGScheduler cleans up after an ActiveJob has finished (regardless of the outcome).
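
The job bookkeeping done by addActiveJob, removeActiveJob and mapStageJobs can be sketched as a list that new jobs are prepended to. Job and MapStageJobsSketch are hypothetical names used for illustration; Job stands in for ActiveJob.

```scala
// Hypothetical stand-in for ActiveJob.
final case class Job(jobId: Int)

final class MapStageJobsSketch {
  private var _mapStageJobs: List[Job] = Nil

  // Read-only view of the registered jobs.
  def mapStageJobs: Seq[Job] = _mapStageJobs

  // The new job becomes the first element of the registry.
  def addActiveJob(job: Job): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  // Removes the job, leaving the order of the remaining jobs unchanged.
  def removeActiveJob(job: Job): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }
}
```

Registering Job(1) and then Job(2) yields Job(2) followed by Job(1), which matches the note that a job is added as the first element.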

Removing All Shuffle Outputs Registered for Lost Executor — removeOutputsOnExecutor Method

removeOutputsOnExecutor(execId: String): Unit

removeOutputsOnExecutor removes all MapStatuses with the input execId executor from the outputLocs internal registry (of MapStatuses per partition).

If the input execId had the last registered MapStatus for a partition, removeOutputsOnExecutor decrements _numAvailableOutputs counter and you should see the following INFO message in the logs:

INFO [stage] is now unavailable on executor [execId] ([_numAvailableOutputs]/[numPartitions], [isAvailable])
Note
removeOutputsOnExecutor is used exclusively when DAGScheduler cleans up after a lost executor.
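
The executor clean-up can be modelled in the same simplified way. In this sketch, ExecStatus and ExecutorLossSketch are hypothetical names, the stage is represented only by a name string, and the method returns the text of the INFO message (if any) instead of logging it.

```scala
// Hypothetical stand-in for MapStatus: records only the executor ID.
final case class ExecStatus(executorId: String)

final class ExecutorLossSketch(name: String, initial: Vector[List[ExecStatus]]) {
  private val outputLocs: Array[List[ExecStatus]] = initial.toArray
  private var _numAvailableOutputs: Int = outputLocs.count(_.nonEmpty)

  private def numPartitions: Int = outputLocs.length
  def numAvailableOutputs: Int = _numAvailableOutputs
  def isAvailable: Boolean = _numAvailableOutputs == numPartitions

  // Removes every status registered for execId and returns the message that
  // would be logged when at least one partition lost its last output.
  def removeOutputsOnExecutor(execId: String): Option[String] = {
    var becameUnavailable = false
    for (partition <- 0 until numPartitions) {
      val prev = outputLocs(partition)
      val remaining = prev.filterNot(_.executorId == execId)
      outputLocs(partition) = remaining
      if (prev.nonEmpty && remaining.isEmpty) {
        becameUnavailable = true
        _numAvailableOutputs -= 1
      }
    }
    if (becameUnavailable)
      Some(s"$name is now unavailable on executor $execId " +
        s"(${_numAvailableOutputs}/$numPartitions, $isAvailable)")
    else None
  }
}
```

A partition that still has an output on another executor does not change the counter; only partitions whose last output lived on the lost executor do.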

Preparing Shuffle Map Outputs in MapOutputTracker Format — outputLocInMapOutputTrackerFormat Method

outputLocInMapOutputTrackerFormat(): Array[MapStatus]

outputLocInMapOutputTrackerFormat returns the first (if available) element for every partition from outputLocs internal registry. If there is no entry for a partition, that position is filled with null.

Note

outputLocInMapOutputTrackerFormat is used in two cases: when DAGScheduler is notified that a ShuffleMapTask has finished successfully (and the corresponding ShuffleMapStage is complete), and when DAGScheduler cleans up after a lost executor.

In both cases, outputLocInMapOutputTrackerFormat is used to register the shuffle map outputs (of the ShuffleDependency) with MapOutputTrackerMaster.
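
The conversion itself amounts to taking the head of every per-partition list. A minimal sketch, assuming a hypothetical TrackedStatus case class in place of MapStatus and a plain sequence in place of the outputLocs registry:

```scala
object TrackerFormatSketch {
  // Hypothetical stand-in for MapStatus.
  final case class TrackedStatus(executorId: String)

  // Keeps the first status of every partition; partitions without any
  // registered status are represented by null at their position.
  def outputLocInMapOutputTrackerFormat(
      outputLocs: IndexedSeq[List[TrackedStatus]]): Array[TrackedStatus] =
    outputLocs.map(_.headOption.orNull).toArray
}
```

The resulting array always has one slot per partition, so a consumer has to check for null before using an entry.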
