Flink Checkpoints 深入解读存储选型、目录结构与“保留检查点”实战
Flink通过周期性创建包含状态和流位置的checkpoint快照实现容错恢复。系统提供两种存储方式:JobManagerCheckpointStorage将快照保存在JobManager堆内存,适合轻量级场景但受限于内存大小;FileSystemCheckpointStorage则将快照持久化到文件系统,是生产环境首选方案。用户可以配置检查点在作业取消时保留(RETAIN_ON_CANCELLA
1. Checkpoint 是什么:状态 + 位点的一致性快照
开启 checkpointing 后,Flink 会周期性触发快照:
- 把 managed state 持久化
- 同时记录对应的 stream positions(例如 Kafka offset)
恢复时:
- Job 从最新成功 checkpoint 读取 state
- 并从对应位点继续消费,保证一致性恢复
2. Checkpoint Storage:状态快照到底存哪里?
Flink “开箱即用”提供两种 Checkpoint Storage:
- JobManagerCheckpointStorage
- FileSystemCheckpointStorage
默认策略:
- 如果配置了 checkpoint directory(例如
execution.checkpointing.dir或CheckpointingOptions.CHECKPOINTS_DIRECTORY),就用 FileSystemCheckpointStorage - 否则用 JobManagerCheckpointStorage
3. JobManagerCheckpointStorage:把快照放到 JM 堆内存(适合轻量)
3.1 原理
- checkpoint snapshot 存在 JobManager heap 里
- 可以设置单个 state 的最大值,超过就让 checkpoint 失败,以免 JM OOM
示例(代码级设置最大 state 大小):
new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
3.2 限制(非常关键)
- 单个 state 默认 5MB(可在构造器调大)
- 不管你怎么调最大 state,也不能超过 Pekko frame size(内部 RPC/消息传输限制)
- 总的 state 必须能放进 JobManager 内存
3.3 适用场景
- 本地开发/调试
- 基本不存 state 的作业(Map/FlatMap/Filter 等 record-at-a-time)
- Kafka consumer 这类本身只需要极少状态的场景
一句话:方便但不抗打,线上大多数情况不建议。
4. FileSystemCheckpointStorage:把快照写到文件系统(生产首选)
4.1 原理
配置一个文件系统目录:
hdfs://namenode:40010/flink/checkpointsfile:///data/flink/checkpoints- 以及对象存储(S3/OSS/GCS/…)通过对应 FS 插件
checkpoint 时:
- 状态写入文件系统目录下的文件
- JobManager 只保留少量元数据(HA 模式下元数据也会更稳妥存储)
4.2 适用场景
- 所有高可用(HA)集群
- 绝大多数生产作业(尤其是 RocksDB 大状态)
一句话:生产默认选它。
5. Retained / Externalized Checkpoints:取消作业时检查点留不留?
默认行为:
- checkpoint 只用于故障恢复
- 作业取消(cancel)时会删除
如果你想“取消后也保留一个 checkpoint 以便重启恢复”,就用 Externalized Checkpoints:
CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointRetention(
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION
);
两种模式:
-
RETAIN_ON_CANCELLATION
取消作业时保留 checkpoint
⚠️ 你需要手动清理这些 checkpoint 文件 -
DELETE_ON_CANCELLATION
取消作业时删除 checkpoint
只有作业失败时你才可能还有 checkpoint 可用
实战建议:
- 线上如果经常“停机维护/手动 cancel 再起”,通常选 RETAIN_ON_CANCELLATION
- 配合存储侧 lifecycle/TTL 或运维脚本做清理,避免存储爆炸
6. 目录结构:一个 checkpoint 目录里都有什么?
checkpoint 目录结构(FLINK-8531 引入的布局):
/user-defined-checkpoint-dir
/{job-id}
/shared/
/taskowned/
/chk-1/
/chk-2/
/chk-3/
...
含义(按你给的描述):
shared/:可能被多个 checkpoint 复用的状态文件taskowned/:必须永远不被 JobManager 丢弃的状态chk-N/:每次 checkpoint 的元数据与独占数据(不同 state backend 会有差异)
注意:checkpoint 目录不是 public API,未来版本可能会变。
7. 三种配置方式:全局 / 每作业 / 实例化存储
7.1 全局配置(flink-conf.yaml)
execution.checkpointing.dir: hdfs:///checkpoints/
7.2 每个作业配置(Configuration 注入)
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/");
env.configure(config);
7.3 指定存储实例(可调更底层参数)
比如写缓冲等:
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/");
config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, FILE_SIZE_THESHOLD);
env.configure(config);
8. 从保留的 Checkpoint 恢复:像恢复 Savepoint 一样
恢复时用 checkpoint 的 meta data file 路径:
bin/flink run -s :checkpointMetaDataPath [:runArgs]
注意点:
- 如果 meta data file 不是 self-contained(会引用 data files)
- JobManager 必须能访问它引用的那些数据文件(即共享/独占文件仍在)
9. 最常见的选型结论(一句话版)
- 本地/极小状态:
JobManagerCheckpointStorage - 生产/HA/大状态:
FileSystemCheckpointStorage - 需要 cancel 后可恢复:开启
RETAIN_ON_CANCELLATION,并配套清理策略
更多推荐


所有评论(0)