【Linux】Reactor 反应堆模式:高性能服务器的工程级架构
本文介绍了Reactor反应堆模式作为高性能服务器架构的核心设计思想。Reactor通过事件驱动和回调机制,将epoll的事件通知能力封装成优雅的框架,解决了直接使用epoll时面临的逻辑混乱、状态管理困难等问题。文章详细解析了Reactor的核心组件:Connection类对TCP连接进行完整抽象,包含文件描述符、缓冲区、回调函数等关键信息;事件循环通过epoll_wait被动等待事件,并自动调
文章目录
Reactor 反应堆模式:高性能服务器的工程级架构
💬 开篇:前面四篇,我们从 IO 模型出发,走过了 select → poll → epoll 的演化之路。到了 epoll,我们有了一个高效的 IO 事件通知机制。但是,epoll 只是个"通知器",它告诉你"哪个 fd 就绪了",但怎么处理这些事件、怎么组织代码、怎么管理连接——这些工程问题,epoll 本身不管。
Reactor 模式就是解决这些工程问题的答案。它是一种设计模式,把 epoll(或者 select/poll)的事件通知能力包装成一个优雅的框架:事件来了,自动调用对应的处理函数,就像"反应"一样——所以叫 Reactor(反应堆)。Nginx、Redis、Netty 的底层都是 Reactor 模式。
这篇文章我们从设计思想出发,深度解析 Reactor 的核心组件:Connection 连接抽象、Accepter 连接接受器、HandlerConnection 事件处理器,以及最关键的——如何优雅地处理读写事件的动态切换。这是本系列最有工程深度的一篇。
👍 点赞、收藏与分享:Reactor 是高性能服务器架构的核心模式,搞懂了这个,面试时聊架构设计会非常有底气。
🚀 循序渐进:Reactor 设计思想 → Connection 连接管理 → 事件分发机制 → 完整代码实现 → 架构扩展讨论。
⚠️ 说明(教学简化版) 本文代码以讲清 Reactor/epoll的核心思想为目标,刻意做了不少“为了可读性而牺牲工程完备性”的简化,因此不建议直接用于生产环境。
例如:错误处理/资源回收策略、连接生命周期管理、读写事件分发的细节、缓冲区与背压、关闭时序与边界条件、以及高并发下的性能与稳定性优化,本文仅做点到为止或用最直观写法演示。
若用于真实项目,请结合:更严格的异常分支处理、延迟销毁/引用计数、完善的 outbuffer + EPOLLOUT驱动、限流与超时管理、以及多线程/多 Reactor 的线程安全设计等进行工程化改造。
一、为什么需要 Reactor 模式?
1.1 直接用 epoll 的问题
我们上一篇实现的 epoll 服务器,逻辑是这样的:
for (;;) {
vector<int> ready;
epoll.Wait(&ready);
for (int fd : ready) {
if (fd == listen_fd) {
// 接受新连接
} else {
// 读数据 → 处理 → 写响应
}
}
}
这段代码看起来没问题,但当业务复杂起来就会出现各种混乱:
问题 1:所有逻辑堆在一个 for 循环里
不同的 fd、不同的状态、不同的业务逻辑
全挤在一个循环里
代码越写越乱,维护噩梦
问题 2:读写耦合,难以控制
什么时候监控写就绪?什么时候取消写监控?
发送缓冲区满了怎么办?
一次性读不完怎么办?
这些状态都混在一起,互相干扰
问题 3:没有统一的连接管理
一个 fd 对应的客户端信息在哪里?
读缓冲区在哪里?写缓冲区在哪里?
连接断开时怎么清理?
到处是散装的 map,维护困难
1.2 Reactor 的核心思想:事件驱动 + 回调
Reactor 的思想用一句话说就是:不要主动去做事,而是等事情来找你,来了之后调用对应的处理函数。
传统思维(主动):
我去检查 fd1 有没有数据 → 有了 → 处理
我去检查 fd2 有没有数据 → 有了 → 处理
...
Reactor 思维(被动响应):
我等着(epoll_wait)
fd1 有数据了 → 自动调用 fd1 对应的 handler
fd2 可写了 → 自动调用 fd2 对应的 sender
fd3 出错了 → 自动调用 fd3 对应的 excepter
Reactor 模式的核心组件:
┌──────────────────────────────┐
│ TcpServer │
│ ┌────────────────────────┐ │
│ │ EventLoop │ │
│ │ epoll_wait() 循环 │ │
│ └──────────┬─────────────┘ │
│ │ 事件到来 │
│ ↓ │
│ ┌────────────────────────┐ │
│ │ Connection 管理表 │ │
│ │ fd → Connection 对象 │ │
│ └──────────┬─────────────┘ │
│ │ 找到对应 conn │
│ ↓ │
│ ┌────────────────────────┐ │
│ │ conn._recver(conn) │ │ 读事件 → 调用 recver
│ │ conn._sender(conn) │ │ 写事件 → 调用 sender
│ │ conn._excepter(conn) │ │ 异常 → 调用 excepter
│ └────────────────────────┘ │
└──────────────────────────────┘
二、Connection:连接的完整抽象
2.1 Connection 类设计思路
每一个 TCP 连接,都抽象成一个 Connection 对象。这个对象保存了这条连接的所有信息:
Connection 对象包含:
sockfd ← 对应的文件描述符
inbuffer ← 接收缓冲区(从 socket 读到的数据)
outbuffer ← 发送缓冲区(要发给对方的数据)
events ← 当前关注的 epoll 事件
client ← 客户端的 ip:port 信息
_recver ← 读就绪时的回调函数
_sender ← 写就绪时的回调函数
_excepter ← 异常发生时的回调函数
_R ← 指向 TcpServer(用于调用服务器提供的接口)
这就像餐厅里每张桌子都有一个记录本,记着这桌客人点了什么、上了什么菜、结账了没有——所有信息自成一体。
2.2 Connection 完整代码
// Connection.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
class Connection;
class TcpServer;
// 回调函数类型:接收 Connection 指针,进行相应处理
using func_t = std::function<void(Connection*)>;
/**
* Connection:一条 TCP 连接的完整抽象
*
* 核心思想:把一条连接需要的所有状态(缓冲区、回调函数、事件类型)
* 都封装在一起,通过回调实现事件驱动
*/
class Connection {
public:
Connection(int sockfd, uint32_t events, TcpServer* R)
: _sockfd(sockfd), _events(events), _R(R)
{}
/**
* 注册三个回调函数:读、写、异常
* 这三个函数在对应事件发生时被 TcpServer 自动调用
*/
void RegisterCallback(func_t recver, func_t sender, func_t excepter) {
_recver = recver;
_sender = sender;
_excepter = excepter;
}
// 往接收缓冲区追加数据(从 socket 读到的原始字节流)
void AddInBuffer(const std::string& buffer) {
_inbuffer += buffer;
}
// 往发送缓冲区追加数据(要发给客户端的响应)
void AddOutBuffer(const std::string& buffer) {
_outbuffer += buffer;
}
bool OutBufferEmpty() const {
return _outbuffer.empty();
}
int SockFd() const { return _sockfd; }
uint32_t Events() const { return _events; }
void SetEvents(uint32_t events) { _events = events; }
void SetClient(const struct sockaddr_in& c) { _client = c; }
std::string& InBuffer() { return _inbuffer; }
std::string& OutBuffer() { return _outbuffer; }
void Close() {
::close(_sockfd);
}
~Connection() {}
private:
int _sockfd; // 对应的 socket fd
std::string _inbuffer; // 接收缓冲区(字节流,等待上层协议解析)
std::string _outbuffer; // 发送缓冲区(待发送的数据)
uint32_t _events; // 当前关注的 epoll 事件(EPOLLIN/EPOLLOUT 等)
struct sockaddr_in _client;// 客户端地址信息
public:
// 三个回调函数(public 方便 TcpServer 直接调用)
func_t _recver; // 读就绪时调用
func_t _sender; // 写就绪时调用
func_t _excepter; // 异常时调用
TcpServer* _R; // 反向指针,指向 TcpServer(用于调用服务器接口)
};
/**
* ConnectionFactory:Connection 的工厂类
* 负责创建两种 Connection:监听用的和普通连接用的
*/
class ConnectionFactory {
public:
// 创建监听 socket 对应的 Connection
// 监听 socket 只需要 recver(接受连接),不需要 sender 和 excepter
static Connection* BuildListenConnection(int listensock,
func_t recver,
uint32_t events,
TcpServer* R) {
Connection* conn = new Connection(listensock, events, R);
conn->RegisterCallback(recver, nullptr, nullptr);
return conn;
}
// 创建普通客户端连接对应的 Connection
static Connection* BuildNormalConnection(int sockfd,
func_t recver,
func_t sender,
func_t excepter,
uint32_t events,
TcpServer* R) {
Connection* conn = new Connection(sockfd, events, R);
conn->RegisterCallback(recver, sender, excepter);
return conn;
}
};
三、TcpServer:事件循环的核心
3.1 TcpServer 的职责
TcpServer 是整个 Reactor 的"大脑",负责:
- 维护 epoll 实例:通过
epoll_create / ctl / wait管理所有 fd - 维护 Connection 映射表:fd → Connection 对象
- 事件循环:
epoll_wait→ 找到对应 Connection → 调用回调 - 提供管理接口:
AddConnection、RemoveConnection、EnableReadWrite
3.2 TcpServer 完整代码
// TcpServer.hpp
#pragma once
#include <iostream>
#include <unordered_map>
#include <functional>
#include <memory>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include "Connection.hpp"
const static int g_backlog = 128;
const static int g_max_events = 64;
/**
* 将 fd 设置为非阻塞模式(ET 模式必须)
* 在项目中建一个 Util.hpp:之后要调用的时候直接包含头文件,这里为了减少文章篇幅没有单独提取出来
*/
static bool SetNonBlock(int fd) {
int fl = fcntl(fd, F_GETFL);
if (fl < 0) { perror("fcntl F_GETFL"); return false; }
return fcntl(fd, F_SETFL, fl | O_NONBLOCK) >= 0;
}
/**
* TcpServer:Reactor 模式的核心服务器类
*/
class TcpServer {
public:
TcpServer(int port) : _port(port), _listensock(-1), _epfd(-1) {}
void InitServer() {
// 1. 创建 epoll 实例
_epfd = epoll_create(10);
if (_epfd < 0) { perror("epoll_create"); exit(1); }
// 2. 创建监听 socket
_listensock = socket(AF_INET, SOCK_STREAM, 0);
if (_listensock < 0) { perror("socket"); exit(1); }
int opt = 1;
setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 监听 socket 也设为非阻塞(ET 模式下 accept 循环需要)
SetNonBlock(_listensock);
struct sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(_listensock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("bind"); exit(1);
}
if (listen(_listensock, g_backlog) < 0) {
perror("listen"); exit(1);
}
printf("[TcpServer] 初始化完成,监听端口 %d\n", _port);
// 3. 将 listen_sock 包装成 Connection,加入 epoll 监控
// 回调函数将在后面由 Accepter 注册
}
/**
* 将一个 Connection 加入 epoll 监控和 Connection 表
*/
void AddConnection(Connection* conn) {
int fd = conn->SockFd();
struct epoll_event ev;
ev.data.fd = fd;
ev.events = conn->Events();
int ret = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
if (ret < 0) {
perror("epoll_ctl ADD");
delete conn;
return;
}
_connections[fd] = conn;
printf("[TcpServer] 连接 fd=%d 已加入监控\n", fd);
}
/**
* 从 epoll 和 Connection 表中移除连接(不关闭 fd,由调用者关闭)
*/
void RemoveConnection(int fd) {
auto it = _connections.find(fd);
if (it == _connections.end()) return;
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
_connections.erase(it);
printf("[TcpServer] 连接 fd=%d 已移除监控\n", fd);
}
/**
* 动态修改 fd 的监控事件(控制读写就绪的监控)
*
* @param fd 目标文件描述符
* @param enable_read 是否监控读就绪
* @param enable_write 是否监控写就绪
*/
void EnableReadWrite(int fd, bool enable_read, bool enable_write) {
auto it = _connections.find(fd);
if (it == _connections.end()) return;
uint32_t events = 0;
if (enable_read) events |= EPOLLIN;
if (enable_write) events |= EPOLLOUT;
events |= EPOLLET; // 保持 ET 模式
it->second->SetEvents(events);
struct epoll_event ev;
ev.data.fd = fd;
ev.events = events;
epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev);
printf("[TcpServer] fd=%d 事件更新:read=%d, write=%d\n",
fd, enable_read, enable_write);
}
/**
* 事件循环:核心驱动
*/
void Loop() {
struct epoll_event events[g_max_events];
for (;;) {
int n = epoll_wait(_epfd, events, g_max_events, -1);
if (n < 0) {
if (errno == EINTR) continue; // 被信号打断,重试
perror("epoll_wait");
break;
}
// 遍历就绪事件,分发给对应的 Connection 回调
for (int i = 0; i < n; i++) {
int fd = events[i].data.fd;
uint32_t revents = events[i].events;
auto it = _connections.find(fd);
if (it == _connections.end()) continue;
Connection* conn = it->second;
// 按优先级处理:异常 > 写 > 读
if (revents & (EPOLLERR | EPOLLHUP)) {
// 异常事件
if (conn->_excepter) conn->_excepter(conn);
} else if (revents & EPOLLOUT) {
// 写就绪
if (conn->_sender) conn->_sender(conn);
} else if (revents & EPOLLIN) {
// 读就绪
if (conn->_recver) conn->_recver(conn);
}
}
}
}
int ListenSock() const { return _listensock; }
int EpollFd() const { return _epfd; }
~TcpServer() {
for (auto& [fd, conn] : _connections) {
close(fd);
delete conn;
}
if (_epfd >= 0) close(_epfd);
if (_listensock >= 0) close(_listensock);
}
private:
int _port;
int _listensock;
int _epfd;
std::unordered_map<int, Connection*> _connections; // fd → Connection 映射表
};
四、Accepter:处理新连接
4.1 Accepter 的职责
Accepter 专门负责处理"新连接到来"的事件。当 listen_sock 的读就绪触发时,调用 AccepterConnection 方法:
- 循环
accept(因为 ET 模式,必须把所有新连接都接受完) - 为每个新连接创建
Connection对象,注册回调函数 - 把新连接加入 TcpServer 的管理
4.2 Accepter 代码
// Accepter.hpp
#pragma once
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cerrno>
#include <functional>
#include "Connection.hpp"
#include "HandlerConnection.hpp" // 读写回调的实现
/**
* Accepter:新连接接受器
*
* 当 listen_sock 上有读事件就绪时,AccepterConnection 被调用
*/
class Accepter {
public:
/**
* 处理 listen_sock 上的新连接事件
*
* @param conn listen_sock 对应的 Connection 对象
*
* 使用 ET 模式时,必须循环 accept 直到 EAGAIN,
* 否则只能接受一个连接,其余连接被忽略
*/
static void AccepterConnection(Connection* conn) {
errno = 0;
while (true) {
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 非阻塞 accept:如果没有新连接,立即返回 EAGAIN
int sockfd = ::accept(conn->SockFd(), (struct sockaddr*)&peer, &len);
if (sockfd > 0) {
// 成功接受一个新连接
char ip_buf[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &peer.sin_addr, ip_buf, sizeof(ip_buf));
printf("[Accepter] 新连接:%s:%d, fd=%d\n",
ip_buf, ntohs(peer.sin_port), sockfd);
// 新连接设为非阻塞(ET 模式要求)
SetNonBlock(sockfd);
// 注册读、写、异常三个回调
// 这里用 std::bind 绑定静态成员函数
auto recver = std::bind(&HandlerConnection::Recver, std::placeholders::_1);
auto sender = std::bind(&HandlerConnection::Sender, std::placeholders::_1);
auto excepter = std::bind(&HandlerConnection::Excepter, std::placeholders::_1);
// 创建普通连接的 Connection,ET 模式 + 初始只监控读事件
Connection* normal_conn = ConnectionFactory::BuildNormalConnection(
sockfd, recver, sender, excepter,
EPOLLIN | EPOLLET, // 初始:只关注读就绪
conn->_R
);
normal_conn->SetClient(peer);
// 加入 TcpServer 管理
conn->_R->AddConnection(normal_conn);
} else {
// accept 失败
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有更多新连接了,正常退出循环
break;
} else if (errno == EINTR) {
// 被信号打断,重试
continue;
} else {
perror("accept error");
break;
}
}
}
}
};
五、HandlerConnection:读写事件处理
5.1 HandlerConnection 的职责
HandlerConnection 包含三个关键静态函数,分别处理:
- Recver:读就绪时,把数据从 socket 读到 inbuffer,然后调用上层协议处理
- Sender:写就绪时,把 outbuffer 里的数据发出去
- Excepter:异常时,清理连接
5.2 HandlerConnection 完整代码
// HandlerConnection.hpp
#pragma once
#include <iostream>
#include <cerrno>
#include <cstring>
#include <sys/socket.h>
#include "Connection.hpp"
const static int kBufferSize = 4096;
/**
* HandlerConnection:连接的读写异常处理器
*
* 所有方法都是静态的,通过 Connection 指针访问连接状态
*/
class HandlerConnection {
public:
/**
* Recver:处理读就绪事件
*
* 职责:把数据从 socket 读入 inbuffer,然后交给上层协议处理
* 不关心数据格式,只负责读取字节流
*/
static void Recver(Connection* conn) {
errno = 0;
char buffer[kBufferSize];
// ET 模式:循环读直到 EAGAIN,确保读完所有数据
while (true) {
ssize_t n = recv(conn->SockFd(), buffer, sizeof(buffer) - 1, 0);
if (n > 0) {
buffer[n] = '\0';
conn->AddInBuffer(buffer); // 追加到接收缓冲区
} else if (n == 0) {
// 对端关闭连接
printf("[Recver] fd=%d 对端关闭\n", conn->SockFd());
conn->_excepter(conn); // 触发异常处理(关闭连接)
return;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 缓冲区读完了,退出循环,正常情况
break;
} else if (errno == EINTR) {
continue; // 被信号中断,重试
} else {
// 真正的读错误
perror("recv error");
conn->_excepter(conn);
return;
}
}
}
printf("[Recver] fd=%d inbuffer: %s\n", conn->SockFd(), conn->InBuffer().c_str());
// 读取完毕,交给上层处理(解析协议、处理业务、构建响应)
HandlerRequest(conn);
}
/**
* Sender:处理写就绪事件
*
* 职责:把 outbuffer 里的数据发送出去
* 如果本次没发完,开启 EPOLLOUT 监控,等下次机会再发
*/
static void Sender(Connection* conn) {
errno = 0;
std::string& outbuffer = conn->OutBuffer();
// ET 模式:循环发送直到全部发完或缓冲区满
while (true) {
ssize_t n = send(conn->SockFd(),
outbuffer.c_str(),
outbuffer.size(),
0);
if (n >= 0) {
// 发出去了 n 个字节,从 outbuffer 中移除
outbuffer.erase(0, n);
if (outbuffer.empty()) {
// 全部发完了,退出循环
break;
}
// 还有数据没发,继续循环
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 发送缓冲区满了,本次发不完,退出循环
// 后面会开启 EPOLLOUT 监控,等缓冲区空了再来
break;
} else if (errno == EINTR) {
continue;
} else {
perror("send error");
conn->_excepter(conn);
return;
}
}
}
if (!conn->OutBufferEmpty()) {
// outbuffer 还有数据没发完
// 开启 EPOLLOUT 监控,等发送缓冲区可写时再来
printf("[Sender] fd=%d 数据未发完,开启 EPOLLOUT 监控\n", conn->SockFd());
conn->_R->EnableReadWrite(conn->SockFd(), true, true);
} else {
// 数据全部发完
// 关闭 EPOLLOUT 监控(避免 epoll_wait 频繁返回)
printf("[Sender] fd=%d 数据发送完毕,关闭 EPOLLOUT 监控\n", conn->SockFd());
conn->_R->EnableReadWrite(conn->SockFd(), true, false);
}
}
/**
* Excepter:处理异常事件(连接断开/错误)
*
* 职责:清理这条连接的所有资源
*/
static void Excepter(Connection* conn) {
int fd = conn->SockFd();
printf("[Excepter] 连接 fd=%d 发生异常,开始清理\n", fd);
// 1. 从 epoll 中移除,从 connections 表中删除
conn->_R->RemoveConnection(fd);
// 2. 关闭 socket fd
conn->Close();
// 3. 删除 Connection 对象,释放内存
delete conn;
printf("[Excepter] 连接 fd=%d 清理完毕\n", fd);
}
private:
/**
* HandlerRequest:上层业务处理
*
* 这里是协议解析和业务逻辑的入口
* 具体实现由业务层提供(这里用简单的回显作为示例)
*/
static void HandlerRequest(Connection* conn) {
std::string& inbuffer = conn->InBuffer();
if (inbuffer.empty()) return;
// 简单的回显服务:把收到的数据原样返回
// 实际业务中这里会进行协议解析、反序列化、业务处理、序列化
std::string response = "[Echo] " + inbuffer;
inbuffer.clear(); // 清空接收缓冲区
// 把响应放入发送缓冲区
conn->AddOutBuffer(response);
// 直接尝试发送(大多数情况下发送缓冲区有空间,可以直接发)
if (!conn->OutBufferEmpty()) {
conn->_sender(conn);
}
}
};
六、组装完整的 Reactor 服务器
6.1 main 函数:把所有组件组装起来
// reactor_main.cc
#include "TcpServer.hpp"
#include "Connection.hpp"
#include "Accepter.hpp"
#include "HandlerConnection.hpp"
#include <cstdlib>
int main(int argc, char* argv[]) {
int port = (argc > 1) ? atoi(argv[1]) : 8888;
// 1. 创建 TcpServer 实例
TcpServer server(port);
server.InitServer();
// 2. 创建 listen_sock 对应的 Connection
// 回调函数是 Accepter::AccepterConnection
// 使用 ET 模式(EPOLLIN | EPOLLET)
auto listen_recver = std::bind(&Accepter::AccepterConnection, std::placeholders::_1);
Connection* listen_conn = ConnectionFactory::BuildListenConnection(
server.ListenSock(),
listen_recver,
EPOLLIN | EPOLLET, // 监听 socket:ET 模式,监控读就绪
&server
);
// 3. 将 listen_conn 加入 TcpServer 管理
server.AddConnection(listen_conn);
printf("[Main] Reactor 服务器启动,端口 %d\n", port);
printf("[Main] 等待连接...\n");
// 4. 进入事件循环(永不返回,直到进程退出)
server.Loop();
return 0;
}
6.2 编译和测试
# 编译
g++ -std=c++17 reactor_main.cc -o reactor_server
# 运行
./reactor_server 8888
# 另一个终端测试
nc 127.0.0.1 8888
hello world
# 服务器回显:[Echo] hello world
# 用多个终端同时连接,验证并发处理
nc 127.0.0.1 8888 # 终端1
nc 127.0.0.1 8888 # 终端2
nc 127.0.0.1 8888 # 终端3
七、读写事件动态切换:核心工程细节
7.1 为什么要动态切换读写监控?
这是 Reactor 实现中最精妙的一个设计,很多人会困惑——为什么不一直开着 EPOLLOUT 监控?
如果一直开 EPOLLOUT:
发送缓冲区几乎总是有空间的(不满)
→ EPOLLOUT 几乎总是就绪
→ epoll_wait 几乎每次都立刻返回(因为 EPOLLOUT 就绪)
→ 即使没有数据要发,epoll_wait 也一直在唤醒
→ 浪费 CPU
正确的策略:
初始状态:只监控 EPOLLIN(只有读就绪才唤醒)
数据来了(EPOLLIN 就绪):
→ Recver 读数据 → HandlerRequest 处理 → 放入 outbuffer → 调用 Sender
Sender 发送数据:
- 如果一次发完:保持只监控 EPOLLIN
- 如果没发完(缓冲区满):
→ 开启 EPOLLOUT 监控(EnableReadWrite(fd, true, true))
→ epoll_wait 等 EPOLLOUT 就绪
→ 发送缓冲区空了 → 再次调用 Sender → 继续发剩余数据
→ 发完后关闭 EPOLLOUT(EnableReadWrite(fd, true, false))
状态机:
┌─────────────────┐
│ 只监控 EPOLLIN │ (正常状态)
└────────┬────────┘
│ 发送缓冲区满,数据未发完
↓
┌─────────────────────────┐
│ 监控 EPOLLIN + EPOLLOUT │ (等待写就绪)
└────────┬────────────────┘
│ EPOLLOUT 就绪,数据发完
↓
┌─────────────────┐
│ 只监控 EPOLLIN │ (回到正常状态)
└─────────────────┘
7.2 EnableReadWrite 的实现
这个函数在 TcpServer 里,用 epoll_ctl(EPOLL_CTL_MOD) 动态修改 fd 的监控事件:
void EnableReadWrite(int fd, bool enable_read, bool enable_write) {
auto it = _connections.find(fd);
if (it == _connections.end()) return;
uint32_t events = 0;
if (enable_read) events |= EPOLLIN;
if (enable_write) events |= EPOLLOUT;
events |= EPOLLET; // 保持 ET 模式
it->second->SetEvents(events);
struct epoll_event ev;
ev.data.fd = fd;
ev.events = events;
// 修改这个 fd 在 epoll 中的监控事件
epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &ev);
}
八、Reactor 模式的数据流全景
8.1 一次完整请求的处理流程
客户端发送 "hello"
|
| TCP 传输
↓
[网卡收到数据包]
|
| 驱动中断,内核处理
↓
[内核 TCP 接收缓冲区]
|
| ep_poll_callback 回调
↓
[epoll 就绪队列:fd=5 的 EPOLLIN]
|
| epoll_wait 返回
↓
[TcpServer::Loop]
|
| 查找 _connections[5],找到 Connection 对象
↓
[conn._recver(conn) 被调用]
|
| Recver 循环 recv,把 "hello\n" 放入 inbuffer
↓
[HandlerRequest(conn) 被调用]
|
| 解析协议,处理业务,构建响应 "[Echo] hello\n"
| 把响应放入 outbuffer
↓
[conn._sender(conn) 被调用]
|
| Sender 把 outbuffer 发送到 socket
| 如果一次发完:关闭 EPOLLOUT 监控
| 如果没发完:开启 EPOLLOUT 监控,等下次
↓
[内核 TCP 发送缓冲区]
|
| TCP 传输
↓
客户端收到 "[Echo] hello"
九、Reactor 的进阶:单 Reactor vs 多 Reactor
9.1 单 Reactor 单线程(我们实现的版本)
┌───────────────────────────────┐
│ 主线程 │
│ │
│ epoll_wait │
│ ↓ 事件来了 │
│ dispatch → handler │
│ (接受连接 + 读写 + 业务处理) │
└───────────────────────────────┘
优点:简单,无锁,Redis 用的就是这个
缺点:单核,业务处理耗时会影响响应速度
9.2 单 Reactor 多线程
┌───────────────────────────────┐
│ 主线程(IO线程) │
│ epoll_wait │
│ ↓ 事件来了 │
│ dispatch:接受连接 + 读写 │
│ ↓ 把请求交给线程池 │
└───────────────────────────────┘
│
┌────────────┼────────────┐
↓ ↓ ↓
[工作线程1] [工作线程2] [工作线程3]
业务处理 业务处理 业务处理
优点:业务处理不阻塞 IO 线程
缺点:epoll 是单线程的,仍是单点
9.3 多 Reactor 多线程(主从 Reactor,Nginx/Netty 用的)
┌─────────────────────────────────┐
│ 主 Reactor(主线程) │
│ epoll_wait(只监控 listen_sock) │
│ ↓ 新连接来了 │
│ accept → 分配给一个子 Reactor │
└─────────────────────────────────┘
│
┌────────────┼────────────┐
↓ ↓ ↓
[子 Reactor 1] [子 Reactor 2] [子 Reactor 3]
epoll_wait epoll_wait epoll_wait
读写 + 业务 读写 + 业务 读写 + 业务
优点:
- 主 Reactor 专注接收连接
- 子 Reactor 分摊读写压力
- 充分利用多核 CPU
适用:Nginx、Netty、各大 RPC 框架
十、性能优化建议
10.1 读写缓冲区优化
当前我们用 std::string 作为缓冲区,性能有限。生产环境中可以考虑:
// 方案1:环形缓冲区(RingBuffer)
// 避免频繁的内存分配和数据移动
class RingBuffer {
char* data_;
size_t head_, tail_, capacity_;
public:
void Write(const char* src, size_t len);
size_t Read(char* dst, size_t maxlen);
};
// 方案2:scatter-gather IO(分散读/聚集写)
// 使用 readv/writev,减少系统调用次数
struct iovec iov[2];
iov[0].iov_base = buf1;
iov[0].iov_len = len1;
iov[1].iov_base = buf2;
iov[1].iov_len = len2;
ssize_t n = readv(fd, iov, 2); // 一次系统调用读到两个缓冲区
10.2 连接管理优化
// 连接超时检测:维护一个时间堆(最小堆),定期清理超时连接
// 避免僵尸连接占用资源
class TimeWheel {
// 时间轮,O(1) 的超时管理
};
// EPOLLONESHOT:确保一个 fd 在同一时刻只被一个线程处理
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
十一、常见问题解答
11.1 Connection 的 _R 指针有什么用?
_R 是指向 TcpServer 的指针(回指指针)。在事件处理函数(Recver/Sender/Excepter)中,需要调用服务器提供的接口,比如:
conn->_R->EnableReadWrite(fd, true, true):修改 fd 的监控事件conn->_R->RemoveConnection(fd):从服务器中删除连接
没有这个指针,处理函数就无法访问 TcpServer 的功能。
11.2 listen_sock 用 LT 还是 ET?
如果 listen_sock 用 ET 模式,当大量连接同时到来时,epoll_wait 只返回一次(ET 只通知一次状态变化),如果你只调用一次 accept,剩余的连接就永远不会被接受。
解决方案:
- 方案1:listen_sock 用 LT,不用 ET(简单可靠)
- 方案2:listen_sock 用 ET,但在
AccepterConnection中循环 accept 直到 EAGAIN
我们代码中用了方案2(非阻塞 listen_sock + 循环 accept)。
11.3 delete conn 之后还能访问 conn->SockFd() 吗?
不能!delete 之后,对象的内存已经释放,访问任何成员都是未定义行为。
正确的 Excepter 实现顺序:
static void Excepter(Connection* conn) {
int fd = conn->SockFd(); // 先保存 fd(在 delete 之前)
conn->_R->RemoveConnection(fd); // 从管理表删除
conn->Close(); // 关闭 fd
delete conn; // 最后 delete,之后不再访问 conn
printf("fd=%d 清理完毕\n", fd); // 用保存的 fd 打印,不访问 conn
}
为避免回调中直接 delete 造成悬空指针,生产实现通常会采用“延迟销毁”:只标记连接为 closing,把 fd 从 epoll 移除并 close,最后在 event loop 的安全点统一释放对象。
十二、总结
12.1 Reactor 模式核心要点
| # | 要点 | 关键细节 |
|---|---|---|
| 1 | 事件驱动 | 不主动轮询,epoll 通知 → 自动调用对应回调 |
| 2 | Connection 抽象 | 每个连接有自己的 inbuffer、outbuffer 和三个回调 |
| 3 | 回指指针 _R |
回调函数通过 _R 调用 TcpServer 提供的管理接口 |
| 4 | 动态读写监控 | 默认只开 EPOLLIN;数据未发完时开 EPOLLOUT;发完关 EPOLLOUT |
| 5 | ET + 非阻塞 | 所有 IO 操作必须非阻塞,循环读写直到 EAGAIN |
12.2 整个系列的知识脉络
第一篇:五种 IO 模型
↓ 理解了等待和拷贝,理解了为什么需要 IO 多路复用
第二篇:select
↓ 位图、一个线程监控多个 fd、但有数量限制和性能问题
第三篇:poll
↓ 解决了数量限制,接口更清晰,但本质问题还在
第四篇:epoll
↓ 红黑树+就绪链表+回调,O(1) 获取就绪 fd,LT/ET 两种模式
第五篇:Reactor
↓ 在 epoll 之上建立优雅的架构:事件驱动+回调+Connection 抽象
↓ 这是 Nginx、Redis、Node.js 等的核心架构思想
💬 总结:Reactor 模式是我们这个系列的终点,也是工程实践的起点。它把 epoll 的事件通知能力包装成一个优雅的框架:Connection 负责状态管理,Accepter 负责接受连接,HandlerConnection 负责读写处理,TcpServer 负责事件分发。动态切换读写监控是 Reactor 实现中最精妙的细节——只在需要时监控写就绪,避免无效的 epoll_wait 唤醒。理解了这个系列,你就具备了独立分析 Nginx/Redis 核心 IO 架构的基础能力。
👍 点赞、收藏与分享:这是 Linux 网络编程高级系列的完结篇。从五种 IO 模型到 Reactor 反应堆,五篇文章构成了一个完整的知识体系。如果对你有帮助,点个赞支持一下吧!🚀✨
更多推荐



所有评论(0)