JobManager 根据 JobGragh 生成 ExecutionGragh, ExecutionGragh 是 JobGragh 的并行化版本,是调度层最核心的数据结构。来到 ExecutionGraph 构建源码分析~,有点难~~~

一、概述

JobManager 根据 JobGragh 生成ExecutionGragh, ExecutionGragh是JobGragh的并行化版本,是调度层最核心的数据结构,每个 ExecutionGraph 都有一个与其相关联的作业状态。此作业状态指示作业执行的当前状态

  1. ExecutionJobVertex

    和 JobGraph 中的 JobVertex一一对应。每一个 ExecutionJobVertex 都有和并发度一样多的 ExecutionVertex。

  2. ExecutionVertex

    表示 ExecutionJobVertex 的其中一个并发子任务,输入是ExecutionEdge,输出是 IntermediateResultPartition。

  3. IntermediateResult

    和JobGraph中的IntermediateDataSet一一对应。一个IntermediateResult包含多个 IntermediateResultPartition,其个数等于该operator的并发度。

  4. IntermediateResultPartition

    表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。

  5. ExecutionEdge

    表示 ExecutionVertex 的输入,source 是 IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。

  6. Execution

    是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过ExecutionAttemptID 来确定消息接受者。

二、源码

截屏2022-03-22 下午5.26.19

2.1. 接收 JobGraph

从客户端将 JobGraph 对象提交到集群运行时后,集群通过Dispatcher组件接收提交的 JobGraph对象。Dispatcher组件通过JobManagerRunnerFactory创建

Flink-源码-集群启动(1)-JobManager启动.md 里我们说过,WebMonitorEndpoint 用来接收客户端提交的各种 restful 请求,在客户端通过 Flink 脚本提交了各种请求或者在 Flink UI中发出一个人请求,最终这些请求都会被 WebMonitorEndpoint 接收,WebMonitorEndpoint 内部初始化了各种 handler,针对不同请求调用不同 handler 进行处理WebMonitorEndpoint请求

截屏2022-03-21 下午5.06.53

将来自客户端提交应用程序上来,由 JobManager 中的 Netty 服务端的 JobSubmitHandler 来执行处理,所以我们来到 JobSubmitHandler#handleRequest() ~~~

2.1.1. 获取文件

从请求中获取文件,包含 JobGraph 序列化文件

1
final Collection<File> uploadedFiles = request.getUploadedFiles();

2.1.2. 获取请求体

1
final JobSubmitRequestBody requestBody = request.getRequestBody();

2.1.3. 反序列化得到 JobGraph

服务端接收到客户端提交的,其实就是一个 JobGraph,到这儿客户端终于把 JobGraph 提交给 JobManager 了。 最终由 JobSubmitHandler 来执行处理~~~

1
CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
  1. 获取 JobGraph 的序列化文件的存储位置
  2. 从 JobGraphFile 文件中,反序列化得到 JobGraph 对象

2.1.4. 获取 jar

1
Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

2.1.5. 获取依赖 jar

1
2
Collection<Tuple2<String, Path>> artifacts =
getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

2.1.6. 上传 JobGraph + 程序jar + 依赖 jar

通过 BlobClient 来上传 jar 资源和依赖 jar 和 jobGraph

1
2
3
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture,
jarFiles, artifacts,
configuration);

2.1.7. 提交任务至 Dispatcher

1
2
3
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(
jobGraph -> gateway.submitJob(jobGraph, timeout)
);

注:这儿的 gateway (DispatcherGateway) 可以认为就是 Dispatcher 哦~

2.2. Dispatcher ExecutionGraph 构建

截屏2022-03-21 下午5.13.21

截屏2022-03-21 下午5.19.57

2.2.1. 判断提交的 JobGraph 中 JobID 是否重复

2.2.2. 判断 JobGraph 中是否仅为部分节点配置了资源

JobGraph的节点不支持部分资源配置,因此要么全部节点都配置资源,要么全不配置,否则会抛出异常

2.2.3. internalSubmitJob(jobGraph)

