Reactor 反应堆模式:高性能服务器的工程级架构

💬 开篇:前面四篇,我们从 IO 模型出发,走过了 select → poll → epoll 的演化之路。到了 epoll,我们有了一个高效的 IO 事件通知机制。但是,epoll 只是个"通知器",它告诉你"哪个 fd 就绪了",但怎么处理这些事件、怎么组织代码、怎么管理连接——这些工程问题,epoll 本身不管。

Reactor 模式就是解决这些工程问题的答案。它是一种设计模式,把 epoll(或者 select/poll)的事件通知能力包装成一个优雅的框架:事件来了,自动调用对应的处理函数,就像"反应"一样——所以叫 Reactor(反应堆)。Nginx、Redis、Netty 的底层都是 Reactor 模式。

这篇文章我们从设计思想出发,深度解析 Reactor 的核心组件:Connection 连接抽象、Accepter 连接接受器、HandlerConnection 事件处理器,以及最关键的——如何优雅地处理读写事件的动态切换。这是本系列最有工程深度的一篇。

👍 点赞、收藏与分享:Reactor 是高性能服务器架构的核心模式,搞懂了这个,面试时聊架构设计会非常有底气。

🚀 循序渐进:Reactor 设计思想 → Connection 连接管理 → 事件分发机制 → 完整代码实现 → 架构扩展讨论。


⚠️ 说明(教学简化版) 本文代码以讲清 Reactor/epoll的核心思想为目标,刻意做了不少“为了可读性而牺牲工程完备性”的简化,因此不建议直接用于生产环境。
例如:错误处理/资源回收策略、连接生命周期管理、读写事件分发的细节、缓冲区与背压、关闭时序与边界条件、以及高并发下的性能与稳定性优化,本文仅做点到为止或用最直观写法演示。
若用于真实项目,请结合:更严格的异常分支处理、延迟销毁/引用计数、完善的 outbuffer + EPOLLOUT驱动、限流与超时管理、以及多线程/多 Reactor 的线程安全设计等进行工程化改造。

一、为什么需要 Reactor 模式?

1.1 直接用 epoll 的问题

我们上一篇实现的 epoll 服务器,逻辑是这样的:

for (;;) {
    vector<int> ready;
    epoll.Wait(&ready);
    
    for (int fd : ready) {
        if (fd == listen_fd) {
            // 接受新连接
        } else {
            // 读数据 → 处理 → 写响应
        }
    }
}

这段代码看起来没问题,但当业务复杂起来就会出现各种混乱:

问题 1:所有逻辑堆在一个 for 循环里

不同的 fd、不同的状态、不同的业务逻辑
全挤在一个循环里
代码越写越乱,维护噩梦

问题 2:读写耦合,难以控制

什么时候监控写就绪?什么时候取消写监控?
发送缓冲区满了怎么办?
一次性读不完怎么办?
这些状态都混在一起,互相干扰

问题 3:没有统一的连接管理

一个 fd 对应的客户端信息在哪里?
读缓冲区在哪里?写缓冲区在哪里?
连接断开时怎么清理?
到处是散装的 map,维护困难

1.2 Reactor 的核心思想:事件驱动 + 回调

Reactor 的思想用一句话说就是:不要主动去做事,而是等事情来找你,来了之后调用对应的处理函数。

传统思维(主动):
  我去检查 fd1 有没有数据 → 有了 → 处理
  我去检查 fd2 有没有数据 → 有了 → 处理
  ...

Reactor 思维(被动响应):
  我等着(epoll_wait)
  fd1 有数据了 → 自动调用 fd1 对应的 handler
  fd2 可写了 → 自动调用 fd2 对应的 sender
  fd3 出错了 → 自动调用 fd3 对应的 excepter

Reactor 模式的核心组件:

                    ┌──────────────────────────────┐
                    │         TcpServer            │
                    │  ┌────────────────────────┐  │
                    │  │       EventLoop         │  │
                    │  │   epoll_wait() 循环     │  │
                    │  └──────────┬─────────────┘  │
                    │             │ 事件到来         │
                    │             ↓                 │
                    │  ┌────────────────────────┐  │
                    │  │    Connection 管理表    │  │
                    │  │  fd → Connection 对象  │  │
                    │  └──────────┬─────────────┘  │
                    │             │ 找到对应 conn    │
                    │             ↓                 │
                    │  ┌────────────────────────┐  │
                    │  │  conn._recver(conn)     │  │ 读事件 → 调用 recver
                    │  │  conn._sender(conn)     │  │ 写事件 → 调用 sender
                    │  │  conn._excepter(conn)   │  │ 异常    → 调用 excepter
                    │  └────────────────────────┘  │
                    └──────────────────────────────┘

