目录

1. 项目背景与环境

1.1 节点角色分配

2. 核心规划 (关键避坑)

2.1 端口规划表

2.2 目录规划

3. 部署前置准备 (所有节点 nd4, nd5, nd6)

3.1 系统配置 (需 sudo 权限)

3.2 检查 CPU AVX2 支持

3.3 安装 JDK 17 (必须)

3.4 准备程序包与目录

4. Frontend (FE) 部署

4.1 修改 FE 配置

4.2 启动 Leader 节点 (nd4)

4.3 启动 Follower 节点 (nd5, nd6)

4.4 注册节点 (构建 HA)

5. Backend (BE) 部署

5.1 修改 BE 配置

5.2 启动 BE

​编辑

5.3 注册 BE 节点

5.4 访问 WebUI

6. Hadoop & Paimon 集成

6.1 分发 Hadoop 配置文件

6.2 创建 Paimon Catalog

6.3 验证查询

6.3.1 切换 Catalog 与 数据库

6.3.2 查看表列表

6.3.3 数据查询验证

6.4 三种方案的区别与特点

6.4.1 方案一:Hive Catalog (HA Mode / 高可用模式)

6.4.2 方案二:Filesystem Catalog (文件系统模式)

6.4.3 方案三:Hive Catalog (Single Node / 单点模式)

6.4.4 综合对比表

6.4.5 最终推荐

7. 故障排查总结 (Troubleshooting)

8.基础测试(增删改查)

8.1 插入数据

8.2 更新数据

8.3 删除数据

9. Paimon外表数据写入StarRocks内表基础测试

9.1 Paimon数据准备

9.2 测试代码

9.3 验证数据


1. 项目背景与技术演进 在早期的基准测试中,虽然 StarRocks 2.5.22 展现了优秀的向量化执行性能,但受限于当时的架构设计,其在 Paimon 数据湖的集成上存在明显短板(不支持原生 Paimon Catalog),导致元数据管理复杂且无法完全发挥湖仓分析的性能潜力。

随着本项目 Paimon + OLAP 架构的演进以及友商(Doris 4.0)的重大升级,为了确保技术选型的公平性与前瞻性,项目组决定部署 StarRocks 4.0.2。该版本是 StarRocks 迈向“极速统一湖仓”的里程碑式版本,它不仅原生支持了 Paimon Catalog,彻底解决了 2.x 时代的集成痛点,还通过 JDK 17 的引入及 AVX2 指令集的深度优化,试图在复杂查询与高并发场景下重塑性能标杆。

2. 测试目标 本次部署不仅仅是版本的迭代,更是一场关于“极致性能与现代湖仓架构”的深度验证。核心目标如下:

  • 原生湖仓集成验证(Native Integration):重点验证 StarRocks 4.0.2 原生 Paimon Catalog 的连通性与稳定性,评估其相比 2.5 版本“Hive 兼容模式”在元数据同步、Schema 自动发现及分区裁剪方面的体验提升。

  • 代际性能基准测试(Generational Benchmarking):在同等硬件资源下,对比 StarRocks 4.0.2 与 StarRocks 2.5 及 Doris 4.0 在海量数据扫描、多表关联分析(Join)场景下的性能差异,量化 Pipeline 并行执行引擎JDK 17 带来的吞吐收益。

  • 复杂场景适应性:验证新版本在处理 Primary Key 模型更新、存算分离(可选)及资源隔离方面的表现,确认其是否能在保持极速查询优势的同时,弥补大字段处理等历史短板。

3.补充说明

StarRocks 2.x不支持对Paimon的操作,高版本的StarRocks对于数据也不支持写回Paimon。官网描述网址:Paimon catalog | StarRocks

调研架构图如下:

1. 项目背景与环境

本次部署旨在现有的 CDH 6.3.2 集群上,混合部署 StarRocks 4.0.2 版本,用于 Paimon 数据湖分析。

  • 操作系统: CentOS 7 (CDH 6.3.2 环境)

  • 部署用户: bigdata

  • StarRocks 版本: 4.0.2

  • JDK 要求: JDK 17 (StarRocks 3.x/4.x 强制要求 JDK 17+)

  • 原有 CDH 节点: nd1-nd6, nd11-nd16

  • 本次部署目标节点: nd4, nd5, nd6

1.1 节点角色分配

前置组件分配

IP 主机名 角色 版本
10.x.xx.201-10.x.xx.205 10.x.xx.215 10.x.xx.149 10.x.xx.151 10.x.xx.156 10.x.xx.157 10.x.xx.167 10.x.xx.206 nd1-nd5 nd6 nd11 nd12 nd13 nd14 nd15 nd16 CDH 6.3.2
10.x.xx.201-10.x.xx.205 nd1-nd5 Paimon 1.1.1

采用 FE (Frontend) + BE (Backend) 混合部署 模式,共 3 个节点。

IP 主机名 角色 部署组件 说明
10.x.xx.204 nd4 FE (Leader) + BE StarRocks 4.0.2 初始 Leader 节点
10.x.xx.205 nd5 FE (Follower) + BE StarRocks 4.0.2 高可用节点
10.x.xx.215 nd6 FE (Follower) + BE StarRocks 4.0.2 高可用节点

2. 核心规划 (关键避坑)

由于 CDH 的 YARN 占用 8040,Zookeeper 占用 9010,必须对 StarRocks 默认端口进行修改。

2.1 端口规划表

组件 配置文件参数 默认端口 规划端口 修改原因
FE http_port 8030 8030 无冲突,保持默认
FE rpc_port 9020 9020 无冲突,保持默认
FE query_port 9030 9030 MySQL 连接端口
FE edit_log_port 9010 19010 避开 CDH Zookeeper JMX
BE be_port 9060 9060 Thrift Server
BE webserver_port 8040 18040 避开 CDH YARN NodeManager
BE heartbeat_service_port 9050 9050 心跳服务
BE brpc_port 8060 8060 数据传输

2.2 目录规划

  • 安装根目录: /home/bigdata/starrocks

  • 软件目录: /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64

  • JDK 17 目录: /home/bigdata/starrocks/jdk-17.0.2

  • FE 元数据: /home/bigdata/starrocks/data/meta

  • BE 数据存储: /home/bigdata/starrocks/data/storage

3. 部署前置准备 (所有节点 nd4, nd5, nd6)

