TaskSetManager

TaskSetManager is a Schedulable that manages scheduling of tasks in a TaskSet.

Note
A TaskSet represents a set of tasks that correspond to missing partitions of a stage.
TaskSetManager TaskSchedulerImpl TaskSet.png
Figure 1. TaskSetManager and its Dependencies

TaskSetManager is notified when a task (from the TaskSet it manages) finishes — sucessfully or due to a failure (in task execution or an executor being lost).

TaskSetManager uses maxTaskFailures to control how many times a single task can fail before an entire TaskSet gets aborted that can take the following values:

The responsibilities of a TaskSetManager include:

Tip

Enable DEBUG logging levels for org.apache.spark.scheduler.TaskSchedulerImpl (or org.apache.spark.scheduler.cluster.YarnScheduler for YARN) and org.apache.spark.scheduler.TaskSetManager and execute the following two-stage job to see their low-level innerworkings.

A cluster manager is recommended since it gives more task localization choices (with YARN additionally supporting rack localization).

$ ./bin/spark-shell --master yarn --conf spark.ui.showConsoleProgress=false

// Keep # partitions low to keep # messages low
scala> sc.parallelize(0 to 9, 3).groupBy(_ % 3).count
INFO YarnScheduler: Adding task set 0.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.0.2.87, executor 1, partition 0, PROCESS_LOCAL, 7541 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.0.2.87, executor 2, partition 1, PROCESS_LOCAL, 7541 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.0.2.87, executor 1, partition 2, PROCESS_LOCAL, 7598 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 518 ms on 10.0.2.87 (executor 1) (1/3)
INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 512 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 51 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
INFO YarnScheduler: Adding task set 1.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 1.0: 1
DEBUG TaskSetManager: Valid locality levels for TaskSet 1.0: NODE_LOCAL, RACK_LOCAL, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, 10.0.2.87, executor 2, partition 0, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, 10.0.2.87, executor 1, partition 1, NODE_LOCAL, 7348 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, 10.0.2.87, executor 1, partition 2, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 130 ms on 10.0.2.87 (executor 1) (1/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NODE_LOCAL, so moving to locality level RACK_LOCAL
DEBUG TaskSetManager: No tasks for locality level RACK_LOCAL, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 133 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 21 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
res0: Long = 3
Table 1. TaskSetManager’s Internal Registries and Counters
Name Description

allPendingTasks

Indices of all the pending tasks to execute (regardless of their localization preferences).

Updated with an task index when TaskSetManager registers a task as pending execution (per preferred locations).

calculatedTasks

The number of the tasks that have already completed execution.

Starts from 0 when a TaskSetManager is created and is only incremented when the TaskSetManager checks that there is enough memory to fetch a task result.

copiesRunning

The number of task copies currently running per task (index in its task set).

The number of task copies of a task is increased when finds a task for execution (given resource offer) or checking for speculatable tasks and decreased when a task fails or an executor is lost (for a shuffle map stage and no external shuffle service).

currentLocalityIndex

epoch

Current map output tracker epoch.

failedExecutors

Lookup table of TaskInfo indices that failed to executor ids and the time of the failure.

Used in handleFailedTask.

isZombie

Disabled, i.e. false, by default.

Read Zombie state in this document.

lastLaunchTime

localityWaits

myLocalityLevels

TaskLocality locality preferences of the pending tasks in the TaskSet ranging from PROCESS_LOCAL through NODE_LOCAL, NO_PREF, and RACK_LOCAL to ANY.

NOTE: myLocalityLevels may contain only a few of all the available TaskLocality preferences with ANY as a mandatory task locality preference.

Set immediately when TaskSetManager is created.

Recomputed every change in the status of executors.

name

numFailures

Array of the number of task failures per task.

Incremented when TaskSetManager handles a task failure and immediatelly checked if above acceptable number of task failures.

numTasks

Number of tasks to compute.

pendingTasksForExecutor

Lookup table of the indices of tasks pending execution per executor.

Updated with an task index and executor when TaskSetManager registers a task as pending execution (per preferred locations) (and the location is a ExecutorCacheTaskLocation or HDFSCacheTaskLocation).

pendingTasksForHost

Lookup table of the indices of tasks pending execution per host.

Updated with an task index and host when TaskSetManager registers a task as pending execution (per preferred locations).

pendingTasksForRack

Lookup table of the indices of tasks pending execution per rack.

Updated with an task index and rack when TaskSetManager registers a task as pending execution (per preferred locations).

pendingTasksWithNoPrefs

Lookup table of the indices of tasks pending execution with no location preferences.

Updated with an task index when TaskSetManager registers a task as pending execution (per preferred locations).

priority

recentExceptions

runningTasksSet

Collection of running tasks that a TaskSetManager manages.

Used to implement runningTasks (that is simply the size of runningTasksSet but a required part of any Schedulable). runningTasksSet is expanded when registering a running task and shrinked when unregistering a running task.

Used in TaskSchedulerImpl to cancel tasks.

speculatableTasks

stageId

The stage’s id a TaskSetManager runs for.

Set when TaskSetManager is created.

NOTE: stageId is a part of Schedulable contract.

successful

Status of tasks (with a boolean flag, i.e. true or false, per task).

All tasks start with their flags disabled, i.e. false, when TaskSetManager is created.

The flag for a task is turned on, i.e. true, when a task finishes successfully but also with a failure.

A flag is explicitly turned off only for ShuffleMapTask tasks when their executor is lost.

taskAttempts

Registry of TaskInfos per every task attempt per task.

taskInfos

Registry of TaskInfos per task id.

Updated with the task (id) and the corresponding TaskInfo when TaskSetManager finds a task for execution (given resource offer).

NOTE: It appears that the entires stay forever, i.e. are never removed (perhaps because the maintenance overhead is not needed given a TaskSetManager is a short-lived entity).

tasks

Lookup table of Tasks (per partition id) to schedule execution of.

NOTE: The tasks all belong to a single TaskSet that was given when TaskSetManager was created (which actually represent a single Stage).

tasksSuccessful

totalResultSize

The current total size of the result of all the tasks that have finished.

Starts from 0 when TaskSetManager is created.

Only increased with the size of a task result whenever a TaskSetManager checks that there is enough memory to fetch the task result.

Tip

Enable DEBUG logging level for org.apache.spark.scheduler.TaskSetManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskSetManager=DEBUG

Refer to Logging.

isTaskBlacklistedOnExecOrNode Method

Caution
FIXME

getLocalityIndex Method

Caution
FIXME

dequeueSpeculativeTask Method

Caution
FIXME

executorAdded Method

executorAdded simply calls recomputeLocality method.

abortIfCompletelyBlacklisted Method

Caution
FIXME

TaskSetManager is Schedulable

TaskSetManager is a Schedulable with the following implementation:

  • name is TaskSet_[taskSet.stageId.toString]

  • no parent is ever assigned, i.e. it is always null.

    It means that it can only be a leaf in the tree of Schedulables (with Pools being the nodes).

  • schedulingMode always returns SchedulingMode.NONE (since there is nothing to schedule).

  • weight is always 1.

  • minShare is always 0.

  • runningTasks is the number of running tasks in the internal runningTasksSet.

  • priority is the priority of the owned TaskSet (using taskSet.priority).

  • stageId is the stage id of the owned TaskSet (using taskSet.stageId).

  • schedulableQueue returns no queue, i.e. null.

  • addSchedulable and removeSchedulable do nothing.

  • getSchedulableByName always returns null.

  • getSortedTaskSetQueue returns a one-element collection with the sole element being itself.

  • executorLost

  • checkSpeculatableTasks

Marking Task As Fetching Indirect Result — handleTaskGettingResult Method

handleTaskGettingResult(tid: Long): Unit

handleTaskGettingResult finds TaskInfo for tid task in taskInfos internal registry and marks it as fetching indirect task result. It then notifies DAGScheduler.

Note
handleTaskGettingResult is executed when TaskSchedulerImpl is notified about fetching indirect task result.

Registering Running Task — addRunningTask Method

addRunningTask(tid: Long): Unit

addRunningTask adds tid to runningTasksSet internal registry and requests the parent pool to increase the number of running tasks (if defined).

Unregistering Running Task — removeRunningTask Method

removeRunningTask(tid: Long): Unit

removeRunningTask removes tid from runningTasksSet internal registry and requests the parent pool to decrease the number of running task (if defined).

Checking Speculatable Tasks — checkSpeculatableTasks Method

Note
checkSpeculatableTasks is part of the Schedulable Contract.
checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean

checkSpeculatableTasks checks whether there are speculatable tasks in a TaskSet.

Note
checkSpeculatableTasks is called when TaskSchedulerImpl checks for speculatable tasks.

If the TaskSetManager is zombie or has a single task in TaskSet, it assumes no speculatable tasks.

The method goes on with the assumption of no speculatable tasks by default.

It computes the minimum number of finished tasks for speculation (as spark.speculation.quantile of all the finished tasks).

You should see the DEBUG message in the logs:

DEBUG Checking for speculative tasks: minFinished = [minFinishedForSpeculation]

It then checks whether the number is equal or greater than the number of tasks completed successfully (using tasksSuccessful).

Having done that, it computes the median duration of all the successfully completed tasks (using taskInfos internal registry) and task length threshold using the median duration multiplied by spark.speculation.multiplier that has to be equal or less than 100.

You should see the DEBUG message in the logs:

DEBUG Task length threshold for speculation: [threshold]

For each task (using taskInfos internal registry) that is not marked as successful yet (using successful) for which there is only one copy running (using copiesRunning) and the task takes more time than the calculated threshold, but it was not in speculatableTasks it is assumed speculatable.

You should see the following INFO message in the logs:

INFO Marking task [index] in stage [taskSet.id] (on [info.host]) as speculatable because it ran more than [threshold] ms

The task gets added to the internal speculatableTasks collection. The method responds positively.

getAllowedLocalityLevel Method

Caution
FIXME

Finding Task For Execution (Given Resource Offer) — resourceOffer Method

resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality): Option[TaskDescription]

