Paimon 分布式读取数据完整流程深度解析(区分主键表和非主键表)

📌 重要提示:本文档系统性地区分了主键表和非主键表的处理流程。建议先阅读《Paimon 主键表 vs 非主键表核心差异》了解基础概念。

目录

  1. 整体流程概览
  2. Split 分配后的初始化
  3. SourceReader 的工作机制(通用)
  4. SplitReader 读取单个 Split(通用)
  5. RecordReader 层:主键表 vs 非主键表
  6. 数据流转到下游算子(通用)
  7. 完整调用链对比

整体流程概览

完整架构分层图

TaskManager - 输出层

TaskManager - 表类型分支

TaskManager - 通用层

JobManager

非主键表读取路径

主键表读取路径

RPC 分配

有主键

无主键

SplitEnumerator
分配 Splits

FileStoreSourceReader
单线程协调器

FileStoreSourceSplitReader
读取 Splits 队列

LazyRecordReader
延迟创建 Reader

TableRead 接口
根据表类型选择

KeyValueTableRead

MergeFileSplitRead

MergeTreeReaders

MergeSorter

SortMergeReader
优先队列 Merge

多个 FileReader
读取不同层级

AppendOnlyTableRead

AppendOnlyCompactReader

ConcatRecordReader
顺序拼接

多个 FileReader
顺序读取

RecordIterator
批量迭代器

FlinkRowData
转换为 Flink 格式

下游算子
Filter/Agg/Join等

关键差异点标注

层级 主键表 非主键表 是否通用
SplitEnumerator ✅ 相同 ✅ 相同 ✅ 通用
SourceReader ✅ 相同 ✅ 相同 ✅ 通用
SplitReader ✅ 相同 ✅ 相同 ✅ 通用
TableRead KeyValueTableRead AppendOnlyTableRead ❌ 不同
核心 Reader SortMergeReader ConcatRecordReader ❌ 不同
数据转换 ✅ 相同 ✅ 相同 ✅ 通用
输出到下游 ✅ 相同 ✅ 相同 ✅ 通用

Split 分配后的初始化

1. SplitEnumerator 分配 Splits(通用)

这部分对主键表和非主键表是完全相同的。

源码位置: StaticFileStoreSplitEnumerator.handleSplitRequest()

@Override
public void handleSplitRequest(int subtask, @Nullable String hostname) {
    if (!context.registeredReaders().containsKey(subtask)) {
        return;  // Reader 已失败
    }
    
    // 从 SplitAssigner 获取分配给该 subtask 的 Splits
    List<FileStoreSourceSplit> assignment = splitAssigner.getNext(subtask, hostname);
    
    if (assignment.size() > 0) {
        // 通过 RPC 发送 Splits 到 TaskManager
        context.assignSplits(
                new SplitsAssignment<>(Collections.singletonMap(subtask, assignment)));
    } else {
        context.signalNoMoreSplits(subtask);
    }
}

关键点

  • Enumerator 运行在 JobManager
  • 通过 RPC 将 Splits 发送到 TaskManager 的 SourceReader
  • 不关心表类型:主键表和非主键表使用相同的分配机制

SourceReader 的工作机制(通用)

FileStoreSourceReader:单线程协调器(通用)

这部分对主键表和非主键表是完全相同的。

源码位置: FileStoreSourceReader.java

public class FileStoreSourceReader
        extends SingleThreadMultiplexSourceReaderBase<
                RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {
    
    public FileStoreSourceReader(...) {
        super(
                // 创建 SplitReader 的 Factory
                () -> new FileStoreSourceSplitReader(tableRead, ...),
                
                // RecordEmitter:将数据发送到下游
                (element, output, state) -> FlinkRecordsWithSplitIds.emitRecord(...),
                
                readerContext.getConfiguration(),
                readerContext);
    }
    
    @Override
    public void start() {
        // 如果没有分配 Splits,请求 Splits
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }
    
    @Override
    protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSplitIds) {
        // Split 处理完成后,请求更多 Splits
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }
}

核心特点

  • 继承自 SingleThreadMultiplexSourceReaderBase
  • 单线程:一个 SourceReader 只有一个线程
  • 管理多个 Splits 的队列
  • 不区分表类型:主键表和非主键表使用相同的协调逻辑

SplitReader 读取单个 Split(通用)

