多智能体系统的通信开销优化:gRPC 与 WebSocket 的性能对比

1. 标题 (Title)

  • 多智能体通信架构实战:gRPC vs WebSocket 性能深度对比与优化策略
  • 分布式系统通信开销优化:从理论到实践的gRPC与WebSocket对比
  • 构建高效多智能体系统:gRPC与WebSocket通信协议的性能基准测试
  • 通信效率提升指南:多智能体系统中gRPC与WebSocket的选型与优化

2. 引言 (Introduction)

痛点引入 (Hook)

在当今的分布式系统和人工智能领域,多智能体系统(Multi-Agent Systems, MAS)正变得越来越重要。从自动驾驶车队到分布式机器学习,从智能物流系统到金融交易平台,多智能体系统正在各个领域发挥着关键作用。然而,随着智能体数量的增加和交互频率的提高,一个致命的问题逐渐浮现:通信开销正成为系统性能的主要瓶颈

你是否遇到过这样的场景:当你的多智能体系统只有几个节点时,一切运行流畅;但当节点数量扩展到几十、上百个时,系统响应变得迟缓,数据同步出现延迟,甚至出现节点失联的情况?这很可能不是你的算法有问题,而是通信协议的选择和优化出了问题。

文章内容概述 (What)

本文将深入探讨多智能体系统中的通信开销问题,重点对比分析两种主流的现代通信协议:gRPCWebSocket。我们将从理论基础、架构设计、性能基准测试等多个维度进行全面对比,并通过实际代码示例展示如何在多智能体系统中实现这两种通信方式。

不仅如此,我们还将提供一套完整的性能测试方法论,帮助你在自己的系统中进行客观的性能评估,以及针对性的优化策略,让你的多智能体通信更加高效。

读者收益 (Why)

读完本文,你将:

  • 深入理解多智能体系统中通信开销的本质和影响因素
  • 掌握 gRPC 和 WebSocket 的核心原理、优缺点及适用场景
  • 学会如何设计和实现基于这两种协议的多智能体通信系统
  • 获得一套可复用的性能测试框架和优化策略
  • 能够根据具体业务场景,做出更明智的通信协议选型决策

3. 准备工作 (Prerequisites)

在开始我们的探索之旅前,请确保你已经具备以下基础:

技术栈/知识:

  • 熟悉基础的网络编程概念(TCP/IP、HTTP、序列化等)
  • 具备 Python 或 Go 等后端语言的编程经验(本文示例使用 Python)
  • 了解多智能体系统的基本概念和架构
  • 对分布式系统有基本了解

环境/工具:

  • 已安装 Python 3.7+ 和 pip
  • 已安装 Docker(可选,用于环境隔离和性能测试)
  • 基本的命令行操作能力
  • 一个代码编辑器(如 VS Code、PyCharm 等)

4. 核心概念与理论基础

4.1 多智能体系统通信概述

核心概念

多智能体系统(MAS)是由多个自主智能体组成的集合,这些智能体通过相互通信、协作和竞争来解决单个智能体难以解决的复杂问题。在 MAS 中,通信是智能体之间传递信息、协调行为的基础。

通信开销主要包括以下几个方面:

  • 延迟:信息从发送方到接收方所需的时间
  • 吞吐量:单位时间内能够传输的数据量
  • CPU 使用率:通信过程中消耗的计算资源
  • 内存占用:通信模块占用的内存资源
  • 网络带宽:数据传输占用的网络资源
问题背景

随着 MAS 规模的扩大和应用场景的复杂化,通信开销问题日益突出:

  • 智能体数量从几个增加到成百上千个
  • 交互频率从每分钟几次增加到每秒几千次
  • 数据 payload 从简单的文本消息增加到复杂的结构化数据
  • 对实时性和可靠性的要求越来越高

这些变化使得传统的通信方式(如简单的 HTTP 请求/响应)难以满足需求,需要更高效的通信协议和架构。

4.2 gRPC 核心概念

什么是 gRPC

gRPC 是由 Google 开发的高性能、开源的通用 RPC(Remote Procedure Call,远程过程调用)框架。它基于 HTTP/2 协议传输,使用 Protocol Buffers 作为接口描述语言(IDL)和消息序列化工具。

gRPC 的核心特性
  1. HTTP/2 传输:提供多路复用、双向流、头部压缩等特性
  2. Protocol Buffers:高效的二进制序列化格式,比 JSON 更小、更快
  3. 强类型接口:通过 .proto 文件定义服务和消息类型
  4. 多语言支持:支持多种编程语言,便于跨语言开发
  5. 流式通信:支持单向流、服务端流、客户端流和双向流
gRPC 的通信模式

gRPC 提供了四种通信模式:

  1. 一元 RPC(Unary RPC):简单的请求-响应模式
  2. 服务端流式 RPC(Server Streaming RPC):客户端发送一个请求,服务端返回一系列响应
  3. 客户端流式 RPC(Client Streaming RPC):客户端发送一系列请求,服务端返回一个响应
  4. 双向流式 RPC(Bidirectional Streaming RPC):客户端和服务端可以独立地发送和接收一系列消息

