// Start a Spark application, e.g. spark-shell, with the Spark properties to trigger selection of BaseShuffleHandle:
// 1. spark.shuffle.spill.numElementsForceSpillThreshold=1
// 2. spark.shuffle.sort.bypassMergeThreshold=1
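//
// One way to set these when launching the shell (assuming the standard
// spark-shell launcher and its --conf option):
//   ./bin/spark-shell \
//     --conf spark.shuffle.spill.numElementsForceSpillThreshold=1 \
//     --conf spark.shuffle.sort.bypassMergeThreshold=1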
// numSlices > spark.shuffle.sort.bypassMergeThreshold
scala> val rdd = sc.parallelize(0 to 4, numSlices = 2).groupBy(_ % 2)
rdd: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:24
scala> rdd.dependencies
DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle 0 because an aggregator is defined
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1160c54b)
scala> rdd.getNumPartitions
res1: Int = 2
scala> import org.apache.spark.ShuffleDependency
import org.apache.spark.ShuffleDependency
scala> val shuffleDep = rdd.dependencies(0).asInstanceOf[ShuffleDependency[Int, Int, Int]]
shuffleDep: org.apache.spark.ShuffleDependency[Int,Int,Int] = org.apache.spark.ShuffleDependency@1160c54b
// mapSideCombine is disabled
scala> shuffleDep.mapSideCombine
res2: Boolean = false
// aggregator defined
scala> shuffleDep.aggregator
res3: Option[org.apache.spark.Aggregator[Int,Int,Int]] = Some(Aggregator(<function1>,<function2>,<function2>))
// the number of reduce partitions (2) is greater than spark.shuffle.sort.bypassMergeThreshold (1), so bypass merge sort is not used
scala> shuffleDep.partitioner.numPartitions
res4: Int = 2
scala> shuffleDep.shuffleHandle
res5: org.apache.spark.shuffle.ShuffleHandle = org.apache.spark.shuffle.BaseShuffleHandle@22b0fe7e
BaseShuffleHandle — Fallback Shuffle Handle
BaseShuffleHandle is a ShuffleHandle that is created solely to capture the parameters when SortShuffleManager is requested for a ShuffleHandle (for a ShuffleDependency): the shuffle ID, the number of map tasks (numMaps), and the ShuffleDependency itself.
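For reference, the shape of the class can be sketched as follows. This is a minimal, stubbed sketch assuming the Spark 2.x sources (where BaseShuffleHandle stores its constructor parameters and adds no behaviour of its own); ShuffleHandle and ShuffleDependency are reduced to stand-ins here, not the real definitions:

// Minimal stand-ins for org.apache.spark.shuffle.ShuffleHandle and
// org.apache.spark.ShuffleDependency, just to show the shape.
abstract class ShuffleHandle(val shuffleId: Int) extends Serializable
class ShuffleDependency[K, V, C]

// BaseShuffleHandle merely captures the registerShuffle parameters:
// the shuffle ID, the number of map tasks, and the ShuffleDependency.
class BaseShuffleHandle[K, V, C](
    shuffleId: Int,
    val numMaps: Int,
    val dependency: ShuffleDependency[K, V, C])
  extends ShuffleHandle(shuffleId)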
Note: BaseShuffleHandle is the last possible choice when SortShuffleManager is requested for a ShuffleHandle (after BypassMergeSortShuffleHandle and SerializedShuffleHandle have already been considered and failed their checks).
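The decision order in the note can be condensed into a small sketch. This is a simplified paraphrase of SortShuffleManager.registerShuffle and its helper checks (shouldBypassMergeSort, canUseSerializedShuffle), not the real Spark code; the parameters stand in for values Spark reads from the ShuffleDependency and SparkConf, and the closing assert replays the demo above:

sealed trait Handle
case object BypassMergeSortShuffleHandle extends Handle
case object SerializedShuffleHandle extends Handle
case object BaseShuffleHandle extends Handle

def registerShuffle(
    mapSideCombine: Boolean,
    aggregatorDefined: Boolean,
    numPartitions: Int,
    bypassMergeThreshold: Int,
    serializerSupportsRelocation: Boolean): Handle =
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold)
    BypassMergeSortShuffleHandle // cf. SortShuffleWriter.shouldBypassMergeSort
  else if (serializerSupportsRelocation && !aggregatorDefined &&
      numPartitions <= (1 << 24))
    SerializedShuffleHandle // cf. SortShuffleManager.canUseSerializedShuffle
  else
    BaseShuffleHandle // the fallback

// The demo above ends up here: no map-side combine, but 2 partitions exceed
// the threshold of 1, and an aggregator is defined, so the fallback wins.
assert(registerShuffle(mapSideCombine = false, aggregatorDefined = true,
  numPartitions = 2, bypassMergeThreshold = 1,
  serializerSupportsRelocation = true) == BaseShuffleHandle)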