TorrentBroadcast — Default Broadcast Implementation

TorrentBroadcast is the default and only implementation of the Broadcast Contract that describes broadcast variables. TorrentBroadcast uses a BitTorrent-like protocol to distribute blocks, and the distribution happens only when tasks access broadcast variables on executors.

Figure 1. TorrentBroadcast - broadcasting using BitTorrent
// On the driver
val sc: SparkContext = ???
val anyScalaValue = ???
val b = sc.broadcast(anyScalaValue) // <-- TorrentBroadcast is created

A broadcast variable is stored in the driver’s BlockManager twice: as a single value and, separately, as broadcast blocks (after it has been divided into broadcast blocks, i.e. blockified). The broadcast block size is the value of the spark.broadcast.blockSize Spark property.
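As a rough illustration of blockifying, the number of broadcast blocks follows from a ceiling division of the serialized size by the block size. The sketch below is not Spark code: the BlockCount object and the numBlocks helper are hypothetical names introduced here, and the 4 MiB block size in the usage note is an assumed setting of spark.broadcast.blockSize.

```scala
object BlockCount {
  // Number of blocks needed for a serialized payload of `serializedSize` bytes
  // when every block holds at most `blockSize` bytes (ceiling division).
  def numBlocks(serializedSize: Long, blockSize: Int): Long = {
    require(blockSize > 0, "blockSize must be positive")
    require(serializedSize >= 0, "serializedSize must not be negative")
    (serializedSize + blockSize - 1) / blockSize
  }
}
```

For instance, `BlockCount.numBlocks(10L * 1024 * 1024, 4 * 1024 * 1024)` gives 3: two full 4 MiB blocks and one 2 MiB remainder. With compression enabled, the size that matters is the compressed serialized size.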

Figure 2. TorrentBroadcast puts the broadcast value and its chunks into the driver’s BlockManager
Note
TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.
Note
TorrentBroadcast belongs to org.apache.spark.broadcast package.
Tip

Enable INFO or DEBUG logging levels for org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=DEBUG

Refer to Logging.

unBlockifyObject Method

Caution
FIXME

readBlocks Method

Caution
FIXME

releaseLock Method

Caution
FIXME

Creating TorrentBroadcast Instance

TorrentBroadcast[T](obj: T, id: Long)
extends Broadcast[T](id)

When created, TorrentBroadcast reads broadcast blocks (to the internal _value).

Note
The internal _value is transient so it is not serialized and sent over the wire to executors. It is later recreated lazily on executors when requested.

TorrentBroadcast then sets the internal optional CompressionCodec and the broadcast block size (as controlled by the spark.broadcast.blockSize Spark property in the SparkConf of the driver and of every executor).

Note
Compression is controlled by spark.broadcast.compress Spark property and is enabled by default.

The internal broadcastId is BroadcastBlockId for the input id.

Note
A broadcast’s blocks are first stored in the local BlockManager on the driver.

Getting Value of Broadcast Variable — getValue Method

def getValue(): T

getValue returns the value of a broadcast variable.

Note
getValue is a part of the Broadcast Variable Contract and is the only way to access the value of a broadcast variable.

Internally, getValue reads the internal _value that, when first accessed, reads broadcast blocks from the local or remote BlockManagers.

Note
The internal _value is both transient and lazy: transient means it is not preserved when the broadcast is serialized, and lazy means it is (re)created only when requested. That "trick" allows for serializing broadcast variables on the driver without their values before they are transferred to executors over the wire.
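The effect of a transient lazy value can be seen with plain Java serialization. The following is a minimal sketch and not TorrentBroadcast itself: Handle, serialize and deserialize are hypothetical names, and the 1 MiB array merely stands in for a large broadcast value.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientLazyDemo {
  // Hypothetical stand-in for a broadcast handle: only `id` travels over the wire.
  final class Handle(val id: Long) extends Serializable {
    // Skipped by serialization (transient) and computed on first access (lazy).
    @transient private lazy val _value: Array[Byte] = Array.fill(1 << 20)(1.toByte)
    def value: Array[Byte] = _value
  }

  // Serializes an object to bytes, as if it were sent to an executor.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Reads an object back, as if it arrived on an executor.
  def deserialize[A](bytes: Array[Byte]): A = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Even after `value` has been forced on the sending side, the serialized handle stays small because the 1 MiB array is left out. The receiving side rebuilds the value on its first call to `value`.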

readBroadcastBlock Internal Method

readBroadcastBlock(): T

Internally, readBroadcastBlock first calls setConf with the current SparkConf.

Note
The current SparkConf is available using SparkEnv.get.conf.

readBroadcastBlock requests the local BlockManager for values of the broadcast.

Note
The current BlockManager is available using SparkEnv.get.blockManager.

If the broadcast was available locally, readBroadcastBlock releases a lock for the broadcast and returns the value.

If however the broadcast was not found locally, you should see the following INFO message in the logs:

INFO Started reading broadcast variable [id]

readBroadcastBlock reads blocks (as chunks) of the broadcast.

You should see the following INFO message in the logs:

INFO Reading broadcast variable [id] took [usedTimeMs]
Note
readBroadcastBlock uses the current Serializer and the internal CompressionCodec to bring all the blocks together as one single broadcast variable.

readBroadcastBlock stores the broadcast variable in the local BlockManager with the MEMORY_AND_DISK storage level. If storing the broadcast variable fails, a SparkException is thrown with the following message:

Failed to store [broadcastId] in BlockManager

The broadcast variable is returned.

Note
readBroadcastBlock is exclusively used to recreate a broadcast variable on executors.
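The control flow described in this section can be sketched with a toy in-memory store. This is an illustration under assumptions, not Spark's implementation: readBroadcast, fetchPiece and decode are hypothetical names, the block names follow an assumed broadcast_[id] and broadcast_[id]_piece[index] pattern, and locking, storage levels and logging are left out.

```scala
import scala.collection.mutable

object ReadBroadcastSketch {
  // local:      toy local block store keyed by block name
  // fetchPiece: hypothetical lookup of one piece from whichever node holds it
  // decode:     hypothetical stand-in for decompression plus deserialization
  def readBroadcast[T](
      id: Long,
      numBlocks: Int,
      local: mutable.Map[String, Any],
      fetchPiece: String => Array[Byte],
      decode: Array[Byte] => T): T = {
    val broadcastId = s"broadcast_$id"
    local.get(broadcastId) match {
      case Some(value) =>
        value.asInstanceOf[T] // fast path: the whole value is already local
      case None =>
        val pieces = (0 until numBlocks).map { i =>
          val pieceId = s"${broadcastId}_piece$i"
          val piece = fetchPiece(pieceId)
          local(pieceId) = piece // keep the piece locally (BitTorrent-like sharing)
          piece
        }
        val value = decode(Array.concat(pieces: _*))
        local(broadcastId) = value // later reads take the fast path
        value
    }
  }
}
```

A second call with the same `local` store returns the cached value without invoking `fetchPiece` again, which mirrors the local lookup that readBroadcastBlock performs first.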

setConf Internal Method

setConf(conf: SparkConf): Unit

setConf uses the input conf SparkConf to set compression codec and the block size.

Internally, setConf reads spark.broadcast.compress Spark property and if enabled (which it is by default) sets a CompressionCodec (as an internal compressionCodec property).

setConf also reads spark.broadcast.blockSize Spark property and sets the block size (as the internal blockSize property).
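The two lookups can be sketched against a plain Map instead of a SparkConf. The names Settings, sizeInBytes and fromConf are hypothetical, the size parser is simplified to k and m suffixes only, and the 4m fallback for the block size is an assumption that the text above does not state.

```scala
object BroadcastSettingsSketch {
  final case class Settings(compress: Boolean, blockSizeBytes: Int)

  // Parses sizes such as "512k" or "4m" into bytes (simplified: k and m suffixes only).
  def sizeInBytes(size: String): Int = {
    val s = size.trim.toLowerCase
    if (s.endsWith("m")) s.dropRight(1).toInt * 1024 * 1024
    else if (s.endsWith("k")) s.dropRight(1).toInt * 1024
    else throw new IllegalArgumentException(s"expected a size with a k or m suffix, got: $size")
  }

  // Reads both properties, falling back to defaults when a key is absent.
  def fromConf(conf: Map[String, String]): Settings =
    Settings(
      compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean,
      // "4m" is an assumed default, used here for illustration only
      blockSizeBytes = sizeInBytes(conf.getOrElse("spark.broadcast.blockSize", "4m")))
}
```

With an empty map, `fromConf` yields compression enabled and a block size of 4194304 bytes. Setting `spark.broadcast.compress` to `false` corresponds to the case where no CompressionCodec is set.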

Storing Broadcast and Its Blocks in Local BlockManager — writeBlocks Internal Method

writeBlocks(value: T): Int

writeBlocks is an internal method to store the broadcast’s value and blocks in the driver’s BlockManager. It returns the number of the broadcast blocks the broadcast was divided into.

Note
writeBlocks is used exclusively when a TorrentBroadcast is created, which happens on the driver only. It sets the internal numBlocks property, which is serialized as a plain number with the broadcast before it is sent to executors (where it is used once they have called the value method).

Internally, writeBlocks stores value as a single block in the local BlockManager (using a new BroadcastBlockId, value, the MEMORY_AND_DISK storage level and without telling the driver).

If storing the broadcast block fails, you should see the following SparkException in the logs:

Failed to store [broadcastId] in BlockManager

writeBlocks divides value into blocks (of spark.broadcast.blockSize size) using the Serializer and an optional CompressionCodec (enabled by spark.broadcast.compress). Every block is wrapped inside a ChunkedByteBuffer and gets its own BroadcastBlockId (with piece and an index). Blocks are stored in the local BlockManager (using the piece block id, the MEMORY_AND_DISK_SER storage level and informing the driver).

Note
The entire broadcast value is stored in the local BlockManager with MEMORY_AND_DISK storage level, and the pieces with MEMORY_AND_DISK_SER storage level.

If storing any of the broadcast pieces fails, you should see the following SparkException in the logs:

Failed to store [pieceId] of [broadcastId] in local BlockManager
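The two-step storage above can be sketched with a toy block store. This is not Spark code: ToyBlockStore is a hypothetical stand-in for the BlockManager, storage levels are kept as plain strings, IllegalStateException stands in for SparkException, and the broadcast_[id] and broadcast_[id]_piece[index] block names are assumptions about how the block ids are rendered.

```scala
import scala.collection.mutable

object WriteBlocksSketch {
  // Toy stand-in for a BlockManager: block name -> (storage level, payload).
  final class ToyBlockStore {
    private val blocks = mutable.LinkedHashMap.empty[String, (String, Any)]
    // Returns false when a block with this name already exists.
    def put(name: String, level: String, payload: Any): Boolean =
      if (blocks.contains(name)) false
      else { blocks(name) = (level, payload); true }
    def levelOf(name: String): Option[String] = blocks.get(name).map(_._1)
    def names: List[String] = blocks.keys.toList
  }

  // Stores the whole value first, then every piece; returns the number of pieces.
  def writeBlocks(store: ToyBlockStore, id: Long, value: Any, pieces: Seq[Array[Byte]]): Int = {
    val broadcastId = s"broadcast_$id"
    if (!store.put(broadcastId, "MEMORY_AND_DISK", value))
      throw new IllegalStateException(s"Failed to store $broadcastId in BlockManager")
    pieces.zipWithIndex.foreach { case (piece, i) =>
      val pieceId = s"${broadcastId}_piece$i"
      if (!store.put(pieceId, "MEMORY_AND_DISK_SER", piece))
        throw new IllegalStateException(
          s"Failed to store $pieceId of $broadcastId in local BlockManager")
    }
    pieces.length
  }
}
```

Keeping the deserialized value next to the serialized pieces means that reads on the driver need no reassembly, while the pieces are what other nodes fetch.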

Chunking Broadcast Into Blocks — blockifyObject Method

blockifyObject[T](
  obj: T,
  blockSize: Int,
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]

blockifyObject divides (aka blockifies) the input obj broadcast variable into blocks (of ByteBuffer). blockifyObject uses the input serializer Serializer to write obj in a serialized format to a ChunkedByteBufferOutputStream (of blockSize size) with the optional CompressionCodec.
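A self-contained approximation of this chunking, using only the JDK, might look as follows. It is a sketch under assumptions rather than Spark's code: Java serialization stands in for the Serializer, gzip stands in for the optional CompressionCodec, and blockify and unBlockify are hypothetical helpers (unBlockify exists only to show the round trip and does not describe unBlockifyObject).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.nio.ByteBuffer
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object BlockifySketch {
  // Serializes `obj` (optionally gzip-compressed) and slices the bytes into
  // buffers of at most `blockSize` bytes each.
  def blockify(obj: AnyRef, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    require(blockSize > 0, "blockSize must be positive")
    val bytes = new ByteArrayOutputStream()
    val sink: OutputStream = if (compress) new GZIPOutputStream(bytes) else bytes
    val out = new ObjectOutputStream(sink)
    try out.writeObject(obj) finally out.close() // close() also finishes the gzip stream
    bytes.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  // Reverses blockify: concatenates the blocks in order and deserializes the result.
  def unBlockify[A](blocks: Array[ByteBuffer], compress: Boolean): A = {
    val joined = new ByteArrayOutputStream()
    blocks.foreach { block =>
      val view = block.duplicate() // leave the caller's buffer position untouched
      val chunk = new Array[Byte](view.remaining())
      view.get(chunk)
      joined.write(chunk)
    }
    val source = new ByteArrayInputStream(joined.toByteArray)
    val stream: InputStream = if (compress) new GZIPInputStream(source) else source
    val in = new ObjectInputStream(stream)
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Only the last block can be shorter than `blockSize`. Because compression is applied before the bytes are sliced, a highly compressible value ends up in fewer blocks.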

doUnpersist Method

doUnpersist(blocking: Boolean): Unit
Note
doUnpersist is a part of the Broadcast Variable Contract and is executed from unpersist method.

doDestroy Method

doDestroy(blocking: Boolean): Unit

unpersist Internal Method

unpersist(
  id: Long,
  removeFromDriver: Boolean,
  blocking: Boolean): Unit

unpersist removes all broadcast blocks from executors and possibly the driver (only when removeFromDriver flag is enabled).

Note
unpersist belongs to the private TorrentBroadcast object and is executed both when TorrentBroadcast unpersists a broadcast variable and when it removes a broadcast variable completely.

When executed, you should see the following DEBUG message in the logs:

DEBUG TorrentBroadcast: Unpersisting TorrentBroadcast [id]
Note
unpersist uses SparkEnv to get the BlockManagerMaster (through blockManager property).
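The role of the removeFromDriver flag can be illustrated with a toy cluster of block-name sets. This is a sketch under assumptions, not the BlockManagerMaster API: the node names, the "driver" key and the broadcast_[id] naming pattern are hypothetical, and the blocking parameter is omitted because everything here runs synchronously.

```scala
import scala.collection.mutable

object UnpersistSketch {
  // stores: toy cluster, node name -> names of the blocks held on that node.
  def unpersist(
      id: Long,
      removeFromDriver: Boolean,
      stores: mutable.Map[String, mutable.Set[String]]): Unit = {
    val broadcastId = s"broadcast_$id"
    // Matches the whole-value block and its pieces, but not e.g. broadcast_10 for id 1.
    def belongsToBroadcast(name: String): Boolean =
      name == broadcastId || name.startsWith(broadcastId + "_")
    stores.foreach { case (node, blocks) =>
      // Executors are always cleaned; the driver only when the flag is set.
      if (node != "driver" || removeFromDriver) {
        blocks --= blocks.filter(belongsToBroadcast).toList
      }
    }
  }
}
```

Calling it with `removeFromDriver = false` keeps the driver's copy, so executors can fetch the blocks again later. With `removeFromDriver = true` the broadcast is gone from every node.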
