TaskRunner
TaskRunner is a thread of execution that manages a single task.
TaskRunner is created exclusively when Executor is requested to launch a task.
TaskRunner can be run or killed, which means running or killing the task that the TaskRunner manages, respectively.
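The relationship between a task and its runner can be illustrated with a minimal sketch in plain Scala. It is not Spark's implementation: `SimpleTaskRunner`, `launch`, and the string-based status log are hypothetical names introduced only for illustration, and the thread pool stands in for whatever pool the executor uses.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object TaskRunnerSketch {
  // Hypothetical stand-in for a task runner: one Runnable wraps exactly one task.
  final class SimpleTaskRunner(taskId: Long, body: () => Unit, log: ArrayBuffer[String])
      extends Runnable {
    override def run(): Unit = {
      log.synchronized { log += s"RUNNING $taskId" }
      body()
      log.synchronized { log += s"FINISHED $taskId" }
    }
  }

  // Launches a single task on a thread pool and waits for it to complete.
  def launch(taskId: Long)(body: () => Unit): Seq[String] = {
    val log = ArrayBuffer.empty[String]
    val pool = Executors.newCachedThreadPool()
    try {
      pool.execute(new SimpleTaskRunner(taskId, body, log))
    } finally {
      // Already submitted work still runs after shutdown().
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
    log.synchronized { log.toList }
  }
}
```

Calling `TaskRunnerSketch.launch(7L)(() => ())` returns the two status entries in order, which mirrors the RUNNING and FINISHED updates described later in this section.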
| Name | Description |
|---|---|
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
Tip: Enable logging to see the log messages described below. Refer to Logging.
Creating TaskRunner Instance
TaskRunner takes the following when created:
TaskRunner initializes the internal registries and counters.
computeTotalGcTime Method
Caution: FIXME
updateDependencies Method
Caution: FIXME
setTaskFinishedAndClearInterruptStatus Method
Caution: FIXME
Lifecycle
Caution: FIXME Image with state changes
A TaskRunner object is created when an executor is requested to launch a task.
It is created with an ExecutorBackend (to send the task’s status updates to), task and attempt ids, task name, and serialized version of the task (as ByteBuffer).
Running Task — run Method
Note: run is part of the java.lang.Runnable contract that TaskRunner follows.
run then sets the name of the current thread as threadName (using Java’s Thread).
run creates a TaskMemoryManager (using the current MemoryManager and taskId).
Note: run uses SparkEnv to access the current MemoryManager.
run starts tracking the time to deserialize a task.
run sets the current thread’s context classloader (with replClassLoader).
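The thread setup steps rely only on standard `java.lang.Thread` calls, which the following sketch demonstrates. The helper `withThreadSetup` is a hypothetical name, and restoring the previous name and class loader afterwards is a choice of this sketch, not something the text above says run does.

```scala
object ThreadSetupSketch {
  // Runs `body` with the given thread name and context class loader,
  // restoring the previous values afterwards (a choice made by this sketch).
  def withThreadSetup[A](threadName: String, loader: ClassLoader)(body: => A): A = {
    val thread = Thread.currentThread()
    val oldName = thread.getName
    val oldLoader = thread.getContextClassLoader
    thread.setName(threadName)
    thread.setContextClassLoader(loader)
    try body
    finally {
      thread.setName(oldName)
      thread.setContextClassLoader(oldLoader)
    }
  }
}
```

Any code that runs inside `body`, including deserialization that resolves classes through the context class loader, sees the loader that was set.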
Note: run uses SparkEnv to access the current closure Serializer.
You should see the following INFO message in the logs:
INFO Executor: Running [taskName] (TID [taskId])
run notifies ExecutorBackend that taskId is in TaskState.RUNNING state.
Note: run uses ExecutorBackend that was specified when TaskRunner was created.
run computes startGCTime.
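How startGCTime is computed is not described here (computeTotalGcTime is still marked FIXME above). One plausible way to obtain a total GC time on the JVM, assumed for this sketch only, is to sum the collection times reported by the standard `GarbageCollectorMXBean`s. The object and method names are hypothetical.

```scala
import java.lang.management.ManagementFactory

object GcTimeSketch {
  // Sums the accumulated collection time (in milliseconds) over all
  // garbage collectors of the current JVM.
  def totalGcTimeMillis(): Long = {
    val beans = ManagementFactory.getGarbageCollectorMXBeans.iterator()
    var total = 0L
    while (beans.hasNext) {
      // getCollectionTime returns -1 when the value is undefined for a collector,
      // so this sketch clamps it to 0.
      total += math.max(0L, beans.next().getCollectionTime)
    }
    total
  }
}
```

Taking one reading before the task runs and another after it finishes gives the GC time attributable to the interval, under the assumption that nothing else on the JVM triggers collections.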
run updates dependencies.
Note: run uses TaskDescription that is specified when TaskRunner is created.
run deserializes the task (using the context class loader) and sets its localProperties and TaskMemoryManager. run sets the task internal reference to hold the deserialized task.
Note: run uses TaskDescription to access the serialized task.
If the killed flag is enabled, run throws a TaskKilledException.
You should see the following DEBUG message in the logs:
DEBUG Executor: Task [taskId]'s epoch is [task.epoch]
Note: run uses SparkEnv to access the current MapOutputTracker.
run records the current time as the task’s start time (as taskStart).
run runs the task (with taskAttemptId as taskId, attemptNumber from TaskDescription, and metricsSystem as the current MetricsSystem).
Note: run uses SparkEnv to access the current MetricsSystem.
Note: The task runs inside a "monitored" block (i.e. a try-finally block) to detect any memory and lock leaks after the task’s run finishes, regardless of the final outcome (the computed value or an exception thrown).
After the task’s run has finished (inside the "finally" block of the "monitored" block), run requests BlockManager to release all locks of the task (for the task’s taskId). The locks are later used for lock leak detection.
run then requests TaskMemoryManager to clean up allocated memory (that helps finding memory leaks).
If run detects a memory leak of the managed memory (i.e. the memory freed is greater than 0), the spark.unsafe.exceptionOnMemoryLeak Spark property is enabled (it is not by default), and no exception was reported while the task ran, run reports a SparkException:
Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
Otherwise, if spark.unsafe.exceptionOnMemoryLeak is disabled or the task reported an exception, you should see the following ERROR message in the logs instead:
ERROR Executor: Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
Note: If run detects a memory leak, it leads to a SparkException or an ERROR message in the logs.
If run detects a lock leak (i.e. at least one lock had to be released), the spark.storage.exceptionOnPinLeak Spark property is enabled (it is not by default), and no exception was reported while the task ran, run reports a SparkException:
[releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Otherwise, if spark.storage.exceptionOnPinLeak is disabled or the task reported an exception, you should see the following INFO message in the logs instead:
INFO Executor: [releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Note: If run detects any lock leak, it leads to a SparkException or an INFO message in the logs.
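The leak checks of the "monitored" block can be sketched in plain Scala as follows. This is an illustration of the control flow described above, not Spark's code: `runMonitored`, `LeakException` (standing in for SparkException), and the callback parameters are hypothetical, and the two boolean flags stand in for the two Spark properties.

```scala
object MonitoredBlockSketch {
  // Hypothetical stand-in for the exception reported on a detected leak.
  final class LeakException(message: String) extends RuntimeException(message)

  // Runs `task`, then always releases locks, cleans up memory, and checks for leaks.
  // `cleanUpMemory` returns the number of bytes that had to be freed;
  // `releaseLocks` returns the locks that had to be released.
  def runMonitored[A](
      taskId: Long,
      exceptionOnMemoryLeak: Boolean,
      exceptionOnPinLeak: Boolean,
      cleanUpMemory: () => Long,
      releaseLocks: () => Seq[String],
      log: String => Unit)(task: => A): A = {
    var threwException = true
    try {
      val result = task
      threwException = false
      result
    } finally {
      val releasedLocks = releaseLocks()
      val freedMemory = cleanUpMemory()
      if (freedMemory > 0) {
        val msg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        // Only escalate when the task itself did not fail already.
        if (exceptionOnMemoryLeak && !threwException) throw new LeakException(msg)
        else log(s"ERROR $msg")
      }
      if (releasedLocks.nonEmpty) {
        val msg = s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
          releasedLocks.mkString(", ")
        if (exceptionOnPinLeak && !threwException) throw new LeakException(msg)
        else log(s"INFO $msg")
      }
    }
  }
}
```

The `threwException` flag is what keeps a leak report from masking the task's own failure: when the task body throws, the leak is only logged.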
Right after the "monitored" block, run records the current time as the task’s finish time (as taskFinish).
If the task was killed (while it was running), run reports a TaskKilledException (and the TaskRunner exits).
run creates a Serializer and serializes the task’s result. run measures the time to serialize the result.
Note: run uses SparkEnv to access the current Serializer. SparkEnv was specified when the owning Executor was created.
Important: This is when TaskRunner serializes the computed value of a task to be sent back to the driver.
run records the task metrics:
run creates a DirectTaskResult (with the serialized result and the latest values of accumulators).
run serializes the DirectTaskResult and gets the byte buffer’s limit.
Note: A serialized DirectTaskResult is Java’s java.nio.ByteBuffer.
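To make the "byte buffer’s limit" concrete, here is a small sketch that serializes a value, wraps the bytes in a `java.nio.ByteBuffer`, and measures the elapsed time. It uses plain Java serialization as an assumption; the actual Serializer is whatever SparkEnv provides, and the names `serialize` and `timedSerialize` are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object SerializeSketch {
  // Serializes a value with plain Java serialization and wraps the bytes in a ByteBuffer.
  def serialize(value: AnyRef): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    // For a freshly wrapped array, position is 0 and limit equals the array length.
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // Returns the serialized buffer together with the elapsed time in milliseconds.
  def timedSerialize(value: AnyRef): (ByteBuffer, Long) = {
    val start = System.nanoTime()
    val buffer = serialize(value)
    (buffer, (System.nanoTime() - start) / 1000000L)
  }
}
```

The buffer's `limit()` is the size in bytes of the serialized value, which is the number the size checks described next operate on.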
run selects the proper serialized version of the result before sending it to ExecutorBackend.
run branches off based on the serialized DirectTaskResult byte buffer’s limit.
When maxResultSize is greater than 0 and the serialized DirectTaskResult buffer limit exceeds it, the following WARN message is displayed in the logs:
WARN Executor: Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.
Tip: Read about spark.driver.maxResultSize.
$ ./bin/spark-shell -c spark.driver.maxResultSize=1m
scala> sc.version
res0: String = 2.0.0-SNAPSHOT
scala> sc.getConf.get("spark.driver.maxResultSize")
res1: String = 1m
scala> sc.range(0, 1024 * 1024 + 10, 1).collect
WARN Executor: Finished task 4.0 in stage 0.0 (TID 4). Result is larger than maxResultSize (1031.4 KB > 1024.0 KB), dropping it.
...
ERROR TaskSetManager: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
...
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1448)
...
In this case, run creates an IndirectTaskResult (with a TaskResultBlockId for the task’s taskId and resultSize) and serializes it.
When maxResultSize is not positive or resultSize is smaller than maxResultSize but greater than maxDirectResultSize, run creates a TaskResultBlockId for the task’s taskId and stores the serialized DirectTaskResult in BlockManager (as the TaskResultBlockId with MEMORY_AND_DISK_SER storage level).
You should see the following INFO message in the logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)
In this case, run creates an IndirectTaskResult (with a TaskResultBlockId for the task’s taskId and resultSize) and serializes it.
Note: The difference between the two above cases is that the result is dropped or stored in BlockManager with MEMORY_AND_DISK_SER storage level.
When the two cases above do not hold, you should see the following INFO message in the logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver
run uses the serialized DirectTaskResult byte buffer as the final serializedResult.
Note: The final serializedResult is either an IndirectTaskResult (possibly with the block stored in BlockManager) or a DirectTaskResult.
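The three-way branch on the result size can be summarized in a short sketch. The `Route` type and the `route` function are hypothetical names for illustration; only the comparisons follow the description above.

```scala
object ResultRoutingSketch {
  sealed trait Route
  // Only an IndirectTaskResult is sent; the result data itself is dropped.
  case object Dropped extends Route
  // An IndirectTaskResult is sent; the data is stored in BlockManager.
  case object ViaBlockManager extends Route
  // The serialized DirectTaskResult is sent as-is.
  case object Direct extends Route

  def route(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): Route =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else Direct
}
```

For example, with `maxResultSize` of 1000 and `maxDirectResultSize` of 100, a 2000-byte result is dropped, a 500-byte result goes through BlockManager, and a 50-byte result is sent directly. A non-positive `maxResultSize` disables the first branch entirely.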
run notifies ExecutorBackend that taskId is in TaskState.FINISHED state with the serialized result and removes taskId from the owning executor’s runningTasks registry.
Note: run uses ExecutorBackend that is specified when TaskRunner is created.
Note: TaskRunner is Java’s Runnable and the contract requires that once a TaskRunner has completed execution it must not be restarted.
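When run catches an exception while executing the task, run acts according to its type (as presented in the following "run’s Exception Cases" table and the sections that follow).
| Exception Type | TaskState | Serialized ByteBuffer |
|---|---|---|
| FetchFailedException | FAILED | TaskFailedReason |
| TaskKilledException | KILLED | TaskKilled |
| InterruptedException (with task killed) | KILLED | TaskKilled |
| CommitDeniedException | FAILED | reason from the exception |
| Throwable | FAILED | ExceptionFailure |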
FetchFailedException
When FetchFailedException is reported while running a task, run calls setTaskFinishedAndClearInterruptStatus.
run requests FetchFailedException for the TaskFailedReason, serializes it, and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and a serialized reason).
Note: ExecutorBackend was specified when TaskRunner was created.
Note: run uses a closure Serializer to serialize the failure reason. The Serializer was created before run ran the task.
TaskKilledException
When TaskKilledException is reported while running a task, you should see the following INFO message in the logs:
INFO Executor killed [taskName] (TID [taskId])
run then calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has been killed (with taskId, TaskState.KILLED, and a serialized TaskKilled object).
InterruptedException (with Task Killed)
When InterruptedException is reported while running a task, and the task has been killed, you should see the following INFO message in the logs:
INFO Executor interrupted and killed [taskName] (TID [taskId])
run then calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has been killed (with taskId, TaskState.KILLED, and a serialized TaskKilled object).
Note: The difference between this InterruptedException and TaskKilledException is the INFO message in the logs.
CommitDeniedException
When CommitDeniedException is reported while running a task, run calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and the serialized reason obtained from the exception).
Note: The difference between this CommitDeniedException and FetchFailedException is just the reason being sent to ExecutorBackend.
Throwable
When run catches a Throwable, you should see the following ERROR message in the logs (followed by the exception).
ERROR Exception in [taskName] (TID [taskId])
run then records the following task metrics (only when Task is available):
run then collects the latest values of internal and external accumulators (with taskFailed flag enabled to inform that the collection is for a failed task).
Otherwise, when Task is not available, the accumulator collection is empty.
run converts the task accumulators to a collection of AccumulableInfo, creates an ExceptionFailure (with the accumulators), and serializes it.
Note: run uses a closure Serializer to serialize the ExceptionFailure.
Caution: FIXME Why does run create new ExceptionFailure(t, accUpdates).withAccums(accums), i.e. why do accumulators occur twice in the object?
run calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and the serialized ExceptionFailure).
run may also trigger SparkUncaughtExceptionHandler.uncaughtException(t) if this is a fatal error.
Note: The difference between this general Throwable case and the other FAILED cases (i.e. FetchFailedException and CommitDeniedException) is just the serialized ExceptionFailure vs a reason being sent to ExecutorBackend, respectively.
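The exception cases described in the sections above amount to a pattern match from the caught exception to a task state and a payload. The sketch below shows that dispatch only. The three `Sketch*` exception classes are hypothetical stand-ins for Spark's own types, and the payloads are plain labels rather than serialized objects.

```scala
object ExceptionCasesSketch {
  // Hypothetical stand-ins for Spark's exception types.
  final class SketchFetchFailed extends RuntimeException
  final class SketchTaskKilled extends RuntimeException
  final class SketchCommitDenied extends RuntimeException

  // Returns the task state and a label for the payload reported to the backend.
  def classify(t: Throwable, killed: Boolean): (String, String) = t match {
    case _: SketchFetchFailed              => ("FAILED", "TaskFailedReason")
    case _: SketchTaskKilled               => ("KILLED", "TaskKilled")
    case _: InterruptedException if killed => ("KILLED", "TaskKilled")
    case _: SketchCommitDenied             => ("FAILED", "reason from the exception")
    case _                                 => ("FAILED", "ExceptionFailure")
  }
}
```

The order of the cases matters: an InterruptedException counts as a kill only when the killed flag is set, and otherwise falls through to the general Throwable case.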
Killing Task — kill Method
kill(interruptThread: Boolean): Unit
kill marks the TaskRunner as killed and kills the task (if available and not finished already).
Note: kill passes the input interruptThread on to the task itself while killing it.
When executed, you should see the following INFO message in the logs:
INFO TaskRunner: Executor is trying to kill [taskName] (TID [taskId])
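The behaviour of kill can be sketched with a volatile flag and an optional task reference. `Runner`, `KillableTask`, and `markFinished` are hypothetical names, and logging before setting the flag is an assumption of this sketch; the text above does not state the exact order.

```scala
object KillSketch {
  // Hypothetical task that records how it was asked to stop.
  final class KillableTask {
    @volatile var killedWithInterrupt: Option[Boolean] = None
    def kill(interruptThread: Boolean): Unit = killedWithInterrupt = Some(interruptThread)
  }

  final class Runner(taskName: String, taskId: Long, log: String => Unit) {
    @volatile private var killed = false
    @volatile private var finished = false
    // Empty until the task has been deserialized (an assumption of this sketch).
    @volatile var task: Option[KillableTask] = None

    def isKilled: Boolean = killed
    def markFinished(): Unit = finished = true

    def kill(interruptThread: Boolean): Unit = {
      log(s"Executor is trying to kill $taskName (TID $taskId)")
      killed = true
      // Forward the request only if the task is available and not finished yet.
      if (!finished) task.foreach(_.kill(interruptThread))
    }
  }
}
```

Because the flag is set even when no task is available yet, a later check of the flag (such as the one run performs after deserializing the task) can still stop the task before it starts.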