Log Store

简单来说,Log Store 是为 Paimon 流式读取提供超低延迟能力的一个可选组件

当你向一个配置了 Log Store 的 Paimon 表写入数据时,Paimon 会执行一个“双写”操作:

  1. 主存储: 数据会被写入 Paimon 的主存储系统,比如 HDFS 或 S3。这部分数据只有在 Flink 作业成功 Checkpoint 之后,才会生成一个新的快照 (Snapshot),并对下游的批处理或者新的流作业可见。
  2. 日志存储 (Log Store): 与此同时,同样的数据会实时地被发送到一个外部的消息队列中(一个Topic),这个消息队列就是 Log Store。 KafkaLogStoreFactory.java 文件,就是 Paimon 使用 Kafka 作为 Log Store 的具体实现。

这个设计的核心目的是:让流式读取的作业不必等待写入作业的 Checkpoint 完成,就能立刻消费到最新的数据。

需要等待 Checkpoint 吗?

这正是 Log Store 的关键优势所在:流读不需要等待写作业的 Checkpoint

我们来对比一下两种读取模式:

  • 没有 Log Store 的情况:

    • 一个 Flink 作业向 Paimon 表A 写入数据。
    • 另一个 Flink 作业从表A 流式读取数据。
    • 读取作业只能读到表A中已经成功提交 (Committed) 的快照。而快照的提交依赖于写入作业的 Checkpoint 完成。
    • 结果: 读取的延迟直接取决于写入作业的 Checkpoint 间隔。如果 Checkpoint 间隔是5分钟,那么数据的端到端延迟至少是5分钟。
  • 配置了 Kafka 作为 Log Store 的情况:

    • 一个 Flink 作业向 Paimon 表A 写入数据。数据被双写到主存储和 Kafka Topic。
    • 另一个 Flink 作业从表A 流式读取数据。
    • 读取作业启动时,会先读取主存储中已提交的历史数据。
    • 当历史数据追赶上后,它会无缝切换到直接从 Kafka Log Store 中消费最新的增量数据。这些数据是写入作业刚发出、还未被 Checkpoint 提交的“热数据”。
    • 结果: 读取的延迟大大降低,几乎可以达到秒级甚至毫秒级,不再受限于写入作业的 Checkpoint 间隔。

 KafkaLogStoreFactory 就是这个机制的“工厂类”。当你在表属性中配置了 'log.system' = 'kafka' 时,Paimon 就会用这个工厂来创建与 Kafka Topic 交互的 Source 和 Sink,从而实现了上述的低延迟流读能力。

KafkaLogStoreFactory.java

// ... existing code ...
public class KafkaLogStoreFactory implements LogStoreTableFactory {

    public static final String IDENTIFIER = "kafka";

// ... existing code ...
    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    private String topic(Context context) {
        return context.getCatalogTable().getOptions().get(TOPIC.key());
    }

    @Override
    public KafkaLogSourceProvider createSourceProvider(
// ... existing code ...
    }

    @Override
    public KafkaLogSinkProvider createSinkProvider(
// ... existing code ...
    }
// ... existing code ...
}

总结一下:Log Store 是一个为流读任务准备的“快车道”,它通过旁路的消息队列(如 Kafka)绕过了写任务的 Checkpoint 提交延迟,从而实现了极低的端到端数据可见性。

如何证明

Log Store 使得流式读取无需等待写入作业的 Checkpoint,从而实现低延迟

这个论点的证明就在 Paimon 的集成测试中,特别是 StreamingReadWriteTableWithKafkaLogITCase.java 这个测试类。它的命名就清晰地表明了其目的:测试带 Kafka Log 的表的流式读写。

让我们来看一下这个测试类中的一个典型测试流程:

// ...
        // 步骤 1: 创建一个带 Kafka Log Store 的 Paimon 表
        table =
                createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        false);

        // 步骤 2: 向表中写入一批初始数据
        // 这个操作会完成并提交一个或多个快照 (snapshot)
        insertIntoFromTable(temporaryTable, table);

        // 步骤 3: 启动一个流式查询作业,并验证它能立刻读到写入的数据
        testStreamingRead(
                        buildQueryWithTableOptions(table, "*", "", scanFromTimeStampMillis(0L)),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L),
                                changelogRow("+I", "Euro", 114L),
                                changelogRow("+I", "Yen", 1L),
                                changelogRow("-U", "Euro", 114L),
                                changelogRow("+U", "Euro", 119L)))
                .close();
// ...

