一、概述

从 SQL 语句的解析一直到提交之前,上述整个转换过程都在 Spark 集群的 Driver 端进行,不涉及分布式环境。SparkSession 类的 sql 方法调用 SessionState 中的各种对象,包括上述不同阶段对应的 SparkSqlParser 类、Analyzer 类、Optimizer 类和 SparkPlanner 类等,最后封装成一个 QueryExecution 对象。因此,在进行 Spark SQL 开发时,可以很方便地将每一步生成的计划单独剥离出来分析。

二、实现

2.1. executedPlan

经过 Analyzer 的处理,Unresolved LogicalPlan 解析为 Analyzed LogicalPlan。Analyzed LogicalPlan 中自底向上节$QueryExecution.executedPlan$~,

1
2
3
4
5
6
lazy val executedPlan: SparkPlan = {
assertOptimized() // 逻辑优化
executePhase(QueryPlanningTracker.PLANNING) { // 物理计划阶段
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
}

2.1.1. QueryExecution.assertOptimized

其中 $QueryExecution.assertOptimized$ 完成逻辑计划的优化~

可以看到,逻辑计划优化结束后,开始进入物理计划阶段~

2.1.2. 物理计划阶段

1
2
3
executePhase(QueryPlanningTracker.PLANNING) { // 物理计划阶段
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}

2.2. assertAnalyzed

1
def assertAnalyzed(): Unit = analyzed

调用 $QueryExecution.analyzed$~

1
2
3
lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}
  1. 初始化 SessionState

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    lazy val sessionState: SessionState = {
    parentSessionState
    .map(_.clone(this))
    .getOrElse {
    val state = SparkSession.instantiateSessionState(
    SparkSession.sessionStateClassName(sharedState.conf),
    self)
    state
    }
    }
  2. 初始化 Analyzer

    analyzer 变量构建的是 Analyzer 类,并且在类中重写了一些规则然后调用 QueryExecution 的 assertAnalyzed 函数,接下来看 QueryExecution 类而 Analyzer executeAndTrack() 函数内部又调用了execute函数

    1
    lazy val analyzer: Analyzer = analyzerBuilder()
  3. $Analyzer.executeAndCheck$

2.3. assertOptimized

来到 $QueryExecution.assertOptimized$~

1
2
3
4
5
6
7
8
9
lazy val optimizedPlan: LogicalPlan = {
assertCommandExecuted()
executePhase(QueryPlanningTracker.OPTIMIZATION) {
val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(),
tracker)
plan.setAnalyzed()
plan
}
}
  1. $QueryExecution.assertCommandExecuted$

    在执行 optimization 阶段之前,还要完成一个关键的步骤:commandExecuted

    1
    2
    3
    4
    5
    lazy val commandExecuted: LogicalPlan = mode match {
    case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
    case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
    case CommandExecutionMode.SKIP => analyzed
    }

    CommandExecutionMode 表示命令执行的模式,总共有 3 种模式:ALL(默认),NON_ROOTSKIP

    在 Spark 3.x 版本以前,我们使用 sql(“INSERT…”) 是不会立即触发表插入的,此时必须还得跟上 .collect() 来触发。Spark 3.x 后 CommandExecutionMode 默认情况下 ALL 就可以支持整棵树以前序遍历的方式立即执行命令。‘

    为了避免无止境地递归,在递归执行命令时应该使 用 NON_ROOT。

    不能使用叶命令节点执行查询计划,因为许多命令返回 GenericInternalRow,不能直接放入查询计划中,否则查询引擎可能会将GenericInternalRow强制转换为 UnsafeRow 导致失败。

    所谓的叶命令节点指的是那些没有其他依赖的命令节点,一般和数据源相关。

    当运行 EXPLAIN 或其他命令中的命令时,我们应该使用 SKIP 来避免立即触发命令执行。

  2. 初始化 Optimizer

  3. $Optimizer.executeAndTrack$

    调用 $Optimizer.execute$,将每个 Optimizer Batch 中所有的规则 Rule 对象实施于该 Analyzed LogicalPlan,并且该 Batch 中规则可能要执行多轮,直到执行的批数等于 batch.strategy.maxIterations 或者 logicalplan 与上个批次的结果比没有变化,则退出执行。

    Optimizer 继承自 RuleExecutor 类,本身没有重载 RuleExecutor 中的 $execute()$ 方法,因此其执行过程仍然是调用其父类 RuleExecutor 中实现的 execute 方法。在 QueryExecution 中,Optimizer 会对传入的 Analyzed LogicalPlan 执行 $execute()$ 方法,启动优化过程。

