log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ERROR
SortShuffleWriter — Fallback ShuffleWriter
SortShuffleWriter is a ShuffleWriter that is used when SortShuffleManager returns a ShuffleWriter for a ShuffleHandle (and the more specialized BypassMergeSortShuffleWriter and UnsafeShuffleWriter could not be used).
NOTE: SortShuffleWriter is parameterized by types for K keys, V values, and C combiner values.
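Name | Description |
---|---|
mapStatus | MapStatus created by write. NOTE: Since write does not return a value, the MapStatus is kept in mapStatus so that stop can return it. |
stopping | Internal flag to mark that SortShuffleWriter is stopping (or has already been stopped). |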
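TIP: Enable ERROR logging level for the org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens inside.
Add the following line to the log4j configuration:
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ERROR
Refer to Logging.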
Creating SortShuffleWriter Instance
SortShuffleWriter takes the following when created:
- mapId (the mapper task id)
NOTE: SortShuffleWriter is created when SortShuffleManager returns a ShuffleWriter for the fallback BaseShuffleHandle.
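The fallback role described above can be pictured as a pattern match over the kind of handle. This is only a minimal sketch: the handle objects below are hypothetical placeholders, not Spark's classes, and the function returns writer names as plain strings rather than real writers.

```scala
object WriterSelectionSketch {
  // Hypothetical stand-ins for the kinds of shuffle handles (not Spark's real classes).
  sealed trait Handle
  case object SerializedHandle extends Handle      // assumed to be served by UnsafeShuffleWriter
  case object BypassMergeSortHandle extends Handle // assumed to be served by BypassMergeSortShuffleWriter
  case object BaseHandle extends Handle            // stands in for the fallback BaseShuffleHandle

  // Returns the name of the writer that would be picked for a handle.
  def writerFor(handle: Handle): String = handle match {
    case SerializedHandle      => "UnsafeShuffleWriter"
    case BypassMergeSortHandle => "BypassMergeSortShuffleWriter"
    case BaseHandle            => "SortShuffleWriter" // used when the specialized writers do not apply
  }
}
```

Making the trait sealed lets the compiler warn when a handle kind is left without a writer.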
Writing Records Into Shuffle Partitioned File In Disk Store: write Method
write(records: Iterator[Product2[K, V]]): Unit
NOTE: write is part of the ShuffleWriter contract to write a sequence of records (for an RDD partition).
Internally, write creates an ExternalSorter with the types K, V, C or K, V, V depending on whether the mapSideCombine flag of the ShuffleDependency is enabled or not, respectively.
NOTE: ShuffleDependency is defined when SortShuffleWriter is created (as the dependency of BaseShuffleHandle).
NOTE: write makes sure that an Aggregator is defined for the ShuffleDependency when the mapSideCombine flag is enabled.
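To illustrate why the sorter's third type parameter is C with map-side combine and V without it, here is a minimal sketch in plain Scala. Agg is a hypothetical, much-simplified stand-in for an aggregator, and the two functions only model the shape of the data, not how ExternalSorter actually sorts or spills.

```scala
object SorterTypesSketch {
  // Hypothetical stand-in for an aggregator: how to start and grow a combiner C from values V.
  final case class Agg[V, C](createCombiner: V => C, mergeValue: (C, V) => C)

  // mapSideCombine enabled: (K, V) records are folded into (K, C), as with the K, V, C types.
  def withCombine[K, V, C](records: Iterator[(K, V)], agg: Option[Agg[V, C]]): Map[K, C] = {
    // Models the check that an aggregator is defined (the message text is illustrative).
    require(agg.isDefined, "map-side combine requires an aggregator")
    val a = agg.get
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val combined = acc.get(k) match {
        case Some(existing) => a.mergeValue(existing, v)
        case None           => a.createCombiner(v)
      }
      acc.updated(k, combined)
    }
  }

  // mapSideCombine disabled: values pass through unchanged, as with the K, V, V types.
  def withoutCombine[K, V](records: Iterator[(K, V)]): List[(K, V)] = records.toList
}
```

With a summing aggregator, the records ("a", 1), ("b", 2), ("a", 3) collapse to one combiner per key, while the non-combining path keeps every record.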
write requests IndexShuffleBlockResolver for the shuffle data output file (for the ShuffleDependency and mapId) and creates a temporary file for the shuffle data file in the same directory.
write creates a ShuffleBlockId (for the ShuffleDependency, mapId and the special IndexShuffleBlockResolver.NOOP_REDUCE_ID reduce id).
write then requests the ExternalSorter to write the records into the temporary file, which makes it the temporary partitioned file.
write requests IndexShuffleBlockResolver to write an index file (for the temporary partitioned file).
write creates a MapStatus (with the location of the shuffle server that serves the executor’s shuffle files and the sizes of the shuffle partitioned file’s partitions).
NOTE: The newly-created MapStatus is available as the mapStatus internal attribute.
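The file handling in the steps above can be sketched with plain java.nio, without any Spark classes. All names here (PartitionedFileSketch, writePartitionedFile, writeIndexFile) are hypothetical, partitions are modeled as byte arrays, and the index is assumed to hold cumulative offsets starting at 0. Error handling and cleanup of the temporary file are left out to keep the sketch short.

```scala
import java.io.{DataOutputStream, FileOutputStream}
import java.nio.file.{Files, Path, StandardCopyOption}

object PartitionedFileSketch {
  // Writes all partitions back to back into tmp and returns each partition's size in bytes.
  def writePartitionedFile(partitions: Seq[Array[Byte]], tmp: Path): Array[Long] = {
    val out = new FileOutputStream(tmp.toFile)
    try partitions.map { bytes => out.write(bytes); bytes.length.toLong }.toArray
    finally out.close()
  }

  // Writes cumulative offsets so that partition i spans offsets(i) until offsets(i + 1).
  def writeIndexFile(lengths: Array[Long], index: Path): Unit = {
    val out = new DataOutputStream(new FileOutputStream(index.toFile))
    try lengths.scanLeft(0L)(_ + _).foreach(out.writeLong)
    finally out.close()
  }

  // data must have a parent directory; the temporary file is created next to it.
  def write(partitions: Seq[Array[Byte]], data: Path, index: Path): Array[Long] = {
    val tmp = Files.createTempFile(data.getParent, data.getFileName.toString, ".tmp")
    val lengths = writePartitionedFile(partitions, tmp)
    writeIndexFile(lengths, index)
    // Publish the temporary partitioned file under its final name.
    Files.move(tmp, data, StandardCopyOption.REPLACE_EXISTING)
    lengths // the partition sizes are what a MapStatus would report
  }
}
```

Writing to a temporary file in the same directory first means the final data file only appears once it is complete, and the move does not have to cross file systems.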
NOTE: write does not handle exceptions, so any exception that occurs propagates to the caller and breaks the processing.
In the end, write deletes the temporary partitioned file. You may see the following ERROR message in the logs if write did not manage to do so:
ERROR Error while deleting temp file [path]
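The cleanup step can be sketched as a try/finally around the work that uses the temporary file. TempFileCleanupSketch and withTempFile are hypothetical names, and instead of logging, the sketch returns the error message so that it stays self-contained.

```scala
import java.io.File

object TempFileCleanupSketch {
  // Runs body with the temp file and always tries to delete the file afterwards.
  // Returns the body's result and, if the delete failed, the message that would be logged.
  def withTempFile[T](tmp: File)(body: File => T): (T, Option[String]) = {
    var error: Option[String] = None
    val result =
      try body(tmp)
      finally {
        // Only complain when the file is still there and could not be removed.
        if (tmp.exists() && !tmp.delete()) {
          error = Some(s"Error while deleting temp file ${tmp.getAbsolutePath}")
        }
      }
    (result, error)
  }
}
```

Because there is no catch clause, an exception thrown by body still propagates to the caller after the cleanup has run.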
Closing SortShuffleWriter (and Calculating MapStatus): stop Method
stop(success: Boolean): Option[MapStatus]
NOTE: stop is part of the ShuffleWriter contract to close itself (and return the last written MapStatus).
stop turns the stopping flag on and returns the internal mapStatus when the input success is enabled.
Otherwise, when the stopping flag is already enabled or the input success is disabled, stop returns no MapStatus (i.e. None).
In the end, stop stops the ExternalSorter and increments the shuffle write time task metrics.
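The interplay of the stopping flag, the success input and the stored MapStatus can be modeled in a few lines. This is a sketch under assumptions: Status is a hypothetical stand-in for MapStatus, the sorter is reduced to a callback, and the sketch assumes the sorter is stopped (and timed) only once.

```scala
object StopSketch {
  // Hypothetical stand-in for MapStatus that carries only the partition sizes.
  final case class Status(partitionLengths: Seq[Long])

  final class Writer(stopSorter: () => Unit) {
    private var stopping = false
    private var sorterActive = true
    private var mapStatus: Option[Status] = None
    private var writeTimeNanos = 0L

    // write returns nothing, so the status is kept for stop to hand out later.
    def write(lengths: Seq[Long]): Unit = { mapStatus = Some(Status(lengths)) }

    def writeTime: Long = writeTimeNanos

    def stop(success: Boolean): Option[Status] =
      try {
        if (stopping) None // already stopped: nothing to return
        else {
          stopping = true
          if (success) mapStatus else None
        }
      } finally {
        // Stop the sorter once and add the time it took to the write time.
        if (sorterActive) {
          val start = System.nanoTime()
          stopSorter()
          writeTimeNanos += System.nanoTime() - start
          sorterActive = false
        }
      }
  }
}
```

Calling stop a second time returns None, which matches the description of the stopping flag above.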