一、概述

领导选举机制(Leader Election)可以保证集群虽然存在多个 Master,但是只有一个 Master 处于激活(Active)状态,其他 Master 处于支持(Standby)状态。当 Active 状态的 Master 出现故障时,会选举出一个 Standby 状态的 Master 作为新的 Active 状态的 Master。

在 Master 的切换过程中影响:

  1. 不能够提交新的应用程序给集群, 因为只有 Active Master 才能接收新的程序提交请求
  2. 已经运行的程序中也不能因为 Action 操作触发新的 Job 提交请求。

集群 Worker, Driver 和 Application 的信息都已经通过持久化引擎持久化,因此在 Master 切换的过程中, 所有已经在运行的程序皆正常运行。因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源, 所以在运行时,Job 本身的调度和处理和 Master 是没有任何关系的。

二、LeaderElectionAgent

特质 LeaderElectionAgent 定义了对当前的 Master 进行跟踪和领导选举代理的通用接口。

2.1. 属性

2.1.1. masterInstance

masterInstance 属性类型是 LeaderElectable

LeaderElectable 接口定义了两个方法:

  • $electedLeader()$: 被选举为领导。
  • $revokedLeadership()$: 撤销领导关系。

LeaderElectable 目前只有 Master 这一个实现类

2.2. 实现

LeaderElectionAgent 一共有三种实现。由于 CustomLeaderElectionAgent 用于单元测试

2.2.1. ZooKeeperLeaderElectionAgent

