一、概述

Dispatcher 负责对集群中的作业进行接收和分发处理操作,客户端可以通过与 Dispatcher建立 RPC 连接,将作业提交到集群

二、实现

2.1. 架构设计

2.1.1. Dispatcher

负责对集群中的作业进行接收和分发处理操作,客户端可以通过与 Dispatcher 建立 RPC 连接,将作业过提交到集群 Dispatcher 服务中。

2.1.2. DispatcherRunner

负责启动和管理 Dispatcher 组件,并支持对 Dispatcher 组件的 Leader 选举。当 Dispatcher 集群组件出现异常并停止时,会通过 DispatcherRunner 重新选择和启动新的 Dispatcher 服务,从而保证 Dispatcher 组件的高可用。

DispatcherRunner

2.1.3. DispatcherLeaderProcess

负责管理 Dispatcher 生命周期,同时提供了对 JobGraph 的任务恢复管理功能。如果基于 ZooKeeper 实现了集群高可用,DispatcherLeaderProcess 会将提交的 JobGraph 存储在 ZooKeeper 中,当集群停止或者出现异常时,就会通过 DispatcherLeaderProcess 对集群中的 JobGraph 进行恢复,这些 JobGraph 都会被存储在 JobGraphStore 的实现类中。

2.1.4. DispatcherGatewayService

主要基于 Dispatcher 实现的 GatewayService,用于获取 DispatcherGateway。

DispatcherGatewayService

三、创建 DispatcherRunner

DispatcherRunner 通过 DispatcherRunnerFactory 创建,DispatcherRunnerFactory 中的参数依赖 DispatcherFactory, DispatcherRunner 主要提供了启动 Dispatcher 组件以及 Leader 选举等功能

3.1. DispatcherLeaderProcess 创建和启动

DispatcherLeaderProcess 负责管理 Dispatcher 生命周期,同时提供了对 JobGraph 的任务恢复管理功能,根据不同的集群类型,DispatcherLeaderProcess 分为 SessionDispatcherLeaderProcess 和 JobDispatcherLeaderProcess 两种实现类。

3.1.1. SessionDispatcherLeaderProcess

SessionDispatcherLeaderProcess 中实现了与 Session 集群相关的 Dispatcher 处理逻辑,主要用于对 JobGraphStore 中有储的JobGraph 进行恢复。在非高可用集群下,JobGraphStore 的实现类为 StandaloneJobGraphStore,也就是不对
JobGraph 进行存储和管理;在高可用集群中,JobGraphStore 基于 Zookeeper 存储集群中的 JobGraph,当集群重新启动
后会将 JobGraphStore 中存储的 JobGraph 恢复并创建相应的任务

  1. 启动
    SessionDispatcherLeaderProcess.onStart()方法包含如下步骤:
SessionDispatcherLeaderProcess.onStart()
  • 调用 startService() 方法启动 JobGraphStore 服务

    JobGraphStore 主要用于存储集群中运行的 JobGraph,当系统出现异常时,可以从 JobGraphStore 中获取 JobGraph 并再次提交到 Dispatcher 上运行。

  • 调用 recoverJobsAsync() 方法对 JobGraphStore 中的方法进行恢复

  • 调用 createDispatcherIfRunning() 方法,创建 Dispatcher 并将恢复的 JobGraph 提交到 Dispatcher 上运行

  • 调用 onErrorIfRunning() 方法捕获执行过程中出现的异常并处理。

3.1.2. JobDispatcherLeaderProcess

JobDispatcherLeaderProcess 用于单个 JobGraph 的恢复和提交

  1. 启动

JobDispatcherLeaderProcess 和 SessionDispatcherLeaderProcess 不同,JobDispatcherLeaderProcess 无须恢复 JobGraphStore 中存储的 JobGraph,仅支持恢复当前作业的 JobGraph。

3.2. Dispatcher 创建和启动

当 JobGraph 从 JobGraphStore 中恢复后,会立刻创建和启动 Dispatcher 组件,然后将恢复出来的 JobGraph 提交到 Dispatcher 上运行。如果集群中没有需要恢复的 JobGraph,直接创建并启动 Dispatcher。
在 SessionDispatcherLeaderProcess.createDispatcher() 方法是通过 DefaultDispatcherGatewayServiceFactory#create() 方法创建和启动 Dispatcher,

  1. 调用 dispatcherFactory 创建 dispatcher 组件

参数包括 rpcService、fencingToken 以及从 JobGraphStore 中恢复的 recoveredJobs 集合,还有通过
partialDispatcherServices 和 jobGraphWriter 创建的 PartialDispatcherServicesWithJobGraphStore。其中,DispatcherServices 包含了 Dispatcher 组件用到的服务,Dispatcher组件会在初始化的过程中从 DispatcherServices 获取这些服务,比如 highAvailabilityServices、heartbeatServices等。

dispatcherFactory

  1. dispatcher 创建完毕后,会调用 Dispatcher.start() 方法启动 dispatcher 组件,实际上调用的是 RpcEndpoint.start() 方法启动 Dispatcher 对应 RPC 通信服务