Flink-源码学习-集群启动-taskmanager-TaskManagerServices
TaskManagerServices.fromConfiguration() 进来之后,然后这个里面呃做的事挺多挺多😯~~~
一、概述
根据已有信息创建 TaskManagerServicesConfiguration 实例,这些参数都会用于创建 TaskExecutor 实例,TaskManagerServices 包含了 TaskExecutor 创建过程中需要的全部服务,如 shuffleEnvironment、jobLeaderService 等。
二、源码
开始源码啦~~~
2.1. 工作目录检查
检查 temp.dir
目录是否存在,如果不存在会创建文件夹
1 | checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); |
flink.yaml
中 io.tmp.dirs 值
目录是否存在
1
2
3
4
5if(!file.exists()) {
if(!file.mkdirs()) {
throw new IOException("");
}
}是否是目录
1
2
3if(!file.isDirectory()) {
throw new IOException("");
}是否可写入
1
2
3if(!file.canWrite()) {
throw new IOException("");
}
2.2. 初始化 TaskEventDispatcher
任务事件分发器,从消费者任务分发事件给生产者任务
1 | final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); |
TaskEventDispatcher 负责 从 消费Task 发送 Task消费结果 给 上游生产Task
不管哪儿遇到 *Dispatcher 一般都是用来调度哒😯~
2.3. 初始化 IOManagerASync
通过一个异步IO 的 方式来实现数据的流转。
1 | final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); |
目前 flink 只有这一种异步的IO管理器
2.3.1. 启动 WriterThread
根据配置的 tmp.dir
的个数,创建对应数量的读写线程,负责异步读写数据
1 | this.writers = new WriterThread[tempDirs.length]; |
2.3.2. 启动 ReaderThread
启动一定数量的 ReaderThread
1 | this.readers = new ReaderThread[tempDirs.length]; |
2.4. createShuffleEnvironment ( 炒鸡重要~‼️ )
上游stream task 和下游 stream task 之间有shuffle 动作,在这个过程中肯定需要很多的一些组件来执行处理,来为其服务现在创建的 NettyShuffleEnvironment 对象,就可以提供各种组件创建的支撑,跟shuffle 有关的一些各种组件都是通过ShuffleEnvironment 拿到的,比如把数据写到内存或者写到磁盘。
构建 Shuffle 上下文对象: ShuffleEnvironmentContext
通过反射拿到 NettyShuffleServiceFactory 对象
2.5. shuffleEnvironment.start();
启动过程中,启动了 Netty 服务端 和 客户端
- 启动了 Netty 服务端
- 启动了 Netty 客户端
一个比较复杂的一个 Flink job 执行的时候,就是一个 stream task,在 stream task 的执行过程当中,上游task 跟下游task 的在进行这个 shuffle 过程当中,相互之间建立连接,从上游算子拿到数据,把上游算子计算得到数据,发送到下游算子。
所以算子之间传递数据需要客户端与服务端,类似客户端将数据发送到服务端~,每个 task 既要接收数据又要发送数据,所以需要同时启动 netty 客户端和 netty 服务端~
2.6. 状态管理服务
这个是状态管理的一个服务
1 | final KvStateService kvStateService = |
2.7. 创建广播变量管理器
Flink 支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中, 这样可以减缓大量的 shuffle 操作
1 | final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager(); |
2.8. 创建 TaskSlotTable (炒鸡重要‼️)
创建 TaskSlotTable,维护 task 和 slot 的分配关系
1 | final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable( |
那么简单来说就是这一句方法是帮我们去构造一个task slot table。
TaskSlotTable 用来干嘛的呢?
一个节点能提供很多的 slot 的,也会执行很多的task。
好,那么现在就有个问题了 :)
那么到底哪些 task 对应到哪些 slot 来执行呢 ?
Flink 在逻辑上就是一张表来进行管理,TaskSlotTable 在于 taskmanager 的内部的作用就是把当前这个节点的这个 slot 跟要执行的 task 的映射关系管理起来,这样的话 task 执行结束之后就可以知道哪些 slot 可以被释放,同理申请 slot 会从这个组件里面去释放一些 slot 出来~
TaskSlotTable 从不同的维度维护 slot 信息,如 JobID 和 slot 的关系,AllocationID 与 slot 的关系,其具体存储的方式是 map,这样根据不同的 key 就可以很快的获取到 slot 信息。其中,对于那些已分配但是无法分配到具体 JobManager 的slot 会启动一个定时任务,若是超时会释放 slot 以免内存泄漏。
- TaskSlotTable 的初始化过程是在
TaskManagerServices#fromConfiguration
方法中被初始化的 - TaskSlotTable 的启动时在
TaskExecutor#startTaskExecutorServices()
中
2.9. 初始化 JobTable
创建 JobManagerTable,维护 JobId 和 JobManager 的对应关系,在此仅仅是初始了默认的 DefaultJobTable。JobTable 的初始过程在 ResourceManager 向 TaskManager 请求 slot 的过程中初始化的
1 | final JobTable jobTable = DefaultJobTable.create(); |
DefaultJobTable 简单来说就是用来管理 job 信息,有哪些 job 的 task 在我们当前这个节点当中执行。
JobTable 的任务是用来管理一个 Job 在 TaskExecutor 上的生命周期。其主要是维护了两个 Map: JobID 与JobConnection、ResourceId 与 JobID。其中 Job 接口反映了 job 和 JobMaster 的 connect。
2.10. 初始化 JobLeaderService
创建 JobLeaderService (JobMaster) 服务,一旦某个 jobmanager 获得 leader 角色,或者失去 leader 状态,会通知 JobLeaderListener
1 | final JobLeaderService jobLeaderService = new DefaultJobLeaderService( |
2.11. 初始化 StateRootDirectory 和 StateRootDirectoryFile
Flink 在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互, state 中存储着每条数据消费后数据的消费点(生产环境需要持久化这些状态),当 Job 因为某种错误或者其他原因导致重启时,就能够从 checkpoint 中的 state 数据进行恢复
本地状态保存根路径 taskmanager.state.local.root-dirs
执行完后会生成 localState 目录,
2.12. 初始化 TaskExecutorLocalStateStoresManager
创建任务状态管理器
1 | final TaskExecutorLocalStateStoresManager taskStateManager = new |
2.13. 初始化 LibraryCacheManager
用来缓存下载得到的目标 jar 包,具体实现类为BlobLibraryCacheManage
1 | final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( |
2.14. 返回 TaskManagerServices
初始化的各种服务组件全部包装在 TaskManagerServices 对象里返回
1 | return new TaskManagerServices( |