这个测试流程证明了以上观点:

  1. 写入与读取: 测试首先通过 insertIntoFromTable 向 Paimon 表写入数据。这些数据被写入主存储(如 HDFS)并生成快照,同时也被写入 Kafka Log Store。
  2. 启动流读: 接着,testStreamingRead 方法会启动一个流式的 Flink 作业来读取这张表。
  3. 读取机制: 这个流式作业的读取过程分为两阶段:
    • 追赶阶段 (Catch-up): 首先,它会读取主存储上已提交的快照数据,确保历史数据完整。
    • 增量阶段 (Incremental): 读取完快照数据后,作业会无缝切换到直接从 Kafka Log Store 的 Topic 中消费实时增量数据。
  4. 无需等待 CheckpointtestStreamingRead 能够立即验证数据的正确性,说明读取作业没有等待写入作业的下一个 Checkpoint。它直接从 Kafka Log 中消费了最新的变更。如果需要等待 Checkpoint,这个测试将无法通过,或者需要复杂的同步机制来等待 Checkpoint 完成,但测试代码中并没有这样的逻辑。

Paimon 的集成测试 StreamingReadWriteTableWithKafkaLogITCase 通过模拟“先写入、后流读”的场景,验证了 Paimon 流式 Source 的核心能力:结合快照(Snapshot)读取和日志(Log)读取。这种架构确保了数据既完整(来自快照)又实时(来自 Log Store),从而证明了流读任务无需等待写入任务的 Checkpoint 周期,实现了低延迟的数据可见性。

Operator相关处理的代码

RowDataStoreWriteOperator处理的时候会判断是不是有logSink

    @Override
    public void processElement(StreamRecord<InternalRow> element) throws Exception {
        sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;

        SinkRecord record;
        try {
            record = write.write(element.getValue());
        } catch (Exception e) {
            throw new IOException(e);
        }

        if (record != null
                && logSinkFunction != null
                && (!logIgnoreDelete || record.row().getRowKind().isAdd())) {
            // write to log store, need to preserve original pk (which includes partition fields)
            SinkRecord logRecord = write.toLogRecord(record);
            logSinkFunction.invoke(logRecord, sinkContext);
        }
    }

使用 Log Store 的“风险”是什么?

当你使用 Log Store 并让下游直接消费时,下游实际上是在消费尚未被 Flink Checkpoint 确认的数据。这就引入了以下潜在风险:

风险:数据可能被“撤回” (Retraction)。

设想一个场景:

  1. T1 时刻: 写入作业处理了一条数据 (id=1, value='A'),并立刻将其写入了 Kafka Log Store。
  2. T2 时刻: 下游的流式读取作业立刻从 Kafka Log Store 消费了这条数据 (id=1, value='A'),并可能已经基于它做了某些计算或发送了通知。
  3. T3 时刻: 写入作业在完成 Checkpoint 之前,因为某种原因(比如机器故障)失败并重启了。
  4. T4 时刻: 写入作业从上一个成功的 Checkpoint 恢复,重新处理数据。在这次处理中,id=1 的数据可能因为逻辑变化变成了 (id=1, value='B'),或者这条数据因为上游问题根本就不存在了。
  5. 结果: 下游作业在 T2 时刻消费的 (id=1, value='A') 就成了一个“幽灵数据”。它在最终提交的 Paimon 快照中是不存在的。

Paimon 的 changelog-producer 机制可以一定程度上缓解这个问题。当写入作业恢复并处理新数据时,它会为 id=1 生成一条撤回消息 -U (id=1, value='A') 和一条更新消息 +U (id=1, value='B')。下游需要有能力正确处理这种数据撤回的逻辑。

Paimon 如何在测试中体现和权衡?

StreamingReadWriteTableWithKafkaLogITCase.java 文件中,通过不同的 scan.mode(扫描模式)来体现这种权衡:

  • scan.mode = latest-full (默认模式,混合读取)

    • 行为: 先读取最新的已提交快照的数据,然后无缝切换到从 Kafka Log Store 读取增量数据。
    • 权衡: 提供了历史数据的完整性增量数据的低延迟。这是最常用的模式,它在一致性和延迟之间取得了很好的平衡。测试 testReadWriteWithPartitionedRecordsWithPk 就属于这种模式。
  • scan.mode = latest (仅日志读取)

    • 行为: 完全跳过快照数据,直接从 Kafka Log Store 的最新位置开始消费实时 changelog
    • 权衡: 获得了极致的低延迟,但代价是丢失了历史状态。下游只能看到“此刻之后”的变更流,看不到表的存量数据。这适用于那些只关心增量变化的场景。测试 testReadLatestChangelogOfPartitionedRecordsWithPk 就是这种模式。

