CoarseGrainedExecutorBackend
CoarseGrainedExecutorBackend is a standalone application that is started in a resource container when:
- Spark Standalone’s StandaloneSchedulerBackend starts
- Spark on YARN’s ExecutorRunnable is started
- Spark on Mesos’s MesosCoarseGrainedSchedulerBackend launches Spark executors
When started, CoarseGrainedExecutorBackend registers the Executor RPC endpoint to communicate with the driver (i.e. with the CoarseGrainedScheduler RPC endpoint).
When launched, CoarseGrainedExecutorBackend immediately connects to the owning CoarseGrainedSchedulerBackend to inform it that it is ready to launch tasks.
CoarseGrainedExecutorBackend is an ExecutorBackend that controls the lifecycle of a single executor and sends the executor’s status updates to the driver.
CoarseGrainedExecutorBackend is a ThreadSafeRpcEndpoint that connects to the driver (before accepting messages) and shuts down when the driver disconnects.
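Message | Description |
---|---|
LaunchTask | Forwards launch task requests from the driver to the single managed coarse-grained executor. |
RegisteredExecutor | Creates the single managed Executor. Sent exclusively when the CoarseGrainedSchedulerBackend RPC endpoint receives RegisterExecutor. |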
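Name | Initial Value | Description |
---|---|---|
… | | Initialized when … NOTE: … |
driver | (empty) | RpcEndpointRef of the driver |
stopping | false | Enabled when StopExecutor or Shutdown is received. Used when … |
executor | (uninitialized) | Single managed coarse-grained Executor managed exclusively by the CoarseGrainedExecutorBackend. Initialized after RegisteredExecutor is received; stopped when Shutdown is received. |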
Tip
|
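Enable logging for the org.apache.spark.executor.CoarseGrainedExecutorBackend logger to see what happens inside. Add the following line to …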
|
Forwarding Launch Task Request to Executor (from Driver) — LaunchTask Message Handler
LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
Note
|
CoarseGrainedExecutorBackend acts as a proxy between the driver and the single managed executor and merely re-packages the LaunchTask payload (as serialized data) to pass it along for execution.
|
LaunchTask first decodes the TaskDescription from data. You should see the following INFO message in the logs:
INFO CoarseGrainedExecutorBackend: Got assigned task [id]
LaunchTask then launches the task on the executor, passing itself as the owning ExecutorBackend together with the decoded TaskDescription.
If the executor is not available, LaunchTask terminates CoarseGrainedExecutorBackend with the error code 1 and an ExecutorLossReason with the following message:
Received LaunchTask command but executor was null
Note
|
LaunchTask is sent when CoarseGrainedSchedulerBackend launches tasks (one LaunchTask per task).
|
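The handler logic above can be modeled in a few lines of plain Scala. This is a sketch, not Spark's code. SketchTaskDescription and SketchLaunchExecutor are hypothetical stand-ins, and the serialized payload is assumed to be a simple "taskId:name" string. exitExecutor only records the exit code instead of calling System.exit. The sketch also checks for a missing executor before decoding anything.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's TaskDescription and Executor (not Spark's classes).
final case class SketchTaskDescription(taskId: Long, name: String)

final class SketchLaunchExecutor {
  val launched = ArrayBuffer.empty[SketchTaskDescription]
  // The backend passes itself as the owning ExecutorBackend; this stub ignores it.
  def launchTask(backend: AnyRef, task: SketchTaskDescription): Unit = launched += task
}

final class LaunchTaskSketch(executor: Option[SketchLaunchExecutor]) {
  val log = ArrayBuffer.empty[String]
  var exitCode: Option[Int] = None

  // Stub for exitExecutor: records the exit instead of terminating the JVM.
  private def exitExecutor(code: Int, reason: String): Unit = {
    log += s"ERROR Executor self-exiting due to : $reason"
    exitCode = Some(code)
  }

  // Hypothetical decoder: the serialized payload is modeled as "taskId:name".
  private def decode(data: String): SketchTaskDescription = {
    val parts = data.split(":", 2)
    SketchTaskDescription(parts(0).toLong, parts(1))
  }

  def onLaunchTask(data: String): Unit = executor match {
    case None =>
      exitExecutor(1, "Received LaunchTask command but executor was null")
    case Some(exec) =>
      val task = decode(data)
      log += s"INFO CoarseGrainedExecutorBackend: Got assigned task ${task.taskId}"
      exec.launchTask(this, task)
  }
}
```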
Sending Task Status Updates to Driver — statusUpdate Method
Note
|
statusUpdate is part of the ExecutorBackend contract to send task status updates to a scheduler (on the driver).
|
statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
statusUpdate creates a StatusUpdate (with the input taskId, state, and data together with the executor id) and sends it to the driver (if already defined).
When no driver is available, you should see the following WARN message in the logs:
WARN Drop [msg] because has not yet connected to driver
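Here is a minimal sketch of statusUpdate that uses plain Scala stand-ins for Spark's types. SketchStatusUpdate and SketchDriverRef are hypothetical, and the task state is a plain String rather than TaskState. The driver reference is an Option that stays None until registration sets it. That None case is what produces the WARN message.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical message and endpoint-reference types (not Spark's classes).
final case class SketchStatusUpdate(executorId: String, taskId: Long, state: String, data: ByteBuffer)

trait SketchDriverRef { def send(msg: Any): Unit }

final class StatusUpdateSketch(executorId: String) {
  // None until the backend has connected to the driver.
  @volatile var driver: Option[SketchDriverRef] = None
  val warnings = ArrayBuffer.empty[String]

  def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit = {
    val msg = SketchStatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(ref) => ref.send(msg) // one-way send; no reply is awaited
      case None => warnings += s"Drop $msg because has not yet connected to driver"
    }
  }
}
```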
Driver’s URL
The driver’s URL is of the format spark://[RpcEndpoint name]@[hostname]:[port], e.g. spark://CoarseGrainedScheduler@[hostname]:64859, where CoarseGrainedScheduler is the name of the driver’s RPC endpoint.
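To make the URL format concrete, here is a hypothetical helper that splits a driver URL into endpoint name, host and port. DriverUrlSketch is not part of Spark. It uses java.net.URI and treats the RpcEndpoint name as the URI's user-info part.

```scala
import java.net.URI
import scala.util.Try

// Hypothetical result type: the three parts of spark://[name]@[hostname]:[port].
final case class DriverUrlParts(endpointName: String, host: String, port: Int)

object DriverUrlSketch {
  def parse(url: String): Either[String, DriverUrlParts] =
    Try(new URI(url)).toOption match {
      // The endpoint name sits where a URI normally carries user info.
      case Some(uri) if uri.getScheme == "spark" && uri.getUserInfo != null &&
          uri.getHost != null && uri.getPort >= 0 =>
        Right(DriverUrlParts(uri.getUserInfo, uri.getHost, uri.getPort))
      case _ => Left(s"Invalid Spark URL: $url")
    }
}
```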
Launching CoarseGrainedExecutorBackend Standalone Application (in Resource Container) — main Method
CoarseGrainedExecutorBackend is a standalone application (i.e. it comes with a main entry method) that parses command-line arguments and runs CoarseGrainedExecutorBackend’s Executor RPC endpoint to communicate with the driver.
Argument | Required? | Description |
---|---|---|
--driver-url | yes | Driver’s URL. See driver’s URL |
--executor-id | yes | Executor id |
--hostname | yes | Host name |
--cores | yes | Number of cores (that must be greater than …) |
--app-id | yes | Application id |
--worker-url | no | Worker’s URL. NOTE: Only used in Spark Standalone. |
--user-class-path | no | User-defined class path entry, which can be a URL or a path to a resource (often a jar file), to be added to CLASSPATH; can be specified multiple times. |
When executed with unrecognized command-line arguments, or when required arguments are missing, main shows the usage help and exits (with exit status 1).
$ ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend
Usage: CoarseGrainedExecutorBackend [options]
Options are:
--driver-url <driverUrl>
--executor-id <executorId>
--hostname <hostname>
--cores <cores>
--app-id <appid>
--worker-url <workerUrl>
--user-class-path <url>
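The command-line handling can be sketched as a small recursive parser over the argument list. BackendArgs and BackendArgsSketch are hypothetical names. The sketch returns None where main would print the usage text and exit with status 1. It does not validate the cores value beyond parsing it as an integer.

```scala
import scala.annotation.tailrec
import scala.util.Try

// Hypothetical container for the parsed options; field names follow the usage text.
final case class BackendArgs(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[String])

object BackendArgsSketch {
  private val singleValued =
    Set("--driver-url", "--executor-id", "--hostname", "--cores", "--app-id", "--worker-url")

  // None stands for "print usage and exit with status 1" in the real main method.
  def parse(args: List[String]): Option[BackendArgs] = {
    @tailrec
    def loop(
        rest: List[String],
        opts: Map[String, String],
        classPath: Vector[String]): Option[(Map[String, String], Vector[String])] =
      rest match {
        case Nil => Some((opts, classPath))
        // --user-class-path may be repeated; every occurrence is kept in order.
        case "--user-class-path" :: value :: tail => loop(tail, opts, classPath :+ value)
        case opt :: value :: tail if singleValued(opt) => loop(tail, opts + (opt -> value), classPath)
        case _ => None // unrecognized option or an option without a value
      }

    loop(args, Map.empty, Vector.empty).flatMap { case (opts, classPath) =>
      for {
        driverUrl <- opts.get("--driver-url")
        executorId <- opts.get("--executor-id")
        hostname <- opts.get("--hostname")
        cores <- opts.get("--cores").flatMap(c => Try(c.toInt).toOption)
        appId <- opts.get("--app-id")
      } yield BackendArgs(driverUrl, executorId, hostname, cores, appId, opts.get("--worker-url"), classPath)
    }
  }
}
```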
Running CoarseGrainedExecutorBackend (and Registering Executor RPC Endpoint) — run Internal Method
run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: scala.Seq[URL]): Unit
When executed, run executes Utils.initDaemon(log).
Caution
|
FIXME What does initDaemon do?
|
Note
|
run runs itself with a Hadoop UserGroupInformation (as a thread local variable distributed to child threads for authenticating HDFS and YARN calls).
|
Note
|
run expects a plain hostname without a : (i.e. no port attached).
|
run uses the spark.executor.port Spark property (or 0 if not set) as the port to create an RpcEnv called driverPropsFetcher (together with the input hostname and clientMode enabled).
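Two small preconditions of run can be sketched as hypothetical helpers in RunPreludeSketch (not Spark's API). The first is the hostname check, which rejects anything containing a ':'. The second is the port lookup with its default of 0. Spark properties are modeled as a plain Map rather than a SparkConf.

```scala
// Hypothetical helpers (not Spark's API) for two steps at the start of run.
object RunPreludeSketch {
  // A hostname must not carry a port, so any ':' is rejected.
  def checkHost(hostname: String): Unit =
    require(!hostname.contains(":"), s"Expected hostname (not host:port) but got $hostname")

  // spark.executor.port if set, otherwise 0.
  def executorPort(conf: Map[String, String]): Int =
    conf.get("spark.executor.port").map(_.toInt).getOrElse(0)
}
```

A port of 0 conventionally asks the operating system to pick any free port when a socket is bound.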
run resolves the RpcEndpointRef for the input driverUrl and requests SparkAppConfig (by posting a blocking RetrieveSparkAppConfig).
Important
|
This is the first moment when CoarseGrainedExecutorBackend initiates communication with the driver available at driverUrl through RpcEnv.
|
run uses SparkAppConfig to get the driver’s sparkProperties and adds the spark.app.id Spark property with the value of the input appId.
run creates a SparkConf using the Spark properties fetched from the driver: executor-related Spark settings are set only if they are missing, and the rest are set unconditionally.
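The merge rule for the driver's properties can be sketched over plain maps. DriverConfSketch is hypothetical. Which keys count as executor-related is passed in as a predicate because this section does not list them. Following the text, spark.app.id is appended to the fetched properties before merging.

```scala
// Hypothetical sketch (not Spark's API): builds the executor's configuration from
// its local settings and the properties fetched from the driver.
object DriverConfSketch {
  def merge(
      local: Map[String, String],
      fromDriver: Seq[(String, String)],
      appId: String,
      isExecutorStartupConf: String => Boolean): Map[String, String] = {
    val props = fromDriver :+ ("spark.app.id" -> appId)
    props.foldLeft(local) { case (conf, (key, value)) =>
      // Executor-related keys are only set if missing; everything else overwrites.
      if (isExecutorStartupConf(key) && conf.contains(key)) conf
      else conf + (key -> value)
    }
  }
}
```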
If the spark.yarn.credentials.file Spark property is defined in SparkConf, you should see the following INFO message in the logs:
INFO Will periodically update credentials from: [spark.yarn.credentials.file]
Note
|
run uses SparkHadoopUtil.get to access the current SparkHadoopUtil.
|
run creates SparkEnv for executors (with the input executorId, hostname and cores, and isLocal disabled).
Important
|
This is the moment when SparkEnv gets created with all the executor services.
|
run sets up an RPC endpoint with the name Executor and CoarseGrainedExecutorBackend as the endpoint.
(only in Spark Standalone) If the optional input workerUrl was defined, run sets up an RPC endpoint with the name WorkerWatcher and a WorkerWatcher as the endpoint.
Note
|
The optional input workerUrl corresponds to the --worker-url command-line argument.
|
run's main thread is blocked until RpcEnv terminates, and only the RPC endpoints process RPC messages.
Once RpcEnv has terminated, run stops the credential updater.
Caution
|
FIXME Think of the place for Utils.initDaemon, Utils.getProcessName et al.
|
Note
|
run is used exclusively when CoarseGrainedExecutorBackend standalone application is launched.
|
Creating CoarseGrainedExecutorBackend Instance
CoarseGrainedExecutorBackend takes the following when created:
- RpcEnv
- driverUrl
- executorId
- hostname
- cores
- userClassPath
- SparkEnv (env)
Note
|
driverUrl, executorId, hostname, cores and userClassPath correspond to the CoarseGrainedExecutorBackend standalone application’s command-line arguments.
|
CoarseGrainedExecutorBackend initializes the internal properties.
Note
|
CoarseGrainedExecutorBackend is created (to act as an RPC endpoint) when Executor RPC endpoint is registered.
|
Registering with Driver — onStart Method
onStart(): Unit
Note
|
onStart is part of the RpcEndpoint contract and is executed before an RPC endpoint starts accepting messages.
|
When executed, you should see the following INFO message in the logs:
INFO CoarseGrainedExecutorBackend: Connecting to driver: [driverUrl]
Note
|
driverUrl is given when CoarseGrainedExecutorBackend is created.
|
onStart then resolves the RpcEndpointRef of the driver asynchronously and initializes the internal driver property. Right after that, onStart sends a blocking RegisterExecutor message (with the executorId, an RpcEndpointRef to itself, hostname, cores and log URLs).
In case of failure, onStart terminates CoarseGrainedExecutorBackend with the error code 1 and the following reason (without notifying the driver):
Cannot register with driver: [driverUrl]
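The registration flow in onStart can be sketched with Scala Futures. OnStartSketch is hypothetical. Resolving the driver and asking it to register are modeled as caller-supplied functions that return Futures, and the driver reference is reduced to a String. exitExecutor completes a Promise instead of exiting the JVM. The failure path mirrors the text above: exit code 1, the "Cannot register with driver" reason, and no notification to the driver.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical model of onStart (not Spark's API). The driver reference is reduced to a
// String; resolveDriver and register stand in for the RPC calls.
final class OnStartSketch(
    driverUrl: String,
    resolveDriver: String => Future[String],
    register: String => Future[Boolean])(implicit ec: ExecutionContext) {

  @volatile var driver: Option[String] = None
  // Completed with (code, reason, notifyDriver) instead of terminating the JVM.
  private val exited = Promise[(Int, String, Boolean)]()

  private def exitExecutor(code: Int, reason: String, notifyDriver: Boolean): Unit =
    exited.trySuccess((code, reason, notifyDriver))

  def onStart(): Future[Boolean] = {
    println(s"INFO CoarseGrainedExecutorBackend: Connecting to driver: $driverUrl")
    val registered = resolveDriver(driverUrl).flatMap { ref =>
      driver = Some(ref) // initialize the driver property before registering
      register(ref)      // stands in for the RegisterExecutor request
    }
    registered.onComplete {
      case Success(_) => () // nothing else to do in this sketch
      case Failure(_) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", notifyDriver = false)
    }
    registered
  }

  def awaitExit(timeout: FiniteDuration): (Int, String, Boolean) =
    Await.result(exited.future, timeout)
}
```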
Creating Single Managed Executor — RegisteredExecutor Message Handler
RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
When RegisteredExecutor is received, you should see the following INFO message in the logs:
INFO CoarseGrainedExecutorBackend: Successfully registered with driver
CoarseGrainedExecutorBackend creates an Executor (with isLocal disabled) that becomes the single managed Executor.
Note
|
To create the Executor, CoarseGrainedExecutorBackend uses executorId, hostname, env and userClassPath, all of which are specified when CoarseGrainedExecutorBackend is created.
|
If creating the Executor fails with a non-fatal exception, RegisteredExecutor terminates CoarseGrainedExecutorBackend with the reason:
Unable to create executor due to [message]
Note
|
RegisteredExecutor is sent exclusively when the CoarseGrainedSchedulerBackend RPC endpoint receives a RegisterExecutor message. That message is sent right before the CoarseGrainedExecutorBackend RPC endpoint starts accepting messages, i.e. when CoarseGrainedExecutorBackend is started.
|
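The non-fatal failure handling on RegisteredExecutor can be sketched as follows. RegisteredSketch is hypothetical, and the executor is reduced to whatever the supplied factory returns. scala.util.control.NonFatal is the standard Scala way to catch only non-fatal throwables. It does not match, for example, OutOfMemoryError or InterruptedException.

```scala
import scala.util.control.NonFatal

// Hypothetical model (not Spark's API): on RegisteredExecutor the backend creates its
// single executor; a non-fatal exception during creation terminates the backend.
final class RegisteredSketch(createExecutor: () => AnyRef) {
  @volatile var executor: Option[AnyRef] = None
  var exitCode: Option[Int] = None
  var exitReason: Option[String] = None

  // Stub for exitExecutor: records the exit instead of calling System.exit.
  private def exitExecutor(code: Int, reason: String): Unit = {
    exitCode = Some(code)
    exitReason = Some(reason)
  }

  def onRegisteredExecutor(): Unit =
    try executor = Some(createExecutor())
    catch {
      case NonFatal(e) => exitExecutor(1, s"Unable to create executor due to ${e.getMessage}")
    }
}
```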
RegisterExecutorFailed
RegisterExecutorFailed(message)
When a RegisterExecutorFailed message arrives, the following ERROR is printed out to the logs:
ERROR CoarseGrainedExecutorBackend: Slave registration failed: [message]
CoarseGrainedExecutorBackend then exits with the exit code 1.
Killing Tasks — KillTask Message Handler
A KillTask(taskId, _, interruptThread) message kills a task (by calling Executor.killTask).
If the executor has not been initialized yet (FIXME: why?), the following ERROR message is printed out to the logs and CoarseGrainedExecutorBackend exits:
ERROR Received KillTask command but executor was null
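KillTask follows the same shape as LaunchTask. It forwards to the executor if there is one, and otherwise logs an error and exits. In this hypothetical sketch (KillTaskSketch, not Spark's code), the executor's killTask is reduced to a function. The exit code 1 is an assumption, since the text above only says that CoarseGrainedExecutorBackend exits.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model (not Spark's API): the executor's killTask is reduced to a function
// of (taskId, interruptThread).
final class KillTaskSketch(killTask: Option[(Long, Boolean) => Unit]) {
  val errors = ArrayBuffer.empty[String]
  var exitCode: Option[Int] = None

  def onKillTask(taskId: Long, interruptThread: Boolean): Unit = killTask match {
    case Some(kill) => kill(taskId, interruptThread)
    case None =>
      errors += "ERROR Received KillTask command but executor was null"
      exitCode = Some(1) // assumed exit code; stands in for terminating the process
  }
}
```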
StopExecutor Handler
case object StopExecutor extends CoarseGrainedClusterMessage
When StopExecutor is received, the handler turns the stopping internal flag on. You should see the following INFO message in the logs:
INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
In the end, the handler sends a Shutdown message to itself.
Note
|
The StopExecutor message is sent when the CoarseGrainedSchedulerBackend RPC endpoint (aka DriverEndpoint) processes StopExecutors or RemoveExecutor messages.
|
Shutdown Handler
case object Shutdown extends CoarseGrainedClusterMessage
Shutdown turns the stopping internal flag on and starts the CoarseGrainedExecutorBackend-stop-executor thread, which stops the owned Executor (using the executor reference).
Note
|
The Shutdown message is sent exclusively when CoarseGrainedExecutorBackend receives StopExecutor.
|
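The StopExecutor and Shutdown handlers can be sketched together. ShutdownSketch and its message objects are hypothetical. Sending Shutdown to itself is modeled as a direct call. The executor's stop is a caller-supplied function run on a thread named CoarseGrainedExecutorBackend-stop-executor.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical message types (not Spark's classes).
sealed trait SketchStopMsg
case object SketchStopExecutor extends SketchStopMsg
case object SketchShutdown extends SketchStopMsg

final class ShutdownSketch(stopExecutor: () => Unit) {
  val stopping = new AtomicBoolean(false)
  @volatile var stopThread: Option[Thread] = None

  def receive(msg: SketchStopMsg): Unit = msg match {
    case SketchStopExecutor =>
      stopping.set(true)
      println("INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown")
      receive(SketchShutdown) // models sending Shutdown to itself
    case SketchShutdown =>
      stopping.set(true)
      // Stop the executor on a dedicated thread rather than the calling thread.
      val t = new Thread("CoarseGrainedExecutorBackend-stop-executor") {
        override def run(): Unit = stopExecutor()
      }
      stopThread = Some(t)
      t.start()
  }
}
```

Stopping the executor on a dedicated thread keeps the message-handling thread free. A plausible motivation is to avoid a deadlock if stopping the executor waits for the RPC environment, which in turn waits for its message-handling threads.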
Terminating CoarseGrainedExecutorBackend (and Notifying Driver with RemoveExecutor) — exitExecutor Method
exitExecutor(
code: Int,
reason: String,
throwable: Throwable = null,
notifyDriver: Boolean = true): Unit
When exitExecutor is executed, you should see the following ERROR message in the logs (followed by throwable if available):
ERROR Executor self-exiting due to : [reason]
If notifyDriver is enabled (it is by default), exitExecutor informs the driver that the executor should be removed (by sending a blocking RemoveExecutor message with the executor id and an ExecutorLossReason with the input reason).
You may see the following WARN message in the logs when the notification fails:
WARN Unable to notify the driver due to [message]
In the end, exitExecutor terminates the CoarseGrainedExecutorBackend JVM process with the input code as the exit status.
Note
|
exitExecutor uses Java’s System.exit, which initiates the JVM’s shutdown sequence (executing all registered shutdown hooks).
|
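This sketch of exitExecutor injects its side effects so the logic can be exercised without killing the JVM. ExitSketch is hypothetical. The driver is modeled as an optional function that stands in for the blocking RemoveExecutor request, and exit stands in for System.exit.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical model of exitExecutor (not Spark's API). removeExecutor stands in for the
// blocking RemoveExecutor request; exit stands in for System.exit.
final class ExitSketch(
    executorId: String,
    driver: Option[(String, String) => Unit],
    exit: Int => Unit) {

  val log = ArrayBuffer.empty[String]

  def exitExecutor(
      code: Int,
      reason: String,
      throwable: Throwable = null,
      notifyDriver: Boolean = true): Unit = {
    log += s"ERROR Executor self-exiting due to : $reason" +
      Option(throwable).map(t => s" ($t)").getOrElse("")
    if (notifyDriver) {
      driver.foreach { removeExecutor =>
        try removeExecutor(executorId, reason)
        catch {
          case NonFatal(e) => log += s"WARN Unable to notify the driver due to ${e.getMessage}"
        }
      }
    }
    exit(code) // the real backend calls System.exit(code) here
  }
}
```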
onDisconnected Callback
Caution
|
FIXME |
start Method
Caution
|
FIXME |
stop Method
Caution
|
FIXME |
requestTotalExecutors
Caution
|
FIXME |
Extracting Log URLs — extractLogUrls Method
Caution
|
FIXME |