Spark-源码学习-SparkCore-调度机制-任务调度-Task 调度
一、概述
二、TaskSetManager
DAGScheduler 将 Stage 打包到 TaskSet 交给 TaskScheduler, TaskSet 调度池中对 Task 进行调度管理的基本单位, TaskScheduler 会 将 TaskSet 封装为 TaskSetManager, 负责监控管理同一个 Stage 中的 Tasks, TaskScheduler 就是以 TaskSetManager 为单元来调度任务。
TaskSetManager 负责监控管理同一个 Stage 中的 Tasks,TaskScheduler 会 先 把 DAGScheduler 给 过 来 的 TaskSet 封装成 TaskSetManager 放到任务队列里,然后再按照指定的调度策略在调度队列中选择 TaskSetManager
三、任务调度器 TaskScheduler
Spark Task 的调度是由 TaskScheduler 来完成,TaskScheduler 是实现多种任务调度器的基础,不过当前 TaskSchedulerImpl 是唯一实现。
3.1. 结构
3.1.1. 属性
3.2. 初始化
$SparkContext.createTaskScheduler()$ 会根据传入的 Master 的 URL 的规则判断集群的部署方式(或者说是资源管理方式),比如是 Standalone、Mesos、YARN 或者是 Local 等。根据不同的部署方式,生成不同的 TaskScheduler 和 SchedulerBackend。
SchedulerBackend 是一个 trait,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的 Task 分配计算资源(即 Executor),并且在分配的 Executor 上启动 Task,完成计算的调度过程。它使用 $reviveOffers()$ 完成上述的任务调度。