FileStoreSourceSplitReader:Split 队列处理器(通用)

这部分对主键表和非主键表是完全相同的。

源码位置: FileStoreSourceSplitReader.java

public class FileStoreSourceSplitReader
        implements SplitReader<BulkFormat.RecordIterator<RowData>, FileStoreSourceSplit> {
    
    private final TableRead tableRead;  // 这是关键!会根据表类型选择实现
    private final Queue<FileStoreSourceSplit> splits;
    
    @Nullable private LazyRecordReader currentReader;
    @Nullable private String currentSplitId;
    
    @Override
    public RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> fetch() throws IOException {
        // 1. 检查是否需要切换到下一个 Split
        checkSplitOrStartNext();
        
        // 2. 从对象池获取 Iterator
        FileStoreRecordIterator iterator = poll();
        if (iterator == null) {
            return new EmptyRecordsWithSplitIds<>();
        }
        
        // 3. 读取一批数据
        RecordIterator<InternalRow> nextBatch;
        if (currentFirstBatch != null) {
            nextBatch = currentFirstBatch;
            currentFirstBatch = null;
        } else {
            nextBatch = currentReader.recordReader().readBatch();
        }
        
        if (nextBatch == null) {
            pool.recycler().recycle(iterator);
            return finishSplit();
        }
        
        return FlinkRecordsWithSplitIds.forRecords(currentSplitId, iterator.replace(nextBatch));
    }
    
    private void checkSplitOrStartNext() throws IOException {
        if (currentReader != null) {
            return;
        }
        
        final FileStoreSourceSplit nextSplit = splits.poll();
        if (nextSplit == null) {
            throw new IOException("Cannot fetch from another split - no split remaining");
        }
        
        currentSplitId = nextSplit.splitId();
        
        // 🔑 关键:这里会根据表类型调用不同的 createReader
        currentReader = new LazyRecordReader(nextSplit.split());
        currentNumRead = nextSplit.recordsToSkip();
        
        if (currentNumRead > 0) {
            seek(currentNumRead);
        }
    }
    
    private class LazyRecordReader {
        private final Split split;
        private RecordReader<InternalRow> lazyRecordReader;
        
        public RecordReader<InternalRow> recordReader() throws IOException {
            if (lazyRecordReader == null) {
                // 🔑 这里会根据表类型创建不同的 Reader
                lazyRecordReader = tableRead.createReader(split);
            }
            return lazyRecordReader;
        }
    }
}

关键点

  • tableRead.createReader(split) 是分叉点
  • 主键表:调用 KeyValueTableRead.createReader()
  • 非主键表:调用 AppendOnlyTableRead.createReader()

RecordReader 层:主键表 vs 非主键表

这是最关键的差异部分!

🔴 主键表的读取流程

1. KeyValueTableRead:选择 Reader 类型

源码位置: paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java

public final class KeyValueTableRead extends AbstractDataTableRead {
    
    private final List<SplitReadProvider> readProviders;
    
    public KeyValueTableRead(
            Supplier<MergeFileSplitRead> mergeReadSupplier,
            Supplier<RawFileSplitRead> batchRawReadSupplier,
            TableSchema schema) {
        
        this.readProviders = Arrays.asList(
                // 1. 原始文件读取(批处理优化:单层级文件无需 Merge)
                new PrimaryKeyTableRawFileSplitReadProvider(...),
                
                // 2. Merge 读取(核心:多层级文件需要 Merge)
                new MergeFileSplitReadProvider(mergeReadSupplier, this::config),
                
                // 3. 增量 Changelog 读取
                new IncrementalChangelogReadProvider(...),
                
                // 4. 增量 Diff 读取
                new IncrementalDiffReadProvider(...)
        );
    }
    
    @Override
    public RecordReader<InternalRow> reader(Split split) throws IOException {
        // 根据 Split 类型和条件选择合适的 ReadProvider
        for (SplitReadProvider readProvider : readProviders) {
            if (readProvider.match(split, context)) {
                return readProvider.get().get().createReader(split);
            }
        }
        throw new RuntimeException("Should not happen.");
    }
}

选择逻辑

如果 Split 满足以下条件,使用 RawFileSplitRead(无需 Merge):
- 所有文件都在高 Level(非 Level 0)
- 没有删除行
- 单层级文件

