一、概述

二、实现

2.1. 架构设计

2.2. 解析

2.2.1. $ParserImpl#parse$

  1. 获取 Calcite 的解析器

    1
    CalciteParser parser = calciteParserSupplier.get();
  2. 使用 FlinkPlannerImpl 作为 validator

    1
    FlinkPlannerImpl planner = validatorSupplier.get();
  3. EXTENDED_PARSER

    对于一些特殊的写法,例如 SET key=value。CalciteParser 是不支持这种写法的, 为了避免在 Calcite 引入过多的关键字,这里定义了一组 extended parser,专门用于在CalciteParser之前,解析这些特殊的语句

    1
    2
    3
    4
    Optional<Operation> command = EXTENDED_PARSER.parse(statement);
    if (command.isPresent()) {
    return Collections.singletonList(command.get());
    }
  4. 使用 Calcite 解析器,解析 SQL 为语法树(SqlNode)

    1
    2
    SqlNodeList sqlNodeList = parser.parseSqlList(statement);
    List<SqlNode> parsed = sqlNodeList.getList();
  5. $SqlToOperationConverter#convert$

    1
    2
    3
    Collections.singletonList(
    SqlToOperationConverter.convert(planner, catalogManager,
    parsed.get(0)).orElseThrow(() -> new TableException("Unsupported query: " + statement)));

    将语法树转换为 operator

    Operation (Flink Table AP/中抽象出来的概念)代表任意类型的 Sql 操作行为,例如 Select、Insert、 Drop 等 sql 操作可以表示为 QueryOperation、 CatalogSinkModifyOperation、DropOperation。

    • 校验语法树

      按照校验规则,检查 SQL 的合法性,同时重写语法树

      1
      final SqlNode validated = flinkPlanner.validate(sqlNode);
      • $SqlToOperationConverter#convertValidatedSqlNode$

        创建 SqlToOperationConverter,负责 SQL 转换,判断 SqlNode 的类型,采用不同的转换逻辑

        SqlKind.QUERY 为例:

        1
        return Optional.of(converter.convertSqlQuery(validated));
        1. $FlinkPlannerImpl#rel$

          • 创建 SqlToRelConverter

            1
            val sqlToRelConverter: SqlToRelConverter = createSqlToRelConverter(sqlValidator)
          • $SqlToRelConverter#convertQuery$

            经过语法校验的 AST 树经过 $SqlToRelConverter#convertQuery$ 调用,将 SQL 转换为 RelNode,即生成逻辑计划 LogicalPlan。

        2. 将 RelNode 封装成 Operation

          Flink 为了统一 Table api 和 sql 执行两种方式,会在这个阶段将 RelNode 封装成 Operation

          1
          return new PlannerQueryOperation(relational.project());