CollapseCodegenStages Physical Preparation Rule — Collapsing Physical Operators for Whole-Stage CodeGen
CollapseCodegenStages is a physical preparation rule that collapses physical operators for Java code generation (as part of Whole-Stage CodeGen).
Note
|
You can disable whole-stage codegen (and hence make CollapseCodegenStages a noop) by turning the spark.sql.codegen.wholeStage internal property off. Use SQLConf.wholeStageEnabled method to access the current value.
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
scala> spark.conf.get(WHOLESTAGE_CODEGEN_ENABLED)
res0: String = true
|
CollapseCodegenStages acts only on physical operators with CodegenSupport for which Java code can really be generated.
CollapseCodegenStages takes a SQLConf when created.
Tip
|
Import CollapseCodegenStages and apply the rule directly to a physical plan to learn how the rule works.
|
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
val query = spark.range(2).join(spark.range(2), "id")
// the final result (after CollapseCodegenStages among other rules)
scala> query.explain
== Physical Plan ==
*Project [id#9L]
+- *BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
:- *Range (0, 2, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 2, step=1, splits=8)
val plan = query.queryExecution.sparkPlan
// wholeStageEnabled is enabled
scala> println(spark.sessionState.conf.wholeStageEnabled)
true
import org.apache.spark.sql.execution.CollapseCodegenStages
val ccs = CollapseCodegenStages(conf = spark.sessionState.conf)
scala> ccs.ruleName
res0: String = org.apache.spark.sql.execution.CollapseCodegenStages
// Before CollapseCodegenStages
scala> println(plan.numberedTreeString)
00 Project [id#9L]
01 +- BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02 :- Range (0, 2, step=1, splits=8)
03 +- Range (0, 2, step=1, splits=8)
// After CollapseCodegenStages
// Note the star
val executedPlan = ccs.apply(plan)
scala> println(executedPlan.numberedTreeString)
00 *Project [id#9L]
01 +- *BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02 :- *Range (0, 2, step=1, splits=8)
03 +- *Range (0, 2, step=1, splits=8)
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsc = executedPlan(0).asInstanceOf[WholeStageCodegenExec]
scala> println(wsc.numberedTreeString)
00 *Project [id#9L]
01 +- *BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02 :- *Range (0, 2, step=1, splits=8)
03 +- *Range (0, 2, step=1, splits=8)
scala> println(wsc.child.numberedTreeString)
00 Project [id#9L]
01 +- BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02 :- Range (0, 2, step=1, splits=8)
03 +- Range (0, 2, step=1, splits=8)
// Let's disable wholeStage codegen
// CollapseCodegenStages becomes a noop
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
newSpark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
scala> println(newSpark.sessionState.conf.wholeStageEnabled)
false
val ccsWholeStageDisabled = CollapseCodegenStages(conf = newSpark.sessionState.conf)
scala> println(ccsWholeStageDisabled.apply(plan).numberedTreeString)
00 Project [id#9L]
01 +- BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02 :- Range (0, 2, step=1, splits=8)
03 +- Range (0, 2, step=1, splits=8)
Inserting WholeStageCodegenExec to Physical Plan for Operators with CodeGen Support — apply Method
apply(plan: SparkPlan): SparkPlan
apply inserts WholeStageCodegenExec (with InputAdapter) into the input physical plan, but only when the spark.sql.codegen.wholeStage internal property is enabled. Otherwise, apply does nothing at all (i.e. passes the input physical plan through unchanged).
Note
|
InputAdapter operators show themselves with no star in the output of explain.
|
Note
|
spark.sql.codegen.wholeStage property is enabled by default.
Use SQLConf.wholeStageEnabled method to access the current value.
|
Inserting WholeStageCodegenExec (with InputAdapter) for Physical Operators with Codegen Support — insertWholeStageCodegen Internal Method
insertWholeStageCodegen(plan: SparkPlan): SparkPlan
insertWholeStageCodegen is the main recursive method of CollapseCodegenStages that walks down the plan tree, finds physical operators with optional Java code generation for which Java code can really be generated, and inserts a WholeStageCodegenExec operator (with InputAdapter) for them.
Note
|
insertWholeStageCodegen skips physical operators with output with just a single ObjectType value (regardless of their support for codegen).
|
Note
|
insertWholeStageCodegen is used recursively by itself and insertInputAdapter, but more importantly when CollapseCodegenStages runs.
|
Inserting InputAdapter Unary Operator — insertInputAdapter Internal Method
insertInputAdapter(plan: SparkPlan): SparkPlan
insertInputAdapter inserts an InputAdapter unary operator in a physical plan as follows:
-
For SortMergeJoinExec (with inner and outer joins), inserts an InputAdapter operator for both child physical operators individually
-
For codegen-unsupported operators, inserts an InputAdapter operator
-
For other operators (except the SortMergeJoinExec operator above or operators for which Java code cannot be generated), inserts an InputAdapter operator for every child operator
Caution
|
FIXME Examples for every case + screenshots from web UI |
Note
|
insertInputAdapter is used in insertWholeStageCodegen and recursively.
|
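To see the InputAdapter boundary around a SortMergeJoinExec, you can force a sort-merge join by disabling broadcast joins (a hedged sketch; exact plan output varies by Spark version — the point is that the exchanges feeding the join carry no star, i.e. they sit behind InputAdapter operators):

```scala
// Disable broadcast joins so the planner picks SortMergeJoinExec,
// then explain the plan: the starred operators form whole-stage
// subtrees, while unstarred Exchange operators mark InputAdapter
// boundaries between them.
val sparkNoBroadcast = spark.newSession()
sparkNoBroadcast.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val q = sparkNoBroadcast.range(4).join(sparkNoBroadcast.range(4), "id")
q.explain
```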
Physical Operators with Codegen Support — supportCodegen Internal Predicate
supportCodegen(plan: SparkPlan): Boolean
supportCodegen finds physical operators with CodegenSupport and the supportCodegen flag enabled.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import spark.implicits._
// both where and select support codegen
val query = spark.range(2).where('id === 0).select('id)
scala> query.explain
== Physical Plan ==
*Filter (id#88L = 0)
+- *Range (0, 2, step=1, splits=8)
supportCodegen is positive when all of the following hold:
-
All Catalyst expressions of the physical operator support codegen
-
The number of nested fields of the schema of the physical operator is at most spark.sql.codegen.maxFields internal property (100 by default)
-
The number of nested fields in the schema of every child is at most spark.sql.codegen.maxFields (same as above)
Otherwise, supportCodegen is negative/disabled.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// both where and select support codegen
// let's break the requirement of having up to spark.sql.codegen.maxFields
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)
scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2
import newSpark.implicits._
val query = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0)
scala> query.explain
== Physical Plan ==
Project [_1#452 AS id#456, _2#453 AS c0#457, _3#454 AS c1#458]
+- Filter (_1#452 = 0)
+- LocalTableScan [_1#452, _2#453, _3#454]
Expressions with Codegen Support — supportCodegen Internal Predicate
supportCodegen(e: Expression): Boolean
supportCodegen is positive when the Catalyst expression e is (in the order of verification):
-
a non-CodegenFallback expression
Otherwise, supportCodegen is negative.
Note
|
supportCodegen (for expressions) is used when supportCodegen (for physical plans) finds operators that support codegen.
|
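As a quick check of this predicate's single condition, you can test whether a given Catalyst expression mixes in CodegenFallback (a sketch; Literal is used here purely as an illustration of an expression that supports codegen):

```scala
// Literal is not a CodegenFallback expression, so supportCodegen
// holds for it and Java code can be generated.
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
println(Literal(1).isInstanceOf[CodegenFallback])  // false
```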