TaskManagerServices.fromConfiguration() 进来之后,然后这个里面呃做的事挺多挺多😯~~~

一、概述

根据已有信息创建 TaskManagerServicesConfiguration 实例,这些参数都会用于创建 TaskExecutor 实例,TaskManagerServices 包含了 TaskExecutor 创建过程中需要的全部服务,如 shuffleEnvironment、jobLeaderService 等。

二、源码

开始源码啦~~~

2.1. 工作目录检查

检查 temp.dir 目录是否存在,如果不存在会创建文件夹

1
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

flink.yaml 中 io.tmp.dirs 值

  1. 目录是否存在

    1
    2
    3
    4
    5
    if(!file.exists()) {
    if(!file.mkdirs()) {
    throw new IOException("");
    }
    }
  2. 是否是目录

    1
    2
    3
    if(!file.isDirectory()) {
    throw new IOException("");
    }
  3. 是否可写入

    1
    2
    3
    if(!file.canWrite()) {
    throw new IOException("");
    }

2.2. 初始化 TaskEventDispatcher

任务事件分发器,从消费者任务分发事件给生产者任务

1
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

TaskEventDispatcher 负责 从 消费Task 发送 Task消费结果 给 上游生产Task

不管哪儿遇到 *Dispatcher 一般都是用来调度哒😯~

2.3. 初始化 IOManagerASync

通过一个异步IO 的 方式来实现数据的流转。

1
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

目前 flink 只有这一种异步的IO管理器

2.3.1. 启动 WriterThread

根据配置的 tmp.dir 的个数,创建对应数量的读写线程,负责异步读写数据

1
2
3
4
5
6
7
8
9
this.writers = new WriterThread[tempDirs.length];
for (int i = 0; i < this.writers.length; i++) {
final WriterThread t = new WriterThread();
this.writers[i] = t;
t.setName("IOManager writer thread #" + (i + 1));
t.setDaemon(true);
t.setUncaughtExceptionHandler(this);
t.start();
}

2.3.2. 启动 ReaderThread

启动一定数量的 ReaderThread

1
2
3
4
5
6
7
8
9
this.readers = new ReaderThread[tempDirs.length];
for (int i = 0; i < this.readers.length; i++) {
final ReaderThread t = new ReaderThread();
this.readers[i] = t;
t.setName("IOManager reader thread #" + (i + 1));
t.setDaemon(true);
t.setUncaughtExceptionHandler(this);
t.start();
}

2.4. createShuffleEnvironment ( 炒鸡重要~‼️ )

上游stream task 和下游 stream task 之间有shuffle 动作,在这个过程中肯定需要很多的一些组件来执行处理,来为其服务现在创建的 NettyShuffleEnvironment 对象,就可以提供各种组件创建的支撑,跟shuffle 有关的一些各种组件都是通过ShuffleEnvironment 拿到的,比如把数据写到内存或者写到磁盘。

  1. 构建 Shuffle 上下文对象: ShuffleEnvironmentContext

  2. 通过反射拿到 NettyShuffleServiceFactory 对象

2.5. shuffleEnvironment.start();

启动过程中,启动了 Netty 服务端 和 客户端

  • 启动了 Netty 服务端
  • 启动了 Netty 客户端

一个比较复杂的一个 Flink job 执行的时候,就是一个 stream task,在 stream task 的执行过程当中,上游task 跟下游task 的在进行这个 shuffle 过程当中,相互之间建立连接,从上游算子拿到数据,把上游算子计算得到数据,发送到下游算子。

所以算子之间传递数据需要客户端与服务端,类似客户端将数据发送到服务端~,每个 task 既要接收数据又要发送数据,所以需要同时启动 netty 客户端和 netty 服务端~

2.6. 状态管理服务

这个是状态管理的一个服务

1
2
3
final KvStateService kvStateService =
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
kvStateService.start();

2.7. 创建广播变量管理器

Flink 支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中, 这样可以减缓大量的 shuffle 操作

1
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

2.8. 创建 TaskSlotTable (炒鸡重要‼️)

创建 TaskSlotTable,维护 task 和 slot 的分配关系

