一、概述

Cluster Manager 在 Standalone 部署模式下为 Master。Master 的设计将直接决定整个集群的可扩展性、可用性和容错性。

二、master 实现

2.1. 属性

  1. address: RpcEnv 的地址(即 RpcAddress)。RpcAddress 只包含 host 和 port 两个属性,用来记录 Master URL 的 host 和 port

  2. webUiPort: 参数要指定的 WebUI 的端口

  3. securityMgr: 即 SecurityManager

  4. conf: 即 SparkConf

  5. forwardMessageThread: 包含一个线程的 ScheduledThreadPoolExecutor,启动的线程以 master-forward-message-thread 作为名称。forwardMessageThread主要用于运行 checkForWorkerTimeOutTask 和 recoveryCompletionTask。

  6. checkForWorkerTimeOutTask: 检查 Worker 超时的任务。

    1
    2
    3
    4
    private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
    () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
    0, workerTimeoutMs, TimeUnit.MILLISECONDS)
  7. recoveryCompletionTask: 当 Master 被选举为领导后,用于集群状态恢复的任务。

    1
    2
    3
    4
    5
    6
    7
    8
    if (state == RecoveryState.RECOVERING) {
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(CompleteRecovery)
    }
    }, workerTimeoutMs, TimeUnit.MILLISECONDS)
    }
  8. WORKER_TIMEOUT_MS: Worker 的超时时间。可通过 spark.worker.timeout 属性配置,默认为 60s

  9. workers: 所有注册到 Master 的 Worker 信息(WorkerInfo)的集合

  10. idToWorker: Worker id 与 WorkerInfo 的映射关系

    1
    private val idToWorker = new HashMap[String, WorkerInfo]
  11. addressToWorker: Worker 的 RpcEnv 的地址(RpcAddress) 与 WorkerInfo 的映射关系

  12. endpointToApp: RpcEndpointRef 与 ApplicationInfo 的映射关系

  13. addressToApp: Application 对应 Driver 的 RpcEnv 的地址(RpcAddress) 与 ApplicationInfo 的映射关系。

  14. completedApps: 已经完成的 ApplicationInfo 的集合。

  15. RETAINED_APPLICATIONS: completedApps 中最多可以保留的 ApplicationInfo 的数量的限制大小。

    当 completedApps 中的 ApplicationInfo 数量大于等于 RETAINED_APPLICATIONS 时,需要对 completedApps 中的部分 ApplicationInfo 进行清除。可通过 spark.deploy.retainedApplications 属性配置,默认为200。

  16. nextAppNumber: 下一个 Application 的号码。

    nextAppNumber 将参与到 Application ID 的生成规则中。

  17. drivers: 所有 Driver 信息(DriverInfo)的集合。

  18. completedDrivers: 已经完成的 DriverInfo 的集合。

  19. reaperIterations: 从 workers 中移除处于死亡(DEAD) 状态的 Worker 所对应的 WorkerInfo 的权重。可通过 spark.dead.worker.persistence 属性配置,默认为 15。

    1
    private val reaperIterations = conf.get(REAPER_ITERATIONS)
  20. nextDriverNumber: 下一个 Driver 的号码

    1
    2
    3
    4
    5
    private def newDriverId(submitDate: Date): String = {
    val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
    nextDriverNumber += 1
    appId
    }
  21. persistenceEngine: 持久化引擎

  22. leaderElectionAgent: 领导选举代理

  23. spreadOutApps: 是否允许 Application 能够在所有节点间调度,避免 Application 总是固定在一小群节点上执行。可通过 spark.deploy.spreadOut 属性配置,默认为 true

  24. defaultCores: 应用程序默认的最大内核数。可通过 spark.deploy.defaultCores 属性配置,默认为 java.lang.Integer.MAX_VALUE。

  25. reverseProxy: SparkUI 是否采用反向代理。可通过 spark.ui.reverseProxy 属性配置,默认为 false。

  26. restServerEnabled: 是否提供 REST 服务以提交应用程序

三、启动~

3.1. 启动脚本

Spark 中各个组件是通过脚本来启动部署的,每个组件对应提供了启动的脚本,同时也会提供停止的脚本~

3.2. Master#main()

来到 Master 伴生对象中的 $main()$ 方法~

1
2
3
4
5
6
7
8
9
def main(argStrings: Array[String]): Unit = {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}

Master 类的入口点处包含了对应的参数类 MasterArguments。MasterArguments 类包括 Spark 属性配置相关的一些解析。解析完 Master 的参数后,调用 $startRpcEnvAndEndpoint()$ 方法启动 RPC 通信环境以及 Master 的 RPC 通信终端~

3.2.1.创建 SecurityManager

SecurityManager 主要对账号、权限及身份认证进行设置和管理。

3.2.2. 创建 RpcEnv

3.2.3. 注册 master 通信终端