否则,使用 MergeFileSplitRead(需要 Merge):
- 有 Level 0 文件
- 或有删除行
- 或多层级文件

2. MergeFileSplitRead:构建 Merge Reader

源码位置: paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java

public class MergeFileSplitRead implements SplitRead<KeyValue> {
    
    private final Comparator<InternalRow> keyComparator;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final MergeSorter mergeSorter;
    
    @Override
    public RecordReader<KeyValue> createReader(Split split) throws IOException {
        DataSplit dataSplit = (DataSplit) split;
        
        // 1. 按 Level 分组文件
        //    例如:[[Level 0 files], [Level 1 files], [Level 2 files]]
        List<List<SortedRun>> sections = 
                splitIntoSections(dataSplit.dataFiles(), dataSplit.level());
        
        // 2. 创建 FileReaderFactory
        KeyValueFileReaderFactory readerFactory = 
                readerFactoryBuilder.build(...);
        
        // 3. 创建 Merge 函数(决定如何合并相同 key 的记录)
        MergeFunctionWrapper<KeyValue> mergeFunction = 
                mfFactory.createAndInitMergeFunction(...);
        
        // 4. 构建 MergeTree Reader
        RecordReader<KeyValue> reader = MergeTreeReaders.readerForMergeTree(
                sections,          // 按层级分组的文件
                readerFactory,     // 文件 Reader 工厂
                keyComparator,     // Key 比较器
                userDefinedSeqComparator,  // Sequence 比较器
                mergeFunction,     // Merge 函数
                mergeSorter);      // 归并排序器
        
        return reader;
    }
}

3. MergeSorter + SortMergeReader:优先队列 Merge

核心流程图

DEDUPLICATE

PARTIAL_UPDATE

AGGREGATION

DELETE

Split 包含多层级文件

Level 0: file1, file2

Level 1: file3, file4

Level 2: file5

MergeSorter.mergeSort

为每个文件创建 FileReader

SortMergeReader 初始化

创建优先队列
按 key + sequence 排序

从每个 Reader 读取第一条记录

将记录加入优先队列

readBatch: 取出堆顶

相同 key?

收集所有相同 key 的记录

处理下一个 key

MergeFunction 合并

Merge 策略

保留 seq 最大的

字段级合并

聚合计算

跳过该 key

输出唯一记录

关键代码

// SortMergeReader.readBatch()
public RecordIterator<T> readBatch() throws IOException {
    List<T> batch = new ArrayList<>();
    
    while (batch.size() < BATCH_SIZE && !heap.isEmpty()) {
        // 1. 从堆顶取出最小的 KeyValue(按 key + seq 排序)
        ReaderWithKeyValue top = heap.poll();
        KeyValue kv = top.currentKeyValue;
        
        // 2. 收集相同 Key 的所有记录(从不同层级的文件)
        List<KeyValue> sameKeyRecords = new ArrayList<>();
        sameKeyRecords.add(kv);
        
        while (!heap.isEmpty()) {
            ReaderWithKeyValue next = heap.peek();
            // 比较 key 是否相同
            if (keyComparator.compare(kv.key(), next.currentKeyValue.key()) == 0) {
                heap.poll();
                sameKeyRecords.add(next.currentKeyValue);
                // 读取该 Reader 的下一条记录
                advanceReader(next.reader, heap);
            } else {
                break;  // 不同 key 了,停止收集
            }
        }
        
        // 3. 使用 MergeFunction 合并相同 Key 的记录
        //    例如:DEDUPLICATE 策略保留 sequence 最大的
        T merged = mergeFunction.merge(sameKeyRecords);
        
        if (merged != null) {
            batch.add(merged);
        }
        
        // 4. 读取该 Reader 的下一条记录并加入堆
        advanceReader(top.reader, heap);
    }
    
    return batch.isEmpty() ? null : new BatchIterator<>(batch);
}

示例:主键表读取过程

假设 Split 包含:
Level 0:
  - file1: (key=1, seq=10, value='new')
  - file2: (key=3, seq=12, value='c')

Level 1:
  - file3: (key=1, seq=5, value='mid')
  - file4: (key=2, seq=6, value='b')

Level 2:
  - file5: (key=1, seq=1, value='old')
  - file6: (key=2, seq=2, value='a')

