BlockStoreShuffleReader

BlockStoreShuffleReader is the one and only ShuffleReader that fetches and reads the partitions (in the range [startPartition, endPartition)) from a shuffle by requesting them from other nodes' block stores.
BlockStoreShuffleReader is created when the default SortShuffleManager is requested for a ShuffleReader (for a ShuffleHandle).
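A sketch of that creation, simplified from the Spark 2.x sources (the exact signature may differ between versions):

```scala
// SortShuffleManager.getReader (simplified): every shuffle is read with
// a BlockStoreShuffleReader, regardless of how the map side wrote it.
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
    startPartition, endPartition, context)
}
```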
Creating BlockStoreShuffleReader Instance
BlockStoreShuffleReader takes:

- startPartition and endPartition partition indices
- (optional) SerializerManager
- (optional) BlockManager
- (optional) MapOutputTracker
Note: BlockStoreShuffleReader uses SparkEnv to define the optional SerializerManager, BlockManager and MapOutputTracker.
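The SparkEnv defaults show up directly in the constructor. A sketch following the Spark 2.x sources (the full parameter list, e.g. the ShuffleHandle and TaskContext, may vary between versions):

```scala
private[spark] class BlockStoreShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    // The optional dependencies default to whatever SparkEnv has registered
    serializerManager: SerializerManager = SparkEnv.get.serializerManager,
    blockManager: BlockManager = SparkEnv.get.blockManager,
    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
  extends ShuffleReader[K, C] with Logging
```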
Reading Combined Key-Value Records For Reduce Task (using ShuffleBlockFetcherIterator) — read Method
Note: read is a part of the ShuffleReader contract.
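read has the following signature:

```scala
read(): Iterator[Product2[K, C]]
```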
Internally, read first creates a ShuffleBlockFetcherIterator (passing in the values of the spark.reducer.maxSizeInFlight, spark.reducer.maxReqsInFlight and spark.shuffle.detectCorrupt Spark properties).
Note: read uses BlockManager to access the ShuffleClient to create the ShuffleBlockFetcherIterator.
Note: read uses MapOutputTracker to find the BlockManagers with the shuffle blocks and their sizes to create the ShuffleBlockFetcherIterator.
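Putting the three notes together, the iterator is created along these lines (a simplified sketch of the Spark 2.x sources; the exact parameter list, e.g. stream wrapping, varies across versions):

```scala
val wrappedStreams = new ShuffleBlockFetcherIterator(
  context,
  blockManager.shuffleClient,  // the ShuffleClient accessed through BlockManager
  blockManager,
  // MapOutputTracker resolves the BlockManagers hosting the shuffle blocks
  // (and the block sizes) for the requested partition range
  mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
  // the three Spark properties mentioned above
  SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
  SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
  SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
```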
read creates a new SerializerInstance (using the Serializer from the ShuffleDependency).
read creates a key/value iterator by calling deserializeStream on every shuffle block stream.
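In code, the two steps look roughly like this (simplified from the Spark 2.x sources; dep is the ShuffleDependency behind the shuffle handle):

```scala
val serializerInstance = dep.serializer.newInstance()

// Every entry is a (BlockId, InputStream) pair; deserializeStream turns
// the raw bytes of each shuffle block back into key/value records.
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
  serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
```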
read updates the context task metrics for each record read.
Note: read uses CompletionIterator (to count the records read) and InterruptibleIterator (to support task cancellation).
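A sketch of that wrapping (simplified from the Spark 2.x sources):

```scala
val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()

// CompletionIterator counts every record and merges the temporary shuffle
// read metrics into the task metrics once the iterator is exhausted
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
  recordIter.map { record =>
    readMetrics.incRecordsRead(1)
    record
  },
  context.taskMetrics().mergeShuffleReadMetrics())

// InterruptibleIterator checks for task cancellation on every hasNext/next
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
```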
If the ShuffleDependency has an Aggregator defined, read wraps the current iterator inside an iterator defined by Aggregator.combineCombinersByKey (when mapSideCombine is enabled) or Aggregator.combineValuesByKey otherwise.
Note: read reports an exception when the ShuffleDependency has no Aggregator defined but the mapSideCombine flag is enabled.
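The branching (including the error case from the note above) in a simplified sketch of the Spark 2.x sources:

```scala
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
  if (dep.mapSideCombine) {
    // Values were already combined on the map side, so merge the combiners
    val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
    dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
  } else {
    // No map-side combine, so combine the raw values on the reduce side
    val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
    dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
  }
} else {
  // mapSideCombine without an Aggregator is an illegal combination
  require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
  interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
```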
If keyOrdering is defined in the ShuffleDependency, read does the following (see the sketch after the list):
- Inserts all the records into an ExternalSorter
- Updates the context TaskMetrics
- Returns a CompletionIterator for the ExternalSorter
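A sketch of these three steps (simplified from the Spark 2.x sources):

```scala
dep.keyOrdering match {
  case Some(keyOrd: Ordering[K]) =>
    // 1. Insert all the records into an ExternalSorter (which may spill to disk)
    val sorter = new ExternalSorter[K, C, C](
      context, ordering = Some(keyOrd), serializer = dep.serializer)
    sorter.insertAll(aggregatedIter)
    // 2. Update the task metrics with the sorter's memory and disk usage
    context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    // 3. Return a CompletionIterator that stops the sorter once exhausted
    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
      sorter.iterator, sorter.stop())
  case None =>
    aggregatedIter
}
```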
Settings
Spark Property | Default Value | Description
---|---|---
spark.reducer.maxSizeInFlight | 48m | Maximum size (in bytes) of map outputs to fetch simultaneously from each reduce task. Since each output requires a new buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory. Used when BlockStoreShuffleReader creates a ShuffleBlockFetcherIterator.
spark.reducer.maxReqsInFlight | (unlimited) | The maximum number of remote requests to fetch blocks at any given point. When the number of hosts in the cluster increases, it might lead to a very large number of in-bound connections to one or more nodes, causing the workers to fail under load. Limiting the number of fetch requests mitigates this scenario. Used when BlockStoreShuffleReader creates a ShuffleBlockFetcherIterator.
spark.shuffle.detectCorrupt | true | Controls whether to detect any corruption in fetched blocks. Used when BlockStoreShuffleReader creates a ShuffleBlockFetcherIterator.
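For example, the properties can be tuned on a SparkConf before the SparkContext is created (the values below are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-read-tuning")
  // Larger in-flight buffer: fewer fetch rounds, more memory per reduce task
  .set("spark.reducer.maxSizeInFlight", "96m")
  // Cap concurrent fetch requests to protect nodes in large clusters
  .set("spark.reducer.maxReqsInFlight", "256")
  // Keep corruption detection of fetched blocks on (the default)
  .set("spark.shuffle.detectCorrupt", "true")
val sc = new SparkContext(conf)
```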