ExecutorsListener Spark Listener

ExecutorsListener is a SparkListener that tracks executors and their tasks in a Spark application for Stage Details page, Jobs tab and /allexecutors REST endpoint.

Table 1. ExecutorsListener’s SparkListener Callbacks (in alphabetical order)
Event Handler Description

onApplicationStart

May create an entry for the driver in executorToTaskSummary registry

onExecutorAdded

May create an entry in executorToTaskSummary registry. It also makes sure that the number of entries for dead executors does not exceed spark.ui.retainedDeadExecutors and removes excess.

Adds an entry to executorEvents registry and optionally removes the oldest if the number of entries exceeds spark.ui.timeline.executors.maximum.

onExecutorBlacklisted

FIXME

onExecutorRemoved

Marks an executor dead in executorToTaskSummary registry.

Adds an entry to executorEvents registry and optionally removes the oldest if the number of entries exceeds spark.ui.timeline.executors.maximum.

onExecutorUnblacklisted

FIXME

onNodeBlacklisted

FIXME

onNodeUnblacklisted

FIXME

onTaskStart

May create an entry for an executor in executorToTaskSummary registry.

onTaskEnd

May create an entry for an executor in executorToTaskSummary registry.

ExecutorsListener requires a StorageStatusListener and SparkConf.

Table 2. ExecutorsListener’s Internal Registries and Counters
Registry Description

executorToTaskSummary

The lookup table for ExecutorTaskSummary per executor id.

Used to build a ExecutorSummary for /allexecutors REST endpoint, to display stdout and stderr logs in Tasks and Aggregated Metrics by Executor sections in Stage Details page.

executorEvents

A collection of SparkListenerEvents.

Used to build the event timeline in All Jobs and Details for Job pages.

updateExecutorBlacklist Method

Caution
FIXME

Intercepting Executor Was Blacklisted Events — onExecutorBlacklisted Callback

Caution
FIXME

Intercepting Executor Is No Longer Blacklisted Events — onExecutorUnblacklisted Callback

Caution
FIXME

Intercepting Node Was Blacklisted Events — onNodeBlacklisted Callback

Caution
FIXME

Intercepting Node Is No Longer Blacklisted Events — onNodeUnblacklisted Callback

Caution
FIXME

Inactive/Dead BlockManagers — deadStorageStatusList Method

deadStorageStatusList: Seq[StorageStatus]
Note

deadStorageStatusList is used when:

Intercepting Application Started Events — onApplicationStart Callback

onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit
Note
onApplicationStart is a part of SparkListener contract to announce that a Spark application has been started.

onApplicationStart takes driverLogs property from the input applicationStart (if defined) and finds the driver’s active StorageStatus (using the current StorageStatusListener). onApplicationStart then uses the driver’s StorageStatus (if defined) to set executorLogs.

Table 3. ExecutorTaskSummary and ExecutorInfo Attributes
ExecutorTaskSummary Attribute SparkListenerApplicationStart Attribute

executorLogs

driverLogs (if defined)

Intercepting Executor Added Events — onExecutorAdded Callback

onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
Note
onExecutorAdded is a part of SparkListener contract to announce that a new executor has been registered with the Spark application.

onExecutorAdded finds the executor (using the input executorAdded) in the internal executorToTaskSummary registry and sets the attributes. If not found, onExecutorAdded creates a new entry.

Table 4. ExecutorTaskSummary and ExecutorInfo Attributes
ExecutorTaskSummary Attribute ExecutorInfo Attribute

executorLogs

logUrlMap

totalCores

totalCores

tasksMax

totalCores / spark.task.cpus

onExecutorAdded adds the input executorAdded to executorEvents collection. If the number of elements in executorEvents collection is greater than spark.ui.timeline.executors.maximum, the first/oldest event is removed.

onExecutorAdded removes the oldest dead executor from executorToTaskSummary lookup table if their number is greater than spark.ui.retainedDeadExecutors.

Intercepting Executor Removed Events — onExecutorRemoved Callback

onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
Note
onExecutorRemoved is a part of SparkListener contract to announce that an executor has been unregistered with the Spark application.

onExecutorRemoved adds the input executorRemoved to executorEvents collection. It then removes the oldest event if the number of elements in executorEvents collection is greater than spark.ui.timeline.executors.maximum.

The executor is marked as removed/inactive in executorToTaskSummary lookup table.

Intercepting Task Started Events — onTaskStart Callback

onTaskStart(taskStart: SparkListenerTaskStart): Unit
Note
onTaskStart is a part of SparkListener contract to announce that a task has been started.

onTaskStart increments tasksActive for the executor (using the input SparkListenerTaskStart).

Table 5. ExecutorTaskSummary and SparkListenerTaskStart Attributes
ExecutorTaskSummary Attribute Description

tasksActive

Uses taskStart.taskInfo.executorId.

Intercepting Task End Events — onTaskEnd Callback

onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
Note
onTaskEnd is a part of SparkListener contract to announce that a task has ended.

onTaskEnd takes TaskInfo from the input taskEnd (if available).

Depending on the reason for SparkListenerTaskEnd onTaskEnd does the following:

Table 6. onTaskEnd Behaviour per SparkListenerTaskEnd Reason
SparkListenerTaskEnd Reason onTaskEnd Behaviour

Resubmitted

Does nothing

ExceptionFailure

Increment tasksFailed

anything

Increment tasksComplete

tasksActive is decremented but only when the number of active tasks for the executor is greater than 0.

Table 7. ExecutorTaskSummary and onTaskEnd Behaviour
ExecutorTaskSummary Attribute Description

tasksActive

Decremented if greater than 0.

duration

Uses taskEnd.taskInfo.duration

If the TaskMetrics (in the input taskEnd) is available, the metrics are added to the taskSummary for the task’s executor.

Table 8. Task Metrics and Task Summary
Task Summary Task Metric

inputBytes

inputMetrics.bytesRead

inputRecords

inputMetrics.recordsRead

outputBytes

outputMetrics.bytesWritten

outputRecords

outputMetrics.recordsWritten

shuffleRead

shuffleReadMetrics.remoteBytesRead

shuffleWrite

shuffleWriteMetrics.bytesWritten

jvmGCTime

metrics.jvmGCTime

Finding Active BlockManagers — activeStorageStatusList Method

activeStorageStatusList: Seq[StorageStatus]

activeStorageStatusList requests StorageStatusListener for active BlockManagers (on executors).

Note

activeStorageStatusList is used when:

Settings

Table 9. Spark Properties
Spark Property Default Value Description

spark.ui.timeline.executors.maximum

1000

The maximum number of entries in executorEvents registry.

results matching ""

    No results matching ""