QueryExecution — Query Execution of Dataset
QueryExecution represents the structured query execution pipeline of a Dataset.
Note: When an action on a Dataset is executed, it triggers the QueryExecution (by calling toRdd), which morphs the structured query into an RDD of binary rows, i.e. RDD[InternalRow].
You can access the QueryExecution of a Dataset using the queryExecution attribute.

val ds: Dataset[Long] = ...
val queryExec = ds.queryExecution
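With the QueryExecution at hand you can, for example, get hold of the RDD[InternalRow] the note above refers to:

val ds = spark.range(3)
// toRdd is the RDD[InternalRow] that actions ultimately execute
val internalRows = ds.queryExecution.toRdd
internalRows.count  // triggers the actual computation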
QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a logical operator or use the QueryExecution after executing a logical operator).
| Attribute / Phase | Description |
|---|---|
| analyzed | Analyzed logical plan that has passed Analyzer's check rules. |
| withCachedData | Analyzed logical plan with cached segments of the query substituted in. |
| optimizedPlan | Optimized logical plan that is the result of executing the session-owned Catalyst Query Optimizer on withCachedData. |
| sparkPlan | Physical plan (after SparkPlanner has planned the optimized logical plan). |
| executedPlan | Physical plan ready for execution (i.e. sparkPlan after the physical optimization rules have been applied). |
| toRdd | RDD of binary rows, i.e. RDD[InternalRow], that is the result of executing executedPlan. |
You can access the lazy attributes as follows:
val dataset: Dataset[Long] = ...
dataset.queryExecution.executedPlan
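For example, in spark-shell you can step through every phase of a simple query:

val qe = spark.range(5).queryExecution
qe.analyzed       // analyzed logical plan
qe.withCachedData // analyzed plan with cached segments substituted
qe.optimizedPlan  // after the Catalyst optimizer
qe.sparkPlan      // the planned physical plan
qe.executedPlan   // the physical plan after preparation rules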
QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState) when it is created. It then uses the planner to compute a SparkPlan (the physical plan), which is available as the sparkPlan attribute.
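A sketch of how sparkPlan is computed, paraphrasing the lazy val (the exact code differs slightly across Spark versions):

// the planner produces an iterator of candidate physical plans;
// the first candidate is taken as sparkPlan
lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}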
A streaming variant of QueryExecution is IncrementalExecution.
Tip: Use the explain operator to learn about the logical and physical plans of a Dataset.
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]
== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Physical Plan ==
WholeStageCodegen
: +- Range 0, 1, 8, 5, [id#39L]
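The explain operator prints the same plans; with extended = true it includes the logical plans as well:

// prints the parsed/analyzed/optimized logical plans and the physical plan
ds.explain(extended = true)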
Note: QueryExecution belongs to the org.apache.spark.sql.execution package.
Note: QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.
simpleString Method

Caution: FIXME
debug Object

Caution: FIXME
Building Complete Text Representation — completeString Internal Method

Caution: FIXME
Physical Plan Preparation Rules — preparations Method
preparations is a sequence of physical plan preparation rules (i.e. Rule[SparkPlan]).
Tip: A SparkPlan preparation rule transforms a physical plan into another, possibly more efficient, physical plan.
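To illustrate the contract, here is a minimal, hypothetical no-op rule (the name NoopPreparationRule is made up for this sketch):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// a preparation rule simply maps one physical plan to another
object NoopPreparationRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}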
preparations is one of the final phases of query execution that Spark developers could use for further query optimizations.
The current list of SparkPlan transformations in preparations is as follows:
- ExtractPythonUDFs
- PlanSubqueries
- ReuseExchange
- ReuseSubquery
Note: The physical preparation rules are applied sequentially, in order, to the physical plan before execution, i.e. they generate the SparkPlan when the executedPlan lazy value is first accessed (it is cached afterwards).
Executing preparations Physical Plan Rules — prepareForExecution Method
prepareForExecution(plan: SparkPlan): SparkPlan
prepareForExecution takes the preparations rules and applies them one by one to the input plan.
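Conceptually it is a simple left fold over the rules; a sketch paraphrasing the implementation:

// every preparation rule is applied to the result of the previous one
def prepareForExecution(plan: SparkPlan): SparkPlan =
  preparations.foldLeft(plan) { case (sp, rule) => rule(sp) }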
Note: prepareForExecution is used exclusively when QueryExecution prepares the physical plan for execution.
IncrementalExecution
IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId.
It lives in the org.apache.spark.sql.execution.streaming package.
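A simplified sketch of its shape (the exact parameter list varies across Spark versions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming.OutputMode

class IncrementalExecution(
    sparkSession: SparkSession,
    logicalPlan: LogicalPlan,
    outputMode: OutputMode,
    checkpointLocation: String,
    currentBatchId: Long)
  extends QueryExecution(sparkSession, logicalPlan)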
Caution: FIXME What is stateStrategy?
Stateful operators in the query plan are numbered using operatorId, which starts at 0.
IncrementalExecution adds one Rule[SparkPlan] called state to the preparations sequence of rules, as the first element (see the fragment below).
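Paraphrasing the override (a fragment from inside the class):

// state is applied first, ahead of the standard preparation rules
override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations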
Caution: FIXME What does IncrementalExecution do? Where is it used?
Creating Analyzed Logical Plan and Checking Correctness — assertAnalyzed Method
assertAnalyzed(): Unit
assertAnalyzed triggers initialization of analyzed (which is almost like executing it).
Note: assertAnalyzed executes analyzed by accessing it and throwing away the result. Since analyzed is a lazy value in Scala, it is then initialized for the first time and stays initialized forever.
assertAnalyzed then requests Analyzer to check the correctness of the analysis of the LogicalPlan (i.e. analyzed).
Note: In Scala the access path looks as follows: sparkSession.sessionState.analyzer.
In case of any AnalysisException, assertAnalyzed creates a new AnalysisException (to make sure that it holds analyzed) and reports it.
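Putting it together, a sketch of the control flow (paraphrasing the method inside QueryExecution):

def assertAnalyzed(): Unit = {
  analyzed  // force the lazy analyzed plan outside the try block
  try {
    sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
  } catch {
    case e: AnalysisException =>
      // re-create the exception so it carries the analyzed plan
      val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
      ae.setStackTrace(e.getStackTrace)
      throw ae
  }
}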
Building Extended Text Representation with Logical and Physical Plans — toString Method
toString: String
toString is a mere alias for completeString with appendStats flag disabled.
Note: toString is the counterpart of toStringWithStats, which has the appendStats flag enabled.
Note: toString is used when…FIXME
Building Text Representation with Cost Stats — toStringWithStats Method
toStringWithStats: String
toStringWithStats is a mere alias for completeString with appendStats flag enabled.
Note: toStringWithStats is a custom toString with cost statistics.
// test dataset
val dataset = spark.range(20).limit(2)
// toStringWithStats in action - note Optimized Logical Plan section with Statistics
scala> dataset.queryExecution.toStringWithStats
res6: String =
== Parsed Logical Plan ==
GlobalLimit 2
+- LocalLimit 2
+- Range (0, 20, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
GlobalLimit 2
+- LocalLimit 2
+- Range (0, 20, step=1, splits=Some(8))
== Optimized Logical Plan ==
GlobalLimit 2, Statistics(sizeInBytes=32.0 B, rowCount=2, isBroadcastable=false)
+- LocalLimit 2, Statistics(sizeInBytes=160.0 B, isBroadcastable=false)
+- Range (0, 20, step=1, splits=Some(8)), Statistics(sizeInBytes=160.0 B, isBroadcastable=false)
== Physical Plan ==
CollectLimit 2
+- *Range (0, 20, step=1, splits=Some(8))
Note: toStringWithStats is used exclusively when ExplainCommand is executed (only when the cost attribute is enabled).
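You can see it in action through SQL's EXPLAIN with the COST keyword (assuming a Spark version that supports it):

scala> sql("EXPLAIN COST SELECT * FROM range(4)").show(false)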
Transforming SparkPlan Execution Result to Hive-Compatible Output Format — hiveResultString Method
hiveResultString(): Seq[String]
hiveResultString returns the result in a Hive-compatible output format.
scala> spark.range(5).queryExecution.hiveResultString
res0: Seq[String] = ArrayBuffer(0, 1, 2, 3, 4)
scala> spark.read.csv("people.csv").queryExecution.hiveResultString
res4: Seq[String] = ArrayBuffer(id name age, 0 Jacek 42)
Internally, hiveResultString transforms the result per the type of the SparkPlan:

| SparkPlan | Description |
|---|---|
| ExecutedCommandExec for DescribeTableCommand | Executes DescribeTableCommand and transforms every Row to a Hive-compatible output format. |
| ExecutedCommandExec for ShowTablesCommand | Executes ShowTablesCommand and transforms the result to a Hive-compatible output format. |
| Any other SparkPlan | Executes the SparkPlan and transforms the result to a Hive-compatible output format. |
Note: hiveResultString is used exclusively when SparkSQLDriver (of ThriftServer) runs a command.