Flink 状态存储
Flink 提供了三种状态后端(用于存储状态数据),可以为所有 Flink 作业配置相同的状态后端,也可以为每个 Flink 作业配置指定的状态后端。状态可以存储在 Java 堆内存中或者堆外。
一、概述
1.1. 什么是 state?
Flink 在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互, state 中存储着每条数据消费后数据的消费点(生产环境需要持久化这些状态),当 Job 因为某种错误或者其他原因导致重启时,就能够从 checkpoint 中的 state 数据进行恢复。
1.2. State Backends
当需要对具体的某一种 State 做 Checkpoint 时,此时就需要具体的状态后端存储,Flink 内置提供了不同的状态后端存储,用于指定状态的存储方式和位置。状态可以存储在 Java 堆内存中或者堆外,在 Flink 安装路径下 conf 目录中的 flink-conf.yaml 配置文件中也有状态后端存储相关的配置。
Flink 支持基于每个 Job 单独设置状态后端存储,Flink 提供了以下三种状态后端(用于存储状态数据),可以为所有 Flink 作业配置相同的状态后端(flink-conf.yaml ),也可以为每个 Flink 作业配置指定的状态后端。
1.2.1. MemoryStateBackend
如果 Job 没有配置指定状态后端存储,默认采取 MemoryStateBackend 策略。MemoryStateBackend 是将状态后端存储在 TaskManagers 的内存 (JVM 堆) 中:键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。当应用程序触发 checkpoint 时,会将此时的状态进行快照然后存储在 JobManager 的内存中。
默认情况下,MemoryStateBackend 异步快照,避免阻塞数据流的处理,从而避免反压的发生。
- 不适合在生产环境中使用,仅用于本地测试的情况较多,主要适用于状态很小的 Job,将状态最终存储在 JobManager 中,如果状态较大的话,那么会使得 JobManager 的内存比较紧张,从而导致 JobManager 会出现 OOM 等问题
- 另外就是 MemoryStateBackend 支持配置是否是异步快照还是同步快照
1.2.2. FsStateBackend
状态数据后端存储在 TaskManager 中的内存(JVM 堆)中,但是 checkpoint 的时候,它和 MemoryStateBackend 不一样,它是将状态存储在文件(可以是本地文件,也可以是 HDFS)中,比如: “hdfs://namenode:40010/flink/checkpoints” 或 “file://flink/checkpoints”
通常使用 HDFS 比较多,如果是使用本地文件,Job 重启后如果由调度器重新分配在不同的机器,TaskManager 执行时就会导致找不到之前的 checkkpoint
同样 FsStateBackend 也是支持异步或同步进行 checkpoint 的,此外 FsStateBackend 有个参数 fileStateThreshold,如果状态大小比 MAX_FILE_STATE_THRESHOLD(1MB)
小会将状态数据直接存储在 metadata 文件中,而不是存储在配置的文件中,避免出现很小的状态文件,如果fileStateThreshold 设为 “-1” 表示尚未配置,在这种情况下使用默认值(1024,该默认值可以通过 state.backend.fs.memory-threshold
来配置)。
- 要处理大状态,长窗口等有状态的任务,那么 FsStateBackend 就比较适合
- 使用分布式文件系统,如 HDFS 等,这样 failover 时 Job 的状态可以恢复
- 工作状态仍然是存储在 Task Manager 中的内存中,虽然在 Checkpoint 的时候会存在文件中,注意这个状态要保证不超过 Task Manager 的内存
1.2.3. RocksDBStateBackend
RocksDB 是一种嵌入式的本地数据库,它会在本地文件系统中维护状态,KeyedStateBackend 等会直接写入本地 RocksDB 中,它还需要配置一个文件系统(一般是 HDFS),比如 hdfs://namenode:40010/flink/checkpoints
,当触发 checkpoint 的时候,会把整个 RocksDB 数据库复制到配置的文件系统中去,当 failover 时从文件系统中将数据恢复到本地。
注意: RocksDB 不支持同步的 checkpoint,不过 RocksDB 支持增量的 checkpoint
官方推荐使用 RocksDB 来作为状态的后端存储:
- state 直接存放在 RocksDB 中,不需要存在内存中,这样就可以减少 TaskManager 的内存压力,同时 checkpoint 时将状态持久化到远端的文件系统,适合在生产环境中使用
- RocksDB 本身支持 checkpoint 功能
- RocksDBStateBackend 支持增量的 checkpoint