(only if TaskSetBlacklist is defined) resourceOffer requests TaskSetBlacklist to check if the input execId executor or host node are blacklisted.

When TaskSetManager is a zombie or the resource offer (as executor and host) is blacklisted, resourceOffer finds no tasks to execute (and returns no TaskDescription).

Note
resourceOffer finds a task to schedule for a resource offer when neither TaskSetManager is a zombie nor the resource offer is blacklisted.

resourceOffer calculates the allowed task locality for task selection. When the input maxLocality is not NO_PREF task locality, resourceOffer getAllowedLocalityLevel (for the current time) and sets it as the current task locality if more localized (specific).

Note
TaskLocality can be the most localized PROCESS_LOCAL, NODE_LOCAL through NO_PREF and RACK_LOCAL to ANY.

If a task (index) is found, resourceOffer takes the Task (from tasks registry).

resourceOffer increments the number of the copies of the task that are currently running and finds the task attempt number (as the size of taskAttempts entries for the task index).

resourceOffer creates a TaskInfo that is then registered in taskInfos and taskAttempts.

If the maximum acceptable task locality is not NO_PREF, resourceOffer getLocalityIndex (using the task’s locality) and records it as currentLocalityIndex with the current time as lastLaunchTime.

resourceOffer serializes the task.

If the task serialization fails, you should see the following ERROR message in the logs:

