ShuffleBlockFetcherIterator

ShuffleBlockFetcherIterator is a Scala Iterator that fetches multiple shuffle blocks (aka shuffle map outputs) from local and remote BlockManagers.

ShuffleBlockFetcherIterator allows for iterating over a sequence of blocks as (BlockId, InputStream) pairs so a caller can handle shuffle blocks in a pipelined fashion as they are received.

ShuffleBlockFetcherIterator throttles the remote fetches to avoid using too much memory.

Table 1. ShuffleBlockFetcherIterator’s Internal Registries and Counters
Name Description

results

Internal FIFO blocking queue (using Java’s java.util.concurrent.LinkedBlockingQueue) to hold FetchResult remote and local fetch results.

Used in:

1. next to take one FetchResult off the queue,

2. sendRequest to put SuccessFetchResult or FailureFetchResult remote fetch results (as part of BlockFetchingListener callback),

3. fetchLocalBlocks (similarly to sendRequest) to put local fetch results,

4. cleanup to release managed buffers for SuccessFetchResult results.

maxBytesInFlight

The maximum size (in bytes) of all the remote shuffle blocks to fetch.

Set when ShuffleBlockFetcherIterator is created.

maxReqsInFlight

The maximum number of remote requests to fetch shuffle blocks.

Set when ShuffleBlockFetcherIterator is created.

bytesInFlight

The bytes of fetched remote shuffle blocks in flight

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented every sendRequest and decremented every next.

ShuffleBlockFetcherIterator makes sure that the invariant of bytesInFlight below maxBytesInFlight holds every remote shuffle block fetch.

reqsInFlight

The number of remote shuffle block fetch requests in flight.

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented every sendRequest and decremented every next.

ShuffleBlockFetcherIterator makes sure that the invariant of reqsInFlight below maxReqsInFlight holds every remote shuffle block fetch.

isZombie

Flag whether ShuffleBlockFetcherIterator is still active. It is disabled, i.e. false, when ShuffleBlockFetcherIterator is created.

When enabled (when the task using ShuffleBlockFetcherIterator finishes), the block fetch successful callback (registered in sendRequest) will no longer add fetched remote shuffle blocks into results internal queue.

currentResult

The currently-processed SuccessFetchResult

Set when ShuffleBlockFetcherIterator returns the next (BlockId, InputStream) tuple and released (on cleanup).

Tip

Enable ERROR, WARN, INFO, DEBUG or TRACE logging levels for org.apache.spark.storage.ShuffleBlockFetcherIterator logger to see what happens in ShuffleBlockFetcherIterator.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=TRACE

Refer to Logging.

splitLocalRemoteBlocks Method

Caution
FIXME

fetchUpToMaxBytes Method

Caution
FIXME

fetchLocalBlocks Method

Caution
FIXME

Creating ShuffleBlockFetcherIterator Instance

When created, ShuffleBlockFetcherIterator takes the following:

  1. TaskContext

  2. ShuffleClient

  3. BlockManager

  4. blocksByAddress list of blocks to fetch per BlockManager.

    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])]
  5. streamWrapper function to wrap the returned input stream

    streamWrapper: (BlockId, InputStream) => InputStream
  6. maxBytesInFlight — the maximum size (in bytes) of map outputs to fetch simultaneously from each reduce task (controlled by spark.reducer.maxSizeInFlight Spark property)

  7. maxReqsInFlight — the maximum number of remote requests to fetch blocks at any given point (controlled by spark.reducer.maxReqsInFlight Spark property)

  8. detectCorrupt flag to detect any corruption in fetched blocks (controlled by spark.shuffle.detectCorrupt Spark property)

Caution
FIXME

next Method

Caution
FIXME

Initializing ShuffleBlockFetcherIterator — initialize Internal Method

initialize(): Unit

initialize registers a task cleanup and fetches shuffle blocks from remote and local BlockManagers.

Internally, initialize registers a TaskCompletionListener (that will clean up right after the task finishes).

