RpcEnv — RPC Environment
RPC Environment (aka RpcEnv) is an environment for RpcEndpoints to process messages. An RPC Environment manages the entire lifecycle of RpcEndpoints:
- registers (sets up) endpoints (by name or URI)
- routes incoming messages to them
- stops them
An RPC Environment is defined by its name, host, and port. It can also be controlled by a security manager.
You can create an RPC Environment using the RpcEnv.create factory methods.
The only implementation of the RPC Environment is the Netty-based one, NettyRpcEnv.
An RpcEndpoint defines how to handle messages (what functions to execute given a message). RpcEndpoints register with an RpcEnv (under a name or URI) to receive messages from RpcEndpointRefs.
RpcEndpointRefs can be looked up by name or URI (because different RpcEnvs may have different naming schemes).
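To make the register/route/stop lifecycle concrete, here is a toy, single-process model in Scala. Everything in it (ToyRpcEnv, ToyEndpoint, ToyEndpointRef and their methods) is hypothetical and only mirrors the roles described above; it is not Spark's org.apache.spark.rpc API, and it delivers messages through direct method calls rather than over the network.

```scala
import scala.collection.mutable

// Toy model only; none of these names are Spark's API.
// An endpoint decides what to do with each message it receives.
trait ToyEndpoint {
  def receive(msg: Any): Any
  def onStop(): Unit = ()
}

// A reference knows an endpoint's name and the environment that routes to it.
final class ToyEndpointRef(val name: String, env: ToyRpcEnv) {
  def ask(msg: Any): Any = env.route(name, msg)
}

final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]

  // Register (set up) an endpoint under a unique name and return a reference to it.
  def register(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    require(!endpoints.contains(name), s"Endpoint '$name' is already registered")
    endpoints(name) = endpoint
    new ToyEndpointRef(name, this)
  }

  // Look up a reference by name.
  def lookup(name: String): Option[ToyEndpointRef] =
    if (endpoints.contains(name)) Some(new ToyEndpointRef(name, this)) else None

  // Route an incoming message to the endpoint registered under `name`.
  def route(name: String, msg: Any): Any = endpoints.get(name) match {
    case Some(endpoint) => endpoint.receive(msg)
    case None => throw new NoSuchElementException(s"No endpoint named '$name'")
  }

  // Stop an endpoint and unregister it.
  def stop(name: String): Unit = endpoints.remove(name).foreach(_.onStop())
}
```

The toy addresses endpoints by name only; a real RpcEnv also resolves URIs, which is why lookup by URI exists alongside lookup by name.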
The org.apache.spark.rpc package contains the machinery for RPC communication in Spark.
Client Mode = is this an executor or the driver?
When an RPC Environment is initialized as part of the initialization of the driver or executors (using RpcEnv.create), clientMode is false for the driver and true for executors.
RpcEnv.create(actorSystemName, hostname, port, conf, securityManager, clientMode = !isDriver)
Refer to Client Mode in Netty-based RpcEnv for the implementation-specific details.
Creating RpcEndpointRef For URI — asyncSetupEndpointRefByURI Method
Caution: FIXME
Creating RpcEndpointRef For URI — setupEndpointRefByURI Method
Caution: FIXME
shutdown Method
Caution: FIXME
Registering RPC Endpoint — setupEndpoint Method
Caution: FIXME
awaitTermination Method
Caution: FIXME
ThreadSafeRpcEndpoint
ThreadSafeRpcEndpoint is a marker RpcEndpoint that does nothing by itself but tells…
Caution: FIXME What is marker?
Note: ThreadSafeRpcEndpoint is a private[spark] trait.
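A marker trait is a trait that declares no members of its own; other code checks for it with a type test and changes its behaviour accordingly. The sketch below assumes, for illustration only, that a dispatcher uses such a tag to decide whether an endpoint's messages may be processed by several threads at once. Endpoint, ThreadSafeMarker and ToyDispatcher are hypothetical names, not Spark classes.

```scala
// Hypothetical types for illustrating the marker-trait pattern.
trait Endpoint {
  def receive(msg: Any): Unit
}

// A marker trait: it adds no members, it only tags an Endpoint.
trait ThreadSafeMarker extends Endpoint

object ToyDispatcher {
  // Assumed policy: tagged endpoints get one message at a time,
  // untagged ones may use every thread in the pool.
  def maxConcurrentThreads(endpoint: Endpoint, poolSize: Int): Int = endpoint match {
    case _: ThreadSafeMarker => 1
    case _ => poolSize
  }
}
```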
RpcAddress
RpcAddress is the logical address for an RPC Environment, made up of a hostname and a port.
RpcAddress is encoded as a Spark URL, i.e. spark://host:port.
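The encoding can be sketched with a small standalone case class. RpcAddressModel and its fromSparkURL parser are illustrative names, not Spark's own class; parsing relies on java.net.URI to split a spark://host:port string into host and port.

```scala
import java.net.URI

// Standalone model of an RPC address (illustrative, not Spark's class).
final case class RpcAddressModel(host: String, port: Int) {
  def hostPort: String = s"$host:$port"
  // Encode as a Spark URL: spark://host:port
  def toSparkURL: String = s"spark://$hostPort"
}

object RpcAddressModel {
  // Parse spark://host:port back into an address, rejecting other schemes.
  def fromSparkURL(url: String): RpcAddressModel = {
    val uri = new URI(url)
    require(uri.getScheme == "spark" && uri.getHost != null && uri.getPort >= 0,
      s"Invalid Spark URL: $url")
    RpcAddressModel(uri.getHost, uri.getPort)
  }
}
```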
RpcEndpointAddress
RpcEndpointAddress is the logical address for an endpoint registered with an RPC Environment, made up of an RpcAddress and the endpoint's name.
It is in the format spark://[name]@[rpcAddress.host]:[rpcAddress.port].
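Because the endpoint name sits where a URI normally carries user information, java.net.URI can take this format apart. The sketch below formats and parses spark://[name]@[host]:[port]; EndpointAddressModel is a hypothetical name, and the endpoint name "echo" in the usage is just an example.

```scala
import java.net.URI

// Standalone model of an endpoint address (illustrative, not Spark's class).
final case class EndpointAddressModel(host: String, port: Int, name: String) {
  // spark://[name]@[host]:[port]
  override def toString: String = s"spark://$name@$host:$port"
}

object EndpointAddressModel {
  // The endpoint name is read from the URI's user-info part.
  def parse(url: String): EndpointAddressModel = {
    val uri = new URI(url)
    val name = uri.getUserInfo
    require(uri.getScheme == "spark" && uri.getHost != null && uri.getPort >= 0 && name != null,
      s"Invalid endpoint URL: $url")
    EndpointAddressModel(uri.getHost, uri.getPort, name)
  }
}
```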
Endpoint Lookup Timeout
When a remote endpoint is resolved, the local RPC Environment connects to the remote one. This is called endpoint lookup. To configure how long an endpoint lookup may take, use the following settings.
They form a prioritized list of lookup timeout properties (the higher on the list, the more important):
- spark.rpc.lookupTimeout
Their value can be a number alone (seconds) or any number with a time suffix, e.g. 50s, 100ms, or 250us. See Settings.
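The prioritized lookup and the time-suffix rules can be sketched as follows. TimeoutSketch is a hypothetical helper that reads a plain Map instead of a SparkConf; the accepted suffixes (us, ms, s, m, min, h, d) are an assumption about what Spark accepts, and the fallback key spark.network.timeout and the 120s default used in the usage are likewise assumptions chosen for illustration.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper; a plain Map stands in for SparkConf.
object TimeoutSketch {
  // A number followed by an optional unit suffix, e.g. "120", "50s", "100ms".
  private val TimeString = """(\d+)([a-z]+)?""".r

  // Assumed suffix set, for illustration.
  private val Units: Map[String, TimeUnit] = Map(
    "us" -> TimeUnit.MICROSECONDS,
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS,
    "d" -> TimeUnit.DAYS)

  // Parse a time string; a bare number is read as seconds.
  def parseTime(value: String): FiniteDuration = value.trim.toLowerCase match {
    case TimeString(n, null) => FiniteDuration(n.toLong, TimeUnit.SECONDS)
    case TimeString(n, suffix) if Units.contains(suffix) => FiniteDuration(n.toLong, Units(suffix))
    case _ => throw new NumberFormatException(s"Invalid time string: $value")
  }

  // Walk the prioritized keys and use the first one that is set.
  // If none is set, report the default under the first (most important) key.
  def resolve(conf: Map[String, String], keys: Seq[String], default: String): (String, FiniteDuration) = {
    require(keys.nonEmpty, "at least one property key is required")
    val found = keys.find(k => conf.contains(k)).map(k => (k, conf(k)))
    val (key, raw) = found.getOrElse((keys.head, default))
    (key, parseTime(raw))
  }
}
```

For example, with keys spark.rpc.lookupTimeout and spark.network.timeout and only the latter set to 300s, resolve returns that key with a 300-second duration; with neither set, it reports the 120s default under spark.rpc.lookupTimeout. Returning the key alongside the value lets an error message name the property a user should change.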
Ask Operation Timeout
An ask operation is when an RPC client expects a response to a message. It is a blocking operation.
You can control the time to wait for a response using the following settings (in that order):
Their value can be a number alone (seconds) or any number with a time suffix, e.g. 50s, 100ms, or 250us. See Settings.
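A blocking ask boils down to waiting on a future reply for at most the configured timeout. The sketch below uses Scala's Await.result; AskSketch and AskTimeoutSketchException are hypothetical names, and naming the controlling property in the error message is an assumed convenience rather than a description of Spark's exact behaviour.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration

// Hypothetical exception that keeps the original timeout as its cause.
final class AskTimeoutSketchException(message: String, cause: TimeoutException)
    extends TimeoutException(message) {
  initCause(cause)
}

object AskSketch {
  // Block the calling thread until the reply arrives or the timeout elapses.
  // On timeout, rethrow with the name of the property that controls it.
  def awaitReply[T](reply: Future[T], timeout: FiniteDuration, timeoutProp: String): T =
    try Await.result(reply, timeout)
    catch {
      case te: TimeoutException =>
        throw new AskTimeoutSketchException(
          s"${te.getMessage}. This timeout is controlled by $timeoutProp", te)
    }
}
```

In a real client the Future would complete when the remote endpoint replies; here the caller supplies it, which keeps the sketch independent of any transport.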
Exceptions
When RpcEnv catches uncaught exceptions, it uses RpcCallContext.sendFailure
to send exceptions back to the sender, or logging them if no such sender or NotSerializableException
.
If any error is thrown from one of RpcEndpoint methods except onError
, onError
will be invoked with the cause. If onError
throws an error, RpcEnv will ignore it.
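The onError rule can be sketched as a small wrapper around endpoint callbacks. SketchEndpoint and SafeInvoke are hypothetical names; treating only non-fatal exceptions (scala.util.control.NonFatal) as recoverable is an assumption of this sketch, and sending failures back to a caller is not modeled.

```scala
import scala.util.control.NonFatal

// Hypothetical endpoint shape used only for this sketch.
trait SketchEndpoint {
  def receive(msg: Any): Unit
  def onError(cause: Throwable): Unit
}

object SafeInvoke {
  // Run an endpoint callback. A non-fatal failure is passed to onError;
  // a non-fatal failure thrown by onError itself is swallowed.
  def apply(endpoint: SketchEndpoint)(action: => Unit): Unit =
    try action
    catch {
      case NonFatal(e) =>
        try endpoint.onError(e)
        catch { case NonFatal(_) => () }
    }
}
```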
RpcEnvConfig
RpcEnvConfig is a holder for an instance of SparkConf, the name of the RPC Environment, the host and port, a security manager, and clientMode.
Creating RpcEnv — create Factory Methods
create(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv (1)
create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean): RpcEnv
(1) The 6-argument create (with clientMode disabled by default) simply passes the input arguments on to the second create, making bindAddress and advertiseAddress the same.
The second create builds an RpcEnvConfig from the input arguments and uses it to create a NettyRpcEnv.
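The delegation between the two overloads can be sketched with stand-in types. ConfStub and SecurityStub replace SparkConf and SecurityManager, RpcEnvConfigModel takes its fields from the 7-argument signature above, and RpcEnvFactorySketch is a hypothetical object; the sketch stops at building the configuration, since constructing a NettyRpcEnv is out of scope.

```scala
// Stand-ins for SparkConf and SecurityManager so the sketch compiles on its own.
final case class ConfStub(settings: Map[String, String] = Map.empty)
final class SecurityStub

// Fields taken from the 7-argument create signature (hypothetical model).
final case class RpcEnvConfigModel(
    conf: ConfStub,
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    securityManager: SecurityStub,
    clientMode: Boolean)

object RpcEnvFactorySketch {
  // 6-argument form: bind to and advertise the same host; clientMode defaults to false.
  def create(name: String, host: String, port: Int, conf: ConfStub,
             securityManager: SecurityStub, clientMode: Boolean = false): RpcEnvConfigModel =
    create(name, host, host, port, conf, securityManager, clientMode)

  // 7-argument form: only builds the config here; the real factory would go on
  // to create an RPC environment from it.
  def create(name: String, bindAddress: String, advertiseAddress: String, port: Int,
             conf: ConfStub, securityManager: SecurityStub, clientMode: Boolean): RpcEnvConfigModel =
    RpcEnvConfigModel(conf, name, bindAddress, advertiseAddress, port, securityManager, clientMode)
}
```

Only one overload declares a default argument, which Scala requires when overloaded methods use defaults; calls with five arguments therefore resolve to the 6-argument form.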
Note: Copied (almost verbatim) from SPARK-10997 (Netty-based RPC env should support a "client-only" mode) and the commit: "Client mode" means the RPC env will not listen for incoming connections. This allows certain processes in the Spark stack (such as Executors or the YARN client-mode AM) to act as pure clients when using the netty-based RPC backend, reducing the number of sockets Spark apps need to use and also the number of open ports. The AM connects to the driver in "client mode", and that connection is used for all driver-AM communication, and so the AM is properly notified when the connection goes down. In "general", non-YARN case, In Spark on YARN in
Settings
Spark Property | Default Value | Description |
---|---|---|
spark.rpc.lookupTimeout | | Timeout to use for RPC remote endpoint lookup. Refer to Endpoint Lookup Timeout. |
 | | Number of attempts to send a message to and receive a response from a remote endpoint. |
 | | Time to wait between retries. |
spark.rpc.askTimeout | | Timeout for RPC ask calls. Refer to Ask Operation Timeout. |
 | | Network timeout to use for RPC remote endpoint lookup. Fallback for spark.rpc.askTimeout. |