一、概述

领导选举机制(Leader Election)可以保证集群虽然存在多个 Master,但是只有一个 Master 处于激活(Active)状态,其他的 Master 处于支持(Standby)状态。当 Active 状态的 Master 出现故障时,会选举出一个 Standby 状态的 Master 作为新的 Active 状态的 Master。由于整个集群的 Worker, Driver 和 Application 的信息都已经通过持久化引擎持久化,因此切换 Master 时只会影响新任务的提交,对于正在运行中的任务没有任何影响。

二、LeaderElectionAgent

特质 LeaderElectionAgent 定义了对当前的 Master 进行跟踪和领导选举代理的通用接口。

1
2
3
4
trait LeaderElectionAgent {
val masterInstance: LeaderElectable
def stop(): Unit = {} // to avoid noops in implementations.
}

2.1. 属性

2.1.1. masterInstance

masterInstance 属性类型是 LeaderElectable

1
2
3
4
trait LeaderElectable {
def electedLeader(): Unit
def revokedLeadership(): Unit
}

LeaderElectable 定义了两个方法。

  • $electedLeader()$: 被选举为领导。
  • $revokedLeadership()$: 撤销领导关系。

LeaderElectable 目前只有 Master 这一个实现类

2.2. 实现

LeaderElectionAgent 一共有三种实现。由于 CustomLeaderElectionAgent 用于单元测试

2.2.1. ZooKeeperLeaderElectionAgent

ZooKeeperLeaderElectionAgent 是借助 ZooKeeper 实现的领导选举代理,采用 ZooKeeper 作 HA, ZooKeeper 会自动化管理 Masters 的切换, ZooKeeper 会保存整个 Spark 集群运行时的元数据, 包括 Workers、Drivers、Applications、Executors。

ZooKeeper 遇到当前 Active 级别的 Master 出现故障时会从 Standby Masters 中选取出一台作为 Active Master, 但是要注意, 被选举后到成为真正的 Active Master 之前需要从 ZooKeeper 中获取集群当前运行状态的元数据信息并进行恢复。

在 Master 切换的过程中, 所有已经在运行的程序皆正常运行。因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源, 所以在运行时,Job 本身的调度和处理和 Master 是没有任何关系的。

在 Master 的切换过程中唯一的影响是不能提交新的 Job:

  1. 一方面不能够提交新的应用程序给集群, 因为只有 Active Master 才能接收新的程序提交请求
  2. 已经运行的程序中也不能因为 Action 操作触发新的 Job 提交请求。
