Spark-源码学习-SparkCore-调度机制-任务调度-Task 调度-本地性
Task 本地性的分配优先考虑有较高的本地性的级别,否则分配较低的本地性级别,直到 ANY。
TaskSet 可以有一到多个本地性级别, 但在给 Task 分配本地性时只能是其中的一个。Taskset 中的所有 Task 都具有相同的允许使用的本地性级别,但在运行期可能因为资源不足、运行时间等因素,导致同一 Taskset 中的各个 Task 的本地性级别可能不同。
TaskSet 中实现的本地性操作包括对 Taskset 的本地性级别进行计算、获取某个本地性级别的等待时间、给 Task 分配资源时获取允许的本地性级别等。
- computeValidLocalityLevels()
computeValidLocalityLevels() 用于计算有效的本地性级别,将 Task 按照本地性级别,由高到低地分配给允许的 Executor。
- 如果存在 Executor 上待处理的 Task 的集合且
PROCESS_LOCAL
级别的等待时间不为0,还存在已被激活的 Executor (即
pendingTasksForExecutor 中的 Executorld 有存在于 TaskSchedulerImpl 的 executorIdToRunningTasklds中的),那么
允许的本地性级别里包括PROCESS_LOCAL
- 如果存在 Host 上待处理的Task的集合(即 pendingTasksForHost不为空) 且
NODE_LOCAL
级别的等待时间不为0,除此
以外, Host 上存在已被激活的Executor(即 pendingTasksForHost 中的 Host 有存在于 TaskSchedulerImpl 的
hostToExecutors 中的), 那么允许的本地性级别里包括NODE_LOCAL
。 - 如果存在没有任何本地性偏好的待处理 Task,那么允许的本地性级别里包括
NO_PREF
- 如果存在机架上待处理的Task的集合(即 pendingTasksForRack 不为空)且
RACK_LOCAL
级别的等待时间不为 0,除此
以外,机架上存在已被激活的 Executor (即 pendingTasksForRack 中的机架有存在于 TaskSchedulerImpl 的 hostsByRack中的),那么允许的本地性级别里包括RACK_LOCAL
- 允许的本地性级别里增加 ANY。
- 返回所有允许的本地性级别。
- getLocalityWait()
getLocalityWait() 方法用于获取某个本地性级别的等待时间。
- 获取默认的等待时间。默认的等待时间可以通过 sparklocalitywait 属性配置,默认为3秒。
- 根据本地性级别匹配到对应的配置展性。
PROCESS_LOCAL
级别的配置属性为spark.locality.wait.process
NODE_LOCAL
级别的配置属性为spark.locality.wait.node
,RACK_LOCAL
级别的配置属性为spark.locality.wait.rack
。其他级别没有配置属性。 - 使用第(2)步得到的展性名称, 获取本地性级别对应的等待时间。如果第(2)步没有得到属性名称,那么将返回0。
- getLocalityIndex()
从 myLocalityLevels 中找出指定的本地性级别所对应的索引。
- getAllowedLocalityLevel()
获取允许的本地性级别
- 按照索引由高到低从 myLocalityLevels 读取本地性级别,然后执行以下操作:
- 调用 moreTasksToRunln 方法判断本地性级别对应的待处理 Task 的缓存结构中是否有 Task 需要处理。
- 如果没有 Task 需要处理, 则将最后的运行时间设置为curTime。
- 如果有 Task 需要处理且 curTime 与最后运行时问的差值大于当前本地性级别的等待时间,则将最后的运行时间增加
当前本地性级别的等待时间(这样实际将直接跳入更低的本地性级别) - 如果有Task需要处理且curTime与最后运行时间的差值小于等于当前本地性级别的等待时间,则返回当前本地性级
别。
如果上一步未能找到允许的本地性级别,那么返回最低的本地性级别。
resourceOffer()
resourceOffer() 方法在 TasksetManager 处于 “僵尸” 状态并且分配 Task 的 Host 和 Executor 在黑名单中时直接返回 None,否则执行如下步骤:
- 计算允许的本地性级别。
如果最大本地性级别(即 maxLocality )为 NO_PREF
, 则允许的本地性级别为 NO_PREF
。如果
maxLocality 不是 NO_PREF
, 则允许的本地性级别为 maxLocality 和调用 getAllowedLocalityLevel() 获取的本地性级别中较小的
本地性级别。
- 调用 dequeueTask() 方法, 根据指定的 Host、Executor 和本地性级别,找出三元组(包括要执行的 Task 的索引、相应的本地性
级别、是否推断执行的标记),并对三元组进行如下操作:
- 根据要执行的 Task 的索引找到要执行的 Task
- 为 Task 生成新的身份标识
- 将 Task 对应的 copiesRunning 信息 +1,即增加复制运行数
- 获取任务尝试号 attemptNum
- 创建任务尝试信息(TaskInfo)。
- 将 Task 的身份标识与 TaskInfo 的对应关系放入 taskInfos。
- 如果 maxLocality 不是
NO_PREF
,那么调用 getLocalityIndex() 方法,获取任务的本地性偏好级别在 myLocalityLevels 中的
索引, 并将最后一次运行时间设置为当前系统时问。 - 将 Task、用户添加的 Jar 包及其他文件序列化,得到需要经过网络传输的序列化 Task。
- 如果序列化 Task 的大小超过了 100KB,且 emittedTaskSizeWarning 仍然为 false, 则将 emittedTaskSizeWarning 设置为
true 并且打印警告日志。 - 调用 addRunningTask 方法, 向 runningTasksSet 中添加 Task 的身份标识,并增加父调度池及祖父调度池中记录的当前正在运行的任务数量。
- 生成Task的名称,格式为: “task $index.$attemptNumber in stage $stageld.$stage AttemptId”。
- 调用 DAGScheduler 的 taskStarted() 方法向 DAGSchedulerEventProcessLoop 投递 BeginEvent 事件。
- 创建并返回 TaskDes
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Joker!
评论
ValineTwikoo