3.1 系统配置 (需 sudo 权限)

# 1. 关闭 Swap
sudo swapoff -a
# (可选) 永久关闭: sudo sed -i '/swap/s/^/#/' /etc/fstab
​
# 2. 修改文件句柄限制
sudo vi /etc/security/limits.conf
# 添加或修改以下内容:
* soft nofile 65535
* hard nofile 65535
* soft nproc 65535
* hard nproc 65535

3.2 检查 CPU AVX2 支持

StarRocks 4.x 默认开启 AVX2 指令集优化。

cat /proc/cpuinfo | grep avx2

  • 有输出: 正常。

  • 无输出: 需在 be.conf 中添加 enable_avx2 = false

3.3 安装 JDK 17 (必须)

StarRocks 4.0.2 不支持 JDK 8 或 11,必须使用 JDK 17。

# 创建目录
mkdir -p /home/bigdata/starrocks
cd /home/bigdata/starrocks
​
# 下载 JDK 17 (如果您已通过 wget 下载)
# wget https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz
​
# 解压
tar -zxvf openjdk-17.0.2_linux-x64_bin.tar.gz -C /home/bigdata/starrocks/
# 验证路径
ls -ld /home/bigdata/starrocks/jdk-17.0.2

3.4 准备程序包与目录

# 1. 解压 StarRocks 包
tar -zxvf StarRocks-4.0.2-centos-amd64.tar.gz -C /home/bigdata/starrocks/
​
# 2. 创建数据目录
mkdir -p /home/bigdata/starrocks/data/meta
mkdir -p /home/bigdata/starrocks/data/storage

4. Frontend (FE) 部署

操作节点: nd4, nd5, nd6

4.1 修改 FE 配置

编辑 /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/fe.conf

vi /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/fe.conf

修改或者添加如下内容:

# -----------------------------------
# 1. 核心目录与 JDK (必须修改)
# -----------------------------------
meta_dir = /home/bigdata/starrocks/data/meta
​
# 指向独立的 JDK 17 路径
JAVA_HOME = /home/bigdata/starrocks/jdk-17.0.2
​
# -----------------------------------
# 2. 端口修改 (解决 ZK 冲突)
# -----------------------------------
http_port = 8030
rpc_port = 9020
query_port = 9030
# 修改 edit_log_port 为 19010,避开 CDH Zookeeper
edit_log_port = 19010
​
# -----------------------------------
# 3. 网络绑定
# -----------------------------------
# 根据您的 IP 段 (10.8.16.x) 配置
priority_networks = 10.8.16.0/24

4.2 启动 Leader 节点 (nd4)

仅在 nd4 (10.x.xx.204) 上执行

cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/bin
./start_fe.sh --daemon
  • 检查: tail -f ../log/fe.log,看到 thrift server started 表示成功。

4.3 启动 Follower 节点 (nd5, nd6)

注意: 因为 edit_log_port 改为了 19010--helper 参数必须对应。

在 nd5 (10.x.xx.205) 执行:

cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/bin
./start_fe.sh --helper 10.x.xx.204:19010 --daemon

在 nd6 (10.x.xx.215) 执行:

cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/bin
./start_fe.sh --helper 10.x.xx.204:19010 --daemon

4.4 注册节点 (构建 HA)

回到 nd4,连接 MySQL 并添加节点:

mysql -h 10.x.xx.204 -P 9030 -u root
-- 添加 nd5 (注意端口是 19010)
ALTER SYSTEM ADD FOLLOWER "10.x.xx.205:19010";

-- 添加 nd6 (注意端口是 19010)
ALTER SYSTEM ADD FOLLOWER "10.x.xx.215:19010";

-- 验证
SHOW PROC '/frontends';

  • 应显示 3 个节点,状态均为 Alive: true

5. Backend (BE) 部署

操作节点: nd4, nd5, nd6

5.1 修改 BE 配置

编辑 /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/be.conf

vi /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/be.conf

修改或者增加下述内容:

# -----------------------------------
# 1. JDK 配置 (必须 JDK 17)
# -----------------------------------
JAVA_HOME = /home/bigdata/starrocks/jdk-17.0.2

# -----------------------------------
# 2. 端口修改 (解决 YARN 冲突)
# -----------------------------------
be_port = 9060
# 修改 web_server_port 为 18040,避开 YARN NodeManager
web_server_port = 18040
heartbeat_service_port = 9050
brpc_port = 8060

# -----------------------------------
# 3. 存储与网络
# -----------------------------------
storage_root_path = /home/bigdata/starrocks/data/storage
priority_networks = 10.8.16.0/24

5.2 启动 BE

在三个节点上分别执行:

cd /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/bin
./start_be.sh --daemon
  • 检查: netstat -tpln | grep 18040 确认端口监听成功。

5.3 注册 BE 节点

回到 nd4 的 MySQL 客户端执行:

-- 注意端口是 heartbeat_service_port (默认 9050)
ALTER SYSTEM ADD BACKEND "10.x.xx.204:9050";
ALTER SYSTEM ADD BACKEND "10.x.xx.205:9050";
ALTER SYSTEM ADD BACKEND "10.x.xx.215:9050";

-- 验证
SHOW PROC '/backends';

  • 应显示 3 个节点,Alive: true

5.4 访问 WebUI

打开浏览器访问 FE 的 WebUI:

  • 地址: http://10.8.16.204:8030

  • 用户: root

  • 密码: (空)

FE节点状态:

BE节点状态:

6. Hadoop & Paimon 集成

StarRocks 4.0 已经原生支持 Paimon Catalog,无需像 2.5 版本那样纠结。

6.1 分发 Hadoop 配置文件

为了让 StarRocks 能够访问 CDH HDFS,需要复制配置文件。

# 在 nd4, nd5, nd6 上分别执行
# FE
scp root@10.x.xx.201:/etc/hadoop/conf/core-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/
scp root@10.x.xx.201:/etc/hadoop/conf/hdfs-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/
scp root@10.x.xx.201:/etc/hive/conf/hive-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/fe/conf/

# BE
scp root@10.x.xx.201:/etc/hadoop/conf/core-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/
scp root@10.x.xx.201:/etc/hadoop/conf/hdfs-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/
scp root@10.x.xx.201:/etc/hive/conf/hive-site.xml /home/bigdata/starrocks/StarRocks-4.0.2-centos-amd64/be/conf/