二、Connection:连接的完整抽象

2.1 Connection 类设计思路

每一个 TCP 连接,都抽象成一个 Connection 对象。这个对象保存了这条连接的所有信息:

Connection 对象包含:
  sockfd           ← 对应的文件描述符
  inbuffer         ← 接收缓冲区(从 socket 读到的数据)
  outbuffer        ← 发送缓冲区(要发给对方的数据)
  events           ← 当前关注的 epoll 事件
  client           ← 客户端的 ip:port 信息
  _recver          ← 读就绪时的回调函数
  _sender          ← 写就绪时的回调函数
  _excepter        ← 异常发生时的回调函数
  _R               ← 指向 TcpServer(用于调用服务器提供的接口)

这就像餐厅里每张桌子都有一个记录本,记着这桌客人点了什么、上了什么菜、结账了没有——所有信息自成一体。

2.2 Connection 完整代码

// Connection.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

class Connection;
class TcpServer;

// 回调函数类型:接收 Connection 指针,进行相应处理
using func_t = std::function<void(Connection*)>;

/**
 * Connection:一条 TCP 连接的完整抽象
 * 
 * 核心思想:把一条连接需要的所有状态(缓冲区、回调函数、事件类型)
 * 都封装在一起,通过回调实现事件驱动
 */
class Connection {
public:
    Connection(int sockfd, uint32_t events, TcpServer* R)
        : _sockfd(sockfd), _events(events), _R(R)
    {}

    /**
     * 注册三个回调函数:读、写、异常
     * 这三个函数在对应事件发生时被 TcpServer 自动调用
     */
    void RegisterCallback(func_t recver, func_t sender, func_t excepter) {
        _recver   = recver;
        _sender   = sender;
        _excepter = excepter;
    }

    // 往接收缓冲区追加数据(从 socket 读到的原始字节流)
    void AddInBuffer(const std::string& buffer) {
        _inbuffer += buffer;
    }

    // 往发送缓冲区追加数据(要发给客户端的响应)
    void AddOutBuffer(const std::string& buffer) {
        _outbuffer += buffer;
    }

    bool OutBufferEmpty() const {
        return _outbuffer.empty();
    }

    int SockFd() const { return _sockfd; }

    uint32_t Events() const { return _events; }

    void SetEvents(uint32_t events) { _events = events; }

    void SetClient(const struct sockaddr_in& c) { _client = c; }

    std::string& InBuffer()  { return _inbuffer; }
    std::string& OutBuffer() { return _outbuffer; }

    void Close() {
        ::close(_sockfd);
    }

    ~Connection() {}

private:
    int _sockfd;                // 对应的 socket fd
    std::string _inbuffer;     // 接收缓冲区(字节流,等待上层协议解析)
    std::string _outbuffer;    // 发送缓冲区(待发送的数据)
    uint32_t _events;          // 当前关注的 epoll 事件(EPOLLIN/EPOLLOUT 等)
    struct sockaddr_in _client;// 客户端地址信息

public:
    // 三个回调函数(public 方便 TcpServer 直接调用)
    func_t _recver;    // 读就绪时调用
    func_t _sender;    // 写就绪时调用
    func_t _excepter;  // 异常时调用

    TcpServer* _R;     // 反向指针,指向 TcpServer(用于调用服务器接口)
};

/**
 * ConnectionFactory:Connection 的工厂类
 * 负责创建两种 Connection:监听用的和普通连接用的
 */
class ConnectionFactory {
public:
    // 创建监听 socket 对应的 Connection
    // 监听 socket 只需要 recver(接受连接),不需要 sender 和 excepter
    static Connection* BuildListenConnection(int listensock, 
                                              func_t recver, 
                                              uint32_t events, 
                                              TcpServer* R) {
        Connection* conn = new Connection(listensock, events, R);
        conn->RegisterCallback(recver, nullptr, nullptr);
        return conn;
    }

    // 创建普通客户端连接对应的 Connection
    static Connection* BuildNormalConnection(int sockfd,
                                              func_t recver,
                                              func_t sender,
                                              func_t excepter,
                                              uint32_t events,
                                              TcpServer* R) {
        Connection* conn = new Connection(sockfd, events, R);
        conn->RegisterCallback(recver, sender, excepter);
        return conn;
    }
};

