CollapseCodegenStages Physical Preparation Rule — Collapsing Physical Operators for Whole-Stage CodeGen


You can disable CollapseCodegenStages (and so whole-stage codegen) by turning spark.sql.codegen.wholeStage internal property off.

spark.sql.codegen.wholeStage property is enabled by default.

import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
scala> spark.conf.get(WHOLESTAGE_CODEGEN_ENABLED)
res0: String = true

Use SQLConf.wholeStageEnabled method to access the current value.

scala> spark.sessionState.conf.wholeStageEnabled
res1: Boolean = true

CollapseCodegenStages acts only on physical operators with CodegenSupport for which Java code can really be generated.

CollapseCodegenStages takes a SQLConf when created.

Import CollapseCodegenStages and apply the rule directly to a physical plan to learn how the rule works.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
val query = spark.range(2).join(spark.range(2), "id")

// the final result (after CollapseCodegenStages among other rules)
scala> query.explain
== Physical Plan ==
*Project [id#9L]
+- *BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
   :- *Range (0, 2, step=1, splits=8)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 2, step=1, splits=8)

val plan = query.queryExecution.sparkPlan

// wholeStageEnabled is enabled
scala> println(spark.sessionState.conf.wholeStageEnabled)

import org.apache.spark.sql.execution.CollapseCodegenStages
val ccs = CollapseCodegenStages(conf = spark.sessionState.conf)

scala> ccs.ruleName
res0: String = org.apache.spark.sql.execution.CollapseCodegenStages

// Before CollapseCodegenStages
scala> println(plan.numberedTreeString)
00 Project [id#9L]
01 +- BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02    :- Range (0, 2, step=1, splits=8)
03    +- Range (0, 2, step=1, splits=8)

// After CollapseCodegenStages
// Note the star
val executedPlan = ccs.apply(plan)
scala> println(executedPlan.numberedTreeString)
00 *Project [id#9L]
01 +- *BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02    :- *Range (0, 2, step=1, splits=8)
03    +- *Range (0, 2, step=1, splits=8)

import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsc = executedPlan(0).asInstanceOf[WholeStageCodegenExec]
scala> println(wsc.numberedTreeString)
00 *Project [id#9L]
01 +- *BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02    :- *Range (0, 2, step=1, splits=8)
03    +- *Range (0, 2, step=1, splits=8)

scala> println(wsc.child.numberedTreeString)
00 Project [id#9L]
01 +- BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02    :- Range (0, 2, step=1, splits=8)
03    +- Range (0, 2, step=1, splits=8)

// Let's disable wholeStage codegen
// CollapseCodegenStages becomes a noop

val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
newSpark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)

scala> println(newSpark.sessionState.conf.wholeStageEnabled)

val ccsWholeStageDisabled = CollapseCodegenStages(conf = newSpark.sessionState.conf)
scala> println(ccsWholeStageDisabled.apply(plan).numberedTreeString)
00 Project [id#9L]
01 +- BroadcastHashJoin [id#9L], [id#12L], Inner, BuildRight
02    :- Range (0, 2, step=1, splits=8)
03    +- Range (0, 2, step=1, splits=8)

Inserting WholeStageCodegenExec to Physical Plan for Operators with CodeGen Support — apply Method

apply(plan: SparkPlan): SparkPlan

apply starts inserting WholeStageCodegenExec (with InputAdapter) in the input plan physical plan only when spark.sql.codegen.wholeStage internal property is enabled. Otherwise, it does nothing at all (i.e. passes the input physical plan through unchanged).


Input Adapters show themselves with no star in explain.

scala> spark.range(1).groupBy("id").count.explain
== Physical Plan ==
*HashAggregate(keys=[id#31L], functions=[count(1)])
+- Exchange hashpartitioning(id#31L, 200) // <-- no star here
   +- *HashAggregate(keys=[id#31L], functions=[partial_count(1)])
      +- *Range (0, 1, step=1, splits=8)

spark.sql.codegen.wholeStage property is enabled by default.

import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
scala> spark.conf.get(WHOLESTAGE_CODEGEN_ENABLED)
res0: String = true

Use SQLConf.wholeStageEnabled method to access the current value.

scala> spark.sessionState.conf.wholeStageEnabled
res1: Boolean = true

Inserting WholeStageCodegenExec (with InputAdapter) for Physical Operators with Codegen Support — insertWholeStageCodegen Internal Method

insertWholeStageCodegen(plan: SparkPlan): SparkPlan

insertWholeStageCodegen is the main recursive method of CollapseCodegenStages that (walks down the plan tree and) finds physical operators with optional Java code generation for which Java code can really be generated and inserts WholeStageCodegenExec operator (with InputAdapter) for them.

insertWholeStageCodegen skips physical operators with output with just a single ObjectType value (regardless of their support for codegen).
insertWholeStageCodegen is used recursively by itself and insertInputAdapter, but more importantly when CollapseCodegenStages runs.

Inserting InputAdapter Unary Operator — insertInputAdapter Internal Method

insertInputAdapter(plan: SparkPlan): SparkPlan

insertInputAdapter inserts an InputAdapter unary operator in a physical plan.

FIXME Examples for every case + screenshots from web UI
insertInputAdapter is used in insertWholeStageCodegen and recursively.

Physical Operators with Codegen Support — supportCodegen Internal Predicate

supportCodegen(plan: SparkPlan): Boolean

supportCodegen finds physical operators with CodegenSupport and supportCodegen flag enabled.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// both where and select support codegen
val query = spark.range(2).where('id === 0).select('id)
scala> query.explain
== Physical Plan ==
*Filter (id#88L = 0)
+- *Range (0, 2, step=1, splits=8)

supportCodegen is positive when all of the following hold:

Otherwise, supportCodegen is negative/disabled.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
// both where and select support codegen
// let's break the requirement of having up to spark.sql.codegen.maxFields
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)

scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)

import newSpark.implicits._
val query = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0)
scala> query.explain
== Physical Plan ==
Project [_1#452 AS id#456, _2#453 AS c0#457, _3#454 AS c1#458]
+- Filter (_1#452 = 0)
   +- LocalTableScan [_1#452, _2#453, _3#454]

Expressions with Codegen Support — supportCodegen Internal Predicate

supportCodegen(e: Expression): Boolean

supportCodegen is positive when the Catalyst expression e is (in the order of verification):

Otherwise, supportCodegen is negative.

supportCodegen (for expressions) is used when supportCodegen (for physical plans) finds operators that support codegen.

results matching ""

    No results matching ""