BlockStoreShuffleReader

BlockStoreShuffleReader is the one and only ShuffleReader that fetches and reads the partitions (in range [startPartition, endPartition)) from a shuffle by requesting them from other nodes' block stores.

BlockStoreShuffleReader is created when the default SortShuffleManager is requested for a ShuffleReader (for a ShuffleHandle).

Creating BlockStoreShuffleReader Instance

BlockStoreShuffleReader takes:

  1. BaseShuffleHandle

  2. startPartition and endPartition partition indices

  3. TaskContext

  4. (optional) SerializerManager

  5. (optional) BlockManager

  6. (optional) MapOutputTracker

Reading Combined Key-Value Records For Reduce Task (using ShuffleBlockFetcherIterator) — read Method

read(): Iterator[Product2[K, C]]
Note
read is a part of ShuffleReader contract.
Note
read uses BlockManager to access ShuffleClient to create ShuffleBlockFetcherIterator.
Note
read uses MapOutputTracker to find the BlockManagers with the shuffle blocks and sizes to create ShuffleBlockFetcherIterator.
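
As a rough illustration of the input that read prepares for ShuffleBlockFetcherIterator, the following sketch groups shuffle blocks and their sizes by the block manager that holds them, for reduce partitions in [startPartition, endPartition). All types (BlockManagerIdSketch, ShuffleBlockIdSketch, MapStatusSketch) and the blocksByAddress helper are hypothetical stand-ins for this example. They are not Spark's classes or MapOutputTracker's API.

```scala
// Hypothetical, simplified stand-ins for Spark's identifiers (assumptions).
final case class BlockManagerIdSketch(executorId: String, host: String, port: Int)
final case class ShuffleBlockIdSketch(shuffleId: Int, mapId: Int, reduceId: Int)

// One map task's output: where it lives and the size of each reduce partition.
final case class MapStatusSketch(location: BlockManagerIdSketch, sizes: Vector[Long])

object MapSizesSketch {
  // Lists the blocks for reduce partitions in [startPartition, endPartition)
  // and groups them by the block manager that stores them.
  def blocksByAddress(
      shuffleId: Int,
      statuses: Vector[MapStatusSketch],
      startPartition: Int,
      endPartition: Int): Map[BlockManagerIdSketch, Vector[(ShuffleBlockIdSketch, Long)]] = {
    val blocks = for {
      (status, mapId) <- statuses.zipWithIndex
      reduceId <- startPartition until endPartition // half-open range
    } yield (status.location, (ShuffleBlockIdSketch(shuffleId, mapId, reduceId), status.sizes(reduceId)))
    blocks.groupBy(_._1).map { case (location, pairs) => location -> pairs.map(_._2) }
  }
}
```

With two map outputs and the range [1, 3), each location ends up with two blocks (reduce partitions 1 and 2); partition 0 and anything from index 3 onwards are left out.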

read creates a key/value iterator by deserializing every shuffle block stream (using deserializeStream).

read updates the context task metrics for each record read.

Note
read uses CompletionIterator (to count the records read) and InterruptibleIterator (to support task cancellation).
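
The record counting and completion callback described above can be sketched without Spark on the classpath. This is a minimal illustration only: CompletionSketch and ReadMetricsSketch are hypothetical stand-ins for Spark's CompletionIterator and shuffle read metrics, and task cancellation (InterruptibleIterator) is not modelled.

```scala
// Simplified stand-in for a completion-aware iterator (assumption, not Spark's class):
// runs `onComplete` exactly once, when the underlying iterator is exhausted.
final class CompletionSketch[A](underlying: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onComplete()
    }
    more
  }

  override def next(): A = underlying.next()
}

// Hypothetical metrics holder used only for this example.
final class ReadMetricsSketch {
  var recordsRead: Long = 0L
  var merged: Boolean = false
}

object ReadSketch {
  // Counts every record as it is consumed and flags the metrics as merged
  // once the last record has been read.
  def countingIterator[K, C](
      records: Iterator[(K, C)],
      metrics: ReadMetricsSketch): Iterator[(K, C)] = {
    val counted = records.map { record =>
      metrics.recordsRead += 1
      record
    }
    new CompletionSketch(counted, () => { metrics.merged = true })
  }
}
```

Because the counting happens inside map, records are counted lazily, only when the downstream consumer actually pulls them.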

If the ShuffleDependency has an Aggregator defined, read wraps the current iterator inside an iterator defined by Aggregator.combineCombinersByKey (when mapSideCombine is enabled) or Aggregator.combineValuesByKey (otherwise).
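
The choice between the two combine methods can be illustrated with a small in-memory sketch. AggregatorSketch is a hypothetical simplification: it keeps the createCombiner, mergeValue and mergeCombiners functions of an aggregator but does not take a TaskContext and never spills to disk, unlike Spark's Aggregator.

```scala
import scala.collection.mutable

// Simplified, in-memory stand-in for an aggregator (assumption, not Spark's class).
final case class AggregatorSketch[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // Records are raw values: build a combiner per key from scratch.
  def combineValuesByKey(records: Iterator[(K, V)]): Iterator[(K, C)] = {
    val combiners = mutable.LinkedHashMap.empty[K, C]
    records.foreach { case (k, v) =>
      val updated = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
      combiners.update(k, updated)
    }
    combiners.iterator
  }

  // Records were already combined on the map side: merge the partial combiners.
  def combineCombinersByKey(records: Iterator[(K, C)]): Iterator[(K, C)] = {
    val combiners = mutable.LinkedHashMap.empty[K, C]
    records.foreach { case (k, c) =>
      val updated = combiners.get(k) match {
        case Some(existing) => mergeCombiners(existing, c)
        case None           => c
      }
      combiners.update(k, updated)
    }
    combiners.iterator
  }
}

object AggregationSketch {
  // Picks the combine method based on whether map-side combine already ran.
  def aggregate[K, V, C](
      records: Iterator[(K, Any)],
      aggregator: AggregatorSketch[K, V, C],
      mapSideCombine: Boolean): Iterator[(K, C)] =
    if (mapSideCombine) aggregator.combineCombinersByKey(records.asInstanceOf[Iterator[(K, C)]])
    else aggregator.combineValuesByKey(records.asInstanceOf[Iterator[(K, V)]])
}
```

The casts reflect that the fetched records are untyped at this point: they hold either values or partial combiners, depending on what the map side wrote.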

If the ShuffleDependency has a keyOrdering defined, read does the following:

  1. Creates an ExternalSorter

  2. Inserts all the records into the ExternalSorter

  3. Updates context TaskMetrics

  4. Returns a CompletionIterator for the ExternalSorter
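
The sorting steps above can be approximated by the following sketch. It is an assumption-laden simplification: an in-memory sort stands in for ExternalSorter (which can spill to disk), and the metrics update and CompletionIterator wrapping are omitted. SortSketch and sortByKey are hypothetical names.

```scala
object SortSketch {
  // Sorts records by key when an ordering is defined; otherwise returns
  // the records untouched.
  def sortByKey[K, C](
      records: Iterator[(K, C)],
      keyOrdering: Option[Ordering[K]]): Iterator[(K, C)] =
    keyOrdering match {
      case Some(ordering) =>
        // In-memory stand-in for inserting all records into a sorter
        // and iterating over the sorted result.
        records.toVector.sortBy(_._1)(ordering).iterator
      case None =>
        records
    }
}
```

Unlike the pass-through case, sorting consumes the whole input before the first record can be returned, which is why the real reader delegates to a sorter that can spill.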

Settings

Table 1. Spark Properties
| Spark Property | Default Value | Description |
|---|---|---|
| spark.reducer.maxSizeInFlight | 48m | Maximum total size of map outputs that each reduce task fetches simultaneously. Since each output requires a new buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory. Used when BlockStoreShuffleReader creates a ShuffleBlockFetcherIterator to read records. |
| spark.reducer.maxReqsInFlight | (unlimited) | Maximum number of remote requests to fetch blocks at any given point. As the number of hosts in the cluster grows, a node may receive a very large number of inbound connections, causing workers to fail under load. Limiting the number of fetch requests mitigates this scenario. Used when BlockStoreShuffleReader creates a ShuffleBlockFetcherIterator to read records. |
| spark.shuffle.detectCorrupt | true | Controls whether to detect any corruption in fetched blocks. Used when BlockStoreShuffleReader creates a ShuffleBlockFetcherIterator to read records. |
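
For reference, the properties above could be set programmatically as in the sketch below. It assumes spark-core is on the classpath; the object name, the application name and all values are hypothetical and chosen only for illustration, not as tuning recommendations.

```scala
import org.apache.spark.SparkConf

object ShuffleReaderSettingsSketch {
  // Illustrative values only (assumptions).
  val conf: SparkConf = new SparkConf()
    .setAppName("shuffle-reader-settings-sketch") // hypothetical application name
    .set("spark.reducer.maxSizeInFlight", "96m")  // double the default of 48m
    .set("spark.reducer.maxReqsInFlight", "64")   // cap concurrent fetch requests
    .set("spark.shuffle.detectCorrupt", "true")   // keep corruption detection on
}
```

The same keys can also be passed on the command line with --conf when submitting an application.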