ZooKeeperLeaderElectionAgent 是借助 ZooKeeper 实现的领导选举代理,ZooKeeper 自动化管理 Masters 的切换,保存整个 Spark 集群运行时的元数据, 包括 Workers、Drivers、Applications、Executors。

  1. 属性

    • WORKING_DIR: ZooKeeperLeaderElectionAgent 在 ZooKeeper 上的工作目录(可通过 spark.deploy.zooKeeper.dir 属性配置,默认为 spark)

    • zk: 连接 ZooKeeper 的客户端,类型为 CuratorFramework

    • leaderLatch: 使用 ZooKeeper 进行领导选举的客户端,类型为 LeaderLatch

    • status: 领导选举的状态,包括有领导 (LEADER) 和无领导 (NOT_LEADER)

  2. 方法

    • $start()$: 启动基于 ZooKeeper 的领导选举代理

      ZooKeeperLeaderElectionAgent 实现了 LeaderLatchListener,发生领导选举后,LeaderLatch 会回调 ZooKeeperLeaderElectionAgent 的 $isLeader$ 或 $notLeader$ 方法。

      1
      2
      3
      4
      5
      6
      private def start() {
      zk = SparkCuratorUtil.newClient(conf)
      leaderLatch = new LeaderLatch(zk, WORKING_DIR)
      leaderLatch.addListener(this)
      leaderLatch.start()
      }
    • $updateLeadershipStatus()$: 更新领导关系状态

      1
      2
      3
      4
      5
      6
      7
      8
      9
      private def updateLeadershipStatus(isLeader: Boolean) {
      if (isLeader && status == LeadershipStatus.NOT_LEADER) { // 选举为领导
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
      } else if (! isLeader && status == LeadershipStatus.LEADER) {// 撤销领导职务
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
      }
      }
      • 如果 Master 节点之前不是领导(NOT_LEADER),当被选为领导 Leader 时,将其状态设置为 LEADER 并调用 $masterInstance.electedLeader()$ 方法将Master 节点设置为 Leader。

        1
        2
        3
        override def electedLeader(){
        self.send(ElectedLeader)
        }

        $Master.receive()$ 方法中实现了对 ElectedLeader 消息的处理, Master 处理 ElectedLeader 消息的步骤如下:

        1. 从持久化引擎 PersistenceEngine 中读取出持久化的 ApplicationInfo、DriverInfo、WorkerInfo 等信息。

          1
          val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
        2. 如果没有任何持久化信息,则将 Master 的当前状态设置为 ALIVE,否则将 Master 的当前状态设置为 RECOVERING

          1
          2
          3
          4
          5
          state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
          RecoveryState.ALIVE
          } else {
          RecoveryState.RECOVERING
          }
        3. 如果 Master 的当前状态为 RECOVERING, 则调用 $beginRecovery()$ 方法对整个集群的状态进行恢复。在集群状态恢复完成后,创建延时任务 recoveryCompletionTask, 在 WORKER_TIMEOUT_MS 指定的时间后向 Master 自身发送 CompleteRecovery 消息。

          1
          2
          3
          4
          5
          6
          7
          8
          if (state == RecoveryState.RECOVERING) {
          beginRecovery(storedApps, storedDrivers, storedWorkers)
          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)
          }
          }, workerTimeoutMs, TimeUnit.MILLISECONDS)
          }
          • 集群的状态恢复: $beginRecovery()$

            • 恢复 Application

              遍历从持久化引擎中读取 ApplicationInfo

              1. 调用 $registerApplication()$ 方法将 ApplicationInfo 添加到 apps、idToApp、endpointToApp、 addressToApp、waitingApps 等缓存中。

              2. 将 ApplicationInfo 的状态设置为 UNKNOWN

              3. 向提交应用程序的 Driver 发送 MasterChanged 消息(此消息将携带被选举为领导的 Master 和此 Master 的 masterWebUiUrl 属性)

                Driver 接收到 MasterChanged 消息后,将自身的 master 属性修改为当前 Master 的 RpcEndpointRef, 并将 alreadyDisconnected 设置为 false,最后向 Master 发送 MasterChangeAcknowledged 消息。

                1
                2
                3
                4
                case MasterChanged(masterRef, masterWebUiUrl) =>
                master = Some(masterRef)
                alreadyDisconnected = false
                masterRef.send(MasterChangeAcknowledged(appId.get))

                Master 接收到 MasterChangeAcknowledged 消息后将 ApplicationInfo 的状态修改为 WAITING, 然后在不存在状态为 UNKNOWN 的 ApplicationInfo 和 WorkerInfo 时调用 $completeRecovery()$ 方法完成恢复。

                1
                2
                3
                4
                5
                6
                7
                8
                case MasterChangeAcknowledged(appId) =>
                idToApp.get(appId) match {
                case Some(app) =>
                app.state = ApplicationState.WAITING
                case None =>
                logWarning("Master change ack from unknown app: " + appId)
                }
                if (canCompleteRecovery) { completeRecovery() }
            • 恢复 Driver

              遍历从持久化引擎中读取的 DriverInfo, 将每个 DriverInfo 添加到 drivers 缓存

              1
              2
              3
              for (driver <- storedDrivers) {
              drivers += driver
              }
            • 恢复 Worker

              遍历从持久化引擎中读取的 WorkerInfo

              1
              2
              3
              4
              5
              6
              7
              8
              9
              for (worker <- storedWorkers) {
              try {
              registerWorker(worker)
              worker.state = WorkerState.UNKNOWN
              worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
              } catch {
              //...
              }
              }
              • 调用 $registerWorker()$ 方法将 WorkerInfo 添加到 workers、idToWorker、addressToWorker 等缓存中
              • 将 WorkerInfo 的状态修改为 UNKNOWN
              • 向 Worker 发送 MasterChanged 消息(此消息将携带被选举为领导的 Master 和此 Master 的 masterWebUiUrl 属性)
          • 集群的状态恢复完成: $completeRecovery()$

            $completeRecovery()$ 方法主要负责是删除那些不合格的 Application 与 Worker,主要调用了 $removeWorker()$ 与 $finashApplication()$ 两个方法。在删除之前首先将所有待恢复的 Woker 与 Application 的状态修改为 UNKOWN 。然后 Master 更新他们的状态。最后再去删除没有响应的组件。

            • 删除不合格 Application

              1
              workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_, "Not responding for recovery"))
            • 删除不合格 Worker

              1
              2
              apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
              apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
            • 删除 driver

              1
              2
              3
              4
              5
              6
              7
              drivers.filter(_.worker.isEmpty).foreach { d =>
              if (d.desc.supervise) {
              relaunchDriver(d)
              } else {
              removeDriver(d.id, DriverState.ERROR, None)
              }
              }
            • 重新调度 $schedule()$

        4. 如果 Master 节点之前 LEADER,新一轮选举当没有被选为 Leader 时,将其状态设置为 NOT_LEADER 并调用 $masterInstance.revokedLeadership()$ 方法撤销 Master 节点的领导关系。

          1
          2
          3
          case RevokedLeadership =>
          logError("Leadership has been revoked -- master shutting down.")
          System.exit(0)
      • $isLeader()$

        ZooKeeperLeaderElectionAgent 所属的 Master 节点被选为 Leader,会调用 $ZooKeeperLeaderElectionAgent.isLeader()$ 方法

        LeaderLatch 通过在 Master 上创建领导选举的 Znode 节点,并对 Znode 节点添加监视点来发现 Master 是否成功竞选

        1
        2
        3
        4
        5
        6
        7
        8
        override def isLeader(): Unit = {
        synchronized {
        if (!leaderLatch.hasLeadership) {
        return
        }
        updateLeadershipStatus(true)
        }
        }
      • $notLeader()$

        ZooKeeperLeaderElectionAgent 所属的 Master 节点没有被选举为 Leader,会调用 $ZooKeeperLeaderElectionAgent.notLeader()$ 方法

        1
        2
        3
        4
        5
        6
        7
        8
        override def notLeader(): Unit = {
        synchronized {
        if (leaderLatch.hasLeadership) {
        return
        }
        updateLeadershipStatus(false)
        }
        }

2.2.2. MonarchyLeaderAgent

MonarchyLeaderAgent 在构造时会调用 masterInstance 的 electedLeader 方法选举领导

2.2.3. CustomLeaderElectionAgent

用于单元测试