目录

1. 架构规划

1.1 硬件与系统信息

1.2节点分配

1.3 依赖组件 (CDH)

2. 操作系统基础配置 (所有节点)

2.1 检查 CPU AVX2 指令集

2.2 操作系统参数优化 (核心稳定性保障)

2.3 配置 Hosts 映射

2.4 创建目录与授权

3. Doris 安装与环境集成 (Bigdata 用户)

3.1 解决 JDK 版本冲突 (关键)

3.2 解压 Doris 安装包

3.3 集成 CDH 配置文件

4. FE (Frontend) 部署

4.1 修改 fe.conf

4.2 启动与组网

5. BE (Backend) 部署

5.1 修改 be.conf

5.2 启动与注册

6. Paimon 数据湖集成 (最终验证方案)

6.1 创建 Catalog SQL (生产推荐)

6.2 验证查询

6.2.1 切换 Catalog 与 数据库

6.2.2 查看表列表

6.2.3 数据查询验证

6.3 方案优势总结 (Why HMS?)

7. 部署过程问题排查记录 (Troubleshooting)

8. 基础测试(增删改查)

8.1 插入数据

8.2 更新数据

8.3 删除数据

9. Paimon外表数据写入Doris内表基础测试

9.1 Paimon数据准备

9.2 测试代码

9.3 验证数据


1. 项目背景与技术升级 随着业务数据量的激增及对实时性要求的提高,现有 Paimon + OLAP 架构面临着更高的性能挑战。尽管早期引入的 Doris 2.x 版本成功解决了大字段存储痛点,但为了进一步挖掘数据价值,追求极致的查询响应速度与更低的资源消耗,项目组计划探索 Apache Doris 4.0.1(官方最新版本)的架构潜力。

新版本在异步物化视图、全新的查询优化器(Nereids)以及湖仓读取管线上进行了颠覆性升级。本次选型调整旨在验证新一代引擎在处理超大规模复杂数据时的性能边界,评估其是否能作为下一代核心计算单元,进一步提升数仓的流批处理能力。

2. 测试目标 本次测试旨在对 Doris 4.0.1 + Paimon 1.1.1 进行前瞻性的深度集成验证,重点评估新版本特性带来的收益,具体包括:

  • 极速读取性能:验证新版本针对 Paimon 格式的 Native Reader 优化效果,评估在海量数据扫描场景下的 IO 吞吐提升幅度。

  • 复杂计算与回写效能:在保留大字段处理优势的前提下,对比测试新优化器在执行复杂 ETL 逻辑时的计算加速比,以及高并发场景下的数据回写稳定性。

3.补充说明

Paimon官网支持Doris 2.0.6 及以上版本。对应网址:Doris | Apache Paimon

Doirs官网支持对Paimon的查询,使用 Doris 的分布式计算引擎直接访问 Paimon 数据以实现查询加速;数据集成,读取Paimon数据并将其写入Doris内部表,或使用Doris计算引擎执行ZeroETL;不支持数据写回Paimon。对应网址:Paimon Catalog - Apache Doris

支持Paimon版本为1.0.0

调研架构图如下:

1. 架构规划

1.1 硬件与系统信息

  • 操作系统: CentOS 7 (CDH 6.3.2 环境混合部署)

  • 节点配置:

    • CPU: 10核

    • 内存: 14GB (资源紧缺,需精细调优)

    • 存储: 400GB SSD

  • 部署用户: bigdata

  • Java 环境: Doris 4.0 需独立安装 JDK 17,这里我安装部署之后的路径为/home/bigdata/doris/jdk-17.0.2

Doris官方文档:软硬件环境检查 - Apache Doris

1.2节点分配

前置组件分配

IP 主机名 角色 版本
10.x.xx.201-10.x.xx.205 10.x.xx.215 10.x.xx.149 10.x.xx.151 10.x.xx.156 10.x.xx.157 10.x.xx.167 10.x.xx.206 nd1-nd5 nd6 nd11 nd12 nd13 nd14 nd15 nd16 CDH 6.3.2
10.x.xx.201-10.x.xx.205 nd1-nd5 Paimon 1.1.1

采用 3 节点混合部署 (FE + BE 同节点) 的高可用架构。

IP 地址 主机名 角色规划 操作系统用户 备注
10.x.xx.157 nd14 FE (Master) + BE bigdata 引导节点
10.x.xx.167 nd15 FE (Follower) + BE bigdata 高可用节点
10.x.xx.206 nd16 FE (Follower) + BE bigdata 高可用节点

1.3 依赖组件 (CDH)

  • Hive Metastore: nd1 (10.x.xx.201), nd3 (10.x.xx.203)

  • HDFS NameNode: HA 模式 (Active 节点假设为 nd1)

  • CDH Java 版本: JDK 1.8 (与 Doris 4.0 不兼容,需独立安装 JDK 17)

2. 操作系统基础配置 (所有节点)

执行对象: 所有 3 台机器,nd14、nd15、nd16上执行以下操作。

2.1 检查 CPU AVX2 指令集

Doris 4.0+ 强依赖 AVX2。

cat /proc/cpuinfo | grep avx2

  • 有输出:继续下一步。

  • 无输出:您需要下载 Doris 的 x64-noavx2 版本安装包,否则 BE 启动会报错 Illegal instruction

2.2 操作系统参数优化 (核心稳定性保障)

针对 14GB 小内存环境,必须关闭 Swap 并调大文件句柄。

# 1. 永久关闭 Swap (防止内存吃紧时拖死机器)
# 临时关闭,这里我使用的临时关闭,可根据实际环境选择是否永久关闭
swapoff -a
# 永久关闭
sed -i '/swap/s/^/#/' /etc/fstab
​
# 2. 修改内核参数
sudo vi /etc/sysctl.conf
# 添加:
vm.max_map_count=2000000
vm.swappiness=0
# 生效:
sysctl -p

# 3. 修改资源限制
vi /etc/security/limits.conf
# 添加:
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536

注意:修改 limits 后,必须退出 SSH 重新登录 bigdata 用户才能生效。

注意:由于原始的配置均为65535,Doris 官方推荐 65536 是为了取个整(2的16次方),但实际上 65535 对于 Doris 来说没有任何区别。只要这个数值大于 60000,Doris 就能非常稳定地运行。因此上述配置可以不用进行配置。

2.3 配置 Hosts 映射

确保 Doris 节点能解析自身以及 CDH 的节点。由于这里我是在CDH集群节点上选择的节点搭建,相应的配置均有,因此也可不用进行配置。

vi /etc/hosts
​
# Doris 集群
10.x.xx.157 nd14
10.x.xx.167 nd15
10.x.xx.206 nd16
​
# CDH 依赖 (必须包含 hive-site.xml 中配置的主机名)
10.x.xx.201 nd1
10.x.xx.203 nd3

2.4 创建目录与授权

mkdir -p /home/bigdata/doris
mkdir -p /home/bigdata/doris/doris-meta     # FE 元数据
mkdir -p /home/bigdata/doris/doris-storage  # BE 数据存储
​
# 移交权限给 bigdata
chown -R bigdata:bigdata /home/bigdata/doris

3. Doris 安装与环境集成 (Bigdata 用户)

执行对象: 所有 3 台机器 执行用户: bigdata

3.1 解决 JDK 版本冲突 (关键)

痛点:CDH 环境是 JDK 8,但 Doris 4.0.1 强制要求 JDK 17。 方案:下载免安装版 JDK 17,仅供 Doris 内部使用,不影响系统环境变量。