As ShuffleBlockFetcherIterator is in initialization phase, initialize makes sure that reqsInFlight and bytesInFlight internal counters are both 0. Otherwise, initialize throws an exception.

initialize fetches shuffle blocks (from remote BlockManagers).

You should see the following INFO message in the logs:

INFO ShuffleBlockFetcherIterator: Started [numFetches] remote fetches in [time] ms

You should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: Got local blocks in  [time] ms
Note
initialize is used when ShuffleBlockFetcherIterator is created.

Sending Remote Shuffle Block Fetch Request — sendRequest Internal Method

sendRequest(req: FetchRequest): Unit

Internally, when sendRequest runs, you should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: Sending request for [blocks.size] blocks ([size] B) from [hostPort]

sendRequest increments bytesInFlight and reqsInFlight internal counters.

Note
The input FetchRequest contains the remote BlockManagerId address and the shuffle blocks to fetch (as a sequence of BlockId and their sizes).

sendRequest requests ShuffleClient to fetch shuffle blocks (from the host, the port, and the executor as defined in the input FetchRequest).

Note
ShuffleClient was defined when ShuffleBlockFetcherIterator was created.

sendRequest registers a BlockFetchingListener with ShuffleClient that:

  1. For every successfully fetched shuffle block adds it as SuccessFetchResult to results internal queue.

  2. For every shuffle block fetch failure adds it as FailureFetchResult to results internal queue.

Note
sendRequest is used exclusively when ShuffleBlockFetcherIterator fetches remote shuffle blocks.

onBlockFetchSuccess Callback

onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit

Internally, onBlockFetchSuccess checks if the iterator is not zombie and does the further processing if it is not.

onBlockFetchSuccess marks the input blockId as received (i.e. removes it from all the blocks to fetch as requested in sendRequest).

onBlockFetchSuccess adds the managed buf (as SuccessFetchResult) to results internal queue.

You should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: remainingBlocks: [blocks]

Regardless of zombie state of ShuffleBlockFetcherIterator, you should see the following TRACE message in the logs:

TRACE ShuffleBlockFetcherIterator: Got remote block [blockId] after [time] ms

onBlockFetchFailure Callback

onBlockFetchFailure(blockId: String, e: Throwable): Unit

When onBlockFetchFailure is called, you should see the following ERROR message in the logs:

ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from [hostPort]

onBlockFetchFailure adds the block (as FailureFetchResult) to results internal queue.

Throwing FetchFailedException (for ShuffleBlockId) — throwFetchFailedException Internal Method

throwFetchFailedException(
  blockId: BlockId,
  address: BlockManagerId,
  e: Throwable): Nothing

throwFetchFailedException throws a FetchFailedException when the input blockId is a ShuffleBlockId.

Note
throwFetchFailedException creates a FetchFailedException passing on the root cause of a failure, i.e. the input e.

Otherwise, throwFetchFailedException throws a SparkException:

Failed to get block [blockId], which is not a shuffle block
Note
throwFetchFailedException is used when ShuffleBlockFetcherIterator is requested for the next element.

Releasing Resources — cleanup Internal Method

cleanup(): Unit

Internally, cleanup marks ShuffleBlockFetcherIterator a zombie.

cleanup iterates over results internal queue and for every SuccessFetchResult, increments remote bytes read and blocks fetched shuffle task metrics, and eventually releases the managed buffer.

Decrementing Reference Count Of and Releasing Result Buffer (for SuccessFetchResult) — releaseCurrentResultBuffer Internal Method

releaseCurrentResultBuffer(): Unit

releaseCurrentResultBuffer decrements the currently-processed SuccessFetchResult reference's buffer reference count if there is any.

releaseCurrentResultBuffer releases currentResult.

Note
releaseCurrentResultBuffer is used when ShuffleBlockFetcherIterator releases resources and BufferReleasingInputStream closes.

results matching ""

    No results matching ""