点点点~~~来到 waitForTerminatingJobManager()~~~,主要负责将 JobGraph 异步提交给 Dispatcher~

  • persistAndRunJob(JobGraph jobGraph)

    persistAndRunJob() 方法主要涉及 JobGraph 的持久化及执行操作

    1. 保存 JobGraph

      调用 jobGraphWriter.putJobGraph() 方法对 JobGraph 进行持久化,如果基于 ZooKeeper 实现了集群高可用,则将 JobGraph 记录到 ZooKeeperJobGraphStore 中,异常情况下会通过ZooKeeperJobGraphStore恢复作业。

      1
      jobGraphWriter.putJobGraph(jobGraph);
    2. 运行 job

      执行 jobGraph,然后返回 CompletableFuturerunJobFuture 对象。

      1
      final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

      点进来~~~

      1. 创建 JobManagerRunner
      2. 防止 Job 重复执行
      3. startJobManagerRunner

      首先让我们来看看~创建 JobManagerRunner

      1
      2
      final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
      createJobManagerRunner(jobGraph);

      这儿 JobManagerRunner指的是 JobMaster ~🤔️

      截屏2022-03-21 下午8.45.21

      点点点~来到DefaultJobMasterServiceFactory#createJobMasterService()

      截屏2022-03-22 下午4.41.11

      在 JobMaster 初始化过程中 会将 JobGraph转为ExecutionGraph~~~

      同时JobMaster 实现了 RpcEndpoint 接口,所以在实例化之后会自动调用 JobMaster#onStart(),可是在JobMaster和 FencedRpcEndpoint 都没有实现 onStart() 方法~~~,不用看啦~

      截屏2022-03-21 下午8.49.10

2.3. ExecutionGraph 构建

在 JobMaster 初始化过程中,SchedulerBase#createAndRestoreExecutionGraph()会将 JobGraph转为ExecutionGraph~~~

截屏2022-03-21 下午8.58.30

先来看看 DefaultSchedulerFactory#createInstance()~~~

2.3.1. DefaultSchedulerFactory#createInstance()

  1. 设置调度策略

  2. 设置重启策略

  3. 构造 SlotProviderStrategy

  4. 实例化调度器

点点点~~~,顺着时序图来到 ExecutionGraphBuilder.java#buildGraph() ,该方法是将 JobGraph 转为 ExecutionGraph的核心代码~~~

2.3.2. ExecutionGraphBuilder.java#buildGraph()

  1. 从 JobGraph 中获取 JobName 和 JobID

    截屏2022-03-22 下午5.00.13

  2. 构建包含 job 信息的 JobInformation 对象

    1
    2
    3
    4
    final JobInformation jobInformation = 
    new JobInformation(jobId, jobName, jobGraph.getSerializedExecutionConfig(),
    jobGraph.getJobConfiguration(), jobGraph.getUserJarBlobKeys(),
    jobGraph.getClasspaths());
  3. 释放 IntermediateResultPartition 的策略

    1
    2
    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
    PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
  4. 初始化 ExecutionGraph 对象

  5. 设置 ExecutionGraph 基本属性

    以 Json 格式

    1
    executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

    截屏2022-03-22 下午5.09.37

  6. 遍历 JobVertex 并进行初始化

    • 获取 JobVertex 的 invokableClassName
    • vertex.initializeOnMaster(classLoader)
  7. executionGraph.attachJobGraph(sortedTopology)

    来啦来啦,最核心的代码~,方法主体在一个 for 循环里,遍历所有的 JobVertex

    • 一个 JobVertex 对应的创建一个 ExecutionJobVertex

      1
      2
      3
      ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1,
      maxPriorAttemptsHistoryLength, rpcTimeout,
      globalModVersion, createTimestamp);
    • ejv.connectToPredecessors(this.intermediateResults)

      处理 JobEdge 和 IntermediateResult 和 ExecutionJobVertex中的 ExecutionVertex,对每个 JobEdge,获取对应的 IntermediateResult,并记录到本节点的输入上,最后,把每个 ExecutorVertex 和对应的 IntermediateResult 关联起来

      截屏2022-03-22 下午5.26.19

      方法主体同样在一个 for 循环里,遍历 jobVertex 的所有输入 JobEdge

      如图获取 FlatMap的输入边 JobEdge

      截屏2022-03-22 下午5.34.56

      • 遍历 JobEdge

      • 获取到 JobEdge 连接的 IntermediateResult

        截屏2022-03-22 下午5.38.08

      • 将当前 IntermediateResult 作为 ExecutionJobVertex 的输入

        1
        this.inputs.add(ires);
      • 根据并行度来设置 ExecutionVertex

        我们的并行度为2~~~

        截屏2022-03-22 下午5.41.31

    • 将生成好的 ExecutionJobVertex 加入到 ExecutionGraph 中

      1
      ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
    • 将当前 JobVertex 的输入 IntermediateResult 加入到 intermediateResults map 中

      1
      2
      3
      4
      for(IntermediateResult res : ejv.getProducedDataSets()) {
      IntermediateResult previousDataSet =
      this.intermediateResults.putIfAbsent(res.getId(), res);
      }
    • 顺序保存 ExecutionJobVertex

      1
      this.verticesInCreationOrder.add(ejv);
    • 获取总并行度

    • ExecutionJobVertex 加入 newExecJobVertices List 中

  8. 设置 checkpoint

  9. 设置状态后端

