Spark-源码学习-SparkCore-调度机制-资源调度-Standalone-动态管理 Executor
一、概述
ExecutorAllocationManager 用于对已分配的 Executor 进行管理,默认情况下不会创建 ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled 为 true 来创建。
ExecutorAllocationManager 内部会定时根据工作负载计算所需的 Executor 数量,如果对 Executor 需求数量大于之前向集群管理器申请的Executor 数量,那么向集群管理器申请添加 Executor;如果对 Executor 需求数量小于之前向集群管理器申请的 Executor 数量,那么向集群管理器申请取消部分 Executor。此外,ExecutorAllocationManager 内部还会定时向集群管理器申请移除过期的 Executor。
二、实现
ExecutorAllocationManager 可以设置动态分配最小 Executor 数量、动态分配最大 Executor 数量、每个 Executor 可以运行的 Task 数量等配置信息,并对配置信息进行校验。
ExecutorAllocationManager 会基于一个初始 executor 数,结合当前任务的负载情况动态调整 executor 个数。
初始值一般是我们提交作业的时候指定的,也就是
--num-executors
,也有一个配置参数spark.dynamicAllocation.initialExecutors 来控制
在作业运行的过程当中,会通过 SparkListener 来监测 executor 情况,如果有 executor 是处于空闲的状态,而且超过了一定的时间就会被代理给发起 $kill$ 请求。这个一定的空闲期由 spark.dynamicAllocation.executorIdleTimeout 来设置
在运行的过程当中,仍然有任务处于 pending 的状态,而且这个等待的时间超过一定的阈值后,ExecutorAllocationManager 会新增 executor 个数,等待时间的阈值可以通过 spark.dynamicAllocation.schedulerBacklogTimeout 设置
2.1. 属性
2.2. 方法
$start()$
该方法将 ExecutorAllocationListener 加入 listenerBus 中,ExecutorAllocationListener 通过监听 listenerBus 里的事件,动态添加、删除 Executor。并且通过 Thread 不断添加 Executor,遍历 Executor,将超时的 Executor 杀掉并移除。
$updateAndSyncNumExecutorsTarget()$
更新 executor 个数并把该结果值同步给集群管理器,调用 $maxNumExecutorNeeded()$ 方法来计算当前负载下所需要的最大executor 个数
三、初始化
ExecutorAllocationManager 的初始化是在 SparkContext 中~
1 | val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) |