Netty-based RpcEnv

Tip
Read up RpcEnv — RPC Environment on the concept of RPC Environment in Spark.

The class org.apache.spark.rpc.netty.NettyRpcEnv is the implementation of RpcEnv using Netty - "an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients".

Netty-based RPC Environment is created by NettyRpcEnvFactory when spark.rpc is netty or org.apache.spark.rpc.netty.NettyRpcEnvFactory.

It uses Java’s built-in serialization (the implementation of JavaSerializerInstance).

Caution
FIXME What other choices of JavaSerializerInstance are available in Spark?

NettyRpcEnv is only started on the driver. See Client Mode.

The default port to listen to is 7077.

When NettyRpcEnv starts, the following INFO message is printed out in the logs:

INFO Utils: Successfully started service 'NettyRpcEnv' on port 0.
Tip

Set DEBUG for org.apache.spark.network.server.TransportServer logger to know when Shuffle server/NettyRpcEnv starts listening to messages.

DEBUG Shuffle server started on port :

FIXME: The message above in TransportServer has a space before :.

Creating NettyRpcEnv — create Method

Caution
FIXME

Client Mode

Refer to Client Mode = is this an executor or the driver? for introduction about client mode. This is only for Netty-based RpcEnv.

When created, a Netty-based RpcEnv starts the RPC server and register necessary endpoints for non-client mode, i.e. when client mode is false.

Caution
FIXME What endpoints?

It means that the required services for remote communication with NettyRpcEnv are only started on the driver (not executors).

Thread Pools

shuffle-server-ID

EventLoopGroup uses a daemon thread pool called shuffle-server-ID, where ID is a unique integer for NioEventLoopGroup (NIO) or EpollEventLoopGroup (EPOLL) for the Shuffle server.

Caution
FIXME Review Netty’s NioEventLoopGroup.
Caution
FIXME Where are SO_BACKLOG, SO_RCVBUF, SO_SNDBUF channel options used?

dispatcher-event-loop-ID

NettyRpcEnv’s Dispatcher uses the daemon fixed thread pool with spark.rpc.netty.dispatcher.numThreads threads.

Thread names are formatted as dispatcher-event-loop-ID, where ID is a unique, sequentially assigned integer.

It starts the message processing loop on all of the threads.

netty-rpc-env-timeout

NettyRpcEnv uses the daemon single-thread scheduled thread pool netty-rpc-env-timeout.

"netty-rpc-env-timeout" #87 daemon prio=5 os_prio=31 tid=0x00007f887775a000 nid=0xc503 waiting on condition [0x0000000123397000]

netty-rpc-connection-ID

NettyRpcEnv uses the daemon cached thread pool with up to spark.rpc.connect.threads threads.

Thread names are formatted as netty-rpc-connection-ID, where ID is a unique, sequentially assigned integer.

Settings

The Netty-based implementation uses the following properties:

  • spark.rpc.io.mode (default: NIO) - NIO or EPOLL for low-level IO. NIO is always available, while EPOLL is only available on Linux. NIO uses io.netty.channel.nio.NioEventLoopGroup while EPOLL io.netty.channel.epoll.EpollEventLoopGroup.

  • spark.shuffle.io.numConnectionsPerPeer always equals 1

  • spark.rpc.io.threads (default: 0; maximum: 8) - the number of threads to use for the Netty client and server thread pools.

    • spark.shuffle.io.serverThreads (default: the value of spark.rpc.io.threads)

    • spark.shuffle.io.clientThreads (default: the value of spark.rpc.io.threads)

  • spark.rpc.netty.dispatcher.numThreads (default: the number of processors available to JVM)

  • spark.rpc.connect.threads (default: 64) - used in cluster mode to communicate with a remote RPC endpoint

  • spark.port.maxRetries (default: 16 or 100 for testing when spark.testing is set) controls the maximum number of binding attempts/retries to a port before giving up.

Endpoints

  • endpoint-verifier (RpcEndpointVerifier) - a RpcEndpoint for remote RpcEnvs to query whether an RpcEndpoint exists or not. It uses Dispatcher that keeps track of registered endpoints and responds true/false to CheckExistence message.

endpoint-verifier is used to check out whether a given endpoint exists or not before the endpoint’s reference is given back to clients.

One use case is when an AppClient connects to standalone Masters before it registers the application it acts for.

Caution
FIXME Who’d like to use endpoint-verifier and how?

Message Dispatcher

A message dispatcher is responsible for routing RPC messages to the appropriate endpoint(s).

It uses the daemon fixed thread pool dispatcher-event-loop with spark.rpc.netty.dispatcher.numThreads threads for dispatching messages.

"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=31 tid=0x00007f8877153800 nid=0x7103 waiting on condition [0x000000011f78b000]

results matching ""

    No results matching ""