一、概述

1.1. JVM 内存管理

基于 Java 语言构建的应用,借助 JVM 提供的 GC 能力能够实现内存的自动管理,但会遇到一些基于 JVM 的内存管理问题。尤其对于大数据处理场景而言,需要处理非常庞大的数据,JVM 内存管理的问题就更加突出了,主要体现在以下几点:

  1. Java 对象存储密度相对较低

    对于常用的数据类型,例如Boolean类型数据占16字节内存空间,其中对象头占字节,Boolean 属性仅占 1 字节,其余7字节做对齐填充。而实际上仅1字节就能够代表Boolean值,这种情况造成了比较严重的内存空间浪费

  2. Full GC 影响系统性能
    使用 JVM 的垃圾回收机制对內存进行回收,在大数据量的情况下 GC 的性能会比较差,尤其对于大数据处理,有些数据对象处理完希望立即释放內存空间,但如果借助 JVM GC 自动回收,通常情况下会有秒级甚至分钟级别的延迟,这对系统的性能造成了非常大的影响

  3. OutOfMemoryError 影响系统稳定性

    系统出现对象大小分配超过 JVM 内存限制时,就会触发 OutofMemoryError。导致 JVM 宕机,影响整个数据处理进程

1.2. 堆外内存管理

鉴于以上 JVM 堆内存管理存在的问题,在大数据领域已经有非常多的项目开始自己管理内存。

对 Flink 内存管理来讲,主要是将本来直接存储在堆内存上的数据对象,通过数据序列化处理,存储在预先分配的内存块上,该内存块也叫作MemorySegment,代表了固定长度的內存范围,默认大小为 32KB,同时 MemorySegment 也是 Flink 的最小内存分配单元。

二、内存模型

Flink 的 JVM 的进程总内存(Total Process Memory) 包含了 Flink 总内存(Total Flink Memory) 和运行 Flink 的 JVM 特定内存(JVM Specific Memory),Flink 将内存划分成不同的区域,实现了更加精准地内存控制。

内存

Flink 总内存又包括 JVM 堆内存(JVM Heap)以及堆外内存。

2.1.1. 堆内存

在Flink 中将 JVM 堆内存分为 Framework 堆内存和 Task 堆内存两种类型,其中 Framework 堆内存主要用于 Flink 框架本身需要的内存空间,Task 堆内存则用于 Flink 算子及用户代码的执行。两者主要的区别在于是否将内存计入 Slot 计算资源中,Framework 堆内存和 Task 堆内存之间没有做明确的隔离。

  1. 框架堆内存(Framework Heap Memory)

    用于 Flink 框架的 JVM 堆内存

  2. 任务堆内存(Task Heap Memory)

    用于 Flink 应用的算子及用户代码的 JVM 堆内存

2.1.2. 堆外内存

堆外内存包括托管内存(ManagedMemory)以及直接内存 (Direct Memory)

  1. 托管内存(Managed memory)

    托管内存是由 Flink 负责分配和管理的本地(堆外)内存,用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存

  2. 直接内存

    直接内存分为 Framework 非堆内存、Task 非堆内存和网络内存三个部分,其中 Framework 非堆内存和 Task 非堆内存主要根据堆外内存是否计入 slot 资源进行区分,堆外内存没有对 Framework 和 Task 之间进行隔离。网络内存存储空间主要用于基于 Netty 进行网络数据交换。

    • 框架堆外内存(Framework Off-heap Memory)

      用于 Flink 框架的堆外内存(直接内存或本地内存)

    • 任务堆外内存(Task off-heap Memory)

      用于 Flink 应用的算计及用户代码的堆外内存(直接内存或本地内存)

    • 网络内存(Network Memory)

      用于任务之间数据传输的直接内存(例如网络传输缓冲), 该内存部分为基于 Flink 总内存的受限的等比内存部分。

2.2. JVM 特定使用内存(JVM Special Memory)

JVM 本身直接使用了操作系统的内存不在 Flink 总内存范围之内,包括 JVM 元空间和 JVM 执行开销内存,其中 JVM 元空间存储了JVM 加载类的元数据,加载的类越多,需要的内存空间越大,JVM 执行开销内存指的就是 JVM 在执行时自身所需要的内存,包括线程堆栈、I/O、编译缓存等所使用的内存。

三、内存计算

目前的实现中,在 JVM 启动之前就需要确定各个内存区块的大小。一旦 JVM 启动了,在 TaskManager 进程内部就不再重新计算。Flink 中有两个地方进行内存大小计算

  1. 在 Standalone 部署模式下,内存的计算在启动脚本中实现。
  2. 在容器环境下(Yarn、K8s、 Mesos),计算在 ResourceManager 中进行。

计算时,需要配置如下 3 个参数组合中的至少 1 个:

  1. Task 的堆内内存和托管内存

如果手动配置了网络缓冲区内存大小,则使用该参数。如果没有明确配置,则使用分配系数 fraction × 总体 Flink 使用内存 计算网络缓冲区内存大小。

  1. 总体 Flink 使用内存

