TaskScheduler — Spark Scheduler
TaskScheduler is responsible for submitting tasks for execution in a Spark application (per scheduling policy).
Note: TaskScheduler works closely with DAGScheduler that submits sets of tasks for execution (for every stage in a Spark job).
TaskScheduler tracks the executors in a Spark application using the executorHeartbeatReceived and executorLost methods, which inform it about active and lost executors, respectively.
Spark comes with the following custom TaskSchedulers:

- TaskSchedulerImpl — the default TaskScheduler (that the following two YARN-specific TaskSchedulers extend).
- YarnScheduler for Spark on YARN in client deploy mode.
- YarnClusterScheduler for Spark on YARN in cluster deploy mode.
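Which of these a Spark application ends up with follows from the master URL and deploy mode given to SparkContext. A minimal sketch (the application name is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The master URL (and deploy mode) passed to SparkContext decides which
// TaskScheduler SparkContext.createTaskScheduler instantiates, e.g.
// local[*] and spark:// masters use TaskSchedulerImpl, while yarn uses
// YarnScheduler (client deploy mode) or YarnClusterScheduler (cluster deploy mode).
val conf = new SparkConf()
  .setAppName("taskscheduler-demo")  // hypothetical application name
  .setMaster("local[*]")             // local mode => TaskSchedulerImpl
val sc = new SparkContext(conf)
sc.stop()
```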
Note: The source of TaskScheduler is available in org.apache.spark.scheduler.TaskScheduler.
TaskScheduler Contract
trait TaskScheduler {
  def applicationAttemptId(): Option[String]
  def applicationId(): String
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  def defaultParallelism(): Int
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  def postStartHook(): Unit
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
  def start(): Unit
  def stop(): Unit
  def submitTasks(taskSet: TaskSet): Unit
}
Note: TaskScheduler is a private[spark] contract.
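Because the contract is private[spark], a custom implementation has to live under an org.apache.spark package. The following is a minimal, do-nothing sketch that satisfies the contract exactly as listed above (the class name is hypothetical, and newer Spark versions add further methods to the trait, so treat it as illustrative only):

```scala
package org.apache.spark.scheduler

import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2

// Hypothetical no-op TaskScheduler that only satisfies the contract listed above.
class NoOpTaskScheduler extends TaskScheduler {
  // A single FIFO root pool with no minimum share and no weight.
  override val rootPool: Pool = new Pool("", SchedulingMode.FIFO, 0, 0)
  override def schedulingMode: SchedulingMode = SchedulingMode.FIFO

  override def applicationId(): String = "spark-application-" + System.currentTimeMillis
  override def applicationAttemptId(): Option[String] = None

  override def start(): Unit = {}
  override def postStartHook(): Unit = {}
  override def stop(): Unit = {}

  override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}

  // Accept and silently drop every TaskSet that DAGScheduler submits.
  override def submitTasks(taskSet: TaskSet): Unit = {}
  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}

  override def defaultParallelism(): Int = 1

  // Pretend the executor's BlockManager is known so it does not re-register.
  override def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean = true
  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}
```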
Method | Description
---|---
applicationAttemptId | Unique identifier of an (execution) attempt of a Spark application.
applicationId | Unique identifier of a Spark application. By default, it is in the format spark-application-[System.currentTimeMillis]. Used while SparkContext is being created (to set the spark.app.id Spark property).
cancelTasks | Cancels all tasks of a given stage.
defaultParallelism | Calculates the default level of parallelism.
executorHeartbeatReceived | Intercepts heartbeats (with task metrics) from executors. Expected to return true when the driver knows about the executor's BlockManager; returning false tells the BlockManager to re-register.
executorLost | Intercepts events about executors getting lost.
postStartHook | Post-start initialization. Does nothing by default, but allows custom implementations for some additional post-start initialization. Used right before SparkContext is considered fully initialized.
rootPool | Pool (of Schedulables).
schedulingMode | Scheduling mode. Puts tasks in order according to a scheduling policy.
setDAGScheduler | Assigns a DAGScheduler.
start | Starts the TaskScheduler. Used while SparkContext is being created.
stop | Stops the TaskScheduler.
submitTasks | Submits the tasks (as a TaskSet) of a given stage for execution. Used when DAGScheduler submits the tasks of a stage.
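Several of these contract methods surface through the public SparkContext API, so they can be observed without touching the private[spark] internals. A small sketch (the application name is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduler-contract-demo")   // hypothetical name
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")     // scheduling policy behind schedulingMode/rootPool
val sc = new SparkContext(conf)

// SparkContext.defaultParallelism delegates to TaskScheduler.defaultParallelism.
println(sc.defaultParallelism)   // 4 in local[4] mode (unless spark.default.parallelism overrides it)

sc.stop()
```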
TaskScheduler’s Lifecycle
A TaskScheduler is created while SparkContext is being created (by calling SparkContext.createTaskScheduler for a given master URL and deploy mode).
At this point in SparkContext’s lifecycle, the internal _taskScheduler points at the TaskScheduler (and it is "announced" by sending a blocking TaskSchedulerIsSet message to the HeartbeatReceiver RPC endpoint).
The TaskScheduler is started right after the blocking TaskSchedulerIsSet message receives a response.
The application ID and the application’s attempt ID are set at this point (and SparkContext uses the application ID to set the spark.app.id Spark property and to configure the SparkUI and the BlockManager).
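For instance, once a local SparkContext is up, the application ID that came from the TaskScheduler is visible both through the public API and as a Spark property (a small sketch; the application name is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("appid-demo").setMaster("local[*]"))

// Both values originate from TaskScheduler.applicationId,
// captured while SparkContext was being initialized.
println(sc.applicationId)                // e.g. local-1500000000000
println(sc.getConf.get("spark.app.id"))  // the same identifier as a Spark property
sc.stop()
```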
Caution: FIXME The application ID is described as "associated with the job" in TaskScheduler, but I think it is "associated with the application", since you can have many jobs per application.
Right before SparkContext is fully initialized, TaskScheduler.postStartHook is called.
The internal _taskScheduler is cleared (i.e. set to null) while SparkContext is being stopped.
Warning: FIXME If it is SparkContext that starts a TaskScheduler, shouldn’t SparkContext stop it too? Why is this the way it is now?