删除快照基本情况

Paimon Snapshot 的删除主要发生在以下几种情况:

快照过期 (Snapshot Expiration)

这是最常见的情况。Paimon 不会立即删除旧的快照,而是通过一个“过期”机制来清理它们。这个过程会删除快照文件以及这些快照所引入但后续快照不再使用的数据文件。

正如 docs/content/learn-paimon/understand-files.md 中提到的:

Files are only really deleted when Snapshot is expired... Flink writer will automatically expire snapshots. (文件仅在快照过期时才会被真正删除... Flink writer 会自动使快照过期。)

这个过程是自动的,并且可以通过一些表属性来配置,其核心逻辑在 paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java 中实现。主要受以下参数控制:

  • snapshot.num-retained.min: 保留的最小快照数量。
  • snapshot.num-retained.max: 保留的最大快照数量。
  • snapshot.time-retained: 快照的最短保留时间。

CoreOptions.java相关配置

        public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
            key("snapshot.num-retained.min")
                    .intType()
                    .defaultValue(10)
                    .withDescription(
                            "The minimum number of completed snapshots to retain. Should be greater than or equal to 1.");

    @Documentation.OverrideDefault("infinite")
    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MAX =
            key("snapshot.num-retained.max")
                    .intType()
                    .defaultValue(Integer.MAX_VALUE)
                    .withDescription(
                            "The maximum number of completed snapshots to retain. Should be greater than or equal to the minimum number.");

    public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
            key("snapshot.time-retained")
                    .durationType()
                    .defaultValue(Duration.ofHours(1))
                    .withDescription("The maximum time of completed snapshots to retain.");


    public static final ConfigOption<ExpireExecutionMode> SNAPSHOT_EXPIRE_EXECUTION_MODE =
            key("snapshot.expire.execution-mode")
                    .enumType(ExpireExecutionMode.class)
                    .defaultValue(ExpireExecutionMode.SYNC)
                    .withDescription("Specifies the execution mode of expire.");

    public static final ConfigOption<Integer> SNAPSHOT_EXPIRE_LIMIT =
            key("snapshot.expire.limit")
                    .intType()
                    .defaultValue(50)
                    .withDescription(
                            "The maximum number of snapshots allowed to expire at a time.");

过期的基本逻辑是:

  1. 确定哪些快照可以被删除。一个快照成为候选者,通常需要同时满足:它的存在时间超过了 snapshot.time-retained,并且当前总快照数超过了 snapshot.num-retained.max
  2. 系统会确保至少保留 snapshot.num-retained.min 个快照。
  3. 正在被消费者(Consumer)读取的快照不会被删除,以保证消费任务的稳定。
  4. 最后,调用 snapshotManager.deleteSnapshot(id) 方法删除快照文件本身,并清理相关的数据文件。

ExpireSnapshotsImpl.java

// ... existing code ...
    @Override
    public int expire() {
// ... existing code ...
        int retainMax = expireConfig.getSnapshotRetainMax();
        int retainMin = expireConfig.getSnapshotRetainMin();
// ... existing code ...
        long olderThanMills =
                System.currentTimeMillis() - expireConfig.getSnapshotTimeRetain().toMillis();

        Long latestSnapshotId = snapshotManager.latestSnapshotId();
// ... existing code ...
        // the min snapshot to retain from 'snapshot.num-retained.max'
        // (the maximum number of snapshots to retain)
        long min = Math.max(latestSnapshotId - retainMax + 1, earliest);

        // the max exclusive snapshot to expire until
        // protected by 'snapshot.num-retained.min'
        // (the minimum number of completed snapshots to retain)
        long maxExclusive = latestSnapshotId - retainMin + 1;

        // the snapshot being read by the consumer cannot be deleted
        maxExclusive =
                Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));

// ... existing code ...
        // delete snapshot file finally
        for (long id = beginInclusiveId; id < endExclusiveId; id++) {
// ... existing code ...
            snapshotManager.deleteSnapshot(id);
        }

        writeEarliestHint(endExclusiveId);
        return (int) (endExclusiveId - beginInclusiveId);
