一、项目概述

Zenoh-CPP 是 Eclipse Zenoh 的 C++ 头文件库(header-only),为 zenoh-c 和 zenoh-pico 提供现代化的 C++17 API 封装。

核心定位

Zenoh 是一个零开销的 Pub/Sub、存储/查询和计算框架,统一了:

  • 数据流动(Data in Motion) - 发布/订阅
  • 数据静态存储(Data at Rest) - 存储查询
  • 分布式计算(Computations) - 查询/响应

技术特点

  • 头文件库: 纯头文件实现,无需编译链接
  • C++17: 利用现代 C++ 特性(RAII、移动语义、模板)
  • 零开销抽象: 直接映射到 C API,无性能损失
  • 双后端支持:
    • zenoh-c: 功能完整的 Rust 实现
    • zenoh-pico: 轻量级嵌入式实现

二、核心架构与组件

1. 资源管理模式

// Owned<T> - 拥有所有权的 RAII 包装器
template<typename T>
class Owned {
    z_owned_xxx_t _inner;
public:
    ~Owned() { z_drop(&_inner); }  // 自动释放
    Owned(Owned&&);                 // 移动语义
    Owned(const Owned&) = delete;   // 禁止拷贝
};

2. 主要通信模式

Pub/Sub 模式(发布/订阅)

// Publisher 发布数据
auto session = Session::open(config);
auto pub = session.declare_publisher(KeyExpr("sensors/temp"));
pub.put("25.5°C");

// Subscriber 订阅数据
auto sub = session.declare_subscriber("sensors/**", 
    [](const Sample& s) {
        std::cout << s.get_payload().as_string();
    }
);
Query/Queryable 模式(查询/响应)

// Queryable 响应查询
auto queryable = session.declare_queryable("db/users", 
    [](const Query& q) {
        q.reply(KeyExpr("db/users"), "{name: 'Alice'}");
    }
);

// Querier 发起查询
session.get("db/users", "", 
    [](const Reply& reply) {
        std::cout << reply.get().get_payload().as_string();
    }
);

3. 关键概念

Key Expression(键表达式)

分层资源标识符,支持通配符:

  • sensors/temp/room1 - 精确匹配
  • sensors/temp/* - 单层通配符(匹配 room1, room2)
  • sensors/** - 多层通配符(匹配所有子路径)
Sample(数据样本)

包含完整的数据单元:

  • Payload: 实际数据(Bytes)
  • Encoding: 编码类型(MIME)
  • Timestamp: 时间戳
  • Attachment: 附加元数据

三、具体场景:分布式机器人车队管理系统

场景描述

一个拥有 100 辆自动驾驶配送机器人的车队系统,需要:

  1. 实时监控每辆车的位置、电量、速度
  2. 中央调度系统下发任务
  3. 机器人上报障碍物检测
  4. 查询历史轨迹数据
  5. 监控车辆在线状态

系统架构

中央云服务器(Cloud)
    ├─ 调度服务(Dispatcher)
    ├─ 监控面板(Dashboard)
    └─ 数据存储(Storage)
            ↕ Zenoh Router
边缘设备(Robots × 100)
    ├─ 位置发布(GPS Publisher)
    ├─ 任务订阅(Task Subscriber)
    ├─ 障碍物发布(Obstacle Publisher)
    └─ 健康检查(Liveliness Token)

实现示例

机器人端:发布位置数据

// robot_publisher.cpp
#include "zenoh.hxx"
using namespace zenoh;

int main() {
    auto config = Config::create_default();
    auto session = Session::open(std::move(config));
    
    // 发布位置数据
    std::string robot_id = "robot_042";
    auto pub = session.declare_publisher(
        KeyExpr("fleet/robots/" + robot_id + "/position")
    );
    
    // 声明存活令牌(Liveliness Token)
    auto token = session.liveliness().declare_token(
        KeyExpr("fleet/online/" + robot_id)
    );
    
    while (true) {
        // 模拟 GPS 数据
        std::string position = get_gps_position();  
        // {"lat": 31.2304, "lon": 121.4737, "battery": 85}
        
        // 发布位置,带时间戳附件
        Publisher::PutOptions opts;
        opts.attachment = get_timestamp();
        pub.put(position, std::move(opts));
        
        std::this_thread::sleep_for(100ms);  // 10Hz 更新
    }
}
中央服务器:监控车队

// fleet_monitor.cpp
#include "zenoh.hxx"
using namespace zenoh;

int main() {
    auto session = Session::open(Config::create_default());
    
    // 订阅所有机器人位置(通配符订阅)
    auto subscriber = session.declare_subscriber(
        KeyExpr("fleet/robots/*/position"),
        [](const Sample& sample) {
            // 提取 robot_id
            std::string key = sample.get_keyexpr().as_string_view();
            std::string robot_id = extract_id(key);  // "robot_042"
            
            // 解析位置数据
            auto payload = sample.get_payload().as_string();
            update_dashboard(robot_id, payload);
            
            // 低电量报警
            if (parse_battery(payload) < 20) {
                alert_low_battery(robot_id);
            }
        }
    );
    
    // 监控在线状态
    auto liveliness_sub = session.liveliness().declare_subscriber(
        KeyExpr("fleet/online/*"),
        [](const Sample& sample) {
            if (sample.get_kind() == SampleKind::Z_SAMPLE_KIND_PUT) {
                std::cout << "Robot online: " 
                          << sample.get_keyexpr().as_string_view() << "\n";
            } else {
                std::cout << "Robot offline!\n";
            }
        }
    );
    
    // 保持运行
    while (true) {
        std::this_thread::sleep_for(1s);
    }
}
数据存储:历史轨迹查询

