TorrentBroadcast: Default Broadcast Implementation
TorrentBroadcast is the default and only implementation of the Broadcast Contract that describes broadcast variables. TorrentBroadcast uses a BitTorrent-like protocol for block distribution, which happens only when tasks access broadcast variables on executors.
When a broadcast variable is created (using SparkContext.broadcast) on the driver, a new instance of TorrentBroadcast is created.
// On the driver
val sc: SparkContext = ???
val anyScalaValue = ???
val b = sc.broadcast(anyScalaValue) // <-- TorrentBroadcast is created
A broadcast variable is stored in the driver’s BlockManager as a single value and, separately, as broadcast blocks (after it has been divided into blocks, i.e. blockified). The broadcast block size is the value of the spark.broadcast.blockSize Spark property.
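The relationship between the block size and the number of broadcast blocks can be sketched as a ceiling division. This is only an illustration: BlockCountSketch and numBlocks are hypothetical names, and the real count depends on the size of the value after serialization and optional compression, not on its in-memory size.

```scala
object BlockCountSketch {
  // Number of pieces a payload of `serializedSize` bytes splits into
  // when every piece holds at most `blockSize` bytes (ceiling division).
  def numBlocks(serializedSize: Long, blockSize: Int): Long = {
    require(serializedSize >= 0, "serializedSize must not be negative")
    require(blockSize > 0, "blockSize must be positive")
    (serializedSize + blockSize - 1) / blockSize
  }

  def main(args: Array[String]): Unit = {
    val blockSize = 4 * 1024 * 1024    // block size assumed for this example
    val payload = 10L * 1024 * 1024    // assumed serialized size: 10 MiB
    println(numBlocks(payload, blockSize)) // 10 MiB in 4 MiB pieces gives 3 pieces
  }
}
```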
Note: TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.
Note: TorrentBroadcast belongs to the org.apache.spark.broadcast package.
Tip: Enable INFO or DEBUG logging for the org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside. Refer to Logging.
unBlockifyObject Method
Caution: FIXME
readBlocks Method
Caution: FIXME
releaseLock Method
Caution: FIXME
Creating TorrentBroadcast Instance
TorrentBroadcast[T](obj: T, id: Long)
  extends Broadcast[T](id)
When created, TorrentBroadcast reads broadcast blocks (to the internal _value).
Note: The internal _value is transient, so it is not serialized and sent over the wire to executors. It is recreated lazily on executors when requested.
TorrentBroadcast then sets the internal optional CompressionCodec and the size of a broadcast block (as controlled by the spark.broadcast.blockSize Spark property in the SparkConf of the driver and of each executor).
Note: Compression is controlled by the spark.broadcast.compress Spark property and is enabled by default.
The internal broadcastId is the BroadcastBlockId for the input id.
The internal numBlocks is set to the number of pieces the broadcast was divided into.
Note: A broadcast’s blocks are first stored in the local BlockManager on the driver.
Getting Value of Broadcast Variable: getValue Method
def getValue(): T
getValue returns the value of a broadcast variable.
Note: getValue is part of the Broadcast Variable Contract and is the only way to access the value of a broadcast variable.
Internally, getValue reads the internal _value that, once accessed, reads broadcast blocks from the local or remote BlockManagers.
Note: The internal _value is transient and lazy, i.e. it is not preserved when the broadcast is serialized and it is (re)created only when requested. That "trick" allows broadcast variables to be serialized on the driver and transferred to executors over the wire without carrying the value itself.
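The transient-and-lazy "trick" can be demonstrated without Spark. The following is a minimal sketch that uses plain Java serialization as a stand-in for shipping a broadcast to an executor; TransientLazySketch, LazyHolder, evaluations and roundTrip are hypothetical names, and the string stands in for the real work of reading broadcast blocks.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

object TransientLazySketch {
  // Counts how many times the lazy value has been computed in this JVM.
  val evaluations = new AtomicInteger(0)

  // Hypothetical stand-in for a broadcast: only `id` is serialized.
  class LazyHolder(val id: Long) extends Serializable {
    // Transient: skipped by serialization. Lazy: computed on first access.
    @transient private lazy val _value: String = {
      evaluations.incrementAndGet()
      s"value-of-$id" // stands in for reading broadcast blocks
    }
    def value: String = _value
  }

  // Java-serialization round trip, standing in for sending to an executor.
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Accessing value on the original and then on a deserialized copy computes the value once per instance, because the copy arrives without the cached field and rebuilds it on first access.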
readBroadcastBlock Internal Method
readBroadcastBlock(): T
Internally, readBroadcastBlock sets the SparkConf.
Note: The current SparkConf is available using SparkEnv.get.conf.
readBroadcastBlock requests the local BlockManager for values of the broadcast.
Note: The current BlockManager is available using SparkEnv.get.blockManager.
If the broadcast is available locally, readBroadcastBlock releases a lock for the broadcast and returns the value.
If, however, the broadcast is not found locally, you should see the following INFO message in the logs:
INFO Started reading broadcast variable [id]
readBroadcastBlock reads blocks (as chunks) of the broadcast.
You should see the following INFO message in the logs:
INFO Reading broadcast variable [id] took [usedTimeMs]
readBroadcastBlock unblockifies the collection of ByteBuffer blocks.
Note: readBroadcastBlock uses the current Serializer and the internal CompressionCodec to bring all the blocks together as one single broadcast variable.
readBroadcastBlock stores the broadcast variable in the local BlockManager with the MEMORY_AND_DISK storage level. If storing the broadcast variable fails, a SparkException is thrown:
Failed to store [broadcastId] in BlockManager
The broadcast variable is returned.
Note: readBroadcastBlock is used exclusively to recreate a broadcast variable on executors.
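The control flow described above (look up locally, otherwise fetch the pieces, reassemble them, and cache the result locally) can be sketched with toy in-memory stores. Everything here is an assumption made for illustration: ReadFlowSketch, ToyStore and readBroadcast are hypothetical names, raw byte concatenation stands in for unblockifying, and locking, logging and storage levels are left out.

```scala
import scala.collection.mutable

object ReadFlowSketch {
  // Hypothetical in-memory stand-in for a BlockManager: block name -> bytes.
  final class ToyStore {
    private val blocks = mutable.Map.empty[String, Array[Byte]]
    def get(name: String): Option[Array[Byte]] = blocks.get(name)
    def put(name: String, bytes: Array[Byte]): Unit = blocks(name) = bytes
  }

  def readBroadcast(id: Long, numBlocks: Int, local: ToyStore, remote: ToyStore): Array[Byte] = {
    val wholeId = s"broadcast_$id"
    local.get(wholeId) match {
      case Some(bytes) =>
        bytes // the whole value is already available locally
      case None =>
        // Read every piece, preferring the local store over the remote one.
        val pieces = (0 until numBlocks).map { i =>
          val pieceId = s"${wholeId}_piece$i"
          local.get(pieceId).orElse(remote.get(pieceId)).getOrElse(
            throw new IllegalStateException(s"Toy store has no $pieceId"))
        }
        // Stand-in for unblockifying: concatenate the pieces in order.
        val value = pieces.iterator.flatMap(_.iterator).toArray
        local.put(wholeId, value) // cache the whole value for later reads
        value
    }
  }
}
```

A second call with the same id and local store takes the first branch and never touches the remote store.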
setConf Internal Method
setConf(conf: SparkConf): Unit
setConf uses the input conf SparkConf to set the compression codec and the block size.
Internally, setConf reads the spark.broadcast.compress Spark property and, if enabled (which it is by default), sets a CompressionCodec (as the internal compressionCodec property).
setConf also reads the spark.broadcast.blockSize Spark property and sets the block size (as the internal blockSize property).
Note: setConf is executed when TorrentBroadcast is created, or re-created when deserialized on executors.
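What setConf derives from the configuration can be sketched with a plain Map standing in for SparkConf. SetConfSketch, BroadcastSettings, sizeToBytes and fromConf are hypothetical names. The sketch assumes the 4m default and the "KiB unless a suffix is given" rule from Spark's configuration documentation for spark.broadcast.blockSize, and it handles only the k and m suffixes.

```scala
object SetConfSketch {
  // Hypothetical holder for the two settings discussed above.
  final case class BroadcastSettings(compress: Boolean, blockSizeBytes: Int)

  // Parses sizes such as "512k" or "4m" into bytes.
  // Assumption: a bare number means KiB; other suffixes are not handled here.
  def sizeToBytes(size: String): Int = {
    val s = size.trim.toLowerCase
    if (s.endsWith("m")) s.dropRight(1).toInt * 1024 * 1024
    else if (s.endsWith("k")) s.dropRight(1).toInt * 1024
    else s.toInt * 1024
  }

  // A Map stands in for SparkConf; missing keys fall back to the defaults.
  def fromConf(conf: Map[String, String]): BroadcastSettings =
    BroadcastSettings(
      compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean,
      blockSizeBytes = sizeToBytes(conf.getOrElse("spark.broadcast.blockSize", "4m")))
}
```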
Storing Broadcast and Its Blocks in Local BlockManager: writeBlocks Internal Method
writeBlocks(value: T): Int
writeBlocks is an internal method that stores the broadcast’s value and blocks in the driver’s BlockManager. It returns the number of broadcast blocks the broadcast was divided into.
Note: writeBlocks is used exclusively when a TorrentBroadcast is created, which happens on the driver only. It sets the internal numBlocks property, which is serialized as a number before the broadcast is sent to executors (after they have called the value method).
Internally, writeBlocks stores the block for the value broadcast in the local BlockManager (using a new BroadcastBlockId, value, the MEMORY_AND_DISK storage level, and without telling the driver).
If storing the broadcast block fails, you should see the following SparkException in the logs:
Failed to store [broadcastId] in BlockManager
writeBlocks divides value into blocks (of spark.broadcast.blockSize size) using the Serializer and an optional CompressionCodec (enabled by spark.broadcast.compress). Every block is wrapped inside a ChunkedByteBuffer and gets its own BroadcastBlockId (with piece and an index). Blocks are stored in the local BlockManager (using the piece block id, the MEMORY_AND_DISK_SER storage level, and informing the driver).
Note: The entire broadcast value is stored in the local BlockManager with the MEMORY_AND_DISK storage level, and the pieces with the MEMORY_AND_DISK_SER storage level.
If storing any of the broadcast pieces fails, you should see the following SparkException in the logs:
Failed to store [pieceId] of [broadcastId] in local BlockManager
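The bookkeeping done by writeBlocks can be sketched with a mutable map standing in for the driver's BlockManager. This is a simplification under stated assumptions: WriteBlocksSketch is a hypothetical name, the input is taken to be already serialized bytes (Spark stores the whole value as an object and only the pieces in serialized form), storage levels and failure handling are omitted, and the block names follow the broadcast_[id] and broadcast_[id]_piece[index] pattern.

```scala
import scala.collection.mutable

object WriteBlocksSketch {
  // Stores the whole payload plus its pieces and returns the piece count.
  def writeBlocks(
      id: Long,
      serialized: Array[Byte], // assumed to be the already-serialized value
      blockSize: Int,
      store: mutable.Map[String, Array[Byte]]): Int = {
    require(blockSize > 0, "blockSize must be positive")
    val broadcastId = s"broadcast_$id"
    // 1. Keep the entire value under the broadcast's own block id.
    store(broadcastId) = serialized
    // 2. Cut the payload into pieces of at most blockSize bytes each.
    val pieces = serialized.grouped(blockSize).toArray
    // 3. Store every piece under an indexed piece id.
    pieces.zipWithIndex.foreach { case (piece, index) =>
      store(s"${broadcastId}_piece$index") = piece
    }
    pieces.length
  }
}
```

The returned count plays the role of numBlocks: it is all an executor needs, together with the broadcast id, to know which piece ids to request.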
Chunking Broadcast Into Blocks: blockifyObject Method
blockifyObject[T](
  obj: T,
  blockSize: Int,
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]
blockifyObject divides (aka blockifies) the input obj broadcast variable into blocks (of ByteBuffer). blockifyObject uses the input serializer Serializer to write obj in a serialized format to a ChunkedByteBufferOutputStream (of blockSize size) with the optional CompressionCodec.
Note: blockifyObject is executed when TorrentBroadcast stores a broadcast and its blocks in a local BlockManager.
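The blockify and unblockify round trip can be approximated with JDK classes only. This sketch makes several substitutions that should not be mistaken for Spark's implementation: Java serialization replaces the Serializer, GZIP replaces the CompressionCodec, a ByteArrayOutputStream cut into chunks replaces ChunkedByteBufferOutputStream, and BlockifySketch, blockify and unblockify are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.nio.ByteBuffer
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object BlockifySketch {
  // Serializes obj, optionally compresses it, and cuts the bytes into blocks.
  def blockify(obj: AnyRef, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    require(blockSize > 0, "blockSize must be positive")
    val bytes = new ByteArrayOutputStream()
    val sink: OutputStream = if (compress) new GZIPOutputStream(bytes) else bytes
    val out = new ObjectOutputStream(sink)
    out.writeObject(obj)
    out.close() // flushes and, when compressing, finishes the GZIP stream
    bytes.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  // Joins the blocks in order, optionally decompresses, and deserializes.
  def unblockify[T](blocks: Array[ByteBuffer], compress: Boolean): T = {
    val joined = blocks.iterator.flatMap { block =>
      val copy = new Array[Byte](block.remaining())
      block.duplicate().get(copy) // duplicate so the caller's position is untouched
      copy.iterator
    }.toArray
    val raw = new ByteArrayInputStream(joined)
    val source: InputStream = if (compress) new GZIPInputStream(raw) else raw
    val in = new ObjectInputStream(source)
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Both sides must agree on the compress flag, which mirrors why executors re-read the same configuration before reassembling a broadcast.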
doUnpersist Method
doUnpersist(blocking: Boolean): Unit
Note: doUnpersist is part of the Broadcast Variable Contract and is executed from the unpersist method.
doDestroy Method
doDestroy(blocking: Boolean): Unit
doDestroy removes all the persisted state associated with a broadcast variable on all the nodes in a Spark application, i.e. the driver and executors.
Note: doDestroy is executed when Broadcast removes the persisted data and metadata related to a broadcast variable.
unpersist Internal Method
unpersist(
  id: Long,
  removeFromDriver: Boolean,
  blocking: Boolean): Unit
unpersist removes all broadcast blocks from executors and possibly the driver (only when the removeFromDriver flag is enabled).
Note: unpersist belongs to the TorrentBroadcast private object and is executed either when TorrentBroadcast unpersists a broadcast variable or when it removes a broadcast variable completely.
When executed, you should see the following DEBUG message in the logs:
DEBUG TorrentBroadcast: Unpersisting TorrentBroadcast [id]
unpersist requests BlockManagerMaster to remove the id broadcast.
Note: unpersist uses SparkEnv to get the BlockManagerMaster (through the blockManager property).
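The effect of the removeFromDriver flag can be sketched with mutable maps standing in for the block stores of the driver and executors. UnpersistSketch, Store and belongsTo are hypothetical names, the block names assume the broadcast_[id] and broadcast_[id]_piece[index] pattern, and the blocking parameter and the BlockManagerMaster round trip are left out.

```scala
import scala.collection.mutable

object UnpersistSketch {
  type Store = mutable.Map[String, Array[Byte]]

  // True for "broadcast_<id>" and "broadcast_<id>_...", but not for other
  // ids that merely share a prefix (e.g. id 1 must not match "broadcast_10").
  def belongsTo(id: Long, blockName: String): Boolean = {
    val prefix = s"broadcast_$id"
    blockName == prefix || blockName.startsWith(prefix + "_")
  }

  def unpersist(id: Long, removeFromDriver: Boolean, driver: Store, executors: Seq[Store]): Unit = {
    // Executors are always cleaned; the driver only when the flag is set.
    val targets = if (removeFromDriver) executors :+ driver else executors
    targets.foreach { store =>
      // Snapshot the matching keys first, then remove them.
      val doomed = store.keys.filter(name => belongsTo(id, name)).toList
      doomed.foreach(name => store.remove(name))
    }
  }
}
```

With removeFromDriver disabled the driver keeps its copy, so executors can fetch the broadcast again later; enabling it corresponds to removing the broadcast completely.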