UnsafeShuffleWriter — ShuffleWriter for SerializedShuffleHandle

UnsafeShuffleWriter is the ShuffleWriter used to write records (i.e. key-value pairs) for a SerializedShuffleHandle.

UnsafeShuffleWriter can use a specialized NIO-based merge procedure that avoids extra serialization/deserialization.

Table 1. UnsafeShuffleWriter’s Internal Properties

Name: sorter
Initial Value: (uninitialized)
Description: ShuffleExternalSorter

Initialized when UnsafeShuffleWriter opens (which is when UnsafeShuffleWriter is created) and destroyed when UnsafeShuffleWriter closes internal resources and writes spill files merged (closeAndWriteOutput).

Used when UnsafeShuffleWriter inserts a record into the ShuffleExternalSorter, writes records, forceSorterToSpill, updatePeakMemoryUsed, closes internal resources and writes spill files merged, and stops.

Tip

Enable ERROR or DEBUG logging levels for org.apache.spark.shuffle.sort.UnsafeShuffleWriter logger to see what happens in UnsafeShuffleWriter.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=DEBUG

Refer to Logging.

mergeSpillsWithTransferTo Method

Caution
FIXME

forceSorterToSpill Method

Caution
FIXME

mergeSpills Method

Caution
FIXME

updatePeakMemoryUsed Method

Caution
FIXME

Writing Records — write Method

void write(Iterator<Product2<K, V>> records) throws IOException
Note
write is a part of ShuffleWriter contract.

Internally, write traverses the input records (of an RDD partition) and inserts them one by one into the sorter (using insertRecordIntoSorter). When all the records have been processed, write closes internal resources and writes spill files merged (closeAndWriteOutput).
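A minimal sketch of that flow (not the exact Spark source): it assumes the sorter internal property and the insertRecordIntoSorter and closeAndWriteOutput methods described on this page, and it omits metrics and finer-grained error handling.

void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next()); // serialize and buffer one record
    }
    closeAndWriteOutput();                    // merge spill files into the final map output
    success = true;
  } finally {
    if (!success && sorter != null) {
      sorter.cleanupResources();              // on failure, free memory and delete spill files
    }
  }
}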

Caution
FIXME

Stopping UnsafeShuffleWriter — stop Method

Option<MapStatus> stop(boolean success)
Caution
FIXME
Note
stop is a part of ShuffleWriter contract.

Creating UnsafeShuffleWriter Instance

UnsafeShuffleWriter takes the following when created:

BlockManager

IndexShuffleBlockResolver

TaskMemoryManager

SerializedShuffleHandle

Map id

TaskContext

SparkConf

UnsafeShuffleWriter makes sure that the number of shuffle output partitions (of the ShuffleDependency of the input SerializedShuffleHandle) is at most (1 << 24), i.e. 16777216.

Note
The number of shuffle output partitions is first enforced when SortShuffleManager checks whether a SerializedShuffleHandle can be used as the ShuffleHandle (which eventually leads to UnsafeShuffleWriter).

UnsafeShuffleWriter uses spark.file.transferTo and spark.shuffle.sort.initialBufferSize Spark properties to initialize transferToEnabled and initialSortBufferSize attributes, respectively.

If the number of shuffle output partitions is greater than the maximum, UnsafeShuffleWriter throws an IllegalArgumentException with the following message:

UnsafeShuffleWriter can only be used for shuffles with at most 16777216 reduce partitions
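A standalone sketch of that guard; MAX_PARTITIONS is a stand-in for SortShuffleManager’s serialized-mode partition limit, and requireSupportedPartitionCount is a hypothetical helper name.

final class PartitionLimitCheck {
  // Stand-in for SortShuffleManager's serialized-mode limit, i.e. 1 << 24
  static final int MAX_PARTITIONS = 1 << 24; // 16777216

  static void requireSupportedPartitionCount(int numPartitions) {
    if (numPartitions > MAX_PARTITIONS) {
      throw new IllegalArgumentException(
          "UnsafeShuffleWriter can only be used for shuffles with at most "
              + MAX_PARTITIONS + " reduce partitions");
    }
  }
}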
Note
UnsafeShuffleWriter is created exclusively when SortShuffleManager selects a ShuffleWriter (for a SerializedShuffleHandle).

Opening UnsafeShuffleWriter (i.e. Creating ShuffleExternalSorter and SerializationStream) — open Internal Method

void open() throws IOException

open makes sure that the internal reference to ShuffleExternalSorter (sorter) is not defined yet and creates one.

open creates a new byte array output stream (serBuffer) with a buffer capacity of 1M.

open then creates a new SerializationStream for that byte array output stream using the SerializerInstance.
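A sketch of those steps; the field names (sorter, serBuffer, serOutputStream) follow this page, the ShuffleExternalSorter constructor arguments are indicative only, and MyByteArrayOutputStream stands for a ByteArrayOutputStream variant that exposes its internal buffer.

private void open() throws IOException {
  assert sorter == null;  // open runs exactly once, when UnsafeShuffleWriter is created
  sorter = new ShuffleExternalSorter(
      memoryManager, blockManager, taskContext,
      initialSortBufferSize, partitioner.numPartitions(),
      sparkConf, writeMetrics);
  serBuffer = new MyByteArrayOutputStream(1024 * 1024);     // 1M exposed-buffer output stream
  serOutputStream = serializer.serializeStream(serBuffer);  // SerializationStream over serBuffer
}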

Note
SerializerInstance was defined when UnsafeShuffleWriter was created (and is exactly the one used to create the ShuffleDependency).
Note
open is used exclusively when UnsafeShuffleWriter is created.

Inserting Record Into ShuffleExternalSorter — insertRecordIntoSorter Method

void insertRecordIntoSorter(Product2<K, V> record) throws IOException
insertRecordIntoSorter first calculates the partition id for the key of the input record using the Partitioner.

Note
Partitioner is defined when UnsafeShuffleWriter is created.

insertRecordIntoSorter then writes the key and the value of the input record to the SerializationStream and calculates the size of the serialized buffer.

Note
SerializationStream is created when UnsafeShuffleWriter opens.

In the end, insertRecordIntoSorter inserts the serialized record into the ShuffleExternalSorter, passing in the serialized buffer (at Platform.BYTE_ARRAY_OFFSET), its size, and the partition id, as sketched below.
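A sketch of the whole method, assuming the serBuffer and serOutputStream fields from open and an OBJECT_CLASS_TAG constant (a stand-in for a plain Object ClassTag):

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);    // target reduce partition
  serBuffer.reset();
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);          // serialize the key...
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG); // ...and the value
  serOutputStream.flush();
  final int serializedRecordSize = serBuffer.size();        // size of the serialized record
  sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET,
      serializedRecordSize, partitionId);                   // hand the bytes to the sorter
}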

Note
ShuffleExternalSorter is created when UnsafeShuffleWriter opens.
Note
insertRecordIntoSorter is used exclusively when UnsafeShuffleWriter writes records.

Closing Internal Resources and Writing Spill Files Merged — closeAndWriteOutput Method

void closeAndWriteOutput() throws IOException

closeAndWriteOutput first updates peak memory used.

closeAndWriteOutput removes (nulls) the internal references to the ByteArrayOutputStream (serBuffer) and SerializationStream (serOutputStream).

closeAndWriteOutput then requests the internal ShuffleExternalSorter to close and return its spill files, and removes the reference to the sorter.

closeAndWriteOutput requests IndexShuffleBlockResolver for the data file for the shuffleId and mapId.

closeAndWriteOutput creates a temporary file, merges the spill files into it (deleting the spill files afterwards), and requests IndexShuffleBlockResolver to write an index file and commit (with the partition lengths and the temporary file).

closeAndWriteOutput creates a MapStatus with the location of the executor’s BlockManager and the partition lengths in the merged file.
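A sketch of that sequence (not the exact source); Utils.tempFileWith, SpillInfo, and the MapStatus construction via the Scala companion object are indicative of the Java/Scala interop involved.

void closeAndWriteOutput() throws IOException {
  updatePeakMemoryUsed();
  serBuffer = null;                                  // drop the serialization buffers
  serOutputStream = null;
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;                                     // the sorter is done after spilling
  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tmp = Utils.tempFileWith(output);       // temporary merge target
  final long[] partitionLengths = mergeSpills(spills, tmp);
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  mapStatus = MapStatus$.MODULE$.apply(              // executor location + partition lengths
      blockManager.shuffleServerId(), partitionLengths);
}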

If there is an issue with deleting spill files, you should see the following ERROR message in the logs:

ERROR Error while deleting spill file [path]

If there is an issue with deleting the temporary file, you should see the following ERROR message in the logs:

ERROR Error while deleting temp file [path]
Note
closeAndWriteOutput is used exclusively when UnsafeShuffleWriter writes records.

Settings

Table 2. Spark Properties

Spark Property: spark.file.transferTo
Default Value: true
Description: Controls whether…FIXME

Spark Property: spark.shuffle.sort.initialBufferSize
Default Value: 4096 (bytes)
Description: Default initial sort buffer size
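For example, both properties can be set on a SparkConf before creating a SparkContext (a usage sketch; the values below are just the documented defaults):

import org.apache.spark.SparkConf;

public class UnsafeShuffleWriterSettings {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .set("spark.file.transferTo", "true")                  // NIO transferTo-based merging
        .set("spark.shuffle.sort.initialBufferSize", "4096");  // initial sort buffer size
    System.out.println(conf.get("spark.file.transferTo"));
  }
}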
