通信:(3) 高并发网络通信:epoll + 边沿触发 + 非阻塞 IO + tcp
本文详细介绍了Linux epoll机制及其在高并发网络编程中的应用。主要内容包括: epoll核心API使用: 创建epoll树(epoll_create/epoll_create1) 管理epoll节点(epoll_ctl) 等待事件(epoll_wait) 详细对比了水平触发(LT)和边沿触发(ET)模式的特点及适用场景 高并发架构设计: 基于epoll ET模式+非阻塞IO实现Reacto
1. epoll 基础
1.1 核心 API
1.1.1 创建 epoll 树 (epoll_create / epoll_create1)
// 1. 创建epoll树节点epfd
/* 方法1:
* @function: int epoll_create(int size);
* @param: size: 内核监听的最大数量, 只要大于0默认1024
* @return: 成功返回epoll模型的文件描述符, -1 失败
*/
int m_epfd = epoll_create(1024);
if(m_epfd < 0){
LOG_ERROR("epoll_create() error: %s", strerror(errno));
std::exit(-1);
}
/* 方法2:
* @function: int epoll_create1(int flags);
* @param: flags: 标志位:
* - 0:默认行为,等同于 epoll_create()
* - EPOLL_CLOEXEC:子进程执行exec时关闭epoll模型的文件描述符
* @return: 成功返回epoll模型的文件描述符, -1 失败
*/
int epoll_create1(int flags);
- 功能:在内核中创建一棵红黑树,用于存储需要监控的文件描述符。
- 函数对比:
- oll_create1(int flags)内核监听数量(现内核忽略此值,只要>0 即可),返回 epoll 文件描述符 (epfd)。
- epoll_create1(int flags):新版本接口,推荐使用。
- flags = 0:等同于 epoll_create。
- flags = EPOLL_CLOEXEC:安全推荐。当进程执行 exec 加载新程序时,自动关闭该 fd,防止 fd 泄露给子进程。
- 返回值:成功返回 epfd,失败返回 -1。
1.1.2 管理 epoll 节点 (epoll_ctl)
// 2. 向epoll树上添加节点 lfd
/*
* @function: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
* @param: epfd: epoll树的文件描述符
* @param: op: 操作类型(EPOLL_CTL_ADD:添加、EPOLL_CTL_DEL:删除、EPOLL_CTL_MOD:修改)
* @param: fd: 要操作的文件描述符(要添加/删除/修改的节点)
* @param: event: 事件结构体指针(指向struct epoll_event结构体), 设置fd对应的事件
* @return: 0 成功, -1 失败
*/
/*
* struct epoll_event {
* uint32_t events;
* epoll_data_t data;
* };
* events:
* EPOLLIN: 读事件
* EPOLLOUT: 写事件
*
* 【触发模式说明】
* epoll 默认使用 **水平触发(Level-Triggered, LT)** 模式。
* - LT 模式:只要文件描述符上有未处理完的事件(例如读缓冲区还有数据),epoll_wait 就会持续返回该 fd。
* 编程简单,但可能因频繁唤醒造成性能开销。
*
* 若显式指定 **EPOLLET** 标志,则启用 **边沿触发(Edge-Triggered, ET)** 模式。
* - ET 模式:仅在 fd 状态发生变化时(如从无数据到有数据)通知一次。
* 要求应用程序必须一次性读/写完所有数据(通常配合非阻塞 I/O + 循环 read/write),
* 否则可能丢失事件(因为后续 epoll_wait 不会再次提醒,直到下一次状态变化)。
* 性能更高,但编程复杂度增加。
*
* 【使用建议】
* - 监听套接字(lfd)通常使用 LT 模式(默认),因为 accept 一般能立即处理完新连接。
* - 客户端连接套接字(cfd)常使用 ET + 非阻塞 I/O,以实现高性能事件驱动(如 muduo、Nginx 等)。
*
* typedef union epoll_data {
* void *ptr; // 当 epoll_wait返回事件时,可以通过 ptr直接获取到与该文件描述符(fd)关联的用户自定义对象(如 muduo 中的 Channel对象)
* // 如果不使用 ptr,通常需要维护一个 fd → 对象的映射表来查找关联对象。ptr可以直接存储对象指针,省去查找开销。
* int fd; // 常用
* uint32_t u32;
* uint64_t u64;
* } epoll_data_t; // 是个联合体,只能使用其中的一个成员,常用fd
*/
int lfd = m_listener -> getfd(); //监听套接字
struct epoll_event event;
event.events = EPOLLIN; // 监听读事件(默认为 LT 模式)
event.data.fd = lfd; // 要监听的文件描述符
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event); // 将监听套接字添加到epoll树上
if(ret < 0){
LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
std::exit(-1);
}
- 功能:对 epoll 树进行增、删、改操作。
- 参数详解:
- op:操作类型。
- EPOLL_CTL_ADD:添加新 fd。
- EPOLL_CTL_DEL:删除 fd。
- EPOLL_CTL_MOD:修改 fd 的监听事件。
- event:struct epoll_event 结构体,核心配置。
- events:监听的事件类型(如 EPOLLIN 读就绪)。
- data:联合体 (epoll_data_t),用于存储用户数据。
- data.fd:最常用,存储 fd 本身。
- data.ptr:高级用法。存储自定义对象指针(如 TcpConn*)。epoll_wait 返回时可直接获取对象,避免通过 fd 查表的开销(O(1) vs O(logN))。
- op:操作类型。
1.1.3 等待事件 (epoll_wait)
// 3. 检测
struct epoll_event evs[1024];
while(1){
/*
* @function: int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
* @param: epfd: epoll模型的文件描述符
* @param: evs: 传出参数, 事件结构体数组(指向struct epoll_event结构体)
* @param: maxevents: 数组的大小
* @param: timeout: 超时时间(-1:阻塞、0:非阻塞、>0:毫秒)
* @return: 成功返回就绪的文件描述符数量, -1 失败
*/
int num = epoll_wait(epfd, evs, sizeof(evs) / sizeof(evs[0]), -1);
if(num < 0){
LOG_ERROR("epoll_wait() error: %s", strerror(errno));
std::exit(-1);
}
for(int i = 0; i < num; i++){
int curfd = evs[i].data.fd;
if(curfd == lfd){
// 有新连接
// 1. 接受新连接
TcpConn* tcpConn = m_listener->acceptConn();
if(!tcpConn){
// 建立连接失败, 重连
LOG_ERROR("acceptConn() error.");
continue;
}
LOG_INFO("检测到新的连接: %s:%d", tcpConn->getPeerIP().data(), tcpConn->getPeerPort());
int cfd = tcpConn->getfd();
// 2.设置 cfd 为非阻塞
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
// 3. 加入epoll树(边沿模式)
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 启用边沿触发(ET)模式
ev.data.fd = cfd;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
if(ret < 0){
LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
std::exit(-1);
}
LOG_INFO("新连接已成功加入 epoll 树(ET 模式)");
}else{
// 有通信
LOG_INFO("检测到通信");
// 【重要】在 ET 模式下,必须循环读取直到返回 EAGAIN/EWOULDBLOCK,
// 否则剩余数据将不会再次触发 epoll_wait!
// 示例(此处仅为示意,实际需在对应处理函数中实现):
// while (true) {
// ssize_t n = read(curfd, buf, sizeof(buf));
// if (n > 0) { /* 处理数据 */ }
// else if (n == 0) { /* 对端关闭 */ break; }
// else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// break; // 数据已读完
// } else {
// /* 错误处理 */
// break;
// }
// }
}
}
}
- 功能:阻塞等待,直到有注册的 fd 发生事件。
- 参数:
- evs:输出参数,内核将就绪的事件填充到此数组。
- maxevents:数组大小,即一次最多获取多少个事件。
- timeout:
- -1:永久阻塞(最常用)。
- 0:非阻塞,立即返回。
- >0:超时毫秒数。
- 返回值:就绪的 fd 数量。
1.1 epoll 使用流程
// 1. 创建epoll树节点epfd
/* 方法1:
* @function: int epoll_create(int size);
* @param: size: 内核监听的最大数量, 只要大于0默认1024
* @return: 成功返回epoll模型的文件描述符, -1 失败
*/
int m_epfd = epoll_create(1024);
if(m_epfd < 0){
LOG_ERROR("epoll_create() error: %s", strerror(errno));
std::exit(-1);
}
/* 方法2:
* @function: int epoll_create1(int flags);
* @param: flags: 标志位:
* - 0:默认行为,等同于 epoll_create()
* - EPOLL_CLOEXEC:子进程执行exec时关闭epoll模型的文件描述符
* @return: 成功返回epoll模型的文件描述符, -1 失败
*/
int epoll_create1(int flags);
// 2. 向epoll树上添加节点 lfd
/*
* @function: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
* @param: epfd: epoll树的文件描述符
* @param: op: 操作类型(EPOLL_CTL_ADD:添加、EPOLL_CTL_DEL:删除、EPOLL_CTL_MOD:修改)
* @param: fd: 要操作的文件描述符(要添加/删除/修改的节点)
* @param: event: 事件结构体指针(指向struct epoll_event结构体), 设置fd对应的事件
* @return: 0 成功, -1 失败
*/
/*
* struct epoll_event {
* uint32_t events;
* epoll_data_t data;
* };
* events:
* EPOLLIN: 读事件
* EPOLLOUT: 写事件
*
* 【触发模式说明】
* epoll 默认使用 **水平触发(Level-Triggered, LT)** 模式。
* - LT 模式:只要文件描述符上有未处理完的事件(例如读缓冲区还有数据),epoll_wait 就会持续返回该 fd。
* 编程简单,但可能因频繁唤醒造成性能开销。
*
* 若显式指定 **EPOLLET** 标志,则启用 **边沿触发(Edge-Triggered, ET)** 模式。
* - ET 模式:仅在 fd 状态发生变化时(如从无数据到有数据)通知一次。
* 要求应用程序必须一次性读/写完所有数据(通常配合非阻塞 I/O + 循环 read/write),
* 否则可能丢失事件(因为后续 epoll_wait 不会再次提醒,直到下一次状态变化)。
* 性能更高,但编程复杂度增加。
*
* 【使用建议】
* - 监听套接字(lfd)通常使用 LT 模式(默认),因为 accept 一般能立即处理完新连接。
* - 客户端连接套接字(cfd)常使用 ET + 非阻塞 I/O,以实现高性能事件驱动(如 muduo、Nginx 等)。
*
* typedef union epoll_data {
* void *ptr; // 当 epoll_wait返回事件时,可以通过 ptr直接获取到与该文件描述符(fd)关联的用户自定义对象(如 muduo 中的 Channel对象)
* // 如果不使用 ptr,通常需要维护一个 fd → 对象的映射表来查找关联对象。ptr可以直接存储对象指针,省去查找开销。
* int fd; // 常用
* uint32_t u32;
* uint64_t u64;
* } epoll_data_t; // 是个联合体,只能使用其中的一个成员,常用fd
*/
int lfd = m_listener -> getfd(); //监听套接字
struct epoll_event event;
event.events = EPOLLIN; // 监听读事件(默认为 LT 模式)
event.data.fd = lfd; // 要监听的文件描述符
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event); // 将监听套接字添加到epoll模型中
if(ret < 0){
LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
std::exit(-1);
}
// 3. 检测
struct epoll_event evs[1024];
while(1){
/*
* @function: int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
* @param: epfd: epoll模型的文件描述符
* @param: evs: 传出参数, 事件结构体数组(指向struct epoll_event结构体)
* @param: maxevents: 数组的大小
* @param: timeout: 超时时间(-1:阻塞、0:非阻塞、>0:毫秒)
* @return: 成功返回就绪的文件描述符数量, -1 失败
*/
int num = epoll_wait(epfd, evs, sizeof(evs) / sizeof(evs[0]), -1);
if(num < 0){
LOG_ERROR("epoll_wait() error: %s", strerror(errno));
std::exit(-1);
}
for(int i = 0; i < num; i++){
int curfd = evs[i].data.fd;
if(curfd == lfd){
// 有新连接
// 1. 接受新连接
TcpConn* tcpConn = m_listener->acceptConn();
if(!tcpConn){
// 建立连接失败, 重连
LOG_ERROR("acceptConn() error.");
continue;
}
LOG_INFO("检测到新的连接: %s:%d", tcpConn->getPeerIP().data(), tcpConn->getPeerPort());
int cfd = tcpConn->getfd();
// 2.设置 cfd 为非阻塞
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
// 3. 加入epoll树(边沿模式)
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 启用边沿触发(ET)模式
ev.data.fd = cfd;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
if(ret < 0){
LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
std::exit(-1);
}
LOG_INFO("新连接已成功加入 epoll 树(ET 模式)");
}else{
// 有通信
LOG_INFO("检测到通信");
// 【重要】在 ET 模式下,必须循环读取直到返回 EAGAIN/EWOULDBLOCK,
// 否则剩余数据将不会再次触发 epoll_wait!
// 示例(此处仅为示意,实际需在对应处理函数中实现):
// while (true) {
// ssize_t n = read(curfd, buf, sizeof(buf));
// if (n > 0) { /* 处理数据 */ }
// else if (n == 0) { /* 对端关闭 */ break; }
// else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// break; // 数据已读完
// } else {
// /* 错误处理 */
// break;
// }
// }
}
}
}
1.2 LT 模式 vs ET 模式
|
特性 |
水平触发 (LT, Level-Triggered) |
边沿触发 (ET, Edge-Triggered) |
|---|---|---|
|
默认情况 |
epoll 默认模式 |
需显式添加 标志 |
|
触发条件 |
只要缓冲区有数据,每次 wait 都会通知 |
仅在状态变化时通知一次(如从无数据到有数据) |
|
编程难度 |
低(读不完下次还会提醒) |
高(必须一次性读完,否则数据会滞留) |
|
性能 |
较高,但频繁唤醒可能有开销 |
最高,减少系统调用次数 |
|
代码中的应用 |
监听套接字 (lfd) |
连接套接字 (cfd) |
|
原因 |
accept 处理快,LT 足够且稳定 |
客户端数据量大,ET 减少唤醒,提升吞吐量 |
2. 高并发编程
2.1 架构
基于 Linux Epoll + ET (Edge-Triggered) 模式 + 非阻塞 IO 构建,采用 Reactor 设计模式。
- 高并发:单线程即可处理成千上万个连接。
- 高性能:利用 ET 模式减少内核态与用户态的切换次数。
- 稳定性:完善的缓冲区管理,处理 TCP 粘包/拆包及背压(Backpressure)。
2.2 核心模块
2.2.1 应用层缓冲区 (Buffer 类)
作用:管理用户态内存缓冲区,解决 TCP 粘包/拆包问题,配合非阻塞 IO 处理 EAGAIN 错误。
// ================= 1. 应用层缓冲区 (Buffer) =================
// 用于解决 TCP 粘包/拆包,以及暂存待发送数据
class Buffer {
public:
Buffer() : readPos_(0), writePos_(0) {
buffer_.resize(BUFFER_SIZE);
}
// 获取可读字节数
size_t readableBytes() const { return writePos_ - readPos_; }
// 获取可写字节数
size_t writableBytes() const { return buffer_.size() - writePos_; }
// 获取读指针
const char* peek() const { return begin() + readPos_; }
// 读取数据后移动指针
void retrieve(size_t len) { readPos_ += len; }
void retrieveAll() { readPos_ = 0; writePos_ = 0; }
// 写入数据
void append(const char* data, size_t len) {
if (writableBytes() < len) {
// 简单扩容策略
buffer_.resize(writePos_ + len);
}
std::copy(data, data + len, begin() + writePos_);
writePos_ += len;
}
// 从 fd 读取数据到 buffer (核心:ET 模式必须循环读)
ssize_t readFd(int fd, int* saveErrno) {
char buff[BUFFER_SIZE];
ssize_t len = -1;
do {
len = read(fd, buff, sizeof(buff));
if (len < 0) {
*saveErrno = errno;
// EAGAIN: 数据读完 (非阻塞下); EINTR: 被信号中断
if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
len = 0;
}
break;
} else if (len == 0) {
// 对端关闭
break;
} else {
append(buff, len);
}
} while (len > 0); // ET 模式关键:直到 read 返回 <0 或 0
return len;
}
// 从 buffer 发送数据到 fd
ssize_t writeFd(int fd, int* saveErrno) {
ssize_t len = -1;
do {
size_t readSize = readableBytes();
if (readSize == 0) break;
len = write(fd, peek(), readSize);
if (len < 0) {
*saveErrno = errno;
if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
len = 0; // 暂时发不动,下次再发
}
break;
}
retrieve(len); // 发送成功,移动读指针
} while (len > 0); // 尝试发送直到缓冲区空或 EAGAIN
return len;
}
private:
std::vector<char> buffer_;
size_t readPos_;
size_t writePos_;
char* begin() { return &*buffer_.begin(); }
const char* begin() const { return &*buffer_.begin(); }
};
|
成员/方法 |
可见性 |
功能说明 |
|---|---|---|
|
buffer_ |
private |
底层数据存储容器,动态扩容。 |
|
readPos_ |
private |
读指针,指向当前未处理数据的起始位置。 |
|
writePos_ |
private |
写指针,指向当前可用空间的起始位置。 |
|
readableBytes |
public |
返回当前缓冲区中可读的数据长度 (writePos_ - readPos_)。 |
|
writableBytes |
public |
返回当前缓冲区剩余可写空间长度。 |
|
peek |
public |
返回读指针位置的常量指针,用于读取数据但不移动指针。 |
|
retrieve |
public |
移动读指针,表示前 len 字节数据已处理完毕。 |
|
retrieveAll |
public |
重置读写指针为 0,清空缓冲区。 |
|
append(data, len) |
public |
将数据写入缓冲区末尾,自动扩容空间不足时。 |
|
readFd(fd, errno) |
public |
核心方法。循环从 fd 读取数据到缓冲区。 必须循环 read,直到返回 -1 且 errno == EAGAIN,确保取空内核缓冲区。 |
|
writeFd(fd, errno) |
public |
核心方法。循环将缓冲区数据写入 fd。 必须循环 write,若遇到 EAGAIN 则停止,保留剩余数据待下次发送。 |
|
begin() |
private |
返回 vector 底层数组首地址,供内部使用。 |
2.2.2 连接对象 (Connection 类)
作用:封装单个客户端连接的状态、文件描述符及对应的收发缓冲区。
// ================= 2. 连接对象 (Connection) =================
class Connection {
public:
Connection(int fd) : fd_(fd), isClosed_(false) {}
~Connection() {
if (fd_ > 0) close(fd_);
}
int getFd() const { return fd_; }
bool isClosed() const { return isClosed_; }
void setClosed() { isClosed_ = true; }
Buffer& inputBuffer() { return inBuf_; }
Buffer& outputBuffer() { return outBuf_; }
// 处理业务逻辑 (示例:Echo Server)
void handleRequest() {
// 简单 Echo 逻辑:收到什么发回什么
size_t n = inBuf_.readableBytes();
if (n > 0) {
outBuf_.append(inBuf_.peek(), n);
inBuf_.retrieveAll();
}
}
private:
int fd_;
bool isClosed_;
Buffer inBuf_; // 接收缓冲区
Buffer outBuf_; // 发送缓冲区
};
2.2.3 服务器核心 (EpollServer 类)
作用:实现 Reactor 模式,管理事件循环、连接生命周期及事件分发。
// ================= 3. Epoll 服务器核心 (EpollServer) =================
class EpollServer {
public:
EpollServer() : epfd_(-1), listenfd_(-1) {}
~EpollServer() {
if (epfd_ >= 0) close(epfd_);
if (listenfd_ >= 0) close(listenfd_);
}
bool init(uint16_t port) {
// 1. 创建监听 socket
listenfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd_ < 0) { LOG_ERROR("socket create error"); return false; }
// 2. 地址复用
int opt = 1;
setsockopt(listenfd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 3. 绑定
struct sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(listenfd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOG_ERROR("bind error"); return false;
}
// 4. 监听
if (listen(listenfd_, 1024) < 0) {
LOG_ERROR("listen error"); return false;
}
// 5. 设置监听 fd 为非阻塞 (ET 模式必须)
setNonBlocking(listenfd_);
// 6. 创建 epoll
epfd_ = epoll_create1(0);
if (epfd_ < 0) { LOG_ERROR("epoll_create error"); return false; }
// 7. 将监听 fd 加入 epoll (监听 fd 通常用 LT 或 ET 均可,这里统一用 ET)
addFd(listenfd_, EPOLLIN | EPOLLET);
LOG_INFO("Server started on port %d", port);
return true;
}
void start() {
struct epoll_event events[MAX_EVENTS];
while (true) {
// 8. 等待事件
int nfds = epoll_wait(epfd_, events, MAX_EVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) continue; // 被信号中断,继续
LOG_ERROR("epoll_wait error");
break;
}
for (int i = 0; i < nfds; ++i) {
int curFd = events[i].data.fd;
uint32_t revents = events[i].events;
if (curFd == listenfd_) {
handleAccept();
} else if (revents & EPOLLIN) {
handleRead(curFd);
} else if (revents & EPOLLOUT) {
handleWrite(curFd);
} else if (revents & (EPOLLHUP | EPOLLERR)) {
closeConnection(curFd);
}
}
}
}
private:
// 设置非阻塞
void setNonBlocking(int fd) {
int flag = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
// 添加/修改 epoll 事件
void modFd(int fd, uint32_t events) {
struct epoll_event ev{};
ev.events = events;
ev.data.fd = fd;
epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev);
}
void addFd(int fd, uint32_t events) {
struct epoll_event ev{};
ev.events = events;
ev.data.fd = fd;
epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
}
void delFd(int fd) {
epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);
}
// 处理新连接
void handleAccept() {
struct sockaddr_in clientAddr{};
socklen_t len = sizeof(clientAddr);
// ET 模式 accept 也必须循环,直到返回 EAGAIN
while (true) {
int connFd = accept(listenfd_, (struct sockaddr*)&clientAddr, &len);
if (connFd < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 连接处理完毕
}
LOG_ERROR("accept error");
break;
}
// 设置新连接为非阻塞
setNonBlocking(connFd);
// 加入 epoll,监听读事件 + ET 模式
addFd(connFd, EPOLLIN | EPOLLET);
// 创建连接对象
connections_[connFd] = std::make_unique<Connection>(connFd);
LOG_INFO("New connection: fd=%d, IP=%s", connFd, inet_ntoa(clientAddr.sin_addr));
}
}
// 处理读事件 (ET 模式核心)
void handleRead(int fd) {
auto it = connections_.find(fd);
if (it == connections_.end()) return;
Connection* conn = it->second.get();
int saveErrno = 0;
// 循环读取,直到 EAGAIN
ssize_t n = conn->inputBuffer().readFd(fd, &saveErrno);
if (n < 0) {
// 真正错误
closeConnection(fd);
} else if (n == 0) {
// 对端关闭
closeConnection(fd);
} else {
// 有数据,处理业务
conn->handleRequest();
// 如果有数据要回写,且当前未监听 OUT 事件,则修改监听
if (conn->outputBuffer().readableBytes() > 0) {
// 注意:ET 模式下,如果 write 可能阻塞,必须监听 EPOLLOUT
modFd(fd, EPOLLIN | EPOLLOUT | EPOLLET);
handleWrite(fd); // 尝试立即发送
}
}
}
// 处理写事件
void handleWrite(int fd) {
auto it = connections_.find(fd);
if (it == connections_.end()) return;
Connection* conn = it->second.get();
int saveErrno = 0;
ssize_t n = conn->outputBuffer().writeFd(fd, &saveErrno);
if (n < 0) {
closeConnection(fd);
} else {
// 如果发送缓冲区空了,不再监听写事件,只监听读事件 (减少唤醒)
if (conn->outputBuffer().readableBytes() == 0) {
modFd(fd, EPOLLIN | EPOLLET);
}
}
}
// 关闭连接
void closeConnection(int fd) {
auto it = connections_.find(fd);
if (it != connections_.end()) {
LOG_INFO("Close connection: fd=%d", fd);
delFd(fd);
connections_.erase(it); // unique_ptr 自动析构,关闭 fd
}
}
private:
int epfd_;
int listenfd_;
std::unordered_map<int, std::unique_ptr<Connection>> connections_;
};
|
成员/方法 |
可见性 |
功能说明 |
ET 模式关键点 |
|---|---|---|---|
|
|
private |
epoll 实例的文件描述符。 |
内核事件表句柄。 |
|
|
private |
监听套接字文件描述符。 |
用于 accept 新连接。 |
|
|
private |
管理所有活跃连接对象 (fd -> Connection*)。 |
确保连接对象生命周期与 fd 一致。 |
|
|
public |
创建 socket,绑定端口,监听,创建 epoll,注册监听 fd。 |
监听 fd 也设为 非阻塞 + ET。 |
|
|
public |
主事件循环。调用 |
阻塞等待事件,唤醒后遍历就绪数组。 |
|
|
private |
利用 |
ET 模式必须配合非阻塞 IO。 |
|
|
private |
封装 |
新连接默认只监听 |
|
|
private |
封装 |
动态切换 EPOLLOUT,避免空转。 |
|
|
private |
封装 |
关闭连接前必须移除。 |
|
|
private |
处理新连接事件。循环 |
必须循环 accept,防止连接积压。 |
|
|
private |
处理读事件。调用 |
读取后若需回复,注册 EPOLLOUT 并尝试发送。 |
|
|
private |
处理写事件。调用 |
发送完毕后,注销 EPOLLOUT,只监听读。 |
|
|
private |
关闭连接。从 epoll 移除,从 map 删除,关闭 fd。 |
确保资源彻底释放,防止野指针。 |
2.2 类图

