SortShuffleManager — The Default (And Only) Sort-Based Shuffle System
SortShuffleManager is the one and only ShuffleManager in Spark with the short name sort or tungsten-sort.
| Note |
|---|
| You can use spark.shuffle.manager Spark property to activate your own implementation of ShuffleManager contract. |
| Caution |
|---|
| FIXME The internal registries |
| Name | Description |
|---|---|
| shuffleBlockResolver | IndexShuffleBlockResolver created when SortShuffleManager is created and used throughout the lifetime of the owning SortShuffleManager. NOTE: Beside the uses due to the contract, … |
| Tip |
|---|
| Enable DEBUG logging level for org.apache.spark.shuffle.sort.SortShuffleManager logger to see what happens inside. Add the following line to conf/log4j.properties: `log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager$=DEBUG`. Refer to Logging. |
unregisterShuffle Method
| Caution |
|---|
| FIXME |
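In the meantime, a minimal sketch of unregisterShuffle, modelled on the Spark 2.0 sources (treat it as an illustration, not the authoritative implementation): it removes the shuffle from the numMapsForShuffle internal registry and requests IndexShuffleBlockResolver to remove the data file of every map output of the shuffle.

```scala
override def unregisterShuffle(shuffleId: Int): Boolean = {
  // Forget the shuffle and delete the data file of every map output
  // (via the owning IndexShuffleBlockResolver).
  Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
    (0 until numMaps).foreach { mapId =>
      shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
    }
  }
  true
}
```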
Creating SortShuffleManager Instance
SortShuffleManager takes a SparkConf.
SortShuffleManager makes sure that the spark.shuffle.spill Spark property is enabled. If it is not, you should see the following WARN message in the logs:

```
WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
```
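A sketch of that constructor-time check, modelled on the Spark 2.0 sources:

```scala
// In the SortShuffleManager constructor: spilling cannot be disabled anymore,
// so an explicit spark.shuffle.spill=false only triggers a warning.
if (!conf.getBoolean("spark.shuffle.spill", true)) {
  logWarning("spark.shuffle.spill was set to false, but this configuration is ignored " +
    "as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.")
}
```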
SortShuffleManager initializes the internal registries and counters.
| Note |
|---|
| SortShuffleManager is created when SparkEnv is created (on the driver and executors) which is at the very beginning of a Spark application’s lifecycle. |
Creating ShuffleHandle (For ShuffleDependency) — registerShuffle Method
```scala
registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle
```
| Note |
|---|
| registerShuffle is a part of ShuffleManager contract. |
| Caution |
|---|
| FIXME Copy the conditions |
registerShuffle returns a new ShuffleHandle that can be one of the following (see the sketch after this list):

- BypassMergeSortShuffleHandle (with ShuffleDependency[K, V, V]) when the shouldBypassMergeSort condition holds.
- SerializedShuffleHandle (with ShuffleDependency[K, V, V]) when the canUseSerializedShuffle condition holds.
- BaseShuffleHandle (with the input ShuffleDependency[K, V, C]) when neither of the above conditions holds.
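A sketch of the selection, modelled on the Spark 2.0 sources (shouldBypassMergeSort lives in SortShuffleWriter’s companion object, canUseSerializedShuffle in SortShuffleManager’s):

```scala
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // Few reduce partitions and no map-side aggregation:
    // write per-partition files directly and concatenate them at the end.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Serialized (Tungsten) shuffle: buffer map outputs in serialized form.
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in deserialized form.
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
```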
Selecting ShuffleWriter For ShuffleHandle — getWriter Method
```scala
getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]
```
| Note |
|---|
| getWriter is a part of ShuffleManager contract. |
Internally, getWriter makes sure that a ShuffleHandle is associated with its numMaps in the numMapsForShuffle internal registry.
| Caution |
|---|
| FIXME Associated?! What’s that? |
| Note |
|---|
| getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature that says that it can work with any ShuffleHandle). Moreover, getWriter further expects that in 2 (out of 3) cases the input handle is a more specialized subclass of BaseShuffleHandle, i.e. SerializedShuffleHandle or BypassMergeSortShuffleHandle. |
getWriter then returns a new ShuffleWriter that corresponds to the type of the input ShuffleHandle:

| ShuffleHandle | ShuffleWriter |
|---|---|
| SerializedShuffleHandle | UnsafeShuffleWriter |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter |
| BaseShuffleHandle | SortShuffleWriter |
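A sketch of the dispatch, modelled on the Spark 2.0 sources (the exact constructor parameters of the writers may differ across versions):

```scala
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  // Associate the shuffle with its number of map outputs (used by unregisterShuffle).
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
```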
Creating BlockStoreShuffleReader For ShuffleHandle — getReader Method
```scala
getReader[K, C](
  handle: ShuffleHandle,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext): ShuffleReader[K, C]
```
| Note |
|---|
| getReader is a part of ShuffleManager contract. |
getReader returns a new BlockStoreShuffleReader passing all the input parameters on to it.
| Note |
|---|
| getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle. |
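A sketch, modelled on the Spark 2.0 sources (note the cast that backs the assumption above):

```scala
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  // Hand all the input parameters over to a new BlockStoreShuffleReader.
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
    startPartition, endPartition, context)
}
```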
Stopping SortShuffleManager — stop Method
```scala
stop(): Unit
```
| Note |
|---|
| stop is a part of ShuffleManager contract. |
stop stops IndexShuffleBlockResolver (available as shuffleBlockResolver internal reference).
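A sketch, modelled on the Spark 2.0 sources:

```scala
override def stop(): Unit = {
  // Delegate to IndexShuffleBlockResolver to release its resources.
  shuffleBlockResolver.stop()
}
```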
Considering BypassMergeSortShuffleHandle for ShuffleHandle — shouldBypassMergeSort Method
```scala
shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean
```
shouldBypassMergeSort holds (i.e. is positive) when both of the following hold (as shown in the sketch after this list):

- The mapSideCombine flag of the input ShuffleDependency is disabled (i.e. false).
- The number of partitions (of the Partitioner of the input ShuffleDependency) is at most spark.shuffle.sort.bypassMergeThreshold Spark property (which defaults to 200).

If the mapSideCombine flag is enabled, shouldBypassMergeSort requires that the aggregator of the input ShuffleDependency is defined and does not hold, as sorting cannot be bypassed when map-side aggregation is needed. Otherwise, shouldBypassMergeSort does not hold (i.e. is false).
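The sketch, modelled on the Spark 2.0 sources (where the method lives in SortShuffleWriter’s companion object):

```scala
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // Sorting cannot be bypassed when map-side aggregation is required.
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```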
| Note |
|---|
| shouldBypassMergeSort is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency). |
Considering SerializedShuffleHandle for ShuffleHandle — canUseSerializedShuffle Method
```scala
canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean
```
canUseSerializedShuffle condition holds (i.e. is positive) when all of the following hold (checked in that order):

- The Serializer of the input ShuffleDependency supports relocation of serialized objects.
- The Aggregator of the input ShuffleDependency is not defined.
- The number of shuffle output partitions of the input ShuffleDependency is at most the supported maximum number (which is (1 << 24) - 1, i.e. 16777215).
You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:

```
DEBUG Can use serialized shuffle for shuffle [id]
```
Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:

```
DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle [id] because an aggregator is defined
DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
```
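A sketch of the checks, modelled on the Spark 2.0 sources (MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is the (1 << 24) - 1 maximum mentioned above):

```scala
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  } else if (dependency.aggregator.isDefined) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
    false
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
```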
| Note |
|---|
| canUseSerializedShuffle is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency). |
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| spark.shuffle.sort.bypassMergeThreshold | 200 | The maximum number of reduce partitions below which SortShuffleManager avoids merge-sorting data (provided there is no map-side aggregation either). |
| spark.shuffle.spill | true | No longer in use. When false, the WARN message quoted above appears in the logs and shuffle continues to spill to disk when necessary. |