val q = spark.range(10).
groupBy('id % 2 as "group").
agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute
// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
import org.apache.spark.rdd.RDD
def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
import org.apache.spark.rdd.MapPartitionsRDD
hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
}
}
// END :paste -raw
import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)
import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?
TungstenAggregationIterator — Iterator of UnsafeRows for HashAggregateExec Physical Operator
TungstenAggregationIterator is a custom AggregationIterator that is created when the HashAggregateExec aggregate physical operator is executed (to process rows per partition).
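To make the per-partition idea concrete, here is a minimal sketch in plain Scala of what a hash-based aggregation iterator does in principle. It is not Spark's implementation: `SumByKeyIterator` and `SumByKeyDemo` are hypothetical names introduced only for illustration, the rows are simple `(key, value)` pairs rather than UnsafeRows, and everything specific to Spark (MutableProjection, SQLMetrics, the fallback controlled by testFallbackStartsAt) is left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a hash-based aggregation iterator.
// It consumes every input row of one partition up front, keeps one running
// sum per grouping key in a hash map, and then serves the results lazily.
final class SumByKeyIterator[K](input: Iterator[(K, Long)])
    extends Iterator[(K, Long)] {

  // Aggregation buffers: grouping key -> running sum (insertion-ordered).
  private val buffers = mutable.LinkedHashMap.empty[K, Long]

  // Process all input rows before the first output row is requested.
  input.foreach { case (key, value) =>
    buffers.update(key, buffers.getOrElse(key, 0L) + value)
  }

  // Output rows are read straight from the populated buffers.
  private val results = buffers.iterator

  override def hasNext: Boolean = results.hasNext
  override def next(): (K, Long) = results.next()
}

object SumByKeyDemo {
  // Mirrors the query above: ids 0 to 9 grouped by id % 2 and summed.
  def run(): List[(Long, Long)] = {
    val partition = (0L until 10L).iterator.map(id => (id % 2, id))
    new SumByKeyIterator(partition).toList
  }
}
```

Calling `SumByKeyDemo.run()` returns one `(group, sum)` pair per distinct key of the single input iterator, which is the role the description above assigns to one iterator instance per partition.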
next Method
Caution: FIXME
hasNext Method
Caution: FIXME
Creating TungstenAggregationIterator Instance
TungstenAggregationIterator takes the following when created:
- Grouping named expressions
- Aggregate attributes
- Output named expressions
- Function to create a new MutableProjection given Catalyst expressions and attributes
- Output attributes of the child operator of HashAggregateExec
- Iterator of InternalRows from a single partition of the child's result RDD[InternalRow]
- Optional HashAggregateExec's testFallbackStartsAt
- numOutputRows SQLMetric
- peakMemory SQLMetric
- spillSize SQLMetric
TungstenAggregationIterator initializes the internal registries and counters.