Paimon DataFileMeta、Manifest、Manifest List、SnapShot: 元数据序列化和内存占用
本文详细分析了Paimon中各类元数据文件的序列化机制和存储结构。主要内容包括: ManifestEntry使用嵌套序列化方式,将DataFileMeta作为内部字段处理,通过ManifestEntrySerializer完成转换。 ManifestFile存储变更记录,采用层级结构:Snapshot→ManifestList→ManifestFileMeta→ManifestFile→Manif
DataFileMeta
可以把它理解为 Paimon 表中每一个数据文件(Data File)的“身份证”。它详细记录了一个数据文件的所有关键信息,这些信息被用于查询优化、MVCC(多版本并发控制)、Compaction(合并)等核心功能。
DataFileMeta
的核心作用是描述一个数据文件的静态属性。Paimon 将一张表的数据存储在多个文件中(例如 Parquet、ORC 格式),DataFileMeta
就是这些文件在 Paimon 系统中的元数据表示。
其设计思想是不可变性(Immutability)。你会发现 DataFileMeta
的所有字段都是 final
的,并且没有 setter
方法。任何对 DataFileMeta
的修改(如 upgrade
、rename
)都会返回一个新的 DataFileMeta
对象,而不是修改自身。这种设计在分布式和并发环境中能极大地简化状态管理,保证线程安全。
核心字段(属性)分析
DataFileMeta
的字段可以分为几类,我们逐一来看。
// ...
public class DataFileMeta {
// ...
private final String fileName;
private final long fileSize;
// total number of rows (including add & delete) in this file
private final long rowCount;
private final @Nullable Long deleteRowCount;
private final BinaryRow minKey;
private final BinaryRow maxKey;
private final SimpleStats keyStats;
private final SimpleStats valueStats;
private final long minSequenceNumber;
private final long maxSequenceNumber;
private final long schemaId;
private final int level;
private final List<String> extraFiles;
private final Timestamp creationTime;
private final @Nullable byte[] embeddedIndex;
private final @Nullable FileSource fileSource;
private final @Nullable String externalPath;
// ...
}
a. 基础信息
fileName
: 文件名。Paimon 会根据它定位到物理存储上的文件。fileSize
: 文件大小(字节)。可用于任务切分、成本估算等。rowCount
: 文件中的总行数。deleteRowCount
: 文件中标记为删除的行数。Paimon 的Changelog-with-upsert
表支持DELETE
操作,这个字段用于记录删除的行数。addRowCount
可以通过rowCount - deleteRowCount
计算得出。
b. 统计信息(用于查询优化)
minKey
/maxKey
: 文件中主键的最小值和最大值。这是查询优化的关键。例如,当查询条件为pk = 100
时,如果某个文件的minKey
是 101,maxKey
是 200,Paimon 就可以直接跳过扫描这个文件。keyStats
/valueStats
: 更详细的列统计信息,由SimpleStats
类表示,通常包含列的min/max
值和null
计数。这使得 Paimon 可以对非主键列也进行谓词下推(Predicate Pushdown),进一步减少需要扫描的数据量。
c. MVCC 和 LSM-Tree 信息
minSequenceNumber
/maxSequenceNumber
: 文件中记录的最小和最大序列号。SequenceNumber
是 Paimon 实现 MVCC 的核心,每次提交(Commit)都会生成一个新的、递增的序列号。查询时,Paimon 会根据指定的snapshot
(对应一个SequenceNumber
)来决定哪些文件是可见的。schemaId
: 该文件使用的数据模式(Schema)的 ID。Paimon 支持 Schema Evolution(模式演进),这个字段确保了在读取旧文件时能找到正确的 Schema 来解析数据。level
: 文件在 LSM-Tree(Log-Structured Merge-Tree)结构中所处的层级。Paimon 使用 LSM 结构来优化写入。level = 0
的文件是新写入的,通常较小且数量多。后台的 Compaction 任务会不断将低层级的文件合并(merge)成高层级的、更大的文件,以优化读取性能。upgrade()
方法就是用于在文件被合并到更高层级时更新这个元信息。
d. 其他辅助信息
extraFiles
: 额外关联的文件列表。早期版本用于存储changelog
文件,现在用途较少。creationTime
: 文件的创建时间戳。embeddedIndex
: 内嵌的文件索引。对于小的索引(如 Bloom Filter),可以直接存储在元数据中,避免一次额外的 I/O。fileSource
: 文件的来源信息,用于追踪数据血缘。externalPath
: 文件的外部路径。如果该字段不为null
,表示文件存储在默认仓库路径之外。
序列化 Schema (SCHEMA
)
DataFileMeta
对象本身是存在于 JVM 内存中的。为了持久化,Paimon 需要将它写入到 manifest
文件中。SCHEMA
字段就定义了 DataFileMeta
对象和持久化格式(BinaryRow
)之间的映射关系。
// ...
public class DataFileMeta {
public static final RowType SCHEMA =
new RowType(
false,
Arrays.asList(
new DataField(0, "_FILE_NAME", newStringType(false)),
new DataField(1, "_FILE_SIZE", new BigIntType(false)),
new DataField(2, "_ROW_COUNT", new BigIntType(false)),
new DataField(3, "_MIN_KEY", newBytesType(false)),
new DataField(4, "_MAX_KEY", newBytesType(false)),
new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA),
new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA),
new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)),
new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)),
new DataField(9, "_SCHEMA_ID", new BigIntType(false)),
new DataField(10, "_LEVEL", new IntType(false)),
// ... 其他字段
));
// ...
}
这个 SCHEMA
定义了 DataFileMeta
在 manifest 文件中的存储结构,每个 DataField
对应一个类的属性。这使得 Paimon 可以像读写普通表数据一样来读写元数据。
构造函数与核心方法
- 构造函数:
DataFileMeta
提供了多个重载的构造函数,以方便在不同场景下创建实例。例如,forAppend()
工厂方法用于创建一个适用于append-only
表的文件元数据,此时minKey
、maxKey
等字段会使用默认的空值。 - 核心方法:
upgrade(int newLevel)
: 当文件被 Compaction 到更高层级时调用,返回一个level
更新后的新DataFileMeta
对象。rename(String newFileName)
: 重命名文件时调用,返回一个fileName
和externalPath
更新后的新对象。addRowCount()
/deleteRowCount()
: 计算并返回Optional<Long>
类型的新增行数和删除行数。fileFormat()
: 从文件名中解析出文件格式(如orc
,parquet
)。toString()
/equals()
/hashCode()
: 标准的 Java 方法,用于调试、日志打印和在集合中进行比较。
总结
DataFileMeta
是 Paimon 数据湖存储引擎的基石之一。它通过一个不可变的、结构化的对象,精确描述了底层数据文件的全貌。这些丰富的元信息是 Paimon 实现高性能查询(谓词下推)、ACID 事务(通过 MVCC)和后台自动优化(LSM Compaction)等高级功能的技术保障。理解 DataFileMeta
的结构和作用,是深入理解 Paimon 工作原理的关键一步。
ManifestEntry
ManifestEntry
代表了清单文件(manifest file)中的一个条目。每个 ManifestEntry
对象都记录了一次对数据文件的原子操作,这个操作可以是添加一个文件,也可以是删除一个文件。
简单来说,ManifestEntry
的作用就是跟踪数据文件的变化。ManifestEntry
包含以下关键信息:
kind
: 表示这个条目的类型,是ADD
(添加)还是DELETE
(删除)。partition
: 数据文件所属的分区信息。bucket
: 数据文件所属的桶(bucket)的 ID。totalBuckets
: 总共有多少个桶。file
: 一个DataFileMeta
对象,包含了被添加或删除的数据文件的详细元数据,例如:- 文件名
- 文件大小
- 数据行数
- key的范围
- 等等
ManifestEntry
的序列化和反序列化主要由 ManifestEntrySerializer
类来完成。它将一个 ManifestEntry
Java 对象转换成 Paimon 内部的行存格式(InternalRow
),DataFileMeta
对象作为其内嵌的一个字段也被序列化。
ManifestEntry
的序列化入口
在 ManifestEntry.java
中,提供了 toBytes()
方法作为序列化的入口,它通过一个 ThreadLocal
的 ManifestEntrySerializer
实例来执行序列化操作。
// ... existing code ...
// ----------------------- Serialization -----------------------------
private static final ThreadLocal<ManifestEntrySerializer> SERIALIZER_THREAD_LOCAL =
ThreadLocal.withInitial(ManifestEntrySerializer::new);
public byte[] toBytes() throws IOException {
return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
}
public ManifestEntry fromBytes(byte[] bytes) throws IOException {
return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
}
}
ManifestEntry
的 Schema 定义
ManifestEntry
的序列化结构由其 SCHEMA
字段定义。它是一个 RowType
,包含了5个字段。请注意,第5个字段 _FILE
的类型是 DataFileMeta.SCHEMA
,这表明 DataFileMeta
是作为一个嵌套结构存在的。
// ... existing code ...
public class ManifestEntry implements FileEntry {
public static final RowType SCHEMA =
new RowType(
false,
Arrays.asList(
new DataField(0, "_KIND", new TinyIntType(false)),
new DataField(1, "_PARTITION", newBytesType(false)),
new DataField(2, "_BUCKET", new IntType(false)),
new DataField(3, "_TOTAL_BUCKETS", new IntType(false)),
new DataField(4, "_FILE", DataFileMeta.SCHEMA)));
// ... existing code ...
核心序列化逻辑 ManifestEntrySerializer
ManifestEntrySerializer
中的 convertTo
方法是序列化的核心。它接收一个 ManifestEntry
对象,并返回一个 InternalRow
。
kind
、partition
、bucket
、totalBuckets
被直接或经过简单转换后设置到GenericRow
的前4个字段。file
字段(DataFileMeta
对象):entry.file()
获取到DataFileMeta
对象,然后通过dataFileMetaSerializer.toRow(entry.file())
将其也序列化成一个InternalRow
。这个InternalRow
被设置到GenericRow
的第5个字段。
这样,DataFileMeta
就被作为嵌套行包含在了 ManifestEntry
的序列化结果中。
// ... existing code ...
@Override
public InternalRow convertTo(ManifestEntry entry) {
GenericRow row = new GenericRow(5);
row.setField(0, entry.kind().toByteValue());
row.setField(1, serializeBinaryRow(entry.partition()));
row.setField(2, entry.bucket());
row.setField(3, entry.totalBuckets());
row.setField(4, dataFileMetaSerializer.toRow(entry.file()));
return row;
}
@Override
public ManifestEntry convertFrom(int version, InternalRow row) {
// ... existing code ...
return new ManifestEntry(
FileKind.fromByteValue(row.getByte(0)),
deserializeBinaryRow(row.getBinary(1)),
row.getInt(2),
row.getInt(3),
dataFileMetaSerializer.fromRow(row.getRow(4, dataFileMetaSerializer.numFields())));
}
// ... existing code ...
DataFileMeta
的 Schema
DataFileMeta
自身也是一个复杂的对象,它的序列化结构由 DataFileMeta.SCHEMA
定义,包含了文件名、大小、行数、统计信息等多个字段。
// ... existing code ...
public static final RowType SCHEMA =
new RowType(
false,
Arrays.asList(
new DataField(0, "_FILE_NAME", newStringType(false)),
new DataField(1, "_FILE_SIZE", new BigIntType(false)),
new DataField(2, "_ROW_COUNT", new BigIntType(false)),
new DataField(3, "_MIN_KEY", newBytesType(false)),
new DataField(4, "_MAX_KEY", newBytesType(false)),
new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA),
new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA),
new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)),
new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)),
new DataField(9, "_SCHEMA_ID", new BigIntType(false)),
new DataField(10, "_LEVEL", new IntType(false)),
new DataField(
11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))),
new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()),
new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)),
new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)),
new DataField(15, "_FILE_SOURCE", new TinyIntType(true)),
new DataField(
16,
"_VALUE_STATS_COLS",
DataTypes.ARRAY(DataTypes.STRING().notNull())),
new DataField(17, "_EXTERNAL_PATH", newStringType(true))));
// ... existing code ...
总结
ManifestEntry
的序列化是一个嵌套的过程:
ManifestEntrySerializer
负责将ManifestEntry
对象转换为一个InternalRow
。- 在这个过程中,对于
ManifestEntry
中的DataFileMeta
成员变量,它会调用DataFileMetaSerializer
将其也转换为一个InternalRow
。 - 最终,
DataFileMeta
的InternalRow
作为ManifestEntry
的InternalRow
的一个字段被存储,实现了嵌套序列化。
ManifestFile 的结构
ManifestFile
本身是一个文件。它的核心作用是记录一个快照(Snapshot)中数据文件的变更信息。你可以把它理解成一个清单文件。
从类的定义来看:
public class ManifestFile extends ObjectsFile<ManifestEntry> {
//...
}
-
内部结构:
ManifestFile
继承了ObjectsFile<ManifestEntry>
,这意味着一个ManifestFile
文件中存储了一个或多个ManifestEntry
对象。 -
ManifestEntry
: 每个ManifestEntry
对象代表了一条对数据文件的变更记录。它主要包含以下信息:kind
: 变更类型,ADD
(新增文件)或DELETE
(删除文件)。partition
: 数据文件所属的分区。bucket
: 数据文件所属的分桶。file
: 文件的元数据(DataFileMeta
),包含了文件名、大小、schema ID 等详细信息。
-
ManifestFileMeta
: 当一个ManifestFile
被写入后,会生成一个对应的ManifestFileMeta
对象。这个对象是ManifestFile
的摘要信息,包含了文件名、文件大小、新增文件数、删除文件数、分区统计信息等。这些元数据非常重要,可以用于在查询时进行文件过滤和裁剪,提高性能。// ... existing code ... public class ManifestFileMeta { // ... private final String fileName; private final long fileSize; private final long numAddedFiles; private final long numDeletedFiles; private final SimpleStats partitionStats; private final long schemaId; // ... }
-
ManifestList
: 多个ManifestFileMeta
对象会被写入到另一个称为ManifestList
的文件中。Snapshot
文件会直接指向这个ManifestList
文件。
所以,整个结构是一个层级关系: Snapshot -> ManifestList -> ManifestFileMeta -> ManifestFile -> ManifestEntry
ManifestFileMeta
ManifestFileMeta
类代表一个清单文件(manifest file)的元数据。它的序列化和反序列化主要由 ManifestFileMetaSerializer
类来处理。
ManifestFileMeta
的实例在需要持久化时,会通过 ManifestFileMetaSerializer
被转换成一个 InternalRow
对象。这个转换的依据是 ManifestFileMeta
类中定义的静态常量 SCHEMA
。
-
序列化 Schema (
ManifestFileMeta.SCHEMA
)SCHEMA
定义了ManifestFileMeta
序列化后的数据结构、字段顺序和类型。// ... existing code ... public static final RowType SCHEMA = new RowType( false, Arrays.asList( new DataField( 0, "_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)), new DataField(1, "_FILE_SIZE", new BigIntType(false)), new DataField(2, "_NUM_ADDED_FILES", new BigIntType(false)), new DataField(3, "_NUM_DELETED_FILES", new BigIntType(false)), new DataField(4, "_PARTITION_STATS", SimpleStats.SCHEMA), new DataField(5, "_SCHEMA_ID", new BigIntType(false)), new DataField(6, "_MIN_BUCKET", new IntType(true)), new DataField(7, "_MAX_BUCKET", new IntType(true)), new DataField(8, "_MIN_LEVEL", new IntType(true)), new DataField(9, "_MAX_LEVEL", new IntType(true)))); private final String fileName; // ... existing code ...
从
SCHEMA
的定义可以看出,序列化后的InternalRow
包含10个字段,其结构如下:
字段索引 | 字段名 | 数据类型 | 对应 ManifestFileMeta 属性 |
---|---|---|---|
0 | _FILE_NAME | VarCharType | fileName |
1 | _FILE_SIZE | BigIntType | fileSize |
2 | _NUM_ADDED_FILES | BigIntType | numAddedFiles |
3 | _NUM_DELETED_FILES | BigIntType | numDeletedFiles |
4 | _PARTITION_STATS | RowType (来自 SimpleStats.SCHEMA) | partitionStats |
5 | _SCHEMA_ID | BigIntType | schemaId |
6 | _MIN_BUCKET | IntType (可为 null) | minBucket |
7 | _MAX_BUCKET | IntType (可为 null) | maxBucket |
8 | _MIN_LEVEL | IntType (可为 null) | minLevel |
9 | _MAX_LEVEL | IntType (可为 null) | maxLevel |
-
序列化与反序列化实现 (
ManifestFileMetaSerializer
)ManifestFileMetaSerializer
继承了VersionedObjectSerializer
,这意味着序列化是带版本信息的。当前版本为2
。- 序列化 (
convertTo
): 将ManifestFileMeta
对象转换为InternalRow
。它按照SCHEMA
定义的顺序,从ManifestFileMeta
对象中获取属性值并填充到GenericRow
中。
// ... existing code ... @Override public InternalRow convertTo(ManifestFileMeta meta) { return GenericRow.of( BinaryString.fromString(meta.fileName()), meta.fileSize(), meta.numAddedFiles(), meta.numDeletedFiles(), meta.partitionStats().toRow(), meta.schemaId(), meta.minBucket(), meta.maxBucket(), meta.minLevel(), meta.maxLevel()); } // ... existing code ...
- 反序列化 (
convertFrom
): 将InternalRow
转换回ManifestFileMeta
对象。它根据SCHEMA
定义的索引和类型,从InternalRow
中读取数据,并调用ManifestFileMeta
的构造函数创建新对象。
// ... existing code ... @Override public ManifestFileMeta convertFrom(int version, InternalRow row) { if (version != 2) { // ... existing code ... } return new ManifestFileMeta( row.getString(0).toString(), row.getLong(1), row.getLong(2), row.getLong(3), SimpleStats.fromRow(row.getRow(4, 3)), row.getLong(5), row.isNullAt(6) ? null : row.getInt(6), row.isNullAt(7) ? null : row.getInt(7), row.isNullAt(8) ? null : row.getInt(8), row.isNullAt(9) ? null : row.getInt(9)); } // ... existing code ...
- 序列化 (
-
便捷方法 (
toBytes
/fromBytes
)ManifestFileMeta
类提供了toBytes()
和fromBytes()
方法,但它们内部使用了ThreadLocal
缓存的ManifestFileMetaSerializer
实例来完成实际的序列化和反序列化工作,以提高性能。// ... existing code ... // ----------------------- Serialization ----------------------------- private static final ThreadLocal<ManifestFileMetaSerializer> SERIALIZER_THREAD_LOCAL = ThreadLocal.withInitial(ManifestFileMetaSerializer::new); public byte[] toBytes() throws IOException { return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this); } public ManifestFileMeta fromBytes(byte[] bytes) throws IOException { return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes); }
ManifestFileMeta
的序列化是一个定义良好、带版本控制的过程。其核心是将 Java 对象与一个固定 SCHEMA
的 InternalRow
进行相互转换。这个 InternalRow
随后可以被 Paimon 的文件格式(如 Avro)写入到持久化存储中。整个结构清晰地反映了清单文件元数据所需要包含的所有信息。
ManifestFileMeta
不直接包含 ManifestEntry
这个设计的核心思想是将一个文件的 “元数据摘要” 与其 “详细内容” 分开。
-
ManifestFileMeta
的角色:元数据摘要ManifestFileMeta
扮演的是一个清单文件(manifest file)的“属性”或“摘要”角色。它描述了这个清单文件本身,但不包含它的具体内容。它包含的字段都是对内容的聚合统计信息,例如:fileName
: 清单文件的物理路径,告诉 Paimon 去哪里找到它。fileSize
: 文件大小。numAddedFiles
/numDeletedFiles
: 文件中包含的“增加文件”和“删除文件”的条目数量。partitionStats
: 文件中所有条目涉及到的分区值的统计信息(最大/最小值等)。minBucket
/maxBucket
: 涉及到的最小和最大 bucket ID。
Paimon 在做查询计划或者管理快照时,可以只读取
ManifestFileMeta
的列表。通过这些摘要信息,它可以快速地进行过滤和剪枝(pruning),判断哪些清单文件(manifest file)根本不需要打开读取,从而极大地提升性能。如果把所有ManifestEntry
都放到ManifestFileMeta
里,那每次读取元数据都会加载所有清单文件的全部内容,会非常低效。 -
ManifestEntry
的角色:详细内容ManifestEntry
列表是清单文件(manifest file)内部存储的实际数据。每一个ManifestEntry
对象都代表一个具体的数据文件(DataFileMeta
)的增加(ADD)或删除(DELETE)操作。
两者如何关联?
ManifestFileMeta
通过 fileName
字段与 ManifestEntry
列表关联起来。工作流程通常是这样的:
- Paimon 首先读取一个
snapshot
文件,获取到它包含的ManifestFileMeta
列表。 - Paimon 遍历这个
ManifestFileMeta
列表,根据查询条件和ManifestFileMeta
中的统计信息,决定哪些清单文件需要被进一步处理。 - 对于需要处理的
ManifestFileMeta
,Paimon 会使用它的fileName()
和fileSize()
属性,通过ManifestFile
类去读取物理文件,并将文件内容反序列化成一个List<ManifestEntry>
。
可以从 ManifestFile
类的代码中看到这个过程:
// ... existing code ...
public List<ManifestEntry> read(String fileName, @Nullable Long fileSize) {
// ... existing code ...
}
// ... existing code ...
这个 read
方法接收 fileName
作为参数,返回 List<ManifestEntry>
,清晰地展示了它们之间的关系。
与 ManifestEntry
包含 DataFileMeta
的对比
ManifestEntry
包含 DataFileMeta
是一个很好的对比。
// ... existing code ...
private final FileKind kind;
// for tables without partition this field should be a row with 0 columns (not null)
private final BinaryRow partition;
private final int bucket;
private final int totalBuckets;
private final DataFileMeta file;
// ... existing code ...
这里的关系是直接包含。因为一个 ManifestEntry
的核心职责就是描述对某一个数据文件的操作,所以它必须包含这个数据文件的完整元数据(DataFileMeta
)。这里的关系是一对一的,逻辑上紧密耦合,因此直接作为成员变量是合理的。
总结
ManifestFileMeta
->ManifestEntry
:是 “元数据 -> 内容” 的关系。ManifestFileMeta
是对一个物理文件的描述和摘要,这个物理文件的内容是ManifestEntry
列表。它们通过fileName
间接关联。ManifestEntry
->DataFileMeta
:是 “操作 -> 对象” 的关系。ManifestEntry
描述了一个对数据文件的操作,它必须指明操作的对象是哪个数据文件,因此直接包含DataFileMeta
。
ManifestList
ManifestList
本身是一个物理文件,其内部存储的是 ManifestFileMeta
对象的列表。所以,ManifestList
的序列化结构,实际上就是将多个 ManifestFileMeta
对象序列化后,逐条写入一个文件的结构。
-
继承自
ObjectsFile<ManifestFileMeta>
ManifestList
继承了ObjectsFile<ManifestFileMeta>
。ObjectsFile
是一个通用的工具类,用于将一系列指定类型(这里是ManifestFileMeta
)的对象写入或读出单个物理文件。这表明ManifestList
复用了 Paimon 中通用的对象文件读写逻辑。public class ManifestList extends ObjectsFile<ManifestFileMeta> { //... }
-
序列化器 (
ManifestFileMetaSerializer
)在
ManifestList.Factory
的create()
方法中,明确指定了用于序列化/反序列化ManifestFileMeta
对象的序列化器是ManifestFileMetaSerializer
。// ... existing code ... public ManifestList create() { RowType metaType = VersionedObjectSerializer.versionType(ManifestFileMeta.SCHEMA); return new ManifestList( fileIO, new ManifestFileMetaSerializer(), metaType, fileFormat.createReaderFactory(metaType), fileFormat.createWriterFactory(metaType), compression, pathFactory.manifestListFactory(), cache); } // ... existing code ...
正如我们之前讨论的,
ManifestFileMetaSerializer
会将一个ManifestFileMeta
对象转换成一个InternalRow
。 -
文件格式 (FileFormat)
ManifestList
文件本身也是有格式的,通常是 Avro 格式(这是 Paimon 的默认配置)。ManifestList.Factory
在创建ManifestList
实例时,会根据传入的fileFormat
(例如avro
)来创建对应的FormatReaderFactory
和FormatWriterFactory
。这意味着,
ManifestList
文件的物理结构是:- 一个 Avro 文件(或其他指定格式的文件)。
- 这个 Avro 文件的 Schema 是由
ManifestFileMeta.SCHEMA
决定的。 - 文件中的每一行(record)都代表一个序列化后的
ManifestFileMeta
对象。
-
写操作 (
write
方法)ManifestList
的write
方法接收一个List<ManifestFileMeta>
,然后调用父类ObjectsFile
的writeWithoutRolling
方法,将这个列表中的每一个ManifestFileMeta
对象通过ManifestFileMetaSerializer
序列化成InternalRow
,再由FormatWriter
写入到物理文件中。// ... existing code ... public Pair<String, Long> write(List<ManifestFileMeta> metas) { return super.writeWithoutRolling(metas.iterator()); } // ... existing code ...
总结
ManifestList
的序列化结构可以概括为:
- 它是一个物理文件,通常是 Avro 格式。
- 这个文件的 Schema(数据结构)由
ManifestFileMeta.SCHEMA
定义。 - 文件中的每一行记录都对应一个完整的、序列化后的
ManifestFileMeta
对象。
Avro 自带格式:
- 自带 Schema: 文件本身就包含了数据的结构信息,这使得数据解析非常可靠,并且支持 Schema 的演进(比如未来在 ManifestFileMeta 中增加或删除字段)。
- 强类型: 每个字段都以其定义的类型(如 long, string, struct)进行存储,避免了类型转换的模糊性和错误。
- 压缩效率高: 二进制格式通常比文本格式更紧凑,并且可以与各种压缩算法(如 snappy, zstd)高效结合,节省存储空间。
- 生态系统支持: Avro 是大数据生态中非常成熟和通用的格式,易于与其他系统集成。
所以,当读取一个 ManifestList
文件时,会得到一个 List<ManifestFileMeta>
,列表中的每个元素都是从文件的一行记录中反序列化出来的。
这个结构与 ManifestFile
非常相似,ManifestFile
的内容是 ManifestEntry
列表,而 ManifestList
的内容是 ManifestFileMeta
列表。这体现了 Paimon 文件组织中分层的元数据管理思想。
Snapshot
与 ManifestList
和 ManifestFile
不同,Snapshot
对象不是被序列化成 Avro 或其他二进制行式文件。Snapshot
对象是序列化成一个 JSON 文件。
-
JSON 格式
Snapshot
类使用了Jackson
库(通过org.apache.paimon.shade.jackson2
进行了 shaded,以避免依赖冲突)来进行 JSON 的序列化和反序列化。你可以从类和字段上的注解看出来:@JsonIgnoreProperties(ignoreUnknown = true)
: 在反序列化时,如果 JSON 中有多余的字段,会忽略它们,这增强了向后兼容性。@JsonProperty("...")
: 将 Java 类的字段名与 JSON 文件中的键(key)进行映射。@JsonCreator
: 标记用于反序列化的构造函数。@JsonGetter
: 标记用于序列化的 getter 方法。@JsonInclude(JsonInclude.Include.NON_NULL)
: 序列化时,如果字段值为null
,则不包含这个键值对,可以减小 JSON 文件的大小。
// ... existing code ... @Public @JsonIgnoreProperties(ignoreUnknown = true) public class Snapshot implements Serializable { // ... existing code ... protected static final String FIELD_VERSION = "version"; // ... 其他字段定义 ... // ... existing code ... @JsonProperty(FIELD_VERSION) @Nullable protected final Integer version; @JsonProperty(FIELD_ID) protected final long id; // ... 其他属性和注解 ... @JsonCreator public Snapshot( @JsonProperty(FIELD_VERSION) @Nullable Integer version, @JsonProperty(FIELD_ID) long id, // ... 其他构造函数参数 ... ) { // ... 构造函数实现 ... } public String toJson() { return JsonSerdeUtil.toJson(this); } public static Snapshot fromJson(String json) { return JsonSerdeUtil.fromJson(json, Snapshot.class); } public static Snapshot fromPath(FileIO fileIO, Path path) { // ... 从文件路径读取并反序列化 ... } }
-
序列化内容 一个
Snapshot
JSON 文件包含了一个特定时间点(即一次 commit)的所有元数据信息。这些信息本身不是数据,而是指向其他元数据文件的指针以及一些统计信息。主要字段包括:id
: 快照的唯一ID。schemaId
: 此快照对应的表结构(Schema)的ID。baseManifestList
: 指向一个ManifestList
文件的文件名。这个 manifest list 记录了从上一个快照继承来的所有数据文件。deltaManifestList
: 指向另一个ManifestList
文件的文件名。这个 manifest list 只记录了本次提交中新增的数据文件。changelogManifestList
: 指向一个可选的ManifestList
文件名,用于记录变更日志(Changelog)。commitKind
: 提交类型,如APPEND
,COMPACT
等。timeMillis
: 提交发生的时间戳。- 以及其他统计数据,如
totalRecordCount
,deltaRecordCount
等。
-
物理存储 每个
Snapshot
对象序列化后,会以snapshot-<id>
的形式存储在表的snapshot
目录下。例如,ID 为 3 的快照会存为snapshot-3
文件。
Snapshot
的序列化机制可以概括为:
- 格式: 使用 JSON 格式。
- 实现: 依赖 Jackson 库进行对象和 JSON 字符串之间的转换。
- 内容: 存储的是指向
ManifestList
文件的文件名(指针) 和其他元数据信息,而不是ManifestList
的实际内容。 - 角色:
Snapshot
文件是 Paimon 数据湖的入口点。通过读取一个snapshot-id
文件,Paimon 就能顺着baseManifestList
和deltaManifestList
的指引,层层找到最终的数据文件。
这种设计将轻量级的、易于人类阅读的 JSON 用于顶层元数据(Snapshot
),而将高效的、结构化的二进制格式(如 Avro)用于存储更大量的底层元数据列表(ManifestList
, ManifestFile
),实现了性能和可读性之间的平衡。
Changelog
Changelog
作为 Snapshot
的子类,在序列化机制上和父类几乎完全相同,但它在概念和用途上有所区别。
序列化机制:完全继承,没有不同。Changelog 类没有定义任何自己独有的新字段。它继承了 Snapshot 的所有字段,并且在构造时,无论是从一个 Snapshot 对象创建,还是从 JSON 反序列化,都是对这些继承来的字段进行赋值。
从代码实现上看,Changelog
的序列化方式与 Snapshot
是一致的:
- 继承关系:
public class Changelog extends Snapshot
这行代码明确了Changelog
继承了Snapshot
的所有字段和方法。 - JSON 格式: 因为父类
Snapshot
使用 Jackson 进行 JSON 序列化,子类Changelog
自然也遵循同样的机制。它会把所有继承来的字段(如id
,schemaId
,baseManifestList
等)序列化成一个 JSON 字符串。 - 注解:
Changelog
类本身也使用了@JsonIgnoreProperties(ignoreUnknown = true)
注解,并且它的构造函数使用了@JsonCreator
和@JsonProperty
,这都和Snapshot
的模式完全一样。 - 序列化/反序列化方法:
Changelog
提供了自己的fromJson
和fromPath
静态方法,但其内部实现依然是调用JsonSerdeUtil
工具类,这和Snapshot
的做法也是一致的。
// ... existing code ...
public static Changelog fromJson(String json) {
return JsonSerdeUtil.fromJson(json, Changelog.class);
}
public static Changelog fromPath(FileIO fileIO, Path path) {
// ... existing code ...
}
// ... existing code ...
结论就是:在序列化技术和最终生成的 JSON 结构上,Changelog
和 Snapshot
没有区别。一个 Changelog
对象的 JSON 产物和一个 Snapshot
对象的 JSON 产物,如果字段值都相同,那么它们的格式和内容也是完全一样的。
为什么需要一个子类?
既然序列化没有区别,为什么还要创建一个 Changelog
子类呢?这主要是出于以下几个原因:
-
语义和类型安全:
- 虽然它们的结构相同,但它们代表的概念不同。
Snapshot
代表一个完整的表状态,而Changelog
代表一次增量变更。 - 通过创建不同的类,可以在代码中实现类型安全。一个方法的参数如果是
Changelog
,你就不能错误地传入一个Snapshot
对象(尽管可以强制转换,但编译器会提示)。这使得代码的意图更清晰,也更健壮。例如,在处理 Changelog 相关的逻辑时,方法签名会明确要求Changelog
类型,避免了混淆。
- 虽然它们的结构相同,但它们代表的概念不同。
-
生命周期和管理:
Snapshot
文件会随着表的写入不断生成,并且会根据过期策略(snapshot.expire.num-retained-min
等)被清理掉。Changelog
文件是为了让变更数据能够独立于Snapshot
的生命周期而存在。当一个Snapshot
因为过期被删除时,它对应的变更信息可以被保存为一个Changelog
文件(物理文件名为changelog-<id>
),从而让下游的流式作业可以从更早的时间点开始消费变更数据。ChangelogManager
就是专门用来管理这些changelog-
文件的生命周期的。
-
未来的扩展性:
- 如果未来
Changelog
需要增加一些Snapshot
所没有的特殊属性,直接在Changelog
子类中添加会非常方便,而不会影响到Snapshot
类。
- 如果未来
为什么序列化一样
Changelog
的核心目的是为了支持增量流式消费。
那为什么它和 Snapshot
的结构(Class 定义)完全一样呢?
答案是:真正的“增量”信息,并不体现在 Changelog
这个元数据文件的结构里,而是体现在它所指向的底层数据文件(Data File)的内容里。
可以把 Snapshot
或 Changelog
的 JSON 文件想象成一个 “货运清单” 。
-
清单格式(元数据结构):无论是运送整个仓库的货物(全量),还是只运送今天新到和发走的货物(增量),货运清单本身的格式都可以是完全一样的。它都需要有清单ID、日期、发货人、以及一个“货物列表”字段。
Snapshot
和Changelog
类就是这个清单的“格式定义”,为了代码复用,它们被设计成了一样的。 -
清单内容(指向的数据):区别在于清单上“货物列表”里记的东西。
- 一个普通
Snapshot
的清单,它指向的数据文件里只记录了数据的“最终态”。比如一条记录从 A 更新到 B,数据文件里可能就只有一条+U
(B) 的记录。下游消费者只知道它现在是 B,但不知道它之前是 A。 - 一个为增量消费设计的
Changelog
,它指向的数据文件里会记录完整的“变更历史”。对于同样的更新操作,数据文件里会包含-U
(A) 和+U
(B) 两条记录。这样下游消费者就能精确地知道发生了什么变化。
- 一个普通
这个“数据文件里到底记录什么内容”的行为,是由表的 changelog-producer
属性控制的。
-
changelog-producer
= 'none'` (默认)- 数据文件只包含最新值。
Snapshot
文件中的changelogManifestList
字段通常是空的。- 这种模式下,流读只能看到合并后的结果,无法获取精确的
before
值,不适合需要完整 changelog 的场景(比如计算 SUM)。
-
changelog-producer
= 'input' / 'lookup' / 'full-compaction'`- Paimon 会在数据文件中额外生成并保留完整的 changelog 数据(
-U
,+U
等)。 Snapshot
文件中的changelogManifestList
字段会指向一个专门记录这些 changelog 数据文件的ManifestList
。- 这样,即使
Snapshot
本身的结构没变,但它通过changelogManifestList
指向了包含丰富变更信息的数据文件,从而实现了真正的增量流读能力。
- Paimon 会在数据文件中额外生成并保留完整的 changelog 数据(
既然开启 changelog-producer
后,Snapshot
文件自身就能指向增量数据,为什么还需要 Changelog
这个子类呢?
这就是为了生命周期管理。
Snapshot
文件会根据过期策略被定期删除。但我们往往希望增量日志的保留时间比快照更长,以便下游任务可以从更早的时间点回溯消费。
因此,当一个带有宝贵增量信息(由 changelog-producer
生成)的 Snapshot
即将过期时,Paimon 会:
- 读取这个
Snapshot
对象。 - 用它创建一个一模一样的
Changelog
对象(new Changelog(snapshot)
)。 - 将这个
Changelog
对象序列化成一个changelog-<id>
文件,存入changelog/
目录。 - 这样,即使原始的
snapshot-<id>
文件被删除了,它的所有元数据和指向增量数据文件的能力,都以Changelog
的形式被永久或更久地保留了下来。
总结
Changelog
在序列化层面与 Snapshot
没有区别,它完全复用了父类的 JSON 序列化机制。
- 结构相同是为了代码复用:
Changelog
和Snapshot
都是元数据容器,它们的结构(字段)是一样的。 - 内容不同是实现增量的关键:是否能增量消费,取决于它们指向的数据文件中是否包含完整的变更记录(
-U
/+U
),这由changelog-producer
配置决定。 Changelog
类是语义和生命周期的区分:它在代码层面提供了清晰的类型区分,并且它的核心使命是将Snapshot
的增量元数据从其自身的过期策略中“解救”出来,实现更长的生命周期。
可以把它想象成一种“身份转换”或者“贴标签”。一个 Snapshot 对象在即将被系统根据过期策略清理时,可以“变身”成一个 Changelog 对象被持久化下来。它们携带的信息(字段)完全一样,但“身份”(Java 类型)和“归宿”(存储路径以 changelog- 开头,并由 ChangelogManager 管理)不同了。
设立 Changelog
这个子类的主要目的是为了在代码层面提供更清晰的语义区分和类型安全,并对 Snapshot
和 Changelog
这两种不同生命周期的元数据文件进行分开管理,同时也为未来的功能扩展提供了便利。
Tag
Tag
在概念和实现上都与 Snapshot
、Changelog
有着显著的区别。
Tag
的核心含义是一个 “命名的快照”或“标签” 。可以把它理解为 Git 中的 tag。它的主要作用是:
- 人类可读的引用:为某个特定的
Snapshot
ID(通常是一个长整型数字)赋予一个有意义的、人类可读的名称,比如release-v1.0
或daily-backup-20231026
。 - 防止过期:被打了 Tag 的
Snapshot
以及它所依赖的所有数据文件,默认情况下不会被 Paimon 的自动过期清理机制删除。这为数据版本回溯和归档提供了一个可靠的保障。
Tag
的序列化和存储机制与 Snapshot
和 Changelog
完全不同,这主要体现在存储的内容和方式上。
存储内容:轻量级指针
Snapshot
/Changelog
:它们的文件(如snapshot-3
)存储的是一个完整的元数据对象,包含了指向ManifestList
的文件名、提交类型、记录数统计等所有快照信息。Tag
:一个 Tag 文件(如tag/release-v1.0
)存储的不是一个完整的Snapshot
对象,而是一个非常轻量级的指针对象。这个对象只包含最核心的指向信息,主要是:snapshotId
: 这个 Tag 指向的快照 ID。schemaId
: 对应快照的 Schema ID。timeMillis
: 对应快照的提交时间。options
: 对应快照的表配置。
这种设计非常高效,创建一个 Tag 几乎没有存储开销,因为它只是创建了一个包含快照 ID 的小文件,而不是复制一份完整的快照元数据。
和 Snapshot
、Changelog
一样,Tag 的指针对象也是被序列化成 JSON 格式。
存储方式
- 存储目录:Tag 文件存储在表路径下的
tag/
目录中,与snapshot/
和changelog/
目录并列。 - 文件命名:Tag 文件的文件名就是 Tag 的名称。例如,你创建一个名为
v2.0
的 Tag,Paimon 就会在tag/
目录下创建一个名为v2.0
的文件。
warehouse
└── default.db
└── my_table
├── snapshot/
│ ├── snapshot-1
│ └── snapshot-2
├── changelog/
└── tag/
├── release-v1.0 # 文件内容: {"snapshotId":1, "schemaId":0, ...}
└── v2.0 # 文件内容: {"snapshotId":2, "schemaId":0, ...}
总结对比
特性 | Snapshot | Changelog | Tag |
---|---|---|---|
核心作用 | 代表某个时间点的完整表状态 | 代表一次增量变更,生命周期可独立于快照 | 为某个快照创建永久性的、可读的别名 |
存储目录 | snapshot/ | changelog/ | tag/ |
文件命名 | snapshot-<ID> | changelog-<ID> | <tag-name> |
序列化格式 | JSON | JSON | JSON |
存储内容 | 完整的快照元数据对象 | 完整的快照元数据对象 | 轻量级的指针对象 (主要包含 snapshotId) |
生命周期 | 会被自动过期策略清理 | 从过期的快照转换而来,有独立的生命周期 | 默认永久存在,保护其指向的快照不被清理 |
总而言之,Tag
在 Paimon 中是一个非常重要的功能,它通过一个轻量级的、序列化为 JSON 的指针文件,实现了对特定数据版本的永久性、可读性引用,这在生产环境中对于数据治理、版本控制和故障恢复至关重要。
单个 DataFileMeta
对象内存分析
一个 DataFileMeta
对象本身占用的内存大小是可变的,主要取决于表的结构和配置。我们可以分析它的成员变量来估算其大小。
以下是 DataFileMeta
类的主要字段和它们的大致内存占用分析(基于 64 位 JVM):
-
固定大小字段:
fileSize
,rowCount
,minSequenceNumber
,maxSequenceNumber
,schemaId
: 5个long
类型,共5 * 8 = 40
字节。level
: 1个int
类型,4
字节。creationTime
: 1个Timestamp
对象,内部包含long
和int
,约12
字节 + 对象开销。deleteRowCount
: 1个Long
包装对象,约8
字节 + 对象开销。fileSource
: 1个FileSource
枚举引用。
-
可变大小字段(主要内存消耗来源):
fileName
(String
): 文件名字符串。Paimon 的文件名通常包含 UUID,长度较长,例如data-b3d3b3f0-d8a8-4195-a617-1f4973434b72-0.orc
。一个文件名可能占用 100-200 字节。minKey
,maxKey
(BinaryRow
): 存储了文件的最小和最大主键。其大小取决于主键的列数和类型。如果主键很复杂,这里会占用较多空间,可能在几十到几百字节。keyStats
,valueStats
(SimpleStats
): 存储了主键列和值列的统计信息(最大值、最小值、null 计数等)。这是内存占用的主要部分,特别是valueStats
。其大小与表中列的数量和启用的统计信息级别 (stats.mode
) 直接相关。如果为一个宽表(列非常多)的所有列都收集统计信息,这部分会占用数千字节。extraFiles
(List<String>
): 存储关联的额外文件列表(例如索引文件)。如果存在,每个文件名都会增加内存占用。embeddedIndex
(byte[]
): 内嵌的索引数据,例如布隆过滤器。其大小取决于索引类型和数据量,可能从几十字节到几千字节不等,但通常是可选的。valueStatsCols
(List<String>
): 记录了哪些列收集了valueStats
。如果收集统计信息的列很多,这个字符串列表也会很大。externalPath
(String
): 外部存储路径。如果使用,它会存储一个完整的路径字符串。
小结:
一个单独的 DataFileMeta
对象,对于一个简单的表,其内存占用可能在几百字节;但对于一个列很多(宽表)且开启了详细统计信息的表,其内存占用可能会达到 几 KB 甚至更高。
内存占用会很多吗?
在特定场景下会非常多。
问题不在于单个 DataFileMeta
对象有多大,而在于 查询规划(Planning)时需要在内存中加载多少个这样的对象。
当 Flink 或 Spark 的 Driver/JobManager 节点为 Paimon 表生成执行计划时,它需要读取表的 manifest
文件,这些文件里就存储了所有数据文件(DataFile)的元数据,也就是 DataFileMeta
列表。
想象一个场景: 一个大型分区表,经过了长时间的流式写入,并且没有进行充分的合并(Compaction)。这可能导致每个分区、每个 bucket 下都积累了成千上万个小的碎文件。
如果一个查询需要扫描多个分区,那么 Driver 节点就可能需要加载 数万甚至数百万个 DataFileMeta
对象到内存中。
举例计算: 假设一个表有 1,000,000 个数据文件,每个 DataFileMeta
平均占用 3 KB。 总内存占用 = 1,000,000 * 3 KB = 3 GB
Paimon 的应对策略
Paimon 自身设计了多种机制来缓解这个问题:
- 分区裁剪 (Partition Pruning): 根据
WHERE
条件过滤分区,只读取相关分区的manifest
文件,从源头上减少了需要处理的元数据量。 - 元数据过滤 (Metadata Filtering): Paimon 在
manifest
文件本身也存储了统计信息(如ManifestFileMeta
中的partitionStats
)。在读取和反序列化完整的DataFileMeta
列表之前,可以利用这些更高层级的统计信息来跳过整个manifest
文件或者其中的部分文件,避免了不必要的开销。 - 文件合并 (Compaction): 这是最根本的解决方案。通过定期运行 Compaction 作业,可以将大量小文件合并成少量大文件。文件数量的减少直接导致了
DataFileMeta
数量的减少,从而显著降低了查询规划时的内存压力。
结论
DataFileMeta
的内存占用对于单个对象来说是可控的,但当数据文件数量巨大时,其在 Driver/JobManager 端的总内存消耗会成为一个严重的性能瓶颈。
因此,对于生产环境中的 Paimon 表,定期进行文件合并(Compaction)是至关重要的,它能有效控制文件数量,从而保证查询规划的稳定性和效率。
更多推荐
所有评论(0)