YarnShuffleService — ExternalShuffleService on YARN

YarnShuffleService is an external shuffle service for Spark on YARN. It runs as an auxiliary service of the YARN NodeManager and extends org.apache.hadoop.yarn.server.api.AuxiliaryService.

Note
There is also the ExternalShuffleService for Spark. Despite the similar names, the two do not share code.

Caution
FIXME What happens when the spark.shuffle.service.enabled flag is enabled?

After the external shuffle service is configured in YARN, you enable it in a Spark application by setting the spark.shuffle.service.enabled flag to true.
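As a minimal sketch of the application-side settings, the following Java snippet assembles the --conf arguments you could pass to spark-submit. The class and method names are hypothetical and not part of Spark. spark.shuffle.service.enabled and spark.shuffle.service.port come from this page, and spark.dynamicAllocation.enabled is the Spark property for the dynamic allocation feature mentioned under Advantages. The port is assumed to match the one the NodeManager's YarnShuffleService listens on.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: renders shuffle-service settings
// as spark-submit command-line arguments.
final class ShuffleServiceSubmitArgs {

  // Collects the settings an application needs to use the external shuffle service.
  static Map<String, String> settings(boolean dynamicAllocation, int port) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("spark.shuffle.service.enabled", "true");
    conf.put("spark.shuffle.service.port", Integer.toString(port));
    if (dynamicAllocation) {
      conf.put("spark.dynamicAllocation.enabled", "true");
    }
    return conf;
  }

  // Turns every key/value pair into a "--conf key=value" argument pair.
  static List<String> toSubmitArgs(Map<String, String> conf) {
    List<String> submitArgs = new ArrayList<>();
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      submitArgs.add("--conf");
      submitArgs.add(entry.getKey() + "=" + entry.getValue());
    }
    return submitArgs;
  }

  public static void main(String[] args) {
    // 7337 is the default port of the shuffle service.
    System.out.println(String.join(" ", toSubmitArgs(settings(true, 7337))));
  }
}
```

The same keys can be set in spark-defaults.conf or on a SparkConf instead of the command line.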

Note
YarnShuffleService was introduced in SPARK-3797.

Tip
Enable the INFO logging level for the org.apache.spark.network.yarn.YarnShuffleService logger in YARN's logging system to see what happens inside.

log4j.logger.org.apache.spark.network.yarn.YarnShuffleService=INFO

When Hadoop is installed with Homebrew on Mac OS X, YARN saves logs in the /usr/local/Cellar/hadoop/2.7.2/libexec/logs directory, e.g. /usr/local/Cellar/hadoop/2.7.2/libexec/logs/yarn-jacek-nodemanager-japila.local.log.

Advantages

The advantages of using the YARN Shuffle Service:

  • With dynamic allocation enabled, executors can be removed and a Spark application can still read the shuffle data those executors wrote out.

  • Individual executors can go into a GC pause (or even crash) while other executors still read their shuffle data and make progress.

Creating YarnShuffleService Instance

When YarnShuffleService is created, it calls the constructor of YARN’s AuxiliaryService with the spark_shuffle service name.

You should see the following INFO messages in the logs:

INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing YARN shuffle service for Spark
INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service spark_shuffle, "spark_shuffle"

getRecoveryPath

Caution
FIXME

serviceStop

void serviceStop()

serviceStop is a part of YARN’s AuxiliaryService contract and is called when…​FIXME

Caution
FIXME The contract

When called, serviceStop simply closes shuffleServer and blockHandler.

Caution
FIXME What are shuffleServer and blockHandler? What’s their lifecycle?

When an exception occurs, you should see the following ERROR message in the logs:

ERROR org.apache.spark.network.yarn.YarnShuffleService: Exception when stopping service

stopContainer

void stopContainer(ContainerTerminationContext context)

stopContainer is a part of YARN’s AuxiliaryService contract and is called when…​FIXME

Caution
FIXME The contract

When called, stopContainer simply prints out the following INFO message in the logs and exits.

INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping container [containerId]

It obtains the containerId from the context using the getContainerId method.

initializeContainer

void initializeContainer(ContainerInitializationContext context)

initializeContainer is a part of YARN’s AuxiliaryService contract and is called when…​FIXME

Caution
FIXME The contract

When called, initializeContainer simply prints out the following INFO message in the logs and exits.

INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing container [containerId]

It obtains the containerId from the context using the getContainerId method.

stopApplication

void stopApplication(ApplicationTerminationContext context)

stopApplication is a part of YARN’s AuxiliaryService contract and is called when…​FIXME

Caution
FIXME The contract

stopApplication requests the ShuffleSecretManager to unregisterApp (when authentication is enabled) and the ExternalShuffleBlockHandler to applicationRemoved.

When called, stopApplication obtains YARN’s ApplicationId for the application (using the input context).

You should see the following INFO message in the logs:

INFO org.apache.spark.network.yarn.YarnShuffleService: Stopping application [appId]

If isAuthenticationEnabled, secretManager.unregisterApp is executed for the application id.

It requests ExternalShuffleBlockHandler to applicationRemoved (with cleanupLocalDirs flag disabled).

Caution
FIXME What does ExternalShuffleBlockHandler#applicationRemoved do?

When an exception occurs, you should see the following ERROR message in the logs:

ERROR org.apache.spark.network.yarn.YarnShuffleService: Exception when stopping application [appId]
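The stopApplication steps above can be sketched as follows. This is a simplified model under stated assumptions, not the Spark source: SecretManager and BlockHandler are hypothetical stand-ins for ShuffleSecretManager and ExternalShuffleBlockHandler, the application id is passed in as a plain String instead of being read from ApplicationTerminationContext, and an in-memory list stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the stopApplication flow. Every type here is a
// stand-in for illustration, not the real Spark or YARN class.
final class StopApplicationSketch {

  // Stand-in for ShuffleSecretManager.
  interface SecretManager {
    void unregisterApp(String appId);
  }

  // Stand-in for ExternalShuffleBlockHandler.
  interface BlockHandler {
    void applicationRemoved(String appId, boolean cleanupLocalDirs);
  }

  private final boolean authenticationEnabled;
  private final SecretManager secretManager;
  private final BlockHandler blockHandler;
  final List<String> log = new ArrayList<>(); // stand-in for the logger

  StopApplicationSketch(boolean authenticationEnabled,
                        SecretManager secretManager,
                        BlockHandler blockHandler) {
    this.authenticationEnabled = authenticationEnabled;
    this.secretManager = secretManager;
    this.blockHandler = blockHandler;
  }

  void stopApplication(String appId) {
    try {
      log.add("INFO Stopping application " + appId);
      if (authenticationEnabled) {
        // The secret is only known to the manager when authentication is on.
        secretManager.unregisterApp(appId);
      }
      // The cleanupLocalDirs flag is disabled, as described above.
      blockHandler.applicationRemoved(appId, false);
    } catch (Exception e) {
      // Failures are logged, not rethrown.
      log.add("ERROR Exception when stopping application " + appId);
    }
  }
}
```

Note that in this sketch an exception from either collaborator is swallowed after logging, which mirrors the ERROR message described above.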

initializeApplication

void initializeApplication(ApplicationInitializationContext context)

initializeApplication is a part of YARN’s AuxiliaryService contract and is called when…​FIXME

Caution
FIXME The contract

initializeApplication requests the ShuffleSecretManager to registerApp when authentication is enabled.

When called, initializeApplication obtains YARN’s ApplicationId for the application (using the input context) and calls context.getApplicationDataForService for shuffleSecret.

You should see the following INFO message in the logs:

INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing application [appId]

If isAuthenticationEnabled, secretManager.registerApp is executed for the application id and shuffleSecret.

When an exception occurs, you should see the following ERROR message in the logs:

ERROR org.apache.spark.network.yarn.YarnShuffleService: Exception when initializing application [appId]
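The initializeApplication steps above can be sketched in the same spirit. This is a simplified model, not the Spark source: InitContext is a hypothetical stand-in for YARN's ApplicationInitializationContext, a map stands in for the ShuffleSecretManager, an in-memory list stands in for the logger, and the application id is made up.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the initializeApplication flow. All types are
// stand-ins for illustration, not the real Spark or YARN classes.
final class InitializeApplicationSketch {

  // Stand-in for YARN's ApplicationInitializationContext.
  interface InitContext {
    String applicationId();
    ByteBuffer applicationDataForService();
  }

  private final boolean authenticationEnabled;
  // Stand-in for ShuffleSecretManager: application id -> shuffle secret.
  final Map<String, ByteBuffer> registeredSecrets = new HashMap<>();
  // Stand-in for the logger.
  final List<String> log = new ArrayList<>();

  InitializeApplicationSketch(boolean authenticationEnabled) {
    this.authenticationEnabled = authenticationEnabled;
  }

  void initializeApplication(InitContext context) {
    String appId = context.applicationId();
    try {
      ByteBuffer shuffleSecret = context.applicationDataForService();
      log.add("INFO Initializing application " + appId);
      if (authenticationEnabled) {
        // The secret is registered only when authentication is enabled.
        registeredSecrets.put(appId, shuffleSecret);
      }
    } catch (Exception e) {
      // Failures are logged, not rethrown.
      log.add("ERROR Exception when initializing application " + appId);
    }
  }

  public static void main(String[] args) {
    InitializeApplicationSketch service = new InitializeApplicationSketch(true);
    service.initializeApplication(new InitContext() {
      public String applicationId() { return "app-1"; } // made-up id
      public ByteBuffer applicationDataForService() {
        return ByteBuffer.wrap("secret".getBytes(StandardCharsets.UTF_8));
      }
    });
    System.out.println(service.log);
  }
}
```

With authentication disabled the secret is still read from the context but never registered, which matches the description above.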

serviceInit

void serviceInit(Configuration conf)

serviceInit is a part of YARN’s AuxiliaryService contract and is called when…​FIXME

Caution
FIXME

When called, serviceInit creates a TransportConf for the shuffle module that is used to create ExternalShuffleBlockHandler (as blockHandler).

It checks the spark.authenticate key in the configuration (defaults to false) and, only if authentication is enabled, sets up a SaslServerBootstrap with a ShuffleSecretManager and adds it to a collection of TransportServerBootstraps.

It reads the spark.shuffle.service.port key in the configuration (default: 7337) and creates a TransportServer (as shuffleServer) that listens on that port.

You should see the following INFO message in the logs:

INFO org.apache.spark.network.yarn.YarnShuffleService: Started YARN shuffle service for Spark on port [port]. Authentication is [authEnabled].  Registered executor file is [registeredExecutorFile]
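The configuration decisions serviceInit makes can be modelled with a short sketch. It is an illustration under stated assumptions, not the Spark source: a plain Map stands in for Hadoop's Configuration, the Setup class is hypothetical, and bootstraps are represented by their names only, with no network server started.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the decisions made in serviceInit. A Map stands in
// for Hadoop's Configuration and nothing is actually bound to a port.
final class ServiceInitSketch {

  // Hypothetical holder for the outcome of the configuration checks.
  static final class Setup {
    final int port;
    final boolean authEnabled;
    final List<String> bootstraps;

    Setup(int port, boolean authEnabled, List<String> bootstraps) {
      this.port = port;
      this.authEnabled = authEnabled;
      this.bootstraps = Collections.unmodifiableList(bootstraps);
    }
  }

  static Setup serviceInit(Map<String, String> conf) {
    // spark.authenticate defaults to false.
    boolean authEnabled = Boolean.parseBoolean(conf.getOrDefault("spark.authenticate", "false"));
    List<String> bootstraps = new ArrayList<>();
    if (authEnabled) {
      // Only with authentication enabled is a SASL bootstrap added.
      bootstraps.add("SaslServerBootstrap");
    }
    // spark.shuffle.service.port defaults to 7337.
    int port = Integer.parseInt(conf.getOrDefault("spark.shuffle.service.port", "7337"));
    return new Setup(port, authEnabled, bootstraps);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("spark.shuffle.service.port", "10000");
    conf.put("spark.authenticate", "true");
    Setup setup = serviceInit(conf);
    // prints "port=10000 auth=true bootstraps=[SaslServerBootstrap]"
    System.out.println("port=" + setup.port + " auth=" + setup.authEnabled
        + " bootstraps=" + setup.bootstraps);
  }
}
```

With an empty configuration the sketch falls back to port 7337 and an empty bootstrap list.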

Installation

YARN Shuffle Service Plugin

Add the YARN Shuffle Service plugin from the common/network-yarn module to YARN NodeManager’s CLASSPATH.

Tip
Use the yarn classpath command to print YARN’s CLASSPATH.

cp common/network-yarn/target/scala-2.11/spark-2.0.0-SNAPSHOT-yarn-shuffle.jar \
  /usr/local/Cellar/hadoop/2.7.2/libexec/share/hadoop/yarn/lib/

yarn-site.xml — NodeManager Configuration File

If the external shuffle service is enabled, you need to add spark_shuffle to yarn.nodemanager.aux-services in the yarn-site.xml file on all nodes.

yarn-site.xml — NodeManager Configuration properties
<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <!-- optional -->
  <property>
    <name>spark.shuffle.service.port</name>
    <value>10000</value>
  </property>
  <property>
    <name>spark.authenticate</name>
    <value>true</value>
  </property>
</configuration>

The yarn.nodemanager.aux-services property lists the auxiliary service names (here spark_shuffle), and the yarn.nodemanager.aux-services.spark_shuffle.class property names the class that provides the service, org.apache.spark.network.yarn.YarnShuffleService.
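Since a missing spark_shuffle entry only shows up later as a container start failure, it can help to check the file up front. The following is a minimal sketch of such a check using the JDK's XML parser. The AuxServicesCheck class and its methods are hypothetical helpers, not part of Spark or Hadoop. It assumes yarn.nodemanager.aux-services holds a comma-separated list, so a value such as mapreduce_shuffle,spark_shuffle also passes.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper: checks that yarn-site.xml content registers the
// spark_shuffle auxiliary service and its implementing class.
final class AuxServicesCheck {

  // Returns the value of the named <property>, or null when it is absent.
  static String property(Document doc, String name) {
    NodeList props = doc.getElementsByTagName("property");
    for (int i = 0; i < props.getLength(); i++) {
      Element prop = (Element) props.item(i);
      NodeList names = prop.getElementsByTagName("name");
      NodeList values = prop.getElementsByTagName("value");
      if (names.getLength() > 0 && values.getLength() > 0
          && name.equals(names.item(0).getTextContent().trim())) {
        return values.item(0).getTextContent().trim();
      }
    }
    return null;
  }

  static boolean sparkShuffleConfigured(String yarnSiteXml) {
    Document doc;
    try {
      doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
          .parse(new ByteArrayInputStream(yarnSiteXml.getBytes(StandardCharsets.UTF_8)));
    } catch (Exception e) {
      throw new IllegalArgumentException("Cannot parse yarn-site.xml content", e);
    }
    String services = property(doc, "yarn.nodemanager.aux-services");
    String serviceClass = property(doc, "yarn.nodemanager.aux-services.spark_shuffle.class");
    // The aux-services value is a comma-separated list of service names.
    boolean listed = services != null
        && Arrays.stream(services.split(",")).map(String::trim).anyMatch("spark_shuffle"::equals);
    return listed && "org.apache.spark.network.yarn.YarnShuffleService".equals(serviceClass);
  }

  public static void main(String[] args) {
    String xml = "<?xml version=\"1.0\"?>"
        + "<configuration>"
        + "<property><name>yarn.nodemanager.aux-services</name>"
        + "<value>mapreduce_shuffle,spark_shuffle</value></property>"
        + "<property><name>yarn.nodemanager.aux-services.spark_shuffle.class</name>"
        + "<value>org.apache.spark.network.yarn.YarnShuffleService</value></property>"
        + "</configuration>";
    System.out.println(sparkShuffleConfigured(xml)); // prints "true"
  }
}
```

The check only inspects the configuration text. It does not verify that the shuffle service jar is on the NodeManager's CLASSPATH.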

Exception — Attempting to Use External Shuffle Service in Spark Application in Spark on YARN

If you enable the external shuffle service in a Spark application running on YARN but have not installed the YARN Shuffle Service, you will see the following exception in the logs:

Exception in thread "ContainerLauncher-0" java.lang.Error: org.apache.spark.SparkException: Exception while starting container container_1465448245611_0002_01_000002 on host 192.168.99.1
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Exception while starting container container_1465448245611_0002_01_000002 on host 192.168.99.1
	at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:126)
	at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:71)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	... 2 more
Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
	at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
	at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:207)
	at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:123)
	... 4 more
