多 Flink 任务写入一个桶能不能实现多流join

场景设定

  • ​任务A​​:只写入 col_acol_b字段。

  • ​任务B​​:只写入 col_ccol_d字段。

  • ​并发写入​​:两个任务并发写入,它们的 sequenceNumber是完全独立的,因此在全局看来是乱序的。

  • ​聚合配置​​:所有值字段(col_acol_d)都配置了 last_non_null_value聚合。

​只要每个 Flink 任务只写自己的字段,两个任务的字段不互相干扰,即使 sequenceNumber乱序,last_non_null_value也能正确工作。​

通过配置设置agg函数

public class CoreOptions implements Serializable {

    public static final String FIELDS_PREFIX = "fields";

    public static final String FIELDS_SEPARATOR = ",";

    public static final String AGG_FUNCTION = "aggregate-function";
    
    public String fieldAggFunc(String fieldName) {
        return options.get(
                key(FIELDS_PREFIX + "." + fieldName + "." + AGG_FUNCTION)
                        .stringType()
                        .noDefaultValue());
    }

PartialUpdateMergeFunction 会调用获得名字

        @Nullable
        private String getAggFuncName(CoreOptions options, String fieldName) {
            String aggFunc = options.fieldAggFunc(fieldName);
            return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
        }

推演过程

之后分析了PartialUpdateMergeFunction 的实现

我们来推演一下为什么能正确工作:

假设对于主键 pk=1,发生了以下事件:

  1. ​T1时刻​​:任务A 写入

    {pk:1, col_a:'A1', col_b:null, col_c:null, col_d:null}

    Paimon 分配 sequenceNumber = 101

  2. ​T2时刻​​:任务B 写入

    {pk:1, col_a:null, col_b:null, col_c:'C1', col_d:null}

    Paimon 分配 sequenceNumber = 201

  3. ​T3时刻​​:任务A 写入

    {pk:1, col_a:'A2', col_b:null, col_c:null, col_d:null}

    Paimon 分配 sequenceNumber = 102

现在,这三条记录在 Compaction 时相遇了。假设没有配置 sequence.fieldSortMergeReader会根据 sequenceNumber对它们进行排序:

  • ​记录1​​:{..., seq: 101}

  • ​记录3​​:{..., seq: 102}

  • ​记录2​​:{..., seq: 201}

PartialUpdateMergeFunction会按 ​​1 → 3 → 2​​ 的顺序处理它们:

初始化

row = {col_a:null, col_b:null, col_c:null, col_d:null}

处理记录1 (seq=101)

  • col_a更新为 'A1'

  • 其他字段都是 nulllast_non_null_value忽略它们。

  • ​当前 row 状态​​: {col_a:'A1', col_b:null, col_c:null, col_d:null}

处理记录3 (seq=102)

  • col_a更新为 'A2'

  • 其他字段都是 null,忽略。

  • ​当前 row 状态​​: {col_a:'A2', col_b:null, col_c:null, col_d:null}

处理记录2 (seq=201)

  • col_anull,忽略。

  • col_c更新为 'C1'

  • 其他字段是 null,忽略。

  • ​当前 row 状态​​: {col_a:'A2', col_b:null, col_c:'C1', col_d:null}

最终合并结果

{col_a:'A2', col_b:null, col_c:'C1', col_d:null}


为什么能行?

关键在于以下两点:

  1. ​字段不重叠​

    任务A 和 任务B 操作的是完全不同的字段集合。任务A 的写入对 col_ccol_d来说永远是 null,反之亦然。

  2. last_non_null_value的幂等性​

    该聚合函数的逻辑是“只要新来的不是 null就覆盖”。由于字段不重叠,任务A 的 sequenceNumber乱序只会影响 col_acol_b的合并顺序,但不会干扰到 col_ccol_d。同理,任务B 的 sequenceNumber也不会影响到 col_acol_b

快照提交的原子保证

详细分析见:

Paimon 原子提交实现

通过重命名的原子性;或者Hive本身锁;MYSQL唯一主键

在HDFS中,​​重命名(Rename)操作本身是一个原子性的元数据操作​​。对于NameNode来说,它只是改变一个inode的指向。这个操作要么完全成功,使得最终文件立即可见且包含完整数据;要么完全失败,最终文件完全不存在。客户端永远不会看到一个只写了一半的、损坏的最终文件。


重要警告

这种模式能够成功,​​强依赖于“字段不重叠”这个假设​​。

如果任务A 和 任务B 都可能写入 col_a,那么 sequenceNumber的乱序就会导致 col_a的最终值变得不确定,从而产生数据不一致。

​在这种情况下,就必须引入 sequence.field来提供一个全局统一的业务时钟,以确保无论哪个任务写入,都能根据业务时间来决定胜负。​

SequenceNumber(序列号)

其本身是 MergeTreeWriter的一个long字段产生的

见:Paimon MergeTreeWriter:LSM Tree level0 SST的诞生

​定义​​:SequenceNumberKeyValue类中的一个长整型字段,用于标识记录的时间顺序或版本信息。

​作用​​:

  • PartialUpdateMergeFunction.add()方法中,通过kv.sequenceNumber()获取

  • 当没有配置sequence group时,作为主要的排序依据

SeqComparators(序列比较器)

​定义​​:SeqComparatorsFieldsComparator接口的实现集合,存储在fieldSeqComparators字段中。

​作用​​:

  • 实现自定义的字段比较逻辑,替代默认的SequenceNumber比较

