QueryExecution — Query Execution of Dataset

QueryExecution represents the structured query execution pipeline of a Dataset.
Note: When an action of a Dataset is executed, it triggers an execution of the QueryExecution (in the form of calling toRdd), which turns the query into an RDD of binary rows, i.e. RDD[InternalRow].
You can access the QueryExecution of a Dataset using the queryExecution attribute.

val ds: Dataset[Long] = ...
val queryExec = ds.queryExecution

QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a logical operator or use the QueryExecution after executing a logical operator).
Attribute / Phase | Description
---|---
analyzed | Analyzed logical plan that has passed Analyzer's check rules.
withCachedData | Analyzed logical plan after CacheManager has substituted cached query segments.
optimizedPlan | Optimized logical plan that is the result of executing the session-owned Catalyst Query Optimizer on withCachedData.
sparkPlan | Physical plan (after SparkPlanner has planned the optimized logical plan).
executedPlan | Physical plan ready for execution (i.e. sparkPlan after the physical optimization rules have been applied).
toRdd | RDD of binary rows, i.e. RDD[InternalRow] (the result of executing executedPlan).
You can access the lazy attributes as follows:
val dataset: Dataset[Long] = ...
dataset.queryExecution.executedPlan
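For example, you can inspect every phase of the pipeline in spark-shell (output omitted; these are all public lazy values of QueryExecution, so each is computed on first access):

val qe = spark.range(5).queryExecution
qe.analyzed        // analyzed logical plan
qe.withCachedData  // analyzed plan with cached subplans substituted in
qe.optimizedPlan   // optimized logical plan
qe.sparkPlan       // physical plan
qe.executedPlan    // physical plan after the preparation rules
qe.toRdd           // RDD[InternalRow]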
QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState) when it is created. It then computes a SparkPlan (i.e. a physical plan) using the planner. It is available as the sparkPlan attribute.
A streaming variant of QueryExecution is IncrementalExecution.
Tip: Use the explain operator to review the logical and physical plans of a Dataset.
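For example, in spark-shell (extended = true prints the parsed, analyzed, optimized, and physical plans):

spark.range(5).explain(extended = true)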
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]
== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Physical Plan ==
WholeStageCodegen
: +- Range 0, 1, 8, 5, [id#39L]
Note: QueryExecution belongs to the org.apache.spark.sql.execution package.
Note: QueryExecution is a transient attribute of a Dataset, i.e. it is not preserved when the Dataset is serialized.
simpleString Method

Caution: FIXME
debug Object

Caution: FIXME
Building Complete Text Representation — completeString Internal Method

Caution: FIXME
Physical Plan Preparation Rules — preparations Method

preparations is a sequence of physical plan preparation rules (i.e. Rule[SparkPlan]).
Tip: A SparkPlan preparation rule transforms a physical plan into another (possibly more efficient) one.
preparations is one of the final phases of query execution that Spark developers can use for further query optimizations.

The current list of SparkPlan transformations in preparations is as follows:
- ExtractPythonUDFs
- PlanSubqueries
- ReuseExchange
- ReuseSubquery
Note: The physical preparation rules are applied sequentially, in order, to the physical plan before execution, i.e. they produce the final SparkPlan when the executedPlan lazy value is first accessed (and the result is cached afterwards).
Executing preparations Physical Plan Rules — prepareForExecution Method

prepareForExecution(plan: SparkPlan): SparkPlan

prepareForExecution takes the preparations rules and applies them one by one to the input plan.
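Conceptually, that is a left fold of the rule sequence over the input plan. A minimal sketch with simplified stand-in types (not Spark's actual classes):

// SparkPlan and Rule are simplified stand-ins for Spark's own types
trait SparkPlan
case class Leaf(name: String) extends SparkPlan

trait Rule { def apply(plan: SparkPlan): SparkPlan }

// illustrative no-op rules, stand-ins for e.g. PlanSubqueries, ReuseExchange
val preparations: Seq[Rule] = Seq(
  new Rule { def apply(p: SparkPlan) = p },
  new Rule { def apply(p: SparkPlan) = p }
)

// apply every rule in order to the input plan
def prepareForExecution(plan: SparkPlan): SparkPlan =
  preparations.foldLeft(plan) { (p, rule) => rule(p) }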
Note: prepareForExecution is used exclusively when QueryExecution prepares the physical plan for execution.
IncrementalExecution

IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId.

It lives in the org.apache.spark.sql.execution.streaming package.
Caution: FIXME What is stateStrategy?
Stateful operators in the query plan are numbered using operatorId, starting from 0.

IncrementalExecution adds one Rule[SparkPlan] called state to the preparations sequence of rules, as the first element.
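Prepending a rule like that is plain sequence manipulation. A toy sketch with stand-in types (not Spark's actual classes; the real state rule plans stateful streaming operators):

trait Rule { def name: String }

class QE {
  def preparations: Seq[Rule] = Seq(new Rule { def name = "ReuseExchange" })
}

class IncrementalQE extends QE {
  // stand-in for the state rule that plans stateful operators
  val state: Rule = new Rule { def name = "state" }
  // state goes first so it runs before the inherited rules
  override def preparations: Seq[Rule] = state +: super.preparations
}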
Caution: FIXME What does IncrementalExecution do? Where is it used?
Creating Analyzed Logical Plan and Checking Correctness — assertAnalyzed Method

assertAnalyzed(): Unit

assertAnalyzed triggers initialization of analyzed (which is almost like executing it).
Note: assertAnalyzed executes analyzed by accessing it and throwing the result away. Since analyzed is a lazy value in Scala, it gets initialized on the first access and stays initialized forever.
assertAnalyzed then requests the Analyzer to check the correctness of the analysis of the LogicalPlan (i.e. analyzed).

Note: In Scala the access path to the Analyzer looks as follows:

sparkSession.sessionState.analyzer
In the case of an AnalysisException, assertAnalyzed creates a new AnalysisException that holds analyzed (so the analyzed plan is available for reporting) and throws it.
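The lazy-initialization-plus-rethrow pattern is plain Scala. A minimal sketch with stand-in names (not Spark's internals):

// accessing a lazy val forces one-time initialization;
// failures are re-thrown wrapped in a richer exception
class AnalysisError(msg: String, val plan: String) extends Exception(msg)

class QESketch(parsed: String) {
  // computed once, on first access, then cached forever
  lazy val analyzed: String = {
    println(s"analyzing $parsed")  // printed only on the first access
    s"$parsed [analyzed]"
  }

  def assertAnalyzed(): Unit =
    try { analyzed; () } catch {
      case e: Exception =>
        // re-throw with the plan attached, as assertAnalyzed does
        throw new AnalysisError(e.getMessage, parsed)
    }
}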
Building Extended Text Representation with Logical and Physical Plans — toString Method

toString: String

toString is a mere alias for completeString with the appendStats flag disabled.
Note: toString is the counterpart of toStringWithStats, which has the appendStats flag enabled.
Note: toString is used when…FIXME
Building Text Representation with Cost Stats — toStringWithStats Method

toStringWithStats: String

toStringWithStats is a mere alias for completeString with the appendStats flag enabled.
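Both methods thus differ only in a single flag passed to completeString. A sketch of the relationship, with completeString's signature assumed from the surrounding text:

class TextRepSketch {
  // assumed shape of the internal method (completeString is covered above)
  private def completeString(appendStats: Boolean): String =
    if (appendStats) "plans with Statistics(...)" else "plans"

  override def toString: String = completeString(appendStats = false)
  def toStringWithStats: String = completeString(appendStats = true)
}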
Note: toStringWithStats is a custom toString with cost statistics.
// test dataset
val dataset = spark.range(20).limit(2)
// toStringWithStats in action - note Optimized Logical Plan section with Statistics
scala> dataset.queryExecution.toStringWithStats
res6: String =
== Parsed Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
+- Range (0, 20, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
GlobalLimit 2
+- LocalLimit 2
+- Range (0, 20, step=1, splits=Some(8))
== Optimized Logical Plan ==
GlobalLimit 2, Statistics(sizeInBytes=32.0 B, rowCount=2, isBroadcastable=false)
+- LocalLimit 2, Statistics(sizeInBytes=160.0 B, isBroadcastable=false)
+- Range (0, 20, step=1, splits=Some(8)), Statistics(sizeInBytes=160.0 B, isBroadcastable=false)
== Physical Plan ==
CollectLimit 2
+- *Range (0, 20, step=1, splits=Some(8))
Note: toStringWithStats is used exclusively when ExplainCommand is executed (and only when its cost attribute is enabled).
Transforming SparkPlan Execution Result to Hive-Compatible Output Format — hiveResultString Method

hiveResultString(): Seq[String]

hiveResultString returns the result of executing the query in a Hive-compatible output format.
scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)
scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id name age, 0 Jacek 42)
Internally, hiveResultString transforms the result depending on the type of the SparkPlan, as follows:

SparkPlan | Description
---|---
ExecutedCommandExec for DescribeTableCommand | Executes DescribeTableCommand and transforms every Row to a Hive-compatible output format.
ExecutedCommandExec for ShowTablesCommand | Executes ShowTablesCommand and collects the table names.
Any other SparkPlan | Executes the SparkPlan and transforms the result to a Hive-compatible output format.
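A toy sketch of that dispatch, with stand-in types (Spark matches on the actual executedPlan):

sealed trait PlanSketch
case class DescribePlan(rows: Seq[(String, String, String)]) extends PlanSketch
case class GenericPlan(rows: Seq[Seq[String]]) extends PlanSketch

def hiveResultString(plan: PlanSketch): Seq[String] = plan match {
  case DescribePlan(rows) =>
    // Hive-style describe output: name, type and comment, tab-separated
    rows.map { case (name, tpe, comment) => s"$name\t$tpe\t$comment" }
  case GenericPlan(rows) =>
    // generic case: render every row as tab-separated fields
    rows.map(_.mkString("\t"))
}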
Note: hiveResultString is used exclusively when SparkSQLDriver (of ThriftServer) runs a command.