scala> val rdd = sc.parallelize(0 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.getNumPartitions
res0: Int = 8
// ShuffledRDD and coalesce Example
scala> rdd.coalesce(numPartitions = 4, shuffle = true).toDebugString
res1: String =
(4) MapPartitionsRDD[4] at coalesce at <console>:27 []
 |  CoalescedRDD[3] at coalesce at <console>:27 []
 |  ShuffledRDD[2] at coalesce at <console>:27 []
 +-(8) MapPartitionsRDD[1] at coalesce at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
// ShuffledRDD and sortByKey Example
scala> val grouped = rdd.groupBy(_ % 2)
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupBy at <console>:26
scala> grouped.sortByKey(numPartitions = 2).toDebugString
res2: String =
(2) ShuffledRDD[9] at sortByKey at <console>:29 []
 +-(8) ShuffledRDD[6] at groupBy at <console>:26 []
    +-(8) MapPartitionsRDD[5] at groupBy at <console>:26 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
ShuffledRDD
ShuffledRDD is an RDD of key-value pairs that represents the shuffle step in an RDD lineage. It uses custom ShuffledRDDPartition partitions.
A ShuffledRDD is created for RDD transformations that trigger data shuffling (each is demonstrated in the sketch after this list):

- coalesce transformation (with the shuffle flag enabled)
- PairRDDFunctions's combineByKeyWithClassTag and partitionBy (when the parent RDD's and the specified Partitioners are different)
- OrderedRDDFunctions's sortByKey and repartitionAndSortWithinPartitions ordered operators
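A minimal spark-shell sketch exercising each family of operators (assuming a live sc; the variable names are illustrative only):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(0 to 9).map(n => (n % 3, n))

// coalesce with the shuffle flag enabled
val coalesced = pairs.coalesce(numPartitions = 2, shuffle = true)

// PairRDDFunctions: reduceByKey is implemented on top of combineByKeyWithClassTag;
// partitionBy shuffles here since the parent RDD has no partitioner
val reduced = pairs.reduceByKey(_ + _)
val partitioned = pairs.partitionBy(new HashPartitioner(4))

// OrderedRDDFunctions: the ordered operators
val sorted = pairs.sortByKey()
val repartitionedSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))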
ShuffledRDD
takes a parent RDD and a Partitioner when created.
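For illustration, a ShuffledRDD can also be instantiated directly (it is a @DeveloperApi class); a minimal sketch, assuming a live sc:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.ShuffledRDD

// the parent must be an RDD of key-value pairs; the type parameters are
// K (key), V (value) and C (combined value)
val parent = sc.parallelize(0 to 9).map(n => (n % 2, n))
val shuffled = new ShuffledRDD[Int, Int, Int](parent, new HashPartitioner(2))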
getDependencies returns a single-element collection of RDD dependencies: a ShuffleDependency (with the Serializer selected according to the map-side combine internal flag).
Map-Side Combine — mapSideCombine Internal Flag
mapSideCombine: Boolean
mapSideCombine internal flag is used to select the Serializer (for shuffling) when ShuffleDependency is created (which is the one and only Dependency of a ShuffledRDD).
Note: mapSideCombine is used only when the optional userSpecifiedSerializer Serializer is not specified explicitly (which is the default).
Note: mapSideCombine uses SparkEnv to access the current SerializerManager.
If enabled (i.e. true), mapSideCombine directs getDependencies to find the Serializer for the types K and C. Otherwise, getDependencies finds the Serializer for the types K and V.
Note: The types K, C and V are specified when ShuffledRDD is created.
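The selection logic can be summarized in a condensed sketch; the selectSerializer helper below is hypothetical and merely mirrors, in simplified form, what getDependencies does (note that SerializerManager is private[spark], so this only compiles inside Spark's own packages):

import scala.reflect.ClassTag
import org.apache.spark.SparkEnv
import org.apache.spark.serializer.Serializer

// hypothetical helper mirroring the Serializer selection in getDependencies
def selectSerializer[K: ClassTag, V: ClassTag, C: ClassTag](
    userSpecifiedSerializer: Option[Serializer],
    mapSideCombine: Boolean): Serializer =
  userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      // map-side combine enabled: the shuffle carries (K, C) pairs
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      // otherwise the shuffle carries the raw (K, V) pairs
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }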
Computing Partition (in TaskContext) — compute Method
compute(split: Partition, context: TaskContext): Iterator[(K, C)]
Note: compute is a part of the RDD contract to compute a given partition in a TaskContext.
Internally, compute takes the one and only dependency of the ShuffledRDD and makes sure that it is a ShuffleDependency. It then requests the ShuffleManager for a ShuffleReader to read key-value pairs (as Iterator[(K, C)]) for the split.
Note: compute uses SparkEnv to access the current ShuffleManager.
Note: A Partition has the index property to specify the startPartition and endPartition partition offsets.
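Put together, a condensed sketch of compute as it appears in the Spark sources (simplified):

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  // the one and only dependency is the ShuffleDependency
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // split.index serves as both the startPartition and endPartition offsets
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}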
Getting Placement Preferences of Partition — getPreferredLocations Method
getPreferredLocations(partition: Partition): Seq[String]
Note: getPreferredLocations is a part of the RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
Internally, getPreferredLocations requests MapOutputTrackerMaster for the preferred locations, i.e. BlockManagers with the most map outputs, for the input partition (of the one and only ShuffleDependency).
Note: getPreferredLocations uses SparkEnv to access the current MapOutputTrackerMaster (which runs on the driver).
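For reference, a condensed sketch of getPreferredLocations as it appears in the Spark sources (simplified):

override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  // MapOutputTrackerMaster is only available on the driver
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}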
ShuffledRDDPartition
ShuffledRDDPartition gets an index when it is created (that in turn is the index of partitions as calculated by the Partitioner of a ShuffledRDD).
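For reference, a condensed sketch of ShuffledRDDPartition from the Spark sources (it does little more than store the index):

private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override val index: Int = idx
}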