CoarseGrainedExecutorBackend

CoarseGrainedExecutorBackend is a standalone application that is started in a resource container when:

When started, CoarseGrainedExecutorBackend registers the Executor RPC endpoint to communicate with the driver (i.e. with the CoarseGrainedScheduler RPC endpoint).

CoarseGrainedExecutorBackend.png
Figure 1. CoarseGrainedExecutorBackend Communicates with Driver’s CoarseGrainedSchedulerBackend Endpoint

When launched, CoarseGrainedExecutorBackend immediately connects to the owning CoarseGrainedSchedulerBackend to announce that it is ready to launch tasks.

CoarseGrainedExecutorBackend is an ExecutorBackend that controls the lifecycle of a single executor and sends the executor’s status updates to the driver.

CoarseGrainedExecutorBackend statusUpdate.png
Figure 2. CoarseGrainedExecutorBackend Sending Task Status Updates to Driver’s CoarseGrainedScheduler Endpoint

CoarseGrainedExecutorBackend is a ThreadSafeRpcEndpoint that connects to the driver (before accepting messages) and shuts down when the driver disconnects.

Table 1. CoarseGrainedExecutorBackend’s Executor RPC Endpoint Messages (in alphabetical order)
Message | Description
KillTask |
LaunchTask | Forwards launch task requests from the driver to the single managed coarse-grained executor.
RegisteredExecutor | Creates the single managed Executor. Sent exclusively when CoarseGrainedSchedulerBackend receives RegisterExecutor.
RegisterExecutorFailed |
StopExecutor |
Shutdown |

Table 2. CoarseGrainedExecutorBackend’s Internal Properties
Name | Initial Value | Description
ser | SerializerInstance | Initialized when CoarseGrainedExecutorBackend is created. NOTE: CoarseGrainedExecutorBackend uses the input env to access closureSerializer.
driver | (empty) | RpcEndpointRef of the driver. FIXME
stopping | false | Enabled when CoarseGrainedExecutorBackend gets notified to stop itself or shut down the managed executor. Used when CoarseGrainedExecutorBackend RPC Endpoint gets notified that a remote RPC endpoint disconnected.
executor | (uninitialized) | The single coarse-grained Executor, managed exclusively by CoarseGrainedExecutorBackend, to which launch and kill task requests from the driver are forwarded. Initialized after CoarseGrainedExecutorBackend has registered with CoarseGrainedSchedulerBackend and stopped when CoarseGrainedExecutorBackend gets requested to shut down.

Tip

Enable INFO logging level for org.apache.spark.executor.CoarseGrainedExecutorBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.executor.CoarseGrainedExecutorBackend=INFO

Forwarding Launch Task Request to Executor (from Driver) — LaunchTask Message Handler

LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

Note
CoarseGrainedExecutorBackend acts as a proxy between the driver and the single managed executor and merely re-packages the LaunchTask payload (as serialized data) to pass it along for execution.

LaunchTask first decodes TaskDescription from data. You should see the following INFO message in the logs:

INFO CoarseGrainedExecutorBackend: Got assigned task [id]

LaunchTask then launches the task on the executor (passing itself as the owning ExecutorBackend and decoded TaskDescription).

If executor is not available, LaunchTask terminates CoarseGrainedExecutorBackend with the error code 1 and ExecutorLossReason with the following message:

Received LaunchTask command but executor was null
Note
LaunchTask is sent when CoarseGrainedSchedulerBackend launches tasks (one LaunchTask per task).
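
The handler's branching can be sketched without Spark on the classpath. This is a minimal model, not Spark's code: TaskDescription, Executor, Outcome and onLaunchTask are hypothetical stand-ins, the exit is modelled as a returned value rather than a JVM termination, and checking for the executor before decoding is an assumption of the sketch.

```scala
object LaunchTaskSketch {
  // Hypothetical, simplified stand-ins for Spark's types.
  final case class TaskDescription(taskId: Long)
  trait Executor { def launchTask(task: TaskDescription): Unit }

  // The outcome is returned as data instead of terminating the JVM.
  sealed trait Outcome
  final case class Launched(taskId: Long) extends Outcome
  final case class Exited(code: Int, reason: String) extends Outcome

  def onLaunchTask(
      executor: Option[Executor],
      data: Array[Byte],
      decode: Array[Byte] => TaskDescription): Outcome =
    executor match {
      case None =>
        // No executor yet: the backend gives up with error code 1.
        Exited(1, "Received LaunchTask command but executor was null")
      case Some(exec) =>
        // Decode the serialized payload and hand the task to the executor.
        val task = decode(data)
        println(s"INFO CoarseGrainedExecutorBackend: Got assigned task ${task.taskId}")
        exec.launchTask(task)
        Launched(task.taskId)
    }
}
```

The backend never inspects the task beyond its id; it only decodes and forwards, which matches the proxy role described in the note above.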

Sending Task Status Updates to Driver — statusUpdate Method

Note
statusUpdate is part of the ExecutorBackend contract to send task status updates to a scheduler (on the driver).

statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit

statusUpdate creates a StatusUpdate (with the input taskId, state, and data together with the executor id) and sends it to the driver (if already defined).

CoarseGrainedExecutorBackend statusUpdate.png
Figure 3. CoarseGrainedExecutorBackend Sending Task Status Updates to Driver’s CoarseGrainedScheduler Endpoint

When no driver is available, you should see the following WARN message in the logs:

WARN Drop [msg] because has not yet connected to driver
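
The "send if connected, otherwise drop with a warning" behaviour can be illustrated with a small stand-alone model. Everything here is an assumption made for illustration: DriverRef stands in for the driver's RpcEndpointRef, the task state is a plain String rather than TaskState, and warnings are collected in a buffer instead of being written to a logger.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer

object StatusUpdateSketch {
  // Simplified message; the real one carries a TaskState, not a String.
  final case class StatusUpdate(executorId: String, taskId: Long, state: String, data: ByteBuffer)

  // Stand-in for an RpcEndpointRef: anything that accepts a one-way message.
  trait DriverRef { def send(msg: Any): Unit }

  class Backend(executorId: String) {
    @volatile var driver: Option[DriverRef] = None // empty until the driver is resolved
    val warnings: ListBuffer[String] = ListBuffer.empty // stands in for the logger

    def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit = {
      val msg = StatusUpdate(executorId, taskId, state, data)
      driver match {
        case Some(ref) => ref.send(msg)
        case None => warnings += s"Drop $msg because has not yet connected to driver"
      }
    }
  }
}
```

Keeping driver as an Option makes the "not yet connected" case explicit, so an early status update is dropped rather than failing with a null reference.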

Driver’s URL

The driver’s URL is of the format spark://[RpcEndpoint name]@[hostname]:[port], e.g. spark://CoarseGrainedScheduler@[hostname]:64859.
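
A URL of this shape can be taken apart with java.net.URI, where the endpoint name lands in the user-info component. The sketch below is only an illustration of the format: DriverUrlSketch, EndpointAddress and parse are hypothetical names, not Spark's API, and localhost in the usage note is a made-up host.

```scala
import java.net.{URI, URISyntaxException}

object DriverUrlSketch {
  final case class EndpointAddress(name: String, host: String, port: Int)

  // Splits spark://[RpcEndpoint name]@[hostname]:[port] into its three parts.
  def parse(sparkUrl: String): Either[String, EndpointAddress] =
    try {
      val uri = new URI(sparkUrl)
      val name = uri.getUserInfo // the part before '@'
      val host = uri.getHost
      val port = uri.getPort // -1 when the port is missing
      if (uri.getScheme != "spark" || name == null || host == null || port < 0)
        Left(s"Invalid Spark URL: $sparkUrl")
      else
        Right(EndpointAddress(name, host, port))
    } catch {
      case _: URISyntaxException => Left(s"Invalid Spark URL: $sparkUrl")
    }
}
```

For instance, DriverUrlSketch.parse("spark://CoarseGrainedScheduler@localhost:64859") yields the name CoarseGrainedScheduler, the host localhost and the port 64859, while a URL with another scheme is rejected.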

Launching CoarseGrainedExecutorBackend Standalone Application (in Resource Container) — main Method

CoarseGrainedExecutorBackend is a standalone application (i.e. comes with main entry method) that parses command-line arguments and runs CoarseGrainedExecutorBackend’s Executor RPC endpoint to communicate with the driver.

Table 3. CoarseGrainedExecutorBackend Command-Line Arguments
Argument | Required? | Description
--driver-url | yes | Driver’s URL. See Driver’s URL.
--executor-id | yes | Executor id
--hostname | yes | Host name
--cores | yes | Number of cores (that must be greater than 0).
--app-id | yes | Application id
--worker-url | no | Worker’s URL, e.g. spark://Worker@[hostname]:64557. NOTE: --worker-url is only used in Spark Standalone to enforce fate-sharing with the worker.
--user-class-path | no | User-defined class path entry, which can be a URL or a path to a resource (often a jar file) to be added to CLASSPATH; can be specified multiple times.

When executed with unrecognized command-line arguments or with required arguments missing, main shows the usage help and exits (with exit status 1).

$ ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend

Usage: CoarseGrainedExecutorBackend [options]

 Options are:
   --driver-url <driverUrl>
   --executor-id <executorId>
   --hostname <hostname>
   --cores <cores>
   --app-id <appid>
   --worker-url <workerUrl>
   --user-class-path <url>
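
The validation rules above (five required options, cores greater than 0, a repeatable --user-class-path) can be sketched as a small parser. This is an illustrative model rather than Spark's implementation: ArgsSketch, Args and parse are hypothetical names, and None is returned where the real main would print the usage help and exit with status 1.

```scala
import scala.util.Try

object ArgsSketch {
  final case class Args(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: List[String])

  private val singleValued = Set(
    "--driver-url", "--executor-id", "--hostname", "--cores", "--app-id", "--worker-url")

  // Collects option values; None signals an unrecognized option or a missing value.
  @annotation.tailrec
  private def collect(
      rest: List[String],
      opts: Map[String, String],
      classPath: List[String]): Option[(Map[String, String], List[String])] =
    rest match {
      case Nil => Some((opts, classPath))
      case "--user-class-path" :: url :: tail => collect(tail, opts, classPath :+ url)
      case flag :: value :: tail if singleValued(flag) =>
        collect(tail, opts + (flag -> value), classPath)
      case _ => None
    }

  def parse(argv: List[String]): Option[Args] =
    collect(argv, Map.empty, Nil).flatMap { case (opts, classPath) =>
      for {
        driverUrl <- opts.get("--driver-url")
        executorId <- opts.get("--executor-id")
        hostname <- opts.get("--hostname")
        // --cores must be an integer greater than 0
        cores <- opts.get("--cores").flatMap(s => Try(s.toInt).toOption).filter(_ > 0)
        appId <- opts.get("--app-id")
      } yield Args(driverUrl, executorId, hostname, cores, appId,
        opts.get("--worker-url"), classPath)
    }
}
```

Returning an Option keeps the sketch testable; a real entry point would turn None into the usage message and a non-zero exit status.
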
Note

main is used when:

Running CoarseGrainedExecutorBackend (and Registering Executor RPC Endpoint) — run Internal Method

run(
  driverUrl: String,
  executorId: String,
  hostname: String,
  cores: Int,
  appId: String,
  workerUrl: Option[String],
  userClassPath: scala.Seq[URL]): Unit

When executed, run executes Utils.initDaemon(log).

Caution
FIXME What does initDaemon do?
Note
run runs itself with a Hadoop UserGroupInformation (as a thread local variable distributed to child threads for authenticating HDFS and YARN calls).
Note
run expects a bare hostname, i.e. one that does not include a : (as it would if a port were appended).

run uses spark.executor.port Spark property (or 0 if not set) for the port to create an RpcEnv called driverPropsFetcher (together with the input hostname and clientMode enabled).

run resolves RpcEndpointRef for the input driverUrl and requests SparkAppConfig (by posting a blocking RetrieveSparkAppConfig).

Important
This is the first moment when CoarseGrainedExecutorBackend initiates communication with the driver available at driverUrl through RpcEnv.

run uses SparkAppConfig to get the driver’s sparkProperties and adds spark.app.id Spark property with the value of the input appId.

run creates a SparkConf using the Spark properties fetched from the driver, i.e. with the executor-related Spark settings if they were missing and the rest unconditionally.
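
The merge rule (executor-related settings only if missing, everything else unconditionally) can be sketched over plain maps. This is a simplified model under stated assumptions: the configuration is an immutable Map rather than a SparkConf, and isExecutorStartupConf below is a hypothetical, reduced predicate, not the check Spark actually applies.

```scala
object DriverConfSketch {
  // Assumption: a reduced stand-in for deciding which keys are executor startup settings.
  def isExecutorStartupConf(key: String): Boolean =
    key.startsWith("spark.ssl") || key.startsWith("spark.rpc") || key.startsWith("spark.network")

  def merge(
      local: Map[String, String],
      fromDriver: Seq[(String, String)],
      appId: String): Map[String, String] = {
    // spark.app.id is added to the properties fetched from the driver.
    val props = fromDriver :+ ("spark.app.id" -> appId)
    props.foldLeft(local) { case (conf, (key, value)) =>
      if (isExecutorStartupConf(key) && conf.contains(key)) conf // set only if missing
      else conf + (key -> value) // set unconditionally
    }
  }
}
```

With this rule a locally supplied spark.rpc.* value survives, whereas any other property is overwritten by the driver's value.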

If spark.yarn.credentials.file Spark property is defined in SparkConf, you should see the following INFO message in the logs:

INFO Will periodically update credentials from: [spark.yarn.credentials.file]
Note
run uses SparkHadoopUtil.get to access the current SparkHadoopUtil.

run creates SparkEnv for executors (with the input executorId, hostname and cores, and isLocal disabled).

Important
This is the moment when SparkEnv gets created with all the executor services.

run sets up an RPC endpoint with the name Executor and CoarseGrainedExecutorBackend as the endpoint.

(only in Spark Standalone) If the optional input workerUrl was defined, run sets up an RPC endpoint with the name WorkerWatcher and WorkerWatcher as the endpoint.

Note

The optional input workerUrl is defined only when --worker-url command-line argument was used to launch CoarseGrainedExecutorBackend standalone application.

--worker-url is only used in Spark Standalone.

run blocks the main thread until RpcEnv terminates; in the meantime, only the RPC endpoints process RPC messages.

Once RpcEnv has terminated, run stops the credential updater.

Caution
FIXME Think of the place for Utils.initDaemon, Utils.getProcessName et al.

Creating CoarseGrainedExecutorBackend Instance

CoarseGrainedExecutorBackend takes the following when created:

  1. RpcEnv

  2. driverUrl

  3. executorId

  4. hostname

  5. cores

  6. userClassPath

  7. SparkEnv

Note
driverUrl, executorId, hostname, cores and userClassPath correspond to CoarseGrainedExecutorBackend standalone application’s command-line arguments.

CoarseGrainedExecutorBackend initializes the internal properties.

Note
CoarseGrainedExecutorBackend is created (to act as an RPC endpoint) when Executor RPC endpoint is registered.

Registering with Driver — onStart Method

onStart(): Unit
Note
onStart is part of the RpcEndpoint contract and is executed before an RPC endpoint starts accepting messages.

When executed, you should see the following INFO message in the logs:

INFO CoarseGrainedExecutorBackend: Connecting to driver: [driverUrl]

onStart then resolves the RpcEndpointRef of the driver asynchronously and uses it to initialize the internal driver property. It then immediately sends a blocking RegisterExecutor message (with executorId, RpcEndpointRef to itself, hostname, cores and log URLs).

In case of failures, onStart terminates CoarseGrainedExecutorBackend with the error code 1 and the reason (and no notification to the driver):

Cannot register with driver: [driverUrl]
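
The resolve-then-register flow, including the failure path, can be modelled with plain Scala futures. The sketch assumes a hypothetical DriverRef trait in place of the driver's RpcEndpointRef, an injected resolve function in place of the RpcEnv lookup, and an exit callback in place of terminating the process; log URLs and the self reference are left out for brevity.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object OnStartSketch {
  // Stand-in for the driver's RpcEndpointRef (hypothetical, not Spark's API).
  trait DriverRef {
    def registerExecutor(executorId: String, hostname: String, cores: Int): Future[Boolean]
  }

  def onStart(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      resolve: String => Future[DriverRef], // asynchronous lookup of the driver endpoint
      exit: (Int, String) => Unit // stands in for terminating the backend
  )(implicit ec: ExecutionContext): Future[Option[DriverRef]] = {
    println(s"INFO CoarseGrainedExecutorBackend: Connecting to driver: $driverUrl")
    resolve(driverUrl)
      .flatMap(ref => ref.registerExecutor(executorId, hostname, cores).map(_ => ref))
      .transform {
        case Success(ref) => Success(Option(ref)) // this value would become the driver property
        case Failure(_) =>
          exit(1, s"Cannot register with driver: $driverUrl")
          Success(Option.empty[DriverRef])
      }
  }
}
```

Both a failed lookup and a failed registration end up in the same Failure branch, which is why a single error message covers the two cases.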

Creating Single Managed Executor — RegisteredExecutor Message Handler

RegisteredExecutor
extends CoarseGrainedClusterMessage with RegisterExecutorResponse

When RegisteredExecutor is received, you should see the following INFO in the logs:

INFO CoarseGrainedExecutorBackend: Successfully registered with driver

CoarseGrainedExecutorBackend creates an Executor (with isLocal disabled) that becomes the single managed Executor.

Note
To create the Executor, CoarseGrainedExecutorBackend uses the executorId, hostname, env and userClassPath that were specified when CoarseGrainedExecutorBackend was created.

If creating the Executor fails with a non-fatal exception, RegisteredExecutor terminates CoarseGrainedExecutorBackend with the reason:

Unable to create executor due to [message]
Note
RegisteredExecutor is sent exclusively when CoarseGrainedSchedulerBackend RPC Endpoint receives a RegisterExecutor (that is sent right before CoarseGrainedExecutorBackend RPC Endpoint starts accepting messages which happens when CoarseGrainedExecutorBackend is started).
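
The "create the executor, or exit on a non-fatal exception" step maps naturally onto scala.util.control.NonFatal. In this sketch the executor is created by an injected factory and termination is an injected callback; both are assumptions made to keep the example free of Spark, and the exit code 1 is likewise assumed.

```scala
import scala.util.control.NonFatal

object RegisteredExecutorSketch {
  // create: hypothetical factory for the executor; exit: stands in for terminating the backend.
  def onRegisteredExecutor[E](create: () => E, exit: (Int, String) => Unit): Option[E] = {
    println("INFO CoarseGrainedExecutorBackend: Successfully registered with driver")
    try Some(create())
    catch {
      // Fatal errors (e.g. OutOfMemoryError) are deliberately not caught here.
      case NonFatal(e) =>
        exit(1, "Unable to create executor due to " + e.getMessage) // exit code 1 is an assumption
        None
    }
  }
}
```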

RegisterExecutorFailed

RegisterExecutorFailed(message)

When a RegisterExecutorFailed message arrives, the following ERROR is printed out to the logs:

ERROR CoarseGrainedExecutorBackend: Slave registration failed: [message]

CoarseGrainedExecutorBackend then exits with the exit code 1.

Killing Tasks — KillTask Message Handler

KillTask(taskId, _, interruptThread) message kills a task (calls Executor.killTask).

If an executor has not been initialized yet (FIXME: why?), the following ERROR message is printed out to the logs and CoarseGrainedExecutorBackend exits:

ERROR Received KillTask command but executor was null

StopExecutor Handler

case object StopExecutor
extends CoarseGrainedClusterMessage

When StopExecutor is received, the handler turns the stopping internal flag on. You should see the following INFO message in the logs:

INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown

In the end, the handler sends a Shutdown message to itself.

Note
StopExecutor message is sent when CoarseGrainedSchedulerBackend RPC Endpoint (aka DriverEndpoint) processes StopExecutors or RemoveExecutor messages.

Shutdown Handler

case object Shutdown
extends CoarseGrainedClusterMessage

Shutdown turns the stopping internal flag on and starts the CoarseGrainedExecutorBackend-stop-executor thread that stops the owned Executor (using the executor reference).

Note
Shutdown message is sent exclusively when CoarseGrainedExecutorBackend receives StopExecutor.
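
The StopExecutor and Shutdown handlers together form a two-step hand-off, which the following model illustrates. It is a sketch under assumptions: messages are dispatched by a direct method call rather than through an RPC endpoint, and the executor is reduced to a stopExecutor callback.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ShutdownSketch {
  sealed trait Message
  case object StopExecutor extends Message
  case object Shutdown extends Message

  class Backend(stopExecutor: () => Unit) {
    val stopping = new AtomicBoolean(false)
    @volatile private var stopThread: Option[Thread] = None

    def receive(msg: Message): Unit = msg match {
      case StopExecutor =>
        stopping.set(true)
        println("INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown")
        receive(Shutdown) // stands in for sending Shutdown to itself
      case Shutdown =>
        stopping.set(true)
        // Stop the executor on a separate thread so message handling is not blocked.
        val t = new Thread("CoarseGrainedExecutorBackend-stop-executor") {
          override def run(): Unit = stopExecutor()
        }
        stopThread = Some(t)
        t.start()
    }

    // Test helper (not part of the described behaviour): waits for the stop thread.
    def awaitStopped(): Unit = stopThread.foreach(_.join())
  }
}
```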

Terminating CoarseGrainedExecutorBackend (and Notifying Driver with RemoveExecutor) — exitExecutor Method

exitExecutor(
  code: Int,
  reason: String,
  throwable: Throwable = null,
  notifyDriver: Boolean = true): Unit

When exitExecutor is executed, you should see the following ERROR message in the logs (followed by throwable if available):

ERROR Executor self-exiting due to : [reason]

If notifyDriver is enabled (it is by default), exitExecutor informs the driver that the executor should be removed (by sending a blocking RemoveExecutor message with the executor id and an ExecutorLossReason with the input reason).

You may see the following WARN message in the logs when the notification fails.

Unable to notify the driver due to [message]

In the end, exitExecutor terminates the CoarseGrainedExecutorBackend JVM process with the status code.

Note
exitExecutor uses Java’s System.exit, which initiates the JVM’s shutdown sequence (and executes all registered shutdown hooks).
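
The order of steps in exitExecutor (log, optionally notify the driver, then exit) can be sketched as follows. The names are assumptions for illustration: RemoveExecutor is reduced to an executor id and a reason string, sendToDriver stands in for the blocking message to the driver, and the exit parameter exists only so the sketch can be exercised without terminating the JVM.

```scala
import scala.util.control.NonFatal

object ExitExecutorSketch {
  // Simplified message; the real one wraps the reason in an ExecutorLossReason.
  final case class RemoveExecutor(executorId: String, reason: String)

  def exitExecutor(
      executorId: String,
      sendToDriver: RemoveExecutor => Unit, // stands in for the blocking notification
      code: Int,
      reason: String,
      throwable: Throwable = null,
      notifyDriver: Boolean = true,
      exit: Int => Unit = (c: Int) => sys.exit(c)): Unit = {
    System.err.println(s"ERROR Executor self-exiting due to : $reason")
    if (throwable != null) throwable.printStackTrace()

    if (notifyDriver) {
      // A failed notification must not prevent the process from exiting.
      try sendToDriver(RemoveExecutor(executorId, reason))
      catch {
        case NonFatal(e) =>
          System.err.println(s"WARN Unable to notify the driver due to ${e.getMessage}")
      }
    }
    exit(code)
  }
}
```

Passing notifyDriver = false skips the notification entirely, which is the variant used when the driver could not be reached in the first place.
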
Note

exitExecutor is used when:

onDisconnected Callback

Caution
FIXME

start Method

Caution
FIXME

stop Method

Caution
FIXME

requestTotalExecutors

Caution
FIXME

Extracting Log URLs — extractLogUrls Method

Caution
FIXME