  • 支持基于业务字段(如时间戳、版本号)的排序

  • updateWithSequenceGroupretractWithSequenceGroup方法中使用

​核心接口​​:

public interface FieldsComparator extends Comparator<InternalRow> {
    int[] compareFields();  // 返回参与比较的字段索引
}

​主要实现​​:UserDefinedSeqComparator,通过代码生成实现高效的字段比较。

Sequence field(序列字段)

​定义​​:Sequence field是用于比较的字段,通常包含时间戳、版本号等具有顺序意义的业务字段。

​配置方式​​:

  • 通过fields.<field_name>.sequence-group配置

  • 在Factory类构造函数中解析配置:

可以看到解析格式是:fields.f1,f2...,fn.sequence-group

    public static final String FIELDS_PREFIX = "fields";

    public static final String FIELDS_SEPARATOR = ",";

    public static final String SEQUENCE_GROUP = "sequence-group";

if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
                    int[] sequenceFields =
                            Arrays.stream(
                                            k.substring(
                                                            FIELDS_PREFIX.length() + 1,
                                                            k.length()
                                                                    - SEQUENCE_GROUP.length()
                                                                    - 1)
                                                    .split(FIELDS_SEPARATOR))
                                    .mapToInt(fieldName -> requireField(fieldName, fieldNames))
                                    .toArray();

​作用​​:

  • 作为比较的依据字段

  • 可以是单个字段或多个字段的组合

  • 在sequence group中作为排序的基准

Sequence group(序列组)

​定义​​:Sequence group是一组字段的集合,这些字段在更新时需要一起处理,具有相同的序列比较逻辑。

​配置语法​​:

fields.<sequence_field1>,<sequence_field2>.sequence-group=<target_field1>,<target_field2>

​配置示例​​:

fields.update_time.sequence-group=name,age,address

PartialUpdateMergeFunction里的设置,可以看到对于每个等号右边的字段设置比较器就是sequence field

if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
                    int[] sequenceFields = ...;

                    Supplier<FieldsComparator> userDefinedSeqComparator =
                            () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);
                    Arrays.stream(v.split(FIELDS_SEPARATOR))
                            .map(fieldName -> requireField(fieldName, fieldNames))
                            .forEach(
                                    field -> {
                                        if (fieldSeqComparators.containsKey(field)) {
                                            throw new IllegalArgumentException(
                                                    String.format(
                                                            "Field %s is defined repeatedly by multiple groups: %s",
                                                            fieldNames.get(field), k));
                                        }
                                        fieldSeqComparators.put(field, userDefinedSeqComparator);
                                    });

keyComparatorvs FieldsComparator(userDefinedSeqComparator)

这两个比较器在 MergeTreeWriter中扮演着不同但互补的角色,共同决定了数据的最终顺序和合并结果。

  • keyComparator(主键比较器)​

    • ​作用​​:它的核心作用是​​对数据记录按主键(Primary Key)进行排序​​。在 Paimon 的 Merge-Tree (LSM) 结构中,所有的数据文件(SSTable)内部都必须是按主 key 有序的。

    • ​使用场景​​:当内存中的 WriteBuffer(即 SortBufferWriteBuffer) 写满并需要刷盘时,它会使用 keyComparator对缓冲区内的所有 KeyValue记录进行排序,然后将排序后的结果写入一个新的 Level-0 文件。这个排序是后续所有 Compaction (归并) 操作高效进行的基础。

    • ​总结​​:keyComparator决定了数据在文件中的​​物理存储顺序​​,是 LSM-Tree 结构的核心要求。

  • userDefinedSeqComparator(FieldsComparator) (用户定义序列比较器)​

    • ​作用​​:它是一个​​可选的、用于处理主键冲突的仲裁比较器​​。当用户配置了 sequence.field选项时,Paimon 会根据该字段生成一个 FieldsComparator。它的作用是:​​当两条或多条记录具有相同的主键时,使用这个比较器来判断哪条记录是“最新”的​​。

UserDefinedSeqComparator创建

主要是KeyValueFileStoreWrite负责,对于写入

 return new MergeTreeWriter(
                options.writeBufferSpillable(),
                options.writeBufferSpillDiskSize(),
                options.localSortMaxNumFileHandles(),
                options.spillCompressOptions(),
                ioManager,
                compactManager,
                restoredMaxSeqNumber,
                keyComparator,
                mfFactory.create(),
                writerFactory,
                options.commitForceCompact(),
                options.changelogProducer(),
                restoreIncrement,
                UserDefinedSeqComparator.create(valueType, options));
    

使用了工具类UserDefinedSeqComparator

    @Nullable
    public static UserDefinedSeqComparator create(RowType rowType, CoreOptions options) {
        return create(
                rowType, options.sequenceField(), options.sequenceFieldSortOrderIsAscending());
    }

对于compaction也是一样的

@Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get();
            CompactRewriter rewriter =
                    createRewriter(
                            partition,
                            bucket,
                            keyComparator,
                            userDefinedSeqComparator,
                            levels,
                            dvMaintainer);

这个supply是KeyValueFileStore传递来的,但实际是一样的。

