import org.apache.spark.sql.catalyst.optimizer.ReorderJoin
val rj = ReorderJoin(spark.sessionState.conf)
// Build an analyzed logical plan with at least 3 joined relations (i.e. 2 or more joins) and zero or more filters
val t1 = spark.range(4)
val t2 = spark.range(4)
val t3 = spark.range(4)
val query = t1.join(t2)
.where(t1("id") === t2("id"))
.join(t3)
.where(t3("id") === t1("id"))
.filter(t1("id") % 2 === 0)
scala> val plan = query.queryExecution.analyzed
plan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter ((id#6L % cast(2 as bigint)) = cast(0 as bigint))
+- Filter (id#12L = id#6L)
   +- Join Inner
      :- Filter (id#6L = id#9L)
      :  +- Join Inner
      :     :- Range (0, 4, step=1, splits=Some(8))
      :     +- Range (0, 4, step=1, splits=Some(8))
      +- Range (0, 4, step=1, splits=Some(8))
// Apply ReorderJoin rule
scala> val optimized = rj.apply(plan)
optimized: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter ((id#6L % cast(2 as bigint)) = cast(0 as bigint))
+- Join Inner, (id#12L = id#6L)
   :- Join Inner, (id#6L = id#9L)
   :  :- Range (0, 4, step=1, splits=Some(8))
   :  +- Range (0, 4, step=1, splits=Some(8))
   +- Range (0, 4, step=1, splits=Some(8))
scala> plan.stats(spark.sessionState.conf)
res5: org.apache.spark.sql.catalyst.plans.logical.Statistics = Statistics(sizeInBytes=32.0 KB, isBroadcastable=false)
// CBO disabled
scala> optimized.stats(spark.sessionState.conf)
res6: org.apache.spark.sql.catalyst.plans.logical.Statistics = Statistics(sizeInBytes=32.0 KB, isBroadcastable=false)
// ReorderJoin works differently when both of the following hold:
// * starSchemaDetection is enabled
// * CBO is disabled (cboEnabled is false)
import org.apache.spark.sql.internal.SQLConf.STARSCHEMA_DETECTION
spark.sessionState.conf.setConf(STARSCHEMA_DETECTION, true)
spark.sessionState.conf.starSchemaDetection
spark.sessionState.conf.cboEnabled
ReorderJoin Logical Plan Optimization
ReorderJoin is a logical optimization rule in Optimizer that transforms Filter (with CROSS and INNER joins) and Join logical plans with 3 or more join inputs (i.e. at least 2 joins) and non-empty join conditions.
ReorderJoin is a part of the Operator Optimizations fixed-point batch of rules.
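Tip: Import ReorderJoin (org.apache.spark.sql.catalyst.optimizer.ReorderJoin) and apply the rule directly to the analyzed logical plan of a structured query, as in the example above, to see how it works.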
Transforming Logical Plan — apply Method
apply transforms Filter (with CROSS and INNER join types) and Join logical plans.
Note: apply uses the ExtractFiltersAndInnerJoins Scala extractor object (through its unapply method) to "destructure" a logical plan into its logical operators.
createOrderedJoin Recursive Method
Caution: FIXME
Extracting Filter and Join Operators from Logical Plan — unapply Method (of ExtractFiltersAndInnerJoins)
unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
unapply takes Filter (with CROSS and INNER joins) and any Join logical operators out of the input logical plan and flattens the joins.
Flattening Join — flattenJoin Method (of ExtractFiltersAndInnerJoins)
flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner): (Seq[(LogicalPlan, InnerLike)], Seq[Expression])
flattenJoin takes CROSS and INNER join types… FIXME