MapOutputTrackerMaster — MapOutputTracker For Driver

MapOutputTrackerMaster is the MapOutputTracker for the driver.

A MapOutputTrackerMaster is the source of truth for MapStatus objects (map output locations) per shuffle id (as recorded from ShuffleMapTasks).

Note
MapOutputTrackerMaster uses Java’s thread-safe java.util.concurrent.ConcurrentHashMap for mapStatuses internal cache.
Note
There is currently a hardcoded limit of map and reduce tasks above which Spark does not assign preferred locations aka locality preferences based on map output sizes — 1000 for map and reduce each.

MapOutputTrackerMaster uses MetadataCleaner with MetadataCleanerType.MAP_OUTPUT_TRACKER as cleanerType and cleanup function to drop entries in mapStatuses.

Table 1. MapOutputTrackerMaster Internal Registries and Counters
Name Description

cachedSerializedBroadcast

Internal registry of…​FIXME

Used when…​FIXME

cachedSerializedStatuses

Internal registry of serialized shuffle map output statuses (as Array[Byte]) per…​FIXME

Used when…​FIXME

cacheEpoch

Internal registry with…​FIXME

Used when…​FIXME

shuffleIdLocks

Internal registry of locks for shuffle ids.

Used when…​FIXME

mapOutputRequests

Internal queue with GetMapOutputMessage requests for map output statuses.

Used when MapOutputTrackerMaster posts GetMapOutputMessage messages to and take one head element off this queue.

NOTE: mapOutputRequests uses Java’s java.util.concurrent.LinkedBlockingQueue.

Tip

Enable INFO or DEBUG logging level for org.apache.spark.MapOutputTrackerMaster logger to see what happens in MapOutputTrackerMaster.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.MapOutputTrackerMaster=DEBUG

Refer to Logging.

removeBroadcast Method

Caution
FIXME

clearCachedBroadcast Method

Caution
FIXME

post Method

Caution
FIXME

stop Method

Caution
FIXME

unregisterMapOutput Method

Caution
FIXME

cleanup Function for MetadataCleaner

cleanup(cleanupTime: Long) method removes old entries in mapStatuses and cachedSerializedStatuses that have timestamp earlier than cleanupTime.

It uses org.apache.spark.util.TimeStampedHashMap.clearOldValues method.

Tip

Enable DEBUG logging level for org.apache.spark.util.TimeStampedHashMap logger to see what happens in TimeStampedHashMap.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.util.TimeStampedHashMap=DEBUG

You should see the following DEBUG message in the logs for entries being removed:

DEBUG Removing key [entry.getKey]

Creating MapOutputTrackerMaster Instance

MapOutputTrackerMaster takes the following when created:

  1. SparkConf

  2. broadcastManager — BroadcastManager

  3. isLocal — flag to control whether MapOutputTrackerMaster runs in local or on a cluster.

MapOutputTrackerMaster initializes the internal registries and counters and starts map-output-dispatcher threads.

Note
MapOutputTrackerMaster is created when SparkEnv is created.

threadpool Thread Pool with map-output-dispatcher Threads

threadpool: ThreadPoolExecutor

threadpool is a daemon fixed thread pool registered with map-output-dispatcher thread name prefix.

threadpool uses spark.shuffle.mapOutput.dispatcher.numThreads (default: 8) for the number of MessageLoop dispatcher threads to process received GetMapOutputMessage messages.

Note
The dispatcher threads are started immediately when MapOutputTrackerMaster is created.
Note
threadpool is shut down when MapOutputTrackerMaster stops.

Finding Preferred BlockManagers with Most Shuffle Map Outputs (For ShuffleDependency and Partition) — getPreferredLocationsForShuffle Method

getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int): Seq[String]

getPreferredLocationsForShuffle finds the locations (i.e. BlockManagers) with the most map outputs for the input ShuffleDependency and Partition.

Note
getPreferredLocationsForShuffle is simply getLocationsWithLargestOutputs with a guard condition.