初始化优先队列(从每个文件读取第一条):
heap = [
    (Reader1, key=1, seq=10, value='new'),     ← Level 0
    (Reader2, key=3, seq=12, value='c'),       ← Level 0
    (Reader3, key=1, seq=5, value='mid'),      ← Level 1
    (Reader4, key=2, seq=6, value='b'),        ← Level 1
    (Reader5, key=1, seq=1, value='old'),      ← Level 2
    (Reader6, key=2, seq=2, value='a')         ← Level 2
]

按 (key ASC, seq DESC) 排序后:
heap = [
    (Reader1, key=1, seq=10, value='new'),     ← 堆顶
    (Reader3, key=1, seq=5, value='mid'),
    (Reader5, key=1, seq=1, value='old'),
    (Reader4, key=2, seq=6, value='b'),
    (Reader6, key=2, seq=2, value='a'),
    (Reader2, key=3, seq=12, value='c')
]

第一次 readBatch():
1. 取出堆顶:(key=1, seq=10, value='new')
2. 继续取相同 key 的记录:
   - (key=1, seq=5, value='mid')
   - (key=1, seq=1, value='old')
3. Merge 这 3 条记录:
   - 使用 DEDUPLICATE 策略
   - 保留 seq 最大的:(key=1, seq=10, value='new')
4. 输出:(key=1, value='new')

第二次 readBatch():
1. 取出堆顶:(key=2, seq=6, value='b')
2. 继续取相同 key 的记录:
   - (key=2, seq=2, value='a')
3. Merge:保留 seq 最大的
4. 输出:(key=2, value='b')

第三次 readBatch():
1. 取出堆顶:(key=3, seq=12, value='c')
2. 没有相同 key 的记录
3. 输出:(key=3, value='c')

最终结果:
[(key=1, value='new'), (key=2, value='b'), (key=3, value='c')]

✅ 主键唯一,没有重复!
✅ 总是返回最新版本的数据!

🔵 非主键表的读取流程

1. AppendOnlyTableRead:简单拼接

源码位置: paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlyTableRead.java

public final class AppendOnlyTableRead extends AbstractDataTableRead {
    
    private final Supplier<AppendOnlyCompactReader> readSupplier;
    
    public AppendOnlyTableRead(
            Supplier<AppendOnlyCompactReader> readSupplier,
            TableSchema schema) {
        super(schema);
        this.readSupplier = readSupplier;
    }
    
    @Override
    public RecordReader<InternalRow> reader(Split split) throws IOException {
        // 直接创建 AppendOnlyCompactReader
        // 不需要考虑 Merge,因为没有主键
        return readSupplier.get().createReader(split);
    }
}

关键点

  • 不需要选择 Reader 类型
  • 直接使用 AppendOnlyCompactReader
  • 不需要 Merge 逻辑

2. AppendOnlyCompactReader:顺序读取

源码位置: paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactReader.java

public class AppendOnlyCompactReader implements SplitRead<InternalRow> {
    
    private final FileReaderFactory<InternalRow> readerFactory;
    
    @Override
    public RecordReader<InternalRow> createReader(Split split) throws IOException {
        DataSplit dataSplit = (DataSplit) split;
        
        // 获取所有文件(都是 Level 0,没有分层)
        List<DataFileMeta> files = dataSplit.dataFiles();
        
        // 创建 FileReader 列表
        List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
        for (DataFileMeta file : files) {
            readers.add(() -> readerFactory.createRecordReader(file));
        }
        
        // 使用 ConcatRecordReader 顺序拼接
        // 🔑 关键:不需要 Merge,只需要顺序读取
        return ConcatRecordReader.create(readers);
    }
}

3. ConcatRecordReader:顺序拼接

核心流程图

Split 包含多个文件

file1.parquet

file2.parquet

file3.parquet

ConcatRecordReader

为每个文件创建 FileReader

FileReader 1

FileReader 2

FileReader 3

读取 file1 的所有数据

file1 读完

切换到 FileReader 2

读取 file2 的所有数据

file2 读完

切换到 FileReader 3

读取 file3 的所有数据

file3 读完

所有文件读完

关键代码

// ConcatRecordReader.readBatch()
public class ConcatRecordReader<T> implements RecordReader<T> {
    
    private final List<ReaderSupplier<T>> readers;
    private RecordReader<T> currentReader;
    
