SparkPlan — Physical Query Plan / Physical Operator
SparkPlan is the base Catalyst query plan for physical operators that, used (composed) together, build a physical query plan (aka query execution plan).
Note: A physical operator is a Catalyst node that may have zero or more children. Spark SQL uses Catalyst (a tree manipulation framework) to compose nodes to build a tree that, in this context, translates to composing physical plan nodes to build a physical plan tree.
When executed, a physical operator produces an RDD of rows (in the internal binary row format).
Note: execute is called when QueryExecution is requested for an RDD, which happens exactly when your query is executed.
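For illustration, here is a minimal spark-shell sketch (assuming a SparkSession available as spark) that executes a query's physical plan down to an RDD of InternalRows via QueryExecution.toRdd:

val q = spark.range(3)
// toRdd executes the physical plan (it calls SparkPlan.execute under the covers)
val rows = q.queryExecution.toRdd
println(rows.count)   // 3
// InternalRow objects can be reused per partition, so copy them before materializing
rows.map(_.copy()).collect.foreach(println)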
Tip: Use explain operator to see the execution plan of a structured query. You may also access the execution plan of a Dataset using its queryExecution property.

val q = // your query here
q.explain
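For instance, a concrete spark-shell sketch (assuming a SparkSession available as spark):

val q = spark.range(5).where($"id" > 2)
q.explain
// access the physical (executed) plan directly
val plan = q.queryExecution.executedPlan
println(plan.numberedTreeString)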
The SparkPlan contract assumes that concrete physical operators define the doExecute method (with optional hooks like doPrepare), which is executed when the physical operator is executed.
Caution: FIXME A picture with methods/hooks called.
Caution: FIXME SparkPlan is Serializable. Why?
SparkPlan has the following final methods that prepare the execution environment and pass calls on to the corresponding methods (that constitute the SparkPlan Contract).
Name | Description
---|---
execute | Executes a physical operator that generates an RDD of internal binary rows (RDD[InternalRow]). Used most importantly when QueryExecution is requested for an RDD. Internally, execute runs doExecute inside executeQuery.
prepare | Prepares a query for execution. Internally, prepare first prepares the operator's children and then calls prepareSubqueries and doPrepare (at most once).
executeBroadcast | Calls doExecuteBroadcast (inside executeQuery).
The base physical operators (by the number of child operators) are as follows:

Name | Description
---|---
BinaryExecNode | Binary physical operator with two child physical operators
LeafExecNode | Leaf physical operator with no children. By default, the set of all attributes that are produced is exactly the set of attributes that are output.
UnaryExecNode | Unary physical operator with one child physical operator
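A hedged spark-shell sketch (assuming a SparkSession named spark) that walks a physical plan tree and picks out operators by these base types:

import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode, UnaryExecNode}
val q = spark.range(4).join(spark.range(4), "id")
val plan = q.queryExecution.executedPlan
// leaf operators are the "sources" of the plan tree
plan.collect { case leaf: LeafExecNode => leaf.nodeName }.foreach(println)
// binary operators (e.g. joins) combine two child plans
plan.collect { case b: BinaryExecNode => s"${b.nodeName} with ${b.children.size} children" }.foreach(println)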
Note: The naming convention for physical operators in Spark's source code is to have their names end with the Exec suffix, e.g. DebugExec or LocalTableScanExec, which is however removed when the operator is displayed, e.g. in the web UI.
decodeUnsafeRows Method

Caution: FIXME

prepareSubqueries Method

Caution: FIXME

waitForSubqueries Method

Caution: FIXME

executeToIterator Method

Caution: FIXME
SparkPlan Contract

The SparkPlan contract requires that concrete physical operators define their own custom doExecute.
doExecute(): RDD[InternalRow]
doExecute produces the result of a structured query as an RDD of internal binary rows.
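As a hedged spark-shell illustration (assuming a SparkSession named spark), calling execute on a query's physical plan (which in turn calls doExecute) gives that RDD of internal rows:

val plan = spark.range(2).queryExecution.executedPlan
val rdd = plan.execute()   // RDD[InternalRow] produced by doExecute
println(rdd.count)         // 2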
The SparkPlan contract also includes the following methods, each with a default implementation that concrete physical operators can override:

Name | Description
---|---
doExecuteBroadcast | By default reports an UnsupportedOperationException. Executed exclusively as part of executeBroadcast to return the result of a structured query as a broadcast variable.
doPrepare | Prepares a physical operator for execution. Executed exclusively as part of prepare and is supposed to set some state up before executing a query (e.g. BroadcastExchangeExec to broadcast asynchronously).
outputPartitioning | Specifies how data is partitioned across different nodes in the cluster.
requiredChildDistribution | Required partition requirements (aka child output distributions) of the input data, i.e. how the output of the children physical operators is split across partitions. Defaults to UnspecifiedDistribution for all of the physical operator's children. Used exclusively when the EnsureRequirements physical preparation rule enforces partition requirements of a physical operator.
requiredChildOrdering | Specifies the required sort ordering for each partition requirement (from children operators). Defaults to no sort ordering for all of the physical operator's children. Used exclusively when the EnsureRequirements physical preparation rule enforces sort requirements of a physical operator.
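A hedged spark-shell sketch (assuming a SparkSession named spark) that inspects the partitioning-related methods of a physical operator:

val plan = spark.range(8).repartition(2).queryExecution.executedPlan
println(plan.outputPartitioning)
println(plan.requiredChildDistribution)
plan.children.foreach(child => println(child.outputPartitioning))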
Executing Query in Scope (after Preparations) — executeQuery Final Method
executeQuery[T](query: => T): T
executeQuery executes the input query in a scope (i.e. so that all RDDs created will have the same scope for visualization, e.g. in the web UI).
Internally, executeQuery calls prepare and waitForSubqueries followed by executing query.
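The following is a conceptual sketch of that call sequence in plain Scala (not Spark's actual source); prepare and waitForSubqueries are simple stand-ins for the operator's real methods, and the real executeQuery additionally runs the body in an RDD operation scope for the web UI:

// conceptual sketch only: the order of calls inside executeQuery
def prepare(): Unit = println("prepare: set the operator (and its children) up for execution")
def waitForSubqueries(): Unit = println("waitForSubqueries: block until subquery results are ready")

def executeQuery[T](query: => T): T = {
  prepare()
  waitForSubqueries()
  query   // e.g. doExecute() or doExecuteBroadcast()
}

println(executeQuery { "result of the query body" })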
Note: executeQuery is executed as part of execute, executeBroadcast and when a CodegenSupport-enabled physical operator produces Java source code.
Broadcasting Result of Structured Query — executeBroadcast Final Method
executeBroadcast[T](): broadcast.Broadcast[T]
executeBroadcast returns the result of a structured query as a broadcast variable.
Internally, executeBroadcast calls doExecuteBroadcast inside executeQuery.
Note: executeBroadcast is called in BroadcastHashJoinExec, BroadcastNestedLoopJoinExec and ReusedExchangeExec physical operators.
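As a hedged spark-shell sketch (assuming a SparkSession named spark), a broadcast hash join plan contains a BroadcastExchangeExec on the build side, whose result is consumed via executeBroadcast:

import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
val q = spark.range(100).join(broadcast(spark.range(5)), "id")
val plan = q.queryExecution.executedPlan
// the build side of the broadcast join is a BroadcastExchangeExec
plan.collect { case b: BroadcastExchangeExec => b.nodeName }.foreach(println)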
metrics Internal Registry
metrics: Map[String, SQLMetric] = Map.empty
metrics is a registry of supported SQLMetrics by their names.
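A hedged spark-shell sketch (assuming a SparkSession named spark) that prints the metric names of every operator in a plan tree:

val plan = spark.range(10).groupBy().count().queryExecution.executedPlan
// print the names of the SQL metrics every operator in the plan tree supports
plan.foreach(op => println(s"${op.nodeName}: ${op.metrics.keys.mkString(", ")}"))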
Taking First N UnsafeRows — executeTake Method
executeTake(n: Int): Array[InternalRow]
executeTake gives an array of up to the first n internal rows.

Internally, executeTake gets an RDD of byte arrays of up to n unsafe rows and scans the RDD partitions one by one until n rows have been collected or all partitions have been processed.

executeTake runs Spark jobs that take all the elements from the requested number of partitions, starting from the 0th partition and increasing the number of partitions to scan by the spark.sql.limit.scaleUpFactor property (but at least twice as many) on every iteration.
Note: executeTake uses SparkContext.runJob to run a Spark job.
In the end, executeTake decodes the unsafe rows.
Note: executeTake gives an empty collection when n is 0 (and no Spark job is executed).
Note: executeTake may take and decode more unsafe rows than really needed since all unsafe rows from a partition are read (if the partition is included in the scan).
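The following spark-shell session illustrates how executeTake scans partitions (exercised here through Dataset.take, which ends up calling executeTake) and how spark.sql.limit.scaleUpFactor affects the number of Spark jobs run: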
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 10)
// 8 groups hashed across 10 partitions
// only 6 partitions end up with numbers
val nums = spark.
range(start = 0, end = 20, step = 1, numPartitions = 4).
repartition($"id" % 8)
import scala.collection.Iterator
val showElements = (it: Iterator[java.lang.Long]) => {
val ns = it.toSeq
import org.apache.spark.TaskContext
val pid = TaskContext.get.partitionId
println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
}
// ordered by partition id manually for demo purposes
scala> nums.foreachPartition(showElements)
[partition: 0][size: 2] 4 12
[partition: 1][size: 2] 7 15
[partition: 2][size: 0]
[partition: 3][size: 0]
[partition: 4][size: 0]
[partition: 5][size: 5] 0 6 8 14 16
[partition: 6][size: 0]
[partition: 7][size: 3] 3 11 19
[partition: 8][size: 5] 2 5 10 13 18
[partition: 9][size: 3] 1 9 17
scala> println(spark.sessionState.conf.limitScaleUpFactor)
4
// Think how many Spark jobs will the following queries run?
// Answers follow
scala> nums.take(13)
res0: Array[Long] = Array(4, 12, 7, 15, 0, 6, 8, 14, 16, 3, 11, 19, 2)
// The number of Spark jobs = 3
scala> nums.take(5)
res34: Array[Long] = Array(4, 12, 7, 15, 0)
// The number of Spark jobs = 4
scala> nums.take(3)
res38: Array[Long] = Array(4, 12, 7)
// The number of Spark jobs = 2