EnsureRequirements Physical Preparation Rule

EnsureRequirements is a physical preparation rule that transforms physical operators (up the plan tree):
- Removes two adjacent ShuffleExchange physical operators if the child's partitioning scheme guarantees the parent's partitioning
- For other (non-ShuffleExchange) physical operators, ensures partition distribution and ordering (possibly adding new physical operators, e.g. BroadcastExchangeExec and ShuffleExchange for distribution, or SortExec for sorting)
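The shuffle-elision case above can be sketched with a deliberately simplified model (the types below are hypothetical stand-ins, not Spark's actual classes, and `guarantees` is modeled as plain equality while Spark's real check is richer):

```scala
// Simplified sketch of how a redundant shuffle can be elided: if the child's
// output partitioning already guarantees the partitioning the parent exchange
// would produce, the parent exchange is unnecessary.
sealed trait Partitioning {
  // Modeled as equality here; Spark's guarantees relation is more permissive.
  def guarantees(other: Partitioning): Boolean = this == other
}
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
case object UnknownPartitioning extends Partitioning

sealed trait Plan { def outputPartitioning: Partitioning }
case class Scan(outputPartitioning: Partitioning) extends Plan
case class ShuffleExchange(target: Partitioning, child: Plan) extends Plan {
  def outputPartitioning: Partitioning = target
}

// Collapse a ShuffleExchange whose child already guarantees the target partitioning.
def elideRedundantShuffle(plan: Plan): Plan = plan match {
  case ShuffleExchange(target, child) if child.outputPartitioning.guarantees(target) => child
  case other => other
}

// A scan already hash-partitioned by "id" makes the identical shuffle redundant.
val partitioned = Scan(HashPartitioning(Seq("id"), 200))
val elided = elideRedundantShuffle(ShuffleExchange(HashPartitioning(Seq("id"), 200), partitioned))
```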
EnsureRequirements is a part of the preparations batch of physical plan rules and is executed in the executedPlan phase of a query execution.

EnsureRequirements takes a SQLConf when created.
createPartitioning Internal Method

Caution: FIXME

defaultNumPreShufflePartitions Internal Method

Caution: FIXME
Ensuring Partition Requirements (Distribution and Ordering) of Physical Operator — ensureDistributionAndOrdering Internal Method

ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan

Internally, ensureDistributionAndOrdering takes the following from the input physical operator:

- the required partition requirements (distribution) for the children
- the required sort ordering per the required partition requirements per child
- the child physical plans
Note: The number of requirements for partitions and their sort ordering has to match the number and the order of the child physical plans.
ensureDistributionAndOrdering matches the operator's required partition requirements of children (requiredChildDistributions) to the children's output partitioning and (in that order):

- If the child satisfies the requested distribution, the child is left unchanged
- For BroadcastDistribution, the child becomes the child of a BroadcastExchangeExec unary operator (for broadcast joins)
- Any other pair of child and distribution leads to a ShuffleExchange unary physical operator (with a proper partitioning for the distribution and with spark.sql.shuffle.partitions number of partitions, i.e. 200 by default)
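The per-child matching above can be sketched as follows (a minimal model with hypothetical, simplified types — `satisfies` here only accepts UnspecifiedDistribution, whereas Spark's real check inspects the child's output partitioning):

```scala
// Sketch of the per-child decision in ensureDistributionAndOrdering:
// keep a child that already satisfies its required distribution, wrap a
// BroadcastDistribution requirement in BroadcastExchangeExec, and fall back
// to ShuffleExchange with spark.sql.shuffle.partitions partitions otherwise.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case object BroadcastDistribution extends Distribution
case class ClusteredDistribution(keys: Seq[String]) extends Distribution

sealed trait Plan {
  // Simplified: only an unspecified requirement is trivially satisfied.
  def satisfies(d: Distribution): Boolean = d == UnspecifiedDistribution
}
case class Leaf(name: String) extends Plan
case class BroadcastExchangeExec(child: Plan) extends Plan
case class ShuffleExchange(numPartitions: Int, child: Plan) extends Plan

val defaultShufflePartitions = 200 // spark.sql.shuffle.partitions default

def enforce(child: Plan, required: Distribution): Plan =
  if (child.satisfies(required)) child
  else required match {
    case BroadcastDistribution => BroadcastExchangeExec(child)
    case _                     => ShuffleExchange(defaultShufflePartitions, child)
  }
```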
Note: ShuffleExchange can appear in the physical plan when the children's output partitioning cannot satisfy the physical operator's required child distribution.
If the input operator has multiple children and specifies child output distributions, the children's output partitionings have to be compatible. If the children's output partitionings are not all compatible, then…FIXME
ensureDistributionAndOrdering adds an ExchangeCoordinator (only when adaptive query execution is enabled, which it is not by default).

Note: At this point in ensureDistributionAndOrdering the required child distributions are already handled.
ensureDistributionAndOrdering matches the operator's required sort ordering of children (requiredChildOrderings) to the children's output ordering and, if the orderings do not match, a SortExec unary physical operator is created as a new child.
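The ordering step can be sketched like this (simplified, hypothetical types; a required ordering is treated as satisfied when it is a prefix of the child's output ordering, which mirrors the idea if not the exact Spark semantics):

```scala
// Sketch: when a child's output ordering does not satisfy the required
// ordering, a SortExec is inserted on top of that child.
case class SortOrder(column: String, ascending: Boolean = true)

sealed trait Plan { def outputOrdering: Seq[SortOrder] = Nil }
case class Leaf(name: String) extends Plan
case class SortExec(order: Seq[SortOrder], child: Plan) extends Plan {
  override def outputOrdering: Seq[SortOrder] = order
}

// Satisfied when the required ordering is a prefix of the child's ordering.
def ensureOrdering(child: Plan, required: Seq[SortOrder]): Plan =
  if (required.isEmpty || child.outputOrdering.startsWith(required)) child
  else SortExec(required, child)
```

A child already sorted by the required key passes through unchanged; any other child is wrapped in a SortExec.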
ensureDistributionAndOrdering sets the new children for the input operator.

Note: ensureDistributionAndOrdering is used exclusively when EnsureRequirements is executed (i.e. applied to a physical plan).
Adding ExchangeCoordinator (When Adaptive Query Execution Enabled) — withExchangeCoordinator Internal Method

withExchangeCoordinator(
  children: Seq[SparkPlan],
  requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan]
withExchangeCoordinator adds an ExchangeCoordinator to ShuffleExchange operators if adaptive query execution is enabled (per the spark.sql.adaptive.enabled property) and the partitioning scheme of the ShuffleExchanges supports ExchangeCoordinator.

Note: The spark.sql.adaptive.enabled property is disabled by default.
Internally, withExchangeCoordinator checks if the input children operators support ExchangeCoordinator, which holds when either:

- There is at least one ShuffleExchange operator and all children are either ShuffleExchange with HashPartitioning or their output partitioning is HashPartitioning (even inside a PartitioningCollection)
- There are at least two children operators and the input requiredChildDistributions are all ClusteredDistribution
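The two-branch support check can be sketched as a predicate over simplified, hypothetical types (the PartitioningCollection case is omitted for brevity):

```scala
// Sketch of the "supports ExchangeCoordinator" check: either at least one
// ShuffleExchange is present and every child is hash-partitioned (directly or
// as a ShuffleExchange target), or there are at least two children and every
// required child distribution is a ClusteredDistribution.
sealed trait Partitioning
case class HashPartitioning(keys: Seq[String]) extends Partitioning
case object UnknownPartitioning extends Partitioning

sealed trait Plan { def outputPartitioning: Partitioning }
case class Leaf(outputPartitioning: Partitioning) extends Plan
case class ShuffleExchange(outputPartitioning: Partitioning, child: Plan) extends Plan

sealed trait Distribution
case class ClusteredDistribution(keys: Seq[String]) extends Distribution
case object UnspecifiedDistribution extends Distribution

def supportsCoordinator(
    children: Seq[Plan],
    requiredChildDistributions: Seq[Distribution]): Boolean = {
  val hasShuffle = children.exists(_.isInstanceOf[ShuffleExchange])
  val allHashPartitioned = children.forall {
    case ShuffleExchange(_: HashPartitioning, _) => true
    case p => p.outputPartitioning.isInstanceOf[HashPartitioning]
  }
  (hasShuffle && allHashPartitioned) ||
    (children.length > 1 &&
      requiredChildDistributions.forall(_.isInstanceOf[ClusteredDistribution]))
}
```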
With adaptive query execution enabled (i.e. when the spark.sql.adaptive.enabled flag is true) and the operator supporting ExchangeCoordinator, withExchangeCoordinator creates an ExchangeCoordinator and:

- For every ShuffleExchange, registers the ExchangeCoordinator
- Creates a HashPartitioning partitioning scheme with the default number of partitions to use when shuffling data for joins or aggregations (spark.sql.shuffle.partitions, which is 200 by default) and adds a ShuffleExchange to the final result (for the current physical operator)
Otherwise (when adaptive query execution is disabled or the children do not support ExchangeCoordinator), withExchangeCoordinator returns the input children unchanged.

Note: withExchangeCoordinator is used exclusively for enforcing partition requirements of a physical operator.
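The two outcomes of withExchangeCoordinator can be sketched with simplified, hypothetical types (the sketch shares one coordinator across all ShuffleExchange children; it omits the real method's wrapping of non-exchange children in a new HashPartitioning-based ShuffleExchange):

```scala
// Sketch: with adaptive execution on and a supported plan, every ShuffleExchange
// child is registered with one shared coordinator; otherwise the children
// pass through unchanged.
class ExchangeCoordinator {
  private var registered = 0
  def register(): Unit = registered += 1
  def count: Int = registered
}

sealed trait Plan
case class Leaf(name: String) extends Plan
case class ShuffleExchange(child: Plan, var coordinator: Option[ExchangeCoordinator] = None)
  extends Plan

def withExchangeCoordinator(
    children: Seq[Plan],
    adaptiveEnabled: Boolean,      // models spark.sql.adaptive.enabled
    supportsCoordinator: Boolean): Seq[Plan] =
  if (adaptiveEnabled && supportsCoordinator) {
    val coordinator = new ExchangeCoordinator
    children.map {
      case s: ShuffleExchange =>
        coordinator.register()       // every ShuffleExchange registers the coordinator
        s.coordinator = Some(coordinator)
        s
      case other => other            // real code would add a ShuffleExchange here
    }
  } else children                    // disabled or unsupported: children unchanged
```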