YarnSchedulerEndpoint RPC Endpoint

YarnSchedulerEndpoint is a thread-safe RPC endpoint for communication between YarnSchedulerBackend on the driver and ApplicationMaster on YARN (inside a YARN container).

Caution
FIXME Picture it.
Tip

Enable INFO logging level for org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint=INFO

Refer to Logging.

RPC Messages

RequestExecutors

RequestExecutors(
  requestedTotal: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int])
extends CoarseGrainedClusterMessage

RequestExecutors informs ApplicationMaster of the current requirement for the total number of executors (as requestedTotal), which includes executors that are already pending or running.

Figure 1. RequestExecutors Message Flow (client deploy mode)

When a RequestExecutors message arrives, YarnSchedulerEndpoint forwards it to ApplicationMaster (via the internal RPC endpoint reference) and sends the result of the forwarded call back as the response.

Any issues communicating with the remote ApplicationMaster RPC endpoint are reported as ERROR messages in the logs:

ERROR Sending RequestExecutors to AM was unsuccessful
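The forwarding described above can be sketched in plain Scala without a Spark dependency. This is a simplified illustration, not the Spark source: AmEndpointRef, forward and the logError callback are hypothetical stand-ins for the ApplicationMaster RPC endpoint reference and Spark's logging, and the message omits its CoarseGrainedClusterMessage parent.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object RequestExecutorsSketch {
  // Same fields as the message above (parent trait omitted for brevity).
  final case class RequestExecutors(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int])

  // Hypothetical stand-in for the ApplicationMaster RPC endpoint reference.
  trait AmEndpointRef {
    def ask(message: RequestExecutors): Boolean
  }

  // Forwards the request off the caller's thread. A failure is reported
  // through logError and then propagated to the caller unchanged.
  def forward(
      am: AmEndpointRef,
      request: RequestExecutors,
      logError: String => Unit)(implicit ec: ExecutionContext): Future[Boolean] =
    Future(am.ask(request)).recoverWith {
      case NonFatal(e) =>
        logError(s"Sending $request to AM was unsuccessful")
        Future.failed(e)
    }
}
```

The caller receives either the Boolean answer from the remote side or the original failure, which mirrors "the result of the forward call is sent back in response".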

RemoveExecutor

KillExecutors

AddWebUIFilter

AddWebUIFilter(
  filterName: String,
  filterParams: Map[String, String],
  proxyBase: String)

AddWebUIFilter sets the spark.ui.proxyBase system property and adds the filterName filter to the web UI.

It first sets the spark.ui.proxyBase system property to the input proxyBase (if not empty).

If it defines a filter, i.e. the input filterName and filterParams are both not empty, you should see the following INFO message in the logs:

INFO Add WebUI Filter. [filterName], [filterParams], [proxyBase]

It then sets spark.ui.filters to the input filterName in the internal conf SparkConf attribute.

Each entry of filterParams is also set as spark.[filterName].param.[key] with [value] as its value.

The filter is added to the web UI using JettyUtils.addFilters(ui.getHandlers, conf).

Caution
FIXME Review JettyUtils.addFilters(ui.getHandlers, conf).
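To make the key naming concrete, the sketch below derives the settings that a single AddWebUIFilter message would produce. It is an illustration under assumptions: settingsFor and FilterSettings are hypothetical names, nothing is written to a real SparkConf or to system properties, and the Jetty step is left out.

```scala
object AddWebUIFilterSketch {
  // Settings derived from one AddWebUIFilter message (hypothetical container).
  final case class FilterSettings(
    systemProperties: Map[String, String],
    confEntries: Map[String, String])

  def settingsFor(
      filterName: String,
      filterParams: Map[String, String],
      proxyBase: String): FilterSettings = {
    // spark.ui.proxyBase is only set for a non-empty proxyBase.
    val systemProperties =
      if (proxyBase != null && proxyBase.nonEmpty) Map("spark.ui.proxyBase" -> proxyBase)
      else Map.empty[String, String]

    // A filter is defined only when both the name and the params are non-empty.
    val definesFilter =
      filterName != null && filterName.nonEmpty &&
        filterParams != null && filterParams.nonEmpty

    val confEntries =
      if (definesFilter)
        Map("spark.ui.filters" -> filterName) ++
          filterParams.map { case (key, value) =>
            s"spark.$filterName.param.$key" -> value
          }
      else Map.empty[String, String]

    FilterSettings(systemProperties, confEntries)
  }
}
```

For a made-up filter named org.example.MyFilter with the parameter PROXY_HOSTS, the derived key is spark.org.example.MyFilter.param.PROXY_HOSTS.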

RegisterClusterManager Message

RegisterClusterManager(am: RpcEndpointRef)

When a RegisterClusterManager message arrives, the following INFO message is printed out to the logs:

INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as [am]

If the internal shouldResetOnAmRegister flag is enabled, YarnSchedulerBackend is reset. The flag is disabled initially, so the first RegisterClusterManager message only enables it and no reset happens.

Note
shouldResetOnAmRegister controls whether to reset YarnSchedulerBackend when another RegisterClusterManager RPC message arrives, which can happen when the ApplicationMaster failed and a new one was registered.
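The flag logic can be sketched as a small state machine. The names below are hypothetical: a String stands in for RpcEndpointRef, and a counter stands in for the actual reset of YarnSchedulerBackend. Storing the reference on registration is an assumption of this sketch.

```scala
object RegisterClusterManagerSketch {
  final class EndpointState {
    // Stand-in for the reference to the ApplicationMaster RPC endpoint.
    var amEndpoint: Option[String] = None
    // Disabled initially, as described above.
    var shouldResetOnAmRegister: Boolean = false
    // Counts simulated backend resets instead of resetting a real backend.
    var resets: Int = 0

    def onRegisterClusterManager(am: String): Unit = {
      amEndpoint = Some(am)
      if (!shouldResetOnAmRegister) {
        // First registration: only arm the flag.
        shouldResetOnAmRegister = true
      } else {
        // A later registration suggests a new ApplicationMaster replaced a failed one.
        resets += 1
      }
    }
  }
}
```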

RetrieveLastAllocatedExecutorId

When RetrieveLastAllocatedExecutorId is received, YarnSchedulerEndpoint responds with the current value of currentExecutorIdCounter.

onDisconnected Callback

onDisconnected clears the internal reference to the remote ApplicationMaster RPC Endpoint (i.e. it sets it to None) if the remote address matches the reference’s.

Note
It is a callback method to be called when…​FIXME

You should see the following WARN message in the logs if that happens:

WARN ApplicationMaster has disassociated: [remoteAddress]
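A minimal sketch of the address check follows, assuming a hypothetical Address case class in place of the RPC address type and a buffer in place of the logger.

```scala
import scala.collection.mutable.ListBuffer

object OnDisconnectedSketch {
  // Hypothetical stand-in for the address of an RPC endpoint.
  final case class Address(host: String, port: Int)

  final class EndpointState(var amEndpoint: Option[Address]) {
    // Collected WARN lines; a real endpoint would use its logger instead.
    val warnings: ListBuffer[String] = ListBuffer.empty[String]

    def onDisconnected(remoteAddress: Address): Unit =
      // Only a disconnect from the registered ApplicationMaster clears the reference.
      if (amEndpoint.exists(_ == remoteAddress)) {
        warnings += s"ApplicationMaster has disassociated: $remoteAddress"
        amEndpoint = None
      }
  }
}
```

A disconnect from any other address leaves the reference untouched and logs nothing.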

onStop Callback

onStop shuts askAmThreadPool down immediately.

Note
onStop is a callback method to be called when…​FIXME

Internal Reference to ApplicationMaster RPC Endpoint (amEndpoint variable)

amEndpoint is a reference to a remote ApplicationMaster RPC Endpoint.

askAmThreadPool Thread Pool

askAmThreadPool is a thread pool called yarn-scheduler-ask-am-thread-pool that creates new threads as needed and reuses previously constructed threads when they are available.
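The pool behaviour described here matches a JDK cached thread pool. The sketch below builds one with plain java.util.concurrent. It is not the Spark helper itself: newPool is a hypothetical name, and the numeric thread-name suffix and daemon threads are assumptions of this sketch.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object AskAmThreadPoolSketch {
  // Creates a cached pool: new threads on demand, idle threads reused.
  def newPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val thread = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        thread.setDaemon(true) // assumption: do not keep the JVM alive
        thread
      }
    }
    Executors.newCachedThreadPool(factory)
  }
}
```

Calling shutdownNow() on the returned pool corresponds to the immediate shutdown that onStop performs.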
