Flink-源码学习-集群启动-standalone-taskmanager
作为整个运行时的工作节点,TaskManager 提供了作业运行过程中需要的Slot 计算资源,JobManager中提交的Task实例都会运行在TaskManager组件上,我们看看TaskManager的启动源码👀~~~
一、概述
在这个 TaskManager 启动过程中,最重要的事情,就是在 TaskManager初始化了一些基础服务和一些 对外提供服务的核心服务,向 JobManager 进行 TaskManager 的注册,并且在注册成功之后,维持 TaskManager 和 JobManager 的心跳,从JobManager 接收需要部署的任务,使用 Slot 资源启动 Task
二、启动
根据以上的启动脚本分析来到 TaskManagerRunner#main()
~~~
看看时序图👀~~~,直接进入 核心代码 runTaskManager()
在 runTaskManagerSecurely()
实例化并启动了 TaskManagerRunner,其中构建 TaskManagerRunner 实例大致做了三件事~
1 | final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId, |
2.1. 启动公共基础服务
2.2. 初始化 TaskManager 运行时服务
前面已经做了各种基础服务的初始化,最终的目的都是帮助我们去启动 TaskManager~~~
1 | taskManager = startTaskManager(this.configuration, |
2.2.1. 获取资源定义对象 TaskExecutorResourceSpec
TaskExecutorResourceSpec 可以理解是 TaskExecutor 的资源抽象对象
那么这个对象里面所拥有的那些信息都是从这个里面来的 ?
两个方面:
配置 configuration
搭 Flink 集群的时候会配置集群资源,从 configuration 中抽取 TaskExecutor 的资源配置,最后将配置信息转换为TaskExecutorResourceSpec 对象。
默认值
2.2.2. 运行时服务配置 TaskManagerServicesConfiguration
1 | TaskManagerServicesConfiguration taskManagerServicesConfiguration = |
前面构造的 TaskExecutorResourceSpec 对象作为参数传过来啦~~~
启动 TaskManager 的过程当中所需要的一些东西都被放在这个构造函数里,比如说 configuration 配置,resourceID,资源配置等,也就是说节点启动所需要的一些信息都已经被封装在 TaskManagerServicesConfiguration 这个抽象对象里面了😯~
2.2.3. 初始化 TaskManager 运行时服务
初始化 TaskManager 在运行过程中需要的服务,在TaskManager 启动之前,已经初始化了一些服务组件叫基础服务(共用服务组件),而这里面创建的服务是基于基础服务组件创建的 TaskManager 在运行过程中,真正需要的用来对外提供服务的各种服务组件,不一样哦~
1 | TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( |
根据 TaskManagerServicesConfiguration 和 TaskManagerMetricGroup 等参数创建 TaskManagerServices, TaskManagerServices 包含了 TaskExecutor 创建过程中需要的全部服务。
2.3. 创建 TaskExecutor 实例
通过 rpcService、highAvailabilityServices 等基础服务和上一步创建的 TaskManagerServices 创建 TaskExecutor 实例。
TaskExecutor 是 TaskManager 服务的底层实现类,启动 TaskManager 最重要的事情,就是创建启动 TaskExecutor~
2.3.1. 构造器
在构造器里会初始化两个心跳服务~
jobManagerHeartbeatManager
1
2this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices,
resourceId);resourceManagerHeartbeatManager
1
2this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
2.3.2. onStart()
由于 TaskExecutor 是 RpcEndpoint 的子类,所以TaskExecutor 在实例化之后,会转到 onStart() 方法执行
让我们看看 TaskExecutor 的 onStart()
方法~~~
1 | startTaskExecutorServices(); |
连接 ResourceManager
获取 ResourceManager 地址
resourceManagerLeaderRetriever 通过 LeaderRetrievalService 获取 ResourceManager 的 Leader 地址。
在 TaskExecutor 中通过实现 ResourceManagerLeaderListener 来监听 ResourceManager 最新的 Leader 地址,一旦 ResourceManager 地址发生切换,则通知 TaskExecutor 服务,持 TaskExecutor 与ResourceManager 之间的有效通信连接。
向 ResourceManager 注册
引用本站文章Flink-源码-集群启动-taskmanager-注册Joker和 ResourceManager 维持心跳
完成注册后我们需要维持心跳~
引用本站文章Flink-源码-集群启动-taskmanager-心跳Joker
启动 TaskSlotTable 服务
对象中的字段进行初始化
启动 JobLeaderService 服务
对象中的字段进行初始化
启动 FileCache 服务
用于存储 Task 在执行过程中以 PermanentBlobService 拉取的文件,并将文件展开在 /tmp_/ 路径中,如果 Task 处于非注册状态超过 5 秒,则清理临时文件
startTaskExecutorServices() 后,启动注册超时的检测,默认是 5min,如果超过这个时间还没注册完成,就会抛出异常退出进程,启动失败。
1 | startRegistrationTimeout(); |