Task 本地性的分配优先考虑有较高的本地性的级别,否则分配较低的本地性级别,直到 ANY。
TaskSet 可以有一到多个本地性级别, 但在给 Task 分配本地性时只能是其中的一个。Taskset 中的所有 Task 都具有相同的允许使用的本地性级别,但在运行期可能因为资源不足、运行时间等因素,导致同一 Taskset 中的各个 Task 的本地性级别可能不同。
TaskSet 中实现的本地性操作包括对 Taskset 的本地性级别进行计算、获取某个本地性级别的等待时间、给 Task 分配资源时获取允许的本地性级别等。

  1. computeValidLocalityLevels()

computeValidLocalityLevels() 用于计算有效的本地性级别,将 Task 按照本地性级别,由高到低地分配给允许的 Executor。

  • 如果存在 Executor 上待处理的 Task 的集合且 PROCESS_LOCAL 级别的等待时间不为0,还存在已被激活的 Executor (即
    pendingTasksForExecutor 中的 Executorld 有存在于 TaskSchedulerImpl 的 executorIdToRunningTasklds中的),那么
    允许的本地性级别里包括 PROCESS_LOCAL
  • 如果存在 Host 上待处理的Task的集合(即 pendingTasksForHost不为空) 且 NODE_LOCAL 级别的等待时间不为0,除此
    以外, Host 上存在已被激活的Executor(即 pendingTasksForHost 中的 Host 有存在于 TaskSchedulerImpl 的
    hostToExecutors 中的), 那么允许的本地性级别里包括 NODE_LOCAL
  • 如果存在没有任何本地性偏好的待处理 Task,那么允许的本地性级别里包括 NO_PREF
  • 如果存在机架上待处理的Task的集合(即 pendingTasksForRack 不为空)且 RACK_LOCAL 级别的等待时间不为 0,除此
    以外,机架上存在已被激活的 Executor (即 pendingTasksForRack 中的机架有存在于 TaskSchedulerImpl 的 hostsByRack中的),那么允许的本地性级别里包括 RACK_LOCAL
  • 允许的本地性级别里增加 ANY。
  • 返回所有允许的本地性级别。
  1. getLocalityWait()

getLocalityWait() 方法用于获取某个本地性级别的等待时间。

  • 获取默认的等待时间。默认的等待时间可以通过 sparklocalitywait 属性配置,默认为3秒。
  • 根据本地性级别匹配到对应的配置展性。PROCESS_LOCAL 级别的配置属性为 spark.locality.wait.process
    NODE_LOCAL 级别的配置属性为 spark.locality.wait.node, RACK_LOCAL 级别的配置属性为 spark.locality.wait.rack 。其他级别没有配置属性。
  • 使用第(2)步得到的展性名称, 获取本地性级别对应的等待时间。如果第(2)步没有得到属性名称,那么将返回0。
  1. getLocalityIndex()

从 myLocalityLevels 中找出指定的本地性级别所对应的索引。

  1. getAllowedLocalityLevel()

获取允许的本地性级别

  • 按照索引由高到低从 myLocalityLevels 读取本地性级别,然后执行以下操作:
    • 调用 moreTasksToRunln 方法判断本地性级别对应的待处理 Task 的缓存结构中是否有 Task 需要处理。
    • 如果没有 Task 需要处理, 则将最后的运行时间设置为curTime。
    • 如果有 Task 需要处理且 curTime 与最后运行时问的差值大于当前本地性级别的等待时间,则将最后的运行时间增加
      当前本地性级别的等待时间(这样实际将直接跳入更低的本地性级别)
    • 如果有Task需要处理且curTime与最后运行时间的差值小于等于当前本地性级别的等待时间,则返回当前本地性级
      别。

如果上一步未能找到允许的本地性级别,那么返回最低的本地性级别。

resourceOffer()

resourceOffer() 方法在 TasksetManager 处于 “僵尸” 状态并且分配 Task 的 Host 和 Executor 在黑名单中时直接返回 None,否则执行如下步骤:

  1. 计算允许的本地性级别。

如果最大本地性级别(即 maxLocality )为 NO_PREF, 则允许的本地性级别为 NO_PREF 。如果
maxLocality 不是 NO_PREF, 则允许的本地性级别为 maxLocality 和调用 getAllowedLocalityLevel() 获取的本地性级别中较小的
本地性级别。

  1. 调用 dequeueTask() 方法, 根据指定的 Host、Executor 和本地性级别,找出三元组(包括要执行的 Task 的索引、相应的本地性
    级别、是否推断执行的标记),并对三元组进行如下操作:
  • 根据要执行的 Task 的索引找到要执行的 Task
  • 为 Task 生成新的身份标识
  • 将 Task 对应的 copiesRunning 信息 +1,即增加复制运行数
  • 获取任务尝试号 attemptNum
  • 创建任务尝试信息(TaskInfo)。
  • 将 Task 的身份标识与 TaskInfo 的对应关系放入 taskInfos。
  • 如果 maxLocality 不是 NO_PREF ,那么调用 getLocalityIndex() 方法,获取任务的本地性偏好级别在 myLocalityLevels 中的
    索引, 并将最后一次运行时间设置为当前系统时问。
  • 将 Task、用户添加的 Jar 包及其他文件序列化,得到需要经过网络传输的序列化 Task。
  • 如果序列化 Task 的大小超过了 100KB,且 emittedTaskSizeWarning 仍然为 false, 则将 emittedTaskSizeWarning 设置为
    true 并且打印警告日志。
  • 调用 addRunningTask 方法, 向 runningTasksSet 中添加 Task 的身份标识,并增加父调度池及祖父调度池中记录的当前正在运行的任务数量。
  • 生成Task的名称,格式为: “task $index.$attemptNumber in stage $stageld.$stage AttemptId”。
  • 调用 DAGScheduler 的 taskStarted() 方法向 DAGSchedulerEventProcessLoop 投递 BeginEvent 事件。
  • 创建并返回 TaskDes