三、TcpServer:事件循环的核心

3.1 TcpServer 的职责

TcpServer 是整个 Reactor 的"大脑",负责:

  1. 维护 epoll 实例:通过 epoll_create / ctl / wait 管理所有 fd
  2. 维护 Connection 映射表:fd → Connection 对象
  3. 事件循环epoll_wait → 找到对应 Connection → 调用回调
  4. 提供管理接口AddConnectionRemoveConnectionEnableReadWrite

3.2 TcpServer 完整代码

// TcpServer.hpp
#pragma once
#include <iostream>
#include <unordered_map>
#include <functional>
#include <memory>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include "Connection.hpp"

const static int g_backlog = 128;
const static int g_max_events = 64;

/**
 * 将 fd 设置为非阻塞模式(ET 模式必须)
 * 在项目中建一个 Util.hpp:之后要调用的时候直接包含头文件,这里为了减少文章篇幅没有单独提取出来
 */
static bool SetNonBlock(int fd) {
    int fl = fcntl(fd, F_GETFL);
    if (fl < 0) { perror("fcntl F_GETFL"); return false; }
    return fcntl(fd, F_SETFL, fl | O_NONBLOCK) >= 0;
}

/**
 * TcpServer:Reactor 模式的核心服务器类
 */
class TcpServer {
public:
    TcpServer(int port) : _port(port), _listensock(-1), _epfd(-1) {}

    void InitServer() {
        // 1. 创建 epoll 实例
        _epfd = epoll_create(10);
        if (_epfd < 0) { perror("epoll_create"); exit(1); }

        // 2. 创建监听 socket
        _listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (_listensock < 0) { perror("socket"); exit(1); }

        int opt = 1;
        setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
        
        // 监听 socket 也设为非阻塞(ET 模式下 accept 循环需要)
        SetNonBlock(_listensock);

        struct sockaddr_in addr = {};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(_port);
        addr.sin_addr.s_addr = INADDR_ANY;

        if (bind(_listensock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
            perror("bind"); exit(1);
        }
        if (listen(_listensock, g_backlog) < 0) {
            perror("listen"); exit(1);
        }

        printf("[TcpServer] 初始化完成,监听端口 %d\n", _port);

        // 3. 将 listen_sock 包装成 Connection,加入 epoll 监控
        //    回调函数将在后面由 Accepter 注册
    }

    /**
     * 将一个 Connection 加入 epoll 监控和 Connection 表
     */
    void AddConnection(Connection* conn) {
        int fd = conn->SockFd();

        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = conn->Events();

        int ret = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
        if (ret < 0) {
            perror("epoll_ctl ADD");
            delete conn;
            return;
        }

        _connections[fd] = conn;
        printf("[TcpServer] 连接 fd=%d 已加入监控\n", fd);
    }

    /**
     * 从 epoll 和 Connection 表中移除连接(不关闭 fd,由调用者关闭)
     */
    void RemoveConnection(int fd) {
        auto it = _connections.find(fd);
        if (it == _connections.end()) return;

        epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
        _connections.erase(it);
        printf("[TcpServer] 连接 fd=%d 已移除监控\n", fd);
    }

    /**
     * 动态修改 fd 的监控事件(控制读写就绪的监控)
     * 
     * @param fd     目标文件描述符
     * @param enable_read  是否监控读就绪
     * @param enable_write 是否监控写就绪
     */
    void EnableReadWrite(int fd, bool enable_read, bool enable_write) {
        auto it = _connections.find(fd);
        if (it == _connections.end()) return;

        uint32_t events = 0;
        if (enable_read)  events |= EPOLLIN;
        if (enable_write) events |= EPOLLOUT;
        events |= EPOLLET;  // 保持 ET 模式

        it->second->SetEvents(events);

        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = events;
        epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev);

        printf("[TcpServer] fd=%d 事件更新:read=%d, write=%d\n", 
               fd, enable_read, enable_write);
    }

