一、概述

$Master.schedule()$ 为处于待分配资源的 Application 分配资源,调度当前 workers 可用的资源。在每次有新的 Application 加入或者新的资源加入时都会调用 $schedule()$ 进行调度。

$Master.schedule()$ 方法将在一个新的 Application 被提交,或者可用的 resource 变化的时候被调用。

二、设计

2.1. 判断 Master 状态

首先判断 Master 的状态不是 ALIVE 的时候,则直接 return

1
2
3
if (state != RecoveryState.ALIVE) {
return
}

2.2. 选择 Worker

对处于 ALIVE 状态的 Workers 进行 Shuffle

1
2
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size

2.3. 在集群 Worker 上启动 Driver

由于 waitingDrivers 不为空,则会走 LaunchDriver 的流程

Master 在接受到 RequestSubmitDriver 后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for (driver <- waitingDrivers.toList) { 
var launched = false
var isClusterIdle = true
// 已经访问过的 Workers
var numWorkersVisited = 0
// 当访问过的 Worker 的数量小于总的活着的 Worker 的数量,并且没有启动 Driver
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
numWorkersVisited += 1
if (canLaunchDriver(worker, driver.desc)) {
val allocated = worker.acquireResources(driver.desc.resourceReqs)
driver.withResources(allocated)
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
if (!launched && isClusterIdle) {
logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
}
}

2.3.1. Driver 启动资源判断

判断 worker 的内存、cpu、resource 三个指标是否同时符合 driver 的启动要求

  • Worker空余的内存 >= driver 所需要的内存
  • Worker 空余的 CPU 核数 >= driver 所需要的 CPU 核数
1
2
3
4
5
6
private def canLaunch(worker: WorkerInfo, memoryReq: Int, coresReq: Int, resourceRequirements: Seq[ResourceRequirement]): Boolean = {
val enoughMem = worker.memoryFree >= memoryReq
val enoughCores = worker.coresFree >= coresReq
val enoughResources = ResourceUtils.resourcesMeetRequirements(worker.resourcesAmountFree, resourceRequirements)
enoughMem && enoughCores && enoughResources
}

2.3.2. 启动 Driver

Driver 启动逻辑主要是将 worker 和 driver 相互引用,更新 worker 的资源使用情况,并向对应的 worker 节点发送 LaunchDriver 事件

  1. worker 添加 driver

    1
    2
    worker.addDriver(driver)
    driver.worker = Some(worker)
  2. 向 worker 发送 LaunchDriver 消息

    当前的 application 申请资源,向 worker 发送消息,触发 Worker 的 $receive()$ 方法。

    $Worker.receive()$ 方法中,当 Worker 遇到 LaunchDriver 指令时,创建并启动一个 DriverRunner,DriverRunner 启动一个线程,异步的处理 Driver 启动工作。

    1
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
    • worker 接收到 master 发送来的 LanuchDriver 事件,创建 DriverRunner 对象(内部封装了一个线程)

      DriverRunner 中是开辟线程异步的处理 Driver 启动工作,不会阻塞主进程的执行

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      workerWebUiUrl,
      securityMgr,
      resources_
      )
    • 将 DriverRunner 对象添加进该 Worker 内部的 driver 列表

      1
      val drivers = new HashMap[String, DriverRunner]
    • 调用 $DriverRunner.start()$ 方法,同时修改该 worker 进程的可用 cores 个数,内存个数

      在 $start()$ 方法中,直接启动了一个 Thread,在线程处理中,调用了 $prepareAndRunDriver()$

      • $prepareAndRunDriver()$

        • 创建 driver 工作目录

        • 从 master 中下载要执行的 jar 包(移动计算、不移动数据)

          如果是 java 编写的 spark 程序,使用 maven 打的包

        • 构建 ProcessBuilder

        • 启动 ProcessBuilder(使用 java 的 API 去操作一个 java 的进程 Process Runtime)

        • 返回退出码 exitCode

      • 根据 exitCode,设置 finalState,状态分为完成,被杀死,失败“finalState”

      • 向 worker 发送 DriverStateChanged 信息通知 Driver 发生状态改变

      • 向 worker 发送 DriverStateChanged 消息

        Driver 在此启动起来,向 worker 发送 DriverStateChanged (driverId, state, finalException)事件通知,worker 接收到后,向 master 发送相同的DriverStateChanged(driverId, state, finalException) 事件通知,如果 Driver 的状态是错误、完成、杀死、失败,就移除 Driver

