Spark-源码学习-容错-选举机制
一、概述
领导选举机制(Leader Election)可以保证集群虽然存在多个 Master,但是只有一个 Master 处于激活(Active)状态,其他 Master 处于支持(Standby)状态。当 Active 状态的 Master 出现故障时,会选举出一个 Standby 状态的 Master 作为新的 Active 状态的 Master。
在 Master 的切换过程中影响:
- 不能够提交新的应用程序给集群, 因为只有 Active Master 才能接收新的程序提交请求
- 已经运行的程序中也不能因为 Action 操作触发新的 Job 提交请求。
集群 Worker, Driver 和 Application 的信息都已经通过持久化引擎持久化,因此在 Master 切换的过程中, 所有已经在运行的程序皆正常运行。因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源, 所以在运行时,Job 本身的调度和处理和 Master 是没有任何关系的。
二、LeaderElectionAgent
特质 LeaderElectionAgent 定义了对当前的 Master 进行跟踪和领导选举代理的通用接口。
2.1. 属性
2.1.1. masterInstance
masterInstance 属性类型是 LeaderElectable
LeaderElectable 接口定义了两个方法:
- $electedLeader()$: 被选举为领导。
- $revokedLeadership()$: 撤销领导关系。
LeaderElectable 目前只有 Master 这一个实现类
2.2. 实现
LeaderElectionAgent 一共有三种实现。由于 CustomLeaderElectionAgent 用于单元测试
2.2.1. ZooKeeperLeaderElectionAgent
ZooKeeperLeaderElectionAgent 是借助 ZooKeeper 实现的领导选举代理,ZooKeeper 自动化管理 Masters 的切换,保存整个 Spark 集群运行时的元数据, 包括 Workers、Drivers、Applications、Executors。
属性
WORKING_DIR: ZooKeeperLeaderElectionAgent 在 ZooKeeper 上的工作目录(可通过 spark.deploy.zooKeeper.dir 属性配置,默认为 spark)
zk: 连接 ZooKeeper 的客户端,类型为 CuratorFramework
leaderLatch: 使用 ZooKeeper 进行领导选举的客户端,类型为 LeaderLatch
status: 领导选举的状态,包括有领导 (LEADER) 和无领导 (NOT_LEADER)
方法
$start()$: 启动基于 ZooKeeper 的领导选举代理
ZooKeeperLeaderElectionAgent 实现了 LeaderLatchListener,发生领导选举后,LeaderLatch 会回调 ZooKeeperLeaderElectionAgent 的 $isLeader$ 或 $notLeader$ 方法。
1
2
3
4
5
6private def start() {
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
leaderLatch.addListener(this)
leaderLatch.start()
}$updateLeadershipStatus()$: 更新领导关系状态
1
2
3
4
5
6
7
8
9private def updateLeadershipStatus(isLeader: Boolean) {
if (isLeader && status == LeadershipStatus.NOT_LEADER) { // 选举为领导
status = LeadershipStatus.LEADER
masterInstance.electedLeader()
} else if (! isLeader && status == LeadershipStatus.LEADER) {// 撤销领导职务
status = LeadershipStatus.NOT_LEADER
masterInstance.revokedLeadership()
}
}如果 Master 节点之前不是领导(NOT_LEADER),当被选为领导 Leader 时,将其状态设置为 LEADER 并调用 $masterInstance.electedLeader()$ 方法将Master 节点设置为 Leader。
1
2
3override def electedLeader(){
self.send(ElectedLeader)
}$Master.receive()$ 方法中实现了对 ElectedLeader 消息的处理, Master 处理 ElectedLeader 消息的步骤如下:
从持久化引擎 PersistenceEngine 中读取出持久化的 ApplicationInfo、DriverInfo、WorkerInfo 等信息。
1
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
如果没有任何持久化信息,则将 Master 的当前状态设置为 ALIVE,否则将 Master 的当前状态设置为 RECOVERING
1
2
3
4
5state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}如果 Master 的当前状态为 RECOVERING, 则调用 $beginRecovery()$ 方法对整个集群的状态进行恢复。在集群状态恢复完成后,创建延时任务 recoveryCompletionTask, 在
WORKER_TIMEOUT_MS
指定的时间后向 Master 自身发送CompleteRecovery
消息。1
2
3
4
5
6
7
8if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, workerTimeoutMs, TimeUnit.MILLISECONDS)
}集群的状态恢复: $beginRecovery()$
恢复 Application
遍历从持久化引擎中读取 ApplicationInfo
调用 $registerApplication()$ 方法将 ApplicationInfo 添加到 apps、idToApp、endpointToApp、 addressToApp、waitingApps 等缓存中。
将 ApplicationInfo 的状态设置为 UNKNOWN
向提交应用程序的 Driver 发送
MasterChanged
消息(此消息将携带被选举为领导的 Master 和此 Master 的 masterWebUiUrl 属性)Driver 接收到
MasterChanged
消息后,将自身的 master 属性修改为当前 Master 的 RpcEndpointRef, 并将 alreadyDisconnected 设置为 false,最后向 Master 发送MasterChangeAcknowledged
消息。1
2
3
4case MasterChanged(masterRef, masterWebUiUrl) =>
master = Some(masterRef)
alreadyDisconnected = false
masterRef.send(MasterChangeAcknowledged(appId.get))Master 接收到
MasterChangeAcknowledged
消息后将 ApplicationInfo 的状态修改为 WAITING, 然后在不存在状态为 UNKNOWN 的 ApplicationInfo 和 WorkerInfo 时调用 $completeRecovery()$ 方法完成恢复。1
2
3
4
5
6
7
8case MasterChangeAcknowledged(appId) =>
idToApp.get(appId) match {
case Some(app) =>
app.state = ApplicationState.WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
}
if (canCompleteRecovery) { completeRecovery() }
恢复 Driver
遍历从持久化引擎中读取的 DriverInfo, 将每个 DriverInfo 添加到 drivers 缓存
1
2
3for (driver <- storedDrivers) {
drivers += driver
}恢复 Worker
遍历从持久化引擎中读取的 WorkerInfo
1
2
3
4
5
6
7
8
9for (worker <- storedWorkers) {
try {
registerWorker(worker)
worker.state = WorkerState.UNKNOWN
worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
} catch {
//...
}
}- 调用 $registerWorker()$ 方法将 WorkerInfo 添加到 workers、idToWorker、addressToWorker 等缓存中
- 将 WorkerInfo 的状态修改为 UNKNOWN
- 向 Worker 发送
MasterChanged
消息(此消息将携带被选举为领导的 Master 和此 Master 的 masterWebUiUrl 属性)
集群的状态恢复完成: $completeRecovery()$
$completeRecovery()$ 方法主要负责是删除那些不合格的 Application 与 Worker,主要调用了 $removeWorker()$ 与 $finashApplication()$ 两个方法。在删除之前首先将所有待恢复的 Woker 与 Application 的状态修改为
UNKOWN
。然后 Master 更新他们的状态。最后再去删除没有响应的组件。删除不合格 Application
1
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_, "Not responding for recovery"))
删除不合格 Worker
1
2apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)删除 driver
1
2
3
4
5
6
7drivers.filter(_.worker.isEmpty).foreach { d =>
if (d.desc.supervise) {
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.ERROR, None)
}
}重新调度 $schedule()$
如果 Master 节点之前 LEADER,新一轮选举当没有被选为 Leader 时,将其状态设置为
NOT_LEADER
并调用 $masterInstance.revokedLeadership()$ 方法撤销 Master 节点的领导关系。1
2
3case RevokedLeadership =>
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
$isLeader()$
ZooKeeperLeaderElectionAgent 所属的 Master 节点被选为 Leader,会调用 $ZooKeeperLeaderElectionAgent.isLeader()$ 方法
LeaderLatch 通过在 Master 上创建领导选举的 Znode 节点,并对 Znode 节点添加监视点来发现 Master 是否成功竞选
1
2
3
4
5
6
7
8override def isLeader(): Unit = {
synchronized {
if (!leaderLatch.hasLeadership) {
return
}
updateLeadershipStatus(true)
}
}$notLeader()$
ZooKeeperLeaderElectionAgent 所属的 Master 节点没有被选举为 Leader,会调用 $ZooKeeperLeaderElectionAgent.notLeader()$ 方法
1
2
3
4
5
6
7
8override def notLeader(): Unit = {
synchronized {
if (leaderLatch.hasLeadership) {
return
}
updateLeadershipStatus(false)
}
}
2.2.2. MonarchyLeaderAgent
MonarchyLeaderAgent 在构造时会调用 masterInstance 的 electedLeader 方法选举领导
2.2.3. CustomLeaderElectionAgent
用于单元测试