Paimon LSM Tree写入 和 Compaction 如何不冲突
Paimon 写入与合并操作的冲突处理与快照构建机制分析
Paimon 通过分离写入(Append)和合并(Compaction)的元数据记录,结合分步原子提交和严格冲突检测,确保并发操作下数据一致性。即使写入与合并并发执行,也不会产生数据冲突。
写入与 Compaction 分开记录是否冲突?
不会冲突。核心机制由 FileStoreCommitImpl
实现,通过以下设计保证:
1. 分离变更类型
-
写入阶段(MergeTreeWriter):
-
新写入数据(
write()
)与后台 Compaction 产生的变更分别记录在不同集合:-
新数据 →
newFiles
-
Compaction 变更 →
compactBefore
/compactAfter
-
-
-
提交阶段(
prepareCommit
):-
所有变更打包为
CommitMessage
,但内部保持分类存储。
-
2. 分步提交与冲突检测流程
步骤 |
操作类型 |
处理逻辑 |
冲突检测重点 |
---|---|---|---|
1 |
APPEND(新写入) |
- 优先处理 |
- 检查新增文件与当前快照中受影响分区文件( |
2 |
生成 APPEND 快照 |
- 创建新快照(如快照 N) |
- 基于原子性文件操作(如 |
3 |
COMPACT(合并) |
- 在 APPEND 快照基础上处理 |
- 验证合并前旧文件( |
4 |
生成 COMPACT 快照 |
- 创建新快照(如快照 N+1) |
- 原子性提交 |
关键机制:一次写入任务可能原子性生成两个连续快照(APPEND + COMPACT)。任一冲突检测失败将导致整体回滚,避免中间状态或元数据损坏。
快照构建详细流程
通过 FileStoreCommitImpl.java
的 commit
方法展开分析:
1. 收集变更(collectChanges
)
将 CommitMessage
中的文件变更分类至不同列表:
-
appendTableFiles
:新写入的数据文件 -
compactTableFiles
:Compaction 涉及的文件变更(删除旧文件 + 添加新文件) -
其他文件(如 changelog、索引文件等)
2. 提交 APPEND 快照(tryCommit
with CommitKind.APPEND
)
关键步骤:
-
锁定基线:获取当前最新快照(
latestSnapshot
) -
读取存量文件:提取
latestSnapshot
中与本次写入相关分区的所有文件列表(baseEntries
) -
冲突检查(
noConflictsOrFail
):-
模拟将
appendTableFiles
应用于baseEntries
-
检查目标:删除文件是否存在 + LSM 结构合法性(如 Level 1+ 文件的 key 范围不重叠)
-
失败时抛出异常并终止提交
-
-
生成新快照:
-
调用
tryCommitOnce
创建新 deltaManifestList 记录appendTableFiles
变更 -
基于
latestSnapshot
的baseManifestList
和新 deltaManifestList 构建APPEND
类型快照 -
通过文件系统原子操作(如
rename
)持久化快照文件
-
3. 提交 COMPACT 快照(tryCommit
with CommitKind.COMPACT
)
关键步骤:
-
更新基线:将上一步
appendTableFiles
内容合并至baseEntries
,形成新基线 -
冲突检查(
noConflictsOrFail
):-
在新基线上模拟应用
compactTableFiles
变更 -
重复文件存在性与 LSM 结构合法性检查
-
-
生成新快照:
-
调用
tryCommitOnce
基于最新 APPEND 快照创建COMPACT
类型快照 -
新 deltaManifestList 记录
compactTableFiles
变更 -
通过原子操作持久化快照文件
-
代码:
public void commit(ManifestCommittable committable, boolean checkAppendFiles) {
LOG.info(
"Ready to commit to table {}, number of commit messages: {}",
tableName,
committable.fileCommittables().size());
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit\n{}", committable);
}
long started = System.nanoTime();
int generatedSnapshot = 0;
int attempts = 0;
Snapshot latestSnapshot = null;
Long safeLatestSnapshotId = null;
List<SimpleFileEntry> baseEntries = new ArrayList<>();
List<ManifestEntry> appendTableFiles = new ArrayList<>();
List<ManifestEntry> appendChangelog = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
List<ManifestEntry> compactChangelog = new ArrayList<>();
List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
appendHashIndexFiles,
compactDvIndexFiles);
try {
List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
if (!ignoreEmptyCommit
|| !appendTableFiles.isEmpty()
|| !appendChangelog.isEmpty()
|| !appendHashIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 1:
// Read manifest entries from changed partitions here and check for conflicts.
// If there are no other jobs committing at the same time,
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
baseEntries.addAll(
readAllEntriesFromChangedPartitions(
latestSnapshot, appendTableFiles, compactTableFiles));
noConflictsOrFail(
latestSnapshot.commitUser(),
baseEntries,
appendSimpleEntries,
Snapshot.CommitKind.APPEND);
safeLatestSnapshotId = latestSnapshot.id();
}
attempts +=
tryCommit(
appendTableFiles,
appendChangelog,
appendHashIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
committable.properties(),
Snapshot.CommitKind.APPEND,
noConflictCheck(),
null);
generatedSnapshot += 1;
}
if (!compactTableFiles.isEmpty()
|| !compactChangelog.isEmpty()
|| !compactDvIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 2:
// Add appendChanges to the manifest entries read above and check for conflicts.
// If there are no other jobs committing at the same time,
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
if (safeLatestSnapshotId != null) {
baseEntries.addAll(appendSimpleEntries);
noConflictsOrFail(
latestSnapshot.commitUser(),
baseEntries,
SimpleFileEntry.from(compactTableFiles),
Snapshot.CommitKind.COMPACT);
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
}
attempts +=
tryCommit(
compactTableFiles,
compactChangelog,
compactDvIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
committable.properties(),
Snapshot.CommitKind.COMPACT,
hasConflictChecked(safeLatestSnapshotId),
null);
generatedSnapshot += 1;
}
} finally {
long commitDuration = (System.nanoTime() - started) / 1_000_000;
LOG.info("Finished commit to table {}, duration {} ms", tableName, commitDuration);
if (this.commitMetrics != null) {
reportCommit(
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
commitDuration,
generatedSnapshot,
attempts);
}
}
}
private int tryCommit(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
List<IndexManifestEntry> indexFiles,
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Map<String, String> properties,
Snapshot.CommitKind commitKind,
ConflictCheck conflictCheck,
@Nullable String statsFileName) {
int retryCount = 0;
RetryResult retryResult = null;
long startMillis = System.currentTimeMillis();
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
CommitResult result =
tryCommitOnce(
retryResult,
tableFiles,
changelogFiles,
indexFiles,
identifier,
watermark,
logOffsets,
properties,
commitKind,
latestSnapshot,
conflictCheck,
statsFileName);
if (result.isSuccess()) {
break;
}
retryResult = (RetryResult) result;
if (System.currentTimeMillis() - startMillis > commitTimeout
|| retryCount >= commitMaxRetries) {
String message =
String.format(
"Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.",
commitTimeout, retryCount);
throw new RuntimeException(message, retryResult.exception);
}
commitRetryWait(retryCount);
retryCount++;
}
return retryCount + 1;
}
为什么 Compact 提交时不重新获取最新快照?
这主要是出于 性能优化 和 单个提交单元的原子性 考虑。
在 FileStoreCommitImpl.java
的 commit
方法中,逻辑如下:
在提交开始时,代码通过以下方式获取一次最新的快照:
latestSnapshot = snapshotManager.latestSnapshot();
基于这个 latestSnapshot
,它为 APPEND 类型的变更(新写入的文件)做了一次冲突检查。如果检查通过,它会记录下:
safeLatestSnapshotId = latestSnapshot.id();
这相当于一个 乐观锁,其含义是:
“我已经基于快照 X 检查过 APPEND 了,如果在我提交时,最新快照仍然是 X,那就不用再检查了。”
执行 tryCommit
,生成一个新的快照(我们称之为 X+1)。
接下来处理 COMPACT 变更时,它并没有去重新请求最新的快照(即 X+1)。
相反,它在 内存中模拟了 APPEND 提交成功后的状态:
baseEntries.addAll(appendSimpleEntries);
然后,基于这个内存中的、模拟的、最新的状态,对 COMPACT 变更进行冲突检查。
这同样是 乐观的,其假设是:APPEND 成功后,没有其他并发的提交插入进来。
这种设计避免了在处理 APPEND 和 COMPACT 之间,再次从文件系统读取和解析大量的 Manifest 文件。
因为 读取 Manifest 是一个相对昂贵的操作。
通过在内存中模拟状态的演进,它可以在 一次提交(ManifestCommittable)内部高效地完成对 APPEND 和 COMPACT 两部分变更的连续性校验。
tryCommit
方法 最终会调用 tryCommitOnce
。在 tryCommitOnce
中有如下逻辑:
// ...existing code...
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
// so we have to check again
// ...existing code...
noConflictsOrFail(
latestSnapshot.commitUser(),
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
commitKind);
}
// ...existing code...
这里的 conflictCheck.shouldCheck(latestSnapshot.id())
就是关键。
在 commit
方法中,为 COMPACT 变更调用 tryCommit
时,传入的是:
hasConflictChecked(safeLatestSnapshotId)
而此时,safeLatestSnapshotId
已经被更新为 APPEND 快照之后的值(即 X+1)。
当 tryCommitOnce
为 COMPACT 执行时,它获取的 latestSnapshot
确实是 APPEND 提交后产生的那个新快照(X+1)。
此时:
-
latestSnapshot.id()
的值为 X+1 -
safeLatestSnapshotId
的值也是 X+1
因此 shouldCheck
返回 false,从而 跳过了昂贵的冲突检查。
如果在 APPEND 提交后,另一个 Job 抢先提交了一个快照(X+2),那么当我们的 COMPACT 部分执行 tryCommitOnce 时:
-
它获取到的
latestSnapshot
将是 X+2 -
latestSnapshot.id()
(值为 X+2)不等于safeLatestSnapshotId
(值为 X+1) -
因此
shouldCheck
返回 true
此时,代码会 回退到慢速路径:
-
重新从文件系统读取从快照 X+1 到 X+2 的所有变更,
-
应用到
baseDataFiles
上, -
然后基于这个 真正最新的状态,重新进行一次 完整的冲突检查。
如果检查失败,整个提交会重试。
对于只有 Compaction 的任务怎么处理?
这种情况非常常见,例如一个 专门用于合并的 Flink 作业。
其处理流程会 简化很多:
-
在
commit
方法中,经过collectChanges
之后,appendTableFiles
列表是 空的。 -
第一个判断:
if (!ignoreEmptyCommit ...)
其条件不满足,因此 跳过 APPEND 提交。
-
直接进入:
if (!compactTableFiles.isEmpty() ...)
-
此时,
safeLatestSnapshotId
是 null,所以 乐观检查的优化不会被触发。 -
代码直接调用
tryCommit
来提交 COMPACT 变更。 -
在
tryCommitOnce
内部,它会 获取最新的快照,并 基于它进行完整的冲突检查。
所以,对于 只有 Compaction 的任务,它会 直接走标准的 “获取最新快照 -> 检查冲突 -> 提交” 流程,逻辑非常清晰。
noConflictsOrFail
方法
FileStoreCommitImpl中
用于检测在提交操作中是否存在冲突,以确保数据一致性。它主要处理以下几种冲突情况:
-
总桶数变化冲突:
- 当提交类型不是
OVERWRITE
时,会检查所有分区的总桶数是否保持一致。如果某个分区的总桶数发生变化(且没有使用覆盖写入),则会抛出冲突异常。 - 这种检查对于 compaction 和直接写入都适用,确保在没有明确覆盖意图的情况下,分区的桶数结构保持稳定。
- 当提交类型不是
-
文件删除冲突:
- 通过
assertNoDelete
方法检查,确保不会尝试删除之前未添加的文件。 - 如果检测到尝试删除不存在的文件,会抛出异常。
- 这种检查可以防止意外删除数据文件,对于 compaction(可能涉及文件合并和删除)和直接写入(可能涉及文件更新)都很重要。
- 通过
-
LSM 冲突检测:
- 对于 LSM 表(level >= 1),会检查同一分区、桶和级别的文件之间是否存在键范围重叠。
- 如果发现键范围重叠,说明存在冲突,会抛出异常。
- 这种检查主要针对 compaction 操作,确保在合并文件时不会产生键范围冲突。
-
分区过期冲突:
- 在
assertConflictForPartitionExpire
方法中,如果启用了分区过期功能且是基于值的过期策略,会检查是否尝试写入已过期的分区。 - 如果所有被删除的分区都已过期,则会抛出异常,提示用户过滤掉这些数据。
- 在
对于 compaction 和直接写入操作,冲突的定义主要包括:
- 在没有覆盖意图的情况下更改分区结构(如桶数)。
- 尝试删除不存在的文件。
- 在 LSM 表中合并文件时产生键范围重叠。
- 向已过期的分区写入数据。Paimon 允许用户配置分区过期策略,基于分区值的时间戳或分区最后更新时间来判断分区是否过期。过期的分区会被逻辑删除,最新的快照无法查询其数据。
这些冲突检测机制确保了在并发写入或 compaction 过程中数据的一致性和完整性。
总结:Paimon 的一致性保障设计
-
变更类型解耦
写入与合并操作通过分离记录(
newFiles
vscompactBefore/After
)实现逻辑隔离。 -
分阶段严格校验
-
先提交 APPEND 并生成独立快照,再基于其结果提交 COMPACT
-
每阶段均执行文件存在性及 LSM 结构冲突检测
-
-
原子性快照生成
-
每个快照仅记录增量变更(delta),依赖文件系统原子操作(如
rename
)确保“全有或全无” -
避免中间状态,保障系统健壮性
-
最终效果:并发写入与合并操作通过分步提交与严格校验实现无冲突数据一致性,元数据始终处于有效状态。
MergeTreeWriter
MergeTreeWriter
像一个指挥官,它协调多个组件来完成工作。
每个分区的每个桶 有 且 只有 一个
MergeTreeWriter
构造中通过 newSequenceNumber = maxSequenceNumber + 1; 尽可能维护统一的序列号
public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
// ... 内存缓冲与溢写相关配置 ...
private final boolean writeBufferSpillable;
private final MemorySize maxDiskSize;
private final int sortMaxFan;
private final CompressOptions sortCompression;
private final IOManager ioManager;
// ... 核心功能组件 ...
private final CompactManager compactManager;
private final MergeFunction<KeyValue> mergeFunction;
private final KeyValueFileWriterFactory writerFactory;
private final ChangelogProducer changelogProducer;
// ... 状态与结果追踪 ...
private final LinkedHashSet<DataFileMeta> newFiles;
private final LinkedHashSet<DataFileMeta> deletedFiles;
private final LinkedHashSet<DataFileMeta> newFilesChangelog;
private final LinkedHashMap<String, DataFileMeta> compactBefore;
private final LinkedHashSet<DataFileMeta> compactAfter;
private final LinkedHashSet<DataFileMeta> compactChangelog;
// ... 内存缓冲区实例 ...
private WriteBuffer writeBuffer;
// ...
}
writeBuffer
(SortBufferWriteBuffer
): 这是它的内存组件。所有新数据首先进入这里进行排序和预合并。setMemoryPool
方法会为其注入内存池。compactManager
: 这是它的合并任务管理器。它负责维护该 Bucket 内所有数据文件的层级结构(Levels),并根据策略决定何时、对哪些文件发起 Compaction。mergeFunction
: 定义了数据合并的逻辑。例如,对于主键表,它可能是“保留最新的值”(Deduplicate);对于聚合表,它可能是“对值进行累加”。writerFactory
(KeyValueFileWriterFactory
): 文件写入工厂,负责创建用于写入数据文件(SST)和 Changelog 文件的RollingFileWriter
。changelogProducer
: 配置项,决定了 Changelog 的生成策略(不生成、从输入生成等)。- 各种
Set
和Map
: 这些集合用于追踪文件状态。newFiles
: 记录从内存writeBuffer
刷盘后生成的新文件。compactBefore
/compactAfter
: 记录一次 Compaction 操作中,被合并的旧文件和生成的新文件。*Changelog
: 记录相关的 Changelog 文件。- 这些集合中的内容最终会在
prepareCommit
时被打包成CommitIncrement
。
写入流程 (write
方法)
// ... existing code ...
@Override
public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber();
// 1. 尝试将数据写入内存缓冲区
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
// 2. 如果内存缓冲区满了,先执行刷盘
flushWriteBuffer(false, false);
// 3. 再次尝试写入
success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
}
}
}
private long newSequenceNumber() {
return newSequenceNumber++;
}
// ... existing code ...
- 为每条记录分配一个递增的
sequenceNumber
,用于保证时序。 - 调用
writeBuffer.put()
尝试将数据写入内存 和 本地磁盘。 - 如果
put()
返回false
,说明内存缓冲区已满,无法容纳新数据。 - 此时,会触发
flushWriteBuffer()
,将buffer数据刷到磁盘。 - 刷盘后,再次尝试写入。如果还是失败,说明单条记录的大小超过了整个内存缓冲区,抛出异常。
刷盘流程 (flushWriteBuffer
方法)
这是将内存数据持久化的核心。
// ... existing code ...
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
if (writeBuffer.size() > 0) {
// ...
// 1. 创建数据文件和Changelog文件的写入器
final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter = ...;
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
try {
// 2. 遍历内存缓冲区中的有序数据,应用合并逻辑,并同时写入数据文件和Changelog文件
writeBuffer.forEach(
keyComparator,
mergeFunction,
changelogWriter == null ? null : changelogWriter::write,
dataWriter::write);
} finally {
// 3. 清空内存缓冲区,关闭写入器
writeBuffer.clear();
// ... close writers ...
}
// 4. 收集新生成的文件元数据
List<DataFileMeta> dataMetas = dataWriter.result();
// ... handle changelog files ...
// 5. 将新文件信息添加到 newFiles 集合,并通知 compactManager
for (DataFileMeta dataMeta : dataMetas) {
newFiles.add(dataMeta);
compactManager.addNewFile(dataMeta);
}
}
// 6. 检查并触发新的Compaction
trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
}
// ... existing code ...
- 创建
RollingFileWriter
,用于写入 Level-0 的数据文件,并根据配置决定是否创建 Changelog 文件写入器。 - 调用
writeBuffer.forEach()
,它会提供一个内存中排序好的数据迭代器。MergeTreeWriter
在遍历时,会应用mergeFunction
对数据进行最终合并,然后分别写入数据文件和 Changelog 文件。 - 操作完成后,清空内存缓冲区
writeBuffer
,使其可以接收新的数据。 - 从写入器中获取新生成文件的元数据
DataFileMeta
。 - 将这些元数据记录在
newFiles
集合中,并通知compactManager
有新的 Level-0 文件加入。 compactManager
会根据当前文件层级状态,决定是否需要触发一次新的 Compaction。
在 flushWriteBuffer
中创建的这两个 writer
,dataWriter
和 changelogWriter
,分别服务于两个完全不同的目的:一个用于写入表的状态数据,另一个用于生成流式读取的变更日志(Changelog)。
dataWriter
: 状态数据的写入器
- 用途:
dataWriter
的职责是创建正式的 Level-0 数据文件(SST 文件)。这些文件是表的核心组成部分,包含了经过排序和合并后的数据,代表了表在某个时间点的最新状态。 - 创建方式:
它调用的是// ... existing code ... final RollingFileWriter<KeyValue, DataFileMeta> dataWriter = writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); // ... existing code ...
createRollingMergeTreeFileWriter
。这个方法会生成带有data-
前缀的文件名,例如data-a9f8b8e8-....parquet
。这些文件最终会被CompactManager
管理,并参与后续的合并(Compaction)过程。
changelogWriter
: 变更日志的写入器
- 用途:
changelogWriter
的职责是创建变更日志文件。这些文件记录了本次提交中每一条原始的变更记录(比如INSERT
,UPDATE_BEFORE
,UPDATE_AFTER
,DELETE
)。这些文件专门用于支持下游的流式查询任务,让 Flink 等引擎可以像消费 Kafka 一样消费 Paimon 表的增量变化。 - 创建方式:
它调用的是// ... existing code ... final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter = (changelogProducer == ChangelogProducer.INPUT && !isInsertOnly) ? writerFactory.createRollingChangelogFileWriter(0) : null; // ... existing code ...
createRollingChangelogFileWriter
。这个方法会生成带有changelog-
前缀的文件名,例如changelog-b7c6a5e4-....parquet
。这些文件不会参与数据合并,它们只是作为本次提交产物的一部分,供流作业消费。
MergeTreeWriter 通过 KeyValue 中的 RowKind 来携带增、删、改的语义。在写数据时,它将这些带有语义的记录先放入缓冲区,然后在刷写时,将原始记录流写入 changelog 文件,将合并后的结果写入数据文件。这样既保证了数据文件的紧凑和高效查询,又通过 changelog 文件提供了完整的变更历史。
SortBufferWriteBuffer 的 forEach 接受 keyComparator
public void forEach(
Comparator<InternalRow> keyComparator,
MergeFunction<KeyValue> mergeFunction,
@Nullable KvConsumer rawConsumer,
KvConsumer mergedConsumer)
throws IOException {
// TODO do not use iterator
MergeIterator mergeIterator =
new MergeIterator(
rawConsumer, buffer.sortedIterator(), keyComparator, mergeFunction);
while (mergeIterator.hasNext()) {
mergedConsumer.accept(mergeIterator.next());
}
}
readOnce()时直接写入了 rawConsumer:
private boolean readOnce() throws IOException {
try {
currentRow = kvIter.next(currentRow);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (currentRow != null) {
current.fromRow(currentRow);
if (rawConsumer != null) {
rawConsumer.accept(current.getReusedKv());
}
}
return currentRow != null;
}
为什么一次写入需要两种 Writer?
这是由 Paimon 的 changelog-producer
(变更日志生成策略)配置决定的。
当设置 changelog-producer = 'input'
时,你告诉 Paimon:“我需要你把每次写入的原始输入数据作为变更日志给我”。
MergeTreeWriter
在执行 flushWriteBuffer
时,writeBuffer.forEach(...)
会遍历内存中所有待写入的原始数据。为了同时满足“更新表状态”和“生成变更日志”这两个需求,Paimon 采用了最高效的方式:一次遍历,两次写入。
// ... existing code ...
try {
writeBuffer.forEach(
keyComparator,
mergeFunction,
// 如果 changelogWriter 存在,就把数据喂给它
changelogWriter == null ? null : changelogWriter::write,
// 同时,也把数据喂给 dataWriter
dataWriter::write);
} finally {
// ... existing code ...
数据流从 writeBuffer
中出来后,就像一个分叉的水管,被同时送往了 dataWriter
和 changelogWriter
。这样就避免了对数据进行两次排序或两次从磁盘读取,用一次计算同时产出了两种不同用途的文件。
代码中还有一个针对 insert-only
场景的优化。如果 changelog-producer
是 input
且写入模式是 insert-only
,那么 changelogWriter
就不会被创建。
// ... existing code ...
List<DataFileMeta> dataMetas = dataWriter.result();
if (changelogWriter != null) {
newFilesChangelog.addAll(changelogWriter.result());
} else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) {
// 对于 insert-only,数据文件本身就是 changelog
// 所以直接复制数据文件来作为 changelog 文件,避免了重复写入
List<DataFileMeta> changelogMetas = new ArrayList<>();
for (DataFileMeta dataMeta : dataMetas) {
String newFileName = writerFactory.newChangelogFileName(0);
DataFileMeta changelogMeta = dataMeta.rename(newFileName);
writerFactory.copyFile(dataMeta, changelogMeta);
changelogMetas.add(changelogMeta);
}
newFilesChangelog.addAll(changelogMetas);
}
// ... existing code ...
在这种情况下,数据文件里的记录全是 INSERT
,它本身就是一份完美的变更日志。因此 Paimon 只需写入一次数据文件,然后通过成本极低的元数据操作(rename
)和文件复制(copyFile
)来生成 changelog 文件,从而避免了双份的写入开销。
总结
dataWriter
-> 写状态 -> 用于批查询和后续合并。changelogWriter
-> 写变更 -> 用于流式查询。
这两种 writer
的并存,是 Paimon 实现 Streaming & Batch Unification(流批一体)架构的关键设计,它使得同一份数据在写入时就能同时产出满足两种不同查询模式的物理文件。
提交准备流程 (prepareCommit
方法)
这是在 Flink Checkpoint 时被调用的关键方法。
// ... existing code ...
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
// 1. 确保内存中的数据全部刷盘
flushWriteBuffer(waitCompaction, false);
// ...
// 2. 同步等待可能正在进行的Compaction任务完成
trySyncLatestCompaction(waitCompaction);
// 3. 打包并返回所有文件变更
return drainIncrement();
}
// ... existing code ...
- 首先调用
flushWriteBuffer()
,确保内存中所有数据都被持久化。 - 调用
trySyncLatestCompaction()
,等待当前正在进行的 Compaction 任务结束,并将其结果(哪些文件被合并,生成了哪些新文件)更新到compactBefore
和compactAfter
集合中。 - 调用
drainIncrement()
,将newFiles
、compactBefore
、compactAfter
等集合中的文件元数据打包成一个CommitIncrement
对象返回。同时清空这些集合,为下一个 Checkpoint 做准备。
这个 CommitIncrement
对象最终会被上层的 Committer
用来生成 Manifest 文件和 Snapshot。
产生的元数据被怎么使用
newFiles.add(dataMeta)
和 compactManager.addNewFile(dataMeta)
这两行代码是新生成的数据文件(dataMeta
)生命周期的起点,它们分别将 dataMeta
送上了两条不同的但最终会汇合的处理路径:提交路径和合并路径。
newFiles
是一个LinkedHashSet
,它的作用是暂存当前批次写入产生的所有新数据文件。
可以把它理解成一个“待提交”清单。当一个事务(commit)准备完成时,MergeTreeWriter
需要告诉 Paimon 的 Table/Snapshot 管理器,这个事务到底产生了哪些新的变化。
这个过程发生在 prepareCommit
方法中,它会调用 drainIncrement
方法:
// ... existing code ...
private CommitIncrement drainIncrement() {
DataIncrement dataIncrement =
new DataIncrement(
new ArrayList<>(newFiles), // newFiles 在这里被使用
new ArrayList<>(deletedFiles),
new ArrayList<>(newFilesChangelog));
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore.values()),
new ArrayList<>(compactAfter),
new ArrayList<>(compactChangelog));
// ... existing code ...
newFiles.clear(); // 清空 newFiles,为下一批次做准备
deletedFiles.clear();
// ... existing code ...
return new CommitIncrement(dataIncrement, compactIncrement, drainDeletionFile);
}
// ... existing code ...
从 drainIncrement
方法可以看出:
newFiles
集合中的所有DataFileMeta
被打包进一个DataIncrement
对象。- 这个
DataIncrement
最终成为CommitIncrement
的一部分,被MergeTreeWriter
返回。 CommitIncrement
是一个非常重要的数据结构,它完整地描述了一次提交的所有文件变化(新增文件、合并前后文件等)。Paimon 的提交进程会把这个CommitIncrement
的内容记录到 Manifest 文件中,从而生成一个新的快照(Snapshot)。- 一旦
dataMeta
被打包并清空,它就完成了在newFiles
中的使命。
小结:newFiles
的职责是确保新文件能够被正确地记录到 Paimon 的快照中,使其对查询可见。
compactManager.addNewFile(dataMeta)
:进入“合并”路径
compactManager
是 Merge-Tree 的核心组件之一,负责管理 LSM 树的结构和触发 Compaction(合并)操作。
当 compactManager.addNewFile(dataMeta)
被调用时,意味着:
compactManager
得知一个新的 L0 层文件(dataMeta
)诞生了。compactManager
会根据其内部的合并策略(例如,当 L0 层的文数量达到某个阈值时)来决定是否要触发一次 Compaction。- 如果触发了 Compaction,
compactManager
会挑选一些文件(可能就包括我们刚加入的这个dataMeta
)作为输入,启动一个后台任务去执行合并。
合并完成后,compactManager
会产生一个 CompactResult
,这个结果包含了合并前的文件(before
)和合并后的新文件(after
)。MergeTreeWriter
会通过 trySyncLatestCompaction
方法获取这个结果,并调用 updateCompactResult
来处理它:
// ... existing code ...
private void updateCompactResult(CompactResult result) {
// ... existing code ...
for (DataFileMeta file : result.before()) {
// ...
// 将被合并掉的老文件加入 compactBefore 集合
compactBefore.put(file.fileName(), file);
// ...
}
// 将合并后产生的新文件加入 compactAfter 集合
compactAfter.addAll(result.after());
compactChangelog.addAll(result.changelog());
updateCompactDeletionFile(result.deletionFile());
}
// ... existing code ...
可以看到,dataMeta
在被合并后,它的身份就从一个“活跃的数据文件”变成了“待废弃的老文件”,并被记录在 compactBefore
集合中。同时,合并产生的新文件被记录在 compactAfter
集合里。
最终,compactBefore
和 compactAfter
这两个集合里的信息也会在 drainIncrement
方法中被打包进 CommitIncrement
,从而在下一次提交时,通知 Paimon 更新快照:标记 compactBefore
的文件为“已删除”,并添加 compactAfter
的文件为“新文件”。
小结:compactManager
的职责是利用 dataMeta
来维护 LSM 树的健康,通过合并来减少文件数量、消除冗余数据,并把文件从低层级(Level 0)推向高层级。
DataFileMeta
在被创建后,通过 newFiles.add
和 compactManager.addNewFile
被赋予了双重身份:
- 作为新数据:它通过
newFiles
->DataIncrement
->CommitIncrement
的路径,被记录到新的快照中,从而对用户可见。 - 作为合并候选者:它通过
compactManager
进入 LSM 树的管理体系,未来可能会被选中并合并成更大的文件。当它被合并后,它的状态变化(从被合并 -> 产生新文件)会通过CompactResult
->compactBefore/compactAfter
->CompactIncrement
->CommitIncrement
的路径,再次被记录到新的快照中。
updateCompactResult
trySyncLatestCompaction会获取compaction结果,调用这个函数:
private void trySyncLatestCompaction(boolean blocking) throws Exception {
Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
result.ifPresent(this::updateCompactResult);
}
updateCompactResult
方法是 MergeTreeWriter
类中用于处理压缩结果的核心方法,它在每次压缩操作完成后被调用,用于更新文件状态和清理不再需要的文件。
private void updateCompactResult(CompactResult result) {
Set<String> afterFiles =
result.after().stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
for (DataFileMeta file : result.before()) {
if (compactAfter.remove(file)) {
// This is an intermediate file (not a new data file), which is no longer needed
// after compaction and can be deleted directly, but upgrade file is required by
// previous snapshot and following snapshot, so we should ensure:
// 1. This file is not the output of upgraded.
// 2. This file is not the input of upgraded.
if (!compactBefore.containsKey(file.fileName())
&& !afterFiles.contains(file.fileName())) {
writerFactory.deleteFile(file);
}
} else {
compactBefore.put(file.fileName(), file);
}
}
compactAfter.addAll(result.after());
compactChangelog.addAll(result.changelog());
updateCompactDeletionFile(result.deletionFile());
}
删除逻辑是为了在合并树(MergeTree)的压缩(compaction)过程中,正确处理中间文件。当 compactAfter.remove(file)
返回 true
时,表示 file
是这次被压缩的文件(因此先移出之前的compactAfter
)。但是,为了确保数据的一致性和可恢复性,需要进行额外的检查:
!compactBefore.containsKey(file.fileName())
:检查该文件是否不是压缩操作的输入文件。如果是输入文件,则不应直接删除,因为可能需要保留其历史版本或用于回滚。!afterFiles.contains(file.fileName())
:检查该文件是否不是压缩操作的输出文件。如果是输出文件,则不应删除,因为它是新生成的数据文件。
只有当一个文件既不是压缩的输入文件,也不是压缩的输出文件,并且在压缩后不再需要时,writerFactory.deleteFile(file)
才会执行,将其安全删除。这确保了只有真正的中间文件才会被删除,而不会影响到历史快照或未来的数据恢复。
启动独立Compaction任务 能不能取消写入任务的Compaction
可以通过设置 write-only
配置项为 true
来取消写入任务的 compaction。这将跳过 compaction 和快照过期操作。这个选项通常与专用的 compaction 作业一起使用。
可以在创建表或提交作业时设置该参数,例如:
tableConf.set(CoreOptions.WRITE_ONLY, true);
或者在 SQL 中设置:
SET 'write-only' = 'true';
相关的代码在 CoreOptions.java
中定义:
public static final ConfigOption<Boolean> WRITE_ONLY =
key("write-only")
.booleanType()
.defaultValue(false)
.withFallbackKeys("write.compaction-skip")
.withDescription(
"If set to true, compactions and snapshot expiration will be skipped. "
+ "This option is used along with dedicated compact jobs.");
并且在 BucketedAppendFileStoreWrite.java
中使用:
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
// ...
}
KeyValueFileStoreWrite 也同样进行了配置
private CompactManager createCompactManager(
BinaryRow partition,
int bucket,
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
@Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get();
CompactRewriter rewriter =
createRewriter(
partition,
bucket,
keyComparator,
userDefinedSeqComparator,
levels,
dvMaintainer);
return new MergeTreeCompactManager(
compactExecutor,
levels,
compactStrategy,
keyComparator,
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
compactionMetrics == null
? null
: compactionMetrics.createReporter(partition, bucket),
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire,
options.forceRewriteAllFiles());
}
}
NoopCompactManager
是一个不执行任何 compaction 操作的管理器。
更多推荐
所有评论(0)