Aggregate Unary Logical Operator

Aggregate is a unary logical operator that holds grouping expressions, aggregate named expressions, and a child logical plan.

Aggregate is created to represent the following after a logical plan is analyzed:

Note
Aggregate logical operator is translated to one of HashAggregateExec, ObjectHashAggregateExec or SortAggregateExec physical operators in Aggregation execution planning strategy.
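As a rough illustration of the note above, the following sketch locates the Aggregate operator in an analyzed plan, prints its parts, and then looks for HashAggregateExec in the physical plan. The local SparkSession and the visits rows are assumptions made up for this example (in spark-shell, spark already exists). Which of the three physical operators is chosen depends on the aggregate functions and the Spark version, so the sketch only checks for HashAggregateExec, which a plain count is expected to use.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

// Assumption: a local session for experimentation
val spark = SparkSession.builder().master("local[*]").appName("aggregate-demo").getOrCreate()
import spark.implicits._

// Hypothetical dataset, invented for this illustration
val visits = Seq((0, "Warsaw", 2015), (1, "Warsaw", 2016), (2, "Boston", 2017))
  .toDF("id", "city", "year")

val q = visits.groupBy("city").count()

// Find the Aggregate logical operator in the analyzed plan
val aggregate = q.queryExecution.analyzed.collectFirst { case a: Aggregate => a }.get

println(aggregate.groupingExpressions)  // the grouping expressions (here: city)
println(aggregate.aggregateExpressions) // the aggregate named expressions (here: city and the count)
println(aggregate.child.nodeName)       // the child logical plan

// Physical operators chosen by the planner (before exchanges are added)
val hashAggs = q.queryExecution.sparkPlan.collect { case h: HashAggregateExec => h }
println(hashAggs.size)
```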
Table 1. Aggregate’s Properties (in alphabetical order)
Name Description

maxRows

Child logical plan's maxRows

Note
Part of LogicalPlan contract.

output

Note
Part of QueryPlan contract.

resolved

Enabled when:

Note
Part of LogicalPlan contract.

validConstraints

The (expression) constraints of the child logical plan together with those of the aggregate named expressions that contain no aggregate function.

Note
Part of QueryPlan contract.
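The properties in Table 1 can be read directly from an Aggregate operator. This is a minimal sketch under the assumption of a local SparkSession and a made-up visits dataset. It uses the public constraints property because validConstraints itself is protected. The printed values vary with the Spark version, so no expected output is shown.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Aggregate

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder().master("local[*]").appName("aggregate-props").getOrCreate()
import spark.implicits._

// Hypothetical dataset, invented for this illustration
val visits = Seq((0, "Warsaw", 2015), (1, "Warsaw", 2016), (2, "Boston", 2017))
  .toDF("id", "city", "year")

val agg = visits.groupBy("city").count()
  .queryExecution.analyzed
  .collectFirst { case a: Aggregate => a }
  .get

println(agg.maxRows)     // upper bound on the number of rows, if known
println(agg.output)      // attributes of the aggregate named expressions
println(agg.resolved)    // true once the analyzer has resolved the operator
println(agg.constraints) // public view over validConstraints
```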

computeStats Method

Caution
FIXME
Note
computeStats is part of the LogicalPlan contract to calculate statistics estimates (for the cost-based optimizer).
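Statistics estimates are usually read through the plan's stats property rather than by calling computeStats directly. The sketch below assumes Spark 2.3 or later, where stats takes no arguments, together with a local SparkSession and a made-up dataset. It does not show how Aggregate computes its own estimates.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder().master("local[*]").appName("aggregate-stats").getOrCreate()
import spark.implicits._

// Hypothetical dataset, invented for this illustration
val visits = Seq((0, "Warsaw", 2015), (1, "Warsaw", 2016), (2, "Boston", 2017))
  .toDF("id", "city", "year")

val optimized = visits.groupBy("city").count().queryExecution.optimizedPlan

// Assumption: Spark 2.3+ API (earlier versions required a conf argument)
val stats = optimized.stats
println(stats.sizeInBytes) // estimated size of the operator's output
println(stats.rowCount)    // estimated row count, if available
```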

Rule-Based Logical Optimization Phase

PushDownPredicate logical plan optimization applies so-called filter pushdown to a Pivot operator (planned as Aggregate operators in the analyzed plan) when it is under a Filter operator and all expressions are deterministic.

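import org.apache.spark.sql.catalyst.optimizer.PushDownPredicate

// Assumption: visits is not defined in this section. The rows below are a hypothetical
// dataset that matches the columns (id, city, year) and the pivot values in the plans.
val visits = Seq(
  (0, "Warsaw", 2015),
  (1, "Warsaw", 2016),
  (2, "Boston", 2017)
).toDF("id", "city", "year")

val q = visits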
  .groupBy("city")
  .pivot("year")
  .count()
  .where($"city" === "Boston")

val pivotPlanAnalyzed = q.queryExecution.analyzed
scala> println(pivotPlanAnalyzed.numberedTreeString)
00 Filter (city#8 = Boston)
01 +- Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
02    +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
03       +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
04          +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
05             +- LocalRelation [_1#3, _2#4, _3#5]

val afterPushDown = PushDownPredicate(pivotPlanAnalyzed)
scala> println(afterPushDown.numberedTreeString)
00 Project [city#8, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[0] AS 2015#143L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[1] AS 2016#144L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#142[2] AS 2017#145L]
01 +- Aggregate [city#8], [city#8, pivotfirst(year#9, count(1) AS `count`#134L, 2015, 2016, 2017, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#142]
02    +- Aggregate [city#8, year#9], [city#8, year#9, count(1) AS count(1) AS `count`#134L]
03       +- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
04          +- Filter (_2#4 = Boston)
05             +- LocalRelation [_1#3, _2#4, _3#5]
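The rule object used above has been renamed across Spark releases, so a more portable way to observe the same effect is to inspect the fully optimized plan. The following sketch assumes a local SparkSession and a made-up dataset. It checks that no Filter is left directly above an Aggregate after optimization. Where the predicate ends up below the Aggregate (for example, folded into the LocalRelation) depends on the Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter}

// Assumption: a local session; in spark-shell `spark` is already available
val spark = SparkSession.builder().master("local[*]").appName("aggregate-pushdown").getOrCreate()
import spark.implicits._

// Hypothetical dataset, invented for this illustration
val visits = Seq((0, "Warsaw", 2015), (1, "Warsaw", 2016), (2, "Boston", 2017))
  .toDF("id", "city", "year")

// A deterministic predicate on a grouping column, placed after the aggregation
val q = visits.groupBy("city").count().where($"city" === "Boston")

val optimized = q.queryExecution.optimizedPlan
println(optimized.numberedTreeString)

// Look for a Filter that still sits directly on top of an Aggregate
val filterOverAggregate = optimized.collectFirst { case f @ Filter(_, _: Aggregate) => f }
println(filterOverAggregate.isEmpty)
```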