// ... existing code ...

    回滚操作 (Rollback)

    当执行回滚(Rollback)操作到某个特定的快照或标签(Tag)时,所有在该快照之后生成的快照都将被删除。这是一个主动的操作。

    /paimon/table/RollbackHelper.java 中的逻辑展示了这一点:

    // ... existing code ...
        private List<Snapshot> updateLatestAndCleanSnapshotsDataFiles(Snapshot retainedSnapshot) {
    // ... existing code ...
            long latest =
                    checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find latest snapshot.");
    
    // ... existing code ...
            // delete snapshot files first, cannot be read now
            // it is possible that some snapshots have been expired
            List<Snapshot> toBeCleaned = new ArrayList<>();
            long to = Math.max(earliest, retainedSnapshot.id() + 1);
            for (long i = latest; i >= to; i--) {
                // Ignore the non-existent snapshots
                if (snapshotManager.snapshotExists(i)) {
                    toBeCleaned.add(snapshotManager.snapshot(i));
                    snapshotManager.deleteSnapshot(i);
                }
            }
    
            // delete data files of snapshots
    // ... existing code ...
    

    分支合并 (Branch Merge)

    在进行分支管理时,例如将一个 branch 合并回 main,可能会涉及到删除 main 分支上在分支创建点之后的快照,然后拷贝分支上的快照过来。

    paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java 中可以看到相关逻辑:

    FileSystemBranchManager.java

    // ... existing code ...
            try {
                // Delete snapshot, schema, and tag from the main branch which occurs after
                // earliestSnapshotId
                List<Path> deleteSnapshotPaths =
                        snapshotManager.snapshotPaths(id -> id >= earliestSnapshotId);
    // ... existing code ...
                fileIO.deleteFilesQuietly(deletePaths);
    // ... existing code ...
    

    特别说明:标签 (Tag) 的作用

    值得注意的是,可以为一个快照创建标签(Tag)。被打了标签的快照及其相关的数据文件不会被常规的“快照过期”机制删除,从而可以长期保留特定的历史版本。只有当这个标签被手动删除,并且没有其他标签指向这个快照时,相关的数据文件才会被清理。

    总结来说,Paimon Snapshot 的删除主要是通过自动过期机制手动回滚/分支操作来完成的,而 标签(Tag) 可以保护快照不被自动过期。

    谁执行expire

    AbstractFileStoreTable会创建ExpireSnapshotsImpl交给 Tablecommit,

    @Override
        public TableCommitImpl newCommit(String commitUser) {
            CoreOptions options = coreOptions();
            return new TableCommitImpl(
                    store().newCommit(commitUser, this),
                    newExpireRunnable(),
                    options.writeOnly() ? null : store().newPartitionExpire(commitUser, this),
                    options.writeOnly() ? null : store().newTagAutoManager(this),
                    CoreOptions.fromMap(options()).consumerExpireTime(),
                    new ConsumerManager(fileIO, path, snapshotManager().branch()),
                    options.snapshotExpireExecutionMode(),
                    name(),
                    options.forceCreatingSnapshot());
        }
    
        @Nullable
        protected Runnable newExpireRunnable() {
            CoreOptions options = coreOptions();
            Runnable snapshotExpire = null;
    
            if (!options.writeOnly()) {
                boolean changelogDecoupled = options.changelogLifecycleDecoupled();
                ExpireConfig expireConfig = options.expireConfig();
                ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig);
                ExpireSnapshots expireSnapshots = newExpireSnapshots().config(expireConfig);
                snapshotExpire =
                        () -> {
                            expireSnapshots.expire();
                            if (changelogDecoupled) {
                                expireChangelog.expire();
                            }
                        };
            }
    
            return snapshotExpire;
        }

    TableCommitImpl 快照过期处理与线程模型

    TableCommitImpl 在构造函数中接收一个 expireSnapshots 参数,该参数是快照过期的核心组件:

    public TableCommitImpl(
        FileStoreCommit commit,
        @Nullable Runnable expireSnapshots,
        @Nullable PartitionExpire partitionExpire,
        @Nullable TagAutoManager tagAutoManager,
        @Nullable Duration consumerExpireTime,
        ConsumerManager consumerManager,
        ExpireExecutionMode expireExecutionMode,
        String tableName,
        boolean forceCreatingSnapshot) { 
        // ...
        this.expireSnapshots = expireSnapshots;
        // ...
    }

    快照过期主要在以下两种情况下被触发:

    1. 常规提交后自动触发

    public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
        if (overwritePartition == null) {
            for (ManifestCommittable committable : committables) {
                commit.commit(committable, checkAppendFiles);
            }
            if (!committables.isEmpty()) {
                expire(committables.get(committables.size() - 1).identifier(), expireMainExecutor);
            }
        } else {
            // ...
            commit.overwrite(overwritePartition, committable, Collections.emptyMap());
            expire(committable.identifier(), expireMainExecutor);
        }
    }

    2. 手动触发

    public void expireSnapshots() {
        if (expireSnapshots != null) {
            expireSnapshots.run();
        }
    }

    完整的过期流程

    当调用 expire() 方法时,会按顺序执行完整的过期流程:

    private void expire(long partitionExpireIdentifier) {
        // 1. 首先过期消费者,避免阻碍快照过期
        if (consumerExpireTime != null) {
            consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
        }
    
        // 2. 执行快照过期
        expireSnapshots();
    
        // 3. 执行分区过期
        if (partitionExpire != null) {
            partitionExpire.expire(partitionExpireIdentifier);
        }
    
        // 4. 执行标签管理
        if (tagAutoManager != null) {
            tagAutoManager.run();
        }
    }

    线程模型分析

    1. 执行模式配置

    TableCommitImpl 支持两种执行模式,通过 ExpireExecutionMode 枚举进行控制:

    this.expireMainExecutor = expireExecutionMode == ExpireExecutionMode.SYNC
        ? MoreExecutors.newDirectExecutorService()
        : Executors.newSingleThreadExecutor(
            new ExecutorThreadFactory(Thread.currentThread().getName() + "expire-main-thread")
        );

    两种模式的区别:

    模式 实现方式 特点
    SYNC MoreExecutors.newDirectExecutorService() 任务在调用线程中直接执行,同步阻塞
    ASYNC Executors.newSingleThreadExecutor(...) 创建单线程执行器,在独立后台线程中执行任务

    2. 线程池的使用

    过期任务通过 expireMainExecutor 执行,相关方法如下:

    private void expire(long partitionExpireIdentifier, ExecutorService executor) {
        if (expireError.get() != null) {
            throw new RuntimeException(expireError.get());
        }
        executor.execute(() -> {
            try {
                expire(partitionExpireIdentifier);
            } catch (Throwable t) {
                LOG.error("Executing expire encountered an error.", t);
                expireError.compareAndSet(null, t);
            }
        });
    }

    该设计具有如下优势:

    • ​异步执行​​:避免阻塞主提交线程,提升提交性能;
    • ​错误隔离​​:通过 expireError 原子引用捕获异常,防止过期任务异常影响主流程;
    • ​单线程执行​​:保证过期操作的顺序性,避免并发问题。

    3. 线程安全机制

    通过 AtomicReference<Throwable> expireError 来处理异常:

    • 使用原子操作保证线程安全;
    • 一旦发生错误,后续的过期操作将被阻止;
    • 错误会在下次尝试执行过期时抛出。

    整体工作流程总结

    1. ​提交数据​​:首先执行数据提交操作;
    2. ​触发过期​​:提交完成后,根据配置的执行模式触发过期任务;
    3. ​异步处理​​:若配置为异步模式,过期任务将在独立线程中执行;
    4. ​多层过期​​:依次进行以下处理:
      • 消费者过期
      • 快照过期
      • 分区过期
      • 标签管理
    5. ​错误处理​​:任一过期阶段出现异常都将被捕获,并阻止后续过期操作的执行。

    删除快照怎么保证读取任务不会读到空

    首先 Paimon有消费者机制,expire不会过期消费者消费的快照。

    对于lookup来说,当本地存在元数据但快照被删除时,Paimon 通过以下机制处理:

    当快照过期并被删除时,系统会通过 Levels 类的 update 方法识别出哪些文件需要被删除。这些文件会被通知给所有注册的 DropFileCallback,其中包括 LookupLevels

    LookupLevels 实现了 DropFileCallback 接口,其 notifyDropFile 方法会从本地缓存(lookupFileCache)中移除对应的文件条目。这样,当后续查询需要该文件时,会发现缓存中已不存在,从而触发重新创建。

    @Override
    public void notifyDropFile(String file) {
        lookupFileCache.invalidate(file); // 从缓存中移除
    }

    LookupLevels.lookup 方法中,如果发现缓存中没有对应的 LookupFile,会调用 createLookupFile 方法重新从远程文件创建本地缓存。如果此时远程文件也已被删除,则会在 createLookupFile 过程中抛出异常。

    @Nullable
    private T lookup(InternalRow key, DataFileMeta file) throws IOException {
        LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
    
        boolean newCreatedLookupFile = false;
        if (lookupFile == null) {
            lookupFile = createLookupFile(file); // 重新创建
            newCreatedLookupFile = true;
        }
        // ... rest of the method
    }

    如果在 createLookupFile 过程中无法访问远程文件(因为快照已被删除),会抛出 Exception,上层调用者需要处理这种异常情况。

    private LookupFile createLookupFile(DataFileMeta file) throws IOException {
            File localFile = localFileFactory.apply(file.fileName());
            if (!localFile.createNewFile()) {
                throw new IOException("Can not create new file: " + localFile);
            }
            LookupStoreWriter kvWriter =
                    lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount()));
            LookupStoreFactory.Context context;
            try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
                KeyValue kv;
                if (valueProcessor.withPosition()) {
                    FileRecordIterator<KeyValue> batch;
                    while ((batch = (FileRecordIterator<KeyValue>) reader.readBatch()) != null) {
                        while ((kv = batch.next()) != null) {
                            byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
                            byte[] valueBytes =
                                    valueProcessor.persistToDisk(kv, batch.returnedPosition());
                            kvWriter.put(keyBytes, valueBytes);
                        }
                        batch.releaseBatch();
                    }
                } else {
                    RecordReader.RecordIterator<KeyValue> batch;
                    while ((batch = reader.readBatch()) != null) {
                        while ((kv = batch.next()) != null) {
                            byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
                            byte[] valueBytes = valueProcessor.persistToDisk(kv);
                            kvWriter.put(keyBytes, valueBytes);
                        }
                        batch.releaseBatch();
                    }
                }
            } catch (IOException e) {
    // 这里进行删除
                FileIOUtils.deleteFileOrDirectory(localFile);
                throw e;
            } finally {
                context = kvWriter.close();
            }

    这里的异常不是IOException,而是OutOfRangeException

    /** Indicates that the snapshot to be consumed has been deleted from storage. */
    public class OutOfRangeException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    
        public OutOfRangeException(String msg) {
            super(msg);
        }
    }

    这个异常直到FileStoreLookupFunction层面才被捕获

        public Collection<RowData> lookup(RowData keyRow) {
            try {
                tryRefresh();
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug("lookup key:{}", keyRow.toString());
                }
                InternalRow key = new FlinkRowWrapper(keyRow);
                if (partitionLoader == null) {
                    return lookupInternal(key);
                }
    
                if (partitionLoader.partitions().isEmpty()) {
                    return Collections.emptyList();
                }
    
                List<RowData> rows = new ArrayList<>();
                for (BinaryRow partition : partitionLoader.partitions()) {
                    rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
                }
                return rows;
            } catch (OutOfRangeException | ReopenException e) {
                reopen();
                return lookup(keyRow);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

    ​总结​​:
    这种设计确保了即使快照被删除,系统也能通过 ​​缓存失效​​ 和 ​​重新创建机制​​ 来处理本地元数据,同时在无法获取数据时通过 ​​异常​​ 通知上层应用。

    expire 过程

    ExpireSnapshotsImpl.java 的 expire和expireUntil

    • expire() 是决策者:它的主要职责是根据各种配置和当前系统状态(比如消费者位点),计算出本次可以安全删除到哪个快照。它决定了“做什么”,但它自己不执行具体的删除操作。
    • expireUntil() 是执行者:它接收 expire() 方法的决策结果(一个快照范围),然后严格按照安全顺序执行所有文件的删除工作,包括数据文件(SST)、清单文件(manifest)和快照元数据文件。它负责“怎么做”。

    下面我们来详细分解这两个方法的执行流程。

    第一步:expire() 方法 - 决策者(“做什么”)

    expire() 方法的目标是计算出一个安全的、可以被删除的快照区间的上边界,我们称之为 maxExclusive。它综合考虑了以下几个核心约束,取其中最严格(即最小)的值:

    1. 最小保留数 (snapshot.num-retained.min): 这是最硬性的约束。比如最新快照是 100retainMin 是 10,那么无论如何都要保留 91 到 100 这10个快照。所以 maxExclusive 最多只能是 91 (100 - 10 + 1)。

    2. 消费者位点 (consumerManager.minNextSnapshot()): 为了保证数据读取的正确性,系统不能删除任何正在被消费者读取或即将读取的快照。consumerManager 会找到所有消费者中最小的那个快照ID,maxExclusive 不能超过这个ID。

    3. 单次最大删除数 (snapshot.expire.limit): 为了防止一次性删除过多快照导致长时间的IO操作,这个参数限制了单次操作能删除的最大数量。比如 earliest 是 1maxDeletes 是 50,那么 maxExclusive 不能超过 51 (1 + 50)。

    4. 最大保留数 (snapshot.num-retained.max): 这个参数用来确定一个检查的起点 min。比如最新快照是 100retainMax 是 30,那么从 71 (100 - 30 + 1) 之前的快照都可能被删除。

    5. 保留时间 (snapshot.time-retained): 这是基于时间的约束。expire() 方法会从 min 开始遍历到 maxExclusive,检查每个快照的创建时间。如果发现某个快照的创建时间还在保留期内,那么这个快照以及它之后的所有快照都不能被删除。此时,expire() 会提前退出循环,并把这个快照的ID作为新的 maxExclusive 传给 expireUntil

    expire() 方法的最后一行 return expireUntil(earliest, maxExclusive); 的含义是

    “我已经根据所有规则(保留数、消费者、时间等)计算出了一个绝对安全的、可以被删除的快照范围的终点 maxExclusive。现在,expireUntil,请你去把从 earliest 开始到这个 maxExclusive(不含)的快照以及它们相关的所有文件都清理掉。”

    第二步:expireUntil方法 - 执行者(“怎么做”)

    这个方法接收 expire() 方法计算出的指令(即要删除的快照范围),并负责执行具体的、有序的删除操作。它的逻辑非常严谨,以确保数据安全:

    1. 确定实际删除范围: 它会从 endExclusiveId - 1 向前遍历,找到第一个实际存在的快照,以防止因中间有快照缺失而导致错误。
    2. 获取受保护的快照: 首先获取所有被打了标签(Tag)的快照列表 taggedSnapshots,这些快照引用的文件是不能被删除的。
    3. 删除数据文件 (SST): 遍历待删除的快照,对于每个快照,构建一个包含所有受保护文件(来自 taggedSnapshots 和后续快照)的“白名单”,然后调用 snapshotDeletion.cleanUnusedDataFiles() 来删除那些在这个白名单里的数据文件。
    4. 删除Changelog文件: 如果需要,删除相关的Changelog文件。
    5. 删除清单文件 (Manifest): 再次构建一个包含所有受保护文件的“白名单”,然后调用 snapshotDeletion.cleanUnusedManifests() 来删除不再需要的清单文件。
    6. 最后,删除快照元数据文件: 在所有相关的数据和清单文件都被安全清理后,才循环调用 snapshotManager.deleteSnapshot(id) 删除 snapshot-xxx 这个元数据文件。

    Logo

    有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

    更多推荐