集群聊天室项目--Server层代码设计
是 muduo 的多 Reactor 配置 ——1 个 mainReactor(处理监听连接)+ 4 个 subReactor(处理已连接客户端的 IO),提升并发 IO 处理能力;意图将耗时业务(如数据库查询)放到独立线程池,避免阻塞 IO 线程(Reactor 模式的核心原则:IO 线程只做 IO,不做耗时操作)。:1 个 mainReactor 处理监听连接,4 个 subReactor 处
封装的Server层代码主要有以下作用
管理 TCP 连接的建立 / 断开;
处理客户端网络数据的读写;
将业务逻辑解耦到 ChatService 模块(业务层),实现 “网络层与业务层分离”;
基于 Reactor 事件驱动模型,支持多线程 IO 处理,为高并发聊天场景提供基础。Onconnection回调函数交给单reactor处理 OnMessage回调函数交给多线程处理
代码结构与核心模块拆解
1. 头文件与命名空间(ChatServer.hpp)
#ifndef CHATSERVER_HPP
#define CHATSERVER_HPP
#include<muduo/net/TcpServer.h> // muduo TCP服务器核心类
#include<muduo/net/EventLoop.h> // muduo Reactor事件循环核心
#include<muduo/base/ThreadPool.h> // 线程池(注释未启用)
#include<muduo/base/Logging.h> // muduo 日志组件
#include<functional> // 绑定回调函数
#include<string>
#include<iostream>
#include"json.hpp" // JSON序列化/反序列化
#include"chatservice.hpp" // 业务层核心类
// 命名空间简化
using namespace muduo;
using namespace muduo::net;
using namespace std;
using namespace placeholders;
using json=nlohmann::json;
-
头文件保护宏:
#ifndef...#endif防止头文件重复包含; -
核心依赖:
-
muduo 库组件:实现网络 IO、事件循环、线程池、日志;
-
nlohmann/json:客户端与服务器的消息统一用 JSON 格式序列化,便于解析;
-
chatservice.hpp:业务层(如私聊、群聊、好友管理),与网络层解耦。
-
2. ChatServer 类设计(核心数据结构)
class ChatServer
{
// 私有成员:网络核心组件
TcpServer server_; // muduo TCP服务器对象(封装监听、连接管理、IO分发)
EventLoop* loop_; // Reactor核心:事件循环(处理IO事件、定时器等)
// ThreadPool threadpool_; // 业务线程池(注释:意图异步处理耗时业务)
// 私有回调函数:muduo事件触发时自动调用
void onConnection(const TcpConnectionPtr&); // 处理连接建立/断开
void onMessage(const TcpConnectionPtr&,Buffer*,Timestamp); // 处理消息读写
public:
// 构造函数:初始化服务器
ChatServer(EventLoop*loop, const InetAddress& listenAddr,const string& nameArg);
// void doHeavyBusiness(const TcpConnectionPtr& conn, const string& msg); // 耗时业务处理(注释)
void start(); // 启动服务器
};
#endif
-
核心成员:
-
server_:muduo 的TcpServer是封装好的 TCP 服务器,无需手动处理 socket、bind、listen 等底层操作; -
loop_:muduo 的EventLoop是 Reactor 模式的核心,负责监听事件(如连接、读写)并分发到对应回调; -
回调函数:
onConnection(连接事件)、onMessage(消息事件)是 muduo 的核心回调机制,事件触发时自动执行。
-
3. 构造函数实现(ChatServer.cpp)
ChatServer::ChatServer(EventLoop *loop, const InetAddress &listenAddr, const string &nameArg)
: loop_(loop), server_(loop, listenAddr, nameArg) // 初始化列表赋值
{
// 绑定连接事件回调:_1是占位符,对应TcpConnectionPtr参数
server_.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));
// 绑定消息事件回调:_1=TcpConnectionPtr, _2=Buffer*, _3=Timestamp
server_.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
server_.setThreadNum(4); // 设置muduo IO线程池数量(subReactor数)
// threadpool_.start(4); // 注释:启动4个业务线程,异步处理耗时逻辑
}
-
核心逻辑:
-
初始化赋值:将事件循环、监听地址、服务器名称传给
TcpServer; -
绑定回调:通过
std::bind将类成员函数绑定为 muduo 的回调函数(muduo 回调要求是可调用对象,需绑定this指针); -
设置 IO 线程数:
setThreadNum(4)是 muduo 的多 Reactor 配置 ——1 个 mainReactor(处理监听连接)+ 4 个 subReactor(处理已连接客户端的 IO),提升并发 IO 处理能力; -
业务线程池(注释):意图启动 4 个业务线程,将耗时业务(如数据库操作)异步执行,避免阻塞 IO 线程。原本十一这样的想法设计,但是业务模块提供的全局唯一实例,我设计的初衷是将一个客户端的多个业务分配给4个线程,因此后续设计在了业务代码中
-
4. 连接事件回调(onConnection)
void ChatServer::onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected()) // 新连接建立
{
printf("ChatServer - %s connected\n", conn->peerAddress().toIpPort().c_str());
}
else // 连接断开
{
printf("ChatServer - %s disconnected\n", conn->peerAddress().toIpPort().c_str());
// 业务层处理客户端异常断开(如清理用户在线状态、释放资源)
chatservice::instance()->clientCloseException(conn);
conn->shutdown(); // 主动关闭TCP连接,释放资源
}
}
- 核心逻辑:
conn->connected():判断连接状态(true = 新连接,false = 断开);- 连接断开时:调用
ChatService单例的clientCloseException处理业务层清理(如用户离线),再调用shutdown()关闭连接(确保 TCP 四次挥手完成)。
5. 消息事件回调(onMessage)
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time)
{
// 读取客户端发送的所有数据(muduo Buffer自动处理TCP缓冲区)
string buff = buffer->retrieveAllAsString();
try{
// 解析JSON数据(客户端与服务器的消息协议)
json js = json::parse(buff);
// 分发到业务层处理:解耦网络层与业务层
chatservice::instance()->recvmsg(conn, js, time);
}catch(const exception& e){
// 捕获JSON解析异常,避免非法数据导致服务器崩溃
LOG_ERROR << "JSON parse error: " << e.what();
}
// 注释:原思路——通过msgid获取业务处理器,交给线程池异步执行
// int id = js["msgid"].get<int>();
// auto msgHandler=chatservice::instance()->getMsgHandler(id);
// threadpool_.run(std::bind(msgHandler, conn, js, time));
}
- 核心逻辑:
-
读取数据:
buffer->retrieveAllAsString()读取客户端发送的所有字节(muduo 的Buffer适配 TCP 流式传输,自动处理粘包 / 拆包的基础); -
JSON 解析:将字符串转为 JSON 对象(聊天消息的统一格式,如
{"msgid":1, "userid":1001, "content":"hello"}); -
异常捕获:必须捕获
json::parse异常,否则客户端发送非法 JSON 会导致服务器崩溃; -
业务分发:调用
ChatService单例的recvmsg,将网络数据交给业务层处理(网络层只做 IO,不碰业务逻辑)
-
6. 启动服务器(start)
void ChatServer::start()
{
server_.start(); // 启动muduo TcpServer,开始监听客户端连接
}
- 底层逻辑:
server_.start()会启动监听 socket,将 “监听事件” 注册到EventLoop,事件循环开始运行,服务器进入等待连接状态。
核心设计思路与模式
1. Reactor 事件驱动模型(muduo 核心)
-
单 Reactor + 多线程:1 个 mainReactor 处理监听连接,4 个 subReactor 处理已连接客户端的 IO,避免单线程 IO 瓶颈;
-
事件回调:连接 / 消息事件触发时,muduo 自动调用绑定的回调函数,无需轮询,提升效率。
2. 分层设计(高内聚低耦合)
-
网络层(ChatServer):只负责 TCP 连接管理、数据读写、异常处理,不涉及具体业务;
-
业务层(ChatService):负责聊天逻辑(私聊、群聊、好友列表、数据库操作等),与网络层完全解耦,便于维护和扩展。
3. 单例模式(ChatService::instance ())
-
ChatService采用单例模式,确保全局只有一个业务实例,统一管理用户在线状态、业务逻辑、数据库连接等全局资源。
4. 多线程设计
-
IO 多线程:muduo 的
setThreadNum(4)实现 IO 线程池,处理高并发客户端的 IO; -
业务多线程(预留):注释的
ThreadPool意图将耗时业务(如数据库查询)放到独立线程池,避免阻塞 IO 线程(Reactor 模式的核心原则:IO 线程只做 IO,不做耗时操作)。
整体执行流程
-
初始化
EventLoop(事件循环); -
创建
ChatServer实例,传入监听地址、事件循环、服务器名称; -
调用
ChatServer::start(),启动 TcpServer,开始监听客户端连接; -
客户端连接:
onConnection被调用,打印连接信息; -
客户端发消息:
onMessage被调用,读取数据→解析 JSON→分发到ChatService处理; -
客户端断开:
onConnection被调用,业务层清理资源,关闭连接。
#include "chatserver.hpp"
void ChatServer::onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
printf("ChatServer - %s connected\n", conn->peerAddress().toIpPort().c_str());
}
else
{
printf("ChatServer - %s disconnected\n", conn->peerAddress().toIpPort().c_str());
chatservice::instance()->clientCloseException(conn);
conn->shutdown();
}
}
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time)
{
string buff = buffer->retrieveAllAsString();
// printf("ChatServer received message:%s\n", buff.c_str());
// conn->send(buff);
// threadpool_.run(std::bind(&ChatServer::doHeavyBusiness,this,conn,buff));
// 业务处理模块
try{
json js = json::parse(buff);
chatservice::instance()->recvmsg(conn, js, time);
}catch(const exception& e){
//
LOG_ERROR << "JSON parse error: " << e.what();
}
//int id = js["msgid"].get<int>();
// auto msgHandler=chatservice::instance()->getMsgHandler(id);
// msgHandler(conn,js,time);
// auto chatservice = chatservice::instance();
// auto msgHandler = chatservice->getMsgHandler(id);
// threadpool_.run(std::bind(msgHandler, conn, js, time));
// threadpool_.run(std::bind((chatservice::instance()->getMsgHandler(id)),conn,js,time));
}
ChatServer::ChatServer(EventLoop *loop, const InetAddress &listenAddr, const string &nameArg) : loop_(loop), server_(loop, listenAddr, nameArg)
{
server_.setConnectionCallback(
std::bind(&ChatServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&ChatServer::onMessage, this, _1, _2, _3));
server_.setThreadNum(4);
//threadpool_.start(4);
}
// void ChatServer::doHeavyBusiness(const TcpConnectionPtr &conn, const string &msg)
// {
// try{
// if (conn->connected()) {
// conn->getLoop()->runInLoop([conn, msg]() {
// conn->send(msg);
// });
// }
// }catch(const exception& e){
// LOG_ERROR << "Biz thread exception: " << e.what();
// // 异常处理:通知客户端错误、清理资源等
// if (conn->connected()) {
// conn->getLoop()->runInLoop([conn]() {
// conn->send("Server internal error!");
// });
// }
// }
// }
void ChatServer::start()
{
server_.start();
}
更多推荐


所有评论(0)