Failed to serialize task [taskId], not attempting to retry it.

resourceOffer aborts the TaskSet with the following message and reports a TaskNotSerializableException.

Failed to serialize task [taskId], not attempting to retry it. Exception during serialization: [exception]

resourceOffer checks the size of the serialized task. If it is greater than 100 kB, you should see the following WARN message in the logs:

WARN Stage [id] contains a task of very large size ([size] KB). The maximum recommended task size is 100 KB.
Note
The size of the serializable task, i.e. 100 kB, is not configurable.

If however the serialization went well and the size is fine too, resourceOffer registers the task as running.

You should see the following INFO message in the logs:

INFO TaskSetManager: Starting [name] (TID [id], [host], executor [id], partition [id], [taskLocality], [size] bytes)

For example:

INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 2054 bytes)
Important
This is the moment when TaskSetManager informs DAGScheduler that a task has started.
Note
resourceOffer is used when TaskSchedulerImpl resourceOfferSingleTaskSet.

Dequeueing Task For Execution (Given Locality Information) — dequeueTask Internal Method

dequeueTask(execId: String, host: String, maxLocality: TaskLocality): Option[(Int, TaskLocality, Boolean)]

dequeueTask tries to find the higest task index (meeting localization requirements) using tasks (indices) registered for execution on execId executor. If a task is found, dequeueTask returns its index, PROCESS_LOCAL task locality and the speculative marker disabled.

dequeueTask then goes over all the possible task localities and checks what locality is allowed given the input maxLocality.

dequeueTask checks out NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY in that order.

For NODE_LOCAL dequeueTask tries to find the higest task index (meeting localization requirements) using tasks (indices) registered for execution on host host and if found returns its index, NODE_LOCAL task locality and the speculative marker disabled.

For NO_PREF dequeueTask tries to find the higest task index (meeting localization requirements) using pendingTasksWithNoPrefs internal registry and if found returns its index, PROCESS_LOCAL task locality and the speculative marker disabled.

Note
For NO_PREF the task locality is PROCESS_LOCAL.

