Paimon 表定时 Compact 数据流程与逻辑详解

目录


一、概述

Apache Paimon 是一个流式数据湖存储系统,采用 LSM-Tree(Log-Structured Merge-Tree)架构来管理数据文件。Compact(压缩)是 Paimon 中至关重要的后台操作,用于:

  1. 减少文件数量:合并小文件,降低读取时的文件扫描开销
  2. 减少空间放大:删除过期数据,回收存储空间
  3. 优化查询性能:减少 Level 0 文件数,加快数据检索
  4. 生成 Changelog:通过 Compact 生成变更日志
  5. 维护删除向量:对于 MOW(Merge-on-Write)模式,维护 Deletion Vectors

本文将深入分析 Paimon 表的定时 Compact 机制,包括触发时机、调度机制、策略选择和执行流程,并结合源码进行详细解析。


二、Compact 核心架构

2.1 整体架构图

存储层

执行端

定时调度

写入端

触发

扫描快照

生成

数据写入

CompactManager

CompactTask

定时器

Coordinator

文件列表

CompactTask队列

Compact执行器

文件合并

新文件

元数据提交

新快照

2.2 核心组件

2.2.1 CompactManager

CompactManager 是 Compact 的管理中心,负责:

  • 维护文件的层级结构(Levels)
  • 判断是否需要触发 Compact
  • 调用策略选择需要合并的文件
  • 提交异步 Compact 任务到线程池

主键表实现MergeTreeCompactManager
Append 表实现BucketedAppendCompactManager

// paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
public interface CompactManager {
    // 判断是否应该等待 Compact 完成
    boolean shouldWaitForLatestCompaction();
    
    // 添加新文件到 Level 0
    void addNewFile(DataFileMeta file);
    
    // 触发 Compact
    void triggerCompaction(boolean fullCompaction);
    
    // 获取 Compact 结果
    Optional<CompactResult> getCompactionResult(boolean blocking);
}
2.2.2 CompactStrategy

CompactStrategy 负责文件选择策略,决定哪些文件需要合并:

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
public interface CompactStrategy {
    /**
     * 从 runs 中选择需要 compact 的单元
     * - compaction 是基于 runs 的,不是基于文件的
     * - level 0 是特殊的,一个文件对应一个 run
     * - 其他 level 是一个 level 对应一个 run
     * - compaction 从小 level 到大 level 依次进行
     */
    Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
}
2.2.3 CompactTask

CompactTask 是实际执行合并的任务单元:

// paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
public abstract class CompactTask implements Callable<CompactResult> {
    @Override
    public CompactResult call() throws Exception {
        long startMillis = System.currentTimeMillis();
        CompactResult result = doCompact();
        
        // 记录指标
        if (metricsReporter != null) {
            metricsReporter.reportCompactionTime(
                System.currentTimeMillis() - startMillis);
            metricsReporter.increaseCompactionsCompletedCount();
        }
        
        return result;
    }
    
    protected abstract CompactResult doCompact() throws Exception;
}

三、Compact 触发机制

Paimon 支持三种 Compact 触发方式:

3.1 内联 Compact(Inline Compaction)

触发时机:数据写入过程中自动触发

核心代码MergeTreeWriter.prepareCommit()

// paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java:209-248
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
        throws Exception {
    if (writeBuffer.size() > 0) {
        // 判断是否需要等待 Compact 完成
        if (compactManager.shouldWaitForLatestCompaction()) {
            waitForLatestCompaction = true;
        }

        // 刷写 buffer 到文件
        final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
                writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
        
        // ... 写入逻辑 ...
        
        // 将新文件添加到 CompactManager
        for (DataFileMeta fileMeta : dataWriter.result()) {
            newFiles.add(fileMeta);
            compactManager.addNewFile(fileMeta);
        }
    }

    // 等待上一次 Compact 完成
    trySyncLatestCompaction(waitForLatestCompaction);
    
    // 触发新的 Compact
    compactManager.triggerCompaction(forcedFullCompaction);
}

判断是否需要等待 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:105-113
@Override
public boolean shouldWaitForLatestCompaction() {
    // 当 SortedRun 数量超过阈值时,阻塞写入等待 Compact
    return levels.numberOfSortedRuns() > numSortedRunStopTrigger;
}

@Override
public boolean shouldWaitForPreparingCheckpoint() {
    // 在 checkpoint 准备阶段,如果 runs 数量超过阈值+1,也要等待
    return levels.numberOfSortedRuns() > (long) numSortedRunStopTrigger + 1;
}

关键配置

  • num-sorted-run.stop-trigger:默认 Integer.MAX_VALUE,当 SortedRun 数量超过此值时阻塞写入

3.2 定时 Compact Job

触发时机:独立的 Compact 作业,定时扫描表快照生成 Compact 任务

适用场景

  • 多个写作业同时写入同一张表
  • 需要将写入和 Compact 资源隔离
  • 大规模数据写入场景

配置方式

-- 在写表时禁用 Compact
CREATE TABLE my_table (
    ...
) WITH (
    'write-only' = 'true'  -- 跳过 Compact 和 Snapshot 过期
);

-- 启动专门的 Compact 作业
CALL sys.compact('default.my_table');

或使用 Action Jar:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    compact \
    --warehouse hdfs:///path/to/warehouse \
    --database default \
    --table my_table

定时间隔配置

-- 设置扫描间隔为 30 秒
CREATE TABLE my_table (...) WITH (
    'continuous.discovery-interval' = '30s'
);

3.3 手动触发 Compact

Flink SQL 方式

-- 对特定分区进行 Compact
CALL sys.compact(
    `table` => 'default.my_table', 
    `partitions` => 'dt=2024-01-01',
    `options` => 'sink.parallelism=10'
);

-- Full Compaction
CALL sys.compact(
    `table` => 'default.my_table',
    `compact_strategy` => 'full'
);

Spark 方式

val table = spark.table("default.my_table")
table.createOrReplaceTempView("tmp")
spark.sql("CALL paimon.sys.compact(table => 'tmp')")

四、Append 表 Compact 流程详解

4.1 整体流程图

Committer FileWriter AppendCompactTask SubCoordinator AppendCompactCoordinator 定时器 Committer FileWriter AppendCompactTask SubCoordinator AppendCompactCoordinator 定时器 批量读取100k个文件 使用FileBin打包 目标:2倍targetFileSize 触发扫描(每10秒) scan() 扫描快照 notifyNewFiles(partition, files) 过滤需要Compact的文件 agePack() 打包文件 生成CompactTask doCompact() 执行合并 读取文件并合并 生成新文件 提交元数据

