onJobStart(jobStart: SparkListenerJobStart): Unit
JobProgressListener Spark Listener
JobProgressListener is a SparkListener for web UI.
JobProgressListener intercepts the following Spark events.
| Handler | Purpose |
|---|---|
Creates a JobUIData. It updates jobGroupToJobIds, pendingStages, jobIdToData, activeJobs, stageIdToActiveJobIds, stageIdToInfo and stageIdToData. |
|
Removes an entry in activeJobs. It also removes entries in pendingStages and stageIdToActiveJobIds. It updates completedJobs, numCompletedJobs, failedJobs, numFailedJobs and skippedStages. |
|
Updates the |
|
Updates the task’s |
|
Updates the task’s |
|
|
Sets Used in Jobs tab (for the Scheduling Mode), and to display pools in FIXME: Add the links/screenshots for pools. |
|
Records an executor and its block manager in the internal executorIdToBlockManagerId registry. |
|
Removes the executor from the internal executorIdToBlockManagerId registry. |
|
Records a Spark application’s start time (in the internal Used in Jobs tab (for a total uptime and the event timeline) and Job page (for the event timeline). |
|
Records a Spark application’s end time (in the internal Used in Jobs tab (for a total uptime). |
|
Does nothing. FIXME: Why is this event intercepted at all?! |
updateAggregateMetrics Method
|
Caution
|
FIXME |
Registries and Counters
JobProgressListener uses registries to collect information about job executions.
| Name | Description |
|---|---|
Holds StageUIData per stage, i.e. the stage and stage attempt ids. |
|
The lookup table for Used to track block managers so the Stage page can display FIXME: How does Executors page collect the very same information? |
onJobStart Method
onJobStart creates a JobUIData. It updates jobGroupToJobIds, pendingStages, jobIdToData, activeJobs, stageIdToActiveJobIds, stageIdToInfo and stageIdToData.
onJobStart reads the optional Spark Job group id as spark.jobGroup.id (from properties in the input jobStart).
onJobStart then creates a JobUIData using the input jobStart with status attribute set to JobExecutionStatus.RUNNING and records it in jobIdToData and activeJobs registries.
onJobStart looks the job ids for the group id (in jobGroupToJobIds registry) and adds the job id.
The internal pendingStages is updated with StageInfo for the stage id (for every StageInfo in SparkListenerJobStart.stageInfos collection).
onJobStart records the stages of the job in stageIdToActiveJobIds.
onJobStart records StageInfos in stageIdToInfo and stageIdToData.
onJobEnd Method
onJobEnd(jobEnd: SparkListenerJobEnd): Unit
onJobEnd removes an entry in activeJobs. It also removes entries in pendingStages and stageIdToActiveJobIds. It updates completedJobs, numCompletedJobs, failedJobs, numFailedJobs and skippedStages.
onJobEnd removes the job from activeJobs registry. It removes stages from pendingStages registry.
When completed successfully, the job is added to completedJobs registry with status attribute set to JobExecutionStatus.SUCCEEDED. numCompletedJobs gets incremented.
When failed, the job is added to failedJobs registry with status attribute set to JobExecutionStatus.FAILED. numFailedJobs gets incremented.
For every stage in the job, the stage is removed from the active jobs (in stageIdToActiveJobIds) that can remove the entire entry if no active jobs exist.
Every pending stage in stageIdToInfo gets added to skippedStages.
onExecutorMetricsUpdate Method
onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
onTaskStart Method
onTaskStart(taskStart: SparkListenerTaskStart): Unit
onTaskStart updates StageUIData and JobUIData, and registers a new TaskUIData.
onTaskStart takes TaskInfo from the input taskStart.
onTaskStart looks the StageUIData for the stage and stage attempt ids up (in stageIdToData registry).
onTaskStart increments numActiveTasks and puts a TaskUIData for the task in stageData.taskData.
Ultimately, onTaskStart looks the stage in the internal stageIdToActiveJobIds and for each active job reads its JobUIData (from jobIdToData). It then increments numActiveTasks.
onTaskEnd Method
onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
onTaskEnd updates the StageUIData (and TaskUIData), ExecutorSummary, and JobUIData.
onTaskEnd takes TaskInfo from the input taskEnd.
|
Note
|
onTaskEnd does its processing when the TaskInfo is available and stageAttemptId is not -1.
|
onTaskEnd looks the StageUIData for the stage and stage attempt ids up (in stageIdToData registry).
onTaskEnd saves accumulables in the StageUIData.
onTaskEnd reads the ExecutorSummary for the executor (the task has finished on).
Depending on the task end’s reason onTaskEnd increments succeededTasks, killedTasks or failedTasks counters.
onTaskEnd adds the task’s duration to taskTime.
onTaskEnd decrements the number of active tasks (in the StageUIData).
Again, depending on the task end’s reason onTaskEnd computes errorMessage and updates StageUIData.
|
Caution
|
FIXME Why is the same information in two different registries — stageData and execSummary?!
|
If taskMetrics is available, updateAggregateMetrics is executed.
The task’s TaskUIData is looked up in stageData.taskData and updateTaskInfo and updateTaskMetrics are executed. errorMessage is updated.
onTaskEnd makes sure that the number of tasks in StageUIData (stageData.taskData) is not above spark.ui.retainedTasks and drops the excess.
Ultimately, onTaskEnd looks the stage in the internal stageIdToActiveJobIds and for each active job reads its JobUIData (from jobIdToData). It then decrements numActiveTasks and increments numCompletedTasks, numKilledTasks or numFailedTasks depending on the task’s end reason.
onStageCompleted Method
onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
onStageCompleted updates the StageUIData and JobUIData.
onStageCompleted reads stageInfo from the input stageCompleted and records it in stageIdToInfo registry.
onStageCompleted looks the StageUIData for the stage and the stage attempt ids up in stageIdToData registry.
onStageCompleted records accumulables in StageUIData.
onStageCompleted removes the stage from poolToActiveStages and activeStages registries.
If the stage completed successfully (i.e. has no failureReason), onStageCompleted adds the stage to completedStages registry and increments numCompletedStages counter. It trims completedStages.
Otherwise, when the stage failed, onStageCompleted adds the stage to failedStages registry and increments numFailedStages counter. It trims failedStages.
Ultimately, onStageCompleted looks the stage in the internal stageIdToActiveJobIds and for each active job reads its JobUIData (from jobIdToData). It then decrements numActiveStages. When completed successfully, it adds the stage to completedStageIndices. With failure, numFailedStages gets incremented.
JobUIData
|
Caution
|
FIXME |
StageUIData
|
Caution
|
FIXME |