ZooKeeperLeaderElectionAgent
  1. 属性

    • WORKING_DIR: ZooKeeperLeaderElectionAgent 在 ZooKeeper 上的工作目录,是 Spark 基于 ZooKeeper 进行热备的根节点(可通过 spark.deploy.zooKeeper.dir 属性配置,默认为 spark)的子节点 leader_election

    • zk: 连接 ZooKeeper 的客户端,类型为 CuratorFramework

    • leaderLatch: 使用 ZooKeeper 进行领导选举的客户端,类型为 LeaderLatch

    • status: 领导选举的状态,包括有领导 (LEADER) 和无领导 (NOT_LEADER)

  2. 方法

    • $start()$: 启动基于 ZooKeeper 的领导选举代理

      1
      2
      3
      4
      5
      6
      private def start() {
      zk = SparkCuratorUtil.newClient(conf)
      leaderLatch = new LeaderLatch(zk, WORKING_DIR)
      leaderLatch.addListener(this)
      leaderLatch.start()
      }

      ZooKeeperLeaderElectionAgent 实现了 LeaderLatchListener,因此发生领导选举后,LeaderLatch 会回调 ZooKeeperLeaderElectionAgent 的 $isLeader$ 或 $notLeader$ 方法。

    • $updateLeadershipStatus()$: 更新领导关系状态

      1
      2
      3
      4
      5
      6
      7
      8
      9
      private def updateLeadershipStatus(isLeader: Boolean) {
      if (isLeader && status == LeadershipStatus.NOT_LEADER) { // 选举为领导
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
      } else if (! isLeader && status == LeadershipStatus.LEADER) {// 撤销领导职务
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
      }
      }
      • 如果 Master 节点之前不是领导(NOT_LEADER),当被选为领导 Leader 时,将其状态设置为 LEADER 并调用 $masterInstance.electedLeader()$ 方法将Master 节点设置为 Leader。

        1
        2
        3
        override def electedLeader(){
        self.send(ElectedLeader)
        }

        Master的 $receive()$ 方法中实现了对 ElectedLeader 消息的处理, Master 处理 ElectedLeader 消息的步骤如下:

        1. 从持久化引擎 PersistenceEngine 中读取出持久化的 ApplicationInfo、DriverInfo、WorkerInfo 等信息。

          1
          val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
        2. 如果没有任何持久化信息,则将 Master 的当前状态设置为 ALIVE,否则将 Master 的当前状态设置为 RECOVERING

          1
          2
          3
          4
          5
          state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
          RecoveryState.ALIVE
          } else {
          RecoveryState.RECOVERING
          }
        3. 如果 Master 的当前状态为 RECOVERING, 则调用 $beginRecovery()$ 方法对整个集群的状态进行恢复。在集群状态恢复完成后,创建延时任务 recoveryCompletionTask, 在 WORKER_TIMEOUT_MS 指定的时间后向 Master 自身发送 CompleteRecovery 消息。

          1
          2
          3
          4
          5
          6
          7
          8
          if (state == RecoveryState.RECOVERING) {
          beginRecovery(storedApps, storedDrivers, storedWorkers)
          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)
          }
          }, workerTimeoutMs, TimeUnit.MILLISECONDS)
          }
          • 集群的状态恢复: $beginRecovery()$

            • 遍历从持久化引擎中读取 ApplicationInfo

              1. 调用 $registerApplication()$ 方法将 ApplicationInfo 添加到 apps、idToApp、endpointToApp、 addressToApp、waitingApps 等缓存中。

              2. 将 ApplicationInfo 的状态设置为 UNKNOWN

              3. 向提交应用程序的 Driver 发送 MasterChanged 消息(此消息将携带被选举为领导的 Master 和此 Master 的 masterWebUiUrl 属性)

                Driver 接收到 MasterChanged 消息后,将自身的 master 属性修改为当前 Master 的 RpcEndpointRef, 并将 alreadyDisconnected 设置为 false, 最后向 Master 发送 MasterChangeAcknowledged 消息。Master 接收到 MasterChangeAcknowledged 消息后将 ApplicationInfo 的状态修改为 WAITING, 然后在不存在状态为 UNKNOWN 的 ApplicationInfo 和 WorkerInfo 时调用 completeRecovery() 方法完成恢复。

            • 恢复 Driver

              遍历从持久化引擎中读取的 DriverInfo, 将每个 DriverInfo 添加到 drivers 缓存

              1
              2
              3
              for (driver <- storedDrivers) {
              drivers += driver
              }
            • 恢复 Worker

              遍历从持久化引擎中读取的 WorkerInfo

              1
              2
              3
              4
              5
              6
              7
              8
              9
              for (worker <- storedWorkers) {
              try {
              registerWorker(worker)
              worker.state = WorkerState.UNKNOWN
              worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
              } catch {
              //...
              }
              }
              • 调用 $registerWorker()$ 方法将 WorkerInfo 添加到 workers、idToWorker、addressToWorker 等缓存中

              • 将 WorkerInfo 的状态修改为 UNKNOWN

              • 向 Worker 发送 MasterChanged 消息(此消息将携带被选举为领导的 Master 和此 Master 的 masterWebUiUrl 属性)

                Worker 接收到 MasterChanged 消息后, 将自身的 activeMasterUrl、activeMasterWebUiUrl、master 等属性修改为当前 Master 的对应信息, 然后将 connected 设置为 true, 最后向 Master 发送 WorkerSchedulerStateResponse 消息。

                Master 接收到 WorkerSchedulerStateResponse 消息后, 首先将 WorkerInfo 的状态修改为 ALIVE, 然后对此 Worker 上的 Executor 和 Driver 也进行恢复, 最后在不存在状态为 UNKNOWN 的 ApplicationInfo 和 WorkerInfo 的调用 $completeRecovery()$ 方法完成恢复。

                1. 如果 Master 的状态不是 RECOVERING, 直接返回。
                2. 将 Workers 中状态为 UNKNOWN 的所有 WorkerInfo, 通过调用 removeWorker 方法从 Master 中移除
                3. 将 apps 中状态为 UNKNOWN 的所有 ApplicationInfo,通过调用 $finishApplication()$ 方法从 Master 中移除
                4. 从 drivers 中过滤出还没有分配 Worker 的所有 DriverInfo, 如果 Driver 是被监管的,则调用 $relaunchDriver()$ 方法重新调度运行指定的 Driver, 否则调用 $removeDriver()$ 方法移除 Master 维护的关于指定 Driver 的相关信息和状态。
                5. 将 Master 的状态设置为 ALIVE
                6. 调用 $schedule()$ 方法进行资源调度
          • 向 Master 自身发送 CompleteRecovery 消息

          • 处理 CompleteRecovery 消息: $completeRecovery()$

        4. 如果 Master 节点之前 LEADER,当没有被选为 Leader 时,将其状态设置为 NOT_LEADER 并调用 $masterInstance.revokedLeadership()$ 方法撤销 Master 节点的领导关系。

      • $isLeader()$

      ZooKeeperLeaderElectionAgent 所属的 Master 节点被选为 Leader,会调用 ZooKeeperLeaderElectionAgent 的 $isLeader()$ 方法

      LeaderLatch 通过在 Master 上创建领导选举的 Znode 节点,并对 Znode 节点添加监视点来发现 Master 是否成功竞选

      • $notLeader()$
 ZooKeeperLeaderElectionAgent 所属的 Master 节点没有被选为 Leader,并更新领导关系状态。当 LeaderLatch 发现 Master 没有被选举为 Leader,会调用 ZooKeeperLeaderElectionAgent 的 $notLeader()$ 方法

2.2.2. MonarchyLeaderAgent

MonarchyLeaderAgent 在构造时会调用 masterInstance 的 electedLeader 方法选举领导

2.2.3. CustomLeaderElectionAgent

用于单元测试

三、总结