For RACK_LOCAL dequeueTask finds the rack for the input host and if available tries to find the higest task index (meeting localization requirements) using tasks (indices) registered for execution on the rack. If a task is found, dequeueTask returns its index, RACK_LOCAL task locality and the speculative marker disabled.

For ANY dequeueTask tries to find the higest task index (meeting localization requirements) using allPendingTasks internal registry and if found returns its index, ANY task locality and the speculative marker disabled.

In the end, when no task could be found, dequeueTask dequeueSpeculativeTask and if found returns its index, locality and the speculative marker enabled.

Note
The speculative marker is enabled for a task only when dequeueTask did not manage to find a task for the available task localities and did find a speculative task.
Note
dequeueTask is used exclusively when TaskSetManager finds a task for execution (given resource offer).

Finding Higest Task Index (Not Blacklisted, With No Copies Running and Not Completed Already) — dequeueTaskFromList Internal Method

dequeueTaskFromList(
  execId: String,
  host: String,
  list: ArrayBuffer[Int]): Option[Int]

dequeueTaskFromList takes task indices from the input list backwards (from the last to the first entry). For every index dequeueTaskFromList checks if it is not blacklisted on the input execId executor and host and if not, checks that:

If so, dequeueTaskFromList returns the task index.

If dequeueTaskFromList has checked all the indices and no index has passed the checks, dequeueTaskFromList returns None (to indicate that no index has met the requirements).

Note
dequeueTaskFromList is used exclusively when TaskSetManager dequeues a task tor execution (given locality information).

Finding Tasks (Indices) Registered For Execution on Executor — getPendingTasksForExecutor Internal Method

getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int]

getPendingTasksForExecutor finds pending tasks (indices) registered for execution on the input executorId executor (in pendingTasksForExecutor internal registry).

Note
getPendingTasksForExecutor may find no matching tasks and return an empty collection.
Note
getPendingTasksForExecutor is used exclusively when TaskSetManager dequeues a task tor execution (given locality information).

Finding Tasks (Indices) Registered For Execution on Host — getPendingTasksForHost Internal Method

getPendingTasksForHost(host: String): ArrayBuffer[Int]

getPendingTasksForHost finds pending tasks (indices) registered for execution on the input host host (in pendingTasksForHost internal registry).

Note
getPendingTasksForHost may find no matching tasks and return an empty collection.
Note
getPendingTasksForHost is used exclusively when TaskSetManager dequeues a task tor execution (given locality information).

Finding Tasks (Indices) Registered For Execution on Rack — getPendingTasksForRack Internal Method

getPendingTasksForRack(rack: String): ArrayBuffer[Int]

getPendingTasksForRack finds pending tasks (indices) registered for execution on the input rack rack (in pendingTasksForRack internal registry).

Note
getPendingTasksForRack may find no matching tasks and return an empty collection.
Note
getPendingTasksForRack is used exclusively when TaskSetManager dequeues a task tor execution (given locality information).

Scheduling Tasks in TaskSet

Caution
FIXME

For each submitted TaskSet, a new TaskSetManager is created. The TaskSetManager completely and exclusively owns a TaskSet submitted for execution.

Caution
FIXME A picture with TaskSetManager owning TaskSet
Caution
FIXME What component knows about TaskSet and TaskSetManager. Isn’t it that TaskSets are created by DAGScheduler while TaskSetManager is used by TaskSchedulerImpl only?

TaskSetManager keeps track of the tasks pending execution per executor, host, rack or with no locality preferences.

Locality-Aware Scheduling aka Delay Scheduling

TaskSetManager computes locality levels for the TaskSet for delay scheduling. While computing you should see the following DEBUG in the logs:

DEBUG Valid locality levels for [taskSet]:  [levels]
Caution
FIXME What’s delay scheduling?

Events

Once a task has finished, TaskSetManager informs DAGScheduler.

Caution
FIXME

Recording Successful Task And Notifying DAGScheduler — handleSuccessfulTask Method

handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit

handleSuccessfulTask records the tid task as finished, notifies the DAGScheduler that the task has ended and attempts to mark the TaskSet finished.

Internally, handleSuccessfulTask finds TaskInfo (in taskInfos internal registry) and marks it as FINISHED.

It then removes tid task from runningTasksSet internal registry.

handleSuccessfulTask notifies DAGScheduler that tid task ended successfully (with the Task object from tasks internal registry and the result as Success).

At this point, handleSuccessfulTask finds the other running task attempts of tid task and requests SchedulerBackend to kill them (since they are no longer necessary now when at least one task attempt has completed successfully). You should see the following INFO message in the logs:

INFO Killing attempt [attemptNumber] for task [id] in stage [id] (TID [id]) on [host] as the attempt [attemptNumber] succeeded on [host]
Caution
FIXME Review taskAttempts

If tid has not yet been recorded as successful, handleSuccessfulTask increases tasksSuccessful counter. You should see the following INFO message in the logs:

INFO Finished task [id] in stage [id] (TID [taskId]) in [duration] ms on [host] (executor [executorId]) ([tasksSuccessful]/[numTasks])

tid task is marked as successful. If the number of task that have finished successfully is exactly the number of the tasks to execute (in the TaskSet), the TaskSetManager becomes a zombie.

If tid task was already recorded as successful, you should merely see the following INFO message in the logs:

INFO Ignoring task-finished event for [id] in stage [id] because task [index] has already completed successfully

Ultimately, handleSuccessfulTask attempts to mark the TaskSet finished.

Attempting to Mark TaskSet Finished — maybeFinishTaskSet Internal Method

maybeFinishTaskSet(): Unit

Retrying Tasks on Failure

Caution
FIXME

Up to spark.task.maxFailures attempts

Task retries and spark.task.maxFailures

When you start Spark program you set up spark.task.maxFailures for the number of failures that are acceptable until TaskSetManager gives up and marks a job failed.

Tip
In Spark shell with local master, spark.task.maxFailures is fixed to 1 and you need to use local-with-retries master to change it to some other value.

In the following example, you are going to execute a job with two partitions and keep one failing at all times (by throwing an exception). The aim is to learn the behavior of retrying task execution in a stage in TaskSet. You will only look at a single task execution, namely 0.0.

$ ./bin/spark-shell --master "local[*, 5]"
...
scala> sc.textFile("README.md", 2).mapPartitionsWithIndex((idx, it) => if (idx == 0) throw new Exception("Partition 2 marked failed") else it).count
...
15/10/27 17:24:56 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:25)
15/10/27 17:24:56 DEBUG DAGScheduler: New pending partitions: Set(0, 1)
15/10/27 17:24:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
...
15/10/27 17:24:56 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2062 bytes)
...
15/10/27 17:24:56 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
...
15/10/27 17:24:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2062 bytes)
15/10/27 17:24:56 INFO Executor: Running task 0.1 in stage 1.0 (TID 4)
15/10/27 17:24:56 INFO HadoopRDD: Input split: file:/Users/jacek/dev/oss/spark/README.md:0+1784
15/10/27 17:24:56 ERROR Executor: Exception in task 0.1 in stage 1.0 (TID 4)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 ERROR Executor: Exception in task 0.4 in stage 1.0 (TID 7)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 INFO TaskSetManager: Lost task 0.4 in stage 1.0 (TID 7) on executor localhost: java.lang.Exception (Partition 2 marked failed) [duplicate 4]
15/10/27 17:24:56 ERROR TaskSetManager: Task 0 in stage 1.0 failed 5 times; aborting job
15/10/27 17:24:56 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/10/27 17:24:56 INFO TaskSchedulerImpl: Cancelling stage 1
15/10/27 17:24:56 INFO DAGScheduler: ResultStage 1 (count at <console>:25) failed in 0.058 s
15/10/27 17:24:56 DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
15/10/27 17:24:56 INFO DAGScheduler: Job 1 failed: count at <console>:25, took 0.085810 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 5 times, most recent failure: Lost task 0.4 in stage 1.0 (TID 7, localhost): java.lang.Exception: Partition 2 marked failed

Zombie state

A TaskSetManager is in zombie state when all tasks in a taskset have completed successfully (regardless of the number of task attempts), or if the taskset has been aborted.

While in zombie state, a TaskSetManager can launch no new tasks and responds with no TaskDescription to resourceOffers.

A TaskSetManager remains in the zombie state until all tasks have finished running, i.e. to continue to track and account for the running tasks.

Aborting TaskSet — abort Method

abort(message: String, exception: Option[Throwable] = None): Unit
Caution
FIXME image with DAGScheduler call

The TaskSetManager enters zombie state.

Checking Available Memory For Task Result — canFetchMoreResults Method

canFetchMoreResults(size: Long): Boolean

canFetchMoreResults checks whether there is enough memory to fetch the result of a task.

Internally, canFetchMoreResults increments the internal totalResultSize with the input size which is the result of a task. It also increments the internal calculatedTasks.

If the current internal totalResultSize is bigger than spark.driver.maxResultSize the following ERROR message is printed out to the logs:

