Aggregation Execution Planning Strategy for Aggregate Physical Operators

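import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count
val spark: SparkSession = ...
import spark.implicits._ // enables the $"id" column syntax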
// structured query with count aggregate function
val q = spark.range(5).
  groupBy($"id" % 2 as "group").
  agg(count("id") as "count")
import q.queryExecution.optimizedPlan
scala> println(optimizedPlan.numberedTreeString)
00 Aggregate [(id#0L % 2)], [(id#0L % 2) AS group#3L, count(1) AS count#8L]
01 +- Range (0, 5, step=1, splits=Some(8))

import spark.sessionState.planner.Aggregation
val physicalPlan = Aggregation.apply(optimizedPlan)

// HashAggregateExec selected
scala> println(physicalPlan.head.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#12L], functions=[count(1)], output=[group#3L, count#8L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_count(1)], output=[(id#0L % 2)#12L, count#14L])
02    +- PlanLater Range (0, 5, step=1, splits=Some(8))

Aggregation can select the following aggregate physical operators (in order of preference):

  1. HashAggregateExec

  2. ObjectHashAggregateExec

  3. SortAggregateExec

AggUtils.planAggregateWithOneDistinct Method

Caution
FIXME

Executing Planning Strategy — apply Method

apply(plan: LogicalPlan): Seq[SparkPlan]

apply finds Aggregate logical operators and, for every Aggregate logical operator, creates a physical plan of aggregate physical operators (e.g. the partial and final HashAggregate operators in the example above).

Internally, apply destructures an Aggregate logical operator (into a four-element tuple) and splits the aggregate expressions by whether they are distinct or not (using their isDistinct flag).

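apply then creates a physical operator using AggUtils.planAggregateWithoutDistinct (when no aggregate expression is distinct) or AggUtils.planAggregateWithOneDistinct (otherwise).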

Note
apply is part of the GenericStrategy contract to execute a planning strategy.
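
The dispatch on the isDistinct flag described above can be illustrated with a small, self-contained model. This is only a sketch: AggExpr and chooseHelper are hypothetical stand-ins (not Spark classes), and the returned strings merely name the AggUtils helper that the strategy would be expected to call.

```scala
// Simplified, hypothetical model of how apply could dispatch on distinct aggregates.
// AggExpr and chooseHelper are illustrative stand-ins, not part of Spark.
object ApplyDispatchSketch {
  final case class AggExpr(name: String, isDistinct: Boolean)

  // Returns the name of the AggUtils helper that would handle the expressions.
  def chooseHelper(aggregateExpressions: Seq[AggExpr]): String = {
    // Split the aggregate expressions by their isDistinct flag
    val (functionsWithDistinct, _) = aggregateExpressions.partition(_.isDistinct)
    if (functionsWithDistinct.isEmpty) "planAggregateWithoutDistinct"
    else "planAggregateWithOneDistinct"
  }
}
```

For example, a query with only count("id") would map to planAggregateWithoutDistinct in this model, while adding a distinct count would switch it to planAggregateWithOneDistinct.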

Selecting Aggregate Physical Operator Given Aggregate Expressions — AggUtils.createAggregate Internal Method

createAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan

Internally, createAggregate selects and creates a physical operator based on the input aggregateExpressions.

Table 1. createAggregate’s Aggregate Physical Operator Selection Criteria (in execution order)

Aggregate Physical Operator: HashAggregateExec
Selection Criteria: HashAggregateExec supports all aggBufferAttributes of the input aggregateExpressions.

Aggregate Physical Operator: ObjectHashAggregateExec
Selection Criteria:

  1. spark.sql.execution.useObjectHashAggregateExec internal flag is enabled (it is by default)

  2. ObjectHashAggregateExec supports the input aggregateExpressions.

Aggregate Physical Operator: SortAggregateExec
Selection Criteria: None of the above requirements are met.
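
The selection order in Table 1 can be sketched as a plain decision function. This is a minimal model under stated assumptions: the three Boolean parameters are hypothetical inputs that stand for the checks named in the table, and the case objects only mirror the operator names (they are not the Spark physical operators themselves).

```scala
// Simplified model of the selection order in Table 1.
// The Boolean inputs are hypothetical stand-ins for the checks Spark performs.
object CreateAggregateSketch {
  sealed trait Operator
  case object HashAggregateExec extends Operator
  case object ObjectHashAggregateExec extends Operator
  case object SortAggregateExec extends Operator

  def select(
      hashAggregateSupportsBuffers: Boolean,          // all aggBufferAttributes supported
      useObjectHashAggregate: Boolean,                // spark.sql.execution.useObjectHashAggregateExec
      objectHashAggregateSupportsExpressions: Boolean // aggregateExpressions supported
  ): Operator =
    if (hashAggregateSupportsBuffers) HashAggregateExec
    else if (useObjectHashAggregate && objectHashAggregateSupportsExpressions) ObjectHashAggregateExec
    else SortAggregateExec // fallback when neither hash-based operator applies
}
```

The checks run top to bottom, so SortAggregateExec is only chosen once both hash-based operators have been ruled out.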

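Note
createAggregate is used in AggUtils.planAggregateWithoutDistinct and AggUtils.planAggregateWithOneDistinct (among other AggUtils planning methods).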

Creating Physical Plan with Two Aggregate Physical Operators for Partial and Final Aggregations — AggUtils.planAggregateWithoutDistinct Method

planAggregateWithoutDistinct(
  groupingExpressions: Seq[NamedExpression],
  aggregateExpressions: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  child: SparkPlan): Seq[SparkPlan]

planAggregateWithoutDistinct is a two-step physical operator generator.

planAggregateWithoutDistinct first creates an aggregate physical operator with aggregateExpressions in Partial mode (for partial aggregations).

Note
requiredChildDistributionExpressions for the aggregate physical operator for partial aggregation "stage" is empty.

In the end, planAggregateWithoutDistinct creates another aggregate physical operator (of the same type as before), but aggregateExpressions are now in Final mode (for final aggregations). The aggregate physical operator becomes the parent of the first aggregate operator.

Note
requiredChildDistributionExpressions for the parent aggregate physical operator for final aggregation "stage" are the attributes of groupingExpressions.

Note
planAggregateWithoutDistinct is used exclusively when the Aggregation execution planning strategy is executed and none of the AggregateExpressions is distinct.
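
The idea behind the partial and final "stages" can be shown with plain Scala collections, independent of Spark. The sketch below is an assumption-laden model, not Spark code: partialCount plays the role of the aggregate operator in Partial mode (one call per partition), finalCount plays the role of the parent operator in Final mode, and both names are hypothetical.

```scala
// Plain-Scala model of two-step aggregation for count grouped by id % 2.
// partialCount and finalCount are illustrative names, not Spark APIs.
object PartialFinalSketch {
  // Partial mode: each partition counts its own rows per grouping key.
  def partialCount(partition: Seq[Long]): Map[Long, Long] =
    partition.groupBy(_ % 2).map { case (group, ids) => group -> ids.size.toLong }

  // Final mode: merge the partial counts that share a grouping key.
  def finalCount(partials: Seq[Map[Long, Long]]): Map[Long, Long] =
    partials
      .flatMap(_.toSeq)
      .groupBy { case (group, _) => group }
      .map { case (group, counts) => group -> counts.map(_._2).sum }
}
```

With the ids 0 to 4 split into the partitions Seq(0L, 1L, 2L) and Seq(3L, 4L), the two partial results merge into Map(0 -> 3, 1 -> 2). Grouping the final step by key is what makes it require rows with the same grouping values to end up together, which is the role of requiredChildDistributionExpressions in the final operator.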

Destructuring Aggregate Logical Operator — PhysicalAggregation.unapply Method

unapply(a: Any): Option[ReturnType]

unapply destructures the input a (an Aggregate logical operator) into a four-element ReturnType.

Note
ReturnType is a type alias (aka type synonym) for a four-element tuple with grouping, aggregate and result Catalyst expressions, and the child logical operator.

type ReturnType =
  (Seq[NamedExpression], Seq[AggregateExpression], Seq[NamedExpression], LogicalPlan)

Note
PhysicalAggregation is a Scala extractor object with a single unapply method.
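
For readers less familiar with Scala extractor objects, the following self-contained sketch shows the general shape of an unapply that returns a four-element tuple. Everything in it is hypothetical: Plan, Range, Aggregate and PhysicalAggregationLike are simplified stand-ins, strings model Catalyst expressions, and deriving the result expressions as grouping followed by aggregates is an assumption made purely for illustration.

```scala
// Minimal extractor-object sketch; all types are simplified stand-ins, not Catalyst classes.
object ExtractorSketch {
  sealed trait Plan
  final case class Range(start: Long, end: Long) extends Plan
  final case class Aggregate(
      groupingExpressions: Seq[String],
      aggregateExpressions: Seq[String],
      child: Plan) extends Plan

  object PhysicalAggregationLike {
    // Grouping, aggregate and result expressions, plus the child operator
    type ReturnType = (Seq[String], Seq[String], Seq[String], Plan)

    def unapply(a: Any): Option[ReturnType] = a match {
      case Aggregate(grouping, aggregates, child) =>
        // Assumption: result expressions are grouping followed by aggregates
        Some((grouping, aggregates, grouping ++ aggregates, child))
      case _ => None // anything else does not match
    }
  }

  // The extractor can then be used directly in a pattern match.
  def isAggregation(plan: Plan): Boolean = plan match {
    case PhysicalAggregationLike(_, _, _, _) => true
    case _ => false
  }
}
```

A planning strategy can use such an extractor in a case clause to bind all four parts of the operator at once, which is the style that the four-element ReturnType supports.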