4.3 WebSocket 核心概念

什么是 WebSocket

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它于 2011 年被 IETF 标准化为 RFC 6455,允许客户端和服务器之间建立持久连接,双方可以随时向对方发送数据。

WebSocket 的核心特性
  1. 全双工通信:客户端和服务器可以同时发送和接收数据
  2. 持久连接:连接建立后保持打开状态,避免了频繁的握手开销
  3. 低延迟:减少了 HTTP 请求/响应模式中的头部开销
  4. 兼容性好:基于 HTTP 协议进行握手,可以通过大多数 HTTP 代理和防火墙
  5. 消息帧:数据以帧的形式传输,支持文本和二进制数据
WebSocket 的工作原理
  1. 握手阶段:客户端发送 HTTP 请求进行协议升级,服务器确认后,连接从 HTTP 升级为 WebSocket
  2. 数据传输阶段:连接建立后,双方可以通过消息帧进行数据传输
  3. 连接关闭阶段:任何一方都可以发送关闭帧来关闭连接

4.4 概念对比与关系

为了更清晰地理解 gRPC 和 WebSocket 的区别,让我们从多个维度进行对比:

特性维度 gRPC WebSocket
协议基础 HTTP/2 WebSocket 协议(基于 TCP)
序列化方式 Protocol Buffers(默认),也支持 JSON 无默认格式,可自定义(JSON、二进制等)
通信模式 一元、服务端流、客户端流、双向流 全双工消息传递
接口定义 强类型(通过 .proto 文件) 无标准接口定义
传输效率 高(二进制序列化 + HTTP/2 头部压缩) 中等(取决于序列化方式)
适用场景 微服务通信、API 调用、高性能 RPC 实时聊天、实时数据推送、多人协作
浏览器支持 需要 gRPC-Web 或 Envoy 代理 现代浏览器原生支持
连接管理 基于 HTTP/2 多路复用 独立的长连接
错误处理 标准的状态码和错误模型 需要自定义错误处理机制
生态系统 丰富的工具和库支持 相对简单,需要自行处理很多细节
概念关系图

下面是 gRPC 和 WebSocket 在网络协议栈中的位置关系:

应用层

gRPC

WebSocket

HTTP/2

WebSocket Protocol

TCP

IP

物理层

Protocol Buffers

自定义序列化

交互流程对比

让我们通过两个流程图来看看这两种协议的典型交互流程。

gRPC 一元调用流程:

服务端智能体 客户端智能体 服务端智能体 客户端智能体 连接复用 处理请求 连接保持,可复用 HTTP/2 连接建立 Protobuf 序列化请求 Protobuf 序列化响应

WebSocket 交互流程:

服务端智能体 客户端智能体 服务端智能体 客户端智能体 WebSocket 连接建立 loop [持续通信] HTTP 握手请求 (Upgrade) 101 Switching Protocols 消息帧 (文本/二进制) 消息帧 (文本/二进制) 关闭帧 关闭帧确认

5. 数学模型与性能分析

在深入代码实现之前,让我们先建立一些数学模型,从理论上分析这两种通信方式的性能特点。

5.1 通信延迟模型

通信延迟通常由以下几个部分组成:

Ttotal=Tsetup+Tserialization+Ttransmission+Tprocessing+Tdeserialization T_{total} = T_{setup} + T_{serialization} + T_{transmission} + T_{processing} + T_{deserialization} Ttotal=Tsetup+Tserialization+Ttransmission+Tprocessing+Tdeserialization

其中:

  • TtotalT_{total}Ttotal:总延迟
  • TsetupT_{setup}Tsetup:连接建立延迟
  • TserializationT_{serialization}Tserialization:序列化延迟
  • TtransmissionT_{transmission}Ttransmission:传输延迟
  • TprocessingT_{processing}Tprocessing:服务端处理延迟
  • TdeserializationT_{deserialization}Tdeserialization:反序列化延迟
gRPC 延迟分析

对于 gRPC,特别是在使用 HTTP/2 多路复用时:

TgRPC=THTTP2_setup+N×(Tproto_ser+Thttp2_frame+Tproc+Tproto_deser) T_{gRPC} = T_{HTTP2\_setup} + N \times (T_{proto\_ser} + T_{http2\_frame} + T_{proc} + T_{proto\_deser}) TgRPC=THTTP2_setup+N×(Tproto_ser+Thttp2_frame+Tproc+Tproto_deser)

其中:

  • THTTP2_setupT_{HTTP2\_setup}THTTP2_setup:HTTP/2 连接建立延迟(通常只需一次)
  • NNN:请求/响应次数
  • Tproto_serT_{proto\_ser}Tproto_ser:Protocol Buffers 序列化延迟
  • Thttp2_frameT_{http2\_frame}Thttp2_frame:HTTP/2 帧传输延迟
  • TprocT_{proc}Tproc:处理延迟
  • Tproto_deserT_{proto\_deser}Tproto_deser:Protocol Buffers 反序列化延迟