return new KeyValueFileStoreWrite(
                fileIO,
                schemaManager,
                schema,
                commitUser,
                partitionType,
                keyType,
                valueType,
                keyComparatorSupplier,
                () -> UserDefinedSeqComparator.create(valueType, options),
                logDedupEqualSupplier,
                mfFactory,

Compaction 比较次序

Paimon LSM Tree Compaction核心:堆和败者树 有更深入的分析

在 Compaction 的核心组件 SortMergeReaderWithMinHeap中,当合并来自不同文件的记录时,其排序逻辑(定义在 PriorityQueue的构造函数中)非常清晰:

// ...existing code...
(e1, e2) -> {
    // 1. 首先,比较主键 (Primary Key)
    int result = userKeyComparator.compare(e1.kv.key(), e2.kv.key());
    if (result != 0) {
        return result;
    }
    // 2. 如果主键相同,则比较 sequence.field
    if (userDefinedSeqComparator != null) {
        result = 
            userDefinedSeqComparator.compare(
                e1.kv.value(), 
                e2.kv.value());
        if (result != 0) {
            return result;
        }
    }
    // 3. 如果主键和 sequence.field 都相同,才最后比较 sequenceNumber
    return Long.compare(e1.kv.sequenceNumber(), e2.kv.sequenceNumber());
}
// ...existing code...

正确的比较优先级链

  1. ​主键 (userKeyComparator)​

    这是最高优先级的排序依据,决定了数据的大致顺序。

  2. ​序列字段 (userDefinedSeqComparator)​

    当主键相同时,它作为第一道平局决胜规则 (Tie-breaker)。Paimon 会根据 sequence.field的值来判断哪条记录在业务逻辑上“更领先”。

  3. ​序列号 (sequenceNumber)​

    只有在主键和 sequence.field都完全相同的情况下,它才作为最终的平局决胜规则。sequenceNumber更大的记录被认为是“更新”的。

​结论​​:

sequence.field的作用域贯穿了从内存写入到跨文件合并的全过程,是处理主键冲突时的首要业务逻辑判断依据;sequenceNumber则是保证系统层面最终一致性的最后防线。

在交给merge function的时候,一个批次判断 只是使用主键

while (!minHeap.isEmpty()) {
                Element element = minHeap.peek();
                if (userKeyComparator.compare(key, element.kv.key()) != 0) {
                    break;
                }
                minHeap.poll();
                mergeFunctionWrapper.add(element.kv);
                polled.add(element);
            }

SortBufferWriteBuffer 比较次序

深入分析见:Paimon MemStore写入本地磁盘前的Sort:内存快排

SortBufferWriteBuffer 是 Apache Paimon 存储引擎中一个至关重要的组件,它实现了 WriteBuffer 接口,是 LSM-Tree (Log-Structured Merge-Tree) 架构中内存缓冲区的核心实现。它的主要职责是在数据写入磁盘前,在内存中对写入的记录进行缓冲、排序和预合并

它的核心工作流是:

  1. 接收数据:接收上游算子发来的 KeyValue 数据(包含主键、序列号、行类型和行数据)。
  2. 内存排序:将接收到的数据在内存中按照指定的排序规则进行排序。
  3. 溢写 (Spillable):当内存使用达到阈值时,能将排序好的数据溢写到磁盘临时文件中,以防止内存溢出(OOM),并支持处理远超内存大小的数据量。
  4. 提供有序迭代器:当内存缓冲区需要被刷盘(Flush)时,它能提供一个全局有序的迭代器,该迭代器会合并内存中和所有已溢写到磁盘的临时文件中的数据。
  5. 预合并 (Pre-aggregation/Merging):在数据刷盘前,通过 forEach 方法对主键相同的数据进行合并(例如,对于主键表,只保留序列号最大的那条记录),从而减少写入磁盘的数据量。

SortBufferWriteBuffer 的构造函数非常关键,它完成了所有排序和缓冲逻辑的准备工作。

// ... existing code ...
    public SortBufferWriteBuffer(
            RowType keyType,
            RowType valueType,
            @Nullable FieldsComparator userDefinedSeqComparator,
            MemorySegmentPool memoryPool,
            boolean spillable,
            MemorySize maxDiskSize,
            int sortMaxFan,
            CompressOptions compression,
            IOManager ioManager) {
        this.keyType = keyType;
        this.valueType = valueType;
        this.serializer = new KeyValueSerializer(keyType, valueType);

        // 1. 确定排序字段
        // key fields
        IntStream sortFields = IntStream.range(0, keyType.getFieldCount());

        // user define sequence fields
        if (userDefinedSeqComparator != null) {
            IntStream udsFields =
                    IntStream.of(userDefinedSeqComparator.compareFields())
                            .map(operand -> operand + keyType.getFieldCount() + 2);
            sortFields = IntStream.concat(sortFields, udsFields);
        }

        // sequence field
        sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));

        int[] sortFieldArray = sortFields.toArray();

        // 2. 构造完整的行类型
        List<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());
        fieldTypes.add(new BigIntType(false));
        fieldTypes.add(new TinyIntType(false));
        fieldTypes.addAll(valueType.getFieldTypes());

        // 3. 通过代码生成创建比较器和正规化键计算器
        NormalizedKeyComputer normalizedKeyComputer =
                CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);
        RecordComparator keyComparator =
                CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray, true);

        if (memoryPool.freePages() < 3) {
// ... existing code ...
        }
        InternalRowSerializer serializer =
                InternalSerializers.create(KeyValue.schema(keyType, valueType));
        BinaryInMemorySortBuffer inMemorySortBuffer =
                BinaryInMemorySortBuffer.createBuffer(
                        normalizedKeyComputer, serializer, keyComparator, memoryPool);
        
        // 4. 根据是否支持溢写,选择不同的 SortBuffer 实现
        this.buffer =
                ioManager != null && spillable
                        ? new BinaryExternalSortBuffer(
                                new BinaryRowSerializer(serializer.getArity()),
                                keyComparator,
                                memoryPool.pageSize(),
                                inMemorySortBuffer,
                                ioManager,
                                sortMaxFan,
                                compression,
                                maxDiskSize)
                        : inMemorySortBuffer;
    }
