ExecutorRunnable

ExecutorRunnable starts a YARN container that runs the CoarseGrainedExecutorBackend standalone application.

Figure 1. ExecutorRunnable and YarnAllocator in YARN Resource Container

Table 1. ExecutorRunnable’s Internal Properties

Name      Description
rpc       YarnRPC for…FIXME
nmClient  NMClient for…FIXME


Note
Despite the name, ExecutorRunnable is no longer a java.lang.Runnable (since SPARK-12447).
Tip

Enable INFO or DEBUG logging level for org.apache.spark.deploy.yarn.ExecutorRunnable logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.yarn.ExecutorRunnable=DEBUG

Refer to Logging.

Creating ExecutorRunnable Instance

ExecutorRunnable takes the following when created:

  1. YARN Container to run a Spark executor in

  2. YarnConfiguration

  3. sparkConf — SparkConf

  4. masterAddress

  5. executorId

  6. hostname of the YARN container

  7. executorMemory

  8. executorCores

  9. appId

  10. SecurityManager

  11. localResources — Map[String, LocalResource]

ExecutorRunnable initializes the internal registries and counters.

Note
The executorMemory and executorCores input arguments come from YarnAllocator, but their values are really the spark.executor.memory and spark.executor.cores properties.
Note
Most of the input parameters are exactly the ones YarnAllocator was created with.

Building Command to Run CoarseGrainedExecutorBackend in YARN Container — prepareCommand Internal Method

prepareCommand(
  masterAddress: String,
  slaveId: String,
  hostname: String,
  executorMemory: Int,
  executorCores: Int,
  appId: String): List[String]

prepareCommand builds the command that starts the org.apache.spark.executor.CoarseGrainedExecutorBackend application in a YARN container. The input parameters end up on that command line: executorMemory as the -Xmx JVM option and the others as command-line arguments of CoarseGrainedExecutorBackend.

Note
JVM options are defined using -Dkey=value format.

prepareCommand builds -Xmx JVM option using executorMemory (in MB).

Note
prepareCommand uses executorMemory that is given when ExecutorRunnable is created.

prepareCommand adds the optional spark.executor.extraJavaOptions property to the JVM options (if defined).

prepareCommand adds the optional SPARK_JAVA_OPTS environment variable to the JVM options (if defined).

prepareCommand adds the optional spark.executor.extraLibraryPath to the library path (changing the path to be YARN NodeManager-aware).

prepareCommand adds -Djava.io.tmpdir=<LOG_DIR>./tmp to the JVM options.

prepareCommand adds all the Spark properties for executors to the JVM options.

Note
prepareCommand uses SparkConf that is given when ExecutorRunnable is created.

prepareCommand adds -Dspark.yarn.app.container.log.dir=<LOG_DIR> to the JVM options.

prepareCommand adds -XX:MaxPermSize=256m unless it is already defined, or an IBM JVM or Java 8 is used.

prepareCommand reads the list of URIs representing the user classpath and adds --user-class-path and file:[path] for every entry.

prepareCommand adds -XX:OnOutOfMemoryError to the JVM options unless already defined.
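A subset of the JVM-option steps above can be pictured with a minimal, self-contained sketch. It is not Spark's implementation: JvmOptionsSketch, buildJavaOpts and the isIbmJvm / isJava8 flags are hypothetical names, plain Maps stand in for SparkConf and the process environment, extra options are split naively on whitespace, and the value given to -XX:OnOutOfMemoryError is an assumption.

```scala
object JvmOptionsSketch {
  // Hypothetical helper: `conf` stands in for SparkConf, `env` for the process environment.
  def buildJavaOpts(
      executorMemoryMb: Int,
      conf: Map[String, String],
      env: Map[String, String],
      isIbmJvm: Boolean,
      isJava8: Boolean): List[String] = {
    // Naive whitespace split; quoted options that contain spaces are not handled.
    def split(opts: String): List[String] =
      opts.trim.split("\\s+").filter(_.nonEmpty).toList

    // -Xmx from the executor memory (in MB)
    val xmx = List(s"-Xmx${executorMemoryMb}m")
    // Optional extra options from the Spark property and the environment variable
    val extra = conf.get("spark.executor.extraJavaOptions").toList.flatMap(split)
    val legacy = env.get("SPARK_JAVA_OPTS").toList.flatMap(split)
    val logDir = List("-Dspark.yarn.app.container.log.dir=<LOG_DIR>")
    val soFar = xmx ++ extra ++ legacy ++ logDir

    // MaxPermSize only if not already defined and neither an IBM JVM nor Java 8
    val permSize =
      if (isIbmJvm || isJava8 || soFar.exists(_.startsWith("-XX:MaxPermSize"))) Nil
      else List("-XX:MaxPermSize=256m")

    // OnOutOfMemoryError only if not already defined (the value is an assumption)
    val onOom =
      if (soFar.exists(_.contains("-XX:OnOutOfMemoryError"))) Nil
      else List("-XX:OnOutOfMemoryError='kill %p'")

    soFar ++ permSize ++ onOom
  }
}
```

The "unless already defined" checks look only at the options collected so far, which is why the user-supplied options are gathered before the defaults are considered.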

In the end, prepareCommand combines the parts together to build the entire command with the following (in order):

  1. Extra library path

  2. JAVA_HOME/bin/java

  3. -server

  4. JVM options

  5. org.apache.spark.executor.CoarseGrainedExecutorBackend

  6. --driver-url followed by masterAddress

  7. --executor-id followed by executorId

  8. --hostname followed by hostname

  9. --cores followed by executorCores

  10. --app-id followed by appId

  11. --user-class-path with the arguments

  12. 1><LOG_DIR>/stdout

  13. 2><LOG_DIR>/stderr
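
The ordering listed above can be sketched as a plain list concatenation. This is an illustration only: ExecutorCommandSketch and assemble are hypothetical names, and JAVA_HOME/bin/java and <LOG_DIR> are kept as literal placeholders exactly as they appear in the list rather than being expanded.

```scala
object ExecutorCommandSketch {
  // Hypothetical helper that concatenates the command parts in the documented order.
  def assemble(
      libraryPathPrefix: Option[String],
      javaOpts: Seq[String],
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      userClassPath: Seq[String]): List[String] = {
    // One "--user-class-path file:<path>" pair per user class path entry
    val userCp = userClassPath.flatMap(p => Seq("--user-class-path", s"file:$p"))

    (libraryPathPrefix.toList ++
      Seq("JAVA_HOME/bin/java", "-server") ++
      javaOpts ++
      Seq(
        "org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", driverUrl,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", cores.toString,
        "--app-id", appId) ++
      userCp ++
      Seq("1><LOG_DIR>/stdout", "2><LOG_DIR>/stderr")).toList
  }
}
```

Joining the resulting list with spaces gives a single shell-like command line, with the stdout and stderr redirections last.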

Note
prepareCommand uses the arguments for --driver-url, --executor-id, --hostname, --cores and --app-id as given when ExecutorRunnable is created.
Note
You can see the result of prepareCommand as command in the INFO message that is printed to the logs when ApplicationMaster registers itself with YARN ResourceManager. It is printed only once to avoid flooding the logs when starting Spark executors.
Note
prepareCommand is used when ExecutorRunnable starts CoarseGrainedExecutorBackend in a YARN resource container and (only for debugging purposes) when ExecutorRunnable builds launch context diagnostic information (to print it out as an INFO message to the logs).

Collecting Environment Variables for CoarseGrainedExecutorBackend Containers — prepareEnvironment Internal Method

prepareEnvironment(): HashMap[String, String]

prepareEnvironment collects environment-related entries.

prepareEnvironment populates the class path (passing in YarnConfiguration, SparkConf, and the spark.executor.extraClassPath property).

Caution
FIXME How does populateClasspath use the input env?

prepareEnvironment collects the executor environment variables set on the current SparkConf, i.e. the Spark properties with the prefix spark.executorEnv., and adds each of them to the environment using YarnSparkHadoopUtil.addPathToEnvironment(env, key, value).
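
A minimal sketch of the prefix-based collection, using a plain Map in place of SparkConf. ExecutorEnvSketch, executorEnv and appendToEnv are hypothetical names; in particular, appending to an existing value with a separator is an assumption about what a path-style helper would do, not a description of YarnSparkHadoopUtil.

```scala
object ExecutorEnvSketch {
  private val Prefix = "spark.executorEnv."

  // Keeps only the spark.executorEnv.* properties and strips the prefix,
  // so "spark.executorEnv.FOO" -> "bar" becomes "FOO" -> "bar".
  def executorEnv(conf: Map[String, String]): Map[String, String] =
    conf.collect { case (k, v) if k.startsWith(Prefix) => k.stripPrefix(Prefix) -> v }

  // Assumption: an existing value is treated as a path and the new value is appended to it.
  def appendToEnv(
      env: Map[String, String],
      key: String,
      value: String,
      sep: String): Map[String, String] =
    env.updated(key, env.get(key).map(_ + sep + value).getOrElse(value))
}
```

With this shape, setting spark.executorEnv.FOO=bar on the configuration would surface as FOO=bar in the container environment.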

Note
SPARK_YARN_USER_ENV is deprecated.

prepareEnvironment reads YARN’s yarn.http.policy property (falling back to YarnConfiguration.YARN_HTTP_POLICY_DEFAULT) and uses the secure https:// scheme for container log URLs when the policy is HTTPS_ONLY, and http:// otherwise.

With the input container defined and SPARK_USER environment variable available, prepareEnvironment registers SPARK_LOG_URL_STDERR and SPARK_LOG_URL_STDOUT environment entries with stderr?start=-4096 and stdout?start=-4096 added to [httpScheme][address]/node/containerlogs/[containerId]/[user], respectively.
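
The URL layout described above can be sketched as follows. LogUrlSketch and its methods are hypothetical names; falling back to http:// for any policy other than HTTPS_ONLY and joining the stderr/stdout suffix with a slash are assumptions made for the illustration.

```scala
object LogUrlSketch {
  // Assumption: anything other than HTTPS_ONLY maps to plain HTTP.
  def httpScheme(yarnHttpPolicy: String): String =
    if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"

  // Builds the two log URL entries from the NodeManager HTTP address,
  // the container id and the user name.
  def logUrls(
      policy: String,
      address: String,
      containerId: String,
      user: String): Map[String, String] = {
    val base = s"${httpScheme(policy)}$address/node/containerlogs/$containerId/$user"
    Map(
      "SPARK_LOG_URL_STDERR" -> s"$base/stderr?start=-4096",
      "SPARK_LOG_URL_STDOUT" -> s"$base/stdout?start=-4096")
  }
}
```

The start=-4096 query parameter is taken verbatim from the text above.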

In the end, prepareEnvironment collects all the System environment variables with SPARK prefix.
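
As a small illustration of that last step, filtering an environment by the SPARK prefix could look like the sketch below. SparkEnvSketch and sparkPrefixed are hypothetical names, and the environment is passed in as a Map so the function stays easy to test.

```scala
object SparkEnvSketch {
  // Keeps only the variables whose name starts with "SPARK".
  def sparkPrefixed(env: Map[String, String]): Map[String, String] =
    env.filter { case (name, _) => name.startsWith("SPARK") }
}
```

Calling SparkEnvSketch.sparkPrefixed(sys.env) would apply the same filter to the environment of the current JVM.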

Note
prepareEnvironment is used when ExecutorRunnable starts CoarseGrainedExecutorBackend in a container and (for debugging purposes) builds launch context diagnostic information (to print it out as an INFO message to the logs).

Starting ExecutorRunnable (with CoarseGrainedExecutorBackend) — run Method

run(): Unit

When run is called, you should see the following DEBUG message in the logs:

DEBUG ExecutorRunnable: Starting Executor Container

run creates a YARN NMClient (to communicate with the YARN NodeManager service), initializes it with YarnConfiguration, and starts it.

Note
run uses YarnConfiguration that was given when ExecutorRunnable was created.

Starting YARN Resource Container — startContainer Method

startContainer(): java.util.Map[String, ByteBuffer]

startContainer uses YARN NodeManager’s NMClient API to start a CoarseGrainedExecutorBackend in a YARN container.

Tip

startContainer follows the design pattern to request YARN NodeManager to start a YARN resource container:

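import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records

// Create an empty launch context record and fill it in
val ctx = Records.newRecord(classOf[ContainerLaunchContext]).asInstanceOf[ContainerLaunchContext]
ctx.setLocalResources(...)   // files to localize in the container
ctx.setEnvironment(...)      // environment variables
ctx.setTokens(...)           // security tokens
ctx.setCommands(...)         // command(s) to launch
ctx.setApplicationACLs(...)  // application ACLs
ctx.setServiceData(...)      // auxiliary service data (e.g. the shuffle service)
// Ask the NodeManager to start the container with that context
nmClient.startContainer(container, ctx)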

startContainer creates a YARN ContainerLaunchContext.

Note
YARN ContainerLaunchContext represents all of the information for the YARN NodeManager to launch a resource container.

startContainer then sets the local resources and the environment on the ContainerLaunchContext.

Note
startContainer uses local resources given when ExecutorRunnable was created.

startContainer sets the security tokens on the ContainerLaunchContext (using Hadoop’s UserGroupInformation and the current user’s credentials).

startContainer sets the command (to launch CoarseGrainedExecutorBackend) on the ContainerLaunchContext.

startContainer sets the application ACLs on the ContainerLaunchContext.

If the spark.shuffle.service.enabled property is enabled, startContainer registers the ContainerLaunchContext with the YARN shuffle service started on the YARN NodeManager under the spark_shuffle service name.

In the end, startContainer requests the YARN NodeManager to start the YARN container with the ContainerLaunchContext.

Note
startContainer uses nmClient internal reference to send the request with the YARN resource container given when ExecutorRunnable was created.

If any exception happens, startContainer throws a SparkException with the following message:

Exception while starting container [containerId] on host [hostname]
Note
startContainer is used exclusively when ExecutorRunnable is started.
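
The error handling described above follows a common wrap-and-rethrow pattern, sketched here without any Spark or YARN dependency. ContainerStartException is a hypothetical stand-in for SparkException, the start block stands in for the NMClient request, and matching on NonFatal is a choice made for this sketch rather than something the text specifies.

```scala
import scala.util.control.NonFatal

object StartContainerSketch {
  // Hypothetical stand-in for SparkException, so the sketch has no Spark dependency.
  final class ContainerStartException(message: String, cause: Throwable)
    extends Exception(message, cause)

  // Runs `start` and wraps any non-fatal failure in an exception
  // that names the container and the host, keeping the original cause.
  def startOrFail[T](containerId: String, hostname: String)(start: => T): T =
    try start
    catch {
      case NonFatal(e) =>
        throw new ContainerStartException(
          s"Exception while starting container $containerId on host $hostname", e)
    }
}
```

Keeping the original exception as the cause preserves the NodeManager-side stack trace while the message identifies which container and host were involved.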

Building Launch Context Diagnostic Information (with Command, Environment and Resources) — launchContextDebugInfo Method

launchContextDebugInfo(): String

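launchContextDebugInfo builds the launch context diagnostic information, i.e. the environment (from prepareEnvironment), the command (from prepareCommand) and the local resources, formatted as follows: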

===============================================================================
YARN executor launch context:
  env:
    [key] -> [value]
    ...

  command:
    [commands]

  resources:
    [key] -> [value]
===============================================================================
Note
resources entry is the input localResources given when ExecutorRunnable was created.
Note
launchContextDebugInfo is used when ApplicationMaster registers itself with YARN ResourceManager.
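
A formatter producing the layout shown above could be sketched like this. LaunchContextDebugSketch and debugInfo are hypothetical names; sorting the entries by key, joining the command parts with single spaces, and the separator width are assumptions made so that the output is deterministic.

```scala
object LaunchContextDebugSketch {
  // Renders env, command and resources in the three-section layout.
  def debugInfo(
      env: Map[String, String],
      commands: Seq[String],
      resources: Map[String, String]): String = {
    val sep = "=" * 79
    // Entries are sorted by key only to make the output stable.
    def entries(m: Map[String, String]): Seq[String] =
      m.toSeq.sortBy(_._1).map { case (k, v) => s"    $k -> $v" }

    val lines =
      Seq(sep, "YARN executor launch context:", "  env:") ++
        entries(env) ++
        Seq("", "  command:", s"    ${commands.mkString(" ")}", "", "  resources:") ++
        entries(resources) ++
        Seq(sep)
    lines.mkString("\n")
  }
}
```

Because the function only formats its inputs, the same command and environment that are used to start the container can be passed in unchanged for logging.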