WebSocket 延迟分析

对于 WebSocket:

TWebSocket=TWS_setup+N×(Tcustom_ser+TWS_frame+Tproc+Tcustom_deser) T_{WebSocket} = T_{WS\_setup} + N \times (T_{custom\_ser} + T_{WS\_frame} + T_{proc} + T_{custom\_deser}) TWebSocket=TWS_setup+N×(Tcustom_ser+TWS_frame+Tproc+Tcustom_deser)

其中:

  • TWS_setupT_{WS\_setup}TWS_setup:WebSocket 握手延迟(通常只需一次)
  • NNN:消息数量
  • Tcustom_serT_{custom\_ser}Tcustom_ser:自定义序列化延迟(如 JSON 序列化)
  • TWS_frameT_{WS\_frame}TWS_frame:WebSocket 帧传输延迟
  • TprocT_{proc}Tproc:处理延迟
  • Tcustom_deserT_{custom\_deser}Tcustom_deser:自定义反序列化延迟

5.2 吞吐量模型

吞吐量是指单位时间内能够传输的数据量,通常用 messages/second 或 MB/s 来衡量。

Throughput=Data SizeTotal Time \text{Throughput} = \frac{\text{Data Size}}{\text{Total Time}} Throughput=Total TimeData Size

序列化效率比较

Protocol Buffers 通常比 JSON 更高效。让我们定义一个序列化效率比:

Rser=SizeJSONSizeProtoBuf R_{ser} = \frac{\text{Size}_{\text{JSON}}}{\text{Size}_{\text{ProtoBuf}}} Rser=SizeProtoBufSizeJSON

根据经验数据,RserR_{ser}Rser 通常在 2 到 10 之间,取决于数据的结构。这意味着 Protocol Buffers 序列化后的数据大小通常是 JSON 的 1/2 到 1/10。

头部开销比较

HTTP/1.1 的头部开销通常较大,而 HTTP/2 使用 HPACK 压缩算法可以显著减少头部开销。WebSocket 在握手后也有较小的帧头开销。

让我们定义头部开销因子:

Oheader=Header SizeTotal Message Size O_{header} = \frac{\text{Header Size}}{\text{Total Message Size}} Oheader=Total Message SizeHeader Size

对于小消息,头部开销可能会成为主导因素。

5.3 资源消耗模型

CPU 使用率

序列化和反序列化是 CPU 密集型操作。Protocol Buffers 通常比 JSON 更省 CPU,因为它的解析更简单。

CPUusage∝Kser×N+Kproc×N CPU_{usage} \propto K_{ser} \times N + K_{proc} \times N CPUusageKser×N+Kproc×N

其中 KserK_{ser}Kser 是序列化/反序列化的 CPU 消耗系数,KprocK_{proc}Kproc 是处理的 CPU 消耗系数。

内存占用

内存占用主要用于存储连接状态、缓冲区和序列化/反序列化中间结果。

Memoryusage=Mconn×C+Mbuffer×B+Mser×N Memory_{usage} = M_{conn} \times C + M_{buffer} \times B + M_{ser} \times N Memoryusage=Mconn×C+Mbuffer×B+Mser×N

其中:

  • MconnM_{conn}Mconn:每个连接的内存占用
  • CCC:连接数量
  • MbufferM_{buffer}Mbuffer:每个缓冲区的内存占用
  • BBB:缓冲区数量
  • MserM_{ser}Mser:每次序列化/反序列化的临时内存占用
  • NNN:并发操作数

5.4 扩展性模型

当智能体数量增加时,系统的扩展性变得至关重要。我们可以用以下模型来描述:

S(N)=P(N)P(1) S(N) = \frac{P(N)}{P(1)} S(N)=P(1)P(N)

其中 S(N)S(N)S(N) 是系统在 NNN 个智能体时的扩展比,P(N)P(N)P(N)NNN 个智能体时的性能指标(如吞吐量)。

对于 gRPC,由于 HTTP/2 的多路复用,连接数量不会随着智能体数量线性增长:

CgRPC(N)≈min⁡(N,Cmax) C_{gRPC}(N) \approx \min(N, C_{max}) CgRPC(N)min(N,Cmax)

其中 CmaxC_{max}Cmax 是最大连接数,通常由 HTTP/2 的并发流限制决定。

对于 WebSocket,每个智能体通常需要一个独立的连接:

CWebSocket(N)≈N C_{WebSocket}(N) \approx N CWebSocket(N)N


6. 实际场景应用与项目设计

现在我们已经了解了理论基础,让我们设计一个实际的多智能体系统项目,分别使用 gRPC 和 WebSocket 实现,并进行性能对比。

6.1 项目介绍

