一、概述

flink runtime

  1. ResourceManager

    Flink 的集群资源管理器,只有一个,关于slot的管理和申请等工作,都由他负责

  2. Dispatcher

    负责接收用户提交的 JobGragh, 然后为这个新提交的作业启动一个 JobMaster

  3. WebMonitorEndpoint

    WebMonitorEndpoint 基于 Netty 通信框架实现了 Restful 的服务后端,提供 Restful 接口支持 Flink Web 页面在内的所有 Rest 请求,例如获取集群监控指标。

  4. JobMaster

    负责一个具体的 Job 的执行,在一个集群中,可能会有多个 JobMaster 同时执行,类似于 YARN 集群中的 AppMaster 角色,类似于 Spark Job 中的 Driver 角色

二、启动

根据以上的启动脚本分析来到 StandaloneSessionClusterEntrypoint#main() ~~~

2.1. 前置步骤

  1. 提供对 JVM 执行环境的访问的实用程序类

  2. 注册一些信号处理

  3. 安装安全关闭的钩子函数

  4. Flink 集群启动过程中,或者在启动好了之后的运行中都有可能接收到关闭集群的命令

  5. 对传入的参数进行解析

    内部通过 EntrypointClusterConfigurationParserFactory 解析配置文件,返回 EntrypointClusterConfigurationClusterConfiguration 的子类

    1
    2
    3
    final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new
    CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
    entrypointClusterConfiguration = commandLineParser.parse(args);
  6. 解析 Flink 的配置文件

    解析配置参数, 解析 flink 的配置文件: fink-conf.yaml

    1
    Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

现在来到核心代码啦 ClusterEntrypoint#runClusterEntrypoint()~,启动集群

1
ClusterEntrypoint.runClusterEntrypoint(entrypoint);

2.2. 集群插件管理器

PluginManager 是新版支持提供通用的插件机制,负责管理集群插件,这些插件是使用单独的类加载器加载的

2.3. 初始化文件系统

1
configureFileSystems(configuration, pluginManager);

2.4. 权限控制

1
SecurityContext securityContext = installSecurityContext(configuration);

2.5. 启动集群

1
2
3
4
securityContext.runSecured((Callable<Void>) () -> {
runCluster(configuration, pluginManager);
return null;
});

2.5.1. 初始化公共基础服务

initializeServices() 中做了很多服务组件的初始化

1
initializeServices(configuration, pluginManager);

2.5.2. 启动 JobManager 组件

JobManager 启动时会初始化一个 DefaultDispatcherResourceManagerComponentFactory 工厂实例,其作为 DispatcherResourceManagerComponentFactory 接口的默认实现类,内部包含了 ResourceManagerFactory、DispatcherRunnerFactory 和 RestEndpointFactory 三个主要的成员变量,分别提供了 Flink 集群JobManager 核心组件 ResourceManager、 DispatcherRunner 以及 WebMonitorEndpoint 的创建方式。

DefaultDispatcherResourceManagerComponentFactory
  1. 创建 Gateway 组件
    获取集群 HA Leader 恢复服务及创建组件 Gateway

  2. 创建和启动核心组件

    JobMaster 在提交作业时才会启动

  3. 启动 Gateway 组件

  4. 返回 DispatcherResourceManagerComponent 对象

    将创建好的 WebMonitorEndpoin、ResourceManager 以及其对应的高可用管理服务 resourceManagerRetrievalService 和 DispatcherRunner 对应的高可用服务 dispatcherLeaderRetrievalService 封装到 DispatcherResourceManagerComponent。