ReorderJoin Logical Plan Optimization

ReorderJoin is a logical optimization rule in Optimizer that transforms Filter (with CROSS and INNER joins) and Join logical plans with 3 or more joined logical plans (i.e. at least 2 joins) and non-empty join conditions.

ReorderJoin is part of the Operator Optimizations fixed-point batch of rules.

Tip

Import ReorderJoin and apply the rule directly to your structured queries to learn how the rule works.

import org.apache.spark.sql.catalyst.optimizer.ReorderJoin
val rj = ReorderJoin(spark.sessionState.conf)

// Build an analyzed logical plan with at least 3 joined relations (i.e. 2 or more joins) and zero or more filters
val t1 = spark.range(4)
val t2 = spark.range(4)
val t3 = spark.range(4)

val query = t1.join(t2)
  .where(t1("id") === t2("id"))
  .join(t3)
  .where(t3("id") === t1("id"))
  .filter(t1("id") % 2 === 0)

scala> val plan = query.queryExecution.analyzed
plan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter ((id#6L % cast(2 as bigint)) = cast(0 as bigint))
+- Filter (id#12L = id#6L)
   +- Join Inner
      :- Filter (id#6L = id#9L)
      :  +- Join Inner
      :     :- Range (0, 4, step=1, splits=Some(8))
      :     +- Range (0, 4, step=1, splits=Some(8))
      +- Range (0, 4, step=1, splits=Some(8))

// Apply ReorderJoin rule
scala> val optimized = rj.apply(plan)
optimized: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Filter ((id#6L % cast(2 as bigint)) = cast(0 as bigint))
+- Join Inner, (id#12L = id#6L)
   :- Join Inner, (id#6L = id#9L)
   :  :- Range (0, 4, step=1, splits=Some(8))
   :  +- Range (0, 4, step=1, splits=Some(8))
   +- Range (0, 4, step=1, splits=Some(8))

scala> plan.stats(spark.sessionState.conf)
res5: org.apache.spark.sql.catalyst.plans.logical.Statistics = Statistics(sizeInBytes=32.0 KB, isBroadcastable=false)

// CBO disabled
scala> optimized.stats(spark.sessionState.conf)
res6: org.apache.spark.sql.catalyst.plans.logical.Statistics = Statistics(sizeInBytes=32.0 KB, isBroadcastable=false)
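// Why both estimates are 32.0 KB (assuming Spark's simple, non-CBO size estimation):
// * each Range (0, 4) of bigint ids is estimated at 4 rows * 8 bytes = 32 bytes
// * a Filter passes its child's size through, and an inner Join multiplies its children's sizes
// * so both plans come out at 32 * 32 * 32 = 32768 bytes = 32.0 KB (and a Join is never marked broadcastable)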

// ReorderJoin works differently when both of the following hold:
// * starSchemaDetection is enabled
// * cboEnabled is false
import org.apache.spark.sql.internal.SQLConf.STARSCHEMA_DETECTION
spark.sessionState.conf.setConf(STARSCHEMA_DETECTION, true)

spark.sessionState.conf.starSchemaDetection  // true after the setConf call above
spark.sessionState.conf.cboEnabled           // false unless spark.sql.cbo.enabled is set to true

Transforming Logical Plan — apply Method

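apply transforms Filter (with CROSS and INNER join types) and Join logical plans that flatten into more than two joined logical plans with at least one join condition, and rebuilds their joins using createOrderedJoin. When starSchemaDetection is enabled and cboEnabled is false, apply first tries to detect a star join among the joined plans and, if it finds one, puts those plans before the others.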
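The decision apply makes can be modeled in a few lines of plain Scala. This is a simplified sketch, not Spark's implementation: joined plans and conditions are plain strings, and `starJoinFound` is a hypothetical flag standing in for star-schema detection actually finding a star join.

```scala
// Simplified model of the decision in ReorderJoin.apply (not Spark's code).
object ReorderJoinDecision {

  sealed trait Outcome
  case object Unchanged extends Outcome     // the rule does not fire
  case object OrderedJoin extends Outcome   // createOrderedJoin over the joined plans as they are
  case object StarJoinFirst extends Outcome // createOrderedJoin over star-join plans followed by the rest

  def decide(
      inputs: Seq[String],         // flattened joined plans
      conditions: Seq[String],     // collected join and filter predicates
      starSchemaDetection: Boolean,
      cboEnabled: Boolean,
      starJoinFound: Boolean       // hypothetical: did star-schema detection find a star join?
  ): Outcome =
    if (inputs.size <= 2 || conditions.isEmpty) Unchanged
    else if (starSchemaDetection && !cboEnabled && starJoinFound) StarJoinFirst
    else OrderedJoin
}
```

With the example query above (three ranges, two equality predicates) and default settings, the model gives `OrderedJoin`; dropping to two inputs or to no conditions leaves the plan unchanged.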

Note
apply uses the ExtractFiltersAndInnerJoins Scala extractor object (through its unapply method) to "destructure" a logical plan into its logical operators.
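If Scala extractor objects are new to you, the toy example below shows the mechanism: an object with an `unapply` method returning `Option` can be used directly as a pattern in `match`. All names here (`Plan`, `Relation`, `FilterOverJoin`, and so on) are hypothetical and unrelated to Spark's classes.

```scala
// Toy illustration of a Scala extractor object (hypothetical names, not Spark's).
object ExtractorDemo {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // unapply returns Some(parts) when the plan has the expected shape and None otherwise,
  // which is what lets `case FilterOverJoin(cond, l, r) =>` destructure a plan.
  object FilterOverJoin {
    def unapply(plan: Plan): Option[(String, Plan, Plan)] = plan match {
      case Filter(cond, Join(l, r)) => Some((cond, l, r))
      case _                        => None
    }
  }

  def describe(plan: Plan): String = plan match {
    case FilterOverJoin(cond, l, r) => s"filter [$cond] over join of $l and $r"
    case other                      => s"no match: $other"
  }
}
```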

createOrderedJoin Recursive Method

Caution
FIXME

Extracting Filter and Join Operators from Logical Plan — unapply Method (of ExtractFiltersAndInnerJoins)

unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]

unapply takes a Filter (over a CROSS or INNER join) or any Join logical operator out of the input logical plan and flattens the joins using flattenJoin. For any other logical operator, unapply gives None.
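To make the flattening concrete, here is a simplified model of ExtractFiltersAndInnerJoins over a hypothetical toy plan ADT, where join and filter conditions are already split into string conjuncts (Spark uses `Option[Expression]` and splits conjunctive predicates itself). The model assumes that flattening recurses only down the left side of inner-like joins, that a Filter is absorbed only when it sits directly on an inner-like Join, and that anything else becomes a leaf. It is a sketch, not Spark's code.

```scala
// Simplified, hypothetical model of ExtractFiltersAndInnerJoins (not Spark's code).
object FlattenJoinModel {
  sealed trait JoinType
  sealed trait InnerLike extends JoinType
  case object Inner extends InnerLike
  case object Cross extends InnerLike
  case object LeftOuter extends JoinType

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(conditions: Seq[String], child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: JoinType, conditions: Seq[String]) extends Plan

  // Walk down the left side of inner-like joins, collecting each right child
  // (paired with its join type) and every join and filter conjunct on the way.
  def flattenJoin(plan: Plan, parentJoinType: InnerLike = Inner): (Seq[(Plan, InnerLike)], Seq[String]) =
    plan match {
      case Join(left, right, jt: InnerLike, conds) =>
        val (plans, collected) = flattenJoin(left, jt)
        (plans :+ ((right, jt)), collected ++ conds)
      case Filter(filterConds, j @ Join(_, _, _: InnerLike, _)) =>
        val (plans, collected) = flattenJoin(j)
        (plans, collected ++ filterConds)
      case other =>
        (Seq((other, parentJoinType)), Seq.empty)
    }

  object ExtractFiltersAndInnerJoins {
    def unapply(plan: Plan): Option[(Seq[(Plan, InnerLike)], Seq[String])] = plan match {
      case f @ Filter(_, Join(_, _, _: InnerLike, _)) => Some(flattenJoin(f))
      case j: Join                                   => Some(flattenJoin(j))
      case _                                         => None
    }
  }
}
```

Fed a plan shaped like the analyzed plan in the Tip, this model returns the three relations and both equality predicates, while the outermost Filter on `id % 2` does not match at all because its child is another Filter rather than a Join. That is consistent with that Filter staying above the rebuilt joins in the optimized plan.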

Flattening Join — flattenJoin Method (of ExtractFiltersAndInnerJoins)

flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
  : (Seq[(LogicalPlan, InnerLike)], Seq[Expression])

flattenJoin takes CROSS and INNER join types…FIXME
