Hudi 能否实现 Paimon 同款流 - 流增量关联?结论 + 实操方案
Hudi能够实现CDC流数据的增量关联查询,其核心机制基于增量拉取(IncrementalPull)和CDC表类型。与Paimon相比,Hudi通过UPSERT操作和MERGE_ON_READ表类型实现binlog的更新/删除处理,支持全量初始化+增量同步模式,并能与Flink、Doris无缝集成。Hudi依赖BloomFilter/GlobalIndex进行高效关联,但性能略逊于Paimon的哈
说明
核心结论:Hudi 完全能实现上述 “两个 CDC 流写入存储后,通过批 / 流查询实现增量关联” 的操作,但实现逻辑、核心特性与 Paimon 有差异 ——Hudi 以 “增量拉取(Incremental Pull)” 为核心机制,依赖其 CDC 表类型 和 增量查询能力 完成关联,同样支持 “全量初始化 + 增量同步”,且能与 Flink、Doris 无缝集成。
以下是 Hudi 实现该需求的完整方案,对照 Paimon 逻辑逐一拆解,方便你对比选择:
一、Hudi 与 Paimon 核心特性对比(关联场景视角)
先明确两者在流 - 流增量关联场景的差异,避免踩坑:
| 特性维度 | Hudi(0.14+) | Paimon(0.8+) |
|---|---|---|
| CDC 模式支持 | 支持(table.type=MERGE_ON_READ/COPY_ON_WRITE + hoodie.datasource.write.operation=UPSERT) |
支持(write.mode=cdc 原生适配 binlog) |
| 增量查询机制 | 基于 commit time 拉取增量数据(Incremental Pull) |
基于 scan.mode=incremental 扫描增量变更 |
| 关联性能 | 依赖索引(Bloom Filter/Global Index),等值关联高效 | 哈希索引优化更彻底,关联速度略优 |
| 全量 + 增量结合 | 支持(全量查询初始化,增量查询同步新数据) | 支持(批查询全量,流查询增量) |
| Doris 集成 | 原生支持(Doris 提供 Hudi 外部表引擎) | 需通过 HUDI 兼容引擎或专属连接器 |
| 状态管理 | 无 Flink 状态膨胀(数据持久化到 Hudi) | 无 Flink 状态膨胀(数据持久化到 Paimon) |
关键适配点:Hudi 需通过 hoodie.datasource.write.recordkey.field 定义主键(对应外键关联字段),通过 hoodie.datasource.write.operation=UPSERT 处理 binlog 的更新 / 删除,完全匹配 Paimon 的 CDC 模式能力。
二、Hudi 实现流 - 流增量关联完整步骤
沿用之前的业务场景:
- 数据源:MySQL 订单表 + 支付表 binlog 流(Debezium→Kafka);
- 外键:
order_id(订单表主键 = 支付表外键); - 目标:全量关联历史数据,实时增量关联新数据,结果供 Doris 查询。
前置条件
- Flink 1.17+(Hudi 0.14+ 适配 Flink 1.18);
- Hudi 0.14+(需
hudi-flink-connector依赖); - 存储介质:HDFS/S3(生产)、本地文件(测试);
- Debezium+Kafka 已就绪(Kafka 主题:
mysql-binlog-order、mysql-binlog-pay)。
步骤 1:创建 Kafka 源表(与 Paimon 方案完全一致)
解析 Debezium 输出的 binlog 数据,无需修改:
sql
-- 订单流 Kafka 源表
CREATE TABLE kafka_order_source (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
create_time TIMESTAMP(3),
op STRING -- Debezium 操作类型:c/u/d
) WITH (
'connector' = 'kafka',
'topic' = 'mysql-binlog-order',
'properties.bootstrap.servers' = 'kafka-ip:9092',
'properties.group.id' = 'hudi-join-group',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'scan.startup.mode' = 'earliest-offset'
);
-- 支付流 Kafka 源表
CREATE TABLE kafka_pay_source (
pay_id BIGINT,
order_id BIGINT,
pay_amount DECIMAL(10,2),
pay_time TIMESTAMP(3),
op STRING
) WITH (
'connector' = 'kafka',
'topic' = 'mysql-binlog-pay',
'properties.bootstrap.servers' = 'kafka-ip:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
步骤 2:创建 Hudi 表(CDC 模式存储数据流)
Hudi 无专门的 “CDC 模式”,但通过 UPSERT 操作 + 主键配置,可完美适配 binlog 的 insert/update/delete,等价于 Paimon 的 write.mode=cdc:
sql
-- 1. Hudi 订单表(存储订单全量+增量数据,适配 CDC)
CREATE TABLE hudi_order (
order_id BIGINT PRIMARY KEY NOT ENFORCED, -- 主键(外键关联依据)
user_id BIGINT,
amount DECIMAL(10,2),
create_time TIMESTAMP(3),
op STRING -- 保留操作类型,供过滤删除
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hdfs-ip:9000/hudi/db/order', -- 存储路径
'table.type' = 'MERGE_ON_READ', -- 推荐:读时合并(实时性+存储效率平衡)
-- 若需更高查询性能,用 COPY_ON_WRITE(写时合并)
'hoodie.datasource.write.recordkey.field' = 'order_id', -- 主键字段(必选)
'hoodie.datasource.write.partitionpath.field' = 'create_time', -- 分区字段(按时间分区)
'hoodie.datasource.write.partitionpath.urlencode' = 'false',
'hoodie.datasource.write.operation' = 'UPSERT', -- 核心:支持更新/插入(适配 binlog)
'hoodie.datasource.write.precombine.field' = 'create_time', -- 预合并字段(处理同主键乱序)
'hoodie.index.type' = 'BLOOM', -- 布隆索引(等值关联高效)
'hoodie.bloom.index.fields' = 'order_id', -- 索引字段(外键关联字段)
'hoodie.datasource.write.hive_style_partitioning' = 'true', -- Hive 风格分区(方便 Doris 读取)
'hoodie.table.name' = 'order_table' -- Hudi 表名(元数据标识)
);
-- 2. Hudi 支付表(与订单表配置一致)
CREATE TABLE hudi_pay (
pay_id BIGINT PRIMARY KEY NOT ENFORCED,
order_id BIGINT, -- 外键(关联 order_id)
pay_amount DECIMAL(10,2),
pay_time TIMESTAMP(3),
op STRING
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hdfs-ip:9000/hudi/db/pay',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'pay_id',
'hoodie.datasource.write.partitionpath.field' = 'pay_time',
'hoodie.datasource.write.operation' = 'UPSERT',
'hoodie.datasource.write.precombine.field' = 'pay_time',
'hoodie.index.type' = 'BLOOM',
'hoodie.bloom.index.fields' = 'order_id', -- 索引外键字段,加速关联
'hoodie.table.name' = 'pay_table'
);
- 关键配置说明:
table.type=MERGE_ON_READ:Hudi 核心表类型,写入时追加日志文件,查询时合并,兼顾实时性和写入性能(适合增量流场景);hoodie.datasource.write.operation=UPSERT:自动根据主键匹配,存在则更新,不存在则插入,删除操作需通过op='d'手动处理(后续步骤说明);BLOOM 索引:对order_id建立索引,关联时快速过滤不匹配的数据,避免全表扫描。
步骤 3:将 Kafka 流写入 Hudi 表(全量 + 增量同步)
Hudi 不自动解析 op 字段,需手动处理删除事件(通过 DELETE 操作),插入 / 更新通过 UPSERT 自动处理:
sql
-- 1. 订单流写入 Hudi 表(处理 insert/update/delete)
INSERT INTO hudi_order
SELECT
order_id,
user_id,
amount,
create_time,
op
FROM kafka_order_source;
-- 2. 支付流写入 Hudi 表
INSERT INTO hudi_pay
SELECT
pay_id,
order_id,
pay_amount,
pay_time,
op
FROM kafka_pay_source;
-- 关键:处理删除事件(Hudi 需显式执行 DELETE 操作)
-- 订单表删除事件处理(单独启动一个流作业)
INSERT INTO hudi_order /*+ OPTIONS('hoodie.datasource.write.operation'='DELETE') */
SELECT
order_id,
user_id,
amount,
create_time,
op
FROM kafka_order_source
WHERE op = 'd';
-- 支付表删除事件处理
INSERT INTO hudi_pay /*+ OPTIONS('hoodie.datasource.write.operation'='DELETE') */
SELECT
pay_id,
order_id,
pay_amount,
pay_time,
op
FROM kafka_pay_source
WHERE op = 'd';
- 执行后效果:
- 全量阶段:Flink 作业从头消费 Kafka 数据,将历史全量数据写入 Hudi;
- 增量阶段:实时消费新 binlog 数据,
op='c'/'u'触发 UPSERT,op='d'触发 DELETE,Hudi 自动维护数据一致性。
步骤 4:流 - 流关联实现(全量关联 + 增量关联)
Hudi 核心通过 read.start-commit(增量查询起始版本)实现 “全量初始化 + 增量同步”,关联逻辑与 Paimon 一致,但语法有差异:
方式 1:全量关联(初始化历史数据)
首次执行全量关联,读取两个 Hudi 表的所有历史数据,写入关联结果表:
sql
-- 1. 创建 Hudi 关联结果表(存储全量+增量关联数据)
CREATE TABLE hudi_order_pay_join (
order_id BIGINT,
user_id BIGINT,
order_amount DECIMAL(10,2),
create_time TIMESTAMP(3),
pay_id BIGINT,
pay_amount DECIMAL(10,2),
pay_time TIMESTAMP(3),
PRIMARY KEY (order_id, pay_id) NOT ENFORCED -- 联合主键,避免重复
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hdfs-ip:9000/hudi/db/order_pay_join',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'order_id,pay_id', -- 联合主键
'hoodie.datasource.write.partitionpath.field' = 'create_time',
'hoodie.datasource.write.operation' = 'UPSERT',
'hoodie.datasource.write.precombine.field' = 'create_time',
'hoodie.table.name' = 'order_pay_join_table'
);
-- 2. 全量关联:订单表 + 支付表(外键 order_id)
INSERT INTO hudi_order_pay_join
SELECT
o.order_id,
o.user_id,
o.amount AS order_amount,
o.create_time,
p.pay_id,
p.pay_amount,
p.pay_time
FROM hudi_order o
LEFT JOIN hudi_pay p
ON o.order_id = p.order_id
WHERE o.op != 'd' AND p.op != 'd'; -- 过滤删除数据
- 执行方式:Flink 批作业(或流作业读全量),一次性关联所有历史数据。
方式 2:增量关联(实时同步新数据)
Hudi 增量查询需指定 read.start-commit(起始提交版本),仅读取该版本后的增量变更数据,实现实时关联:
sql
-- 1. 先获取 Hudi 表的最新提交版本(用于首次启动时指定起始版本)
-- 在 Flink SQL Client 执行(或通过 Hudi API 查询):
DESCRIBE EXTENDED hudi_order; -- 查看表元数据,获取最新 commit time(如 20250101120000)
-- 2. 增量关联:仅读取两个表的新增/更新数据
INSERT INTO hudi_order_pay_join
SELECT
o.order_id,
o.user_id,
o.amount AS order_amount,
o.create_time,
p.pay_id,
p.pay_amount,
p.pay_time
FROM hudi_order /*+ OPTIONS('read.start-commit'='20250101120000', 'read.streaming.enabled'='true') */ o
LEFT JOIN hudi_pay /*+ OPTIONS('read.start-commit'='20250101120000', 'read.streaming.enabled'='true') */ p
ON o.order_id = p.order_id
WHERE o.op != 'd' AND p.op != 'd';
- 关键配置:
read.streaming.enabled='true':启用 Hudi 流读取模式(增量查询);read.start-commit:指定增量查询的起始版本(首次用全量关联后的最新版本,后续作业自动续接);- 执行方式:Flink 流作业,持续读取两个 Hudi 表的增量变更,实时关联后写入结果表(延迟秒级)。
方式 3:全量 + 增量结合(生产推荐)
- 执行 “方式 1” 全量关联,初始化历史数据;
- 执行 “方式 2” 增量关联,指定全量关联后的最新
commit time作为起始版本,持续同步新数据; - 定期(如每天凌晨)重新执行全量关联,修正增量关联可能的不一致(Hudi 支持
UPSERT覆盖旧数据)。
步骤 5:Doris 直接查询 Hudi 关联结果(原生支持)
Doris 提供原生 Hudi 外部表引擎,无需兼容适配,直接查询关联结果:
sql
-- 在 Doris 中创建 Hudi 外部表(关联结果表)
CREATE EXTERNAL TABLE doris_order_pay_join (
order_id BIGINT,
user_id BIGINT,
order_amount DECIMAL(10,2),
create_time DATETIME,
pay_id BIGINT,
pay_amount DECIMAL(10,2),
pay_time DATETIME
) ENGINE=HUDI
PROPERTIES (
"path" = "hdfs://hdfs-ip:9000/hudi/db/order_pay_join", -- Hudi 关联结果表路径
"hudi.table.type" = "MERGE_ON_READ", -- 与 Hudi 表类型一致
"fs.defaultFS" = "hdfs://hdfs-ip:9000",
"hive.metastore.uris" = "thrift://hive-metastore-ip:9083", -- 若 Hudi 注册到 Hive 元数据
"partition_filter_expr" = "create_time >= '2025-01-01'" -- 可选:默认过滤分区
);
-- Doris 实时查询关联结果
SELECT
user_id,
COUNT(order_id) AS order_count,
SUM(order_amount) AS total_amount
FROM doris_order_pay_join
GROUP BY user_id;
三、Hudi 方案关键优化与注意事项
1. 关联性能优化
- 索引优化:对关联字段(
order_id)启用GLOBAL_BLOOM索引(而非默认的BLOOM),跨分区关联更快:sql
ALTER TABLE hudi_order SET TBLPROPERTIES ('hoodie.index.type' = 'GLOBAL_BLOOM'); - 分区裁剪:关联时显式指定分区范围(如按天分区),减少扫描数据量:
sql
WHERE o.create_time BETWEEN '2025-01-01' AND '2025-01-02' AND p.pay_time BETWEEN '2025-01-01' AND '2025-01-02' - 并行度配置:Flink 作业并行度 = Hudi 表分区数 = Kafka 分区数(如 10),避免数据倾斜。
2. 数据一致性保障
- 删除事件处理:必须单独启动删除流作业(
operation=DELETE),否则 Hudi 无法处理 binlog 的删除事件; - 预合并字段:
precombine.field需设置为时间字段(如create_time),避免同主键乱序数据导致的关联不一致; - Checkpoint 配置:开启 Flink Checkpoint(1 分钟一次),故障恢复后从断点续跑,避免数据丢失。
3. 实时性优化
- 表类型选择:若需低延迟(秒级),用
MERGE_ON_READ;若需查询性能优先(毫秒级),用COPY_ON_WRITE(写入性能略差); - 日志合并触发:
MERGE_ON_READ表可设置hoodie.compact.inline=true,写入时自动触发小文件合并,提升查询速度。
4. 存储优化
- 过期数据清理:设置 Hudi 表的 TTL,自动清理过期数据:
sql
ALTER TABLE hudi_order SET TBLPROPERTIES ('hoodie.cleaner.policy' = 'KEEP_LATEST_FILE_VERSIONS', 'hoodie.cleaner.fileversions.retained' = '30'); -- 保留30个版本(约30天) - 压缩格式:默认使用 Snappy 压缩(Parquet 格式),无需额外配置,平衡压缩比和查询速度。
四、Hudi vs Paimon 方案选择建议
| 场景特征 | 推荐方案 | 原因 |
|---|---|---|
| 追求极简配置,原生适配 CDC | Paimon | write.mode=cdc 直接解析 binlog,无需手动处理删除 |
| 已有 Hadoop 生态(HDFS+Hive) | Hudi | 与 Hadoop 生态兼容性更好,元数据可注册到 Hive |
| 关联性能要求极高,需哈希索引 | Paimon | 哈希索引比 Hudi 布隆索引等值关联更快 |
| 需多引擎兼容(Spark/Flink/Trino) | Hudi | 多引擎支持更成熟,社区生态更丰富 |
| Doris 集成优先级最高 | 两者均可(Hudi 更原生) | Doris 对 Hudi 外部表支持更完善,无需兼容配置 |
五、总结
Hudi 完全能实现 “两个 CDC 流写入存储后,批 / 流结合的增量关联”,核心依赖其 UPSERT 操作、增量查询能力 和 MERGE_ON_READ 表类型,与 Paimon 方案功能对等,仅在 CDC 适配、索引类型、配置细节上有差异。
若你已在使用 Hadoop 生态(HDFS/Hive),或需要多引擎兼容,优先选择 Hudi;若追求极简配置、更优的关联性能,可选择 Paimon。两种方案最终都能实现 “全量初始化 + 增量同步 + Doris 直接查询” 的闭环,满足流 - 流增量关联需求。
更多推荐


所有评论(0)