4.2 核心类:AppendCompactCoordinator

AppendCompactCoordinator 是 Append 表的 Compact 协调器,负责扫描快照、过滤文件、生成任务。

类定义

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:70-109
public class AppendCompactCoordinator {
    private static final int FILES_BATCH = 100_000;  // 批量处理文件数
    protected static final int REMOVE_AGE = 10;      // 移除年龄阈值
    protected static final int COMPACT_AGE = 5;      // 强制Compact年龄

    private final SnapshotManager snapshotManager;
    private final long targetFileSize;               // 目标文件大小
    private final long compactionFileSize;           // Compact阈值
    private final double deleteThreshold;            // 删除率阈值
    private final int minFileNum;                    // 最小文件数
    
    // 按分区组织的子协调器
    final Map<BinaryRow, SubCoordinator> subCoordinators = new HashMap<>();
    
    public List<AppendCompactTask> run() {
        // 扫描快照中的文件
        if (scan()) {
            // 生成 Compact 任务
            return compactPlan();
        }
        return Collections.emptyList();
    }
}

4.3 文件扫描逻辑

scan() 方法

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:122-148
boolean scan() {
    Map<BinaryRow, List<DataFileMeta>> files = new HashMap<>();
    
    // 批量读取文件(每次最多 100,000 个)
    for (int i = 0; i < FILES_BATCH; i++) {
        ManifestEntry entry;
        try {
            entry = filesIterator.next();
        } catch (EndOfScanException e) {
            if (!files.isEmpty()) {
                files.forEach(this::notifyNewFiles);
                return true;
            }
            throw e;
        }
        if (entry == null) {
            break;
        }
        BinaryRow partition = entry.partition();
        files.computeIfAbsent(partition, k -> new ArrayList<>()).add(entry.file());
    }

    if (files.isEmpty()) {
        return false;
    }

    files.forEach(this::notifyNewFiles);
    return true;
}

文件过滤条件

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:467-482
private boolean shouldCompact(BinaryRow partition, DataFileMeta file) {
    // 条件1: 文件大小小于 compaction.file-size
    if (file.fileSize() < compactionFileSize) {
        return true;
    }
    
    // 条件2: 删除率超过阈值(启用 Deletion Vector 时)
    return tooHighDeleteRatio(partition, file);
}

private boolean tooHighDeleteRatio(BinaryRow partition, DataFileMeta file) {
    if (dvMaintainerCache != null) {
        DeletionFile deletionFile =
                dvMaintainerCache.dvMaintainer(partition).getDeletionFile(file.fileName());
        if (deletionFile != null) {
            Long cardinality = deletionFile.cardinality();
            long rowCount = file.rowCount();
            return cardinality == null || cardinality > rowCount * deleteThreshold;
        }
    }
    return false;
}

4.4 文件打包策略:SubCoordinator

每个分区有一个 SubCoordinator,负责该分区的文件打包和任务生成。

打包逻辑

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:233-274
private List<List<DataFileMeta>> agePack() {
    List<List<DataFileMeta>> packed;
    
    if (dvMaintainerCache == null) {
        // 普通打包模式
        packed = pack(toCompact);
    } else {
        // Deletion Vector 模式:按删除文件分组
        packed = packInDeletionVectorVMode(toCompact);
    }
    
    if (packed.isEmpty()) {
        // 如果没有可打包的,增加年龄
        if (++age > COMPACT_AGE && toCompact.size() > 1) {
            // 年龄超过阈值,强制打包所有文件
            List<DataFileMeta> all = new ArrayList<>(toCompact);
            toCompact.clear();
            packed = Collections.singletonList(all);
        }
    }
    
    return packed;
}

private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
    // 按文件大小排序
    ArrayList<DataFileMeta> files = new ArrayList<>(toCompact);
    files.sort(Comparator.comparingLong(DataFileMeta::fileSize));

    List<List<DataFileMeta>> result = new ArrayList<>();
    FileBin fileBin = new FileBin();
    
    for (DataFileMeta fileMeta : files) {
        fileBin.addFile(fileMeta);
        if (fileBin.enoughContent()) {
            // 满足打包条件:文件数>1 且 总大小 >= 2*targetFileSize
            result.add(fileBin.drain());
        }
    }

    if (fileBin.enoughInputFiles()) {
        // 文件数满足最小要求
        result.add(fileBin.drain());
    }

    return result;
}

FileBin 打包单元

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java:324-352
private class FileBin {
    List<DataFileMeta> bin = new ArrayList<>();
    long totalFileSize = 0;

    public void addFile(DataFileMeta file) {
        totalFileSize += file.fileSize() + openFileCost;
        bin.add(file);
    }

    private boolean enoughContent() {
        // 文件数 > 1 且 总大小 >= 2 倍 targetFileSize
        return bin.size() > 1 && totalFileSize >= targetFileSize * 2;
    }

    private boolean enoughInputFiles() {
        // 文件数 >= minFileNum
        return bin.size() >= minFileNum;
    }
}

4.5 任务执行:AppendCompactTask

任务定义

// paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java:44-67
public class AppendCompactTask {
    private final BinaryRow partition;
    private final List<DataFileMeta> compactBefore;  // 待合并文件
    private final List<DataFileMeta> compactAfter;   // 合并后文件

    public AppendCompactTask(BinaryRow partition, List<DataFileMeta> files) {
        this.partition = partition;
        compactBefore = new ArrayList<>(files);
        compactAfter = new ArrayList<>();
    }

    public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite write)
            throws Exception {
        // 执行文件合并
        // ... 合并逻辑 ...
    }
}

4.6 Bucketed Append 表的 Compact

对于有 Bucket 的 Append 表,使用 BucketedAppendCompactManager

文件选择逻辑

// paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java:201-227
Optional<List<DataFileMeta>> pickCompactBefore() {
    if (toCompact.isEmpty()) {
        return Optional.empty();
    }

    long totalFileSize = 0L;
    int fileNum = 0;
    LinkedList<DataFileMeta> candidates = new LinkedList<>();

    // 使用优先队列按序号排序文件
    while (!toCompact.isEmpty()) {
        DataFileMeta file = toCompact.poll();
        candidates.add(file);
        totalFileSize += file.fileSize();
        fileNum++;
        
        if (fileNum >= minFileNum) {
            // 满足最小文件数要求
            return Optional.of(candidates);
        } else if (totalFileSize >= targetFileSize) {
            // 总大小超过目标,移除第一个文件继续
            DataFileMeta removed = candidates.pollFirst();
            totalFileSize -= removed.fileSize();
            fileNum--;
        }
    }
    
    toCompact.addAll(candidates);
    return Optional.empty();
}

五、主键表 Compact 流程详解

5.1 LSM-Tree 架构

主键表采用 LSM-Tree(Log-Structured Merge-Tree)架构,数据文件分为多个 Level:

Level 0:  [File1] [File2] [File3] ...  (可能有重叠)
          ↓ Compact
Level 1:  [File_L1_1 | File_L1_2 | File_L1_3]  (有序,无重叠)
          ↓ Compact  
Level 2:  [File_L2_1 --------- | File_L2_2 ---------]
          ↓ Compact
Level N:  [File_LN_1 -------------------------------]

Level 特点

  • Level 0:新写入的文件,文件之间可能有 Key 重叠,一个文件对应一个 SortedRun
  • Level 1+:合并后的文件,同一 Level 内文件按 Key 有序且不重叠,一个 Level 对应一个 SortedRun

5.2 核心类:MergeTreeCompactManager

类定义

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:53-102
public class MergeTreeCompactManager extends CompactFutureManager {
    private final ExecutorService executor;
    private final Levels levels;                      // LSM-Tree 层级结构
    private final CompactStrategy strategy;           // Compact 策略
    private final Comparator<InternalRow> keyComparator;
    private final long compactionFileSize;
    private final int numSortedRunStopTrigger;
    private final CompactRewriter rewriter;
    
    @Nullable private final CompactionMetrics.Reporter metricsReporter;
    @Nullable private final BucketedDvMaintainer dvMaintainer;
    @Nullable private final RecordLevelExpire recordLevelExpire;
    
    // ... 构造方法 ...
}

5.3 文件添加与触发 Compact

添加新文件到 Level 0

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:116-124
@Override
public void addNewFile(DataFileMeta file) {
    levels.addLevel0File(file);
    MetricUtils.safeCall(this::reportMetrics, LOG);
}

触发 Compact 逻辑

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:127-194
@Override
public void triggerCompaction(boolean fullCompaction) {
    Optional<CompactUnit> optionalUnit;
    List<LevelSortedRun> runs = levels.levelSortedRuns();
    
    if (fullCompaction) {
        // 强制全量 Compact
        Preconditions.checkState(taskFuture == null,
            "A compaction task is still running while forcing a new compaction.");
        
        optionalUnit = CompactStrategy.pickFullCompaction(
                levels.numberOfLevels(),
                runs,
                recordLevelExpire,
                dvMaintainer,
                forceRewriteAllFiles);
    } else {
        // 正常 Compact
        if (taskFuture != null) {
            return;  // 已有任务在运行
        }
        
        // 使用策略选择文件
        optionalUnit = strategy.pick(levels.numberOfLevels(), runs)
                .filter(unit -> !unit.files().isEmpty())
                .filter(unit ->
                        unit.files().size() > 1
                        || unit.files().get(0).level() != unit.outputLevel());
    }

    optionalUnit.ifPresent(unit -> {
        // 判断是否可以删除旧记录
        boolean dropDelete = unit.outputLevel() != 0
                && (unit.outputLevel() >= levels.nonEmptyHighestLevel()
                        || dvMaintainer != null);

        submitCompaction(unit, dropDelete);
    });
}

5.4 Compact 任务提交

submitCompaction() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java:201-245
private void submitCompaction(CompactUnit unit, boolean dropDelete) {
    CompactTask task;
    
    if (unit.fileRewrite()) {
        // 文件级别重写(仅修改元数据)
        task = new FileRewriteCompactTask(rewriter, unit, dropDelete, metricsReporter);
    } else {
        // 正常 Compact(读取合并数据)
        task = new MergeTreeCompactTask(
                keyComparator,
                compactionFileSize,
                rewriter,
                unit,
                dropDelete,
                levels.maxLevel(),
                metricsReporter,
                compactDfSupplier,
                recordLevelExpire,
                forceRewriteAllFiles);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Pick these files (name, level, size) for {} compaction: {}",
                task.getClass().getSimpleName(),
                unit.files().stream()
                        .map(file -> String.format("(%s, %d, %d)",
                                file.fileName(), file.level(), file.fileSize()))
                        .collect(Collectors.joining(", ")));
    }
    
    // 提交到线程池
    taskFuture = executor.submit(task);
    
    if (metricsReporter != null) {
        metricsReporter.increaseCompactionsQueuedCount();
        metricsReporter.increaseCompactionsTotalCount();
    }
}

5.5 MergeTreeCompactTask 执行逻辑

doCompact() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java:82-113
@Override
protected CompactResult doCompact() throws Exception {
    List<List<SortedRun>> candidate = new ArrayList<>();
    CompactResult result = new CompactResult();

    // 遍历分区(IntervalPartition 将重叠文件分组)
    for (List<SortedRun> section : partitioned) {
        if (section.size() > 1) {
            // 有多个 Run,需要合并
            candidate.add(section);
        } else {
            SortedRun run = section.get(0);
            // 单个 Run:根据文件大小决定是否需要重写
            for (DataFileMeta file : run.files()) {
                if (file.fileSize() < minFileSize) {
                    // 小文件:需要重写
                    candidate.add(singletonList(SortedRun.fromSingle(file)));
                } else {
                    // 大文件:先重写之前的候选,然后升级当前文件
                    rewrite(candidate, result);
                    upgrade(file, result);
                }
            }
        }
    }
    
    // 重写剩余的候选文件
    rewrite(candidate, result);
    result.setDeletionFile(compactDfSupplier.get());
    return result;
}

upgrade() 方法(文件升级)

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java:123-138
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
    // 需要重写的情况:
    // 1. 输出到最高层且包含删除记录
    // 2. 强制重写所有文件
    // 3. 包含过期记录
    if ((outputLevel == maxLevel && containsDeleteRecords(file))
            || forceRewriteAllFiles
            || containsExpiredRecords(file)) {
        List<List<SortedRun>> candidate = new ArrayList<>();
        candidate.add(singletonList(SortedRun.fromSingle(file)));
        rewriteImpl(candidate, toUpdate);
        return;
    }

    // 仅修改文件的 level 元数据(无需重写数据)
    if (file.level() != outputLevel) {
        CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
        toUpdate.merge(upgradeResult);
        upgradeFilesNum++;
    }
}

