Paimon 是否能够多个任务同时写同一个桶(Sequence field 、自定义比较顺序、partial update 和 多流join 详解)
本文深入分析了Apache Paimon中多流Join场景下的并发写入机制。通过配置last_non_null_value聚合函数,多个Flink任务可以安全地并发写入同一Paimon表的不同字段。关键点包括:1) 各任务写入的字段需严格不重叠;2) 独立sequenceNumber不会影响最终合并结果;3) PartialUpdateMergeFunction通过字段级比较器确保正确合并。
多 Flink 任务写入一个桶能不能实现多流join
场景设定
-
任务A:只写入
col_a
和col_b
字段。 -
任务B:只写入
col_c
和col_d
字段。 -
并发写入:两个任务并发写入,它们的
sequenceNumber
是完全独立的,因此在全局看来是乱序的。 -
聚合配置:所有值字段(
col_a
到col_d
)都配置了last_non_null_value
聚合。
只要每个 Flink 任务只写自己的字段,两个任务的字段不互相干扰,即使 sequenceNumber
乱序,last_non_null_value
也能正确工作。
通过配置设置agg函数
public class CoreOptions implements Serializable {
public static final String FIELDS_PREFIX = "fields";
public static final String FIELDS_SEPARATOR = ",";
public static final String AGG_FUNCTION = "aggregate-function";
public String fieldAggFunc(String fieldName) {
return options.get(
key(FIELDS_PREFIX + "." + fieldName + "." + AGG_FUNCTION)
.stringType()
.noDefaultValue());
}
PartialUpdateMergeFunction 会调用获得名字
@Nullable
private String getAggFuncName(CoreOptions options, String fieldName) {
String aggFunc = options.fieldAggFunc(fieldName);
return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
}
推演过程
之后分析了PartialUpdateMergeFunction 的实现
我们来推演一下为什么能正确工作:
假设对于主键 pk=1
,发生了以下事件:
-
T1时刻:任务A 写入
{pk:1, col_a:'A1', col_b:null, col_c:null, col_d:null}
。Paimon 分配
sequenceNumber = 101
。 -
T2时刻:任务B 写入
{pk:1, col_a:null, col_b:null, col_c:'C1', col_d:null}
。Paimon 分配
sequenceNumber = 201
。 -
T3时刻:任务A 写入
{pk:1, col_a:'A2', col_b:null, col_c:null, col_d:null}
。Paimon 分配
sequenceNumber = 102
。
现在,这三条记录在 Compaction 时相遇了。假设没有配置 sequence.field
,SortMergeReader
会根据 sequenceNumber
对它们进行排序:
-
记录1:
{..., seq: 101}
-
记录3:
{..., seq: 102}
-
记录2:
{..., seq: 201}
PartialUpdateMergeFunction
会按 1 → 3 → 2 的顺序处理它们:
初始化
row = {col_a:null, col_b:null, col_c:null, col_d:null}
处理记录1 (seq=101)
-
col_a
更新为'A1'
。 -
其他字段都是
null
,last_non_null_value
忽略它们。 -
当前 row 状态:
{col_a:'A1', col_b:null, col_c:null, col_d:null}
处理记录3 (seq=102)
-
col_a
更新为'A2'
。 -
其他字段都是
null
,忽略。 -
当前 row 状态:
{col_a:'A2', col_b:null, col_c:null, col_d:null}
处理记录2 (seq=201)
-
col_a
是null
,忽略。 -
col_c
更新为'C1'
。 -
其他字段是
null
,忽略。 -
当前 row 状态:
{col_a:'A2', col_b:null, col_c:'C1', col_d:null}
最终合并结果
{col_a:'A2', col_b:null, col_c:'C1', col_d:null}
为什么能行?
关键在于以下两点:
-
字段不重叠
任务A 和 任务B 操作的是完全不同的字段集合。任务A 的写入对
col_c
和col_d
来说永远是null
,反之亦然。 -
last_non_null_value
的幂等性该聚合函数的逻辑是“只要新来的不是
null
就覆盖”。由于字段不重叠,任务A 的sequenceNumber
乱序只会影响col_a
和col_b
的合并顺序,但不会干扰到col_c
和col_d
。同理,任务B 的sequenceNumber
也不会影响到col_a
和col_b
。
快照提交的原子保证
详细分析见:
通过重命名的原子性;或者Hive本身锁;MYSQL唯一主键
在HDFS中,重命名(Rename)操作本身是一个原子性的元数据操作。对于NameNode来说,它只是改变一个inode的指向。这个操作要么完全成功,使得最终文件立即可见且包含完整数据;要么完全失败,最终文件完全不存在。客户端永远不会看到一个只写了一半的、损坏的最终文件。
重要警告
这种模式能够成功,强依赖于“字段不重叠”这个假设。
如果任务A 和 任务B 都可能写入 col_a
,那么 sequenceNumber
的乱序就会导致 col_a
的最终值变得不确定,从而产生数据不一致。
在这种情况下,就必须引入
sequence.field
来提供一个全局统一的业务时钟,以确保无论哪个任务写入,都能根据业务时间来决定胜负。
SequenceNumber(序列号)
其本身是 MergeTreeWriter的一个long字段产生的
定义:SequenceNumber
是KeyValue
类中的一个长整型字段,用于标识记录的时间顺序或版本信息。
作用:
-
在
PartialUpdateMergeFunction.add()
方法中,通过kv.sequenceNumber()
获取 -
当没有配置sequence group时,作为主要的排序依据
SeqComparators(序列比较器)
定义:SeqComparators
是FieldsComparator
接口的实现集合,存储在fieldSeqComparators
字段中。
作用:
-
实现自定义的字段比较逻辑,替代默认的SequenceNumber比较
-
支持基于业务字段(如时间戳、版本号)的排序
-
在
updateWithSequenceGroup
和retractWithSequenceGroup
方法中使用
核心接口:
public interface FieldsComparator extends Comparator<InternalRow> {
int[] compareFields(); // 返回参与比较的字段索引
}
主要实现:UserDefinedSeqComparator
,通过代码生成实现高效的字段比较。
Sequence field(序列字段)
定义:Sequence field
是用于比较的字段,通常包含时间戳、版本号等具有顺序意义的业务字段。
配置方式:
-
通过
fields.<field_name>.sequence-group
配置 -
在Factory类构造函数中解析配置:
可以看到解析格式是:fields.f1,f2...,fn.sequence-group
public static final String FIELDS_PREFIX = "fields";
public static final String FIELDS_SEPARATOR = ",";
public static final String SEQUENCE_GROUP = "sequence-group";
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
int[] sequenceFields =
Arrays.stream(
k.substring(
FIELDS_PREFIX.length() + 1,
k.length()
- SEQUENCE_GROUP.length()
- 1)
.split(FIELDS_SEPARATOR))
.mapToInt(fieldName -> requireField(fieldName, fieldNames))
.toArray();
作用:
-
作为比较的依据字段
-
可以是单个字段或多个字段的组合
-
在sequence group中作为排序的基准
Sequence group(序列组)
定义:Sequence group
是一组字段的集合,这些字段在更新时需要一起处理,具有相同的序列比较逻辑。
配置语法:
fields.<sequence_field1>,<sequence_field2>.sequence-group=<target_field1>,<target_field2>
配置示例:
fields.update_time.sequence-group=name,age,address
PartialUpdateMergeFunction里的设置,可以看到对于每个等号右边的字段设置比较器就是sequence field
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
int[] sequenceFields = ...;
Supplier<FieldsComparator> userDefinedSeqComparator =
() -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);
Arrays.stream(v.split(FIELDS_SEPARATOR))
.map(fieldName -> requireField(fieldName, fieldNames))
.forEach(
field -> {
if (fieldSeqComparators.containsKey(field)) {
throw new IllegalArgumentException(
String.format(
"Field %s is defined repeatedly by multiple groups: %s",
fieldNames.get(field), k));
}
fieldSeqComparators.put(field, userDefinedSeqComparator);
});
keyComparator
vs FieldsComparator
(userDefinedSeqComparator
)
这两个比较器在 MergeTreeWriter
中扮演着不同但互补的角色,共同决定了数据的最终顺序和合并结果。
-
keyComparator
(主键比较器)-
作用:它的核心作用是对数据记录按主键(Primary Key)进行排序。在 Paimon 的 Merge-Tree (LSM) 结构中,所有的数据文件(SSTable)内部都必须是按主 key 有序的。
-
使用场景:当内存中的
WriteBuffer
(即SortBufferWriteBuffer
) 写满并需要刷盘时,它会使用keyComparator
对缓冲区内的所有KeyValue
记录进行排序,然后将排序后的结果写入一个新的 Level-0 文件。这个排序是后续所有 Compaction (归并) 操作高效进行的基础。 -
总结:
keyComparator
决定了数据在文件中的物理存储顺序,是 LSM-Tree 结构的核心要求。
-
-
userDefinedSeqComparator
(FieldsComparator
) (用户定义序列比较器)-
作用:它是一个可选的、用于处理主键冲突的仲裁比较器。当用户配置了
sequence.field
选项时,Paimon 会根据该字段生成一个FieldsComparator
。它的作用是:当两条或多条记录具有相同的主键时,使用这个比较器来判断哪条记录是“最新”的。
-
UserDefinedSeqComparator创建
主要是KeyValueFileStoreWrite负责,对于写入
return new MergeTreeWriter(
options.writeBufferSpillable(),
options.writeBufferSpillDiskSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
ioManager,
compactManager,
restoredMaxSeqNumber,
keyComparator,
mfFactory.create(),
writerFactory,
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
UserDefinedSeqComparator.create(valueType, options));
使用了工具类UserDefinedSeqComparator
@Nullable
public static UserDefinedSeqComparator create(RowType rowType, CoreOptions options) {
return create(
rowType, options.sequenceField(), options.sequenceFieldSortOrderIsAscending());
}
对于compaction也是一样的
@Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get();
CompactRewriter rewriter =
createRewriter(
partition,
bucket,
keyComparator,
userDefinedSeqComparator,
levels,
dvMaintainer);
这个supply是KeyValueFileStore传递来的,但实际是一样的。
return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
schema,
commitUser,
partitionType,
keyType,
valueType,
keyComparatorSupplier,
() -> UserDefinedSeqComparator.create(valueType, options),
logDedupEqualSupplier,
mfFactory,
Compaction 比较次序
在 Compaction 的核心组件 SortMergeReaderWithMinHeap
中,当合并来自不同文件的记录时,其排序逻辑(定义在 PriorityQueue
的构造函数中)非常清晰:
// ...existing code...
(e1, e2) -> {
// 1. 首先,比较主键 (Primary Key)
int result = userKeyComparator.compare(e1.kv.key(), e2.kv.key());
if (result != 0) {
return result;
}
// 2. 如果主键相同,则比较 sequence.field
if (userDefinedSeqComparator != null) {
result =
userDefinedSeqComparator.compare(
e1.kv.value(),
e2.kv.value());
if (result != 0) {
return result;
}
}
// 3. 如果主键和 sequence.field 都相同,才最后比较 sequenceNumber
return Long.compare(e1.kv.sequenceNumber(), e2.kv.sequenceNumber());
}
// ...existing code...
正确的比较优先级链
-
主键 (
userKeyComparator
)这是最高优先级的排序依据,决定了数据的大致顺序。
-
序列字段 (
userDefinedSeqComparator
)当主键相同时,它作为第一道平局决胜规则 (Tie-breaker)。Paimon 会根据
sequence.field
的值来判断哪条记录在业务逻辑上“更领先”。 -
序列号 (
sequenceNumber
)只有在主键和
sequence.field
都完全相同的情况下,它才作为最终的平局决胜规则。sequenceNumber
更大的记录被认为是“更新”的。
结论:
sequence.field
的作用域贯穿了从内存写入到跨文件合并的全过程,是处理主键冲突时的首要业务逻辑判断依据;sequenceNumber
则是保证系统层面最终一致性的最后防线。
在交给merge function的时候,一个批次判断 只是使用主键
while (!minHeap.isEmpty()) {
Element element = minHeap.peek();
if (userKeyComparator.compare(key, element.kv.key()) != 0) {
break;
}
minHeap.poll();
mergeFunctionWrapper.add(element.kv);
polled.add(element);
}
SortBufferWriteBuffer
比较次序
SortBufferWriteBuffer
是 Apache Paimon 存储引擎中一个至关重要的组件,它实现了 WriteBuffer
接口,是 LSM-Tree (Log-Structured Merge-Tree) 架构中内存缓冲区的核心实现。它的主要职责是在数据写入磁盘前,在内存中对写入的记录进行缓冲、排序和预合并。
它的核心工作流是:
- 接收数据:接收上游算子发来的
KeyValue
数据(包含主键、序列号、行类型和行数据)。 - 内存排序:将接收到的数据在内存中按照指定的排序规则进行排序。
- 溢写 (Spillable):当内存使用达到阈值时,能将排序好的数据溢写到磁盘临时文件中,以防止内存溢出(OOM),并支持处理远超内存大小的数据量。
- 提供有序迭代器:当内存缓冲区需要被刷盘(Flush)时,它能提供一个全局有序的迭代器,该迭代器会合并内存中和所有已溢写到磁盘的临时文件中的数据。
- 预合并 (Pre-aggregation/Merging):在数据刷盘前,通过
forEach
方法对主键相同的数据进行合并(例如,对于主键表,只保留序列号最大的那条记录),从而减少写入磁盘的数据量。
SortBufferWriteBuffer
的构造函数非常关键,它完成了所有排序和缓冲逻辑的准备工作。
// ... existing code ...
public SortBufferWriteBuffer(
RowType keyType,
RowType valueType,
@Nullable FieldsComparator userDefinedSeqComparator,
MemorySegmentPool memoryPool,
boolean spillable,
MemorySize maxDiskSize,
int sortMaxFan,
CompressOptions compression,
IOManager ioManager) {
this.keyType = keyType;
this.valueType = valueType;
this.serializer = new KeyValueSerializer(keyType, valueType);
// 1. 确定排序字段
// key fields
IntStream sortFields = IntStream.range(0, keyType.getFieldCount());
// user define sequence fields
if (userDefinedSeqComparator != null) {
IntStream udsFields =
IntStream.of(userDefinedSeqComparator.compareFields())
.map(operand -> operand + keyType.getFieldCount() + 2);
sortFields = IntStream.concat(sortFields, udsFields);
}
// sequence field
sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));
int[] sortFieldArray = sortFields.toArray();
// 2. 构造完整的行类型
List<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());
fieldTypes.add(new BigIntType(false));
fieldTypes.add(new TinyIntType(false));
fieldTypes.addAll(valueType.getFieldTypes());
// 3. 通过代码生成创建比较器和正规化键计算器
NormalizedKeyComputer normalizedKeyComputer =
CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);
RecordComparator keyComparator =
CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray, true);
if (memoryPool.freePages() < 3) {
// ... existing code ...
}
InternalRowSerializer serializer =
InternalSerializers.create(KeyValue.schema(keyType, valueType));
BinaryInMemorySortBuffer inMemorySortBuffer =
BinaryInMemorySortBuffer.createBuffer(
normalizedKeyComputer, serializer, keyComparator, memoryPool);
// 4. 根据是否支持溢写,选择不同的 SortBuffer 实现
this.buffer =
ioManager != null && spillable
? new BinaryExternalSortBuffer(
new BinaryRowSerializer(serializer.getArity()),
keyComparator,
memoryPool.pageSize(),
inMemorySortBuffer,
ioManager,
sortMaxFan,
compression,
maxDiskSize)
: inMemorySortBuffer;
}
// ... existing code ...
分析要点:
- 排序字段的确定:排序规则是保证数据正确合并的关键。排序优先级为:主键字段 > 用户定义的序列号字段 (可选) > 内部序列号 (sequenceNumber)。这样可以确保相同主键的数据被排在一起,并且可以根据序列号确定其先后顺序。
这里加入的是索引位置,不是实际key
Paimon 作为一种 Merge-Tree 存储,必须保证对相同主键(Primary Key)的数据进行合并(Merge)时,顺序是稳定且正确的。因此,排序规则是分层次的:
按主键排序:这是最优先的规则,确保相同主键的数据被排在一起,以便后续进行合并。
按用户定义的
sequence.field
排序(如果存在):当多条记录主键相同时,Paimon 允许用户指定一个或多个业务字段(如update_time
)来决定合并的顺序。值越大,越靠后合并。按内部
sequenceNumber
排序:当主键和用户定义的sequence.field
都相同时,就需要一个最终的规则来决胜负。Paimon 会为每条写入的数据分配一个单调递增的内部sequenceNumber
。这个数字代表了数据的写入顺序。通过对这个字段排序,可以保证即使所有业务字段都相同,合并顺序依然是稳定和可预测的(后写入的数据后合并)。
- 代码生成 (Code Generation):Paimon 为了极致的性能,没有使用基于反射或解释性的比较方式,而是通过
CodeGenUtils
动态生成了NormalizedKeyComputer
和RecordComparator
的字节码。RecordComparator
: 用于精确比较两条记录。NormalizedKeyComputer
: 用于为每条记录生成一个定长的二进制前缀(Normalized Key)。这样在排序时,可以直接比较这个二进制前缀,速度极快,只有在前缀相同时才需要调用RecordComparator
进行完整比较。
- Buffer 的选择与初始化:
- 首先,它会创建一个
BinaryInMemorySortBuffer
,这是一个纯内存的排序缓冲区。 - 然后,它会判断
spillable
参数。如果为true
并且提供了IOManager
,它会将内存缓冲区包装成一个BinaryExternalSortBuffer
。这个外部排序缓冲区具备了当内存不足时,将数据溢写到磁盘的能力,从而支持大数据量的写入。这是一种典型的“外部排序”算法实现。
- 首先,它会创建一个
PartialUpdateMergeFunction
这是在 Paimon 中实现 partial-update
(部分列更新) 合并引擎的核心类。它的主要职责是在 Compaction 过程中,将具有相同主键的多条记录(KeyValue
)合并成最终的一条记录。
PartialUpdateMergeFunction
实现了 MergeFunction<KeyValue>
接口。在 Paimon 的 LSM-Tree 存储模型中,当执行 Compaction 操作需要合并多个数据文件时,Paimon 会读取具有相同主键的一组 KeyValue
数据,然后交由一个 MergeFunction
实例来处理,计算出最终的结果。
PartialUpdateMergeFunction
的合并逻辑是:对于相同主键的记录,不断地用新的非空字段值去覆盖旧的字段值,最终得到一个“打宽”后的完整记录。 它还支持更复杂的场景,如基于序列号的更新、字段聚合和多种删除策略。
// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/**
* A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
* non-null fields on merge.
*/
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...
核心成员变量
这些变量定义了 PartialUpdateMergeFunction
的状态和配置,决定了其合并行为。
// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
public static final String SEQUENCE_GROUP = "sequence-group";
private final InternalRow.FieldGetter[] getters; // 用于从 InternalRow 中获取字段值
private final boolean ignoreDelete; // 是否忽略删除记录
private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列号比较器,用于 sequence-group
private final boolean fieldSequenceEnabled; // 是否启用了 sequence-group
private final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器
private final boolean removeRecordOnDelete; // 收到 DELETE 记录时是否删除整行
private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 记录时,根据 sequence-group 删除部分列
private final boolean[] nullables; // 记录每个字段是否可为 null
private InternalRow currentKey; // 当前处理的主键
private long latestSequenceNumber; // 见过的最新序列号
private GenericRow row; // 合并过程中的结果行
private KeyValue reused; // 用于复用的 KeyValue 对象,避免重复创建
private boolean currentDeleteRow; // 标记当前行最终是否应被删除
private boolean notNullColumnFilled;
/**
* If the first value is retract, and no insert record is received, the row kind should be
* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no
* RowKind.INSERT value is received)
*/
private boolean meetInsert; // 是否遇到过 INSERT 类型的记录
// ... existing code ...
- 配置类变量 (
ignoreDelete
,fieldSeqComparators
,fieldAggregators
等) 通常在Factory
中被初始化,它们在整个合并过程中保持不变。 - 状态类变量 (
currentKey
,row
,latestSequenceNumber
等) 会在每次reset()
时被重置,用于处理新的一组具有相同主键的记录。
add(KeyValue kv)
:合并逻辑的核心
这是最重要的方法,定义了单条 KeyValue
是如何被合并到当前结果 row
中的。
// ... existing code ...
@Override
public void add(KeyValue kv) {
// refresh key object to avoid reference overwritten
currentKey = kv.key();
currentDeleteRow = false;
if (kv.valueKind().isRetract()) {
if (!notNullColumnFilled) {
initRow(row, kv.value());
notNullColumnFilled = true;
}
// ... 删除逻辑处理 ...
// ... existing code ...
String msg =
String.join(
"\n",
"By default, Partial update can not accept delete records,"
+ " you can choose one of the following solutions:",
"1. Configure 'ignore-delete' to ignore delete records.",
"2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.",
"3. Configure 'sequence-group's to retract partial columns.");
throw new IllegalArgumentException(msg);
}
latestSequenceNumber = kv.sequenceNumber();
if (fieldSeqComparators.isEmpty()) {
updateNonNullFields(kv);
} else {
updateWithSequenceGroup(kv);
}
meetInsert = true;
notNullColumnFilled = true;
}
// ... existing code ...
它的逻辑可以分为两大块:
A. 处理 retract
消息 (RowKind 为 DELETE
或 UPDATE_BEFORE
)
partial-update
默认不接受删除记录。如果收到了,行为由配置决定:
ignoreDelete = true
: 直接忽略这条删除记录,返回。removeRecordOnDelete = true
: 当收到DELETE
类型的记录时,将currentDeleteRow
标记为true
,并清空当前row
。这意味着最终这条主键对应的记录将被删除。fieldSequenceEnabled = true
: 启用了sequence-group
。这是最复杂的逻辑,它会调用retractWithSequenceGroup(kv)
。这个方法会根据序列号比较结果,来决定是否要“撤销”某些字段的更新(通常是将其设置为null
或调用聚合器的retract
方法)。- 默认行为: 如果以上配置都没有,则直接抛出
IllegalArgumentException
异常,提示用户如何正确配置。
B. 处理 add
消息 (RowKind 为 INSERT
或 UPDATE_AFTER
)
这是主要的更新逻辑:
-
简单更新 (
updateNonNullFields
): 如果没有配置sequence-group
(fieldSeqComparators
为空),则执行最简单的部分列更新。遍历新纪录kv
的所有字段,只要字段值不为null
,就用它来更新row
中对应位置的值。// ... existing code ... private void updateNonNullFields(KeyValue kv) { for (int i = 0; i < getters.length; i++) { Object field = getters[i].getFieldOrNull(kv.value()); if (field != null) { row.setField(i, field); } else { // ... existing code ...
-
带序列号的更新 (
updateWithSequenceGroup
): 如果配置了sequence-group
,逻辑会更复杂。对于每个字段:- 如果该字段不属于任何
sequence-group
,则行为和简单更新类似(但会考虑聚合)。 - 如果该字段属于某个
sequence-group
,则会使用FieldsComparator
比较新记录kv
和当前结果row
的序列号字段。只有当新记录的序列号 大于或等于 当前结果的序列号时,才会用新记录的字段值去更新row
中由该sequence-group
控制的所有字段。这保证了数据的更新顺序。
- 如果该字段不属于任何
updateWithSequenceGroup
这个方法是 partial-update
合并引擎处理带有 sequence-group
配置时的核心逻辑。当用户在表属性中定义了 fields.<seq_field>.sequence-group = <data_field1>,<data_field2>
这样的规则时,数据合并就不再是简单的“非空值覆盖”,而是需要根据 seq_field
的值来判断是否应该更新 data_field1
和 data_field2
。这解决了多流更新时可能出现的数据乱序覆盖问题。
updateWithSequenceGroup
方法通过引入FieldsComparator
,将简单的字段更新升级为基于序列号的条件更新。它精确地控制了哪些字段在何时可以被更新,从而保证了在多流并发写入场景下,即使数据存在一定程度的乱序,最终也能合并成正确的结果。这是 Paimonpartial-update
模式能够处理复杂更新场景的关键所在。
// ... existing code ...
private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
- 输入:
KeyValue kv
,代表一条新到达的、具有相同主键的记录。 - 目标: 遍历这条新记录
kv
的所有字段,并根据sequence-group
的规则,决定是否用kv
中的字段值来更新当前正在合并的结果行this.row
。
该方法的核心是一个 for
循环,它遍历了表中的每一个字段。
// ... existing code ...
private void updateWithSequenceGroup(KeyValue kv) {
for (int i = 0; i < getters.length; i++) {
// ... existing code ...
在循环内部,对每个字段的处理逻辑可以分为两种情况:
- 该字段不属于任何
sequence-group
。 - 该字段属于某个
sequence-group
。
让我们来详细看这两种情况。
1. 字段不属于任何 sequence-group
// ... existing code ...
private void updateWithSequenceGroup(KeyValue kv) {
for (int i = 0; i < getters.length; i++) {
Object field = getters[i].getFieldOrNull(kv.value());
FieldsComparator seqComparator = fieldSeqComparators.get(i);
FieldAggregator aggregator = fieldAggregators.get(i);
Object accumulator = getters[i].getFieldOrNull(row);
if (seqComparator == null) {
if (aggregator != null) {
row.setField(i, aggregator.agg(accumulator, field));
} else if (field != null) {
row.setField(i, field);
}
} else {
// ... existing code ...
- 判断条件:
seqComparator == null
。fieldSeqComparators
是一个Map<Integer, FieldsComparator>
,如果在里面找不到当前字段索引i
,就说明这个字段不受任何sequence-group
控制。 - 处理逻辑:
- 带聚合函数: 如果为该字段配置了聚合函数(
aggregator != null
),例如sum
、max
等,则调用aggregator.agg()
方法,将当前累加值accumulator
和新值field
进行聚合,并将结果写回row
。 - 不带聚合函数: 这是最简单的情况。如果新来的字段值
field
不为null
,就直接用它覆盖row
中的旧值。这和updateNonNullFields
的行为是一致的。
- 带聚合函数: 如果为该字段配置了聚合函数(
2. 字段属于某个 sequence-group
这是该方法最核心和复杂的部分。
// ... existing code ...
} else {
if (isEmptySequenceGroup(kv, seqComparator)) {
// skip null sequence group
continue;
}
if (seqComparator.compare(kv.value(), row) >= 0) {
int index = i;
// Multiple sequence fields should be updated at once.
if (Arrays.stream(seqComparator.compareFields())
.anyMatch(seqIndex -> seqIndex == index)) {
for (int fieldIndex : seqComparator.compareFields()) {
row.setField(
fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
}
continue;
}
row.setField(
i, aggregator == null ? field : aggregator.agg(accumulator, field));
} else if (aggregator != null) {
row.setField(i, aggregator.aggReversed(accumulator, field));
}
}
}
}
// ... existing code ...
- 判断条件:
seqComparator != null
。 - 处理逻辑:
- 空序列组检查:
isEmptySequenceGroup(kv, seqComparator)
会检查这条新纪录kv
中,其对应的序列号字段是否都为null
。如果是,意味着这条记录无法判断新旧,因此直接跳过,不进行任何更新。 - 序列号比较:
seqComparator.compare(kv.value(), row) >= 0
是关键。它会比较新记录kv
和当前结果row
中,由seqComparator
定义的序列号字段。- 如果新记录的序列字段 >= 当前结果的序列字段 : 这意味着新记录
kv
是“更新”的或者“同样新”的,此时应该用kv
的值去更新row
。- 更新 序列字段 本身: 如果当前字段
i
就是序列字段之一,那么需要把这个sequence-group
定义的所有序列号字段都一次性更新掉,然后用continue
跳出本次循环。这是为了保证序列字段之间的一致性。 - 更新数据字段: 如果当前字段
i
是被序列字段 控制的数据字段,则执行更新。如果有聚合器,则调用aggregator.agg()
;如果没有,则直接用新值field
覆盖。
- 更新 序列字段 本身: 如果当前字段
- 如果新记录的序列字段 < 当前结果的序列字段 : 这意味着
kv
是一条“旧”数据。在大部分情况下,这条旧数据会被忽略。但有一个例外:如果为该字段配置了支持乱序聚合的聚合器(例如sum
),则会调用aggregator.aggReversed()
。这个方法通常和agg()
的逻辑是一样的,它允许旧数据也能被正确地聚合进来。对于不支持乱序的聚合器(如max
),aggReversed
可能就是一个空操作。
- 如果新记录的序列字段 >= 当前结果的序列字段 : 这意味着新记录
- 空序列组检查:
createFieldAggregators
方法详解
该方法的核心任务是:为表中的每一个字段(列)确定它在合并时应该使用的聚合逻辑,并创建相应的聚合器(FieldAggregator)。
private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
RowType rowType,
List<String> primaryKeys,
List<Integer> allSequenceFields,
CoreOptions options) {
List<String> fieldNames = rowType.getFieldNames();
List<DataType> fieldTypes = rowType.getFieldTypes();
Map<Integer, Supplier<FieldAggregator>> fieldAggregators = new HashMap<>();
for (int i = 0; i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
DataType fieldType = fieldTypes.get(i);
if (allSequenceFields.contains(i)) {
// no agg for sequence fields
continue;
}
if (primaryKeys.contains(fieldName)) {
// aggregate by primary keys, so they do not aggregate
fieldAggregators.put(
i,
() ->
FieldAggregatorFactory.create(
fieldType,
fieldName,
FieldPrimaryKeyAggFactory.NAME,
options));
continue;
}
String aggFuncName = getAggFuncName(options, fieldName);
if (aggFuncName != null) {
// last_non_null_value doesn't require sequence group
checkArgument(
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
|| fieldSeqComparators.containsKey(
fieldNames.indexOf(fieldName)),
"Must use sequence group for aggregation functions but not found for field %s.",
fieldName);
fieldAggregators.put(
i,
() ->
FieldAggregatorFactory.create(
fieldType, fieldName, aggFuncName, options));
}
}
return fieldAggregators;
}
它遍历表中的所有字段,并按以下优先级规则为每个字段决定其命运:
规则一:序列字段不参与聚合
if (allSequenceFields.contains(i)) {
// no agg for sequence fields
continue;
}
-
说明:如果一个字段被定义为
sequence.field
(例如event_time
),它的唯一作用就是用来比较记录的新旧,其本身的值在合并时永远是直接覆盖,不会进行求和、求最大值等聚合操作。因此直接跳过该字段的聚合逻辑。
规则二:主键不参与聚合
if (primaryKeys.contains(fieldName)) {
// ...
fieldAggregators.put(
i,
() -> FieldAggregatorFactory.create(..., FieldPrimaryKeyAggFactory.NAME, ...));
continue;
}
-
说明:主键字段是记录的唯一标识,它的值在合并过程中必须保持不变。这里为它创建了一个
FieldPrimaryKeyAggFactory
,该工厂产生的聚合器逻辑非常简单:直接返回遇到的第一个值(每个merge function处理的总是主键相同的批次),后续的值都忽略。这种方式保证了主键的稳定性。
规则三:用户自定义的聚合函数
String aggFuncName = getAggFuncName(options, fieldName);
if (aggFuncName != null) {
// ...
}
-
说明:Paimon 允许用户为特定字段或所有字段(通过
fields.default.aggregate-function
)指定聚合函数,例如sum
、max
、min
、last_non_null_value
等。 -
逻辑:
getAggFuncName
会读取这些配置。如果找到了配置,就会进入一个重要的检查:
checkArgument(
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
|| fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)),
...);
-
背景:除了
last_non_null_value
,其他所有聚合函数都必须有sequence group
的支持。 -
后续:检查通过后,就使用
FieldAggregatorFactory
创建一个用户指定的聚合器。
规则四:默认行为(无聚合)
-
说明:如果一个字段不满足以上任何一条规则(它不是序列字段、不是主键、也没有配置任何聚合函数),那么
createFieldAggregators
就不会为它创建任何聚合器。 -
合并逻辑:在后续的合并逻辑中(如
updateNonNullFields
或updateWithSequenceGroup
),对于没有聚合器的字段,其默认行为就是 “非空值覆盖” (non-null value overwrite)。
定义多个 sequence.field
会怎么样?
这个问题可以从两个层面来理解:
-
一个 sequence-group 定义多个字段
-
定义多个独立的 sequence-group
一个 sequence-group 定义多个字段
配置语法:
'fields.col_a,col_b.sequence-group' = 'val_c,val_d'
-
col_a,col_b
:这就是“很多sequence.field
”。它定义了一个复合序列键 (Composite Sequence Key)。 -
val_c,val_d
:这是受这个复合序列键控制的值字段。
行为:
当 Paimon 需要比较两条主键相同的记录时,它会:
-
先比较
col_a
的值; -
如果
col_a
的值相等,再比较col_b
的值; -
只有当
col_a
和col_b
的值都相同时,才认为这两条记录的业务序列是相同的,进而才会去比较sequenceNumber
。
效果:
这提供了一种更精细的业务逻辑排序能力。例如,可能需要先按 event_date
排序,如果日期相同,再按 event_time
排序。这种复合键的设计可以精确地表达这种多层次的业务逻辑。
定义多个独立的 sequence-group
配置语法:
'fields.event_time.sequence-group' = 'col_a,col_b',
'fields.version.sequence-group' = 'col_c,col_d'
行为:
这种配置定义了互不相干的更新域。具体表现为:
-
当一条新记录到来,只更新了
col_a
或col_b
时,Paimon 会取出新旧记录的event_time
字段进行比较,来决定是否接受这次更新; -
当另一条新记录到来,只更新了
col_c
或col_d
时,Paimon 则会取出新旧记录的version
字段进行比较。
关键点:
event_time
的比较结果完全不影响 col_c
的更新,反之亦然。
效果:
-
性能影响:
定义越多的
sequence-group
,意味着在PartialUpdateMergeFunction
中需要执行更多的条件判断和比较逻辑。每个字段的更新都需要找到它对应的FieldsComparator
并执行比较。虽然单个比较很快,但大量的分组会增加 CPU 的开销,尤其是在合并大量数据时。 -
逻辑复杂性:
这会使表的更新行为变得非常复杂。需要非常清楚地管理哪个字段由哪个序列字段控制,以避免出现不符合预期的更新结果。
-
灵活性:
它提供了极大的灵活性,允许表的不同部分遵循不同的更新逻辑。例如,用户的基本信息(如姓名)可能由“最后修改时间”控制,而用户的账户余额可能由一个严格递增的“交易版本号”来控制。
总结
定义很多 sequence.field
(无论是复合键还是多分组),都会让 Paimon 在处理主键冲突时执行更复杂的比较逻辑。这增加了系统的灵活性,但也带来了性能开销和管理上的复杂性。
设计建议:
在设计表结构时,应仅在业务确实需要时才使用复杂的序列字段配置,并优先选择最简单、最能代表数据新旧的单个字段作为 sequence.field
。
为什么compaction比较 和 merge比较不同
SortMergeReaderWithMinHeap
和 PartialUpdateMergeFunction
分别负责其中一个阶段,因此它们的数据结构和视角是不同的。
把整个合并过程想象成一个工厂流水线:
第一站:排序站 (SortMergeReaderWithMinHeap
)
-
任务:将来自不同文件(不同传送带)的所有零件(
KeyValue
记录)进行全局排序,确保主键相同、业务逻辑上更领先的零件排在前面。 -
工具:一个最小堆 (
PriorityQueue
)。
第二站:合并站 (PartialUpdateMergeFunction
)
-
任务:从排序站拿到一批主键完全相同的零件,然后根据非常精细的规则(比如 A 字段用 A 规则更新,B 字段用 B 规则更新),将它们合并成一个最终成品。
-
工具:一个存有每种字段更新规则的“说明书” (
fieldSeqComparators
Map)。
SortMergeReaderWithMinHeap
的核心是一个最小堆。最小堆的本质是,你每次只能从堆顶取出一个“最小”的元素。为了能决定谁是“最小”的,你必须提供一个全局统一、对所有元素都适用的比较规则。
这个比较规则就是在代码中看到的:
-
先按主键排。
-
如果主键相同,就用那一个
userDefinedSeqComparator
按sequence.field
排。 -
如果连
sequence.field
都相同,最后才用sequenceNumber
排。
这里的 userDefinedSeqComparator
是根据 sequence.field
选项创建的。它代表了这张表最主要的、全局的业务时间线。SortMergeReader
用它来解决最主要的排序问题:当主键相同时,到底哪条记录应该先被处理?
它不需要、也不能够理解“A 字段用一个比较器,B 字段用另一个”这种复杂的逻辑。它的任务很简单:给我一个统一的规则,我帮你把所有记录排好队。
当 SortMergeReader
完成排序后,它会把所有主键相同的记录一次性地、按顺序地喂给 PartialUpdateMergeFunction
的 add()
方法。
此时,PartialUpdateMergeFunction
的任务变了。它不再需要进行全局排序,而是要执行精细化的字段级合并。
// ...existing code...
for (int index : sequenceFields) {
allSequenceFields.add(index);
String fieldName = fieldNames.get(index);
fieldSeqComparators.put(index, userDefinedSeqComparator);
sequenceGroupMap.put(fieldName, index);
}
// ...existing code...
这段代码是在 Factory
中解析用户的 sequence-group
配置。它的作用是构建一个“更新规则地图”,即 fieldSeqComparators
。
-
Key (
index
): 字段的索引(比如col_a
的索引是 5)。 -
Value (
userDefinedSeqComparator
): 当要更新这个字段时,应该使用哪个比较器来判断新旧。
这个 Map 告诉 PartialUpdateMergeFunction
:
-
“如果要更新
col_a
(索引为 5),请使用event_time
比较器。” -
“如果要更新
col_b
(索引为 6),也请使用event_time
比较器。” -
“如果要更新
col_c
(索引为 7),请使用version
比较器。”
当 PartialUpdateMergeFunction
的 updateWithSequenceGroup
方法被调用时,它会遍历一行中的所有字段,然后在这个 Map 中查找每个字段对应的更新规则(FieldsComparator
),并根据规则决定是否用新值覆盖旧值。
总结
组件 |
角色 |
为什么是这种设计? |
---|---|---|
|
全局排序器 |
只有一个 |
|
字段级合并器 |
有一个 |
所以,这两个组件的设计并不矛盾,而是完美地体现了它们在整个合并流程中不同阶段、不同职责的分工。SortMergeReader
负责宏观的排序,PartialUpdateMergeFunction
负责微观的合并。
配置解析(Factory类)PartialMergeFunction
// 解析sequence group配置
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
// 解析sequence字段
int[] sequenceFields = ...;
// 创建比较器
Supplier<FieldsComparator> userDefinedSeqComparator =
() -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);
// 为每个目标字段设置比较器
Arrays.stream(v.split(FIELDS_SEPARATOR))
.forEach(field -> fieldSeqComparators.put(field, userDefinedSeqComparator));
}
}
更新逻辑(updateWithSequenceGroup方法)
private void updateWithSequenceGroup(KeyValue kv) {
for (int i = 0; i < getters.length; i++) {
FieldsComparator seqComparator = fieldSeqComparators.get(i);
if (seqComparator != null) {
// 比较序列字段
if (seqComparator.compare(kv.value(), row) >= 0) {
// 新记录的序列号更大,更新字段
row.setField(i, field);
// 多个序列字段需要同时更新
if (Arrays.stream(seqComparator.compareFields())
.anyMatch(seqIndex -> seqIndex == index)) {
for (int fieldIndex : seqComparator.compareFields()) {
row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
}
}
}
}
}
}
撤回逻辑(retractWithSequenceGroup方法)
private void retractWithSequenceGroup(KeyValue kv) {
if (seqComparator.compare(kv.value(), row) >= 0) {
// 处理删除记录
if (kv.valueKind() == RowKind.DELETE &&
sequenceGroupPartialDelete.contains(field)) {
currentDeleteRow = true;
return;
}
// 撤回字段值
row.setField(field, getters[field].getFieldOrNull(kv.value()));
}
}
更多推荐
所有评论(0)