    /**
     * 事件循环:核心驱动
     */
    void Loop() {
        struct epoll_event events[g_max_events];

        for (;;) {
            int n = epoll_wait(_epfd, events, g_max_events, -1);
            
            if (n < 0) {
                if (errno == EINTR) continue;  // 被信号打断,重试
                perror("epoll_wait");
                break;
            }

            // 遍历就绪事件,分发给对应的 Connection 回调
            for (int i = 0; i < n; i++) {
                int fd = events[i].data.fd;
                uint32_t revents = events[i].events;

                auto it = _connections.find(fd);
                if (it == _connections.end()) continue;

                Connection* conn = it->second;

                // 按优先级处理:异常 > 写 > 读
                if (revents & (EPOLLERR | EPOLLHUP)) {
                    // 异常事件
                    if (conn->_excepter) conn->_excepter(conn);
                } else if (revents & EPOLLOUT) {
                    // 写就绪
                    if (conn->_sender) conn->_sender(conn);
                } else if (revents & EPOLLIN) {
                    // 读就绪
                    if (conn->_recver) conn->_recver(conn);
                }
            }
        }
    }

    int ListenSock() const { return _listensock; }
    int EpollFd() const { return _epfd; }

    ~TcpServer() {
        for (auto& [fd, conn] : _connections) {
            close(fd);
            delete conn;
        }
        if (_epfd >= 0) close(_epfd);
        if (_listensock >= 0) close(_listensock);
    }

private:
    int _port;
    int _listensock;
    int _epfd;
    std::unordered_map<int, Connection*> _connections;  // fd → Connection 映射表
};

四、Accepter:处理新连接

4.1 Accepter 的职责

Accepter 专门负责处理"新连接到来"的事件。当 listen_sock 的读就绪触发时,调用 AccepterConnection 方法:

  1. 循环 accept(因为 ET 模式,必须把所有新连接都接受完)
  2. 为每个新连接创建 Connection 对象,注册回调函数
  3. 把新连接加入 TcpServer 的管理

4.2 Accepter 代码

// Accepter.hpp
#pragma once
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cerrno>
#include <functional>
#include "Connection.hpp"
#include "HandlerConnection.hpp"  // 读写回调的实现

/**
 * Accepter:新连接接受器
 * 
 * 当 listen_sock 上有读事件就绪时,AccepterConnection 被调用
 */
class Accepter {
public:
    /**
     * 处理 listen_sock 上的新连接事件
     * 
     * @param conn  listen_sock 对应的 Connection 对象
     * 
     * 使用 ET 模式时,必须循环 accept 直到 EAGAIN,
     * 否则只能接受一个连接,其余连接被忽略
     */
    static void AccepterConnection(Connection* conn) {
        errno = 0;
        
        while (true) {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            
            // 非阻塞 accept:如果没有新连接,立即返回 EAGAIN
            int sockfd = ::accept(conn->SockFd(), (struct sockaddr*)&peer, &len);
            
            if (sockfd > 0) {
                // 成功接受一个新连接
                char ip_buf[INET_ADDRSTRLEN];
                inet_ntop(AF_INET, &peer.sin_addr, ip_buf, sizeof(ip_buf));
                printf("[Accepter] 新连接:%s:%d, fd=%d\n", 
                       ip_buf, ntohs(peer.sin_port), sockfd);

                // 新连接设为非阻塞(ET 模式要求)
                SetNonBlock(sockfd);

                // 注册读、写、异常三个回调
                // 这里用 std::bind 绑定静态成员函数
                auto recver   = std::bind(&HandlerConnection::Recver,   std::placeholders::_1);
                auto sender   = std::bind(&HandlerConnection::Sender,   std::placeholders::_1);
                auto excepter = std::bind(&HandlerConnection::Excepter, std::placeholders::_1);

                // 创建普通连接的 Connection,ET 模式 + 初始只监控读事件
                Connection* normal_conn = ConnectionFactory::BuildNormalConnection(
                    sockfd, recver, sender, excepter,
                    EPOLLIN | EPOLLET,  // 初始:只关注读就绪
                    conn->_R
                );
                normal_conn->SetClient(peer);

                // 加入 TcpServer 管理
                conn->_R->AddConnection(normal_conn);
                
            } else {
                // accept 失败
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    // 没有更多新连接了,正常退出循环
                    break;
                } else if (errno == EINTR) {
                    // 被信号打断,重试
                    continue;
                } else {
                    perror("accept error");
                    break;
                }
            }
        }
    }
};

五、HandlerConnection:读写事件处理

5.1 HandlerConnection 的职责

HandlerConnection 包含三个关键静态函数,分别处理:

  • Recver:读就绪时,把数据从 socket 读到 inbuffer,然后调用上层协议处理
  • Sender:写就绪时,把 outbuffer 里的数据发出去
  • Excepter:异常时,清理连接