5.6 IntervalPartition 算法

IntervalPartition 用于将多个数据文件划分为最少数量的 SortedRun,处理文件间的 Key 重叠:

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java:32-91
public class IntervalPartition {
    private final List<DataFileMeta> files;
    private final Comparator<InternalRow> keyComparator;

    /**
     * 返回二维列表:
     * - 外层列表:sections(不同 section 的 Key 区间不重叠)
     * - 内层列表:SortedRuns(同一 section 内的多个 Run)
     */
    public List<List<SortedRun>> partition() {
        List<List<SortedRun>> result = new ArrayList<>();
        List<DataFileMeta> section = new ArrayList<>();
        InternalRow bound = null;

        // 按 minKey 排序
        files.sort((o1, o2) -> {
            int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());
            return leftResult == 0
                    ? keyComparator.compare(o1.maxKey(), o2.maxKey())
                    : leftResult;
        });

        for (DataFileMeta meta : files) {
            // 如果当前文件的 minKey > 上一个 section 的 bound
            // 说明没有重叠,开始新的 section
            if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
                result.add(partition(section));
                section.clear();
                bound = null;
            }
            section.add(meta);
            
            // 更新 bound 为当前 section 的最大 maxKey
            if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {
                bound = meta.maxKey();
            }
        }

        if (!section.isEmpty()) {
            result.add(partition(section));
        }

        return result;
    }
}

5.7 Levels 结构维护

Levels 类管理 LSM-Tree 的层级结构

// paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java:38-98
public class Levels {
    private final Comparator<InternalRow> keyComparator;
    private final TreeSet<DataFileMeta> level0;  // Level 0 文件(按序号排序)
    private final List<SortedRun> levels;        // Level 1+ 文件

    public void update(List<DataFileMeta> before, List<DataFileMeta> after) {
        // 按 Level 分组
        Map<Integer, List<DataFileMeta>> groupedBefore = groupByLevel(before);
        Map<Integer, List<DataFileMeta>> groupedAfter = groupByLevel(after);
        
        // 更新每个 Level
        for (int i = 0; i < numberOfLevels(); i++) {
            updateLevel(i,
                    groupedBefore.getOrDefault(i, emptyList()),
                    groupedAfter.getOrDefault(i, emptyList()));
        }
    }

    private void updateLevel(int level, List<DataFileMeta> before, List<DataFileMeta> after) {
        if (level == 0) {
            // Level 0: 从 TreeSet 中移除旧文件,添加新文件
            before.forEach(level0::remove);
            level0.addAll(after);
        } else {
            // Level 1+: 从 SortedRun 中移除旧文件,添加新文件,重新排序
            List<DataFileMeta> files = new ArrayList<>(runOfLevel(level).files());
            files.removeAll(before);
            files.addAll(after);
            levels.set(level - 1, SortedRun.fromUnsorted(files, keyComparator));
        }
    }
}

六、UniversalCompaction 策略深度解析

6.1 策略概述

UniversalCompaction 是 Paimon 的核心 Compact 策略,源自 RocksDB 的 Universal Compaction

设计目标

  • 降低写放大(Write Amplification)
  • 控制空间放大(Space Amplification)
  • 平衡读放大(Read Amplification)

6.2 策略参数

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:42-64
public class UniversalCompaction implements CompactStrategy {
    private final int maxSizeAmp;              // 最大空间放大百分比
    private final int sizeRatio;               // 大小比例阈值
    private final int numRunCompactionTrigger; // 触发 Compact 的 Run 数量
    
    @Nullable private final FullCompactTrigger fullCompactTrigger;
    @Nullable private final OffPeakHours offPeakHours;
}

配置项对应

参数 配置键 默认值 说明
maxSizeAmp compaction.max-size-amplification-percent 200 最大空间放大百分比
sizeRatio compaction.size-ratio 1 大小比例阈值
numRunCompactionTrigger num-sorted-run.compaction-trigger 5 触发 Compact 的 Run 数

6.3 策略决策流程

pick() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:67-107
@Override
public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
    int maxLevel = numLevels - 1;

    // 【优先级0】尝试定时全量 Compact
    if (fullCompactTrigger != null) {
        Optional<CompactUnit> unit = fullCompactTrigger.tryFullCompact(numLevels, runs);
        if (unit.isPresent()) {
            return unit;
        }
    }

    // 【优先级1】检查空间放大
    CompactUnit unit = pickForSizeAmp(maxLevel, runs);
    if (unit != null) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Universal compaction due to size amplification");
        }
        return Optional.of(unit);
    }

    // 【优先级2】检查大小比例
    unit = pickForSizeRatio(maxLevel, runs);
    if (unit != null) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Universal compaction due to size ratio");
        }
        return Optional.of(unit);
    }

    // 【优先级3】检查文件数量
    if (runs.size() > numRunCompactionTrigger) {
        // 超过阈值,触发 Compact
        int candidateCount = runs.size() - numRunCompactionTrigger + 1;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Universal compaction due to file num");
        }
        return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
    }

    return Optional.empty();
}

6.4 详细策略说明

6.4.1 定时全量 Compact(FullCompactTrigger)

触发条件

  1. 距离上次全量 Compact 的时间间隔超过 compaction.optimization-interval
  2. 文件总大小小于 compaction.total-size-threshold
// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java:64-88
public Optional<CompactUnit> tryFullCompact(int numLevels, List<LevelSortedRun> runs) {
    if (runs.size() == 1) {
        return Optional.empty();
    }

    int maxLevel = numLevels - 1;
    
    // 条件1: 时间间隔触发
    if (fullCompactionInterval != null) {
        if (lastFullCompaction == null
                || currentTimeMillis() - lastFullCompaction > fullCompactionInterval) {
            LOG.debug("Universal compaction due to full compaction interval");
            updateLastFullCompaction();
            return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
        }
    }
    
    // 条件2: 总大小触发
    if (totalSizeThreshold != null) {
        long totalSize = 0;
        for (LevelSortedRun run : runs) {
            totalSize += run.run().totalSize();
        }
        if (totalSize < totalSizeThreshold) {
            return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
        }
    }
    
    return Optional.empty();
}