Internally, getPreferredLocationsForShuffle checks whether spark.shuffle.reduceLocality.enabled Spark property is enabled (it is by default) with the number of partitions of the RDD of the input ShuffleDependency and partitions in the partitioner of the input ShuffleDependency both being less than 1000.

Note
The thresholds for the number of partitions in the RDD and of the partitioner when computing the preferred locations are 1000 and are not configurable.

If the condition holds, getPreferredLocationsForShuffle finds locations with the largest number of shuffle map outputs for the input ShuffleDependency and partitionId (with the number of partitions in the partitioner of the input ShuffleDependency and 0.2) and returns the hosts of the preferred BlockManagers.

Note
0.2 is the fraction of total map output that must be at a location to be considered as a preferred location for a reduce task. It is not configurable.
Note
getPreferredLocationsForShuffle is used when ShuffledRDD and ShuffledRowRDD ask for preferred locations for a partition.

Incrementing Epoch — incrementEpoch Method

incrementEpoch(): Unit

incrementEpoch increments the internal epoch.

You should see the following DEBUG message in the logs:

DEBUG MapOutputTrackerMaster: Increasing epoch to [epoch]
Note
incrementEpoch is used when MapOutputTrackerMaster registers map outputs (with changeEpoch flag enabled — it is disabled by default) and unregisters map outputs (for a shuffle, mapper and block manager), and when DAGScheduler is notified that an executor got lost (with filesLost flag enabled).

Finding Locations with Largest Number of Shuffle Map Outputs — getLocationsWithLargestOutputs Method

getLocationsWithLargestOutputs(
  shuffleId: Int,
  reducerId: Int,
  numReducers: Int,
  fractionThreshold: Double): Option[Array[BlockManagerId]]

getLocationsWithLargestOutputs returns BlockManagerIds with the largest size (of all the shuffle blocks they manage) above the input fractionThreshold (given the total size of all the shuffle blocks for the shuffle across all BlockManagers).

Note
getLocationsWithLargestOutputs may return no BlockManagerId if their shuffle blocks do not total up above the input fractionThreshold.
Note
The input numReducers is not used.

Internally, getLocationsWithLargestOutputs queries the mapStatuses internal cache for the input shuffleId.

Note

One entry in mapStatuses internal cache is a MapStatus array indexed by partition id.

getLocationsWithLargestOutputs iterates over the MapStatus array and builds an interim mapping between BlockManagerId and the cumulative sum of shuffle blocks across BlockManagers.

Requesting Tracking Status of Shuffle Map Output — containsShuffle Method

containsShuffle(shuffleId: Int): Boolean

containsShuffle checks if the input shuffleId is registered in the cachedSerializedStatuses or mapStatuses internal caches.

Note
containsShuffle is used exclusively when DAGScheduler creates a ShuffleMapStage (for ShuffleDependency and ActiveJob).

Registering ShuffleDependency — registerShuffle Method

registerShuffle(shuffleId: Int, numMaps: Int): Unit

registerShuffle registers the input shuffleId in the mapStatuses internal cache.

Note
The number of MapStatus entries in the new array in mapStatuses internal cache is exactly the input numMaps.

registerShuffle adds a lock in the shuffleIdLocks internal registry (without using it).

If the shuffleId has already been registered, registerShuffle throws a IllegalArgumentException with the following message:

Shuffle ID [id] registered twice
Note
registerShuffle is used exclusively when DAGScheduler creates a ShuffleMapStage (for ShuffleDependency and ActiveJob).

Registering Map Outputs for Shuffle (Possibly with Epoch Change) — registerMapOutputs Method

registerMapOutputs(
  shuffleId: Int,
  statuses: Array[MapStatus],
  changeEpoch: Boolean = false): Unit

registerMapOutputs registers the input statuses (as the shuffle map output) with the input shuffleId in the mapStatuses internal cache.

registerMapOutputs increments epoch if the input changeEpoch is enabled (it is not by default).

Note

registerMapOutputs is used when DAGScheduler handles successful ShuffleMapTask completion and executor lost events.

In both cases, the input changeEpoch is enabled.

