TaskScheduler — Spark Scheduler

TaskScheduler is responsible for submitting tasks for execution in a Spark application (per scheduling policy).

Figure 1. TaskScheduler works for a single SparkContext
Note
TaskScheduler works closely with DAGScheduler, which submits sets of tasks for execution (one for every stage in a Spark job).

TaskScheduler tracks the executors in a Spark application using the executorHeartbeatReceived and executorLost methods, which inform it about active and lost executors, respectively.

Spark comes with the following custom TaskSchedulers: TaskSchedulerImpl (the default TaskScheduler) and, for Spark on YARN, its YarnScheduler and YarnClusterScheduler subclasses.

Note
The source of TaskScheduler is available in org.apache.spark.scheduler.TaskScheduler.

TaskScheduler Contract

trait TaskScheduler {
  def applicationAttemptId(): Option[String]
  def applicationId(): String
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  def defaultParallelism(): Int
  def executorHeartbeatReceived(
    execId: String,
    accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
    blockManagerId: BlockManagerId): Boolean
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  def postStartHook(): Unit
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
  def start(): Unit
  def stop(): Unit
  def submitTasks(taskSet: TaskSet): Unit
}
Note
TaskScheduler is a private[spark] contract.
Table 1. TaskScheduler Contract
Method Description

applicationAttemptId

Unique identifier of an (execution) attempt of a Spark application.

Used exclusively when SparkContext is initialized.

applicationId

Unique identifier of a Spark application.

By default, it is in the format spark-application-[System.currentTimeMillis].

Used exclusively when SparkContext is initialized (to set spark.app.id).

cancelTasks

Cancels all tasks of a given stage.

Used exclusively when DAGScheduler fails a Spark job and cancels the stages that belong exclusively to that job (independent, single-job stages).

defaultParallelism

Calculates the default level of parallelism.

Used when SparkContext is requested for the default level of parallelism.

executorHeartbeatReceived

Intercepts heartbeats (with task metrics) from executors.

executorHeartbeatReceived(
  execId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId): Boolean

Expected to return true when the executor execId is managed by the TaskScheduler. Returning false indicates that the block manager (on the executor) should re-register.

Used exclusively when HeartbeatReceiver RPC endpoint receives a heartbeat and task metrics from an executor.

executorLost

Intercepts events about executors getting lost.

Used when HeartbeatReceiver RPC endpoint gets informed about disconnected executors (i.e. executors that are no longer available) and when DriverEndpoint forgets or disables malfunctioning executors (i.e. executors that are either lost or blacklisted for some reason).

postStartHook

Post-start initialization.

Does nothing by default, but allows custom implementations for some additional post-start initialization.

Used exclusively when SparkContext is created (right before SparkContext is considered fully initialized).

rootPool

Pool (of Schedulables).

schedulingMode

Scheduling mode.

Puts tasks in order for execution according to the scheduling policy (FIFO, FAIR or NONE). Used in SparkContext.getSchedulingMode.

setDAGScheduler

Assigns DAGScheduler.

Used exclusively when DAGScheduler is created (and passes on a reference to itself).

start

Starts TaskScheduler.

Used exclusively when SparkContext is created.

stop

Stops TaskScheduler.

Used exclusively when DAGScheduler is stopped.

submitTasks

Submits the tasks of a given stage for execution (as a TaskSet).

Used exclusively when DAGScheduler submits tasks (of a stage) for execution.
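
To make the contract more tangible, the following is a minimal, do-nothing implementation of the methods listed above. NoopTaskScheduler is a made-up name used for illustration only; it is not a scheduler that ships with Spark, and since TaskScheduler is a private[spark] contract the class has to live in Spark's own org.apache.spark.scheduler package. Treat it as a sketch against the contract as listed above (newer Spark versions add more methods to the trait).

package org.apache.spark.scheduler

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2

// A do-nothing TaskScheduler that merely satisfies the contract above.
// NoopTaskScheduler is a made-up name used for illustration only.
class NoopTaskScheduler extends TaskScheduler {

  // No application attempt by default
  override def applicationAttemptId(): Option[String] = None

  // Mirrors the default format described in Table 1
  override def applicationId(): String =
    s"spark-application-${System.currentTimeMillis}"

  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = ()

  override def defaultParallelism(): Int = 1

  // true = executor execId is known; false would make its BlockManager re-register
  override def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean = true

  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = ()

  override def postStartHook(): Unit = ()

  // A single root Pool of Schedulables
  override def rootPool: Pool = new Pool("", SchedulingMode.FIFO, 0, 0)

  override def schedulingMode: SchedulingMode.SchedulingMode = SchedulingMode.FIFO

  override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = ()

  override def start(): Unit = ()

  override def stop(): Unit = ()

  override def submitTasks(taskSet: TaskSet): Unit = ()
}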

TaskScheduler’s Lifecycle

A TaskScheduler is created while SparkContext is being created (by calling SparkContext.createTaskScheduler for a given master URL and deploy mode).

Figure 2. TaskScheduler uses SchedulerBackend to support different clusters
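
The pairing is done in SparkContext.createTaskScheduler, which pattern-matches on the master URL. Below is a heavily simplified sketch of that selection for two master URL formats; the real method supports many more (local[N], local[N, maxFailures], local-cluster, yarn and other external cluster managers), and the object name and exact constructor arguments shown here are assumptions based on Spark 2.x rather than verbatim source.

package org.apache.spark  // sketch only; the real code lives in SparkContext

import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.scheduler.local.LocalSchedulerBackend

object TaskSchedulerSelection {  // made-up name for this sketch

  def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = master match {

    case "local" =>
      // Single-threaded local mode: one core, no task retries
      val scheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case url if url.startsWith("spark://") =>
      // Spark Standalone cluster
      val scheduler = new TaskSchedulerImpl(sc)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, Array(url))
      scheduler.initialize(backend)
      (backend, scheduler)

    case other =>
      throw new SparkException(s"Master URL not covered by this sketch: $other")
  }
}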

At this point in SparkContext’s lifecycle, the internal _taskScheduler points at the TaskScheduler (and it is "announced" by sending a blocking TaskSchedulerIsSet message to HeartbeatReceiver RPC endpoint).

The TaskScheduler is started right after the blocking TaskSchedulerIsSet message receives a response.

The application ID and the application’s attempt ID are set at this point (SparkContext uses the application ID to set the spark.app.id Spark property, and to configure SparkUI and BlockManager).

Caution
FIXME The application id is described as "associated with the job." in TaskScheduler, but I think it is "associated with the application" and you can have many jobs per application.

Right before SparkContext is fully initialized, TaskScheduler.postStartHook is called.
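
Put together, the TaskScheduler-related part of SparkContext’s initialization boils down to the following order of calls. The snippet is an abridged paraphrase (not verbatim source), with field names following SparkContext’s internals in Spark 2.x.

// Abridged paraphrase of SparkContext's initialization (Spark 2.x)
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)  // calls _taskScheduler.setDAGScheduler(this)

// "Announce" the TaskScheduler with a blocking message to HeartbeatReceiver
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// Start the TaskScheduler and record the application (attempt) ID
_taskScheduler.start()
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = _taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)  // also used to configure SparkUI and BlockManager

// ...further initialization (SparkUI, BlockManager, metrics, etc.)...

// Right before SparkContext is considered fully initialized
_taskScheduler.postStartHook()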

The internal _taskScheduler is cleared (i.e. set to null) while SparkContext is being stopped.

Warning
FIXME If it is SparkContext to start a TaskScheduler, shouldn’t SparkContext stop it too? Why is this the way it is now?