ERROR TaskSetManager: Total size of serialized results of [calculatedTasks] tasks ([totalResultSize]) is bigger than spark.driver.maxResultSize ([maxResultSize])

The current TaskSet is aborted and canFetchMoreResults returns false.

Otherwise, canFetchMoreResults returns true.

Note
canFetchMoreResults is used in TaskResultGetter.enqueueSuccessfulTask only.

Creating TaskSetManager Instance

TaskSetManager takes the following when created:

TaskSetManager initializes the internal registries and counters.

Note
maxTaskFailures is 1 for local run mode, maxFailures for Spark local-with-retries, and spark.task.maxFailures property for Spark local-cluster and Spark with cluster managers (Spark Standalone, Mesos and YARN).

TaskSetManager requests the current epoch from MapOutputTracker and sets it on all tasks in the taskset.

Note
TaskSetManager uses TaskSchedulerImpl (that was given when created) to access the current MapOutputTracker.

You should see the following DEBUG in the logs:

DEBUG Epoch for [taskSet]: [epoch]
Caution
FIXME Why is the epoch important?
Note
TaskSetManager requests MapOutputTracker from TaskSchedulerImpl which is likely for unit testing only since MapOutputTracker is available using SparkEnv.

TaskSetManager adds the tasks as pending execution (in reverse order from the highest partition to the lowest).

Caution
FIXME Why is reverse order important? The code says it’s to execute tasks with low indices first.

Getting Notified that Task Failed — handleFailedTask Method

handleFailedTask(
  tid: Long,
  state: TaskState,
  reason: TaskFailedReason): Unit

handleFailedTask finds TaskInfo of tid task in taskInfos internal registry and simply quits if the task is already marked as failed or killed.

TaskSetManager handleFailedTask.png
Figure 2. TaskSetManager Gets Notified that Task Has Failed
Note
handleFailedTask is executed after TaskSchedulerImpl has been informed that tid task failed or an executor was lost. In either case, tasks could not finish successfully or could not report their status back.

handleFailedTask decrements the number of the running copies of tid task (in copiesRunning internal registry).

Note
With speculative execution of tasks enabled, there can be many copies of a task running simultaneuosly.

handleFailedTask uses the following pattern as the reason of the failure:

Lost task [id] in stage [taskSetId] (TID [tid], [host], executor [executorId]): [reason]

handleFailedTask then calculates the failure exception per the input reason (follow the links for more details):

Note
Description of how the final failure exception is "computed" was moved to respective sections below to make the reading slightly more pleasant and comprehensible.

handleFailedTask informs DAGScheduler that tid task has ended (passing on the Task instance from tasks internal registry, the input reason, null result, calculated accumUpdates per failure, and the TaskInfo).

Important
This is the moment when TaskSetManager informs DAGScheduler that a task has ended.

If tid task has already been marked as completed (in successful internal registry) you should see the following INFO message in the logs:

INFO Task [id] in stage [id] (TID [tid]) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
Tip
Read up on Speculative Execution of Tasks to find out why a single task could be executed multiple times.

If however tid task was not recorded as completed, handleFailedTask records it as pending.

If the TaskSetManager is not a zombie and the task failed reason should be counted towards the maximum number of times the task is allowed to fail before the stage is aborted (i.e. TaskFailedReason.countTowardsTaskFailures attribute is enabled), the optional TaskSetBlacklist is notified (passing on the host, executor and the task’s index). handleFailedTask then increments the number of failures for tid task and checks if the number of failures is equal or greater than the allowed number of task failures per TaskSet (as defined when the TaskSetManager was created).

If so, i.e. the number of task failures of tid task reached the maximum value, you should see the following ERROR message in the logs:

ERROR Task [id] in stage [id] failed [maxTaskFailures] times; aborting job

And handleFailedTask aborts the TaskSet with the following message and then quits:

Task [index] in stage [id] failed [maxTaskFailures] times, most recent failure: [failureReason]

In the end (except when the number of failures of tid task grew beyond the acceptable number), handleFailedTask attempts to mark the TaskSet as finished.

Note
handleFailedTask is used when TaskSchedulerImpl is informed that a task has failed or when TaskSetManager is informed that an executor has been lost.

FetchFailed TaskFailedReason

For FetchFailed you should see the following WARN message in the logs:

WARN Lost task [id] in stage [id] (TID [tid], [host], executor [id]): [reason]

Unless tid has already been marked as successful (in successful internal registry), it becomes so and the number of successful tasks in TaskSet gets increased.

The TaskSetManager enters zombie state.

The failure exception is empty.

