// HashAggregateExec selected due to:
// sum uses mutable types for aggregate expression
// just a single id column reference of LongType data type
val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#57L % 2)#69L], functions=[sum(id#57L)])
+- Exchange hashpartitioning((id#57L % 2)#69L, 200)
   +- *HashAggregate(keys=[(id#57L % 2) AS (id#57L % 2)#69L], functions=[partial_sum(id#57L)])
      +- *Range (0, 10, step=1, splits=8)
scala> println(q.queryExecution.sparkPlan.numberedTreeString)
00 HashAggregate(keys=[(id#57L % 2)#72L], functions=[sum(id#57L)], output=[group#60L, sum#64L])
01 +- HashAggregate(keys=[(id#57L % 2) AS (id#57L % 2)#72L], functions=[partial_sum(id#57L)], output=[(id#57L % 2)#72L, sum#74L])
02    +- Range (0, 10, step=1, splits=8)
// Going low level...watch your steps :)
import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val aggregateExpressions: Seq[AggregateExpression] = PhysicalAggregation.unapply(aggLog).get._2
val aggregateBufferAttributes = aggregateExpressions.
  flatMap(_.aggregateFunction.aggBufferAttributes)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
// that's the exact reason why HashAggregateExec was selected
// Aggregation execution planning strategy prefers HashAggregateExec
scala> val useHash = HashAggregateExec.supportsAggregate(aggregateBufferAttributes)
useHash: Boolean = true
val execPlan = q.queryExecution.sparkPlan
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#39L % 2)#50L], functions=[sum(id#39L)], output=[group#42L, sum#46L])
01 +- HashAggregate(keys=[(id#39L % 2) AS (id#39L % 2)#50L], functions=[partial_sum(id#39L)], output=[(id#39L % 2)#50L, sum#52L])
02    +- Range (0, 10, step=1, splits=8)
val hashAggExecRDD = hashAggExec.execute // <-- calls doExecute
scala> println(hashAggExecRDD.toDebugString)
(8) MapPartitionsRDD[14] at execute at <console>:35 []
 |  MapPartitionsRDD[13] at execute at <console>:35 []
 |  MapPartitionsRDD[12] at execute at <console>:35 []
 |  ParallelCollectionRDD[11] at execute at <console>:35 []
HashAggregateExec Aggregate Physical Operator for Hash-Based Aggregation
HashAggregateExec is a unary physical operator for hash-based aggregation that is created (indirectly through AggUtils.createAggregate) when:
- Aggregation execution planning strategy selects the aggregate physical operator for an Aggregate logical operator
- Structured Streaming’s StatefulAggregationStrategy strategy creates a plan for streaming EventTimeWatermark or Aggregate logical operators
Note
HashAggregateExec is the preferred aggregate physical operator for Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).
HashAggregateExec supports code generation (aka codegen).
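HashAggregateExec reports the following SQL metrics:
Key | Name (in web UI) |
---|---|
aggTime | aggregate time |
numOutputRows | number of output rows |
peakMemory | peak memory |
spillSize | spill size |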
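Name | Description |
---|---|
aggregateBufferAttributes | Collection of the aggregation buffer attributes (aggBufferAttributes) of the aggregate functions of the aggregate expressions |
output | Output schema for the input NamedExpressions |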
requiredChildDistribution varies per the input required child distribution expressions.
requiredChildDistributionExpressions | Distribution |
---|---|
Defined, but empty | AllTuples |
Non-empty | ClusteredDistribution |
Undefined (None) | UnspecifiedDistribution |
Note
(No distinct in aggregation)
(One distinct in aggregation) FIXME for the following two cases in aggregation with one distinct.
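The way requiredChildDistribution depends on the required child distribution expressions can be sketched as a pattern match. This is a minimal sketch, not the operator's source: distributionFor is a hypothetical helper name, and the choice of AllTuples, ClusteredDistribution and UnspecifiedDistribution (all from org.apache.spark.sql.catalyst.plans.physical) assumes the Spark 2.x behavior this page describes. It needs Spark SQL on the classpath, e.g. in spark-shell.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}

// Hypothetical helper (not part of Spark): picks a required child distribution
// from the optional required child distribution expressions.
def distributionFor(required: Option[Seq[Expression]]): Distribution =
  required match {
    case Some(exprs) if exprs.isEmpty => AllTuples                   // defined, but empty
    case Some(exprs)                  => ClusteredDistribution(exprs) // non-empty
    case None                         => UnspecifiedDistribution      // undefined
  }

val undefinedCase = distributionFor(None)
val emptyCase     = distributionFor(Some(Seq.empty))
val nonEmptyCase  = distributionFor(Some(Seq(Literal(1))))
```

AllTuples asks for all rows in a single partition, which is why it pairs with an empty list of clustering expressions.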
Note
The prefix for variable names for HashAggregateExec operators in CodegenSupport-generated code is agg.
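One way to see the agg prefix is to dump the generated code and list the identifiers that start with agg_. The sketch below assumes Spark SQL on the classpath and uses codegenString from the org.apache.spark.sql.execution.debug package object; the local session settings and the aggNames value are illustrative choices, and turning adaptive execution off is a precaution so that the whole-stage codegen subtrees are planned before the query runs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.functions.{col, sum}

val spark = SparkSession.builder().master("local[2]").appName("agg-prefix").getOrCreate()
// Assumption: with adaptive execution off, codegen stages are visible before execution.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Same shape of query as the example at the top of this page.
val q = spark.range(10).groupBy((col("id") % 2).as("group")).agg(sum("id").as("sum"))

// Generated Java source for all whole-stage codegen subtrees of the physical plan.
val generated: String = codegenString(q.queryExecution.executedPlan)

// Collect the distinct identifiers that carry the agg prefix.
val aggNames = generated.split("\\W+").filter(_.startsWith("agg_")).distinct.sorted
aggNames.take(10).foreach(println)
```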
testFallbackStartsAt Internal Value
Caution
FIXME
supportsAggregate Method
supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean
supportsAggregate first builds the schema of the aggregation buffer (from the input aggregateBufferAttributes attributes) and checks if UnsafeFixedWidthAggregationMap supports it (i.e. the schema uses only mutable field data types that have fixed length and can be mutated in place in an UnsafeRow).
Note
supportsAggregate is used exclusively when AggUtils.createAggregate selects an aggregate physical operator given aggregate expressions.
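The fixed-width requirement can be tried directly against two hand-written buffer schemas. This is a sketch under assumptions: the two schemas and their field names are made up for illustration, and it calls UnsafeRow.isMutable and UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema directly rather than going through supportsAggregate. It needs Spark SQL on the classpath but no running session.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Illustrative aggregation buffer schemas (field names are hypothetical).
val fixedWidthBuffer = StructType(Seq(
  StructField("sum", LongType),
  StructField("count", LongType)))
val variableLengthBuffer = StructType(Seq(
  StructField("sum", LongType),
  StructField("name", StringType)))

// Per-type check: can a field of this type be updated in place in an UnsafeRow?
val longIsMutable   = UnsafeRow.isMutable(LongType)   // true
val stringIsMutable = UnsafeRow.isMutable(StringType) // false

// Whole-schema check: every field has to be mutable in place.
val fixedSupported    = UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(fixedWidthBuffer)     // true
val variableSupported = UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(variableLengthBuffer) // false
```

A single variable-length field such as a string is enough to rule the schema out, which is when the planner has to fall back to another aggregate operator.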
Creating HashAggregateExec Instance
HashAggregateExec takes the following when created:
- Required child distribution expressions
- Grouping named expressions
- Aggregate expressions
- Aggregate attributes
- Initial input buffer offset
- Output named expressions
- Child physical operator
Executing HashAggregateExec: doExecute Method
doExecute(): RDD[InternalRow]
doExecute executes the input child SparkPlan (to produce InternalRow objects) and applies calculation over partitions (using RDD.mapPartitions).
Important
RDD.mapPartitions does not preserve partitioning and neither does HashAggregateExec when executed.
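The RDD.mapPartitions half of that statement is easy to check on a plain pair RDD. The sketch below is not HashAggregateExec itself: the RDD contents, the local session settings and the value names are illustrative, and it only shows that mapPartitions drops the partitioner unless preservesPartitioning is set.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("map-partitions").getOrCreate()

// A pair RDD with a known partitioner.
val byKey = spark.sparkContext
  .parallelize(0 until 10)
  .map(n => (n % 2, n))
  .partitionBy(new HashPartitioner(2))

// Identity transformations, with and without the preservesPartitioning flag.
val dropped = byKey.mapPartitions(rows => rows)
val kept    = byKey.mapPartitions(rows => rows, preservesPartitioning = true)

val before       = byKey.partitioner   // Some(HashPartitioner)
val afterDefault = dropped.partitioner // None
val afterKept    = kept.partitioner    // Some(HashPartitioner)
```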
In the mapPartitions block, doExecute creates one of the following:
- an empty iterator for no-record partitions with at least one grouping expression
- a TungstenAggregationIterator otherwise
Note
doExecute is a part of SparkPlan Contract to produce the result of a structured query as an RDD of InternalRow objects.
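The empty-iterator case for no-record partitions is visible from the outside: a grouped aggregation over an empty dataset yields no rows. The sketch below contrasts it with a global aggregation (no grouping expressions), which Spark SQL answers with a single row holding a null sum. The dataset, the local session settings and the value names are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}

val spark = SparkSession.builder().master("local[2]").appName("empty-input").getOrCreate()

// An empty dataset: every partition has no records.
val noRows = spark.range(0)

// At least one grouping expression: no groups, hence no output rows.
val grouped = noRows.groupBy((col("id") % 2).as("group")).agg(sum("id").as("sum")).collect()

// No grouping expressions: a single output row with a null sum.
val global = noRows.agg(sum("id").as("sum")).collect()

println(s"grouped rows: ${grouped.length}, global rows: ${global.length}")
```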
doProduce Method
doProduce(ctx: CodegenContext): String
doProduce executes doProduceWithoutKeys when no groupingExpressions were specified for the HashAggregateExec or doProduceWithKeys otherwise.
Note
doProduce is a part of CodegenSupport Contract.
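The two code paths leave different traces in the generated code, which can be compared for a query with and without grouping expressions. This is a sketch under assumptions: it relies on codegenString from the org.apache.spark.sql.execution.debug package object, and the method-name fragments doAggregateWithoutKey and doAggregateWithKeys are what the generated code is assumed to contain, which may differ between Spark versions. Adaptive execution is turned off as a precaution so the codegen stages are planned before the query runs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.functions.{col, sum}

val spark = SparkSession.builder().master("local[2]").appName("do-produce").getOrCreate()
// Assumption: with adaptive execution off, codegen stages are visible before execution.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// No grouping expressions vs. one grouping expression.
val withoutKeys = spark.range(10).agg(sum("id"))
val withKeys    = spark.range(10).groupBy(col("id") % 2).agg(sum("id"))

val codeWithoutKeys = codegenString(withoutKeys.queryExecution.executedPlan)
val codeWithKeys    = codegenString(withKeys.queryExecution.executedPlan)

// Assumed name fragments of the generated aggregation methods.
val noKeysPath = codeWithoutKeys.contains("doAggregateWithoutKey")
val keysPath   = codeWithKeys.contains("doAggregateWithKeys")
println(s"without keys: $noKeysPath, with keys: $keysPath")
```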