Spark-源码学习-SparkCore-调度机制-Master 资源调度
一、概述
$Master.schedule()$ 为处于待分配资源的 Application 分配资源,调度当前 workers 可用的资源。在每次有新的 Application 加入或者新的资源加入时都会调用 $schedule()$ 进行调度。
$Master.schedule()$ 方法将在一个新的 Application 被提交,或者可用的 resource 变化的时候被调用。
二、设计
2.1. 判断 Master 状态
首先判断 Master 的状态不是 ALIVE
的时候,则直接 return
1 | if (state != RecoveryState.ALIVE) { |
2.2. 选择 Worker
对处于 ALIVE 状态的 Workers 进行 Shuffle
1 | val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) |
2.3. 在集群 Worker 上启动 Driver
由于 waitingDrivers 不为空,则会走 LaunchDriver 的流程
Master 在接受到
RequestSubmitDriver
后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中
1 | for (driver <- waitingDrivers.toList) { |
2.3.1. Driver 启动资源判断
判断 worker 的内存、cpu、resource 三个指标是否同时符合 driver 的启动要求
- Worker空余的内存 >= driver 所需要的内存
- Worker 空余的 CPU 核数 >= driver 所需要的 CPU 核数
1 | private def canLaunch(worker: WorkerInfo, memoryReq: Int, coresReq: Int, resourceRequirements: Seq[ResourceRequirement]): Boolean = { |
2.3.2. 启动 Driver
Driver 启动逻辑主要是将 worker 和 driver 相互引用,更新 worker 的资源使用情况,并向对应的 worker 节点发送 LaunchDriver
事件
worker 添加 driver
1
2worker.addDriver(driver)
driver.worker = Some(worker)向 worker 发送
LaunchDriver
消息当前的 application 申请资源,向 worker 发送消息,触发 Worker 的 $receive()$ 方法。
$Worker.receive()$ 方法中,当 Worker 遇到
LaunchDriver
指令时,创建并启动一个 DriverRunner,DriverRunner 启动一个线程,异步的处理 Driver 启动工作。1
worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
worker 接收到 master 发送来的
LanuchDriver
事件,创建 DriverRunner 对象(内部封装了一个线程)DriverRunner 中是开辟线程异步的处理 Driver 启动工作,不会阻塞主进程的执行
1
2
3
4
5
6
7
8
9
10
11
12val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
workerWebUiUrl,
securityMgr,
resources_
)将 DriverRunner 对象添加进该 Worker 内部的 driver 列表
1
val drivers = new HashMap[String, DriverRunner]
调用 $DriverRunner.start()$ 方法,同时修改该 worker 进程的可用 cores 个数,内存个数
在 $start()$ 方法中,直接启动了一个 Thread,在线程处理中,调用了 $prepareAndRunDriver()$
$prepareAndRunDriver()$
创建 driver 工作目录
从 master 中下载要执行的 jar 包(移动计算、不移动数据)
如果是 java 编写的 spark 程序,使用 maven 打的包
构建 ProcessBuilder
启动 ProcessBuilder(使用 java 的 API 去操作一个 java 的进程 Process Runtime)
返回退出码 exitCode
根据 exitCode,设置 finalState,状态分为完成,被杀死,失败“finalState”
向 worker 发送
DriverStateChanged
信息通知 Driver 发生状态改变向 worker 发送
DriverStateChanged
消息Driver 在此启动起来,向 worker 发送
DriverStateChanged
(driverId, state, finalException)事件通知,worker 接收到后,向 master 发送相同的DriverStateChanged(driverId, state, finalException) 事件通知,如果 Driver 的状态是错误、完成、杀死、失败,就移除 Driver
Driver 用于执行 Spark 用户任务中的 $main$ 方法,负责实际代码的执行工作,触发整个任务执行,Master 通知 Worker 启动 Driver,就是启动了一个 JVM,并且开始使用反射的方式执行 DriverWrapper 的 $main$ 方法。在 $DriverWrapper.main()$ 方法中,会执行用户提交的 jar 包中的 $main$ 方法。
初始化 SparkContext
初始化 DAGScheduler
DAGScheduler 的主要作用是: 当 driver 执行 action 算子的时候,DAGScheduler 会从最后一个 rdd 开始往前递归寻找 ShuffleDependency,每找到一个就会划分一个 stage,最终达到的目的是,根据业务逻辑算子依赖关系,划分成一个个 stage,并且根据 stage 先后关系,把 stage 转化成 ShuffleMapTask 或者 ResultTask,封装成 TaskSet,交给 TaskScheduler 提交。
初始化 TaskScheduler
TaskScheduler 是一个 trait,它的核心任务是提交 TaskSet 到集群并汇报结果,并且向 DAGScheduler 汇报任务的执行情况。
初始化 SchedulerBackend
SchedulerBackend 也是一个 trait,不同的部署模式有不同的实现,比如在 standalone 模式下,实现类是:StandaloneSchedulerBackend。
它内部有两个 Endpoint 组件,一个是 DriverEndpoint ,用来和 Worker 打交道,比如创建 Executor、提交 Task 等;一个是 ClientEndpoint 用来和 master 打交道。
SchedulerBackend 专门负责收集 Worker 上的资源信息,它知道每个任务在提交时,自己拥有多少资源,然后去具体运行 Task。
2.4. 在集群 Worker 上启动 Executor
$startExecutorsOnWorkers()$ 方法首先会从 waitingApps 中获取提交的 app,然后会对每个 worker 提供多少 cpu 和启动多少 Executor 做计算。
1 | private def startExecutorsOnWorkers(): Unit = { |
2.4.1. 资源判断
当 application 剩余需要的 cpu 数大于单个 executor 需要的 cpu 数据才进行新 executor 的创建
如果 application 的剩余需求 cpu 小于单 executor 需要的 cpu 数,要么是已经存在了运行的 executor,要么就是配置有问题,正常情况下很少出现 application 需求 cpu 数小于 executor 创建需求 cpu 数的情况。
2.4.2. 过滤出含有足够的资源启动 Executors 的 Workers
1 | val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) |
Worker 的状态是 ALIVE
$canLaunchExecutor$
- Worker 的空余内存 >= Application 启动 Executor 所需要的内存
- Worker 的空余核数 >= Application 启动 Executor 所需要的核数
根据 Worker 空余的核数倒叙排序
2.4.3. 计算每个 worker 要分配出去的 CPU 数: scheduleExecutorsOnWorkers()
计算 worker 分配 cpu 数的算法有两种,一种是 spreadOutApps(默认):遍历 worker,一次只在 worker 上分配一个 executor,另一种是非 spreadOutApps:尽量在一个 worker 启动所有 executor,直到分配结束或者资源不够。
变量
Executor CPU
1
2
3
4val coresPerExecutor = app.desc.coresPerExecutor // 配置中每个 Executor 的 CPU
val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // 每个 Executor 的最小 CPU 数(默认为 1 核)
// // 判断配置中的每个 Executor 的 CPU 是否为空(如果为空,则表示每个 Executor 只用分配一个 CPU)
val oneExecutorPerWorker = coresPerExecutor.isEmptyExecutor 内存
1
val memoryPerExecutor = resourceDesc.memoryMbPerExecutor
可用 Worker 的数量
1
val numUsable = usableWorkers.length
每个 Worker 分配的 CPU 的集合
创建一个空数组,存储了每个 worker 要分配出去的 CPU 数量
1
val assignedCores = new Array[Int](numUsable)
每个 Worker 上分配的 Executor 个数的集合
1
val assignedExecutors = new Array[Int](numUsable)
可以/需要分配的核数(取 Application 所需要的核数与所有 Worker 空余的核数总和的最小值)
1
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
获取可以符合 executor 构建条件的 workers
1
var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
判断指定的 worker 是否可以为这个 Application 启动一个 Executor: $canLaunchExecutor()$
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29def canLaunchExecutorForApp(pos: Int): Boolean = {
val keepScheduling = coresToAssign >= minCoresPerExecutor
// 判断是否有足够的核数
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
val assignedExecutorNum = assignedExecutors(pos)
// 如果在当前 Worker 上没有启动 Executor,或者一个 Executor 上需要启动多个 cores
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
if (launchingNewExecutor) {
// 已经分配的 memory
val assignedMemory = assignedExecutorNum * memoryPerExecutor
// 判断是否有足够的 memory: 当前 worker 空余 memory - 当前 worker 已经分配的 memory >= 一个 Executor 所需要的最小 memory
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val assignedResources = resourceReqsPerExecutor.map {
req => req.resourceName -> req.amount * assignedExecutorNum
}.toMap
val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
}
val enoughResources = ResourceUtils.resourcesMeetRequirements(
resourcesFree, resourceReqsPerExecutor)
val executorNum = app.getOrUpdateExecutorsForRPId(rpId).size
val executorLimit = app.getTargetExecutorNumForRPId(rpId)
val underLimit = assignedExecutors.sum + executorNum < executorLimit
keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
} else {
keepScheduling && enoughCores
}
}
worker 分配 CPU
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// 一直提交 Executor,直到没有可用的 Worker 或者是到达了 Application 所需要的的 Executor 的上限
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutorForApp(pos)) {
// 更新待分配 CPU 核数和已分配 CPU 核数
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor
// 如果每个 Worker 只启动一个 Executor ,每一次循环给这个 Executor 分配一个 core 否则。每一次循环给一个新的 Executor 增加一个 core
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// spreadOutApps >>> 尽量分配 Executors 到最多的 Worker 上;
// 非 spreadOutApps >>> 紧着一个 Worker 分配 Executors,直到这个 Worker 的资源被用尽。
if (spreadOutApps) {
keepScheduling = false
}
}
}
// 每一次循环,过滤出来可以提交至少一个 Executor 的 workers
freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
}
2.4.4. 向 worker 发送对应的 LaunchExecutor 事件
通过上一步计算出了每个 worker 要分配出去的 cpu 数,$allocateWorkerResourceToExecutors()$ 遍历可用 worker,如果该 worker 需要分配的 cpu 数大于 0,则向该 worker 发送对应的 LaunchExecutor
事件
计算 worker 运行 executor 数目
根据当前 worker 要分配的 cpu 总核数和每个 executor 需要的 cpu 数,计算出当前 worker 要分配的 executor 数
1
2val coresPerExecutor = resourceDesc.coresPerExecutor
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)获取每个 executor 需要的 cpu 数
1
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
创建指定数目的 executor
1
2
3
4
5
6
7
8
9for (i <- 1 to numExecutors) {
// //从 worker 获取指定资源
val allocated = worker.acquireResources(resourceDesc.customResourcesPerExecutor)
// 创建 ExecutorDesc 对象,并将其记录到 application 中
val exec = app.addExecutor(worker, coresToAssign, resourceDesc.memoryMbPerExecutor, allocated, rpId)
// 发起 executor 的创建
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}Worker 添加 executor
将 executor 加入 worker 内部的缓存
1
worker.addExecutor(exec)
向 Worker 发送
LaunchExecutor
消息,通知 Worker 节点启动 executor1
2worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
exec.rpId, exec.application.desc, exec.cores, exec.memory, exec.resources))向 Driver 发送
ExecutorAdded
消息发消息告诉该 application 对应 Driver 该 worker 上的 executor 已经启动
1
exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
https://blog.csdn.net/Interest1_wyt/article/details/125345385
管理Driver的执行,包括在Driver失败的时候自动重启,要是在Standalone的模式下。失败是否重试是看DriverDescription中的supervise是否为true,如果指定了这个参数为true,driver在失败的时候worker会负责启动这个Driver
https://cloud.tencent.com/developer/article/1887711
https://segmentfault.com/a/1190000040448608