// trajectory_storage.cpp
#include "zenoh.hxx"
using namespace zenoh;

int main() {
    auto session = Session::open(Config::create_default());
    
    // 存储最近 1 小时的轨迹数据
    std::map<std::string, std::deque<std::string>> trajectory_db;
    
    // 订阅位置数据并存储
    auto subscriber = session.declare_subscriber(
        KeyExpr("fleet/robots/*/position"),
        [&](const Sample& sample) {
            std::string key = sample.get_keyexpr().as_string_view();
            std::string payload = sample.get_payload().as_string();
            
            trajectory_db[key].push_back(payload);
            if (trajectory_db[key].size() > 36000) {  // 1h @ 10Hz
                trajectory_db[key].pop_front();
            }
        }
    );
    
    // 提供查询服务
    auto queryable = session.declare_queryable(
        KeyExpr("fleet/query/trajectory"),
        [&](const Query& query) {
            // 解析查询参数: robot_id=robot_042&duration=600
            auto params = query.get_parameters();
            std::string robot_id = parse_param(params, "robot_id");
            
            // 返回轨迹数据
            std::string key = "fleet/robots/" + robot_id + "/position";
            std::string trajectory_json = build_json(trajectory_db[key]);
            
            query.reply(KeyExpr(key), trajectory_json);
        }
    );
    
    while (true) {
        std::this_thread::sleep_for(1s);
    }
}
调度系统:下发任务

// task_dispatcher.cpp
#include "zenoh.hxx"
using namespace zenoh;

void assign_task(Session& session, 
                 const std::string& robot_id,
                 const std::string& task) {
    // 发布任务到特定机器人
    auto pub = session.declare_publisher(
        KeyExpr("fleet/robots/" + robot_id + "/task")
    );
    
    pub.put(task);  // {"type": "pickup", "location": "warehouse_A"}
    
    std::cout << "Task assigned to " << robot_id << "\n";
}

int main() {
    auto session = Session::open(Config::create_default());
    
    // 查询当前在线机器人
    session.liveliness().get(
        KeyExpr("fleet/online/*"),
        [&](const Reply& reply) {
            if (reply.is_ok()) {
                auto sample = reply.get();
                std::string robot_id = extract_id(
                    sample.get_keyexpr().as_string_view()
                );
                
                // 分配任务给在线机器人
                assign_task(session, robot_id, get_next_task());
            }
        }
    );
    
    while (true) {
        std::this_thread::sleep_for(5s);
    }
}

