SparkPlan — Physical Query Plan / Physical Operator

SparkPlan is the base Catalyst query plan for physical operators that, used (composed) together, build a physical query plan (aka query execution plan).

Note

A physical operator is a Catalyst node that may have zero or more children.

Spark SQL uses Catalyst (tree manipulation framework) to compose nodes to build a tree that, in this context, translates to composing physical plan nodes to build a physical plan tree.

When executed, a physical operator produces an RDD of rows (in the internal binary row format).

Note
execute is called when QueryExecution is requested for an RDD, which happens exactly when your query is executed.
Tip

Use explain operator to see the execution plan of a structured query.

val q = // your query here
q.explain

You may also access the execution plan of a Dataset using its queryExecution property.

val q = // your query here
q.queryExecution.sparkPlan
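
As a minimal sketch (assuming a SparkSession available as spark), you can go a step further and request the physical plan to execute, which gives back the RDD of internal binary rows.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

val q = spark.range(5)
val rdd: RDD[InternalRow] = q.queryExecution.executedPlan.execute()
println(rdd.count)  // 5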

The SparkPlan contract assumes that concrete physical operators define the doExecute method (with optional hooks like doPrepare) that is executed when the physical operator is executed.

Caution
FIXME A picture with methods/hooks called.
Caution
FIXME SparkPlan is Serializable. Why?
Table 1. SparkPlan’s Attributes
Name Description

metadata

metrics

outputOrdering

SparkPlan has the following final methods that prepare the execution environment and pass calls on to the corresponding methods (that constitute the SparkPlan Contract).

Table 2. SparkPlan’s Final Methods
Name Description

execute

Executes the physical operator and generates an RDD of internal binary rows.

final def execute(): RDD[InternalRow]

Used most importantly when QueryExecution is requested for an RDD (which in turn triggers execution of any child physical operators the operator may have).

Internally, execute executes doExecute in a named scope.

Note
Executing doExecute in a named scope happens only after the operator has been prepared for execution and has waited for any subqueries to finish.

prepare

Prepares a query for execution.

Internally, prepare calls prepare of the operator’s children first, followed by prepareSubqueries and finally doPrepare.

executeBroadcast

Calls doExecuteBroadcast

Table 3. Physical Query Operators / Specialized SparkPlans
Name Description

BinaryExecNode

Binary physical operator with two child physical operators, left and right

LeafExecNode

Leaf physical operator with no children

By default, the set of attributes a leaf physical operator produces is exactly the set of attributes it outputs.

UnaryExecNode

Unary physical operator with one child physical operator

Note
The naming convention for physical operators in Spark’s source code is to have their names end with the Exec suffix, e.g. DebugExec or LocalTableScanExec, which is however removed when the operator is displayed, e.g. in web UI.
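
As a quick sketch (assuming a SparkSession available as spark), you can compare the Exec-suffixed class names of the operators in a physical plan with the names used for display (nodeName):

val q = spark.range(1).selectExpr("id + 1")
q.queryExecution.sparkPlan.foreach { op =>
  println(s"${op.getClass.getSimpleName} is displayed as ${op.nodeName}")
}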

decodeUnsafeRows Method

Caution
FIXME

prepareSubqueries Method

Caution
FIXME

getByteArrayRdd Internal Method

getByteArrayRdd(n: Int = -1): RDD[Array[Byte]]
Caution
FIXME

waitForSubqueries Method

Caution
FIXME

executeCollect Method

Caution
FIXME
Note
executeCollect does not convert data to JVM types.

executeToIterator Method

Caution
FIXME

SparkPlan Contract

SparkPlan contract requires that concrete physical operators define their own custom doExecute.

doExecute(): RDD[InternalRow]

doExecute produces the result of a structured query as an RDD of internal binary rows.
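
As an illustration only (a minimal sketch against Spark 2.x internals; MyRowsExec is a made-up operator, not part of Spark), a leaf physical operator could implement doExecute as follows:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, UnsafeProjection}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.types.{DataType, LongType}

// Hypothetical operator that produces the numbers 0 until numRows as internal binary rows
case class MyRowsExec(numRows: Int) extends LeafExecNode {

  override val output: Seq[Attribute] =
    Seq(AttributeReference("id", LongType, nullable = false)())

  override protected def doExecute(): RDD[InternalRow] =
    // sparkContext comes from SparkPlan (and requires an active SparkSession)
    sparkContext.parallelize(0L until numRows.toLong).mapPartitions { nums =>
      // convert every value to the internal binary (unsafe) row format
      val toUnsafe = UnsafeProjection.create(Array[DataType](LongType))
      nums.map(n => toUnsafe(InternalRow(n)).copy())
    }
}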

Table 4. SparkPlan’s Extension Hooks (in alphabetical order)
Name Description

doExecuteBroadcast

By default reports an UnsupportedOperationException.

[nodeName] does not implement doExecuteBroadcast

Executed exclusively as part of executeBroadcast to return the result of a structured query as a broadcast variable.

doPrepare

Prepares a physical operator for execution.

Executed exclusively as part of prepare and is supposed to set some state up before executing a query (e.g. BroadcastExchangeExec to broadcast asynchronously).

outputPartitioning

Specifies how data is partitioned across different nodes in the cluster

requiredChildDistribution

Required partition requirements (aka child output distributions) of the input data, i.e. how the output of the child physical operators is split across partitions.

requiredChildDistribution: Seq[Distribution]

Defaults to UnspecifiedDistribution for all of the physical operator’s children.

Used exclusively when EnsureRequirements physical preparation rule enforces partition requirements of a physical operator.

requiredChildOrdering

Specifies the required sort ordering for each partition of the input data (for every child physical operator)

requiredChildOrdering: Seq[Seq[SortOrder]]

Defaults to no sort ordering for all of the physical operator’s children.

Used exclusively when EnsureRequirements physical preparation rule enforces sort requirements of a physical operator.
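
As a quick sketch (assuming a SparkSession available as spark), you can print the partitioning-related hooks of the physical operators of a query that involves an exchange and a per-partition sort:

val q = spark.range(10).repartition(2).sortWithinPartitions("id")
q.queryExecution.executedPlan.foreach { op =>
  println(s"${op.nodeName}: outputPartitioning=${op.outputPartitioning}")
  println(s"  requiredChildDistribution=${op.requiredChildDistribution}")
  println(s"  requiredChildOrdering=${op.requiredChildOrdering}")
}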

Executing Query in Scope (after Preparations) — executeQuery Final Method

executeQuery[T](query: => T): T

executeQuery executes query in a scope (i.e. so that all RDDs created will have the same scope for visualization like web UI).

Internally, executeQuery calls prepare and waitForSubqueries followed by executing query.

Note
executeQuery is executed as part of execute, executeBroadcast and when a CodegenSupport-enabled physical operator produces Java source code.

Broadcasting Result of Structured Query — executeBroadcast Final Method

executeBroadcast[T](): broadcast.Broadcast[T]

executeBroadcast returns the result of a structured query as a broadcast variable.

Internally, executeBroadcast calls doExecuteBroadcast inside executeQuery.

Note
executeBroadcast is called in BroadcastHashJoinExec, BroadcastNestedLoopJoinExec and ReusedExchangeExec physical operators.
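
For example (a sketch, assuming a SparkSession available as spark), forcing a broadcast join gives a BroadcastHashJoinExec operator with a BroadcastExchangeExec child that the join requests to executeBroadcast at execution time.

import org.apache.spark.sql.functions.broadcast

val large = spark.range(100)
val small = spark.range(10)
val q = large.join(broadcast(small), "id")
q.explain
// look for BroadcastHashJoinExec and its BroadcastExchangeExec child in the plan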

metrics Internal Registry

metrics: Map[String, SQLMetric] = Map.empty

metrics is a registry of supported SQLMetrics by their names.
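
A quick way (a sketch, assuming a SparkSession available as spark) to see the metrics of the physical operators once a query has been executed:

val q = spark.range(100).filter("id > 50")
q.collect()  // execute the query so the metrics get their values
q.queryExecution.executedPlan.foreach { op =>
  op.metrics.foreach { case (name, metric) =>
    println(s"${op.nodeName} / $name: ${metric.value}")
  }
}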

Taking First N UnsafeRows — executeTake Method

executeTake(n: Int): Array[InternalRow]

executeTake gives an array with the first n internal rows (at most).

Figure 1. SparkPlan’s executeTake takes 5 elements

Internally, executeTake gets an RDD of byte arrays of up to n unsafe rows and scans the RDD partitions one by one until n rows have been collected or all partitions have been processed.

executeTake runs Spark jobs that take all the elements from a requested number of partitions, starting from the first partition and increasing the number of partitions to scan by the spark.sql.limit.scaleUpFactor property (but at least doubling it).

Note
executeTake uses SparkContext.runJob to run a Spark job.

In the end, executeTake decodes the unsafe rows.

Note
executeTake gives an empty collection when n is 0 (and no Spark job is executed).
Note
executeTake may take and decode more unsafe rows than are really needed since all unsafe rows from a partition are read (if the partition is included in the scan).
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 10)

// 8 groups over 10 partitions
// only 6 partitions end up with numbers
val nums = spark.
  range(start = 0, end = 20, step = 1, numPartitions = 4).
  repartition($"id" % 8)

import scala.collection.Iterator
val showElements = (it: Iterator[java.lang.Long]) => {
  val ns = it.toSeq
  import org.apache.spark.TaskContext
  val pid = TaskContext.get.partitionId
  println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
}
// ordered by partition id manually for demo purposes
scala> nums.foreachPartition(showElements)
[partition: 0][size: 2] 4 12
[partition: 1][size: 2] 7 15
[partition: 2][size: 0]
[partition: 3][size: 0]
[partition: 4][size: 0]
[partition: 5][size: 5] 0 6 8 14 16
[partition: 6][size: 0]
[partition: 7][size: 3] 3 11 19
[partition: 8][size: 5] 2 5 10 13 18
[partition: 9][size: 3] 1 9 17

scala> println(spark.sessionState.conf.limitScaleUpFactor)
4

// Think how many Spark jobs will the following queries run?
// Answers follow
scala> nums.take(13)
res0: Array[Long] = Array(4, 12, 7, 15, 0, 6, 8, 14, 16, 3, 11, 19, 2)

// The number of Spark jobs = 3

scala> nums.take(5)
res34: Array[Long] = Array(4, 12, 7, 15, 0)

// The number of Spark jobs = 4

scala> nums.take(3)
res38: Array[Long] = Array(4, 12, 7)

// The number of Spark jobs = 2
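
As a sketch (reusing nums from the example above), you can also call executeTake directly on the physical plan and get the internal binary rows back:

val rows = nums.queryExecution.executedPlan.executeTake(3)
// rows: Array[org.apache.spark.sql.catalyst.InternalRow] with 3 unsafe rows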
Note

executeTake is used when:

  • CollectLimitExec is requested to executeCollect

  • AnalyzeColumnCommand is executed