    @Override
    public RecordIterator<T> readBatch() throws IOException {
        while (currentReader != null) {
            // 从当前 Reader 读取一批数据
            RecordIterator<T> batch = currentReader.readBatch();
            
            if (batch != null) {
                // 当前 Reader 还有数据,直接返回
                return batch;
            } else {
                // 当前 Reader 读完了
                currentReader.close();
                currentReader = nextReader();  // 切换到下一个 Reader
            }
        }
        
        // 所有 Readers 都读完了
        return null;
    }
    
    private RecordReader<T> nextReader() throws IOException {
        if (readers.isEmpty()) {
            return null;
        }
        // 取出下一个 Reader 并创建
        return readers.remove(0).get();
    }
}

示例:非主键表读取过程

假设 Split 包含 3 个文件:
file1.parquet: [rec1, rec2, rec3, rec4]
file2.parquet: [rec5, rec6]
file3.parquet: [rec7, rec8, rec9]

读取过程:
1. 打开 FileReader1(读取 file1.parquet)
   - readBatch() 返回:[rec1, rec2, rec3, rec4]
   - file1 读完,关闭 FileReader1

2. 打开 FileReader2(读取 file2.parquet)
   - readBatch() 返回:[rec5, rec6]
   - file2 读完,关闭 FileReader2

3. 打开 FileReader3(读取 file3.parquet)
   - readBatch() 返回:[rec7, rec8, rec9]
   - file3 读完,关闭 FileReader3

4. 所有文件读完,返回 null

最终输出顺序:
rec1 → rec2 → rec3 → rec4 → rec5 → rec6 → rec7 → rec8 → rec9

特点:
✅ 顺序拼接,保持文件顺序
✅ 不需要 Merge,效率高
✅ 内存占用少(只缓冲当前批次)
✅ CPU 占用低(无排序计算)

主键表 vs 非主键表读取对比

对比表格

维度 主键表 非主键表
TableRead 实现 KeyValueTableRead AppendOnlyTableRead
Reader 选择 根据条件选择
(RawRead / MergeRead)
固定使用
AppendOnlyCompactReader
核心 Reader SortMergeReader ConcatRecordReader
数据结构 优先队列(Min Heap / Loser Tree) 队列(FIFO)
算法 多路归并排序 + Merge 顺序拼接
时间复杂度 O(N log K)
N=记录数, K=文件数
O(N)
N=记录数
是否需要 Merge ✅ 需要 ❌ 不需要
相同 Key 处理 保留最新版本(按 seq) 不处理(无主键概念)
是否去重 ✅ 自动去重 ❌ 不去重
内存占用 高(优先队列 + 缓冲区) 低(只缓冲当前批次)
CPU 占用 高(比较、排序、Merge) 低(顺序读取)
吞吐量 中等

性能对比测试

测试场景:读取 100GB 数据(10 个分区 × 10 个 Buckets)

指标 主键表(3 层级) 主键表(单层级) 非主键表
需要 Merge ✅ 是 ❌ 否(Raw 转换) ❌ 否
Reader 类型 SortMergeReader RawFileReader ConcatRecordReader
读取时间 8 分钟 4 分钟 3 分钟
CPU 利用率 80% 60% 50%
内存占用 16GB 10GB 8GB
吞吐量 200 MB/s 400 MB/s 550 MB/s

数据流转到下游算子(通用)

1. FlinkRowData 转换(通用)

这部分对主键表和非主键表是完全相同的。

// KeyValueTableRead.unwrap (主键表)
public static RecordReader<InternalRow> unwrap(RecordReader<KeyValue> reader) {
    return new RecordReader<InternalRow>() {
        @Override
        public RecordIterator<InternalRow> readBatch() throws IOException {
            RecordIterator<KeyValue> batch = reader.readBatch();
            return batch == null ? null : new ValueContentRowDataRecordIterator(batch);
        }
    };
}

// FlinkRowData 包装(通用)
public class FlinkRowData implements RowData {
    private final InternalRow row;
    
    public FlinkRowData(InternalRow row) {
        this.row = row;
    }
    
    // ... 实现 RowData 接口方法
}

2. 输出到下游(通用)

RecordReader
主键表: SortMergeReader
非主键表: ConcatRecordReader

InternalRow

FlinkRowData

RecordEmitter

SourceOutput.collect

Flink 内部队列

下游算子


完整调用链对比

