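import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count
val spark: SparkSession = ...
// brings the $"..." column syntax into scope
import spark.implicits._
// structured query with count aggregate function
val q = spark.range(5).
  groupBy($"id" % 2 as "group").
  agg(count("id") as "count")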
import q.queryExecution.optimizedPlan
scala> println(optimizedPlan.numberedTreeString)
00 Aggregate [(id#0L % 2)], [(id#0L % 2) AS group#3L, count(1) AS count#8L]
01 +- Range (0, 5, step=1, splits=Some(8))
import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(optimizedPlan)
// HashAggregateExec selected
scala> println(physicalPlan.head.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[group#3L, count#8L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#14L])
02    +- PlanLater Range (0, 5, step=1, splits=Some(8))
Aggregation Execution Planning Strategy for Aggregate Physical Operators
Aggregation is an execution planning strategy that SparkPlanner uses to select an aggregate physical operator for an Aggregate logical operator (in a query’s logical plan).
Aggregation can select the following aggregate physical operators (in order of preference):
- HashAggregateExec
- ObjectHashAggregateExec
- SortAggregateExec
AggUtils.planAggregateWithOneDistinct Method
Caution: FIXME
Executing Planning Strategy: apply Method
apply(plan: LogicalPlan): Seq[SparkPlan]
apply finds Aggregate logical operators and creates a single aggregate physical operator for every Aggregate logical operator.
Internally, apply destructures an Aggregate logical operator (into a four-element tuple) and splits the aggregate expressions into distinct and non-distinct ones (using their isDistinct flag).
apply then creates a physical operator using one of the following helper methods:
- AggUtils.planAggregateWithoutDistinct when no distinct aggregate expression is used
- AggUtils.planAggregateWithOneDistinct when at least one distinct aggregate expression is used
Note: apply is part of the GenericStrategy Contract to execute a planning strategy.
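The split-and-dispatch step can be sketched in plain Scala without a Spark dependency. This is a toy model only: AggExpr, splitByDistinct and chooseHelper are hypothetical names introduced for illustration, and the function returns the name of the AggUtils helper rather than calling it.

```scala
// Hypothetical, simplified stand-in for Spark's AggregateExpression.
final case class AggExpr(name: String, isDistinct: Boolean)

// Split aggregate expressions by their isDistinct flag:
// distinct ones on the left, non-distinct ones on the right.
def splitByDistinct(exprs: Seq[AggExpr]): (Seq[AggExpr], Seq[AggExpr]) =
  exprs.partition(_.isDistinct)

// Return the name of the helper that would be used, following the rule in the text.
def chooseHelper(exprs: Seq[AggExpr]): String = {
  val (withDistinct, _) = splitByDistinct(exprs)
  if (withDistinct.isEmpty) "AggUtils.planAggregateWithoutDistinct"
  else "AggUtils.planAggregateWithOneDistinct"
}

val noDistinct = chooseHelper(Seq(AggExpr("count(id)", isDistinct = false)))
val oneDistinct = chooseHelper(Seq(
  AggExpr("count(DISTINCT id)", isDistinct = true),
  AggExpr("sum(id)", isDistinct = false)))
```

Here noDistinct names the helper for queries without distinct aggregates, while oneDistinct names the helper chosen as soon as a single distinct aggregate expression is present.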
Selecting Aggregate Physical Operator Given Aggregate Expressions: AggUtils.createAggregate Internal Method
createAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
groupingExpressions: Seq[NamedExpression] = Nil,
aggregateExpressions: Seq[AggregateExpression] = Nil,
aggregateAttributes: Seq[Attribute] = Nil,
initialInputBufferOffset: Int = 0,
resultExpressions: Seq[NamedExpression] = Nil,
child: SparkPlan): SparkPlan
Internally, createAggregate selects and creates a physical operator given the input aggregate expressions (aggregateExpressions).
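Aggregate Physical Operator | Selection Criteria |
---|---|
HashAggregateExec | HashAggregateExec supports all aggBufferAttributes of the input aggregateExpressions. |
ObjectHashAggregateExec | spark.sql.execution.useObjectHashAggregateExec is enabled and ObjectHashAggregateExec supports the input aggregateExpressions. |
SortAggregateExec | When all the above requirements could not be met. |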
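The selection reads as a fallback cascade: try the most preferred operator first and fall through to the last one when the earlier requirements are not met. The sketch below models that shape in plain Scala. It assumes the three candidates are HashAggregateExec, ObjectHashAggregateExec and SortAggregateExec (in that order), and the three boolean parameters are hypothetical stand-ins for the actual support and configuration checks, not Spark's API.

```scala
// Toy labels for the three aggregate physical operators (assumed order of preference).
sealed trait AggregateOperator
case object HashAggregate extends AggregateOperator
case object ObjectHashAggregate extends AggregateOperator
case object SortAggregate extends AggregateOperator

// hashSupported, objectHashEnabled and objectHashSupported are hypothetical flags
// that stand in for the real checks on the aggregate expressions and configuration.
def selectOperator(
    hashSupported: Boolean,
    objectHashEnabled: Boolean,
    objectHashSupported: Boolean): AggregateOperator =
  if (hashSupported) HashAggregate
  else if (objectHashEnabled && objectHashSupported) ObjectHashAggregate
  else SortAggregate // fallback when none of the above requirements are met

val preferred = selectOperator(hashSupported = true, objectHashEnabled = true, objectHashSupported = true)
val second = selectOperator(hashSupported = false, objectHashEnabled = true, objectHashSupported = true)
val fallback = selectOperator(hashSupported = false, objectHashEnabled = false, objectHashSupported = true)
```

Ordering the checks this way keeps the sort-based operator as the choice of last resort, which matches the "all the above requirements could not be met" wording.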
Creating Physical Plan with Two Aggregate Physical Operators for Partial and Final Aggregations: AggUtils.planAggregateWithoutDistinct Method
planAggregateWithoutDistinct(
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan]
planAggregateWithoutDistinct is a two-step physical operator generator.
planAggregateWithoutDistinct first creates an aggregate physical operator with aggregateExpressions in Partial mode (for partial aggregations).
Note: requiredChildDistributionExpressions for the aggregate physical operator of the partial aggregation "stage" is empty.
In the end, planAggregateWithoutDistinct creates another aggregate physical operator (of the same type as before), but with aggregateExpressions now in Final mode (for final aggregations). This second aggregate physical operator becomes the parent of the first one.
Note: requiredChildDistributionExpressions for the parent aggregate physical operator of the final aggregation "stage" are the attributes of groupingExpressions.
Note: planAggregateWithoutDistinct is used exclusively when the Aggregation execution planning strategy is executed (with no AggregateExpressions being distinct).
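The two-step construction can be illustrated with a small plain-Scala model. All types here (Mode, Plan, Leaf, AggregateOp) and the function planWithoutDistinct are hypothetical simplifications: grouping keys are plain strings, and the operator type selection is left out, so only the Partial-then-Final nesting and the required child distribution are shown.

```scala
// Toy aggregation modes.
sealed trait Mode
case object Partial extends Mode
case object Final extends Mode

// Toy physical plan nodes.
sealed trait Plan
final case class Leaf(name: String) extends Plan
final case class AggregateOp(
    mode: Mode,
    requiredChildDistribution: Option[Seq[String]],
    groupingKeys: Seq[String],
    child: Plan) extends Plan

def planWithoutDistinct(groupingKeys: Seq[String], child: Plan): AggregateOp = {
  // Step 1: partial aggregation, no required child distribution.
  val partial = AggregateOp(Partial, None, groupingKeys, child)
  // Step 2: final aggregation on top, distributed by the grouping keys.
  AggregateOp(Final, Some(groupingKeys), groupingKeys, partial)
}

val plan = planWithoutDistinct(Seq("group"), Leaf("Range"))
```

The resulting plan has the Final operator at the root with the Partial operator as its only child, the same shape as the two HashAggregate nodes in the physical plan printed at the top of this page.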
Destructuring Aggregate Logical Operator: PhysicalAggregation.unapply Method
unapply(a: Any): Option[ReturnType]
unapply destructures the input Aggregate logical operator (a) into a four-element ReturnType.
Note: PhysicalAggregation is a Scala extractor object with a single unapply method.
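For readers less familiar with Scala extractor objects, here is a minimal sketch of the pattern with the same unapply(a: Any): Option[ReturnType] shape. ToyAggregate, ToyPhysicalAggregation and describe are hypothetical names, expressions are plain strings, and the rule used to pick out aggregate expressions is a toy one, not what PhysicalAggregation does.

```scala
// Hypothetical, simplified stand-in for an Aggregate logical operator.
final case class ToyAggregate(
    groupingExpressions: Seq[String],
    resultExpressions: Seq[String],
    child: String)

object ToyPhysicalAggregation {
  // (grouping expressions, aggregate expressions, result expressions, child)
  type ReturnType = (Seq[String], Seq[String], Seq[String], String)

  def unapply(a: Any): Option[ReturnType] = a match {
    case ToyAggregate(grouping, results, child) =>
      // Toy rule: any result expression that is not a grouping key is an aggregate.
      val aggregates = results.filterNot(r => grouping.contains(r))
      Some((grouping, aggregates, results, child))
    case _ => None // anything else does not match the pattern
  }
}

// The extractor lets callers destructure a plan directly in a pattern match.
def describe(plan: Any): String = plan match {
  case ToyPhysicalAggregation(grouping, aggregates, _, child) =>
    s"grouping=${grouping.mkString(",")} aggregates=${aggregates.mkString(",")} child=$child"
  case _ => "not an aggregate"
}

val described = describe(ToyAggregate(Seq("group"), Seq("group", "count"), "Range"))
val other = describe("Range")
```

Because unapply accepts Any and returns an Option, the same pattern can be tried against every node of a plan and simply fails to match for operators that are not aggregates.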