Basic Aggregation — Typed and Untyped Grouping Operators
You can group records in a Dataset using grouping operators and then execute aggregate functions to calculate aggregates over each group of records.
Note
Aggregate functions without a grouping operator return a single value for the entire Dataset. If you want the aggregate values for each unique value in a column, you should groupBy over that column first to build the groups.
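For instance, compare a whole-Dataset aggregate with a grouped one. A minimal sketch (the ds value and its parity column m are made up for illustration):

import org.apache.spark.sql.functions.sum
import spark.implicits._

// hypothetical data set: ids 0..9 plus a parity column m
val ds = spark.range(10).withColumn("m", 'id % 2)

ds.agg(sum('id)).show             // a single row: the sum over all records
ds.groupBy('m).agg(sum('id)).show // one row per unique value of m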
Operator | Return Type | Description
---|---|---
agg | DataFrame | Aggregates with or without grouping (i.e. over the entire Dataset)
groupBy | RelationalGroupedDataset | Used for untyped aggregations with DataFrames. Grouping is described using Column-based functions or column names.
groupByKey | KeyValueGroupedDataset | Used for type-preserving aggregations where records are grouped by a given key function.
Note
You can also use SparkSession to execute good ol' SQL with GROUP BY:

val spark: SparkSession = ???
spark.sql("SELECT COUNT(*) FROM sales GROUP BY city")
Aggregates Over Subset Of or Whole Dataset — agg Operators
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
agg applies an aggregate function on a subset or the entire Dataset
(i.e. considering the entire data set as one group).
Note
agg on a Dataset is simply a shortcut for groupBy().agg(…).
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+
agg can compute aggregate expressions on all the records in a Dataset.
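The Map- and pair-based agg variants take column-name-to-aggregate-function-name pairs instead of Column expressions. A minimal sketch (the result column names shown here may differ across Spark versions):

scala> spark.range(10).agg(Map("id" -> "sum")).show
+-------+
|sum(id)|
+-------+
|     45|
+-------+

scala> spark.range(10).agg("id" -> "sum").show
+-------+
|sum(id)|
+-------+
|     45|
+-------+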
Untyped Grouping — groupBy Operator
groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset
The groupBy methods group the records in a Dataset using the specified discriminator columns (as Columns or their text representation). They return a RelationalGroupedDataset on which to execute aggregate functions or operators.
// a 10^3-record data set
val ints = 1 to math.pow(10, 3).toInt
scala> val dataset = ints.toDF("n").withColumn("m", 'n % 2)
dataset: org.apache.spark.sql.DataFrame = [n: int, m: int]
scala> dataset.count
res0: Long = 1000
scala> dataset.groupBy('m).agg(sum('n)).show
+---+------+
| m|sum(n)|
+---+------+
| 1|250000|
| 0|250500|
+---+------+
Internally, it first resolves columns and then builds a RelationalGroupedDataset.
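The String variant of groupBy works the same way. A minimal sketch over the same ints data set (row order may differ):

scala> dataset.groupBy("m").sum("n").show
+---+------+
|  m|sum(n)|
+---+------+
|  1|250000|
|  0|250500|
+---+------+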
Note
The following session uses the data setup described in the Test Setup section below.
scala> dataset.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
+----+---------+-----+
scala> dataset.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa| 150.0| 0.205|
| bbb| 250.0| 0.475|
+----+--------------+----------+
scala> dataset.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
| aaa| 100| 0.12|
+----+---------+----------+
scala> dataset.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa| 2|
| bbb| 2|
+----+-----+
scala> dataset.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa| 0.29|
| bbb| 0.53|
+----+----------+
scala> dataset.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa| 0.41|
| bbb| 0.95|
+----+----------+
scala> dataset.groupBy('productId).sum("score").show
+---------+------------------+
|productId| sum(score)|
+---------+------------------+
| 300| 0.42|
| 100| 0.12|
| 200|0.8200000000000001|
+---------+------------------+
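A single agg call can also compute several aggregates at once. A minimal sketch over the same data set using min and max from org.apache.spark.sql.functions (row order may differ):

scala> import org.apache.spark.sql.functions.{min, max}
import org.apache.spark.sql.functions.{min, max}

scala> dataset.groupBy('name).agg(min('score) as "min", max('score) as "max").show
+----+----+----+
|name| min| max|
+----+----+----+
| aaa|0.12|0.29|
| bbb|0.42|0.53|
+----+----+----+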
Type-Preserving Grouping — groupByKey Operator
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
groupByKey groups records (of type T) by the input func. It returns a KeyValueGroupedDataset to apply aggregation to.
Note
groupByKey is Dataset's experimental API.
scala> dataset.groupByKey(_.productId).count.show
+-----+--------+
|value|count(1)|
+-----+--------+
| 300| 1|
| 100| 1|
| 200| 2|
+-----+--------+
scala> import org.apache.spark.sql.expressions.scalalang._
import org.apache.spark.sql.expressions.scalalang._
scala> dataset.groupByKey(_.productId).agg(typed.sum[Token](_.score)).toDF("productId", "sum").orderBy('productId).show
+---------+------------------+
|productId| sum|
+---------+------------------+
| 100| 0.12|
| 200|0.8200000000000001|
| 300| 0.42|
+---------+------------------+
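Other typed aggregators in scalalang.typed follow the same pattern. A minimal sketch with typed.avg (the values match the untyped avg session above):

scala> dataset.groupByKey(_.name).agg(typed.avg[Token](_.score)).toDF("name", "avg").orderBy('name).show
+----+-----+
|name|  avg|
+----+-----+
| aaa|0.205|
| bbb|0.475|
+----+-----+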
Test Setup
This is a setup for learning RelationalGroupedDataset. Paste it into the Spark Shell using :paste.
import spark.implicits._
case class Token(name: String, productId: Int, score: Double)
val data = Token("aaa", 100, 0.12) ::
Token("aaa", 200, 0.29) ::
Token("bbb", 200, 0.53) ::
Token("bbb", 300, 0.42) :: Nil
val dataset = data.toDS.cache // (1)

(1) Cache the dataset so the following queries won't load/recompute data over and over again.
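Since data.toDS gives a Dataset[Token], the same dataset value supports both the untyped groupBy and the type-preserving groupByKey sessions above.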