配置示例

CREATE TABLE my_table (...) WITH (
    'compaction.optimization-interval' = '1 h',  -- 每小时全量 Compact
    'compaction.total-size-threshold' = '10 GB'  -- 总大小小于10GB时全量 Compact
);
6.4.2 空间放大检查(pickForSizeAmp)

空间放大定义:额外空间占比 = (所有文件大小 - 最早文件大小) / 最早文件大小 × 100%

触发条件:当空间放大超过 maxSizeAmp 时,触发全量 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:125-147
CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
    if (runs.size() < numRunCompactionTrigger) {
        return null;
    }

    // 计算候选文件总大小(除了最早的文件)
    long candidateSize =
            runs.subList(0, runs.size() - 1).stream()
                    .map(LevelSortedRun::run)
                    .mapToLong(SortedRun::totalSize)
                    .sum();

    // 最早文件的大小
    long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();

    // 空间放大 = candidateSize * 100 / earliestRunSize
    if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
        if (fullCompactTrigger != null) {
            fullCompactTrigger.updateLastFullCompaction();
        }
        // 触发全量 Compact
        return CompactUnit.fromLevelRuns(maxLevel, runs);
    }

    return null;
}

示例

假设 maxSizeAmp = 200,有以下文件:

  • Run 0: 10 MB
  • Run 1: 20 MB
  • Run 2: 30 MB
  • Run 3: 100 MB (最早)

空间放大 = (10 + 20 + 30) × 100 / 100 = 60% < 200%,不触发

如果 Run 3 只有 20 MB:
空间放大 = 60 × 100 / 20 = 300% > 200%,触发全量 Compact

6.4.3 大小比例检查(pickForSizeRatio)

触发条件:当较小的 Run 的总大小与较大 Run 的比例超过阈值时,触发 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:163-182
public CompactUnit pickForSizeRatio(
        int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {
    long candidateSize = candidateSize(runs, candidateCount);
    
    // 从 candidateCount 开始,向后扩展
    for (int i = candidateCount; i < runs.size(); i++) {
        LevelSortedRun next = runs.get(i);
        
        // 计算比例:(candidateSize * (100 + sizeRatio + offPeakRatio)) / 100
        if (candidateSize * (100.0 + sizeRatio + ratioForOffPeak()) / 100.0
                < next.run().totalSize()) {
            // 下一个文件太大,停止扩展
            break;
        }

        // 继续包含下一个文件
        candidateSize += next.run().totalSize();
        candidateCount++;
    }

    if (forcePick || candidateCount > 1) {
        return createUnit(runs, maxLevel, candidateCount);
    }

    return null;
}

示例

假设 sizeRatio = 1(即 100%),有以下 Runs:

  • Run 0: 10 MB
  • Run 1: 15 MB
  • Run 2: 40 MB
  • Run 3: 100 MB

从 Run 0 开始:

  1. candidateSize = 10 MB
  2. 判断:10 × (100 + 100) / 100 = 20 MB < 15 MB?,包含 Run 1
  3. candidateSize = 25 MB
  4. 判断:25 × 200 / 100 = 50 MB < 40 MB?,包含 Run 2
  5. candidateSize = 65 MB
  6. 判断:65 × 200 / 100 = 130 MB < 100 MB?,包含 Run 3
  7. candidateSize = 165 MB
  8. 没有更多 Run,返回所有 4 个 Runs 的 CompactUnit
6.4.4 文件数量检查

触发条件:当 Run 数量超过 numRunCompactionTrigger 时,触发 Compact

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:96-104
if (runs.size() > numRunCompactionTrigger) {
    // 超过阈值,触发 Compact
    int candidateCount = runs.size() - numRunCompactionTrigger + 1;
    if (LOG.isDebugEnabled()) {
        LOG.debug("Universal compaction due to file num");
    }
    return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
}

示例

假设 numRunCompactionTrigger = 5,当前有 8 个 Runs:

  • candidateCount = 8 - 5 + 1 = 4
  • 调用 pickForSizeRatio(maxLevel, runs, 4) 选择至少 4 个 Runs 进行 Compact

6.5 输出 Level 的确定

createUnit() 方法

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:197-226
CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
    int outputLevel;
    
    if (runCount == runs.size()) {
        // 合并所有 Runs,输出到最高层
        outputLevel = maxLevel;
    } else {
        // 输出到下一个 Run 的 level - 1
        outputLevel = Math.max(0, runs.get(runCount).level() - 1);
    }

    if (outputLevel == 0) {
        // 不输出到 Level 0,向后扩展
        for (int i = runCount; i < runs.size(); i++) {
            LevelSortedRun next = runs.get(i);
            runCount++;
            if (next.level() != 0) {
                outputLevel = next.level();
                break;
            }
        }
    }

    if (runCount == runs.size()) {
        if (fullCompactTrigger != null) {
            fullCompactTrigger.updateLastFullCompaction();
        }
        outputLevel = maxLevel;
    }

    return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
}

6.6 Off-Peak Hours 优化

定义:在非高峰时段使用更激进的 Compact 策略

// paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java:184-186
private int ratioForOffPeak() {
    return offPeakHours == null ? 0 : offPeakHours.currentRatio(LocalDateTime.now().getHour());
}

配置示例

CREATE TABLE my_table (...) WITH (
    'compaction.offpeak.start.hour' = '2',   -- 凌晨2点开始
    'compaction.offpeak.end.hour' = '6',     -- 早上6点结束
    'compaction.offpeak-ratio' = '20'        -- 非高峰期使用20%的比例
);

在非高峰时段(2:00-6:00),sizeRatio 会加上 offpeak-ratio,使 Compact 更容易触发。


七、定时调度机制实现

7.1 Flink 流式 Compact 作业架构

主键表Compact

CompactorSource

定时扫描

发现需要Compact的Bucket

StoreCompactOperator

执行Compact

Append表Compact

定时器

AppendBypassCoordinateOperator

生成CompactTask

下游执行器

7.2 Append 表定时调度

7.2.1 AppendBypassCoordinateOperator

