Flink-源码学习-FlinkSQL&Table-执行环境 TableEnvironment
一、概述
TableEnvironment 是 Flink Table API 和 Flink SQL 中使用的执行环境,向上对开发者提供了 Flink SQL 使用的相关接口,向下连接Flink的SQL运行时。
TableEnvironment 包含如下职责:
- 连接到外部数据源
- 注册 Table 和获取元数据信息。
- 执行 SQL 语句。
- 提供 SQL 执行的配置。
二、实现
2.1. 架构设计
TableEnvironment 是顶级接口,是所有 TableEnvironment 的基类
Flink 将批处理迁移到流计算上,实现了 API 接口到执行层面的统一,以前老版本中 BatchTableEnvironment 已经退出了历史的舞台。StreamTableEnvironment 与 TableEnvironment 接口相比,扩展了与 DataStream 的相互转换能力。
同时社区也在努力推动 Java 和 Scala TableEnvironment 的统一。Flink TableEnvironment 的未来架构会更加简洁,成为推荐使用的接口,只有当需要与 DataStream 做转换时,才需要用到 StreamTableEnvironment。
2.2. 创建 TableEnvironment.create
1 | static TableEnvironment create(EnvironmentSettings settings) { |
2.2.1. 实例化 TableConfig
TableConfig 用于配置和管理表的相关属性和设置。TableConfig 提供了多个方法来设置不同的属性
2.2.2. 实例化 ModuleManager
模块管理器(模块包含函数集、类型集、规则集)
1 | final ModuleManager moduleManager = new ModuleManager(); |
2.2.3. 实例化 CatalogManager
Flink 通过 CatalogManager 来组织当前系统中可用的 Catalog 和设置、查询当前 Catalog 等的信息。
1 | final CatalogManager catalogManager = CatalogManager.newBuilder() |
2.2.4. 实例化 FunctionCatalog
用户自定义函数管理器
1 | final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, |
2.2.5. 创建线程池
1 | final ExecutorFactory executorFactory = FactoryUtil.discoverFactory(classLoader, |
2.2.6. Planner
SQL 解析器Planner
1 | final Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor, |
2.2.7. 实例化 TableEnvironmentImpl
1 | return new TableEnvironmentImpl( |
$OperationTreeBuilder.create$
$CatalogManager.initSchemaResolver$
引用本站文章Flink-源码学习-FlinkSQL&Table-元数据体系 Catalog-CatalogManagerJoker
2.3. executeSql
2.3.1. parse
FlinkPlannerImpl 内部使用 Calcite 的 $SqlToRelConverter$ 将验证后的抽象语法树转换成关系代数树。
2.3.2. execute
根据上一步生成的Operation,将其翻译为执行计划
1 | return executeInternal(operations.get(0)); |