TaskRunner

TaskRunner is a thread of execution that manages a single individual task.

TaskRunner is created exclusively when Executor is requested to launch a task.

Figure 1. Executor creates TaskRunner and runs (almost) immediately

TaskRunner can be run or killed, which simply means running or killing the task this TaskRunner object manages, respectively.

Table 1. TaskRunner's Internal Registries and Counters
Name Description

taskId

The id of the task this TaskRunner manages.

Used in log messages and the status updates sent to ExecutorBackend, and when run creates a TaskMemoryManager.

threadName

The name of the thread the TaskRunner runs the task on.

Used when run sets the name of the current thread.

taskName

The name of the task this TaskRunner manages.

Used in log messages.

finished

Flag that says whether the task has finished (true) or not (false).

Used when kill is requested to kill the task (that is killed only when not finished already).

killed

Flag that says whether the task has been killed.

Used when run throws a TaskKilledException (and checked periodically while the task runs).

threadId

The identifier of the thread the TaskRunner runs the task on.

Set when run is executed.

startGCTime

The total GC time of the JVM right before the task runs.

Used to compute the task's GC time metric after the task has finished.

task

The deserialized Task to run.

Set when run deserializes the task. Used e.g. when kill kills the task.

replClassLoader

The ClassLoader to run the task with (possibly with REPL-defined classes).

Used when run sets the current thread's context classloader.

Tip

Enable INFO or DEBUG logging level for org.apache.spark.executor.Executor logger to see what happens inside TaskRunner (since TaskRunner is an internal class of Executor).

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.executor.Executor=DEBUG

Refer to Logging.

Creating TaskRunner Instance

TaskRunner takes the following when created:

- ExecutorBackend (to send the task's status updates to)
- TaskDescription (with the task and attempt ids, the task name, and the serialized version of the task as ByteBuffer)

TaskRunner initializes the internal registries and counters.

computeTotalGcTime Method

Caution
FIXME
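Until this section is filled in, here is a minimal sketch of what computing the JVM's total GC time typically boils down to, using the standard java.lang.management API (that Spark's code matches this exactly is an assumption):

import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

// Sum the accumulated collection time (in ms) over all garbage collectors.
def computeTotalGcTime(): Long =
  ManagementFactory.getGarbageCollectorMXBeans.asScala
    .map(_.getCollectionTime)
    .sum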

updateDependencies Method

Caution
FIXME

setTaskFinishedAndClearInterruptStatus Method

Caution
FIXME
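What the method's name and its uses in the sections below suggest, written down as a sketch (an assumption, not the actual code):

// Mark the task as finished and clear the current thread's interrupt flag,
// so an interrupt sent by kill does not leak into code that runs afterwards.
def setTaskFinishedAndClearInterruptStatus(): Unit = synchronized {
  finished = true
  Thread.interrupted() // returns and clears the interrupt status
}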

Lifecycle

Caution
FIXME Image with state changes

A TaskRunner object is created when an executor is requested to launch a task.

It is created with an ExecutorBackend (to send the task’s status updates to), task and attempt ids, task name, and serialized version of the task (as ByteBuffer).

Running Task — run Method

Note
run is part of the java.lang.Runnable contract that TaskRunner follows.

When executed, run initializes threadId as the current thread identifier (using Java's Thread).

run then sets the name of the current thread as threadName (using Java’s Thread).

run creates a TaskMemoryManager (using the current MemoryManager and taskId).

run starts tracking the time to deserialize a task.

run sets the current thread’s context classloader (with replClassLoader).

You should see the following INFO message in the logs:

INFO Executor: Running [taskName] (TID [taskId])

run notifies ExecutorBackend that taskId is in TaskState.RUNNING state.

Note
run uses ExecutorBackend that was specified when TaskRunner was created.
Note
run uses TaskDescription that is specified when TaskRunner is created.

run deserializes the task (using the context class loader) and sets its localProperties and TaskMemoryManager. run sets the task internal reference to hold the deserialized task.

Note
run uses TaskDescription to access serialized task.
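The setup steps above, condensed into a sketch (a simplification of the actual code; env stands for the executor's SparkEnv, EMPTY_BYTE_BUFFER for an empty ByteBuffer, and ser for the closure Serializer used throughout run):

// Record the thread and rename it for easier thread-dump reading.
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)

// Per-task memory accounting on top of the executor's MemoryManager.
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)

val deserializeStartTime = System.currentTimeMillis
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()

logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

// Deserialize the task and wire in its properties and memory manager.
task = ser.deserialize[Task[Any]](
  taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)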

If the killed flag is enabled, run throws a TaskKilledException.

You should see the following DEBUG message in the logs:

DEBUG Executor: Task [taskId]'s epoch is [task.epoch]

run records the current time as the task’s start time (as taskStart).

run runs the task (with taskAttemptId as taskId, attemptNumber from TaskDescription, and metricsSystem as the current MetricsSystem).
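In code, that is roughly the following (value being the computed result that run serializes later):

// Run the task inside the "monitored" block (see the note below).
val value = task.run(
  taskAttemptId = taskId,
  attemptNumber = taskDescription.attemptNumber,
  metricsSystem = env.metricsSystem)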

Note
The task runs inside a "monitored" block (i.e. try-finally block) to detect any memory and lock leaks after the task’s run finishes regardless of the final outcome - the computed value or an exception thrown.

After the task’s run has finished (inside the "finally" block of the "monitored" block), run requests BlockManager to release all locks of the task (for the task’s taskId). The locks are later used for lock leak detection.

run then requests TaskMemoryManager to clean up allocated memory (that helps finding memory leaks).
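A sketch of those two clean-up steps; the returned values feed the leak checks described next:

// In the "finally" block: release the task's block locks...
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
// ...and free any memory the task still holds; a non-zero result is a leak.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()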

If run detects a managed memory leak (i.e. the freed memory is greater than 0), the spark.unsafe.exceptionOnMemoryLeak Spark property is enabled (it is not by default), and no exception was reported while the task ran, run reports a SparkException:

Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]

Otherwise, if spark.unsafe.exceptionOnMemoryLeak is disabled, you should see the following ERROR message in the logs instead:

ERROR Executor: Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
Note
If run detects a memory leak, it leads to a SparkException or ERROR message in the logs.

If run detects leaked locks (i.e. the number of locks released is greater than 0), the spark.storage.exceptionOnPinLeak Spark property is enabled (it is not by default), and no exception was reported while the task ran, run reports a SparkException:

[releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]

Otherwise, if spark.storage.exceptionOnPinLeak is disabled or the task reported an exception, you should see the following INFO message in the logs instead:

INFO Executor: [releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Note
If run detects any lock leak, it leads to a SparkException or INFO message in the logs.

Right after the "monitored" block, run records the current time as the task's finish time (as taskFinish).

If the task was killed (while it was running), run reports a TaskKilledException (and the TaskRunner exits).

run creates a Serializer and serializes the task’s result. run measures the time to serialize the result.

Note
run uses SparkEnv to access the current Serializer. SparkEnv was specified when the owning Executor was created.
Important
This is when TaskRunner serializes the computed value of a task to be sent back to the driver.

run records the task metrics, i.e. executorDeserializeTime, executorRunTime, jvmGCTime and resultSerializationTime.

run creates a DirectTaskResult (with the serialized result and the latest values of accumulators).

run serializes the DirectTaskResult and gets the byte buffer’s limit.

Note
A serialized DirectTaskResult is Java’s java.nio.ByteBuffer.
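A sketch of the steps so far (resultSer is the Serializer created above, ser the closure Serializer, value the task's computed result and accumUpdates the latest accumulator values):

// Serialize the task's value and wrap it, together with the accumulator
// updates, in a DirectTaskResult; then serialize the whole result.
val valueBytes = resultSer.serialize(value)
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit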

run selects the proper serialized version of the result before sending it to ExecutorBackend.

run branches off based on the serialized DirectTaskResult byte buffer’s limit.

When maxResultSize is greater than 0 and the serialized DirectTaskResult buffer limit exceeds it, the following WARN message is displayed in the logs:

WARN Executor: Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.
Tip
Read about spark.driver.maxResultSize.
$ ./bin/spark-shell -c spark.driver.maxResultSize=1m

scala> sc.version
res0: String = 2.0.0-SNAPSHOT

scala> sc.getConf.get("spark.driver.maxResultSize")
res1: String = 1m

scala> sc.range(0, 1024 * 1024 + 10, 1).collect
WARN Executor: Finished task 4.0 in stage 0.0 (TID 4). Result is larger than maxResultSize (1031.4 KB > 1024.0 KB), dropping it.
...
ERROR TaskSetManager: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
...
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1448)
...

In this case, run creates an IndirectTaskResult (with a TaskResultBlockId for the task's taskId and resultSize) and serializes it.

When maxResultSize is not positive or resultSize is smaller than maxResultSize but greater than maxDirectResultSize, run creates a TaskResultBlockId for the task’s taskId and stores the serialized DirectTaskResult in BlockManager (as the TaskResultBlockId with MEMORY_AND_DISK_SER storage level).

You should see the following INFO message in the logs:

INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)

In this case, run creates an IndirectTaskResult (with a TaskResultBlockId for the task's taskId and resultSize) and serializes it.

Note
The difference between the two cases above is that the result is either dropped (when it exceeds maxResultSize) or stored in BlockManager with MEMORY_AND_DISK_SER storage level.

When the two cases above do not hold, you should see the following INFO message in the logs:

INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver

run uses the serialized DirectTaskResult byte buffer as the final serializedResult.

Note
The final serializedResult is either an IndirectTaskResult (possibly with the block stored in BlockManager) or a DirectTaskResult.
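The three cases, condensed into a sketch (a simplification of the actual code; maxResultSize and maxDirectResultSize come from the executor's configuration):

val serializedResult: ByteBuffer =
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    // Too large: drop the result and send only a reference with its size.
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    // Too large for a direct send: store the result in BlockManager and
    // send a reference to the block.
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId,
      new ChunkedByteBuffer(serializedDirectResult.duplicate()),
      StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    // Small enough: send the serialized DirectTaskResult itself.
    serializedDirectResult
  }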

run notifies ExecutorBackend that taskId is in TaskState.FINISHED state with the serialized result and removes taskId from the owning executor’s runningTasks registry.

Note
run uses ExecutorBackend that is specified when TaskRunner is created.
Note
TaskRunner is Java’s Runnable and the contract requires that once a TaskRunner has completed execution it must not be restarted.

When run catches an exception while executing the task, run acts according to its type (as presented in the "run's Exception Cases" table below and the sections that follow it).

Table 2. run's Exception Cases, TaskState and Serialized ByteBuffer
Exception Type          TaskState   Serialized ByteBuffer
FetchFailedException    FAILED      TaskFailedReason
TaskKilledException     KILLED      TaskKilled
InterruptedException    KILLED      TaskKilled
CommitDeniedException   FAILED      TaskFailedReason
Throwable               FAILED      ExceptionFailure

FetchFailedException

When FetchFailedException is reported while running a task, run setTaskFinishedAndClearInterruptStatus.

run requests FetchFailedException for the TaskFailedReason, serializes it and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and a serialized reason).

Note
ExecutorBackend was specified when TaskRunner was created.
Note
run uses a closure Serializer to serialize the failure reason. The Serializer was created before run ran the task.
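As a sketch (toTaskFailedReason being how the exception is requested for its TaskFailedReason):

// Inside run's exception handler.
case ffe: FetchFailedException =>
  val reason = ffe.toTaskFailedReason
  setTaskFinishedAndClearInterruptStatus()
  execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))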

TaskKilledException

When TaskKilledException is reported while running a task, you should see the following INFO message in the logs:

INFO Executor killed [taskName] (TID [taskId])

run then setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has been killed (with taskId, TaskState.KILLED, and a serialized TaskKilled object).

InterruptedException (with Task Killed)

When InterruptedException is reported while running a task, and the task has been killed, you should see the following INFO message in the logs:

INFO Executor interrupted and killed [taskName] (TID [taskId])

run then setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has been killed (with taskId, TaskState.KILLED, and a serialized TaskKilled object).

Note
The difference between this InterruptedException and TaskKilledException is the INFO message in the logs.
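Both cases side by side, as a sketch; the log message is indeed the only difference:

// Inside run's exception handler.
case _: TaskKilledException =>
  logInfo(s"Executor killed $taskName (TID $taskId)")
  setTaskFinishedAndClearInterruptStatus()
  execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

case _: InterruptedException if task.killed =>
  logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
  setTaskFinishedAndClearInterruptStatus()
  execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))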

CommitDeniedException

When CommitDeniedException is reported while running a task, run setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and a serialized TaskFailedReason).

Note
The difference between this CommitDeniedException and FetchFailedException is just the reason being sent to ExecutorBackend.

Throwable

When run catches a Throwable, you should see the following ERROR message in the logs (followed by the exception).

ERROR Exception in [taskName] (TID [taskId])

run then records the following task metrics (only when Task is available): executorRunTime and jvmGCTime.

run then collects the latest values of internal and external accumulators (with taskFailed flag enabled to inform that the collection is for a failed task).

Otherwise, when Task is not available, the accumulator collection is empty.

run converts the task accumulators to a collection of AccumulableInfo, creates an ExceptionFailure (with the accumulators), and serializes it.

Note
run uses a closure Serializer to serialize the ExceptionFailure.
Caution
FIXME Why does run create new ExceptionFailure(t, accUpdates).withAccums(accums), i.e. accumulators occur twice in the object.

run setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and the serialized ExceptionFailure).

run may also trigger SparkUncaughtExceptionHandler.uncaughtException(t) if this is a fatal error.

Note
The difference between this Throwable case and the other FAILED cases (i.e. FetchFailedException and CommitDeniedException) is just the serialized ExceptionFailure vs a TaskFailedReason being sent to ExecutorBackend, respectively.
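A sketch of this case, using the very expression the Caution above asks about (accUpdates and accums are the accumulator values collected before):

// Inside run's exception handler.
case t: Throwable =>
  logError(s"Exception in $taskName (TID $taskId)", t)
  val serializedTaskEndReason =
    ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
  setTaskFinishedAndClearInterruptStatus()
  execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)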

Killing Task — kill Method

kill(interruptThread: Boolean): Unit

kill marks the TaskRunner as killed and kills the task (if available and not finished already).

Note
kill passes the input interruptThread on to the task itself while killing it.

When executed, you should see the following INFO message in the logs:

INFO TaskRunner: Executor is trying to kill [taskName] (TID [taskId])
Note
killed flag is checked periodically in run to stop executing the task. Once killed, the task will eventually stop.
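The section above, put together as a sketch (a simplification; the synchronization details may differ in the actual code):

def kill(interruptThread: Boolean): Unit = {
  logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
  killed = true
  if (task != null) {
    synchronized {
      if (!finished) {
        // Pass interruptThread on to the task itself.
        task.kill(interruptThread)
      }
    }
  }
}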

Settings

Table 3. Spark Properties
Spark Property Default Value Description

spark.unsafe.exceptionOnMemoryLeak

false

When enabled, run reports a SparkException when a managed memory leak is detected (rather than merely logging an ERROR message).

spark.storage.exceptionOnPinLeak

false

When enabled, run reports a SparkException when a task's block locks were not released (rather than merely logging an INFO message).
