Paimon Split 机制深度解析
SplitHDFS 的 InputSplitHive 的 SplitSpark 的 Partition分区信息Bucket ID一批数据文件删除文件(Deletion Files,如果启用了 Deletion Vectors)Split 是 Paimon 的读取单元,包含分区、Bucket、文件列表Split 大小可配置,通过调整(默认 128MB)⚠️ Split 数量受表类型影响主键表:通常
Paimon Split 机制深度解析(区分主键表和非主键表)
📌 重要提示:本文档已更新为 v2.0,系统性地区分了主键表和非主键表的 Split 生成策略。建议先阅读《Paimon 主键表 vs 非主键表核心差异》了解基础概念。
目录
Split 的概念与组成
什么是 Split?
Split 是 Paimon 中的读取单元,类似于其他大数据系统中的概念:
- HDFS 的 InputSplit
- Hive 的 Split
- Spark 的 Partition
一个 Split 包含:
- 分区信息(Partition)
- Bucket ID
- 一批数据文件(DataFile List)
- 删除文件(Deletion Files,如果启用了 Deletion Vectors)
Split 的数据结构
DataSplit (核心 Split 类型)
源码位置: paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
public class DataSplit implements Split {
private final long snapshotId; // 快照 ID
private final BinaryRow partition; // 分区
private final int bucket; // Bucket ID
private final List<DataFileMeta> dataFiles; // 数据文件列表
@Nullable private final List<DeletionFile> deletionFiles; // 删除文件
private final boolean rawConvertible; // 是否可以原始转换
// Split 的统计信息
private transient long rowCount = -1; // 行数
private transient long dataFileSize = -1; // 数据文件大小
}
关键字段解释:
snapshotId: 这个 Split 对应的快照版本partition: 分区的二进制表示bucket: Bucket ID(0 到 N-1)dataFiles: 这个 Split 需要读取的数据文件列表deletionFiles: 如果有删除操作,对应的删除文件
FileStoreSourceSplit (Flink 包装)
源码位置: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java
public class FileStoreSourceSplit implements SourceSplit {
private final String id; // Split 的唯一 ID (格式: uuid-序号)
private final Split split; // Paimon 的 Split
private final long recordsToSkip; // 需要跳过的记录数(用于断点续传)
public FileStoreSourceSplit(String id, Split split, long recordsToSkip) {
this.id = id;
this.split = split;
this.recordsToSkip = recordsToSkip;
}
}
Split 的大小
Split 的大小由以下因素决定:
Split 大小 = Σ(数据文件大小)
其中:
- 数据文件大小 = Parquet/ORC 文件在磁盘上的大小
- Split 目标大小 = split.target-size 配置(默认 128MB)
Split 生成策略(⚠️ 区分表类型):
主键表(Primary Key Table):
-
需要 Merge 的情况:
- 同一个 Bucket 的文件不能随意切分
- 必须按 Key Range 分区
- 相同 Key Range 的文件必须在同一个 Split
- 通常:1 Bucket = 1 Split(如果 Key Range 完全重叠)
-
可以 Raw 转换的情况:
- 所有文件都在高 Level(非 Level 0)且无删除行
- 可以按
split.target-size切分(类似非主键表)
非主键表(Append-Only Table):
- 同一个 Bucket 的文件可以随意切分
- 按照
split.target-size使用 Bin Packing 算法打包 - 如果 Bucket 总大小 = 1GB,targetSplitSize = 128MB
- 可以生成约 8 个 Splits
- 8 个不同的 Task 并行读取这个 Bucket!
Split 生成流程(主键表 vs 非主键表)
⚠️ 注意:主键表和非主键表的 Split 生成策略完全不同!
主键表 vs 非主键表生成策略对比
整体流程图
详细步骤分析
步骤 1:读取 Snapshot
源码位置: paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@Override
public Plan plan() {
long started = System.nanoTime();
// 1. 读取 Manifest 列表
ManifestsReader.Result manifestsResult = readManifests();
Snapshot snapshot = manifestsResult.snapshot;
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
// ...
}
private ManifestsReader.Result readManifests() {
Snapshot snapshot = specifiedSnapshot != null
? specifiedSnapshot
: snapshotManager.latestSnapshot();
if (snapshot == null) {
return new ManifestsReader.Result(null, Collections.emptyList(), Collections.emptyList());
}
// 读取这个快照的 Manifest 文件列表
List<ManifestFileMeta> allManifests = snapshot.dataManifests(manifestFileFactory);
// 应用分区过滤,过滤掉不需要的 Manifest
List<ManifestFileMeta> filteredManifests =
manifestsReader.filterManifests(allManifests);
return new ManifestsReader.Result(snapshot, allManifests, filteredManifests);
}
关键点:
- Snapshot 包含了这个版本的所有 Manifest 文件列表
- 每个 Manifest 文件包含多个 ManifestEntry(文件元数据)
- 在这一步就会根据分区统计信息过滤掉整个 Manifest
步骤 2:读取 ManifestEntry
private Iterator<ManifestEntry> readManifestEntries(
List<ManifestFileMeta> manifests, boolean readDeletionFiles) {
// 并行读取多个 Manifest 文件
return randomlyExecuteSequentialReturn(
getExecutorService(parallelism),
manifests.stream()
.map(manifest ->
() -> manifestsReader.readManifestEntries(manifest, readDeletionFiles))
.collect(Collectors.toList()))
.stream()
.flatMap(List::stream)
.iterator();
}
ManifestEntry 包含的信息:
public class ManifestEntry {
private FileKind kind; // ADD / DELETE
private BinaryRow partition; // 分区
private int bucket; // Bucket ID
private int totalBuckets; // 总 Bucket 数
private DataFileMeta file; // 数据文件元数据
// DataFileMeta 包含:
// - 文件名
// - 文件大小
// - 行数
// - 最小/最大值(用于过滤)
// - 层级(Level)
}
步骤 3:按 Partition + Bucket 分组
源码位置: paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@Override
public Plan plan() {
// ... 前面的步骤
List<ManifestEntry> files = ListUtils.toList(iterator);
// 按照 Partition + Bucket 分组
Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> grouped =
groupByPartFiles(files);
// ...
}
public static Map<Pair<BinaryRow, Integer>, List<ManifestEntry>>
groupByPartFiles(List<ManifestEntry> entries) {
Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> grouped = new LinkedHashMap<>();
for (ManifestEntry entry : entries) {
Pair<BinaryRow, Integer> key = Pair.of(entry.partition(), entry.bucket());
grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(entry);
}
return grouped;
}
分组示例:
假设有以下 ManifestEntry:
1. partition=2026-01-25, bucket=0, file=file1.parquet (100MB)
2. partition=2026-01-25, bucket=0, file=file2.parquet (50MB)
3. partition=2026-01-25, bucket=1, file=file3.parquet (200MB)
4. partition=2026-01-26, bucket=0, file=file4.parquet (80MB)
分组后:
Group 1: (2026-01-25, bucket=0) -> [file1, file2] (150MB)
Group 2: (2026-01-25, bucket=1) -> [file3] (200MB)
Group 3: (2026-01-26, bucket=0) -> [file4] (80MB)
步骤 4:生成 Splits(⚠️ 主键表 vs 非主键表差异)
这是最关键的差异点!
🔴 主键表 Split 生成:MergeTreeSplitGenerator
源码位置: paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
核心策略:按 Key Range 分区,确保相同 Key 的文件在同一个 Split
public class MergeTreeSplitGenerator implements SplitGenerator {
private final Comparator<InternalRow> keyComparator;
private final long targetSplitSize;
private final long openFileCost;
@Override
public List<SplitGroup> splitForBatch(List<DataFileMeta> files) {
// 1. 检查是否可以 Raw 转换(无需 Merge)
boolean rawConvertible =
files.stream().allMatch(file -> file.level() != 0 && withoutDeleteRow(file));
boolean oneLevel =
files.stream().map(DataFileMeta::level).collect(Collectors.toSet()).size() == 1;
if (rawConvertible && (deletionVectorsEnabled || mergeEngine == FIRST_ROW || oneLevel)) {
// 情况 1:可以按大小切分(类似非主键表)
Function<DataFileMeta, Long> weightFunc =
file -> Math.max(file.fileSize(), openFileCost);
return BinPacking.packForOrdered(files, weightFunc, targetSplitSize)
.stream()
.map(SplitGroup::rawConvertibleGroup)
.collect(Collectors.toList());
}
// 情况 2:需要 Merge,按 Key Range 分区
/*
* 🔑 关键算法:IntervalPartition
*
* 将文件按 Key Range 分区,确保:
* - 相同 Key Range 的文件在同一个 Section
* - 不同 Section 的 Key Range 不重叠
*
* 例如:
* file1: [key 1-100, 200MB]
* file2: [key 1-50, 100MB] ← Key 重叠,必须同一 Section
* file3: [key 51-100, 80MB] ← Key 重叠,必须同一 Section
* file4: [key 200-300, 150MB] ← Key 不重叠,可以单独 Section
*
* 结果:
* Section 1: [file1, file2, file3] ← 必须在同一 Split 中 Merge
* Section 2: [file4]
*/
List<List<DataFileMeta>> sections =
new IntervalPartition(files, keyComparator)
.partition()
.stream()
.map(this::flatRun)
.collect(Collectors.toList());
// 3. 将 Sections 按大小打包成 Splits
return packSplits(sections);
}
@Override
public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
// 流式读取:不切分,整个 Bucket 作为一个 Split
return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
}
}
主键表 Split 生成示例:
示例 1:Key Range 完全重叠(最常见)
Bucket 0 的文件:
Level 0: file1 [key: 1-1000, 200MB]
Level 1: file2 [key: 1-500, 100MB]
Level 1: file3 [key: 501-1000, 100MB]
Level 2: file4 [key: 1-1000, 500MB]
分析:
- 所有文件的 Key Range 都有重叠(1-1000)
- 必须在同一个 Split 中读取并 Merge
IntervalPartition 结果:
Section 1: [file1, file2, file3, file4]
最终生成:
Split 1: 包含所有 4 个文件
结论:1 Bucket = 1 Split
并行度 = Bucket 数量
示例 2:Key Range 部分重叠
Bucket 0 的文件:
Level 0: file1 [key: 1-100, 100MB]
Level 1: file2 [key: 1-50, 50MB] ← 与 file1 重叠
Level 1: file3 [key: 51-100, 50MB] ← 与 file1 重叠
Level 1: file4 [key: 200-300, 100MB] ← 不重叠
Level 2: file5 [key: 1-100, 200MB] ← 与 file1,2,3 重叠
Level 2: file6 [key: 200-300, 150MB] ← 与 file4 重叠
IntervalPartition 分析:
- file1, file2, file3, file5 的 Key Range 重叠(1-100)
→ Section 1: [file1, file2, file3, file5]
- file4, file6 的 Key Range 重叠(200-300)
→ Section 2: [file4, file6]
packSplits(按 targetSplitSize=128MB 打包):
- Section 1 总大小 = 400MB,可以切分:
- Split 1: [file1, file2] (150MB)
- Split 2: [file3, file5] (250MB)
- Section 2 总大小 = 250MB,可以切分:
- Split 3: [file4] (100MB)
- Split 4: [file6] (150MB)
最终生成:4 个 Splits
结论:可以有一定的并行度,但受 Key Range 约束
示例 3:Raw Convertible(可按大小切分)
Bucket 0 的文件(经过充分 Compaction):
Level 5: file1 [key: 1-1000, 500MB, 无删除行]
Level 5: file2 [key: 1001-2000, 500MB, 无删除行]
Level 5: file3 [key: 2001-3000, 500MB, 无删除行]
分析:
- 所有文件都在高 Level(Level 5)
- 没有删除行
- 可以直接读取,无需 Merge
splitForBatch 会选择 Bin Packing:
- file1: 500MB / 128MB ≈ 4 个 Splits
- file2: 500MB / 128MB ≈ 4 个 Splits
- file3: 500MB / 128MB ≈ 4 个 Splits
最终生成:约 12 个 Splits
结论:类似非主键表,高并行度
🔵 非主键表 Split 生成:AppendOnlySplitGenerator
源码位置: paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
核心策略:自由切分,按文件大小 Bin Packing
public class AppendOnlySplitGenerator implements SplitGenerator {
private final long targetSplitSize;
private final long openFileCost;
@Override
public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
List<DataFileMeta> files = new ArrayList<>(input);
// 按 sequence 排序(保证顺序,但不影响切分)
files.sort(Comparator.comparing(DataFileMeta::minSequenceNumber));
// 🔑 关键:简单的 Bin Packing,不考虑 Key Range
Function<DataFileMeta, Long> weightFunc =
file -> Math.max(file.fileSize(), openFileCost);
return BinPacking.packForOrdered(files, weightFunc, targetSplitSize)
.stream()
.map(SplitGroup::rawConvertibleGroup)
.collect(Collectors.toList());
}
@Override
public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
if (bucketMode == BucketMode.BUCKET_UNAWARE) {
// Unaware Bucket 模式:可以切分
return splitForBatch(files);
} else {
// 固定 Bucket 模式:不切分(保证顺序)
return Collections.singletonList(
SplitGroup.rawConvertibleGroup(files));
}
}
}
非主键表 Split 生成示例:
示例 1:批处理模式
Bucket 0 的文件(都是 Level 0):
file1.parquet (100MB)
file2.parquet (80MB)
file3.parquet (200MB)
file4.parquet (60MB)
file5.parquet (150MB)
按 sequence 排序后(假设文件名即顺序):
[file1, file2, file3, file4, file5]
Bin Packing(targetSplitSize=128MB):
1. current = [], currentSize = 0
2. 添加 file1 (100MB): current = [file1], currentSize = 100MB
3. 添加 file2 (80MB): 100 + 80 = 180MB > 128MB
→ 生成 Split 1: [file1] (100MB)
→ 重置 current = [file2], currentSize = 80MB
4. 添加 file3 (200MB): 80 + 200 = 280MB > 128MB
→ 生成 Split 2: [file2] (80MB)
→ 重置 current = [file3], currentSize = 200MB
5. 添加 file4 (60MB): 200 + 60 = 260MB > 128MB
→ 生成 Split 3: [file3] (200MB)
→ 重置 current = [file4], currentSize = 60MB
6. 添加 file5 (150MB): 60 + 150 = 210MB > 128MB
→ 生成 Split 4: [file4] (60MB)
→ 重置 current = [file5], currentSize = 150MB
7. 处理完毕:
→ 生成 Split 5: [file5] (150MB)
最终生成:5 个 Splits
结论:
- 5 个 Splits 可以分配给 5 个 Task 并行读取
- 1 个 Bucket 的数据被 5 个 Task 并行处理!
- 高并行度,高吞吐量
示例 2:文件合并打包
Bucket 0 的文件(很多小文件):
file1.parquet (40MB)
file2.parquet (30MB)
file3.parquet (50MB)
file4.parquet (35MB)
file5.parquet (60MB)
file6.parquet (45MB)
Bin Packing(targetSplitSize=128MB):
1. 添加 file1 (40MB): currentSize = 40MB
2. 添加 file2 (30MB): 40 + 30 = 70MB < 128MB,继续
3. 添加 file3 (50MB): 70 + 50 = 120MB < 128MB,继续
4. 添加 file4 (35MB): 120 + 35 = 155MB > 128MB
→ 生成 Split 1: [file1, file2, file3] (120MB)
→ 重置 current = [file4], currentSize = 35MB
5. 添加 file5 (60MB): 35 + 60 = 95MB < 128MB,继续
6. 添加 file6 (45MB): 95 + 45 = 140MB > 128MB
→ 生成 Split 2: [file4, file5] (95MB)
→ 重置 current = [file6], currentSize = 45MB
7. 处理完毕:
→ 生成 Split 3: [file6] (45MB)
最终生成:3 个 Splits
结论:
- 小文件被合并打包
- 3 个 Splits 并行读取
- 减少任务调度开销
主键表 vs 非主键表 Split 生成对比
| 维度 | 主键表 | 非主键表 |
|---|---|---|
| 实现类 | MergeTreeSplitGenerator | AppendOnlySplitGenerator |
| 核心约束 | Key Range 约束 | 无约束 |
| 切分策略 | IntervalPartition + BinPacking | BinPacking |
| 批处理(常规) | 1 Bucket = 1 Split | 1 Bucket = 多个 Splits |
| 批处理(Raw 转换) | 可切分 | 可切分 |
| 流处理 | 1 Bucket = 1 Split | 1 Bucket = 1 Split |
| 并行度潜力 | 受限(通常 = Bucket 数) | 高(远大于 Bucket 数) |
| 适用场景 | 需要保证主键唯一性 | 高性能批处理 |
Split 生成决策图
🔴 主键表原始代码
源码位置: paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java
public class MergeTreeSplitGenerator implements SplitGenerator {
private final long targetSplitSize;
private final long openFileCost;
@Override
public List<Split> split(List<ManifestEntry> entries) {
List<DataFileMeta> files = new ArrayList<>();
for (ManifestEntry entry : entries) {
files.add(entry.file());
}
// 按层级排序(Level 小的在前)
files.sort(Comparator.comparingInt(DataFileMeta::level));
List<List<DataFileMeta>> result = new ArrayList<>();
List<DataFileMeta> current = new ArrayList<>();
long currentSize = 0;
for (DataFileMeta file : files) {
long fileSize = file.fileSize() + openFileCost;
// 如果加上这个文件会超过目标大小,且当前已有文件
if (currentSize + fileSize > targetSplitSize && !current.isEmpty()) {
result.add(new ArrayList<>(current));
current.clear();
currentSize = 0;
}
current.add(file);
currentSize += fileSize;
}
if (!current.isEmpty()) {
result.add(current);
}
// 为每个文件组生成一个 DataSplit
List<Split> splits = new ArrayList<>();
for (List<DataFileMeta> group : result) {
splits.add(DataSplit.builder()
.withSnapshot(snapshotId)
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(group)
.build());
}
return splits;
}
}
Split 生成逻辑:
- 同一个 Bucket 的文件按 Level 排序
- 从前往后累加文件大小
- 当累计大小超过
targetSplitSize时,生成一个 Split - 继续处理剩余文件
示例:
假设:
- targetSplitSize = 128MB
- Bucket 0 有以下文件:
- file1.parquet (100MB, Level 0)
- file2.parquet (50MB, Level 0)
- file3.parquet (80MB, Level 1)
- file4.parquet (60MB, Level 1)
处理过程:
1. current = [], currentSize = 0
2. 添加 file1 (100MB): current = [file1], currentSize = 100MB
3. 添加 file2 (50MB): 100 + 50 = 150MB > 128MB
-> 生成 Split 1: [file1],重置 current = [file2], currentSize = 50MB
4. 添加 file3 (80MB): 50 + 80 = 130MB > 128MB
-> 生成 Split 2: [file2],重置 current = [file3], currentSize = 80MB
5. 添加 file4 (60MB): 80 + 60 = 140MB > 128MB
-> 生成 Split 3: [file3],重置 current = [file4], currentSize = 60MB
6. 处理完毕,生成 Split 4: [file4]
最终生成 4 个 Splits
步骤 5:包装成 FileStoreSourceSplit
源码位置: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
public class FileStoreSourceSplitGenerator {
private final String uuid = UUID.randomUUID().toString();
private final AtomicInteger idCounter = new AtomicInteger(1);
public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
return plan.splits().stream()
.map(s -> new FileStoreSourceSplit(getNextId(), s))
.collect(Collectors.toList());
}
protected final String getNextId() {
return uuid + "-" + idCounter.getAndIncrement();
}
}
Split ID 格式:
{uuid}-{序号}
例如:
550e8400-e29b-41d4-a716-446655440000-1
550e8400-e29b-41d4-a716-446655440000-2
550e8400-e29b-41d4-a716-446655440000-3
Split 分配策略
Paimon 支持两种 Split 分配策略:FAIR 和 PREEMPTIVE
FAIR 模式(公平分配)
分配算法
源码位置: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
public class PreAssignSplitAssigner implements SplitAssigner {
private final int splitBatchSize;
private final Map<Integer, List<FileStoreSourceSplit>> assignedSplits;
public PreAssignSplitAssigner(
int splitBatchSize,
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> splits) {
this.splitBatchSize = splitBatchSize;
// 预先分配 Splits
Map<Integer, List<FileStoreSourceSplit>> preAssignment =
assignSplitsFairly(splits, context.currentParallelism());
this.assignedSplits = new HashMap<>(preAssignment);
}
private Map<Integer, List<FileStoreSourceSplit>> assignSplitsFairly(
Collection<FileStoreSourceSplit> splits, int parallelism) {
// 1. 按大小排序 Splits(大的在前)
List<FileStoreSourceSplit> sortedSplits = new ArrayList<>(splits);
sortedSplits.sort(Comparator.comparingLong(
s -> -s.split().rowCount() // 负号表示降序
));
// 2. 为每个 Task 创建一个桶,记录已分配的数据量
long[] taskLoads = new long[parallelism];
Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
for (int i = 0; i < parallelism; i++) {
assignment.put(i, new ArrayList<>());
}
// 3. 贪心算法:每次把最大的 Split 分配给负载最小的 Task
for (FileStoreSourceSplit split : sortedSplits) {
// 找到负载最小的 Task
int minLoadTask = 0;
long minLoad = taskLoads[0];
for (int i = 1; i < parallelism; i++) {
if (taskLoads[i] < minLoad) {
minLoad = taskLoads[i];
minLoadTask = i;
}
}
// 分配 Split 给这个 Task
assignment.get(minLoadTask).add(split);
taskLoads[minLoadTask] += split.split().rowCount();
}
return assignment;
}
}
分配示例
场景:
- 并行度 = 3
- 有 6 个 Splits,大小分别为:[100MB, 80MB, 70MB, 60MB, 50MB, 40MB]
分配过程:
初始状态:
Task 0: [], load = 0
Task 1: [], load = 0
Task 2: [], load = 0
步骤 1:分配 100MB Split
Task 0: [100MB], load = 100
Task 1: [], load = 0
Task 2: [], load = 0
步骤 2:分配 80MB Split(给 Task 1,负载最小)
Task 0: [100MB], load = 100
Task 1: [80MB], load = 80
Task 2: [], load = 0
步骤 3:分配 70MB Split(给 Task 2,负载最小)
Task 0: [100MB], load = 100
Task 1: [80MB], load = 80
Task 2: [70MB], load = 70
步骤 4:分配 60MB Split(给 Task 2,负载最小)
Task 0: [100MB], load = 100
Task 1: [80MB], load = 80
Task 2: [70MB, 60MB], load = 130
步骤 5:分配 50MB Split(给 Task 1,负载最小)
Task 0: [100MB], load = 100
Task 1: [80MB, 50MB], load = 130
Task 2: [70MB, 60MB], load = 130
步骤 6:分配 40MB Split(给 Task 0,负载最小)
Task 0: [100MB, 40MB], load = 140
Task 1: [80MB, 50MB], load = 130
Task 2: [70MB, 60MB], load = 130
最终分配:
Task 0: [100MB, 40MB]
Task 1: [80MB, 50MB]
Task 2: [70MB, 60MB]
负载相对均衡!
FAIR 模式的特点
优点:
- 负载均衡,避免数据倾斜
- 所有 Task 大致同时完成
- 适合异构资源环境
缺点:
- 需要提前知道所有 Splits(批处理场景)
- 分配算法有一定开销
- 不适合流式读取
适用场景:
- 批处理查询
- 数据倾斜明显的表(某些 Bucket 特别大)
- ETL 作业
PREEMPTIVE 模式(抢占式分配)
分配算法
源码位置: paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java
public class FIFOSplitAssigner implements SplitAssigner {
private final List<FileStoreSourceSplit> pendingSplits;
public FIFOSplitAssigner(Collection<FileStoreSourceSplit> splits) {
this.pendingSplits = new ArrayList<>(splits);
}
@Override
public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname) {
if (pendingSplits.isEmpty()) {
return Collections.emptyList();
}
// 简单的 FIFO:返回第一个 Split
return Collections.singletonList(pendingSplits.remove(0));
}
@Override
public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
// 失败的 Split 加回队列末尾
pendingSplits.addAll(splits);
}
}
工作流程
PREEMPTIVE 模式的特点
优点:
- 实现简单,开销小
- 适合流式场景(动态生成 Splits)
- 快的 Task 可以处理更多 Splits
缺点:
- 可能导致负载不均(如果 Split 大小差异大)
- 慢的 Task 可能成为瓶颈
适用场景:
- Split 大小相对均匀
- 流式读取
- 快速启动(不需要提前分配)
Split 的生命周期
完整的生命周期图
各阶段详解
1. Created(创建阶段)
时机:JobManager 初始化 Source 时
代码位置:
// StaticFileStoreSource.restoreEnumerator()
private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext context) {
FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
TableScan scan = readBuilder.newScan();
return splitGenerator.createSplits(scan.plan());
}
2. Pending(等待阶段)
FAIR 模式:
// 预先分配,直接进入各 Task 的队列
Map<Integer, List<FileStoreSourceSplit>> preAssignment = assignSplitsFairly(splits, parallelism);
PREEMPTIVE 模式:
// 加入全局队列
private final List<FileStoreSourceSplit> pendingSplits = new ArrayList<>(splits);
3. Assigned(已分配阶段)
代码位置: StaticFileStoreSplitEnumerator.handleSplitRequest()
@Override
public void handleSplitRequest(int subtask, @Nullable String hostname) {
List<FileStoreSourceSplit> assignment = splitAssigner.getNext(subtask, hostname);
if (assignment.size() > 0) {
// 通过 RPC 发送 Split 给 TaskManager
context.assignSplits(
new SplitsAssignment<>(Collections.singletonMap(subtask, assignment)));
} else {
// 没有更多 Splits
context.signalNoMoreSplits(subtask);
}
}
4. Reading(读取阶段)
代码位置: FileStoreSourceSplitReader.fetch()
@Override
public RecordsWithSplitIds<RecordAndPos> fetch() throws IOException {
checkSplitOrStartNext();
// 从当前 Split 的 RecordReader 读取一批数据
RecordReader.RecordIterator<InternalRow> batch;
if (currentFirstBatch != null) {
batch = currentFirstBatch;
currentFirstBatch = null;
} else {
batch = currentReader.readBatch();
}
// 封装成 RecordsWithSplitIds 返回
return new FileStoreRecords(batch, currentSplitId, currentNumRead);
}
private void checkSplitOrStartNext() throws IOException {
if (currentReader != null) {
return; // 当前 Split 还在读取中
}
// 获取下一个 Split
FileStoreSourceSplit split = splits.poll();
if (split == null) {
return; // 没有更多 Splits
}
// 创建 RecordReader 读取这个 Split
TableRead read = readBuilder.newRead();
currentReader = read.createReader(split.split());
currentSplitId = split.splitId();
currentNumRead = 0;
currentFirstBatch = currentReader.readBatch();
}
读取流程:
- 打开 Split 中的数据文件(Parquet/ORC)
- 根据 Schema 创建 Reader
- 应用过滤器(Predicate)
- 应用投影(Projection)
- 批量读取数据(默认一批读取约 2048 条)
- 转换为 Flink RowData
- 发送给下游算子
5. Completed(完成阶段)
当 Split 的所有数据读取完毕:
// Reader 通知 Enumerator:Split 完成
// Enumerator 记录进度(用于 Checkpoint)
6. Failed(失败阶段)
如果 Split 读取失败(例如文件损坏、网络问题):
@Override
public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
// 失败的 Splits 重新加入队列
splitAssigner.addSplitsBack(subtaskId, backSplits);
}
Split 与 Flink 并行度的关系
核心原则
Source 并行度 ≤ Split 数量
原因:
- 每个 Source Task 至少需要一个 Split
- 如果并行度 > Split 数量,部分 Task 会空闲
并行度决策树
不同场景的并行度计算
场景 1:批处理 + 自动推断
配置:
SET 'scan.infer-parallelism' = 'true';
SET 'scan.infer-parallelism.max' = '200';
计算逻辑:
scanSplitsForInference();
parallelism = splitStatistics.splitNumber();
// 应用 limit
if (null != limit && limit > 0) {
int limitCount = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
parallelism = Math.min(parallelism, limitCount);
}
parallelism = Math.max(1, parallelism);
parallelism = Math.min(parallelism, options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
示例:
假设:
- 表有 100 个分区
- 每个分区 10 个 Bucket
- 每个 Bucket 平均 200MB
- split.target-size = 128MB
计算:
- 总 Bucket 数 = 100 × 10 = 1000
- 每个 Bucket 生成 Splits = 200MB / 128MB ≈ 2
- 总 Split 数 = 1000 × 2 = 2000
推断的并行度 = min(2000, 200) = 200
场景 2:批处理 + 手动设置
配置:
SET 'scan.parallelism' = '50';
结果:
- 并行度 = 50(无论有多少 Splits)
- 如果 Split 数 < 50,部分 Task 空闲
- 如果 Split 数 > 50,每个 Task 处理多个 Splits
场景 3:流处理 + 固定 Bucket
表定义:
CREATE TABLE my_table (...) WITH ('bucket' = '128');
推断逻辑:
if (options.get(CoreOptions.BUCKET) == -1) {
return null; // 动态 Bucket,无法推断
} else {
parallelism = Math.max(1, options.get(CoreOptions.BUCKET));
}
结果:
- 并行度 = 128(等于 Bucket 数)
- 每个 Source Task 读取一个 Bucket
场景 4:流处理 + 动态 Bucket
表定义:
CREATE TABLE my_table (...) WITH ('bucket' = '-1'); -- 动态 Bucket
结果:
- 无法自动推断并行度
- 使用 Flink 的默认并行度或手动设置
并行度与性能的关系
并行度过高
症状:
- 大量小 Splits,任务调度开销大
- 资源浪费(某些 Task 很快完成)
- GC 频繁
示例:
假设:
- 1 个分区,10 个 Bucket,每个 Bucket 50MB
- split.target-size = 128MB
- 生成 10 个 Splits
- 设置并行度 = 100
结果:
- 只有 10 个 Task 有数据,90 个空闲
- 浪费 90% 的资源
解决方案:
-- 降低最大并行度
SET 'scan.infer-parallelism.max' = '20';
并行度过低
症状:
- CPU 利用率低
- 读取速度慢
- 单个 Task 处理的数据量过大
示例:
假设:
- 1000 个分区,每个分区 10 个 Bucket,共 10000 个 Bucket
- 生成 20000 个 Splits
- 设置并行度 = 10
结果:
- 每个 Task 处理 2000 个 Splits
- 串行处理,非常慢
解决方案:
-- 增加最大并行度
SET 'scan.infer-parallelism.max' = '500';
-- 或增大 Split,减少总数
SET 'split.target-size' = '512mb';
最佳实践建议
批处理
建议并行度 = 集群总 Slot 数的 1-2 倍
例如:
- 10 个 TaskManager
- 每个 TM 有 8 个 Slot
- 总 Slot 数 = 80
建议:
- 并行度 = 80 - 160
- Split 数量应 >= 并行度
流处理
建议并行度 = Bucket 数量
例如:
- 表有 256 个 Bucket
建议:
- 并行度 = 256
- TaskManager 数量 = 16-32(假设每个 TM 8-16 Slot)
总结
Split 的核心要点(更新 v2.0)
-
Split 是 Paimon 的读取单元,包含分区、Bucket、文件列表
-
Split 大小可配置,通过
split.target-size调整(默认 128MB) -
⚠️ Split 数量受表类型影响:
- 主键表:通常 Split 数量 ≈ Bucket 数量(因为 Key Range 约束)
- 非主键表:Split 数量 ≈ 总数据量 / splitTargetSize(可自由切分)
-
分区过滤发生在多个层级,从 Manifest 到 ManifestEntry 到数据文件
-
⚠️ 主键表 vs 非主键表的核心差异:
- 主键表:必须保证相同 Key 的文件在同一个 Split 中 Merge
- 非主键表:可以随意切分,每个文件独立读取
分配策略选择
| 场景 | 推荐模式 | 原因 |
|---|---|---|
| 数据倾斜明显 | FAIR | 负载均衡 |
| Split 大小均匀 | PREEMPTIVE | 简单高效 |
| 批处理 | FAIR | 可以预先分配 |
| 流处理 | PREEMPTIVE | 动态分配 |
| 主键表(批处理) | FAIR | 通常 Split 数量少,倾斜明显 |
| 非主键表(批处理) | PREEMPTIVE 或 FAIR | Split 数量多,两种都可以 |
性能优化技巧(区分表类型)
主键表优化:
-
合理设置 Bucket 数量:
- Bucket 数 = 期望的流式并行度
- 批处理并行度受限,Bucket 不宜过少
-
定期 Compaction:
- 减少 Level 0 文件,提高 Raw 转换率
- 提升批处理并行度
-
启用分区裁剪:
- WHERE 条件包含分区字段
-
考虑表设计:
- 如果主要是批处理查询且不需要更新
- 考虑改用非主键表
非主键表优化:
-
充分利用并行度:
- 调整
split.target-size生成更多 Splits - 提高
scan.infer-parallelism.max
- 调整
-
合并小文件:
- 定期 Compaction 减少文件数
- 避免打开文件的开销
-
Bucket 数量:
- 批处理:Bucket 数不是限制因素
- 可以设置较少的 Bucket(8-32)减少小文件
-
启用分区裁剪:
- WHERE 条件包含分区字段
如果你喜欢这篇文章,请转发、点赞。扫描下方二维码关注我们,您会收到更多优质文章推送
关注「Java源码进阶」,获取海量java,大数据,机器学习资料!
更多推荐


所有评论(0)