5.2 HandlerConnection 完整代码

// HandlerConnection.hpp
#pragma once
#include <iostream>
#include <cerrno>
#include <cstring>
#include <sys/socket.h>
#include "Connection.hpp"

const static int kBufferSize = 4096;

/**
 * HandlerConnection:连接的读写异常处理器
 * 
 * 所有方法都是静态的,通过 Connection 指针访问连接状态
 */
class HandlerConnection {
public:
    /**
     * Recver:处理读就绪事件
     * 
     * 职责:把数据从 socket 读入 inbuffer,然后交给上层协议处理
     * 不关心数据格式,只负责读取字节流
     */
    static void Recver(Connection* conn) {
        errno = 0;
        char buffer[kBufferSize];
        
        // ET 模式:循环读直到 EAGAIN,确保读完所有数据
        while (true) {
            ssize_t n = recv(conn->SockFd(), buffer, sizeof(buffer) - 1, 0);
            
            if (n > 0) {
                buffer[n] = '\0';
                conn->AddInBuffer(buffer);  // 追加到接收缓冲区
                
            } else if (n == 0) {
                // 对端关闭连接
                printf("[Recver] fd=%d 对端关闭\n", conn->SockFd());
                conn->_excepter(conn);  // 触发异常处理(关闭连接)
                return;
                
            } else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    // 缓冲区读完了,退出循环,正常情况
                    break;
                } else if (errno == EINTR) {
                    continue;  // 被信号中断,重试
                } else {
                    // 真正的读错误
                    perror("recv error");
                    conn->_excepter(conn);
                    return;
                }
            }
        }
        
        printf("[Recver] fd=%d inbuffer: %s\n", conn->SockFd(), conn->InBuffer().c_str());
        
        // 读取完毕,交给上层处理(解析协议、处理业务、构建响应)
        HandlerRequest(conn);
    }

    /**
     * Sender:处理写就绪事件
     * 
     * 职责:把 outbuffer 里的数据发送出去
     * 如果本次没发完,开启 EPOLLOUT 监控,等下次机会再发
     */
    static void Sender(Connection* conn) {
        errno = 0;
        std::string& outbuffer = conn->OutBuffer();
        
        // ET 模式:循环发送直到全部发完或缓冲区满
        while (true) {
            ssize_t n = send(conn->SockFd(), 
                            outbuffer.c_str(), 
                            outbuffer.size(), 
                            0);
            
            if (n >= 0) {
                // 发出去了 n 个字节,从 outbuffer 中移除
                outbuffer.erase(0, n);
                
                if (outbuffer.empty()) {
                    // 全部发完了,退出循环
                    break;
                }
                // 还有数据没发,继续循环
                
            } else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    // 发送缓冲区满了,本次发不完,退出循环
                    // 后面会开启 EPOLLOUT 监控,等缓冲区空了再来
                    break;
                } else if (errno == EINTR) {
                    continue;
                } else {
                    perror("send error");
                    conn->_excepter(conn);
                    return;
                }
            }
        }
        
        if (!conn->OutBufferEmpty()) {
            // outbuffer 还有数据没发完
            // 开启 EPOLLOUT 监控,等发送缓冲区可写时再来
            printf("[Sender] fd=%d 数据未发完,开启 EPOLLOUT 监控\n", conn->SockFd());
            conn->_R->EnableReadWrite(conn->SockFd(), true, true);
        } else {
            // 数据全部发完
            // 关闭 EPOLLOUT 监控(避免 epoll_wait 频繁返回)
            printf("[Sender] fd=%d 数据发送完毕,关闭 EPOLLOUT 监控\n", conn->SockFd());
            conn->_R->EnableReadWrite(conn->SockFd(), true, false);
        }
    }

    /**
     * Excepter:处理异常事件(连接断开/错误)
     * 
     * 职责:清理这条连接的所有资源
     */
    static void Excepter(Connection* conn) {
        int fd = conn->SockFd();
        printf("[Excepter] 连接 fd=%d 发生异常,开始清理\n", fd);
        
        // 1. 从 epoll 中移除,从 connections 表中删除
        conn->_R->RemoveConnection(fd);
        
        // 2. 关闭 socket fd
        conn->Close();
        
        // 3. 删除 Connection 对象,释放内存
        delete conn;
        
        printf("[Excepter] 连接 fd=%d 清理完毕\n", fd);
    }

