ShuffleMapStage — Intermediate Stage in Execution DAG
ShuffleMapStage (aka shuffle map stage or simply map stage) is an intermediate stage in the physical execution DAG that corresponds to a ShuffleDependency.
Note: ShuffleMapStage can also be submitted independently as a Spark job for Adaptive Query Planning / Adaptive Scheduling.
Note: The logical DAG (logical execution plan) is the RDD lineage.
When executed, a ShuffleMapStage saves map output files that can later be fetched by reduce tasks. When all map outputs are available, the ShuffleMapStage is considered available (or ready).
Output locations can be missing, i.e. the partitions have not been computed yet or their map outputs have been lost.
ShuffleMapStage uses the outputLocs and _numAvailableOutputs internal registries to track how many shuffle map outputs are available.
ShuffleMapStage is an input for the stages that follow it in the DAG of stages and is also called a shuffle dependency's map side.
A ShuffleMapStage may contain multiple pipelined operations, e.g. map and filter, before the shuffle operation.
A single ShuffleMapStage can be shared across different jobs.
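Name | Description |
---|---|
_mapStageJobs | ActiveJobs associated with the ShuffleMapStage. A new ActiveJob is registered using addActiveJob and deregistered using removeActiveJob. The list of ActiveJobs is available using mapStageJobs. |
outputLocs | Tracks MapStatuses for each partition. There could be many MapStatuses for a partition since a single map task might run more than once (e.g. with speculative execution). When a ShuffleMapStage is created, every partition has an empty list. The size of outputLocs is exactly the number of partitions of the RDD the stage runs on. |
_numAvailableOutputs | The number of available outputs for the partitions of the ShuffleMapStage, i.e. the number of partitions with at least one MapStatus registered. Incremented by addOutputLoc and decremented by removeOutputLoc and removeOutputsOnExecutor. |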
Creating ShuffleMapStage Instance
ShuffleMapStage takes the following when created:
- id: identifier
- rdd: the RDD of ShuffleDependency
- numTasks: the number of tasks (exactly the number of partitions in the rdd)
- parents: the collection of parent Stages
- firstJobId: the id of the ActiveJob that created it
- callSite: the creationSite of the RDD
- shuffleDep: ShuffleDependency (from the logical execution plan)

ShuffleMapStage initializes the internal registries and counters.
Note: DAGScheduler tracks the number of ShuffleMapStages created so far.
Note: ShuffleMapStage is created only when DAGScheduler creates one for a ShuffleDependency.
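The initial state of the internal registries can be pictured with a small, self-contained model. This is a sketch under stated assumptions, not Spark's class: ShuffleMapStageSketch and MapStatusSketch are hypothetical stand-ins (Spark's own MapStatus is internal to Spark), and only the fields needed to show the state right after creation are modelled.

```scala
// Hypothetical stand-in for Spark's internal MapStatus (assumption: only the executor matters here).
final case class MapStatusSketch(executorId: String)

// A minimal sketch of ShuffleMapStage's internal registries right after creation.
// Not Spark's class; the field names mirror the registries described in this section.
class ShuffleMapStageSketch(val id: Int, val numPartitions: Int) {
  // outputLocs: one list of MapStatuses per partition, all empty at creation
  private val outputLocs: Array[List[MapStatusSketch]] =
    Array.fill[List[MapStatusSketch]](numPartitions)(Nil)

  // _numAvailableOutputs: starts at 0 because no partition has a map output yet
  private var _numAvailableOutputs: Int = 0

  def numAvailableOutputs: Int = _numAvailableOutputs

  // Available (ready) once every partition has at least one map output
  def isAvailable: Boolean = _numAvailableOutputs == numPartitions

  // Read-only view of the statuses registered for a partition
  def statusesFor(partition: Int): List[MapStatusSketch] = outputLocs(partition)
}
```

With numPartitions = 3 the model starts with three empty lists and a counter of 0, so it is not available until every partition has reported a map output.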
Registering MapStatus For Partition — addOutputLoc Method
addOutputLoc(partition: Int, status: MapStatus): Unit
addOutputLoc adds the input status to the output locations for the input partition.
addOutputLoc increments the _numAvailableOutputs internal counter if the input MapStatus is the first result for the partition.
Note: addOutputLoc is used when DAGScheduler creates a ShuffleMapStage for a ShuffleDependency and an ActiveJob (and MapOutputTrackerMaster already tracks some output locations of the ShuffleDependency) and when a ShuffleMapTask has finished.
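The counting rule of addOutputLoc can be sketched as follows. OutputLocsSketch and MapStatusSketch are hypothetical types, and prepending new statuses to the per-partition list is an assumption of this sketch. The point it illustrates is that a second status for the same partition (e.g. from a re-run task) does not increment the counter again.

```scala
// Hypothetical stand-in for Spark's MapStatus.
final case class MapStatusSketch(executorId: String, host: String)

// A minimal sketch of addOutputLoc's bookkeeping (not Spark's implementation).
class OutputLocsSketch(numPartitions: Int) {
  private val outputLocs = Array.fill[List[MapStatusSketch]](numPartitions)(Nil)
  private var _numAvailableOutputs = 0

  def numAvailableOutputs: Int = _numAvailableOutputs
  def statusesFor(partition: Int): List[MapStatusSketch] = outputLocs(partition)

  def addOutputLoc(partition: Int, status: MapStatusSketch): Unit = {
    val prevList = outputLocs(partition)
    // Prepend, so the most recently registered status comes first
    outputLocs(partition) = status :: prevList
    // Count the partition only when this is its first MapStatus
    if (prevList.isEmpty) _numAvailableOutputs += 1
  }
}
```

Registering two statuses for partition 0 leaves numAvailableOutputs at 1; registering one for partition 1 brings it to 2.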
Removing MapStatus For Partition And BlockManager — removeOutputLoc Method
removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit
removeOutputLoc removes the MapStatus for the input partition and bmAddress BlockManager from the output locations.
removeOutputLoc decrements the _numAvailableOutputs internal counter if the removed MapStatus was the last result for the partition.
Note: removeOutputLoc is used exclusively when a Task has failed with a FetchFailed exception.
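The decrement rule of removeOutputLoc can be sketched like this: the counter goes down only when a removal empties a list that was non-empty before. BlockManagerIdSketch, MapStatusSketch and OutputLocsSketch are hypothetical stand-ins for Spark's BlockManagerId, MapStatus and ShuffleMapStage.

```scala
// Hypothetical stand-ins for Spark's BlockManagerId and MapStatus.
final case class BlockManagerIdSketch(executorId: String, host: String, port: Int)
final case class MapStatusSketch(location: BlockManagerIdSketch)

// A minimal sketch of addOutputLoc/removeOutputLoc bookkeeping (not Spark's implementation).
class OutputLocsSketch(numPartitions: Int) {
  private val outputLocs = Array.fill[List[MapStatusSketch]](numPartitions)(Nil)
  private var _numAvailableOutputs = 0

  def numAvailableOutputs: Int = _numAvailableOutputs

  def addOutputLoc(partition: Int, status: MapStatusSketch): Unit = {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList.isEmpty) _numAvailableOutputs += 1
  }

  def removeOutputLoc(partition: Int, bmAddress: BlockManagerIdSketch): Unit = {
    val prevList = outputLocs(partition)
    // Drop every status whose output lives on the given BlockManager
    val newList = prevList.filterNot(_.location == bmAddress)
    outputLocs(partition) = newList
    // Decrement only if this removal emptied a previously non-empty list
    if (prevList.nonEmpty && newList.isEmpty) _numAvailableOutputs -= 1
  }
}
```

Removing one of two locations keeps the partition available; removing the last one makes it missing, and a repeated removal is a no-op.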
Finding Missing Partitions — findMissingPartitions Method
findMissingPartitions(): Seq[Int]
Note: findMissingPartitions is part of the Stage contract and returns the partitions that are missing, i.e. are yet to be computed.
Internally, findMissingPartitions uses the outputLocs internal registry to find the indices (partitions) with empty lists of MapStatus.
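A minimal sketch of that lookup, assuming outputLocs is modelled as an array of per-partition lists (MapStatusSketch and FindMissingSketch are hypothetical names): the missing partitions are simply the indices whose list is empty.

```scala
// Hypothetical stand-in for Spark's MapStatus.
final case class MapStatusSketch(executorId: String)

object FindMissingSketch {
  // Indices (partition ids) with no registered MapStatus are the missing partitions
  def findMissingPartitions(outputLocs: Array[List[MapStatusSketch]]): Seq[Int] =
    outputLocs.indices.filter(p => outputLocs(p).isEmpty)
}
```

For outputLocs with statuses only for partitions 0 and 2 (out of 4), the missing partitions are 1 and 3.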
ShuffleMapStage Sharing
A ShuffleMapStage can be shared across multiple jobs if these jobs reuse the same RDDs.
When a ShuffleMapStage is submitted to DAGScheduler to execute, getShuffleMapStage is called.
```scala
scala> val rdd = sc.parallelize(0 to 5).map((_,1)).sortByKey()  // (1)
scala> rdd.count  // (2)
scala> rdd.count  // (3)
```
1. Shuffle at sortByKey()
2. Submits a job with two stages, both of which are executed
3. Intentionally repeats the last action, which submits a new job with two stages, one of which is shared because it has already been computed
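Sharing boils down to looking a stage up by its shuffle id before creating a new one (DAGScheduler keeps a map from shuffle ids to ShuffleMapStages for this purpose). The sketch below models only that memoization with a mutable HashMap; StageRegistrySketch, StageSketch and stageFor are hypothetical names, not DAGScheduler's API.

```scala
import scala.collection.mutable

// Hypothetical, simplified stage: just an id and the shuffle it computes.
final case class StageSketch(id: Int, shuffleId: Int)

// A minimal sketch of reusing one stage per shuffle id across jobs.
class StageRegistrySketch {
  private val shuffleIdToStage = mutable.HashMap.empty[Int, StageSketch]
  private var nextStageId = 0

  // Returns the already-registered stage for the shuffle, or creates and registers one
  def stageFor(shuffleId: Int): StageSketch =
    shuffleIdToStage.getOrElseUpdate(shuffleId, {
      val stage = StageSketch(nextStageId, shuffleId)
      nextStageId += 1
      stage
    })
}
```

The second job asking for the same shuffle id gets back the very same stage object, which mirrors the shared stage in the third action above.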
Returning Number of Available Shuffle Map Outputs — numAvailableOutputs Method
numAvailableOutputs: Int
numAvailableOutputs returns the _numAvailableOutputs internal counter.
Note: numAvailableOutputs is used exclusively when DAGScheduler submits missing tasks for ShuffleMapStage (and only to print a DEBUG message when the ShuffleMapStage is finished).
Returning Collection of Active Jobs — mapStageJobs Method
mapStageJobs: Seq[ActiveJob]
mapStageJobs returns the _mapStageJobs internal registry.
Note: mapStageJobs is used exclusively when DAGScheduler is notified that a ShuffleMapTask has finished successfully (and the task made the ShuffleMapStage complete, so any map-stage jobs waiting on this stage are marked as finished).
Registering Job (that Computes ShuffleDependency) — addActiveJob Method
addActiveJob(job: ActiveJob): Unit
addActiveJob registers the input ActiveJob in the _mapStageJobs internal registry.
Note: The ActiveJob is added as the first element in _mapStageJobs.
Note: addActiveJob is used exclusively when DAGScheduler is notified that a ShuffleDependency was submitted (and so a new ActiveJob is created to compute it).
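Since the ActiveJob is added as the first element, a List-based registry with prepend is a natural fit. A minimal sketch, where ActiveJobSketch and MapStageJobsSketch are hypothetical stand-ins for ActiveJob and the relevant part of ShuffleMapStage:

```scala
// Hypothetical stand-in for Spark's ActiveJob.
final case class ActiveJobSketch(jobId: Int)

// A minimal sketch of the _mapStageJobs registry (not Spark's implementation).
class MapStageJobsSketch {
  private var _mapStageJobs: List[ActiveJobSketch] = Nil

  def mapStageJobs: Seq[ActiveJobSketch] = _mapStageJobs

  // Prepend, so the most recently registered job is the first element
  def addActiveJob(job: ActiveJobSketch): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }
}
```

After registering job 1 and then job 2, mapStageJobs lists job 2 first.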
Deregistering Job — removeActiveJob Method
removeActiveJob(job: ActiveJob): Unit
removeActiveJob removes an ActiveJob from the _mapStageJobs internal registry.
Note: removeActiveJob is used exclusively when DAGScheduler cleans up after an ActiveJob has finished (regardless of the outcome).
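Deregistration can be sketched as filtering the job out of the list, which also makes removing an unknown job a harmless no-op. ActiveJobSketch and MapStageJobsSketch are hypothetical names used only for illustration.

```scala
// Hypothetical stand-in for Spark's ActiveJob.
final case class ActiveJobSketch(jobId: Int)

// A minimal sketch of registering and deregistering map-stage jobs.
class MapStageJobsSketch {
  private var _mapStageJobs: List[ActiveJobSketch] = Nil

  def mapStageJobs: Seq[ActiveJobSketch] = _mapStageJobs

  def addActiveJob(job: ActiveJobSketch): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  // Keep every job except the given one (a no-op if it is not registered)
  def removeActiveJob(job: ActiveJobSketch): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }
}
```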
Removing All Shuffle Outputs Registered for Lost Executor — removeOutputsOnExecutor Method
removeOutputsOnExecutor(execId: String): Unit
removeOutputsOnExecutor removes all MapStatuses of the input execId executor from the outputLocs internal registry (of MapStatuses per partition).
If the input execId executor had the last registered MapStatus for a partition, removeOutputsOnExecutor decrements the _numAvailableOutputs counter and you should see the following INFO message in the logs:
INFO [stage] is now unavailable on executor [execId] ([_numAvailableOutputs]/[numPartitions], [isAvailable])
Note: removeOutputsOnExecutor is used exclusively when DAGScheduler cleans up after a lost executor.
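The executor-loss cleanup can be sketched as a pass over all partitions that drops the lost executor's statuses and decrements the counter for every list that becomes empty. In this sketch the INFO message is emitted once after the pass, through an injected logInfo function; ExecutorLossSketch, MapStatusSketch, the logInfo parameter and the "ShuffleMapStage <id>" rendering of the stage are hypothetical choices for illustration only.

```scala
// Hypothetical stand-in for Spark's MapStatus: only the executor id matters here.
final case class MapStatusSketch(executorId: String)

// A minimal sketch of removeOutputsOnExecutor (not Spark's implementation).
class ExecutorLossSketch(
    val id: Int,
    numPartitions: Int,
    logInfo: String => Unit = (msg: String) => println(msg)) {

  private val outputLocs = Array.fill[List[MapStatusSketch]](numPartitions)(Nil)
  private var _numAvailableOutputs = 0

  def numAvailableOutputs: Int = _numAvailableOutputs
  def isAvailable: Boolean = _numAvailableOutputs == numPartitions
  override def toString: String = s"ShuffleMapStage $id"

  def addOutputLoc(partition: Int, status: MapStatusSketch): Unit = {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList.isEmpty) _numAvailableOutputs += 1
  }

  def removeOutputsOnExecutor(execId: String): Unit = {
    var becameUnavailable = false
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      // Drop every status produced on the lost executor
      val newList = prevList.filterNot(_.executorId == execId)
      outputLocs(partition) = newList
      // The lost executor held the last status for this partition
      if (prevList.nonEmpty && newList.isEmpty) {
        becameUnavailable = true
        _numAvailableOutputs -= 1
      }
    }
    if (becameUnavailable) {
      logInfo(s"${toString} is now unavailable on executor $execId " +
        s"(${_numAvailableOutputs}/$numPartitions, $isAvailable)")
    }
  }
}
```

A partition that also has a status on another executor stays available; only partitions whose last status lived on the lost executor become missing.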
Preparing Shuffle Map Outputs in MapOutputTracker Format — outputLocInMapOutputTrackerFormat Method
outputLocInMapOutputTrackerFormat(): Array[MapStatus]
outputLocInMapOutputTrackerFormat returns the first (if available) element for every partition from the outputLocs internal registry. If there is no entry for a partition, that position is filled with null.
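The conversion is a per-partition "head or null". A minimal sketch, assuming outputLocs is modelled as an array of lists (MapStatusSketch and TrackerFormatSketch are hypothetical names):

```scala
// Hypothetical stand-in for Spark's MapStatus.
final case class MapStatusSketch(executorId: String)

object TrackerFormatSketch {
  // First MapStatus of every partition, or null when the partition has none
  def outputLocInMapOutputTrackerFormat(
      outputLocs: Array[List[MapStatusSketch]]): Array[MapStatusSketch] =
    outputLocs.map(_.headOption.orNull)
}
```

Using null rather than Option keeps the result a flat Array[MapStatus] with one slot per partition, matching the null-filled positions described above.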
Note
|
In both cases, |