Spark-源码学习-集群启动-standalone-worker-注册机制
一、概述
Worker 在启动后, 需要加入到 Master 管理的整个集群中, 以参与 Driver、Executor 的资源调度。Worker 要加入 Master 管理的集群, 就必须将 Worker 注册到 Master。在启动 Worker 的过程中需要调用 registerWithMaster 方法向 Master 注册 Worker
二、源码
2.1. 向所有 Master 注册当前 Worker
$tryRegisterAllMasters()$ 方法负责向所有的 Master 注册当前 Worker~
![](https://note3.oss-cn-hangzhou.aliyuncs.com/source/image-20230924230336788.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_130)
只有处于领导状态的 Master 来处理 Worker 的注册。
$tryRegisterAllMasters()$ 方法遍历 masterRpcAddresses 中的每个 Master 的 RPC 地址,然后向 registerMasterThreadPool 提交向 Master 注册 Worker 的任务。主要向 Master 发送 RegisterWorker
消息,并对返回的结果使用 handleRegisterResponse 方法处理。
1 | private def tryRegisterAllMasters(): Array[JFuture[_]] = { |
RegisterWorker
消息携带 Worker 的 ID、host、port、内核数、内存大小等信息~
1 | case class RegisterWorker( |
2.1.1. master 接收 RegisterWorker 消息
2.1.2. worker 接收 RegisterWorkerResponse 消息
$sendRegisterMessageToMaster()$ 向特定 Master 的 RPC 通信终端发送消息 RegisterWorker
,注册成功后,Worker 接收 RegisterWorkerResponse
,进一步调用 $handleRegisterResponse()$ 方法进行处理。
1 | case msg: RegisterWorkerResponse => handleRegisterResponse(msg) |
![](https://note3.oss-cn-hangzhou.aliyuncs.com/source/image-20230924232924464.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,text_d3d3Lnp4Y2hvbWUuY29tCg==,size_36,g_center,color_FFFFFF,shadow_100,t_100,g_se,x_30,y_230)
注册成功
获取 preferredMasterAddress
1
2
3
4
5val preferredMasterAddress = if (preferConfiguredMasterAddress) {
masterAddress.toSparkURL
} else {
masterRef.address.toSparkURL
}$changeMaster()$
修改激活的 Master 的信息。$changeMaster()$ 方法还调用 $cancelLastRegistrationRetry()$ 方法取消注册尝试
即取消了调用 $tryRegisterAllMasters()$ 方法产生的注册任务和 registrationRetryTimer
心跳
$handleRegisterResponse()$ 方法启动以
HEARTBEAT_MILLIS
作为间隔向 Worker 自身发送SendHeartbeat
消息的定时任务1
2
3forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)工作目录的定期清理
启动工作目录的定期清理调度器, 默认情况下,该配置的属性为 False,需要手动设置, 对应属性名为
spark-worker-cleanup-enabled
1
2
3
4
5if (CLEANUP_ENABLED) {
forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}向 master 发送当前状态信息
向 Master 发送
WorkerLatestState
消息此消息携带 Worker 的身份标识、Worker 节点的所有 Executor 的描述信息、调度到当前 Worker 的所有 Driver 的身份标识
1
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
注册失败
处理 RegisterWorkerFailed 消息
1
2
3
4
5case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
System.exit(1)
}处理 MasterInStandby 消息
Master 处于 Standby 的状态,并不是领导身份,Worker 不作任何处理。
2.2. 注册重试
$registerWithMaster()$ 创建定时任务,按照常量 INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS
的时间间隔向 Worker 自身发送 ReregisterwithMaster
消息。
1 | registrationRetryTimer = Some(forwardMessageScheduler.scheduleAtFixedRate( |
Worker 接收 ReregisterWithMaster
消息,并调用 $reregisterWithMaster()$ 方法处理该消息。
1 | case ReregisterWithMaster => reregisterWithMaster() |
$reregisterWithMaster()$ 方法用于在 Worker 的 registered
为 false 时,重新向 Master 注册,由于其实现与 $registerWithMaster()$ 方法很多地方都是相似哒~