Flink-源码学习-集群启动-standalone-jobmanager-核心组件-WebMonitorEndpoint
一、概述
WebMonitorEndpoint 基于 Netty 通信框架实现了 Restful 的服务后端,提供 Restful 接口支持 Flink Web 页面在内的所有 Rest 请求,例如获取集群监控指标。
二、实现
2.1. 架构设计
WebMonitorEndpoint 继承了 RestServerEndpoint 基本实现类, 其中 RestServerEndpoint 基于 Netty 框架实现了 Rest 服务后端,并提供了自定义 Handler 的初始化和实现抽象方法。
DispatcherRestEndpoint
通过 Restful API 提交 JobGraphMiniDispatcherRestEndpoint
主要是针对本地执行实现的 mini 版 DispatcherRestEndpoint,区别在于因为 MiniDispatcherRestEndpoint 不支持通过 Restful API 提交 JobGraph,不用加载 JobGraph 提交使用的 Handlers。
在 IDEA 中执行作业时创建的实际上是 MiniCluster, 而在 MiniCluster 中对应的 WebMonitorEndpoint 实现是
MiniDispatcherRestEndpointWebMonitorEndpoint 实现了针对 Session 和 JobCluster 集群的 SessionFestEndpointFactory 和 JobRestEndpointFactory 两种工厂创建类。
三、创建 DispatcherRestEndpoint
DispatcherRestEndpoint 主要通过 SessionRestEndpointFactory 创建
configuration: 集群配置参数。
dispatcherGatewayRetriever
DispatcherGateway 服务地址获取器,用于获取当前活跃的 dispatcherGateway 地址。
基于 dispatcherGateway 可以实现与 Dispatcher 的 RPC 通信,最终提交的 JobGraph 通过 dispatcherGateway 发送给 Dispatcher 组件。
resourceManagerGatewayRetriever
ResourceManagerGateway 服务地址获取器,用于获取当前活跃的 ResourceManagerGateway 地址
通过 ResourceManagerGateway 实现 ResourceManager 组件之间的 RPC 通信,例如在 TaskManagersHandler 中通过调用ResourceManagerGateway 获取集群中的 TaskManagers 监控信息。
transientBlobService
临时二进制对象数据存储服务,BlobServer 接收数据后,会及时清理 Cache 中的对象数据。
executor
用于处理 WebMonitorEndpoint 请求的线程池服务。
metricFetcher
用于拉取 JobManager 和 TaskManager 上的 Metric 监控指标。
leaderElectionService
用于在高可用集群中启动和选择服务的 Leader 节点,如通过 leaderElectionService 启动 WebMonitorEndpoint RPC 服务,然后将Leader 节点注册至 ZooKeeper,以此实现 WebMonitorEndpoint 服务的高可用。
fatalErrorHandler
异常处理器,当 WebMonitorEndpoint 出现异常时调用 fatalErrorHandler 的中处理接口。
四、启动 RestServerEndpoint
1 | webMonitorEndpoint.start() |
4.1. 初始化 Handler
WebMonitorEndpoint 内部就始化了各种 handler, 针对不同请求调用不同 handler 进行处理
创建 Handler 使用的路由类 Router,用于根据地址寻找对应的 Handlers。
JobSubmit Handler 任务提交处理器将来自客户端提交应用程序上来,由 JobManager 中的 Netty 服务端的 JobSubmitHandler 来执行处理
这儿还会过来看哦😯~~~
父类还初始化了 60 多种其他 Handler,这些 Handler 的作用,其实就对应到 Flink web 业务的 rest 服务(相当于 servlet),最终由 Handler 的 handlerRequest (处理localhost:port/list)
4.2. Handlers 排序
针对所有的 Handlers 进行排序,排序规则: RestHandlerUrIComparator
1 | Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE); |
4.3. 启动 Netty 服务端
主节点上的 webMonitorEndpoint 组件的 Netty 服务端起好啦~~~,任务提交的时候,当然还要启动 Netty 的客户端~
4.4. 启动 RPC 高可用服务
4.4.1. 选举
主节点中的三个重要的组件: ResourceManager,Dispatcher 和 WebMonitorEndpoint 启动的时候,都会进行选举
1 | leaderElectionService.start(this) |
选举 ZooKeeperLeaderElectionService
见到
leaderElectionService.start(this)
这种格式的代码 一定要记住,最终
- 参与选举的 某个获胜的角色会调用 leaderElectionService.isLeader() ==> leaderContender.grantLeaderShip()
- 参与选举的 某个失败的角色会调用 leaderElectionService.notLeader()
点点点~,最终代码来到 ZooKeeperLeaderElectionService#start(),
ZooKeeperLeaderElectionService 类是 LeaderLatchListener 的子类,所以当该组件在进行选举如果成功的话则会自动调用
isLeader()
方法,否则调用notLeader()
方法。这是 ZooKeeper 的 API 框架 cruator 的机制
isLeader()
4.4.2. 启动定时任务 ExecutionGraphCacheCleanupTask
计算时间间隔
1
final long cleanupInterval = 2 * restConfiguration.getRefreshInterval();
开启定时任务 executionGraphCache.cleanup()
我们在执行任务时会生成一些文件,这个定时任务就是在任务执行结束后把一些没用的文件删除掉~,每个一段时间执行检查,删掉已经执行完毕的 ExecutionGraph
1
cachedExecutionGraphs.values().removeIf((ExecutionGraphEntry entry) -> currentTime >= entry.getTTL());