核心类AppendBypassCoordinateOperator 是 Flink 的一个 OneInputStreamOperator,负责定时生成 Compact 任务。

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:49-86
public class AppendBypassCoordinateOperator<CommitT>
        extends AbstractStreamOperator<Either<CommitT, AppendCompactTask>>
        implements OneInputStreamOperator<CommitT, Either<CommitT, AppendCompactTask>>,
                ProcessingTimeCallback {

    private static final long MAX_PENDING_TASKS = 5000;

    private final FileStoreTable table;
    private transient ScheduledExecutorService executorService;
    private transient LinkedBlockingQueue<AppendCompactTask> compactTasks;

    @Override
    public void open() throws Exception {
        super.open();
        
        // 获取扫描间隔
        long intervalMs = table.coreOptions().continuousDiscoveryInterval().toMillis();
        
        // 初始化任务队列
        this.compactTasks = new LinkedBlockingQueue<>();
        
        // 创建协调器
        AppendCompactCoordinator coordinator = new AppendCompactCoordinator(table, true, null);
        
        // 创建定时执行器
        this.executorService =
                Executors.newSingleThreadScheduledExecutor(
                        newDaemonThreadFactory("Compaction Coordinator"));
        
        // 定时异步生成任务
        this.executorService.scheduleWithFixedDelay(
                () -> asyncPlan(coordinator), 0, intervalMs, TimeUnit.MILLISECONDS);
        
        // 定时发送任务到下游
        this.getProcessingTimeService().scheduleWithFixedDelay(this, 0, intervalMs);
    }
}

异步生成任务

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:88-102
private void asyncPlan(AppendCompactCoordinator coordinator) {
    // 持续生成任务,直到队列满或没有新任务
    while (compactTasks.size() < MAX_PENDING_TASKS) {
        try {
            List<AppendCompactTask> tasks = coordinator.run();
            compactTasks.addAll(tasks);
            if (tasks.isEmpty()) {
                break;
            }
        } catch (Throwable t) {
            LOG.error("Fatal exception happened when generating compaction tasks.", t);
            this.throwable = t;
            break;
        }
    }
}

发送任务到下游

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java:104-113
@Override
public void onProcessingTime(long time) {
    // 从队列中取出任务,发送到下游
    while (true) {
        AppendCompactTask task = compactTasks.poll();
        if (task == null) {
            return;
        }
        output.collect(new StreamRecord<>(Either.Right(task)));
    }
}
7.2.2 Flink Job 构建

AppendTableCompact 类

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompact.java:87-128
public void build() {
    // 构建 Source
    DataStreamSource<AppendCompactTask> source = buildSource();
    
    // 如果配置了分区空闲时间,过滤任务
    if (!isContinuous && partitionIdleTime != null) {
        Map<BinaryRow, Long> partitionInfo = getPartitionInfo(table);
        long historyMilli =
                LocalDateTime.now()
                        .minus(partitionIdleTime)
                        .atZone(ZoneId.systemDefault())
                        .toInstant()
                        .toEpochMilli();
        
        SingleOutputStreamOperator<AppendCompactTask> filterStream =
                source.filter(task -> {
                    BinaryRow partition = task.partition();
                    return partitionInfo.get(partition) <= historyMilli;
                });
        source = new DataStreamSource<>(filterStream);
    }

    // 构建完整的 Flink 作业
    sinkFromSource(source);
}

private DataStreamSource<AppendCompactTask> buildSource() {
    // 获取扫描间隔
    long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
    
    // 创建 Source
    AppendTableCompactSource source =
            new AppendTableCompactSource(table, isContinuous, scanInterval, partitionPredicate);

    return AppendTableCompactSource.buildSource(env, source, tableIdentifier);
}

7.3 主键表定时调度

7.3.1 CompactorSource

核心类CompactorSource 定时扫描表快照,发现需要 Compact 的 Bucket。

工作流程

  1. 定时扫描表快照(间隔由 continuous.discovery-interval 控制)
  2. 比较当前快照与上次快照,发现新增/修改的文件
  3. 找出需要 Compact 的 partition-bucket 组合
  4. 生成 Compact 记录发送到下游

关键配置

// paimon-api/src/main/java/org/apache/paimon/CoreOptions.java:464-468
public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
        key("continuous.discovery-interval")
                .durationType()
                .defaultValue(Duration.ofSeconds(10))
                .withDescription("The discovery interval of continuous reading.");
7.3.2 StoreCompactOperator

核心类StoreCompactOperator 接收 Compact 任务并执行。

// paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java:133-180
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
    RowData record = element.getValue();

    long snapshotId = record.getLong(0);
    BinaryRow partition = deserializeBinaryRow(record.getBinary(1));
    int bucket = record.getInt(2);
    byte[] serializedFiles = record.getBinary(3);
    List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);

    if (write.streamingMode()) {
        // 流式模式:通知新文件
        write.notifyNewFiles(snapshotId, partition, bucket, files);
    }

    // 记录需要 Compact 的 partition-bucket
    waitToCompact.add(Pair.of(partition, bucket));
}

@Override
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
        throws IOException {
    try {
        // 执行所有待 Compact 的 partition-bucket
        for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
            write.compact(partitionBucket.getKey(), partitionBucket.getRight(), fullCompaction);
        }
    } catch (Exception e) {
        throw new RuntimeException("Exception happens while executing compaction.", e);
    }
    waitToCompact.clear();

    // 准备提交
    List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
    return committables;
}

7.4 调度流程图

Committer CompactExecutor CompactManager StoreCompactOperator Enumerator CompactorSource 定时器 Committer CompactExecutor CompactManager StoreCompactOperator Enumerator CompactorSource 定时器 触发扫描(每10秒) discoverSplits() 扫描快照,发现新文件 返回 CompactTask 发送记录 write.compact(partition, bucket) triggerCompaction() 提交异步任务 执行文件合并 返回 CompactResult 更新 Levels prepareCommit() 提交新快照

八、关键配置项详解

8.1 定时调度配置

配置项 默认值 类型 说明
continuous.discovery-interval 10s Duration Compact 任务的扫描间隔,控制多久检查一次是否有新文件需要 Compact

调优建议

  • 高频写入场景:缩短间隔(如 5s),及时发现需要 Compact 的文件
  • 低频写入场景:延长间隔(如 30s-1min),减少不必要的扫描开销

8.2 Compact 策略配置