StreamingReadWriteTableWithKafkaLogITCase.java

// ... existing code ...
    @Test
    public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws Exception {
// ... existing code ...
        String table =
                createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);

        BlockingIterator<Row, Row> streamItr =
                testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        // 注意这里的 SCAN_LATEST 选项,它将 scan.mode 设置为 'latest'
                        buildQueryWithTableOptions(table, "*", "", SCAN_LATEST),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
// ... existing code ...

总结

  • 等待 Checkpoint:以延迟为代价,换取最强的数据一致性、事务性和故障恢复能力。这是数据仓库的基石。
  • 使用 Log Store:以“可能需要处理数据撤回”为代价,换取极低的流式读取延迟。

Paimon 的设计非常精妙,它没有强迫用户二选一,而是通过 Log Store 这个可选组件和不同的扫描模式,将选择权交给了用户。你可以根据业务场景对一致性和延迟的不同要求,来决定是否开启 Log Store 以及如何消费它。

为什么 FlinkKafkaProducer 被弃用了?

FlinkKafkaProducer 是 Flink 早期版本中连接 Kafka 的主要方式。随着 Flink 的版本迭代,社区推出了一个更强大、更统一的新版连接器 API (Sink V2 API),并推荐使用新的 KafkaSink。因此,旧的 FlinkKafkaProducer 就被标记为“弃用 (deprecated)”了。这是来自 Flink 框架本身的变化,而不是 Paimon 项目的决定。

LogSinkFunction 是否还在使用?

LogSinkFunction 在 Paimon 中仍在使用。它并没有被弃用。

在 Paimon 中,LogSinkFunction 是一个核心接口,用于将表的变更日志(changelog)发送到外部的消息队列系统(比如 Kafka),这是 Paimon 实现“日志存储 (log store)”功能的重要一环。

KafkaSinkFunction 正是 LogSinkFunction 接口针对 Kafka 的一个具体实现。

// ... existing code ...
/**
 * A {@link FlinkKafkaProducer} which implements {@link LogSinkFunction} to register {@link
 * WriteCallback}.
 */
