在注册成功之后,TaskManager 和 JobManager(ResourceManager) 通过心跳机制保持连接。让我们看看心跳机制源码👀😈~~~

一、概述

在这个 TaskManager 启动过程中,最重要的事情,就是在 TaskManager初始化了一些基础服务和一些 对外提供服务的核心服务之后就启动 TaskExecutor,向 JobManager(ResourceManager) 进行 TaskManager 的注册,并且在注册成功之后,维持 TaskManager 和 JobManager(ResourceManager) 的心跳。

二、源码

TaskExecutor 向 ResourceManager 注册成功之后,会给这个 TaskExecutor 生成一个 HeartbeatTarget,每一个 TaskExecutor 都有一个唯一的 HeartbeatTarget 对象,这个 HeartbeatTarget 会被封装为 HeartbeatMonitor

截屏2022-03-23 上午11.34.41

最终,每个 TaskExecutor 对应的一个唯一的 HeartbeatMonitor 就被保存在 heartbeatTargetsmap 中

1
heartbeatTargets.put(resourceID, heartbeatMonitor);

heartbeatTargets 是一个 ConcurrentHashMap 集合对象

1
private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

key:TaskExecution 的 ID

value:heartbeatMonitor 包装了 heartbeatTarget, heartbeatTarget 中有两个方法

  • receiveHeartBeat()
  • requestHeartBeat()

既然 heartbeatTargets 是一个集合对象,在这个方法里将 HeartbeatMonitor 加入到集合中,那么应该在某处会把这个集合中的  HeartbeatMonitor 对象遍历出来~~~,问题来了,在哪儿🤔️,是谁呢❓

当前类有吗?

截屏2022-03-20 上午8.50.25

有一处,不过是在 stop() 方法里,而且遍历出来是要被 cancel() 的,应该不是~🤔️,再找找~,也有可能当前类的方法返回了 heartbeatTargets 集合对象,heartbeatTargets 在别处被调用 🤔️,试试~

截屏2022-03-20 上午8.53.21

谁调用了 getHeartbeatTargets() 呢 ~

截屏2022-03-20 上午8.54.47

来到 HeartbeatManagerSenderlmpl ~,这个应该才是我们想找的~

截屏2022-03-20 上午8.56.10

找到啦~,HeartbeatManagerSenderImpl 是个线程,是谁在什么时间启动的呢?

在 HeartbeatManagerSenderImpl 构造方法里通过 mainThreadExecutor 调度当前的类实例的 run() 方法的执行

1
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);

截屏2022-03-20 上午8.58.36

来到 HeartbeatServices

截屏2022-03-20 上午8.59.25

在 Flink 的心跳机制中,跟其他的集群不一样,Flink 中 ResourceManager 主动发送心跳给 从节点 Taskmanager,Taskmanager 接收到心跳信息之后,返回响应,让我找找谁调用了 createHeartbeatManagerSender()~

截屏2022-03-20 上午9.17.37

不管是集群的从节点执行心跳,还是每一个 job会启动的一个主控程序 都是向 ResourceManager 去汇报,来到 ResourceManager ~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void startHeartbeatServices() {

/*
* 用来收听 TaskManager 的心跳
*/
taskManagerHeartbeatManager = heartbeatServices
.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);

/*
* 用来收听 JobManager 的心跳
*/
jobManagerHeartbeatManager = heartbeatServices
.createHeartbeatManagerSender(resourceId, new JobManagerHeartbeatListener(), getMainThreadExecutor(), log);
}

好熟悉的代码🤔️~

在 JobManager 启动过程中有一个重要的步骤就是创建 ResourceManager 实例对象

三、总结下~

RM 在启动的时候,就已经启动了:TaskManagerHeartbeatManager 这个组件的内部:启动了一个 HearBeatManagerSenderImpl 对象。内部通过一种特别的机制,实现了一个 每隔 10s 调度一次 该组件的 run() 运行

最终的效果 RM 启动好了之后,就每隔10s钟,向所有的已注册的 TaskExecutor 发送心跳请求,如果最终发现某一个 TaskExecutor 的上一次心跳时间,距离现在超过 50s则认为该 TaskExecutor 宕机了。RM 要执行针对这个 TaskExecutor 的注销