Finding Serialized Map Output Statuses (And Possibly Broadcasting Them) — getSerializedMapOutputStatuses Method

getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte]

getSerializedMapOutputStatuses finds cached serialized map statuses for the input shuffleId.

If found, getSerializedMapOutputStatuses returns the cached serialized map statuses.

Otherwise, getSerializedMapOutputStatuses acquires the shuffle lock for shuffleId and finds cached serialized map statuses again since some other thread could not update the cachedSerializedStatuses internal cache.

getSerializedMapOutputStatuses returns the serialized map statuses if found.

If not, getSerializedMapOutputStatuses serializes the local array of MapStatuses (from checkCachedStatuses).

You should see the following INFO message in the logs:

INFO Size of output statuses for shuffle [shuffleId] is [bytes] bytes

getSerializedMapOutputStatuses saves the serialized map output statuses in cachedSerializedStatuses internal cache if the epoch has not changed in the meantime. getSerializedMapOutputStatuses also saves its broadcast version in cachedSerializedBroadcast internal cache.

If the epoch has changed in the meantime, the serialized map output statuses and their broadcast version are not saved, and you should see the following INFO message in the logs:

INFO Epoch changed, not caching!

getSerializedMapOutputStatuses removes the broadcast.

getSerializedMapOutputStatuses returns the serialized map statuses.

Note
getSerializedMapOutputStatuses is used when MapOutputTrackerMaster responds to GetMapOutputMessage requests and DAGScheduler creates ShuffleMapStage for ShuffleDependency (copying the shuffle map output locations from previous jobs to avoid unnecessarily regenerating data).

Finding Cached Serialized Map Statuses — checkCachedStatuses Internal Method

checkCachedStatuses(): Boolean

checkCachedStatuses is an internal helper method that getSerializedMapOutputStatuses uses to do some bookkeeping (when the epoch and cacheEpoch differ) and set local statuses, retBytes and epochGotten (that getSerializedMapOutputStatuses uses).

Internally, checkCachedStatuses acquires the epochLock lock and checks the status of epoch to cached cacheEpoch.

If epoch is younger (i.e. greater), checkCachedStatuses clears cachedSerializedStatuses internal cache, cached broadcasts and sets cacheEpoch to be epoch.

checkCachedStatuses gets the serialized map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses).

When the serialized map output status is found, checkCachedStatuses saves it in a local retBytes and returns true.

When not found, you should see the following DEBUG message in the logs:

DEBUG cached status not found for : [shuffleId]

checkCachedStatuses uses mapStatuses internal cache to get map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses) or falls back to an empty array and sets it to a local statuses. checkCachedStatuses sets the local epochGotten to the current epoch and returns false.

MessageLoop Dispatcher Thread

MessageLoop is a dispatcher thread that, once started, runs indefinitely until PoisonPill arrives.

MessageLoop takes GetMapOutputMessage messages off mapOutputRequests internal queue (waiting if necessary until a message becomes available).

Unless PoisonPill is processed, you should see the following DEBUG message in the logs:

DEBUG Handling request to send map output locations for shuffle [shuffleId] to [hostPort]

MessageLoop replies back with serialized map output statuses for the shuffleId (from the incoming GetMapOutputMessage message).

Note
MessageLoop is created and executed immediately when MapOutputTrackerMaster is created.

PoisonPill Message

PoisonPill is a GetMapOutputMessage (with -99 as shuffleId) that indicates that MessageLoop should exit its message loop.

PoisonPill is posted when MapOutputTrackerMaster stops.

Settings

Table 2. Spark Properties
Spark Property Default Value Description

spark.shuffle.mapOutput.dispatcher.numThreads

8

FIXME

spark.shuffle.mapOutput.minSizeForBroadcast

512k

FIXME

spark.shuffle.reduceLocality.enabled

true

Controls whether to compute locality preferences for reduce tasks.

When enabled (i.e. true), MapOutputTrackerMaster computes the preferred hosts on which to run a given map output partition in a given shuffle, i.e. the nodes that the most outputs for that partition are on.

results matching ""

    No results matching ""