CoarseGrainedExecutorBackend
CoarseGrainedExecutorBackend is a standalone application that is started in a resource container when:
- Spark Standalone’s StandaloneSchedulerBackend starts
- Spark on YARN’s ExecutorRunnable is started
- Spark on Mesos’s MesosCoarseGrainedSchedulerBackend launches Spark executors
When started, CoarseGrainedExecutorBackend registers the Executor RPC endpoint to communicate with the driver (i.e. with CoarseGrainedScheduler RPC endpoint).
When launched, CoarseGrainedExecutorBackend immediately connects to the owning CoarseGrainedSchedulerBackend to inform it that it is ready to launch tasks.
CoarseGrainedExecutorBackend is an ExecutorBackend that controls the lifecycle of a single executor and sends the executor’s status updates to the driver.
CoarseGrainedExecutorBackend is a ThreadSafeRpcEndpoint that connects to the driver (before accepting messages) and shuts down when the driver disconnects.
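| Message | Description |
|---|---|
| LaunchTask | Forwards launch task requests from the driver to the single managed coarse-grained executor. |
| RegisteredExecutor | Creates the single managed Executor. Sent exclusively when CoarseGrainedSchedulerBackend RPC Endpoint receives a RegisterExecutor. |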
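| Name | Initial Value | Description |
|---|---|---|
| ser | | Initialized when CoarseGrainedExecutorBackend is created. |
| driver | (empty) | RpcEndpointRef of the driver |
| stopping | false | Enabled when StopExecutor or Shutdown is received. |
| executor | (uninitialized) | Single managed coarse-grained Executor managed exclusively by the CoarseGrainedExecutorBackend. Initialized after RegisteredExecutor is received. |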
TIP: Enable logging for the org.apache.spark.executor.CoarseGrainedExecutorBackend logger to see what happens inside.
 Forwarding Launch Task Request to Executor (from Driver) — LaunchTask Message Handler
LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
NOTE: CoarseGrainedExecutorBackend acts as a proxy between the driver and the single managed executor and merely re-packages the LaunchTask payload (as serialized data) to pass it along for execution.
LaunchTask first decodes TaskDescription from data. You should see the following INFO message in the logs:
INFO CoarseGrainedExecutorBackend: Got assigned task [id]
LaunchTask then launches the task on the executor (passing itself as the owning ExecutorBackend and decoded TaskDescription).
If the executor is not available (i.e. it has not been created yet), LaunchTask terminates CoarseGrainedExecutorBackend with the error code 1 and ExecutorLossReason with the following message:
Received LaunchTask command but executor was null
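The two paths of the handler (launch the task, or exit when there is no executor yet) can be sketched as follows. This is a simplified Scala model, assuming hypothetical stand-ins (TaskDescription, Executor, and decode and exitExecutor passed in as functions) rather than Spark's classes; it also omits passing the backend itself to the executor.

```scala
// Hypothetical, simplified model of the LaunchTask handler. TaskDescription, Executor,
// decode and exitExecutor are stand-ins for illustration, not Spark's classes.
object LaunchTaskSketch {
  final case class TaskDescription(taskId: Long, name: String)

  // Stand-in for the single managed executor: it only records the launched task ids
  final class Executor {
    val launched = scala.collection.mutable.ArrayBuffer.empty[Long]
    def launchTask(task: TaskDescription): Unit = launched += task.taskId
  }

  // Returns the log lines it would print, so the behaviour is easy to check
  def handleLaunchTask(
      data: Array[Byte],
      executor: Option[Executor],
      decode: Array[Byte] => TaskDescription,
      exitExecutor: (Int, String) => Unit): Seq[String] =
    executor match {
      case None =>
        // No executor yet: terminate with error code 1
        exitExecutor(1, "Received LaunchTask command but executor was null")
        Seq.empty
      case Some(exec) =>
        val task = decode(data)
        val logLine = s"INFO CoarseGrainedExecutorBackend: Got assigned task ${task.taskId}"
        exec.launchTask(task)
        Seq(logLine)
    }
}
```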
NOTE: LaunchTask is sent when CoarseGrainedSchedulerBackend launches tasks (one LaunchTask per task).
 Sending Task Status Updates to Driver — statusUpdate Method
NOTE: statusUpdate is part of the ExecutorBackend contract to send task status updates to a scheduler (on the driver).
statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
statusUpdate creates a StatusUpdate (with the input taskId, state, and data together with the executor id) and sends it to the driver (if already defined).
When no driver is available, you should see the following WARN message in the logs:
WARN Drop [msg] because has not yet connected to driver
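The send-or-drop decision in statusUpdate can be sketched like this. It is a simplified model, assuming a hypothetical DriverRef trait in place of the driver's RpcEndpointRef and a String task state in place of TaskState.

```scala
import java.nio.ByteBuffer

// Hypothetical model of statusUpdate: send a StatusUpdate to the driver if connected,
// otherwise drop it with a WARN. DriverRef stands in for the driver's RpcEndpointRef.
object StatusUpdateSketch {
  final case class StatusUpdate(executorId: String, taskId: Long, state: String, data: ByteBuffer)

  // One-way "send" only, like a fire-and-forget RPC message
  trait DriverRef { def send(msg: Any): Unit }

  final class Backend(executorId: String, @volatile var driver: Option[DriverRef]) {
    val warnings = scala.collection.mutable.ArrayBuffer.empty[String]

    def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit = {
      val msg = StatusUpdate(executorId, taskId, state, data)
      driver match {
        case Some(ref) => ref.send(msg)
        case None => warnings += s"WARN Drop $msg because has not yet connected to driver"
      }
    }
  }
}
```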
Driver’s URL
The driver’s URL is of the format spark://[RpcEndpoint name]@[hostname]:[port], e.g. spark://CoarseGrainedScheduler@[hostname]:64859.
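To illustrate the format, a hypothetical helper (not part of Spark) could split a driver URL into its three parts with a regular expression. Scala's regex extractor matches the whole string, so anything that does not follow the spark://name@host:port shape yields None.

```scala
// Hypothetical helper (not Spark's API) that splits a driver URL of the form
// spark://[RpcEndpoint name]@[hostname]:[port] into its parts.
object DriverUrlSketch {
  final case class DriverAddress(endpointName: String, host: String, port: Int)

  // At most 5 digits for the port keeps toInt from overflowing
  private val Pattern = """spark://([^@]+)@([^:]+):(\d{1,5})""".r

  def parse(url: String): Option[DriverAddress] = url match {
    case Pattern(name, host, port) => Some(DriverAddress(name, host, port.toInt))
    case _ => None
  }
}
```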
 Launching CoarseGrainedExecutorBackend Standalone Application (in Resource Container) — main Method
CoarseGrainedExecutorBackend is a standalone application (i.e. it comes with a main entry method) that parses command-line arguments and runs CoarseGrainedExecutorBackend’s Executor RPC endpoint to communicate with the driver.
| Argument | Required? | Description |
|---|---|---|
| --driver-url | yes | Driver’s URL. See driver’s URL. |
| --executor-id | yes | Executor id |
| --hostname | yes | Host name |
| --cores | yes | Number of cores (that must be greater than …) |
| --app-id | yes | Application id |
| --worker-url | no | Worker’s URL. NOTE: Used only in Spark Standalone. |
| --user-class-path | no | User-defined class path entry which can be a URL or path to a resource (often a jar file) to be added to CLASSPATH; can be specified multiple times. |
When executed with unrecognized command-line arguments or with required arguments missing, main shows the usage help and exits (with exit status 1).
```
$ ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend
Usage: CoarseGrainedExecutorBackend [options]
 Options are:
   --driver-url <driverUrl>
   --executor-id <executorId>
   --hostname <hostname>
   --cores <cores>
   --app-id <appid>
   --worker-url <workerUrl>
   --user-class-path <url>
```
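The argument handling described above can be sketched in Scala as below. It is a simplified model, not Spark's parser: Left(Usage) stands for "show the usage and exit with status 1", Args is a hypothetical holder, and requiring a positive number of cores is an assumption.

```scala
// Simplified sketch of parsing the command-line options listed above.
// Left(usage) means "show usage and exit with status 1"; Args is a hypothetical holder.
object ArgsSketch {
  final case class Args(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: List[String])

  val Usage: String =
    """Usage: CoarseGrainedExecutorBackend [options]
      | Options are:
      |   --driver-url <driverUrl>
      |   --executor-id <executorId>
      |   --hostname <hostname>
      |   --cores <cores>
      |   --app-id <appid>
      |   --worker-url <workerUrl>
      |   --user-class-path <url>""".stripMargin

  private val SingleValued =
    Set("--driver-url", "--executor-id", "--hostname", "--cores", "--app-id", "--worker-url")

  def parse(argv: List[String]): Either[String, Args] = {
    val single = scala.collection.mutable.Map.empty[String, String]
    val classPath = scala.collection.mutable.ArrayBuffer.empty[String]

    @annotation.tailrec
    def loop(rest: List[String]): Boolean = rest match {
      case "--user-class-path" :: value :: tail =>
        classPath += value // may be given multiple times
        loop(tail)
      case opt :: value :: tail if SingleValued(opt) =>
        single(opt) = value
        loop(tail)
      case Nil => true
      case _ => false // unrecognized option or a missing value
    }

    val recognized = loop(argv)
    // Assumption: cores must be a positive number
    val cores = single.get("--cores").flatMap(c => scala.util.Try(c.toInt).toOption).getOrElse(0)
    val required = Seq("--driver-url", "--executor-id", "--hostname", "--app-id")
    if (!recognized || !required.forall(single.contains) || cores <= 0) Left(Usage)
    else Right(Args(single("--driver-url"), single("--executor-id"), single("--hostname"),
      cores, single("--app-id"), single.get("--worker-url"), classPath.toList))
  }
}
```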
 Running CoarseGrainedExecutorBackend (and Registering Executor RPC Endpoint) — run Internal Method
```scala
run(
  driverUrl: String,
  executorId: String,
  hostname: String,
  cores: Int,
  appId: String,
  workerUrl: Option[String],
  userClassPath: scala.Seq[URL]): Unit
```
When executed, run executes Utils.initDaemon(log).
CAUTION: FIXME What does initDaemon do?
NOTE: run runs itself with a Hadoop UserGroupInformation (as a thread local variable distributed to child threads for authenticating HDFS and YARN calls).
NOTE: run expects a plain hostname with no : included (i.e. without a port suffix).
run uses the spark.executor.port Spark property (or 0 if not set) as the port to create an RpcEnv called driverPropsFetcher (together with the input hostname and clientMode enabled).
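The hostname expectation and the port choice can be sketched as one small function. This assumes the Spark properties are available as a plain Map (Spark reads them from a SparkConf), and fetcherPort is a hypothetical name.

```scala
// Hypothetical sketch of two things run settles before contacting the driver:
// the hostname must not carry a port, and the fetcher's port defaults to 0.
object FetcherPortSketch {
  def fetcherPort(hostname: String, conf: Map[String, String]): Int = {
    require(!hostname.contains(":"), s"Hostname $hostname must not contain a port")
    // spark.executor.port if set, otherwise 0
    conf.get("spark.executor.port").map(_.toInt).getOrElse(0)
  }
}
```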
run resolves RpcEndpointRef for the input driverUrl and requests SparkAppConfig (by posting a blocking RetrieveSparkAppConfig).
IMPORTANT: This is the first moment when CoarseGrainedExecutorBackend initiates communication with the driver available at driverUrl through RpcEnv.
run uses SparkAppConfig to get the driver’s sparkProperties and adds the spark.app.id Spark property with the value of the input appId.
run creates a SparkConf using the Spark properties fetched from the driver: executor-related Spark settings are set only if they are missing, and all the other properties are set unconditionally.
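The "set if missing versus set unconditionally" rule can be illustrated with plain maps. This is a sketch under assumptions: isExecutorStartupConf below is a simplified stand-in that checks only a few illustrative prefixes, and the local map plays the role of settings the executor already has.

```scala
// Hypothetical sketch of merging the driver's Spark properties into the executor's conf.
object DriverConfSketch {
  // Assumption: a simplified stand-in for Spark's "executor-related startup setting" check
  def isExecutorStartupConf(key: String): Boolean =
    Seq("spark.auth", "spark.ssl", "spark.rpc", "spark.network").exists(p => key.startsWith(p))

  def driverConf(
      local: Map[String, String],
      driverProps: Seq[(String, String)],
      appId: String): Map[String, String] = {
    val conf = scala.collection.mutable.Map.empty[String, String]
    conf ++= local
    // The driver's properties plus spark.app.id set to the input appId
    val props = driverProps :+ ("spark.app.id" -> appId)
    for ((key, value) <- props) {
      if (isExecutorStartupConf(key)) {
        if (!conf.contains(key)) conf(key) = value // set only if missing
      } else {
        conf(key) = value // set unconditionally
      }
    }
    conf.toMap
  }
}
```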
If the spark.yarn.credentials.file Spark property is defined in SparkConf, you should see the following INFO message in the logs:
INFO Will periodically update credentials from: [spark.yarn.credentials.file]
NOTE: run uses SparkHadoopUtil.get to access the current SparkHadoopUtil.
run creates SparkEnv for executors (with the input executorId, hostname and cores, and isLocal disabled).
IMPORTANT: This is the moment when SparkEnv gets created with all the executor services.
run sets up an RPC endpoint with the name Executor and CoarseGrainedExecutorBackend as the endpoint.
(only in Spark Standalone) If the optional input workerUrl was defined, run also sets up an RPC endpoint with the name WorkerWatcher and a WorkerWatcher as the endpoint.
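NOTE: The optional input workerUrl is defined only when CoarseGrainedExecutorBackend is launched with the --worker-url command-line argument.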
run's main thread blocks until RpcEnv terminates; meanwhile, only the RPC endpoints process RPC messages.
Once RpcEnv has terminated, run stops the credential updater.
CAUTION: FIXME Think of the place for Utils.initDaemon, Utils.getProcessName et al.
NOTE: run is used exclusively when CoarseGrainedExecutorBackend standalone application is launched.
Creating CoarseGrainedExecutorBackend Instance
CoarseGrainedExecutorBackend takes the following when created:
- RpcEnv
- driverUrl
- executorId
- hostname
- cores
- userClassPath
- SparkEnv (env)
NOTE: driverUrl, executorId, hostname, cores and userClassPath correspond to CoarseGrainedExecutorBackend standalone application’s command-line arguments.
CoarseGrainedExecutorBackend initializes the internal properties.
NOTE: CoarseGrainedExecutorBackend is created (to act as an RPC endpoint) when Executor RPC endpoint is registered.
 Registering with Driver — onStart Method
onStart(): Unit
NOTE: onStart is part of the RpcEndpoint contract and is executed before an RPC endpoint starts accepting messages.
When executed, you should see the following INFO message in the logs:
INFO CoarseGrainedExecutorBackend: Connecting to driver: [driverUrl]
NOTE: driverUrl is given when CoarseGrainedExecutorBackend is created.
onStart then asynchronously resolves the RpcEndpointRef of the driver and initializes the internal driver property with it. Right after that, onStart asks the driver with a RegisterExecutor message (with executorId, the RpcEndpointRef to itself, hostname, cores and log URLs).
In case of failures, onStart terminates CoarseGrainedExecutorBackend with the error code 1 and the reason (and no notification to the driver):
Cannot register with driver: [driverUrl]
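The registration flow maps naturally onto futures. The following sketch assumes hypothetical stand-ins (a DriverRef trait with ask, a resolveDriver function and an injected exitExecutor) for Spark's RPC layer; it shows the order of the steps and the failure path, not Spark's actual code.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical model of onStart's registration flow using plain Scala futures.
object OnStartSketch {
  final case class RegisterExecutor(executorId: String, hostname: String, cores: Int)

  // Stand-in for the driver's RpcEndpointRef: request-reply only
  trait DriverRef { def ask(msg: Any): Future[Boolean] }

  final class Backend(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      resolveDriver: String => Future[DriverRef],
      exitExecutor: (Int, String, Boolean) => Unit)(implicit ec: ExecutionContext) {

    @volatile var driver: Option[DriverRef] = None

    def onStart(): Future[Boolean] = {
      val registered = resolveDriver(driverUrl).flatMap { ref =>
        driver = Some(ref) // initialize the internal driver property
        ref.ask(RegisterExecutor(executorId, hostname, cores))
      }
      registered.onComplete {
        case Success(_) => () // the real work happens when RegisteredExecutor arrives
        case Failure(_) =>
          // error code 1 and no notification to the driver (notifyDriver = false)
          exitExecutor(1, s"Cannot register with driver: $driverUrl", false)
      }
      registered
    }
  }
}
```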
 Creating Single Managed Executor — RegisteredExecutor Message Handler
RegisteredExecutor
extends CoarseGrainedClusterMessage with RegisterExecutorResponse
When RegisteredExecutor is received, you should see the following INFO in the logs:
INFO CoarseGrainedExecutorBackend: Successfully registered with driver
CoarseGrainedExecutorBackend creates an Executor (with isLocal disabled) that becomes the single managed Executor.
NOTE: To create the Executor, CoarseGrainedExecutorBackend uses executorId, hostname, env and userClassPath, which are specified when CoarseGrainedExecutorBackend is created.
If creating the Executor fails with a non-fatal exception, RegisteredExecutor terminates CoarseGrainedExecutorBackend with the reason:
Unable to create executor due to [message]
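A minimal sketch of the handler, assuming a hypothetical createExecutor function and an Option in place of the nullable executor reference. Only non-fatal exceptions (as classified by scala.util.control.NonFatal) are caught; the exit code 1 mirrors the other failure paths in this document.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of the RegisteredExecutor handler: create the single managed
// executor, or exit if creation fails with a non-fatal exception.
object RegisteredExecutorSketch {
  final class Backend[E](createExecutor: () => E, exitExecutor: (Int, String) => Unit) {
    @volatile var executor: Option[E] = None
    val logs = scala.collection.mutable.ArrayBuffer.empty[String]

    def onRegisteredExecutor(): Unit = {
      logs += "INFO CoarseGrainedExecutorBackend: Successfully registered with driver"
      try {
        executor = Some(createExecutor())
      } catch {
        case NonFatal(e) => exitExecutor(1, s"Unable to create executor due to ${e.getMessage}")
      }
    }
  }
}
```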
NOTE: RegisteredExecutor is sent exclusively when CoarseGrainedSchedulerBackend RPC Endpoint receives a RegisterExecutor, which CoarseGrainedExecutorBackend sends in onStart, i.e. right before its RPC endpoint starts accepting messages.
RegisterExecutorFailed
RegisterExecutorFailed(message)
When a RegisterExecutorFailed message arrives, the following ERROR is printed out to the logs:
ERROR CoarseGrainedExecutorBackend: Slave registration failed: [message]
CoarseGrainedExecutorBackend then exits with the exit code 1.
 Killing Tasks — KillTask Message Handler
KillTask(taskId, _, interruptThread) message kills a task (calls Executor.killTask).
If the executor has not been initialized yet (FIXME: why?), the following ERROR message is printed out to the logs and CoarseGrainedExecutorBackend exits:
ERROR Received KillTask command but executor was null
StopExecutor Handler
case object StopExecutor
extends CoarseGrainedClusterMessage
When StopExecutor is received, the handler turns the stopping internal flag on. You should see the following INFO message in the logs:
INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
In the end, the handler sends a Shutdown message to itself.
NOTE: StopExecutor message is sent when CoarseGrainedSchedulerBackend RPC Endpoint (aka DriverEndpoint) processes StopExecutors or RemoveExecutor messages.
Shutdown Handler
case object Shutdown
extends CoarseGrainedClusterMessage
Shutdown turns the stopping internal flag on and starts the CoarseGrainedExecutorBackend-stop-executor thread that stops the owned Executor (using the executor reference).
NOTE: Shutdown message is sent exclusively when CoarseGrainedExecutorBackend receives StopExecutor.
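The interplay between StopExecutor and Shutdown can be sketched as below, assuming a hypothetical StoppableExecutor trait and a synchronous sendToSelf in place of the RPC endpoint sending a message to itself. The executor is stopped on a separate thread, so the handler itself returns right away.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the StopExecutor and Shutdown handlers.
object ShutdownSketch {
  sealed trait Message
  case object StopExecutor extends Message
  case object Shutdown extends Message

  trait StoppableExecutor { def stop(): Unit }

  final class Backend(executor: StoppableExecutor) {
    val stopping = new AtomicBoolean(false)

    // Stand-in for sending a message to this endpoint: handle it right away
    private def sendToSelf(msg: Message): Unit = receive(msg)

    def receive(msg: Message): Unit = msg match {
      case StopExecutor =>
        stopping.set(true)
        println("INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown")
        sendToSelf(Shutdown)
      case Shutdown =>
        stopping.set(true)
        // Stop the executor off the message-handling thread
        new Thread("CoarseGrainedExecutorBackend-stop-executor") {
          override def run(): Unit = executor.stop()
        }.start()
    }
  }
}
```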
 Terminating CoarseGrainedExecutorBackend (and Notifying Driver with RemoveExecutor) — exitExecutor Method
```scala
exitExecutor(
  code: Int,
  reason: String,
  throwable: Throwable = null,
  notifyDriver: Boolean = true): Unit
```
When exitExecutor is executed, you should see the following ERROR message in the logs (followed by throwable if available):
ERROR Executor self-exiting due to : [reason]
If notifyDriver is enabled (it is by default), exitExecutor informs the driver that the executor should be removed (by sending a RemoveExecutor message with the executor id and an ExecutorLossReason with the input reason).
You may see the following WARN message in the logs when the notification fails:
WARN Unable to notify the driver due to [message]
In the end, exitExecutor terminates the CoarseGrainedExecutorBackend JVM process with the status code.
NOTE: exitExecutor uses Java’s System.exit, which initiates the JVM’s shutdown sequence (executing all registered shutdown hooks).
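The steps of exitExecutor can be sketched as follows. The driver notification, logging and process exit are injected functions (hypothetical stand-ins), so the flow can be exercised without calling System.exit; for simplicity, the notification here is a synchronous call whose failure is only logged.

```scala
// Hypothetical sketch of exitExecutor's steps: log the reason, optionally notify the
// driver, then exit. The real method ends with System.exit(code).
object ExitExecutorSketch {
  final case class RemoveExecutor(executorId: String, reason: String)

  def exitExecutor(
      executorId: String,
      askDriver: RemoveExecutor => Unit,
      log: String => Unit,
      exit: Int => Unit)(
      code: Int,
      reason: String,
      throwable: Throwable = null,
      notifyDriver: Boolean = true): Unit = {
    log(s"ERROR Executor self-exiting due to : $reason")
    if (throwable != null) log(throwable.toString) // the throwable follows, if given
    if (notifyDriver) {
      try askDriver(RemoveExecutor(executorId, reason))
      catch {
        case e: Exception => log(s"WARN Unable to notify the driver due to ${e.getMessage}")
      }
    }
    exit(code)
  }
}
```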
 onDisconnected Callback
CAUTION: FIXME
 start Method
CAUTION: FIXME
 stop Method
CAUTION: FIXME
 requestTotalExecutors
CAUTION: FIXME
 Extracting Log URLs — extractLogUrls Method
CAUTION: FIXME