说明:

核心逻辑:两个实时数据流(如订单流 + 支付流)通过 Debezium 捕获 binlog 后,分别以 CDC 模式写入 Paimon 表(Paimon 自动维护数据的 insert/update/delete 一致性),再通过 Flink 的 批查询(全量关联) 和 流查询(增量关联) 实现外键匹配,关联结果可持久化到 Paimon 关联表,供 Doris 等下游直接查询。

以下是完整实操步骤,包含表设计、数据写入、关联查询、增量同步全流程:

一、前置条件

  1. 环境准备:
    • Flink 1.18+(推荐,适配 Paimon 最新 CDC 特性);
    • Paimon 0.8+(需 paimon-flink-connector 依赖);
    • 存储介质:HDFS/S3(生产环境)或本地文件(测试);
    • Debezium + Kafka:已捕获两个表的 binlog(如订单表、支付表),分别写入 Kafka 主题(mysql-binlog-ordermysql-binlog-pay)。
  2. 数据约定:
    • 外键:order_id(订单表主键 = 支付表外键);
    • 操作类型:Debezium 输出的 op 字段(c=插入u=更新d=删除);
    • 目标:通过 order_id 关联订单流与支付流,支持 “全量历史关联” 和 “实时增量关联”。

二、步骤 1:创建 Kafka 源表(读取 binlog 流)

首先在 Flink SQL 中定义 Kafka 源表,解析 Debezium 输出的 binlog 数据(包含 after 新值、op 操作类型):

sql

-- 1. 订单流 Kafka 源表(Debezium 捕获的 binlog 流)
CREATE TABLE kafka_order_source (
  order_id BIGINT,        -- 主键/外键
  user_id BIGINT,
  amount DECIMAL(10,2),
  create_time TIMESTAMP(3),
  op STRING               -- Debezium 操作类型:c/u/d
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql-binlog-order',  -- Kafka 订单主题
  'properties.bootstrap.servers' = 'kafka-ip:9092',
  'properties.group.id' = 'paimon-join-group',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'scan.startup.mode' = 'earliest-offset'  -- 首次启动从头消费(全量同步)
);

-- 2. 支付流 Kafka 源表(Debezium 捕获的 binlog 流)
CREATE TABLE kafka_pay_source (
  pay_id BIGINT,
  order_id BIGINT,        -- 外键(关联订单表 order_id)
  pay_amount DECIMAL(10,2),
  pay_time TIMESTAMP(3),
  op STRING               -- Debezium 操作类型:c/u/d
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql-binlog-pay',  -- Kafka 支付主题
  'properties.bootstrap.servers' = 'kafka-ip:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'scan.startup.mode' = 'earliest-offset'
);

三、步骤 2:创建 Paimon 表(CDC 模式存储数据流)

两个流分别写入 Paimon 表,启用 write.mode = 'cdc'(自动处理 binlog 的更新 / 删除,维护数据一致性),并设置主键(确保外键唯一匹配):

sql

-- 1. Paimon 订单表(CDC 模式,存储订单全量+增量数据)
CREATE TABLE paimon_order (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,  -- 主键(外键关联依据)
  user_id BIGINT,
  amount DECIMAL(10,2),
  create_time TIMESTAMP(3),
  `__op` STRING METADATA FROM 'op'  -- 存储操作类型(供后续过滤删除)
) WITH (
  'connector' = 'paimon',
  'path' = 'hdfs://hdfs-ip:9000/paimon/db/order',  -- 存储路径(HDFS/S3)
  'file.format' = 'parquet',  -- 高效存储格式
  'write.mode' = 'cdc',       -- 关键:CDC 模式,支持 update/delete
  'index.enabled' = 'true',   -- 开启主键索引,加速关联
  'index.type' = 'hash',      -- 哈希索引(适配 order_id 等值关联)
  'changelog.producer' = 'input'  -- 直接使用源表的 op 字段生成变更日志
);

-- 2. Paimon 支付表(CDC 模式,存储支付全量+增量数据)
CREATE TABLE paimon_pay (
  pay_id BIGINT PRIMARY KEY NOT ENFORCED,
  order_id BIGINT,            -- 外键(关联 paimon_order.order_id)
  pay_amount DECIMAL(10,2),
  pay_time TIMESTAMP(3),
  `__op` STRING METADATA FROM 'op'  -- 存储操作类型
) WITH (
  'connector' = 'paimon',
  'path' = 'hdfs://hdfs-ip:9000/paimon/db/pay',
  'file.format' = 'parquet',
  'write.mode' = 'cdc',
  'index.enabled' = 'true',
  'index.type' = 'hash',
  'changelog.producer' = 'input'
);
  • 关键配置说明:
    • write.mode = 'cdc':Paimon 会解析 __op 字段(映射 Debezium 的 op),自动执行 insert/update/delete,无需手动处理;
    • index.enabled = 'true':对主键 order_id 建立哈希索引,关联时可快速匹配,避免全表扫描;
    • changelog.producer = 'input':直接复用源表的操作类型,减少 Paimon 解析开销。

