ResourceManager 作为统一的集群资源管理器,用于管理整个集群的计算资源,包括 CPU 资源、内存资源等。同时,ResourceManager 负责向集群资源管理器中申请容器资源启动 TaskManager 实例,并对 TaskManager 进行集中管理。当
新的作业提交到集群后,JobManager 会向 ResourceManager 申请作业执行需要的计算资源,进而完成整个作业的运行

让我们看看注册机制源码👀😈~~~

一、概述

当 TaskManager 所在的RPC 服务启动后,TaskManager 会和 ResourceManager之间创建RPC连接,此时TaskManager将自身的信息注册到 ResourceManager 中,并长期保持 与 ResourceManager 之间的心跳连接。ResourceManager 接收到TaskManager的注册信息后,将TaskManager资源信息存储在 SlotManager 服务中进行管理。

二、源码

在TaskExecutor.startTaskExecutorServices()方法中,主要通过

1
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

代码启动 TaskManager 和 ResourceManager 之间的RPC连接

截屏2022-03-23 上午10.52.05

在 实例化 TaskExecutor时,构造方法里会实例化 resourceManagerLeaderRetriever,

haServices.getResourceManagerLeaderRetriever() 基于 Zookeeper 实现会返回一个ZooKeeperLeaderRetrievalService 对象

截屏2022-03-20 上午7.20.36

截屏2022-03-20 上午7.24.15

ZooKeeperLeaderRetrievalService 对象实现了 LeaderRetrievalService 和 NodeCacheListener 接口,NodeCacheListener 是 curator 提供的监听器,而 resourceManagerLeaderRetriever.start()方法等同于启动监听,当指定的 Zookeeper znode 节点数据发生改变,则会接收到通知自动回调 ZooKeeperLeaderRetrievalService#nodeChanged() 方法

截屏2022-03-20 上午7.19.22

所以让我们看看 ZooKeeperLeaderRetrievalService#nodeChanged()~当监听响应的时候,如 ResourceManager 发生过变更, 或者是第一次启动,就会自动调用这个方法

等同于 Zookeeper 中 watcher 的 process() 方法

截屏2022-03-20 上午7.45.31

nodeChanged() 会调用对应的 LeaderRetrievalListener#notifyIfNewLeaderAddress() 方法

1
notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);

参数 leaderAddress 就是 ResourceManager 的地址

notifyIfNewLeaderAddress() 里又调用了 leaderListener.notifyLeaderAddress 方法,当 ResourceManager 变更的时候,执行 ResourceManager 的重连

这儿的 leaderListener 就是 ResourceManagerLeaderListener

1
2
3
runAsync(
() -> notifyOfNewResourceManagerLeader(leaderAddress, ResourceManagerId.fromUuidOrNull(leaderSessionID))
);

notifyOfNewResourceManagerLeader() 这个方法里~😯

2.1. notifyOfNewResourceManagerLeader()

2.1.1. 创建 ResourceManager 的地址信息

1
2
resourceManagerAddress = createResourceManagerAddress(newLeaderAddress,
newResourceManagerId);

截屏2022-03-23 上午11.01.26

2.1.2. 重连

1
reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));

虽然方法名是 reconnectToResourceManager,但是第一次连接也走的是这个方法~

  1. 关闭和原有的 ResourceManager 的链接

  2. 注册

    从现在开始计时,如果倒计时 5min 中,到了,还没有注册成功,则意味着注册超时

    1
    startRegistrationTimeout();
  3. 链接新的 ResourceManager

    1
    tryConnectToResourceManager();

    截屏2022-03-20 上午8.01.20

    进来 tryConnectToResourceManager() 方法~

    • 构建 TaskExecutorRegistration 对象

      用于注册,封装了当前 TaskExecutor 的各种信息,到时候,会通过注册发送给 ResourceManager

      截屏2022-03-23 上午11.04.27

    • TaskExecutor 和 ResourceManager 之间的链接对象

      截屏2022-03-23 上午11.05.22

    • 启动

      1
      resourceManagerConnection.start();
      • 创建注册对象

        1
        final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();

        创建注册对象返回的结果是:

        • TaskExecutor 注册

          TaskExecutorToResourceManagerConnection.ResourceManagerRegistration

          截屏2022-03-23 上午11.15.01

        • JobManager注册

          DefaultJobLeaderService.JobManagerRetryingRegistration

      • 开始注册

        1
        newRegistration.startRegistration();
        • 连接 ResourceManager

          1
          rpcGatewayFuture = rpcService.connect(targetAddress, targetType);

          截屏2022-03-23 上午11.09.10

        • 执行注册

          截屏2022-03-23 上午11.17.34

          第二个参数 1,代表第一次注册,如果注册失败进行重试

          1
          2
          3
          4
          5
          6
          7
          8
          rpcGatewayAcceptFuture.whenCompleteAsync((Void v, Throwable failure) -> {
          if(failure != null && !canceled) {
          ...
          // 重试注册
          startRegistrationLater(
          retryingRegistrationConfiguration.getErrorDelayMillis());
          }
          }, rpcService.getExecutor());

          点点点~,来到核心代码~ registerTaskExecutorInternal() 方法

          1

                                    -
          
          1
          2
          3
          private RegistrationResponse registerTaskExecutorInternal(
          TaskExecutorGateway taskExecutorGateway,
          TaskExecutorRegistrationtaskExecutorRegistration)
          • 获取 TaskExecutor 的 ResourceID

          • 移除旧的注册对象

          • 构造一个 WorkerRegistration

            参数 taskExecutorRegistration 封装要注册的内容,包括 TaskExecutor 硬件抽象

            截屏2022-03-20 上午8.15.03

            虽然 TaskExecutorRegistration 中封装了注册的内容,但真正的注册对象是 WorkerRegistration,需要将 TaskExecutorRegistration 转为 WorkerRegistration 重新放入 taskExecutors

            1
            2
            3
            4
            5
            6
            WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(
            taskExecutorGateway,
            newWorker,
            taskExecutorRegistration.getDataPort(),
            taskExecutorRegistration.getHardwareDescription()
            );
          • 真正完成注册

            1
            2
            3
            log.info("Registering TaskManager with ResourceID {} ({}) at ResourceManager",
            taskExecutorResourceId, taskExecutorAddress);
            taskExecutors.put(taskExecutorResourceId, registration);

            taskExecutors 是个 map 结构

            1
            private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;

            一般来说,需要就行注册的主从结构,都会有个类似的结构

三、总结

总结下~,不管是那种集群的注册一般都会进行这些步骤

  1. 主节点启动注册服务
  2. 从节点启动
  3. 先封装自己的信息,通过 RPC 发送给 主节点
  4. 主节点处理注册,其实就是给这个从节点,分配一个 全局唯一的 ID(从节点启动的时候就已经生成好了),
    再通过 ID–> Registrition 注册对象的 这种映射关系保持在 主节点的内存当中。