注意: 复制完成后,建议重启所有 FE 和 BE

# 启动 FE:
sh fe/bin/start_fe.sh --daemon
# 停止 FE:
sh fe/bin/stop_fe.sh
# 启动 BE:
sh be/bin/start_be.sh --daemon
#停止 BE:
sh be/bin/stop_be.sh

6.2 创建 Paimon Catalog

在 MySQL 客户端执行:

DROP CATALOG IF EXISTS paimon_catalog;

CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES (
    -- 1. StarRocks 自身的类型
    "type" = "paimon",
    
    -- 2. 【核心修改】Paimon SDK 识别的 HMS 类型是 "hive",不是 "hms"
    "paimon.catalog.type" = "hive",
    
    -- 3. HMS 地址
    "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083",
    
    -- 4. 指定数仓路径
    "paimon.catalog.warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    
    -- 5. 认证方式
    "hadoop.security.authentication" = "simple"
);

6.3 验证查询

6.3.1 切换 Catalog 与 数据库

-- 1. 切换到该 Catalog
SET CATALOG paimon_catalog;

-- 2. 查看数据库
SHOW DATABASES;

6.3.2 查看表列表

-- 3. 进入某个库 (这里为ods)
USE ods; 

-- 4. 查看表
SHOW TABLES;

6.3.3 数据查询验证

-- 5. 查询数据
SELECT * FROM t_admin_division_code LIMIT 5;

6.4 三种方案的区别与特点

6.4.1 方案一:Hive Catalog (HA Mode / 高可用模式)

DROP CATALOG IF EXISTS paimon_catalog;

CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES (
    -- 1. StarRocks 自身的类型
    "type" = "paimon",
    -- 2. 【核心修改】Paimon SDK 识别的 HMS 类型是 "hive",不是 "hms"
    "paimon.catalog.type" = "hive",
    -- 3. HMS 地址
    "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083",
    -- 4. 指定数仓路径
    "paimon.catalog.warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    -- 5. 认证方式
    "hadoop.security.authentication" = "simple"
);

USE ods; 
SELECT * FROM t_admin_division_code LIMIT 5;

  • 配置特征: "paimon.catalog.type" = "hive", 且 hive.metastore.uris 配置了 多个地址 (thrift://nd1:9083,thrift://nd3:9083)。

  • 原理: 同方案三,但客户端(StarRocks)支持故障转移。

  • 特点:

    • 高可用 (HA): 如果 nd1 挂了,StarRocks 会自动尝试连接 nd3。这与 CDH 集群本身的高可用架构是匹配的。

    • 生产标准: 这是企业级生产环境最稳健的配置。

6.4.2 方案二:Filesystem Catalog (文件系统模式)

DROP CATALOG IF EXISTS paimon_catalog;

CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "hdfs://nd1:8020/user/hive/warehouse", 
    "hadoop.security.authentication" = "simple" 
);

USE ods; 
SELECT * FROM t_admin_division_code LIMIT 5;

  • 配置特征: "paimon.catalog.type" = "filesystem"

  • 原理: StarRocks 不经过 Hive Metastore,而是直接去扫描 HDFS 目录结构来获取数据库和表的元数据(Schema、分区信息等)。

  • 特点:

    • 去中心化: 完全不依赖 Hive 服务。即使 CDH 的 Hive Metastore 挂了,只要 HDFS 活着,StarRocks 就能查 Paimon。

    • 局限性: 它是“孤独”的。如果您的 Flink 任务是用 Hive Catalog 写入的,而 StarRocks 用 Filesystem Catalog 读取,一旦路径配置稍有偏差,两边看到的数据可能不一致。且无法利用 Hive 的权限控制(Ranger/Sentry)。

    • 性能: 对于超多表或海量文件的场景,频繁 listing HDFS 目录可能比直接查 Hive Metastore 慢。

6.4.3 方案三:Hive Catalog (Single Node / 单点模式)

DROP CATALOG IF EXISTS paimon_catalog;

CREATE EXTERNAL CATALOG paimon_catalog PROPERTIES (
    -- 1. StarRocks 自身的类型
    "type" = "paimon",
    -- 2. 【核心修改】Paimon SDK 识别的 HMS 类型是 "hive",不是 "hms"
    "paimon.catalog.type" = "hive",
    -- 3. HMS 地址
    "hive.metastore.uris" = "thrift://nd1:9083",
    -- 4. 指定数仓路径
    "paimon.catalog.warehouse" = "hdfs://nd1:8020/user/hive/warehouse",
    -- 5. 认证方式
    "hadoop.security.authentication" = "simple"
);