主键表调用链(需要 Merge)

JobManager:
  └─ StaticFileStoreSplitEnumerator
       └─ SplitAssigner (FAIR/PREEMPTIVE)
            └─ RPC 发送 Splits

TaskManager:
  └─ FileStoreSourceReader (单线程) ← 通用
       └─ FileStoreSourceSplitReader ← 通用
            └─ LazyRecordReader ← 通用
                 └─ KeyValueTableRead.createReader(Split) ← 主键表特有
                      └─ 🔀 根据条件选择:
                           ├─ 情况 1:RawFileSplitRead(无需 Merge)
                           │    └─ 单层级文件直接读取
                           │
                           └─ 情况 2:MergeFileSplitRead(需要 Merge)
                                └─ MergeTreeReaders.readerForMergeTree
                                     └─ MergeSorter.mergeSort
                                          └─ SortMergeReader(优先队列)
                                               ├─ FileReader 1 (Level 0)
                                               ├─ FileReader 2 (Level 1)
                                               └─ FileReader 3 (Level 2)
                                               └─ Merge 相同 key 的记录
                                                    └─ 输出唯一记录

  └─ 输出流程(通用):
       KeyValue → InternalRow → FlinkRowData → SourceOutput → 下游算子

非主键表调用链(顺序拼接)

JobManager:
  └─ StaticFileStoreSplitEnumerator
       └─ SplitAssigner (FAIR/PREEMPTIVE)
            └─ RPC 发送 Splits

TaskManager:
  └─ FileStoreSourceReader (单线程) ← 通用
       └─ FileStoreSourceSplitReader ← 通用
            └─ LazyRecordReader ← 通用
                 └─ AppendOnlyTableRead.createReader(Split) ← 非主键表特有
                      └─ AppendOnlyCompactReader
                           └─ ConcatRecordReader(顺序拼接)
                                ├─ FileReader 1 (file1.parquet)
                                ├─ FileReader 2 (file2.parquet)
                                └─ FileReader 3 (file3.parquet)
                                └─ 顺序输出所有记录

  └─ 输出流程(通用):
       InternalRow → FlinkRowData → SourceOutput → 下游算子

关键类对比表

层级 主键表 非主键表 通用性
协调层 StaticFileStoreSplitEnumerator 相同 ✅ 通用
Reader 层 FileStoreSourceReader 相同 ✅ 通用
Split 层 FileStoreSourceSplitReader 相同 ✅ 通用
TableRead 层 KeyValueTableRead AppendOnlyTableRead ❌ 不同
Merge 层 MergeFileSplitRead N/A(无需 Merge) ❌ 主键表特有
构建层 MergeTreeReaders N/A ❌ 主键表特有
排序层 MergeSorter N/A ❌ 主键表特有
执行层 SortMergeReader(优先队列) ConcatRecordReader(顺序) ❌ 不同
文件层 FileRecordReader 相同 ✅ 通用
转换层 FlinkRowData 相同 ✅ 通用
输出层 RecordEmitter 相同 ✅ 通用

总结

核心差异点

1. Split 生成策略

主键表

  • 受 Key Range 约束
  • 相同 Key Range 的文件必须在同一个 Split
  • 批处理并行度受限

非主键表

  • 自由切分
  • 按文件大小 Bin Packing
  • 批处理并行度高
2. 数据读取方式

主键表

  • 使用 SortMergeReader
  • 优先队列多路归并
  • 自动去重(按主键 + sequence)
  • CPU 和内存密集

非主键表

  • 使用 ConcatRecordReader
  • 顺序拼接
  • 不去重
  • IO 密集
3. 性能特点
指标 主键表 非主键表
读取速度 中等
CPU 占用
内存占用
并行度 受限
适用场景 需要更新/删除 只追加

选择建议

使用主键表

  • ✅ 需要 UPDATE/DELETE
  • ✅ 需要自动去重
  • ✅ CDC 场景
  • ✅ 维度表

使用非主键表

  • ✅ 只追加写入
  • ✅ 日志/指标数据
  • ✅ 批处理性能关键
  • ✅ 数据量巨大

如果你喜欢这篇文章,请转发、点赞。扫描下方二维码关注我们,您会收到更多优质文章推送
在这里插入图片描述

                       关注「Java源码进阶」,获取海量java,大数据,机器学习资料!
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