SortShuffleWriter — Fallback ShuffleWriter

SortShuffleWriter is a ShuffleWriter that is used when SortShuffleManager returns a ShuffleWriter for ShuffleHandle (and the more specialized BypassMergeSortShuffleWriter and UnsafeShuffleWriter could not be used).

Note
SortShuffleWriter is parameterized by types for K keys, V values, and C combiner values.
Table 1. SortShuffleWriter’s Internal Registries and Counters
Name Description

mapStatus

MapStatus the SortShuffleWriter has recently persisted (as a shuffle partitioned file in disk store).

NOTE: Since write does not return a value, mapStatus attribute is used to be returned when SortShuffleWriter is closed.

stopping

Internal flag to mark that SortShuffleWriter is closed.

Tip

Enable ERROR logging level for org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens in SortShuffleWriter.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ERROR

Refer to Logging.

Creating SortShuffleWriter Instance

SortShuffleWriter takes the following when created:

  1. IndexShuffleBlockResolver

  2. BaseShuffleHandle

  3. mapId — the mapper task id

  4. TaskContext

Writing Records Into Shuffle Partitioned File In Disk Store — write Method

write(records: Iterator[Product2[K, V]]): Unit
Note
write is a part of ShuffleWriter contract to write a sequence of records (for a RDD partition).

Internally, write creates a ExternalSorter with the types K, V, C or K, V, V depending on mapSideCombine flag of the ShuffleDependency being enabled or not, respectively.

Note
write makes sure that Aggregator is defined for ShuffleDependency when mapSideCombine flag is enabled.

write requests IndexShuffleBlockResolver for the shuffle data output file (for the ShuffleDependency and mapId) and creates a temporary file for the shuffle data file in the same directory.

write creates a ShuffleBlockId (for the ShuffleDependency and mapId and the special IndexShuffleBlockResolver.NOOP_REDUCE_ID reduce id).

write requests IndexShuffleBlockResolver to write an index file (for the temporary partitioned file).

write creates a MapStatus (with the location of the shuffle server that serves the executor’s shuffle files and the sizes of the shuffle partitioned file’s partitions).

Note
The newly-created MapStatus is available as mapStatus internal attribute.
Note
write does not handle exceptions so when they occur, they will break the processing.

In the end, write deletes the temporary partitioned file. You may see the following ERROR message in the logs if write did not manage to do so:

ERROR Error while deleting temp file [path]

Closing SortShuffleWriter (and Calculating MapStatus) — stop Method

stop(success: Boolean): Option[MapStatus]
Note
stop is a part of ShuffleWriter contract to close itself (and return the last written MapStatus).

stop turns stopping flag on and returns the internal mapStatus if the input success is enabled.

Otherwise, when stopping flag is already enabled or the input success is disabled, stop returns no MapStatus (i.e. None).

In the end, stop stops the ExternalSorter and increments the shuffle write time task metrics.

results matching ""

    No results matching ""