一、概述

对逻辑算子树的操作(绑定,解析,优化等)主要都是基于规则的,通过 Scala 的语言模式匹配进行树结构转换或节点改写。由 RuleExecutor 来调用规则,所有涉及树形结构转换过程的都继承自RuleExecutor[TreeType] 抽象类。

Analyzer、Optimizer 定义了一系列 rule,而 RuleExecutor 定义了一个 rules 执行框架,即怎么把一批批规则应用在一个 plan 上得到一个新的 plan。Spark sql 通过 Analyzer 中定义的 rule 把 Parsed Logical Plan 解析成 Analyzed Logical Plan; 通过 Optimizer 定义的 rule 把 Analyzed Logical Plan 优化成 Optimized Logical Plan。

二、实现

RuleExecutor 内部提供一个 Seq[Batch] 定义了改 RuleExecutor 的处理步骤,每个 Batch 代表一套规则;$RuleExecutor.apply()$ 方法会按照 batches 和 batches 内 Rule 的顺序对传入的 plan 内的节点进行迭代处理。

2.1. Rule

Rule 是一个抽象类,子类需要复写 $apply()$ 方法来制定特定的处理逻辑。

rule

2.2. Batch

规则批(Batch)由规则名称、执行策略和规则列表组成。

1
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

2.1.1. 执行策略

1
2
3
4
5
6
7
8
abstract class Strategy {
/** 执行最大迭代次数 */
def maxIterations: Int
/** 超过最大次数后是否抛出异常 */
def errorOnExceed: Boolean = false
/** SQLConf 中用来配置最大迭代次数的键 */
def maxIterationsSetting: String = null
}

执行策略分为 2 种类型:fixedPointOnce

  1. FixedPoint

    FixedPoint 代表运行到定点或最大迭代次数的策略

    1
    2
    3
    case class FixedPoint(override val maxIterations: Int, 
    override val errorOnExceed: Boolean = false,
    override val maxIterationsSetting: String = null) extends Strategy
  2. Once

    Once 代表是一种只运行一次且幂等的策略。

    1
    case object Once extends Strategy { val maxIterations = 1 }

2.3. RuleExecutor

有了各种具体规则后,还需要驱动程序来调用这些规则,在 Catalyst 中这个功能由 RuleExecutor 提供。凡是涉及树型结构的转换过程(如Analyzer 逻辑算子树分析过程、Optimizer 逻辑算子树的优化过程和后续物理算子树的生成过程等),都要实施规则匹配和节点处理,都继承自 RuleExecutor[TreeType] 抽象类。

2.3.1. execute()

$RuleExecutor.execute()$ 分批次 (batch) 执行一系列的优化规则直到稳定点(fixpoint)或最大迭代次数

  1. 遍历 batches,取出 batch
  2. 针对每个 batch,调用其 $batch.rules.foldLeft(curPlan){…}$
  3. 对比执行规则前后,初始的 plan 有无变化
  4. 执行直到达到稳定点或者最大迭代次数,到达最大迭代次数, 不再执行优化,对最大迭代次数大于1的情况打 log
  5. plan 不变,到达稳定点后,不再执行优化