log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=TRACE
NettyBlockTransferService — Netty-Based BlockTransferService
NettyBlockTransferService is a BlockTransferService that uses Netty for block transport (when uploading or fetching blocks of data).
Note: NettyBlockTransferService is created when SparkEnv is created (and is later passed on to create a BlockManager for the driver and executors).
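Tip: Enable TRACE logging level for the org.apache.spark.network.netty.NettyBlockTransferService logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=TRACE
Refer to Logging.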
Creating NettyBlockTransferService Instance
Caution: FIXME
fetchBlocks Method
fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener): Unit
fetchBlocks…FIXME
When executed, fetchBlocks prints out the following TRACE message in the logs:
TRACE Fetch blocks from [host]:[port] (executor id [execId])
fetchBlocks then creates a RetryingBlockFetcher.BlockFetchStarter where the createAndStart method…FIXME
If the maximum number of acceptable IO exceptions (such as connection timeouts) per request is greater than 0, fetchBlocks creates a RetryingBlockFetcher and starts it immediately.
Note: RetryingBlockFetcher is created with the RetryingBlockFetcher.BlockFetchStarter created earlier, the input blockIds and the listener.
If, however, the number of retries is not greater than 0 (i.e. it is 0 or less), the RetryingBlockFetcher.BlockFetchStarter created earlier is started directly (with the input blockIds and listener).
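The branching described above can be sketched in isolation. This is a minimal illustration, not Spark's code: Listener, FetchStarter, RetryingFetcher and FetchSketch are hypothetical stand-ins for BlockFetchingListener, RetryingBlockFetcher.BlockFetchStarter and RetryingBlockFetcher, and the retry loop is deliberately simplified (synchronous, no wait between attempts, retries all blocks).

```scala
import java.io.IOException

// Hypothetical stand-in for BlockFetchingListener (not the Spark type).
trait Listener {
  def onSuccess(blockId: String): Unit
  def onFailure(blockId: String, e: Throwable): Unit
}

// Hypothetical stand-in for RetryingBlockFetcher.BlockFetchStarter.
trait FetchStarter {
  def createAndStart(blockIds: Array[String], listener: Listener): Unit
}

// Simplified retrying wrapper: re-runs the starter after an IOException,
// allowing up to maxRetries additional attempts before giving up.
final class RetryingFetcher(
    maxRetries: Int,
    starter: FetchStarter,
    blockIds: Array[String],
    listener: Listener) {

  def start(): Unit = {
    var retries = 0
    var done = false
    while (!done) {
      try {
        starter.createAndStart(blockIds, listener)
        done = true
      } catch {
        // Only IO failures are retried; once the budget is used up the
        // guard no longer matches and the exception propagates.
        case _: IOException if retries < maxRetries => retries += 1
      }
    }
  }
}

object FetchSketch {
  // Mirrors the decision in the text: wrap the starter in a retrying
  // fetcher only when retries are enabled, otherwise start it directly.
  def fetch(
      maxRetries: Int,
      starter: FetchStarter,
      blockIds: Array[String],
      listener: Listener): Unit =
    if (maxRetries > 0) new RetryingFetcher(maxRetries, starter, blockIds, listener).start()
    else starter.createAndStart(blockIds, listener)
}
```

With retries disabled the starter runs exactly once and any failure surfaces immediately, which is why the same starter object can serve both paths.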
In case of any Exception, the input BlockFetchingListener gets notified (using onBlockFetchFailure for every block id) and you should see the following ERROR message in the logs:
ERROR Exception while beginning fetchBlocks
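The failure path can be illustrated with a small sketch. FailureListener and FailureNotification are hypothetical names; only the onBlockFetchFailure callback name and the log message come from the text above, and printing to standard error stands in for the real logger.

```scala
// Hypothetical stand-in for BlockFetchingListener, reduced to the one
// callback the text mentions.
trait FailureListener {
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit
}

object FailureNotification {
  // Runs `begin`; if it throws, logs once and then reports the same
  // exception to the listener for every requested block id.
  def beginFetch(blockIds: Array[String], listener: FailureListener)(begin: => Unit): Unit =
    try begin
    catch {
      case e: Exception =>
        System.err.println("ERROR Exception while beginning fetchBlocks")
        blockIds.foreach(id => listener.onBlockFetchFailure(id, e))
    }
}
```

Reporting per block id means a caller that tracks outstanding blocks individually sees each of them fail, rather than having to infer it from one aggregate error.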
Note: fetchBlocks is called when BlockTransferService fetches one block synchronously and when ShuffleBlockFetcherIterator sends a request for blocks (using sendRequest).
Application Id — appId Property
Caution: FIXME
Initializing NettyBlockTransferService — init Method
init(blockDataManager: BlockDataManager): Unit
Note: init is a part of the BlockTransferService contract.
init starts a server for…FIXME
Internally, init creates a NettyBlockRpcServer (using the application id, a JavaSerializer and the input blockDataManager).
Caution: FIXME Describe security when authEnabled is enabled.
init creates a TransportContext with the NettyBlockRpcServer created earlier.
Caution: FIXME Describe transportConf and TransportContext.
init creates the internal clientFactory and a server.
Caution: FIXME What’s the "a server"?
In the end, you should see the INFO message in the logs:
INFO NettyBlockTransferService: Server created on [hostName]:[port]
Note: hostName is given when NettyBlockTransferService is created. For the driver it is controlled by the spark.driver.host Spark property; for executors it differs per deployment environment (as controlled by --hostname for CoarseGrainedExecutorBackend).
Uploading Block — uploadBlock Method
uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit]
Note: uploadBlock is a part of the BlockTransferService contract.
Internally, uploadBlock creates a TransportClient to send an UploadBlock message (to the input hostname and port).
Note: The UploadBlock message is processed by NettyBlockRpcServer.
The UploadBlock message holds the application id, the input execId and blockId. It also holds the serialized bytes for the block metadata, i.e. level and classTag serialized using the internal JavaSerializer, as well as the bytes of the input blockData itself (obtained using the ManagedBuffer.nioByteBuffer method rather than the serializer).
The entire UploadBlock message is further serialized before sending (using TransportClient.sendRpc).
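The two kinds of payload bytes can be sketched with the JDK alone. This is only an approximation under stated assumptions: UploadPayloadSketch and its methods are hypothetical, plain Java serialization of an array of strings stands in for what the internal JavaSerializer does with level and classTag, and a java.nio.ByteBuffer stands in for the result of ManagedBuffer.nioByteBuffer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object UploadPayloadSketch {
  // Metadata path: Java-serialize a small description of the block
  // (here just two strings standing in for level and classTag).
  def serializeMetadata(level: String, className: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(Array(level, className)) finally out.close()
    bytes.toByteArray
  }

  // Inverse of serializeMetadata, as the receiving side would do.
  def deserializeMetadata(data: Array[Byte]): (String, String) = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try {
      val parts = in.readObject().asInstanceOf[Array[String]]
      (parts(0), parts(1))
    } finally in.close()
  }

  // Data path: no object serialization, just copy the readable bytes.
  // duplicate() keeps the caller's position and limit untouched.
  def toArray(buf: ByteBuffer): Array[Byte] = {
    val view = buf.duplicate()
    val arr = new Array[Byte](view.remaining())
    view.get(arr)
    arr
  }
}
```

Keeping the block data as raw bytes avoids a second serialization pass over data that is typically already in its stored form.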
Caution: FIXME Describe TransportClient and clientFactory.createClient.
When the blockId block has been uploaded successfully, you should see the following TRACE message in the logs:
TRACE NettyBlockTransferService: Successfully uploaded block [blockId]
When an upload failed, you should see the following ERROR message in the logs:
ERROR Error while uploading block [blockId]
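One way to turn the success and failure callbacks into the Future[Unit] that uploadBlock returns is a Promise completed from the callback. The sketch below assumes that approach: RpcCallback and UploadResultSketch are hypothetical names, `send` is an assumed function that eventually invokes the callback, and println stands in for the logger.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for a callback-style RPC response handler.
trait RpcCallback {
  def onSuccess(): Unit
  def onFailure(e: Throwable): Unit
}

object UploadResultSketch {
  // Bridges the callback API to a Future: the promise is completed
  // exactly once, by whichever callback method fires.
  def upload(blockId: String)(send: RpcCallback => Unit): Future[Unit] = {
    val result = Promise[Unit]()
    send(new RpcCallback {
      def onSuccess(): Unit = {
        println(s"TRACE Successfully uploaded block $blockId")
        result.success(())
      }
      def onFailure(e: Throwable): Unit = {
        System.err.println(s"ERROR Error while uploading block $blockId")
        result.failure(e)
      }
    })
    result.future
  }
}
```

A caller that needs blocking behaviour can wait on the returned future, while asynchronous callers can attach their own continuation.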
Note: uploadBlock is executed when BlockTransferService uploads a block in a blocking fashion.
UploadBlock Message
UploadBlock is a BlockTransferMessage that describes a block being uploaded, i.e. sent over the wire from a NettyBlockTransferService to a NettyBlockRpcServer.
Attribute | Description |
---|---|
appId | The application id (the block belongs to) |
execId | The executor id |
blockId | The block id |
metadata | |
blockData | The block data as an array of bytes |
As an Encodable, UploadBlock can calculate the encoded size and do encoding and decoding itself to or from a ByteBuf, respectively.
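The three Encodable responsibilities (size, encode, decode) can be sketched as follows. This is an illustration only: UploadBlockSketch is a hypothetical class, java.nio.ByteBuffer stands in for Netty's ByteBuf so the example has no dependencies, and the length-prefixed layout is an assumption made for the sketch rather than a statement of the actual wire format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical message with the five attributes from the table above.
final class UploadBlockSketch(
    val appId: String,
    val execId: String,
    val blockId: String,
    val metadata: Array[Byte],
    val blockData: Array[Byte]) {

  // Every field is written as a 4-byte length followed by its bytes.
  private def fields: Seq[Array[Byte]] =
    Seq(appId.getBytes(UTF_8), execId.getBytes(UTF_8), blockId.getBytes(UTF_8), metadata, blockData)

  // Size is computed up front so the caller can allocate an exact buffer.
  def encodedLength: Int = fields.map(4 + _.length).sum

  def encode(buf: ByteBuffer): Unit =
    fields.foreach { bytes =>
      buf.putInt(bytes.length)
      buf.put(bytes)
    }
}

object UploadBlockSketch {
  private def readBytes(buf: ByteBuffer): Array[Byte] = {
    val arr = new Array[Byte](buf.getInt())
    buf.get(arr)
    arr
  }

  // Reads the fields back in the same order encode wrote them.
  def decode(buf: ByteBuffer): UploadBlockSketch = {
    val appId = new String(readBytes(buf), UTF_8)
    val execId = new String(readBytes(buf), UTF_8)
    val blockId = new String(readBytes(buf), UTF_8)
    val metadata = readBytes(buf)
    val blockData = readBytes(buf)
    new UploadBlockSketch(appId, execId, blockId, metadata, blockData)
  }
}
```

A round trip allocates ByteBuffer.allocate(msg.encodedLength), calls msg.encode(buf), flips the buffer, and passes it to UploadBlockSketch.decode.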