// ... existing code ...

分析要点:

  1. 排序字段的确定:排序规则是保证数据正确合并的关键。排序优先级为:主键字段 > 用户定义的序列号字段 (可选) > 内部序列号 (sequenceNumber)。这样可以确保相同主键的数据被排在一起,并且可以根据序列号确定其先后顺序。

这里加入的是索引位置,不是实际key

Paimon 作为一种 Merge-Tree 存储,必须保证对相同主键(Primary Key)的数据进行合并(Merge)时,顺序是稳定且正确的。因此,排序规则是分层次的:

  1. 按主键排序:这是最优先的规则,确保相同主键的数据被排在一起,以便后续进行合并。

  2. 按用户定义的 sequence.field 排序(如果存在):当多条记录主键相同时,Paimon 允许用户指定一个或多个业务字段(如 update_time)来决定合并的顺序。值越大,越靠后合并。

  3. 按内部 sequenceNumber 排序:当主键和用户定义的 sequence.field 都相同时,就需要一个最终的规则来决胜负。Paimon 会为每条写入的数据分配一个单调递增的内部 sequenceNumber。这个数字代表了数据的写入顺序。通过对这个字段排序,可以保证即使所有业务字段都相同,合并顺序依然是稳定和可预测的(后写入的数据后合并)。

  1. 代码生成 (Code Generation):Paimon 为了极致的性能,没有使用基于反射或解释性的比较方式,而是通过 CodeGenUtils 动态生成了 NormalizedKeyComputer 和 RecordComparator 的字节码。
    • RecordComparator: 用于精确比较两条记录。
    • NormalizedKeyComputer: 用于为每条记录生成一个定长的二进制前缀(Normalized Key)。这样在排序时,可以直接比较这个二进制前缀,速度极快,只有在前缀相同时才需要调用 RecordComparator 进行完整比较。
  2. Buffer 的选择与初始化
    • 首先,它会创建一个 BinaryInMemorySortBuffer,这是一个纯内存的排序缓冲区。
    • 然后,它会判断 spillable 参数。如果为 true 并且提供了 IOManager,它会将内存缓冲区包装成一个 BinaryExternalSortBuffer。这个外部排序缓冲区具备了当内存不足时,将数据溢写到磁盘的能力,从而支持大数据量的写入。这是一种典型的“外部排序”算法实现。

PartialUpdateMergeFunction

见:双流join 、 Paimon Partial Update 和 动态schema-CSDN博客

这是在 Paimon 中实现 partial-update (部分列更新) 合并引擎的核心类。它的主要职责是在 Compaction 过程中,将具有相同主键的多条记录(KeyValue)合并成最终的一条记录。

PartialUpdateMergeFunction 实现了 MergeFunction<KeyValue> 接口。在 Paimon 的 LSM-Tree 存储模型中,当执行 Compaction 操作需要合并多个数据文件时,Paimon 会读取具有相同主键的一组 KeyValue 数据,然后交由一个 MergeFunction 实例来处理,计算出最终的结果。

PartialUpdateMergeFunction 的合并逻辑是:对于相同主键的记录,不断地用新的非空字段值去覆盖旧的字段值,最终得到一个“打宽”后的完整记录。 它还支持更复杂的场景,如基于序列号的更新、字段聚合和多种删除策略。

// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/**
 * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
 * non-null fields on merge.
 */
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...

核心成员变量

这些变量定义了 PartialUpdateMergeFunction 的状态和配置,决定了其合并行为。

// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {

    public static final String SEQUENCE_GROUP = "sequence-group";

    private final InternalRow.FieldGetter[] getters; // 用于从 InternalRow 中获取字段值
    private final boolean ignoreDelete; // 是否忽略删除记录
    private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列号比较器,用于 sequence-group
    private final boolean fieldSequenceEnabled; // 是否启用了 sequence-group
    private final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器
    private final boolean removeRecordOnDelete; // 收到 DELETE 记录时是否删除整行
    private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 记录时,根据 sequence-group 删除部分列
    private final boolean[] nullables; // 记录每个字段是否可为 null

    private InternalRow currentKey; // 当前处理的主键
    private long latestSequenceNumber; // 见过的最新序列号
    private GenericRow row; // 合并过程中的结果行
    private KeyValue reused; // 用于复用的 KeyValue 对象,避免重复创建
    private boolean currentDeleteRow; // 标记当前行最终是否应被删除
    private boolean notNullColumnFilled;
    /**
     * If the first value is retract, and no insert record is received, the row kind should be
     * RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no
     * RowKind.INSERT value is received)
     */
    private boolean meetInsert; // 是否遇到过 INSERT 类型的记录

// ... existing code ...
  • 配置类变量 (ignoreDeletefieldSeqComparatorsfieldAggregators 等) 通常在 Factory 中被初始化,它们在整个合并过程中保持不变。
  • 状态类变量 (currentKeyrowlatestSequenceNumber 等) 会在每次 reset() 时被重置,用于处理新的一组具有相同主键的记录。