private:
    /**
     * HandlerRequest:上层业务处理
     * 
     * 这里是协议解析和业务逻辑的入口
     * 具体实现由业务层提供(这里用简单的回显作为示例)
     */
    static void HandlerRequest(Connection* conn) {
        std::string& inbuffer = conn->InBuffer();
        
        if (inbuffer.empty()) return;
        
        // 简单的回显服务:把收到的数据原样返回
        // 实际业务中这里会进行协议解析、反序列化、业务处理、序列化
        std::string response = "[Echo] " + inbuffer;
        inbuffer.clear();  // 清空接收缓冲区
        
        // 把响应放入发送缓冲区
        conn->AddOutBuffer(response);
        
        // 直接尝试发送(大多数情况下发送缓冲区有空间,可以直接发)
        if (!conn->OutBufferEmpty()) {
            conn->_sender(conn);
        }
    }
};

六、组装完整的 Reactor 服务器

6.1 main 函数:把所有组件组装起来

// reactor_main.cc
#include "TcpServer.hpp"
#include "Connection.hpp"
#include "Accepter.hpp"
#include "HandlerConnection.hpp"
#include <cstdlib>

int main(int argc, char* argv[]) {
    int port = (argc > 1) ? atoi(argv[1]) : 8888;

    // 1. 创建 TcpServer 实例
    TcpServer server(port);
    server.InitServer();

    // 2. 创建 listen_sock 对应的 Connection
    //    回调函数是 Accepter::AccepterConnection
    //    使用 ET 模式(EPOLLIN | EPOLLET)
    auto listen_recver = std::bind(&Accepter::AccepterConnection, std::placeholders::_1);
    
    Connection* listen_conn = ConnectionFactory::BuildListenConnection(
        server.ListenSock(),
        listen_recver,
        EPOLLIN | EPOLLET,  // 监听 socket:ET 模式,监控读就绪
        &server
    );

    // 3. 将 listen_conn 加入 TcpServer 管理
    server.AddConnection(listen_conn);

    printf("[Main] Reactor 服务器启动,端口 %d\n", port);
    printf("[Main] 等待连接...\n");

    // 4. 进入事件循环(永不返回,直到进程退出)
    server.Loop();

    return 0;
}

6.2 编译和测试

# 编译
g++ -std=c++17 reactor_main.cc -o reactor_server

# 运行
./reactor_server 8888

# 另一个终端测试
nc 127.0.0.1 8888
hello world
# 服务器回显:[Echo] hello world

# 用多个终端同时连接,验证并发处理
nc 127.0.0.1 8888  # 终端1
nc 127.0.0.1 8888  # 终端2
nc 127.0.0.1 8888  # 终端3

七、读写事件动态切换:核心工程细节

7.1 为什么要动态切换读写监控?

这是 Reactor 实现中最精妙的一个设计,很多人会困惑——为什么不一直开着 EPOLLOUT 监控?

如果一直开 EPOLLOUT

发送缓冲区几乎总是有空间的(不满)
→ EPOLLOUT 几乎总是就绪
→ epoll_wait 几乎每次都立刻返回(因为 EPOLLOUT 就绪)
→ 即使没有数据要发,epoll_wait 也一直在唤醒
→ 浪费 CPU

正确的策略

初始状态:只监控 EPOLLIN(只有读就绪才唤醒)

数据来了(EPOLLIN 就绪):
  → Recver 读数据 → HandlerRequest 处理 → 放入 outbuffer → 调用 Sender

Sender 发送数据:
  - 如果一次发完:保持只监控 EPOLLIN
  - 如果没发完(缓冲区满):
      → 开启 EPOLLOUT 监控(EnableReadWrite(fd, true, true))
      → epoll_wait 等 EPOLLOUT 就绪
      → 发送缓冲区空了 → 再次调用 Sender → 继续发剩余数据
      → 发完后关闭 EPOLLOUT(EnableReadWrite(fd, true, false)
状态机:

                ┌─────────────────┐
                │  只监控 EPOLLIN  │ (正常状态)
                └────────┬────────┘
                         │ 发送缓冲区满,数据未发完
                         ↓
                ┌─────────────────────────┐
                │ 监控 EPOLLIN + EPOLLOUT  │ (等待写就绪)
                └────────┬────────────────┘
                         │ EPOLLOUT 就绪,数据发完
                         ↓
                ┌─────────────────┐
                │  只监控 EPOLLIN  │ (回到正常状态)
                └─────────────────┘

7.2 EnableReadWrite 的实现

这个函数在 TcpServer 里,用 epoll_ctl(EPOLL_CTL_MOD) 动态修改 fd 的监控事件:

void EnableReadWrite(int fd, bool enable_read, bool enable_write) {
    auto it = _connections.find(fd);
    if (it == _connections.end()) return;

    uint32_t events = 0;
    if (enable_read)  events |= EPOLLIN;
    if (enable_write) events |= EPOLLOUT;
    events |= EPOLLET;  // 保持 ET 模式

    it->second->SetEvents(events);

    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = events;
    
    // 修改这个 fd 在 epoll 中的监控事件
    epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev);
}

