Flink Streaming Writes to Paimon: A Deep Dive into the Checkpoint and Commit Mechanism

1. Architecture Overview

1.1 Core Component Architecture

Flink's streaming write path into Paimon follows the classic two-phase commit (2PC) pattern. It is deeply integrated with Flink's checkpoint mechanism and provides end-to-end exactly-once semantics.

[Figure: core component architecture. Inside the Flink runtime, records flow from the Data Source through the transformation operators into the TableWriteOperator (parallelism = N) and then into the CommitterOperator (parallelism = 1). The Checkpoint Coordinator triggers checkpoints on both operators and notifies them once a checkpoint completes. During snapshotState, the State Backend persists the writer state and the committable state (pending commit metadata). The Paimon storage layer consists of the in-memory write buffer, temporary (uncommitted) data files, manifest files (metadata), the manifest list (snapshot index), and versioned snapshots: writes flush into temporary files, and commits turn them into snapshots.]

1.2 Operator Topology

A Paimon sink is composed of two core operators: [1]

  1. Writer operator: receives the data stream and writes it into temporary files; runs with multiple parallel instances
  2. Committer operator: coordinates the global commit; its parallelism is fixed at 1

This design ensures:

  • Parallel writing: multiple writers write concurrently, increasing throughput
  • Serialized committing: a single committer guarantees the atomicity and ordering of transactions
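
For orientation, here is a minimal sketch of a streaming job whose INSERT plan expands into exactly this topology. The catalog path, table schema, and datagen source are illustrative assumptions, not part of the cited code:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class PaimonSinkExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // commits only happen when checkpoints complete
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Register a Paimon catalog (warehouse path is an assumption for this example).
            tEnv.executeSql(
                    "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:/tmp/paimon')");
            tEnv.executeSql("USE CATALOG paimon");
            tEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS orders ("
                            + "order_id BIGINT, amount DOUBLE, PRIMARY KEY (order_id) NOT ENFORCED)");
            tEnv.executeSql(
                    "CREATE TEMPORARY TABLE src (order_id BIGINT, amount DOUBLE) "
                            + "WITH ('connector' = 'datagen', 'rows-per-second' = '10')");

            // This INSERT compiles into TableWriteOperator (parallel) + CommitterOperator (parallelism 1).
            tEnv.executeSql("INSERT INTO orders SELECT order_id, amount FROM src");
        }
    }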

2. Checkpoint Barrier Propagation

2.1 Barrier Propagation Flow

When the Flink Checkpoint Coordinator triggers a checkpoint, the checkpoint barrier propagates along the data flow from upstream to downstream:

[Figure: checkpoint sequence between the Checkpoint Coordinator, TableWriteOperator, CommitterOperator, and Paimon storage.
  1. The coordinator sends Trigger Checkpoint(checkpointId) to the TableWriteOperator.
  2. prepareSnapshotPreBarrier() calls prepareCommit() to produce Committable objects and emits Committable(checkpointId) downstream.
  3. snapshotState() runs write.snapshotState() and state.snapshotState().
  4. The coordinator triggers the checkpoint on the CommitterOperator, whose snapshotState() runs pollInputs(), groups committables by checkpointId, combines them into a GlobalCommittable, and persists it to the State Backend.
  5. On notifyCheckpointComplete(checkpointId), the committer runs commitUpToCheckpoint(checkpointId) against Paimon storage and a snapshot is created.]

2.2 The prepareSnapshotPreBarrier Phase

Before the writer forwards the checkpoint barrier downstream, Flink first invokes its prepareSnapshotPreBarrier() method: [2]

This phase has three core responsibilities:

  1. Flush the buffer: flush the in-memory write buffer to disk
  2. Produce committables: collect the metadata of the files written during this checkpoint interval
  3. Emit downstream: send the Committable objects to the CommitterOperator [3]

2.3 Barrier Alignment

Because the Paimon sink requires exactly-once semantics, aligned checkpoints are mandatory; unaligned checkpoints are rejected at job startup. [4]

This guarantees that:

  • all writers flush their data at the same checkpoint boundary
  • the inconsistencies that unaligned checkpoints could introduce are avoided
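
A minimal configuration sketch that satisfies the startup assertion cited in [4] (the snippet belongs in the job setup code; the interval value is an arbitrary example):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE); // required by the Paimon sink
    env.getCheckpointConfig().enableUnalignedCheckpoints(false);     // unaligned checkpoints are unsupported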

3. The Pre-Commit Phase in Depth

3.1 snapshotState in the Writer Operator

The writer operator's snapshotState() method is the heart of the pre-commit phase: [5]

This method performs the following operations:

3.1.1 Calling snapshotState on the write layer [6]

For StoreSinkWriteImpl this is actually an empty implementation, because state management is handled by the State object.

3.1.2 Persisting state to the State Backend [7]

The State object persists the following information to the Flink State Backend:

  • commitUser: the commit user identifier, kept consistent across restarts
  • Writer state: partitions, buckets, sequence numbers, and related information

3.2 snapshotState in the Committer Operator

The committer operator's snapshotState() method persists the pending commit metadata: [8]

3.2.1 Collecting input committables [9]

This method:

  1. Groups by checkpoint ID: committables arriving from upstream are grouped by their checkpoint ID
  2. Combines metadata: toCommittables() merges all committables of the same checkpoint into a single GlobalCommittable
  3. Prevents duplicate commits: a repeated commit for the same checkpoint is detected and rejected

3.2.2 Persisting the GlobalCommittable [10]

The CommittableStateManager serializes the GlobalCommittable and stores it in Flink operator state, so that pending commits can be recovered after a job restart.
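
Conceptually this boils down to writing serialized committables into operator list state. A minimal sketch of that pattern (the state name, serializer choice, and method shapes are assumptions for illustration, not Paimon's actual implementation):

    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
    import org.apache.flink.runtime.state.StateInitializationContext;
    import org.apache.flink.runtime.state.StateSnapshotContext;

    // Hypothetical committable state holder, sketching the persistence pattern only.
    class SketchCommittableState {
        private ListState<byte[]> pending;

        void initializeState(StateInitializationContext context) throws Exception {
            pending = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("pending_committables", BytePrimitiveArraySerializer.INSTANCE));
            // On restore: deserialize each byte[] back into a GlobalCommittable
            // and hand the restored list to the committer.
        }

        void snapshotState(StateSnapshotContext context, List<byte[]> serialized) throws Exception {
            pending.update(serialized); // replace state with the currently pending set
        }
    }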

3.3 Managing Uncommitted Files as Temporary State

During the pre-commit phase, the data files that Paimon produces are in a "temporary" state:

[Figure: file lifecycle. Write phase: records accumulate in the in-memory buffer, are flushed to disk, and become data files such as file-xxx.parquet. Pre-commit phase: the file metadata is wrapped into a CommitMessage, then into a Committable, and persisted to the State Backend. Commit phase: the metadata is written into a manifest, a snapshot is created, and the files become official.]
3.3.1 Data file generation [11]

The prepareCommit() method:

  1. calls the underlying TableWriteImpl.prepareCommit() to flush all writers
  2. collects the metadata of every newly written data file (DataFileMeta)
  3. wraps the file metadata into CommitMessage objects
  4. wraps those into Committable objects tagged with the corresponding checkpoint ID

Although these data files are already on disk, they remain invisible to readers until a snapshot is created.
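
The nesting described above can be summarized with a simplified view of the types involved (an illustrative sketch abbreviated from the cited classes, not the full definitions):

    // Simplified shape of the metadata produced by prepareCommit() (illustrative only).
    class Committable {
        long checkpointId;         // which checkpoint this belongs to
        Kind kind;                 // FILE or LOG_OFFSET
        Object wrappedCommittable; // for FILE: a CommitMessage

        enum Kind { FILE, LOG_OFFSET }
    }

    class CommitMessage {
        BinaryRow partition;          // target partition
        int bucket;                   // target bucket
        List<DataFileMeta> newFiles;  // metadata of the newly written data files
    }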

4. The Commit Phase in Depth

4.1 When notifyCheckpointComplete Fires

Once the Flink Checkpoint Coordinator has confirmed that every operator completed its checkpoint successfully, it calls back into notifyCheckpointComplete(): [12]

4.1.1 Trigger conditions

A commit is triggered only when all of the following hold:

  1. every operator's checkpoint succeeded
  2. the State Backend finished persisting the state
  3. the checkpoint coordinator confirmed global checkpoint completion

4.1.2 Streaming vs. batch mode [13]

For streaming jobs:

  • with checkpointing enabled, data is committed only in notifyCheckpointComplete()
  • endInput() performs no commit

For batch jobs:

  • checkpointing is disabled, so the commit happens in endInput()

4.2 The commitUpToCheckpoint Flow [14]

This method implements the core logic of the commit phase:

4.2.1 Collecting the pending committables

[Figure: commit flow. committablesPerCheckpoint (a TreeMap) → headMap() selects all entries up to and including checkpointId → if the list is empty: create an empty snapshot when forceCreatingSnapshot is set, otherwise skip → execute the commit → remove the committed entries.]

The method:

  1. uses headMap() to fetch every committable whose checkpoint ID is less than or equal to the current one (see the standalone example below)
  2. creates an empty committable if the list is empty but a snapshot must be forced
  3. calls committer.commit() or committer.filterAndCommit() to perform the commit
  4. removes the committed entries from committablesPerCheckpoint
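
headMap(checkpointId, true) does the heavy lifting here; a tiny standalone example of its semantics:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    TreeMap<Long, String> perCheckpoint = new TreeMap<>();
    perCheckpoint.put(1L, "committables of cp 1");
    perCheckpoint.put(2L, "committables of cp 2");
    perCheckpoint.put(3L, "committables of cp 3");

    // Inclusive head view: every entry with key <= 2.
    NavigableMap<Long, String> upTo2 = perCheckpoint.headMap(2L, true);
    System.out.println(upTo2.keySet()); // [1, 2]

    // Clearing the view also removes those entries from the backing map,
    // which is exactly how commitUpToCheckpoint() drops committed entries.
    upTo2.clear();
    System.out.println(perCheckpoint.keySet()); // [3]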

4.3 Commit Execution in StoreCommitter [15]

StoreCommitter converts committables into the format Paimon understands:

4.3.1 The combine step [16]

This method:

  1. creates a ManifestCommittable object
  2. iterates over all committables and dispatches by kind:
    • FILE: added to the file committables
    • LOG_OFFSET: added to the log offsets (used for integration with streaming log systems)

4.3.2 TableCommit execution [17]

The TableCommitImpl.commitMultiple() method:

  1. iterates over all ManifestCommittable objects
  2. calls FileStoreCommit.commit() to perform the low-level commit
  3. if new snapshots were produced, triggers maintenance tasks (snapshot expiration, partition expiration, and so on)

4.4 Snapshot Creation and Version Increment

Creating the snapshot is the most critical step of the commit phase: [18]

4.4.1 Snapshot ID generation

[Figure: ID assignment. Fetch the latest snapshot → if it exists, newSnapshotId = latestSnapshot.id + 1, otherwise newSnapshotId = 1 → check for conflicts → create the new snapshot.]

Snapshot IDs are strictly increasing, which guarantees that versions are ordered.
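
In code this is a single branch (compare the cited commit logic in [18], where FIRST_SNAPSHOT_ID is 1):

    long newSnapshotId =
            latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;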

4.4.2 Manifest file merging

When a new snapshot is created, the existing manifest files are merged to optimize storage:

  1. Read the base manifests: read all manifest files from the latest snapshot
  2. Merge and optimize: ManifestFileMerger.merge() merges small files
  3. Write the base manifest list: the merged manifests are written as the base list

4.4.3 Writing the delta manifest

Newly added or modified files go into the delta manifest:

  1. Write manifest files: manifestFile.write(deltaFiles) writes the new manifest files
  2. Create the manifest list: manifestList.write() creates the delta manifest list
  3. Handle changelogs: if there are changelog files, they are written into a separate changelog manifest

4.4.4 Creating the Snapshot object

Finally a new Snapshot object is created, containing:

  • snapshotId: the snapshot ID
  • schemaId: the schema version
  • baseManifestList: the base manifest list (file name and size)
  • deltaManifestList: the delta manifest list
  • changelogManifestList: the changelog manifest list (if any)
  • commitUser: the commit user
  • commitIdentifier: the commit identifier
  • commitKind: the commit kind (APPEND/COMPACT/OVERWRITE)
  • timeMillis: the timestamp
  • logOffsets: log offsets (streaming integration)
  • totalRecordCount: the total record count
  • deltaRecordCount: the delta record count
  • watermark: the watermark

4.4.5 Atomic commit

The snapshot is published via an atomic rename or an external lock mechanism, which keeps concurrent commits safe.
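
A conceptual sketch of the rename-based publish pattern (the FileIO calls below are simplified assumptions; the real logic lives in Paimon's commit and lock implementations):

    // Illustrative only: publish a snapshot file atomically via rename.
    Path tmp = new Path(snapshotDir, "snapshot-" + newSnapshotId + ".tmp");
    Path target = new Path(snapshotDir, "snapshot-" + newSnapshotId);

    fileIO.writeFile(tmp, snapshotJson, false);      // stage the content under a temporary name
    boolean committed = fileIO.rename(tmp, target);  // atomic on rename-capable file systems
    if (!committed) {
        fileIO.deleteQuietly(tmp); // another writer won the race; retry with a new snapshot id
    }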

4.5 From Temporary Files to Official Files

[Figure: visibility sequence. Pre-commit phase: the writer produces data files (data-xxx.parquet) that exist on disk but are not yet visible. Commit phase: the file metadata is written into a manifest, the manifest reference is added to a snapshot, and the snapshot file snapshot-N is created atomically. Read phase: a reader resolves the latest snapshot, obtains its manifest list, and reads the data files, which are now officially visible.]

Key points:

  1. Files never move: a data file stays at the same physical location from write to commit
  2. Visibility switch: creating the snapshot is what makes the files visible to readers
  3. Atomicity: the whole process either fully succeeds or fully fails

5. Abort Handling

5.1 Rolling Back After a Checkpoint Failure

When a checkpoint fails, Flink restarts the job, and Paimon recovers as follows:

[Figure: recovery flow. Checkpoint failure → Flink restarts the job → state is restored from the State Backend: the writer recovers its commitUser and the committer recovers the pending committables → the writer continues processing new data. If pending committables exist, filterAndCommit checks whether each corresponding snapshot already exists: if it exists, the committable is skipped; if not, it is re-committed. A successful re-commit throws an intentional failure that triggers another restart; otherwise retries continue.]

5.1.1 State restoration [19]

In initializeState(), the committer:

  1. restores the commitUser from the State Backend
  2. restores the pending committables through the CommittableStateManager
  3. creates the Committer instance and passes it the restored state

5.1.2 RestoreAndFailCommittableStateManager [20]

This special state manager implements a clever mechanism:

  1. after restoring state, it first commits all pending committables
  2. once the commit succeeds, it intentionally throws an exception
  3. the resulting restart lets the writers resume on top of the latest snapshot

This ensures that:

  • committables already captured in state are never lost
  • writers continue working from the most recently committed snapshot

5.1.3 Deduplication via filterAndCommit [21]

The filterAndCommitMultiple() method:

  1. sorts all committables by identifier
  2. calls commit.filterCommitted() to filter out snapshots that were already committed
  3. commits only the committables that have not been committed yet

The filter decides whether a committable was committed by checking whether its snapshot file exists, which makes the operation idempotent.
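
The filtering idea can be sketched as follows (a conceptual simplification of filterCommitted; snapshotExistsFor is a hypothetical helper that scans existing snapshots for a matching commitUser/identifier pair):

    import java.util.ArrayList;
    import java.util.List;

    // Keep only committables whose (commitUser, identifier) pair is not yet
    // recorded in any existing snapshot. Re-running this filter is idempotent.
    List<ManifestCommittable> filterCommitted(List<ManifestCommittable> sorted) {
        List<ManifestCommittable> retry = new ArrayList<>();
        for (ManifestCommittable c : sorted) {
            if (!snapshotExistsFor(commitUser, c.identifier())) { // hypothetical helper
                retry.add(c);
            }
        }
        return retry;
    }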

5.2 Cleaning Up Temporary Files

5.2.1 The abort method [22]

When uncommitted files need to be cleaned up, the abort() method is called.

The snippets I examined do not show the full body of FileStoreCommitImpl.abort(), but by design it will:

  1. iterate over the files referenced by every CommitMessage
  2. delete all new files (newFiles)
  3. delete all changelog files (changelogFiles)
  4. delete all post-compaction files (compactAfter)

5.2.2 File existence checks [23]

When retrying a commit, Paimon verifies that the files still exist:

  1. collect the paths of every file that needs to be committed
  2. check their existence in the file system in parallel
  3. if any file is missing, throw an exception explaining why

This prevents data inconsistencies caused by files that were deleted by mistake.

5.2.3 Snapshot Expiration and File Cleanup [24]

The maintain() method triggers:

  1. Consumer expiration: clean up expired consumer records
  2. Snapshot expiration: delete old snapshots and their manifest files
  3. Partition expiration: delete the data of expired partitions
  4. Tag management: create and expire tags

This is an asynchronous background task and does not block the commit flow.

6. The Complete End-to-End Flow

6.1 Normal Commit Flow

[Figure: normal commit sequence across Upstream, Writer Operator, Committer Operator, State Backend, Paimon Storage, and Checkpoint Coordinator.
  Write phase: upstream sends records to the writer, which buffers them in memory.
  Checkpoint trigger: the coordinator issues triggerCheckpoint(1).
  Pre-commit phase: the writer's prepareSnapshotPreBarrier(1) flushes the buffer to disk and produces CommitMessages; it sends Committable(cp=1) to the committer; its snapshotState() persists the writer state. The committer is then checkpointed: pollInputs() groups and combines by checkpoint, and snapshotState() persists the GlobalCommittable to the State Backend.
  Commit phase: on notifyCheckpointComplete(1), commitUpToCheckpoint(1) calls committer.commit(committables), writes the manifest, and creates Snapshot-1; the data becomes visible.]

6.2 Failure Recovery Flow

[Figure: failure recovery sequence. Checkpoint-1 fails and the Flink runtime restarts the job. State restoration: the writer's initializeState() recovers the commitUser; the committer's initializeState() recovers GlobalCommittable(cp=1). Committing the restored data: filterAndCommit() checks whether Snapshot-1 exists; it does not, so Committable(cp=1) is committed and Snapshot-1 is created, after which the intentional failure is thrown and the job restarts again. Second restart: initializeState() runs again, finds that Snapshot-1 now exists, and skips the committable. Normal operation resumes, writing on top of Snapshot-1.]

7. Key Design Points

7.1 The Two-Phase Commit Implementation

Paimon follows the classic 2PC protocol:

Prepare phase

  • writers produce temporary files and their metadata
  • the metadata is persisted to the State Backend
  • the transaction is not yet committed and the files are invisible

Commit phase

  • the committer receives the checkpoint-complete notification
  • the metadata is written into manifests and a snapshot
  • the data becomes visible atomically

7.2 Exactly-Once Guarantees

Write side

  • every record is written exactly once
  • files and snapshots are correlated through the checkpoint ID

Commit side

  • the single committer serializes all commits
  • the filterCommitted mechanism prevents duplicate commits

Read side

  • snapshot isolation based on snapshots
  • readers always see a complete, committed snapshot

7.3 Performance-Oriented Design

Parallel writes

  • multiple writers process data concurrently
  • each writer manages its own partitions and buckets

Asynchronous compaction

  • compaction runs asynchronously in the background
  • it does not block data writing

Manifest merging

  • small manifest files are merged periodically
  • fewer small files means better read performance

Lightweight state

  • the State Backend stores only the necessary metadata
  • data files are written directly to object storage

8. Practical Advice and Best Practices

8.1 Checkpoint Configuration

# Recommended checkpoint configuration
execution.checkpointing.interval: 60s  # one checkpoint per minute
execution.checkpointing.mode: EXACTLY_ONCE  # exactly-once is mandatory
execution.checkpointing.timeout: 10min  # checkpoint timeout
execution.checkpointing.min-pause: 30s  # minimum pause between checkpoints
execution.checkpointing.max-concurrent-checkpoints: 1  # no concurrent checkpoints

Notes on these settings:

  1. Checkpoint interval: balances latency and throughput; 1-5 minutes is recommended
  2. Timeout: tune according to data volume and file system performance
  3. No concurrent checkpoints: Paimon requires serialized commits

The same settings can also be applied programmatically, as sketched below.
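
A programmatic equivalent of the YAML above (the values mirror the recommendations; adjust them to your workload):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE); // interval + mode

    CheckpointConfig conf = env.getCheckpointConfig();
    conf.setCheckpointTimeout(10 * 60 * 1000);  // 10 min
    conf.setMinPauseBetweenCheckpoints(30_000); // 30 s
    conf.setMaxConcurrentCheckpoints(1);        // serialized commits only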

8.2 Resource Configuration

Writer parallelism

  • tune according to data volume and partition count
  • as a rule of thumb, size writers so that each handles on the order of 100-500 MB/s

Committer resources [25]

  • CPU and memory for the committer can be configured independently
  • at least 2 GB of memory is recommended

Memory configuration

  • set write-buffer-size appropriately to bound memory usage (see the example below)
  • avoid the performance degradation caused by memory overflow
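
For instance, the buffer can be bounded per table through Paimon's write-buffer-size table option; the DDL below is a hypothetical example executed through the Table API:

    tEnv.executeSql(
            "CREATE TABLE t (k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED) "
                    + "WITH ('write-buffer-size' = '256 mb')");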

8.3 Monitoring Metrics

Key metrics to watch:

  1. Checkpoint duration: watch for performance regressions
  2. Committable count: a large backlog indicates commit congestion
  3. Snapshot creation rate: should match the checkpoint rate
  4. File count: watch for small-file problems
  5. Commit latency: the time from checkpoint completion to snapshot creation

9. Common Problems and Troubleshooting

9.1 Checkpoint Timeouts

Likely causes

  • the writer produces too many files
  • network I/O bottlenecks
  • insufficient State Backend performance

Remedies

  1. increase the checkpoint timeout
  2. tune the writer parallelism
  3. use a high-performance State Backend (such as RocksDB)

9.2 High Commit Latency

Likely causes

  • too many manifest files need merging
  • insufficient committer resources
  • file system performance problems

Remedies

  1. increase the committer's memory
  2. tune the manifest merge parameters
  3. use a higher-performance file system

9.3 Data Visibility Latency

Symptom: data becomes readable only some time after it is written.

Cause: this is expected behavior; data becomes visible only after the checkpoint completes.

Mitigations

  1. shorten the checkpoint interval (at the cost of extra overhead)
  2. if low-latency visibility is required, consider streaming reads

10. Summary

The mechanism behind Flink streaming writes to Paimon is a carefully designed distributed transaction system. By combining Flink's checkpointing with Paimon's snapshot isolation, it achieves high-performance, highly reliable streaming writes into the data lake.

Core characteristics

  1. End-to-end exactly once: guaranteed by two-phase commit
  2. High throughput: parallel writes plus asynchronous commits
  3. Strong consistency: snapshot isolation based on snapshots
  4. Fault tolerance: a complete failure recovery mechanism

Key techniques

  1. prepareSnapshotPreBarrier: pre-commit before the barrier
  2. GlobalCommittable: global transaction coordination
  3. filterAndCommit: idempotent commits
  4. RestoreAndFail: deliberate-restart recovery

Best practices

  1. configure checkpoint parameters sensibly
  2. monitor the key performance metrics
  3. adjust parallelism and resources to the workload

With a solid understanding of this machinery, you can use Paimon to build real-time data lakes and run unified stream-batch data processing efficiently.


References

  • Apache Paimon official documentation
  • Flink checkpointing documentation
  • All code citations in this article come from the main branch of the apache/paimon repository

Citations

[1] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java (L88-102)

    public DataStreamSink<?> sinkFrom(DataStream<T> input) {
        // This commitUser is valid only for new jobs.
        // After the job starts, this commitUser will be recorded into the states of write and
        // commit operators.
        // When the job restarts, commitUser will be recovered from states and this value is
        // ignored.
        return sinkFrom(input, createCommitUser(table.coreOptions().toConfiguration()));
    }

    public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
        // do the actually writing action, no snapshot generated in this stage
        DataStream<Committable> written = doWrite(input, initialCommitUser, null);
        // commit the committable to generate a new snapshot
        return doCommit(written, initialCommitUser);
    }

[25] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java (L241-242)

        configureSlotSharingGroup(
                committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));

[4] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java (L264-273)

    public static void assertStreamingConfiguration(StreamExecutionEnvironment env) {
        checkArgument(
                !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                "Paimon sink currently does not support unaligned checkpoints. Please set "
                        + "execution.checkpointing.unaligned.enabled to false.");
        checkArgument(
                env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
                "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                        + "execution.checkpointing.mode to exactly-once");
    }

[2] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java (L82-87)

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (!endOfInput) {
            emitCommittables(false, checkpointId);
        }
        // no records are expected to emit after endOfInput
    }

[3] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java (L103-106)

    private void emitCommittables(boolean waitCompaction, long checkpointId) throws IOException {
        prepareCommit(waitCompaction, checkpointId)
                .forEach(committable -> output.collect(new StreamRecord<>(committable)));
    }

[5] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java (L133-138)

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        write.snapshotState();
        state.snapshotState();
    }

[11] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java (L210-226)

    @Override
    public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
            throws IOException {
        List<Committable> committables = new ArrayList<>();
        if (write != null) {
            try {
                for (CommitMessage committable :
                        write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) {
                    committables.add(
                            new Committable(checkpointId, Committable.Kind.FILE, committable));
                }
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        return committables;
    }

[6] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java (L228-231)

    @Override
    public void snapshotState() throws Exception {
        // do nothing
    }

[7] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java (L38-38)

    void snapshotState() throws Exception;

[13] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L54-60)

    /**
     * If checkpoint is enabled we should do nothing in {@link CommitterOperator#endInput}.
     * Remaining data will be committed in {@link CommitterOperator#notifyCheckpointComplete}. If
     * checkpoint is not enabled we need to commit remaining data in {@link
     * CommitterOperator#endInput}.
     */
    protected final boolean streamingCheckpointEnabled;

[19] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L115-148)

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        Preconditions.checkArgument(
                !forceSingleParallelism
                        || RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())
                                == 1,
                "Committer Operator parallelism in paimon MUST be one.");

        this.currentWatermark = Long.MIN_VALUE;
        this.endInput = false;
        // each job can only have one user name and this name must be consistent across restarts
        // we cannot use job id as commit user name here because user may change job id by creating
        // a savepoint, stop the job and then resume from savepoint
        commitUser =
                StateUtils.getSingleValueFromState(
                        context, "commit_user_state", String.class, initialCommitUser);
        int parallelism = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
        int index = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());

        // parallelism of commit operator is always 1, so commitUser will never be null
        committer =
                committerFactory.create(
                        Committer.createContext(
                                commitUser,
                                getMetricGroup(),
                                streamingCheckpointEnabled,
                                context.isRestored(),
                                context.getOperatorStateStore(),
                                parallelism,
                                index));

        committableStateManager.initializeState(context, committer);
    }

[8] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L164-168)

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        pollInputs();
        committableStateManager.snapshotState(context, committables(committablesPerCheckpoint));
    }

[12] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L190-193)

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        commitUpToCheckpoint(endInput ? END_INPUT_CHECKPOINT_ID : checkpointId);
    }

[14] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L195-218)

    private void commitUpToCheckpoint(long checkpointId) throws Exception {
        NavigableMap<Long, GlobalCommitT> headMap =
                committablesPerCheckpoint.headMap(checkpointId, true);
        List<GlobalCommitT> committables = committables(headMap);
        if (committables.isEmpty() && committer.forceCreatingSnapshot()) {
            committables =
                    Collections.singletonList(
                            toCommittables(checkpointId, Collections.emptyList()));
        }

        if (checkpointId == END_INPUT_CHECKPOINT_ID) {
            // In new versions of Flink, if a batch job fails, it might restart from some operator
            // in the middle.
            // If the job is restarted from the commit operator, endInput will be called again, and
            // the same commit messages will be committed again.
            // So when `endInput` is called, we must check if the corresponding snapshot exists.
            // However, if the snapshot does not exist, then append files must be new files. So
            // there is no need to check for duplicated append files.
            committer.filterAndCommit(committables, false, true);
        } else {
            committer.commit(committables);
        }
        headMap.clear();
    }

[9] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L240-276)

    private void pollInputs() throws Exception {
        Map<Long, List<CommitT>> grouped = committer.groupByCheckpoint(inputs);

        for (Map.Entry<Long, List<CommitT>> entry : grouped.entrySet()) {
            Long cp = entry.getKey();
            List<CommitT> committables = entry.getValue();
            // To prevent the asynchronous completion of tasks with multiple concurrent bounded
            // stream inputs, which leads to some tasks passing a Committable with cp =
            // END_INPUT_CHECKPOINT_ID during the endInput method call of the current checkpoint,
            // while other tasks pass a Committable with END_INPUT_CHECKPOINT_ID during other
            // checkpoints hence causing an error here, we have a special handling for Committables
            // with END_INPUT_CHECKPOINT_ID: instead of throwing an error, we merge them.
            if (cp != null
                    && cp == END_INPUT_CHECKPOINT_ID
                    && committablesPerCheckpoint.containsKey(cp)) {
                // Merge the END_INPUT_CHECKPOINT_ID committables here.
                GlobalCommitT commitT =
                        committer.combine(
                                cp,
                                currentWatermark,
                                committablesPerCheckpoint.get(cp),
                                committables);
                committablesPerCheckpoint.put(cp, commitT);
            } else if (committablesPerCheckpoint.containsKey(cp)) {
                throw new RuntimeException(
                        String.format(
                                "Repeatedly commit the same checkpoint files. \n"
                                        + "The previous files is %s, \n"
                                        + "and the subsequent files is %s",
                                committablesPerCheckpoint.get(cp), committables));
            } else {
                committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
            }
        }

        this.inputs.clear();
    }

[10] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableStateManager.java (L36-37)

    void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables)
            throws Exception;

[16] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java (L79-106)

    public ManifestCommittable combine(
            long checkpointId, long watermark, List<Committable> committables) {
        ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark);
        return combine(checkpointId, watermark, manifestCommittable, committables);
    }

    @Override
    public ManifestCommittable combine(
            long checkpointId,
            long watermark,
            ManifestCommittable manifestCommittable,
            List<Committable> committables) {
        for (Committable committable : committables) {
            switch (committable.kind()) {
                case FILE:
                    CommitMessage file = (CommitMessage) committable.wrappedCommittable();
                    manifestCommittable.addFileCommittable(file);
                    break;
                case LOG_OFFSET:
                    LogOffsetCommittable offset =
                            (LogOffsetCommittable) committable.wrappedCommittable();
                    manifestCommittable.addLogOffset(
                            offset.bucket(), offset.offset(), allowLogOffsetDuplicate);
                    break;
            }
        }
        return manifestCommittable;
    }

[15] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java (L109-114)

    public void commit(List<ManifestCommittable> committables)
            throws IOException, InterruptedException {
        commit.commitMultiple(committables, false);
        calcNumBytesAndRecordsOut(committables);
        commitListeners.notifyCommittable(committables);
    }

[17] File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L223-256)

    public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
        if (overwritePartition == null) {
            int newSnapshots = 0;
            for (ManifestCommittable committable : committables) {
                newSnapshots += commit.commit(committable, checkAppendFiles);
            }
            if (!committables.isEmpty()) {
                maintain(
                        committables.get(committables.size() - 1).identifier(),
                        maintainExecutor,
                        newSnapshots > 0 || expireForEmptyCommit);
            }
        } else {
            ManifestCommittable committable;
            if (committables.size() > 1) {
                throw new RuntimeException(
                        "Multiple committables appear in overwrite mode, this may be a bug, please report it: "
                                + committables);
            } else if (committables.size() == 1) {
                committable = committables.get(0);
            } else {
                // create an empty committable
                // identifier is Long.MAX_VALUE, come from batch job
                // TODO maybe it can be produced by CommitterOperator
                committable = new ManifestCommittable(Long.MAX_VALUE);
            }
            int newSnapshots =
                    commit.overwrite(overwritePartition, committable, Collections.emptyMap());
            maintain(
                    committable.identifier(),
                    maintainExecutor,
                    newSnapshots > 0 || expireForEmptyCommit);
        }
    }

[21] File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L258-276)

    public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
        return filterAndCommitMultiple(committables, true);
    }

    public int filterAndCommitMultiple(
            List<ManifestCommittable> committables, boolean checkAppendFiles) {
        List<ManifestCommittable> sortedCommittables =
                committables.stream()
                        // identifier must be in increasing order
                        .sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                        .collect(Collectors.toList());
        List<ManifestCommittable> retryCommittables = commit.filterCommitted(sortedCommittables);

        if (!retryCommittables.isEmpty()) {
            checkFilesExistence(retryCommittables);
            commitMultiple(retryCommittables, checkAppendFiles);
        }
        return retryCommittables.size();
    }

[23] File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L278-333)

    private void checkFilesExistence(List<ManifestCommittable> committables) {
        List<Path> files = new ArrayList<>();
        DataFilePathFactories factories = new DataFilePathFactories(commit.pathFactory());
        IndexFilePathFactories indexFactories = new IndexFilePathFactories(commit.pathFactory());
        for (ManifestCommittable committable : committables) {
            for (CommitMessage message : committable.fileCommittables()) {
                CommitMessageImpl msg = (CommitMessageImpl) message;
                DataFilePathFactory pathFactory =
                        factories.get(message.partition(), message.bucket());
                IndexPathFactory indexFileFactory =
                        indexFactories.get(message.partition(), message.bucket());
                Consumer<DataFileMeta> collector = f -> files.addAll(f.collectFiles(pathFactory));
                msg.newFilesIncrement().newFiles().forEach(collector);
                msg.newFilesIncrement().changelogFiles().forEach(collector);
                msg.compactIncrement().compactBefore().forEach(collector);
                msg.compactIncrement().compactAfter().forEach(collector);
                msg.indexIncrement().newIndexFiles().stream()
                        .map(indexFileFactory::toPath)
                        .forEach(files::add);
                msg.indexIncrement().deletedIndexFiles().stream()
                        .map(indexFileFactory::toPath)
                        .forEach(files::add);
            }
        }

        Predicate<Path> nonExists =
                p -> {
                    try {
                        return !commit.fileIO().exists(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                };

        List<Path> nonExistFiles =
                Lists.newArrayList(
                        randomlyExecuteSequentialReturn(
                                getExecutorService(null),
                                f -> nonExists.test(f) ? singletonList(f) : emptyList(),
                                files));

        if (!nonExistFiles.isEmpty()) {
            String message =
                    String.join(
                            "\n",
                            "Cannot recover from this checkpoint because some files in the snapshot that"
                                    + " need to be resubmitted have been deleted:",
                            "    "
                                    + nonExistFiles.stream()
                                            .map(Object::toString)
                                            .collect(Collectors.joining(",")),
                            "    The most likely reason is because you are recovering from a very old savepoint that"
                                    + " contains some uncommitted files that have already been deleted.");
            throw new RuntimeException(message);
        }
    }

[24] File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L335-376)

    private void maintain(long identifier, ExecutorService executor, boolean doExpire) {
        if (maintainError.get() != null) {
            throw new RuntimeException(maintainError.get());
        }

        executor.execute(
                () -> {
                    try {
                        maintain(identifier, doExpire);
                    } catch (Throwable t) {
                        LOG.error("Executing maintain encountered an error.", t);
                        maintainError.compareAndSet(null, t);
                    }
                });
    }

    private void maintain(long identifier, boolean doExpire) {
        // expire consumer first to avoid preventing snapshot expiration
        if (doExpire && consumerExpireTime != null) {
            consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
        }

        if (doExpire && expireSnapshots != null) {
            expireSnapshots.run();
        }

        if (doExpire && partitionExpire != null) {
            partitionExpire.expire(identifier);
        }

        if (tagAutoManager != null) {
            TagAutoCreation tagAutoCreation = tagAutoManager.getTagAutoCreation();
            if (tagAutoCreation != null) {
                tagAutoCreation.run();
            }

            TagTimeExpire tagTimeExpire = tagAutoManager.getTagTimeExpire();
            if (doExpire && tagTimeExpire != null) {
                tagTimeExpire.expire();
            }
        }
    }

[22] File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L391-393)

    public void abort(List<CommitMessage> commitMessages) {
        commit.abort(commitMessages);
    }

[18] File: paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java (L897-1100)

            List<ManifestEntry> deltaFiles,
            List<ManifestEntry> changelogFiles,
            List<IndexManifestEntry> indexFiles,
            long identifier,
            @Nullable Long watermark,
            Map<Integer, Long> logOffsets,
            Map<String, String> properties,
            Snapshot.CommitKind commitKind,
            @Nullable Snapshot latestSnapshot,
            ConflictCheck conflictCheck,
            @Nullable String newStatsFileName) {
        long startMillis = System.currentTimeMillis();

        // Check if the commit has been completed. At this point, there will be no more repeated
        // commits and just return success
        if (retryResult != null && latestSnapshot != null) {
            Map<Long, Snapshot> snapshotCache = new HashMap<>();
            snapshotCache.put(latestSnapshot.id(), latestSnapshot);
            long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
            if (retryResult.latestSnapshot != null) {
                snapshotCache.put(retryResult.latestSnapshot.id(), retryResult.latestSnapshot);
                startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
            }
            for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
                Snapshot snapshot = snapshotCache.computeIfAbsent(i, snapshotManager::snapshot);
                if (snapshot.commitUser().equals(commitUser)
                        && snapshot.commitIdentifier() == identifier
                        && snapshot.commitKind() == commitKind) {
                    return new SuccessResult();
                }
            }
        }

        long newSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
        long firstRowIdStart = 0;
        if (latestSnapshot != null) {
            newSnapshotId = latestSnapshot.id() + 1;
            Long nextRowId = latestSnapshot.nextRowId();
            if (nextRowId != null) {
                firstRowIdStart = nextRowId;
            }
        }

        if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) {
            for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
                Snapshot snapshot = snapshotManager.snapshot(id);
                if ((snapshot.commitKind() == Snapshot.CommitKind.COMPACT
                                || snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE)
                        && !snapshot.commitUser().equals(commitUser)) {
                    throw new RuntimeException(
                            String.format(
                                    "When trying to commit snapshot %d, "
                                            + "commit user %s has found a %s snapshot (id: %d) by another user %s. "
                                            + "Giving up committing as %s is set.",
                                    newSnapshotId,
                                    commitUser,
                                    snapshot.commitKind().name(),
                                    id,
                                    snapshot.commitUser(),
                                    CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
                }
            }
            strictModeLastSafeSnapshot = newSnapshotId - 1;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId);
            for (ManifestEntry entry : deltaFiles) {
                LOG.debug("  * {}", entry);
            }
            LOG.debug("Ready to commit changelog to snapshot {}", newSnapshotId);
            for (ManifestEntry entry : changelogFiles) {
                LOG.debug("  * {}", entry);
            }
        }

        List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
        if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
            // latestSnapshotId is different from the snapshot id we've checked for conflicts,
            // so we have to check again
            List<BinaryRow> changedPartitions =
                    deltaFiles.stream()
                            .map(ManifestEntry::partition)
                            .distinct()
                            .collect(Collectors.toList());
            if (retryResult != null && retryResult.latestSnapshot != null) {
                baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
                List<SimpleFileEntry> incremental =
                        readIncrementalChanges(
                                retryResult.latestSnapshot, latestSnapshot, changedPartitions);
                if (!incremental.isEmpty()) {
                    baseDataFiles.addAll(incremental);
                    baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
                }
            } else {
                baseDataFiles =
                        readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
            }
            noConflictsOrFail(
                    latestSnapshot.commitUser(),
                    baseDataFiles,
                    SimpleFileEntry.from(deltaFiles),
                    commitKind);
        }

        Snapshot newSnapshot;
        Pair<String, Long> baseManifestList = null;
        Pair<String, Long> deltaManifestList = null;
        List<PartitionEntry> deltaStatistics;
        Pair<String, Long> changelogManifestList = null;
        String oldIndexManifest = null;
        String indexManifest = null;
        List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>();
        List<ManifestFileMeta> mergeAfterManifests = new ArrayList<>();
        long nextRowIdStart = firstRowIdStart;
        try {
            long previousTotalRecordCount = 0L;
            Long currentWatermark = watermark;
            if (latestSnapshot != null) {
                previousTotalRecordCount = scan.totalRecordCount(latestSnapshot);
                // read all previous manifest files
                mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot);
                // read the last snapshot to complete the bucket's offsets when logOffsets does not
                // contain all buckets
                Map<Integer, Long> latestLogOffsets = latestSnapshot.logOffsets();
                if (latestLogOffsets != null) {
                    latestLogOffsets.forEach(logOffsets::putIfAbsent);
                }
                Long latestWatermark = latestSnapshot.watermark();
                if (latestWatermark != null) {
                    currentWatermark =
                            currentWatermark == null
                                    ? latestWatermark
                                    : Math.max(currentWatermark, latestWatermark);
                }
                oldIndexManifest = latestSnapshot.indexManifest();
            }

            // try to merge old manifest files to create base manifest list
            mergeAfterManifests =
                    ManifestFileMerger.merge(
                            mergeBeforeManifests,
                            manifestFile,
                            manifestTargetSize.getBytes(),
                            manifestMergeMinCount,
                            manifestFullCompactionSize.getBytes(),
                            partitionType,
                            manifestReadParallelism);
            baseManifestList = manifestList.write(mergeAfterManifests);

            if (rowTrackingEnabled) {
                // assigned snapshot id to delta files
                List<ManifestEntry> snapshotAssigned = new ArrayList<>();
                assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
                // assign row id for new files
                List<ManifestEntry> rowIdAssigned = new ArrayList<>();
                nextRowIdStart =
                        assignRowLineageMeta(firstRowIdStart, snapshotAssigned, rowIdAssigned);
                deltaFiles = rowIdAssigned;
            }

            // the added records subtract the deleted records from
            long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
            long totalRecordCount = previousTotalRecordCount + deltaRecordCount;

            // write new delta files into manifest files
            deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
            deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));

            // write changelog into manifest files
            if (!changelogFiles.isEmpty()) {
                changelogManifestList = manifestList.write(manifestFile.write(changelogFiles));
            }

            indexManifest =
                    indexManifestFile.writeIndexFiles(oldIndexManifest, indexFiles, bucketMode);

            long latestSchemaId =
                    schemaManager
                            .latestOrThrow("Cannot get latest schema for table " + tableName)
                            .id();

            // write new stats or inherit from the previous snapshot
            String statsFileName = null;
            if (newStatsFileName != null) {
                statsFileName = newStatsFileName;
            } else if (latestSnapshot != null) {
                Optional<Statistics> previousStatistic = statsFileHandler.readStats(latestSnapshot);
                if (previousStatistic.isPresent()) {
                    if (previousStatistic.get().schemaId() != latestSchemaId) {
                        LOG.warn("Schema changed, stats will not be inherited");
                    } else {
                        statsFileName = latestSnapshot.statistics();
                    }
                }
            }

            // prepare snapshot file
            newSnapshot =
                    new Snapshot(
                            newSnapshotId,
                            latestSchemaId,
                            baseManifestList.getLeft(),
                            baseManifestList.getRight(),

[20] File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java (L49-60)

    protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
            throws Exception {
        int numCommitted = super.recover(committables, committer);
        if (numCommitted > 0) {
            throw new RuntimeException(
                    "This exception is intentionally thrown "
                            + "after committing the restored checkpoints. "
                            + "By restarting the job we hope that "
                            + "writers can start writing based on these new commits.");
        }
        return numCommitted;
    }
