Spark-源码学习-SparkCore-存储服务-内存-存储内存池 StorageMemoryPool
一、概述
存储内存池主要用于 RDD 的缓存,广播以及备份中。不像执行内存池需要维护每个 Task 的内存占用情况,存储内存池只提供了一个 _memoryUsed
的变量来进行当前内存的使用情况。
二、实现
2.1. 结构
2.1.1. 属性
_memoryUsed: 已经使用的内存大小
_memoryStore: 当前 StorageMemoryPool 所关联的 MemoryStore
三、内存管理
3.1. 申请内存
$acquireMemoryacquireMemory()$ 用于给 BlockId 对应的 Block 获取 numBytes 指定大小的内存。
首先计算要申请的内存大小 numBytes 与空闲空间 memoryFree 的差值 numBytesToFree,然后调用重载的 $acquireMemory()$ 方法申请获得内存。
1
2
3
4def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
val numBytesToFree = math.max(0, numBytes - memoryFree)
acquireMemory(blockId, numBytes, numBytesToFree)
}重载的 $acquireMemory()$ 用于给 BlockId 对应的 Block 获取 Block 所需大小 (numBytes-ToAcquire) 的内存。
当 StorageMemoryPool 内存不足时(numBytesToFree > 0),还需要腾出其他 Block 占用的内存给当前 Block,调用 $MemoryStore.evictBlocksToFreeSpace()$ 方法,腾出 numBytesToFree 属性指定大小的空间。
1
2
3if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}然后判断驱逐内存后的 free 内存是否比待申请内存大,如果满足条件,则进行内存分配(将 numBytesToAcquire 增加到 _memoryUsed,即逻辑上获得了用于存储Block 的内存空间),否则不进行内存分配,返回 false,告知失败。
StorageMemoryPool 与 ExecutionMemoryPool 不同,不会分不到资源就进行等待,$acquireStorageMemory()$ 只会返回一个 true 或是 false,告知内存分配是否成功
3.2. 回收内存
释放内存的逻辑由 $releaseMemory()$ 和 $releaseAllMemory() $ 方法来实现,存储内存池直接将 _memoryUsed=0
,如果需要释放指定大小内存,那么只要减少一部分即可。
1 | def releaseAllMemory(): Unit = lock.synchronized { |
3.3. 内存池收缩
$shrinkPoolToFreeSpace()$ 方法负责对 Storage 内存池收缩 spaceToFree 字节大小,返回实际收缩的大小值。
取试图收缩大小
spaceToFree
和可用内存memoryFree
的较小者1
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
即如果试图收缩的
spaceToFree
大于可用内存大小,那么最大也就是收缩可用内存大小 memoryFree计算预设定收缩大小中未完成的部分:
remainingSpaceToFree
1
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
如果需要缩小的部分大于 free 内存,不能缩小的部分,需要调用 $memoryStore.evictBlocksToFreeSpace()$ 进行内存驱逐,之后返回总的可以释放的内存。
1
val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
内存池做相应的减少 spaceFreedByEviction
返回收缩的实际大小