以 Standalone 模式为例,创建过程如下:
初始化 SchedulerBackend
使用参数传递的 SchedulerBackend 设置 TaskSchedulerImpl 的 backend 属性 SchedulerBackend 是 TaskScheouler 的调度后端接口。TaskScheduler 给 Task 分配资源实际是通过 SchedulerBackend 来完成的, SchedulerBackend 给 Task 分配完资源后将与分配给 Task 的Executor 通信,并要求后者运行 Task。
创建根调度池
创建根调度池
创建调度池构建器
- 根据调度模式,创建相应的调度池构建器。由于 SchedulingMode 默认为 FIFO, 所以创建的调度构建器默认为 FIFOSchedulableBuilder
- 调用调度池构建器的 $buildPools()$ 方法构建调度池
3.2.1. 初始化 TaskSchedulerImpl
1 | val scheduler = new TaskSchedulerImpl(sc) |
3.2.2. 初始化 StandaloneSchedulerBackend
1 | val masterUrls = sparkUrl.split(",").map("spark://" + _) |
3.2.3. initialize
响应资源调度请求,为 Task 分配具体的资源,输入是 Executor 列表,输出是 TaskDescription 二维数据,存储 Task ID、Executor ID 和 Task 执行环境依赖信息等。
1 | scheduler.initialize(backend) |
3.3. 启动
启动任务调度器是通过调用其 $start()$ 方法实现。
1 | override def start(): Unit = { |
3.3.1. 启动 SchedulerBackend
3.3.2. 设置检查可推断任务的定时器
**当应用不在 local 模式下,并且设置了推断执行(即 **spark.speculation
属性为 true), 那么设置一个执行间隔为 SPECULATION_INTERVAL_MS
(默认为100ms) 的检查可推断任务的定时器。此定时器通过调用 checkSpeculatableTasks() 方法来检查可推断任务。
四、Task 提交
$TaskSchedulerImpl.submitTasks()$ 负责 Task 提交~
DAGScheduler 将 Stage 中各个分区的 Task 封装为 TaskSet 后,会将 TaskSet 交给 TaskSchedulerImpl 处理。TaskSchedulerImpl 的 $submitTasks()$ 方法是这一过程的入口
- 获取 TaskSet 中的所有 Task
- 调用 createTaskSetManager() 方法创建 TaskSetManager
- 在 taskSetsByStageIdAndAttempt() 中设置 TaskSet 关联的 Stage、 Stage 尝试及刚创建的 TaskSetManager 之间的三级映射
关系。 - 对当前 TaskSet 进行冲突检测, 即 taskSetsByStageldAndAttempt() 中不应该存在同属于当前 Stage, 但是 TaskSet 却不相同的情况。
- 调用调度池构建器的 addTaskSetManager() 方法, 将刚创建的 TaskSetManager 添加到调度池构建器的调度池中
- 如果当前应用程序不是 Local 模式并且 TaskSchedulerImpl 还没有接收到 Task, 那么设置一个定时器按照
STARVATION_TIMEOUT_MS
指定的时间间隔检查 TaskSchedulerImpl 的饥饿状况, 当 TaskSchedulerImpl 已经运行 Task 后, 取消此定时器 - 将 hasReceivedTask 设置为 true,以表示 TaskSchedulerImpl 已经接收到 Task
- 调用 SchedulerBackend 的 reviveoffers 方法给 Task 分配资源并运行 Task
4.1. 将 TaskSet 加入 TaskSetManager
4.1.1. 创建 TaskSetManager
4.2. 将 TaskSetManager 加入 调度池
$SchedulableBuilder.addTaskSetManager()$~
4.2.1. 初始化调度池 Pool
TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。这个调度过程实际上还是比较粗粒度的,是面向 TaskSetManager 的。调度方式如果是 FIFO,schedulableBuilder 的实现是 FIFOSchedulableBuilder; 调度方式如果是 FAIR,schedulableBuilder 的实现就是FairSchedulableBuilder。它们的创建方式如下:
1 | def initialize(backend: SchedulerBackend): Unit = { |
4.2.2. 将 TaskSetManager 加入调度池

4.3. 资源分配
$backend.reviveOffers()$
以 Local 模式下 SchedulerBackend 的实现 LocalSchedulerBackend 为例, LocalSchedulerBackend 的 receiveOffers() 方法实际向 LocalEndpoint 发送了 Reviveoffers 消息。LocalEndpoint 接收到 Reviveoffers 消息后。将调用 LocalEndpoint 自己的receiveOffers() 方法, 最终调用 TaskSchedulerImpl 的 resourceOffers() 方法给 Task 分配能源。
遍历 WorkerOffer 序列,对每一个 WorkerOffer 执行以下操作:
更新 Host 与 Executor 的各种映射关系
调用 TaskSchedulerImpl 的 executorAdded 方法向 DAGScheduler 的 DAGSchedulerEventProcessLoop 投递 ExecutorAdded事件
标记添加了新的 Executor(即将 newExecAvall 设置为 true)
更新 Host 与机架之间的关系。
这里的 hostToExecutors 及 hostsByRack 是为了在资源分配时计算 Task 本地性时使用
对所有 Workeroffer 随机洗牌,避免将任务总是分配给同样一组 Worker
根据每个 Workeroffer 的可用的 CPU 核数创建同等尺寸的任务描述 (TaskDescription) 数组
将每个 Workeroffer 的可用的 CPU 核数统计到可用 CPU (availableCpus) 数组中
调用 rootPool 的 getSortedTaskSetQueue() 方法,对 rootPool 中的所有 TaskSetManager 按照调度算法排序
如果 newExecAvail 为 true, 调用每个 TaskSetManager 的 executorAdded 方法。此 executorAdded 方法实际调用了
computeValidLocalityLevels() 方法重新计算 TaskSet 的本地性
遍历 TaskSetManager,按照最大本地性的原则 (即从高本地性级别到低本地性级别)调用 resourceOfferSingleTaskSet() 方
法, 给单个 TaskSet 中的 Task 提供资源。如果在任何 TaskSet 所允许的本地性级别下, TaskSet 中没有任何一个任务获得了资
源, 那么将调用 TaskSetManager 的 abortlfCompletelyBlacklisted 方法, 放弃在黑名单中的Task。
返回生成的 TaskDescription 列表, 即已经获得了资源的任务列表。
resourceOfferSingleTaskSet() 方法将遍历 Workeroffer 序列, 对每一个 Workeroffer 执行以下操作:
获取 WorkerOffer 的 Executor 的身份标识
获取 WorkerOffer 的 Host
如果 Workeroffer 的可用的 CPU 核数大于等于
CPUS_PER_TASK
, 则执行以下操作:调用 TaskSetManager 的 resourceoffer() 方法, 给符合条件的待处理 Task 创建 TaskDescription
将 TaskDescription 添加到 tasks 数组。
更新 Task 的身份标识与 TaskSet、Executor 的身份标识相关的缓存映射。
由于给 Task 分配了
CPUS_PER_TASK
指定数量的 CPU 内核数, 因此 Workeroffer 的可用的 CPU 核数减去CPUS_PER_TASK
五、总结
代表 DAGScheduler 调用 TaskScheduler 的 submitTasks() 方法向 TaskScheduler 提交 TaskSet
代表 TaskScheduler 接收到 Taskset 后, 创建对此 TaskSet 进行管理的 TaskSetManeger, 并将此 TaskSetManager 通过调度池
构建器添加到根调度池中
代表 TaskScheduler 调用 SchedulerBackend 的 reviveOffers() 方法给 Task 提供资源
SchedulerBackend 向 RpcEndpoint 发送 ReviveOffers 消息
RpcEndpoint 将调用 TaskScheduler 的 resourceOffers() 方法给 Task 提供资源
TaskScheduler 调用根调度池的 getSortedTaskSetQueue() 方法对所有 TaskSetManager 按照调度算法进行排序后, 对
TaskScheduler 管理的 TaskSet 按服 “最大本地性” 的原则选择其中的 Task,最后为 Task 创建尝试执行信息、对 Task 进行序列化、生成 TaskDescription 等。
调用 Executor 的 launchTask() 方法运行 Task 尝试
https://weread.qq.com/web/reader/0c832fb05e12e40c8ca6190kad63251024aad61ab143c7e