八、Reactor 模式的数据流全景

8.1 一次完整请求的处理流程

客户端发送 "hello"
        |
        | TCP 传输
        ↓
[网卡收到数据包]
        |
        | 驱动中断,内核处理
        ↓
[内核 TCP 接收缓冲区]
        |
        | ep_poll_callback 回调
        ↓
[epoll 就绪队列:fd=5 的 EPOLLIN]
        |
        | epoll_wait 返回
        ↓
[TcpServer::Loop]
        |
        | 查找 _connections[5],找到 Connection 对象
        ↓
[conn._recver(conn) 被调用]
        |
        | Recver 循环 recv,把 "hello\n" 放入 inbuffer
        ↓
[HandlerRequest(conn) 被调用]
        |
        | 解析协议,处理业务,构建响应 "[Echo] hello\n"
        | 把响应放入 outbuffer
        ↓
[conn._sender(conn) 被调用]
        |
        | Sender 把 outbuffer 发送到 socket
        | 如果一次发完:关闭 EPOLLOUT 监控
        | 如果没发完:开启 EPOLLOUT 监控,等下次
        ↓
[内核 TCP 发送缓冲区]
        |
        | TCP 传输
        ↓
客户端收到 "[Echo] hello"

九、Reactor 的进阶:单 Reactor vs 多 Reactor

9.1 单 Reactor 单线程(我们实现的版本)

                  ┌───────────────────────────────┐
                  │          主线程                │
                  │                               │
                  │  epoll_wait                   │
                  │    ↓ 事件来了                  │
                  │  dispatch → handler           │
                  │  (接受连接 + 读写 + 业务处理) │
                  └───────────────────────────────┘

优点:简单,无锁,Redis 用的就是这个
缺点:单核,业务处理耗时会影响响应速度

9.2 单 Reactor 多线程

                  ┌───────────────────────────────┐
                  │          主线程(IO线程)       │
                  │  epoll_wait                   │
                  │    ↓ 事件来了                  │
                  │  dispatch:接受连接 + 读写      │
                  │    ↓ 把请求交给线程池           │
                  └───────────────────────────────┘
                           │
              ┌────────────┼────────────┐
              ↓            ↓            ↓
        [工作线程1]   [工作线程2]   [工作线程3]
        业务处理      业务处理      业务处理
        
优点:业务处理不阻塞 IO 线程
缺点:epoll 是单线程的,仍是单点

9.3 多 Reactor 多线程(主从 Reactor,Nginx/Netty 用的)

                  ┌─────────────────────────────────┐
                  │          主 Reactor(主线程)      │
                  │  epoll_wait(只监控 listen_sock) │
                  │    ↓ 新连接来了                   │
                  │  accept → 分配给一个子 Reactor    │
                  └─────────────────────────────────┘
                           │
              ┌────────────┼────────────┐
              ↓            ↓            ↓
       [子 Reactor 1]  [子 Reactor 2] [子 Reactor 3]
       epoll_wait       epoll_wait    epoll_wait
       读写 + 业务       读写 + 业务    读写 + 业务

优点:
  - 主 Reactor 专注接收连接
  - 子 Reactor 分摊读写压力
  - 充分利用多核 CPU
  
适用:Nginx、Netty、各大 RPC 框架

十、性能优化建议

10.1 读写缓冲区优化

当前我们用 std::string 作为缓冲区,性能有限。生产环境中可以考虑:

// 方案1:环形缓冲区(RingBuffer)
// 避免频繁的内存分配和数据移动
class RingBuffer {
    char* data_;
    size_t head_, tail_, capacity_;
public:
    void Write(const char* src, size_t len);
    size_t Read(char* dst, size_t maxlen);
};

