一、概述

ExecutorAllocationManager 用于对已分配的 Executor 进行管理,默认情况下不会创建 ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled 为 true 来创建。

ExecutorAllocationManager 内部会定时根据工作负载计算所需的 Executor 数量,如果对 Executor 需求数量大于之前向集群管理器申请的Executor 数量,那么向集群管理器申请添加 Executor;如果对 Executor 需求数量小于之前向集群管理器申请的 Executor 数量,那么向集群管理器申请取消部分 Executor。此外,ExecutorAllocationManager 内部还会定时向集群管理器申请移除过期的 Executor。

二、实现

ExecutorAllocationManager 可以设置动态分配最小 Executor 数量、动态分配最大 Executor 数量、每个 Executor 可以运行的 Task 数量等配置信息,并对配置信息进行校验。

ExecutorAllocationManager 会基于一个初始 executor 数,结合当前任务的负载情况动态调整 executor 个数。

初始值一般是我们提交作业的时候指定的,也就是 --num-executors,也有一个配置参数spark.dynamicAllocation.initialExecutors 来控制

在作业运行的过程当中,会通过 SparkListener 来监测 executor 情况,如果有 executor 是处于空闲的状态,而且超过了一定的时间就会被代理给发起 $kill$ 请求。这个一定的空闲期由 spark.dynamicAllocation.executorIdleTimeout 来设置

在运行的过程当中,仍然有任务处于 pending 的状态,而且这个等待的时间超过一定的阈值后,ExecutorAllocationManager 会新增 executor 个数,等待时间的阈值可以通过 spark.dynamicAllocation.schedulerBacklogTimeout 设置

2.1. 属性

2.2. 方法

  1. $start()$

    该方法将 ExecutorAllocationListener 加入 listenerBus 中,ExecutorAllocationListener 通过监听 listenerBus 里的事件,动态添加、删除 Executor。并且通过 Thread 不断添加 Executor,遍历 Executor,将超时的 Executor 杀掉并移除。

    1. $updateAndSyncNumExecutorsTarget()$

      更新 executor 个数并把该结果值同步给集群管理器,调用 $maxNumExecutorNeeded()$ 方法来计算当前负载下所需要的最大executor 个数

三、初始化

ExecutorAllocationManager 的初始化是在 SparkContext 中~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
cleaner = cleaner, resourceProfileManager = resourceProfileManager,
reliableShuffleStorage = _shuffleDriverComponents.supportsReliableStorage()))
case _ => None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())