add(KeyValue kv) :合并逻辑的核心

这是最重要的方法,定义了单条 KeyValue 是如何被合并到当前结果 row 中的。

// ... existing code ...
    @Override
    public void add(KeyValue kv) {
        // refresh key object to avoid reference overwritten
        currentKey = kv.key();
        currentDeleteRow = false;
        if (kv.valueKind().isRetract()) {

            if (!notNullColumnFilled) {
                initRow(row, kv.value());
                notNullColumnFilled = true;
            }

            // ... 删除逻辑处理 ...
            // ... existing code ...
            String msg =
                    String.join(
                            "\n",
                            "By default, Partial update can not accept delete records,"
                                    + " you can choose one of the following solutions:",
                            "1. Configure 'ignore-delete' to ignore delete records.",
                            "2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.",
                            "3. Configure 'sequence-group's to retract partial columns.");

            throw new IllegalArgumentException(msg);
        }

        latestSequenceNumber = kv.sequenceNumber();
        if (fieldSeqComparators.isEmpty()) {
            updateNonNullFields(kv);
        } else {
            updateWithSequenceGroup(kv);
        }
        meetInsert = true;
        notNullColumnFilled = true;
    }
// ... existing code ...

它的逻辑可以分为两大块:

A. 处理 retract 消息 (RowKind 为 DELETE 或 UPDATE_BEFORE)

partial-update 默认不接受删除记录。如果收到了,行为由配置决定:

  1. ignoreDelete = true: 直接忽略这条删除记录,返回。
  2. removeRecordOnDelete = true: 当收到 DELETE 类型的记录时,将 currentDeleteRow 标记为 true,并清空当前 row。这意味着最终这条主键对应的记录将被删除。
  3. fieldSequenceEnabled = true: 启用了 sequence-group。这是最复杂的逻辑,它会调用 retractWithSequenceGroup(kv)。这个方法会根据序列号比较结果,来决定是否要“撤销”某些字段的更新(通常是将其设置为 null 或调用聚合器的 retract 方法)。
  4. 默认行为: 如果以上配置都没有,则直接抛出 IllegalArgumentException 异常,提示用户如何正确配置。

B. 处理 add 消息 (RowKind 为 INSERT 或 UPDATE_AFTER)

