Paimon LogStore:秒级数据可见的秘诀
LogStore是Paimon实现低延迟流式读取的可选组件,通过双写机制将数据同时写入主存储和外部消息队列(如Kafka)。流读作业无需等待写入作业的Checkpoint完成,可直接从消息队列消费最新数据,实现毫秒级延迟。KafkaLogStoreFactory是具体实现,通过配置表属性启用。测试用例StreamingReadWriteTableWithKafkaLogITCase验证了这一机制,
Log Store
简单来说,Log Store 是为 Paimon 流式读取提供超低延迟能力的一个可选组件。
当你向一个配置了 Log Store 的 Paimon 表写入数据时,Paimon 会执行一个“双写”操作:
- 主存储: 数据会被写入 Paimon 的主存储系统,比如 HDFS 或 S3。这部分数据只有在 Flink 作业成功 Checkpoint 之后,才会生成一个新的快照 (Snapshot),并对下游的批处理或者新的流作业可见。
- 日志存储 (Log Store): 与此同时,同样的数据会实时地被发送到一个外部的消息队列中(一个Topic),这个消息队列就是 Log Store。
KafkaLogStoreFactory.java
文件,就是 Paimon 使用 Kafka 作为 Log Store 的具体实现。
这个设计的核心目的是:让流式读取的作业不必等待写入作业的 Checkpoint 完成,就能立刻消费到最新的数据。
需要等待 Checkpoint 吗?
这正是 Log Store 的关键优势所在:流读不需要等待写作业的 Checkpoint。
我们来对比一下两种读取模式:
-
没有 Log Store 的情况:
- 一个 Flink 作业向 Paimon 表A 写入数据。
- 另一个 Flink 作业从表A 流式读取数据。
- 读取作业只能读到表A中已经成功提交 (Committed) 的快照。而快照的提交依赖于写入作业的 Checkpoint 完成。
- 结果: 读取的延迟直接取决于写入作业的 Checkpoint 间隔。如果 Checkpoint 间隔是5分钟,那么数据的端到端延迟至少是5分钟。
-
配置了 Kafka 作为 Log Store 的情况:
- 一个 Flink 作业向 Paimon 表A 写入数据。数据被双写到主存储和 Kafka Topic。
- 另一个 Flink 作业从表A 流式读取数据。
- 读取作业启动时,会先读取主存储中已提交的历史数据。
- 当历史数据追赶上后,它会无缝切换到直接从 Kafka Log Store 中消费最新的增量数据。这些数据是写入作业刚发出、还未被 Checkpoint 提交的“热数据”。
- 结果: 读取的延迟大大降低,几乎可以达到秒级甚至毫秒级,不再受限于写入作业的 Checkpoint 间隔。
KafkaLogStoreFactory
就是这个机制的“工厂类”。当你在表属性中配置了 'log.system' = 'kafka'
时,Paimon 就会用这个工厂来创建与 Kafka Topic 交互的 Source
和 Sink
,从而实现了上述的低延迟流读能力。
KafkaLogStoreFactory.java
// ... existing code ...
public class KafkaLogStoreFactory implements LogStoreTableFactory {
public static final String IDENTIFIER = "kafka";
// ... existing code ...
@Override
public String identifier() {
return IDENTIFIER;
}
private String topic(Context context) {
return context.getCatalogTable().getOptions().get(TOPIC.key());
}
@Override
public KafkaLogSourceProvider createSourceProvider(
// ... existing code ...
}
@Override
public KafkaLogSinkProvider createSinkProvider(
// ... existing code ...
}
// ... existing code ...
}
总结一下:Log Store 是一个为流读任务准备的“快车道”,它通过旁路的消息队列(如 Kafka)绕过了写任务的 Checkpoint 提交延迟,从而实现了极低的端到端数据可见性。
如何证明
Log Store 使得流式读取无需等待写入作业的 Checkpoint,从而实现低延迟。
这个论点的证明就在 Paimon 的集成测试中,特别是 StreamingReadWriteTableWithKafkaLogITCase.java
这个测试类。它的命名就清晰地表明了其目的:测试带 Kafka Log 的表的流式读写。
让我们来看一下这个测试类中的一个典型测试流程:
// ...
// 步骤 1: 创建一个带 Kafka Log Store 的 Paimon 表
table =
createTableWithKafkaLog(
Arrays.asList("currency STRING", "rate BIGINT"),
Collections.singletonList("currency"),
Collections.emptyList(),
false);
// 步骤 2: 向表中写入一批初始数据
// 这个操作会完成并提交一个或多个快照 (snapshot)
insertIntoFromTable(temporaryTable, table);
// 步骤 3: 启动一个流式查询作业,并验证它能立刻读到写入的数据
testStreamingRead(
buildQueryWithTableOptions(table, "*", "", scanFromTimeStampMillis(0L)),
Arrays.asList(
changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 114L),
changelogRow("+I", "Yen", 1L),
changelogRow("-U", "Euro", 114L),
changelogRow("+U", "Euro", 119L)))
.close();
// ...
这个测试流程证明了以上观点:
- 写入与读取: 测试首先通过
insertIntoFromTable
向 Paimon 表写入数据。这些数据被写入主存储(如 HDFS)并生成快照,同时也被写入 Kafka Log Store。 - 启动流读: 接着,
testStreamingRead
方法会启动一个流式的 Flink 作业来读取这张表。 - 读取机制: 这个流式作业的读取过程分为两阶段:
- 追赶阶段 (Catch-up): 首先,它会读取主存储上已提交的快照数据,确保历史数据完整。
- 增量阶段 (Incremental): 读取完快照数据后,作业会无缝切换到直接从 Kafka Log Store 的 Topic 中消费实时增量数据。
- 无需等待 Checkpoint:
testStreamingRead
能够立即验证数据的正确性,说明读取作业没有等待写入作业的下一个 Checkpoint。它直接从 Kafka Log 中消费了最新的变更。如果需要等待 Checkpoint,这个测试将无法通过,或者需要复杂的同步机制来等待 Checkpoint 完成,但测试代码中并没有这样的逻辑。
Paimon 的集成测试 StreamingReadWriteTableWithKafkaLogITCase
通过模拟“先写入、后流读”的场景,验证了 Paimon 流式 Source 的核心能力:结合快照(Snapshot)读取和日志(Log)读取。这种架构确保了数据既完整(来自快照)又实时(来自 Log Store),从而证明了流读任务无需等待写入任务的 Checkpoint 周期,实现了低延迟的数据可见性。
Operator相关处理的代码
RowDataStoreWriteOperator处理的时候会判断是不是有logSink
@Override
public void processElement(StreamRecord<InternalRow> element) throws Exception {
sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
SinkRecord record;
try {
record = write.write(element.getValue());
} catch (Exception e) {
throw new IOException(e);
}
if (record != null
&& logSinkFunction != null
&& (!logIgnoreDelete || record.row().getRowKind().isAdd())) {
// write to log store, need to preserve original pk (which includes partition fields)
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
}
}
使用 Log Store 的“风险”是什么?
当你使用 Log Store 并让下游直接消费时,下游实际上是在消费尚未被 Flink Checkpoint 确认的数据。这就引入了以下潜在风险:
风险:数据可能被“撤回” (Retraction)。
设想一个场景:
- T1 时刻: 写入作业处理了一条数据
(id=1, value='A')
,并立刻将其写入了 Kafka Log Store。 - T2 时刻: 下游的流式读取作业立刻从 Kafka Log Store 消费了这条数据
(id=1, value='A')
,并可能已经基于它做了某些计算或发送了通知。 - T3 时刻: 写入作业在完成 Checkpoint 之前,因为某种原因(比如机器故障)失败并重启了。
- T4 时刻: 写入作业从上一个成功的 Checkpoint 恢复,重新处理数据。在这次处理中,
id=1
的数据可能因为逻辑变化变成了(id=1, value='B')
,或者这条数据因为上游问题根本就不存在了。 - 结果: 下游作业在 T2 时刻消费的
(id=1, value='A')
就成了一个“幽灵数据”。它在最终提交的 Paimon 快照中是不存在的。
Paimon 的 changelog-producer
机制可以一定程度上缓解这个问题。当写入作业恢复并处理新数据时,它会为 id=1
生成一条撤回消息 -U (id=1, value='A')
和一条更新消息 +U (id=1, value='B')
。下游需要有能力正确处理这种数据撤回的逻辑。
Paimon 如何在测试中体现和权衡?
StreamingReadWriteTableWithKafkaLogITCase.java
文件中,通过不同的 scan.mode
(扫描模式)来体现这种权衡:
-
scan.mode
=latest-full
(默认模式,混合读取)- 行为: 先读取最新的已提交快照的数据,然后无缝切换到从 Kafka Log Store 读取增量数据。
- 权衡: 提供了历史数据的完整性和增量数据的低延迟。这是最常用的模式,它在一致性和延迟之间取得了很好的平衡。测试
testReadWriteWithPartitionedRecordsWithPk
就属于这种模式。
-
scan.mode
=latest
(仅日志读取)- 行为: 完全跳过快照数据,直接从 Kafka Log Store 的最新位置开始消费实时 changelog。
- 权衡: 获得了极致的低延迟,但代价是丢失了历史状态。下游只能看到“此刻之后”的变更流,看不到表的存量数据。这适用于那些只关心增量变化的场景。测试
testReadLatestChangelogOfPartitionedRecordsWithPk
就是这种模式。
StreamingReadWriteTableWithKafkaLogITCase.java
// ... existing code ...
@Test
public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws Exception {
// ... existing code ...
String table =
createTableWithKafkaLog(
Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
Arrays.asList("currency", "dt"),
Collections.singletonList("dt"),
true);
BlockingIterator<Row, Row> streamItr =
testStreamingReadWithReadFirst(
temporaryTable,
table,
// 注意这里的 SCAN_LATEST 选项,它将 scan.mode 设置为 'latest'
buildQueryWithTableOptions(table, "*", "", SCAN_LATEST),
Arrays.asList(
changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
// ... existing code ...
总结
- 等待 Checkpoint:以延迟为代价,换取最强的数据一致性、事务性和故障恢复能力。这是数据仓库的基石。
- 使用 Log Store:以“可能需要处理数据撤回”为代价,换取极低的流式读取延迟。
Paimon 的设计非常精妙,它没有强迫用户二选一,而是通过 Log Store 这个可选组件和不同的扫描模式,将选择权交给了用户。你可以根据业务场景对一致性和延迟的不同要求,来决定是否开启 Log Store 以及如何消费它。
为什么 FlinkKafkaProducer
被弃用了?
FlinkKafkaProducer
是 Flink 早期版本中连接 Kafka 的主要方式。随着 Flink 的版本迭代,社区推出了一个更强大、更统一的新版连接器 API (Sink V2 API),并推荐使用新的 KafkaSink
。因此,旧的 FlinkKafkaProducer
就被标记为“弃用 (deprecated)”了。这是来自 Flink 框架本身的变化,而不是 Paimon 项目的决定。
LogSinkFunction
是否还在使用?
LogSinkFunction
在 Paimon 中仍在使用。它并没有被弃用。
在 Paimon 中,LogSinkFunction
是一个核心接口,用于将表的变更日志(changelog)发送到外部的消息队列系统(比如 Kafka),这是 Paimon 实现“日志存储 (log store)”功能的重要一环。
KafkaSinkFunction
正是 LogSinkFunction
接口针对 Kafka 的一个具体实现。
// ... existing code ...
/**
* A {@link FlinkKafkaProducer} which implements {@link LogSinkFunction} to register {@link
* WriteCallback}.
*/
public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements LogSinkFunction {
// ... existing code ...
为什么 Paimon 仍然使用被弃用的 FlinkKafkaProducer
?
这主要是出于 兼容性 的考虑。Paimon 项目需要支持多个不同版本的 Flink。直接切换到 Flink 新的 KafkaSink
API 会导致无法兼容旧版本的 Flink。因此,Paimon 在这里选择继续使用 FlinkKafkaProducer
,以确保代码能够在不同的 Flink 环境下都能正常工作。
可以从代码中的注释看到为了兼容性所做的努力,比如 open
方法为了同时兼容 Flink 1.18- 和 Flink 2.0+ 的版本,特意没有加上 @Override
注解。
KafkaSinkFunction.java
// ... existing code ...
/**
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
*/
public void open(OpenContext openContext) throws Exception {
open(new Configuration());
}
/**
* Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
*/
public void open(Configuration configuration) throws Exception {
// ... existing code ...
另外,无法访问 org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction
也与此相关。在 Flink 较新的版本中,SinkFunction
接口被移到了 legacy
包下,Paimon 通过内部的兼容模块来解决这个问题,保证代码可以编译和运行。
总结
FlinkKafkaProducer
的弃用 是 Flink 官方行为,推荐使用新的KafkaSink
。- Paimon 继续使用它 是为了兼容多个 Flink 版本。
LogSinkFunction
及其相关功能 在 Paimon 中是正常使用的,是你看到的这个日志系统功能的核心部分。
Invoke 逻辑
KafkaSinkFunction
类本身并没有直接实现 invoke
方法。它继承了 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
,因此 invoke
的核心逻辑实际上是在父类 FlinkKafkaProducer
中实现的。
当看到 logSinkFunction.invoke(logRecord, sinkContext)
这行代码被调用时,实际执行的是 FlinkKafkaProducer
的 invoke
方法。它的主要流程如下:
- 检查错误:首先会检查之前是否有异步发送失败的异常,如果有则直接抛出,保证作业失败。
- 序列化:这是最核心的一步。它会调用
KafkaSerializationSchema
的serialize
方法,将输入的 PaimonSinkRecord
对象转换成 Kafka 的ProducerRecord<byte[], byte[]>
对象。- 在 Paimon 的场景下,这个
KafkaSerializationSchema
的具体实现是KafkaLogSerializationSchema
。 - 它会根据表是否有主键、以及配置的
log.changelog-mode
,来决定如何序列化SinkRecord
中的主键和行数据,并将它们分别作为 Kafka 消息的 key 和 value。 SinkRecord
中的bucket()
信息会被用作 Kafka 的分区号。
- 在 Paimon 的场景下,这个
- 发送数据:调用内部的 Kafka
Producer
实例的send
方法,将序列化后的ProducerRecord
异步地发送给 Kafka 集群。 - 注册回调:
send
方法会附带一个Callback
。当 Kafka Broker 确认收到消息或发送失败时,这个回调函数会被触发。- 在
KafkaSinkFunction
中,这个回调函数被包装了一层。除了处理异常和确认消息外,它还会调用writeCallback.onCompletion(metadata.partition(), metadata.offset())
。 - 这个
writeCallback
会将成功写入消息的分区和 offset 位置报告给上游的RowDataStoreWriteOperator
,这样 Paimon 就能在做 Checkpoint 时记录下日志系统的消费位点,以保证端到端的一致性。
- 在
下面是 FlinkKafkaProducer
中 invoke
方法的简化逻辑,可以对照着理解:
// ... existing code ...
public void invoke(KafkaTransactionState transaction, IN next, SinkFunction.Context context) throws FlinkKafkaException {
// 1. 检查是否有异步错误
this.checkErroneous();
ProducerRecord<byte[], byte[]> record;
// ... 省略了部分 keyedSchema 的逻辑 ...
// 2. 调用序列化Schema,将输入的记录(next,即SinkRecord)转换为ProducerRecord
// 在Paimon中,这里的kafkaSchema就是KafkaLogSerializationSchema
record = this.kafkaSchema.serialize(next, context.timestamp());
// 3. 增加待处理记录的计数器,并异步发送
this.pendingRecords.incrementAndGet();
transaction.producer.send(record, this.callback);
}
// ... existing code ...
Sink 的主要配置
KafkaSinkFunction
本身是一个简单的包装类,它的配置主要通过构造函数传递给父类 FlinkKafkaProducer
。从它的构造函数来看,主要有以下几个配置:
// ... existing code ...
public KafkaSinkFunction(
String defaultTopic,
KafkaSerializationSchema<SinkRecord> serializationSchema,
Properties producerConfig,
KafkaSinkFunction.Semantic semantic) {
super(defaultTopic, serializationSchema, producerConfig, semantic);
}
// ... existing code ...
-
defaultTopic
(String)- 作用:指定要写入的默认 Kafka 主题(Topic)。
- 来源:通常由 Paimon 表属性中的
log.system.topic
或kafka.topic
指定。
-
serializationSchema
(KafkaSerializationSchema)- 作用:定义了如何将
SinkRecord
转换成 Kafka 消息。这是连接 Paimon 数据和 Kafka 消息格式的桥梁。 - 实现:在 Paimon 中,这里传入的是
KafkaLogSerializationSchema
的实例。
- 作用:定义了如何将
-
producerConfig
(Properties)- 作用:标准的 Kafka Producer 配置。所有
org.apache.kafka.clients.producer.KafkaProducer
支持的参数都可以通过这个Properties
对象进行配置。 - 关键配置:最重要的配置是
bootstrap.servers
,用于指定 Kafka 集群的地址。其他如acks
,retries
,compression.type
等也在这里设置。 - 来源:这些配置来自于 Paimon 表属性中所有以
kafka.
为前缀的配置项。
- 作用:标准的 Kafka Producer 配置。所有
-
semantic
(Semantic)- 作用:定义了 Sink 的一致性语义,可以是
EXACTLY_ONCE
(精确一次)、AT_LEAST_ONCE
(至少一次)或NONE
。 - 来源:这个配置由 Paimon 表属性中的
log.consistency
决定。TRANSACTIONAL
对应EXACTLY_ONCE
,EVENTUAL
对应AT_LEAST_ONCE
。
- 作用:定义了 Sink 的一致性语义,可以是
除了构造函数中的配置,还有一个通过 setter
方法设置的回调函数,对 Paimon 的日志系统至关重要:
writeCallback
(WriteCallback)- 作用:如上所述,这个回调用于将成功写入 Kafka 的位点(partition 和 offset)通知给 Paimon 的写算子,用于状态的持久化。
- 设置:在
RowDataStoreWriteOperator
的open
方法中被设置。
总结来说,KafkaSinkFunction
的 invoke
逻辑是利用 KafkaSerializationSchema
将 Paimon 的 SinkRecord
序列化后,异步发送到 Kafka。其核心配置包括 Kafka 的连接信息、Topic、序列化逻辑和一致性语义。
更多推荐
所有评论(0)