# 1. 上传或下载 JDK 17 到 /home/bigdata/doris/ 目录
cd /home/bigdata/doris/
# 假设已下载 openjdk-17.0.2_linux-x64_bin.tar.gz
# wget https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz
tar -zxvf openjdk-17.0.2_linux-x64_bin.tar.gz
​
# 2. 记录路径 (后续配置要用)
# 路径为: /home/bigdata/doris/jdk-17.0.2

也可使用网址下载然后上传:https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz

下载压缩包成功的截图如下:

解压之后的结果如下:

3.2 解压 Doris 安装包

cd /home/bigdata/doris
tar -zxvf apache-doris-4.0.1-bin-x64.tar.gz
cd apache-doris-4.0.1-bin-x64
mv fe /home/bigdata/doris/
mv be /home/bigdata/doris/

3.3 集成 CDH 配置文件

nd14 上操作,将 nd1上CDH 集群的配置文件拉取到 Doris 配置目录。

mkdir -p /home/bigdata/doris/conf/cdh_conf/
​
# 从 CDH 节点 (201) 拷贝
# 拷贝 Hadoop 配置文件 (core-site.xml 和 hdfs-site.xml)
scp root@10.x.xx.201:/etc/hadoop/conf/core-site.xml /home/bigdata/doris/conf/cdh_conf/
scp root@10.x.xx.201:/etc/hadoop/conf/hdfs-site.xml /home/bigdata/doris/conf/cdh_conf/
# 拷贝 Hive 配置文件 (hive-site.xml)
scp root@10.x.xx.201:/etc/hive/conf/hive-site.xml /home/bigdata/doris/conf/cdh_conf/

验证文件

ls -l /home/bigdata/doris/conf/cdh_conf/

分发到其他节点 (nd15, nd16)

将这 3 个文件放到 Doris 2台机器(另外2台)的统一目录,例如 /home/bigdata/doris/conf/cdh_conf/

# 1. 确保目标机器也有这个目录
ssh bigdata@nd15 "mkdir -p /home/bigdata/doris/conf/cdh_conf/"
ssh bigdata@nd16 "mkdir -p /home/bigdata/doris/conf/cdh_conf/"