四、步骤 3:将 Kafka 流写入 Paimon 表(全量 + 增量同步)

通过 Flink SQL 将 Kafka 中的 binlog 流同步到 Paimon 表,首次运行会全量同步历史数据,之后自动消费增量 binlog:

sql

-- 1. 订单流写入 Paimon 订单表(过滤删除事件,或保留供后续处理)
INSERT INTO paimon_order
SELECT
  order_id,
  user_id,
  amount,
  create_time,
  op  -- 映射到 Paimon 表的 __op 元数据字段
FROM kafka_order_source
WHERE op != 'd';  -- 可选:过滤删除事件(如需保留删除,直接去掉 WHERE 条件)

-- 2. 支付流写入 Paimon 支付表
INSERT INTO paimon_pay
SELECT
  pay_id,
  order_id,
  pay_amount,
  pay_time,
  op
FROM kafka_pay_source
WHERE op != 'd';
  • 执行后,Flink 会启动两个流作业:
    • 全量阶段:消费 Kafka 主题从头开始的数据,同步历史全量数据到 Paimon;
    • 增量阶段:全量同步完成后,自动切换为消费新产生的 binlog 增量数据,实时写入 Paimon。

五、步骤 4:流 - 流关联实现(全量关联 + 增量关联)

Paimon 支持 批查询(全量关联历史数据) 和 流查询(增量关联新数据),两种方式可单独使用,也可结合实现 “全量 + 增量” 完整关联。

方式 1:批查询(全量关联历史数据)

适合首次初始化关联结果、定期重算全量数据的场景,直接关联两个 Paimon 表的全量数据:

sql

-- 1. 创建 Paimon 关联结果表(存储全量关联数据)
CREATE TABLE paimon_order_pay_join (
  order_id BIGINT,
  user_id BIGINT,
  order_amount DECIMAL(10,2),
  create_time TIMESTAMP(3),
  pay_id BIGINT,
  pay_amount DECIMAL(10,2),
  pay_time TIMESTAMP(3),
  PRIMARY KEY (order_id, pay_id) NOT ENFORCED  -- 联合主键(避免重复关联)
) WITH (
  'connector' = 'paimon',
  'path' = 'hdfs://hdfs-ip:9000/paimon/db/order_pay_join',
  'file.format' = 'parquet',
  'write.mode' = 'overwrite'  -- 全量重算时覆盖,或用 'append' 追加
);

-- 2. 全量关联:订单表 + 支付表(外键 order_id)
INSERT INTO paimon_order_pay_join
SELECT
  o.order_id,
  o.user_id,
  o.amount AS order_amount,
  o.create_time,
  p.pay_id,
  p.pay_amount,
  p.pay_time
FROM paimon_order o
LEFT JOIN paimon_pay p
ON o.order_id = p.order_id;  -- 外键关联
  • 执行方式:在 Flink SQL Client 中执行该语句,会启动一个 批作业,一次性关联 Paimon 表中的全量数据,写入关联结果表。

方式 2:流查询(增量关联新数据)

适合实时关联新增 / 更新的数据,仅消费两个 Paimon 表的增量变更(CDC 日志),避免重复关联历史数据:

sql

-- 增量关联:仅处理订单表和支付表的新增/更新数据
INSERT INTO paimon_order_pay_join
SELECT
  o.order_id,
  o.user_id,
  o.amount AS order_amount,
  o.create_time,
  p.pay_id,
  p.pay_amount,
  p.pay_time
FROM paimon_order o /*+ OPTIONS('scan.mode'='incremental') */
LEFT JOIN paimon_pay p /*+ OPTIONS('scan.mode'='incremental') */
ON o.order_id = p.order_id
-- 过滤删除事件(Paimon 增量扫描会包含删除的变更记录)
WHERE o.`__op` != 'd' AND p.`__op` != 'd';
  • 关键配置:scan.mode = 'incremental' 表示 Paimon 表以 “增量模式” 扫描,仅输出自上次消费后的新增 / 更新数据;
  • 执行方式:启动一个 流作业,持续消费两个 Paimon 表的增量变更,实时关联后写入结果表,延迟秒级。

方式 3:全量 + 增量结合(推荐生产使用)

