说明

核心结论:Hudi 完全能实现上述 “两个 CDC 流写入存储后,通过批 / 流查询实现增量关联” 的操作,但实现逻辑、核心特性与 Paimon 有差异 ——Hudi 以 “增量拉取(Incremental Pull)” 为核心机制,依赖其 CDC 表类型 和 增量查询能力 完成关联,同样支持 “全量初始化 + 增量同步”,且能与 Flink、Doris 无缝集成。

以下是 Hudi 实现该需求的完整方案,对照 Paimon 逻辑逐一拆解,方便你对比选择:

一、Hudi 与 Paimon 核心特性对比(关联场景视角)

先明确两者在流 - 流增量关联场景的差异,避免踩坑:

特性维度 Hudi(0.14+) Paimon(0.8+)
CDC 模式支持 支持(table.type=MERGE_ON_READ/COPY_ON_WRITE + hoodie.datasource.write.operation=UPSERT 支持(write.mode=cdc 原生适配 binlog)
增量查询机制 基于 commit time 拉取增量数据(Incremental Pull) 基于 scan.mode=incremental 扫描增量变更
关联性能 依赖索引(Bloom Filter/Global Index),等值关联高效 哈希索引优化更彻底,关联速度略优
全量 + 增量结合 支持(全量查询初始化,增量查询同步新数据) 支持(批查询全量,流查询增量)
Doris 集成 原生支持(Doris 提供 Hudi 外部表引擎) 需通过 HUDI 兼容引擎或专属连接器
状态管理 无 Flink 状态膨胀(数据持久化到 Hudi) 无 Flink 状态膨胀(数据持久化到 Paimon)

关键适配点:Hudi 需通过 hoodie.datasource.write.recordkey.field 定义主键(对应外键关联字段),通过 hoodie.datasource.write.operation=UPSERT 处理 binlog 的更新 / 删除,完全匹配 Paimon 的 CDC 模式能力。

二、Hudi 实现流 - 流增量关联完整步骤

沿用之前的业务场景:

  • 数据源:MySQL 订单表 + 支付表 binlog 流(Debezium→Kafka);
  • 外键:order_id(订单表主键 = 支付表外键);
  • 目标:全量关联历史数据,实时增量关联新数据,结果供 Doris 查询。

前置条件

  • Flink 1.17+(Hudi 0.14+ 适配 Flink 1.18);
  • Hudi 0.14+(需 hudi-flink-connector 依赖);
  • 存储介质:HDFS/S3(生产)、本地文件(测试);
  • Debezium+Kafka 已就绪(Kafka 主题:mysql-binlog-ordermysql-binlog-pay)。

步骤 1:创建 Kafka 源表(与 Paimon 方案完全一致)

解析 Debezium 输出的 binlog 数据,无需修改:

sql

-- 订单流 Kafka 源表
CREATE TABLE kafka_order_source (
  order_id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10,2),
  create_time TIMESTAMP(3),
  op STRING  -- Debezium 操作类型:c/u/d
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql-binlog-order',
  'properties.bootstrap.servers' = 'kafka-ip:9092',
  'properties.group.id' = 'hudi-join-group',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'scan.startup.mode' = 'earliest-offset'
);

