Spark-源码学习-SparkCore-存储服务-内存-执行内存池 ExecutionMemoryPool
一、概述
执行内存池内存只会分配给 Task 进行使用,主要用于 Task 中的 Shuffle、Join、Aggregation 等操作时候的内存提供。由于执行内存会由多个 Task 进行共享,所以为了保证 Task 合理地进行内存使用,避免某些 Task 过度使用内存导致其它的Task频繁将数据溢写到磁盘,拖垮整体执行速度,执行内存池需要保证在 N 个 Task 的情况下,每个 Task 所能分配到的内存在总内存的 1/2N~1/N 之间,由于 Task 数量是动态的,因此会跟踪所有激活的Task的数量以便重新计算 1/2N 和 1/N 的值。
二、实现
2.1. 结构
2.1.1. 属性
ExecutionMemoryPool 用一个 HashMap 来维护一个 TaskAttempt[身份标识为 taskAttemptId]与所消费内存的大小之间的映射关系。
1 | /** |
三、内存管理
3.1. 分配内存
$ExecutionMemoryPool.acquireMemory()$ 为每个任务分配内存,返回实际分配的内存大小,如果当任务数量增多,而老任务已经占据大量内存时,新来的任务不能获取到至少 1/2N
的内存时,来保证每个任务都有机会获取到 execution 总内存的 1/2N
时候,会阻塞申请内存的任务直到其他 Task 释放内存唤醒当前线程,重新进行计算,尝试获取到足够的空闲内存。
如果该 task 之前没有进行过内存申请,则将其加入 memoryForTask,内存大小为0,并且唤醒所有等待申请内存的线程。
1
2
3
4if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
lock.notifyAll()
}获取当前 task 的数量
numActiveTasks
,以及当前待申请内存Task的已有内存curMem
1
2val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)执行内存增长策略 maybeGrowPool
如果执行内存不足,在有一些 MemoryManager 比如 UnifiedMemoryManager 时候,会向存储内存借用或者回收执行内存挪用给存储内存的内存;
1
maybeGrowPool(numBytes - memoryFree)
执行完内存增长策略后,调用 $computeMaxPollSize()$ ,计算释放存储内存后,执行内存池可用的最大大小: maxPoolSize
1
val maxPoolSize = computeMaxPoolSize()
计算每个 Task 可以申请的最大内存: maxPoolSize / numActiveTasks,表示内存增长策略后当前总内存的 1/n
计算每个 Task 可以申请的最小内存: poolSize / (2 * numActiveTasks),表示没有进行增长策略时候执行内存总大小的 1/2n
计算当前任务可以申请到最大的内存大小 maxToGrant
1
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
表示不超过 numBytes 并且不超过每个 task 可申请最大内存。保持在0 <= X <= 1 / numActiveTasks 之间;
计算当前任务真正可以申请到的内存大小(toGrant)
1
val toGrant = math.min(maxToGrant, memoryFree)
如果申请的内存小于待申请内存 numBytes,且当前总的 Task 内存小于 Task 可以申请的最小内存,说明连 Task 执行的最基本内存要求都无法满足,则执行 $lock.wait()$ 进行线程等待,等待内存有释放再唤醒;否则,更新 memoryForTask 的当前 Task 内存大小为 toGrant,并返回 toGrant,退出循环。
1
2
3
4
5
6if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
3.2. 回收内存
如果释放内存大于分配给当前 Task 已经分配的内存,那么需要释放的内存大小是当前 Task 所申请的内存大小,否则是指定的内存大小。
1
2
3
4
5
6val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
val memoryToFree = if (curMem < numBytes) {
curMem
} else {
numBytes
}更新记录 Task 占用内存的 Map[memoryForTask],对该 Task 需要释放的内存大小进行收回,如果剩余内存为 0,则将该 Task 移除
1
2
3
4
5
6if (memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) -= memoryToFree
if (memoryForTask(taskAttemptId) <= 0) {
memoryForTask.remove(taskAttemptId)
}
}最后由于释放了内存,其他正在等待内存分配的 Task 或许可以申请到需要的内存大小,所以通过 $lock.notifyAll() $ 唤醒等待申请内存的其他线程
1
lock.notifyAll()