Spark-源码学习-集群启动-standalone-clustermanager
一、概述
Cluster Manager 在 Standalone 部署模式下为 Master。Master 的设计将直接决定整个集群的可扩展性、可用性和容错性。
二、master 实现
2.1. 属性
address: RpcEnv 的地址(即 RpcAddress)。RpcAddress 只包含 host 和 port 两个属性,用来记录 Master URL 的 host 和 port
webUiPort: 参数要指定的 WebUI 的端口
securityMgr: 即 SecurityManager
conf: 即 SparkConf
forwardMessageThread: 包含一个线程的 ScheduledThreadPoolExecutor,启动的线程以
master-forward-message-thread
作为名称。forwardMessageThread主要用于运行 checkForWorkerTimeOutTask 和 recoveryCompletionTask。checkForWorkerTimeOutTask: 检查 Worker 超时的任务。
1
2
3
4private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
0, workerTimeoutMs, TimeUnit.MILLISECONDS)recoveryCompletionTask: 当 Master 被选举为领导后,用于集群状态恢复的任务。
1
2
3
4
5
6
7
8if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, workerTimeoutMs, TimeUnit.MILLISECONDS)
}WORKER_TIMEOUT_MS: Worker 的超时时间。可通过 spark.worker.timeout 属性配置,默认为 60s
workers: 所有注册到 Master 的 Worker 信息(WorkerInfo)的集合
idToWorker: Worker id 与 WorkerInfo 的映射关系
1
private val idToWorker = new HashMap[String, WorkerInfo]
addressToWorker: Worker 的 RpcEnv 的地址(RpcAddress) 与 WorkerInfo 的映射关系
endpointToApp: RpcEndpointRef 与 ApplicationInfo 的映射关系
addressToApp: Application 对应 Driver 的 RpcEnv 的地址(RpcAddress) 与 ApplicationInfo 的映射关系。
completedApps: 已经完成的 ApplicationInfo 的集合。
RETAINED_APPLICATIONS: completedApps 中最多可以保留的 ApplicationInfo 的数量的限制大小。
当 completedApps 中的 ApplicationInfo 数量大于等于 RETAINED_APPLICATIONS 时,需要对 completedApps 中的部分 ApplicationInfo 进行清除。可通过 spark.deploy.retainedApplications 属性配置,默认为200。
nextAppNumber: 下一个 Application 的号码。
nextAppNumber 将参与到 Application ID 的生成规则中。
drivers: 所有 Driver 信息(DriverInfo)的集合。
completedDrivers: 已经完成的 DriverInfo 的集合。
reaperIterations: 从 workers 中移除处于死亡(DEAD) 状态的 Worker 所对应的 WorkerInfo 的权重。可通过 spark.dead.worker.persistence 属性配置,默认为 15。
1
private val reaperIterations = conf.get(REAPER_ITERATIONS)
nextDriverNumber: 下一个 Driver 的号码
1
2
3
4
5private def newDriverId(submitDate: Date): String = {
val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
nextDriverNumber += 1
appId
}persistenceEngine: 持久化引擎
leaderElectionAgent: 领导选举代理
spreadOutApps: 是否允许 Application 能够在所有节点间调度,避免 Application 总是固定在一小群节点上执行。可通过 spark.deploy.spreadOut 属性配置,默认为 true
defaultCores: 应用程序默认的最大内核数。可通过 spark.deploy.defaultCores 属性配置,默认为 java.lang.Integer.MAX_VALUE。
reverseProxy: SparkUI 是否采用反向代理。可通过 spark.ui.reverseProxy 属性配置,默认为 false。
restServerEnabled: 是否提供 REST 服务以提交应用程序
三、启动~
3.1. 启动脚本
Spark 中各个组件是通过脚本来启动部署的,每个组件对应提供了启动的脚本,同时也会提供停止的脚本~
3.2. Master#main()
来到 Master 伴生对象中的 $main()$ 方法~
1 | def main(argStrings: Array[String]): Unit = { |
Master 类的入口点处包含了对应的参数类 MasterArguments。MasterArguments 类包括 Spark 属性配置相关的一些解析。解析完 Master 的参数后,调用 $startRpcEnvAndEndpoint()$ 方法启动 RPC 通信环境以及 Master 的 RPC 通信终端~
3.2.1.创建 SecurityManager
SecurityManager 主要对账号、权限及身份认证进行设置和管理。
3.2.2. 创建 RpcEnv
3.2.3. 注册 master 通信终端
创建 Master, 并且将 Master 注册到刚创建的 RpcEnv 中,并获得 Master 的 RpcEndpointRef
Master 继承了 ThreadSafeRpcEndpoint 和 LeaderElectable,其中继承 LeaderElectable 涉及 Master 的高可用性 HA 机制。继承 ThreadSafeRpcEndpoint 类后,Master 作为一个 RpoEndpoint,实例化后首先会调用 $onStart()$ 方法。
来到 $onStart()$ ~
创建 Master WebUI
构建 Master Web UI, 查看向 Master 提交的应用程序等信息
1
2
3
4
5
6
7
8
9
10
11webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = webUi.webUrl
if (reverseProxy) {
val uiReverseProxyUrl = conf.get(UI_REVERSE_PROXY_URL).map(_.stripSuffix("/"))
if (uiReverseProxyUrl.nonEmpty) {
System.setProperty("spark.ui.proxyBase", uiReverseProxyUrl.get)
masterWebUiUrl = uiReverseProxyUrl.get + "/"
}
webUi.addProxy()
}Master WebUI 的
spark.ui.killEnabled
设置为 True,可以通过 WebUI 页面把 Spark 的进程杀掉。Master WebUI 中在初始化时用 new() 函数创建 MasterPage, 在 MasterPage 中通过代码去写 Web 页面。在 Master WebUI 的 initialize() 方法, 调用 attachPage() 方法, 在 WebUI 中增加 Web 页面。
1
2attachPage(new ApplicationPage(this))
attachPage(masterPage)- 创建 Master WebUI, 并绑定端口。
- 拼接 Master WebUI 的 URL, 即 masterWebUiUrl
- 如果启用了 Spark UI 的反向代理,那么将 masterWebUiUrl 设置为从 spark.ui.reverseProxyUrl 属性获得的反向代理的 URL
引用本站文章Spark-源码学习-WebUI-架构设计Joker启动 Worker 超时检查任务
在一个守护线程中, 启动定时任务, 周期性检查 Worker 是否超时,当 Worker 节点超时后, 会修改其状态或从 Master 中移除其相关的操作。
1
2
3checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
0, workerTimeoutMs, TimeUnit.MILLISECONDS)定时任务 $checkForWorkerTimeOutTask()$ 以
WORKER_TIMEOUT_MS
为时间间隔,通过不断向 Master 自身发送 CheckForWorkerTimeOut 消息实现对 Worker 的超时检查。 Master 调用 $timeOutDeadworkers()$ 方法处理 CheckForworkerTimeout 消息。过滤超时 Worker
过滤当前时间与
WORKER_TIMEOUT_MS
之差大于 WorkerInfo 的 lastHeartbeat 的 Worker 节点。超时 Worker 处理:
如果 WorkerInfo 的状态不是 DEAD, 则调用 $removeWorker()$ 方法移除 Worker 的相关信息。
如果 WorkerInfo 的状态是 DEAD,则等待足够长的时间后将它从 workers 列表中移除。
足够长的时间的计算公式为: REAPER_ITERATIONS 与 1 的和再乘以 WORKER_TIMEOUT_MS
创建并启动 REST 服务
默认情况下会启动 REST 服务.可以通过该服务向 Master 提交各种请求
1
2
3
4if (restServerEnabled) {
val port = conf.get(MASTER_REST_SERVER_PORT)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}如果需要提供 REST 服务,那么创建并启动 StandaloneRestServer。StandaloneRestServer 的端口可通过 spark.master.rest.port 属性配置,默认为 6066
Livy 是一个提供 Rest 接口和 spark 集群交互的服务。它可以提交 Spark Job 或者 Spark 一段代码,同步或者异步的返回结果;也提供 Sparkcontext 的管理,通过 Restful 接口或 RPC 客户端库。Livy 也简化了与 Spark 与应用服务的交互,这允许通过web 与 Spark 的使用交互。
引用站外地址,不保证站点的可用性和安全性Spark 开源 REST 服务——Apache Livy(Spark 客户端)Joker启动度量系统
度量系统启动后, 将主程序和应用程序度量 handler 处理程序附加到 WebUI 中
引用本站文章Spark-源码系列-SparkCore-度量系统JokerMaster HA
Master 根据
RECOVERY_MODE
创建持久化引擎和领导选举代理。Master 基于高可用性的考虑, 可以同时启动多个 Master。这些 Master 中只有一个是 Active 态,其余的都是 Standby 状态。Master 实现了 LeaderElectable 接口, 当被选举为领导时领导选举代理 LeaderElectionAgent 将会调用 Master 的 $electeaLeader()$ 方法。
引用本站文章Spark-源码学习-容错处理-组件-masterJoker
2.2.4. 发送 BoundPortsResponse
Master 自己发送一个消息 BoundPortsRequest 给自己,确保 masterEndpoint 正常启动起来。返回 BoundPortsResponse