-- 支付流 Kafka 源表
CREATE TABLE kafka_pay_source (
  pay_id BIGINT,
  order_id BIGINT,
  pay_amount DECIMAL(10,2),
  pay_time TIMESTAMP(3),
  op STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql-binlog-pay',
  'properties.bootstrap.servers' = 'kafka-ip:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

步骤 2:创建 Hudi 表(CDC 模式存储数据流)

Hudi 无专门的 “CDC 模式”,但通过 UPSERT 操作 + 主键配置,可完美适配 binlog 的 insert/update/delete,等价于 Paimon 的 write.mode=cdc

sql

-- 1. Hudi 订单表(存储订单全量+增量数据,适配 CDC)
CREATE TABLE hudi_order (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,  -- 主键(外键关联依据)
  user_id BIGINT,
  amount DECIMAL(10,2),
  create_time TIMESTAMP(3),
  op STRING  -- 保留操作类型,供过滤删除
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hdfs-ip:9000/hudi/db/order',  -- 存储路径
  'table.type' = 'MERGE_ON_READ',  -- 推荐:读时合并(实时性+存储效率平衡)
  -- 若需更高查询性能,用 COPY_ON_WRITE(写时合并)
  'hoodie.datasource.write.recordkey.field' = 'order_id',  -- 主键字段(必选)
  'hoodie.datasource.write.partitionpath.field' = 'create_time',  -- 分区字段(按时间分区)
  'hoodie.datasource.write.partitionpath.urlencode' = 'false',
  'hoodie.datasource.write.operation' = 'UPSERT',  -- 核心:支持更新/插入(适配 binlog)
  'hoodie.datasource.write.precombine.field' = 'create_time',  -- 预合并字段(处理同主键乱序)
  'hoodie.index.type' = 'BLOOM',  -- 布隆索引(等值关联高效)
  'hoodie.bloom.index.fields' = 'order_id',  -- 索引字段(外键关联字段)
  'hoodie.datasource.write.hive_style_partitioning' = 'true',  -- Hive 风格分区(方便 Doris 读取)
  'hoodie.table.name' = 'order_table'  -- Hudi 表名(元数据标识)
);

-- 2. Hudi 支付表(与订单表配置一致)
CREATE TABLE hudi_pay (
  pay_id BIGINT PRIMARY KEY NOT ENFORCED,
  order_id BIGINT,  -- 外键(关联 order_id)
  pay_amount DECIMAL(10,2),
  pay_time TIMESTAMP(3),
  op STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hdfs-ip:9000/hudi/db/pay',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'pay_id',
  'hoodie.datasource.write.partitionpath.field' = 'pay_time',
  'hoodie.datasource.write.operation' = 'UPSERT',
  'hoodie.datasource.write.precombine.field' = 'pay_time',
  'hoodie.index.type' = 'BLOOM',
  'hoodie.bloom.index.fields' = 'order_id',  -- 索引外键字段,加速关联
  'hoodie.table.name' = 'pay_table'
);
  • 关键配置说明:
    • table.type=MERGE_ON_READ:Hudi 核心表类型,写入时追加日志文件,查询时合并,兼顾实时性和写入性能(适合增量流场景);
    • hoodie.datasource.write.operation=UPSERT:自动根据主键匹配,存在则更新,不存在则插入,删除操作需通过 op='d' 手动处理(后续步骤说明);
    • BLOOM 索引:对 order_id 建立索引,关联时快速过滤不匹配的数据,避免全表扫描。

步骤 3:将 Kafka 流写入 Hudi 表(全量 + 增量同步)

Hudi 不自动解析 op 字段,需手动处理删除事件(通过 DELETE 操作),插入 / 更新通过 UPSERT 自动处理:

sql

-- 1. 订单流写入 Hudi 表(处理 insert/update/delete)
INSERT INTO hudi_order
SELECT
  order_id,
  user_id,
  amount,
  create_time,
  op
FROM kafka_order_source;

-- 2. 支付流写入 Hudi 表
INSERT INTO hudi_pay
SELECT
  pay_id,
  order_id,
  pay_amount,
  pay_time,
  op
FROM kafka_pay_source;

-- 关键:处理删除事件(Hudi 需显式执行 DELETE 操作)
-- 订单表删除事件处理(单独启动一个流作业)
INSERT INTO hudi_order /*+ OPTIONS('hoodie.datasource.write.operation'='DELETE') */
SELECT
  order_id,
  user_id,
  amount,
  create_time,
  op
FROM kafka_order_source
WHERE op = 'd';

-- 支付表删除事件处理
INSERT INTO hudi_pay /*+ OPTIONS('hoodie.datasource.write.operation'='DELETE') */
SELECT
  pay_id,
  order_id,
  pay_amount,
  pay_time,
  op
FROM kafka_pay_source
WHERE op = 'd';
  • 执行后效果:
    • 全量阶段:Flink 作业从头消费 Kafka 数据,将历史全量数据写入 Hudi;
    • 增量阶段:实时消费新 binlog 数据,op='c'/'u' 触发 UPSERT,op='d' 触发 DELETE,Hudi 自动维护数据一致性。

步骤 4:流 - 流关联实现(全量关联 + 增量关联)

Hudi 核心通过 read.start-commit(增量查询起始版本)实现 “全量初始化 + 增量同步”,关联逻辑与 Paimon 一致,但语法有差异:

方式 1:全量关联(初始化历史数据)

首次执行全量关联,读取两个 Hudi 表的所有历史数据,写入关联结果表:

sql

-- 1. 创建 Hudi 关联结果表(存储全量+增量关联数据)
CREATE TABLE hudi_order_pay_join (
  order_id BIGINT,
  user_id BIGINT,
  order_amount DECIMAL(10,2),
  create_time TIMESTAMP(3),
  pay_id BIGINT,
  pay_amount DECIMAL(10,2),
  pay_time TIMESTAMP(3),
  PRIMARY KEY (order_id, pay_id) NOT ENFORCED  -- 联合主键,避免重复
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hdfs-ip:9000/hudi/db/order_pay_join',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'order_id,pay_id',  -- 联合主键
  'hoodie.datasource.write.partitionpath.field' = 'create_time',
  'hoodie.datasource.write.operation' = 'UPSERT',
  'hoodie.datasource.write.precombine.field' = 'create_time',
  'hoodie.table.name' = 'order_pay_join_table'
);

-- 2. 全量关联:订单表 + 支付表(外键 order_id)
INSERT INTO hudi_order_pay_join
SELECT
  o.order_id,
  o.user_id,
  o.amount AS order_amount,
  o.create_time,
  p.pay_id,
  p.pay_amount,
  p.pay_time
FROM hudi_order o
LEFT JOIN hudi_pay p
ON o.order_id = p.order_id
WHERE o.op != 'd' AND p.op != 'd';  -- 过滤删除数据
  • 执行方式:Flink 批作业(或流作业读全量),一次性关联所有历史数据。
方式 2:增量关联(实时同步新数据)

Hudi 增量查询需指定 read.start-commit(起始提交版本),仅读取该版本后的增量变更数据,实现实时关联:

sql

-- 1. 先获取 Hudi 表的最新提交版本(用于首次启动时指定起始版本)
-- 在 Flink SQL Client 执行(或通过 Hudi API 查询):
DESCRIBE EXTENDED hudi_order;  -- 查看表元数据,获取最新 commit time(如 20250101120000)

-- 2. 增量关联:仅读取两个表的新增/更新数据
INSERT INTO hudi_order_pay_join
SELECT
  o.order_id,
  o.user_id,
  o.amount AS order_amount,
  o.create_time,
  p.pay_id,
  p.pay_amount,
  p.pay_time
FROM hudi_order /*+ OPTIONS('read.start-commit'='20250101120000', 'read.streaming.enabled'='true') */ o
LEFT JOIN hudi_pay /*+ OPTIONS('read.start-commit'='20250101120000', 'read.streaming.enabled'='true') */ p
ON o.order_id = p.order_id
WHERE o.op != 'd' AND p.op != 'd';
  • 关键配置:
    • read.streaming.enabled='true':启用 Hudi 流读取模式(增量查询);
    • read.start-commit:指定增量查询的起始版本(首次用全量关联后的最新版本,后续作业自动续接);
    • 执行方式:Flink 流作业,持续读取两个 Hudi 表的增量变更,实时关联后写入结果表(延迟秒级)。
方式 3:全量 + 增量结合(生产推荐)
  1. 执行 “方式 1” 全量关联,初始化历史数据;
  2. 执行 “方式 2” 增量关联,指定全量关联后的最新 commit time 作为起始版本,持续同步新数据;
  3. 定期(如每天凌晨)重新执行全量关联,修正增量关联可能的不一致(Hudi 支持 UPSERT 覆盖旧数据)。

步骤 5:Doris 直接查询 Hudi 关联结果(原生支持)

Doris 提供原生 Hudi 外部表引擎,无需兼容适配,直接查询关联结果:

sql

-- 在 Doris 中创建 Hudi 外部表(关联结果表)
CREATE EXTERNAL TABLE doris_order_pay_join (
  order_id BIGINT,
  user_id BIGINT,
  order_amount DECIMAL(10,2),
  create_time DATETIME,
  pay_id BIGINT,
  pay_amount DECIMAL(10,2),
  pay_time DATETIME
) ENGINE=HUDI
PROPERTIES (
  "path" = "hdfs://hdfs-ip:9000/hudi/db/order_pay_join",  -- Hudi 关联结果表路径
  "hudi.table.type" = "MERGE_ON_READ",  -- 与 Hudi 表类型一致
  "fs.defaultFS" = "hdfs://hdfs-ip:9000",
  "hive.metastore.uris" = "thrift://hive-metastore-ip:9083",  -- 若 Hudi 注册到 Hive 元数据
  "partition_filter_expr" = "create_time >= '2025-01-01'"  -- 可选:默认过滤分区
);

-- Doris 实时查询关联结果
SELECT
  user_id,
  COUNT(order_id) AS order_count,
  SUM(order_amount) AS total_amount
FROM doris_order_pay_join
GROUP BY user_id;

三、Hudi 方案关键优化与注意事项

1. 关联性能优化

  • 索引优化:对关联字段(order_id)启用 GLOBAL_BLOOM 索引(而非默认的 BLOOM),跨分区关联更快:

    sql

    ALTER TABLE hudi_order SET TBLPROPERTIES ('hoodie.index.type' = 'GLOBAL_BLOOM');
    
  • 分区裁剪:关联时显式指定分区范围(如按天分区),减少扫描数据量:

    sql

    WHERE o.create_time BETWEEN '2025-01-01' AND '2025-01-02' 
      AND p.pay_time BETWEEN '2025-01-01' AND '2025-01-02'
    
  • 并行度配置:Flink 作业并行度 = Hudi 表分区数 = Kafka 分区数(如 10),避免数据倾斜。

2. 数据一致性保障

  • 删除事件处理:必须单独启动删除流作业(operation=DELETE),否则 Hudi 无法处理 binlog 的删除事件;
  • 预合并字段:precombine.field 需设置为时间字段(如 create_time),避免同主键乱序数据导致的关联不一致;
  • Checkpoint 配置:开启 Flink Checkpoint(1 分钟一次),故障恢复后从断点续跑,避免数据丢失。

3. 实时性优化

  • 表类型选择:若需低延迟(秒级),用 MERGE_ON_READ;若需查询性能优先(毫秒级),用 COPY_ON_WRITE(写入性能略差);
  • 日志合并触发:MERGE_ON_READ 表可设置 hoodie.compact.inline = true,写入时自动触发小文件合并,提升查询速度。

4. 存储优化

  • 过期数据清理:设置 Hudi 表的 TTL,自动清理过期数据:

    sql

    ALTER TABLE hudi_order SET TBLPROPERTIES ('hoodie.cleaner.policy' = 'KEEP_LATEST_FILE_VERSIONS', 'hoodie.cleaner.fileversions.retained' = '30');  -- 保留30个版本(约30天)
    
  • 压缩格式:默认使用 Snappy 压缩(Parquet 格式),无需额外配置,平衡压缩比和查询速度。

四、Hudi vs Paimon 方案选择建议

场景特征 推荐方案 原因
追求极简配置,原生适配 CDC Paimon write.mode=cdc 直接解析 binlog,无需手动处理删除
已有 Hadoop 生态(HDFS+Hive) Hudi 与 Hadoop 生态兼容性更好,元数据可注册到 Hive
关联性能要求极高,需哈希索引 Paimon 哈希索引比 Hudi 布隆索引等值关联更快
需多引擎兼容(Spark/Flink/Trino) Hudi 多引擎支持更成熟,社区生态更丰富
Doris 集成优先级最高 两者均可(Hudi 更原生) Doris 对 Hudi 外部表支持更完善,无需兼容配置

五、总结

Hudi 完全能实现 “两个 CDC 流写入存储后,批 / 流结合的增量关联”,核心依赖其 UPSERT 操作增量查询能力 和 MERGE_ON_READ 表类型,与 Paimon 方案功能对等,仅在 CDC 适配、索引类型、配置细节上有差异。

若你已在使用 Hadoop 生态(HDFS/Hive),或需要多引擎兼容,优先选择 Hudi;若追求极简配置、更优的关联性能,可选择 Paimon。两种方案最终都能实现 “全量初始化 + 增量同步 + Doris 直接查询” 的闭环,满足流 - 流增量关联需求。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