ExceptionFailure TaskFailedReason

For ExceptionFailure, handleFailedTask checks if the exception is of type NotSerializableException. If so, you should see the following ERROR message in the logs:

ERROR Task [id] in stage [id] (TID [tid]) had a not serializable result: [description]; not retrying

And handleFailedTask aborts the TaskSet and then quits.

Otherwise, if the exception is not of type NotSerializableException, handleFailedTask accesses accumulators and calculates whether to print the WARN message (with the failure reason) or the INFO message.

If the failure has already been reported (and is therefore a duplication), spark.logging.exceptionPrintInterval is checked before reprinting the duplicate exception in its entirety.

For full printout of the ExceptionFailure, the following WARN appears in the logs:

WARN Lost task [id] in stage [id] (TID [tid], [host], executor [id]): [reason]

Otherwise, the following INFO appears in the logs:

INFO Lost task [id] in stage [id] (TID [tid]) on [host], executor [id]: [className] ([description]) [duplicate [dupCount]]

The exception in ExceptionFailure becomes the failure exception.

ExecutorLostFailure TaskFailedReason

For ExecutorLostFailure if not exitCausedByApp, you should see the following INFO in the logs:

INFO Task [tid] failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.

The failure exception is empty.

Other TaskFailedReasons

For the other TaskFailedReasons, you should see the following WARN message in the logs:

WARN Lost task [id] in stage [id] (TID [tid], [host], executor [id]): [reason]

The failure exception is empty.

Registering Task As Pending Execution (Per Preferred Locations) — addPendingTask Internal Method

addPendingTask(index: Int): Unit

addPendingTask registers a index task in the pending-task lists that the task should be eventually scheduled to (per its preferred locations).

Internally, addPendingTask takes the preferred locations of the task (given index) and registers the task in the internal pending-task registries for every preferred location:

For a TaskLocation being HDFSCacheTaskLocation, addPendingTask requests TaskSchedulerImpl for the executors on the host (of a preferred location) and registers the task in pendingTasksForExecutor for every executor (if available).

You should see the following INFO message in the logs:

INFO Pending task [index] has a cached location at [host] , where there are executors [executors]

When addPendingTask could not find executors for a HDFSCacheTaskLocation preferred location, you should see the following DEBUG message in the logs:

DEBUG Pending task [index] has a cached location at [host] , but there are no executors alive there.

If the task has no location preferences, addPendingTask registers it in pendingTasksWithNoPrefs.

addPendingTask always registers the task in allPendingTasks.

Note
addPendingTask is used immediatelly when TaskSetManager is created and later when handling a task failure or lost executor.

Re-enqueuing ShuffleMapTasks (with no ExternalShuffleService) and Reporting All Running Tasks on Lost Executor as Failed — executorLost Method

executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit

executorLost re-enqueues all the ShuffleMapTasks that have completed already on the lost executor (when external shuffle service is not in use) and reports all currently-running tasks on the lost executor as failed.

Note
Since TaskSetManager manages execution of the tasks in a single TaskSet, when an executor gets lost, the affected tasks that have been running on the failed executor need to be re-enqueued. executorLost is the mechanism to "announce" the event to all TaskSetManagers.

Internally, executorLost first checks whether the tasks are ShuffleMapTasks and whether an external shuffle service is enabled (that could serve the map shuffle outputs in case of failure).

Note
executorLost checks out the first task in tasks as it is assumed the other belong to the same stage. If the task is a ShuffleMapTask, the entire TaskSet is for a ShuffleMapStage.
Note
executorLost uses SparkEnv to access the current BlockManager and finds out whether an external shuffle service is enabled or not (that is controlled using spark.shuffle.service.enabled property).

If executorLost is indeed due to an executor lost that executed tasks for a ShuffleMapStage (that this TaskSetManager manages) and no external shuffle server is enabled, executorLost finds all the tasks that were scheduled on this lost executor and marks the ones that were already successfully completed as not executed yet.

Note
executorLost uses records every tasks on the lost executor in successful (as false) and decrements [copiesRunning copiesRunning], and tasksSuccessful for every task.
Note
executorLost uses TaskSchedulerImpl to access the DAGScheduler. TaskSchedulerImpl is given when the TaskSetManager was created.

Regardless of whether this TaskSetManager manages ShuffleMapTasks or not (it could also manage ResultTasks) and whether the external shuffle service is used or not, executorLost finds all currently-running tasks on this lost executor and reports them as failed (with the task state FAILED).

