log4j.logger.org.apache.spark.MapOutputTrackerMaster=DEBUG
MapOutputTrackerMaster — MapOutputTracker For Driver
MapOutputTrackerMaster is the MapOutputTracker for the driver.
A MapOutputTrackerMaster is the source of truth for MapStatus objects (map output locations) per shuffle id (as recorded from ShuffleMapTasks).
Note
|
MapOutputTrackerMaster uses Java’s thread-safe java.util.concurrent.ConcurrentHashMap for mapStatuses internal cache.
|
Note
|
There is currently a hardcoded limit of 1000 map tasks and 1000 reduce tasks. Once either number reaches its limit, Spark does not assign preferred locations (aka locality preferences) based on map output sizes.
|
MapOutputTrackerMaster uses MetadataCleaner with MetadataCleanerType.MAP_OUTPUT_TRACKER as cleanerType and the cleanup function to drop entries in mapStatuses.
Name | Description |
---|---|
| Internal registry of…FIXME Used when…FIXME |
cachedSerializedStatuses | Internal registry of serialized shuffle map output statuses per shuffle id. Used when…FIXME |
| Internal registry with…FIXME Used when…FIXME |
shuffleIdLocks | Internal registry of locks for shuffle ids. Used when…FIXME |
mapOutputRequests | Internal queue with GetMapOutputMessage messages. Used when MessageLoop takes a message to process. |
Tip
|
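Enable DEBUG logging level for the org.apache.spark.MapOutputTrackerMaster logger to see what happens inside. Add the following line to your log4j configuration: log4j.logger.org.apache.spark.MapOutputTrackerMaster=DEBUG Refer to Logging. |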
removeBroadcast
Method
Caution
|
FIXME |
clearCachedBroadcast
Method
Caution
|
FIXME |
post
Method
Caution
|
FIXME |
stop
Method
Caution
|
FIXME |
unregisterMapOutput
Method
Caution
|
FIXME |
cleanup Function for MetadataCleaner
The cleanup(cleanupTime: Long) method removes old entries in mapStatuses and cachedSerializedStatuses whose timestamp is earlier than cleanupTime.
It uses the org.apache.spark.util.TimeStampedHashMap.clearOldValues method.
Tip
|
Enable DEBUG logging level to see the entries that are removed.
|
You should see the following DEBUG message in the logs for entries being removed:
DEBUG Removing key [entry.getKey]
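The time-based cleanup described above can be pictured with a small, self-contained sketch. It does not use Spark's TimeStampedHashMap: the `TimestampedCache` class and the `CleanupSketch` object are hypothetical names that only mimic the idea of dropping entries stamped earlier than `cleanupTime`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a timestamped map; not Spark's TimeStampedHashMap.
final class TimestampedCache[K, V] {
  private case class Stamped(value: V, timestamp: Long)
  private val entries = new ConcurrentHashMap[K, Stamped]()

  def put(key: K, value: V, timestamp: Long): Unit =
    entries.put(key, Stamped(value, timestamp))

  def contains(key: K): Boolean = entries.containsKey(key)

  // Drops every entry stamped strictly earlier than cleanupTime.
  def clearOldValues(cleanupTime: Long): Unit = {
    val it = entries.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (entry.getValue.timestamp < cleanupTime) {
        println(s"Removing key ${entry.getKey}")
        it.remove()
      }
    }
  }
}

object CleanupSketch {
  // Registers two entries, cleans up, and reports which keys survived.
  def run(): (Boolean, Boolean) = {
    val mapStatuses = new TimestampedCache[Int, String]
    mapStatuses.put(0, "old shuffle", timestamp = 100L)
    mapStatuses.put(1, "recent shuffle", timestamp = 200L)
    mapStatuses.clearOldValues(cleanupTime = 150L)
    (mapStatuses.contains(0), mapStatuses.contains(1))
  }
}
```

In this sketch only the entry stamped before the cleanup time is dropped; the more recent one stays in the cache.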
Creating MapOutputTrackerMaster Instance
MapOutputTrackerMaster takes the following when created:
- broadcastManager: BroadcastManager
- isLocal: flag to control whether MapOutputTrackerMaster runs in local mode or on a cluster.
MapOutputTrackerMaster initializes the internal registries and counters and starts map-output-dispatcher threads.
Note
|
MapOutputTrackerMaster is created when SparkEnv is created.
|
threadpool
Thread Pool with map-output-dispatcher
Threads
threadpool: ThreadPoolExecutor
threadpool is a daemon fixed thread pool registered with the map-output-dispatcher thread name prefix.
threadpool uses spark.shuffle.mapOutput.dispatcher.numThreads (default: 8) for the number of MessageLoop dispatcher threads to process received GetMapOutputMessage messages.
Note
|
The dispatcher threads are started immediately when MapOutputTrackerMaster is created.
|
Note
|
threadpool is shut down when MapOutputTrackerMaster stops.
|
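A daemon fixed thread pool with a thread name prefix can be built with plain java.util.concurrent classes. The following is a minimal sketch under that assumption; `DispatcherPoolSketch` and `newDaemonFixedThreadPool` are names chosen for illustration, not a claim about how Spark builds its pool.

```scala
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

object DispatcherPoolSketch {
  // Builds a fixed-size pool of daemon threads named "<prefix>-<n>".
  def newDaemonFixedThreadPool(numThreads: Int, prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val thread = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        thread.setDaemon(true) // daemon threads do not keep the JVM alive
        thread
      }
    }
    // Executors.newFixedThreadPool is backed by a ThreadPoolExecutor.
    Executors.newFixedThreadPool(numThreads, factory).asInstanceOf[ThreadPoolExecutor]
  }
}
```

With the default from the text, such a pool would be created as `DispatcherPoolSketch.newDaemonFixedThreadPool(8, "map-output-dispatcher")` and shut down with `shutdown()` when the owner stops.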
Finding Preferred BlockManagers with Most Shuffle Map Outputs (For ShuffleDependency and Partition) — getPreferredLocationsForShuffle
Method
getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int): Seq[String]
getPreferredLocationsForShuffle finds the locations (i.e. BlockManagers) with the most map outputs for the input ShuffleDependency and Partition.
Note
|
getPreferredLocationsForShuffle is simply getLocationsWithLargestOutputs with a guard condition.
|
Internally, getPreferredLocationsForShuffle checks whether the spark.shuffle.reduceLocality.enabled Spark property is enabled (it is by default) and whether the number of partitions of the RDD of the input ShuffleDependency and the number of partitions in the partitioner of the input ShuffleDependency are both less than 1000.
Note
|
The thresholds for the number of partitions in the RDD and of the partitioner when computing the preferred locations are 1000 and are not configurable.
|
If the condition holds, getPreferredLocationsForShuffle finds locations with the largest number of shuffle map outputs for the input ShuffleDependency and partitionId (passing the number of partitions in the partitioner of the input ShuffleDependency and 0.2 as the fraction threshold) and returns the hosts of the preferred BlockManagers.
Note
|
0.2 is the fraction of total map output that must be at a location to be considered as a preferred location for a reduce task. It is not configurable.
|
Note
|
getPreferredLocationsForShuffle is used when ShuffledRDD and ShuffledRowRDD ask for preferred locations for a partition.
|
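The guard condition described above can be sketched without any Spark dependency. Everything in the sketch is a simplified assumption: `Location` stands in for a BlockManagerId, the partition counts are passed as plain integers, and the lookup of the largest outputs is passed in as a function.

```scala
object PreferredLocationsSketch {
  // Thresholds and fraction as described in the text; fixed, not configurable.
  val MapThreshold = 1000
  val ReduceThreshold = 1000
  val ReducerPrefLocsFraction = 0.2

  // Hypothetical, simplified stand-in for a block manager location.
  final case class Location(executorId: String, host: String)

  def preferredHosts(
      reduceLocalityEnabled: Boolean,
      numMapPartitions: Int,
      numReducePartitions: Int,
      largestOutputs: Double => Option[Seq[Location]]): Seq[String] =
    if (reduceLocalityEnabled &&
        numMapPartitions < MapThreshold &&
        numReducePartitions < ReduceThreshold) {
      // Only the hosts of the preferred locations are returned.
      largestOutputs(ReducerPrefLocsFraction).map(_.map(_.host)).getOrElse(Seq.empty)
    } else {
      Seq.empty // guard failed: no locality preference
    }
}
```

The sketch returns an empty sequence both when the guard fails and when no location passes the fraction threshold, so a caller does not need to tell the two cases apart.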
Incrementing Epoch — incrementEpoch
Method
incrementEpoch(): Unit
incrementEpoch increments the internal epoch.
You should see the following DEBUG message in the logs:
DEBUG MapOutputTrackerMaster: Increasing epoch to [epoch]
Note
|
incrementEpoch is used when MapOutputTrackerMaster registers map outputs (with changeEpoch flag enabled — it is disabled by default) and unregisters map outputs (for a shuffle, mapper and block manager), and when DAGScheduler is notified that an executor got lost (with filesLost flag enabled).
|
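As a rough illustration, an epoch counter of this kind could look as follows. The sketch assumes the epoch is guarded by a lock (such as the epochLock mentioned for checkCachedStatuses); `EpochCounter` and `current` are hypothetical names.

```scala
// Hypothetical counter; only mirrors the lock-then-increment idea.
final class EpochCounter {
  private val epochLock = new Object
  private var epoch = 0L

  // Bumps the epoch under the lock and logs the new value.
  def incrementEpoch(): Unit = epochLock.synchronized {
    epoch += 1
    println(s"Increasing epoch to $epoch")
  }

  // Reads the epoch under the same lock.
  def current: Long = epochLock.synchronized(epoch)
}
```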
Finding Locations with Largest Number of Shuffle Map Outputs — getLocationsWithLargestOutputs
Method
getLocationsWithLargestOutputs(
shuffleId: Int,
reducerId: Int,
numReducers: Int,
fractionThreshold: Double): Option[Array[BlockManagerId]]
getLocationsWithLargestOutputs returns BlockManagerIds with the largest size (of all the shuffle blocks they manage) above the input fractionThreshold (given the total size of all the shuffle blocks for the shuffle across all BlockManagers).
Note
|
getLocationsWithLargestOutputs may return no BlockManagerId if their shuffle blocks do not total up above the input fractionThreshold.
|
Note
|
The input numReducers is not used.
|
Internally, getLocationsWithLargestOutputs queries the mapStatuses internal cache for the input shuffleId.
Note
|
One entry in |
getLocationsWithLargestOutputs iterates over the MapStatus array and builds an interim mapping between each BlockManagerId and the cumulative size of the shuffle blocks (for the input reducerId) it holds.
Note
|
getLocationsWithLargestOutputs is used exclusively when MapOutputTrackerMaster finds the preferred locations (BlockManagers and hence executors) for a shuffle.
|
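The aggregation described above can be sketched with simplified stand-ins for Spark's types. `BlockManagerId` and `MapStatus` below are hypothetical case classes defined only for this example, and treating a share equal to `fractionThreshold` as passing is an assumption of the sketch.

```scala
import scala.collection.mutable

object LargestOutputsSketch {
  // Hypothetical, simplified stand-ins for Spark's BlockManagerId and MapStatus.
  final case class BlockManagerId(executorId: String, host: String)
  final case class MapStatus(location: BlockManagerId, blockSizes: Array[Long]) {
    def getSizeForBlock(reduceId: Int): Long = blockSizes(reduceId)
  }

  def locationsWithLargestOutputs(
      statuses: Array[MapStatus],
      reducerId: Int,
      fractionThreshold: Double): Option[Array[BlockManagerId]] = {
    // Cumulative size of the reducer's blocks per location.
    val sizeByLocation = mutable.LinkedHashMap.empty[BlockManagerId, Long]
    var totalSize = 0L
    // Slots may still be null for map tasks that have not reported a status.
    statuses.filter(_ != null).foreach { status =>
      val size = status.getSizeForBlock(reducerId)
      if (size > 0) {
        sizeByLocation(status.location) =
          sizeByLocation.getOrElse(status.location, 0L) + size
        totalSize += size
      }
    }
    // Keep the locations whose share of the total reaches the threshold.
    val top = sizeByLocation.collect {
      case (location, size) if size.toDouble / totalSize >= fractionThreshold => location
    }.toArray
    if (top.nonEmpty) Some(top) else None
  }
}
```

For example, with one location holding 90 bytes and another holding 10 bytes for a reducer, a threshold of 0.2 keeps only the first location.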
Requesting Tracking Status of Shuffle Map Output — containsShuffle
Method
containsShuffle(shuffleId: Int): Boolean
containsShuffle checks if the input shuffleId is registered in the cachedSerializedStatuses or mapStatuses internal caches.
Note
|
containsShuffle is used exclusively when DAGScheduler creates a ShuffleMapStage (for ShuffleDependency and ActiveJob).
|
Registering ShuffleDependency — registerShuffle
Method
registerShuffle(shuffleId: Int, numMaps: Int): Unit
registerShuffle registers the input shuffleId in the mapStatuses internal cache.
Note
|
The number of MapStatus entries in the new array in mapStatuses internal cache is exactly the input numMaps.
|
registerShuffle adds a lock in the shuffleIdLocks internal registry (without using it).
If the shuffleId has already been registered, registerShuffle throws an IllegalArgumentException with the following message:
Shuffle ID [id] registered twice
Note
|
registerShuffle is used exclusively when DAGScheduler creates a ShuffleMapStage (for ShuffleDependency and ActiveJob).
|
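The registration rules for a shuffle can be illustrated with a small registry built on java.util.concurrent.ConcurrentHashMap. This is a sketch under simplifying assumptions: `ShuffleRegistrySketch` and `numSlots` are hypothetical, statuses are plain strings, and only the mapStatuses-like cache is modelled.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry; statuses are plain strings to keep the sketch small.
final class ShuffleRegistrySketch {
  // shuffleId -> one slot per map task (null until the task reports its status)
  private val mapStatuses = new ConcurrentHashMap[Int, Array[String]]()

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
    // putIfAbsent returns the previous value, so non-null means a second registration.
    if (mapStatuses.putIfAbsent(shuffleId, new Array[String](numMaps)) != null) {
      throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
    }
  }

  def containsShuffle(shuffleId: Int): Boolean = mapStatuses.containsKey(shuffleId)

  // Number of map output slots reserved for a registered shuffle.
  def numSlots(shuffleId: Int): Int = mapStatuses.get(shuffleId).length
}
```

Registering shuffle 0 with three map tasks reserves exactly three slots, and a second `registerShuffle(0, 3)` call fails with the message shown above.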
Registering Map Outputs for Shuffle (Possibly with Epoch Change) — registerMapOutputs
Method
registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
changeEpoch: Boolean = false): Unit
registerMapOutputs registers the input statuses (as the shuffle map output) with the input shuffleId in the mapStatuses internal cache.
registerMapOutputs increments epoch if the input changeEpoch is enabled (it is not by default).
Note
|
In both cases, the input |
Finding Serialized Map Output Statuses (And Possibly Broadcasting Them) — getSerializedMapOutputStatuses
Method
getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte]
getSerializedMapOutputStatuses finds cached serialized map statuses for the input shuffleId.
If found, getSerializedMapOutputStatuses returns the cached serialized map statuses.
Otherwise, getSerializedMapOutputStatuses acquires the shuffle lock for shuffleId and looks up the cached serialized map statuses again, since some other thread could have updated the cachedSerializedStatuses internal cache in the meantime.
getSerializedMapOutputStatuses returns the serialized map statuses if found.
If not, getSerializedMapOutputStatuses serializes the local array of MapStatuses (from checkCachedStatuses).
You should see the following INFO message in the logs:
INFO Size of output statuses for shuffle [shuffleId] is [bytes] bytes
getSerializedMapOutputStatuses saves the serialized map output statuses in cachedSerializedStatuses internal cache if the epoch has not changed in the meantime. getSerializedMapOutputStatuses also saves its broadcast version in cachedSerializedBroadcast internal cache.
If the epoch has changed in the meantime, the serialized map output statuses and their broadcast version are not saved, and you should see the following INFO message in the logs:
INFO Epoch changed, not caching!
In that case, getSerializedMapOutputStatuses also removes the broadcast.
In either case, getSerializedMapOutputStatuses returns the serialized map statuses.
Note
|
getSerializedMapOutputStatuses is used when MapOutputTrackerMaster responds to GetMapOutputMessage requests and DAGScheduler creates ShuffleMapStage for ShuffleDependency (copying the shuffle map output locations from previous jobs to avoid unnecessarily regenerating data).
|
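The lookup, lock, second lookup and epoch check described in this section follow a double-checked caching pattern. The sketch below shows that pattern in isolation. It leaves out broadcasting entirely, takes the serializer as a plain function, and uses the hypothetical names `SerializedStatusCacheSketch` and `getSerialized`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical cache; `serialize` stands in for serializing the map statuses.
final class SerializedStatusCacheSketch(serialize: Int => Array[Byte]) {
  private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]()
  private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]()
  private val epochLock = new Object
  private var epoch = 0L

  def incrementEpoch(): Unit = epochLock.synchronized { epoch += 1 }

  def getSerialized(shuffleId: Int): Array[Byte] = {
    // First lookup, without the per-shuffle lock.
    val cached = cachedSerializedStatuses.get(shuffleId)
    if (cached != null) {
      cached
    } else {
      shuffleIdLocks.putIfAbsent(shuffleId, new Object)
      shuffleIdLocks.get(shuffleId).synchronized {
        // Second lookup: another thread may have filled the cache meanwhile.
        val cachedNow = cachedSerializedStatuses.get(shuffleId)
        if (cachedNow != null) {
          cachedNow
        } else {
          val epochGotten = epochLock.synchronized(epoch)
          val bytes = serialize(shuffleId)
          println(s"Size of output statuses for shuffle $shuffleId is ${bytes.length} bytes")
          // Cache only if the epoch did not move while serializing.
          val saved = epochLock.synchronized {
            if (epoch == epochGotten) {
              cachedSerializedStatuses.put(shuffleId, bytes)
              true
            } else false
          }
          if (!saved) println("Epoch changed, not caching!")
          bytes
        }
      }
    }
  }
}
```

Holding a per-shuffle lock rather than a global one lets requests for different shuffles serialize in parallel, while requests for the same shuffle wait and then reuse the cached bytes.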
Finding Cached Serialized Map Statuses — checkCachedStatuses
Internal Method
checkCachedStatuses(): Boolean
checkCachedStatuses is an internal helper method that getSerializedMapOutputStatuses uses to do some bookkeeping (when the epoch and cacheEpoch differ) and set the local statuses, retBytes and epochGotten (that getSerializedMapOutputStatuses uses).
Internally, checkCachedStatuses acquires the epochLock lock and compares epoch with the cached cacheEpoch.
If epoch is younger (i.e. greater), checkCachedStatuses clears the cachedSerializedStatuses internal cache and the cached broadcasts, and sets cacheEpoch to epoch.
checkCachedStatuses gets the serialized map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses).
When the serialized map output status is found, checkCachedStatuses saves it in a local retBytes and returns true.
When not found, you should see the following DEBUG message in the logs:
DEBUG cached status not found for : [shuffleId]
checkCachedStatuses uses mapStatuses internal cache to get map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses) or falls back to an empty array and sets it to a local statuses. checkCachedStatuses sets the local epochGotten to the current epoch and returns false.
MessageLoop
Dispatcher Thread
MessageLoop is a dispatcher thread that, once started, runs indefinitely until PoisonPill arrives.
MessageLoop takes GetMapOutputMessage messages off mapOutputRequests internal queue (waiting if necessary until a message becomes available).
Unless PoisonPill is processed, you should see the following DEBUG message in the logs:
DEBUG Handling request to send map output locations for shuffle [shuffleId] to [hostPort]
MessageLoop replies back with serialized map output statuses for the shuffleId (from the incoming GetMapOutputMessage message).
Note
|
MessageLoop is created and executed immediately when MapOutputTrackerMaster is created.
|
PoisonPill Message
PoisonPill is a GetMapOutputMessage (with -99 as shuffleId) that indicates that MessageLoop should exit its message loop.
PoisonPill is posted when MapOutputTrackerMaster stops.
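The interplay between MessageLoop and PoisonPill can be sketched with a java.util.concurrent.LinkedBlockingQueue. The sketch makes several assumptions: `GetMapOutputMessage` is reduced to a shuffle id and a host/port string, the reply is a plain callback, and the loop puts the pill back on the queue so that sibling dispatcher threads can also stop.

```scala
import java.util.concurrent.LinkedBlockingQueue

object MessageLoopSketch {
  // Hypothetical, simplified message; a real one would also carry a reply context.
  final case class GetMapOutputMessage(shuffleId: Int, hostPort: String)
  val PoisonPill: GetMapOutputMessage = GetMapOutputMessage(-99, null)

  final class MessageLoop(
      requests: LinkedBlockingQueue[GetMapOutputMessage],
      reply: GetMapOutputMessage => Unit) extends Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        val message = requests.take() // blocks until a message is available
        if (message == PoisonPill) {
          // Put the pill back so that other dispatcher threads see it too.
          requests.offer(PoisonPill)
          running = false
        } else {
          println("Handling request to send map output locations for shuffle " +
            s"${message.shuffleId} to ${message.hostPort}")
          reply(message)
        }
      }
    }
  }
}
```

Using an in-band sentinel message keeps shutdown simple: stopping the tracker only requires posting one PoisonPill, and every loop that takes it exits after re-queueing it.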
Settings
Spark Property | Default Value | Description |
---|---|---|
|
||
|
||
spark.shuffle.reduceLocality.enabled | true | Controls whether to compute locality preferences for reduce tasks. When enabled (i.e. … |