一、概述

执行内存池内存只会分配给 Task 进行使用,主要用于 Task 中的 Shuffle、Join、Aggregation 等操作时候的内存提供。由于执行内存会由多个 Task 进行共享,所以为了保证 Task 合理地进行内存使用,避免某些 Task 过度使用内存导致其它的Task频繁将数据溢写到磁盘,拖垮整体执行速度,执行内存池需要保证在 N 个 Task 的情况下,每个 Task 所能分配到的内存在总内存的 1/2N~1/N 之间,由于 Task 数量是动态的,因此会跟踪所有激活的Task的数量以便重新计算 1/2N 和 1/N 的值。

二、实现

2.1. 结构

2.1.1. 属性

ExecutionMemoryPool 用一个 HashMap 来维护一个 TaskAttempt[身份标识为 taskAttemptId]与所消费内存的大小之间的映射关系。

1
2
3
4
5
/**
* Map from taskAttemptId -> memory consumption in bytes
*/
@GuardedBy("lock")
private val memoryForTask = new mutable.HashMap[Long, Long]()

三、内存管理

3.1. 分配内存

$ExecutionMemoryPool.acquireMemory()$ 为每个任务分配内存,返回实际分配的内存大小,如果当任务数量增多,而老任务已经占据大量内存时,新来的任务不能获取到至少 1/2N 的内存时,来保证每个任务都有机会获取到 execution 总内存的 1/2N 时候,会阻塞申请内存的任务直到其他 Task 释放内存唤醒当前线程,重新进行计算,尝试获取到足够的空闲内存。

  1. 如果该 task 之前没有进行过内存申请,则将其加入 memoryForTask,内存大小为0,并且唤醒所有等待申请内存的线程。

    1
    2
    3
    4
    if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    lock.notifyAll()
    }
  2. 获取当前 task 的数量 numActiveTasks,以及当前待申请内存Task的已有内存 curMem

    1
    2
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)
  3. 执行内存增长策略 maybeGrowPool

    如果执行内存不足,在有一些 MemoryManager 比如 UnifiedMemoryManager 时候,会向存储内存借用或者回收执行内存挪用给存储内存的内存;

    1
    maybeGrowPool(numBytes - memoryFree)
  4. 执行完内存增长策略后,调用 $computeMaxPollSize()$ ,计算释放存储内存后,执行内存池可用的最大大小: maxPoolSize

    1
    val maxPoolSize = computeMaxPoolSize()
  5. 计算每个 Task 可以申请的最大内存: maxPoolSize / numActiveTasks,表示内存增长策略后当前总内存的 1/n

  6. 计算每个 Task 可以申请的最小内存: poolSize / (2 * numActiveTasks),表示没有进行增长策略时候执行内存总大小的 1/2n

  7. 计算当前任务可以申请到最大的内存大小 maxToGrant

    1
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))

    表示不超过 numBytes 并且不超过每个 task 可申请最大内存。保持在0 <= X <= 1 / numActiveTasks 之间;

  8. 计算当前任务真正可以申请到的内存大小(toGrant)

    1
    val toGrant = math.min(maxToGrant, memoryFree)
  9. 如果申请的内存小于待申请内存 numBytes,且当前总的 Task 内存小于 Task 可以申请的最小内存,说明连 Task 执行的最基本内存要求都无法满足,则执行 $lock.wait()$ 进行线程等待,等待内存有释放再唤醒;否则,更新 memoryForTask 的当前 Task 内存大小为 toGrant,并返回 toGrant,退出循环。

    1
    2
    3
    4
    5
    6
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
    lock.wait()
    } else {
    memoryForTask(taskAttemptId) += toGrant
    return toGrant
    }

3.2. 回收内存

  1. 如果释放内存大于分配给当前 Task 已经分配的内存,那么需要释放的内存大小是当前 Task 所申请的内存大小,否则是指定的内存大小。

    1
    2
    3
    4
    5
    6
    val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
    val memoryToFree = if (curMem < numBytes) {
    curMem
    } else {
    numBytes
    }
  2. 更新记录 Task 占用内存的 Map[memoryForTask],对该 Task 需要释放的内存大小进行收回,如果剩余内存为 0,则将该 Task 移除

    1
    2
    3
    4
    5
    6
    if (memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) -= memoryToFree
    if (memoryForTask(taskAttemptId) <= 0) {
    memoryForTask.remove(taskAttemptId)
    }
    }
  3. 最后由于释放了内存,其他正在等待内存分配的 Task 或许可以申请到需要的内存大小,所以通过 $lock.notifyAll() $ 唤醒等待申请内存的其他线程

    1
    lock.notifyAll()