Flink状态后端:HashMap与RocksDB
一、前言
在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(State Backend)。状态后端主要负责管理本地状态的存储方式和位置,是Flink容错机制的核心支撑。
理解状态后端的选择与配置,对于保障Flink作业的性能、稳定性和可扩展性至关重要。本文将从原理到实战,全面剖析Flink的两种状态后端。
二、状态后端概述
2.1 什么是状态后端
状态后端(State Backend)是一个"开箱即用"的组件,可以在不改变应用程序逻辑的情况下独立配置。它决定了:
- 状态数据存储在哪里(内存/磁盘/远程存储)
- 状态如何被序列化和反序列化
- Checkpoint时状态如何被持久化
如上图所示,状态后端分为本地状态存储和持久化快照两部分。用户代码通过本地状态后端进行读写操作,而Checkpoint时将状态持久化到分布式文件系统(DFS)中。
2.2 状态后端的分类
Flink提供了两类不同的状态后端:
| 状态后端 | 存储位置 | 特点 |
|---|---|---|
| HashMapStateBackend | JVM堆内存 | 内存计算,读写速度快,受内存限制 |
| EmbeddedRocksDBStateBackend | 本地磁盘(RocksDB) | 可存储海量状态,支持增量Checkpoint |
如果没有特别配置,系统默认的状态后端是HashMapStateBackend。
三、HashMapStateBackend详解
3.1 原理与架构
HashMapStateBackend是把状态存放在内存里。具体实现上,它在内部会直接把状态当作对象(objects),保存在TaskManager的JVM堆上。
存储结构:
- 普通的状态以及窗口中收集的数据和触发器,都会以键值对的形式存储起来
- 底层是一个哈希表(HashMap),因此得名HashMapStateBackend
- 状态数据以Java对象的形式直接存储在内存中
3.2 特点与适用场景
优点:
- 读写速度极快:状态以对象形式存储在内存中,无需序列化/反序列化开销
- 低延迟:内存访问速度远快于磁盘I/O
- 实现简单:直接利用JVM堆内存管理
缺点:
- 受内存限制:状态大小不能超过TaskManager可用内存
- 大状态容易OOM:如果状态持续增长,可能导致内存溢出
- Checkpoint时全量快照:每次Checkpoint都需要将全部状态数据写入外部存储
适用场景:
- 状态量较小(GB级别以下)
- 对延迟要求极高的场景
- 状态大小相对稳定,不会持续增长
四、EmbeddedRocksDBStateBackend详解
4.1 原理与架构
RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中。
如上图所示,RocksDB状态后端的工作机制:
- TaskManager(JVM进程)内运行RocksDB(native线程)
- 状态数据存储在Local Disks本地磁盘上
- Checkpoint时将状态异步上传到Remote Durable Storage(如HDFS、S3)
4.2 特点与适用场景
优点:
- 存储容量大:可根据可用磁盘空间进行扩展,适合超级海量状态
- 异步快照:不会因为保存Checkpoint而阻塞数据处理
- 增量Checkpoint:只保存自上次Checkpoint以来的变化,大幅提升效率
- 状态数据序列化存储:状态被存储为序列化的字节数组
缺点:
- 读写性能较低:每次访问需要序列化/反序列化,性能比HashMap慢一个数量级
- key按字节比较:key的比较按照字节进行,而不是直接调用hashCode()和equals()
- 依赖本地磁盘:需要足够的本地磁盘空间
适用场景:
- 状态量极大(TB级别)
- 状态持续增长,需要持久化存储
- 对Checkpoint效率有要求,需要增量备份
4.3 RocksDB的存储结构
RocksDB使用LSM-Tree(Log-Structured Merge-Tree)结构存储数据:
- MemTable:内存中的写入缓冲区
- SST Files:磁盘上的有序字符串表文件
- Block Cache:缓存热点数据,加速读取
五、两种状态后端对比
5.1 核心对比表
| 对比维度 | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| 存储位置 | JVM堆内存 | 本地磁盘(RocksDB) |
| 存储容量 | 受内存限制 | 受磁盘空间限制 |
| 读写性能 | 极快(无序列化开销) | 较慢(需序列化/反序列化) |
| Checkpoint方式 | 同步/异步快照 | 始终异步快照 |
| 增量Checkpoint | 不支持 | 支持 |
| 状态对象存储 | 以Java对象存储 | 以序列化字节数组存储 |
| key比较方式 | 调用hashCode()和equals() | 按字节比较 |
| 适用状态大小 | 小状态(GB级以下) | 大状态(TB级) |
| 延迟要求 | 低延迟场景 | 可接受一定延迟 |
| 默认配置 | 是 | 否 |
5.2 如何选择正确的状态后端
HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。
选择HashMapStateBackend:
- 状态大小较小,可以放入内存
- 对处理延迟要求极高
- 状态大小相对稳定,不会持续增长
选择EmbeddedRocksDBStateBackend:
- 状态量极大,需要磁盘存储
- 状态持续增长,需要持久化
- 需要增量Checkpoint提升效率
- 可以接受一定的性能损耗
六、状态后端的配置方式
6.1 配置文件配置(全局配置)
在flink-conf.yaml中,可以使用state.backend配置默认状态后端。
# 默认状态后端state.backend:hashmap# 存放检查点的文件路径state.checkpoints.dir:hdfs://hadoop102:8020/flink/checkpoints配置项的可能值:
hashmap:配置HashMapStateBackendrocksdb:配置EmbeddedRocksDBStateBackend
配置示例:
# 配置HashMapStateBackendstate.backend:hashmapstate.checkpoints.dir:hdfs://hadoop102:8020/flink/checkpoints# 配置EmbeddedRocksDBStateBackendstate.backend:rocksdbstate.checkpoints.dir:hdfs://hadoop102:8020/flink/checkpoints# RocksDB增量Checkpoint配置(可选)state.backend.incremental:true6.2 代码中配置(单作业配置)
通过执行环境设置,可以为每个作业单独配置状态后端,覆盖集群配置文件的默认值。
配置HashMapStateBackend:
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();// 设置HashMapStateBackendenv.setStateBackend(newHashMapStateBackend());配置EmbeddedRocksDBStateBackend:
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();// 设置EmbeddedRocksDBStateBackendenv.setStateBackend(newEmbeddedRocksDBStateBackend());注意事项:
- 如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency>- 由于Flink发行版中默认就包含了RocksDB(服务器上解压的Flink),所以只要代码中没有使用RocksDB的相关内容,就不需要引入这个依赖。
6.3 配置方式优先级
| 配置方式 | 优先级 | 作用范围 |
|---|---|---|
| 代码中配置 | 最高 | 当前作业 |
| 提交参数(-D) | 中 | 当前作业 |
| flink-conf.yaml | 最低 | 集群所有作业 |
七、RocksDB增量Checkpoint
7.1 什么是增量Checkpoint
在Flink 1.15之前,只有RocksDB支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
7.2 增量Checkpoint的执行过程
增量Checkpoint的执行过程分为以下几个阶段:
- 带状态的算子任务将状态更改写入变更日志(记录状态)
- 状态物化:状态表定期保存,独立于Checkpoint
- 状态物化完成后,状态变更日志可以被截断到相应的点
7.3 增量Checkpoint的配置
在flink-conf.yaml中配置:
state.backend:rocksdbstate.backend.incremental:true在代码中配置:
// 启用RocksDB增量CheckpointEmbeddedRocksDBStateBackendbackend=newEmbeddedRocksDBStateBackend(true);env.setStateBackend(backend);7.4 通用增量Checkpoint(Changelog)
从Flink 1.15开始,不管HashMap还是RocksDB状态后端,都可以通过开启changelog实现通用的增量Checkpoint。
配置方式一:配置文件指定
state.backend.changelog.enabled:truestate.backend.changelog.storage:filesystem# 存储changelog数据dstl.dfs.base-path:hdfs://hadoop102:8020/changelogexecution.checkpointing.max-concurrent-checkpoints:1execution.savepoint-restore-mode:CLAIM配置方式二:代码中设置
需要引入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-changelog</artifactId><version>${flink.version}</version><scope>runtime</scope></dependency>开启changelog:
env.enableChangelogStateBackend(true);注意事项:
- 目前标记为实验性功能
- Checkpoint的最大并发必须为1
- 从Flink 1.15开始,只有文件系统的存储类型实现可用
- 不支持NO_CLAIM模式
八、状态后端切换实战
8.1 使用Savepoint切换状态后端
使用Savepoint恢复状态时,可以更换状态后端。但需要注意:不要在代码中硬编码状态后端,而是通过配置文件或-D参数配置。
步骤一:提交作业(使用HashMapStateBackend)
bin/flink run-application-d-tyarn-application-Dstate.backend=hashmap-ccom.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar步骤二:停止作业时触发Savepoint
# 优雅停止并触发Savepointbin/flink stop-psavepoint路径 job-id-yidapplication-id# 或立即停止并触发Savepointbin/flink cancel-ssavepoint路径 job-id-yidapplication-id步骤三:从Savepoint恢复,同时修改状态后端
bin/flink run-application-d-tyarn-application-shdfs://hadoop102:8020/sp/savepoint-xxx-Dstate.backend=rocksdb-ccom.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar8.2 从Checkpoint恢复作业
bin/flink run-application-d-tyarn-application-Dstate.backend=rocksdb-shdfs://hadoop102:8020/chk/xxx/chk-xxx-ccom.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar注意:从Checkpoint恢复时不支持切换状态后端。
总结
本文详细讲解了Flink中的两种核心状态后端:
HashMapStateBackend:将状态存储在JVM堆内存中,读写速度快,适合小状态、低延迟场景
EmbeddedRocksDBStateBackend:将状态存储在本地RocksDB中,容量大、支持增量Checkpoint,适合大状态场景
配置方式:支持配置文件全局配置和代码单作业配置,代码配置优先级最高
RocksDB增量Checkpoint:只备份变化数据,显著提升Checkpoint效率,从Flink 1.15开始支持通用增量Checkpoint(changelog)
状态后端切换:可以通过Savepoint在恢复时切换状态后端,但Checkpoint恢复不支持切换
选择合适的状态后端,是保障Flink作业性能和稳定性的关键决策。在实际项目中,应根据状态大小、增长趋势、延迟要求等因素综合考量。
如果本文对你有帮助,欢迎点赞 👍 + 收藏 ⭐ + 关注 🔖,你的支持是我持续创作的动力!
