TaskResultGetter

TaskResultGetter is a helper class of TaskSchedulerImpl that asynchronously deserializes the results of tasks that finished successfully (possibly fetching remote blocks first) and the failure reasons of tasks that failed.

Caution
FIXME Image with the dependencies
Tip
Consult Task States in Tasks to learn about the different task states.
Note
The only instance of TaskResultGetter is created while TaskSchedulerImpl is created.

TaskResultGetter requires a SparkEnv and TaskSchedulerImpl to be created and is stopped when TaskSchedulerImpl stops.

TaskResultGetter runs its work on the task-result-getter asynchronous task executor.

Tip

Enable DEBUG logging level for org.apache.spark.scheduler.TaskResultGetter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskResultGetter=DEBUG

Refer to Logging.

task-result-getter Asynchronous Task Executor

getTaskResultExecutor: ExecutorService

getTaskResultExecutor creates a daemon thread pool with spark.resultGetter.threads threads whose names use the task-result-getter prefix.

Tip
Read up on java.util.concurrent.ThreadPoolExecutor that getTaskResultExecutor uses under the covers.
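
As a rough illustration of what a daemon thread pool with a name prefix looks like, here is a minimal sketch built only on java.util.concurrent. The object ResultGetterPoolSketch and the helper newDaemonFixedThreadPool are hypothetical names used for this example, not Spark's own code, and the pool size 4 simply mirrors the default of spark.resultGetter.threads.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object ResultGetterPoolSketch {
  // Hypothetical helper (not Spark's API): builds a fixed-size pool of daemon
  // threads whose names start with the given prefix.
  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    // Under the covers this returns a java.util.concurrent.ThreadPoolExecutor.
    Executors.newFixedThreadPool(nThreads, factory)
  }

  def main(args: Array[String]): Unit = {
    // 4 mirrors the default value of spark.resultGetter.threads.
    val pool = newDaemonFixedThreadPool(4, "task-result-getter")
    val name = pool.submit(new Callable[String] {
      override def call(): String = Thread.currentThread().getName
    }).get()
    println(s"Ran on thread: $name")
    pool.shutdownNow()
  }
}
```

Because the threads are daemons, a forgotten pool would not block JVM exit, but shutting it down explicitly (as main does) is still the tidier choice.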

stop Method

stop(): Unit

serializer Attribute

serializer: ThreadLocal[SerializerInstance]

serializer is a thread-local SerializerInstance that TaskResultGetter uses to deserialize byte buffers (with TaskResults or a TaskEndReason).

When created for a new thread, serializer is initialized with a new SerializerInstance (created from SparkEnv.closureSerializer).

Note
TaskResultGetter uses java.lang.ThreadLocal for the thread-local SerializerInstance variable.
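
The thread-local pattern can be sketched as follows. FakeSerializerInstance is a hypothetical stand-in for Spark's SerializerInstance, used only so that the example has no Spark dependency; the point is that each thread lazily gets its own instance through initialValue.

```scala
object ThreadLocalSketch {
  // Hypothetical stand-in for SerializerInstance; it records the owning thread.
  final class FakeSerializerInstance(val owner: String)

  // The first get() on a given thread calls initialValue(); later calls on
  // the same thread reuse that thread's instance.
  val serializer: ThreadLocal[FakeSerializerInstance] =
    new ThreadLocal[FakeSerializerInstance] {
      override def initialValue(): FakeSerializerInstance =
        new FakeSerializerInstance(Thread.currentThread().getName)
    }

  def main(args: Array[String]): Unit = {
    val first = serializer.get()
    val second = serializer.get()
    println(s"Same instance on the same thread: ${first eq second}")
  }
}
```

Giving every pool thread its own instance avoids sharing one serializer instance across threads without any locking.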

taskResultSerializer Attribute

taskResultSerializer: ThreadLocal[SerializerInstance]

taskResultSerializer is a thread-local SerializerInstance that TaskResultGetter uses to…​

When created for a new thread, taskResultSerializer is initialized with a new SerializerInstance (created from SparkEnv.serializer).

Note
TaskResultGetter uses java.lang.ThreadLocal for the thread-local SerializerInstance variable.

Deserializing Task Result and Notifying TaskSchedulerImpl — enqueueSuccessfulTask Method

enqueueSuccessfulTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  serializedData: ByteBuffer): Unit

enqueueSuccessfulTask submits an asynchronous task (to the task-result-getter asynchronous task executor) that first deserializes serializedData to a DirectTaskResult, then updates the internal accumulator (with the size of the DirectTaskResult), and finally notifies TaskSchedulerImpl that the tid task has completed and whether its result was received successfully.

Note
enqueueSuccessfulTask only enqueues the asynchronous task; the task-result-getter asynchronous task executor runs it at some point in the future.

Internally, the enqueued task first deserializes serializedData to a TaskResult (using the internal thread-local serializer).

For a DirectTaskResult, the task checks the available memory for the task result and, when the size overflows spark.driver.maxResultSize, it simply returns.

Note
The task enqueued by enqueueSuccessfulTask runs on a pool thread, so returning from it does nothing else. That is why the quota check itself performs the abort when there is not enough memory.
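
The quota check and the early return can be sketched as below. ResultQuota, its running total, and the aborted flag are assumptions made for illustration and are not Spark's actual classes; the limit passed in plays the role of spark.driver.maxResultSize.

```scala
object ResultQuotaSketch {
  // Hypothetical bookkeeping for one task set (an assumption, not Spark's class).
  final class ResultQuota(maxResultSize: Long) {
    private var totalResultSize = 0L
    private var abortedFlag = false

    def aborted: Boolean = synchronized { abortedFlag }

    // Adds `size` to the running total and reports whether the total still
    // fits; when it does not, the check itself records the abort.
    def canFetchMoreResults(size: Long): Boolean = synchronized {
      totalResultSize += size
      if (totalResultSize > maxResultSize) {
        abortedFlag = true
        false
      } else {
        true
      }
    }
  }

  // Body of the enqueued task: when the quota is exceeded, a plain return is
  // all that is left to do because the check has already recorded the abort.
  def handle(quota: ResultQuota, size: Long): String = {
    if (!quota.canFetchMoreResults(size)) return "dropped"
    "accepted"
  }

  def main(args: Array[String]): Unit = {
    val quota = new ResultQuota(100L)
    println(handle(quota, 60L))
    println(handle(quota, 60L))
  }
}
```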

Otherwise, when there is enough memory to hold the task result, it deserializes the DirectTaskResult (using the internal thread-local taskResultSerializer).

For an IndirectTaskResult, the task checks the available memory for the task result and, when the size could overflow the maximum result size, it removes the block and simply returns.

Otherwise, when there is enough memory to hold the task result, you should see the following DEBUG message in the logs:

DEBUG Fetching indirect task result for TID [tid]

When the block could not be fetched, TaskSchedulerImpl is informed (with TaskResultLost task failure reason) and the task simply returns.

Note
The task enqueued by enqueueSuccessfulTask runs on a pool thread, so returning from it does nothing else; the real handling happens when TaskSchedulerImpl is informed.

The task result (as a serialized byte buffer) is then deserialized to a DirectTaskResult (using the internal thread-local serializer) and deserialized again using the internal thread-local taskResultSerializer (just like for the DirectTaskResult case). The block is then removed from BlockManagerMaster and the handling of the IndirectTaskResult simply returns.

Note
An IndirectTaskResult is deserialized twice to become the final deserialized task result (using serializer for a DirectTaskResult). Compare it to a DirectTaskResult task result that is deserialized once only.
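
The difference between the two paths can be sketched with a toy model. Everything here is hypothetical: the one-byte tag wire format, the in-memory map standing in for the block store, and the simplified DirectTaskResult and IndirectTaskResult case classes are assumptions made so that the example runs without Spark.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

object TaskResultSketch {
  // Hypothetical, simplified stand-ins for Spark's TaskResult hierarchy.
  sealed trait TaskResult
  final case class DirectTaskResult(valueBytes: Array[Byte]) extends TaskResult
  final case class IndirectTaskResult(blockId: String) extends TaskResult

  // Toy wire format (an assumption): one tag byte followed by the payload.
  def encode(r: TaskResult): Array[Byte] = r match {
    case DirectTaskResult(v)   => 'D'.toByte +: v
    case IndirectTaskResult(b) => 'I'.toByte +: b.getBytes(UTF_8)
  }

  def decode(bytes: Array[Byte]): TaskResult = bytes.head.toChar match {
    case 'D'   => DirectTaskResult(bytes.tail)
    case 'I'   => IndirectTaskResult(new String(bytes.tail, UTF_8))
    case other => throw new IllegalArgumentException(s"Unknown tag: $other")
  }

  // Stands in for deserializing the task's value with taskResultSerializer.
  def decodeValue(bytes: Array[Byte]): String = new String(bytes, UTF_8)

  // Returns the final value, or None when the referenced block is missing
  // (the case in which the scheduler would be told the result was lost).
  def resolve(data: Array[Byte],
              blocks: mutable.Map[String, Array[Byte]]): Option[String] =
    decode(data) match {
      case DirectTaskResult(v) =>
        Some(decodeValue(v))
      case IndirectTaskResult(blockId) =>
        blocks.get(blockId).map { blockBytes =>
          // The fetched block holds a serialized DirectTaskResult.
          val direct = decode(blockBytes).asInstanceOf[DirectTaskResult]
          val value = decodeValue(direct.valueBytes)
          blocks.remove(blockId) // the block is no longer needed
          value
        }
    }
}
```

For example, storing an encoded DirectTaskResult under a block id and resolving an IndirectTaskResult that names that id yields the same value as resolving the direct result, and the block is gone afterwards.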

A ClassNotFoundException leads to aborting the TaskSet (with the ClassNotFound with classloader: [loader] error message), while any other non-fatal exception produces the following ERROR message in the logs and then aborts the TaskSet.

ERROR Exception while getting task result

Deserializing TaskFailedReason and Notifying TaskSchedulerImpl — enqueueFailedTask Method

enqueueFailedTask(
  taskSetManager: TaskSetManager,
  tid: Long,
  taskState: TaskState.TaskState,
  serializedData: ByteBuffer): Unit

enqueueFailedTask submits an asynchronous task (to task-result-getter asynchronous task executor) that first attempts to deserialize a TaskFailedReason from serializedData (using the internal thread-local serializer) and then notifies TaskSchedulerImpl that the task has failed.

Any ClassNotFoundException leads to the following ERROR message in the logs (without breaking the flow of enqueueFailedTask):

ERROR Could not deserialize TaskEndReason: ClassNotFound with classloader [loader]
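
The "log and carry on" behavior can be sketched as follows. The types, the callback parameters, and the UnknownReason fallback used when deserialization fails are assumptions for illustration only; the log text follows the message shown above.

```scala
import scala.collection.mutable.ArrayBuffer

object FailedTaskSketch {
  // Hypothetical stand-ins for the failure reasons (not Spark's classes).
  sealed trait TaskFailedReason
  case object UnknownReason extends TaskFailedReason
  final case class ExceptionFailure(message: String) extends TaskFailedReason

  // Body of the enqueued task: try to deserialize the reason, log a
  // ClassNotFoundException without rethrowing, and always notify afterwards.
  def handleFailedTask(
      tid: Long,
      serializedData: Array[Byte],
      deserialize: Array[Byte] => TaskFailedReason,
      loader: String,
      notify: (Long, TaskFailedReason) => Unit,
      logError: String => Unit): Unit = {
    var reason: TaskFailedReason = UnknownReason // assumed fallback
    try {
      if (serializedData != null && serializedData.nonEmpty) {
        reason = deserialize(serializedData)
      }
    } catch {
      case _: ClassNotFoundException =>
        logError(s"Could not deserialize TaskEndReason: ClassNotFound with classloader $loader")
    }
    notify(tid, reason) // reached even when deserialization failed
  }

  def main(args: Array[String]): Unit = {
    val logs = ArrayBuffer[String]()
    handleFailedTask(
      tid = 1L,
      serializedData = Array[Byte](1),
      deserialize = _ => throw new ClassNotFoundException("Missing"),
      loader = "app-loader",
      notify = (tid, reason) => println(s"Task $tid failed: $reason"),
      logError = (m: String) => { logs += m; () })
    logs.foreach(println)
  }
}
```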

Settings

Table 1. Spark Properties
Spark Property | Default Value | Description
spark.resultGetter.threads | 4 | The number of threads for TaskResultGetter.
