Flink 提供了三种状态后端(用于存储状态数据),可以为所有 Flink 作业配置相同的状态后端,也可以为每个 Flink 作业配置指定的状态后端。状态可以存储在 Java 堆内存中或者堆外。

一、概述

1.1. 什么是 state?

Flink 在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互, state 中存储着每条数据消费后数据的消费点(生产环境需要持久化这些状态),当 Job 因为某种错误或者其他原因导致重启时,就能够从 checkpoint 中的 state 数据进行恢复。

1.2. State Backends

当需要对具体的某一种 State 做 Checkpoint 时,此时就需要具体的状态后端存储,Flink 内置提供了不同的状态后端存储,用于指定状态的存储方式和位置。状态可以存储在 Java 堆内存中或者堆外,在 Flink 安装路径下 conf 目录中的 flink-conf.yaml 配置文件中也有状态后端存储相关的配置。

Flink 支持基于每个 Job 单独设置状态后端存储,Flink 提供了以下三种状态后端(用于存储状态数据),可以为所有 Flink 作业配置相同的状态后端(flink-conf.yaml ),也可以为每个 Flink 作业配置指定的状态后端。

1.2.1. MemoryStateBackend

如果 Job 没有配置指定状态后端存储,默认采取 MemoryStateBackend 策略。MemoryStateBackend 是将状态后端存储在 TaskManagers 的内存 (JVM 堆) 中:键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。当应用程序触发 checkpoint 时,会将此时的状态进行快照然后存储在 JobManager 的内存中。

默认情况下,MemoryStateBackend 异步快照,避免阻塞数据流的处理,从而避免反压的发生。

  1. 不适合在生产环境中使用,仅用于本地测试的情况较多,主要适用于状态很小的 Job,将状态最终存储在 JobManager 中,如果状态较大的话,那么会使得 JobManager 的内存比较紧张,从而导致 JobManager 会出现 OOM 等问题
  2. 另外就是 MemoryStateBackend 支持配置是否是异步快照还是同步快照

1.2.2. FsStateBackend

状态数据后端存储在 TaskManager 中的内存(JVM 堆)中,但是 checkpoint 的时候,它和 MemoryStateBackend 不一样,它是将状态存储在文件(可以是本地文件,也可以是 HDFS)中,比如: “hdfs://namenode:40010/flink/checkpoints” 或 “file://flink/checkpoints”

通常使用 HDFS 比较多,如果是使用本地文件,Job 重启后如果由调度器重新分配在不同的机器,TaskManager 执行时就会导致找不到之前的 checkkpoint

同样 FsStateBackend 也是支持异步或同步进行 checkpoint 的,此外 FsStateBackend 有个参数 fileStateThreshold,如果状态大小比 MAX_FILE_STATE_THRESHOLD(1MB) 小会将状态数据直接存储在 metadata 文件中,而不是存储在配置的文件中,避免出现很小的状态文件,如果fileStateThreshold 设为 “-1” 表示尚未配置,在这种情况下使用默认值(1024,该默认值可以通过 state.backend.fs.memory-threshold来配置)。

  1. 要处理大状态,长窗口等有状态的任务,那么 FsStateBackend 就比较适合
  2. 使用分布式文件系统,如 HDFS 等,这样 failover 时 Job 的状态可以恢复
  3. 工作状态仍然是存储在 Task Manager 中的内存中,虽然在 Checkpoint 的时候会存在文件中,注意这个状态要保证不超过 Task Manager 的内存

1.2.3. RocksDBStateBackend

RocksDB 是一种嵌入式的本地数据库,它会在本地文件系统中维护状态,KeyedStateBackend 等会直接写入本地 RocksDB 中,它还需要配置一个文件系统(一般是 HDFS),比如 hdfs://namenode:40010/flink/checkpoints,当触发 checkpoint 的时候,会把整个 RocksDB 数据库复制到配置的文件系统中去,当 failover 时从文件系统中将数据恢复到本地。

注意: RocksDB 不支持同步的 checkpoint,不过 RocksDB 支持增量的 checkpoint

官方推荐使用 RocksDB 来作为状态的后端存储:

  1. state 直接存放在 RocksDB 中,不需要存在内存中,这样就可以减少 TaskManager 的内存压力,同时 checkpoint 时将状态持久化到远端的文件系统,适合在生产环境中使用
  2. RocksDB 本身支持 checkpoint 功能
  3. RocksDBStateBackend 支持增量的 checkpoint