USE ods; 
SELECT * FROM t_admin_division_code LIMIT 5;

  • 配置特征: "paimon.catalog.type" = "hive", 且 hive.metastore.uris 只有 一个地址 (thrift://nd1:9083)。

  • 原理: StarRocks 通过 Thrift 协议连接 Hive Metastore 获取元数据,然后去 HDFS 读取数据。

  • 特点:

    • 统一元数据: 实现了 Flink(写入端)和 StarRocks(读取端)的元数据统一,是生产环境的标准做法。

    • 单点故障风险: 这是最大的缺点。如果 nd1 节点的 Hive Metastore 服务宕机或进行维护,StarRocks 将无法查询任何 Paimon 表,整个分析链路中断。

6.4.4 综合对比表

维度 方案一 (Hive HA) 方案二 (Filesystem) 方案三 (Hive 单点)
元数据存储 Hive Metastore HDFS 文件系统 Hive Metastore
依赖组件 HDFS + Hive MS 仅 HDFS HDFS + Hive MS
与其他引擎互通 极好 差 (需手动对齐路径) 好 (共享 HMS)
抗风险能力 高 (自动故障转移) 中 (依赖 HDFS) 低 (存在单点故障)
适用场景 生产环境 测试、Hive 服务不可用时 临时测试

6.4.5 最终推荐

强烈推荐使用:【方案一】

推荐理由:

  1. 架构统一: Paimon 的最佳实践是使用 Hive Catalog,这样 Flink 实时写入、Spark 离线修正、StarRocks 实时分析都能看到同一份元数据,避免了“分裂”。

  2. 高可用性: 您的 CDH 环境既然部署了两个 Metastore (nd1, nd3),就应该充分利用。配置多个 URI 可以确保在一个节点宕机时,StarRocks 的查询业务不受影响。

  3. 未来扩展: 如果未来需要做权限管控(如 Ranger),基于 Hive Catalog 的链路更容易集成。

7. 故障排查总结 (Troubleshooting)

在部署过程中,我们解决了以下核心问题,后续运维请注意:

  1. JDK 版本报错:

    • 现象: Error: JDK 11 is not supported...

    • 解决: StarRocks 4.0.2 强制要求 JDK 17。必须在 fe.confbe.conf 中显式配置 JAVA_HOME = .../jdk-17.0.2

  2. FE 启动失败 / 端口冲突:

    • 现象: edit_log_port 9010 is already in use

    • 原因: CDH 的 Zookeeper (zookeep) 占用了 9010。

    • 解决: 将 fe.conf 中的 edit_log_port 修改为 19010注意所有 FE 节点必须一致,且 helper 参数也要用新端口。

  3. YARN 端口冲突:

    • 原因: StarRocks BE 默认 webserver_port 为 8040,与 YARN NodeManager 冲突。

    • 解决: 在 be.conf 中将 webserver_port 修改为 18040

  4. 残留进程:

    • 现象: 修改配置后重启,端口仍被占用。

    • 解决: 使用 ps -ef | grep starrocks 查出 PID,使用 kill -9 彻底杀掉旧进程后再启动。

  5. 权限问题:

    • 现象: Permission denied

    • 解决: 确保 /home/bigdata/starrocks 及其子目录的归属权是 bigdata:bigdata。如果曾用 root 启动过,需执行 sudo chown -R bigdata:bigdata /home/bigdata/starrocks 修复。

8.基础测试(增删改查)

将下述测试代码保存为starrocks4_test.py【python解释器:3.8.20、windows系统:11】

# -*- coding: utf-8 -*-
import pymysql
import random
import time
import logging
import functools
from sshtunnel import SSHTunnelForwarder

# ================= 配置信息 (适配 StarRocks 4.0.2) =================

# 1. SSH 连接信息
# 修改为新集群的 FE Leader 节点 (nd4)
SSH_HOST = '10.x.xx.204'       
SSH_PORT = 22
SSH_USER = 'xxxxxx'
SSH_PASSWORD = "xxxxxxxxxxxxxxx"

# 2. StarRocks 数据库信息
SR_LOCAL_HOST = '127.0.0.1'    
# 【关键修改】StarRocks 4.0.2 部署文档中 query_port 保持了默认的 9030
SR_QUERY_PORT = 9030          
SR_DB_USER = 'root'
SR_DB_PWD = ''                 
DB_NAME = 'python_perf_test_sr4'    # 库名区分 4.0
TABLE_NAME = 'student_scores_perf_sr4'

LOG_FILE = 'starrocks4_test_report.log'

# ================= 日志与工具模块 (通用) =================
def setup_logger():
    logger = logging.getLogger("StarRocks4Tester")
    logger.setLevel(logging.INFO)
    if logger.hasHandlers():
        logger.handlers.clear()
    file_handler = logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8')
    console_handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

logger = setup_logger()

def measure_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        logger.info(f"正在执行: [{func.__name__}] ...")
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            logger.info(f"执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}")
            raise e
    return wrapper

# ================= 业务逻辑 =================

@measure_time
def init_db_and_table(cursor):
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}")
    cursor.execute(f"USE {DB_NAME}")
    
    # 【StarRocks 4.0 建表】
    # 依然使用 Primary Key 模型 (性能最优)
    create_sql = f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
        id INT NOT NULL COMMENT "用户ID", 
        name VARCHAR(50) COMMENT "姓名",
        age INT COMMENT "年龄",
        score INT COMMENT "分数",
        update_time DATETIME COMMENT "更新时间"
    )
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id) BUCKETS 3
    PROPERTIES (
        -- 【修改】新集群有3个BE,设置副本数为3以测试高可用
        "replication_num" = "3",
        -- 4.0 默认开启持久化索引,显式写出来更清晰
        "enable_persistent_index" = "true" 
    );
    """
    
    cursor.execute(create_sql)
    # 清空表以便重复测试
    cursor.execute(f"TRUNCATE TABLE {TABLE_NAME}")
    logger.info(f"数据库 {DB_NAME} 和表 {TABLE_NAME} 已初始化 (Replication=3)")

@measure_time
def insert_data_batch(cursor, count=10):
    data = []
    for i in range(1, count + 1):
        name = f"User_{i:03d}"
        age = random.randint(18, 30)
        score = random.randint(50, 100)
        data.append((i, name, age, score))
    
    sql = f"INSERT INTO {TABLE_NAME} (id, name, age, score, update_time) VALUES (%s, %s, %s, %s, NOW())"
    cursor.executemany(sql, data)
    logger.info(f"成功插入 {count} 条数据")

@measure_time
def query_and_log(cursor, stage_name):
    # 简单全表查,验证数据一致性
    sql = f"SELECT * FROM {TABLE_NAME} ORDER BY id"
    cursor.execute(sql)
    results = cursor.fetchall()
    logger.info(f"--- [{stage_name}] 当前总行数: {len(results)} ---")
    if results:
        # 只打印前3条
        for row in results[:3]: logger.info(f"Row: {row}")

@measure_time
def update_random_data(cursor, update_count=3):
    cursor.execute(f"SELECT id FROM {TABLE_NAME}")
    all_ids = [row['id'] for row in cursor.fetchall()]
    if not all_ids: return
    target_ids = random.sample(all_ids, min(len(all_ids), update_count))
    for uid in target_ids:
        new_score = random.randint(95, 100)
        # Primary Key 模型 update 性能极快
        sql = f"UPDATE {TABLE_NAME} SET score = %s, update_time = NOW() WHERE id = %s"
        cursor.execute(sql, (new_score, uid))
        logger.info(f" -> 更新 ID={uid}, New Score={new_score}")

@measure_time
def delete_random_data(cursor, delete_count=2):
    cursor.execute(f"SELECT id FROM {TABLE_NAME}")
    all_ids = [row['id'] for row in cursor.fetchall()]
    if not all_ids: return
    target_ids = random.sample(all_ids, min(len(all_ids), delete_count))
    for uid in target_ids:
        sql = f"DELETE FROM {TABLE_NAME} WHERE id = %s"
        cursor.execute(sql, (uid,))
        logger.info(f" -> 删除 ID={uid}")

# ================= 主流程 =================

def main_process():
    server = None
    conn = None
    try:
        logger.info(">>> 1. 正在建立 SSH 隧道 (连接 nd4: 10.8.16.204) ...")
        # 建立 SSH 隧道
        server = SSHTunnelForwarder(
            (SSH_HOST, SSH_PORT),
            ssh_username=SSH_USER,
            ssh_password=SSH_PASSWORD,
            # 映射远程 9030 到本地随机端口
            remote_bind_address=(SR_LOCAL_HOST, SR_QUERY_PORT) 
        )
        server.start()
        logger.info(f">>> SSH 隧道建立成功! 映射 StarRocks 4.0 端口 {SR_QUERY_PORT} -> 本地 {server.local_bind_port}")

        # 2. 连接数据库
        logger.info(">>> 正在连接 StarRocks 4.0.2 ...")
        conn = pymysql.connect(
            host='127.0.0.1',             # 连接本机
            port=server.local_bind_port,  # 使用隧道端口
            user=SR_DB_USER,
            password=SR_DB_PWD,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=True
        )
        cursor = conn.cursor()
        
        # 3. 执行测试逻辑
        # 验证建表 (验证 FE 元数据管理)
        init_db_and_table(cursor)
        
        # 验证插入 (验证 BE 数据写入与副本同步)
        insert_data_batch(cursor, count=100)
        time.sleep(1) # StarRocks 4.0 也是准实时的,稍作等待
        query_and_log(cursor, "插入后")
        
        # 验证更新 (验证 Primary Key 索引机制)
        update_random_data(cursor, update_count=5)
        time.sleep(1)
        query_and_log(cursor, "更新后")
        
        # 验证删除
        delete_random_data(cursor, delete_count=5)
        time.sleep(1)
        query_and_log(cursor, "删除后")
        
        logger.info(">>> StarRocks 4.0.2 基本功能测试全部通过 <<<")

    except Exception as e:
        logger.error(f"主流程发生错误: {e}")
    finally:
        # 清理资源
        if conn:
            conn.close()
            logger.info("数据库连接已关闭")
        if server:
            server.stop()
            logger.info("SSH 隧道已关闭")

if __name__ == "__main__":
    main_process()

对应的log文件内容如下:

2025-12-10 18:13:50,107 - INFO - >>> 1. 正在建立 SSH 隧道 (连接 nd4: 10.x.xx.204) ...
2025-12-10 18:13:50,405 - INFO - >>> SSH 隧道建立成功! 映射 StarRocks 4.0 端口 9030 -> 本地 58569
2025-12-10 18:13:50,405 - INFO - >>> 正在连接 StarRocks 4.0.2 ...
2025-12-10 18:13:50,474 - INFO - 正在执行: [init_db_and_table] ...
2025-12-10 18:13:50,554 - INFO - 数据库 python_perf_test_sr4 和表 student_scores_perf_sr4 已初始化 (Replication=3)
2025-12-10 18:13:50,554 - INFO - 执行完成: [init_db_and_table] | 耗时: 0.0798 秒
2025-12-10 18:13:50,554 - INFO - 正在执行: [insert_data_batch] ...
2025-12-10 18:14:01,150 - INFO - 成功插入 100 条数据
2025-12-10 18:14:01,150 - INFO - 执行完成: [insert_data_batch] | 耗时: 10.5959 秒
2025-12-10 18:14:02,151 - INFO - 正在执行: [query_and_log] ...
2025-12-10 18:14:02,192 - INFO - --- [插入后] 当前总行数: 100 ---
2025-12-10 18:14:02,192 - INFO - Row: {'id': 1, 'name': 'User_001', 'age': 26, 'score': 81, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:02,192 - INFO - Row: {'id': 2, 'name': 'User_002', 'age': 29, 'score': 55, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:02,192 - INFO - Row: {'id': 3, 'name': 'User_003', 'age': 21, 'score': 63, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:02,192 - INFO - 执行完成: [query_and_log] | 耗时: 0.0411 秒
2025-12-10 18:14:02,193 - INFO - 正在执行: [update_random_data] ...
2025-12-10 18:14:02,330 - INFO -  -> 更新 ID=4, New Score=98
2025-12-10 18:14:02,425 - INFO -  -> 更新 ID=57, New Score=100
2025-12-10 18:14:02,529 - INFO -  -> 更新 ID=29, New Score=95
2025-12-10 18:14:02,623 - INFO -  -> 更新 ID=8, New Score=95
2025-12-10 18:14:02,727 - INFO -  -> 更新 ID=42, New Score=100
2025-12-10 18:14:02,727 - INFO - 执行完成: [update_random_data] | 耗时: 0.5346 秒
2025-12-10 18:14:03,728 - INFO - 正在执行: [query_and_log] ...
2025-12-10 18:14:03,752 - INFO - --- [更新后] 当前总行数: 100 ---
2025-12-10 18:14:03,753 - INFO - Row: {'id': 1, 'name': 'User_001', 'age': 26, 'score': 81, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:03,753 - INFO - Row: {'id': 2, 'name': 'User_002', 'age': 29, 'score': 55, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:03,753 - INFO - Row: {'id': 3, 'name': 'User_003', 'age': 21, 'score': 63, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:03,753 - INFO - 执行完成: [query_and_log] | 耗时: 0.0251 秒
2025-12-10 18:14:03,753 - INFO - 正在执行: [delete_random_data] ...
2025-12-10 18:14:03,928 - INFO -  -> 删除 ID=41
2025-12-10 18:14:04,020 - INFO -  -> 删除 ID=54
2025-12-10 18:14:04,116 - INFO -  -> 删除 ID=92
2025-12-10 18:14:04,220 - INFO -  -> 删除 ID=59
2025-12-10 18:14:04,324 - INFO -  -> 删除 ID=20
2025-12-10 18:14:04,324 - INFO - 执行完成: [delete_random_data] | 耗时: 0.5706 秒
2025-12-10 18:14:05,324 - INFO - 正在执行: [query_and_log] ...
2025-12-10 18:14:05,343 - INFO - --- [删除后] 当前总行数: 95 ---
2025-12-10 18:14:05,358 - INFO - Row: {'id': 1, 'name': 'User_001', 'age': 26, 'score': 81, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:05,358 - INFO - Row: {'id': 2, 'name': 'User_002', 'age': 29, 'score': 55, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:05,358 - INFO - Row: {'id': 3, 'name': 'User_003', 'age': 21, 'score': 63, 'update_time': datetime.datetime(2025, 12, 10, 18, 13, 51)}
2025-12-10 18:14:05,359 - INFO - 执行完成: [query_and_log] | 耗时: 0.0345 秒
2025-12-10 18:14:05,359 - INFO - >>> StarRocks 4.0.2 基本功能测试全部通过 <<<
2025-12-10 18:14:05,359 - INFO - 数据库连接已关闭
2025-12-10 18:14:05,460 - INFO - SSH 隧道已关闭

去nd4执行下述语句进行查看:

mysql -h 10.x.xx.204 -P 9030 -u root
show databases;
use python_perf_test_sr4;
show tables;

select * from student_scores_perf_sr4;

8.1 插入数据

这里展示部分数据,可以看出插入了100条测试数据:

8.2 更新数据

控制台打印结果如下:

终端验证,部分数据展示如下:

8.3 删除数据

控制台打印结果如下:

终端验证,部分数据展示如下:

9. Paimon外表数据写入StarRocks内表基础测试

9.1 Paimon数据准备

对于Paimon外表数据写入StarRocks内表基础测试,需要提前在Flink SQL会话里面创建Paimon表,并插入测试数据。

-- 1. 创建 Flink 端的 Paimon Catalog
CREATE CATALOG paimon_catalog WITH (
  'type'        = 'paimon',
  'warehouse'   = 'hdfs:///user/hive/warehouse',
  'metastore'   = 'hive',
  'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);

-- 2. 切换 Catalog 和 Database
USE CATALOG my_paimon;
CREATE DATABASE IF NOT EXISTS ods;
USE ods;

-- 3. 创建 Paimon 表 (源表)
-- 这是一个记录用户行为的日志表
CREATE TABLE IF NOT EXISTS paimon_source_event (
    user_id INT,
    item_id INT,
    behavior STRING,
    dt STRING,
    ts TIMESTAMP(3),
    PRIMARY KEY (dt, user_id, item_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '1',
    'file.format' = 'parquet'
);

-- 4. 写入测试数据 (Batch 模式写入)
INSERT INTO paimon_source_event VALUES
(1001, 501, 'click', '2025-12-12', TIMESTAMP '2025-12-12 10:00:00.123'),
(1002, 502, 'view',  '2025-12-12', TIMESTAMP '2025-12-12 10:05:00.456'),
(1003, 501, 'buy',   '2025-12-12', TIMESTAMP '2025-12-12 10:10:00.789'),
(1001, 503, 'view',  '2025-12-13', TIMESTAMP '2025-12-13 11:00:00.000'),
(1004, 501, 'click', '2025-12-13', TIMESTAMP '2025-12-13 11:05:00.000');

9.2 测试代码

将下述测试代码保存为doris4_paimon_tel_test.py【python解释器:3.8.20、windows系统:11】

# -*- coding: utf-8 -*-
import pymysql
import time
import logging
import functools
from sshtunnel import SSHTunnelForwarder

# ================= 配置信息 =================

# 1. SSH 连接信息 (参考 starrocks4_test.py)
SSH_HOST = '10.x.xx.204'       # StarRocks FE Leader IP (nd4)
SSH_PORT = 22
SSH_USER = 'xxxxx'
SSH_PASSWORD = "xxxxxxxxxxxxxxxxx"

# 2. StarRocks 数据库连接信息
SR_LOCAL_HOST = '127.0.0.1'
SR_QUERY_PORT = 9030           # StarRocks 默认查询端口
SR_DB_USER = 'root'
SR_DB_PWD = ''

# 3. Paimon Catalog 配置
# StarRocks 创建 Paimon Catalog 语法略有不同
CATALOG_PROPS = {
    "type": "paimon",
    "paimon.catalog.type": "hive", # StarRocks 这里通常填 hive 代表基于 HMS
    "hive.metastore.uris": "thrift://nd1:9083,thrift://nd3:9083",
    "warehouse": "hdfs://nd1:8020/user/hive/warehouse"
    # StarRocks 通常不需要显式指定 hadoop.conf.dir,除非有特殊 HA 配置
}

# 4. 业务配置
PAIMON_CATALOG_NAME = 'paimon_catalog_sr' # 避免与 Doris 测试混淆,改个名
PAIMON_DB = 'ods'
PAIMON_TABLE = 'paimon_source_event'

SR_TEST_DB = 'starrocks_paimon_test_db'
SR_TARGET_TABLE = 'sr_target_event_sink'

LOG_FILE = 'starrocks4_paimon_etl_report.log'

# ================= 日志与工具模块 =================
def setup_logger():
    logger = logging.getLogger("StarRocksPaimonTester")
    logger.setLevel(logging.INFO)
    if logger.hasHandlers():
        logger.handlers.clear()
    
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8')
    file_handler.setFormatter(formatter)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

logger = setup_logger()

def measure_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        logger.info(f"正在执行: [{func.__name__}] ...")
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            logger.info(f"执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}")
            raise e
    return wrapper

# ================= 核心测试逻辑 =================

@measure_time
def init_starrocks_catalog(cursor):
    """在 StarRocks 中初始化 Paimon Catalog"""
    logger.info(f"正在初始化 StarRocks Catalog: {PAIMON_CATALOG_NAME} ...")
    
    # 1. 删除旧 Catalog
    # StarRocks 删除 Catalog 语法: DROP CATALOG <name>
    cursor.execute(f"DROP CATALOG IF EXISTS {PAIMON_CATALOG_NAME}")
    
    # 2. 创建 Catalog
    # StarRocks 语法: CREATE EXTERNAL CATALOG ...
    props_str = ",\n".join([f'"{k}" = "{v}"' for k, v in CATALOG_PROPS.items()])
    create_sql = f"""
    CREATE EXTERNAL CATALOG {PAIMON_CATALOG_NAME} PROPERTIES (
        {props_str}
    );
    """
    logger.info(f"发送 Create Catalog 请求:\n{create_sql}")
    cursor.execute(create_sql)
    logger.info("Catalog 创建成功!")
    time.sleep(2) # 等待元数据加载

@measure_time
def check_paimon_source(cursor):
    """验证 StarRocks 是否能读取 Paimon 数据"""
    logger.info(f"检查 Paimon 数据源: {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}")
    
    # 【差异点】StarRocks 切换 Catalog 使用 SET CATALOG
    cursor.execute(f"SET CATALOG {PAIMON_CATALOG_NAME}")
    
    cursor.execute(f"USE {PAIMON_DB}")
    cursor.execute("SHOW TABLES")
    tables = [list(row.values())[0] for row in cursor.fetchall()]
    
    if PAIMON_TABLE not in tables:
        logger.warning(f"当前 Catalog 下的表: {tables}")
        raise Exception(f"Paimon 表 {PAIMON_TABLE} 未找到!")
    
    # 预览数据
    sql = f"SELECT * FROM {PAIMON_TABLE} ORDER BY dt, user_id LIMIT 5"
    cursor.execute(sql)
    results = cursor.fetchall()
    logger.info(f"Paimon 数据预览 (前5条):")
    for row in results:
        logger.info(row)
    
    if not results:
        raise Exception("Paimon 表为空,请先在 Flink 端写入数据!")
    
    return len(results)

@measure_time
def create_starrocks_target_table(cursor):
    """创建 StarRocks 4.0 内部表 (Primary Key 模型)"""
    # 【差异点】切回内部 Catalog,StarRocks 默认为 default_catalog
    cursor.execute("SET CATALOG default_catalog")
    
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {SR_TEST_DB}")
    cursor.execute(f"USE {SR_TEST_DB}")
    
    # StarRocks 4.0 推荐使用 Primary Key 模型
    # Primary Key 模型不强制要求 Key 列在最前,但为了规范,我们保持顺序
    create_sql = f"""
    CREATE TABLE IF NOT EXISTS {SR_TARGET_TABLE} (
        user_id INT COMMENT "用户ID",
        item_id INT COMMENT "商品ID",
        dt VARCHAR(20) COMMENT "日期分区",
        behavior VARCHAR(50) COMMENT "行为类型",
        ts DATETIME COMMENT "时间戳"
    )
    PRIMARY KEY(user_id, item_id, dt)
    PARTITION BY LIST(dt) (
        PARTITION p20251212 VALUES IN ("2025-12-12"),
        PARTITION p20251213 VALUES IN ("2025-12-13")
    )
    DISTRIBUTED BY HASH(user_id) BUCKETS 3
    PROPERTIES (
        "replication_num" = "3",
        "enable_persistent_index" = "true" 
    );
    """
    
    cursor.execute(create_sql)
    cursor.execute(f"TRUNCATE TABLE {SR_TARGET_TABLE}")
    logger.info(f"StarRocks 内表 {SR_TARGET_TABLE} (Primary Key) 已准备就绪")

@measure_time
def execute_etl_paimon_to_starrocks(cursor):
    """执行 INSERT INTO ... SELECT ..."""
    logger.info(">>> 开始执行从 Paimon 到 StarRocks 的数据导入 (ETL) <<<")
    
    # 确保在 default_catalog 下执行,或者全路径引用
    # 这里使用全路径引用更安全
    # 目标: default_catalog.db.table
    # 源:   paimon_catalog.db.table
    
    etl_sql = f"""
    INSERT INTO default_catalog.{SR_TEST_DB}.{SR_TARGET_TABLE}
    (user_id, item_id, dt, behavior, ts)
    SELECT user_id, item_id, dt, behavior, ts 
    FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}
    """
    
    cursor.execute(etl_sql)
    logger.info("ETL SQL 提交完毕")

@measure_time
def verify_data_consistency(cursor):
    """验证数据一致性"""
    logger.info(">>> 开始数据一致性校验 <<<")
    
    # 1. Paimon 源数据量
    cursor.execute(f"SELECT count(*) as cnt FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}")
    paimon_count = cursor.fetchone()['cnt']
    
    # 2. StarRocks 目标数据量
    cursor.execute(f"SELECT count(*) as cnt FROM default_catalog.{SR_TEST_DB}.{SR_TARGET_TABLE}")
    sr_count = cursor.fetchone()['cnt']
    
    logger.info(f"Paimon 源表行数: {paimon_count}")
    logger.info(f"StarRocks 目标表行数: {sr_count}")
    
    if paimon_count == sr_count:
        logger.info("✅ 测试通过:数据条数一致!")
    else:
        logger.error("❌ 测试失败:数据条数不一致!")
        
    # 3. 抽样展示
    cursor.execute(f"SELECT * FROM default_catalog.{SR_TEST_DB}.{SR_TARGET_TABLE} ORDER BY dt, user_id LIMIT 3")
    rows = cursor.fetchall()
    logger.info("StarRocks 内表数据抽样:")
    for row in rows:
        logger.info(row)

# ================= 主流程 =================

def main_process():
    server = None
    conn = None
    try:
        logger.info(">>> 1. 正在建立 SSH 隧道 (连接至 StarRocks 4.0 nd4) ...")
        # 建立隧道
        server = SSHTunnelForwarder(
            (SSH_HOST, SSH_PORT),
            ssh_username=SSH_USER,
            ssh_password=SSH_PASSWORD,
            remote_bind_address=(SR_LOCAL_HOST, SR_QUERY_PORT) 
        )
        server.start()
        logger.info(f">>> SSH 隧道建立成功! 本地端口: {server.local_bind_port}")

        logger.info(">>> 2. 连接 StarRocks 数据库 ...")
        conn = pymysql.connect(
            host='127.0.0.1',
            port=server.local_bind_port,
            user=SR_DB_USER,
            password=SR_DB_PWD,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=True
        )
        cursor = conn.cursor()
        
        # --- 测试步骤 ---
        
        # 步骤 1: 初始化 Catalog
        init_starrocks_catalog(cursor)

        # 步骤 2: 确认源端 Paimon 可读
        check_paimon_source(cursor)
        
        # 步骤 3: 准备 StarRocks 目标表
        create_starrocks_target_table(cursor)
        
        # 步骤 4: 执行导入
        execute_etl_paimon_to_starrocks(cursor)
        
        # StarRocks 写入速度极快,稍微等待事务可见
        time.sleep(2)
        
        # 步骤 5: 校验结果
        verify_data_consistency(cursor)
        
        logger.info(">>> StarRocks 4.0 与 Paimon 集成测试全部完成 <<<")

    except Exception as e:
        logger.error(f"主流程发生错误: {e}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        if conn:
            conn.close()
            logger.info("数据库连接已关闭")
        if server:
            server.stop()
            logger.info("SSH 隧道已关闭")

if __name__ == "__main__":
    main_process()

对应的log文件内容如下:

2025-12-12 16:39:49,820 - INFO - >>> 1. 正在建立 SSH 隧道 (连接至 StarRocks 4.0 nd4) ...
2025-12-12 16:39:50,103 - INFO - >>> SSH 隧道建立成功! 本地端口: 61825
2025-12-12 16:39:50,104 - INFO - >>> 2. 连接 StarRocks 数据库 ...
2025-12-12 16:39:50,154 - INFO - 正在执行: [init_starrocks_catalog] ...
2025-12-12 16:39:50,154 - INFO - 正在初始化 StarRocks Catalog: paimon_catalog_sr ...
2025-12-12 16:39:50,181 - INFO - 发送 Create Catalog 请求:

    CREATE EXTERNAL CATALOG paimon_catalog_sr PROPERTIES (
        "type" = "paimon",
"paimon.catalog.type" = "hive",
"hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083",
"warehouse" = "hdfs://nd1:8020/user/hive/warehouse"
    );
    
2025-12-12 16:39:50,219 - INFO - Catalog 创建成功!
2025-12-12 16:39:52,219 - INFO - 执行完成: [init_starrocks_catalog] | 耗时: 2.0656 秒
2025-12-12 16:39:52,220 - INFO - 正在执行: [check_paimon_source] ...
2025-12-12 16:39:52,220 - INFO - 检查 Paimon 数据源: paimon_catalog_sr.ods.paimon_source_event
2025-12-12 16:39:52,672 - INFO - Paimon 数据预览 (前5条):
2025-12-12 16:39:52,672 - INFO - {'user_id': 1001, 'item_id': 501, 'behavior': 'click', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)}
2025-12-12 16:39:52,673 - INFO - {'user_id': 1002, 'item_id': 502, 'behavior': 'view', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)}
2025-12-12 16:39:52,673 - INFO - {'user_id': 1003, 'item_id': 501, 'behavior': 'buy', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)}
2025-12-12 16:39:52,673 - INFO - {'user_id': 1001, 'item_id': 503, 'behavior': 'view', 'dt': '2025-12-13', 'ts': datetime.datetime(2025, 12, 13, 11, 0)}
2025-12-12 16:39:52,674 - INFO - {'user_id': 1004, 'item_id': 501, 'behavior': 'click', 'dt': '2025-12-13', 'ts': datetime.datetime(2025, 12, 13, 11, 5)}
2025-12-12 16:39:52,674 - INFO - 执行完成: [check_paimon_source] | 耗时: 0.4546 秒
2025-12-12 16:39:52,674 - INFO - 正在执行: [create_starrocks_target_table] ...
2025-12-12 16:39:52,732 - INFO - StarRocks 内表 sr_target_event_sink (Primary Key) 已准备就绪
2025-12-12 16:39:52,733 - INFO - 执行完成: [create_starrocks_target_table] | 耗时: 0.0581 秒
2025-12-12 16:39:52,733 - INFO - 正在执行: [execute_etl_paimon_to_starrocks] ...
2025-12-12 16:39:52,733 - INFO - >>> 开始执行从 Paimon 到 StarRocks 的数据导入 (ETL) <<<
2025-12-12 16:39:56,177 - INFO - ETL SQL 提交完毕
2025-12-12 16:39:56,177 - INFO - 执行完成: [execute_etl_paimon_to_starrocks] | 耗时: 3.4447 秒
2025-12-12 16:39:58,178 - INFO - 正在执行: [verify_data_consistency] ...
2025-12-12 16:39:58,178 - INFO - >>> 开始数据一致性校验 <<<
2025-12-12 16:39:58,341 - INFO - Paimon 源表行数: 5
2025-12-12 16:39:58,341 - INFO - StarRocks 目标表行数: 5
2025-12-12 16:39:58,342 - INFO - ✅ 测试通过:数据条数一致!
2025-12-12 16:39:58,373 - INFO - StarRocks 内表数据抽样:
2025-12-12 16:39:58,374 - INFO - {'user_id': 1001, 'item_id': 501, 'dt': '2025-12-12', 'behavior': 'click', 'ts': datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)}
2025-12-12 16:39:58,374 - INFO - {'user_id': 1002, 'item_id': 502, 'dt': '2025-12-12', 'behavior': 'view', 'ts': datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)}
2025-12-12 16:39:58,374 - INFO - {'user_id': 1003, 'item_id': 501, 'dt': '2025-12-12', 'behavior': 'buy', 'ts': datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)}
2025-12-12 16:39:58,374 - INFO - 执行完成: [verify_data_consistency] | 耗时: 0.1962 秒
2025-12-12 16:39:58,374 - INFO - >>> StarRocks 4.0 与 Paimon 集成测试全部完成 <<<
2025-12-12 16:39:58,374 - INFO - 数据库连接已关闭
2025-12-12 16:39:58,402 - INFO - SSH 隧道已关闭

9.3 验证数据

去StarRocks终端验证数据结果如下:

mysql -h 10.x.xx.204 -P 9030 -uroot

执行下述sql

-- 1. 切换到默认内部 Catalog (必须使用 SET CATALOG)
SET CATALOG default_catalog;

-- 2. 切换到测试数据库 (脚本中定义的 SR_TEST_DB)
USE starrocks_paimon_test_db;

-- 3. 查看表
SHOW TABLES;
-- 预期输出: sr_target_event_sink

-- 4. 查询数据
SELECT * FROM sr_target_event_sink ORDER BY user_id;

-- 5. 查看数据总量
SELECT count(*) FROM sr_target_event_sink;

StarRocks 支持通过 catalog.db.table 的全路径方式进行跨源查询,这样不用频繁切换 Catalog。

SELECT 
    'Paimon Source' as source_type, 
    count(*) as cnt 
FROM paimon_catalog_sr.ods.paimon_source_event
UNION ALL
SELECT 
    'StarRocks Target' as source_type, 
    count(*) as cnt 
FROM default_catalog.starrocks_paimon_test_db.sr_target_event_sink;

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