封装的Server层代码主要有以下作用

管理 TCP 连接的建立 / 断开;
处理客户端网络数据的读写;
将业务逻辑解耦到 ChatService 模块(业务层),实现 “网络层与业务层分离”;
基于 Reactor 事件驱动模型,支持多线程 IO 处理,为高并发聊天场景提供基础。

Onconnection回调函数交给单reactor处理 OnMessage回调函数交给多线程处理

代码结构与核心模块拆解

1. 头文件与命名空间(ChatServer.hpp)
#ifndef CHATSERVER_HPP
#define CHATSERVER_HPP
#include<muduo/net/TcpServer.h>   // muduo TCP服务器核心类
#include<muduo/net/EventLoop.h>   // muduo Reactor事件循环核心
#include<muduo/base/ThreadPool.h> // 线程池(注释未启用)
#include<muduo/base/Logging.h>    // muduo 日志组件
#include<functional>              // 绑定回调函数
#include<string>
#include<iostream>
#include"json.hpp"                // JSON序列化/反序列化
#include"chatservice.hpp"         // 业务层核心类
// 命名空间简化
using namespace muduo;
using namespace muduo::net;
using namespace std;
using namespace placeholders;   
using json=nlohmann::json;
  • 头文件保护宏#ifndef...#endif 防止头文件重复包含;

  • 核心依赖

    • muduo 库组件:实现网络 IO、事件循环、线程池、日志;

    • nlohmann/json:客户端与服务器的消息统一用 JSON 格式序列化,便于解析;

    • chatservice.hpp:业务层(如私聊、群聊、好友管理),与网络层解耦。

2. ChatServer 类设计(核心数据结构)
class ChatServer
{
    // 私有成员:网络核心组件
    TcpServer server_;    // muduo TCP服务器对象(封装监听、连接管理、IO分发)
    EventLoop* loop_;     // Reactor核心:事件循环(处理IO事件、定时器等)
    // ThreadPool threadpool_; // 业务线程池(注释:意图异步处理耗时业务)

    // 私有回调函数:muduo事件触发时自动调用
    void onConnection(const TcpConnectionPtr&); // 处理连接建立/断开
    void onMessage(const TcpConnectionPtr&,Buffer*,Timestamp); // 处理消息读写

public:
    // 构造函数:初始化服务器
    ChatServer(EventLoop*loop, const InetAddress& listenAddr,const string& nameArg);
    // void doHeavyBusiness(const TcpConnectionPtr& conn, const string& msg); // 耗时业务处理(注释)
    void start(); // 启动服务器
};
#endif
  • 核心成员

    • server_:muduo 的TcpServer是封装好的 TCP 服务器,无需手动处理 socket、bind、listen 等底层操作;

    • loop_:muduo 的EventLoop是 Reactor 模式的核心,负责监听事件(如连接、读写)并分发到对应回调;

    • 回调函数:onConnection(连接事件)、onMessage(消息事件)是 muduo 的核心回调机制,事件触发时自动执行。

3. 构造函数实现(ChatServer.cpp)
ChatServer::ChatServer(EventLoop *loop, const InetAddress &listenAddr, const string &nameArg) 
    : loop_(loop), server_(loop, listenAddr, nameArg) // 初始化列表赋值
{
    // 绑定连接事件回调:_1是占位符,对应TcpConnectionPtr参数
    server_.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));
    // 绑定消息事件回调:_1=TcpConnectionPtr, _2=Buffer*, _3=Timestamp
    server_.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));

    server_.setThreadNum(4); // 设置muduo IO线程池数量(subReactor数)
    // threadpool_.start(4); // 注释:启动4个业务线程,异步处理耗时逻辑
}
  • 核心逻辑

    1. 初始化赋值:将事件循环、监听地址、服务器名称传给TcpServer

    2. 绑定回调:通过std::bind将类成员函数绑定为 muduo 的回调函数(muduo 回调要求是可调用对象,需绑定this指针);

    3. 设置 IO 线程数setThreadNum(4) 是 muduo 的多 Reactor 配置 ——1 个 mainReactor(处理监听连接)+ 4 个 subReactor(处理已连接客户端的 IO),提升并发 IO 处理能力;

    4. 业务线程池(注释):意图启动 4 个业务线程,将耗时业务(如数据库操作)异步执行,避免阻塞 IO 线程。原本十一这样的想法设计,但是业务模块提供的全局唯一实例,我设计的初衷是将一个客户端的多个业务分配给4个线程,因此后续设计在了业务代码中

4. 连接事件回调(onConnection)
void ChatServer::onConnection(const TcpConnectionPtr &conn)
{
    if (conn->connected()) // 新连接建立
    {
        printf("ChatServer - %s connected\n", conn->peerAddress().toIpPort().c_str());
    }
    else // 连接断开
    {
        printf("ChatServer - %s disconnected\n", conn->peerAddress().toIpPort().c_str());
        // 业务层处理客户端异常断开(如清理用户在线状态、释放资源)
        chatservice::instance()->clientCloseException(conn);
        conn->shutdown(); // 主动关闭TCP连接,释放资源
    }
}
  • 核心逻辑
    • conn->connected():判断连接状态(true = 新连接,false = 断开);
    • 连接断开时:调用ChatService单例的clientCloseException处理业务层清理(如用户离线),再调用shutdown()关闭连接(确保 TCP 四次挥手完成)。
