val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute
// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw
import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)
import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?
TungstenAggregationIterator — Iterator of UnsafeRows for HashAggregateExec Physical Operator
TungstenAggregationIterator is a custom AggregationIterator that is created when the HashAggregateExec aggregate physical operator is executed (to process the rows of each partition).
next Method
Caution: FIXME
hasNext Method
Caution: FIXME
Creating TungstenAggregationIterator Instance
TungstenAggregationIterator takes the following when created:
- Grouping named expressions
- Aggregate attributes
- Output named expressions
- Function to create a new MutableProjection given Catalyst expressions and attributes
- Output attributes of the child operator of HashAggregateExec
- Iterator of InternalRows from a single partition of the child's result RDD[InternalRow]
- Optional testFallbackStartsAt of HashAggregateExec
- numOutputRows SQLMetric
- peakMemory SQLMetric
- spillSize SQLMetric
TungstenAggregationIterator initializes the internal registries and counters.
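To make the general shape more concrete, here is a minimal sketch in plain Scala of an iterator that is handed the rows of one partition plus a metric-like counter, fills an internal registry when it is created, and then serves the aggregated rows through hasNext and next. It is a toy model and not Spark's TungstenAggregationIterator: ToyHashAggIterator, ToyMetric and ToyHashAggDemo are hypothetical names, rows are plain (key, value) pairs rather than UnsafeRows, and consuming the whole input eagerly at creation time is an assumption of the sketch.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a SQLMetric: a simple mutable counter.
final class ToyMetric(val name: String) {
  private var total = 0L
  def add(n: Long): Unit = total += n
  def value: Long = total
}

// Toy model of a hash-based aggregation iterator for a single partition.
// It sums values per grouping key. This is NOT Spark's implementation.
final class ToyHashAggIterator(
    inputIter: Iterator[(Long, Long)], // (grouping key, value) rows of one partition
    numOutputRows: ToyMetric)          // counter updated as result rows are emitted
  extends Iterator[(Long, Long)] {

  // Internal registry filled at creation time: grouping key -> running sum.
  // LinkedHashMap keeps the keys in first-seen order.
  private val buffers = mutable.LinkedHashMap.empty[Long, Long]
  inputIter.foreach { case (key, value) =>
    buffers.update(key, buffers.getOrElse(key, 0L) + value)
  }

  // Iterator over the aggregated (key, sum) rows.
  private val results = buffers.iterator

  override def hasNext: Boolean = results.hasNext

  override def next(): (Long, Long) = {
    val row = results.next()
    numOutputRows.add(1) // one more output row produced
    row
  }
}

object ToyHashAggDemo {
  // Returns the aggregated rows and the final value of the counter.
  def run(): (List[(Long, Long)], Long) = {
    val numOutputRows = new ToyMetric("numOutputRows")
    // Same shape as the query above: ids 0 to 9 grouped by id % 2.
    val partition = (0L until 10L).iterator.map(id => (id % 2, id))
    val rows = new ToyHashAggIterator(partition, numOutputRows).toList
    (rows, numOutputRows.value)
  }
}
```

Calling ToyHashAggDemo.run() aggregates the ten ids into two groups (sum 20 for the even ids, 25 for the odd ids) and leaves the counter at 2, one increment per emitted row.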