一、概述

存储内存池主要用于 RDD 的缓存,广播以及备份中。不像执行内存池需要维护每个 Task 的内存占用情况,存储内存池只提供了一个 _memoryUsed 的变量来进行当前内存的使用情况。

二、实现

2.1. 结构

2.1.1. 属性

  1. _memoryUsed: 已经使用的内存大小

  2. _memoryStore: 当前 StorageMemoryPool 所关联的 MemoryStore

三、内存管理

3.1. 申请内存

$acquireMemoryacquireMemory()$ 用于给 BlockId 对应的 Block 获取 numBytes 指定大小的内存。

  1. 首先计算要申请的内存大小 numBytes 与空闲空间 memoryFree 的差值 numBytesToFree,然后调用重载的 $acquireMemory()$ 方法申请获得内存。

    1
    2
    3
    4
    def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
    val numBytesToFree = math.max(0, numBytes - memoryFree)
    acquireMemory(blockId, numBytes, numBytesToFree)
    }
  2. 重载的 $acquireMemory()$ 用于给 BlockId 对应的 Block 获取 Block 所需大小 (numBytes-ToAcquire) 的内存。

  3. 当 StorageMemoryPool 内存不足时(numBytesToFree > 0),还需要腾出其他 Block 占用的内存给当前 Block,调用 $MemoryStore.evictBlocksToFreeSpace()$ 方法,腾出 numBytesToFree 属性指定大小的空间。

    1
    2
    3
    if (numBytesToFree > 0) {
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    }

    然后判断驱逐内存后的 free 内存是否比待申请内存大,如果满足条件,则进行内存分配(将 numBytesToAcquire 增加到 _memoryUsed,即逻辑上获得了用于存储Block 的内存空间),否则不进行内存分配,返回 false,告知失败。

StorageMemoryPool 与 ExecutionMemoryPool 不同,不会分不到资源就进行等待,$acquireStorageMemory()$ 只会返回一个 true 或是 false,告知内存分配是否成功

3.2. 回收内存

释放内存的逻辑由 $releaseMemory()$ 和 $releaseAllMemory() $ 方法来实现,存储内存池直接将 _memoryUsed=0,如果需要释放指定大小内存,那么只要减少一部分即可。

1
2
3
def releaseAllMemory(): Unit = lock.synchronized {
_memoryUsed = 0
}

3.3. 内存池收缩

$shrinkPoolToFreeSpace()$ 方法负责对 Storage 内存池收缩 spaceToFree 字节大小,返回实际收缩的大小值。

  1. 取试图收缩大小 spaceToFree 和可用内存 memoryFree 的较小者

    1
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)

    即如果试图收缩的 spaceToFree 大于可用内存大小,那么最大也就是收缩可用内存大小 memoryFree

  2. 计算预设定收缩大小中未完成的部分: remainingSpaceToFree

    1
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  3. 如果需要缩小的部分大于 free 内存,不能缩小的部分,需要调用 $memoryStore.evictBlocksToFreeSpace()$ 进行内存驱逐,之后返回总的可以释放的内存。

    1
    val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
  4. 内存池做相应的减少 spaceFreedByEviction

  5. 返回收缩的实际大小