到这基本完成了将 JobGraph 转换为 ExecutionGraph 的过程, JobGraph 转换为 ExecutionGraph 的过程是实例化 JobMaster 核心之一

截屏2022-03-22 下午4.41.11

在创建 JobManagerRunner 结束之后~~~,我们开始 startJobManagerRunner 啦~~~

2.4. 运行 JobManagerRunner

这儿 JobManagerRunner 指的就是 JobMaster~

客户端正常提交一个 job 的时候,最终由集群主节点中的 Dispatcher 接收继续提交执行,回到 Dispatcher#runJob()

JobMaster 已经实例化结束,接下来将任务提交到 JobMaster 开始运行~~~

将创建的 jobManagerRunnerFuture 对象添加到 jobManagerRunnerFutures 中,防止 Job 重复执行。

1
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

提交任务

1
FunctionUtils.uncheckedFunction(this::startJobManagerRunner)

因为 JobMaster 服务实现了高可用,所以在启动它时,会通过调用 leaderElectionService 启动LeaderContender 实现类 JobManagerRunnerImpl。

1
leaderElectionService.start(this);

这种格式我们见过好多次了~,选举成功,则跳转到 ZooKeeperLeaderElectionService 的 isLeader() 方法来到 isLeader()~,这个方法我们来过好几次了😯~

截屏2022-03-21 下午8.19.06

这次轮到 JobManagerRunnerImpl

截屏2022-03-21 下午8.17.49

grantLeadership() 最终会调用verifyJobSchedulingStatusAndStartJobManager()方法启动 JobMaster服务

来到 verifyJobSchedulingStatusAndStartJobManager()~~~

2.4.1. 获取 Job 的调度状态

1
final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

2.4.2. 启动 JobMaster

首先判断返回的 JobSchedulingStatus 是否为 JobSchedulingStatus.DONE,如果是,则表明 Job 已经被其他的 JobMaster 执行过,此时调用 jobAlreadyDone()方法处理后续流程;否则调用创建并启动新的JobMaster服务。然后通过该 JobMaster 调度和执行 JobGraph 中的Task实例

1
2
3
4
5
if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
return jobAlreadyDone();
} else {
return startJobMaster(leaderSessionId);
}
  • 在 runningJobsRegistry 中注册当前的 JobId 信息,当 JobMaster 启动好了之后,更改 Job 状态为 Running

    1
    runningJobsRegistry.setJobRunning(jobGraph.getJobID());
  • 启动 JobMaster

    1
    startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
    • 启动 JobMaster 对应的 RPC 服务

    • 执行 JobGraph

      1
      callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);