配置项 默认值 类型 说明
num-sorted-run.compaction-trigger 5 Integer 触发 Compact 的 Run 数量阈值
num-sorted-run.stop-trigger Integer.MAX_VALUE Integer 阻塞写入等待 Compact 的 Run 数量阈值
compaction.max-size-amplification-percent 200 Integer 最大空间放大百分比,超过此值触发全量 Compact
compaction.size-ratio 1 Integer 大小比例阈值,控制是否合并相邻的 Runs
compaction.optimization-interval Duration 定时全量 Compact 间隔
compaction.total-size-threshold MemorySize 触发全量 Compact 的总文件大小阈值

调优建议

1. 控制文件数量

-- 更激进的 Compact:Run 数量达到 3 就触发
CREATE TABLE my_table (...) WITH (
    'num-sorted-run.compaction-trigger' = '3'
);

2. 控制空间放大

-- 更严格的空间控制:100% 空间放大就触发
CREATE TABLE my_table (...) WITH (
    'compaction.max-size-amplification-percent' = '100'
);

3. 定时全量 Compact

-- 每天凌晨2点执行全量 Compact
CREATE TABLE my_table (...) WITH (
    'compaction.optimization-interval' = '24 h'
);

8.3 文件大小配置

配置项 默认值 类型 说明
target-file-size 128 MB MemorySize 目标文件大小,Compact 后的文件尽量接近此大小
compaction.file-size target-file-size MemorySize 触发 Compact 的文件大小阈值,小于此值的文件会被合并
compaction.min.file-num 5 Integer Append 表触发 Compact 的最小文件数

调优建议

-- 小表场景:减小文件大小,加快 Compact
CREATE TABLE small_table (...) WITH (
    'target-file-size' = '64 MB',
    'compaction.min.file-num' = '3'
);

-- 大表场景:增大文件大小,减少文件数量
CREATE TABLE large_table (...) WITH (
    'target-file-size' = '256 MB',
    'compaction.min.file-num' = '10'
);

8.4 Off-Peak Hours 配置

配置项 默认值 类型 说明
compaction.offpeak.start.hour -1 Integer 非高峰时段开始时间(小时,0-23),-1 表示禁用
compaction.offpeak.end.hour -1 Integer 非高峰时段结束时间(小时,0-23),-1 表示禁用
compaction.offpeak-ratio 0 Integer 非高峰时段的额外 Compact 比例

示例

-- 凌晨2点到6点使用更激进的 Compact 策略
CREATE TABLE my_table (...) WITH (
    'compaction.offpeak.start.hour' = '2',
    'compaction.offpeak.end.hour' = '6',
    'compaction.offpeak-ratio' = '20',
    'compaction.size-ratio' = '1'
);
-- 在非高峰时段,effective size-ratio = 1 + 20 = 21

8.5 其他重要配置

配置项 默认值 类型 说明
write-only false Boolean 是否仅写入不 Compact,适用于专门的 Compact 作业场景
compaction.delete-ratio-threshold 0.3 Double 删除率阈值,启用 Deletion Vector 时使用
prepare-commit.wait-compaction false Boolean Checkpoint 时是否等待 Compact 完成

专门 Compact 作业配置

-- 写表配置
CREATE TABLE my_table (...) WITH (
    'write-only' = 'true'  -- 禁用 Compact
);

-- Compact 作业配置
CALL sys.compact(
    `table` => 'default.my_table',
    `options` => 'continuous.discovery-interval=5s,sink.parallelism=20'
);

九、源码核心类总结

9.1 Compact 管理类

类名 路径 说明
CompactManager paimon-core/src/main/java/org/apache/paimon/compact/ Compact 管理器接口
MergeTreeCompactManager paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ 主键表 Compact 管理器
BucketedAppendCompactManager paimon-core/src/main/java/org/apache/paimon/append/ Append 表(有 Bucket)Compact 管理器
AppendCompactCoordinator paimon-core/src/main/java/org/apache/paimon/append/ Append 表 Compact 协调器

9.2 Compact 策略类

类名 路径 说明
CompactStrategy paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ Compact 策略接口
UniversalCompaction paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ Universal Compaction 策略实现
FullCompactTrigger paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ 定时全量 Compact 触发器
ForceUpLevel0Compaction paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ 强制 Level 0 Compact 策略
OffPeakHours paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ 非高峰时段配置

9.3 Compact 任务类

类名 路径 说明
CompactTask paimon-core/src/main/java/org/apache/paimon/compact/ Compact 任务抽象类
MergeTreeCompactTask paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ 主键表 Compact 任务
FileRewriteCompactTask paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ 文件重写任务(仅修改元数据)
AppendCompactTask paimon-core/src/main/java/org/apache/paimon/append/ Append 表 Compact 任务
CompactResult paimon-core/src/main/java/org/apache/paimon/compact/ Compact 结果
CompactUnit paimon-core/src/main/java/org/apache/paimon/compact/ Compact 单元(待合并的文件集合)

9.4 数据结构类

类名 路径 说明
Levels paimon-core/src/main/java/org/apache/paimon/mergetree/ LSM-Tree 层级结构
SortedRun paimon-core/src/main/java/org/apache/paimon/mergetree/ 有序文件集合
LevelSortedRun paimon-core/src/main/java/org/apache/paimon/mergetree/ 带 Level 信息的 SortedRun
IntervalPartition paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ 文件分区算法(处理重叠文件)

9.5 Flink 集成类

类名 路径 说明
AppendBypassCoordinateOperator paimon-flink/paimon-flink-common/.../flink/source/ Append 表 Compact 协调算子
StoreCompactOperator paimon-flink/paimon-flink-common/.../flink/sink/ 主键表 Compact 执行算子
CompactorSourceBuilder paimon-flink/paimon-flink-common/.../flink/source/ Compact Source 构建器
AppendTableCompact paimon-flink/paimon-flink-common/.../flink/compact/ Append 表 Compact 作业构建器
CompactAction paimon-flink/paimon-flink-common/.../flink/action/ Compact Action 实现

9.6 写入相关类

类名 路径 说明
MergeTreeWriter paimon-core/src/main/java/org/apache/paimon/mergetree/ 主键表写入器
AppendOnlyWriter paimon-core/src/main/java/org/apache/paimon/append/ Append 表写入器
AbstractFileStoreWrite paimon-core/src/main/java/org/apache/paimon/operation/ 文件存储写入抽象类
KeyValueFileStoreWrite paimon-core/src/main/java/org/apache/paimon/operation/ 主键表写入实现
BaseAppendFileStoreWrite paimon-core/src/main/java/org/apache/paimon/operation/ Append 表写入实现

