Paimon 流 - 流增量关联(CDC 模式)具体实现方案
本文介绍了一种基于Paimon实现双流实时关联的技术方案。通过Debezium捕获MySQL的订单和支付表binlog,写入Kafka后同步到Paimon表(CDC模式),利用Flink的批查询和流查询分别实现全量历史关联和实时增量关联。方案支持自动处理数据变更(insert/update/delete),通过主键索引优化关联性能,并将结果持久化供Doris等下游直接查询。关键点包括:CDC模式配
说明:
核心逻辑:两个实时数据流(如订单流 + 支付流)通过 Debezium 捕获 binlog 后,分别以 CDC 模式写入 Paimon 表(Paimon 自动维护数据的 insert/update/delete 一致性),再通过 Flink 的 批查询(全量关联) 和 流查询(增量关联) 实现外键匹配,关联结果可持久化到 Paimon 关联表,供 Doris 等下游直接查询。
以下是完整实操步骤,包含表设计、数据写入、关联查询、增量同步全流程:
一、前置条件
- 环境准备:
- Flink 1.18+(推荐,适配 Paimon 最新 CDC 特性);
- Paimon 0.8+(需
paimon-flink-connector依赖); - 存储介质:HDFS/S3(生产环境)或本地文件(测试);
- Debezium + Kafka:已捕获两个表的 binlog(如订单表、支付表),分别写入 Kafka 主题(
mysql-binlog-order、mysql-binlog-pay)。
- 数据约定:
- 外键:
order_id(订单表主键 = 支付表外键); - 操作类型:Debezium 输出的
op字段(c=插入、u=更新、d=删除); - 目标:通过
order_id关联订单流与支付流,支持 “全量历史关联” 和 “实时增量关联”。
- 外键:
二、步骤 1:创建 Kafka 源表(读取 binlog 流)
首先在 Flink SQL 中定义 Kafka 源表,解析 Debezium 输出的 binlog 数据(包含 after 新值、op 操作类型):
sql
-- 1. 订单流 Kafka 源表(Debezium 捕获的 binlog 流)
CREATE TABLE kafka_order_source (
order_id BIGINT, -- 主键/外键
user_id BIGINT,
amount DECIMAL(10,2),
create_time TIMESTAMP(3),
op STRING -- Debezium 操作类型:c/u/d
) WITH (
'connector' = 'kafka',
'topic' = 'mysql-binlog-order', -- Kafka 订单主题
'properties.bootstrap.servers' = 'kafka-ip:9092',
'properties.group.id' = 'paimon-join-group',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'scan.startup.mode' = 'earliest-offset' -- 首次启动从头消费(全量同步)
);
-- 2. 支付流 Kafka 源表(Debezium 捕获的 binlog 流)
CREATE TABLE kafka_pay_source (
pay_id BIGINT,
order_id BIGINT, -- 外键(关联订单表 order_id)
pay_amount DECIMAL(10,2),
pay_time TIMESTAMP(3),
op STRING -- Debezium 操作类型:c/u/d
) WITH (
'connector' = 'kafka',
'topic' = 'mysql-binlog-pay', -- Kafka 支付主题
'properties.bootstrap.servers' = 'kafka-ip:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'scan.startup.mode' = 'earliest-offset'
);
三、步骤 2:创建 Paimon 表(CDC 模式存储数据流)
两个流分别写入 Paimon 表,启用 write.mode = 'cdc'(自动处理 binlog 的更新 / 删除,维护数据一致性),并设置主键(确保外键唯一匹配):
sql
-- 1. Paimon 订单表(CDC 模式,存储订单全量+增量数据)
CREATE TABLE paimon_order (
order_id BIGINT PRIMARY KEY NOT ENFORCED, -- 主键(外键关联依据)
user_id BIGINT,
amount DECIMAL(10,2),
create_time TIMESTAMP(3),
`__op` STRING METADATA FROM 'op' -- 存储操作类型(供后续过滤删除)
) WITH (
'connector' = 'paimon',
'path' = 'hdfs://hdfs-ip:9000/paimon/db/order', -- 存储路径(HDFS/S3)
'file.format' = 'parquet', -- 高效存储格式
'write.mode' = 'cdc', -- 关键:CDC 模式,支持 update/delete
'index.enabled' = 'true', -- 开启主键索引,加速关联
'index.type' = 'hash', -- 哈希索引(适配 order_id 等值关联)
'changelog.producer' = 'input' -- 直接使用源表的 op 字段生成变更日志
);
-- 2. Paimon 支付表(CDC 模式,存储支付全量+增量数据)
CREATE TABLE paimon_pay (
pay_id BIGINT PRIMARY KEY NOT ENFORCED,
order_id BIGINT, -- 外键(关联 paimon_order.order_id)
pay_amount DECIMAL(10,2),
pay_time TIMESTAMP(3),
`__op` STRING METADATA FROM 'op' -- 存储操作类型
) WITH (
'connector' = 'paimon',
'path' = 'hdfs://hdfs-ip:9000/paimon/db/pay',
'file.format' = 'parquet',
'write.mode' = 'cdc',
'index.enabled' = 'true',
'index.type' = 'hash',
'changelog.producer' = 'input'
);
- 关键配置说明:
write.mode = 'cdc':Paimon 会解析__op字段(映射 Debezium 的op),自动执行 insert/update/delete,无需手动处理;index.enabled = 'true':对主键order_id建立哈希索引,关联时可快速匹配,避免全表扫描;changelog.producer = 'input':直接复用源表的操作类型,减少 Paimon 解析开销。
四、步骤 3:将 Kafka 流写入 Paimon 表(全量 + 增量同步)
通过 Flink SQL 将 Kafka 中的 binlog 流同步到 Paimon 表,首次运行会全量同步历史数据,之后自动消费增量 binlog:
sql
-- 1. 订单流写入 Paimon 订单表(过滤删除事件,或保留供后续处理)
INSERT INTO paimon_order
SELECT
order_id,
user_id,
amount,
create_time,
op -- 映射到 Paimon 表的 __op 元数据字段
FROM kafka_order_source
WHERE op != 'd'; -- 可选:过滤删除事件(如需保留删除,直接去掉 WHERE 条件)
-- 2. 支付流写入 Paimon 支付表
INSERT INTO paimon_pay
SELECT
pay_id,
order_id,
pay_amount,
pay_time,
op
FROM kafka_pay_source
WHERE op != 'd';
- 执行后,Flink 会启动两个流作业:
- 全量阶段:消费 Kafka 主题从头开始的数据,同步历史全量数据到 Paimon;
- 增量阶段:全量同步完成后,自动切换为消费新产生的 binlog 增量数据,实时写入 Paimon。
五、步骤 4:流 - 流关联实现(全量关联 + 增量关联)
Paimon 支持 批查询(全量关联历史数据) 和 流查询(增量关联新数据),两种方式可单独使用,也可结合实现 “全量 + 增量” 完整关联。
方式 1:批查询(全量关联历史数据)
适合首次初始化关联结果、定期重算全量数据的场景,直接关联两个 Paimon 表的全量数据:
sql
-- 1. 创建 Paimon 关联结果表(存储全量关联数据)
CREATE TABLE paimon_order_pay_join (
order_id BIGINT,
user_id BIGINT,
order_amount DECIMAL(10,2),
create_time TIMESTAMP(3),
pay_id BIGINT,
pay_amount DECIMAL(10,2),
pay_time TIMESTAMP(3),
PRIMARY KEY (order_id, pay_id) NOT ENFORCED -- 联合主键(避免重复关联)
) WITH (
'connector' = 'paimon',
'path' = 'hdfs://hdfs-ip:9000/paimon/db/order_pay_join',
'file.format' = 'parquet',
'write.mode' = 'overwrite' -- 全量重算时覆盖,或用 'append' 追加
);
-- 2. 全量关联:订单表 + 支付表(外键 order_id)
INSERT INTO paimon_order_pay_join
SELECT
o.order_id,
o.user_id,
o.amount AS order_amount,
o.create_time,
p.pay_id,
p.pay_amount,
p.pay_time
FROM paimon_order o
LEFT JOIN paimon_pay p
ON o.order_id = p.order_id; -- 外键关联
- 执行方式:在 Flink SQL Client 中执行该语句,会启动一个 批作业,一次性关联 Paimon 表中的全量数据,写入关联结果表。
方式 2:流查询(增量关联新数据)
适合实时关联新增 / 更新的数据,仅消费两个 Paimon 表的增量变更(CDC 日志),避免重复关联历史数据:
sql
-- 增量关联:仅处理订单表和支付表的新增/更新数据
INSERT INTO paimon_order_pay_join
SELECT
o.order_id,
o.user_id,
o.amount AS order_amount,
o.create_time,
p.pay_id,
p.pay_amount,
p.pay_time
FROM paimon_order o /*+ OPTIONS('scan.mode'='incremental') */
LEFT JOIN paimon_pay p /*+ OPTIONS('scan.mode'='incremental') */
ON o.order_id = p.order_id
-- 过滤删除事件(Paimon 增量扫描会包含删除的变更记录)
WHERE o.`__op` != 'd' AND p.`__op` != 'd';
- 关键配置:
scan.mode = 'incremental'表示 Paimon 表以 “增量模式” 扫描,仅输出自上次消费后的新增 / 更新数据; - 执行方式:启动一个 流作业,持续消费两个 Paimon 表的增量变更,实时关联后写入结果表,延迟秒级。
方式 3:全量 + 增量结合(推荐生产使用)
首次执行全量关联初始化结果表,之后通过增量关联同步新数据,确保结果表数据完整且实时:
- 先执行 “方式 1” 的全量关联,初始化历史数据;
- 再执行 “方式 2” 的增量关联,持续同步新增 / 更新数据;
- 可选:定期(如每天凌晨)重新执行全量关联,修正增量关联可能出现的不一致(如网络抖动导致的漏数据)。
六、步骤 5:关联结果消费(Doris 直接查询)
Paimon 关联结果表可直接作为 Doris 的外部表,无需额外同步,Doris 可实时查询关联后的数据:
sql
-- 在 Doris 中创建 Paimon 外部表(关联结果表)
CREATE EXTERNAL TABLE doris_order_pay_join (
order_id BIGINT,
user_id BIGINT,
order_amount DECIMAL(10,2),
create_time DATETIME,
pay_id BIGINT,
pay_amount DECIMAL(10,2),
pay_time DATETIME
) ENGINE=HUDI -- Doris 用 HUDI 引擎兼容 Paimon(或使用 Paimon 专属引擎)
PROPERTIES (
"path" = "hdfs://hdfs-ip:9000/paimon/db/order_pay_join", -- Paimon 关联结果表路径
"format" = "parquet",
"hudi.table.type" = "COPY_ON_WRITE", -- 适配 Paimon 的存储格式
"fs.defaultFS" = "hdfs://hdfs-ip:9000",
"hive.metastore.uris" = "thrift://hive-metastore-ip:9083" -- 若 Paimon 注册到 Hive 元数据
);
-- Doris 直接查询关联结果(支持实时分析)
SELECT
user_id,
COUNT(order_id) AS order_count,
SUM(order_amount) AS total_amount
FROM doris_order_pay_join
WHERE create_time >= '2025-01-01'
GROUP BY user_id;
七、关键优化与注意事项
1. 关联性能优化
- 索引优化:维表(如订单表)必须以关联字段(
order_id)作为主键,开启哈希索引,避免关联时全表扫描; - 分区优化:Paimon 表按时间字段分区(如
create_time按天分区),关联时指定分区过滤,减少扫描数据量:sql
-- Paimon 订单表添加分区配置 ALTER TABLE paimon_order SET PROPERTIES ( 'partition.column' = 'create_time', 'partition.type' = 'range', 'partition.range' = 'INTERVAL 1 DAY' -- 按天分区 ); - 并行度配置:Flink 作业并行度 = Paimon 表分区数 = Kafka 分区数,避免数据倾斜(如并行度设为 10,对应 Kafka 10 个分区)。
2. 数据一致性保障
- 操作类型过滤:关联时过滤
__op = 'd'的删除事件,避免关联结果包含已删除数据; - 联合主键:关联结果表用
(order_id, pay_id)作为联合主键,避免同一订单 + 支付记录重复写入; - Checkpoint 配置:Flink 作业开启 Checkpoint(如 1 分钟一次),故障恢复后可从断点继续消费,避免数据丢失。
3. 处理延迟与乱序
- 时间窗口约束:若两个流存在延迟(如订单创建后 30 分钟内支付),可在关联时添加时间窗口过滤,避免无效关联:
sql
WHERE p.pay_time BETWEEN o.create_time AND o.create_time + INTERVAL '30' MINUTE - 乱序处理:Paimon 支持设置
write.buffer.delay(如 5 秒),缓冲乱序到达的数据后再写入文件,提升关联准确性。
4. 存储优化
- 文件格式:优先使用 Parquet(压缩比高、查询快),避免 CSV;
- 过期数据清理:设置 Paimon 表的 TTL(如 30 天),自动清理过期的历史数据,减少存储开销:
sql
ALTER TABLE paimon_order SET PROPERTIES ("ttl" = "2592000"); -- 30天(秒)
八、总结
Paimon 流 - 流增量关联的核心是 “CDC 模式存储数据流 + 批 / 流结合关联”,相比 Flink 纯流关联的优势:
- 无状态膨胀:关联数据持久化到 Paimon,无需依赖 Flink 状态缓存;
- 支持回溯:可随时重新执行全量关联,修正数据不一致;
- 多下游复用:关联结果表可供 Doris、Spark 等多个下游查询,无需重复关联;
- 简化架构:Doris 直接查询 Paimon 外部表,避免额外同步步骤。
该方案适合需要 “实时增量关联 + 历史全量回溯” 的场景(如电商订单与支付数据关联、物流数据与订单数据关联),是工业界成熟的流批一体关联方案。
更多推荐


所有评论(0)