一、概述

Parser 模块生成的 Unresolved LogicalPlan 仅仅是一种数据结构,不包含任何数据信息。Analyzer 模块使用事先定义好的规则(Rule)以及 Catalog 等信息对未解析的逻辑计划 Unresolved Logical Plan 进行补充和替换 logicalPlan 中的各个节点,让语法树包含元数据信息。

二、实现

Analyzer 的父类是 RuleExecutor,所以,调用 Analyzer 的 $apply()$ 方法时,实际上会调用 RuleExecutor 的 $apply()$ 方法中,并传入一个 Unresolved LogicalPlan。

2.1. RuleExecutor

Analyzer 模块中 RuleExecutor 为 Analyzer,Analyzer 调用 $executeAndCheck$ 方法执行模块规则批次~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
if (plan.analyzed) return plan
AnalysisHelper.markInAnalyzer {
val analyzed = executeAndTrack(plan, tracker)
try {
checkAnalysis(analyzed)
analyzed
} catch {
case e: AnalysisException =>
val ae = e.copy(plan = Option(analyzed))
ae.setStackTrace(e.getStackTrace)
throw ae
}
}
}
  1. $RuleExecutor.executeAndTrack$

    调用 $RuleExecutor.execute$ 方法,将每个 Analyzer Batch 中所有的规则 Rule 对象实施于该 Unsolved LogicalPlan,并且该 Batch 中规则可能要执行多轮,直到执行的批数等于 batch.strategy.maxIterations 或者 logicalplan 与上个批次的结果比没有变化,则退出执行。

  2. $CheckAnalysis.checkAnalysis$

    执行完成后校验解析结果,利用 scala 的模式匹配机制,每当新增一种校验规则后只需要新增一个 case 分支就行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    def checkAnalysis(plan: LogicalPlan): Unit = {
    val inlineCTE = InlineCTE(alwaysInline = true)
    inlineCTE(plan).foreachUp {
    case p if p.analyzed => // Skip already analyzed sub-plans
    case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
    throw new IllegalStateException(
    "[BUG] logical plan should not have output of char/varchar type: " + leaf)
    case u: UnresolvedNamespace =>
    u.failAnalysis(s"Namespace not found: ${u.multipartIdentifier.quoted}")
    case u: UnresolvedTable =>
    u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
    ...
    }
    }

2.2. 规则批次

因为继承自 RuleExecutor 类,所以 Analyzer 执行过程会调用其父类 RuleExecutor 中实现的 run 方法,主要的不同之处是 Analyzer 中重新定义了一系列规则,即 RuleExecutor 类中的成员变 量 batches。

1
2
/** Defines a sequence of rule batches, to be overridden by the implementation. */
protected def batches: Seq[Batch]

2.2.1. Substitution

Substitution Batch 对节点的作用类似于替换操作

CTESubstitution 规则是用来处理 With 语句。 在遍历逻辑算子树的过程中,当匹配到 With(child, relations) 节点时,将子 LogicalPlan 替换成解析后的 CTE,由于 CTE 的存在,SparkSqlParser 对 SQL 语句从左向右解析后会产生多个 LogicalPlan。

将子节点中未解析的窗口函数表达式 UnresolvedWindow Expression 转换成窗口函数表达式 WindowExpression

在 Union 算子节点只有一个子节点时,Union 操作并没有起到作用,这种情况下需要消除该 Union 节点。该规则在遍历逻辑算子树过程中,匹配到 Union(children)且 children 的数目只有 1 个时,将 Union(children) 替换为 children.head 节点。

Spark 从 2.0 版本开始,在 Order ByGroup By 语句中开始支持用常数来表示列的下标。

例如,假设某行数据包括 A、 B、 C 列,那么 1 对 应 A 列,2 对应 B 列, 3 对应 C 列;此时 “Group By 1, 2” 等价于 “Group By A, B” 语句。 而在 2.0 版本之前,这种写法会直接被当作常数而忽略。

新版本中这种特性通过配置参数 spark.sql.orderByOrdinalspark.sql.groupByOrdinal 进行设置 ,默认都为 true ,表示 该特性开启 。

Substitute Unresolved Ordinals 根据这两个配置参数将下标替换成 UnresolvedOrdinal 表达式,以映射到对应的列。

2.2.2.Disable Hints

2.2.3. Hints

2.2.4. Resolution

Resolution Batch 中包含了 Analyzer 中最多同时也最常用的解析规则。

2.2.5. UDF

Batch 主要用来对用户自定义函数进行一些特别的处理

理输入数据为 Null 的情形,从上至下进行表达式的遍历(transformExpressionsUp) , 当匹配到 ScalaUDF 类型的表达式时,创建 If 表达式来进行 Null 值的检查。

2.2.6. Cleanup

该 Batch 中仅包含 CleanupAliases 这一条规则,用来删除 LogicalPlan 中无用的别名信息 。 一般情况下,逻辑算子树中仅 Project、 Aggregate 或 Window 算子的最高一层表达式(分别对应 project list、 aggregate expressions 和 window expressions)才需要别名 。CleanupAliases 通过 trimAliases 方法对表达式执行中的别名进行删除。