BlockManagerSlaveEndpoint

BlockManagerSlaveEndpoint is a thread-safe RPC endpoint for remote communication between executors and the driver.

When a BlockManager is created, it also creates a BlockManagerSlaveEndpoint RPC endpoint, named BlockManagerEndpoint[randomId], to handle RPC messages.

Tip

Enable the DEBUG logging level for the org.apache.spark.storage.BlockManagerSlaveEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerSlaveEndpoint=DEBUG

Refer to Logging.

RemoveBlock Message

RemoveBlock(blockId: BlockId)

When a RemoveBlock message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing block [blockId]

Note
Handling RemoveBlock messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the removal succeeds, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing block [blockId], response is [response]

The response true is then sent back to the sender, and you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: true to [senderAddress]

If the removal fails, you should see the following ERROR message in the logs, followed by the stack trace:

ERROR BlockManagerSlaveEndpoint: Error in removing block [blockId]
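
The logging sequence above can be sketched as follows. This is a minimal illustration rather than Spark's implementation: the object AsyncHandlingSketch, its handle method, and the in-memory logs buffer are hypothetical names, and a plain scala.concurrent.Future stands in for the endpoint's thread pool.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical stand-in for the endpoint's asynchronous handling (not Spark's code).
object AsyncHandlingSketch {
  // Log lines are collected in memory so the sequence is easy to inspect.
  val logs = ArrayBuffer.empty[String]

  private def log(level: String, msg: String): Unit = logs.synchronized {
    logs += s"$level BlockManagerSlaveEndpoint: $msg"
  }

  // Runs `body` on the given execution context and logs each step around it.
  // Requires Scala 2.12 or later for Future.transform on Try.
  def handle[T](action: String, sender: String)(body: => T)(
      implicit ec: ExecutionContext): Future[T] = {
    log("DEBUG", action)
    Future(body).transform { result =>
      result match {
        case Success(response) =>
          log("DEBUG", s"Done $action, response is $response")
          log("DEBUG", s"Sent response: $response to $sender")
        case Failure(_) =>
          // The real endpoint also logs the stack trace; omitted here.
          log("ERROR", s"Error in $action")
      }
      result
    }
  }
}
```

With an ExecutionContext in scope, calling AsyncHandlingSketch.handle("removing block rdd_0_0", "sender")(true) records the three DEBUG lines in order, while a body that throws records the ERROR line instead.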

RemoveRdd Message

RemoveRdd(rddId: Int)

When a RemoveRdd message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing RDD [rddId]

Note
Handling RemoveRdd messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the removal succeeds, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing RDD [rddId], response is [response]

The number of blocks removed is then sent back to the sender, and you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: [#blocks] to [senderAddress]

If the removal fails, you should see the following ERROR message in the logs, followed by the stack trace:

ERROR BlockManagerSlaveEndpoint: Error in removing RDD [rddId]

RemoveShuffle Message

RemoveShuffle(shuffleId: Int)

When a RemoveShuffle message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing shuffle [shuffleId]

If a MapOutputTracker was given when the RPC endpoint was created, the endpoint calls it to unregister the shuffle identified by shuffleId.

Note
Handling RemoveShuffle messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the removal succeeds, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing shuffle [shuffleId], response is [response]

The result is then sent back to the sender, and you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: [response] to [senderAddress]

If the removal fails, you should see the following ERROR message in the logs, followed by the stack trace:

ERROR BlockManagerSlaveEndpoint: Error in removing shuffle [shuffleId]

Note
RemoveShuffle is posted when BlockManagerMaster and BlockManagerMasterEndpoint remove all blocks for a shuffle.

RemoveBroadcast Message

RemoveBroadcast(broadcastId: Long)

When a RemoveBroadcast message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing broadcast [broadcastId]

Note
Handling RemoveBroadcast messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the removal succeeds, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing broadcast [broadcastId], response is [response]

The result is then sent back to the sender, and you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: [response] to [senderAddress]

If the removal fails, you should see the following ERROR message in the logs, followed by the stack trace:

ERROR BlockManagerSlaveEndpoint: Error in removing broadcast [broadcastId]

GetBlockStatus Message

GetBlockStatus(blockId: BlockId)

When a GetBlockStatus message comes in, the endpoint asks BlockManager for the status of blockId and sends the result back.

GetMatchingBlockIds Message

GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)

GetMatchingBlockIds computes the IDs of the memory and disk blocks that match filter and sends them back.
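
To make the role of filter concrete, here is a small sketch of selecting block IDs with a BlockId => Boolean predicate. The block ID classes are simplified, hypothetical stand-ins for Spark's own, and matchingBlockIds is an illustrative helper, not a Spark method; combining and de-duplicating the memory and disk blocks is an assumption about how the two sources are merged.

```scala
// Hypothetical, simplified block IDs; Spark's real BlockId hierarchy is richer.
object MatchingBlockIdsSketch {
  sealed trait BlockId { def name: String }

  final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
    def name: String = s"rdd_${rddId}_$splitIndex"
  }

  final case class BroadcastBlockId(broadcastId: Long) extends BlockId {
    def name: String = s"broadcast_$broadcastId"
  }

  // Returns every distinct block ID, from memory and disk combined,
  // that the filter accepts.
  def matchingBlockIds(
      memoryBlocks: Seq[BlockId],
      diskBlocks: Seq[BlockId],
      filter: BlockId => Boolean): Seq[BlockId] =
    (memoryBlocks ++ diskBlocks).distinct.filter(filter)
}
```

A caller interested only in RDD blocks could pass a filter such as `_.isInstanceOf[RDDBlockId]`, or `_.name.startsWith("rdd_")` to match on the name instead.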

TriggerThreadDump Message

When a TriggerThreadDump message comes in, a thread dump is generated and sent back.
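
The messages described above can be summarized as a small Scala model. This is only a sketch built from the signatures listed in this document: the SlaveMessage trait, the String alias for BlockId, and the actionMessage helper are hypothetical, and the real message classes in Spark may carry additional fields.

```scala
object SlaveMessagesSketch {
  // Simplification: block IDs are modeled as plain strings here.
  type BlockId = String

  // Hypothetical common parent for the messages this endpoint handles.
  sealed trait SlaveMessage
  final case class RemoveBlock(blockId: BlockId) extends SlaveMessage
  final case class RemoveRdd(rddId: Int) extends SlaveMessage
  final case class RemoveShuffle(shuffleId: Int) extends SlaveMessage
  final case class RemoveBroadcast(broadcastId: Long) extends SlaveMessage
  final case class GetBlockStatus(blockId: BlockId) extends SlaveMessage
  final case class GetMatchingBlockIds(
      filter: BlockId => Boolean,
      askSlaves: Boolean = true) extends SlaveMessage
  case object TriggerThreadDump extends SlaveMessage

  // The action text used in the DEBUG and ERROR log lines of the four
  // Remove* messages; the other messages have no such log lines documented.
  def actionMessage(msg: SlaveMessage): Option[String] = msg match {
    case RemoveBlock(id)     => Some(s"removing block $id")
    case RemoveRdd(id)       => Some(s"removing RDD $id")
    case RemoveShuffle(id)   => Some(s"removing shuffle $id")
    case RemoveBroadcast(id) => Some(s"removing broadcast $id")
    case _                   => None
  }
}
```

Because the trait is sealed, the compiler can check pattern matches over the messages for exhaustiveness, which is one reason to model an RPC protocol this way.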

BlockManagerSlaveEndpoint Thread Pool

BlockManagerSlaveEndpoint uses the block-manager-slave-async-thread-pool daemon thread pool (asyncThreadPool) to handle some messages, so that it can talk to other Spark services, i.e. BlockManager, MapOutputTracker, and ShuffleManager, in a non-blocking, asynchronous way.

Block-related operations can take a long time. To keep the main RPC thread free, the endpoint hands such work to threads from the pool, which talk to the other services and pass the responses on to the clients.

Note
BlockManagerSlaveEndpoint uses Java’s java.util.concurrent.ThreadPoolExecutor.
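
A daemon pool of this kind could be built roughly as shown below. The sketch assumes a cached pool (threads created on demand, idle ones reclaimed after 60 seconds), which the text above does not state. AsyncThreadPoolSketch and newDaemonCachedThreadPool are hypothetical names; only the pool name and the use of ThreadPoolExecutor come from the description.

```scala
import java.util.concurrent.{SynchronousQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

object AsyncThreadPoolSketch {
  // Builds a cached pool whose threads are daemons named "<prefix>-<n>".
  // The sizing and keep-alive values are assumptions for illustration.
  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    new ThreadPoolExecutor(
      0, Int.MaxValue, 60L, TimeUnit.SECONDS,
      new SynchronousQueue[Runnable](), factory)
  }

  val asyncThreadPool: ThreadPoolExecutor =
    newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")

  // Futures created with this context run on the pool, not on the caller's thread.
  implicit val asyncExecutionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(asyncThreadPool)
}
```

Wrapping the executor in an ExecutionContext lets slow block operations run as Futures on the pool while the calling thread returns immediately.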
