RelationalGroupedDataset — Untyped Row-based Grouping
RelationalGroupedDataset is an interface to calculate aggregates over groups of rows in a DataFrame.
Note: KeyValueGroupedDataset is used for typed aggregates using custom Scala objects (not Rows).
RelationalGroupedDataset is a result of executing the following grouping operators:
| Operator | Description |
|---|---|
| cube | Creates multi-dimensional aggregates for all combinations of the grouping columns |
| groupBy | Groups rows by the given grouping columns or expressions |
| pivot | Pivots on a column (with new columns per distinct value) |
| rollup | Creates multi-dimensional aggregates for hierarchical combinations of the grouping columns |
Note: spark.sql.retainGroupColumns property controls whether to retain the columns used for aggregation or not (in RelationalGroupedDataset operators). Enabled by default.
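For example, the following sketch (assuming a SparkSession available as spark with implicits imported) shows the effect of disabling the property:

// By default the grouping column (city) is retained in the aggregation result.
val cities = Seq((0, "Warsaw"), (1, "Warsaw"), (2, "Boston")).toDF("id", "city")
cities.groupBy("city").count().show   // columns: city, count

// With spark.sql.retainGroupColumns disabled, only the aggregate columns remain.
spark.conf.set("spark.sql.retainGroupColumns", false)
cities.groupBy("city").count().show   // columns: count
spark.conf.set("spark.sql.retainGroupColumns", true)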
Creating DataFrame from Aggregate Expressions — toDF Internal Method
toDF(aggExprs: Seq[Expression]): DataFrame

Caution: FIXME

Internally, toDF branches off per group type.

Caution: FIXME
For PivotType, toDF creates a DataFrame with Pivot unary logical operator.
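One way to see this (a sketch, assuming a SparkSession with implicits imported; the visits DataFrame is the one used in the pivot example below) is to print the raw, unanalyzed logical plan of a pivot query and look for the Pivot operator:

val visits = Seq((0, "Warsaw", 2015), (1, "Warsaw", 2016), (2, "Boston", 2017)).toDF("id", "city", "year")
val q = visits.groupBy("city").pivot("year").count()
// The unanalyzed plan should contain the Pivot unary logical operator;
// the analyzer later resolves it into an Aggregate.
println(q.queryExecution.logical.numberedTreeString)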
Creating RelationalGroupedDataset Instance
RelationalGroupedDataset takes the following when created:
- DataFrame
- Grouping expressions
- Group type (to indicate what operation has created it), i.e. GroupByType, CubeType, RollupType, PivotType
agg Operator
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(expr: Column, exprs: Column*): DataFrame
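A short sketch of the three variants (assuming a SparkSession with implicits imported and the visits DataFrame defined in the pivot example below):

import org.apache.spark.sql.functions.max

// (String, String) pairs: column name -> aggregate function name
visits.groupBy("city").agg("year" -> "max", "id" -> "count").show

// Map of column name -> aggregate function name
visits.groupBy("city").agg(Map("year" -> "max", "id" -> "count")).show

// Column-based aggregate expressions
visits.groupBy("city").agg(max("year") as "last_year").show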
pivot Operator
pivot(pivotColumn: String): RelationalGroupedDataset (1)
pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset (2)
1. Selects distinct and sorted values on pivotColumn and calls the other pivot (which results in 3 extra "scanning" jobs)
2. Preferred as more efficient because the unique values are already provided
pivot pivots on a pivotColumn column, i.e. adds new columns per distinct value in pivotColumn.
Note: pivot is only supported after a groupBy operation.

Note: Only one pivot operation is supported on a RelationalGroupedDataset.
val visits = Seq(
(0, "Warsaw", 2015),
(1, "Warsaw", 2016),
(2, "Boston", 2017)
).toDF("id", "city", "year")
val q = visits
  .groupBy("city") // <-- rows in pivot table
  .pivot("year")   // <-- columns (unique values queried)
  .count()         // <-- values in cells
scala> q.show
+------+----+----+----+
| city|2015|2016|2017|
+------+----+----+----+
|Warsaw| 1| 1|null|
|Boston|null|null| 1|
+------+----+----+----+
scala> q.explain
== Physical Plan ==
HashAggregate(keys=[city#8], functions=[pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
+- Exchange hashpartitioning(city#8, 200)
+- HashAggregate(keys=[city#8], functions=[partial_pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
+- *HashAggregate(keys=[city#8, year#9], functions=[count(1)])
+- Exchange hashpartitioning(city#8, year#9, 200)
+- *HashAggregate(keys=[city#8, year#9], functions=[partial_count(1)])
+- LocalTableScan [city#8, year#9]
scala> visits
.groupBy('city)
.pivot("year", Seq("2015")) // <-- one column in pivot table
.count
.show
+------+----+
| city|2015|
+------+----+
|Warsaw| 1|
|Boston|null|
+------+----+
Important: Use pivot with a list of distinct values to pivot on so Spark does not have to compute the list itself (and run three extra "scanning" jobs).
Note: spark.sql.pivotMaxValues (default: 10000) controls the maximum number of (distinct) values that will be collected without error (when doing pivot without specifying the values for the pivot column).
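If the pivot column legitimately has more distinct values, the limit can be raised before pivoting (a sketch; 50000 is just an example value):

spark.conf.set("spark.sql.pivotMaxValues", 50000)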
Internally, pivot creates a RelationalGroupedDataset with PivotType group type and pivotColumn resolved using the DataFrame’s columns with values as Literal expressions.