2.3 源码
#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <memory>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
// ================= 配置常量 =================
const int MAX_EVENTS = 1024;
const int LISTEN_PORT = 9999;
const int BUFFER_SIZE = 4096;
// ================= 日志宏 =================
#define LOG_INFO(fmt, ...) printf("[INFO] " fmt "\n", ##__VA_ARGS__)
#define LOG_ERROR(fmt, ...) printf("[ERROR] " fmt "\n", ##__VA_ARGS__)
// ================= 1. 应用层缓冲区 (Buffer) =================
// 用于解决 TCP 粘包/拆包,以及暂存待发送数据
class Buffer {
public:
Buffer() : readPos_(0), writePos_(0) {
buffer_.resize(BUFFER_SIZE);
}
// 获取可读字节数
size_t readableBytes() const { return writePos_ - readPos_; }
// 获取可写字节数
size_t writableBytes() const { return buffer_.size() - writePos_; }
// 获取读指针
const char* peek() const { return begin() + readPos_; }
// 读取数据后移动指针
void retrieve(size_t len) { readPos_ += len; }
void retrieveAll() { readPos_ = 0; writePos_ = 0; }
// 写入数据
void append(const char* data, size_t len) {
if (writableBytes() < len) {
// 简单扩容策略
buffer_.resize(writePos_ + len);
}
std::copy(data, data + len, begin() + writePos_);
writePos_ += len;
}
// 从 fd 读取数据到 buffer (核心:ET 模式必须循环读)
ssize_t readFd(int fd, int* saveErrno) {
char buff[BUFFER_SIZE];
ssize_t len = -1;
do {
len = read(fd, buff, sizeof(buff));
if (len < 0) {
*saveErrno = errno;
// EAGAIN: 数据读完 (非阻塞下); EINTR: 被信号中断
if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
len = 0;
}
break;
} else if (len == 0) {
// 对端关闭
break;
} else {
append(buff, len);
}
} while (len > 0); // ET 模式关键:直到 read 返回 <0 或 0
return len;
}
// 从 buffer 发送数据到 fd
ssize_t writeFd(int fd, int* saveErrno) {
ssize_t len = -1;
do {
size_t readSize = readableBytes();
if (readSize == 0) break;
len = write(fd, peek(), readSize);
if (len < 0) {
*saveErrno = errno;
if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
len = 0; // 暂时发不动,下次再发
}
break;
}
retrieve(len); // 发送成功,移动读指针
} while (len > 0); // 尝试发送直到缓冲区空或 EAGAIN
return len;
}
private:
std::vector<char> buffer_;
size_t readPos_;
size_t writePos_;
char* begin() { return &*buffer_.begin(); }
const char* begin() const { return &*buffer_.begin(); }
};
// ================= 2. 连接对象 (Connection) =================
class Connection {
public:
Connection(int fd) : fd_(fd), isClosed_(false) {}
~Connection() {
if (fd_ > 0) close(fd_);
}
int getFd() const { return fd_; }
bool isClosed() const { return isClosed_; }
void setClosed() { isClosed_ = true; }
Buffer& inputBuffer() { return inBuf_; }
Buffer& outputBuffer() { return outBuf_; }
// 处理业务逻辑 (示例:Echo Server)
void handleRequest() {
// 简单 Echo 逻辑:收到什么发回什么
size_t n = inBuf_.readableBytes();
if (n > 0) {
outBuf_.append(inBuf_.peek(), n);
inBuf_.retrieveAll();
}
}
private:
int fd_;
bool isClosed_;
Buffer inBuf_; // 接收缓冲区
Buffer outBuf_; // 发送缓冲区
};
// ================= 3. Epoll 服务器核心 (EpollServer) =================
class EpollServer {
public:
EpollServer() : epfd_(-1), listenfd_(-1) {}
~EpollServer() {
if (epfd_ >= 0) close(epfd_);
if (listenfd_ >= 0) close(listenfd_);
}
bool init(uint16_t port) {
// 1. 创建监听 socket
listenfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd_ < 0) { LOG_ERROR("socket create error"); return false; }
// 2. 地址复用
int opt = 1;
setsockopt(listenfd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 3. 绑定
struct sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(listenfd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
LOG_ERROR("bind error"); return false;
}
// 4. 监听
if (listen(listenfd_, 1024) < 0) {
LOG_ERROR("listen error"); return false;
}
// 5. 设置监听 fd 为非阻塞 (ET 模式必须)
setNonBlocking(listenfd_);
// 6. 创建 epoll
epfd_ = epoll_create1(0);
if (epfd_ < 0) { LOG_ERROR("epoll_create error"); return false; }
// 7. 将监听 fd 加入 epoll (监听 fd 通常用 LT 或 ET 均可,这里统一用 ET)
addFd(listenfd_, EPOLLIN | EPOLLET);
LOG_INFO("Server started on port %d", port);
return true;
}
void start() {
struct epoll_event events[MAX_EVENTS];
while (true) {
// 8. 等待事件
int nfds = epoll_wait(epfd_, events, MAX_EVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) continue; // 被信号中断,继续
LOG_ERROR("epoll_wait error");
break;
}
for (int i = 0; i < nfds; ++i) {
int curFd = events[i].data.fd;
uint32_t revents = events[i].events;
if (curFd == listenfd_) {
handleAccept();
} else if (revents & EPOLLIN) {
handleRead(curFd);
} else if (revents & EPOLLOUT) {
handleWrite(curFd);
} else if (revents & (EPOLLHUP | EPOLLERR)) {
closeConnection(curFd);
}
}
}
}
private:
// 设置非阻塞
void setNonBlocking(int fd) {
int flag = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
// 添加/修改 epoll 事件
void modFd(int fd, uint32_t events) {
struct epoll_event ev{};
ev.events = events;
ev.data.fd = fd;
epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev);
}
void addFd(int fd, uint32_t events) {
struct epoll_event ev{};
ev.events = events;
ev.data.fd = fd;
epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
}
void delFd(int fd) {
epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);
}
// 处理新连接
void handleAccept() {
struct sockaddr_in clientAddr{};
socklen_t len = sizeof(clientAddr);
// ET 模式 accept 也必须循环,直到返回 EAGAIN
while (true) {
int connFd = accept(listenfd_, (struct sockaddr*)&clientAddr, &len);
if (connFd < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 连接处理完毕
}
LOG_ERROR("accept error");
break;
}
// 设置新连接为非阻塞
setNonBlocking(connFd);
// 加入 epoll,监听读事件 + ET 模式
addFd(connFd, EPOLLIN | EPOLLET);
// 创建连接对象
connections_[connFd] = std::make_unique<Connection>(connFd);
LOG_INFO("New connection: fd=%d, IP=%s", connFd, inet_ntoa(clientAddr.sin_addr));
}
}
// 处理读事件 (ET 模式核心)
void handleRead(int fd) {
auto it = connections_.find(fd);
if (it == connections_.end()) return;
Connection* conn = it->second.get();
int saveErrno = 0;
// 循环读取,直到 EAGAIN
ssize_t n = conn->inputBuffer().readFd(fd, &saveErrno);
if (n < 0) {
// 真正错误
closeConnection(fd);
} else if (n == 0) {
// 对端关闭
closeConnection(fd);
} else {
// 有数据,处理业务
conn->handleRequest();
// 如果有数据要回写,且当前未监听 OUT 事件,则修改监听
if (conn->outputBuffer().readableBytes() > 0) {
// 注意:ET 模式下,如果 write 可能阻塞,必须监听 EPOLLOUT
modFd(fd, EPOLLIN | EPOLLOUT | EPOLLET);
handleWrite(fd); // 尝试立即发送
}
}
}
// 处理写事件
void handleWrite(int fd) {
auto it = connections_.find(fd);
if (it == connections_.end()) return;
Connection* conn = it->second.get();
int saveErrno = 0;
ssize_t n = conn->outputBuffer().writeFd(fd, &saveErrno);
if (n < 0) {
closeConnection(fd);
} else {
// 如果发送缓冲区空了,不再监听写事件,只监听读事件 (减少唤醒)
if (conn->outputBuffer().readableBytes() == 0) {
modFd(fd, EPOLLIN | EPOLLET);
}
}
}
// 关闭连接
void closeConnection(int fd) {
auto it = connections_.find(fd);
if (it != connections_.end()) {
LOG_INFO("Close connection: fd=%d", fd);
delFd(fd);
connections_.erase(it); // unique_ptr 自动析构,关闭 fd
}
}
private:
int epfd_;
int listenfd_;
std::unordered_map<int, std::unique_ptr<Connection>> connections_;
};
// ================= 4. 主函数 =================
int main() {
EpollServer server;
if (!server.init(LISTEN_PORT)) {
return -1;
}
server.start();
return 0;
}
更多推荐

所有评论(0)