Standard Functions — functions Object

org.apache.spark.sql.functions object defines many built-in functions to work with Columns in Datasets.

You can access the functions using the following import statement:

import org.apache.spark.sql.functions._

There are over 200 functions in the functions object.

scala> spark.catalog.listFunctions.count
res1: Long = 251

Table 1. (Subset of) Standard Functions in Spark SQL

Name                              Description

Aggregate functions
  count
  grouping                        Indicates whether a specified column is aggregated or not
  grouping_id                     Computes the level of grouping

Collection functions
  explode
  explode_outer                   (new in 2.2.0) Creates a new row for each element in the given array or map column. If the array/map is null or empty then null is produced.
  from_json                       Parses a column with a JSON string into a StructType or ArrayType of StructType elements with the specified schema.

Date and time functions
  current_timestamp
  to_date
  to_timestamp
  unix_timestamp                  Converts current or specified time to Unix timestamp (in seconds)
  window                          Generates tumbling time windows

Math functions
  bin                             Converts the value of a long column to binary format

Regular functions
  broadcast
  col and column                  Creating Columns
  expr
  struct

String functions
  split
  upper

UDF functions
  udf                             Creating UDFs

Window functions
  rank, dense_rank, percent_rank  Ranking records per window partition
  ntile                           Gives the ntile group id (from 1 to n inclusive) in an ordered window partition
  row_number                      Sequential numbering per window partition
  cume_dist                       Cumulative distribution of records across window partitions
  lag
  lead

Tip
This page gives only a brief overview of the many functions available in the functions object, so you should also read the official documentation of the functions object.

count Function

Caution
FIXME

explode_outer Function

explode_outer(e: Column): Column

explode_outer generates a new row for each element in the array or map column e.

Note
Unlike explode, explode_outer generates null when the array or map is null or empty.

val arrays = Seq((1,Seq.empty[String])).toDF("id", "array")
scala> arrays.printSchema
root
 |-- id: integer (nullable = false)
 |-- array: array (nullable = true)
 |    |-- element: string (containsNull = true)
scala> arrays.select(explode_outer($"array")).show
+----+
| col|
+----+
|null|
+----+

Internally, explode_outer creates a Column with GeneratorOuter and Explode Catalyst expressions.

val explodeOuter = explode_outer($"array").expr
scala> println(explodeOuter.numberedTreeString)
00 generatorouter(explode('array))
01 +- explode('array)
02    +- 'array

explode Function

Caution
FIXME
scala> Seq(Array(0,1,2)).toDF("array").withColumn("num", explode('array)).show
+---------+---+
|    array|num|
+---------+---+
|[0, 1, 2]|  0|
|[0, 1, 2]|  1|
|[0, 1, 2]|  2|
+---------+---+
Note
explode function is an equivalent of flatMap operator for Dataset.
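
The flatMap analogy can be sketched with plain Scala collections standing in for a Dataset (an assumption made for illustration; the names rows, exploded and explodedOuter are hypothetical). The sketch contrasts explode-like behaviour, where a row with an empty collection disappears, with explode_outer-like behaviour, where such a row is kept with a missing value.

```scala
// Plain Scala collections stand in for a Dataset here (illustration only).
val rows: Seq[(Int, Seq[String])] = Seq((1, Seq("a", "b")), (2, Seq.empty[String]))

// explode-like: one output row per element; rows with an empty collection vanish
val exploded: Seq[(Int, String)] =
  rows.flatMap { case (id, xs) => xs.map(x => (id, x)) }

// explode_outer-like: rows with an empty collection are kept with a missing value
val explodedOuter: Seq[(Int, Option[String])] =
  rows.flatMap { case (id, xs) =>
    if (xs.isEmpty) Seq((id, Option.empty[String]))
    else xs.map(x => (id, Option(x)))
  }
```

Here None plays the role of the null that explode_outer produces.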

Ranking Records per Window Partition — rank Function

rank(): Column
dense_rank(): Column
percent_rank(): Column

rank functions assign the sequential rank of each distinct value per window partition. They are equivalent to RANK, DENSE_RANK and PERCENT_RANK functions in the good ol' SQL.

val dataset = spark.range(9).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val byBucket = Window.partitionBy('bucket).orderBy('id)

scala> dataset.withColumn("rank", rank over byBucket).show
+---+------+----+
| id|bucket|rank|
+---+------+----+
|  0|     0|   1|
|  3|     0|   2|
|  6|     0|   3|
|  1|     1|   1|
|  4|     1|   2|
|  7|     1|   3|
|  2|     2|   1|
|  5|     2|   2|
|  8|     2|   3|
+---+------+----+

scala> dataset.withColumn("percent_rank", percent_rank over byBucket).show
+---+------+------------+
| id|bucket|percent_rank|
+---+------+------------+
|  0|     0|         0.0|
|  3|     0|         0.5|
|  6|     0|         1.0|
|  1|     1|         0.0|
|  4|     1|         0.5|
|  7|     1|         1.0|
|  2|     2|         0.0|
|  5|     2|         0.5|
|  8|     2|         1.0|
+---+------+------------+

rank function assigns the same rank for duplicate rows with a gap in the sequence (similarly to Olympic medal places). dense_rank is like rank for duplicate rows but compacts the ranks and removes the gaps.

// rank function with duplicates
// Note the missing/sparse ranks, i.e. 2 and 4
scala> dataset.union(dataset).withColumn("rank", rank over byBucket).show
+---+------+----+
| id|bucket|rank|
+---+------+----+
|  0|     0|   1|
|  0|     0|   1|
|  3|     0|   3|
|  3|     0|   3|
|  6|     0|   5|
|  6|     0|   5|
|  1|     1|   1|
|  1|     1|   1|
|  4|     1|   3|
|  4|     1|   3|
|  7|     1|   5|
|  7|     1|   5|
|  2|     2|   1|
|  2|     2|   1|
|  5|     2|   3|
|  5|     2|   3|
|  8|     2|   5|
|  8|     2|   5|
+---+------+----+

// dense_rank function with duplicates
// Note that the missing ranks are now filled in
scala> dataset.union(dataset).withColumn("dense_rank", dense_rank over byBucket).show
+---+------+----------+
| id|bucket|dense_rank|
+---+------+----------+
|  0|     0|         1|
|  0|     0|         1|
|  3|     0|         2|
|  3|     0|         2|
|  6|     0|         3|
|  6|     0|         3|
|  1|     1|         1|
|  1|     1|         1|
|  4|     1|         2|
|  4|     1|         2|
|  7|     1|         3|
|  7|     1|         3|
|  2|     2|         1|
|  2|     2|         1|
|  5|     2|         2|
|  5|     2|         2|
|  8|     2|         3|
|  8|     2|         3|
+---+------+----------+

// percent_rank function with duplicates
scala> dataset.union(dataset).withColumn("percent_rank", percent_rank over byBucket).show
+---+------+------------+
| id|bucket|percent_rank|
+---+------+------------+
|  0|     0|         0.0|
|  0|     0|         0.0|
|  3|     0|         0.4|
|  3|     0|         0.4|
|  6|     0|         0.8|
|  6|     0|         0.8|
|  1|     1|         0.0|
|  1|     1|         0.0|
|  4|     1|         0.4|
|  4|     1|         0.4|
|  7|     1|         0.8|
|  7|     1|         0.8|
|  2|     2|         0.0|
|  2|     2|         0.0|
|  5|     2|         0.4|
|  5|     2|         0.4|
|  8|     2|         0.8|
|  8|     2|         0.8|
+---+------+------------+
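
The numbers in the three outputs above can be reproduced with a small plain-Scala model of one window partition. This is only a sketch of the usual SQL definitions, not Spark's implementation, and the names ordered, ranks, denseRanks and percentRanks are hypothetical. It uses the ids of bucket 0 from the duplicated dataset.

```scala
// Ordering-column values of one window partition, already sorted
// (bucket 0 of dataset.union(dataset)).
val ordered: Seq[Long] = Seq(0L, 0L, 3L, 3L, 6L, 6L)

// rank: 1 + number of rows that sort strictly before the current value
val ranks: Seq[Int] = ordered.map(v => ordered.count(_ < v) + 1)

// dense_rank: 1 + number of distinct values that sort strictly before the current value
val denseRanks: Seq[Int] = ordered.map(v => ordered.distinct.count(_ < v) + 1)

// percent_rank: (rank - 1) / (rows in partition - 1), or 0.0 for a single-row partition
val percentRanks: Seq[Double] =
  if (ordered.size > 1) ranks.map(r => (r - 1).toDouble / (ordered.size - 1))
  else ranks.map(_ => 0.0)
```

For example, the rows with id 3 have rank 3 in a partition of 6 rows, so their percent_rank is (3 - 1) / (6 - 1) = 0.4.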

Cumulative Distribution of Records Across Window Partitions — cume_dist Function

cume_dist(): Column

cume_dist computes the cumulative distribution of the records in window partitions. This is equivalent to SQL’s CUME_DIST function.

val dataset = spark.range(9).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("cume_dist", cume_dist over windowSpec).show
+---+------+------------------+
| id|bucket|         cume_dist|
+---+------+------------------+
|  0|     0|0.3333333333333333|
|  3|     0|0.6666666666666666|
|  6|     0|               1.0|
|  1|     1|0.3333333333333333|
|  4|     1|0.6666666666666666|
|  7|     1|               1.0|
|  2|     2|0.3333333333333333|
|  5|     2|0.6666666666666666|
|  8|     2|               1.0|
+---+------+------------------+
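
As a rough model of the values above (a sketch of the standard SQL definition rather than Spark's code; bucketIds and cumeDist are hypothetical names), cume_dist for a row is the number of rows in the partition that sort at or before it, divided by the partition size.

```scala
// Ordering-column values of one window partition: the ids 0, 3 and 6
// shown for bucket 0 in the output above.
val bucketIds: Seq[Long] = Seq(0L, 3L, 6L)

// cume_dist: (number of rows with a value <= the current value) / (rows in partition)
val cumeDist: Seq[Double] =
  bucketIds.map(v => bucketIds.count(_ <= v).toDouble / bucketIds.size)
```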

lag Function

lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
lag(e: Column, offset: Int, defaultValue: Any): Column

lag returns the value of the e (or columnName) column that is offset records before the current record in the window partition. If fewer than offset records precede the current record, lag returns defaultValue when one is given and null otherwise.

val dataset = spark.range(9).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lag", lag('id, 1) over windowSpec).show
+---+------+----+
| id|bucket| lag|
+---+------+----+
|  0|     0|null|
|  3|     0|   0|
|  6|     0|   3|
|  1|     1|null|
|  4|     1|   1|
|  7|     1|   4|
|  2|     2|null|
|  5|     2|   2|
|  8|     2|   5|
+---+------+----+

scala> dataset.withColumn("lag", lag('id, 2, "<default_value>") over windowSpec).show
+---+------+----+
| id|bucket| lag|
+---+------+----+
|  0|     0|null|
|  3|     0|null|
|  6|     0|   0|
|  1|     1|null|
|  4|     1|null|
|  7|     1|   1|
|  2|     2|null|
|  5|     2|null|
|  8|     2|   2|
+---+------+----+
Caution
FIXME It looks like lag with a default value has a bug — the default value’s not used at all.
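
One possible explanation for the behaviour in the caution above (an assumption, not confirmed by the source) is that the string "<default_value>" cannot be converted to the type of the id column, so it ends up as null. The sketch below uses a default of the same type as the column (a Long) instead. It builds its own local SparkSession so that it can run outside spark-shell; the names withLag and lags are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// A local session so the sketch runs outside spark-shell (in spark-shell, reuse `spark`).
val spark = SparkSession.builder().master("local[*]").appName("lag-default").getOrCreate()

val buckets = spark.range(9).withColumn("bucket", col("id") % 3)
val windowSpec = Window.partitionBy(col("bucket")).orderBy(col("id"))

// The default value -1L is a Long, the same type as the id column.
val withLag = buckets.withColumn("lag", lag(col("id"), 2, -1L).over(windowSpec))

// Collect the lag column, ordered by bucket and id, for inspection.
val lags: Seq[Long] =
  withLag.orderBy("bucket", "id").collect().map(_.getLong(2)).toSeq
```

With a type-compatible default, the first two rows of every bucket should carry -1 rather than null.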

lead Function

lead(columnName: String, offset: Int): Column
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
lead(e: Column, offset: Int, defaultValue: Any): Column

lead returns the value that is offset records after the current record, and defaultValue if there are fewer than offset records after the current record. When no defaultValue is given, lead returns null in that case.

val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lead", lead('id, 1) over windowSpec).show
+---+------+----+
| id|bucket|lead|
+---+------+----+
|  0|     0|   0|
|  0|     0|   3|
|  3|     0|   3|
|  3|     0|   6|
|  6|     0|   6|
|  6|     0|null|
|  1|     1|   1|
|  1|     1|   4|
|  4|     1|   4|
|  4|     1|   7|
|  7|     1|   7|
|  7|     1|null|
|  2|     2|   2|
|  2|     2|   5|
|  5|     2|   5|
|  5|     2|   8|
|  8|     2|   8|
|  8|     2|null|
+---+------+----+

scala> dataset.withColumn("lead", lead('id, 2, "<default_value>") over windowSpec).show
+---+------+----+
| id|bucket|lead|
+---+------+----+
|  0|     0|   3|
|  0|     0|   3|
|  3|     0|   6|
|  3|     0|   6|
|  6|     0|null|
|  6|     0|null|
|  1|     1|   4|
|  1|     1|   4|
|  4|     1|   7|
|  4|     1|   7|
|  7|     1|null|
|  7|     1|null|
|  2|     2|   5|
|  2|     2|   5|
|  5|     2|   8|
|  5|     2|   8|
|  8|     2|null|
|  8|     2|null|
+---+------+----+
Caution
FIXME It looks like lead with a default value has a bug — the default value’s not used at all.

Sequential numbering per window partition — row_number Function

row_number(): Column

row_number returns a sequential number starting at 1 within a window partition.

val buckets = spark.range(9).withColumn("bucket", 'id % 3)
// Make duplicates
val dataset = buckets.union(buckets)

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("row_number", row_number() over windowSpec).show
+---+------+----------+
| id|bucket|row_number|
+---+------+----------+
|  0|     0|         1|
|  0|     0|         2|
|  3|     0|         3|
|  3|     0|         4|
|  6|     0|         5|
|  6|     0|         6|
|  1|     1|         1|
|  1|     1|         2|
|  4|     1|         3|
|  4|     1|         4|
|  7|     1|         5|
|  7|     1|         6|
|  2|     2|         1|
|  2|     2|         2|
|  5|     2|         3|
|  5|     2|         4|
|  8|     2|         5|
|  8|     2|         6|
+---+------+----------+

ntile Function

ntile(n: Int): Column

ntile computes the ntile group id (from 1 to n inclusive) in an ordered window partition.

val dataset = spark.range(7).select('*, 'id % 3 as "bucket")

import org.apache.spark.sql.expressions.Window
val byBuckets = Window.partitionBy('bucket).orderBy('id)
scala> dataset.select('*, ntile(3) over byBuckets as "ntile").show
+---+------+-----+
| id|bucket|ntile|
+---+------+-----+
|  0|     0|    1|
|  3|     0|    2|
|  6|     0|    3|
|  1|     1|    1|
|  4|     1|    2|
|  2|     2|    1|
|  5|     2|    2|
+---+------+-----+
Caution
FIXME How is ntile different from rank? What about performance?
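
Part of the difference from rank is that ntile depends only on a row's position in the ordered partition, not on the values being ordered. The following plain-Scala sketch models how group ids are commonly assigned: groups differ in size by at most one row, and the earlier groups take the extra rows. ntileIds is a hypothetical helper and not part of Spark.

```scala
// Hypothetical helper (not a Spark API): ntile group ids for a partition
// of rowCount ordered rows split into n groups.
def ntileIds(rowCount: Int, n: Int): Seq[Int] = {
  require(n > 0, "n must be positive")
  val base = rowCount / n       // minimum number of rows per group
  val remainder = rowCount % n  // the first `remainder` groups get one extra row
  (1 to n).flatMap { group =>
    val size = base + (if (group <= remainder) 1 else 0)
    Seq.fill(size)(group)
  }
}
```

With n = 3, a three-row partition gets the ids 1, 2, 3 and a two-row partition gets 1, 2, which matches the output above.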

Creating Columns — col and column Functions

col(colName: String): Column
column(colName: String): Column

col and column methods create a Column that you can later use to reference a column in a dataset.

import org.apache.spark.sql.functions._

scala> val nameCol = col("name")
nameCol: org.apache.spark.sql.Column = name

scala> val cityCol = column("city")
cityCol: org.apache.spark.sql.Column = city

Defining UDFs — udf Function

udf(f: FunctionN[...]): UserDefinedFunction

The udf family of functions creates a user-defined function (UDF) from a Scala function. udf accepts a function f of 0 to 10 arguments, and the input and output types are inferred automatically from the types of f.

import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)

// define a dataframe
val df = sc.parallelize(0 to 3).toDF("num")

// apply the user-defined function to "num" column
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  1|
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+

Since Spark 2.0.0, there is another variant of udf function:

udf(f: AnyRef, dataType: DataType): UserDefinedFunction

udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function argument (as f) and explicitly declare the output data type (as dataType).

// given the dataframe above

import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)

scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  0|
|  1|  2|
|  2|  4|
|  3|  6|
+---+---+

split Function

split(str: Column, pattern: String): Column

split function splits str column using pattern. It returns a new Column.

val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))

scala> withSplit.show
+---+-------------+----------------+
|num|        input|           split|
+---+-------------+----------------+
|  0|  hello|world|  [hello, world]|
|  1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
Note
.$|()[{^?*+\ are regular expression metacharacters, so they must be escaped (or, as with | in the example above, placed in a character class) to be matched literally.
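
The effect of a metacharacter in the pattern can be seen with plain Java regular expressions, which is what String.split uses. Assuming Spark's split interprets pattern the same way (as the note on metacharacters suggests), the same pattern strings apply there. The value names below are hypothetical.

```scala
import java.util.regex.Pattern

val input = "hello|world"

// Unescaped, | is the alternation operator and matches the empty string,
// so the input is split between every character.
val unescaped: Seq[String] = input.split("|").toSeq

// Three ways to match a literal | character
val viaCharClass: Seq[String] = input.split("[|]").toSeq
val viaBackslash: Seq[String] = input.split("\\|").toSeq
val viaQuote: Seq[String] = input.split(Pattern.quote("|")).toSeq
```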

upper Function

upper(e: Column): Column

upper function converts a string column into one with all letters in upper case. It returns a new Column.

Note
The following example uses two functions that accept a Column and return another to showcase how to chain them.

val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))

scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
|  0|  1|hello|OLLEH|
|  2|  3|world|DLROW|
|  2|  4|  ala|  ALA|
+---+---+-----+-----+

struct Functions

struct(cols: Column*): Column
struct(colName: String, colNames: String*): Column

The struct family of functions creates a new struct column from a collection of Columns or column names.

Note
The difference between struct and the similar array function is that the types of the columns can be different (in struct).

scala> df.withColumn("struct", struct($"name", $"val")).show
+---+---+-----+---------+
| id|val| name|   struct|
+---+---+-----+---------+
|  0|  1|hello|[hello,1]|
|  2|  3|world|[world,3]|
|  2|  4|  ala|  [ala,4]|
+---+---+-----+---------+

broadcast Function

broadcast[T](df: Dataset[T]): Dataset[T]

broadcast function marks the input Dataset as small enough to be used in a broadcast join.

val left = Seq((0, "aa"), (0, "bb")).toDF("id", "token").as[(Int, String)]
val right = Seq(("aa", 0.99), ("bb", 0.57)).toDF("token", "prob").as[(String, Double)]

scala> left.join(broadcast(right), "token").explain(extended = true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(token))
:- Project [_1#123 AS id#126, _2#124 AS token#127]
:  +- LocalRelation [_1#123, _2#124]
+- BroadcastHint
   +- Project [_1#136 AS token#139, _2#137 AS prob#140]
      +- LocalRelation [_1#136, _2#137]

== Analyzed Logical Plan ==
token: string, id: int, prob: double
Project [token#127, id#126, prob#140]
+- Join Inner, (token#127 = token#139)
   :- Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- LocalRelation [_1#123, _2#124]
   +- BroadcastHint
      +- Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- LocalRelation [_1#136, _2#137]

== Optimized Logical Plan ==
Project [token#127, id#126, prob#140]
+- Join Inner, (token#127 = token#139)
   :- Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- Filter isnotnull(_2#124)
   :     +- LocalRelation [_1#123, _2#124]
   +- BroadcastHint
      +- Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- Filter isnotnull(_1#136)
            +- LocalRelation [_1#136, _2#137]

== Physical Plan ==
*Project [token#127, id#126, prob#140]
+- *BroadcastHashJoin [token#127], [token#139], Inner, BuildRight
   :- *Project [_1#123 AS id#126, _2#124 AS token#127]
   :  +- *Filter isnotnull(_2#124)
   :     +- LocalTableScan [_1#123, _2#124]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [_1#136 AS token#139, _2#137 AS prob#140]
         +- *Filter isnotnull(_1#136)
            +- LocalTableScan [_1#136, _2#137]

expr Function

expr(expr: String): Column

expr function parses the input SQL expression string expr into the Column it represents.

val ds = Seq((0, "hello"), (1, "world"))
  .toDF("id", "token")
  .as[(Long, String)]

scala> ds.show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
|  1|world|
+---+-----+

val filterExpr = expr("token = 'hello'")

scala> ds.filter(filterExpr).show
+---+-----+
| id|token|
+---+-----+
|  0|hello|
+---+-----+

Internally, expr uses the active session’s sqlParser or creates a new SparkSqlParser to call parseExpression method.

grouping Aggregate Function

grouping(e: Column): Column
grouping(columnName: String): Column  (1)
  1. Calls the first grouping with columnName as a Column

grouping is an aggregate function that indicates whether a specified column is aggregated or not and:

  • returns 1 if the column is in a subtotal and is NULL

  • returns 0 if the underlying value is NULL or any other value

Note
grouping can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and is verified when Analyzer does check analysis).

From Hive’s documentation about Grouping__ID function (that can somehow help to understand grouping):

When aggregates are displayed for a column its value is null. This may conflict in case the column itself has some null values. There needs to be some way to identify NULL in column, which means aggregate and NULL in column, which means value. GROUPING__ID function is the solution to that.

val tmpWorkshops = Seq(
  ("Warsaw", 2016, 2),
  ("Toronto", 2016, 4),
  ("Toronto", 2017, 1)).toDF("city", "year", "count")

// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
  (null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")

val workshops = tmpWorkshops union cityNull

scala> workshops.show
+-------+----+-----+
|   city|year|count|
+-------+----+-----+
| Warsaw|2016|    2|
|Toronto|2016|    4|
|Toronto|2017|    1|
|   null|2016|    2|
+-------+----+-----+

val q = workshops
  .cube("city", "year")
  .agg(grouping("city"), grouping("year")) // <-- grouping here
  .sort($"city".desc_nulls_last, $"year".desc_nulls_last)

scala> q.show
+-------+----+--------------+--------------+
|   city|year|grouping(city)|grouping(year)|
+-------+----+--------------+--------------+
| Warsaw|2016|             0|             0|
| Warsaw|null|             0|             1|
|Toronto|2017|             0|             0|
|Toronto|2016|             0|             0|
|Toronto|null|             0|             1|
|   null|2017|             1|             0|
|   null|2016|             1|             0|
|   null|2016|             0|             0|  <-- null is city
|   null|null|             0|             1|  <-- null is city
|   null|null|             1|             1|
+-------+----+--------------+--------------+

Internally, grouping creates a Column with Grouping expression.

val q = workshops.cube("city", "year").agg(grouping("city"))
scala> println(q.queryExecution.logical)
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping('city) AS grouping(city)#705]
+- Union
   :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
   :  +- LocalRelation [_1#178, _2#179, _3#180]
   +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
      +- LocalRelation [_1#192, _2#193, _3#194]

scala> println(q.queryExecution.analyzed)
Aggregate [city#724, year#725, spark_grouping_id#721], [city#724, year#725, cast((shiftright(spark_grouping_id#721, 1) & 1) as tinyint) AS grouping(city)#720]
+- Expand [List(city#182, year#183, count#184, city#722, year#723, 0), List(city#182, year#183, count#184, city#722, null, 1), List(city#182, year#183, count#184, null, year#723, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#724, year#725, spark_grouping_id#721]
   +- Project [city#182, year#183, count#184, city#182 AS city#722, year#183 AS year#723]
      +- Union
         :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
         :  +- LocalRelation [_1#178, _2#179, _3#180]
         +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
            +- LocalRelation [_1#192, _2#193, _3#194]
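
The analyzed plan above computes grouping(city) as (shiftright(spark_grouping_id, 1) & 1), that is, as one bit of the grouping id. The sketch below generalizes that pattern in plain Scala. groupingBit is a hypothetical helper, and the shift of 0 for year (the last grouping column) is an assumption inferred from the grouping ids 0 to 3 listed in the Expand operator.

```scala
// Hypothetical helper (not a Spark API): extracts the grouping flag of one column
// from a grouping id. columnIndex is the 0-based position of the column among
// the numColumns grouping columns.
def groupingBit(groupingId: Int, columnIndex: Int, numColumns: Int): Int =
  (groupingId >> (numColumns - 1 - columnIndex)) & 1

// Grouping ids 0 to 3 as they appear in the Expand operator for cube("city", "year")
val cityFlags = (0 to 3).map(id => groupingBit(id, columnIndex = 0, numColumns = 2))
val yearFlags = (0 to 3).map(id => groupingBit(id, columnIndex = 1, numColumns = 2))
```

Under this reading, grouping id 1 marks rows where only year is aggregated, 2 marks rows where only city is aggregated, and 3 marks the grand total.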

grouping_id Aggregate Function

grouping_id(cols: Column*): Column
grouping_id(colName: String, colNames: String*): Column (1)
  1. Calls the first grouping_id with colName and colNames as objects of type Column

grouping_id is an aggregate function that computes the level of grouping:

  • 0 for combinations of each column

  • 1 for subtotals of column 1

  • 2 for subtotals of column 2

  • And so on…

val tmpWorkshops = Seq(
  ("Warsaw", 2016, 2),
  ("Toronto", 2016, 4),
  ("Toronto", 2017, 1)).toDF("city", "year", "count")

// there seems to be a bug with nulls
// and so the need for the following union
val cityNull = Seq(
  (null.asInstanceOf[String], 2016, 2)).toDF("city", "year", "count")

val workshops = tmpWorkshops union cityNull

scala> workshops.show
+-------+----+-----+
|   city|year|count|
+-------+----+-----+
| Warsaw|2016|    2|
|Toronto|2016|    4|
|Toronto|2017|    1|
|   null|2016|    2|
+-------+----+-----+

val query = workshops
  .cube("city", "year")
  .agg(grouping_id()) // <-- all grouping columns used
  .sort($"city".desc_nulls_last, $"year".desc_nulls_last)
scala> query.show
+-------+----+-------------+
|   city|year|grouping_id()|
+-------+----+-------------+
| Warsaw|2016|            0|
| Warsaw|null|            1|
|Toronto|2017|            0|
|Toronto|2016|            0|
|Toronto|null|            1|
|   null|2017|            2|
|   null|2016|            2|
|   null|2016|            0|
|   null|null|            1|
|   null|null|            3|
+-------+----+-------------+

scala> spark.catalog.listFunctions.filter(_.name.contains("grouping_id")).show(false)
+-----------+--------+-----------+----------------------------------------------------+-----------+
|name       |database|description|className                                           |isTemporary|
+-----------+--------+-----------+----------------------------------------------------+-----------+
|grouping_id|null    |null       |org.apache.spark.sql.catalyst.expressions.GroupingID|true       |
+-----------+--------+-----------+----------------------------------------------------+-----------+

// bin function gives the string representation of the binary value of the given long column
scala> query.withColumn("bitmask", bin($"grouping_id()")).show
+-------+----+-------------+-------+
|   city|year|grouping_id()|bitmask|
+-------+----+-------------+-------+
| Warsaw|2016|            0|      0|
| Warsaw|null|            1|      1|
|Toronto|2017|            0|      0|
|Toronto|2016|            0|      0|
|Toronto|null|            1|      1|
|   null|2017|            2|     10|
|   null|2016|            2|     10|
|   null|2016|            0|      0|  <-- null is city
|   null|null|            3|     11|
|   null|null|            1|      1|
+-------+----+-------------+-------+

The columns passed to grouping_id must either match the grouping columns (of cube or rollup) exactly, or be omitted, which means all the grouping columns.

Note
grouping_id can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and is verified when Analyzer does check analysis).

Note
Spark SQL’s grouping_id function is known as grouping__id in Hive.

When aggregates are displayed for a column its value is null. This may conflict in case the column itself has some null values. There needs to be some way to identify NULL in column, which means aggregate and NULL in column, which means value. GROUPING__ID function is the solution to that.

Internally, grouping_id() creates a Column with GroupingID unevaluable expression.

Note
Unevaluable expressions are expressions replaced by some other expressions during analysis or optimization.

// workshops dataset was defined earlier
val q = workshops
  .cube("city", "year")
  .agg(grouping_id())

// grouping_id function is spark_grouping_id virtual column internally
// that is resolved during analysis - see Analyzed Logical Plan
scala> q.explain(true)
== Parsed Logical Plan ==
'Aggregate [cube(city#182, year#183)], [city#182, year#183, grouping_id() AS grouping_id()#742]
+- Union
   :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
   :  +- LocalRelation [_1#178, _2#179, _3#180]
   +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
      +- LocalRelation [_1#192, _2#193, _3#194]

== Analyzed Logical Plan ==
city: string, year: int, grouping_id(): int
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#182, year#183, count#184, city#755, year#756, 0), List(city#182, year#183, count#184, city#755, null, 1), List(city#182, year#183, count#184, null, year#756, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, count#184, city#757, year#758, spark_grouping_id#754]
   +- Project [city#182, year#183, count#184, city#182 AS city#755, year#183 AS year#756]
      +- Union
         :- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
         :  +- LocalRelation [_1#178, _2#179, _3#180]
         +- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
            +- LocalRelation [_1#192, _2#193, _3#194]

== Optimized Logical Plan ==
Aggregate [city#757, year#758, spark_grouping_id#754], [city#757, year#758, spark_grouping_id#754 AS grouping_id()#742]
+- Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
   +- Union
      :- LocalRelation [city#755, year#756]
      +- LocalRelation [city#755, year#756]

== Physical Plan ==
*HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, grouping_id()#742])
+- Exchange hashpartitioning(city#757, year#758, spark_grouping_id#754, 200)
   +- *HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=[city#757, year#758, spark_grouping_id#754])
      +- *Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, year#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
         +- Union
            :- LocalTableScan [city#755, year#756]
            +- LocalTableScan [city#755, year#756]

Parsing Column With JSON-Encoded Records — from_json Functions

from_json(e: Column, schema: DataType): Column  (1)
from_json(
  e: Column,
  schema: DataType,
  options: Map[String, String]): Column
  1. Relays to the other from_json with empty options

from_json parses a column with a JSON string into a StructType or ArrayType of StructType elements with the specified schema.

Note
options controls how the JSON is parsed and accepts the same options as the JSON data source.

Internally, from_json creates a Column with JsonToStructs unary expression.

val jsons = Seq("""{ "id": 0 }""").toDF("json")

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", IntegerType, nullable = false) :: Nil)

scala> jsons.select(from_json($"json", schema) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+
Note
from_json corresponds to SQL’s from_json.

Converting Long to Binary Format (in String Representation) — bin Function

bin(e: Column): Column
bin(columnName: String): Column (1)
  1. Calls the first bin with columnName as a Column

bin converts the long value in a column to its binary format (i.e. as an unsigned integer in base 2) with no extra leading 0s.

scala> spark.range(5).withColumn("binary", bin('id)).show
+---+------+
| id|binary|
+---+------+
|  0|     0|
|  1|     1|
|  2|    10|
|  3|    11|
|  4|   100|
+---+------+

val withBin = spark.range(5).withColumn("binary", bin('id))
scala> withBin.printSchema
root
 |-- id: long (nullable = false)
 |-- binary: string (nullable = false)

Internally, bin creates a Column with Bin unary expression.

scala> withBin.queryExecution.logical
res2: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Project [*, bin('id) AS binary#14]
+- Range (0, 5, step=1, splits=Some(8))
Note
Bin unary expression uses java.lang.Long.toBinaryString for the conversion.
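
Since the conversion is delegated to java.lang.Long.toBinaryString, its behaviour can be checked directly in plain Scala without Spark. The sketch below (with hypothetical value names) also shows what "unsigned integer in base 2" means for a negative input.

```scala
// Same conversion that the Bin expression is described as using
val four: String = java.lang.Long.toBinaryString(4L)

// A negative long is rendered as its 64-bit two's complement representation
val minusOne: String = java.lang.Long.toBinaryString(-1L)
```

Here four is "100", as in the output for id 4 above, and minusOne is a string of 64 ones.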
Note

Bin expression supports code generation (aka CodeGen).

val withBin = spark.range(5).withColumn("binary", bin('id))
scala> withBin.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#19L, bin(id#19L) AS binary#22]
+- *Range (0, 5, step=1, splits=Some(8))
...
/* 103 */           UTF8String project_value1 = null;
/* 104 */           project_value1 = UTF8String.fromString(java.lang.Long.toBinaryString(range_value));
