TorrentBroadcast — Default Broadcast Implementation
TorrentBroadcast is the default and only implementation of the Broadcast Contract that describes broadcast variables. TorrentBroadcast uses a BitTorrent-like protocol to distribute the broadcast blocks (which happens only when tasks access broadcast variables on executors).
When a broadcast variable is created (using SparkContext.broadcast) on the driver, a new instance of TorrentBroadcast is created.
// On the driver
val sc: SparkContext = ???          // an active SparkContext
val anyScalaValue = Seq(1, 2, 3)    // any serializable Scala value
val b = sc.broadcast(anyScalaValue) // <-- TorrentBroadcast is created
A broadcast variable is stored on the driver’s BlockManager both as a single value and, separately, as broadcast blocks (after it has been divided into blocks, i.e. blockified). The broadcast block size is controlled by the spark.broadcast.blockSize Spark property.
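For example, the block size can be adjusted on the SparkConf used to create the SparkContext (a usage sketch; the default is 4m, and the 2m value below is only illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("broadcast-blocksize-demo")
  .setMaster("local[*]")
  .set("spark.broadcast.blockSize", "2m") // default: 4m
val sc = new SparkContext(conf)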
Note: TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.
Note: TorrentBroadcast belongs to the org.apache.spark.broadcast package.
Tip: Enable INFO or DEBUG logging level for the org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=DEBUG
Refer to Logging.
unBlockifyObject Method
Caution: FIXME
readBlocks Method
Caution: FIXME
releaseLock Method
Caution: FIXME
Creating TorrentBroadcast Instance
TorrentBroadcast[T](obj: T, id: Long)
extends Broadcast[T](id)
When created, TorrentBroadcast reads broadcast blocks (into the internal _value).
Note: The internal _value is transient so it is not serialized and sent over the wire to executors. It is later recreated lazily on executors when requested.
TorrentBroadcast then sets the internal optional CompressionCodec and the broadcast block size (as controlled by the spark.broadcast.blockSize Spark property in the SparkConf on the driver and executors).
Note: Compression is controlled by the spark.broadcast.compress Spark property and is enabled by default.
The internal broadcastId is a BroadcastBlockId for the input id.
The internal numBlocks is set to the number of pieces the broadcast was divided into.
Note: A broadcast’s blocks are first stored in the local BlockManager on the driver.
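For illustration only, the block id names involved follow BroadcastBlockId (in the org.apache.spark.storage package); assuming a broadcast with id 0:

import org.apache.spark.storage.BroadcastBlockId

BroadcastBlockId(0).name            // "broadcast_0" -- the broadcast value as a single block
BroadcastBlockId(0, "piece0").name  // "broadcast_0_piece0" -- the first broadcast block (piece)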
Getting Value of Broadcast Variable — getValue Method
def getValue(): T
getValue returns the value of a broadcast variable.
Note: getValue is a part of the Broadcast Variable Contract and is the only way to access the value of a broadcast variable.
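From a user’s perspective, getValue runs under the covers whenever value is accessed inside a task, e.g. (assuming an active SparkContext sc):

val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))
sc.parallelize(1 to 3)
  .map(n => lookup.value.getOrElse(n, "unknown")) // value -> getValue on an executor
  .collect()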
Internally, getValue reads the internal _value that, once accessed, reads broadcast blocks from the local or remote BlockManagers.
Note: The internal _value is transient and lazy, i.e. it is not preserved when serialized and is (re)created only when requested, respectively. That "trick" allows a broadcast variable to be serialized on the driver (without its value) before it is transferred to executors over the wire.
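The transient-and-lazy pattern can be demonstrated with a minimal, self-contained sketch (plain Scala, not the Spark source; the string value merely stands in for readBroadcastBlock):

import java.io._

class Handle(id: Long) extends Serializable {
  // @transient: not serialized; lazy: computed on first access (and again after deserialization)
  @transient private lazy val _value: String = {
    println(s"recreating value for broadcast $id")
    s"value-$id"
  }
  def value: String = _value
}

val bytes = { val bos = new ByteArrayOutputStream(); val oos = new ObjectOutputStream(bos); oos.writeObject(new Handle(0)); oos.close(); bos.toByteArray }
val copy = new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[Handle]
copy.value // triggers recreation, just like on an executor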
readBroadcastBlock Internal Method
readBroadcastBlock(): T
Internally, readBroadcastBlock first sets the configuration (using setConf with the current SparkConf).
Note: The current SparkConf is available using SparkEnv.get.conf.
readBroadcastBlock requests the local BlockManager for values of the broadcast.
Note: The current BlockManager is available using SparkEnv.get.blockManager.
If the broadcast was available locally, readBroadcastBlock releases a lock for the broadcast and returns the value.
If, however, the broadcast was not found locally, you should see the following INFO message in the logs:
INFO Started reading broadcast variable [id]
readBroadcastBlock reads blocks (as chunks) of the broadcast.
You should see the following INFO message in the logs:
INFO Reading broadcast variable [id] took [usedTimeMs]
readBroadcastBlock then unblockifies the collection of ByteBuffer blocks (using unBlockifyObject).
Note: readBroadcastBlock uses the current Serializer and the internal CompressionCodec to bring all the blocks together as one single broadcast variable.
readBroadcastBlock stores the broadcast variable with the MEMORY_AND_DISK storage level in the local BlockManager. If storing the broadcast variable fails, a SparkException is thrown:
Failed to store [broadcastId] in BlockManager
The broadcast variable is returned.
Note: readBroadcastBlock is exclusively used to recreate a broadcast variable on executors.
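Put together, the control flow of readBroadcastBlock can be sketched as follows (simplified pseudocode mirroring the steps above, not the actual Spark source; all of the APIs shown are Spark-internal):

def readBroadcastBlock(): T = {
  setConf(SparkEnv.get.conf)
  val blockManager = SparkEnv.get.blockManager
  blockManager.getLocalValues(broadcastId) match {
    case Some(result) =>                 // found locally
      releaseLock(broadcastId)
      result.data.next().asInstanceOf[T]
    case None =>                         // not found locally -- fetch the chunks
      val blocks = readBlocks()
      val obj = TorrentBroadcast.unBlockifyObject[T](blocks, SparkEnv.get.serializer, compressionCodec)
      val stored = blockManager.putSingle(broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
      if (!stored) throw new SparkException(s"Failed to store $broadcastId in BlockManager")
      obj
  }
}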
setConf Internal Method
setConf(conf: SparkConf): Unit
setConf uses the input conf SparkConf to set the compression codec and the block size.
Internally, setConf reads the spark.broadcast.compress Spark property and, if enabled (which it is by default), sets a CompressionCodec (as the internal compressionCodec property).
setConf also reads the spark.broadcast.blockSize Spark property and sets the block size (as the internal blockSize property).
Note: setConf is executed when TorrentBroadcast is created or re-created when deserialized on executors.
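What setConf derives can be sketched as follows (illustration only; CompressionCodec.createCodec is a Spark-internal API and the defaults shown are the documented ones):

// compression codec: spark.broadcast.compress (default: true)
compressionCodec =
  if (conf.getBoolean("spark.broadcast.compress", defaultValue = true)) Some(CompressionCodec.createCodec(conf))
  else None
// block size: spark.broadcast.blockSize (default: 4m)
blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024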
Storing Broadcast and Its Blocks in Local BlockManager — writeBlocks Internal Method
writeBlocks(value: T): Int
writeBlocks is an internal method to store the broadcast’s value and blocks in the driver’s BlockManager. It returns the number of broadcast blocks the broadcast was divided into.
Note: writeBlocks is used exclusively when a TorrentBroadcast is created, which happens on the driver only. It sets the internal numBlocks property, which is serialized (as a number) with the broadcast before it is sent to executors (and used when they call the value method).
Internally, writeBlocks stores the broadcast’s value as a single block in the local BlockManager (using a new BroadcastBlockId, the value, the MEMORY_AND_DISK storage level, and without informing the driver).
If storing the broadcast block fails, you should see the following SparkException in the logs:
Failed to store [broadcastId] in BlockManager
writeBlocks divides value into blocks (of spark.broadcast.blockSize size) using the Serializer and an optional CompressionCodec (enabled by spark.broadcast.compress). Every block gets its own BroadcastBlockId (with piece and an index) that is wrapped inside a ChunkedByteBuffer. Blocks are stored in the local BlockManager (using the piece block id, MEMORY_AND_DISK_SER storage level and informing the driver).
Note: The entire broadcast value is stored in the local BlockManager with the MEMORY_AND_DISK storage level, and the pieces with the MEMORY_AND_DISK_SER storage level.
If storing any of the broadcast pieces fails, you should see the following SparkException in the logs:
Failed to store [pieceId] of [broadcastId] in local BlockManager
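A simplified sketch of the whole writeBlocks sequence (pseudocode based on the description above, not the actual Spark source; every call shown is a Spark-internal API):

def writeBlocks(value: T): Int = {
  val blockManager = SparkEnv.get.blockManager
  // 1. the whole value, kept only locally on the driver (tellMaster = false)
  if (!blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK, tellMaster = false))
    throw new SparkException(s"Failed to store $broadcastId in BlockManager")
  // 2. the serialized (and optionally compressed) pieces, announced to the driver (tellMaster = true)
  val pieces = TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
  pieces.zipWithIndex.foreach { case (piece, i) =>
    val pieceId = BroadcastBlockId(id, "piece" + i)
    if (!blockManager.putBytes(pieceId, new ChunkedByteBuffer(piece), StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true))
      throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
  }
  pieces.length
}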
Chunking Broadcast Into Blocks — blockifyObject Method
blockifyObject[T](
obj: T,
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]
blockifyObject divides (aka blockifies) the input obj broadcast variable into blocks (of ByteBuffer). blockifyObject uses the input serializer Serializer to write obj in a serialized format to a ChunkedByteBufferOutputStream (of blockSize size) with the optional CompressionCodec.
Note: blockifyObject is executed when TorrentBroadcast stores a broadcast and its blocks in a local BlockManager.
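The idea of blockifying can be shown with a self-contained sketch that uses plain Java serialization and manual chunking (Spark uses its own Serializer, an optional CompressionCodec and a ChunkedByteBufferOutputStream instead):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

def blockify(obj: AnyRef, blockSize: Int): Array[ByteBuffer] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)                                            // serialize the object
  oos.close()
  bos.toByteArray.grouped(blockSize).map(ByteBuffer.wrap).toArray // chunk into fixed-size blocks
}

blockify(Array.fill(1000)(42), blockSize = 1024).length           // number of pieces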
doUnpersist Method
doUnpersist(blocking: Boolean): Unit
Note: doUnpersist is a part of the Broadcast Variable Contract and is executed from the unpersist method.
doDestroy Method
doDestroy(blocking: Boolean): Unit
doDestroy removes all the persisted state associated with a broadcast variable on all the nodes in a Spark application, i.e. the driver and executors.
Note: doDestroy is executed when Broadcast removes the persisted data and metadata related to a broadcast variable.
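From the user-facing side, both methods are reached through the public Broadcast API (assuming a broadcast variable b created with SparkContext.broadcast):

b.unpersist(blocking = false) // -> doUnpersist: removes the copies on executors; the value can be re-broadcast later
b.destroy()                   // -> doDestroy: removes all state everywhere; b must not be used afterwards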
unpersist Internal Method
unpersist(
id: Long,
removeFromDriver: Boolean,
blocking: Boolean): Unit
unpersist removes all broadcast blocks from executors and possibly the driver (only when removeFromDriver flag is enabled).
Note: unpersist belongs to the TorrentBroadcast private object and is executed when TorrentBroadcast unpersists a broadcast variable and removes a broadcast variable completely.
When executed, you should see the following DEBUG message in the logs:
DEBUG TorrentBroadcast: Unpersisting TorrentBroadcast [id]
unpersist requests BlockManagerMaster to remove the id broadcast.
Note: unpersist uses SparkEnv to get the BlockManagerMaster (through the blockManager property).
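In effect, the request amounts to the following call (a sketch; BlockManagerMaster.removeBroadcast is a Spark-internal API):

SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)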