1
2
3
4
5
6
final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
taskManagerServicesConfiguration.getNumberOfSlots(),
taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
taskManagerServicesConfiguration.getPageSize(),
ioExecutor);

那么简单来说就是这一句方法是帮我们去构造一个task slot table。

TaskSlotTable 用来干嘛的呢?

一个节点能提供很多的 slot 的,也会执行很多的task。
好,那么现在就有个问题了 :)

那么到底哪些 task 对应到哪些 slot 来执行呢 ?
Flink 在逻辑上就是一张表来进行管理,TaskSlotTable 在于 taskmanager 的内部的作用就是把当前这个节点的这个 slot 跟要执行的 task 的映射关系管理起来,这样的话 task 执行结束之后就可以知道哪些 slot 可以被释放,同理申请 slot 会从这个组件里面去释放一些 slot 出来~

TaskSlotTable 从不同的维度维护 slot 信息,如 JobID 和 slot 的关系,AllocationID 与 slot 的关系,其具体存储的方式是 map,这样根据不同的 key 就可以很快的获取到 slot 信息。其中,对于那些已分配但是无法分配到具体 JobManager 的slot 会启动一个定时任务,若是超时会释放 slot 以免内存泄漏。

  • TaskSlotTable 的初始化过程是在 TaskManagerServices#fromConfiguration 方法中被初始化的
  • TaskSlotTable 的启动时在 TaskExecutor#startTaskExecutorServices()

2.9. 初始化 JobTable

创建 JobManagerTable,维护 JobId 和 JobManager 的对应关系,在此仅仅是初始了默认的 DefaultJobTable。JobTable 的初始过程在 ResourceManager 向 TaskManager 请求 slot 的过程中初始化的

1
final JobTable jobTable = DefaultJobTable.create();

DefaultJobTable 简单来说就是用来管理 job 信息,有哪些 job 的 task 在我们当前这个节点当中执行。

JobTable 的任务是用来管理一个 Job 在 TaskExecutor 上的生命周期。其主要是维护了两个 Map: JobID 与JobConnection、ResourceId 与 JobID。其中 Job 接口反映了 job 和 JobMaster 的 connect。

2.10. 初始化 JobLeaderService

创建 JobLeaderService (JobMaster) 服务,一旦某个 jobmanager 获得 leader 角色,或者失去 leader 状态,会通知 JobLeaderListener

1
2
3
4
final JobLeaderService jobLeaderService = new DefaultJobLeaderService(
unresolvedTaskManagerLocation,
taskManagerServicesConfiguration.getRetryingRegistrationConfiguration()
);

2.11. 初始化 StateRootDirectory 和 StateRootDirectoryFile

Flink 在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互, state 中存储着每条数据消费后数据的消费点(生产环境需要持久化这些状态),当 Job 因为某种错误或者其他原因导致重启时,就能够从 checkpoint 中的 state 数据进行恢复

本地状态保存根路径 taskmanager.state.local.root-dirs

执行完后会生成 localState 目录,

2.12. 初始化 TaskExecutorLocalStateStoresManager

创建任务状态管理器

1
2
3
4
5
final TaskExecutorLocalStateStoresManager taskStateManager = new
TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(), stateRootDirectoryFiles, ioExecutor);
final boolean failOnJvmMetaspaceOomError = taskManagerServicesConfiguration.getConfiguration()
.getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);

2.13. 初始化 LibraryCacheManager

用来缓存下载得到的目标 jar 包,具体实现类为BlobLibraryCacheManage

1
2
3
4
5
6
7
final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
permanentBlobService,
BlobLibraryCacheManager.defaultClassLoaderFactory(
taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
failOnJvmMetaspaceOomError ? fatalErrorHandler : null)
);

2.14. 返回 TaskManagerServices

初始化的各种服务组件全部包装在 TaskManagerServices 对象里返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
return new TaskManagerServices(
unresolvedTaskManagerLocation,
taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
ioManager,
shuffleEnvironment,
kvStateService,
broadcastVariableManager,
taskSlotTable,
jobTable,
jobLeaderService,
taskStateManager,
taskEventDispatcher,
ioExecutor,
libraryCacheManager
);