BlockManagerMasterEndpoint — BlockManagerMaster RPC Endpoint

BlockManagerMasterEndpoint is the ThreadSafeRpcEndpoint for BlockManagerMaster, registered under the BlockManagerMaster name.

BlockManagerMasterEndpoint tracks the status of the BlockManagers (on the executors) in a Spark application.

BlockManagerMasterEndpoint is created when SparkEnv is created for the driver (executors only look up a reference to the endpoint).

Table 1. BlockManagerMaster RPC Endpoint’s Messages (in alphabetical order)
Message When posted?

RegisterBlockManager

Posted when BlockManagerMaster registers a BlockManager.

UpdateBlockInfo

Posted when BlockManagerMaster receives a block status update (from BlockManager on an executor).

Table 2. BlockManagerMasterEndpoint’s Internal Registries and Counters
Name Description

blockManagerIdByExecutor

Lookup table of BlockManagerId per executor ID (BlockManagerId.executorId)

blockManagerInfo

Lookup table of BlockManagerInfo per BlockManagerId

Updated when BlockManagerMasterEndpoint registers a new BlockManager or removes a BlockManager

blockLocations

Collection of BlockIds and their locations (as BlockManagerId).

Used in removeRdd to remove the blocks of an RDD, in removeBlockManager to remove blocks after a BlockManager gets removed, and in removeBlockFromWorkers, updateBlockInfo, and getLocations.
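
The shape of the three registries can be sketched as plain Scala collections. This is only an illustration of the table above: BlockManagerId, BlockId and BlockManagerInfo are simplified, hypothetical stand-ins for Spark's classes, and the Registries wrapper class is not part of Spark.

```scala
import scala.collection.mutable

object RegistriesSketch {
  // Hypothetical, simplified stand-ins for Spark's classes of the same names.
  final case class BlockManagerId(executorId: String, host: String, port: Int)
  final case class BlockId(name: String)
  final case class BlockManagerInfo(id: BlockManagerId, maxMemSize: Long)

  // Illustrative wrapper (not a Spark class) that groups the three registries.
  final class Registries {
    // executor ID -> the BlockManagerId registered for that executor
    val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
    // BlockManagerId -> BlockManagerInfo
    val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
    // BlockId -> the BlockManagers that hold the block
    val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]
  }
}
```

With this layout, finding where a block lives is a single lookup in blockLocations, and finding the BlockManager of an executor is a single lookup in blockManagerIdByExecutor.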

Tip

Enable INFO logging level for org.apache.spark.storage.BlockManagerMasterEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerMasterEndpoint=INFO

Refer to Logging.

storageStatus Internal Method

Caution
FIXME

getLocationsMultipleBlockIds Method

Caution
FIXME

Removing Shuffle Blocks — removeShuffle Internal Method

Caution
FIXME

UpdateBlockInfo

class UpdateBlockInfo(
  var blockManagerId: BlockManagerId,
  var blockId: BlockId,
  var storageLevel: StorageLevel,
  var memSize: Long,
  var diskSize: Long)

When UpdateBlockInfo arrives, BlockManagerMasterEndpoint…FIXME

Caution
FIXME

RemoveExecutor

RemoveExecutor(execId: String)

When RemoveExecutor is received, the executor execId is removed and the response true is sent back.

Note
RemoveExecutor is posted when BlockManagerMaster removes an executor.

Finding Peers of BlockManager — getPeers Internal Method

getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId]

getPeers finds all the registered BlockManagers (using blockManagerInfo internal registry) and checks if the input blockManagerId is amongst them.

If the input blockManagerId is registered, getPeers returns all the registered BlockManagers but the one on the driver and blockManagerId.

Otherwise, getPeers returns no BlockManagers.
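
The three rules above can be sketched as a pure function. This is a minimal sketch, not Spark's code: BlockManagerId is a simplified stand-in, the set of registered IDs is passed in explicitly instead of being read from blockManagerInfo, and identifying the driver by the executor ID "driver" is an assumption of the example.

```scala
object PeersSketch {
  // Simplified, hypothetical stand-in for Spark's BlockManagerId.
  final case class BlockManagerId(executorId: String, host: String, port: Int) {
    // Assumption: the driver's BlockManager uses the executor ID "driver".
    def isDriver: Boolean = executorId == "driver"
  }

  // `registered` plays the role of the keys of the blockManagerInfo registry.
  def getPeers(
      registered: Set[BlockManagerId],
      blockManagerId: BlockManagerId): Seq[BlockManagerId] =
    if (registered.contains(blockManagerId)) {
      // Every registered BlockManager except the driver's and the requester itself.
      registered.filterNot(id => id.isDriver || id == blockManagerId).toSeq
    } else {
      // An unregistered BlockManager has no peers.
      Seq.empty
    }
}
```

Note that the driver can ask for peers too: since the driver is itself registered, it gets back every executor's BlockManager.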

Note
Peers of a BlockManager are the other BlockManagers in a cluster (except the driver’s BlockManager). Peers are used to know the available executors in a Spark application.
Note
getPeers is used exclusively when BlockManagerMasterEndpoint handles the GetPeers message.

Finding Peers of BlockManager — GetPeers Message

GetPeers(blockManagerId: BlockManagerId)
extends ToBlockManagerMaster

GetPeers replies with the peers of blockManagerId.

Note
Peers of a BlockManager are the other BlockManagers in a cluster (except the driver’s BlockManager). Peers are used to know the available executors in a Spark application.

BlockManagerHeartbeat

Caution
FIXME

GetLocations Message

GetLocations(blockId: BlockId)
extends ToBlockManagerMaster

GetLocations replies with the locations of blockId.

GetLocationsMultipleBlockIds Message

GetLocationsMultipleBlockIds(blockIds: Array[BlockId])
extends ToBlockManagerMaster

GetLocationsMultipleBlockIds replies with the result of getLocationsMultipleBlockIds for the input blockIds.

Note
GetLocationsMultipleBlockIds is posted when BlockManagerMaster requests the block locations for multiple blocks.
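
How the two location messages map to replies can be sketched with pattern matching. This is a sketch under assumptions: the types are simplified stand-ins, the Endpoint class and its reply method are hypothetical (the reply is returned rather than sent over RPC), and the multi-block lookup is assumed to be one getLocations call per block ID, in input order.

```scala
object DispatchSketch {
  // Simplified, hypothetical stand-ins for Spark's classes of the same names.
  final case class BlockManagerId(executorId: String, host: String, port: Int)
  final case class BlockId(name: String)

  sealed trait ToBlockManagerMaster
  final case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster
  final case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId])
    extends ToBlockManagerMaster

  // Hypothetical endpoint holding an immutable snapshot of blockLocations.
  final class Endpoint(blockLocations: Map[BlockId, Set[BlockManagerId]]) {
    // Known block: its locations. Unknown block: an empty collection.
    def getLocations(blockId: BlockId): Seq[BlockManagerId] =
      blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

    // Assumption: one getLocations lookup per block ID, preserving input order.
    def getLocationsMultipleBlockIds(
        blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] =
      blockIds.toIndexedSeq.map(id => getLocations(id))

    // Stand-in for the RPC reply handler: returns the reply instead of sending it.
    def reply(message: ToBlockManagerMaster): Any = message match {
      case GetLocations(blockId) => getLocations(blockId)
      case GetLocationsMultipleBlockIds(blockIds) => getLocationsMultipleBlockIds(blockIds)
    }
  }
}
```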

RegisterBlockManager Message

RegisterBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  sender: RpcEndpointRef)

When RegisterBlockManager arrives, BlockManagerMasterEndpoint registers the BlockManager.

Registering BlockManager (on Executor) — register Internal Method

register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit

register records the current time and registers BlockManager (using BlockManagerId) unless it has been registered already (in blockManagerInfo internal registry).

Note
register is executed when RegisterBlockManager has been received.
Note
Registering a BlockManager can only happen once for an executor (identified by BlockManagerId.executorId in blockManagerIdByExecutor internal registry).

If another BlockManager has already been registered for the executor, you should see the following ERROR message in the logs:

ERROR Got two different block manager registrations on same executor - will replace old one [oldId] with new one [id]

You should see the following INFO message in the logs:

INFO Registering block manager [hostPort] with [bytes] RAM, [id]

The BlockManager is recorded in the internal registries (blockManagerIdByExecutor and blockManagerInfo).

Caution
FIXME Why does blockManagerInfo require a new System.currentTimeMillis() since time was already recorded?

In either case, SparkListenerBlockManagerAdded is posted (to listenerBus).

Note
The method can only be executed on the driver where listenerBus is available.
Caution
FIXME Describe listenerBus + omnigraffle it.
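
The registration flow described in this section can be sketched as follows. It is a simplified model rather than Spark's implementation: the types are hypothetical stand-ins, the slaveEndpoint parameter is left out, the log lines are printed with println and show the raw byte count, and listenerBus is replaced by a buffer (postedAdded) that records each ID for which an "added" event would be posted.

```scala
import scala.collection.mutable

object RegisterSketch {
  // Simplified, hypothetical stand-ins for Spark's classes of the same names.
  final case class BlockManagerId(executorId: String, host: String, port: Int)
  final case class BlockManagerInfo(id: BlockManagerId, registeredAtMs: Long, maxMemSize: Long)

  final class Registry {
    val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
    val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
    // Stand-in for listenerBus: one entry per SparkListenerBlockManagerAdded posted.
    val postedAdded = mutable.ArrayBuffer.empty[BlockManagerId]

    // Minimal removal, enough for this sketch: forget the executor's BlockManager.
    def removeExecutor(execId: String): Unit =
      blockManagerIdByExecutor.remove(execId).foreach(id => blockManagerInfo.remove(id))

    def register(id: BlockManagerId, maxMemSize: Long): Unit = {
      val time = System.currentTimeMillis()
      if (!blockManagerInfo.contains(id)) {
        // A different BlockManager on the same executor is replaced by the new one.
        blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
          println("ERROR Got two different block manager registrations on same executor" +
            s" - will replace old one $oldId with new one $id")
          removeExecutor(id.executorId)
        }
        println(s"INFO Registering block manager ${id.host}:${id.port} with $maxMemSize RAM, $id")
        blockManagerIdByExecutor(id.executorId) = id
        blockManagerInfo(id) = BlockManagerInfo(id, time, maxMemSize)
      }
      // Posted in either case, whether or not the BlockManager was already registered.
      postedAdded += id
    }
  }
}
```

Registering the same BlockManagerId twice leaves the registries unchanged, while a second BlockManagerId with the same executorId evicts the first one.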

Other RPC Messages

  • GetLocationsMultipleBlockIds

  • GetRpcHostPortForExecutor

  • GetMemoryStatus

  • GetStorageStatus

  • GetBlockStatus

  • GetMatchingBlockIds

  • RemoveShuffle

  • RemoveBroadcast

  • RemoveBlock

  • StopBlockManagerMaster

  • BlockManagerHeartbeat

  • HasCachedBlocks

Removing Executor — removeExecutor Internal Method

removeExecutor(execId: String)

removeExecutor prints the following INFO message to the logs:

INFO BlockManagerMasterEndpoint: Trying to remove executor [execId] from BlockManagerMaster.

If the execId executor is registered (in the blockManagerIdByExecutor internal registry), removeExecutor removes the corresponding BlockManager.

Note
removeExecutor is executed when BlockManagerMasterEndpoint receives a RemoveExecutor message or registers a new BlockManager that replaces another BlockManager already registered for the same executor.

Removing BlockManager — removeBlockManager Internal Method

removeBlockManager(blockManagerId: BlockManagerId)

removeBlockManager looks up blockManagerId and removes the executor it was working on from the internal registries (blockManagerIdByExecutor and blockManagerInfo).

It then goes over all the blocks of the BlockManager and removes the BlockManager from the locations of each block in the blockLocations registry.

You should then see the following INFO message in the logs:

INFO BlockManagerMasterEndpoint: Removing block manager [blockManagerId]
Note
removeBlockManager is used exclusively when BlockManagerMasterEndpoint removes an executor.
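
The removal path (removeExecutor delegating to removeBlockManager) can be sketched with the three registries. This is a simplified model with hypothetical stand-in types: blockManagerInfo maps directly to the set of blocks a BlockManager holds, logging is omitted, and dropping a block whose last location disappears is an assumption of the sketch.

```scala
import scala.collection.mutable

object RemoveSketch {
  // Simplified, hypothetical stand-ins for Spark's classes of the same names.
  final case class BlockManagerId(executorId: String, host: String, port: Int)
  final case class BlockId(name: String)

  final class Registry {
    val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
    // Simplification: BlockManagerId -> the blocks that BlockManager holds.
    val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, mutable.HashSet[BlockId]]
    val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]

    def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
      // Drop the BlockManager from both lookup tables, keeping its block list.
      val blocks = blockManagerInfo.remove(blockManagerId)
        .getOrElse(mutable.HashSet.empty[BlockId])
      blockManagerIdByExecutor.remove(blockManagerId.executorId)
      // Remove the BlockManager from the locations of every block it held.
      blocks.foreach { blockId =>
        blockLocations.get(blockId).foreach { locations =>
          locations -= blockManagerId
          // Assumption: a block with no remaining locations is dropped entirely.
          if (locations.isEmpty) blockLocations.remove(blockId)
        }
      }
    }

    // Only a registered executor has a BlockManager to remove.
    def removeExecutor(execId: String): Unit =
      blockManagerIdByExecutor.get(execId).foreach(id => removeBlockManager(id))
  }
}
```

Calling removeExecutor for an unknown executor ID is a no-op in this sketch, which mirrors the "if the execId executor is registered" condition above.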

Get Block Locations — getLocations Method

getLocations(blockId: BlockId): Seq[BlockManagerId]

When executed, getLocations looks up blockId in the blockLocations internal registry and returns the locations (as a collection of BlockManagerId) or an empty collection.

Creating BlockManagerMasterEndpoint Instance

BlockManagerMasterEndpoint takes the following when created:

BlockManagerMasterEndpoint initializes the internal registries and counters.