创建 Master, 并且将 Master 注册到刚创建的 RpcEnv 中,并获得 Master 的 RpcEndpointRef

Master 继承了 ThreadSafeRpcEndpoint 和 LeaderElectable,其中继承 LeaderElectable 涉及 Master 的高可用性 HA 机制。继承 ThreadSafeRpcEndpoint 类后,Master 作为一个 RpoEndpoint,实例化后首先会调用 $onStart()$ 方法。

来到 $onStart()$ ~

  1. 创建 Master WebUI

    构建 Master Web UI, 查看向 Master 提交的应用程序等信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    webUi = new MasterWebUI(this, webUiPort)
    webUi.bind()
    masterWebUiUrl = webUi.webUrl
    if (reverseProxy) {
    val uiReverseProxyUrl = conf.get(UI_REVERSE_PROXY_URL).map(_.stripSuffix("/"))
    if (uiReverseProxyUrl.nonEmpty) {
    System.setProperty("spark.ui.proxyBase", uiReverseProxyUrl.get)
    masterWebUiUrl = uiReverseProxyUrl.get + "/"
    }
    webUi.addProxy()
    }

    Master WebUI 的 spark.ui.killEnabled 设置为 True,可以通过 WebUI 页面把 Spark 的进程杀掉。

    Master WebUI 中在初始化时用 new() 函数创建 MasterPage, 在 MasterPage 中通过代码去写 Web 页面。在 Master WebUI 的 initialize() 方法, 调用 attachPage() 方法, 在 WebUI 中增加 Web 页面。

    1
    2
    attachPage(new ApplicationPage(this))
    attachPage(masterPage)
    1. 创建 Master WebUI, 并绑定端口。
    2. 拼接 Master WebUI 的 URL, 即 masterWebUiUrl
    3. 如果启用了 Spark UI 的反向代理,那么将 masterWebUiUrl 设置为从 spark.ui.reverseProxyUrl 属性获得的反向代理的 URL
  2. 启动 Worker 超时检查任务

    在一个守护线程中, 启动定时任务, 周期性检查 Worker 是否超时,当 Worker 节点超时后, 会修改其状态或从 Master 中移除其相关的操作。

    1
    2
    3
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
    () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
    0, workerTimeoutMs, TimeUnit.MILLISECONDS)

    定时任务 $checkForWorkerTimeOutTask()$ 以 WORKER_TIMEOUT_MS 为时间间隔,通过不断向 Master 自身发送 CheckForWorkerTimeOut 消息实现对 Worker 的超时检查。 Master 调用 $timeOutDeadworkers()$ 方法处理 CheckForworkerTimeout 消息。

    • 过滤超时 Worker

      过滤当前时间与 WORKER_TIMEOUT_MS 之差大于 WorkerInfo 的 lastHeartbeat 的 Worker 节点。

    • 超时 Worker 处理:

      • 如果 WorkerInfo 的状态不是 DEAD, 则调用 $removeWorker()$ 方法移除 Worker 的相关信息。

      • 如果 WorkerInfo 的状态是 DEAD,则等待足够长的时间后将它从 workers 列表中移除。

        足够长的时间的计算公式为: REAPER_ITERATIONS 与 1 的和再乘以 WORKER_TIMEOUT_MS

  3. 创建并启动 REST 服务

    默认情况下会启动 REST 服务.可以通过该服务向 Master 提交各种请求

    1
    2
    3
    4
    if (restServerEnabled) {
    val port = conf.get(MASTER_REST_SERVER_PORT)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
    }

    如果需要提供 REST 服务,那么创建并启动 StandaloneRestServer。StandaloneRestServer 的端口可通过 spark.master.rest.port 属性配置,默认为 6066

    Livy 是一个提供 Rest 接口和 spark 集群交互的服务。它可以提交 Spark Job 或者 Spark 一段代码,同步或者异步的返回结果;也提供 Sparkcontext 的管理,通过 Restful 接口或 RPC 客户端库。Livy 也简化了与 Spark 与应用服务的交互,这允许通过web 与 Spark 的使用交互。

  4. 启动度量系统

    度量系统启动后, 将主程序和应用程序度量 handler 处理程序附加到 WebUI 中

  5. Master HA

    Master 根据 RECOVERY_MODE 创建持久化引擎和领导选举代理。

    Master 基于高可用性的考虑, 可以同时启动多个 Master。这些 Master 中只有一个是 Active 态,其余的都是 Standby 状态。Master 实现了 LeaderElectable 接口, 当被选举为领导时领导选举代理 LeaderElectionAgent 将会调用 Master 的 $electeaLeader()$ 方法。

2.2.4. 发送 BoundPortsResponse

Master 自己发送一个消息 BoundPortsRequest 给自己,确保 masterEndpoint 正常启动起来。返回 BoundPortsResponse