scala> val rdd = sc.parallelize(0 to 8).groupBy(_ % 3)
rdd: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:24
scala> rdd.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@454f6cc5)
ShuffleDependency — Shuffle Dependency
ShuffleDependency is an RDD Dependency on the output of a ShuffleMapStage for a key-value pair RDD.
ShuffleDependency uses the RDD to know the number of (map-side/pre-shuffle) partitions and the Partitioner for the number of (reduce-side/post-shuffle) partitions.
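The two partition counts can be read off a ShuffleDependency directly. The following is a minimal sketch, assuming Spark 2.x or later on the classpath (or a spark-shell session); the partition counts 4 and 2 and the application name are arbitrary choices for illustration.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

// 4 partitions before the shuffle, 2 partitions after it.
val grouped = sc.parallelize(0 to 8, 4).groupBy((n: Int) => n % 3, 2)

val dep = grouped.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]

// Map side: partitions of the RDD the dependency was created for.
val mapSidePartitions = dep.rdd.getNumPartitions
// Reduce side: partitions as defined by the Partitioner.
val reduceSidePartitions = dep.partitioner.numPartitions
```

Here mapSidePartitions is 4 and reduceSidePartitions is 2.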
ShuffleDependency is a dependency of ShuffledRDD, and also of CoGroupedRDD and SubtractedRDD, but for the latter two only when the partitioner of a parent RDD differs from the partitioner requested for the transformation.
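The partitioner condition can be observed with subtractByKey, which returns a SubtractedRDD. This is a sketch under the assumption that one parent is already partitioned with the requested partitioner and the other is not; the sample data is made up.

```scala
import org.apache.spark.{HashPartitioner, OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

val partitioner = new HashPartitioner(3)

// Already partitioned with `partitioner`, so this side needs no shuffle.
val left = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c")).partitionBy(partitioner)
// No partitioner yet, so this side has to be shuffled.
val right = sc.parallelize(Seq(1 -> "x", 4 -> "y"))

// One dependency per parent RDD, in the order (left, right).
val deps = left.subtractByKey(right, partitioner).dependencies

val leftIsNarrow = deps(0).isInstanceOf[OneToOneDependency[_]]
val rightIsShuffle = deps(1).isInstanceOf[ShuffleDependency[_, _, _]]
```

Both flags come out true: only the parent whose partitioner differs gets a ShuffleDependency.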
A ShuffleDependency is created for a key-value pair RDD, i.e. RDD[Product2[K, V]] with K and V being the types of keys and values, respectively.
Tip: Use the dependencies method on an RDD to list its dependencies.
Every ShuffleDependency has a unique application-wide shuffleId number that is assigned when the ShuffleDependency is created (and is used throughout Spark’s code to reference a ShuffleDependency).
Note: Shuffle ids are tracked by SparkContext.
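A quick way to see that shuffle ids are unique per dependency is to compare them across two shuffles. The sketch below assumes Spark 2.x or later; shuffleIdOf is a hypothetical helper written for this example, not part of Spark.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

// Hypothetical helper: shuffleId of the first ShuffleDependency of an RDD, if any.
def shuffleIdOf(rdd: RDD[_]): Option[Int] =
  rdd.dependencies.collectFirst { case s: ShuffleDependency[_, _, _] => s.shuffleId }

val byThree = sc.parallelize(0 to 8).groupBy((n: Int) => n % 3)
val byTwo = sc.parallelize(0 to 8).groupBy((n: Int) => n % 2)

val idA = shuffleIdOf(byThree)
val idB = shuffleIdOf(byTwo)
// An RDD caches its dependencies, so asking again returns the same id.
val idAAgain = shuffleIdOf(byThree)
```

The two RDDs report different ids, while the same RDD keeps reporting the id it was first given.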
keyOrdering Property
Caution: FIXME
serializer Property
Caution: FIXME
Creating ShuffleDependency Instance
ShuffleDependency takes the following when created:
- A single key-value pair RDD, i.e. RDD[Product2[K, V]]
- Partitioner (available as the partitioner property)
- Optional key ordering (of Scala’s scala.math.Ordering type)
- Optional Aggregator
- mapSideCombine flag, which is disabled (i.e. false) by default
Note: ShuffleDependency uses SparkEnv to access the current Serializer.
When created, ShuffleDependency gets a shuffle id (available as shuffleId).
Note: ShuffleDependency uses the input RDD to access SparkContext, which assigns the shuffleId.
ShuffleDependency registers itself with ShuffleManager and gets a ShuffleHandle (available as the shuffleHandle property).
Note: ShuffleDependency accesses ShuffleManager using SparkEnv.
In the end, ShuffleDependency registers itself for cleanup with ContextCleaner.
Note: ShuffleDependency accesses the optional ContextCleaner through SparkContext.
Note: ShuffleDependency is created when ShuffledRDD, CoGroupedRDD, and SubtractedRDD return their RDD dependencies.
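The creation steps above can be tried out by instantiating a ShuffleDependency by hand. This is only a sketch, assuming Spark 2.x or later, where the constructor takes the RDD and the Partitioner first and defaults the remaining arguments; Spark normally creates the dependency itself inside ShuffledRDD, CoGroupedRDD, or SubtractedRDD.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

val pairs: RDD[(Int, String)] = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))

// Only the RDD and the Partitioner are given; everything else keeps its default.
val dep = new ShuffleDependency[Int, String, String](pairs, new HashPartitioner(2))

val noOrdering = dep.keyOrdering.isEmpty      // optional key ordering is unset
val noAggregator = dep.aggregator.isEmpty     // optional Aggregator is unset
val combineDisabled = !dep.mapSideCombine     // flag is false by default
// The handle returned by ShuffleManager carries the same shuffle id.
val handleMatches = dep.shuffleHandle.shuffleId == dep.shuffleId
```

All four values are true for a dependency created with the defaults.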
rdd Property
rdd: RDD[Product2[K, V]]
rdd returns the key-value pair RDD this ShuffleDependency was created for.
partitioner Property
The partitioner property is a Partitioner that is used to partition the shuffle output.
partitioner is specified when ShuffleDependency is created.
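As an illustration of the rdd and partitioner properties, the sketch below checks that the dependency behind partitionBy points back at the input RDD and at the partitioner that was passed in. It assumes Spark 2.x or later and made-up sample data.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
val partitioner = new HashPartitioner(4)

// partitionBy gives a ShuffledRDD with a single ShuffleDependency on `pairs`.
val dep = pairs.partitionBy(partitioner)
  .dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]

val sameRdd = dep.rdd eq pairs                     // the RDD the dependency was created for
val samePartitioner = dep.partitioner == partitioner
```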
shuffleHandle Property
shuffleHandle: ShuffleHandle
shuffleHandle is the ShuffleHandle of a ShuffleDependency, assigned eagerly when the ShuffleDependency is created.
Note: shuffleHandle is used to compute CoGroupedRDD, ShuffledRDD, SubtractedRDD, and ShuffledRowRDD (to get a ShuffleReader for a ShuffleDependency) and when a ShuffleMapTask runs (to get a ShuffleWriter for a ShuffleDependency).
Map-Side Combine Flag — mapSideCombine Attribute
mapSideCombine is a flag to control whether to use partial aggregation (aka map-side combine).
mapSideCombine is disabled by default (i.e. false) when creating a ShuffleDependency.
When enabled, SortShuffleWriter and BlockStoreShuffleReader assume that an Aggregator is also defined.
Note: mapSideCombine is exclusively set (and hence can be enabled) when ShuffledRDD returns the dependencies (which is a single ShuffleDependency).
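To see the flag in action, compare the dependencies behind reduceByKey and groupByKey. This is a sketch, assuming Spark 2.x or later; shuffleDep is a hypothetical helper for this example, and the behaviour shown (reduceByKey combines map-side, groupByKey does not) reflects how PairRDDFunctions sets the flag.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

// Hypothetical helper: the single ShuffleDependency of a ShuffledRDD.
def shuffleDep(rdd: RDD[_]): ShuffleDependency[_, _, _] =
  rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]

val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))

val reduced = shuffleDep(pairs.reduceByKey(_ + _))
val grouped = shuffleDep(pairs.groupByKey())

val reduceCombines = reduced.mapSideCombine   // partial aggregation before the shuffle
val groupCombines = grouped.mapSideCombine    // values are shuffled as they are
```

reduceCombines is true and comes with a defined aggregator, while groupCombines is false.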
aggregator Property
aggregator: Option[Aggregator[K, V, C]] = None
aggregator is a map/reduce-side Aggregator (for an RDD’s shuffle).
aggregator is undefined by default (i.e. None) when ShuffleDependency is created.
Note: aggregator is used when SortShuffleWriter writes records and BlockStoreShuffleReader reads combined key-values for a reduce task.
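For illustration, an Aggregator can also be passed in explicitly together with the mapSideCombine flag. The sketch below assumes the Spark 2.x or later constructor with named aggregator and mapSideCombine parameters; the summing aggregator and the sample data are made up for this example.

```scala
import org.apache.spark.{Aggregator, HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

val pairs: RDD[(String, Int)] = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))

// Sums the values of a key: V = Int is combined into C = Int.
val sum = new Aggregator[String, Int, Int](
  createCombiner = (v: Int) => v,
  mergeValue = (acc: Int, v: Int) => acc + v,
  mergeCombiners = (a: Int, b: Int) => a + b)

val combining = new ShuffleDependency[String, Int, Int](
  pairs, new HashPartitioner(2), aggregator = Some(sum), mapSideCombine = true)

// What the aggregator would do with the two values of key "a".
val combinedA = sum.mergeValue(sum.createCombiner(1), 3)
```

With this dependency both combining.aggregator.isDefined and combining.mapSideCombine hold, and combinedA evaluates to 4.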
Usage
The places where ShuffleDependency is used:
- ShuffledRDD and ShuffledRowRDD that are RDDs from a shuffle
The RDD operations that may or may not use the above RDDs and hence shuffling:
- cogroup
  - intersection
- subtractByKey
  - subtract
- sortByKey
  - sortBy
- repartitionAndSortWithinPartitions
- combineByKey
  - aggregateByKey
  - foldByKey
  - reduceByKey
  - countApproxDistinctByKey
  - groupByKey
- partitionBy
Note: There may be other dependent methods that use the above.
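Whether one of these operations actually shuffles depends on the partitioners involved. The following sketch, assuming Spark 2.x or later, runs reduceByKey twice: once on an unpartitioned RDD and once on an RDD that already uses the requested partitioner. The helper shuffles is hypothetical and only inspects the direct dependencies of an RDD.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-sketch"))

// Hypothetical helper: does this RDD depend directly on a shuffle?
def shuffles(rdd: RDD[_]): Boolean =
  rdd.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

val partitioner = new HashPartitioner(2)
val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))
val prePartitioned = pairs.partitionBy(partitioner)

// No partitioner on the input: reduceByKey has to shuffle.
val withShuffle = shuffles(pairs.reduceByKey(partitioner, (a: Int, b: Int) => a + b))
// Input already uses the requested partitioner: no new shuffle is added.
val withoutShuffle = shuffles(prePartitioned.reduceByKey(partitioner, (a: Int, b: Int) => a + b))
```

withShuffle is true and withoutShuffle is false, although prePartitioned itself was produced by a shuffle one step earlier in the lineage.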