Paimon 表定时 Compact 数据流程与逻辑详解
摘要 Apache Paimon 的 Compact 机制是流式数据湖存储系统的核心功能,主要用于优化文件管理、空间利用和查询性能。本文详细解析了其架构设计、触发机制和执行流程: 架构核心:包含 CompactManager、CompactStrategy 和 CompactTask 三大组件,分别负责管理、策略选择和执行合并任务 触发机制:支持内联Compact(写入时触发)、定时Compact
Paimon 表定时 Compact 数据流程与逻辑详解
目录
- 一、概述
- 二、Compact 核心架构
- 三、Compact 触发机制
- 四、Append 表 Compact 流程详解
- 五、主键表 Compact 流程详解
- 六、UniversalCompaction 策略深度解析
- 七、定时调度机制实现
- 八、关键配置项详解
- 九、源码核心类总结
- 十、最佳实践与优化建议
一、概述
Apache Paimon 是一个流式数据湖存储系统,采用 LSM-Tree(Log-Structured Merge-Tree)架构来管理数据文件。Compact(压缩)是 Paimon 中至关重要的后台操作,用于:
- 减少文件数量:合并小文件,降低读取时的文件扫描开销
- 减少空间放大:删除过期数据,回收存储空间
- 优化查询性能:减少 Level 0 文件数,加快数据检索
- 生成 Changelog:通过 Compact 生成变更日志
- 维护删除向量:对于 MOW(Merge-on-Write)模式,维护 Deletion Vectors
本文将深入分析 Paimon 表的定时 Compact 机制,包括触发时机、调度机制、策略选择和执行流程,并结合源码进行详细解析。
二、Compact 核心架构
2.1 整体架构图
2.2 核心组件
2.2.1 CompactManager
CompactManager 是 Compact 的管理中心,负责:
- 维护文件的层级结构(Levels)
- 判断是否需要触发 Compact
- 调用策略选择需要合并的文件
- 提交异步 Compact 任务到线程池
主键表实现:MergeTreeCompactManager
Append 表实现:BucketedAppendCompactManager
// paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
public interface CompactManager {
// 判断是否应该等待 Compact 完成
boolean shouldWaitForLatestCompaction();
// 添加新文件到 Level 0
void addNewFile(DataFileMeta file);
// 触发 Compact
void triggerCompaction(boolean fullCompaction);
// 获取 Compact 结果
Optional<CompactResult> getCompactionResult(boolean blocking);
}
2.2.2 CompactStrategy
CompactStrategy 负责文件选择策略,决定哪些文件需要合并:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
public interface CompactStrategy {
/**
* 从 runs 中选择需要 compact 的单元
* - compaction 是基于 runs 的,不是基于文件的
* - level 0 是特殊的,一个文件对应一个 run
* - 其他 level 是一个 level 对应一个 run
* - compaction 从小 level 到大 level 依次进行
*/
Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
}
2.2.3 CompactTask
CompactTask 是实际执行合并的任务单元:
// paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
public abstract class CompactTask implements Callable<CompactResult> {
@Override
public CompactResult call() throws Exception {
long startMillis = System.currentTimeMillis();
CompactResult result = doCompact();
// 记录指标
if (metricsReporter != null) {
metricsReporter.reportCompactionTime(
System.currentTimeMillis() - startMillis);
metricsReporter.increaseCompactionsCompletedCount();
}
return result;
}
protected abstract CompactResult doCompact() throws Exception;
}
三、Compact 触发机制
Paimon 支持三种 Compact 触发方式:
3.1 内联 Compact(Inline Compaction)
触发时机:数据写入过程中自动触发
核心代码:MergeTreeWriter.prepareCommit()
// paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java:209-248
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
if (writeBuffer.size() > 0) {
// 判断是否需要等待 Compact 完成
if (compactManager.shouldWaitForLatestCompaction()) {
waitForLatestCompaction = true;
}
// 刷写 buffer 到文件
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
// ... 写入逻辑 ...
// 将新文件添加到 CompactManager
for (DataFileMeta fileMeta : dataWriter.result()) {
newFiles.add(fileMeta);
compactManager.addNewFile(fileMeta);
}
}
// 等待上一次 Compact 完成
trySyncLatestCompaction(waitForLatestCompaction);
// 触发新的 Compact
compactManager.triggerCompaction(forcedFullCompaction);
}
判断是否需要等待 Compact:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:105-113
@Override
public boolean shouldWaitForLatestCompaction() {
// 当 SortedRun 数量超过阈值时,阻塞写入等待 Compact
return levels.numberOfSortedRuns() > numSortedRunStopTrigger;
}
@Override
public boolean shouldWaitForPreparingCheckpoint() {
// 在 checkpoint 准备阶段,如果 runs 数量超过阈值+1,也要等待
return levels.numberOfSortedRuns() > (long) numSortedRunStopTrigger + 1;
}
关键配置:
num-sorted-run.stop-trigger:默认Integer.MAX_VALUE,当 SortedRun 数量超过此值时阻塞写入
3.2 定时 Compact Job
触发时机:独立的 Compact 作业,定时扫描表快照生成 Compact 任务
适用场景:
- 多个写作业同时写入同一张表
- 需要将写入和 Compact 资源隔离
- 大规模数据写入场景
配置方式:
-- 在写表时禁用 Compact
CREATE TABLE my_table (
...
) WITH (
'write-only' = 'true' -- 跳过 Compact 和 Snapshot 过期
);
-- 启动专门的 Compact 作业
CALL sys.compact('default.my_table');
或使用 Action Jar:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action.jar \
compact \
--warehouse hdfs:///path/to/warehouse \
--database default \
--table my_table
定时间隔配置:
-- 设置扫描间隔为 30 秒
CREATE TABLE my_table (...) WITH (
'continuous.discovery-interval' = '30s'
);
3.3 手动触发 Compact
Flink SQL 方式:
-- 对特定分区进行 Compact
CALL sys.compact(
`table` => 'default.my_table',
`partitions` => 'dt=2024-01-01',
`options` => 'sink.parallelism=10'
);
-- Full Compaction
CALL sys.compact(
`table` => 'default.my_table',
`compact_strategy` => 'full'
);
Spark 方式:
val table = spark.table("default.my_table")
table.createOrReplaceTempView("tmp")
spark.sql("CALL paimon.sys.compact(table => 'tmp')")
四、Append 表 Compact 流程详解
4.1 整体流程图
4.2 核心类:AppendCompactCoordinator
AppendCompactCoordinator 是 Append 表的 Compact 协调器,负责扫描快照、过滤文件、生成任务。
类定义:
// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:70-109
public class AppendCompactCoordinator {
private static final int FILES_BATCH = 100_000; // 批量处理文件数
protected static final int REMOVE_AGE = 10; // 移除年龄阈值
protected static final int COMPACT_AGE = 5; // 强制Compact年龄
private final SnapshotManager snapshotManager;
private final long targetFileSize; // 目标文件大小
private final long compactionFileSize; // Compact阈值
private final double deleteThreshold; // 删除率阈值
private final int minFileNum; // 最小文件数
// 按分区组织的子协调器
final Map<BinaryRow, SubCoordinator> subCoordinators = new HashMap<>();
public List<AppendCompactTask> run() {
// 扫描快照中的文件
if (scan()) {
// 生成 Compact 任务
return compactPlan();
}
return Collections.emptyList();
}
}
4.3 文件扫描逻辑
scan() 方法:
// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:122-148
boolean scan() {
Map<BinaryRow, List<DataFileMeta>> files = new HashMap<>();
// 批量读取文件(每次最多 100,000 个)
for (int i = 0; i < FILES_BATCH; i++) {
ManifestEntry entry;
try {
entry = filesIterator.next();
} catch (EndOfScanException e) {
if (!files.isEmpty()) {
files.forEach(this::notifyNewFiles);
return true;
}
throw e;
}
if (entry == null) {
break;
}
BinaryRow partition = entry.partition();
files.computeIfAbsent(partition, k -> new ArrayList<>()).add(entry.file());
}
if (files.isEmpty()) {
return false;
}
files.forEach(this::notifyNewFiles);
return true;
}
文件过滤条件:
// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:467-482
private boolean shouldCompact(BinaryRow partition, DataFileMeta file) {
// 条件1: 文件大小小于 compaction.file-size
if (file.fileSize() < compactionFileSize) {
return true;
}
// 条件2: 删除率超过阈值(启用 Deletion Vector 时)
return tooHighDeleteRatio(partition, file);
}
private boolean tooHighDeleteRatio(BinaryRow partition, DataFileMeta file) {
if (dvMaintainerCache != null) {
DeletionFile deletionFile =
dvMaintainerCache.dvMaintainer(partition).getDeletionFile(file.fileName());
if (deletionFile != null) {
Long cardinality = deletionFile.cardinality();
long rowCount = file.rowCount();
return cardinality == null || cardinality > rowCount * deleteThreshold;
}
}
return false;
}
4.4 文件打包策略:SubCoordinator
每个分区有一个 SubCoordinator,负责该分区的文件打包和任务生成。
打包逻辑:
// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:233-274
private List<List<DataFileMeta>> agePack() {
List<List<DataFileMeta>> packed;
if (dvMaintainerCache == null) {
// 普通打包模式
packed = pack(toCompact);
} else {
// Deletion Vector 模式:按删除文件分组
packed = packInDeletionVectorVMode(toCompact);
}
if (packed.isEmpty()) {
// 如果没有可打包的,增加年龄
if (++age > COMPACT_AGE && toCompact.size() > 1) {
// 年龄超过阈值,强制打包所有文件
List<DataFileMeta> all = new ArrayList<>(toCompact);
toCompact.clear();
packed = Collections.singletonList(all);
}
}
return packed;
}
private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
// 按文件大小排序
ArrayList<DataFileMeta> files = new ArrayList<>(toCompact);
files.sort(Comparator.comparingLong(DataFileMeta::fileSize));
List<List<DataFileMeta>> result = new ArrayList<>();
FileBin fileBin = new FileBin();
for (DataFileMeta fileMeta : files) {
fileBin.addFile(fileMeta);
if (fileBin.enoughContent()) {
// 满足打包条件:文件数>1 且 总大小 >= 2*targetFileSize
result.add(fileBin.drain());
}
}
if (fileBin.enoughInputFiles()) {
// 文件数满足最小要求
result.add(fileBin.drain());
}
return result;
}
FileBin 打包单元:
// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:324-352
private class FileBin {
List<DataFileMeta> bin = new ArrayList<>();
long totalFileSize = 0;
public void addFile(DataFileMeta file) {
totalFileSize += file.fileSize() + openFileCost;
bin.add(file);
}
private boolean enoughContent() {
// 文件数 > 1 且 总大小 >= 2 倍 targetFileSize
return bin.size() > 1 && totalFileSize >= targetFileSize * 2;
}
private boolean enoughInputFiles() {
// 文件数 >= minFileNum
return bin.size() >= minFileNum;
}
}
4.5 任务执行:AppendCompactTask
任务定义:
// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java:44-67
public class AppendCompactTask {
private final BinaryRow partition;
private final List<DataFileMeta> compactBefore; // 待合并文件
private final List<DataFileMeta> compactAfter; // 合并后文件
public AppendCompactTask(BinaryRow partition, List<DataFileMeta> files) {
this.partition = partition;
compactBefore = new ArrayList<>(files);
compactAfter = new ArrayList<>();
}
public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite write)
throws Exception {
// 执行文件合并
// ... 合并逻辑 ...
}
}
4.6 Bucketed Append 表的 Compact
对于有 Bucket 的 Append 表,使用 BucketedAppendCompactManager:
文件选择逻辑:
// paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java:201-227
Optional<List<DataFileMeta>> pickCompactBefore() {
if (toCompact.isEmpty()) {
return Optional.empty();
}
long totalFileSize = 0L;
int fileNum = 0;
LinkedList<DataFileMeta> candidates = new LinkedList<>();
// 使用优先队列按序号排序文件
while (!toCompact.isEmpty()) {
DataFileMeta file = toCompact.poll();
candidates.add(file);
totalFileSize += file.fileSize();
fileNum++;
if (fileNum >= minFileNum) {
// 满足最小文件数要求
return Optional.of(candidates);
} else if (totalFileSize >= targetFileSize) {
// 总大小超过目标,移除第一个文件继续
DataFileMeta removed = candidates.pollFirst();
totalFileSize -= removed.fileSize();
fileNum--;
}
}
toCompact.addAll(candidates);
return Optional.empty();
}
五、主键表 Compact 流程详解
5.1 LSM-Tree 架构
主键表采用 LSM-Tree(Log-Structured Merge-Tree)架构,数据文件分为多个 Level:
Level 0: [File1] [File2] [File3] ... (可能有重叠)
↓ Compact
Level 1: [File_L1_1 | File_L1_2 | File_L1_3] (有序,无重叠)
↓ Compact
Level 2: [File_L2_1 --------- | File_L2_2 ---------]
↓ Compact
Level N: [File_LN_1 -------------------------------]
Level 特点:
- Level 0:新写入的文件,文件之间可能有 Key 重叠,一个文件对应一个 SortedRun
- Level 1+:合并后的文件,同一 Level 内文件按 Key 有序且不重叠,一个 Level 对应一个 SortedRun
5.2 核心类:MergeTreeCompactManager
类定义:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:53-102
public class MergeTreeCompactManager extends CompactFutureManager {
private final ExecutorService executor;
private final Levels levels; // LSM-Tree 层级结构
private final CompactStrategy strategy; // Compact 策略
private final Comparator<InternalRow> keyComparator;
private final long compactionFileSize;
private final int numSortedRunStopTrigger;
private final CompactRewriter rewriter;
@Nullable private final CompactionMetrics.Reporter metricsReporter;
@Nullable private final BucketedDvMaintainer dvMaintainer;
@Nullable private final RecordLevelExpire recordLevelExpire;
// ... 构造方法 ...
}
5.3 文件添加与触发 Compact
添加新文件到 Level 0:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:116-124
@Override
public void addNewFile(DataFileMeta file) {
levels.addLevel0File(file);
MetricUtils.safeCall(this::reportMetrics, LOG);
}
触发 Compact 逻辑:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:127-194
@Override
public void triggerCompaction(boolean fullCompaction) {
Optional<CompactUnit> optionalUnit;
List<LevelSortedRun> runs = levels.levelSortedRuns();
if (fullCompaction) {
// 强制全量 Compact
Preconditions.checkState(taskFuture == null,
"A compaction task is still running while forcing a new compaction.");
optionalUnit = CompactStrategy.pickFullCompaction(
levels.numberOfLevels(),
runs,
recordLevelExpire,
dvMaintainer,
forceRewriteAllFiles);
} else {
// 正常 Compact
if (taskFuture != null) {
return; // 已有任务在运行
}
// 使用策略选择文件
optionalUnit = strategy.pick(levels.numberOfLevels(), runs)
.filter(unit -> !unit.files().isEmpty())
.filter(unit ->
unit.files().size() > 1
|| unit.files().get(0).level() != unit.outputLevel());
}
optionalUnit.ifPresent(unit -> {
// 判断是否可以删除旧记录
boolean dropDelete = unit.outputLevel() != 0
&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()
|| dvMaintainer != null);
submitCompaction(unit, dropDelete);
});
}
5.4 Compact 任务提交
submitCompaction() 方法:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:201-245
private void submitCompaction(CompactUnit unit, boolean dropDelete) {
CompactTask task;
if (unit.fileRewrite()) {
// 文件级别重写(仅修改元数据)
task = new FileRewriteCompactTask(rewriter, unit, dropDelete, metricsReporter);
} else {
// 正常 Compact(读取合并数据)
task = new MergeTreeCompactTask(
keyComparator,
compactionFileSize,
rewriter,
unit,
dropDelete,
levels.maxLevel(),
metricsReporter,
compactDfSupplier,
recordLevelExpire,
forceRewriteAllFiles);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Pick these files (name, level, size) for {} compaction: {}",
task.getClass().getSimpleName(),
unit.files().stream()
.map(file -> String.format("(%s, %d, %d)",
file.fileName(), file.level(), file.fileSize()))
.collect(Collectors.joining(", ")));
}
// 提交到线程池
taskFuture = executor.submit(task);
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
metricsReporter.increaseCompactionsTotalCount();
}
}
5.5 MergeTreeCompactTask 执行逻辑
doCompact() 方法:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java:82-113
@Override
protected CompactResult doCompact() throws Exception {
List<List<SortedRun>> candidate = new ArrayList<>();
CompactResult result = new CompactResult();
// 遍历分区(IntervalPartition 将重叠文件分组)
for (List<SortedRun> section : partitioned) {
if (section.size() > 1) {
// 有多个 Run,需要合并
candidate.add(section);
} else {
SortedRun run = section.get(0);
// 单个 Run:根据文件大小决定是否需要重写
for (DataFileMeta file : run.files()) {
if (file.fileSize() < minFileSize) {
// 小文件:需要重写
candidate.add(singletonList(SortedRun.fromSingle(file)));
} else {
// 大文件:先重写之前的候选,然后升级当前文件
rewrite(candidate, result);
upgrade(file, result);
}
}
}
}
// 重写剩余的候选文件
rewrite(candidate, result);
result.setDeletionFile(compactDfSupplier.get());
return result;
}
upgrade() 方法(文件升级):
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java:123-138
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
// 需要重写的情况:
// 1. 输出到最高层且包含删除记录
// 2. 强制重写所有文件
// 3. 包含过期记录
if ((outputLevel == maxLevel && containsDeleteRecords(file))
|| forceRewriteAllFiles
|| containsExpiredRecords(file)) {
List<List<SortedRun>> candidate = new ArrayList<>();
candidate.add(singletonList(SortedRun.fromSingle(file)));
rewriteImpl(candidate, toUpdate);
return;
}
// 仅修改文件的 level 元数据(无需重写数据)
if (file.level() != outputLevel) {
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
toUpdate.merge(upgradeResult);
upgradeFilesNum++;
}
}
5.6 IntervalPartition 算法
IntervalPartition 用于将多个数据文件划分为最少数量的 SortedRun,处理文件间的 Key 重叠:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java:32-91
public class IntervalPartition {
private final List<DataFileMeta> files;
private final Comparator<InternalRow> keyComparator;
/**
* 返回二维列表:
* - 外层列表:sections(不同 section 的 Key 区间不重叠)
* - 内层列表:SortedRuns(同一 section 内的多个 Run)
*/
public List<List<SortedRun>> partition() {
List<List<SortedRun>> result = new ArrayList<>();
List<DataFileMeta> section = new ArrayList<>();
InternalRow bound = null;
// 按 minKey 排序
files.sort((o1, o2) -> {
int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());
return leftResult == 0
? keyComparator.compare(o1.maxKey(), o2.maxKey())
: leftResult;
});
for (DataFileMeta meta : files) {
// 如果当前文件的 minKey > 上一个 section 的 bound
// 说明没有重叠,开始新的 section
if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
result.add(partition(section));
section.clear();
bound = null;
}
section.add(meta);
// 更新 bound 为当前 section 的最大 maxKey
if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {
bound = meta.maxKey();
}
}
if (!section.isEmpty()) {
result.add(partition(section));
}
return result;
}
}
5.7 Levels 结构维护
Levels 类管理 LSM-Tree 的层级结构:
// paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java:38-98
public class Levels {
private final Comparator<InternalRow> keyComparator;
private final TreeSet<DataFileMeta> level0; // Level 0 文件(按序号排序)
private final List<SortedRun> levels; // Level 1+ 文件
public void update(List<DataFileMeta> before, List<DataFileMeta> after) {
// 按 Level 分组
Map<Integer, List<DataFileMeta>> groupedBefore = groupByLevel(before);
Map<Integer, List<DataFileMeta>> groupedAfter = groupByLevel(after);
// 更新每个 Level
for (int i = 0; i < numberOfLevels(); i++) {
updateLevel(i,
groupedBefore.getOrDefault(i, emptyList()),
groupedAfter.getOrDefault(i, emptyList()));
}
}
private void updateLevel(int level, List<DataFileMeta> before, List<DataFileMeta> after) {
if (level == 0) {
// Level 0: 从 TreeSet 中移除旧文件,添加新文件
before.forEach(level0::remove);
level0.addAll(after);
} else {
// Level 1+: 从 SortedRun 中移除旧文件,添加新文件,重新排序
List<DataFileMeta> files = new ArrayList<>(runOfLevel(level).files());
files.removeAll(before);
files.addAll(after);
levels.set(level - 1, SortedRun.fromUnsorted(files, keyComparator));
}
}
}
六、UniversalCompaction 策略深度解析
6.1 策略概述
UniversalCompaction 是 Paimon 的核心 Compact 策略,源自 RocksDB 的 Universal Compaction。
设计目标:
- 降低写放大(Write Amplification)
- 控制空间放大(Space Amplification)
- 平衡读放大(Read Amplification)
6.2 策略参数
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:42-64
public class UniversalCompaction implements CompactStrategy {
private final int maxSizeAmp; // 最大空间放大百分比
private final int sizeRatio; // 大小比例阈值
private final int numRunCompactionTrigger; // 触发 Compact 的 Run 数量
@Nullable private final FullCompactTrigger fullCompactTrigger;
@Nullable private final OffPeakHours offPeakHours;
}
配置项对应:
| 参数 | 配置键 | 默认值 | 说明 |
|---|---|---|---|
maxSizeAmp |
compaction.max-size-amplification-percent |
200 | 最大空间放大百分比 |
sizeRatio |
compaction.size-ratio |
1 | 大小比例阈值 |
numRunCompactionTrigger |
num-sorted-run.compaction-trigger |
5 | 触发 Compact 的 Run 数 |
6.3 策略决策流程
pick() 方法:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:67-107
@Override
public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
int maxLevel = numLevels - 1;
// 【优先级0】尝试定时全量 Compact
if (fullCompactTrigger != null) {
Optional<CompactUnit> unit = fullCompactTrigger.tryFullCompact(numLevels, runs);
if (unit.isPresent()) {
return unit;
}
}
// 【优先级1】检查空间放大
CompactUnit unit = pickForSizeAmp(maxLevel, runs);
if (unit != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Universal compaction due to size amplification");
}
return Optional.of(unit);
}
// 【优先级2】检查大小比例
unit = pickForSizeRatio(maxLevel, runs);
if (unit != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Universal compaction due to size ratio");
}
return Optional.of(unit);
}
// 【优先级3】检查文件数量
if (runs.size() > numRunCompactionTrigger) {
// 超过阈值,触发 Compact
int candidateCount = runs.size() - numRunCompactionTrigger + 1;
if (LOG.isDebugEnabled()) {
LOG.debug("Universal compaction due to file num");
}
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
}
return Optional.empty();
}
6.4 详细策略说明
6.4.1 定时全量 Compact(FullCompactTrigger)
触发条件:
- 距离上次全量 Compact 的时间间隔超过
compaction.optimization-interval - 文件总大小小于
compaction.total-size-threshold
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java:64-88
public Optional<CompactUnit> tryFullCompact(int numLevels, List<LevelSortedRun> runs) {
if (runs.size() == 1) {
return Optional.empty();
}
int maxLevel = numLevels - 1;
// 条件1: 时间间隔触发
if (fullCompactionInterval != null) {
if (lastFullCompaction == null
|| currentTimeMillis() - lastFullCompaction > fullCompactionInterval) {
LOG.debug("Universal compaction due to full compaction interval");
updateLastFullCompaction();
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
}
}
// 条件2: 总大小触发
if (totalSizeThreshold != null) {
long totalSize = 0;
for (LevelSortedRun run : runs) {
totalSize += run.run().totalSize();
}
if (totalSize < totalSizeThreshold) {
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
}
}
return Optional.empty();
}
配置示例:
CREATE TABLE my_table (...) WITH (
'compaction.optimization-interval' = '1 h', -- 每小时全量 Compact
'compaction.total-size-threshold' = '10 GB' -- 总大小小于10GB时全量 Compact
);
6.4.2 空间放大检查(pickForSizeAmp)
空间放大定义:额外空间占比 = (所有文件大小 - 最早文件大小) / 最早文件大小 × 100%
触发条件:当空间放大超过 maxSizeAmp 时,触发全量 Compact
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:125-147
CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
if (runs.size() < numRunCompactionTrigger) {
return null;
}
// 计算候选文件总大小(除了最早的文件)
long candidateSize =
runs.subList(0, runs.size() - 1).stream()
.map(LevelSortedRun::run)
.mapToLong(SortedRun::totalSize)
.sum();
// 最早文件的大小
long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();
// 空间放大 = candidateSize * 100 / earliestRunSize
if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
if (fullCompactTrigger != null) {
fullCompactTrigger.updateLastFullCompaction();
}
// 触发全量 Compact
return CompactUnit.fromLevelRuns(maxLevel, runs);
}
return null;
}
示例:
假设 maxSizeAmp = 200,有以下文件:
- Run 0: 10 MB
- Run 1: 20 MB
- Run 2: 30 MB
- Run 3: 100 MB (最早)
空间放大 = (10 + 20 + 30) × 100 / 100 = 60% < 200%,不触发
如果 Run 3 只有 20 MB:
空间放大 = 60 × 100 / 20 = 300% > 200%,触发全量 Compact
6.4.3 大小比例检查(pickForSizeRatio)
触发条件:当较小的 Run 的总大小与较大 Run 的比例超过阈值时,触发 Compact
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:163-182
public CompactUnit pickForSizeRatio(
int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {
long candidateSize = candidateSize(runs, candidateCount);
// 从 candidateCount 开始,向后扩展
for (int i = candidateCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
// 计算比例:(candidateSize * (100 + sizeRatio + offPeakRatio)) / 100
if (candidateSize * (100.0 + sizeRatio + ratioForOffPeak()) / 100.0
< next.run().totalSize()) {
// 下一个文件太大,停止扩展
break;
}
// 继续包含下一个文件
candidateSize += next.run().totalSize();
candidateCount++;
}
if (forcePick || candidateCount > 1) {
return createUnit(runs, maxLevel, candidateCount);
}
return null;
}
示例:
假设 sizeRatio = 1(即 100%),有以下 Runs:
- Run 0: 10 MB
- Run 1: 15 MB
- Run 2: 40 MB
- Run 3: 100 MB
从 Run 0 开始:
- candidateSize = 10 MB
- 判断:10 × (100 + 100) / 100 = 20 MB < 15 MB?否,包含 Run 1
- candidateSize = 25 MB
- 判断:25 × 200 / 100 = 50 MB < 40 MB?否,包含 Run 2
- candidateSize = 65 MB
- 判断:65 × 200 / 100 = 130 MB < 100 MB?否,包含 Run 3
- candidateSize = 165 MB
- 没有更多 Run,返回所有 4 个 Runs 的 CompactUnit
6.4.4 文件数量检查
触发条件:当 Run 数量超过 numRunCompactionTrigger 时,触发 Compact
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:96-104
if (runs.size() > numRunCompactionTrigger) {
// 超过阈值,触发 Compact
int candidateCount = runs.size() - numRunCompactionTrigger + 1;
if (LOG.isDebugEnabled()) {
LOG.debug("Universal compaction due to file num");
}
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
}
示例:
假设 numRunCompactionTrigger = 5,当前有 8 个 Runs:
- candidateCount = 8 - 5 + 1 = 4
- 调用
pickForSizeRatio(maxLevel, runs, 4)选择至少 4 个 Runs 进行 Compact
6.5 输出 Level 的确定
createUnit() 方法:
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:197-226
CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
int outputLevel;
if (runCount == runs.size()) {
// 合并所有 Runs,输出到最高层
outputLevel = maxLevel;
} else {
// 输出到下一个 Run 的 level - 1
outputLevel = Math.max(0, runs.get(runCount).level() - 1);
}
if (outputLevel == 0) {
// 不输出到 Level 0,向后扩展
for (int i = runCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
runCount++;
if (next.level() != 0) {
outputLevel = next.level();
break;
}
}
}
if (runCount == runs.size()) {
if (fullCompactTrigger != null) {
fullCompactTrigger.updateLastFullCompaction();
}
outputLevel = maxLevel;
}
return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
}
6.6 Off-Peak Hours 优化
定义:在非高峰时段使用更激进的 Compact 策略
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:184-186
private int ratioForOffPeak() {
return offPeakHours == null ? 0 : offPeakHours.currentRatio(LocalDateTime.now().getHour());
}
配置示例:
CREATE TABLE my_table (...) WITH (
'compaction.offpeak.start.hour' = '2', -- 凌晨2点开始
'compaction.offpeak.end.hour' = '6', -- 早上6点结束
'compaction.offpeak-ratio' = '20' -- 非高峰期使用20%的比例
);
在非高峰时段(2:00-6:00),sizeRatio 会加上 offpeak-ratio,使 Compact 更容易触发。
七、定时调度机制实现
7.1 Flink 流式 Compact 作业架构
7.2 Append 表定时调度
7.2.1 AppendBypassCoordinateOperator
核心类:AppendBypassCoordinateOperator 是 Flink 的一个 OneInputStreamOperator,负责定时生成 Compact 任务。
// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:49-86
public class AppendBypassCoordinateOperator<CommitT>
extends AbstractStreamOperator<Either<CommitT, AppendCompactTask>>
implements OneInputStreamOperator<CommitT, Either<CommitT, AppendCompactTask>>,
ProcessingTimeCallback {
private static final long MAX_PENDING_TASKS = 5000;
private final FileStoreTable table;
private transient ScheduledExecutorService executorService;
private transient LinkedBlockingQueue<AppendCompactTask> compactTasks;
@Override
public void open() throws Exception {
super.open();
// 获取扫描间隔
long intervalMs = table.coreOptions().continuousDiscoveryInterval().toMillis();
// 初始化任务队列
this.compactTasks = new LinkedBlockingQueue<>();
// 创建协调器
AppendCompactCoordinator coordinator = new AppendCompactCoordinator(table, true, null);
// 创建定时执行器
this.executorService =
Executors.newSingleThreadScheduledExecutor(
newDaemonThreadFactory("Compaction Coordinator"));
// 定时异步生成任务
this.executorService.scheduleWithFixedDelay(
() -> asyncPlan(coordinator), 0, intervalMs, TimeUnit.MILLISECONDS);
// 定时发送任务到下游
this.getProcessingTimeService().scheduleWithFixedDelay(this, 0, intervalMs);
}
}
异步生成任务:
// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:88-102
private void asyncPlan(AppendCompactCoordinator coordinator) {
// 持续生成任务,直到队列满或没有新任务
while (compactTasks.size() < MAX_PENDING_TASKS) {
try {
List<AppendCompactTask> tasks = coordinator.run();
compactTasks.addAll(tasks);
if (tasks.isEmpty()) {
break;
}
} catch (Throwable t) {
LOG.error("Fatal exception happened when generating compaction tasks.", t);
this.throwable = t;
break;
}
}
}
发送任务到下游:
// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:104-113
@Override
public void onProcessingTime(long time) {
// 从队列中取出任务,发送到下游
while (true) {
AppendCompactTask task = compactTasks.poll();
if (task == null) {
return;
}
output.collect(new StreamRecord<>(Either.Right(task)));
}
}
7.2.2 Flink Job 构建
AppendTableCompact 类:
// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java:87-128
public void build() {
// 构建 Source
DataStreamSource<AppendCompactTask> source = buildSource();
// 如果配置了分区空闲时间,过滤任务
if (!isContinuous && partitionIdleTime != null) {
Map<BinaryRow, Long> partitionInfo = getPartitionInfo(table);
long historyMilli =
LocalDateTime.now()
.minus(partitionIdleTime)
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
SingleOutputStreamOperator<AppendCompactTask> filterStream =
source.filter(task -> {
BinaryRow partition = task.partition();
return partitionInfo.get(partition) <= historyMilli;
});
source = new DataStreamSource<>(filterStream);
}
// 构建完整的 Flink 作业
sinkFromSource(source);
}
private DataStreamSource<AppendCompactTask> buildSource() {
// 获取扫描间隔
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
// 创建 Source
AppendTableCompactSource source =
new AppendTableCompactSource(table, isContinuous, scanInterval, partitionPredicate);
return AppendTableCompactSource.buildSource(env, source, tableIdentifier);
}
7.3 主键表定时调度
7.3.1 CompactorSource
核心类:CompactorSource 定时扫描表快照,发现需要 Compact 的 Bucket。
工作流程:
- 定时扫描表快照(间隔由
continuous.discovery-interval控制) - 比较当前快照与上次快照,发现新增/修改的文件
- 找出需要 Compact 的 partition-bucket 组合
- 生成 Compact 记录发送到下游
关键配置:
// paimon-api/src/main/java/org/apache/paimon/CoreOptions.java:464-468
public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
key("continuous.discovery-interval")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription("The discovery interval of continuous reading.");
7.3.2 StoreCompactOperator
核心类:StoreCompactOperator 接收 Compact 任务并执行。
// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java:133-180
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
RowData record = element.getValue();
long snapshotId = record.getLong(0);
BinaryRow partition = deserializeBinaryRow(record.getBinary(1));
int bucket = record.getInt(2);
byte[] serializedFiles = record.getBinary(3);
List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);
if (write.streamingMode()) {
// 流式模式:通知新文件
write.notifyNewFiles(snapshotId, partition, bucket, files);
}
// 记录需要 Compact 的 partition-bucket
waitToCompact.add(Pair.of(partition, bucket));
}
@Override
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException {
try {
// 执行所有待 Compact 的 partition-bucket
for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
write.compact(partitionBucket.getKey(), partitionBucket.getRight(), fullCompaction);
}
} catch (Exception e) {
throw new RuntimeException("Exception happens while executing compaction.", e);
}
waitToCompact.clear();
// 准备提交
List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
return committables;
}
7.4 调度流程图
八、关键配置项详解
8.1 定时调度配置
| 配置项 | 默认值 | 类型 | 说明 |
|---|---|---|---|
continuous.discovery-interval |
10s | Duration | Compact 任务的扫描间隔,控制多久检查一次是否有新文件需要 Compact |
调优建议:
- 高频写入场景:缩短间隔(如 5s),及时发现需要 Compact 的文件
- 低频写入场景:延长间隔(如 30s-1min),减少不必要的扫描开销
8.2 Compact 策略配置
| 配置项 | 默认值 | 类型 | 说明 |
|---|---|---|---|
num-sorted-run.compaction-trigger |
5 | Integer | 触发 Compact 的 Run 数量阈值 |
num-sorted-run.stop-trigger |
Integer.MAX_VALUE | Integer | 阻塞写入等待 Compact 的 Run 数量阈值 |
compaction.max-size-amplification-percent |
200 | Integer | 最大空间放大百分比,超过此值触发全量 Compact |
compaction.size-ratio |
1 | Integer | 大小比例阈值,控制是否合并相邻的 Runs |
compaction.optimization-interval |
无 | Duration | 定时全量 Compact 间隔 |
compaction.total-size-threshold |
无 | MemorySize | 触发全量 Compact 的总文件大小阈值 |
调优建议:
1. 控制文件数量:
-- 更激进的 Compact:Run 数量达到 3 就触发
CREATE TABLE my_table (...) WITH (
'num-sorted-run.compaction-trigger' = '3'
);
2. 控制空间放大:
-- 更严格的空间控制:100% 空间放大就触发
CREATE TABLE my_table (...) WITH (
'compaction.max-size-amplification-percent' = '100'
);
3. 定时全量 Compact:
-- 每天凌晨2点执行全量 Compact
CREATE TABLE my_table (...) WITH (
'compaction.optimization-interval' = '24 h'
);
8.3 文件大小配置
| 配置项 | 默认值 | 类型 | 说明 |
|---|---|---|---|
target-file-size |
128 MB | MemorySize | 目标文件大小,Compact 后的文件尽量接近此大小 |
compaction.file-size |
target-file-size | MemorySize | 触发 Compact 的文件大小阈值,小于此值的文件会被合并 |
compaction.min.file-num |
5 | Integer | Append 表触发 Compact 的最小文件数 |
调优建议:
-- 小表场景:减小文件大小,加快 Compact
CREATE TABLE small_table (...) WITH (
'target-file-size' = '64 MB',
'compaction.min.file-num' = '3'
);
-- 大表场景:增大文件大小,减少文件数量
CREATE TABLE large_table (...) WITH (
'target-file-size' = '256 MB',
'compaction.min.file-num' = '10'
);
8.4 Off-Peak Hours 配置
| 配置项 | 默认值 | 类型 | 说明 |
|---|---|---|---|
compaction.offpeak.start.hour |
-1 | Integer | 非高峰时段开始时间(小时,0-23),-1 表示禁用 |
compaction.offpeak.end.hour |
-1 | Integer | 非高峰时段结束时间(小时,0-23),-1 表示禁用 |
compaction.offpeak-ratio |
0 | Integer | 非高峰时段的额外 Compact 比例 |
示例:
-- 凌晨2点到6点使用更激进的 Compact 策略
CREATE TABLE my_table (...) WITH (
'compaction.offpeak.start.hour' = '2',
'compaction.offpeak.end.hour' = '6',
'compaction.offpeak-ratio' = '20',
'compaction.size-ratio' = '1'
);
-- 在非高峰时段,effective size-ratio = 1 + 20 = 21
8.5 其他重要配置
| 配置项 | 默认值 | 类型 | 说明 |
|---|---|---|---|
write-only |
false | Boolean | 是否仅写入不 Compact,适用于专门的 Compact 作业场景 |
compaction.delete-ratio-threshold |
0.3 | Double | 删除率阈值,启用 Deletion Vector 时使用 |
prepare-commit.wait-compaction |
false | Boolean | Checkpoint 时是否等待 Compact 完成 |
专门 Compact 作业配置:
-- 写表配置
CREATE TABLE my_table (...) WITH (
'write-only' = 'true' -- 禁用 Compact
);
-- Compact 作业配置
CALL sys.compact(
`table` => 'default.my_table',
`options` => 'continuous.discovery-interval=5s,sink.parallelism=20'
);
九、源码核心类总结
9.1 Compact 管理类
| 类名 | 路径 | 说明 |
|---|---|---|
CompactManager |
paimon-core/src/main/java/org/apache/paimon/compact/ |
Compact 管理器接口 |
MergeTreeCompactManager |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
主键表 Compact 管理器 |
BucketedAppendCompactManager |
paimon-core/src/main/java/org/apache/paimon/append/ |
Append 表(有 Bucket)Compact 管理器 |
AppendCompactCoordinator |
paimon-core/src/main/java/org/apache/paimon/append/ |
Append 表 Compact 协调器 |
9.2 Compact 策略类
| 类名 | 路径 | 说明 |
|---|---|---|
CompactStrategy |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
Compact 策略接口 |
UniversalCompaction |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
Universal Compaction 策略实现 |
FullCompactTrigger |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
定时全量 Compact 触发器 |
ForceUpLevel0Compaction |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
强制 Level 0 Compact 策略 |
OffPeakHours |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
非高峰时段配置 |
9.3 Compact 任务类
| 类名 | 路径 | 说明 |
|---|---|---|
CompactTask |
paimon-core/src/main/java/org/apache/paimon/compact/ |
Compact 任务抽象类 |
MergeTreeCompactTask |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
主键表 Compact 任务 |
FileRewriteCompactTask |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
文件重写任务(仅修改元数据) |
AppendCompactTask |
paimon-core/src/main/java/org/apache/paimon/append/ |
Append 表 Compact 任务 |
CompactResult |
paimon-core/src/main/java/org/apache/paimon/compact/ |
Compact 结果 |
CompactUnit |
paimon-core/src/main/java/org/apache/paimon/compact/ |
Compact 单元(待合并的文件集合) |
9.4 数据结构类
| 类名 | 路径 | 说明 |
|---|---|---|
Levels |
paimon-core/src/main/java/org/apache/paimon/mergetree/ |
LSM-Tree 层级结构 |
SortedRun |
paimon-core/src/main/java/org/apache/paimon/mergetree/ |
有序文件集合 |
LevelSortedRun |
paimon-core/src/main/java/org/apache/paimon/mergetree/ |
带 Level 信息的 SortedRun |
IntervalPartition |
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ |
文件分区算法(处理重叠文件) |
9.5 Flink 集成类
| 类名 | 路径 | 说明 |
|---|---|---|
AppendBypassCoordinateOperator |
paimon-flink/paimon-flink-common/.../flink/source/ |
Append 表 Compact 协调算子 |
StoreCompactOperator |
paimon-flink/paimon-flink-common/.../flink/sink/ |
主键表 Compact 执行算子 |
CompactorSourceBuilder |
paimon-flink/paimon-flink-common/.../flink/source/ |
Compact Source 构建器 |
AppendTableCompact |
paimon-flink/paimon-flink-common/.../flink/compact/ |
Append 表 Compact 作业构建器 |
CompactAction |
paimon-flink/paimon-flink-common/.../flink/action/ |
Compact Action 实现 |
9.6 写入相关类
| 类名 | 路径 | 说明 |
|---|---|---|
MergeTreeWriter |
paimon-core/src/main/java/org/apache/paimon/mergetree/ |
主键表写入器 |
AppendOnlyWriter |
paimon-core/src/main/java/org/apache/paimon/append/ |
Append 表写入器 |
AbstractFileStoreWrite |
paimon-core/src/main/java/org/apache/paimon/operation/ |
文件存储写入抽象类 |
KeyValueFileStoreWrite |
paimon-core/src/main/java/org/apache/paimon/operation/ |
主键表写入实现 |
BaseAppendFileStoreWrite |
paimon-core/src/main/java/org/apache/paimon/operation/ |
Append 表写入实现 |
十、最佳实践与优化建议
10.1 场景一:多写作业共享表
问题:多个作业同时写入同一张表,内联 Compact 会导致文件冲突。
解决方案:使用专门的 Compact 作业
配置:
-- 写表配置:禁用 Compact
CREATE TABLE shared_table (
id BIGINT,
name STRING,
dt STRING,
PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'write-only' = 'true',
'bucket' = '8'
);
-- 专门的 Compact 作业
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action.jar \
compact \
--warehouse hdfs:///warehouse \
--database default \
--table shared_table \
--continuous \
--table_conf continuous.discovery-interval=10s \
--table_conf sink.parallelism=16
优点:
- 写作业之间无冲突
- 资源隔离,Compact 不影响写入性能
- 可以独立调整 Compact 并行度
10.2 场景二:大规模数据写入
问题:高吞吐写入导致 Level 0 文件数量激增,影响查询性能。
解决方案:调整 Compact 策略,加快 Compact 速度
配置:
CREATE TABLE high_throughput_table (...) WITH (
-- 减少触发阈值,更频繁地 Compact
'num-sorted-run.compaction-trigger' = '3',
-- 增大目标文件大小,减少文件数量
'target-file-size' = '256 MB',
-- 设置阻塞阈值,防止 Level 0 文件过多
'num-sorted-run.stop-trigger' = '10',
-- 使用更激进的空间放大控制
'compaction.max-size-amplification-percent' = '150'
);
Compact 作业并行度:
--table_conf sink.parallelism=32 # 增大并行度加快 Compact
10.3 场景三:低延迟查询要求
问题:Level 0 文件过多导致查询延迟较高。
解决方案:优化 Compact 策略,保持 Level 0 文件数量较少
配置:
CREATE TABLE low_latency_table (...) WITH (
-- 更激进的 Compact 触发
'num-sorted-run.compaction-trigger' = '2',
-- 定时全量 Compact,确保查询性能
'compaction.optimization-interval' = '1 h',
-- Checkpoint 时等待 Compact 完成
'prepare-commit.wait-compaction' = 'true'
);
10.4 场景四:定时离峰 Compact
问题:白天业务高峰期 Compact 占用资源,影响写入和查询。
解决方案:配置 Off-Peak Hours,在夜间进行更激进的 Compact
配置:
CREATE TABLE peak_sensitive_table (...) WITH (
-- 白天使用温和的 Compact 策略
'compaction.size-ratio' = '5',
'num-sorted-run.compaction-trigger' = '10',
-- 夜间2点到6点使用激进策略
'compaction.offpeak.start.hour' = '2',
'compaction.offpeak.end.hour' = '6',
'compaction.offpeak-ratio' = '30'
);
效果:
- 白天:effective size-ratio = 5,Compact 触发较少
- 夜间:effective size-ratio = 5 + 30 = 35,Compact 更频繁
10.5 场景五:Append 表小文件过多
问题:Append 表写入产生大量小文件。
解决方案:调整文件打包策略
配置:
CREATE TABLE append_table (
id BIGINT,
data STRING,
dt STRING
) PARTITIONED BY (dt) WITH (
-- 降低最小文件数要求
'compaction.min.file-num' = '3',
-- 减小 Compact 文件大小阈值
'compaction.file-size' = '64 MB',
-- 增大目标文件大小
'target-file-size' = '256 MB',
-- 缩短扫描间隔
'continuous.discovery-interval' = '5s'
);
10.6 监控指标
关键指标:
-
Level 0 文件数:
level0.file.count- 正常范围:< 10
- 告警阈值:> 20
-
Compact 延迟:
compaction.time.ms- 正常范围:< 5min
- 告警阈值:> 15min
-
Compact 队列长度:
compactions.queued.count- 正常范围:< 5
- 告警阈值:> 10
-
总文件大小:
total.file.size.bytes- 监控空间放大情况
Flink Metrics:
// paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
public interface Reporter {
void reportLevel0FileCount(long count);
void reportCompactionTime(long milliseconds);
void increaseCompactionsQueuedCount();
void increaseCompactionsCompletedCount();
void reportTotalFileSize(long bytes);
}
10.7 故障排查
问题1:Compact 任务失败
排查步骤:
- 检查日志中的异常信息
- 确认是否有并发写冲突
- 检查资源是否充足(内存、磁盘)
解决方案:
- 启用
write-only模式,使用专门的 Compact 作业 - 增加 Compact 作业的资源配置
问题2:Compact 速度慢
排查步骤:
- 检查
compactions.queued.count指标 - 查看 Compact 任务的并行度
- 确认文件大小和数量
解决方案:
- 增大 Compact 并行度:
sink.parallelism - 调整策略参数:降低
num-sorted-run.compaction-trigger - 增大文件大小:
target-file-size
问题3:内存溢出(OOM)
排查步骤:
- 检查 Compact 任务的堆内存使用
- 查看单个 Compact 任务处理的文件数量
- 确认是否有大文件
解决方案:
- 增大 TaskManager 内存
- 减小
target-file-size - 限制单次 Compact 的文件数量
总结
Paimon 的定时 Compact 机制是保证数据湖性能的关键组件。本文详细分析了:
- 三种触发方式:内联 Compact、定时 Compact Job、手动触发
- Append 表流程:基于 AppendCompactCoordinator 的文件扫描、打包和任务生成
- 主键表流程:基于 LSM-Tree 的 Levels 管理和 MergeTreeCompactManager
- UniversalCompaction 策略:四级策略(定时全量、空间放大、大小比例、文件数量)
- 定时调度机制:基于 Flink 的流式处理框架,使用 ScheduledExecutorService 和 ProcessingTimeService
- 关键配置项:扫描间隔、策略参数、文件大小、Off-Peak Hours 等
- 最佳实践:多写作业、大规模写入、低延迟查询、离峰 Compact、小文件处理等场景的优化方案
通过合理配置和监控,可以在写入性能、查询性能和存储成本之间取得良好的平衡。
参考资料:
更多推荐


所有评论(0)