Flink-源码学习-集群启动-taskmanager-注册
ResourceManager 作为统一的集群资源管理器,用于管理整个集群的计算资源,包括 CPU 资源、内存资源等。同时,ResourceManager 负责向集群资源管理器中申请容器资源启动 TaskManager 实例,并对 TaskManager 进行集中管理。当
新的作业提交到集群后,JobManager 会向 ResourceManager 申请作业执行需要的计算资源,进而完成整个作业的运行
让我们看看注册机制源码👀😈~~~
一、概述
当 TaskManager 所在的RPC 服务启动后,TaskManager 会和 ResourceManager之间创建RPC连接,此时TaskManager将自身的信息注册到 ResourceManager 中,并长期保持 与 ResourceManager 之间的心跳连接。ResourceManager 接收到TaskManager的注册信息后,将TaskManager资源信息存储在 SlotManager 服务中进行管理。
二、源码
在TaskExecutor.startTaskExecutorServices()方法中,主要通过
1 | resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); |
代码启动 TaskManager 和 ResourceManager 之间的RPC连接
在 实例化 TaskExecutor时,构造方法里会实例化 resourceManagerLeaderRetriever,
haServices.getResourceManagerLeaderRetriever()
基于 Zookeeper 实现会返回一个ZooKeeperLeaderRetrievalService 对象
ZooKeeperLeaderRetrievalService 对象实现了 LeaderRetrievalService 和 NodeCacheListener 接口,NodeCacheListener 是 curator 提供的监听器,而 resourceManagerLeaderRetriever.start()
方法等同于启动监听,当指定的 Zookeeper znode 节点数据发生改变,则会接收到通知自动回调 ZooKeeperLeaderRetrievalService#nodeChanged()
方法
所以让我们看看 ZooKeeperLeaderRetrievalService#nodeChanged()
~当监听响应的时候,如 ResourceManager 发生过变更, 或者是第一次启动,就会自动调用这个方法
等同于 Zookeeper 中 watcher 的 process() 方法
在 nodeChanged()
会调用对应的 LeaderRetrievalListener#notifyIfNewLeaderAddress()
方法
1 | notifyIfNewLeaderAddress(leaderAddress, leaderSessionID); |
参数 leaderAddress
就是 ResourceManager 的地址
在notifyIfNewLeaderAddress()
里又调用了 leaderListener.notifyLeaderAddress
方法,当 ResourceManager 变更的时候,执行 ResourceManager 的重连
这儿的 leaderListener 就是 ResourceManagerLeaderListener
1 | runAsync( |
进 notifyOfNewResourceManagerLeader()
这个方法里~😯
2.1. notifyOfNewResourceManagerLeader()
2.1.1. 创建 ResourceManager 的地址信息
1 | resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, |
2.1.2. 重连
1 | reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress))); |
虽然方法名是 reconnectToResourceManager,但是第一次连接也走的是这个方法~
关闭和原有的 ResourceManager 的链接
注册
从现在开始计时,如果倒计时 5min 中,到了,还没有注册成功,则意味着注册超时
1
startRegistrationTimeout();
链接新的 ResourceManager
1
tryConnectToResourceManager();
进来
tryConnectToResourceManager()
方法~构建 TaskExecutorRegistration 对象
用于注册,封装了当前 TaskExecutor 的各种信息,到时候,会通过注册发送给 ResourceManager
TaskExecutor 和 ResourceManager 之间的链接对象
启动
1
resourceManagerConnection.start();
创建注册对象
1
final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
创建注册对象返回的结果是:
TaskExecutor 注册
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration
JobManager注册
DefaultJobLeaderService.JobManagerRetryingRegistration
开始注册
1
newRegistration.startRegistration();
连接 ResourceManager
1
rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
执行注册
第二个参数 1,代表第一次注册,如果注册失败进行重试
1
2
3
4
5
6
7
8rpcGatewayAcceptFuture.whenCompleteAsync((Void v, Throwable failure) -> {
if(failure != null && !canceled) {
...
// 重试注册
startRegistrationLater(
retryingRegistrationConfiguration.getErrorDelayMillis());
}
}, rpcService.getExecutor());点点点~,来到核心代码~
registerTaskExecutorInternal()
方法-
1
2
3private RegistrationResponse registerTaskExecutorInternal(
TaskExecutorGateway taskExecutorGateway,
TaskExecutorRegistrationtaskExecutorRegistration)获取 TaskExecutor 的 ResourceID
移除旧的注册对象
构造一个 WorkerRegistration
参数 taskExecutorRegistration 封装要注册的内容,包括 TaskExecutor 硬件抽象
虽然 TaskExecutorRegistration 中封装了注册的内容,但真正的注册对象是 WorkerRegistration,需要将 TaskExecutorRegistration 转为 WorkerRegistration 重新放入 taskExecutors
1
2
3
4
5
6WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(
taskExecutorGateway,
newWorker,
taskExecutorRegistration.getDataPort(),
taskExecutorRegistration.getHardwareDescription()
);真正完成注册
1
2
3log.info("Registering TaskManager with ResourceID {} ({}) at ResourceManager",
taskExecutorResourceId, taskExecutorAddress);
taskExecutors.put(taskExecutorResourceId, registration);taskExecutors 是个 map 结构
1
private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
一般来说,需要就行注册的主从结构,都会有个类似的结构
三、总结
总结下~,不管是那种集群的注册一般都会进行这些步骤
- 主节点启动注册服务
- 从节点启动
- 先封装自己的信息,通过 RPC 发送给 主节点
- 主节点处理注册,其实就是给这个从节点,分配一个 全局唯一的 ID(从节点启动的时候就已经生成好了),
再通过 ID–> Registrition 注册对象的 这种映射关系保持在 主节点的内存当中。