一、概述

TableEnvironment 是 Flink Table API 和 Flink SQL 中使用的执行环境,向上对开发者提供了 Flink SQL 使用的相关接口,向下连接Flink的SQL运行时。

TableEnvironment 包含如下职责:

  1. 连接到外部数据源
  2. 注册 Table 和获取元数据信息。
  3. 执行 SQL 语句。
  4. 提供 SQL 执行的配置。

二、实现

2.1. 架构设计

TableEnvironment 是顶级接口,是所有 TableEnvironment 的基类

Flink 将批处理迁移到流计算上,实现了 API 接口到执行层面的统一,以前老版本中 BatchTableEnvironment 已经退出了历史的舞台。StreamTableEnvironment 与 TableEnvironment 接口相比,扩展了与 DataStream 的相互转换能力。

同时社区也在努力推动 Java 和 Scala TableEnvironment 的统一。Flink TableEnvironment 的未来架构会更加简洁,成为推荐使用的接口,只有当需要与 DataStream 做转换时,才需要用到 StreamTableEnvironment。

2.2. 创建 TableEnvironment.create

1
2
3
static TableEnvironment create(EnvironmentSettings settings) {
return TableEnvironmentImpl.create(settings);
}

2.2.1. 实例化 TableConfig

TableConfig 用于配置和管理表的相关属性和设置。TableConfig 提供了多个方法来设置不同的属性

TableConfig

2.2.2. 实例化 ModuleManager

模块管理器(模块包含函数集、类型集、规则集)

1
final ModuleManager moduleManager = new ModuleManager();

2.2.3. 实例化 CatalogManager

Flink 通过 CatalogManager 来组织当前系统中可用的 Catalog 和设置、查询当前 Catalog 等的信息。

1
2
3
4
5
6
7
final CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(),settings.getBuiltInDatabaseName())
)
.build();

2.2.4. 实例化 FunctionCatalog

用户自定义函数管理器

1
2
final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager,
moduleManager);

2.2.5. 创建线程池

1
2
3
4
final ExecutorFactory executorFactory = FactoryUtil.discoverFactory(classLoader,
ExecutorFactory.class,
settings.getExecutor());
final Executor executor = executorFactory.create(configuration);

2.2.6. Planner

SQL 解析器Planner

1
2
3
final Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor,
tableConfig, catalogManager,
functionCatalog);

2.2.7. 实例化 TableEnvironmentImpl

1
2
3
4
5
6
7
8
9
return new TableEnvironmentImpl(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
settings.isStreamingMode(),
classLoader);
  1. $OperationTreeBuilder.create$

    OperationTreeBuilder#create

  2. $CatalogManager.initSchemaResolver$

2.3. executeSql

2.3.1. parse

FlinkPlannerImpl 内部使用 Calcite 的 $SqlToRelConverter$ 将验证后的抽象语法树转换成关系代数树。

2.3.2. execute

根据上一步生成的Operation,将其翻译为执行计划

1
return executeInternal(operations.get(0));