十、最佳实践与优化建议

10.1 场景一:多写作业共享表

问题:多个作业同时写入同一张表,内联 Compact 会导致文件冲突。

解决方案:使用专门的 Compact 作业

配置

-- 写表配置:禁用 Compact
CREATE TABLE shared_table (
    id BIGINT,
    name STRING,
    dt STRING,
    PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'write-only' = 'true',
    'bucket' = '8'
);

-- 专门的 Compact 作业
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    compact \
    --warehouse hdfs:///warehouse \
    --database default \
    --table shared_table \
    --continuous \
    --table_conf continuous.discovery-interval=10s \
    --table_conf sink.parallelism=16

优点

  • 写作业之间无冲突
  • 资源隔离,Compact 不影响写入性能
  • 可以独立调整 Compact 并行度

10.2 场景二:大规模数据写入

问题:高吞吐写入导致 Level 0 文件数量激增,影响查询性能。

解决方案:调整 Compact 策略,加快 Compact 速度

配置

CREATE TABLE high_throughput_table (...) WITH (
    -- 减少触发阈值,更频繁地 Compact
    'num-sorted-run.compaction-trigger' = '3',
    
    -- 增大目标文件大小,减少文件数量
    'target-file-size' = '256 MB',
    
    -- 设置阻塞阈值,防止 Level 0 文件过多
    'num-sorted-run.stop-trigger' = '10',
    
    -- 使用更激进的空间放大控制
    'compaction.max-size-amplification-percent' = '150'
);

Compact 作业并行度

--table_conf sink.parallelism=32  # 增大并行度加快 Compact

10.3 场景三:低延迟查询要求

问题:Level 0 文件过多导致查询延迟较高。

解决方案:优化 Compact 策略,保持 Level 0 文件数量较少

配置

CREATE TABLE low_latency_table (...) WITH (
    -- 更激进的 Compact 触发
    'num-sorted-run.compaction-trigger' = '2',
    
    -- 定时全量 Compact,确保查询性能
    'compaction.optimization-interval' = '1 h',
    
    -- Checkpoint 时等待 Compact 完成
    'prepare-commit.wait-compaction' = 'true'
);

10.4 场景四:定时离峰 Compact

问题:白天业务高峰期 Compact 占用资源,影响写入和查询。

解决方案:配置 Off-Peak Hours,在夜间进行更激进的 Compact

配置

CREATE TABLE peak_sensitive_table (...) WITH (
    -- 白天使用温和的 Compact 策略
    'compaction.size-ratio' = '5',
    'num-sorted-run.compaction-trigger' = '10',
    
    -- 夜间2点到6点使用激进策略
    'compaction.offpeak.start.hour' = '2',
    'compaction.offpeak.end.hour' = '6',
    'compaction.offpeak-ratio' = '30'
);

效果

  • 白天:effective size-ratio = 5,Compact 触发较少
  • 夜间:effective size-ratio = 5 + 30 = 35,Compact 更频繁

10.5 场景五:Append 表小文件过多

问题:Append 表写入产生大量小文件。

解决方案:调整文件打包策略

配置

CREATE TABLE append_table (
    id BIGINT,
    data STRING,
    dt STRING
) PARTITIONED BY (dt) WITH (
    -- 降低最小文件数要求
    'compaction.min.file-num' = '3',
    
    -- 减小 Compact 文件大小阈值
    'compaction.file-size' = '64 MB',
    
    -- 增大目标文件大小
    'target-file-size' = '256 MB',
    
    -- 缩短扫描间隔
    'continuous.discovery-interval' = '5s'
);

10.6 监控指标

关键指标

  1. Level 0 文件数level0.file.count

    • 正常范围:< 10
    • 告警阈值:> 20
  2. Compact 延迟compaction.time.ms

    • 正常范围:< 5min
    • 告警阈值:> 15min
  3. Compact 队列长度compactions.queued.count

    • 正常范围:< 5
    • 告警阈值:> 10
  4. 总文件大小total.file.size.bytes

    • 监控空间放大情况

Flink Metrics

// paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
public interface Reporter {
    void reportLevel0FileCount(long count);
    void reportCompactionTime(long milliseconds);
    void increaseCompactionsQueuedCount();
    void increaseCompactionsCompletedCount();
    void reportTotalFileSize(long bytes);
}

10.7 故障排查

问题1:Compact 任务失败

排查步骤

  1. 检查日志中的异常信息
  2. 确认是否有并发写冲突
  3. 检查资源是否充足(内存、磁盘)

解决方案

  • 启用 write-only 模式,使用专门的 Compact 作业
  • 增加 Compact 作业的资源配置

问题2:Compact 速度慢

排查步骤

  1. 检查 compactions.queued.count 指标
  2. 查看 Compact 任务的并行度
  3. 确认文件大小和数量

解决方案

  • 增大 Compact 并行度:sink.parallelism
  • 调整策略参数:降低 num-sorted-run.compaction-trigger
  • 增大文件大小:target-file-size

问题3:内存溢出(OOM)

排查步骤

  1. 检查 Compact 任务的堆内存使用
  2. 查看单个 Compact 任务处理的文件数量
  3. 确认是否有大文件

解决方案

  • 增大 TaskManager 内存
  • 减小 target-file-size
  • 限制单次 Compact 的文件数量

总结

Paimon 的定时 Compact 机制是保证数据湖性能的关键组件。本文详细分析了:

  1. 三种触发方式:内联 Compact、定时 Compact Job、手动触发
  2. Append 表流程:基于 AppendCompactCoordinator 的文件扫描、打包和任务生成
  3. 主键表流程:基于 LSM-Tree 的 Levels 管理和 MergeTreeCompactManager
  4. UniversalCompaction 策略:四级策略(定时全量、空间放大、大小比例、文件数量)
  5. 定时调度机制:基于 Flink 的流式处理框架,使用 ScheduledExecutorService 和 ProcessingTimeService
  6. 关键配置项:扫描间隔、策略参数、文件大小、Off-Peak Hours 等
  7. 最佳实践:多写作业、大规模写入、低延迟查询、离峰 Compact、小文件处理等场景的优化方案

通过合理配置和监控,可以在写入性能、查询性能和存储成本之间取得良好的平衡。


参考资料

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