QueryExecution — Query Execution of Dataset

QueryExecution represents the structured query execution pipeline of a Dataset.

Note
When an action of a Dataset is executed, it triggers the execution of the QueryExecution (by calling toRdd), which produces an RDD of binary rows, i.e. RDD[InternalRow].

You can access the QueryExecution of a Dataset using the queryExecution attribute.

val ds: Dataset[Long] = ...
val queryExec = ds.queryExecution

QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a logical operator or use the QueryExecution after executing a logical operator).

Table 1. QueryExecution’s (Lazily-Initialized) Attributes (aka Structured Query Execution Pipeline)
Attribute / Phase Description

analyzed

Analyzed logical plan that has passed Analyzer's check rules.

val schema = queryExecution.analyzed.output
Tip
Use Dataset’s explain(extended = true) or SQL’s EXPLAIN EXTENDED to see the analyzed logical plan of a structured query.

withCachedData

LogicalPlan that is the analyzed plan after it has been checked (for unsupported operations) and had its matching fragments replaced with cached segments.

optimizedPlan

Optimized logical plan that is the result of applying the session-owned Catalyst Query Optimizer to withCachedData.

sparkPlan

Physical plan that SparkPlanner computes for optimizedPlan.

Note
sparkPlan is the first physical plan from the collection of all possible physical plans.
Note
It is guaranteed that Catalyst’s QueryPlanner (which SparkPlanner extends) will always generate at least one physical plan.

executedPlan

Physical plan ready for execution (i.e. sparkPlan after physical optimization rules applied).

Note
executedPlan is the phase when the CollapseCodegenStages physical preparation rule is executed to collapse physical operators that support code generation into a single WholeStageCodegenExec operator.

toRdd

RDD of binary rows (after executing the executedPlan).

Note

toRdd is a "boundary" between two Spark modules: Spark SQL and Spark Core.

After you have executed toRdd (directly or not), you basically "leave" Spark SQL’s Datasets and "enter" Spark Core’s RDD space.
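
The boundary can be sketched as follows. This is a minimal example, assuming Spark 2.x or 3.x on the classpath and a local master; the ToRddBoundary object, the sumOfIds helper and the application name are made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow

object ToRddBoundary {
  // Sums the ids of spark.range(5) using RDD operators only (hypothetical helper).
  def sumOfIds(spark: SparkSession): Long = {
    // Spark SQL side: a Dataset and its QueryExecution.
    val ds = spark.range(5)
    // Crossing the boundary: toRdd asks executedPlan for its RDD.
    val rows: RDD[InternalRow] = ds.queryExecution.toRdd
    // Spark Core side: plain RDD operators from here on.
    // The value is read out of each InternalRow immediately, because
    // physical operators may reuse the same row object between records.
    rows.map(_.getLong(0)).reduce(_ + _)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("toRdd-boundary")
      .getOrCreate()
    try println(s"sum of ids: ${sumOfIds(spark)}")
    finally spark.stop()
  }
}
```

Note that the rows are InternalRow objects in Spark's internal binary format, not the Row or Long values that Dataset operators return.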

You can access the lazy attributes as follows:

val dataset: Dataset[Long] = ...
dataset.queryExecution.executedPlan
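
To see the whole pipeline at once, you could walk through the attributes in order. The following is a sketch only, assuming Spark 2.x or 3.x on the classpath and a local master; QueryExecutionPhases and columnNames are illustrative names that are not part of Spark.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution

object QueryExecutionPhases {
  // Column names of the analyzed plan, i.e. the schema of the query.
  def columnNames(qe: QueryExecution): Seq[String] =
    qe.analyzed.output.map(_.name)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("qe-phases")
      .getOrCreate()
    try {
      val qe = spark.range(5).filter("id > 1").queryExecution
      println(columnNames(qe))
      // Every attribute below is a lazy value that is computed on first access
      // and that builds on the attribute before it.
      println(qe.analyzed.treeString)
      println(qe.withCachedData.treeString)
      println(qe.optimizedPlan.treeString)
      println(qe.sparkPlan.treeString)
      println(qe.executedPlan.treeString)
    } finally spark.stop()
  }
}
```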
Table 2. QueryExecution’s Properties (in alphabetical order)
Name Description

planner

SparkPlanner

QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState) when it is created. It then uses the planner to compute a SparkPlan (i.e. a physical plan), which is available as the sparkPlan attribute.

A streaming variant of QueryExecution is IncrementalExecution.

Tip
Use the explain operator to see the logical and physical plans of a Dataset.
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]

== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]
Note
QueryExecution belongs to org.apache.spark.sql.execution package.
Note
QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.

simpleString Method

Caution
FIXME

debug Object

Caution
FIXME

Building Complete Text Representation — completeString Internal Method

Caution
FIXME

Creating QueryExecution Instance

QueryExecution takes the following when created:

  1. SparkSession

  2. LogicalPlan
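
As a rough illustration, a QueryExecution could be created directly from a SparkSession and a logical plan. This sketch assumes Spark 2.x or 3.x (later releases may require different types or extra arguments) and borrows the logical plan from an existing Dataset; CreateQueryExecution and rowCount are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution

object CreateQueryExecution {
  // Executes a borrowed logical plan and counts the resulting binary rows.
  def rowCount(spark: SparkSession): Long = {
    // Borrow a logical plan from an existing Dataset.
    val plan = spark.range(3).queryExecution.logical
    // Execute the logical plan in the session.
    val qe = new QueryExecution(spark, plan)
    qe.toRdd.count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("create-qe")
      .getOrCreate()
    try println(s"rows: ${rowCount(spark)}")
    finally spark.stop()
  }
}
```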

Physical Plan Preparation Rules — preparations Method

preparations is a sequence of physical plan preparation rules (i.e. Rule[SparkPlan]).

Tip
A SparkPlan preparation rule transforms a physical plan to another (possibly more efficient).

preparations is one of the final phases of query execution that Spark developers could use for further query optimizations.

The current list of SparkPlan transformations in preparations is as follows:

  1. ExtractPythonUDFs

  2. PlanSubqueries

  3. EnsureRequirements

  4. CollapseCodegenStages

  5. ReuseExchange

  6. ReuseSubquery

Note
The physical preparation rules are applied to the physical plan in the order listed above, before execution, i.e. they produce the final SparkPlan when the executedPlan lazy value is first accessed (the result is cached afterwards).
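
One way to observe the effect of the preparation rules is to compare sparkPlan (before the rules) with executedPlan (after the rules). The sketch below counts WholeStageCodegenExec operators in both plans; it assumes Spark 2.x or 3.x with the default configuration, and PreparationsEffect and codegenStages are illustrative names only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}

object PreparationsEffect {
  // Number of WholeStageCodegenExec operators in a physical plan.
  def codegenStages(plan: SparkPlan): Int =
    plan.collect { case w: WholeStageCodegenExec => w }.size

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("preparations-effect")
      .getOrCreate()
    try {
      val qe = spark.range(5).queryExecution
      // sparkPlan has not been through the preparation rules yet.
      println(s"before preparations: ${codegenStages(qe.sparkPlan)}")
      // executedPlan is sparkPlan after all preparation rules were applied.
      println(s"after preparations:  ${codegenStages(qe.executedPlan)}")
    } finally spark.stop()
  }
}
```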

Executing preparations Physical Plan Rules — prepareForExecution Method

prepareForExecution(plan: SparkPlan): SparkPlan

prepareForExecution takes preparations rules and applies them one by one to the input plan.

Note
prepareForExecution is used exclusively when QueryExecution prepares physical plan for execution.
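
Applying rules one by one is essentially a left fold over the sequence of rules. The following plain-Scala sketch shows the idea with simplified stand-ins; Plan and Rule here are hypothetical types, not Spark's SparkPlan and Rule[SparkPlan], and the two rule names are only labels.

```scala
object PrepareForExecutionSketch {
  // Simplified stand-ins for SparkPlan and Rule[SparkPlan] (not Spark's classes).
  final case class Plan(description: String)

  trait Rule {
    def name: String
    def apply(plan: Plan): Plan
  }

  // A toy rule that wraps the plan description in the rule's name.
  def rule(ruleName: String): Rule = new Rule {
    val name: String = ruleName
    def apply(plan: Plan): Plan = Plan(s"$ruleName(${plan.description})")
  }

  val preparations: Seq[Rule] =
    Seq(rule("EnsureRequirements"), rule("CollapseCodegenStages"))

  // Every rule receives the plan produced by the rule before it.
  def prepareForExecution(plan: Plan): Plan =
    preparations.foldLeft(plan) { (current, r) => r(current) }

  def main(args: Array[String]): Unit =
    println(prepareForExecution(Plan("Range")).description)
}
```

Because each rule sees the output of the previous one, the order of rules in preparations matters.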

IncrementalExecution

IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId.

It lives in org.apache.spark.sql.execution.streaming package.

Caution
FIXME What is stateStrategy?

Stateful operators in the query plan are numbered using operatorId that starts with 0.

IncrementalExecution adds one Rule[SparkPlan] called state to preparations sequence of rules as the first element.
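
Adding a rule as the first element can be pictured as a subclass that prepends to the inherited sequence. This is a plain-Scala sketch under that assumption, with rules reduced to their names; BaseExecution and StreamingExecutionSketch are hypothetical classes, not Spark's.

```scala
object PrependRuleSketch {
  // Rules are modelled by their names only (a stand-in for Rule[SparkPlan]).
  class BaseExecution {
    def preparations: Seq[String] = Seq("PlanSubqueries", "EnsureRequirements")
  }

  // Prepends the extra rule so that it runs before all inherited rules.
  class StreamingExecutionSketch extends BaseExecution {
    override def preparations: Seq[String] = "state" +: super.preparations
  }

  def main(args: Array[String]): Unit =
    println(new StreamingExecutionSketch().preparations.mkString(", "))
}
```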

Caution
FIXME What does IncrementalExecution do? Where is it used?

Creating Analyzed Logical Plan and Checking Correctness — assertAnalyzed Method

assertAnalyzed(): Unit

assertAnalyzed triggers initialization of analyzed (which is almost like executing it).

Note
assertAnalyzed executes analyzed by accessing it and throwing the result away. Since analyzed is a lazy value in Scala, it gets initialized on that first access and stays initialized afterwards.
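
The lazy-value behaviour can be demonstrated in plain Scala. In this sketch, Execution is a hypothetical stand-in for QueryExecution, and the analyzerRuns counter exists only to make the single initialization visible.

```scala
object LazyAnalyzedSketch {
  // Hypothetical stand-in for QueryExecution (not Spark's class).
  final class Execution {
    var analyzerRuns = 0

    // Initialized on first access only; later accesses reuse the value.
    lazy val analyzed: String = {
      analyzerRuns += 1
      "analyzed plan"
    }

    // Accesses analyzed and throws the result away.
    def assertAnalyzed(): Unit = {
      val _ = analyzed
      ()
    }
  }

  def main(args: Array[String]): Unit = {
    val qe = new Execution
    qe.assertAnalyzed()
    qe.assertAnalyzed()
    println(s"analyzer ran ${qe.analyzerRuns} time(s)")
  }
}
```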

assertAnalyzed then requests Analyzer to check the correctness of the analysis of the LogicalPlan (i.e. analyzed).

Note

assertAnalyzed uses SparkSession to access the current SessionState that it then uses to access the Analyzer.

In Scala the access path looks as follows.

sparkSession.sessionState.analyzer

In case of any AnalysisException, assertAnalyzed creates a new AnalysisException to make sure that it holds analyzed and reports it.
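
The catch-and-rethrow step could look roughly like the plain-Scala sketch below. AnalysisError, checkAnalysis and the string-based plan are hypothetical simplifications of Spark's AnalysisException, Analyzer and LogicalPlan; only the shape of the error handling is the point.

```scala
object AssertAnalyzedSketch {
  // Hypothetical stand-in for AnalysisException that can carry a plan.
  final class AnalysisError(message: String, val plan: Option[String])
    extends Exception(message)

  // Stand-in for the analyzer's correctness check: fails on unresolved plans.
  def checkAnalysis(plan: String): Unit =
    if (plan.contains("unresolved"))
      throw new AnalysisError(s"cannot resolve: $plan", None)

  def assertAnalyzed(analyzed: String): Unit =
    try checkAnalysis(analyzed)
    catch {
      case e: AnalysisError =>
        // Re-create the exception so that it holds the analyzed plan,
        // keeping the original stack trace for reporting.
        val withPlan = new AnalysisError(e.getMessage, Some(analyzed))
        withPlan.setStackTrace(e.getStackTrace)
        throw withPlan
    }
}
```

A caller that catches the exception can then inspect the plan that failed the check.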

Note

assertAnalyzed is used when:

Building Extended Text Representation with Logical and Physical Plans — toString Method

toString: String

toString is a mere alias for completeString with appendStats flag disabled.

Note
toString is on the "other" side of toStringWithStats which has appendStats flag enabled.
Note
toString is used when…​FIXME
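
The relationship between toString, toStringWithStats and completeString can be sketched in plain Scala as two thin wrappers over one method with a flag. Node, Execution and the way statistics are rendered are assumptions made for illustration, not Spark's implementation.

```scala
object TextRepresentationSketch {
  // Hypothetical plan node with a pre-rendered statistics string.
  final case class Node(name: String, stats: String)

  final class Execution(optimized: Seq[Node]) {
    // One method builds the text; the flag decides whether stats are appended.
    private def completeString(appendStats: Boolean): String =
      optimized
        .map(n => if (appendStats) s"${n.name}, ${n.stats}" else n.name)
        .mkString("\n")

    override def toString: String = completeString(appendStats = false)

    def toStringWithStats: String = completeString(appendStats = true)
  }
}
```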

Building Text Representation with Cost Stats — toStringWithStats Method

toStringWithStats: String

toStringWithStats is a mere alias for completeString with appendStats flag enabled.

Note
toStringWithStats is a custom toString with cost statistics.
// test dataset
val dataset = spark.range(20).limit(2)

// toStringWithStats in action - note Optimized Logical Plan section with Statistics
scala> dataset.queryExecution.toStringWithStats
res6: String =
== Parsed Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
   +- Range (0, 20, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
GlobalLimit 2
+- LocalLimit 2
   +- Range (0, 20, step=1, splits=Some(8))

== Optimized Logical Plan ==
GlobalLimit 2, Statistics(sizeInBytes=32.0 B, rowCount=2, isBroadcastable=false)
+- LocalLimit 2, Statistics(sizeInBytes=160.0 B, isBroadcastable=false)
   +- Range (0, 20, step=1, splits=Some(8)), Statistics(sizeInBytes=160.0 B, isBroadcastable=false)

== Physical Plan ==
CollectLimit 2
+- *Range (0, 20, step=1, splits=Some(8))
Note
toStringWithStats is used exclusively when ExplainCommand is executed (only when cost attribute is enabled).

Transforming SparkPlan Execution Result to Hive-Compatible Output Format — hiveResultString Method

hiveResultString(): Seq[String]

hiveResultString returns the result as a Hive-compatible output format.

scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)

scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id	name	age, 0	Jacek	42)

Internally, hiveResultString transforms the SparkPlan.

Table 3. hiveResultString’s SparkPlan Transformations (in execution order)
SparkPlan Description

ExecutedCommandExec for DescribeTableCommand

Executes DescribeTableCommand and transforms every Row to a Hive-compatible output format.

ExecutedCommandExec for ShowTablesCommand

Executes ExecutedCommandExec and transforms the result to a collection of table names.

Any other SparkPlan

Executes SparkPlan and transforms the result to a Hive-compatible output format.

Note
hiveResultString is used exclusively when SparkSQLDriver (of ThriftServer) runs a command.
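
The dispatch on the kind of SparkPlan can be pictured as a pattern match. The plain-Scala sketch below uses hypothetical case classes instead of Spark's physical operators. It assumes tab-separated columns (as in the sample output above), that the table name is the second column of a ShowTablesCommand row, and that null values are rendered as NULL.

```scala
object HiveResultSketch {
  // Hypothetical stand-ins for the three kinds of plans.
  sealed trait Plan
  final case class DescribeTable(rows: Seq[Seq[String]]) extends Plan
  final case class ShowTables(rows: Seq[Seq[String]]) extends Plan
  final case class Other(rows: Seq[Seq[Any]]) extends Plan

  // Assumption: null values are rendered as NULL.
  private def toHiveString(value: Any): String =
    if (value == null) "NULL" else value.toString

  def hiveResultString(plan: Plan): Seq[String] = plan match {
    // Every row of the table description becomes one tab-separated line.
    case DescribeTable(rows) => rows.map(_.mkString("\t"))
    // Only the table name (assumed to be the second column) is kept.
    case ShowTables(rows) => rows.map(row => row(1))
    // Any other plan: every row becomes one tab-separated line.
    case Other(rows) => rows.map(_.map(toHiveString).mkString("\t"))
  }
}
```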
