一、概述

MemoryManager 是 Flink 中管理托管内存的组件,其管理的托管内存只使用堆外内存。在批处理中用在排序、Hash 表和中间结果的缓存中,在流计算中作为 RocksDBStateBackend 的内存。
1.10 之前的 Flink 版本中,MemoryManager 负责 TaskManager 的所有内存,1.10 版本中,MemoryManager 的管理范围缩小为 Slot 级别,即为 Task 管理内容,

TaskManager 为每个 Slot 分配相同的内容,Task 不能使用超过其 Slot 分配的资源。这样的实现并不完美,但是相比 1.10 之前的版本,能够更好地隔离任务,系统更加稳定。

需要注意的是,并不是所有的算子都会使用 MemoryManager 申请内存空间,这个步骤主要针对批计算类型的算子,例如 HashJoinOperator SorMergeJoinOperator 和 SortOperator等,这些算子往往需要借助非常大的内存空间进行数据的排序等操作。
MemoryManager 主要通过内部接口 MemoryPool 来管理所有的 MemorySegment。托管内存的管理相比于 Network Buffers 的管理更为简单,因为不需要Buffer的那一层封装。

二、架构设计

三、实现

3.1. 内存申请

3.1.1. 批处理计算任务

批处理计算任务中,MemoryManager 负责为算子动态申请内存空间,最终用于算子的具体计算过程。

3.1.2. 流计算任务

流计算任务中,MemoryManager 更多的作用是管理,控制 RocksDB 的内存使用量,通过 RocksDB 的 BlockCache 和 WriterBufferManager 参数来限制,参数的具体值从 TaskManager 的内存配置参数中计算而来。RocksDB 自己负责运行过程中的内存申请和内存释放。

3.2. 内存释放

Buffer 使用了引用计数机制来判断什么时候可以释放 Buffer。每创建一个 BufferConsumer,就会对 Buffer 的引用用计数+1,每个 Buffer被消费完,就会对 Buffer 的引用用计数-1。
Buffer 的主要实现类是 NetworkBuffer,同时继承了 AbstractPeferenceCountedByteBuf。当 Buffer 被消费一次后,就会对 Buffer 与引用计数-1,Buffer 回收之后,并不会释放 MemorySegment,此时 MemorySegment 仍然在 LocalBufferPool 的资源池中,除非 TaskManager 级别内存不足,才会释放回 TaskManager 持有的全局资源池。
释放 MemorySegment 的时候,同样要根据 MemorySegment 的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中,变为可用内存,后续申请 MemorySegment 的时候,可以重复利用该内存片段。MemorySegment 当 NetworkBufferPool 关闭的时候进行内存的释放,交还给操作系统。