BypassMergeSortShuffleWriter

Figure 1. BypassMergeSortShuffleWriter writing records (for ShuffleMapTask) using DiskBlockObjectWriters
Table 1. BypassMergeSortShuffleWriter’s Internal Registries and Counters
Name Description

numPartitions

FIXME

partitionWriters

FIXME

partitionWriterSegments

FIXME

shuffleBlockResolver

IndexShuffleBlockResolver.

Initialized when BypassMergeSortShuffleWriter is created.

Used when BypassMergeSortShuffleWriter writes records.

mapStatus

MapStatus that BypassMergeSortShuffleWriter returns when stopped

Initialized every time BypassMergeSortShuffleWriter writes records.

Used when BypassMergeSortShuffleWriter stops (with success enabled) as a marker of whether any records were written; if they were, it is the value returned.

partitionLengths

Temporary array of partition lengths after records are written to a shuffle system.

Initialized every time BypassMergeSortShuffleWriter writes records, before it is passed to IndexShuffleBlockResolver. After IndexShuffleBlockResolver finishes, it is used to initialize the mapStatus internal property.

transferToEnabled

Internal flag that controls the use of Java New I/O when BypassMergeSortShuffleWriter concatenates per-partition shuffle files into a single shuffle block data file.

Specified when BypassMergeSortShuffleWriter is created and controlled by spark.file.transferTo Spark property. Enabled by default.

Tip

Enable ERROR logging level for org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter logger to see what happens in BypassMergeSortShuffleWriter.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter=ERROR

Refer to Logging.

Creating BypassMergeSortShuffleWriter Instance

BypassMergeSortShuffleWriter takes the following when created:

BypassMergeSortShuffleWriter uses two Spark properties: spark.shuffle.file.buffer (for fileBufferSize, 32k by default) and spark.file.transferTo (for the transferToEnabled internal flag, enabled by default).
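
As a rough illustration of how the two properties and their defaults could be resolved, here is a minimal sketch. ShuffleWriterSettings and parseKibibytes are hypothetical names, the plain Map stands in for Spark's configuration object, and the size parser is deliberately simplified (it only understands a "k" suffix or a bare number, both read as KiB).

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for reading the two properties; this is not Spark's SparkConf. */
final class ShuffleWriterSettings {
    final int fileBufferSize;        // in bytes
    final boolean transferToEnabled;

    ShuffleWriterSettings(Map<String, String> conf) {
        // Defaults as stated in the text: 32k buffer, transferTo enabled.
        String buffer = conf.getOrDefault("spark.shuffle.file.buffer", "32k");
        this.fileBufferSize = parseKibibytes(buffer) * 1024;
        this.transferToEnabled =
            Boolean.parseBoolean(conf.getOrDefault("spark.file.transferTo", "true"));
    }

    // Simplified parser (assumption): accepts "<n>k" or a bare number, both read as KiB.
    static int parseKibibytes(String value) {
        String v = value.trim().toLowerCase();
        if (v.endsWith("k")) {
            v = v.substring(0, v.length() - 1);
        }
        return Integer.parseInt(v);
    }

    public static void main(String[] args) {
        ShuffleWriterSettings defaults =
            new ShuffleWriterSettings(Collections.<String, String>emptyMap());
        // Prints: 32768 true
        System.out.println(defaults.fileBufferSize + " " + defaults.transferToEnabled);
    }
}
```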

BypassMergeSortShuffleWriter initializes the internal registries and counters.

Writing Records (Into One Single Shuffle Block Data File) — write Method

void write(Iterator<Product2<K, V>> records) throws IOException
Note
write is a part of ShuffleWriter contract to write a sequence of records to a shuffle system.

Internally, when the input records iterator has no more records, write creates an empty partitionLengths internal array of numPartitions size.

write then requests the internal IndexShuffleBlockResolver to write shuffle index and data files (with dataTmp as null) and sets the internal mapStatus (with the address of BlockManager in use and partitionLengths).
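
The relationship between partitionLengths and a shuffle index can be sketched as follows. The text does not spell out the index layout, so this assumes the index holds cumulative byte offsets with one more entry than there are partitions; IndexOffsetsSketch and offsetsFor are hypothetical names. For the empty-input case above, every length is zero and so is every offset.

```java
import java.util.Arrays;

/** Hypothetical helper: turns per-partition lengths into cumulative offsets. */
final class IndexOffsetsSketch {

    // offsets[i] is where partition i starts; the last entry is the total data size.
    static long[] offsetsFor(long[] partitionLengths) {
        long[] offsets = new long[partitionLengths.length + 1];
        for (int i = 0; i < partitionLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionLengths[i];
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Empty input: Java zero-fills the array, like the empty partitionLengths above.
        long[] empty = new long[3];
        System.out.println(Arrays.toString(offsetsFor(empty)));                  // [0, 0, 0, 0]
        System.out.println(Arrays.toString(offsetsFor(new long[] {4, 8, 4})));   // [0, 4, 12, 16]
    }
}
```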

However, when there are records to write, write creates a new SerializerInstance (from the Serializer).

Note
Serializer was specified when BypassMergeSortShuffleWriter was created and is exactly the Serializer of the ShuffleDependency.

write initializes partitionWriters internal array of DiskBlockObjectWriters for every partition.

Note
write uses BlockManager to access DiskBlockManager. BlockManager was specified when BypassMergeSortShuffleWriter was created.

write requests BlockManager for a DiskBlockObjectWriter (for the temporary blockId and file, SerializerInstance, fileBufferSize and writeMetrics).

write initializes partitionWriterSegments with FileSegment for every partition.

write processes records serially, i.e. record by record: it computes the partition for each record's key and requests the corresponding DiskBlockObjectWriter to write the record.

Note
write uses the partitionWriters internal array of DiskBlockObjectWriters, which is indexed by partition number and holds numPartitions writers.

After all the records have been written, write requests every DiskBlockObjectWriter to commitAndGet and saves the commit results in partitionWriterSegments. write closes every DiskBlockObjectWriter.
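
The per-partition write loop described above can be sketched with plain Java I/O. This is a simplified stand-in, not Spark's code: PerPartitionWriteSketch, Segment and partitionFor are hypothetical names, buffered file streams play the role of DiskBlockObjectWriters, records are string pairs written as text lines, and hash partitioning with a non-negative modulo is an assumption.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

final class PerPartitionWriteSketch {

    /** Hypothetical analogue of a FileSegment: the file, a start offset, and a length. */
    static final class Segment {
        final File file;
        final long offset;
        final long length;
        Segment(File file, long offset, long length) {
            this.file = file; this.offset = offset; this.length = length;
        }
    }

    // Assumption: hash partitioning with a non-negative modulo.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    static Segment[] writeRecords(Iterator<Map.Entry<String, String>> records,
                                  int numPartitions, int fileBufferSize) throws IOException {
        File[] files = new File[numPartitions];
        OutputStream[] partitionWriters = new OutputStream[numPartitions];
        try {
            // One temporary file and one buffered writer per partition.
            for (int i = 0; i < numPartitions; i++) {
                files[i] = File.createTempFile("temp_shuffle_", ".part" + i);
                files[i].deleteOnExit();
                partitionWriters[i] =
                    new BufferedOutputStream(new FileOutputStream(files[i]), fileBufferSize);
            }
            // Records are handled one at a time and routed by the partition of their key.
            while (records.hasNext()) {
                Map.Entry<String, String> record = records.next();
                String line = record.getKey() + "=" + record.getValue() + "\n";
                partitionWriters[partitionFor(record.getKey(), numPartitions)]
                    .write(line.getBytes(StandardCharsets.UTF_8));
            }
        } finally {
            // Closing flushes buffered bytes so the file lengths below are final.
            for (OutputStream writer : partitionWriters) {
                if (writer != null) writer.close();
            }
        }
        Segment[] segments = new Segment[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            segments[i] = new Segment(files[i], 0L, files[i].length());
        }
        return segments;
    }
}
```

Each returned Segment plays the part of a commit result: it says which per-partition file holds the bytes and how many there are, which is what the later concatenation step needs.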

Note
IndexShuffleBlockResolver was defined when BypassMergeSortShuffleWriter was created.

write creates a temporary shuffle block data file and writes the per-partition shuffle files to it.

Note
This is the moment when BypassMergeSortShuffleWriter concatenates per-partition shuffle file segments into one single map shuffle data file.

In the end, write requests IndexShuffleBlockResolver to write shuffle index and data files for the shuffleId and mapId (with partitionLengths and the temporary file) and creates a new mapStatus (with the location of the BlockManager and partitionLengths).

Concatenating Per-Partition Files Into Single File (and Tracking Write Time) — writePartitionedFile Internal Method

long[] writePartitionedFile(File outputFile) throws IOException

writePartitionedFile creates a file output stream for the input outputFile in append mode.

Note
writePartitionedFile uses Java’s java.io.FileOutputStream to create a file output stream.

writePartitionedFile starts tracking write time (as writeStartTime).

For each of the numPartitions partitions, writePartitionedFile takes the file from the FileSegment (from partitionWriterSegments) and creates a file input stream to read raw bytes.

Note
writePartitionedFile uses Java’s java.io.FileInputStream to create a file input stream.

writePartitionedFile then copies the raw bytes from each partition segment input stream to outputFile (possibly using Java New I/O, per the transferToEnabled flag set when BypassMergeSortShuffleWriter was created) and records the number of bytes copied for each partition (in the lengths internal array).

Note
transferToEnabled is controlled by spark.file.transferTo Spark property and is enabled (i.e. true) by default.

In the end, writePartitionedFile increments shuffle write time, clears partitionWriters array and returns the lengths of the shuffle data files per partition.

Note
writePartitionedFile tracks shuffle write time using the ShuffleWriteMetrics that was created when BypassMergeSortShuffleWriter was created.
Note
writePartitionedFile is used exclusively when BypassMergeSortShuffleWriter writes records.
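
The concatenation steps above can be sketched with plain Java I/O. This is an illustration under assumptions rather than Spark's implementation: ConcatenateSketch and concatenate are hypothetical names, an AtomicLong stands in for the write-time metric, and bytes are copied with a simple buffer loop instead of the transferTo-aware copy.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

final class ConcatenateSketch {

    /**
     * Appends every per-partition file to outputFile, in partition order,
     * and returns the number of bytes contributed by each partition.
     */
    static long[] concatenate(File[] partitionFiles, File outputFile,
                              AtomicLong writeTimeNanos) throws IOException {
        long[] lengths = new long[partitionFiles.length];
        long writeStartTime = System.nanoTime();
        // The second constructor argument (true) opens the output in append mode.
        try (OutputStream out = new FileOutputStream(outputFile, true)) {
            byte[] buffer = new byte[8192];
            for (int i = 0; i < partitionFiles.length; i++) {
                File file = partitionFiles[i];
                if (file == null || !file.exists()) {
                    continue; // nothing was written for this partition; length stays 0
                }
                try (InputStream in = new FileInputStream(file)) {
                    int n;
                    while ((n = in.read(buffer)) != -1) {
                        out.write(buffer, 0, n);
                        lengths[i] += n;
                    }
                }
            }
        } finally {
            // Add the elapsed time to the metric even if copying failed part-way.
            writeTimeNanos.addAndGet(System.nanoTime() - writeStartTime);
        }
        return lengths;
    }
}
```

Because partitions are appended in index order, the returned lengths are enough to locate any partition's bytes inside the single output file.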

Copying Raw Bytes Between Input Streams (Possibly Using Java New I/O) — Utils.copyStream Method

copyStream(
  in: InputStream,
  out: OutputStream,
  closeStreams: Boolean = false,
  transferToEnabled: Boolean = false): Long

copyStream branches depending on the types of the in and out streams, i.e. whether in is a FileInputStream and out is a FileOutputStream, and on whether the transferToEnabled input flag is enabled.

If in is a FileInputStream and out is a FileOutputStream, and transferToEnabled is enabled, copyStream gets their FileChannels, transfers bytes from the input file to the output file, and counts the number of bytes, possibly zero, that were actually transferred.

Note
copyStream uses Java’s java.nio.channels.FileChannel to manage file channels.

If in is not a FileInputStream, out is not a FileOutputStream, or the transferToEnabled flag is disabled (the default), copyStream reads data from in, writes it to out, and counts the number of bytes written.

copyStream can optionally close the in and out streams (controlled by the closeStreams input flag, which is disabled by default).
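
The branching described above can be rendered in Java roughly as follows. This is a sketch under assumptions, not the Utils.copyStream source: CopyStreamSketch is a hypothetical name, the buffer size is arbitrary, and the channel path assumes the input stream has not been read from yet (it copies from position 0 of the file).

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;

final class CopyStreamSketch {

    /** Copies all bytes from in to out and returns how many bytes were copied. */
    static long copyStream(InputStream in, OutputStream out,
                           boolean closeStreams, boolean transferToEnabled) throws IOException {
        try {
            if (transferToEnabled
                    && in instanceof FileInputStream && out instanceof FileOutputStream) {
                // NIO path: let the file channels move the bytes.
                FileChannel inChannel = ((FileInputStream) in).getChannel();
                FileChannel outChannel = ((FileOutputStream) out).getChannel();
                long size = inChannel.size();
                long copied = 0L;
                // transferTo may move fewer bytes than requested, so loop until done.
                while (copied < size) {
                    copied += inChannel.transferTo(copied, size - copied, outChannel);
                }
                return copied;
            }
            // Fallback path: plain buffered read/write loop.
            long count = 0L;
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
                count += n;
            }
            return count;
        } finally {
            if (closeStreams) {
                try {
                    in.close();
                } finally {
                    out.close();
                }
            }
        }
    }
}
```

Note that the fallback path is also taken when transferToEnabled is true but either stream is not file-backed, for example when copying between in-memory byte array streams.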

Note
Utils.copyStream is used when BypassMergeSortShuffleWriter writes records into one single shuffle block data file (among other places).
Note
Utils.copyStream is here temporarily (until I find a better place).
Tip
Visit the official web site of JSR 51: New I/O APIs for the Java Platform and read up on java.nio package.
