RelationalGroupedDataset — Untyped Row-based Grouping
RelationalGroupedDataset is an interface to calculate aggregates over groups of rows in a DataFrame.
Note | KeyValueGroupedDataset is used for typed aggregates using custom Scala objects (not Rows). |
RelationalGroupedDataset is the result of executing the following grouping operators:
Operator | Description |
---|---|
cube | Creates a multi-dimensional cube over the specified columns (all combinations of the grouping columns) |
groupBy | Groups the rows by the specified columns |
pivot | Pivots on a column (with new columns per distinct value) |
rollup | Creates a multi-dimensional rollup over the specified columns (hierarchical subtotals) |
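Each of these gives back a RelationalGroupedDataset that still needs an aggregation (e.g. agg or count) before it becomes a DataFrame again. A minimal sketch (the dataset and column names are made up for illustration, and a spark-shell session with spark.implicits._ in scope is assumed):

// Illustrative only: every grouping operator returns a RelationalGroupedDataset
val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")

df.groupBy("key")                 // RelationalGroupedDataset (GroupByType)
df.rollup("key")                  // RelationalGroupedDataset (RollupType)
df.cube("key")                    // RelationalGroupedDataset (CubeType)
df.groupBy("key").pivot("value")  // RelationalGroupedDataset (PivotType)

df.groupBy("key").count.show      // an aggregation turns it back into a DataFrame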
Note | spark.sql.retainGroupColumns property controls whether to retain columns used for aggregation or not (in RelationalGroupedDataset operators). Enabled by default. |
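The property can be changed at runtime. A minimal sketch of its effect (again assuming a spark-shell session and a made-up dataset):

// spark.sql.retainGroupColumns enabled (default): the grouping column is part of the result
val sales = Seq(("Warsaw", 100), ("Warsaw", 200), ("Boston", 50)).toDF("city", "amount")
sales.groupBy("city").sum("amount").show
// +------+-----------+
// |  city|sum(amount)|
// +------+-----------+
// |Warsaw|        300|
// |Boston|         50|
// +------+-----------+

// Disabled: only the aggregate columns remain in the result
spark.conf.set("spark.sql.retainGroupColumns", false)
sales.groupBy("city").sum("amount").show
// +-----------+
// |sum(amount)|
// +-----------+
// |        300|
// |         50|
// +-----------+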
Creating DataFrame from Aggregate Expressions — toDF Internal Method

toDF(aggExprs: Seq[Expression]): DataFrame

Caution | FIXME |
Internally, toDF branches off per group type.
Caution | FIXME |
For PivotType, toDF creates a DataFrame with Pivot unary logical operator.
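One way to see that operator (a sketch, assuming a spark-shell session and reusing the visits Dataset from the pivot example below) is to look at the logical plan before analysis:

// The unanalyzed logical plan should contain the Pivot unary operator,
// which the Analyzer later resolves into aggregates.
val pivoted = visits.groupBy("city").pivot("year").count()
pivoted.queryExecution.logical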
Creating RelationalGroupedDataset Instance
RelationalGroupedDataset takes the following when created:

- Grouping expressions
- Group type (to indicate what operation has created it), i.e. GroupByType, CubeType, RollupType, PivotType
agg Operator
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(expr: Column, exprs: Column*): DataFrame
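A short sketch of the three variants (a spark-shell session is assumed; the dataset, column names and aliases are illustrative only):

import org.apache.spark.sql.functions._

val sales = Seq(("Warsaw", 100), ("Warsaw", 200), ("Boston", 50)).toDF("city", "amount")
val byCity = sales.groupBy("city")                            // RelationalGroupedDataset

byCity.agg("amount" -> "sum", "amount" -> "max")              // agg(aggExpr, aggExprs*)
byCity.agg(Map("amount" -> "avg"))                            // agg(exprs: Map[String, String])
byCity.agg(sum("amount") as "total", max("amount") as "top")  // agg(expr: Column, exprs: Column*)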
pivot Operator
pivot(pivotColumn: String): RelationalGroupedDataset (1)
pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset (2)
1. Selects distinct and sorted values on pivotColumn and calls the other pivot (that results in 3 extra "scanning" jobs)
2. Preferred as more efficient because the unique values are already provided
pivot pivots on a pivotColumn column, i.e. adds new columns per distinct value in pivotColumn.
Note | pivot is only supported after a groupBy operation. |
Note | Only one pivot operation is supported on a RelationalGroupedDataset. |
val visits = Seq(
(0, "Warsaw", 2015),
(1, "Warsaw", 2016),
(2, "Boston", 2017)
).toDF("id", "city", "year")
val q = visits
.groupBy("city") // <-- rows in pivot table
.pivot("year") // <-- columns (unique values queried)
.count() // <-- values in cells
scala> q.show
+------+----+----+----+
| city|2015|2016|2017|
+------+----+----+----+
|Warsaw| 1| 1|null|
|Boston|null|null| 1|
+------+----+----+----+
scala> q.explain
== Physical Plan ==
HashAggregate(keys=[city#8], functions=[pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
+- Exchange hashpartitioning(city#8, 200)
+- HashAggregate(keys=[city#8], functions=[partial_pivotfirst(year#9, count(1) AS `count`#222L, 2015, 2016, 2017, 0, 0)])
+- *HashAggregate(keys=[city#8, year#9], functions=[count(1)])
+- Exchange hashpartitioning(city#8, year#9, 200)
+- *HashAggregate(keys=[city#8, year#9], functions=[partial_count(1)])
+- LocalTableScan [city#8, year#9]
scala> visits
.groupBy('city)
.pivot("year", Seq("2015")) // <-- one column in pivot table
.count
.show
+------+----+
| city|2015|
+------+----+
|Warsaw| 1|
|Boston|null|
+------+----+
Important | Use pivot with a list of distinct values to pivot on so Spark does not have to compute the list itself (and run three extra "scanning" jobs). |
Note | spark.sql.pivotMaxValues (default: 10000) controls the maximum number of (distinct) values that will be collected without error (when doing pivot without specifying the values for the pivot column). |
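If a pivot column legitimately has more distinct values than the limit, the property can be raised at runtime, e.g. (the value below is illustrative):

spark.conf.set("spark.sql.pivotMaxValues", 20000)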
Internally, pivot creates a RelationalGroupedDataset with PivotType group type and pivotColumn resolved using the DataFrame's columns, with values as Literal expressions.