Basic Aggregation — Typed and Untyped Grouping Operators

You can group the records in a Dataset using the aggregate operators and then execute aggregate functions to calculate aggregates over each collection of grouped records.

Note
Aggregate functions used without a grouping operator return a single value for the entire Dataset. If you want the aggregate values per unique value in a column, use groupBy on that column first to build the groups.
Table 1. Aggregate Operators (in alphabetical order)

Operator   | Return Type              | Description
-----------|--------------------------|------------------------------------------------------------
agg        | RelationalGroupedDataset | Aggregates with or without grouping (i.e. over an entire Dataset)
groupBy    | RelationalGroupedDataset | Used for untyped aggregations with DataFrames. Grouping is described using Column-based functions or column names.
groupByKey | KeyValueGroupedDataset   | Used for type-preserving aggregations where records are grouped by a given key function.

Note

You can also use SparkSession to execute good ol' SQL with GROUP BY.

import org.apache.spark.sql.SparkSession

val spark: SparkSession = ???
spark.sql("SELECT city, COUNT(*) AS count FROM sales GROUP BY city")

Aggregates Over Subset Of or Whole Dataset — agg Operators

agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

agg applies an aggregate function over a subset of or the entire Dataset (i.e. with the entire data set considered as one group).

Note
agg on a Dataset is simply a shortcut for groupBy().agg(…).
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+
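
As a quick check of the note above, spelling out the empty groupBy gives the same result:

scala> spark.range(10).groupBy().agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+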

agg can compute aggregate expressions over all the records in a Dataset. The Map and (String, String) variants specify aggregate functions by name for the given columns, as sketched below.
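
A minimal sketch of the name-based variants (against the same spark.range(10) data set as above):

scala> spark.range(10).agg(Map("id" -> "max")).show
+-------+
|max(id)|
+-------+
|      9|
+-------+

scala> spark.range(10).agg("id" -> "avg").show
+-------+
|avg(id)|
+-------+
|    4.5|
+-------+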

Untyped Grouping — groupBy Operator

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

The groupBy methods group the records in a Dataset by the specified discriminator columns (given as Columns or their names as text). They return a RelationalGroupedDataset on which to execute aggregate functions or operators.

// A data set with 10^3 records
val ints = 1 to math.pow(10, 3).toInt

scala> val dataset = ints.toDF("n").withColumn("m", 'n % 2)
dataset: org.apache.spark.sql.DataFrame = [n: int, m: int]

scala> dataset.count
res0: Long = 1000

scala> dataset.groupBy('m).agg(sum('n)).show
+---+------+
|  m|sum(n)|
+---+------+
|  1|250000|
|  0|250500|
+---+------+

Internally, groupBy first resolves the columns and then builds a RelationalGroupedDataset.
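
The string-based variant works the same way. A quick sketch against the (n, m) data set above (row order may differ between runs):

scala> dataset.groupBy("m").sum("n").show
+---+------+
|  m|sum(n)|
+---+------+
|  1|250000|
|  0|250500|
+---+------+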

Note
The following session uses the data setup described in the Test Setup section below.
scala> dataset.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> dataset.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa|         150.0|     0.205|
| bbb|         250.0|     0.475|
+----+--------------+----------+

scala> dataset.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa|      200|      0.29|
| bbb|      200|      0.53|
| bbb|      300|      0.42|
| aaa|      100|      0.12|
+----+---------+----------+

scala> dataset.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa|    2|
| bbb|    2|
+----+-----+

scala> dataset.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa|      0.29|
| bbb|      0.53|
+----+----------+

scala> dataset.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa|      0.41|
| bbb|      0.95|
+----+----------+

scala> dataset.groupBy('productId).sum("score").show
+---------+------------------+
|productId|        sum(score)|
+---------+------------------+
|      300|              0.42|
|      100|              0.12|
|      200|0.8200000000000001|
+---------+------------------+

Type-Preserving Grouping — groupByKey Operator

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

groupByKey groups records (of type T) by the key computed with the input func. It returns a KeyValueGroupedDataset to apply aggregations to.

Note
groupByKey is Dataset's experimental API.
scala> dataset.groupByKey(_.productId).count.show
+-----+--------+
|value|count(1)|
+-----+--------+
|  300|       1|
|  100|       1|
|  200|       2|
+-----+--------+

scala> import org.apache.spark.sql.expressions.scalalang._
scala> dataset.groupByKey(_.productId).agg(typed.sum[Token](_.score)).toDF("productId", "sum").orderBy('productId).show
+---------+------------------+
|productId|               sum|
+---------+------------------+
|      100|              0.12|
|      200|0.8200000000000001|
|      300|              0.42|
+---------+------------------+
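
A KeyValueGroupedDataset also supports arbitrary per-group processing, e.g. the mapGroups operator. A minimal sketch that counts records per name (the _1 and _2 column names come from the tuple result; row order may differ):

scala> dataset.groupByKey(_.name).mapGroups((name, tokens) => (name, tokens.size)).show
+---+---+
| _1| _2|
+---+---+
|aaa|  2|
|bbb|  2|
+---+---+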

Test Setup

This is a setup for learning RelationalGroupedDataset. Paste it into Spark Shell using :paste.

import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Token("aaa", 100, 0.12) ::
  Token("aaa", 200, 0.29) ::
  Token("bbb", 200, 0.53) ::
  Token("bbb", 300, 0.42) :: Nil
val dataset = data.toDS.cache  (1)
1. Cache the dataset so the following queries won’t load/recompute data over and over again.