Note
executorLost finds out if the reason for the executor lost is due to application fault, i.e. assumes ExecutorExited's exit status as the indicator, ExecutorKilled for non-application’s fault and any other reason is an application fault.

Recomputing Task Locality Preferences — recomputeLocality Method

recomputeLocality(): Unit

recomputeLocality recomputes the internal caches: myLocalityLevels, localityWaits and currentLocalityIndex.

Caution
FIXME But why are the caches important (and have to be recomputed)?

recomputeLocality records the current TaskLocality level of this TaskSetManager (that is currentLocalityIndex in myLocalityLevels).

Note
TaskLocality is one of PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY values.

recomputeLocality computes locality levels (for scheduled tasks) and saves the result in myLocalityLevels internal cache.

recomputeLocality computes localityWaits (by finding locality wait for every locality level in myLocalityLevels internal cache).

In the end, recomputeLocality getLocalityIndex of the previous locality level and records it in currentLocalityIndex.

Note
recomputeLocality is used when TaskSetManager gets notified about status change in executors, i.e. when an executor is lost or added.

Computing Locality Levels (for Scheduled Tasks) — computeValidLocalityLevels Internal Method

computeValidLocalityLevels(): Array[TaskLocality]

computeValidLocalityLevels computes valid locality levels for tasks that were registered in corresponding registries per locality level.

Note
TaskLocality is a task locality preference and can be the most localized PROCESS_LOCAL, NODE_LOCAL through NO_PREF and RACK_LOCAL to ANY.
Table 2. TaskLocalities and Corresponding Internal Registries
TaskLocality Internal Registry

PROCESS_LOCAL

pendingTasksForExecutor

NODE_LOCAL

pendingTasksForHost

NO_PREF

pendingTasksWithNoPrefs

RACK_LOCAL

pendingTasksForRack

computeValidLocalityLevels walks over every internal registry and if it is not empty computes locality wait for the corresponding TaskLocality and proceeds with it only when the locality wait is not 0.

For TaskLocality with pending tasks, computeValidLocalityLevels asks TaskSchedulerImpl whether there is at least one executor alive (for PROCESS_LOCAL, NODE_LOCAL and RACK_LOCAL) and if so registers the TaskLocality.

Note
computeValidLocalityLevels uses TaskSchedulerImpl that was given when TaskSetManager was created.

computeValidLocalityLevels always registers ANY task locality level.

In the end, you should see the following DEBUG message in the logs:

DEBUG TaskSetManager: Valid locality levels for [taskSet]: [comma-separated levels]
Note
computeValidLocalityLevels is used when TaskSetManager is created and later to recompute locality.

Finding Locality Wait — getLocalityWait Internal Method

getLocalityWait(level: TaskLocality): Long

getLocalityWait finds locality wait (in milliseconds) for a given TaskLocality.

getLocalityWait uses spark.locality.wait (default: 3s) when the TaskLocality-specific property is not defined or 0 for NO_PREF and ANY.

Note
NO_PREF and ANY task localities have no locality wait.
Table 3. TaskLocalities and Corresponding Spark Properties
TaskLocality Spark Property

PROCESS_LOCAL

spark.locality.wait.process

NODE_LOCAL

spark.locality.wait.node

RACK_LOCAL

spark.locality.wait.rack

Note
getLocalityWait is used when TaskSetManager calculates localityWaits, computes locality levels (for scheduled tasks) and recomputes locality preferences.

Settings

Table 4. Spark Properties
Spark Property Default Value Description

spark.driver.maxResultSize

1g

The maximum size of all the task results in a TaskSet. If the value is smaller than 1m or 1048576 (1024 * 1024), it is considered 0.

Used when TaskSetManager checks available memory for a task result and Utils.getMaxResultSize.

spark.scheduler.executorTaskBlacklistTime

0L

Time interval to pass after which a task can be re-launched on the executor where it has once failed. It is to prevent repeated task failures due to executor failures.

spark.logging.exceptionPrintInterval

10000

How frequently to reprint duplicate exceptions in full (in millis).

spark.locality.wait

3s

For locality-aware delay scheduling for PROCESS_LOCAL, NODE_LOCAL, and RACK_LOCAL TaskLocalities when locality-specific setting is not set.

spark.locality.wait.process

The value of spark.locality.wait

Scheduling delay for PROCESS_LOCAL TaskLocality

spark.locality.wait.node

The value of spark.locality.wait

Scheduling delay for NODE_LOCAL TaskLocality

spark.locality.wait.rack

The value of spark.locality.wait

Scheduling delay for RACK_LOCAL TaskLocality

results matching ""

    No results matching ""