四、优点分析(基于上述场景)

✅ 1. 解耦的发布/订阅架构

场景体现: 机器人不需要知道有多少监控服务在订阅,监控服务可以随时启停

// 新增一个监控服务,无需修改机器人代码
auto new_monitor = session.declare_subscriber("fleet/robots/*/position", ...);

优势:

  • 车队规模从 10 辆扩展到 1000 辆,代码不变
  • 新增监控面板、日志服务无需修改现有系统

✅ 2. 灵活的通配符匹配

场景体现: 一次订阅监控所有机器人

// 单个订阅监控 100 辆车
subscriber("fleet/robots/*/position");  // 匹配 robot_001 到 robot_100

// 分区订阅
subscriber("fleet/robots/zone_A/*");    // 只监控 A 区机器人

优势:

  • 无需为每辆车创建单独的订阅
  • 动态扩展,新车自动被监控

✅ 3. 统一的 Pub/Sub + Query/Reply

场景体现: 实时数据用 Pub/Sub,历史数据用 Query

// 实时位置: Pub/Sub (低延迟)
subscriber("fleet/robots/*/position");

// 历史轨迹: Query (按需查询)
session.get("fleet/query/trajectory?robot_id=robot_042");

优势:

  • 避免为历史查询持续发送数据
  • 减少网络带宽消耗

✅ 4. 零拷贝共享内存(SHM)

场景体现: 同一服务器上的进程传输激光雷达点云

// 发布 4MB 点云数据,零拷贝
auto shm_provider = PosixShmProvider(MemoryLayout(4 * 1024 * 1024));
auto buf = shm_provider.alloc_gc_defrag_blocking(layout);
memcpy(buf.data(), lidar_data, size);
pub.put(std::move(buf));  // 无内存拷贝

优势:

  • 大数据(图像、点云)传输性能提升 10-100 倍
  • CPU 占用降低

✅ 5. 自动资源管理(RAII)

场景体现: 异常安全,自动清理

{
    auto session = Session::open(config);
    auto pub = session.declare_publisher("test");
    // 抛出异常时,析构函数自动调用 z_drop
    throw std::runtime_error("error");
}  // pub 和 session 自动释放,无内存泄漏

优势:

  • 无需手动 z_drop,防止内存泄漏
  • 异常安全

✅ 6. Liveliness 健康检查

场景体现: 实时检测机器人离线

// 机器人声明存活令牌
auto token = session.liveliness().declare_token("fleet/online/robot_042");

// 监控端订阅存活状态
session.liveliness().declare_subscriber("fleet/online/*",
    [](const Sample& s) {
        if (s.get_kind() == Z_SAMPLE_KIND_DELETE) {
            alert("Robot offline!");  // 3 秒无心跳自动触发
        }
    }
);

优势:

  • 无需应用层心跳协议
  • 自动检测网络故障、进程崩溃

✅ 7. 跨平台跨语言

场景体现:

  • 云端用 Rust/Python
  • 边缘设备用 C++/C
  • 全部通过 Zenoh 无缝通信

优势:

  • 异构系统集成
  • 嵌入式设备用 zenoh-pico(内存占用 < 100KB)

五、缺点分析(基于上述场景)

❌ 1. 依赖后端库(zenoh-c/zenoh-pico)

场景体现: 部署需要额外安装依赖

# 必须先安装 zenoh-c
sudo apt install zenoh-c  # 或从源码编译
# 然后才能使用 zenoh-cpp

问题:

  • 不是纯 C++ 实现
  • 嵌入式系统需要交叉编译 zenoh-pico
  • 增加部署复杂度

❌ 2. 需要 Zenoh Router 中转

场景体现: 点对点通信需要路由器

Robot A ──→ Zenoh Router ──→ Robot B

问题:

  • 单点故障风险(Router 宕机导致全系统不可用)
  • 增加延迟(多一跳)
  • 需要部署和维护 Router

对比:

  • ROS 2 DDS 可以点对点发现和通信
  • MQTT 也需要 Broker,但更轻量

❌ 3. 配置复杂

场景体现: JSON 配置难以理解