这是主要的更新逻辑:

  1. 简单更新 (updateNonNullFields): 如果没有配置 sequence-group (fieldSeqComparators 为空),则执行最简单的部分列更新。遍历新纪录 kv 的所有字段,只要字段值不为 null,就用它来更新 row 中对应位置的值。

    // ... existing code ...
    private void updateNonNullFields(KeyValue kv) {
        for (int i = 0; i < getters.length; i++) {
            Object field = getters[i].getFieldOrNull(kv.value());
            if (field != null) {
                row.setField(i, field);
            } else {
    // ... existing code ...
    
  2. 带序列号的更新 (updateWithSequenceGroup): 如果配置了 sequence-group,逻辑会更复杂。对于每个字段:

    • 如果该字段不属于任何 sequence-group,则行为和简单更新类似(但会考虑聚合)。
    • 如果该字段属于某个 sequence-group,则会使用 FieldsComparator 比较新记录 kv 和当前结果 row 的序列号字段。只有当新记录的序列号 大于或等于 当前结果的序列号时,才会用新记录的字段值去更新 row 中由该 sequence-group 控制的所有字段。这保证了数据的更新顺序。

updateWithSequenceGroup

这个方法是 partial-update 合并引擎处理带有 sequence-group 配置时的核心逻辑。当用户在表属性中定义了 fields.<seq_field>.sequence-group = <data_field1>,<data_field2> 这样的规则时,数据合并就不再是简单的“非空值覆盖”,而是需要根据 seq_field 的值来判断是否应该更新 data_field1 和 data_field2。这解决了多流更新时可能出现的数据乱序覆盖问题。

updateWithSequenceGroup 方法通过引入 FieldsComparator,将简单的字段更新升级为基于序列号的条件更新。它精确地控制了哪些字段在何时可以被更新,从而保证了在多流并发写入场景下,即使数据存在一定程度的乱序,最终也能合并成正确的结果。这是 Paimon partial-update 模式能够处理复杂更新场景的关键所在。

// ... existing code ...
    private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
  • 输入KeyValue kv,代表一条新到达的、具有相同主键的记录。
  • 目标: 遍历这条新记录 kv 的所有字段,并根据 sequence-group 的规则,决定是否用 kv 中的字段值来更新当前正在合并的结果行 this.row

该方法的核心是一个 for 循环,它遍历了表中的每一个字段。

// ... existing code ...
    private void updateWithSequenceGroup(KeyValue kv) {
        for (int i = 0; i < getters.length; i++) {
// ... existing code ...

在循环内部,对每个字段的处理逻辑可以分为两种情况:

  1. 该字段不属于任何 sequence-group
  2. 该字段属于某个 sequence-group

让我们来详细看这两种情况。

1. 字段不属于任何 sequence-group

// ... existing code ...
    private void updateWithSequenceGroup(KeyValue kv) {
        for (int i = 0; i < getters.length; i++) {
            Object field = getters[i].getFieldOrNull(kv.value());
            FieldsComparator seqComparator = fieldSeqComparators.get(i);
            FieldAggregator aggregator = fieldAggregators.get(i);
            Object accumulator = getters[i].getFieldOrNull(row);
            if (seqComparator == null) {
                if (aggregator != null) {
                    row.setField(i, aggregator.agg(accumulator, field));
                } else if (field != null) {
                    row.setField(i, field);
                }
            } else {
// ... existing code ...
  • 判断条件seqComparator == nullfieldSeqComparators 是一个 Map<Integer, FieldsComparator>,如果在里面找不到当前字段索引 i,就说明这个字段不受任何 sequence-group 控制。
  • 处理逻辑:
    • 带聚合函数: 如果为该字段配置了聚合函数(aggregator != null),例如 summax 等,则调用 aggregator.agg() 方法,将当前累加值 accumulator 和新值 field 进行聚合,并将结果写回 row
    • 不带聚合函数: 这是最简单的情况。如果新来的字段值 field 不为 null,就直接用它覆盖 row 中的旧值。这和 updateNonNullFields 的行为是一致的。

2. 字段属于某个 sequence-group

这是该方法最核心和复杂的部分。

// ... existing code ...
            } else {
                if (isEmptySequenceGroup(kv, seqComparator)) {
                    // skip null sequence group
                    continue;
                }

                if (seqComparator.compare(kv.value(), row) >= 0) {
                    int index = i;

                    // Multiple sequence fields should be updated at once.
                    if (Arrays.stream(seqComparator.compareFields())
                            .anyMatch(seqIndex -> seqIndex == index)) {
                        for (int fieldIndex : seqComparator.compareFields()) {
                            row.setField(
                                    fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
                        }
                        continue;
                    }
                    row.setField(
                            i, aggregator == null ? field : aggregator.agg(accumulator, field));
                } else if (aggregator != null) {
                    row.setField(i, aggregator.aggReversed(accumulator, field));
                }
            }
        }
    }
// ... existing code ...
  • 判断条件seqComparator != null
  • 处理逻辑:
    1. 空序列组检查isEmptySequenceGroup(kv, seqComparator) 会检查这条新纪录 kv 中,其对应的序列号字段是否都为 null。如果是,意味着这条记录无法判断新旧,因此直接跳过,不进行任何更新。
    2. 序列号比较seqComparator.compare(kv.value(), row) >= 0 是关键。它会比较新记录 kv 和当前结果 row 中,由 seqComparator 定义的序列号字段。
      • 如果新记录的序列字段 >= 当前结果的序列字段 : 这意味着新记录 kv 是“更新”的或者“同样新”的,此时应该用 kv 的值去更新 row
        • 更新 序列字段 本身: 如果当前字段 i 就是序列字段之一,那么需要把这个 sequence-group 定义的所有序列号字段都一次性更新掉,然后用 continue 跳出本次循环。这是为了保证序列字段之间的一致性。
        • 更新数据字段: 如果当前字段 i 是被序列字段 控制的数据字段,则执行更新。如果有聚合器,则调用 aggregator.agg();如果没有,则直接用新值 field 覆盖。
      • 如果新记录的序列字段 < 当前结果的序列字段 : 这意味着 kv 是一条“旧”数据。在大部分情况下,这条旧数据会被忽略。但有一个例外:如果为该字段配置了支持乱序聚合的聚合器(例如 sum),则会调用 aggregator.aggReversed()。这个方法通常和 agg() 的逻辑是一样的,它允许旧数据也能被正确地聚合进来。对于不支持乱序的聚合器(如 max),aggReversed 可能就是一个空操作。

createFieldAggregators方法详解

该方法的核心任务是:​​为表中的每一个字段(列)确定它在合并时应该使用的聚合逻辑,并创建相应的聚合器(FieldAggregator)​​。

private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
                RowType rowType,
                List<String> primaryKeys,
                List<Integer> allSequenceFields,
                CoreOptions options) {

            List<String> fieldNames = rowType.getFieldNames();
            List<DataType> fieldTypes = rowType.getFieldTypes();
            Map<Integer, Supplier<FieldAggregator>> fieldAggregators = new HashMap<>();
            for (int i = 0; i < fieldNames.size(); i++) {
                String fieldName = fieldNames.get(i);
                DataType fieldType = fieldTypes.get(i);

                if (allSequenceFields.contains(i)) {
                    // no agg for sequence fields
                    continue;
                }

                if (primaryKeys.contains(fieldName)) {
                    // aggregate by primary keys, so they do not aggregate
                    fieldAggregators.put(
                            i,
                            () ->
                                    FieldAggregatorFactory.create(
                                            fieldType,
                                            fieldName,
                                            FieldPrimaryKeyAggFactory.NAME,
                                            options));
                    continue;
                }

                String aggFuncName = getAggFuncName(options, fieldName);
                if (aggFuncName != null) {
                    // last_non_null_value doesn't require sequence group
                    checkArgument(
                            aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
                                    || fieldSeqComparators.containsKey(
                                            fieldNames.indexOf(fieldName)),
                            "Must use sequence group for aggregation functions but not found for field %s.",
                            fieldName);
                    fieldAggregators.put(
                            i,
                            () ->
                                    FieldAggregatorFactory.create(
                                            fieldType, fieldName, aggFuncName, options));
                }
            }
            return fieldAggregators;
        }

它遍历表中的所有字段,并按以下优先级规则为每个字段决定其命运:

规则一:序列字段不参与聚合

if (allSequenceFields.contains(i)) {    
    // no agg for sequence fields    
    continue;
}
  • ​说明​​:如果一个字段被定义为 sequence.field(例如 event_time),它的唯一作用就是用来比较记录的新旧,其本身的值在合并时永远是直接覆盖,不会进行求和、求最大值等聚合操作。因此直接跳过该字段的聚合逻辑。

规则二:主键不参与聚合

if (primaryKeys.contains(fieldName)) {    
    // ...    
    fieldAggregators.put(        
            i,        
            () -> FieldAggregatorFactory.create(..., FieldPrimaryKeyAggFactory.NAME, ...));    
    continue;
}
  • ​说明​​:主键字段是记录的唯一标识,它的值在合并过程中必须保持不变。这里为它创建了一个 FieldPrimaryKeyAggFactory,该工厂产生的聚合器逻辑非常简单:直接返回遇到的第一个值(每个merge function处理的总是主键相同的批次),后续的值都忽略。这种方式保证了主键的稳定性。

规则三:用户自定义的聚合函数

String aggFuncName = getAggFuncName(options, fieldName);
if (aggFuncName != null) {    
    // ...
}
  • ​说明​​:Paimon 允许用户为特定字段或所有字段(通过 fields.default.aggregate-function)指定聚合函数,例如 summaxminlast_non_null_value等。

  • ​逻辑​​:getAggFuncName会读取这些配置。如果找到了配置,就会进入一个重要的检查:

checkArgument(    
        aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)         
                || fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)),    
        ...);
  • ​背景​​:除了 last_non_null_value,其他所有聚合函数都必须有 sequence group的支持。

  • ​后续​​:检查通过后,就使用 FieldAggregatorFactory创建一个用户指定的聚合器。

规则四:默认行为(无聚合)

  • ​说明​​:如果一个字段不满足以上任何一条规则(它不是序列字段、不是主键、也没有配置任何聚合函数),那么 createFieldAggregators就不会为它创建任何聚合器。

  • ​合并逻辑​​:在后续的合并逻辑中(如 updateNonNullFieldsupdateWithSequenceGroup),对于没有聚合器的字段,其默认行为就是 ​​“非空值覆盖” (non-null value overwrite)​​。



定义多个 sequence.field会怎么样?

这个问题可以从两个层面来理解:

  1. ​一个 sequence-group 定义多个字段​

  2. ​定义多个独立的 sequence-group​


一个 sequence-group 定义多个字段

​配置语法​​:

'fields.col_a,col_b.sequence-group' = 'val_c,val_d'

  • col_a,col_b​:这就是“很多 sequence.field”。它定义了一个复合序列键 (Composite Sequence Key)。

  • val_c,val_d​:这是受这个复合序列键控制的值字段。

​行为​​:

当 Paimon 需要比较两条主键相同的记录时,它会:

  1. 先比较 col_a的值;

  2. 如果 col_a的值相等,再比较 col_b的值;

  3. 只有当 col_acol_b的值都相同时,才认为这两条记录的业务序列是相同的,进而才会去比较 sequenceNumber

​效果​​:

这提供了一种更精细的业务逻辑排序能力。例如,可能需要先按 event_date排序,如果日期相同,再按 event_time排序。这种复合键的设计可以精确地表达这种多层次的业务逻辑。


定义多个独立的 sequence-group

​配置语法​​:

'fields.event_time.sequence-group' = 'col_a,col_b',
'fields.version.sequence-group' = 'col_c,col_d'

​行为​​:

这种配置定义了互不相干的更新域。具体表现为:

  • 当一条新记录到来,只更新了 col_acol_b时,Paimon 会取出新旧记录的 event_time字段进行比较,来决定是否接受这次更新;

  • 当另一条新记录到来,只更新了 col_ccol_d时,Paimon 则会取出新旧记录的 version字段进行比较。

​关键点​​:

event_time的比较结果完全不影响 col_c的更新,反之亦然。

​效果​​:

  • ​性能影响​​:

    定义越多的 sequence-group,意味着在 PartialUpdateMergeFunction中需要执行更多的条件判断和比较逻辑。每个字段的更新都需要找到它对应的 FieldsComparator并执行比较。虽然单个比较很快,但大量的分组会增加 CPU 的开销,尤其是在合并大量数据时。

  • ​逻辑复杂性​​:

    这会使表的更新行为变得非常复杂。需要非常清楚地管理哪个字段由哪个序列字段控制,以避免出现不符合预期的更新结果。

  • ​灵活性​​:

    它提供了极大的灵活性,允许表的不同部分遵循不同的更新逻辑。例如,用户的基本信息(如姓名)可能由“最后修改时间”控制,而用户的账户余额可能由一个严格递增的“交易版本号”来控制。


总结

定义很多 sequence.field(无论是复合键还是多分组),都会让 Paimon 在处理主键冲突时执行更复杂的比较逻辑。这增加了系统的灵活性,但也带来了性能开销和管理上的复杂性。

​设计建议​​:

在设计表结构时,应仅在业务确实需要时才使用复杂的序列字段配置,并优先选择最简单、最能代表数据新旧的单个字段作为 sequence.field

为什么compaction比较 和 merge比较不同

SortMergeReaderWithMinHeapPartialUpdateMergeFunction分别负责其中一个阶段,因此它们的数据结构和视角是不同的。


把整个合并过程想象成一个工厂流水线:

第一站:排序站 (SortMergeReaderWithMinHeap)

  • ​任务​​:将来自不同文件(不同传送带)的所有零件(KeyValue记录)进行​​全局排序​​,确保主键相同、业务逻辑上更领先的零件排在前面。

  • ​工具​​:一个最小堆 (PriorityQueue)。

第二站:合并站 (PartialUpdateMergeFunction)

  • ​任务​​:从排序站拿到​​一批主键完全相同的零件​​,然后根据非常精细的规则(比如 A 字段用 A 规则更新,B 字段用 B 规则更新),将它们合并成一个最终成品。

  • ​工具​​:一个存有每种字段更新规则的“说明书” (fieldSeqComparatorsMap)。


SortMergeReaderWithMinHeap的核心是一个​​最小堆​​。最小堆的本质是,你每次只能从堆顶取出一个“最小”的元素。为了能决定谁是“最小”的,你​​必须提供一个全局统一、对所有元素都适用的比较规则​​。

这个比较规则就是在代码中看到的:

  1. 先按​​主键​​排。

  2. 如果主键相同,就用那​​一个​userDefinedSeqComparatorsequence.field排。

  3. 如果连 sequence.field都相同,最后才用 sequenceNumber排。

这里的 userDefinedSeqComparator是根据 sequence.field选项创建的。​​它代表了这张表最主要的、全局的业务时间线​​。SortMergeReader用它来解决最主要的排序问题:当主键相同时,到底哪条记录应该先被处理?

它不需要、也不能够理解“A 字段用一个比较器,B 字段用另一个”这种复杂的逻辑。它的任务很简单:​​给我一个统一的规则,我帮你把所有记录排好队​​。


SortMergeReader完成排序后,它会把​​所有主键相同的记录​​一次性地、按顺序地喂给 PartialUpdateMergeFunctionadd()方法。

此时,PartialUpdateMergeFunction的任务变了。它不再需要进行全局排序,而是要执行​​精细化的字段级合并​​。

// ...existing code...
for (int index : sequenceFields) {
    allSequenceFields.add(index);
    String fieldName = fieldNames.get(index);
    fieldSeqComparators.put(index, userDefinedSeqComparator);
    sequenceGroupMap.put(fieldName, index);
}
// ...existing code...

这段代码是在 Factory中解析用户的 sequence-group配置。它的作用是构建一个​​“更新规则地图”​​,即 fieldSeqComparators

  • ​Key (index)​​: 字段的索引(比如 col_a的索引是 5)。

  • ​Value (userDefinedSeqComparator)​​: 当要更新这个字段时,应该使用哪个比较器来判断新旧。

这个 Map 告诉 PartialUpdateMergeFunction

  • “如果要更新 col_a(索引为 5),请使用 event_time比较器。”

  • “如果要更新 col_b(索引为 6),也请使用 event_time比较器。”

  • “如果要更新 col_c(索引为 7),请使用 version比较器。”

PartialUpdateMergeFunctionupdateWithSequenceGroup方法被调用时,它会遍历一行中的所有字段,然后在这个 Map 中查找每个字段对应的更新规则(FieldsComparator),并根据规则决定是否用新值覆盖旧值。


总结

组件

角色

为什么是这种设计?

SortMergeReaderWithMinHeap

​全局排序器​

只有一个 userDefinedSeqComparator,因为它需要一个​​统一的、全局的规则​​来对最小堆中的所有元素进行排序,以决定处理的先后顺序。

PartialUpdateMergeFunction

​字段级合并器​

有一个 Map<Integer, FieldsComparator>,因为它处理的是已经被分组好的、主键相同的记录。它需要一个​​详细的、分字段的规则集​​来决定如何合并每个字段的值。


所以,这两个组件的设计并不矛盾,而是完美地体现了它们在整个合并流程中​​不同阶段、不同职责​​的分工。SortMergeReader负责宏观的排序,PartialUpdateMergeFunction负责微观的合并。

​配置解析(Factory类)PartialMergeFunction

// 解析sequence group配置
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
    String k = entry.getKey();
    String v = entry.getValue();
    if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
        // 解析sequence字段
        int[] sequenceFields = ...;
        
        // 创建比较器
        Supplier<FieldsComparator> userDefinedSeqComparator = 
            () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);
        
        // 为每个目标字段设置比较器
        Arrays.stream(v.split(FIELDS_SEPARATOR))
            .forEach(field -> fieldSeqComparators.put(field, userDefinedSeqComparator));
    }
}

更新逻辑(updateWithSequenceGroup方法)

private void updateWithSequenceGroup(KeyValue kv) {
    for (int i = 0; i < getters.length; i++) {
        FieldsComparator seqComparator = fieldSeqComparators.get(i);
        if (seqComparator != null) {
            // 比较序列字段
            if (seqComparator.compare(kv.value(), row) >= 0) {
                // 新记录的序列号更大,更新字段
                row.setField(i, field);
                
                // 多个序列字段需要同时更新
                if (Arrays.stream(seqComparator.compareFields())
                    .anyMatch(seqIndex -> seqIndex == index)) {
                    for (int fieldIndex : seqComparator.compareFields()) {
                        row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
                    }
                }
            }
        }
    }
}

撤回逻辑(retractWithSequenceGroup方法)

private void retractWithSequenceGroup(KeyValue kv) {
    if (seqComparator.compare(kv.value(), row) >= 0) {
        // 处理删除记录
        if (kv.valueKind() == RowKind.DELETE && 
            sequenceGroupPartialDelete.contains(field)) {
            currentDeleteRow = true;
            return;
        }
        // 撤回字段值
        row.setField(field, getters[field].getFieldOrNull(kv.value()));
    }
}
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