如果配置了该选项,而没有配置(1),则从总体 Flink 内存中划分网络缓冲区内存和托管内存,剩余的内存作为 Task 堆内存。如果手动设置了网络缓冲内存,则使用其值,否则使用默认分配系数 fraction × 总体 Flink 使用内存。如果手动设置了托管内存,则使用其值,否则使用默认的分配系数 fraction × 总体 Fink 使用内存。

  1. 总体进程使用内存

如果只配置了总体进程使用内存,则从总体进程中扣除 JVM 元空间和 JVM 执行开销内存,剩余的内存作为总体 Flink 使用内存

四、实现

4.1. 数据结构

Flink 的内存管理像操作系统管理内存一样, 将内存划分为内存段、内存页等结构。

4.1.1. 内存段

Flink 将本来直接存储在堆内存上的数据对象,通过数据序列化处理,存储在预先分配的内存块 MemorySegment,MemorySegment 是 Flink 的最小内存分配单元。默认一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆内存,也可以是堆外内存,如果 MemorySegment 底层使用的是 JVM 堆内存,数据通常存储至 Java 字节数组 byte[] 中, 如果 MemorySegment 底层使用的是堆外内存,数据会以序列化的形式存储在一个或多个 MemorySegment 中。MemorySegment 将 JVM 堆内存和堆外内存进行集中管理,形成统一的内存访问视图。MemorySegment 提供了高效的内存读写方法。

4.1.2. 内存页

Flink通过 MemorySegmentFactory 来创建 MemorySegment,MemorySegment 是 Flink 内存分配的最小单位。对于跨 MemorySegment 的数据访问,Flink 抽象出一个访问视图,数据读取视图 datainputView 以及数据写入视图 dataoutputview。

对于跨 MemorySegment 保存的数据,如果需要上层的使用者,需要考虑所有的细节,非常烦琐,所以 Flink 又抽象了一层内存页。内存页是 MemorySegment 之上的数据访问视图,上层使用者无须关心 MemorySegment 的细节,该层会自动处理跨 MemorySegment 的读取和写入。

4.1.3. Buffer

在 Flink 内存模型中,一个非常重要的堆外内存使用区域就是 Network 内存。Network 内存主要用于网络传输中 Buffer 数据的缓冲区,Buffer 接口是网络层面上传输数据和事件的统一抽象,其实现类是 NetworkBuffer。

Flink 在各个 TaskManager 之间传递数据时,使用的是这一层的抽象。1 个 NetworkBuffer 中包装了 1 个 MemorySegment。

4.1.4. BufferBool

BufferPool 用来管理 Buffer,包含Buffer的申请、释放、销毁、可用Buffer通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。

4.2. 内存管理

Flink 将内存按照使用方式、内存类型分为不同的内存区域,底层会借助 MemorySegment 对内存块进行管理和访问。

  1. JVM 堆内存管理

  2. 堆外内存管理

    在 MemorySegment 中通过 ByteBuffer.allocateDirect(numBytes) 方法申请堆外内存,然后用 sun.misc.Unsafe 对象操作堆外内存

    • 托管内存管理器 MemoryManager

      MemoryManager 是 Flink 中管理托管内存的组件,其管理的托管内存只使用堆外内存。在批处理中用在排序、Hash 表和中间结果的缓存中,在流计算中作为 RocksDBStateBackend 的内存。

      需要注意的是,并不是所有的算子都会使用 MemoryManager 申请 MemorySegment 内存空间,主要针对批计算类型的算子,例如 HashJoinOperator、SortMergeJoinOperator 和 SortOperator 等,这些算子往往需要借助非常大的内存空间进行数据的排序等操作。

    • 直接内存管理

      1. 网络内存管理 NetworkBuffer

        网络缓存器(NetworkBuffer) 是网络交换数据的包装,其对应于 MemorySegment 内存段,当结果分区(ResultParition)开始写出数据的时候,需要向 LocalBufferPool 申请 Buffer 资源,使用 BufferBuilder 将数据写入 MemorySegment。

      2. Framework 堆外内存管理

      3. Task 堆外内存管理

4.2.2. JVM Special Memory

五、总结

大数据场景下,使用Java的内存管理会带来一系列的问题,所以Flink从一开始就选择自主管理内存。为了实现内存管理,Flink 对内进行了一系列的抽象,内存段 MemorySegment 是最小的内存分配单位,对于跨段的内存访问,Flink 抽象了 DataInputView 和 DataOutputView,可以看作是内存页。

Flink 在 1.10 版本重构了其 TaskManger 的内存管理模型,主要分为堆上内存和堆外内存,并简化了內存参数。在计算层面上,Flink 的内存管理器提供了对内存的申请和释放,在数据传输层面上,Flink抽象了网络内存缓冲Buffer(1个Buffer对应一个MemarySegment) 的申请和释放。