// 方案2:scatter-gather IO(分散读/聚集写)
// 使用 readv/writev,减少系统调用次数
struct iovec iov[2];
iov[0].iov_base = buf1;
iov[0].iov_len  = len1;
iov[1].iov_base = buf2;
iov[1].iov_len  = len2;
ssize_t n = readv(fd, iov, 2);  // 一次系统调用读到两个缓冲区

10.2 连接管理优化

// 连接超时检测:维护一个时间堆(最小堆),定期清理超时连接
// 避免僵尸连接占用资源
class TimeWheel {
    // 时间轮,O(1) 的超时管理
};

// EPOLLONESHOT:确保一个 fd 在同一时刻只被一个线程处理
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;

十一、常见问题解答

11.1 Connection 的 _R 指针有什么用?

_R 是指向 TcpServer 的指针(回指指针)。在事件处理函数(Recver/Sender/Excepter)中,需要调用服务器提供的接口,比如:

  • conn->_R->EnableReadWrite(fd, true, true):修改 fd 的监控事件
  • conn->_R->RemoveConnection(fd):从服务器中删除连接

没有这个指针,处理函数就无法访问 TcpServer 的功能。

11.2 listen_sock 用 LT 还是 ET?

如果 listen_sock 用 ET 模式,当大量连接同时到来时,epoll_wait 只返回一次(ET 只通知一次状态变化),如果你只调用一次 accept,剩余的连接就永远不会被接受。

解决方案:

  • 方案1:listen_sock 用 LT,不用 ET(简单可靠)
  • 方案2:listen_sock 用 ET,但在 AccepterConnection 中循环 accept 直到 EAGAIN

我们代码中用了方案2(非阻塞 listen_sock + 循环 accept)。

11.3 delete conn 之后还能访问 conn->SockFd() 吗?

不能!delete 之后,对象的内存已经释放,访问任何成员都是未定义行为。

正确的 Excepter 实现顺序:

static void Excepter(Connection* conn) {
    int fd = conn->SockFd();  // 先保存 fd(在 delete 之前)
    
    conn->_R->RemoveConnection(fd);  // 从管理表删除
    conn->Close();                   // 关闭 fd
    delete conn;                     // 最后 delete,之后不再访问 conn
    
    printf("fd=%d 清理完毕\n", fd);  // 用保存的 fd 打印,不访问 conn
}

为避免回调中直接 delete 造成悬空指针,生产实现通常会采用“延迟销毁”:只标记连接为 closing,把 fd 从 epoll 移除并 close,最后在 event loop 的安全点统一释放对象。


十二、总结

12.1 Reactor 模式核心要点

# 要点 关键细节
1 事件驱动 不主动轮询,epoll 通知 → 自动调用对应回调
2 Connection 抽象 每个连接有自己的 inbuffer、outbuffer 和三个回调
3 回指指针 _R 回调函数通过 _R 调用 TcpServer 提供的管理接口
4 动态读写监控 默认只开 EPOLLIN;数据未发完时开 EPOLLOUT;发完关 EPOLLOUT
5 ET + 非阻塞 所有 IO 操作必须非阻塞,循环读写直到 EAGAIN

12.2 整个系列的知识脉络

第一篇:五种 IO 模型
  ↓ 理解了等待和拷贝,理解了为什么需要 IO 多路复用
第二篇:select
  ↓ 位图、一个线程监控多个 fd、但有数量限制和性能问题
第三篇:poll
  ↓ 解决了数量限制,接口更清晰,但本质问题还在
第四篇:epoll
  ↓ 红黑树+就绪链表+回调,O(1) 获取就绪 fd,LT/ET 两种模式
第五篇:Reactor
  ↓ 在 epoll 之上建立优雅的架构:事件驱动+回调+Connection 抽象
  ↓ 这是 Nginx、Redis、Node.js 等的核心架构思想

💬 总结:Reactor 模式是我们这个系列的终点,也是工程实践的起点。它把 epoll 的事件通知能力包装成一个优雅的框架:Connection 负责状态管理,Accepter 负责接受连接,HandlerConnection 负责读写处理,TcpServer 负责事件分发。动态切换读写监控是 Reactor 实现中最精妙的细节——只在需要时监控写就绪,避免无效的 epoll_wait 唤醒。理解了这个系列,你就具备了独立分析 Nginx/Redis 核心 IO 架构的基础能力。

👍 点赞、收藏与分享:这是 Linux 网络编程高级系列的完结篇。从五种 IO 模型到 Reactor 反应堆,五篇文章构成了一个完整的知识体系。如果对你有帮助,点个赞支持一下吧!🚀✨


Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