Flink-源码学习-集群启动-standalone-jobmanager-核心组件-Resourcemanager
一、概述
ResourceManager 作为统一的集群资源管理器,用于管理整个集群的计算资源,包括CPU资源、内存资源等。同时,ResourceManager 负责向集群资源管理器中申请容器资源启动 TaskManager 实例,并对 TaskManager 进行集中管理。当新的作业提交到集群后,JobManager 会向 ResourceManager 申请作业执行需要的计算资源,进而完成整个作业的运 行。
二、实现
ResourceManager 作为集群资源管理组件,不同的 Cluster 集群资源管理涉及的初始化过程也会有所不同。为了兼容 Hadoop Yarn、 Kubernetes、 Mesos 等集群资源管理器,在 ResourceManager 抽象实现类的基础上,分别实现了 ActiveResourceManager、StandaloneResourceManager 以及 MesosResourceManager 等子类。
2.1. ActiveResourceManager
ActiveResourceManager 实现了动态资源管理,可以根据提交的作业动态选择启动或停止 TaskManager 实例。目前支持 TaskManager 动态管理和启动的 ResourceManager 主要有 KubernetesResourceManager 和 YarnResourceManager 实现类。
2.2. ResourceManagerGateway 接口
ResourceManagerGateway 接口提供了 ResourceManager 需要的 RPC 方法,供其他集群组件调用。例如在 TaskExecutor 中调用ResourceManagerGateway 完成在 ResourceManager 中注册 TaskExecutor 的操作。
三、创建 ResourceManager
不管是哪种类型的 ResourceManager 实现,都需要在内部创建 ResourceManagerRuntimeServices,ResourceManagerRuntimeServices 中包含 SlotManager 和 JobLeaderIdService 两个主要服务。其中
- SlotManager 服务管理整个集群的 Slot 计算资源,并对 Slot 计算资源进行统一的分配和管理;
- JobLeaderIdService 通过实现 jobLeaderldListeners 实时监听 JobManager 的运行状态,以获取集群启动的作业对应的 JobLeaderId信息,防止出现 JobManager 无法连接的情况。
在创建和启动 ResourceManager 组件中,同样也需要 RpcService、HighAvailabilityServices、HeartbeatServices 等基础服务,并通过这些基础服务创建 ResourceManager 组件。
3.1. Standalone
在 StandaloneResourceManagerFactory 中通过调用 createResourceManager() 方法创建 Standalone 类型集群的 ResourceManager 组件。
3.1.1. 创建 ResourceManagerRuntimeServices
在创建 StandaloneResourceManager 之前,需要先创建 ResourceManagerRuntimeServices,主要包含了 SlotManager 和 JobLeaderIdService 两个内部服务。
SlotManager
SlotManager 是 ResourceManager 组件最重要的内部组件,主要用于管理和协调整个集群的 Slot 计算资源,同时实现了对 TaskManager 信息的注册和管理。通过调用 ResourceManagerRuntimeServices.createSlotManager() 方法创建 SlotManager 服务。
创建 SlotManagerConfiguration 配置,用于创建 SlotManager。
SlotManagerConfiguration 主要包含 TaskManagerRequestTimeout、SlotRequestTimeout 等配置信息。
创建和初始化 SlotMatchingStrategy。
SlotMatchingStrategy 根据作业中给定的 ResourceProfile 匹配 Slot 计算资源。
SlotMatchingStrategy 主要分为两种类型:
- LeastUtilizationSlotMatchingStrategy: 按照利用率最低原则匹配 Slot 资源,尽可能保证 TaskExecutor上资源的使用率处于比较低的水平,这种策略能够有效降低机器的负载。
- AnyMatchingSlotMatchingStrategy: 即直接返回第一个匹配的 Slot 资源策略。
创建 SlotManagerImpl 实现类并返回,然后在 ResourceManager 组件启动过程中进行初始化。
JobLeaderIdService
JobLeaderIdService,此对象定义了一系列 listener,用于监听 jobMaster 变化
3.1.2. 返回 StandaloneResourceManager 实例
1 | return new StandaloneResourceManager( |
四、启动 ResourceManager
ResourceManager 组件创建完毕后,会在 DefaultDispatcherResourceManagerComponentFactory 中调用 ResourceManager.start() 方法启动 ResourceManager 组件。
因为 ResourceManager 继承自RpcEndpoint,ResourceManager 本质上是一个 RPC 组件服务,启动 ResourceManager 组件实际上就是在启动 ResourceManager 组件对应 RpcEndpoint 中的 RpcServer。当 ResourceManager 对应的 RPC 服务启动后调用 ResourceManager.onStart() 方法启动 ResourceManager 内部的其他核心服务,最终完成 ResourceManager 的启动流程。
1 |
|
启动高可用 ResourceManager 服务
从 highAvailabilityServices 基础服务中获取 ResourceManager 对应的 leaderElectionService
leaderElectionService 用于在高可用集群模式下保证 ResourceManager 组件的高可用。
初始化 ResourceManager
通过 LeaderElectionService 服务启动 ResourceManager
1
leaderElectionService.start(this);
主节点中的三个重要的组件: ResourceManager,Dispatcher 和 WebMonitorEndpoint 启动的时候,都会进行选举
见到
leaderElectionService.start(this)
这种格式的代码 一定要记住,最终- 参与选举的某个获胜的角色会调用 leaderElectionService.isLeader() ==> leaderContender.grantLeaderShip()
- 参与选举的某个失败的角色会调用 leaderElectionService.notLeader()
点点点~,最终代码来到 ZooKeeperLeaderElectionService#start(),
ZooKeeperLeaderElectionService 类是 LeaderLatchListener 的子类,所以当该组件在进行选举如果成功的话则会自动调用
isLeader()
方法,否则调用notLeader()
方法。tryAcceptLeadership()
生成 ResourceManagerId
startServicesOnLeadership()
开启心跳服务
1
startHeartbeatServices();
TaskManager 的心跳服务 taskManagerHeartbeatManager
这个心跳管理器关心点是 taskManager 的存活情况
JobManager 的心跳服务 jobManagerHeartbeatManager
这个心跳管理器关心点是 jobManager 的存活情况
启动 SlotManagerImpl
SlotManager 服务在 ResourceManager 收到 LeaderShip 调用 SlotManager.start() 方法启动 SlotManager服务。
对传入的参数进行校验,确保参数不为空。
将 SlotManager 启动标志设为True,然后通过 scheduledExecutor 线程池启动 TaskManager 周期性超时检查线程服务,防止 TaskManager 长时间掉线等问题。
每 30 秒检测以下是否有 TaskManager 挂掉
启动单独的线程对提交的 SlotRequest 进行周期性超时检查,防止 Slot 请求超时。
最后成功启动 SlotManager 服务。
confirmLeadership()
启动 jobLeaderIdService 服务
用于管理注册的 JobManager 节点,包括对 JobManager 的注册和注销等操作。
注册 Slot 和 TaskExecutor 的 Metrics 监控信息。