Flink-源码学习-FlinkSQL&Table-Planner-Parser 模块
一、概述
二、实现
2.1. 架构设计
2.2. 解析
2.2.1. $ParserImpl#parse$
获取 Calcite 的解析器
1
CalciteParser parser = calciteParserSupplier.get();
使用 FlinkPlannerImpl 作为 validator
1
FlinkPlannerImpl planner = validatorSupplier.get();
EXTENDED_PARSER
对于一些特殊的写法,例如 SET key=value。CalciteParser 是不支持这种写法的, 为了避免在 Calcite 引入过多的关键字,这里定义了一组 extended parser,专门用于在CalciteParser之前,解析这些特殊的语句
1
2
3
4Optional<Operation> command = EXTENDED_PARSER.parse(statement);
if (command.isPresent()) {
return Collections.singletonList(command.get());
}使用 Calcite 解析器,解析 SQL 为语法树(SqlNode)
1
2SqlNodeList sqlNodeList = parser.parseSqlList(statement);
List<SqlNode> parsed = sqlNodeList.getList();$SqlToOperationConverter#convert$
1
2
3Collections.singletonList(
SqlToOperationConverter.convert(planner, catalogManager,
parsed.get(0)).orElseThrow(() -> new TableException("Unsupported query: " + statement)));将语法树转换为 operator
Operation (Flink Table AP/中抽象出来的概念)代表任意类型的 Sql 操作行为,例如 Select、Insert、 Drop 等 sql 操作可以表示为 QueryOperation、 CatalogSinkModifyOperation、DropOperation。
校验语法树
按照校验规则,检查 SQL 的合法性,同时重写语法树
1
final SqlNode validated = flinkPlanner.validate(sqlNode);
引用本站文章Flink-源码学习-FlinkSQL&Table-SQL 引擎-Validator 模块Joker$SqlToOperationConverter#convertValidatedSqlNode$
创建 SqlToOperationConverter,负责 SQL 转换,判断 SqlNode 的类型,采用不同的转换逻辑
以
SqlKind.QUERY
为例:1
return Optional.of(converter.convertSqlQuery(validated));
$FlinkPlannerImpl#rel$
创建 SqlToRelConverter
1
val sqlToRelConverter: SqlToRelConverter = createSqlToRelConverter(sqlValidator)
$SqlToRelConverter#convertQuery$
经过语法校验的 AST 树经过 $SqlToRelConverter#convertQuery$ 调用,将 SQL 转换为 RelNode,即生成逻辑计划 LogicalPlan。
将 RelNode 封装成 Operation
Flink 为了统一 Table api 和 sql 执行两种方式,会在这个阶段将 RelNode 封装成 Operation
1
return new PlannerQueryOperation(relational.project());