一、概述

WholeStageCodegenExec 生成 Java 代码之后,就会交给 Janino 编译器进行编译。

1
2
3
4
5
6
7
8
val (_, compiledCodeStats) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case NonFatal(_) if !Utils.isTesting && conf.codegenFallback =>
// We should already saw the error message
logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
return child.execute()
}

$doCodeGen()$ 方法返回 CodegenContext 对象与生成并格式化后的代码 (cleanedSource),Spark 首先尝试编译,如果编译失败且配置回退机制(参数 spark.sql.codegen.wholeStage 默认为 true),则代码生成将被舍弃转而执行 Spark 原生的逻辑。

编译任务由 CodeGenerator 中的 $doCompile()$ 方法执行,调用 Janino 中的 ClassBodyEvaluator 对象。

ClassBodyEvaluator 类单独定义了一个 ParentClassLoader,这样避免编译过程中抛出 ClassNotFoundException
异常。最终得到的是一个 GeneratedClass 类,提供了 generate 方法入口供外部调用。

如果顺利编译成功,则得到生成的对象(clazz),然后调用其 $generate()$ 方法得到 BufferedRowIterator 对象。接下来,WholeStageCodegenExec 的后续处理和其他物理算子节点(mapPartitionsWithindex) 类似。调用 $inputRDDs()$ 方法得到 RDD 列表后,根据 RDD 的数量采取不同的处理逻辑。在现有的实现中,代码生成仅最多支持对两个 RDD 的处理。