我们将创建一个简化的分布式传感器网络系统,模拟以下场景:

  • 多个传感器智能体(Sensor Agent)收集环境数据
  • 一个中心协调器智能体(Coordinator Agent)接收和处理数据
  • 传感器需要定期向协调器发送数据
  • 协调器需要向传感器发送控制指令
  • 我们需要评估在不同负载下两种通信方式的性能

6.2 环境安装

让我们先设置项目环境:

# 创建项目目录
mkdir mas-communication-comparison
cd mas-communication-comparison

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 安装必要的依赖
pip install grpcio grpcio-tools websockets asyncio protobuf python-dotenv

6.3 系统功能设计

我们的系统将包含以下功能:

  1. 传感器注册:传感器启动时向协调器注册
  2. 数据上报:传感器定期向协调器上报传感器数据
  3. 指令下发:协调器向传感器发送控制指令
  4. 状态同步:传感器和协调器同步状态信息
  5. 性能监控:系统自动收集通信性能指标

6.4 系统架构设计

让我们先看看整体系统架构:

多智能体系统

协调器层

通信层

传感器智能体层

中心协调器

传感器智能体 1

gRPC 通信模块

传感器智能体 2

传感器智能体 n

WebSocket 通信模块

性能监控模块

6.5 数据结构设计

我们需要定义两种通信方式共享的数据结构。让我们先从 Protocol Buffers 定义开始,这将用于 gRPC,同时也为 WebSocket 提供数据结构参考。

// sensor.proto
syntax = "proto3";

package sensor;

import "google/protobuf/timestamp.proto";

// 传感器数据消息
message SensorData {
  string sensor_id = 1;
  google.protobuf.Timestamp timestamp = 2;
  double temperature = 3;
  double humidity = 4;
  double pressure = 5;
  int32 battery_level = 6;
  SensorStatus status = 7;
}

// 传感器状态枚举
enum SensorStatus {
  SENSOR_STATUS_UNSPECIFIED = 0;
  SENSOR_STATUS_OK = 1;
  SENSOR_STATUS_WARNING = 2;
  SENSOR_STATUS_ERROR = 3;
}

// 注册请求
message RegisterRequest {
  string sensor_id = 1;
  string sensor_type = 2;
  string location = 3;
}

// 注册响应
message RegisterResponse {
  bool success = 1;
  string message = 2;
  string coordinator_id = 3;
}

// 控制指令
message ControlCommand {
  string command_id = 1;
  string sensor_id = 2;
  CommandType type = 3;
  map<string, string> parameters = 4;
  google.protobuf.Timestamp timestamp = 5;
}

// 指令类型枚举
enum CommandType {
  COMMAND_TYPE_UNSPECIFIED = 0;
  COMMAND_TYPE_SET_REPORT_INTERVAL = 1;
  COMMAND_TYPE_START_CALIBRATION = 2;
  COMMAND_TYPE_RESET = 3;
  COMMAND_TYPE_UPDATE_FIRMWARE = 4;
}

// 指令响应
message CommandResponse {
  string command_id = 1;
  bool success = 2;
  string message = 3;
}

// 服务定义
service SensorService {
  // 一元 RPC:传感器注册
  rpc RegisterSensor(RegisterRequest) returns (RegisterResponse);
  
  // 一元 RPC:发送传感器数据
  rpc SendSensorData(SensorData) returns (CommandResponse);
  
  // 服务端流式 RPC:接收控制指令
  rpc SubscribeCommands(RegisterRequest) returns (stream ControlCommand);
  
  // 双向流式 RPC:实时数据和指令交换
  rpc RealTimeStream(stream SensorData) returns (stream ControlCommand);
}

现在让我们生成 gRPC 代码:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. sensor.proto

这将生成 sensor_pb2.pysensor_pb2_grpc.py 文件。


7. 系统实现

现在让我们分别实现基于 gRPC 和 WebSocket 的通信系统。

7.1 gRPC 实现

首先,让我们实现 gRPC 版本的协调器和传感器。

gRPC 协调器实现
# grpc_coordinator.py
import asyncio
import grpc
import time
import random
from google.protobuf.timestamp_pb2 import Timestamp
from collections import defaultdict, deque
import sensor_pb2
import sensor_pb2_grpc