5. 消息事件回调(onMessage)
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time)
{
    // 读取客户端发送的所有数据(muduo Buffer自动处理TCP缓冲区)
    string buff = buffer->retrieveAllAsString();
    try{
        // 解析JSON数据(客户端与服务器的消息协议)
        json js = json::parse(buff);
        // 分发到业务层处理:解耦网络层与业务层
        chatservice::instance()->recvmsg(conn, js, time);
    }catch(const exception& e){
        // 捕获JSON解析异常,避免非法数据导致服务器崩溃
        LOG_ERROR << "JSON parse error: " << e.what();
    }
    // 注释:原思路——通过msgid获取业务处理器,交给线程池异步执行
    // int id = js["msgid"].get<int>();
    // auto msgHandler=chatservice::instance()->getMsgHandler(id);
    // threadpool_.run(std::bind(msgHandler, conn, js, time));
}
  • 核心逻辑
    1. 读取数据buffer->retrieveAllAsString() 读取客户端发送的所有字节(muduo 的Buffer适配 TCP 流式传输,自动处理粘包 / 拆包的基础);

    2. JSON 解析:将字符串转为 JSON 对象(聊天消息的统一格式,如{"msgid":1, "userid":1001, "content":"hello"});

    3. 异常捕获:必须捕获json::parse异常,否则客户端发送非法 JSON 会导致服务器崩溃;

    4. 业务分发:调用ChatService单例的recvmsg,将网络数据交给业务层处理(网络层只做 IO,不碰业务逻辑)

6. 启动服务器(start)
void ChatServer::start()
{
    server_.start(); // 启动muduo TcpServer,开始监听客户端连接
}
  • 底层逻辑:server_.start() 会启动监听 socket,将 “监听事件” 注册到EventLoop,事件循环开始运行,服务器进入等待连接状态。

核心设计思路与模式

1. Reactor 事件驱动模型(muduo 核心)
  • 单 Reactor + 多线程:1 个 mainReactor 处理监听连接,4 个 subReactor 处理已连接客户端的 IO,避免单线程 IO 瓶颈;

  • 事件回调:连接 / 消息事件触发时,muduo 自动调用绑定的回调函数,无需轮询,提升效率。

2. 分层设计(高内聚低耦合)
  • 网络层(ChatServer):只负责 TCP 连接管理、数据读写、异常处理,不涉及具体业务;

  • 业务层(ChatService):负责聊天逻辑(私聊、群聊、好友列表、数据库操作等),与网络层完全解耦,便于维护和扩展。

3. 单例模式(ChatService::instance ())
  • ChatService采用单例模式,确保全局只有一个业务实例,统一管理用户在线状态、业务逻辑、数据库连接等全局资源。

4. 多线程设计
  • IO 多线程:muduo 的setThreadNum(4) 实现 IO 线程池,处理高并发客户端的 IO;

  • 业务多线程(预留):注释的ThreadPool意图将耗时业务(如数据库查询)放到独立线程池,避免阻塞 IO 线程(Reactor 模式的核心原则:IO 线程只做 IO,不做耗时操作)。

整体执行流程

  1. 初始化EventLoop(事件循环);

  2. 创建ChatServer实例,传入监听地址、事件循环、服务器名称;

  3. 调用ChatServer::start(),启动 TcpServer,开始监听客户端连接;

  4. 客户端连接:onConnection被调用,打印连接信息;

  5. 客户端发消息:onMessage被调用,读取数据→解析 JSON→分发到ChatService处理;

  6. 客户端断开:onConnection被调用,业务层清理资源,关闭连接。

#include "chatserver.hpp"

void ChatServer::onConnection(const TcpConnectionPtr &conn)
{
    if (conn->connected())
    {
        printf("ChatServer - %s connected\n", conn->peerAddress().toIpPort().c_str());
    }
    else
    {
        printf("ChatServer - %s disconnected\n", conn->peerAddress().toIpPort().c_str());
        chatservice::instance()->clientCloseException(conn);
        conn->shutdown();
    }
}

void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time)
{
    string buff = buffer->retrieveAllAsString();
    // printf("ChatServer received message:%s\n", buff.c_str());
    // conn->send(buff);
    // threadpool_.run(std::bind(&ChatServer::doHeavyBusiness,this,conn,buff));
    // 业务处理模块
    try{
    json js = json::parse(buff);
    chatservice::instance()->recvmsg(conn, js, time);
    }catch(const exception& e){
        //
        LOG_ERROR << "JSON parse error: " << e.what();
    }
    //int id = js["msgid"].get<int>();
    // auto msgHandler=chatservice::instance()->getMsgHandler(id);
    // msgHandler(conn,js,time);
    // auto chatservice = chatservice::instance();
    // auto msgHandler = chatservice->getMsgHandler(id);
    // threadpool_.run(std::bind(msgHandler, conn, js, time));
   // threadpool_.run(std::bind((chatservice::instance()->getMsgHandler(id)),conn,js,time));

}

ChatServer::ChatServer(EventLoop *loop, const InetAddress &listenAddr, const string &nameArg) : loop_(loop), server_(loop, listenAddr, nameArg)
{
    server_.setConnectionCallback(
        std::bind(&ChatServer::onConnection, this, _1));
    server_.setMessageCallback(
        std::bind(&ChatServer::onMessage, this, _1, _2, _3));

    server_.setThreadNum(4);
    //threadpool_.start(4);
}

// void ChatServer::doHeavyBusiness(const TcpConnectionPtr &conn, const string &msg)
// {
//     try{
//         if (conn->connected()) {
//             conn->getLoop()->runInLoop([conn, msg]() {
//                 conn->send(msg);
//             });
//         }
//     }catch(const exception& e){
//         LOG_ERROR << "Biz thread exception: " << e.what();
//         // 异常处理:通知客户端错误、清理资源等
//         if (conn->connected()) {
//             conn->getLoop()->runInLoop([conn]() {
//                 conn->send("Server internal error!");
//             });
//     }
//     }
// }

void ChatServer::start()
{
    server_.start();
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