一、概述

二、TaskSetManager

DAGScheduler 将 Stage 打包到 TaskSet 交给 TaskScheduler, TaskSet 调度池中对 Task 进行调度管理的基本单位, TaskScheduler 会 将 TaskSet 封装为 TaskSetManager, 负责监控管理同一个 Stage 中的 Tasks, TaskScheduler 就是以 TaskSetManager 为单元来调度任务。

TaskSetManager 负责监控管理同一个 Stage 中的 Tasks,TaskScheduler 会 先 把 DAGScheduler 给 过 来 的 TaskSet 封装成 TaskSetManager 放到任务队列里,然后再按照指定的调度策略在调度队列中选择 TaskSetManager

三、任务调度器 TaskScheduler

Spark Task 的调度是由 TaskScheduler 来完成,TaskScheduler 是实现多种任务调度器的基础,不过当前 TaskSchedulerImpl 是唯一实现。

3.1. 结构

3.1.1. 属性

3.2. 初始化

$SparkContext.createTaskScheduler()$ 会根据传入的 Master 的 URL 的规则判断集群的部署方式(或者说是资源管理方式),比如是 Standalone、Mesos、YARN 或者是 Local 等。根据不同的部署方式,生成不同的 TaskScheduler 和 SchedulerBackend。

SchedulerBackend 是一个 trait,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的 Task 分配计算资源(即 Executor),并且在分配的 Executor 上启动 Task,完成计算的调度过程。它使用 $reviveOffers()$ 完成上述的任务调度。

以 Standalone 模式为例,创建过程如下:

  1. 初始化 SchedulerBackend

    使用参数传递的 SchedulerBackend 设置 TaskSchedulerImpl 的 backend 属性 SchedulerBackend 是 TaskScheouler 的调度后端接口。TaskScheduler 给 Task 分配资源实际是通过 SchedulerBackend 来完成的, SchedulerBackend 给 Task 分配完资源后将与分配给 Task 的Executor 通信,并要求后者运行 Task。

  2. 创建根调度池

    创建根调度池

  3. 创建调度池构建器

    1. 根据调度模式,创建相应的调度池构建器。由于 SchedulingMode 默认为 FIFO, 所以创建的调度构建器默认为 FIFOSchedulableBuilder
    2. 调用调度池构建器的 $buildPools()$ 方法构建调度池

3.2.1. 初始化 TaskSchedulerImpl

1
val scheduler = new TaskSchedulerImpl(sc)

3.2.2. 初始化 StandaloneSchedulerBackend

1
2
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)

3.2.3. initialize

响应资源调度请求,为 Task 分配具体的资源,输入是 Executor 列表,输出是 TaskDescription 二维数据,存储 Task ID、Executor ID 和 Task 执行环境依赖信息等。

1
scheduler.initialize(backend)

3.3. 启动

启动任务调度器是通过调用其 $start()$ 方法实现。

1
2
3
4
5
6
7
8
9
override def start(): Unit = {
backend.start()

if (!isLocal && conf.get(SPECULATION_ENABLED)) {
speculationScheduler.scheduleWithFixedDelay(
() => Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() },
SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}

3.3.1. 启动 SchedulerBackend

3.3.2. 设置检查可推断任务的定时器

**当应用不在 local 模式下,并且设置了推断执行(即 **spark.speculation 属性为 true), 那么设置一个执行间隔为 SPECULATION_INTERVAL_MS (默认为100ms) 的检查可推断任务的定时器。此定时器通过调用 checkSpeculatableTasks() 方法来检查可推断任务。

四、Task 提交

$TaskSchedulerImpl.submitTasks()$ 负责 Task 提交~

DAGScheduler 将 Stage 中各个分区的 Task 封装为 TaskSet 后,会将 TaskSet 交给 TaskSchedulerImpl 处理。TaskSchedulerImpl 的 $submitTasks()$ 方法是这一过程的入口

  1. 获取 TaskSet 中的所有 Task
  2. 调用 createTaskSetManager() 方法创建 TaskSetManager
  3. 在 taskSetsByStageIdAndAttempt() 中设置 TaskSet 关联的 Stage、 Stage 尝试及刚创建的 TaskSetManager 之间的三级映射
    关系。
  4. 对当前 TaskSet 进行冲突检测, 即 taskSetsByStageldAndAttempt() 中不应该存在同属于当前 Stage, 但是 TaskSet 却不相同的情况。
  5. 调用调度池构建器的 addTaskSetManager() 方法, 将刚创建的 TaskSetManager 添加到调度池构建器的调度池中
  6. 如果当前应用程序不是 Local 模式并且 TaskSchedulerImpl 还没有接收到 Task, 那么设置一个定时器按照 STARVATION_TIMEOUT_MS 指定的时间间隔检查 TaskSchedulerImpl 的饥饿状况, 当 TaskSchedulerImpl 已经运行 Task 后, 取消此定时器
  7. 将 hasReceivedTask 设置为 true,以表示 TaskSchedulerImpl 已经接收到 Task
  8. 调用 SchedulerBackend 的 reviveoffers 方法给 Task 分配资源并运行 Task

4.1. 将 TaskSet 加入 TaskSetManager

4.1.1. 创建 TaskSetManager

4.2. 将 TaskSetManager 加入 调度池

$SchedulableBuilder.addTaskSetManager()$~

4.2.1. 初始化调度池 Pool

TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。这个调度过程实际上还是比较粗粒度的,是面向 TaskSetManager 的。调度方式如果是 FIFO,schedulableBuilder 的实现是 FIFOSchedulableBuilder; 调度方式如果是 FAIR,schedulableBuilder 的实现就是FairSchedulableBuilder。它们的创建方式如下:

1
2
3
4
5
6
7
8
9
10
11
def initialize(backend: SchedulerBackend): Unit = {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, sc)
case _ =>...
}
}
schedulableBuilder.buildPools()
}

