一、概述

Worker 在启动后, 需要加入到 Master 管理的整个集群中, 以参与 Driver、Executor 的资源调度。Worker 要加入 Master 管理的集群, 就必须将 Worker 注册到 Master。在启动 Worker 的过程中需要调用 registerWithMaster 方法向 Master 注册 Worker

二、源码

2.1. 向所有 Master 注册当前 Worker

$tryRegisterAllMasters()$ 方法负责向所有的 Master 注册当前 Worker~

只有处于领导状态的 Master 来处理 Worker 的注册。

$tryRegisterAllMasters()$ 方法遍历 masterRpcAddresses 中的每个 Master 的 RPC 地址,然后向 registerMasterThreadPool 提交向 Master 注册 Worker 的任务。主要向 Master 发送 RegisterWorker 消息,并对返回的结果使用 handleRegisterResponse 方法处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
masterRpcAddresses.map { masterAddress =>
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
sendRegisterMessageToMaster(masterEndpoint)
} catch {
//...
}
}
})
}
}

RegisterWorker 消息携带 Worker 的 ID、host、port、内核数、内存大小等信息~

1
2
3
4
5
6
7
8
9
10
11
12
13
case class RegisterWorker(
id: String,
host: String,
port: Int,
worker: RpcEndpointRef,
cores: Int,
memory: Int,
workerWebUiUrl: String,
masterAddress: RpcAddress,
resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage {
Utils.checkHost(host)
assert (port > 0)
}

2.1.1. master 接收 RegisterWorker 消息

2.1.2. worker 接收 RegisterWorkerResponse 消息

$sendRegisterMessageToMaster()$ 向特定 Master 的 RPC 通信终端发送消息 RegisterWorker,注册成功后,Worker 接收 RegisterWorkerResponse ,进一步调用 $handleRegisterResponse()$ 方法进行处理。

1
case msg: RegisterWorkerResponse => handleRegisterResponse(msg)
  1. 注册成功

    1. 获取 preferredMasterAddress

      1
      2
      3
      4
      5
      val preferredMasterAddress = if (preferConfiguredMasterAddress) {
      masterAddress.toSparkURL
      } else {
      masterRef.address.toSparkURL
      }
    2. $changeMaster()$

      修改激活的 Master 的信息。$changeMaster()$ 方法还调用 $cancelLastRegistrationRetry()$ 方法取消注册尝试

      即取消了调用 $tryRegisterAllMasters()$ 方法产生的注册任务和 registrationRetryTimer

    3. 心跳

      $handleRegisterResponse()$ 方法启动以 HEARTBEAT_MILLIS 作为间隔向 Worker 自身发送 SendHeartbeat 消息的定时任务

      1
      2
      3
      forwardMessageScheduler.scheduleAtFixedRate(
      () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
      0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
    4. 工作目录的定期清理

      启动工作目录的定期清理调度器, 默认情况下,该配置的属性为 False,需要手动设置, 对应属性名为 spark-worker-cleanup-enabled

      1
      2
      3
      4
      5
      if (CLEANUP_ENABLED) {
      forwardMessageScheduler.scheduleAtFixedRate(
      () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
      CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
    5. 向 master 发送当前状态信息

      向 Master 发送 WorkerLatestState 消息

      此消息携带 Worker 的身份标识、Worker 节点的所有 Executor 的描述信息、调度到当前 Worker 的所有 Driver 的身份标识

      1
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
  2. 注册失败

    处理 RegisterWorkerFailed 消息

    1
    2
    3
    4
    5
    case RegisterWorkerFailed(message) =>
    if (!registered) {
    logError("Worker registration failed: " + message)
    System.exit(1)
    }
  3. 处理 MasterInStandby 消息

    Master 处于 Standby 的状态,并不是领导身份,Worker 不作任何处理。

2.2. 注册重试

$registerWithMaster()$ 创建定时任务,按照常量 INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS 的时间间隔向 Worker 自身发送 ReregisterwithMaster 消息。

1
2
3
4
5
registrationRetryTimer = Some(forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) },
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))

Worker 接收 ReregisterWithMaster 消息,并调用 $reregisterWithMaster()$ 方法处理该消息。

1
case ReregisterWithMaster => reregisterWithMaster()

$reregisterWithMaster()$ 方法用于在 Worker 的 registered 为 false 时,重新向 Master 注册,由于其实现与 $registerWithMaster()$ 方法很多地方都是相似哒~