import org.apache.spark.sql.expressions.Window
val byValueDesc = Window.partitionBy("value").orderBy($"value".desc)
val query = table.withColumn(
"count over window", count("*") over byValueDesc)
import org.apache.spark.sql.catalyst.expressions.WindowExpression
val windowExpr = query.queryExecution
.logical
.expressions(1)
.children(0)
.asInstanceOf[WindowExpression]
scala> windowExpr.windowSpec
res0: org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition = windowspecdefinition('value, 'value DESC NULLS LAST, UnspecifiedFrame)
WindowSpecDefinition Unevaluable Expression
WindowSpecDefinition
is an unevaluable expression (i.e. with no support for eval
and doGenCode
methods).
WindowSpecDefinition
is created for a window specification in a SQL query or Column
's over operator.
WindowSpecDefinition
contains the following:
-
Window partition specification expressions
import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition
Seq((0, "hello"), (1, "windows"))
.toDF("id", "token")
.createOrReplaceTempView("mytable")
val sqlText = """
SELECT count(*) OVER myWindowSpec
FROM mytable
WINDOW
myWindowSpec AS (
PARTITION BY token
ORDER BY id
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
"""
import spark.sessionState.{analyzer,sqlParser}
scala> val parsedPlan = sqlParser.parsePlan(sqlText)
parsedPlan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'WithWindowDefinition Map(myWindowSpec -> windowspecdefinition('token, 'id ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
+- 'Project [unresolvedalias(unresolvedwindowexpression('count(1), WindowSpecReference(myWindowSpec)), None)]
+- 'UnresolvedRelation `mytable`
import org.apache.spark.sql.catalyst.plans.logical.WithWindowDefinition
val myWindowSpec = parsedPlan.asInstanceOf[WithWindowDefinition].windowDefinitions("myWindowSpec")
scala> println(myWindowSpec)
windowspecdefinition('token, 'id ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
scala> println(myWindowSpec.sql)
(PARTITION BY `token` ORDER BY `id` ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
scala> sql(sqlText)
res4: org.apache.spark.sql.DataFrame = [count(1) OVER (PARTITION BY token ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW): bigint]
scala> println(analyzer.execute(sqlParser.parsePlan(sqlText)))
Project [count(1) OVER (PARTITION BY token ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#25L]
+- Project [token#13, id#12, count(1) OVER (PARTITION BY token ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#25L, count(1) OVER (PARTITION BY token ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#25L]
+- Window [count(1) windowspecdefinition(token#13, id#12 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count(1) OVER (PARTITION BY token ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#25L], [token#13], [id#12 ASC NULLS FIRST]
+- Project [token#13, id#12]
+- SubqueryAlias mytable
+- Project [_1#9 AS id#12, _2#10 AS token#13]
+- LocalRelation [_1#9, _2#10]
Name | Description |
---|---|
Window partition and order specifications (for which |
|
|
Unsupported (i.e. reports a |
|
Disabled (i.e. |
|
Enabled (i.e. |
|
Enabled when children are and the input DataType is valid and the input frameSpecification is a |
|
Contains (PARTITION BY `token` ORDER BY `id` ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) |