Spark-源码学习-SparkCore-存储服务-架构设计-存储内存
Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血统重新计算。所以如果一个 RDD 要执行多次 action 操作, 可以在第一次 action 操作中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。
弹性分布式数据集 RDD 作为 Spark 最根本的数据抽象,是只读的分区记录的集合,基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换 [Transformation] 操作产生一个新的 RDD。转换后的 RDD 与 原始的 RDD 之间产生的依赖关系构成了血统[Lineag]。凭借血统,Spark 保证了每一个 RDD 都可以被重新恢复。
其中 cache 这个方法是个 Tranformation ,当第一次遇到 action 算子的时才会进行持久化
cache 内部调用了 persist(StorageLevel.MEMORY_ONLY)方法,所以执行 cache 算子其实就是执行了 persist 算子且持久化级别为 MEMORY_ONLY。
RDD 持久化
RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。
在对 RDD 持久化时,
RDD 缓存
Unroll
RDD 在缓存到存储内存之前,数据项 [Record]的对象实例占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同数据项的存储空间并不连续。
缓存到存储内存之后, Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将 Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为”展开” [Unroll]
每个 Executor 的 Storage 模块用一个 LinkedHashMap 来管理堆内和堆外存储内存中所有的 Block ,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。
因为不能保证存储空间可以一次容纳所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,对于序列化的 Partition ,其所需的 Unroll 空间可以直接累加计算,一次申请。
对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。
对于非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间。
如果最终 Unroll 成功, 当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间
淘汰与落盘
由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中 的旧 Block 进行淘汰,而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘,否则直接删除该 Block。
- 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存
- 新旧 Block 不能属于同一个RDD,避免循环淘汰
- 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题
- 遍历 LinkedHashMap 中 Block,按照最近最少使用 LRU 的顺序淘汰,直到满足新 Block 所需的空间。 其中 LRU 是 LinkedHashMap 的特性。