首次执行全量关联初始化结果表,之后通过增量关联同步新数据,确保结果表数据完整且实时:

  1. 先执行 “方式 1” 的全量关联,初始化历史数据;
  2. 再执行 “方式 2” 的增量关联,持续同步新增 / 更新数据;
  3. 可选:定期(如每天凌晨)重新执行全量关联,修正增量关联可能出现的不一致(如网络抖动导致的漏数据)。

六、步骤 5:关联结果消费(Doris 直接查询)

Paimon 关联结果表可直接作为 Doris 的外部表,无需额外同步,Doris 可实时查询关联后的数据:

sql

-- 在 Doris 中创建 Paimon 外部表(关联结果表)
CREATE EXTERNAL TABLE doris_order_pay_join (
  order_id BIGINT,
  user_id BIGINT,
  order_amount DECIMAL(10,2),
  create_time DATETIME,
  pay_id BIGINT,
  pay_amount DECIMAL(10,2),
  pay_time DATETIME
) ENGINE=HUDI  -- Doris 用 HUDI 引擎兼容 Paimon(或使用 Paimon 专属引擎)
PROPERTIES (
  "path" = "hdfs://hdfs-ip:9000/paimon/db/order_pay_join",  -- Paimon 关联结果表路径
  "format" = "parquet",
  "hudi.table.type" = "COPY_ON_WRITE",  -- 适配 Paimon 的存储格式
  "fs.defaultFS" = "hdfs://hdfs-ip:9000",
  "hive.metastore.uris" = "thrift://hive-metastore-ip:9083"  -- 若 Paimon 注册到 Hive 元数据
);

-- Doris 直接查询关联结果(支持实时分析)
SELECT 
  user_id,
  COUNT(order_id) AS order_count,
  SUM(order_amount) AS total_amount
FROM doris_order_pay_join
WHERE create_time >= '2025-01-01'
GROUP BY user_id;

七、关键优化与注意事项

1. 关联性能优化

  • 索引优化:维表(如订单表)必须以关联字段(order_id)作为主键,开启哈希索引,避免关联时全表扫描;
  • 分区优化:Paimon 表按时间字段分区(如 create_time 按天分区),关联时指定分区过滤,减少扫描数据量:

    sql

    -- Paimon 订单表添加分区配置
    ALTER TABLE paimon_order SET PROPERTIES (
      'partition.column' = 'create_time',
      'partition.type' = 'range',
      'partition.range' = 'INTERVAL 1 DAY'  -- 按天分区
    );
    
  • 并行度配置:Flink 作业并行度 = Paimon 表分区数 = Kafka 分区数,避免数据倾斜(如并行度设为 10,对应 Kafka 10 个分区)。

2. 数据一致性保障

  • 操作类型过滤:关联时过滤 __op = 'd' 的删除事件,避免关联结果包含已删除数据;
  • 联合主键:关联结果表用 (order_id, pay_id) 作为联合主键,避免同一订单 + 支付记录重复写入;
  • Checkpoint 配置:Flink 作业开启 Checkpoint(如 1 分钟一次),故障恢复后可从断点继续消费,避免数据丢失。

3. 处理延迟与乱序

  • 时间窗口约束:若两个流存在延迟(如订单创建后 30 分钟内支付),可在关联时添加时间窗口过滤,避免无效关联:

    sql

    WHERE p.pay_time BETWEEN o.create_time AND o.create_time + INTERVAL '30' MINUTE
    
  • 乱序处理:Paimon 支持设置 write.buffer.delay(如 5 秒),缓冲乱序到达的数据后再写入文件,提升关联准确性。

4. 存储优化

  • 文件格式:优先使用 Parquet(压缩比高、查询快),避免 CSV;
  • 过期数据清理:设置 Paimon 表的 TTL(如 30 天),自动清理过期的历史数据,减少存储开销:

    sql

    ALTER TABLE paimon_order SET PROPERTIES ("ttl" = "2592000");  -- 30天(秒)
    

八、总结

Paimon 流 - 流增量关联的核心是 “CDC 模式存储数据流 + 批 / 流结合关联”,相比 Flink 纯流关联的优势:

  1. 无状态膨胀:关联数据持久化到 Paimon,无需依赖 Flink 状态缓存;
  2. 支持回溯:可随时重新执行全量关联,修正数据不一致;
  3. 多下游复用:关联结果表可供 Doris、Spark 等多个下游查询,无需重复关联;
  4. 简化架构:Doris 直接查询 Paimon 外部表,避免额外同步步骤。

该方案适合需要 “实时增量关联 + 历史全量回溯” 的场景(如电商订单与支付数据关联、物流数据与订单数据关联),是工业界成熟的流批一体关联方案。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