作为整个运行时的工作节点,TaskManager 提供了作业运行过程中需要的Slot 计算资源,JobManager中提交的Task实例都会运行在TaskManager组件上,我们看看TaskManager的启动源码👀~~~

一、概述

在这个 TaskManager 启动过程中,最重要的事情,就是在 TaskManager初始化了一些基础服务和一些 对外提供服务的核心服务,向 JobManager 进行 TaskManager 的注册,并且在注册成功之后,维持 TaskManager 和 JobManager 的心跳,从JobManager 接收需要部署的任务,使用 Slot 资源启动 Task

flink runtime

二、启动

根据以上的启动脚本分析来到 TaskManagerRunner#main()~~~

看看时序图👀~~~,直接进入 核心代码 runTaskManager()

runTaskManagerSecurely()实例化并启动了 TaskManagerRunner,其中构建 TaskManagerRunner 实例大致做了三件事~

1
2
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId,
pluginManager);

2.1. 启动公共基础服务

2.2. 初始化 TaskManager 运行时服务

前面已经做了各种基础服务的初始化,最终的目的都是帮助我们去启动 TaskManager~~~

1
2
3
4
5
6
7
8
9
10
taskManager = startTaskManager(this.configuration,
this.resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
externalResourceInfoProvider,
this);

2.2.1. 获取资源定义对象 TaskExecutorResourceSpec

TaskExecutorResourceSpec 可以理解是 TaskExecutor 的资源抽象对象

那么这个对象里面所拥有的那些信息都是从这个里面来的 ?

两个方面:

  • 配置 configuration

    搭 Flink 集群的时候会配置集群资源,从 configuration 中抽取 TaskExecutor 的资源配置,最后将配置信息转换为TaskExecutorResourceSpec 对象。

  • 默认值

2.2.2. 运行时服务配置 TaskManagerServicesConfiguration

1
2
3
4
5
TaskManagerServicesConfiguration taskManagerServicesConfiguration =
TaskManagerServicesConfiguration
.fromConfiguration(configuration, resourceID, externalAddress, localCommunicationOnly,
taskExecutorResourceSpec
);

前面构造的 TaskExecutorResourceSpec 对象作为参数传过来啦~~~

启动 TaskManager 的过程当中所需要的一些东西都被放在这个构造函数里,比如说 configuration 配置,resourceID,资源配置等,也就是说节点启动所需要的一些信息都已经被封装在 TaskManagerServicesConfiguration 这个抽象对象里面了😯~

2.2.3. 初始化 TaskManager 运行时服务

初始化 TaskManager 在运行过程中需要的服务,在TaskManager 启动之前,已经初始化了一些服务组件叫基础服务(共用服务组件),而这里面创建的服务是基于基础服务组件创建的 TaskManager 在运行过程中,真正需要的用来对外提供服务的各种服务组件,不一样哦~

1
2
3
4
5
6
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
blobCacheService.getPermanentBlobService(),
taskManagerMetricGroup.f1,
ioExecutor,
fatalErrorHandler);

根据 TaskManagerServicesConfiguration 和 TaskManagerMetricGroup 等参数创建 TaskManagerServices, TaskManagerServices 包含了 TaskExecutor 创建过程中需要的全部服务。

2.3. 创建 TaskExecutor 实例

通过 rpcService、highAvailabilityServices 等基础服务和上一步创建的 TaskManagerServices 创建 TaskExecutor 实例。

TaskExecutor 是 TaskManager 服务的底层实现类,启动 TaskManager 最重要的事情,就是创建启动 TaskExecutor~

2.3.1. 构造器

在构造器里会初始化两个心跳服务~

  1. jobManagerHeartbeatManager

    1
    2
    this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices,
    resourceId);
  2. resourceManagerHeartbeatManager

    1
    2
    this.resourceManagerHeartbeatManager =
    createResourceManagerHeartbeatManager(heartbeatServices, resourceId);

2.3.2. onStart()

由于 TaskExecutor 是 RpcEndpoint 的子类,所以TaskExecutor 在实例化之后,会转到 onStart() 方法执行

让我们看看 TaskExecutor 的 onStart()方法~~~

1
startTaskExecutorServices();
  1. 连接 ResourceManager

  2. 启动 TaskSlotTable 服务

    对象中的字段进行初始化

  3. 启动 JobLeaderService 服务

    对象中的字段进行初始化

  4. 启动 FileCache 服务

    用于存储 Task 在执行过程中以 PermanentBlobService 拉取的文件,并将文件展开在 /tmp_/ 路径中,如果 Task 处于非注册状态超过 5 秒,则清理临时文件

startTaskExecutorServices() 后,启动注册超时的检测,默认是 5min,如果超过这个时间还没注册完成,就会抛出异常退出进程,启动失败。

1
startRegistrationTimeout();