Driver 用于执行 Spark 用户任务中的 $main$ 方法,负责实际代码的执行工作,触发整个任务执行,Master 通知 Worker 启动 Driver,就是启动了一个 JVM,并且开始使用反射的方式执行 DriverWrapper 的 $main$ 方法。在 $DriverWrapper.main()$ 方法中,会执行用户提交的 jar 包中的 $main$ 方法。

  1. 初始化 SparkContext

    1. 初始化 DAGScheduler

      DAGScheduler 的主要作用是: 当 driver 执行 action 算子的时候,DAGScheduler 会从最后一个 rdd 开始往前递归寻找 ShuffleDependency,每找到一个就会划分一个 stage,最终达到的目的是,根据业务逻辑算子依赖关系,划分成一个个 stage,并且根据 stage 先后关系,把 stage 转化成 ShuffleMapTask 或者 ResultTask,封装成 TaskSet,交给 TaskScheduler 提交。

    2. 初始化 TaskScheduler

      TaskScheduler 是一个 trait,它的核心任务是提交 TaskSet 到集群并汇报结果,并且向 DAGScheduler 汇报任务的执行情况。

    3. 初始化 SchedulerBackend

      SchedulerBackend 也是一个 trait,不同的部署模式有不同的实现,比如在 standalone 模式下,实现类是:StandaloneSchedulerBackend。

      它内部有两个 Endpoint 组件,一个是 DriverEndpoint ,用来和 Worker 打交道,比如创建 Executor、提交 Task 等;一个是 ClientEndpoint 用来和 master 打交道。

      SchedulerBackend 专门负责收集 Worker 上的资源信息,它知道每个任务在提交时,自己拥有多少资源,然后去具体运行 Task。

2.4. 在集群 Worker 上启动 Executor

$startExecutorsOnWorkers()$ 方法首先会从 waitingApps 中获取提交的 app,然后会对每个 worker 提供多少 cpu 和启动多少 Executor 做计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private def startExecutorsOnWorkers(): Unit = {
for (app <- waitingApps) {
for (rpId <- app.getRequestedRPIds()) {
val resourceDesc = app.getResourceDescriptionForRpId(rpId)
// Executor 所需要的核数
val coresPerExecutor = resourceDesc.coresPerExecutor.getOrElse(1)

if (app.coresLeft >= coresPerExecutor) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canLaunchExecutor(_, resourceDesc))
.sortBy(_.coresFree).reverse
val appMayHang = waitingApps.length == 1 &&
waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
if (appMayHang) {
logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
}
// 计算每个 Worker 上所分配的 cores 的集合
val assignedCores = scheduleExecutorsOnWorkers(app, rpId, resourceDesc, usableWorkers, spreadOutApps)
// 针对当前这个 Application 在每一个可以含有足够资源启动 Executors 的 Worker 上启动 Executor
for (pos <- usableWorkers.indices if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(app, assignedCores(pos), resourceDesc, usableWorkers(pos), rpId)
}
}
}
}
}

2.4.1. 资源判断

当 application 剩余需要的 cpu 数大于单个 executor 需要的 cpu 数据才进行新 executor 的创建

如果 application 的剩余需求 cpu 小于单 executor 需要的 cpu 数,要么是已经存在了运行的 executor,要么就是配置有问题,正常情况下很少出现 application 需求 cpu 数小于 executor 创建需求 cpu 数的情况。

2.4.2. 过滤出含有足够的资源启动 Executors 的 Workers

1
2
3
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canLaunchExecutor(_, resourceDesc))
.sortBy(_.coresFree).reverse
  1. Worker 的状态是 ALIVE

  2. $canLaunchExecutor$

    1. Worker 的空余内存 >= Application 启动 Executor 所需要的内存
    2. Worker 的空余核数 >= Application 启动 Executor 所需要的核数
  3. 根据 Worker 空余的核数倒叙排序

2.4.3. 计算每个 worker 要分配出去的 CPU 数: scheduleExecutorsOnWorkers()

