Flink-源码学习-集群启动-taskmanager-心跳
在注册成功之后,TaskManager 和 JobManager(ResourceManager) 通过心跳机制保持连接。让我们看看心跳机制源码👀😈~~~
一、概述
在这个 TaskManager 启动过程中,最重要的事情,就是在 TaskManager初始化了一些基础服务和一些 对外提供服务的核心服务之后就启动 TaskExecutor,向 JobManager(ResourceManager) 进行 TaskManager 的注册,并且在注册成功之后,维持 TaskManager 和 JobManager(ResourceManager) 的心跳。
二、源码
TaskExecutor 向 ResourceManager 注册成功之后,会给这个 TaskExecutor 生成一个 HeartbeatTarget,每一个 TaskExecutor 都有一个唯一的 HeartbeatTarget 对象,这个 HeartbeatTarget 会被封装为 HeartbeatMonitor
最终,每个 TaskExecutor 对应的一个唯一的 HeartbeatMonitor 就被保存在 heartbeatTargetsmap 中
1 | heartbeatTargets.put(resourceID, heartbeatMonitor); |
heartbeatTargets 是一个 ConcurrentHashMap 集合对象
1 | private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets; |
key:TaskExecution 的 ID
value:heartbeatMonitor 包装了 heartbeatTarget, heartbeatTarget 中有两个方法
- receiveHeartBeat()
- requestHeartBeat()
既然 heartbeatTargets 是一个集合对象,在这个方法里将 HeartbeatMonitor 加入到集合中,那么应该在某处会把这个集合中的 HeartbeatMonitor 对象遍历出来~~~,问题来了,在哪儿🤔️,是谁呢❓
当前类有吗?
有一处,不过是在 stop() 方法里,而且遍历出来是要被 cancel() 的,应该不是~🤔️,再找找~,也有可能当前类的方法返回了 heartbeatTargets 集合对象,heartbeatTargets 在别处被调用 🤔️,试试~
谁调用了 getHeartbeatTargets() 呢 ~
来到 HeartbeatManagerSenderlmpl
~,这个应该才是我们想找的~
找到啦~,HeartbeatManagerSenderImpl 是个线程,是谁在什么时间启动的呢?
在 HeartbeatManagerSenderImpl 构造方法里通过 mainThreadExecutor 调度当前的类实例的 run() 方法的执行
1 | mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS); |
来到 HeartbeatServices
在 Flink 的心跳机制中,跟其他的集群不一样,Flink 中 ResourceManager 主动发送心跳给 从节点 Taskmanager,Taskmanager 接收到心跳信息之后,返回响应,让我找找谁调用了 createHeartbeatManagerSender()~
不管是集群的从节点执行心跳,还是每一个 job会启动的一个主控程序 都是向 ResourceManager 去汇报,来到 ResourceManager ~
1 | private void startHeartbeatServices() { |
好熟悉的代码🤔️~
在 JobManager 启动过程中有一个重要的步骤就是创建 ResourceManager 实例对象
三、总结下~
RM 在启动的时候,就已经启动了:TaskManagerHeartbeatManager 这个组件的内部:启动了一个 HearBeatManagerSenderImpl 对象。内部通过一种特别的机制,实现了一个 每隔 10s 调度一次 该组件的 run() 运行
最终的效果 RM 启动好了之后,就每隔10s钟,向所有的已注册的 TaskExecutor 发送心跳请求,如果最终发现某一个 TaskExecutor 的上一次心跳时间,距离现在超过 50s则认为该 TaskExecutor 宕机了。RM 要执行针对这个 TaskExecutor 的注销