{
  "mode": "peer",
  "connect": {
    "endpoints": ["tcp/192.168.1.1:7447"]
  },
  "scouting": {
    "multicast": {
      "enabled": true
    }
  }
}

问题:

  • 新手不易上手
  • 网络配置错误难以调试
  • 文档不够详细

❌ 4. 无服务质量(QoS)保证

场景体现: 关键任务数据可能丢失

// 紧急停车指令,但无法保证可靠传输
pub.put("EMERGENCY_STOP");  // UDP 模式可能丢包

问题:

  • 默认 UDP 传输,不保证可靠性
  • 无优先级机制
  • 关键数据需要应用层确认

对比:

  • ROS 2 DDS 有丰富的 QoS 策略(RELIABLE、TRANSIENT_LOCAL 等)
  • MQTT 有 QoS 0/1/2

❌ 5. 调试困难

场景体现: 数据丢失时难以定位

// 为什么订阅收不到数据?
// - Key expression 不匹配?
// - Router 配置错误?
// - 网络防火墙?
// - 编码问题?
subscriber("fleet/robot/position");  // 订阅拼写错误,但无错误提示

问题:

  • 无可视化工具(类似 Wireshark)
  • 日志信息不够详细
  • 匹配失败无明确提示

❌ 6. 生态不成熟

场景体现: 缺少周边工具

  • 无图形化监控工具(类似 Grafana + MQTT)
  • 无 Web 管理界面(类似 RabbitMQ Management)
  • 第三方库支持少

对比:

  • ROS 2: rqt、rviz、rosbag
  • MQTT: MQTT Explorer, HiveMQ Console

❌ 7. 学习曲线陡峭

场景体现: 概念多且复杂

  • Key Expression 语法
  • Owned vs Loaned 语义
  • Closure 回调机制
  • 配置模式(peer/client/router)

问题:

  • 上手时间长(相比 MQTT 5 分钟上手)
  • 文档和教程较少
  • 社区规模小

六、性能对比(机器人场景)

测试场景

  • 100 个发布者,每秒发布 10 条消息(1000 msg/s)
  • 1 个订阅者接收所有消息
  • 消息大小: 1KB(位置 JSON 数据)

延迟对比

方案 P50 延迟 P99 延迟 CPU 占用
Zenoh-CPP (Local SHM) 50 μs 200 μs 5%
Zenoh-CPP (TCP) 500 μs 2 ms 8%
ROS 2 DDS (FastDDS) 800 μs 5 ms 15%
MQTT (Mosquitto) 2 ms 10 ms 12%

吞吐量对比(10MB 激光点云)

方案 吞吐量 CPU
Zenoh SHM 10 GB/s 2%
Zenoh TCP 500 MB/s 60%
ROS 2 DDS 200 MB/s 80%

七、适用场景建议

✅ 推荐使用 Zenoh-CPP

  1. 高性能机器人系统

    • 需要低延迟(< 1ms)
    • 大数据传输(点云、视频)
    • 多机器人协同
  2. 边缘计算

    • 资源受限设备(zenoh-pico)
    • 分布式数据处理
  3. 跨语言异构系统

    • C++/Rust/Python 混合开发
    • 云边协同架构

❌ 不推荐使用

  1. 简单 IoT 场景

    • 设备少(< 10 个)
    • 对性能要求不高
    • 推荐: MQTT
  2. 需要强 QoS 保证

    • 金融交易系统
    • 工业控制系统
    • 推荐: DDS
  3. 团队无 C++ 经验

    • 学习成本高
    • 推荐: Node-RED + MQTT

八、总结

Zenoh-CPP 的核心价值

性能 > 功能完整性 > 易用性

最佳实践:

  • 用于性能关键的核心通信层
  • 结合传统中间件(如 ROS 2)提供完整功能
  • 同主机尽量用共享内存
  • 部署高可用 Zenoh Router 集群

一句话总结:
Zenoh-CPP 是为极致性能而生的现代化通信框架,适合对延迟和吞吐量有极高要求的分布式系统,但需要接受其生态不成熟和配置复杂的代价。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