计算 worker 分配 cpu 数的算法有两种,一种是 spreadOutApps(默认):遍历 worker,一次只在 worker 上分配一个 executor,另一种是非 spreadOutApps:尽量在一个 worker 启动所有 executor,直到分配结束或者资源不够。

  1. 变量

    1. Executor CPU

      1
      2
      3
      4
      val coresPerExecutor = app.desc.coresPerExecutor   // 配置中每个 Executor 的 CPU
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // 每个 Executor 的最小 CPU 数(默认为 1 核)
      // // 判断配置中的每个 Executor 的 CPU 是否为空(如果为空,则表示每个 Executor 只用分配一个 CPU)
      val oneExecutorPerWorker = coresPerExecutor.isEmpty
    2. Executor 内存

      1
      val memoryPerExecutor = resourceDesc.memoryMbPerExecutor
    3. 可用 Worker 的数量

      1
      val numUsable = usableWorkers.length
    4. 每个 Worker 分配的 CPU 的集合

      创建一个空数组,存储了每个 worker 要分配出去的 CPU 数量

      1
      val assignedCores = new Array[Int](numUsable)    
    5. 每个 Worker 上分配的 Executor 个数的集合

      1
      val assignedExecutors = new Array[Int](numUsable)
    6. 可以/需要分配的核数(取 Application 所需要的核数与所有 Worker 空余的核数总和的最小值)

      1
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) 
  2. 获取可以符合 executor 构建条件的 workers

    1
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
    • 判断指定的 worker 是否可以为这个 Application 启动一个 Executor: $canLaunchExecutor()$

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      def canLaunchExecutorForApp(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      // 判断是否有足够的核数
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
      val assignedExecutorNum = assignedExecutors(pos)

      // 如果在当前 Worker 上没有启动 Executor,或者一个 Executor 上需要启动多个 cores
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
      if (launchingNewExecutor) {
      // 已经分配的 memory
      val assignedMemory = assignedExecutorNum * memoryPerExecutor
      // 判断是否有足够的 memory: 当前 worker 空余 memory - 当前 worker 已经分配的 memory >= 一个 Executor 所需要的最小 memory
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val assignedResources = resourceReqsPerExecutor.map {
      req => req.resourceName -> req.amount * assignedExecutorNum
      }.toMap
      val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
      case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
      }
      val enoughResources = ResourceUtils.resourcesMeetRequirements(
      resourcesFree, resourceReqsPerExecutor)
      val executorNum = app.getOrUpdateExecutorsForRPId(rpId).size
      val executorLimit = app.getTargetExecutorNumForRPId(rpId)
      val underLimit = assignedExecutors.sum + executorNum < executorLimit
      keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
      } else {
      keepScheduling && enoughCores
      }
      }
  1. worker 分配 CPU

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // 一直提交 Executor,直到没有可用的 Worker 或者是到达了 Application 所需要的的 Executor 的上限
    while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
    var keepScheduling = true
    while (keepScheduling && canLaunchExecutorForApp(pos)) {
    // 更新待分配 CPU 核数和已分配 CPU 核数
    coresToAssign -= minCoresPerExecutor
    assignedCores(pos) += minCoresPerExecutor
    // 如果每个 Worker 只启动一个 Executor ,每一次循环给这个 Executor 分配一个 core 否则。每一次循环给一个新的 Executor 增加一个 core
    if (oneExecutorPerWorker) {
    assignedExecutors(pos) = 1
    } else {
    assignedExecutors(pos) += 1
    }
    // spreadOutApps >>> 尽量分配 Executors 到最多的 Worker 上;
    // 非 spreadOutApps >>> 紧着一个 Worker 分配 Executors,直到这个 Worker 的资源被用尽。
    if (spreadOutApps) {
    keepScheduling = false
    }
    }
    }
    // 每一次循环,过滤出来可以提交至少一个 Executor 的 workers
    freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
    }

2.4.4. 向 worker 发送对应的 LaunchExecutor 事件

通过上一步计算出了每个 worker 要分配出去的 cpu 数,$allocateWorkerResourceToExecutors()$ 遍历可用 worker,如果该 worker 需要分配的 cpu 数大于 0,则向该 worker 发送对应的 LaunchExecutor 事件

  1. 计算 worker 运行 executor 数目

    根据当前 worker 要分配的 cpu 总核数和每个 executor 需要的 cpu 数,计算出当前 worker 要分配的 executor 数

    1
    2
    val coresPerExecutor = resourceDesc.coresPerExecutor
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  2. 获取每个 executor 需要的 cpu 数

    1
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  3. 创建指定数目的 executor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    for (i <- 1 to numExecutors) {
    // //从 worker 获取指定资源
    val allocated = worker.acquireResources(resourceDesc.customResourcesPerExecutor)
    // 创建 ExecutorDesc 对象,并将其记录到 application 中
    val exec = app.addExecutor(worker, coresToAssign, resourceDesc.memoryMbPerExecutor, allocated, rpId)
    // 发起 executor 的创建
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
    }
    • Worker 添加 executor

      将 executor 加入 worker 内部的缓存

      1
      worker.addExecutor(exec)
    • 向 Worker 发送 LaunchExecutor 消息,通知 Worker 节点启动 executor

      1
      2
      worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
      exec.rpId, exec.application.desc, exec.cores, exec.memory, exec.resources))
    • 向 Driver 发送 ExecutorAdded 消息

      发消息告诉该 application 对应 Driver 该 worker 上的 executor 已经启动

      1
      exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))

https://blog.csdn.net/Interest1_wyt/article/details/125345385

管理Driver的执行,包括在Driver失败的时候自动重启,要是在Standalone的模式下。失败是否重试是看DriverDescription中的supervise是否为true,如果指定了这个参数为true,driver在失败的时候worker会负责启动这个Driver

https://cloud.tencent.com/developer/article/1887711
https://segmentfault.com/a/1190000040448608