UnsafeShuffleWriter — ShuffleWriter for SerializedShuffleHandle

UnsafeShuffleWriter is a ShuffleWriter that is used to write records (i.e. key-value pairs).

UnsafeShuffleWriter is chosen when SortShuffleManager is requested for a ShuffleWriter for a SerializedShuffleHandle.

UnsafeShuffleWriter can use a specialized NIO-based merge procedure that avoids extra serialization/deserialization.

| Name | Initial Value | Description |
|---|---|---|
| sorter | (uninitialized) | ShuffleExternalSorter. Initialized when UnsafeShuffleWriter opens and used when it writes records. |

Tip: Enable DEBUG logging level for the org.apache.spark.shuffle.sort.UnsafeShuffleWriter logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=DEBUG

Refer to Logging.
mergeSpillsWithTransferTo Method

Caution: FIXME
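While mergeSpillsWithTransferTo itself is yet to be described, the NIO-based merge mentioned in the introduction boils down to concatenating spill file regions with FileChannel.transferTo, which avoids copying bytes through user space. A minimal sketch under that assumption (file handling is illustrative; Spark's actual merge works per partition across spill files):

```java
import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

public class TransferToMergeSketch {
    // Concatenates the given spill files into the output file using
    // FileChannel.transferTo, which can use zero-copy I/O on many platforms.
    static long mergeSpills(File[] spills, File out) throws IOException {
        long total = 0;
        try (FileChannel outCh = new FileOutputStream(out).getChannel()) {
            for (File spill : spills) {
                try (FileChannel in = new FileInputStream(spill).getChannel()) {
                    long size = in.size();
                    long written = 0;
                    // transferTo may copy fewer bytes than requested, so loop.
                    while (written < size) {
                        written += in.transferTo(written, size - written, outCh);
                    }
                    total += written;
                }
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        File a = File.createTempFile("spill-", ".tmp");
        File b = File.createTempFile("spill-", ".tmp");
        Files.write(a.toPath(), "hello ".getBytes());
        Files.write(b.toPath(), "world".getBytes());
        File out = File.createTempFile("merged-", ".tmp");
        long n = mergeSpills(new File[]{a, b}, out);
        System.out.println(n + ":" + new String(Files.readAllBytes(out.toPath())));
    }
}
```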
forceSorterToSpill Method

Caution: FIXME

mergeSpills Method

Caution: FIXME

updatePeakMemoryUsed Method

Caution: FIXME
Writing Records — write Method

void write(Iterator<Product2<K, V>> records) throws IOException

Note: write is a part of the ShuffleWriter contract.

Internally, write traverses the input sequence of records (for an RDD partition) and inserts them into the sorter (insertRecordIntoSorter) one by one. When all the records have been processed, write closes internal resources and writes the spill files merged (closeAndWriteOutput).

In the end, write requests ShuffleExternalSorter to clean up after itself.

Caution: FIXME
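The control flow just described can be sketched as follows. This is a simplified model, not Spark's actual code; RecordSorter stands in for ShuffleExternalSorter, and records are plain strings:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WriteLoopSketch {
    // Simplified stand-in for ShuffleExternalSorter.
    static class RecordSorter {
        List<String> inserted = new ArrayList<>();
        boolean cleaned = false;
        void insert(String record) { inserted.add(record); }
        String closeAndWriteOutput() { return String.join(",", inserted); }
        void cleanupResources() { cleaned = true; }
    }

    // Mirrors write(): insert every record, then close internal resources and
    // write the merged output; the sorter is always asked to clean up in the end.
    static String write(Iterator<String> records, RecordSorter sorter) {
        try {
            while (records.hasNext()) {
                sorter.insert(records.next());      // insertRecordIntoSorter
            }
            return sorter.closeAndWriteOutput();    // closeAndWriteOutput
        } finally {
            sorter.cleanupResources();              // cleanup happens regardless
        }
    }
}
```

The try/finally mirrors the guarantee that the sorter is asked to clean up even when writing fails partway through.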
Stopping UnsafeShuffleWriter — stop Method

Option<MapStatus> stop(boolean success)

Caution: FIXME

Note: stop is a part of the ShuffleWriter contract.
Creating UnsafeShuffleWriter Instance

UnsafeShuffleWriter takes the following when created:

- IndexShuffleBlockResolver
- mapId

UnsafeShuffleWriter makes sure that the number of shuffle output partitions (of the ShuffleDependency of the input SerializedShuffleHandle) is at most (1 << 24) - 1, i.e. 16777215.

Note: The number of shuffle output partitions is first enforced when SortShuffleManager checks if SerializedShuffleHandle can be used for ShuffleHandle (which eventually leads to UnsafeShuffleWriter).

UnsafeShuffleWriter uses the spark.file.transferTo and spark.shuffle.sort.initialBufferSize Spark properties to initialize the transferToEnabled and initialSortBufferSize attributes, respectively.

If the number of shuffle output partitions is greater than the maximum, UnsafeShuffleWriter throws an IllegalArgumentException:

UnsafeShuffleWriter can only be used for shuffles with at most 16777215 reduce partitions

Note: UnsafeShuffleWriter is created exclusively when SortShuffleManager selects a ShuffleWriter (for a SerializedShuffleHandle).
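The partition-count check above can be illustrated as follows. The constant and the exception message match the text; the method name is illustrative:

```java
public class PartitionLimitSketch {
    // Maximum number of shuffle output partitions UnsafeShuffleWriter
    // supports: (1 << 24) - 1, i.e. 16777215.
    static final int MAX_PARTITIONS = (1 << 24) - 1;

    // Rejects shuffles with more output partitions than the maximum.
    static void checkNumPartitions(int numPartitions) {
        if (numPartitions > MAX_PARTITIONS) {
            throw new IllegalArgumentException(
                "UnsafeShuffleWriter can only be used for shuffles with at most "
                    + MAX_PARTITIONS + " reduce partitions");
        }
    }
}
```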
Opening UnsafeShuffleWriter (i.e. Creating ShuffleExternalSorter and SerializationStream) — open Internal Method

void open() throws IOException

open makes sure that the internal reference to ShuffleExternalSorter (as sorter) is not defined yet and creates one itself.

open creates a new byte array output stream (as serBuffer) with the buffer capacity of 1M.

open creates a new SerializationStream for the new byte array output stream using SerializerInstance.

Note: SerializerInstance was defined when UnsafeShuffleWriter was created (and is exactly the one used to create the ShuffleDependency).

Note: open is used exclusively when UnsafeShuffleWriter is created.
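A rough model of the steps above, using ObjectOutputStream as a stand-in for Spark's SerializationStream and a plain Object for the sorter (both are illustrative, not Spark's types):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class OpenSketch {
    static final int SER_BUFFER_SIZE = 1024 * 1024;  // 1M buffer, as in open

    Object sorter;                       // stand-in for ShuffleExternalSorter
    ByteArrayOutputStream serBuffer;
    ObjectOutputStream serStream;        // stand-in for SerializationStream

    // Mirrors open(): check that the sorter is not created yet, create it,
    // allocate the 1M serialization buffer, and wrap it in a stream.
    void open() throws IOException {
        if (sorter != null) {
            throw new IllegalStateException("sorter already created");
        }
        sorter = new Object();
        serBuffer = new ByteArrayOutputStream(SER_BUFFER_SIZE);
        serStream = new ObjectOutputStream(serBuffer);
    }
}
```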
Inserting Record Into ShuffleExternalSorter — insertRecordIntoSorter Method

void insertRecordIntoSorter(Product2<K, V> record)
throws IOException

insertRecordIntoSorter calculates the partition for the key of the input record.

Note: The Partitioner is defined when UnsafeShuffleWriter is created.

insertRecordIntoSorter then writes the key and the value of the input record to the SerializationStream and calculates the size of the serialized buffer.

Note: The SerializationStream is created when UnsafeShuffleWriter opens.

In the end, insertRecordIntoSorter inserts the serialized buffer into ShuffleExternalSorter (with Platform.BYTE_ARRAY_OFFSET as the base offset).

Note: ShuffleExternalSorter is created when UnsafeShuffleWriter opens.

Note: insertRecordIntoSorter is used exclusively when UnsafeShuffleWriter writes records.
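The steps above can be sketched with string records and a hash partitioner as a stand-in for the Partitioner (the serialization here is plain byte copying, not Spark's SerializationStream):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class InsertRecordSketch {
    // Hash-partitioner stand-in for the Partitioner defined at creation time.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Mirrors insertRecordIntoSorter: compute the partition for the key,
    // serialize key and value into a buffer, measure the serialized size,
    // then the buffer (with its size and partition id) would go to the sorter.
    static String insertRecordIntoSorter(String key, String value, int numPartitions) {
        int partitionId = partitionFor(key, numPartitions);
        ByteArrayOutputStream serBuffer = new ByteArrayOutputStream();
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        serBuffer.write(k, 0, k.length);   // writeKey on the stream
        serBuffer.write(v, 0, v.length);   // writeValue on the stream
        int serializedSize = serBuffer.size();
        // sorter.insertRecord(buf, Platform.BYTE_ARRAY_OFFSET, size, partitionId)
        return "partition=" + partitionId + " bytes=" + serializedSize;
    }
}
```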
Closing Internal Resources and Writing Spill Files Merged — closeAndWriteOutput Method

void closeAndWriteOutput() throws IOException

closeAndWriteOutput first updates the peak memory used.

closeAndWriteOutput then removes the internal ByteArrayOutputStream and SerializationStream.

closeAndWriteOutput requests ShuffleExternalSorter to close itself and return SpillInfo metadata, and then removes the internal ShuffleExternalSorter.

closeAndWriteOutput requests IndexShuffleBlockResolver for the data file for the shuffleId and mapId.

closeAndWriteOutput creates a temporary file to merge the spill files into, deletes the spill files afterwards, and requests IndexShuffleBlockResolver to write the index file and commit.

closeAndWriteOutput creates a MapStatus with the location of the executor's BlockManager and the partition lengths in the merged file.

If there is an issue with deleting the spill files, you should see the following ERROR message in the logs:

ERROR Error while deleting spill file [path]

If there is an issue with deleting the temporary file, you should see the following ERROR message in the logs:

ERROR Error while deleting temp file [path]

Note: closeAndWriteOutput is used exclusively when UnsafeShuffleWriter writes records.
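The merge-then-commit step above writes to a temporary file first and only then makes the result visible. A minimal sketch of that pattern with java.nio.file (file names and the byte-array payload are illustrative; Spark's actual commit also writes an index file):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CommitSketch {
    // Write the merged output to a temporary sibling file first, then move it
    // into place, mirroring "merge to a temp file, then write index and commit".
    static void commitMerged(Path dataFile, byte[] merged) throws IOException {
        Path tmp = dataFile.resolveSibling(dataFile.getFileName() + ".tmp");
        Files.write(tmp, merged);
        Files.move(tmp, dataFile, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Writing to a temp file and renaming means readers never observe a partially merged data file.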
Settings

| Spark Property | Default Value | Description |
|---|---|---|
| spark.file.transferTo | true | Controls whether…FIXME |
| spark.shuffle.sort.initialBufferSize | 4096 | Default initial sort buffer size |