2.4. prepareForExecution

在逻辑计划优化阶段执行之后,进入物理计划阶段~

1
2
3
4
5
6
lazy val executedPlan: SparkPlan = {
assertOptimized() // 逻辑优化
executePhase(QueryPlanningTracker.PLANNING) { // 物理计划阶段
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
}

将整个 planning阶段分成了 3 个部分:

  1. preparations
  2. sparkPlan.clone()
  3. prepareForExecution

2.4.1. preparations

$ preparations$ 准备物理计划的执行规则序列,这些规则序列包含 2 个部分:

  1. AQE 相关的执行规则
  2. 其他执行规则
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private[execution] def preparations(
sparkSession: SparkSession,
adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
subquery: Boolean): Seq[Rule[SparkPlan]] = {
adaptiveExecutionRule.toSeq ++
Seq(
CoalesceBucketsInJoin,
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
RemoveRedundantProjects,
EnsureRequirements(),
ReplaceHashWithSortAgg,
RemoveRedundantSorts,
DisableUnnecessaryBucketedScan,
ApplyColumnarRulesAndInsertTransitions(
sparkSession.sessionState.columnarRules, outputsColumnar = false),
CollapseCodegenStages()) ++
(if (subquery) {
Nil
} else {
Seq(ReuseExchangeAndSubquery)
})
}

2.1.1. AQE 相关的执行规则

2.1.2. 其他规则

  1. CoalesceBucketsInJoin

  2. PlanDynamicPruningFilters

  3. EnsureRequirements

  4. ReplaceHashWithSortAgg

  5. CollapseCodegenStages

2.2. sparkPlan.clone()

2.2.1. 初始化 SparkPlan

1
2
3
4
5
6
val sparkPlan: SparkPlan = {
assertOptimized()
executePhase(QueryPlanningTracker.PLANNING) {
QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
}
}

$QueryExecution.createSparkPlan$~

1
2
3
4
5
6
7
8
def createSparkPlan(
sparkSession: SparkSession,
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan = {
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(plan)).next()
}
  1. 获取 SparkPlan 的候选集

    调用 $SparkStrategies.plan(ReturnAnswer(plan))$ 获取 SparkPlan 的候选集 Iterator[SparkPlan]

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    override def plan(plan: LogicalPlan): Iterator[SparkPlan] = {
    super.plan(plan).map { p =>
    val logicalPlan = plan match {
    case ReturnAnswer(rootPlan) => rootPlan
    case _ => plan
    }
    p.setLogicalLink(logicalPlan)
    p
    }
    }
  2. 选择 SparkPlan

    目前实现直接获取第一个 SparkPlan 😭~

2.3. QueryExecution.prepareForExecution()

经过 $QueryExecution.preparations$ 和 $sparkPlan.clone$ 两个阶段得到了执行规则和物理计划之后,$QueryExecution.prepareForExecution$将这些规则应用到物理计划上。

1
2
3
4
5
6
7
8
9
10
11
12
private[execution] def prepareForExecution(
preparations: Seq[Rule[SparkPlan]],
plan: SparkPlan): SparkPlan = {
val planChangeLogger = new PlanChangeLogger[SparkPlan]()
val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>
val result = rule.apply(sp)
planChangeLogger.logRule(rule.ruleName, sp, result)
result
}
planChangeLogger.logBatch("Preparations", plan, preparedPlan)
preparedPlan
}