Spark-源码学习-SparkCore-存储服务-内存组件-内存管理器 UnifiedMemoryManager
一、概述
UnifiedMemoryManager 是从 1.6 开始的统一内存管理模型,统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域
二、实现
2.1. 成员属性
- maxHeapMemory: 最大堆内存。大小为系统可用内存与 spark.memory.fraction 属性值的乘积
- onHeadpStorageRegionSize: 用于存储的堆内存大小。
- numCores: CPU 内核数
2.2. 主要方法
三、内存申请
3.1. 执行内存
$UnifiedMemoryManager.accquireExecutionMemory()$ 方法申请执行内存~当任务尝试从 executor 中申请 numBytes 大小的内存
该方法直接向 ExecutionMemoryPool 索要所需内存:
当 ExecutionMemory 内存充足,则不会触发向 Storage 申请内存: $maybeGrowExecutionPool()$
UnifiedMemoryManager 其中最重要的优化在于动态占用机制, 其规则如下:
设定基本的存储内存和执行内存区域(
spark.storage.storageFraction
参数),该设定确定了双方各自拥有的空间的范围;双方的空间都不足时,则存储到硬盘,若己方空间不足而对方空余时,可借用对方的空间; [注: 存储空间不足是指不足以放下一个完整的Block]
执行内存的空间被对方占用后,可让对方将占用的部分转存到磁盘,然后”归还”借用的空间;
存储内存的空间被对方占用后,无法让对方 “归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。
1
2
3
4
5
6
7
8
9
10def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
val memoryReclaimableFromStorage = math.max(storagePool.memoryFree, storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}$storagePool.freeSpaceToShrinkPool()$
StoragePool 释放指定大小的空间,缩小内存池的大小。
每个 Task 能够被使用的内存是被限制的
3.2. 存储内存
流程和 $acquireExecutionMemory()$ 类似,当 storage 的内存不足时,同样会向execution借内存,但区别是当且仅当ExecutionMemory有空闲内存时,StorageMemory 才能借走该内存
3.3. UnRoll 内存
1 | override def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean = synchronized { |