一、概述

SparkPlanner 继承自 SparkStrategies 类,而 SparkStrategies 类则继承自 QueryPlanner 基类,重要的 plan() 方法实现就在 QueryPlanner 类中 。 SparkStrategies 类本身不提供任何方法,而是在内部提供一 批 SparkPlanner 会用到的各种策略( Strate盯)实现。 最后 ,在 SparkPlanner 层面将这些策略整

合在一起,通过 plan()方法进行逐个应用 。

类似逻辑计划阶段的 Anaylzer 和 Optimizer,SparkPlanner 本身只是一个逻辑的驱动 ,各种策略的 apply 方法把逻辑执行计划算子映射成物理执行计划算子。

二、实现

2.1. $plan$

$plan$ 方法传入 LogicalPlan 作为参数,将 strategies 应用 到 LogicalPlan,生成物理计划候选集合(Candidates)。 如果该集合中存在 PlanLater 类型的 SparkPlan,则通过 placeholder 中间变量取 出对应的 LogicalPlan 后,递归调用 plan()方法,将 PlanLater 替换为子节点的物理计划 。 最后,对物理计划列表进行过滤,去掉一些不够高效的物 理计划。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {

// 收集物理计划的候选者
val candidates = strategies.iterator.flatMap(_(plan))

// 候选者必须包含被标记为[[planLater]]的占位符
// 因此试着用子计划来替代它们
val plans = candidates.flatMap { candidate =>
val placeholders = collectPlaceholders(candidate)

if (placeholders.isEmpty) {
// 按原样接受候选者,因为它不包含占位符。
Iterator(candidate)
} else {
// 把逻辑计划标记为 [[planLater]] ,同时替换占位符
placeholders.iterator.foldLeft(Iterator(candidate)) {
case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
// 继续规划那些有占位符的逻辑计划
val childPlans = this.plan(logicalPlan)

candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
childPlans.map { childPlan =>
// 使用子计划来替代占位符
candidateWithPlaceholders.transformUp {
case p if p.eq(placeholder) => childPlan
}
}
}
}
}
}

// 修剪物理计划在当前 3.3.0 版本中还未实现
val pruned = prunePlans(plans)
assert(pruned.hasNext, s"No plan for $plan")
pruned
}

2.1.1. 收集物理计划的候选者

1
val candidates = strategies.iterator.flatMap(_(plan))