Flink Streaming Writes to Paimon: A Deep Dive into Checkpoint and Commit
1. Overall Architecture
1.1 Core Component Architecture
Flink's streaming write path into Paimon follows the classic **two-phase commit (2PC)** pattern and is deeply integrated with Flink's checkpoint mechanism, providing an end-to-end exactly-once guarantee.
1.2 Operator Topology
The Paimon sink consists of two core operators: [1]
- Writer operator: receives the data stream and writes it into data files that are not yet committed; runs with multiple parallel subtasks
- Committer operator: coordinates the global commit; its parallelism is fixed at 1
This design ensures:
- Parallel writing: multiple writers write concurrently, increasing throughput
- Serialized committing: a single committer guarantees the atomicity and ordering of transactions
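The wiring of the two stages can be seen in FlinkSink.sinkFrom(), condensed below from the excerpt in the Citations section: the write stage produces a stream of Committable records, and the commit stage turns them into snapshots.

```java
// Condensed from FlinkSink.java (see Citations section).
public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
    // do the actual writing, no snapshot is generated in this stage
    DataStream<Committable> written = doWrite(input, initialCommitUser, null);
    // commit the committables to generate a new snapshot
    return doCommit(written, initialCommitUser);
}
```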
2. Checkpoint Barrier Propagation
2.1 Barrier Propagation Flow
When the Flink checkpoint coordinator triggers a checkpoint, checkpoint barriers travel with the data stream from upstream operators to downstream operators.
2.2 The PrepareSnapshotPreBarrier Phase
Before the checkpoint barrier reaches the writer, Flink first calls prepareSnapshotPreBarrier(): [2]
The core responsibilities of this phase are:
- Flush buffers: flush the in-memory write buffers to disk
- Produce Committables: collect the metadata of the files written during this checkpoint interval
- Emit downstream: send the Committables to the CommitterOperator (see the excerpt below) [3]
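A condensed view of this hook, based on the PrepareCommitOperator excerpt in the Citations section:

```java
// Condensed from PrepareCommitOperator.java (see Citations section).
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    if (!endOfInput) {
        emitCommittables(false, checkpointId); // flush writers and emit Committables downstream
    }
    // no records are expected to be emitted after endOfInput
}

private void emitCommittables(boolean waitCompaction, long checkpointId) throws IOException {
    prepareCommit(waitCompaction, checkpointId)
            .forEach(committable -> output.collect(new StreamRecord<>(committable)));
}
```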
2.3 Barrier Alignment
Because Paimon requires exactly-once semantics, aligned checkpoints must be used (a configuration sketch follows this list): [4]
This guarantees that:
- All writers flush their data at the same checkpoint boundary
- The inconsistencies that unaligned checkpoints could introduce are avoided
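As a minimal sketch (not taken from the Paimon sources, the interval is an arbitrary example), a job configuration that satisfies Paimon's assertStreamingConfiguration() check might look like this:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // aligned, exactly-once checkpoints: the mode Paimon's sink asserts at startup
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // ... build the source and the Paimon sink on top of env ...
        // env.execute("paimon-streaming-write");
    }
}
```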
3. The Pre-Commit Phase in Depth
3.1 snapshotState in the Writer Operator
The writer operator's snapshotState() method is the core of the pre-commit phase: [5]
It performs the following steps; the short method itself is shown after this list:
3.1.1 Call snapshotState on the write layer [6]
For StoreSinkWriteImpl this method is effectively a no-op, because state management is handled by the State object.
3.1.2 Persist state to the state backend [7]
The State object persists the following information to the Flink state backend:
- CommitUser: the commit user identifier, kept consistent across restarts
- Writer state: partition, bucket, sequence number and related information
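Condensed from the TableWriteOperator excerpt in the Citations section:

```java
// Condensed from TableWriteOperator.java (see Citations section).
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    write.snapshotState(); // a no-op for StoreSinkWriteImpl
    state.snapshotState(); // persist commitUser and writer state to the state backend
}
```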
3.2 snapshotState in the Committer Operator
The committer operator's snapshotState() method persists the metadata that is waiting to be committed: [8]
3.2.1 Collect the incoming Committables [9]
This step will:
- Group by checkpoint id: group the Committables received from upstream by their checkpoint id
- Combine metadata: call toCommittables() to combine all Committables of the same checkpoint into a GlobalCommittable
- Prevent duplicate commits: detect and reject repeated Committables for the same checkpoint
3.2.2 Persist the GlobalCommittable [10]
The CommittableStateManager serializes the GlobalCommittables into Flink operator state, so that the pending commits can be recovered after a job restart.
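Condensed from the CommitterOperator excerpt in the Citations section:

```java
// Condensed from CommitterOperator.java (see Citations section).
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    pollInputs(); // group buffered Committables by checkpoint id and combine them
    committableStateManager.snapshotState(context, committables(committablesPerCheckpoint));
}
```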
3.3 Temporary State of Uncommitted Files
During the pre-commit phase, the data files produced by Paimon are in a "pending" state:
3.3.1 Data file generation [11]
The prepareCommit() method will:
- Call TableWriteImpl.prepareCommit() underneath to flush all writers
- Collect the metadata of all newly written data files (DataFileMeta)
- Wrap the file metadata into CommitMessage objects
- Wrap them again into Committable objects tagged with the corresponding checkpoint id
Although these data files are already on disk, they remain invisible to readers until a snapshot is created.
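The wrapping described above, condensed from the StoreSinkWriteImpl excerpt in the Citations section:

```java
// Condensed from StoreSinkWriteImpl.java (see Citations section): every CommitMessage returned
// by the table write is wrapped into a Committable tagged with the checkpoint id.
public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
    List<Committable> committables = new ArrayList<>();
    if (write != null) {
        try {
            for (CommitMessage committable :
                    write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) {
                committables.add(new Committable(checkpointId, Committable.Kind.FILE, committable));
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
    return committables;
}
```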
4. The Commit Phase in Depth
4.1 When notifyCheckpointComplete Fires
Once the Flink checkpoint coordinator has confirmed that every operator finished its checkpoint successfully, it calls back notifyCheckpointComplete() (excerpted below): [12]
4.1.1 Trigger conditions
The commit is triggered only when all of the following hold:
- Every operator completed its checkpoint successfully
- The state backend finished persisting the state
- The checkpoint coordinator confirmed the global checkpoint
4.1.2 Streaming vs. batch mode [13]
For streaming jobs:
- With checkpointing enabled, data is committed only in notifyCheckpointComplete()
- endInput() performs no commit
For batch jobs:
- With checkpointing disabled, the commit happens in endInput()
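Condensed from the CommitterOperator excerpt in the Citations section:

```java
// Condensed from CommitterOperator.java (see Citations section).
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // streaming commits happen here; at end of input a special checkpoint id is used instead
    commitUpToCheckpoint(endInput ? END_INPUT_CHECKPOINT_ID : checkpointId);
}
```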
4.2 The commitUpToCheckpoint Flow [14]
This method is the heart of the commit phase:
4.2.1 Collect the pending Committables
The method will (condensed code follows this list):
- Use headMap() to fetch all Committables with a checkpoint id less than or equal to the current one
- If the list is empty but a snapshot must still be created, build an empty Committable
- Call committer.commit() (or committer.filterAndCommit() at end of input) to perform the commit
- Clear the committed entries from committablesPerCheckpoint
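Condensed from the CommitterOperator excerpt in the Citations section:

```java
// Condensed from CommitterOperator.java (see Citations section).
private void commitUpToCheckpoint(long checkpointId) throws Exception {
    NavigableMap<Long, GlobalCommitT> headMap =
            committablesPerCheckpoint.headMap(checkpointId, true);
    List<GlobalCommitT> committables = committables(headMap);
    if (committables.isEmpty() && committer.forceCreatingSnapshot()) {
        committables =
                Collections.singletonList(toCommittables(checkpointId, Collections.emptyList()));
    }
    if (checkpointId == END_INPUT_CHECKPOINT_ID) {
        // a restarted batch job may call endInput again, so deduplicate against existing snapshots
        committer.filterAndCommit(committables, false, true);
    } else {
        committer.commit(committables);
    }
    headMap.clear(); // drop the entries that are now committed
}
```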
4.3 Commit Execution in StoreCommitter [15]
StoreCommitter converts the Committables into the format Paimon understands:
4.3.1 The combine step [16]
This step will:
- Create a ManifestCommittable object
- Iterate over all Committables and dispatch them by kind:
  - FILE: added as file committables
  - LOG_OFFSET: added to the log offsets (used for integration with external log systems)
4.3.2 TableCommit execution [17]
The TableCommitImpl.commitMultiple() method will:
- Iterate over all ManifestCommittables
- Call FileStoreCommit.commit() to perform the low-level commit
- If new snapshots were produced, trigger maintenance tasks (snapshot expiration, partition expiration, etc.), as shown in the excerpt below
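The handoff from StoreCommitter to the table commit, condensed from the Citations section:

```java
// Condensed from StoreCommitter.java (see Citations section): the combined ManifestCommittables
// are handed to TableCommitImpl, then commit listeners are notified.
public void commit(List<ManifestCommittable> committables) throws IOException, InterruptedException {
    commit.commitMultiple(committables, false);
    calcNumBytesAndRecordsOut(committables);
    commitListeners.notifyCommittable(committables);
}
```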
4.4 Snapshot Creation and Version Increment
Creating the snapshot is the most critical part of the commit phase: [18]
4.4.1 Snapshot id generation
Snapshot ids are strictly increasing, which guarantees a total order of versions (the new id is the latest snapshot's id plus one, or FIRST_SNAPSHOT_ID for the first commit).
4.4.2 Manifest file merging
When creating a new snapshot, the existing manifest files are merged to keep storage efficient:
- Read the base manifests: read all manifest files referenced by the latest snapshot
- Merge and compact: use ManifestFileMerger.merge() to merge small manifest files
- Write the base manifest list: write the merged manifests into the base manifest list
4.4.3 Writing the delta manifests
Newly added or changed files are written into delta manifests:
- Write manifest files: call manifestFile.write(deltaFiles) to write new manifest files
- Create the manifest list: call manifestList.write() to create the delta manifest list
- Handle changelog: if there are changelog files, write them into a separate changelog manifest
4.4.4 Building the Snapshot object
Finally a new Snapshot object is created, containing:
- snapshotId: the snapshot id
- schemaId: the schema version
- baseManifestList: the base manifest list (file name and size)
- deltaManifestList: the delta manifest list
- changelogManifestList: the changelog manifest list (if any)
- commitUser: the commit user
- commitIdentifier: the commit identifier
- commitKind: the commit kind (APPEND/COMPACT/OVERWRITE)
- timeMillis: the commit timestamp
- logOffsets: log offsets (for streaming log integration)
- totalRecordCount: total record count
- deltaRecordCount: delta record count
- watermark: the watermark
4.4.5 Atomic commit
The snapshot is published atomically, via an atomic rename or an external lock, which keeps concurrent commits safe. A generic sketch of the rename-based pattern follows.
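The cited FileStoreCommitImpl excerpt stops before the publish step, so the following is a purely illustrative, self-contained sketch of the write-then-atomic-rename pattern on a local file system; it is not Paimon's actual API, and the file naming is made up.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicPublishSketch {
    // Illustrative only: write the snapshot metadata to a temporary file, then atomically
    // rename it to its final name. Readers either see the complete snapshot or nothing.
    static void publishSnapshot(Path snapshotDir, long snapshotId, String snapshotJson)
            throws IOException {
        Path tmp = snapshotDir.resolve("snapshot-" + snapshotId + ".tmp");
        Path target = snapshotDir.resolve("snapshot-" + snapshotId);
        Files.write(tmp, snapshotJson.getBytes(StandardCharsets.UTF_8));
        // ATOMIC_MOVE makes the rename all-or-nothing where the file system supports it;
        // on object stores without atomic rename, an external lock (e.g. a metastore lock)
        // plays the same role.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```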
4.5 From Pending Files to Committed Files
Key points:
- Files do not move: a data file stays at the same physical location from write to commit
- Visibility switch: creating the snapshot is what makes the files visible to readers
- Atomicity: the whole process either succeeds completely or fails completely
5. Abort Handling
5.1 Rolling Back After a Checkpoint Failure
When a checkpoint fails, Flink restarts the job, and Paimon recovers as follows:
5.1.1 State recovery [19]
In initializeState(), the committer will:
- Recover the commitUser from the state backend
- Recover the pending Committables through the CommittableStateManager
- Create the Committer instance and hand it the recovered state
5.1.2 RestoreAndFailCommittableStateManager [20]
This special state manager implements a clever trick:
- After restoring the state, it first commits all pending Committables
- Once the commit succeeds, it deliberately throws an exception
- The resulting job restart lets the writers continue writing on top of the latest snapshot
This guarantees that:
- Committables already persisted in state are never lost
- Writers resume from the most recently committed snapshot (see the excerpt below)
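Condensed from the RestoreAndFailCommittableStateManager excerpt in the Citations section:

```java
// Condensed from RestoreAndFailCommittableStateManager.java (see Citations section).
protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
        throws Exception {
    int numCommitted = super.recover(committables, committer);
    if (numCommitted > 0) {
        // deliberately fail after committing the restored checkpoints so that, after the
        // restart, writers start from the newly created snapshots
        throw new RuntimeException(
                "This exception is intentionally thrown after committing the restored checkpoints. "
                        + "By restarting the job we hope that writers can start writing based on these new commits.");
    }
    return numCommitted;
}
```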
5.1.3 De-duplication via filterAndCommit [21]
The filterAndCommitMultiple() method will:
- Sort all Committables by identifier
- Call commit.filterCommitted() to drop the ones whose snapshots already exist
- Commit only the Committables that have not yet been committed
The filter decides whether a committable was already committed by scanning existing snapshots for the same commit user and commit identifier, which makes the retry idempotent.
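Condensed from the TableCommitImpl excerpt in the Citations section:

```java
// Condensed from TableCommitImpl.java (see Citations section).
public int filterAndCommitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
    List<ManifestCommittable> sortedCommittables =
            committables.stream()
                    // identifier must be in increasing order
                    .sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                    .collect(Collectors.toList());
    // drop everything that already made it into a snapshot, then retry only the rest
    List<ManifestCommittable> retryCommittables = commit.filterCommitted(sortedCommittables);
    if (!retryCommittables.isEmpty()) {
        checkFilesExistence(retryCommittables);
        commitMultiple(retryCommittables, checkAppendFiles);
    }
    return retryCommittables.size();
}
```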
5.2 Cleaning Up Temporary Files
5.2.1 The abort method [22]
When uncommitted files need to be cleaned up, the abort() method is called.
Although the full implementation of FileStoreCommitImpl.abort() is not shown in the cited excerpts, by design it will:
- Iterate over the files referenced in every CommitMessage
- Delete all newly written files (newFiles)
- Delete all changelog files (changelogFiles)
- Delete the outputs of compaction (compactAfter)
5.2.2 File existence checks [23]
When retrying a commit, Paimon verifies that the files still exist:
- Collect the paths of all files that need to be committed
- Check their existence against the file system in parallel
- If any file is missing, throw an exception explaining the reason
This prevents data inconsistencies caused by files that were deleted by mistake.
5.2.3 Snapshot expiration and file cleanup [24]
The maintain() method triggers:
- Consumer expiration: remove expired consumer records
- Snapshot expiration: delete old snapshots together with their manifest files
- Partition expiration: delete the data of expired partitions
- Tag management: create and expire tags
This runs as an asynchronous background task and does not block the commit path.
6. The Complete End-to-End Flow
6.1 Normal commit flow
Records flow into the writers; at each checkpoint, prepareSnapshotPreBarrier() flushes the writers and emits Committables; the committer snapshots them into operator state; and once notifyCheckpointComplete() arrives, the committer combines them into a ManifestCommittable and publishes a new snapshot.
6.2 Failure recovery flow
After a restart, the committer restores the pending Committables from operator state, commits them idempotently via filterAndCommit, and (with RestoreAndFailCommittableStateManager) forces one more restart so that the writers resume on top of the newly committed snapshot.
7. Key Design Points
7.1 The two-phase commit implementation
Paimon follows the classic 2PC protocol:
Prepare phase:
- Writers produce pending data files and their metadata
- The metadata is persisted to the state backend
- At this point the transaction is not committed and the files are not visible
Commit phase:
- The committer is notified that the checkpoint completed
- The metadata is written into manifests and a new snapshot
- The data becomes visible atomically
7.2 How exactly-once is guaranteed
On the write side:
- Each record is written exactly once
- Files are tied to snapshots via the checkpoint id
On the commit side:
- A single committer serializes all commits
- The filterCommitted mechanism prevents duplicate commits
On the read side:
- Snapshot-based isolation
- Readers always see a complete, committed snapshot
7.3 Performance-oriented design
Parallel writes:
- Multiple writers process data concurrently
- Each writer manages its own partitions and buckets
Asynchronous compaction:
- Compaction runs asynchronously in the background
- It does not block data writing
Manifest merging:
- Small manifest files are merged periodically
- Fewer small files means better read performance
Lean state:
- The state backend stores only the necessary metadata
- Data files are written directly to object storage
8. Practical Advice and Best Practices
8.1 Checkpoint configuration
# Recommended checkpoint configuration
execution.checkpointing.interval: 60s # one checkpoint per minute
execution.checkpointing.mode: EXACTLY_ONCE # exactly-once is mandatory
execution.checkpointing.timeout: 10min # checkpoint timeout
execution.checkpointing.min-pause: 30s # minimum pause between checkpoints
execution.checkpointing.max-concurrent-checkpoints: 1 # no concurrent checkpoints
Notes on these settings:
- Checkpoint interval: balances latency against throughput; 1-5 minutes is a reasonable range
- Timeout: tune it to the data volume and file system performance
- No concurrent checkpoints: Paimon requires commits to be serialized
8.2 Resource configuration
Writer parallelism:
- Scale it with the data volume and the number of partitions
- A rough guideline is 100-500 MB/s per writer
Committer resources: [25]
- Dedicated CPU and memory can be configured for the committer operator
- At least 2 GB of memory is recommended
Memory settings:
- Tune write-buffer-size to control memory usage (see the options sketch below)
- Avoid the performance degradation caused by running out of memory
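A sketch of the table options these recommendations map to; the option keys (sink.committer-cpu, sink.committer-memory, write-buffer-size) are assumptions inferred from the FlinkSink excerpt in the Citations section and should be verified against the Paimon version in use:

# Hypothetical per-table options (verify the keys against your Paimon version)
sink.committer-cpu: 1 # dedicated CPU for the committer operator
sink.committer-memory: 2g # dedicated managed memory for the committer operator
write-buffer-size: 256mb # per-writer buffer size before data is flushed to disk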
8.3 Monitoring metrics
Key metrics to watch:
- Checkpoint duration: watch for performance regressions
- Number of pending Committables: a growing backlog means commits are falling behind
- Snapshot creation rate: should match the checkpoint rate
- File counts: watch for small-file problems
- Commit latency: the time from checkpoint completion to snapshot creation
9. Common Problems and Troubleshooting
9.1 Checkpoint timeouts
Likely causes:
- The writers produce too many files
- Network I/O bottlenecks
- Insufficient state backend performance
Remedies:
- Increase the checkpoint timeout
- Tune the writer parallelism
- Use a high-performance state backend (such as RocksDB)
9.2 High commit latency
Likely causes:
- Too many manifest files need to be merged
- The committer is under-provisioned
- File system performance issues
Remedies:
- Give the committer more memory
- Tune the manifest merge parameters
- Use a faster file system
9.3 Delayed data visibility
Symptom: data only becomes readable some time after it was written.
Cause: this is expected behavior; data becomes visible only after the checkpoint completes.
Mitigations:
- Shorten the checkpoint interval (at the cost of extra overhead)
- If low-latency visibility is required, consider streaming reads
10. Summary
The Flink-to-Paimon streaming write path is a carefully designed distributed transaction system. It combines Flink's checkpoint mechanism with Paimon's snapshot isolation to deliver high-performance, highly reliable streaming writes into a data lake.
Core characteristics:
- End-to-end exactly-once: guaranteed by two-phase commit
- High throughput: parallel writes plus asynchronous commits
- Strong consistency: snapshot-based isolation
- Fault tolerance: a complete failure-recovery story
Key techniques:
- PrepareSnapshotPreBarrier: pre-commit work before the barrier arrives
- GlobalCommittable: global transaction coordination
- FilterAndCommit: idempotent commits
- RestoreAndFail: deliberate restart after recovery commits
Best practices:
- Configure checkpointing sensibly
- Monitor the key metrics
- Size parallelism and resources to the workload
With a solid understanding of this machinery, you can use Paimon to build real-time data lakes and run unified streaming and batch pipelines efficiently.
References:
- Apache Paimon official documentation
- Flink checkpointing documentation
- All code excerpts in this article come from the main branch of the apache/paimon repository
Citations
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java (L88-102)
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
// This commitUser is valid only for new jobs.
// After the job starts, this commitUser will be recorded into the states of write and
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
return sinkFrom(input, createCommitUser(table.coreOptions().toConfiguration()));
}
public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
// do the actually writing action, no snapshot generated in this stage
DataStream<Committable> written = doWrite(input, initialCommitUser, null);
// commit the committable to generate a new snapshot
return doCommit(written, initialCommitUser);
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java (L241-242)
configureSlotSharingGroup(
committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java (L264-273)
public static void assertStreamingConfiguration(StreamExecutionEnvironment env) {
checkArgument(
!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
"Paimon sink currently does not support unaligned checkpoints. Please set "
+ "execution.checkpointing.unaligned.enabled to false.");
checkArgument(
env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
"Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
+ "execution.checkpointing.mode to exactly-once");
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java (L82-87)
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
if (!endOfInput) {
emitCommittables(false, checkpointId);
}
// no records are expected to emit after endOfInput
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java (L103-106)
private void emitCommittables(boolean waitCompaction, long checkpointId) throws IOException {
prepareCommit(waitCompaction, checkpointId)
.forEach(committable -> output.collect(new StreamRecord<>(committable)));
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java (L133-138)
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
write.snapshotState();
state.snapshotState();
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java (L210-226)
@Override
public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException {
List<Committable> committables = new ArrayList<>();
if (write != null) {
try {
for (CommitMessage committable :
write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) {
committables.add(
new Committable(checkpointId, Committable.Kind.FILE, committable));
}
} catch (Exception e) {
throw new IOException(e);
}
}
return committables;
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java (L228-231)
@Override
public void snapshotState() throws Exception {
// do nothing
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java (L38-38)
void snapshotState() throws Exception;
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L54-60)
/**
* If checkpoint is enabled we should do nothing in {@link CommitterOperator#endInput}.
* Remaining data will be committed in {@link CommitterOperator#notifyCheckpointComplete}. If
* checkpoint is not enabled we need to commit remaining data in {@link
* CommitterOperator#endInput}.
*/
protected final boolean streamingCheckpointEnabled;
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L115-148)
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
Preconditions.checkArgument(
!forceSingleParallelism
|| RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())
== 1,
"Committer Operator parallelism in paimon MUST be one.");
this.currentWatermark = Long.MIN_VALUE;
this.endInput = false;
// each job can only have one user name and this name must be consistent across restarts
// we cannot use job id as commit user name here because user may change job id by creating
// a savepoint, stop the job and then resume from savepoint
commitUser =
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);
int parallelism = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
int index = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
// parallelism of commit operator is always 1, so commitUser will never be null
committer =
committerFactory.create(
Committer.createContext(
commitUser,
getMetricGroup(),
streamingCheckpointEnabled,
context.isRestored(),
context.getOperatorStateStore(),
parallelism,
index));
committableStateManager.initializeState(context, committer);
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L164-168)
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
pollInputs();
committableStateManager.snapshotState(context, committables(committablesPerCheckpoint));
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L190-193)
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
commitUpToCheckpoint(endInput ? END_INPUT_CHECKPOINT_ID : checkpointId);
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L195-218)
private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, GlobalCommitT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
List<GlobalCommitT> committables = committables(headMap);
if (committables.isEmpty() && committer.forceCreatingSnapshot()) {
committables =
Collections.singletonList(
toCommittables(checkpointId, Collections.emptyList()));
}
if (checkpointId == END_INPUT_CHECKPOINT_ID) {
// In new versions of Flink, if a batch job fails, it might restart from some operator
// in the middle.
// If the job is restarted from the commit operator, endInput will be called again, and
// the same commit messages will be committed again.
// So when `endInput` is called, we must check if the corresponding snapshot exists.
// However, if the snapshot does not exist, then append files must be new files. So
// there is no need to check for duplicated append files.
committer.filterAndCommit(committables, false, true);
} else {
committer.commit(committables);
}
headMap.clear();
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java (L240-276)
private void pollInputs() throws Exception {
Map<Long, List<CommitT>> grouped = committer.groupByCheckpoint(inputs);
for (Map.Entry<Long, List<CommitT>> entry : grouped.entrySet()) {
Long cp = entry.getKey();
List<CommitT> committables = entry.getValue();
// To prevent the asynchronous completion of tasks with multiple concurrent bounded
// stream inputs, which leads to some tasks passing a Committable with cp =
// END_INPUT_CHECKPOINT_ID during the endInput method call of the current checkpoint,
// while other tasks pass a Committable with END_INPUT_CHECKPOINT_ID during other
// checkpoints hence causing an error here, we have a special handling for Committables
// with END_INPUT_CHECKPOINT_ID: instead of throwing an error, we merge them.
if (cp != null
&& cp == END_INPUT_CHECKPOINT_ID
&& committablesPerCheckpoint.containsKey(cp)) {
// Merge the END_INPUT_CHECKPOINT_ID committables here.
GlobalCommitT commitT =
committer.combine(
cp,
currentWatermark,
committablesPerCheckpoint.get(cp),
committables);
committablesPerCheckpoint.put(cp, commitT);
} else if (committablesPerCheckpoint.containsKey(cp)) {
throw new RuntimeException(
String.format(
"Repeatedly commit the same checkpoint files. \n"
+ "The previous files is %s, \n"
+ "and the subsequent files is %s",
committablesPerCheckpoint.get(cp), committables));
} else {
committablesPerCheckpoint.put(cp, toCommittables(cp, committables));
}
}
this.inputs.clear();
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableStateManager.java (L36-37)
void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables)
throws Exception;
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java (L79-106)
public ManifestCommittable combine(
long checkpointId, long watermark, List<Committable> committables) {
ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId, watermark);
return combine(checkpointId, watermark, manifestCommittable, committables);
}
@Override
public ManifestCommittable combine(
long checkpointId,
long watermark,
ManifestCommittable manifestCommittable,
List<Committable> committables) {
for (Committable committable : committables) {
switch (committable.kind()) {
case FILE:
CommitMessage file = (CommitMessage) committable.wrappedCommittable();
manifestCommittable.addFileCommittable(file);
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
(LogOffsetCommittable) committable.wrappedCommittable();
manifestCommittable.addLogOffset(
offset.bucket(), offset.offset(), allowLogOffsetDuplicate);
break;
}
}
return manifestCommittable;
}
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java (L109-114)
public void commit(List<ManifestCommittable> committables)
throws IOException, InterruptedException {
commit.commitMultiple(committables, false);
calcNumBytesAndRecordsOut(committables);
commitListeners.notifyCommittable(committables);
}
File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L223-256)
public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
if (overwritePartition == null) {
int newSnapshots = 0;
for (ManifestCommittable committable : committables) {
newSnapshots += commit.commit(committable, checkAppendFiles);
}
if (!committables.isEmpty()) {
maintain(
committables.get(committables.size() - 1).identifier(),
maintainExecutor,
newSnapshots > 0 || expireForEmptyCommit);
}
} else {
ManifestCommittable committable;
if (committables.size() > 1) {
throw new RuntimeException(
"Multiple committables appear in overwrite mode, this may be a bug, please report it: "
+ committables);
} else if (committables.size() == 1) {
committable = committables.get(0);
} else {
// create an empty committable
// identifier is Long.MAX_VALUE, come from batch job
// TODO maybe it can be produced by CommitterOperator
committable = new ManifestCommittable(Long.MAX_VALUE);
}
int newSnapshots =
commit.overwrite(overwritePartition, committable, Collections.emptyMap());
maintain(
committable.identifier(),
maintainExecutor,
newSnapshots > 0 || expireForEmptyCommit);
}
}
File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L258-276)
public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
return filterAndCommitMultiple(committables, true);
}
public int filterAndCommitMultiple(
List<ManifestCommittable> committables, boolean checkAppendFiles) {
List<ManifestCommittable> sortedCommittables =
committables.stream()
// identifier must be in increasing order
.sorted(Comparator.comparingLong(ManifestCommittable::identifier))
.collect(Collectors.toList());
List<ManifestCommittable> retryCommittables = commit.filterCommitted(sortedCommittables);
if (!retryCommittables.isEmpty()) {
checkFilesExistence(retryCommittables);
commitMultiple(retryCommittables, checkAppendFiles);
}
return retryCommittables.size();
}
File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L278-333)
private void checkFilesExistence(List<ManifestCommittable> committables) {
List<Path> files = new ArrayList<>();
DataFilePathFactories factories = new DataFilePathFactories(commit.pathFactory());
IndexFilePathFactories indexFactories = new IndexFilePathFactories(commit.pathFactory());
for (ManifestCommittable committable : committables) {
for (CommitMessage message : committable.fileCommittables()) {
CommitMessageImpl msg = (CommitMessageImpl) message;
DataFilePathFactory pathFactory =
factories.get(message.partition(), message.bucket());
IndexPathFactory indexFileFactory =
indexFactories.get(message.partition(), message.bucket());
Consumer<DataFileMeta> collector = f -> files.addAll(f.collectFiles(pathFactory));
msg.newFilesIncrement().newFiles().forEach(collector);
msg.newFilesIncrement().changelogFiles().forEach(collector);
msg.compactIncrement().compactBefore().forEach(collector);
msg.compactIncrement().compactAfter().forEach(collector);
msg.indexIncrement().newIndexFiles().stream()
.map(indexFileFactory::toPath)
.forEach(files::add);
msg.indexIncrement().deletedIndexFiles().stream()
.map(indexFileFactory::toPath)
.forEach(files::add);
}
}
Predicate<Path> nonExists =
p -> {
try {
return !commit.fileIO().exists(p);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
List<Path> nonExistFiles =
Lists.newArrayList(
randomlyExecuteSequentialReturn(
getExecutorService(null),
f -> nonExists.test(f) ? singletonList(f) : emptyList(),
files));
if (!nonExistFiles.isEmpty()) {
String message =
String.join(
"\n",
"Cannot recover from this checkpoint because some files in the snapshot that"
+ " need to be resubmitted have been deleted:",
" "
+ nonExistFiles.stream()
.map(Object::toString)
.collect(Collectors.joining(",")),
" The most likely reason is because you are recovering from a very old savepoint that"
+ " contains some uncommitted files that have already been deleted.");
throw new RuntimeException(message);
}
}
File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L335-376)
private void maintain(long identifier, ExecutorService executor, boolean doExpire) {
if (maintainError.get() != null) {
throw new RuntimeException(maintainError.get());
}
executor.execute(
() -> {
try {
maintain(identifier, doExpire);
} catch (Throwable t) {
LOG.error("Executing maintain encountered an error.", t);
maintainError.compareAndSet(null, t);
}
});
}
private void maintain(long identifier, boolean doExpire) {
// expire consumer first to avoid preventing snapshot expiration
if (doExpire && consumerExpireTime != null) {
consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
}
if (doExpire && expireSnapshots != null) {
expireSnapshots.run();
}
if (doExpire && partitionExpire != null) {
partitionExpire.expire(identifier);
}
if (tagAutoManager != null) {
TagAutoCreation tagAutoCreation = tagAutoManager.getTagAutoCreation();
if (tagAutoCreation != null) {
tagAutoCreation.run();
}
TagTimeExpire tagTimeExpire = tagAutoManager.getTagTimeExpire();
if (doExpire && tagTimeExpire != null) {
tagTimeExpire.expire();
}
}
}
File: paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java (L391-393)
public void abort(List<CommitMessage> commitMessages) {
commit.abort(commitMessages);
}
File: paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java (L897-1100)
List<ManifestEntry> deltaFiles,
List<ManifestEntry> changelogFiles,
List<IndexManifestEntry> indexFiles,
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Map<String, String> properties,
Snapshot.CommitKind commitKind,
@Nullable Snapshot latestSnapshot,
ConflictCheck conflictCheck,
@Nullable String newStatsFileName) {
long startMillis = System.currentTimeMillis();
// Check if the commit has been completed. At this point, there will be no more repeated
// commits and just return success
if (retryResult != null && latestSnapshot != null) {
Map<Long, Snapshot> snapshotCache = new HashMap<>();
snapshotCache.put(latestSnapshot.id(), latestSnapshot);
long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
if (retryResult.latestSnapshot != null) {
snapshotCache.put(retryResult.latestSnapshot.id(), retryResult.latestSnapshot);
startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
}
for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
Snapshot snapshot = snapshotCache.computeIfAbsent(i, snapshotManager::snapshot);
if (snapshot.commitUser().equals(commitUser)
&& snapshot.commitIdentifier() == identifier
&& snapshot.commitKind() == commitKind) {
return new SuccessResult();
}
}
}
long newSnapshotId = Snapshot.FIRST_SNAPSHOT_ID;
long firstRowIdStart = 0;
if (latestSnapshot != null) {
newSnapshotId = latestSnapshot.id() + 1;
Long nextRowId = latestSnapshot.nextRowId();
if (nextRowId != null) {
firstRowIdStart = nextRowId;
}
}
if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) {
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
Snapshot snapshot = snapshotManager.snapshot(id);
if ((snapshot.commitKind() == Snapshot.CommitKind.COMPACT
|| snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE)
&& !snapshot.commitUser().equals(commitUser)) {
throw new RuntimeException(
String.format(
"When trying to commit snapshot %d, "
+ "commit user %s has found a %s snapshot (id: %d) by another user %s. "
+ "Giving up committing as %s is set.",
newSnapshotId,
commitUser,
snapshot.commitKind().name(),
id,
snapshot.commitUser(),
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
}
}
strictModeLastSafeSnapshot = newSnapshotId - 1;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId);
for (ManifestEntry entry : deltaFiles) {
LOG.debug(" * {}", entry);
}
LOG.debug("Ready to commit changelog to snapshot {}", newSnapshotId);
for (ManifestEntry entry : changelogFiles) {
LOG.debug(" * {}", entry);
}
}
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
// so we have to check again
List<BinaryRow> changedPartitions =
deltaFiles.stream()
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList());
if (retryResult != null && retryResult.latestSnapshot != null) {
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
List<SimpleFileEntry> incremental =
readIncrementalChanges(
retryResult.latestSnapshot, latestSnapshot, changedPartitions);
if (!incremental.isEmpty()) {
baseDataFiles.addAll(incremental);
baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
}
} else {
baseDataFiles =
readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
}
noConflictsOrFail(
latestSnapshot.commitUser(),
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
commitKind);
}
Snapshot newSnapshot;
Pair<String, Long> baseManifestList = null;
Pair<String, Long> deltaManifestList = null;
List<PartitionEntry> deltaStatistics;
Pair<String, Long> changelogManifestList = null;
String oldIndexManifest = null;
String indexManifest = null;
List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>();
List<ManifestFileMeta> mergeAfterManifests = new ArrayList<>();
long nextRowIdStart = firstRowIdStart;
try {
long previousTotalRecordCount = 0L;
Long currentWatermark = watermark;
if (latestSnapshot != null) {
previousTotalRecordCount = scan.totalRecordCount(latestSnapshot);
// read all previous manifest files
mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot);
// read the last snapshot to complete the bucket's offsets when logOffsets does not
// contain all buckets
Map<Integer, Long> latestLogOffsets = latestSnapshot.logOffsets();
if (latestLogOffsets != null) {
latestLogOffsets.forEach(logOffsets::putIfAbsent);
}
Long latestWatermark = latestSnapshot.watermark();
if (latestWatermark != null) {
currentWatermark =
currentWatermark == null
? latestWatermark
: Math.max(currentWatermark, latestWatermark);
}
oldIndexManifest = latestSnapshot.indexManifest();
}
// try to merge old manifest files to create base manifest list
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
manifestTargetSize.getBytes(),
manifestMergeMinCount,
manifestFullCompactionSize.getBytes(),
partitionType,
manifestReadParallelism);
baseManifestList = manifestList.write(mergeAfterManifests);
if (rowTrackingEnabled) {
// assigned snapshot id to delta files
List<ManifestEntry> snapshotAssigned = new ArrayList<>();
assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
// assign row id for new files
List<ManifestEntry> rowIdAssigned = new ArrayList<>();
nextRowIdStart =
assignRowLineageMeta(firstRowIdStart, snapshotAssigned, rowIdAssigned);
deltaFiles = rowIdAssigned;
}
// the added records subtract the deleted records from
long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
// write new delta files into manifest files
deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
// write changelog into manifest files
if (!changelogFiles.isEmpty()) {
changelogManifestList = manifestList.write(manifestFile.write(changelogFiles));
}
indexManifest =
indexManifestFile.writeIndexFiles(oldIndexManifest, indexFiles, bucketMode);
long latestSchemaId =
schemaManager
.latestOrThrow("Cannot get latest schema for table " + tableName)
.id();
// write new stats or inherit from the previous snapshot
String statsFileName = null;
if (newStatsFileName != null) {
statsFileName = newStatsFileName;
} else if (latestSnapshot != null) {
Optional<Statistics> previousStatistic = statsFileHandler.readStats(latestSnapshot);
if (previousStatistic.isPresent()) {
if (previousStatistic.get().schemaId() != latestSchemaId) {
LOG.warn("Schema changed, stats will not be inherited");
} else {
statsFileName = latestSnapshot.statistics();
}
}
}
// prepare snapshot file
newSnapshot =
new Snapshot(
newSnapshotId,
latestSchemaId,
baseManifestList.getLeft(),
baseManifestList.getRight(),
File: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java (L49-60)
protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
throws Exception {
int numCommitted = super.recover(committables, committer);
if (numCommitted > 0) {
throw new RuntimeException(
"This exception is intentionally thrown "
+ "after committing the restored checkpoints. "
+ "By restarting the job we hope that "
+ "writers can start writing based on these new commits.");
}
return numCommitted;
}