1. Checkpoint 是什么:状态 + 位点的一致性快照

开启 checkpointing 后,Flink 会周期性触发快照:

  • 把 managed state 持久化
  • 同时记录对应的 stream positions(例如 Kafka offset)

恢复时:

  • Job 从最新成功 checkpoint 读取 state
  • 并从对应位点继续消费,保证一致性恢复

2. Checkpoint Storage:状态快照到底存哪里?

Flink “开箱即用”提供两种 Checkpoint Storage:

  • JobManagerCheckpointStorage
  • FileSystemCheckpointStorage

默认策略:

  • 如果配置了 checkpoint directory(例如 execution.checkpointing.dirCheckpointingOptions.CHECKPOINTS_DIRECTORY),就用 FileSystemCheckpointStorage
  • 否则用 JobManagerCheckpointStorage

3. JobManagerCheckpointStorage:把快照放到 JM 堆内存(适合轻量)

3.1 原理

  • checkpoint snapshot 存在 JobManager heap
  • 可以设置单个 state 的最大值,超过就让 checkpoint 失败,以免 JM OOM

示例(代码级设置最大 state 大小):

new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);

3.2 限制(非常关键)

  • 单个 state 默认 5MB(可在构造器调大)
  • 不管你怎么调最大 state,也不能超过 Pekko frame size(内部 RPC/消息传输限制)
  • 总的 state 必须能放进 JobManager 内存

3.3 适用场景

  • 本地开发/调试
  • 基本不存 state 的作业(Map/FlatMap/Filter 等 record-at-a-time)
  • Kafka consumer 这类本身只需要极少状态的场景

一句话:方便但不抗打,线上大多数情况不建议。

4. FileSystemCheckpointStorage:把快照写到文件系统(生产首选)

4.1 原理

配置一个文件系统目录:

  • hdfs://namenode:40010/flink/checkpoints
  • file:///data/flink/checkpoints
  • 以及对象存储(S3/OSS/GCS/…)通过对应 FS 插件

checkpoint 时:

  • 状态写入文件系统目录下的文件
  • JobManager 只保留少量元数据(HA 模式下元数据也会更稳妥存储)

4.2 适用场景

  • 所有高可用(HA)集群
  • 绝大多数生产作业(尤其是 RocksDB 大状态)

一句话:生产默认选它


5. Retained / Externalized Checkpoints:取消作业时检查点留不留?

默认行为:

  • checkpoint 只用于故障恢复
  • 作业取消(cancel)时会删除

如果你想“取消后也保留一个 checkpoint 以便重启恢复”,就用 Externalized Checkpoints:

CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointRetention(
    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION
);

两种模式:

  • RETAIN_ON_CANCELLATION
    取消作业时保留 checkpoint
    ⚠️ 你需要手动清理这些 checkpoint 文件

  • DELETE_ON_CANCELLATION
    取消作业时删除 checkpoint
    只有作业失败时你才可能还有 checkpoint 可用

实战建议:

  • 线上如果经常“停机维护/手动 cancel 再起”,通常选 RETAIN_ON_CANCELLATION
  • 配合存储侧 lifecycle/TTL 或运维脚本做清理,避免存储爆炸

6. 目录结构:一个 checkpoint 目录里都有什么?

checkpoint 目录结构(FLINK-8531 引入的布局):

/user-defined-checkpoint-dir
  /{job-id}
    /shared/
    /taskowned/
    /chk-1/
    /chk-2/
    /chk-3/
    ...

含义(按你给的描述):

  • shared/:可能被多个 checkpoint 复用的状态文件
  • taskowned/:必须永远不被 JobManager 丢弃的状态
  • chk-N/:每次 checkpoint 的元数据与独占数据(不同 state backend 会有差异)

注意:checkpoint 目录不是 public API,未来版本可能会变。

7. 三种配置方式:全局 / 每作业 / 实例化存储

7.1 全局配置(flink-conf.yaml)

execution.checkpointing.dir: hdfs:///checkpoints/

7.2 每个作业配置(Configuration 注入)

Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/");
env.configure(config);

7.3 指定存储实例(可调更底层参数)

比如写缓冲等:

Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/");
config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, FILE_SIZE_THESHOLD);
env.configure(config);

8. 从保留的 Checkpoint 恢复:像恢复 Savepoint 一样

恢复时用 checkpoint 的 meta data file 路径

bin/flink run -s :checkpointMetaDataPath [:runArgs]

注意点:

  • 如果 meta data file 不是 self-contained(会引用 data files)
  • JobManager 必须能访问它引用的那些数据文件(即共享/独占文件仍在)

9. 最常见的选型结论(一句话版)

  • 本地/极小状态:JobManagerCheckpointStorage
  • 生产/HA/大状态:FileSystemCheckpointStorage
  • 需要 cancel 后可恢复:开启 RETAIN_ON_CANCELLATION,并配套清理策略
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