4.2.2. 将 TaskSetManager 加入调度池

4.3. 资源分配

$backend.reviveOffers()$

以 Local 模式下 SchedulerBackend 的实现 LocalSchedulerBackend 为例, LocalSchedulerBackend 的 receiveOffers() 方法实际向 LocalEndpoint 发送了 Reviveoffers 消息。LocalEndpoint 接收到 Reviveoffers 消息后。将调用 LocalEndpoint 自己的receiveOffers() 方法, 最终调用 TaskSchedulerImpl 的 resourceOffers() 方法给 Task 分配能源。

  1. 遍历 WorkerOffer 序列,对每一个 WorkerOffer 执行以下操作:

    • 更新 Host 与 Executor 的各种映射关系

    • 调用 TaskSchedulerImpl 的 executorAdded 方法向 DAGScheduler 的 DAGSchedulerEventProcessLoop 投递 ExecutorAdded事件

    • 标记添加了新的 Executor(即将 newExecAvall 设置为 true)

    • 更新 Host 与机架之间的关系。

      这里的 hostToExecutors 及 hostsByRack 是为了在资源分配时计算 Task 本地性时使用

  2. 对所有 Workeroffer 随机洗牌,避免将任务总是分配给同样一组 Worker

  3. 根据每个 Workeroffer 的可用的 CPU 核数创建同等尺寸的任务描述 (TaskDescription) 数组

  4. 将每个 Workeroffer 的可用的 CPU 核数统计到可用 CPU (availableCpus) 数组中

  5. 调用 rootPool 的 getSortedTaskSetQueue() 方法,对 rootPool 中的所有 TaskSetManager 按照调度算法排序

  6. 如果 newExecAvail 为 true, 调用每个 TaskSetManager 的 executorAdded 方法。此 executorAdded 方法实际调用了

    computeValidLocalityLevels() 方法重新计算 TaskSet 的本地性

  7. 遍历 TaskSetManager,按照最大本地性的原则 (即从高本地性级别到低本地性级别)调用 resourceOfferSingleTaskSet() 方

    法, 给单个 TaskSet 中的 Task 提供资源。如果在任何 TaskSet 所允许的本地性级别下, TaskSet 中没有任何一个任务获得了资

    源, 那么将调用 TaskSetManager 的 abortlfCompletelyBlacklisted 方法, 放弃在黑名单中的Task。

  8. 返回生成的 TaskDescription 列表, 即已经获得了资源的任务列表。

    resourceOfferSingleTaskSet() 方法将遍历 Workeroffer 序列, 对每一个 Workeroffer 执行以下操作:

    • 获取 WorkerOffer 的 Executor 的身份标识

    • 获取 WorkerOffer 的 Host

    • 如果 Workeroffer 的可用的 CPU 核数大于等于 CPUS_PER_TASK, 则执行以下操作:

      • 调用 TaskSetManager 的 resourceoffer() 方法, 给符合条件的待处理 Task 创建 TaskDescription

      • 将 TaskDescription 添加到 tasks 数组。

      • 更新 Task 的身份标识与 TaskSet、Executor 的身份标识相关的缓存映射。

      • 由于给 Task 分配了 CPUS_PER_TASK 指定数量的 CPU 内核数, 因此 Workeroffer 的可用的 CPU 核数减去

        CPUS_PER_TASK

五、总结

  1. 代表 DAGScheduler 调用 TaskScheduler 的 submitTasks() 方法向 TaskScheduler 提交 TaskSet

  2. 代表 TaskScheduler 接收到 Taskset 后, 创建对此 TaskSet 进行管理的 TaskSetManeger, 并将此 TaskSetManager 通过调度池

    构建器添加到根调度池中

  3. 代表 TaskScheduler 调用 SchedulerBackend 的 reviveOffers() 方法给 Task 提供资源

  4. SchedulerBackend 向 RpcEndpoint 发送 ReviveOffers 消息

  5. RpcEndpoint 将调用 TaskScheduler 的 resourceOffers() 方法给 Task 提供资源

  6. TaskScheduler 调用根调度池的 getSortedTaskSetQueue() 方法对所有 TaskSetManager 按照调度算法进行排序后, 对

    TaskScheduler 管理的 TaskSet 按服 “最大本地性” 的原则选择其中的 Task,最后为 Task 创建尝试执行信息、对 Task 进行序列化、生成 TaskDescription 等。

  7. 调用 Executor 的 launchTask() 方法运行 Task 尝试

https://weread.qq.com/web/reader/0c832fb05e12e40c8ca6190kad63251024aad61ab143c7e