# 2. 发送文件给 nd15
scp /home/bigdata/doris/conf/cdh_conf/* bigdata@nd15:/home/bigdata/doris/conf/cdh_conf/

# 3. 发送文件给 nd16
scp /home/bigdata/doris/conf/cdh_conf/* bigdata@nd16:/home/bigdata/doris/conf/cdh_conf/

4. FE (Frontend) 部署

4.1 修改 fe.conf

操作对象: 3 台机器。

vi /home/bigdata/doris/fe/conf/fe.conf
# 1. 元数据目录
meta_dir = /home/bigdata/doris/doris-meta

# 2. 绑定网段
priority_networks = 10.8.15.0/24

# 3. 【关键】指定 JDK 17 路径 (解决 The jdk_version is 8, must be 17 报错)
JAVA_HOME = /home/bigdata/doris/jdk-17.0.2

# 4. 内存优化 (G1GC + 4G堆内存)
# 14G 内存机器,给 FE 分配 4G,预留资源给 BE
JAVA_OPTS = "-Xmx4096m -Xms4096m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc*:$DORIS_HOME/log/fe.gc.log:time,uptimemillis,level,tags"

4.2 启动与组网

1. 启动 Master (10.x.xx.157)

/home/bigdata/doris/fe/bin/start_fe.sh --daemon

# 停止服务
# /home/bigdata/doris/fe/bin/stop_fe.sh --daemon

2. 启动 Followers (10.x.xx.167 和 206) 首次启动需指定 Helper

/home/bigdata/doris/fe/bin/start_fe.sh --helper 10.x.xx.157:9010 --daemon

查看日志确认是否启动成功:

tail -f /home/bigdata/doris/fe/log/fe.log

由上述截图可以得出:

  1. receive report from be ...:

    • 这是最关键的信号。说明 FE 收到了 BE 节点发来的心跳汇报。

    • type: DISK: BE 正在汇报磁盘使用情况。

    • type: CPU: BE 正在汇报 CPU 负载。

    • 这意味着 FE 和 BE 之间的网络是通的,且 BE 进程已存活

  2. BeLoadRebalancer ... get number of low load paths ...:

    • 这是 Doris 的均衡调度器在工作。它正在检查是否需要在节点间迁移数据副本。

    • isUrgent false: 表示当前没有紧急的均衡任务(正常,因为是新集群,没有数据倾斜)。

  3. BinlogManager.gc() ... no gc binlog:

    • 这是清理过期的 Binlog 日志,属于正常的后台维护任务。

3. 注册节点 在 157 上使用 MySQL 客户端连接:

mysql -h 10.x.xx.157 -P 9030 -u root

执行 SQL:

ALTER SYSTEM ADD FOLLOWER "10.x.xx.167:9010";
ALTER SYSTEM ADD FOLLOWER "10.x.xx.206:9010";

SHOW PROC '/frontends'; 

验证:SHOW PROC '/frontends'; 确保 3 节点 Alive=true

5. BE (Backend) 部署

5.1 修改 be.conf

操作对象: 3 台机器。

vi /home/bigdata/doris/be/conf/be.conf
# 1. 绑定网段
priority_networks = 10.8.15.0/24

# 2. 数据目录
storage_root_path = /home/bigdata/doris/doris-storage

# 3. 【关键】指定 JDK 17 (Paimon 插件依赖)
JAVA_HOME = /home/bigdata/doris/jdk-17.0.2

# 4. 【关键】端口避让 (CDH NodeManager 占用 8040,改为 18040)
webserver_port = 18040

5.2 启动与注册

1. 启动所有 BE

/home/bigdata/doris/be/bin/start_be.sh --daemon

# 停止服务
# /home/bigdata/doris/be/bin/stop_be.sh --daemon

查看日志确认是否启动成功:

tail -f /home/bigdata/doris/be/log/be.log

由上面截图可以得出:

  1. success to build all report tablets info:

    • 这是最重要的信号。说明 BE 已经成功扫描了本地的数据分片(Tablets),并准备好向 FE 汇报状态。

    • tablet_count=15: 说明它已经管理了一些内部表(通常是 Doris 的系统表)。

  2. Scheduled(every 10s) WAL info:

    • 预写日志(WAL)管理器正在例行检查,用于保障数据写入的可靠性。状态正常。

  3. query for dictionary status, return 0 rows:

    • 这是 BE 内部的字典缓存查询,没有报错,属于正常的心跳或检测机制。

集群状态总结

  • FE 日志: 正常接收 BE 汇报,无报错。

  • BE 日志: 正常维护存储,定期汇报,无报错。

  • 进程: FE 和 BE 均已存活。

2. 注册 BE 在 157 的 MySQL 客户端执行:

ALTER SYSTEM ADD BACKEND "10.x.xx.157:9050";
ALTER SYSTEM ADD BACKEND "10.x.xx.167:9050";
ALTER SYSTEM ADD BACKEND "10.x.xx.206:9050";

SHOW PROC '/backends';

验证:SHOW PROC '/backends'; 确保 3 节点 Alive=true

6. Paimon 数据湖集成 (最终验证方案)

背景: 在对接 CDH 6.3.2 (Hive 2.1.1) 时,Doris 默认的 Hive 3 客户端协议会触发 Invalid method name 报错。 在 Doris 2.x 版本中,为了规避此问题通常被迫使用 filesystem 模式,或者在配置了HA模式下,"hive.metastore.uris"参数只设置一个地址,当其中一个节点挂掉之后,需要手动进行切换。 但在 Doris 4.0.1 版本中,内核优化了对多版本协议的支持,完全修复了 HA 模式下版本参数失效的问题

解决方案: 采用 HMS (Hive Metastore) 模式,配置 双节点 HA,并通过显式指定 hive.version 进行协议降级。这是兼顾高性能(支持 CBO 优化)高可用数据一致性(支持 ACID)的最佳实践方案。

6.1 创建 Catalog SQL (生产推荐)

在 Doris 4.0.1 中,可以放心使用 HA 配置。在 Doris MySQL 客户端执行:

DROP CATALOG IF EXISTS paimon_catalog;

CREATE CATALOG paimon_catalog PROPERTIES (
    -- 1. 指定 Catalog 类型为 Paimon
    "type" = "paimon",
    
    -- 2. 【核心配置】使用 hms 模式,通过 Hive Metastore 管理元数据
    -- 相比 filesystem 模式,支持权限控制、ACID 事务读取和查询优化
    "paimon.catalog.type" = "hms",
    
    -- 3. 【高可用配置】配置多个 Metastore 地址 (Doris 4.0 已修复 HA 兼容性问题)
    "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083",
    
    -- 4. 【关键兼容修复】
    -- 强制指定 Hive 版本为 2.1.1 (对应 CDH 6.3.2),禁用高版本不兼容的 API 调用
    "hive.version" = "2.1.1",
    
    -- 5. 指定数仓物理路径
    "warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    
    -- 6. 加载 Hadoop 配置文件 (用于读取 HDFS HA、Kerberos 等配置)
    "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/",
    "hadoop.username" = "hdfs"
);

注意:此处 paimon.catalog.type 必须填写为 "hms",不可使用 "hive"(旧版写法)或 "filesystem"

6.2 验证查询

6.2.1 切换 Catalog 与 数据库

SWITCH paimon_catalog;
SHOW DATABASES;

结果如下:

6.2.2 查看表列表

USE ods;
SHOW TABLES;

结果如下:

6.2.3 数据查询验证

SELECT * FROM t_admin_division_code LIMIT 5;

结果如下:

6.3 方案优势总结 (Why HMS?)

相比之前的 Filesystem 方案,当前方案具有显著优势:

  1. 高可用性 (HA):配置了 nd1nd3 双 Metastore 节点,任意单点故障不影响 Doris 业务查询。

  2. 性能优化 (CBO):Doris 可以从 HMS 获取表的行数、文件大小等统计信息,生成更优的 Join 执行计划。

  3. 数据准确性:HMS 模式能正确识别 Paimon/Hive 的 ACID 事务状态,避免读取到未提交或已删除的脏数据。

  4. 运维规范:统一通过 Metastore 管理元数据,符合数仓建设标准。

对于Doris 2.1.10的所有方案均可以在Doris 4.0.1里面进行查看,相应的截图如下:

方案一【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog;

CREATE CATALOG paimon_catalog PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "hms",
    "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083",
    "warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/",
    "hadoop.username" = "hdfs",
    
    -- 【关键修复】显式指定 Hive 版本,禁止调用 Hive 3 的新 API
    "hive.version" = "2.1.1"
);

SWITCH paimon_catalog;
USE ods;
SELECT * FROM t_admin_division_code LIMIT 5;

方案二【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog;

CREATE CATALOG paimon_catalog PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "hms",
    "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083",
    "warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/",
    "hadoop.username" = "hdfs",
    
    -- 【关键修复】显式指定 Hive 版本,禁止调用 Hive 3 的新 API
    "hive.version" = "1.1.0"
);

SWITCH paimon_catalog;
USE ods;
SELECT * FROM t_admin_division_code LIMIT 5;

方案三【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog;

CREATE CATALOG paimon_catalog PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    
    -- 直接指向 HDFS 上的数仓根目录 (注意:如果 nameservice 未解析,直接写 active namenode 地址)
    "warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    
    "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/",
    "hadoop.username" = "hdfs"
);

SWITCH paimon_catalog;
USE ods;
SELECT * FROM t_admin_division_code LIMIT 5;

方案四【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog;

CREATE CATALOG paimon_catalog PROPERTIES (
    "type" = "paimon",
    -- 【这里是修改点】:将 'hive' 改为 'hms'
    "paimon.catalog.type" = "hms",
    -- 指定 HMS 地址
    "hive.metastore.uris" = "thrift://nd1:9083",
    -- 【核心兼容配置】保持不变,解决 CDH 兼容性
    "hive.version" = "2.1.1",
    -- 数仓路径
    "warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    -- 复用配置文件
    "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/",
    "hadoop.username" = "hdfs"
);

SWITCH paimon_catalog;
USE ods;
SELECT * FROM t_admin_division_code LIMIT 5;

7. 部署过程问题排查记录 (Troubleshooting)

在本次部署中,我们遇到了以下关键问题并已解决:

# 问题现象 报错关键信息 根本原因 解决方案
1 FE 启动失败 The jdk_version is 8, must be 17 CDH 环境默认是 JDK 8,SelectDB 4.0 强制要求 JDK 17。 下载解压 JDK 17,在 fe.confbe.conf 中显式配置 JAVA_HOME 指向新 JDK 路径。
2 权限拒绝 fe.out: Permission denied 曾使用 root 用户启动过 Doris,导致日志文件归属变为 root,切回 bigdata 后无法写入。 使用 root 执行 chown -R bigdata:bigdata /home/bigdata/doris 修复所有权,并杀掉残留的 root 进程。
3 端口冲突 tcp listen failed, errno=98 (8040) CDH 的 YARN NodeManager 占用了 8040 端口。 修改 be.conf,设置 webserver_port = 18040
5 内存隐患 (无报错,但在高负载下可能死机) 14GB 内存下,BE 默认尝试占用 90% 内存,会挤压 FE 和 OS 空间。 be.conf 中强制设置 mem_limit = 60%

8. 基础测试(增删改查)

将下述测试代码保存为doris4_test.py【python解释器:3.8.20、windows系统:11】

# -*- coding: utf-8 -*-
import pymysql
import random
import time
import logging
import functools
from sshtunnel import SSHTunnelForwarder

# ================= 配置信息 =================

# 1. SSH 连接配置 (连接到 Doris FE 所在的服务器)
# 您提供的 IP 列表: 10.x.xx.157, 10.x.xx.167, 10.x.xx.206
SSH_HOST = '10.x.xx.157'       # 选一个 FE 节点 (Master)
SSH_PORT = 22
SSH_USER = 'xxxxxx'           # CDH 环境通常使用 bigdata 用户
SSH_PASSWORD = 'xxxxxxxxxxxxxxxxxxxxx' # 您提供的密码

# 2. Doris 数据库连接配置
DORIS_LOCAL_HOST = '127.0.0.1' # 隧道建立后,对本地而言就是连本机
DORIS_QUERY_PORT = 9030        # Doris MySQL 协议查询端口
DORIS_DB_USER = 'root'
DORIS_DB_PWD = ''              # 默认无密码
DB_NAME = 'doris4_test_db'
TABLE_NAME = 'user_profile_v4'

LOG_FILE = 'doris4_test.log'

# ================= 日志与工具模块 =================
def setup_logger():
    logger = logging.getLogger("Doris4Tester")
    logger.setLevel(logging.INFO)
    if logger.hasHandlers():
        logger.handlers.clear()
    
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    
    # 文件日志
    file_handler = logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8')
    file_handler.setFormatter(formatter)
    
    # 控制台日志
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

logger = setup_logger()

def measure_time(func):
    """装饰器:用于记录函数执行耗时"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        logger.info(f"正在执行: [{func.__name__}] ...")
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            logger.info(f"执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}")
            raise e
    return wrapper

# ================= Doris 4.0 核心业务逻辑 =================

@measure_time
def init_db_and_table(cursor):
    """初始化数据库和表 (适配 Doris 4.0 Unique Key 模型)"""
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}")
    cursor.execute(f"USE {DB_NAME}")
    
    # Doris 4.0 建表最佳实践:
    # 1. 使用 Unique Key 模型
    # 2. 开启 enable_unique_key_merge_on_write (MoW) 实现类 MySQL 的高性能更新
    # 3. replication_num = 1 (测试环境节省资源,生产建议 3)
    create_sql = f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
        user_id INT COMMENT "用户ID",
        username VARCHAR(50) COMMENT "用户名",
        age INT COMMENT "年龄",
        city VARCHAR(20) COMMENT "城市",
        balance DECIMAL(10, 2) COMMENT "余额",
        create_time DATETIME(3) COMMENT "创建时间(毫秒精度)"
    )
    UNIQUE KEY(user_id)
    DISTRIBUTED BY HASH(user_id) BUCKETS 1
    PROPERTIES (
        "replication_num" = "1",
        "enable_unique_key_merge_on_write" = "true", 
        "store_row_column" = "true" 
    );
    """
    # store_row_column="true" 是 4.0 特性,优化部分列更新和点查性能
    
    cursor.execute(create_sql)
    # 每次测试前清空表,保证环境纯净
    cursor.execute(f"TRUNCATE TABLE {TABLE_NAME}")
    logger.info(f"数据库 {DB_NAME} 和表 {TABLE_NAME} (Unique Key MoW) 已初始化")

@measure_time
def insert_data(cursor, count=10):
    """批量插入数据"""
    logger.info(f"准备插入 {count} 条数据...")
    data_list = []
    cities = ['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen', 'Chengdu']
    
    for i in range(1, count + 1):
        user_id = i
        username = f"User_{i:03d}"
        age = random.randint(20, 60)
        city = random.choice(cities)
        balance = round(random.uniform(100.0, 5000.0), 2)
        # 模拟当前时间
        create_time = time.strftime('%Y-%m-%d %H:%M:%S')
        data_list.append((user_id, username, age, city, balance, create_time))
    
    sql = f"INSERT INTO {TABLE_NAME} VALUES (%s, %s, %s, %s, %s, %s)"
    cursor.executemany(sql, data_list)
    logger.info(f"成功插入 {cursor.rowcount} 条数据")

@measure_time
def update_data(cursor):
    """随机更新数据 (Doris 4.0 MoW 模型支持高效 UPDATE)"""
    # 1. 先查出所有 ID
    cursor.execute(f"SELECT user_id FROM {TABLE_NAME}")
    all_ids = [row['user_id'] for row in cursor.fetchall()]
    
    if not all_ids:
        logger.warning("表中无数据,跳过更新")
        return

    # 2. 随机选 3 个 ID 更新
    target_ids = random.sample(all_ids, min(len(all_ids), 3))
    logger.info(f"随机选中 ID 进行更新: {target_ids}")
    
    for uid in target_ids:
        new_balance = 9999.99
        new_city = "Updated_City"
        # 标准 MySQL 更新语法
        sql = f"UPDATE {TABLE_NAME} SET balance = %s, city = %s WHERE user_id = %s"
        cursor.execute(sql, (new_balance, new_city, uid))
        logger.info(f" -> 已更新 ID={uid}: Balance设为 {new_balance}, City设为 {new_city}")

@measure_time
def delete_data(cursor):
    """随机删除数据"""
    # 1. 先查出所有 ID
    cursor.execute(f"SELECT user_id FROM {TABLE_NAME}")
    all_ids = [row['user_id'] for row in cursor.fetchall()]
    
    if not all_ids:
        logger.warning("表中无数据,跳过删除")
        return

    # 2. 随机选 2 个 ID 删除
    target_ids = random.sample(all_ids, min(len(all_ids), 2))
    logger.info(f"随机选中 ID 进行删除: {target_ids}")
    
    # 使用 IN 语法批量删除
    format_strings = ','.join(['%s'] * len(target_ids))
    sql = f"DELETE FROM {TABLE_NAME} WHERE user_id IN ({format_strings})"
    cursor.execute(sql, tuple(target_ids))
    logger.info(f" -> 删除操作完成,受影响行数: {cursor.rowcount}")

@measure_time
def query_final_result(cursor):
    """查询并展示最终结果"""
    sql = f"SELECT * FROM {TABLE_NAME} ORDER BY user_id"
    cursor.execute(sql)
    results = cursor.fetchall()
    
    logger.info("-" * 50)
    logger.info(f"最终表数据 (总行数: {len(results)}):")
    logger.info(f"{'ID':<5} {'Name':<10} {'Age':<5} {'City':<15} {'Balance':<10}")
    logger.info("-" * 50)
    
    for row in results:
        # 这里的 row 是字典,因为 connect 时指定了 DictCursor
        logger.info(f"{row['user_id']:<5} {row['username']:<10} {row['age']:<5} {row['city']:<15} {row['balance']:<10}")
    logger.info("-" * 50)

# ================= 主程序入口 =================

def main():
    ssh_tunnel = None
    db_conn = None
    
    try:
        logger.info(">>> 1. 正在建立 SSH 隧道 ...")
        # 配置 SSH 隧道:本地随机端口 -> SSH(10.8.15.157) -> Doris FE(127.0.0.1:9030)
        ssh_tunnel = SSHTunnelForwarder(
            (SSH_HOST, SSH_PORT),
            ssh_username=SSH_USER,
            ssh_password=SSH_PASSWORD,
            remote_bind_address=(DORIS_LOCAL_HOST, DORIS_QUERY_PORT)
        )
        ssh_tunnel.start()
        logger.info(f">>> SSH 隧道建立成功! 本地映射端口: {ssh_tunnel.local_bind_port}")

        logger.info(">>> 2. 正在连接 Doris 4.0.1 数据库 ...")
        db_conn = pymysql.connect(
            host='127.0.0.1',                # 连接本机
            port=ssh_tunnel.local_bind_port, # 隧道端口
            user=DORIS_DB_USER,
            password=DORIS_DB_PWD,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor, # 返回字典格式数据方便读取
            autocommit=True  # 开启自动提交,这对 Doris 很重要
        )
        cursor = db_conn.cursor()
        
        # --- 执行测试步骤 ---
        logger.info(">>> 3. 开始执行 CRUD 测试流程")
        
        # 1. 建库建表
        init_db_and_table(cursor)
        
        # 2. 插入 10 条数据
        insert_data(cursor, count=10)
        time.sleep(2) # 稍作等待,确保数据版本提交(虽然4.0很快,但为了演示效果)
        
        # 3. 随机更新
        update_data(cursor)
        time.sleep(1)
        
        # 4. 随机删除
        delete_data(cursor)
        time.sleep(1)
        
        # 5. 查询结果
        query_final_result(cursor)
        
        logger.info(">>> 4. 测试全部通过!Doris 4.0.1 运行正常。")

    except Exception as e:
        logger.error(f"❌ 测试过程中发生错误: {e}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        # 资源清理
        if db_conn:
            db_conn.close()
            logger.info("数据库连接已关闭")
        if ssh_tunnel:
            ssh_tunnel.stop()
            logger.info("SSH 隧道已关闭")

if __name__ == "__main__":
    main()

对应log文件记录如下:

2025-12-09 11:46:53,544 - INFO - >>> 1. 正在建立 SSH 隧道 ...
2025-12-09 11:46:53,830 - INFO - >>> SSH 隧道建立成功! 本地映射端口: 64441
2025-12-09 11:46:53,831 - INFO - >>> 2. 正在连接 Doris 4.0.1 数据库 ...
2025-12-09 11:46:54,028 - INFO - >>> 3. 开始执行 CRUD 测试流程
2025-12-09 11:46:54,028 - INFO - 正在执行: [init_db_and_table] ...
2025-12-09 11:46:54,044 - INFO - 数据库 doris4_test_db 和表 user_profile_v4 (Unique Key MoW) 已初始化
2025-12-09 11:46:54,044 - INFO - 执行完成: [init_db_and_table] | 耗时: 0.0165 秒
2025-12-09 11:46:54,045 - INFO - 正在执行: [insert_data] ...
2025-12-09 11:46:54,045 - INFO - 准备插入 100 条数据...
2025-12-09 11:46:54,188 - INFO - 成功插入 100 条数据
2025-12-09 11:46:54,188 - INFO - 执行完成: [insert_data] | 耗时: 0.1437 秒
2025-12-09 11:46:56,189 - INFO - 正在执行: [update_data] ...
2025-12-09 11:46:56,224 - INFO - 随机选中 ID 进行更新: [46, 71, 41]
2025-12-09 11:46:56,275 - INFO -  -> 已更新 ID=46: Balance设为 9999.99, City设为 Updated_City
2025-12-09 11:46:56,326 - INFO -  -> 已更新 ID=71: Balance设为 9999.99, City设为 Updated_City
2025-12-09 11:46:56,387 - INFO -  -> 已更新 ID=41: Balance设为 9999.99, City设为 Updated_City
2025-12-09 11:46:56,388 - INFO - 执行完成: [update_data] | 耗时: 0.1991 秒
2025-12-09 11:46:57,388 - INFO - 正在执行: [delete_data] ...
2025-12-09 11:46:57,405 - INFO - 随机选中 ID 进行删除: [58, 50]
2025-12-09 11:46:57,457 - INFO -  -> 删除操作完成,受影响行数: 2
2025-12-09 11:46:57,457 - INFO - 执行完成: [delete_data] | 耗时: 0.0691 秒
2025-12-09 11:46:58,458 - INFO - 正在执行: [query_final_result] ...
2025-12-09 11:46:58,483 - INFO - --------------------------------------------------
2025-12-09 11:46:58,483 - INFO - 最终表数据 (总行数: 98):
2025-12-09 11:46:58,484 - INFO - ID    Name       Age   City            Balance   
2025-12-09 11:46:58,484 - INFO - --------------------------------------------------
2025-12-09 11:46:58,484 - INFO - 1     User_001   32    Guangzhou       4987.09   
2025-12-09 11:46:58,484 - INFO - 2     User_002   55    Shanghai        4799.23   
2025-12-09 11:46:58,484 - INFO - 3     User_003   29    Guangzhou       3795.66   
2025-12-09 11:46:58,484 - INFO - 4     User_004   21    Shanghai        1065.80   
2025-12-09 11:46:58,484 - INFO - 5     User_005   48    Beijing         3250.84   
2025-12-09 11:46:58,484 - INFO - 6     User_006   33    Shenzhen        2121.31   
2025-12-09 11:46:58,484 - INFO - 7     User_007   39    Guangzhou       1792.82   
2025-12-09 11:46:58,485 - INFO - 8     User_008   21    Chengdu         350.23    
2025-12-09 11:46:58,485 - INFO - 9     User_009   59    Chengdu         4108.27   
2025-12-09 11:46:58,485 - INFO - 10    User_010   57    Shanghai        2674.18   
2025-12-09 11:46:58,485 - INFO - 11    User_011   43    Guangzhou       4362.53   
2025-12-09 11:46:58,485 - INFO - 12    User_012   24    Chengdu         845.92    
2025-12-09 11:46:58,485 - INFO - 13    User_013   47    Shenzhen        3597.69   
2025-12-09 11:46:58,485 - INFO - 14    User_014   53    Shenzhen        1159.92   
2025-12-09 11:46:58,485 - INFO - 15    User_015   48    Shanghai        4159.49   
2025-12-09 11:46:58,485 - INFO - 16    User_016   30    Guangzhou       3863.11   
2025-12-09 11:46:58,485 - INFO - 17    User_017   30    Guangzhou       2356.23   
2025-12-09 11:46:58,485 - INFO - 18    User_018   43    Beijing         4038.47   
2025-12-09 11:46:58,485 - INFO - 19    User_019   33    Shenzhen        3465.97   
2025-12-09 11:46:58,486 - INFO - 20    User_020   33    Guangzhou       2548.67   
2025-12-09 11:46:58,486 - INFO - 21    User_021   47    Shanghai        2104.14   
2025-12-09 11:46:58,486 - INFO - 22    User_022   44    Beijing         4932.08   
2025-12-09 11:46:58,486 - INFO - 23    User_023   60    Guangzhou       2993.93   
2025-12-09 11:46:58,486 - INFO - 24    User_024   57    Chengdu         1831.64   
2025-12-09 11:46:58,486 - INFO - 25    User_025   26    Shenzhen        2478.94   
2025-12-09 11:46:58,486 - INFO - 26    User_026   31    Shanghai        4901.88   
2025-12-09 11:46:58,486 - INFO - 27    User_027   42    Shanghai        1422.68   
2025-12-09 11:46:58,486 - INFO - 28    User_028   46    Shenzhen        2586.16   
2025-12-09 11:46:58,486 - INFO - 29    User_029   31    Guangzhou       3395.99   
2025-12-09 11:46:58,486 - INFO - 30    User_030   50    Shanghai        657.19    
2025-12-09 11:46:58,486 - INFO - 31    User_031   51    Shenzhen        4879.95   
2025-12-09 11:46:58,487 - INFO - 32    User_032   58    Guangzhou       1523.34   
2025-12-09 11:46:58,487 - INFO - 33    User_033   48    Shanghai        2711.63   
2025-12-09 11:46:58,487 - INFO - 34    User_034   38    Shanghai        1920.85   
2025-12-09 11:46:58,487 - INFO - 35    User_035   31    Shanghai        1700.61   
2025-12-09 11:46:58,487 - INFO - 36    User_036   56    Chengdu         2682.61   
2025-12-09 11:46:58,487 - INFO - 37    User_037   55    Shenzhen        1431.78   
2025-12-09 11:46:58,487 - INFO - 38    User_038   31    Shenzhen        4727.04   
2025-12-09 11:46:58,487 - INFO - 39    User_039   32    Chengdu         3227.39   
2025-12-09 11:46:58,487 - INFO - 40    User_040   43    Shenzhen        3663.79   
2025-12-09 11:46:58,487 - INFO - 41    User_041   50    Updated_City    9999.99   
2025-12-09 11:46:58,487 - INFO - 42    User_042   43    Guangzhou       1648.04   
2025-12-09 11:46:58,487 - INFO - 43    User_043   35    Shanghai        3318.13   
2025-12-09 11:46:58,487 - INFO - 44    User_044   48    Chengdu         2464.68   
2025-12-09 11:46:58,488 - INFO - 45    User_045   28    Shenzhen        4477.58   
2025-12-09 11:46:58,488 - INFO - 46    User_046   31    Updated_City    9999.99   
2025-12-09 11:46:58,488 - INFO - 47    User_047   59    Shenzhen        3873.40   
2025-12-09 11:46:58,488 - INFO - 48    User_048   55    Beijing         4772.47   
2025-12-09 11:46:58,488 - INFO - 49    User_049   50    Shenzhen        1199.26   
2025-12-09 11:46:58,488 - INFO - 51    User_051   59    Beijing         1975.62   
2025-12-09 11:46:58,488 - INFO - 52    User_052   52    Beijing         309.98    
2025-12-09 11:46:58,488 - INFO - 53    User_053   34    Shenzhen        1315.21   
2025-12-09 11:46:58,488 - INFO - 54    User_054   40    Guangzhou       4976.19   
2025-12-09 11:46:58,488 - INFO - 55    User_055   59    Shenzhen        2495.20   
2025-12-09 11:46:58,488 - INFO - 56    User_056   38    Shanghai        2183.50   
2025-12-09 11:46:58,489 - INFO - 57    User_057   47    Shanghai        3532.53   
2025-12-09 11:46:58,489 - INFO - 59    User_059   29    Guangzhou       3959.38   
2025-12-09 11:46:58,489 - INFO - 60    User_060   57    Shenzhen        2794.60   
2025-12-09 11:46:58,489 - INFO - 61    User_061   44    Guangzhou       1043.38   
2025-12-09 11:46:58,489 - INFO - 62    User_062   44    Beijing         1445.02   
2025-12-09 11:46:58,489 - INFO - 63    User_063   34    Chengdu         2018.03   
2025-12-09 11:46:58,489 - INFO - 64    User_064   30    Beijing         1325.72   
2025-12-09 11:46:58,489 - INFO - 65    User_065   60    Shenzhen        2405.65   
2025-12-09 11:46:58,489 - INFO - 66    User_066   24    Shanghai        1521.32   
2025-12-09 11:46:58,489 - INFO - 67    User_067   47    Beijing         3320.86   
2025-12-09 11:46:58,489 - INFO - 68    User_068   39    Shenzhen        2205.58   
2025-12-09 11:46:58,490 - INFO - 69    User_069   43    Beijing         4372.42   
2025-12-09 11:46:58,490 - INFO - 70    User_070   48    Guangzhou       1719.70   
2025-12-09 11:46:58,490 - INFO - 71    User_071   55    Updated_City    9999.99   
2025-12-09 11:46:58,490 - INFO - 72    User_072   58    Shanghai        4531.46   
2025-12-09 11:46:58,490 - INFO - 73    User_073   47    Shanghai        1505.61   
2025-12-09 11:46:58,490 - INFO - 74    User_074   30    Beijing         1342.90   
2025-12-09 11:46:58,490 - INFO - 75    User_075   31    Shanghai        1321.63   
2025-12-09 11:46:58,490 - INFO - 76    User_076   60    Chengdu         2761.57   
2025-12-09 11:46:58,490 - INFO - 77    User_077   33    Shanghai        3493.49   
2025-12-09 11:46:58,490 - INFO - 78    User_078   37    Shanghai        254.65    
2025-12-09 11:46:58,490 - INFO - 79    User_079   29    Beijing         2223.87   
2025-12-09 11:46:58,491 - INFO - 80    User_080   46    Chengdu         4061.74   
2025-12-09 11:46:58,491 - INFO - 81    User_081   26    Shanghai        1116.46   
2025-12-09 11:46:58,491 - INFO - 82    User_082   22    Guangzhou       3541.36   
2025-12-09 11:46:58,491 - INFO - 83    User_083   43    Beijing         1379.45   
2025-12-09 11:46:58,491 - INFO - 84    User_084   47    Beijing         4405.15   
2025-12-09 11:46:58,491 - INFO - 85    User_085   22    Guangzhou       328.10    
2025-12-09 11:46:58,491 - INFO - 86    User_086   37    Shanghai        1957.66   
2025-12-09 11:46:58,491 - INFO - 87    User_087   44    Shenzhen        4303.43   
2025-12-09 11:46:58,491 - INFO - 88    User_088   24    Shenzhen        2125.09   
2025-12-09 11:46:58,491 - INFO - 89    User_089   57    Shenzhen        3759.54   
2025-12-09 11:46:58,491 - INFO - 90    User_090   45    Guangzhou       4788.86   
2025-12-09 11:46:58,492 - INFO - 91    User_091   46    Beijing         683.87    
2025-12-09 11:46:58,492 - INFO - 92    User_092   56    Chengdu         1321.36   
2025-12-09 11:46:58,492 - INFO - 93    User_093   36    Shanghai        4819.07   
2025-12-09 11:46:58,492 - INFO - 94    User_094   28    Beijing         4583.63   
2025-12-09 11:46:58,492 - INFO - 95    User_095   29    Shenzhen        2064.74   
2025-12-09 11:46:58,492 - INFO - 96    User_096   52    Beijing         3397.31   
2025-12-09 11:46:58,492 - INFO - 97    User_097   33    Shanghai        3227.49   
2025-12-09 11:46:58,492 - INFO - 98    User_098   27    Beijing         1809.42   
2025-12-09 11:46:58,492 - INFO - 99    User_099   41    Chengdu         2890.17   
2025-12-09 11:46:58,492 - INFO - 100   User_100   41    Chengdu         3228.25   
2025-12-09 11:46:58,493 - INFO - --------------------------------------------------
2025-12-09 11:46:58,493 - INFO - 执行完成: [query_final_result] | 耗时: 0.0348 秒
2025-12-09 11:46:58,493 - INFO - >>> 4. 测试全部通过!Doris 4.0.1 运行正常。
2025-12-09 11:46:58,493 - INFO - 数据库连接已关闭
2025-12-09 11:46:58,512 - INFO - SSH 隧道已关闭

去nd14执行下述语句进行查看:

mysql -h 10.x.xx.157 -P 9030 -uroot
show databases;
use doris4_test_db;
show tables;

select * from user_profile_v4;

8.1 插入数据

这里展示部分数据,可以看出插入了100条测试数据:

8.2 更新数据

控制台打印结果:

终端查看结果:

8.3 删除数据

控制台打印结果:

终端验证结果:

9. Paimon外表数据写入Doris内表基础测试

9.1 Paimon数据准备

对于Paimon外表数据写入Doris内表基础测试,需要提前在Flink SQL会话里面创建Paimon表,并插入测试数据

-- 1. 创建 Flink 端的 Paimon Catalog
CREATE CATALOG paimon_catalog WITH (
  'type'        = 'paimon',
  'warehouse'   = 'hdfs:///user/hive/warehouse',
  'metastore'   = 'hive',
  'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);

-- 2. 切换 Catalog 和 Database
USE CATALOG my_paimon;
CREATE DATABASE IF NOT EXISTS ods;
USE ods;

-- 3. 创建 Paimon 表 (源表)
-- 这是一个记录用户行为的日志表
CREATE TABLE IF NOT EXISTS paimon_source_event (
    user_id INT,
    item_id INT,
    behavior STRING,
    dt STRING,
    ts TIMESTAMP(3),
    PRIMARY KEY (dt, user_id, item_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '1',
    'file.format' = 'parquet'
);

-- 4. 写入测试数据 (Batch 模式写入)
INSERT INTO paimon_source_event VALUES
(1001, 501, 'click', '2025-12-12', TIMESTAMP '2025-12-12 10:00:00.123'),
(1002, 502, 'view',  '2025-12-12', TIMESTAMP '2025-12-12 10:05:00.456'),
(1003, 501, 'buy',   '2025-12-12', TIMESTAMP '2025-12-12 10:10:00.789'),
(1001, 503, 'view',  '2025-12-13', TIMESTAMP '2025-12-13 11:00:00.000'),
(1004, 501, 'click', '2025-12-13', TIMESTAMP '2025-12-13 11:05:00.000');

9.2 测试代码

将下述测试代码保存为doris4_paimon_tel_test.py【python解释器:3.8.20、windows系统:11】

# -*- coding: utf-8 -*-
import pymysql
import time
import logging
import functools
from sshtunnel import SSHTunnelForwarder

# ================= 配置信息 =================

# 1. SSH 连接信息 (使用 Doris 4.0 所在的节点 IP)
# 参考 doris4_test.py 的配置
SSH_HOST = '10.x.xx.157'       # Doris 4.0 FE Master
SSH_PORT = 22
SSH_USER = 'xxxxxx'           
SSH_PASSWORD = 'xxxxxxxxxxxxxxxxx' 

# 2. Doris 数据库连接信息
DORIS_LOCAL_HOST = '127.0.0.1' 
DORIS_QUERY_PORT = 9030        
DORIS_DB_USER = 'root'
DORIS_DB_PWD = ''              

# 3. Paimon Catalog 配置 
# 【重要提示】: 请确保 Doris 4.0 的节点上,该路径下也有 core-site.xml/hdfs-site.xml
CATALOG_PROPS = {
    "type": "paimon",
    "paimon.catalog.type": "hms",
    "hive.metastore.uris": "thrift://nd1:9083,thrift://nd3:9083",  # HMS 地址保持不变
    "hive.version": "2.1.1",                     # 保持兼容性配置
    "warehouse": "hdfs://nd1:8020/user/hive/warehouse",
    "hadoop.conf.dir": "/home/bigdata/doris/conf/cdh_conf/", 
    "hadoop.username": "hdfs"
}

# 4. 业务配置
PAIMON_CATALOG_NAME = 'paimon_catalog' 
PAIMON_DB = 'ods'
PAIMON_TABLE = 'paimon_source_event'

# 使用新的数据库名区分测试
DORIS_TEST_DB = 'doris4_paimon_test_db'
DORIS_TARGET_TABLE = 'doris4_target_event_sink'

LOG_FILE = 'doris4_paimon_etl_report.log'

# ================= 日志与工具模块 =================
def setup_logger():
    logger = logging.getLogger("Doris4PaimonTester")
    logger.setLevel(logging.INFO)
    if logger.hasHandlers():
        logger.handlers.clear()
    
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8')
    file_handler.setFormatter(formatter)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

logger = setup_logger()

def measure_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        logger.info(f"正在执行: [{func.__name__}] ...")
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            logger.info(f"执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}")
            raise e
    return wrapper

# ================= 核心测试逻辑 =================

@measure_time
def init_doris_catalog(cursor):
    """在 Doris 4.0 中初始化 Paimon Catalog"""
    logger.info(f"正在初始化 Doris Catalog: {PAIMON_CATALOG_NAME} ...")
    
    # 删除旧的 Catalog (如果存在)
    cursor.execute(f"DROP CATALOG IF EXISTS {PAIMON_CATALOG_NAME}")
    
    # 拼接创建语句
    props_str = ",\n".join([f'"{k}" = "{v}"' for k, v in CATALOG_PROPS.items()])
    create_sql = f"""
    CREATE CATALOG {PAIMON_CATALOG_NAME} PROPERTIES (
        {props_str}
    );
    """
    logger.info("发送 Create Catalog 请求...")
    cursor.execute(create_sql)
    logger.info("Catalog 创建成功!")
    time.sleep(2) # 稍微等待元数据同步

@measure_time
def check_paimon_source(cursor):
    """验证 Doris 是否能通过 Catalog 读取 Paimon 数据"""
    logger.info(f"检查 Paimon 数据源: {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}")
    
    # 1. 切换到 Paimon Catalog
    cursor.execute(f"SWITCH {PAIMON_CATALOG_NAME}")
    
    # 2. 检查表是否存在
    cursor.execute(f"USE {PAIMON_DB}")
    cursor.execute("SHOW TABLES")
    # Doris 不同版本 fetchall 返回结构可能微调,这里做通用处理
    tables = [list(row.values())[0] for row in cursor.fetchall()] 
    
    if PAIMON_TABLE not in tables:
        logger.warning(f"当前 Catalog 下的表: {tables}")
        raise Exception(f"Paimon 表 {PAIMON_TABLE} 未找到!")
    
    # 3. 预览数据
    sql = f"SELECT * FROM {PAIMON_TABLE} ORDER BY dt, user_id LIMIT 5"
    cursor.execute(sql)
    results = cursor.fetchall()
    logger.info(f"Paimon 数据预览 (前5条):")
    for row in results:
        logger.info(row)
    
    if not results:
        raise Exception("Paimon 表为空,请先在 Flink 端写入数据!")
    
    return len(results)

@measure_time
def create_doris_target_table(cursor):
    """创建 Doris 4.0 内部表 (Unique Key MoW 模型)"""
    cursor.execute("SWITCH internal")
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DORIS_TEST_DB}")
    cursor.execute(f"USE {DORIS_TEST_DB}")
    
    # 注意:Unique Key 模型要求 Key 字段必须排在 Value 字段前面
    # Key: user_id, item_id, dt
    # Value: behavior, ts
    create_sql = f"""
    CREATE TABLE IF NOT EXISTS {DORIS_TARGET_TABLE} (
        user_id INT COMMENT "用户ID",
        item_id INT COMMENT "商品ID",
        dt VARCHAR(20) COMMENT "日期分区",
        behavior VARCHAR(50) COMMENT "行为类型",
        ts DATETIME(3) COMMENT "时间戳"
    )
    UNIQUE KEY(user_id, item_id, dt)
    PARTITION BY LIST(dt) (
        PARTITION p20251212 VALUES IN ("2025-12-12"),
        PARTITION p20251213 VALUES IN ("2025-12-13")
    )
    DISTRIBUTED BY HASH(user_id) BUCKETS 1
    PROPERTIES (
        "replication_num" = "1",
        "enable_unique_key_merge_on_write" = "true",
        "store_row_column" = "true" 
    );
    """
    # "store_row_column" = "true" 是 4.0 的特性,用于优化点查
    
    cursor.execute(create_sql)
    cursor.execute(f"TRUNCATE TABLE {DORIS_TARGET_TABLE}")
    logger.info(f"Doris 4.0 内表 {DORIS_TARGET_TABLE} 已准备就绪")

@measure_time
def execute_etl_paimon_to_doris(cursor):
    """执行 INSERT INTO ... SELECT ..."""
    logger.info(">>> 开始执行从 Paimon 到 Doris 4.0 的数据导入 (ETL) <<<")
    
    # 显式指定字段顺序,防止 select * 顺序不一致
    etl_sql = f"""
    INSERT INTO internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE}
    (user_id, item_id, dt, behavior, ts)
    SELECT user_id, item_id, dt, behavior, ts 
    FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}
    """
    
    cursor.execute(etl_sql)
    logger.info("ETL SQL 提交完毕")

@measure_time
def verify_data_consistency(cursor):
    """验证数据一致性"""
    logger.info(">>> 开始数据一致性校验 <<<")
    
    # 1. Paimon 源数据量
    cursor.execute(f"SELECT count(*) as cnt FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}")
    paimon_count = cursor.fetchone()['cnt']
    
    # 2. Doris 目标数据量
    cursor.execute(f"SELECT count(*) as cnt FROM internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE}")
    doris_count = cursor.fetchone()['cnt']
    
    logger.info(f"Paimon 源表行数: {paimon_count}")
    logger.info(f"Doris 目标表行数: {doris_count}")
    
    if paimon_count == doris_count:
        logger.info("✅ 测试通过:数据条数一致!")
    else:
        logger.error("❌ 测试失败:数据条数不一致!")
        
    # 3. 抽样展示
    cursor.execute(f"SELECT * FROM internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE} ORDER BY dt, user_id LIMIT 3")
    rows = cursor.fetchall()
    logger.info("Doris 内表数据抽样:")
    for row in rows:
        logger.info(row)

# ================= 主流程 =================

def main_process():
    server = None
    conn = None
    try:
        logger.info(">>> 1. 正在建立 SSH 隧道 (连接至 Doris 4.0) ...")
        # 建立隧道:本地 -> SSH(10.8.15.157) -> Doris FE(127.0.0.1:9030)
        server = SSHTunnelForwarder(
            (SSH_HOST, SSH_PORT),
            ssh_username=SSH_USER,
            ssh_password=SSH_PASSWORD,
            remote_bind_address=(DORIS_LOCAL_HOST, DORIS_QUERY_PORT) 
        )
        server.start()
        logger.info(f">>> SSH 隧道建立成功! 本地端口: {server.local_bind_port}")

        logger.info(">>> 2. 连接 Doris 数据库 ...")
        conn = pymysql.connect(
            host='127.0.0.1',
            port=server.local_bind_port,
            user=DORIS_DB_USER,
            password=DORIS_DB_PWD,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=True
        )
        cursor = conn.cursor()
        
        # --- 测试步骤 ---
        
        # 步骤 1: 初始化 Catalog
        init_doris_catalog(cursor)

        # 步骤 2: 确认源端 Paimon 可读
        check_paimon_source(cursor)
        
        # 步骤 3: 准备 Doris 4.0 目标表
        create_doris_target_table(cursor)
        
        # 步骤 4: 执行导入
        execute_etl_paimon_to_doris(cursor)
        
        # Doris 4.0 导入通常非常快,但仍建议稍微sleep
        time.sleep(2)
        
        # 步骤 5: 校验结果
        verify_data_consistency(cursor)
        
        logger.info(">>> Doris 4.0 与 Paimon 集成测试全部完成 <<<")

    except Exception as e:
        logger.error(f"主流程发生错误: {e}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        if conn:
            conn.close()
            logger.info("数据库连接已关闭")
        if server:
            server.stop()
            logger.info("SSH 隧道已关闭")

if __name__ == "__main__":
    main_process()

对应的log文件内容如下:

2025-12-12 15:55:44,709 - INFO - >>> 1. 正在建立 SSH 隧道 (连接至 Doris 4.0) ...
2025-12-12 15:55:45,144 - INFO - >>> SSH 隧道建立成功! 本地端口: 60651
2025-12-12 15:55:45,144 - INFO - >>> 2. 连接 Doris 数据库 ...
2025-12-12 15:55:45,353 - INFO - 正在执行: [init_doris_catalog] ...
2025-12-12 15:55:45,354 - INFO - 正在初始化 Doris Catalog: paimon_catalog ...
2025-12-12 15:55:45,359 - INFO - 发送 Create Catalog 请求...
2025-12-12 15:55:45,437 - INFO - Catalog 创建成功!
2025-12-12 15:55:47,438 - INFO - 执行完成: [init_doris_catalog] | 耗时: 2.0848 秒
2025-12-12 15:55:47,444 - INFO - 正在执行: [check_paimon_source] ...
2025-12-12 15:55:47,445 - INFO - 检查 Paimon 数据源: paimon_catalog.ods.paimon_source_event
2025-12-12 15:55:48,296 - INFO - Paimon 数据预览 (前5条):
2025-12-12 15:55:48,296 - INFO - {'user_id': 1001, 'item_id': 501, 'behavior': 'click', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)}
2025-12-12 15:55:48,296 - INFO - {'user_id': 1002, 'item_id': 502, 'behavior': 'view', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)}
2025-12-12 15:55:48,296 - INFO - {'user_id': 1003, 'item_id': 501, 'behavior': 'buy', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)}
2025-12-12 15:55:48,296 - INFO - {'user_id': 1001, 'item_id': 503, 'behavior': 'view', 'dt': '2025-12-13', 'ts': datetime.datetime(2025, 12, 13, 11, 0)}
2025-12-12 15:55:48,297 - INFO - {'user_id': 1004, 'item_id': 501, 'behavior': 'click', 'dt': '2025-12-13', 'ts': datetime.datetime(2025, 12, 13, 11, 5)}
2025-12-12 15:55:48,297 - INFO - 执行完成: [check_paimon_source] | 耗时: 0.8523 秒
2025-12-12 15:55:48,297 - INFO - 正在执行: [create_doris_target_table] ...
2025-12-12 15:55:48,345 - INFO - Doris 4.0 内表 doris4_target_event_sink 已准备就绪
2025-12-12 15:55:48,345 - INFO - 执行完成: [create_doris_target_table] | 耗时: 0.0479 秒
2025-12-12 15:55:48,345 - INFO - 正在执行: [execute_etl_paimon_to_doris] ...
2025-12-12 15:55:48,345 - INFO - >>> 开始执行从 Paimon 到 Doris 4.0 的数据导入 (ETL) <<<
2025-12-12 15:55:48,515 - INFO - ETL SQL 提交完毕
2025-12-12 15:55:48,516 - INFO - 执行完成: [execute_etl_paimon_to_doris] | 耗时: 0.1707 秒
2025-12-12 15:55:50,517 - INFO - 正在执行: [verify_data_consistency] ...
2025-12-12 15:55:50,517 - INFO - >>> 开始数据一致性校验 <<<
2025-12-12 15:55:51,382 - INFO - Paimon 源表行数: 5
2025-12-12 15:55:51,384 - INFO - Doris 目标表行数: 5
2025-12-12 15:55:51,385 - INFO - ✅ 测试通过:数据条数一致!
2025-12-12 15:55:51,438 - INFO - Doris 内表数据抽样:
2025-12-12 15:55:51,438 - INFO - {'user_id': 1001, 'item_id': 501, 'dt': '2025-12-12', 'behavior': 'click', 'ts': datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)}
2025-12-12 15:55:51,438 - INFO - {'user_id': 1002, 'item_id': 502, 'dt': '2025-12-12', 'behavior': 'view', 'ts': datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)}
2025-12-12 15:55:51,439 - INFO - {'user_id': 1003, 'item_id': 501, 'dt': '2025-12-12', 'behavior': 'buy', 'ts': datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)}
2025-12-12 15:55:51,439 - INFO - 执行完成: [verify_data_consistency] | 耗时: 0.9222 秒
2025-12-12 15:55:51,439 - INFO - >>> Doris 4.0 与 Paimon 集成测试全部完成 <<<
2025-12-12 15:55:51,440 - INFO - 数据库连接已关闭
2025-12-12 15:55:51,444 - INFO - SSH 隧道已关闭

9.3 验证数据

去Doris终端验证数据结果如下:

mysql -h 10.x.xx.157 -P 9030 -uroot

执行下述sql

SWITCH internal;
SHOW DATABASES;
USE doris4_paimon_test_db;
SHOW TABLES;

SELECT * FROM doris4_target_event_sink ORDER BY user_id;

也可以在同一个查询窗口中直接对比两边的数量(不需要反复 SWITCH):

-- 这里的 internal 和 paimon_catalog 是 Catalog 名称
SELECT 
    (SELECT count(*) FROM internal.doris4_paimon_test_db.doris4_target_event_sink) as doris_count,
    (SELECT count(*) FROM paimon_catalog.ods.paimon_source_event) as paimon_count;

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