class PerformanceMonitor:
    """性能监控器"""
    def __init__(self):
        self.latency_samples = defaultdict(deque)  # 延迟采样
        self.throughput_samples = defaultdict(list)  # 吞吐量采样
        self.message_count = defaultdict(int)  # 消息计数
        self.start_time = time.time()
        
    def record_message(self, sensor_id, message_type, latency=None):
        """记录消息"""
        self.message_count[message_type] += 1
        self.message_count[f"{message_type}_{sensor_id}"] += 1
        
        if latency is not None:
            self.latency_samples[sensor_id].append(latency)
            # 只保留最近1000个样本
            if len(self.latency_samples[sensor_id]) > 1000:
                self.latency_samples[sensor_id].popleft()
    
    def get_statistics(self):
        """获取统计数据"""
        elapsed_time = time.time() - self.start_time
        
        stats = {
            "elapsed_time_seconds": elapsed_time,
            "total_messages": sum(v for k, v in self.message_count.items() if "_" not in k),
            "messages_per_second": sum(v for k, v in self.message_count.items() if "_" not in k) / elapsed_time if elapsed_time > 0 else 0
        }
        
        # 计算延迟统计
        all_latencies = []
        for sensor_latencies in self.latency_samples.values():
            all_latencies.extend(sensor_latencies)
            
        if all_latencies:
            stats["avg_latency_ms"] = sum(all_latencies) / len(all_latencies) * 1000
            stats["min_latency_ms"] = min(all_latencies) * 1000
            stats["max_latency_ms"] = max(all_latencies) * 1000
            stats["p50_latency_ms"] = sorted(all_latencies)[len(all_latencies) // 2] * 1000
            stats["p95_latency_ms"] = sorted(all_latencies)[int(len(all_latencies) * 0.95)] * 1000 if len(all_latencies) > 20 else stats["max_latency_ms"]
        
        return stats

class SensorService(sensor_pb2_grpc.SensorServiceServicer):
    """传感器服务实现"""
    def __init__(self, perf_monitor):
        self.perf_monitor = perf_monitor
        self.sensors = {}  # 已注册的传感器
        self.command_queues = defaultdict(asyncio.Queue)  # 每个传感器的指令队列
        self._register_command_generator()
    
    def _register_command_generator(self):
        """注册指令生成任务"""
        asyncio.create_task(self._generate_random_commands())
    
    async def _generate_random_commands(self):
        """定期生成随机控制指令"""
        command_types = [
            sensor_pb2.COMMAND_TYPE_SET_REPORT_INTERVAL,
            sensor_pb2.COMMAND_TYPE_START_CALIBRATION,
            sensor_pb2.COMMAND_TYPE_RESET,
            sensor_pb2.COMMAND_TYPE_UPDATE_FIRMWARE
        ]
        
        while True:
            await asyncio.sleep(random.uniform(5, 15))  # 每5-15秒生成一次指令
            
            for sensor_id in self.sensors:
                if random.random() < 0.3:  # 30%的概率向传感器发送指令
                    command = sensor_pb2.ControlCommand(
                        command_id=f"cmd_{int(time.time())}_{random.randint(1000, 9999)}",
                        sensor_id=sensor_id,
                        type=random.choice(command_types),
                        timestamp=Timestamp(seconds=int(time.time()), nanos=int((time.time() % 1) * 1e9))
                    )
                    
                    # 添加一些参数
                    if command.type == sensor_pb2.COMMAND_TYPE_SET_REPORT_INTERVAL:
                        command.parameters["interval_seconds"] = str(random.randint(1, 10))
                    
                    await self.command_queues[sensor_id].put(command)
    
    async def RegisterSensor(self, request, context):
        """处理传感器注册"""
        start_time = time.time()
        print(f"收到注册请求: {request.sensor_id}")
        
        # 记录注册
        self.sensors[request.sensor_id] = {
            "sensor_type": request.sensor_type,
            "location": request.location,
            "registered_at": time.time()
        }
        
        latency = time.time() - start_time
        self.perf_monitor.record_message(request.sensor_id, "register", latency)
        
        return sensor_pb2.RegisterResponse(
            success=True,
            message=f"传感器 {request.sensor_id} 注册成功",
            coordinator_id="coordinator_1"
        )
    
    async def SendSensorData(self, request, context):
        """处理传感器数据上报"""
        start_time = time.time()
        
        # 简单处理数据
        if request.battery_level < 10:
            print(f"警告: 传感器 {request.sensor_id} 电量低: {request.battery_level}%")
        
        latency = time.time() - start_time
        self.perf_monitor.record_message(request.sensor_id, "sensor_data", latency)
        
        return sensor_pb2.CommandResponse(
            command_id="",
            success=True,
            message="数据接收成功"
        )
    
    async def SubscribeCommands(self, request, context):
        """服务端流式RPC:订阅控制指令"""
        print(f"传感器 {request.sensor_id} 订阅控制指令")
        
        # 确保传感器已注册
        if request.sensor_id not in self.sensors:
            await self.RegisterSensor(request, context)
        
        try:
            while True:
                # 从队列获取指令
                command = await self.command_queues[request.sensor_id].get()
                self.perf_monitor.record_message(request.sensor_id, "command_sent")
                yield command
                
        except asyncio.CancelledError:
            print(f"传感器 {request.sensor_id} 取消了指令订阅")
    
    async def RealTimeStream(self, request_iterator, context):
        """双向流式RPC:实时数据和指令交换"""
        # 获取传感器ID(从第一条消息中)
        first_message = await anext(request_iterator)
        sensor_id = first_message.sensor_id
        
        print(f"传感器 {sensor_id} 建立了实时流")
        
        # 处理第一条消息
        start_time = time.time()
        self.perf_monitor.record_message(sensor_id, "sensor_data", time.time() - start_time)
        
        # 启动发送指令的任务
        async def send_commands():
            try:
                while True:
                    command = await self.command_queues[sensor_id].get()
                    self.perf_monitor.record_message(sensor_id, "command_sent")
                    yield command
            except asyncio.CancelledError:
                pass
        
        # 处理接收消息的任务
        async def process_messages():
            try:
                async for message in request_iterator:
                    start_time = time.time()
                    # 处理传感器数据
                    if message.battery_level < 10:
                        print(f"警告: 传感器 {message.sensor_id} 电量低: {message.battery_level}%")
                    
                    self.perf_monitor.record_message(message.sensor_id, "sensor_data", time.time() - start_time)
            except asyncio.CancelledError:
                pass
        
        # 并发处理发送和接收
        send_task = asyncio.create_task(send_commands())
        process_task = asyncio.create_task(process_messages())
        
        try:
            # 等待任一任务完成(通常是连接关闭)
            done, pending = await asyncio.wait(
                [send_task, process_task],
                return_when=asyncio.FIRST_COMPLETED
            )
            
            # 取消另一个任务
            for task in pending:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
            
            # 返回发送的命令
            for task in done:
                if task is send_task:
                    for command in task.result():
                        yield command
                        
        except Exception as e:
            print(f"实时流处理异常: {e}")
        finally:
            print(f"传感器 {sensor_id} 的实时流已关闭")

async def print_statistics_periodically(perf_monitor, interval=10):
    """定期打印统计信息"""
    while True:
        await asyncio.sleep(interval)
        stats = perf_monitor.get_statistics()
        print("\n========== 性能统计 ==========")
        for key, value in stats.items():
            if isinstance(value, float):
                print(f"{key}: {value:.2f}")
            else:
                print(f"{key}: {value}")
        print("===============================\n")

async def serve():
    """启动gRPC服务器"""
    perf_monitor = PerformanceMonitor()
    
    server = grpc.aio.server()
    sensor_pb2_grpc.add_SensorServiceServicer_to_server(
        SensorService(perf_monitor), server)
    
    listen_addr = '[::]:50051'
    server.add_insecure_port(listen_addr)
    await server.start()
    print(f"gRPC 协调器已启动,监听 {listen_addr}")
    
    # 启动统计打印任务
    asyncio.create_task(print_statistics_periodically(perf_monitor))
    
    await server.wait_for_termination()

if __name__ == '__main__':
    asyncio.run(serve())
gRPC 传感器实现
# grpc_sensor.py
import asyncio
import grpc
import time
import random
import argparse
from google.protobuf.timestamp_pb2 import Timestamp
import sensor_pb2
import sensor_pb2_grpc

class SensorAgent:
    """传感器智能体"""
    def __init__(self, sensor_id, server_address, sensor_type="temperature_humidity", location="unknown"):
        self.sensor_id = sensor_id
        self.server_address = server_address
        self.sensor_type = sensor_type
        self.location = location
        self.report_interval = 1.0  # 默认每秒上报一次
        self.is_running = False
        self.channel = None
        self.stub = None
        
    async def connect(self):
        """连接到协调器"""
        self.channel = grpc.aio.insecure_channel(self.server_address)
        self.stub = sensor_pb2_grpc.SensorServiceStub(self.channel)
        
        # 注册传感器
        register_request = sensor_pb2.RegisterRequest(
            sensor_id=self.sensor_id,
            sensor_type=self.sensor_type,
            location=self.location
        )
        
        try:
            response = await self.stub.RegisterSensor(register_request)
            if response.success:
                print(f"传感器 {self.sensor_id} 成功注册到协调器 {response.coordinator_id}")
                return True
            else:
                print(f"传感器 {self.sensor_id} 注册失败: {response.message}")
                return False
        except Exception as e:
            print(f"传感器 {self.sensor_id} 连接失败: {e}")
            return False
    
    def generate_sensor_data(self):
        """生成模拟传感器数据"""
        timestamp = Timestamp(seconds=int(time.time()), nanos=int((time.time() % 1) * 1e9))
        
        # 模拟一些真实的传感器数据变化
        temperature = 20 + random.uniform(-5, 5)
        humidity = 50 + random.uniform(-20, 20)
        pressure = 1013 + random.uniform(-10, 10)
        battery_level = max(0, 100 - random.uniform(0, 0.1))  # 电池缓慢消耗
        
        # 随机状态
        status = sensor_pb2.SENSOR_STATUS_OK
        if battery_level < 10:
            status = sensor_pb2.SENSOR_STATUS_ERROR
        elif battery_level < 30:
            status = sensor_pb2.SENSOR_STATUS_WARNING
        
        return sensor_pb2.SensorData(
            sensor_id=self.sensor_id,
            timestamp=timestamp,
            temperature=temperature,
            humidity=humidity,
            pressure=pressure,
            battery_level=battery_level,
            status=status
        )
    
    async def send_sensor_data(self):
        """发送传感器数据(一元RPC方式)"""
        while self.is_running:
            try:
                sensor_data = self.generate_sensor_data()
                response = await self.stub.SendSensorData(sensor_data)
                if not response.success:
                    print(f"传感器 {self.sensor_id} 数据发送失败: {response.message}")
            except Exception as e:
                print(f"传感器 {self.sensor_id} 发送数据异常: {e}")
            
            await asyncio.sleep(self.report_interval)
    
    async def subscribe_commands(self):
        """订阅控制指令(服务端流式RPC)"""
        try:
            register_request = sensor_pb2.RegisterRequest(
                sensor_id=self.sensor_id,
                sensor_type=self.sensor_type,
                location=self.location
            )
            
            async for command in self.stub.SubscribeCommands(register_request):
                print(f"传感器 {self.sensor_id} 收到指令: {command}")
                await self._handle_command(command)
                
        except Exception as e:
            if self.is_running:  # 只有在主动运行时才打印错误
                print(f"传感器 {self.sensor_id} 指令订阅异常: {e}")
    
    async def _handle_command(self, command):
        """处理控制指令"""
        if command.type == sensor_pb2.COMMAND_TYPE_SET_REPORT_INTERVAL:
            interval = float(command.parameters.get("interval_seconds", 1.0))
            self.report_interval = interval
            print(f"传感器 {self.sensor_id} 更新上报间隔: {interval}秒")
        elif command.type == sensor_pb2.COMMAND_TYPE_RESET:
            print(f"传感器 {self.sensor_id} 执行重置")
        elif command.type == sensor_pb2.COMMAND_TYPE_START_CALIBRATION:
            print(f"传感器 {self.sensor_id} 开始校准")
        elif command.type == sensor_pb2.COMMAND_TYPE_UPDATE_FIRMWARE:
            print(f"传感器 {self.sensor_id} 更新固件")
    
    async def real_time_stream(self):
        """使用双向流进行实时通信"""
        try:
            # 创建请求迭代器
            async def request_generator():
                while self.is_running:
                    yield self.generate_sensor_data()
                    await asyncio.sleep(self.report_interval)
            
            # 调用双向流RPC
            async for command in self.stub.RealTimeStream(request_generator()):
                print(f"传感器 {self.sensor_id} 收到指令: {command}")
                await self._handle_command(command)
                
        except Exception as e:
            if self.is_running:
                print(f"传感器 {self.sensor_id} 实时流异常: {e}")
    
    async def run(self, use_stream=False):
        """运行传感器"""
        if not await self.connect():
            return
        
        self.is_running = True
        print(f"传感器 {self.sensor_id} 开始运行")
        
        if use_stream:
            # 使用双向流方式
            await self.real_time_stream()
        else:
            # 使用一元RPC + 服务端流方式
            send_task = asyncio.create_task(self.send_sensor_data())
            subscribe_task = asyncio.create_task(self.subscribe_commands())
            
            try:
                await asyncio.gather(send_task, subscribe_task)
            except asyncio.CancelledError:
                pass
            finally:
                self.is_running = False
                send_task.cancel()
                subscribe_task.cancel()
    
    async def stop(self):
        """停止传感器"""
        self.is_running = False
        if self.channel:
            await self.channel.close()
        print(f"传感器 {self.sensor_id} 已停止")

async def main():
    parser = argparse.ArgumentParser(description='gRPC 传感器智能体')
    parser.add_argument('--sensor-id', type=str, default=f"sensor_{random.randint(1000, 9999)}",
                        help='传感器 ID')
    parser.add_argument('--server-address', type=str, default='localhost:50051',
                        help='协调器地址')
    parser.add_argument('--sensor-type', type=str, default='temperature_humidity',
                        help='传感器类型')
    parser.add_argument('--location', type=str, default='unknown',
                        help='传感器位置')
    parser.add_argument('--use-stream', action='store_true',
                        help='使用双向流模式')
    
    args = parser.parse_args()
    
    sensor = SensorAgent(
        args.sensor_id,
        args.server_address,
        args.sensor_type,
        args.location
    )
    
    try:
        await sensor.run(args.use_stream)
    except KeyboardInterrupt:
        print("\n接收到中断信号,停止传感器...")
        await sensor.stop()

if __name__ == '__main__':
    asyncio.run(main())

7.2 WebSocket 实现

现在让我们实现 WebSocket 版本的协调器和传感器。我们将使用 JSON 作为数据交换格式,以便与 gRPC 的 Protocol Buffers 进行对比。

WebSocket 协调器实现
# websocket_coordinator.py
import asyncio
import websockets
import json
import time
import random
from collections import defaultdict, deque
from datetime import datetime

class PerformanceMonitor:
    """性能监控器(与gRPC版本相同,用于公平比较)"""
    def __init__(self):
        self.latency_samples = defaultdict(deque)  # 延迟采样
        self.throughput_samples = defaultdict(list)  # 吞吐量采样
        self.message_count = defaultdict(int)  # 消息计数
        self.start_time = time.time()
        
    def record_message(self, sensor_id, message_type, latency=None):
        """记录消息"""
        self.message_count[message_type] += 1
        self.message_count[f"{message_type}_{sensor_id}"] += 1
        
        if latency is not None:
            self.latency_samples[sensor_id].append(latency)
            # 只保留最近1000个样本
            if len(self.latency_samples[sensor_id]) > 1000:
                self.latency_samples[sensor_id].popleft()
    
    def get_statistics(self):
        """获取统计数据"""
        elapsed_time = time.time() - self.start_time
        
        stats = {
            "elapsed_time_seconds": elapsed_time,
            "total_messages": sum(v for k, v in self.message_count.items() if "_" not in k),
            "messages_per_second": sum(v for k, v in self.message_count.items() if "_" not in k) / elapsed_time if elapsed_time > 0 else 0
        }
        
        # 计算延迟统计
        all_latencies = []
        for sensor_latencies in self.latency_samples.values():
            all_latencies.extend(sensor_latencies)
            
        if all_latencies:
            stats["avg_latency_ms"] = sum(all_latencies) / len(all_latencies) * 1000
            stats["min_latency_ms"] = min(all_latencies) * 1000
            stats["max_latency_ms"] = max(all_latencies) * 1000
            stats["p50_latency_ms"] = sorted(all_latencies)[len(all_latencies) // 2] * 1000
            stats["p95_latency_ms"] = sorted(all_latencies)[int(len(all_latencies) * 0.95)] * 1000 if len(all_latencies) > 20 else stats["max_latency_ms"]
        
        return stats

class WebSocketCoordinator:
    """WebSocket 协调器"""
    def __init__(self, host='localhost', port=8765):
        self.host = host
        self.port = port
        self.perf_monitor = PerformanceMonitor()
        self.connected_sensors = {}  # 已连接的传感器
        self.command_queues = defaultdict(asyncio.Queue)  # 每个传感器的指令队列
        self.command_generator_task = None
    
    async def start(self):
        """启动协调器"""
        # 启动指令生成任务
        self.command_generator_task = asyncio.create_task(self._generate_random_commands())
        
        # 启动统计打印任务
        asyncio.create_task(self._print_statistics_periodically())
        
        # 启动WebSocket服务器
        print(f"WebSocket 协调器已启动,监听 ws://{self.host}:{self.port}")
        async with websockets.serve(self._handle_connection, self.host, self.port):
            await asyncio.Future()  # 永久运行
    
    async def _generate_random_commands(self):
        """定期生成随机控制指令"""
        command_types = [
            "SET_REPORT_INTERVAL",
            "START_CALIBRATION",
            "RESET",
            "UPDATE_FIRMWARE"
        ]
        
        while True:
            await asyncio.sleep(random.uniform(5, 15))  # 每5-15秒生成一次指令
            
            for sensor_id in list(self.connected_sensors.keys()):
                if random.random() < 0.3:  # 30%的概率向传感器发送指令
                    command = {
                        "type": "control_command",
                        "data": {
                            "command_id": f"cmd_{int(time.time())}_{random.randint(1000, 9999)}",
                            "sensor_id": sensor_id,
                            "command_type": random.choice(command_types),
                            "parameters": {},
                            "timestamp": datetime.utcnow().isoformat() + "Z"
                        }
                    }
                    
                    # 添加一些参数
                    if command["data"]["command_type"] == "SET_REPORT_INTERVAL":
                        command["data"]["parameters"]["interval_seconds"] = random.randint(1, 10)
                    
                    await self.command_queues[sensor_id].put(command)
    
    async def _print_statistics_periodically(self, interval=10):
        """定期打印统计信息"""
        while True:
            await asyncio.sleep(interval)
            stats = self.perf_monitor.get_statistics()
            print("\n========== 性能统计 ==========")
            for key, value in stats.items():
                if isinstance(value, float):
                    print(f"{key}: {value:.2f}")
                else:
                    print(f"{key}: {value}")
            print("===============================\n")
    
    async def _handle_connection(self, websocket, path):
        """处理WebSocket连接"""
        sensor_id = None
        try:
            # 等待注册消息
            register_message = await websocket.recv()
            register_data = json.loads(register_message)
            
            if register_data.get("type") != "register":
                await websocket.send(json.dumps({
                    "type": "error",
                    "message": "期望注册消息"
                }))
                return
            
            # 处理注册
            sensor_id = register_data["data"]["sensor_id"]
            start_time = time.time()
            
            print(f"收到注册请求: {sensor_id}")
            
            # 记录注册信息
            self.connected_sensors[sensor_id] = {
                "websocket": websocket,
                "sensor_type": register_data["data"].get("sensor_type", "unknown"),
                "location": register_data["data"].get("location", "unknown"),
                "connected_at": time.time()
            }
            
            # 发送注册响应
            await websocket.send(json.dumps({
                "type": "register_response",
                "data": {
                    "success": True,
                    "message": f"传感器 {sensor_id} 注册成功",
                    "coordinator_id": "coordinator_1"
                }
            }))
            
           
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