public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements LogSinkFunction {
// ... existing code ...

为什么 Paimon 仍然使用被弃用的 FlinkKafkaProducer

这主要是出于 兼容性 的考虑。Paimon 项目需要支持多个不同版本的 Flink。直接切换到 Flink 新的 KafkaSink API 会导致无法兼容旧版本的 Flink。因此,Paimon 在这里选择继续使用 FlinkKafkaProducer,以确保代码能够在不同的 Flink 环境下都能正常工作。

可以从代码中的注释看到为了兼容性所做的努力,比如 open 方法为了同时兼容 Flink 1.18- 和 Flink 2.0+ 的版本,特意没有加上 @Override 注解。

KafkaSinkFunction.java

// ... existing code ...
    /**
     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
     */
    public void open(OpenContext openContext) throws Exception {
        open(new Configuration());
    }

    /**
     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
     */
    public void open(Configuration configuration) throws Exception {
// ... existing code ...

另外,无法访问 org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction 也与此相关。在 Flink 较新的版本中,SinkFunction 接口被移到了 legacy 包下,Paimon 通过内部的兼容模块来解决这个问题,保证代码可以编译和运行。

总结

  • FlinkKafkaProducer 的弃用 是 Flink 官方行为,推荐使用新的 KafkaSink
  • Paimon 继续使用它 是为了兼容多个 Flink 版本。
  • LogSinkFunction 及其相关功能 在 Paimon 中是正常使用的,是你看到的这个日志系统功能的核心部分。

Invoke 逻辑

KafkaSinkFunction 类本身并没有直接实现 invoke 方法。它继承了 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer,因此 invoke 的核心逻辑实际上是在父类 FlinkKafkaProducer 中实现的。

当看到 logSinkFunction.invoke(logRecord, sinkContext) 这行代码被调用时,实际执行的是 FlinkKafkaProducer 的 invoke 方法。它的主要流程如下:

  1. 检查错误:首先会检查之前是否有异步发送失败的异常,如果有则直接抛出,保证作业失败。
  2. 序列化:这是最核心的一步。它会调用 KafkaSerializationSchema 的 serialize 方法,将输入的 Paimon SinkRecord 对象转换成 Kafka 的 ProducerRecord<byte[], byte[]> 对象。
    • 在 Paimon 的场景下,这个 KafkaSerializationSchema 的具体实现是 KafkaLogSerializationSchema
    • 它会根据表是否有主键、以及配置的 log.changelog-mode,来决定如何序列化 SinkRecord 中的主键和行数据,并将它们分别作为 Kafka 消息的 key 和 value。
    • SinkRecord 中的 bucket() 信息会被用作 Kafka 的分区号。
  3. 发送数据:调用内部的 Kafka Producer 实例的 send 方法,将序列化后的 ProducerRecord 异步地发送给 Kafka 集群。
  4. 注册回调send 方法会附带一个 Callback。当 Kafka Broker 确认收到消息或发送失败时,这个回调函数会被触发。
    • 在 KafkaSinkFunction 中,这个回调函数被包装了一层。除了处理异常和确认消息外,它还会调用 writeCallback.onCompletion(metadata.partition(), metadata.offset())
    • 这个 writeCallback 会将成功写入消息的分区和 offset 位置报告给上游的 RowDataStoreWriteOperator,这样 Paimon 就能在做 Checkpoint 时记录下日志系统的消费位点,以保证端到端的一致性。

下面是 FlinkKafkaProducer 中 invoke 方法的简化逻辑,可以对照着理解:

// ... existing code ...
    public void invoke(KafkaTransactionState transaction, IN next, SinkFunction.Context context) throws FlinkKafkaException {
        // 1. 检查是否有异步错误
        this.checkErroneous();
        
        ProducerRecord<byte[], byte[]> record;
        
        // ... 省略了部分 keyedSchema 的逻辑 ...

        // 2. 调用序列化Schema,将输入的记录(next,即SinkRecord)转换为ProducerRecord
        // 在Paimon中,这里的kafkaSchema就是KafkaLogSerializationSchema
        record = this.kafkaSchema.serialize(next, context.timestamp());

        // 3. 增加待处理记录的计数器,并异步发送
        this.pendingRecords.incrementAndGet();
        transaction.producer.send(record, this.callback);
    }
// ... existing code ...

Sink 的主要配置

KafkaSinkFunction 本身是一个简单的包装类,它的配置主要通过构造函数传递给父类 FlinkKafkaProducer。从它的构造函数来看,主要有以下几个配置:

// ... existing code ...
    public KafkaSinkFunction(
            String defaultTopic,
            KafkaSerializationSchema<SinkRecord> serializationSchema,
            Properties producerConfig,
            KafkaSinkFunction.Semantic semantic) {
        super(defaultTopic, serializationSchema, producerConfig, semantic);
    }
// ... existing code ...
  1. defaultTopic (String)

    • 作用:指定要写入的默认 Kafka 主题(Topic)。
    • 来源:通常由 Paimon 表属性中的 log.system.topic 或 kafka.topic 指定。
  2. serializationSchema (KafkaSerializationSchema)

    • 作用:定义了如何将 SinkRecord 转换成 Kafka 消息。这是连接 Paimon 数据和 Kafka 消息格式的桥梁。
    • 实现:在 Paimon 中,这里传入的是 KafkaLogSerializationSchema 的实例。
  3. producerConfig (Properties)

    • 作用:标准的 Kafka Producer 配置。所有 org.apache.kafka.clients.producer.KafkaProducer 支持的参数都可以通过这个 Properties 对象进行配置。
    • 关键配置:最重要的配置是 bootstrap.servers,用于指定 Kafka 集群的地址。其他如 acksretriescompression.type 等也在这里设置。
    • 来源:这些配置来自于 Paimon 表属性中所有以 kafka. 为前缀的配置项。
  4. semantic (Semantic)

    • 作用:定义了 Sink 的一致性语义,可以是 EXACTLY_ONCE(精确一次)、AT_LEAST_ONCE(至少一次)或 NONE
    • 来源:这个配置由 Paimon 表属性中的 log.consistency 决定。TRANSACTIONAL 对应 EXACTLY_ONCEEVENTUAL 对应 AT_LEAST_ONCE

除了构造函数中的配置,还有一个通过 setter 方法设置的回调函数,对 Paimon 的日志系统至关重要:

  • writeCallback (WriteCallback)
    • 作用:如上所述,这个回调用于将成功写入 Kafka 的位点(partition 和 offset)通知给 Paimon 的写算子,用于状态的持久化。
    • 设置:在 RowDataStoreWriteOperator 的 open 方法中被设置。

总结来说,KafkaSinkFunction 的 invoke 逻辑是利用 KafkaSerializationSchema 将 Paimon 的 SinkRecord 序列化后,异步发送到 Kafka。其核心配置包括 Kafka 的连接信息、Topic、序列化逻辑和一致性语义。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