epoll 深度剖析:高性能 IO 多路复用的终极方案

💬 开篇:终于到 epoll 了!如果说 select 是自行车、poll 是摩托车,那 epoll 就是高铁。它是 Linux 2.5.44 内核引入的,被公认为 Linux 下性能最好的 IO 多路复用机制,是 Nginx、Redis、Node.js 等明星项目的底层支柱。

为什么 epoll 这么快?因为它从设计上就彻底解决了 select/poll 的根本缺陷:用红黑树管理所有监控的 fd,用就绪队列存放就绪的 fd,用回调机制替代轮询——内核不需要"找"就绪的 fd,而是就绪的 fd 会主动"报到"。这一篇,我们从系统调用接口开始,深入内核数据结构,再到 LT/ET 两种工作模式,最后用代码实现完整的 epoll 服务器。

👍 点赞、收藏与分享:epoll 是 Linux 网络编程的必考知识点,也是高性能服务器开发的核心技能。

🚀 循序渐进:三个系统调用 → 内核原理 → LT vs ET → 为什么 ET 要非阻塞 → LT 版服务器 → ET 版服务器 → 性能对比。


一、epoll 的三个系统调用

1.1 epoll_create:创建 epoll 实例

#include <sys/epoll.h>

int epoll_create(int size);

epoll_create 创建一个 epoll 实例,返回一个文件描述符(epoll fd),用来标识这个 epoll 实例。

参数 size:自 Linux 2.6.8 之后,这个参数被忽略(历史原因保留),传任何正整数都行,通常传个 10 或者 1。

返回值:成功返回 epoll fd(非负整数),失败返回 -1。

int epfd = epoll_create(10);
if (epfd < 0) {
    perror("epoll_create");
    exit(1);
}
// epfd 就是这个 epoll 实例的"句柄"
// 用完记得 close(epfd)

类比epoll_create 就像开了一家"监控中心",epfd 是这家中心的"门牌号"。后续所有操作都用这个门牌号来找到这家中心。

注意:
现代 Linux 更推荐用 epoll_create1(EPOLL_CLOEXEC),避免 fd 泄漏到 exec 后的子进程。


1.2 epoll_ctl:注册/修改/删除监控事件

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl 用来管理 epoll 实例中的 fd——增加、修改或删除。

参数说明:

参数 说明
epfd epoll_create 返回的 epoll fd
op 操作类型:ADD / MOD / DEL
fd 要操作的目标文件描述符
event 感兴趣的事件(ADD 和 MOD 时必须提供,DEL 时传 NULL)

op 的取值:

EPOLL_CTL_ADD  // 将 fd 注册到 epoll 实例中,开始监控
EPOLL_CTL_MOD  // 修改已注册的 fd 的监控事件
EPOLL_CTL_DEL  // 从 epoll 实例中删除 fd,停止监控

epoll_event 结构体:

struct epoll_event {
    uint32_t events;   // 要监控的事件(位图)
    epoll_data_t data; // 用户自定义数据(回调时原样返回)
};

// epoll_data_t 是一个联合体
typedef union epoll_data {
    void        *ptr;  // 指向任意数据的指针
    int          fd;   // 文件描述符
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

events 的常用取值:

事件宏 含义
EPOLLIN 可读(有数据,或对端关闭,或新连接)
EPOLLOUT 可写(发送缓冲区有空间)
EPOLLPRI 带外数据可读
EPOLLERR 发生错误(不用手动设置,自动检测)
EPOLLHUP 对端关闭(不用手动设置,自动检测)
EPOLLET 设置为 ET(边缘触发)模式
EPOLLONESHOT 只触发一次,之后需要重新注册

使用示例:

struct epoll_event ev;
ev.events = EPOLLIN;      // 关注可读事件
ev.data.fd = new_fd;      // 保存 fd,epoll_wait 返回时能拿到它

// 将 new_fd 加入 epoll 监控
epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &ev);

// 不再监控 new_fd
epoll_ctl(epfd, EPOLL_CTL_DEL, new_fd, NULL);

1.3 epoll_wait:等待事件发生

int epoll_wait(int epfd, 
               struct epoll_event *events, 
               int maxevents, 
               int timeout);

epoll_wait 等待已注册的 fd 上有事件发生,直接返回就绪的 fd 列表(不需要你自己遍历找)。

参数说明:

参数 说明
epfd epoll_create 返回的 epoll fd
events 用户分配的数组,内核把就绪事件写入这里
maxevents events 数组的容量(最多返回多少个事件)
timeout 超时时间(毫秒),-1 为永久阻塞,0 为立即返回

返回值:

int n = epoll_wait(epfd, events, maxevents, timeout);
// n > 0:就绪的事件数量(events[0..n-1] 有效)
// n == 0:超时
// n < 0:出错

重要细节:events 数组只有前 n 个元素有效!

struct epoll_event events[1000];  // 最多返回 1000 个
int n = epoll_wait(epfd, events, 1000, -1);

// ✅ 正确:只遍历到 n
for (int i = 0; i < n; i++) {
    int fd = events[i].data.fd;
    // 处理 fd...
}

// ❌ 错误:遍历到 1000,大量无效元素
for (int i = 0; i < 1000; i++) { ... }

1.4 三个调用的组合使用模板

// 第一步:创建 epoll 实例
int epfd = epoll_create(10);

// 第二步:注册要监控的 fd
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

// 第三步:循环等待事件
struct epoll_event events[1024];
for (;;) {
    int n = epoll_wait(epfd, events, 1024, -1);
    for (int i = 0; i < n; i++) {
        // 处理 events[i]...
        int fd = events[i].data.fd;
        if (events[i].events & EPOLLIN) {
            // fd 可读
        }
    }
}

// 用完关闭
close(epfd);

二、epoll 的工作原理:红黑树 + 就绪队列 + 回调

2.1 内核数据结构

当你调用 epoll_create,Linux 内核创建了一个 eventpoll 结构体:

struct eventpoll {
    // ...
    struct rb_root rbr;        // 红黑树根节点:存储所有被监控的 fd
    struct list_head rdlist;   // 就绪链表:存储当前已就绪的 fd
    // ...
};

两个核心成员:

红黑树(Red-Black Tree):存储所有通过 epoll_ctl 注册的 fd 信息。

                  [fd=5]
                 /      \
            [fd=3]      [fd=8]
           /     \     /     \
        [fd=1]  [fd=4][fd=7] [fd=9]

        红黑树:O(log n) 的插入/删除/查找
        注册 fd 时插入,注销时删除

就绪链表(Ready List):存储当前已经就绪(有事件待处理)的 fd。

就绪链表(双向链表):
[fd=3] <-> [fd=7] <-> [fd=9]
当前这三个 fd 有数据可读

2.2 回调机制:数据怎么进就绪链表的?

这是 epoll 高效的关键所在!

当 epoll_ctl(ADD) 注册 fd 时,内核会把 epitem 挂到该 fd 对应的等待队列(waitqueue)上。
当这个 fd 的状态发生变化(变得可读/可写/异常)时,内核在唤醒等待队列的过程中触发 ep_poll_callback,将 epitem 加入 rdlist 就绪链表。

struct epitem {
    struct rb_node rbn;         // 红黑树节点(在红黑树中的位置)
    struct list_head rdllink;   // 链表节点(在就绪链表中的位置)
    struct epoll_filefd ffd;    // 事件句柄信息(fd + file 指针)
    struct eventpoll *ep;       // 指向所属的 eventpoll 实例
    struct epoll_event event;   // 用户设置的感兴趣事件
};

2.3 epoll_wait 的工作过程

调用 epoll_wait
  ↓
检查 rdlist(就绪链表)
  ↓
空?→ 阻塞等待(或超时返回)
  ↓
不空?→ 把 rdlist 中的事件拷贝到用户传入的 events 数组
  ↓
返回就绪事件数量

关键:epoll_wait 的工作是 O(k) 的(直接看就绪链表),不需要遍历所有 fd!


2.4 select/poll/epoll 工作原理对比

select/poll:
  用户注册 fd → 每次全量拷贝到内核
  内核等待 → 有事件发生时,扫描所有 fd(O(n))→ 返回

epoll:
  用户注册 fd(epoll_ctl)→ 内核红黑树存储,一次注册永久有效
  有事件发生 → 驱动回调 ep_poll_callback → fd 进入就绪链表
  epoll_wait → 直接取就绪链表 → 返回(O(k))

一句话总结:select/poll 是"主动轮询",epoll 是"被动通知"。


三、epoll 的优点(对应 select 的缺点)

select 缺点 epoll 解决方案
fd 数量上限 1024 无上限,取决于系统最大可打开文件数
每次全量拷贝 fd_set epoll_ctl 只在注册/修改时拷贝,不频繁
O(n) 遍历查找就绪 fd 回调机制 + 就绪链表,O(k) 获取就绪事件
每次要手动重建集合 内核红黑树维护,一次注册长期有效

四、LT 和 ET:两种工作模式

4.1 形象比喻

你正在打游戏,快进决赛圈了。你妈饭做好了,喊你去吃:

水平触发(LT,Level Triggered)

你妈:吃饭了!
你:等会儿(没动)
你妈:吃饭了!(继续喊)
你妈:吃饭了!(还在喊)
...
只要饭没吃,你妈就一直喊
亲妈模式 ❤️

边缘触发(ET,Edge Triggered)

你妈:吃饭了!(只喊一次)
你:等会儿(没动)
你妈:(不管你了,爱吃不吃)
后妈模式 💔

4.2 技术层面的 LT 和 ET

假设服务器收到了 2KB 数据,你调用 epoll_wait 后,只 read 了 1KB,还有 1KB 在缓冲区里:

LT 模式(默认)

第一次 epoll_wait:返回!fd 就绪(有 2KB 数据)
你 read 了 1KB
第二次 epoll_wait:还是返回!因为缓冲区还有 1KB
只要缓冲区有数据,epoll_wait 就一直通知你

ET 模式(加了 EPOLLET 标志)

第一次 epoll_wait:返回!fd 就绪(有 2KB 数据)
你只 read 了 1KB(还有 1KB 在缓冲区)
第二次 epoll_wait:不返回!
即使缓冲区有数据,只要没有新的数据来,epoll_wait 不再通知
剩下的 1KB 你可能永远看不到了(直到有新数据触发新的边缘)

4.3 ET 模式的正确使用:非阻塞 + 循环读完

ET 的问题:一次只通知,如果没读完,剩余数据就没了。

解决方案:ET 模式下,收到通知后,必须用非阻塞 IO 循环读取,直到 EAGAIN,保证把所有数据都读完。

// ET 模式下正确的读取方式
void ReadAll(int fd, std::string& buf) {
    char tmp[4096];
    for (;;) {
        ssize_t n = recv(fd, tmp, sizeof(tmp) - 1, 0);
        
        if (n > 0) {
            tmp[n] = '\0';
            buf += tmp;
            // 继续读,直到 EAGAIN
        } else if (n == 0) {
            // 对端关闭连接
            break;
        } else {
            // n < 0
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 缓冲区已空,本次数据读完了
                break;
            } else if (errno == EINTR) {
                // 被信号打断,重试
                continue;
            } else {
                // 真正的错误
                perror("recv");
                break;
            }
        }
    }
}

为什么 ET 必须非阻塞?

假设用阻塞 read,每次只读 1KB:

  • 服务器有 10KB 数据
  • ET 通知来了,你 read 1KB,剩 9KB
  • ET 不再通知了(没有新边缘)
    剩余数据一直要等下次更新才能读

ET 模式不是“读不完就丢”,而是“状态没变化就不再通知”。
如果你收到一次 EPOLLIN 只读了一部分,缓冲区仍然有数据,但没有新的数据到来,fd 的“可读状态”并未发生新的边缘变化,因此后续 epoll_wait 可能不会再返回这个 fd——结果就是剩余数据滞留在内核缓冲区,应用层表现为卡住。
所以 ET 的正确实现是:socket 必须非阻塞,并在一次事件回调里循环读到 EAGAIN/EWOULDBLOCK,把缓冲区读空。


4.4 LT vs ET 对比与选择

比较项 LT(水平触发) ET(边缘触发)
默认? 是(epoll 默认) 否(需加 EPOLLET)
通知时机 只要就绪就通知(可重复) 状态变化时通知(只触发一次)
是否需要非阻塞 不强制 必须使用非阻塞 IO
epoll_wait 触发次数 多(每次有数据都触发) 少(仅状态变化时触发)
代码复杂度 高(需处理非阻塞循环读写)
性能 稍低(更多次 epoll_wait 返回) 稍高(减少 epoll_wait 返回次数)
适用场景 一般场景,代码简单 高性能场景,Nginx 默认使用 ET
select/poll 对比 select/poll 也是 LT epoll 独有

重要澄清:ET 性能比 LT 高,但差距没有你想的那么大。如果 LT 模式下每次 epoll_wait 返回后立刻把所有数据处理完,那性能和 ET 其实差不多。ET 的真正意义在于减少 epoll_wait 的唤醒次数,对高频小数据场景有优化。


五、LT 模式 epoll 服务器实现

5.1 Epoll 封装类

// epoll_lt_server.hpp
#pragma once
#include <vector>
#include <functional>
#include <unordered_map>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

// 业务处理函数类型
typedef std::function<void(const std::string& req, std::string* resp)> Handler;

/**
 * TcpSocket:基础 TCP 套接字封装
 */
class TcpSocket {
public:
    TcpSocket(int fd = -1) : fd_(fd) {}
    
    int GetFd() const { return fd_; }
    
    bool Socket() {
        fd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (fd_ < 0) { perror("socket"); return false; }
        int opt = 1;
        setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
        return true;
    }
    
    bool Bind(const std::string& ip, uint16_t port) {
        struct sockaddr_in addr = {};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        if (bind(fd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
            perror("bind"); return false;
        }
        return true;
    }
    
    bool Listen(int backlog) {
        if (listen(fd_, backlog) < 0) { perror("listen"); return false; }
        return true;
    }
    
    bool Accept(TcpSocket* new_sock, std::string* ip = nullptr, uint16_t* port = nullptr) {
        struct sockaddr_in client = {};
        socklen_t len = sizeof(client);
        int new_fd = accept(fd_, (struct sockaddr*)&client, &len);
        if (new_fd < 0) { perror("accept"); return false; }
        new_sock->fd_ = new_fd;
        if (ip) *ip = inet_ntoa(client.sin_addr);
        if (port) *port = ntohs(client.sin_port);
        return true;
    }
    
    bool Recv(std::string* buf) const {
        char tmp[4096] = {0};
        ssize_t n = recv(fd_, tmp, sizeof(tmp) - 1, 0);
        if (n <= 0) return false;
        tmp[n] = '\0';
        *buf = tmp;
        return true;
    }
    
    bool Send(const std::string& buf) const {
        ssize_t n = send(fd_, buf.c_str(), buf.size(), 0);
        return n > 0;
    }
    
    void Close() {
        if (fd_ >= 0) { close(fd_); fd_ = -1; }
    }

private:
    int fd_;
};

/**
 * Epoll:对 epoll 三个系统调用的封装(LT 模式)
 */
class Epoll {
public:
    Epoll() {
        epoll_fd_ = epoll_create(10);
        if (epoll_fd_ < 0) {
            perror("epoll_create");
            exit(1);
        }
    }
    
    ~Epoll() {
        if (epoll_fd_ >= 0) close(epoll_fd_);
    }
    
    /**
     * 将 socket 加入 epoll 监控(LT 模式,默认)
     */
    bool Add(const TcpSocket& sock) const {
        int fd = sock.GetFd();
        printf("[Epoll::Add] fd = %d\n", fd);
        
        struct epoll_event ev;
        ev.events = EPOLLIN;        // 关注可读事件(LT 模式,不加 EPOLLET)
        ev.data.fd = fd;            // 保存 fd,epoll_wait 返回时用
        
        int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
        if (ret < 0) { perror("epoll_ctl ADD"); return false; }
        return true;
    }
    
    /**
     * 将 socket 从 epoll 监控中删除
     */
    bool Del(const TcpSocket& sock) const {
        int fd = sock.GetFd();
        printf("[Epoll::Del] fd = %d\n", fd);
        
        // Linux 2.6.9 之后,DEL 操作的第四个参数可以传 NULL
        int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
        if (ret < 0) { perror("epoll_ctl DEL"); return false; }
        return true;
    }
    
    /**
     * 等待就绪事件
     * 返回就绪的 TcpSocket 列表
     */
    bool Wait(std::vector<TcpSocket>* output) const {
        output->clear();
        
        struct epoll_event events[1000];
        // -1:永久阻塞,直到有事件就绪
        int n = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
        
        if (n < 0) { perror("epoll_wait"); return false; }
        
        // 注意!只遍历到 n,events[n..] 是无效数据
        for (int i = 0; i < n; ++i) {
            TcpSocket sock(events[i].data.fd);
            output->push_back(sock);
        }
        
        return true;
    }

private:
    int epoll_fd_;
};

/**
 * TcpEpollServer(LT 模式)
 */
class TcpEpollServer {
public:
    TcpEpollServer(const std::string& ip, uint16_t port)
        : ip_(ip), port_(port) {}

    bool Start(Handler handler) {
        TcpSocket listen_sock;
        if (!listen_sock.Socket()) return false;
        if (!listen_sock.Bind(ip_, port_)) return false;
        if (!listen_sock.Listen(5)) return false;
        
        printf("[LT Server] 启动,监听 %s:%d\n", ip_.c_str(), port_);

        Epoll epoll;
        epoll.Add(listen_sock);

        for (;;) {
            std::vector<TcpSocket> ready;
            if (!epoll.Wait(&ready)) continue;

            for (size_t i = 0; i < ready.size(); ++i) {
                if (ready[i].GetFd() == listen_sock.GetFd()) {
                    // 有新连接
                    TcpSocket new_sock;
                    std::string client_ip;
                    uint16_t client_port;
                    if (!listen_sock.Accept(&new_sock, &client_ip, &client_port)) continue;
                    
                    printf("[LT Server] 新连接:%s:%d fd=%d\n", 
                           client_ip.c_str(), client_port, new_sock.GetFd());
                    epoll.Add(new_sock);
                    
                } else {
                    // 有数据可读
                    std::string req, resp;
                    if (!ready[i].Recv(&req)) {
                        printf("[LT Server] 客户端断开 fd=%d\n", ready[i].GetFd());
                        epoll.Del(ready[i]);
                        ready[i].Close();
                        continue;
                    }
                    
                    printf("[LT Server] 收到请求:%s\n", req.c_str());
                    handler(req, &resp);
                    ready[i].Send(resp);
                }
            }
        }
        
        return true;
    }

private:
    std::string ip_;
    uint16_t port_;
};

六、ET 模式 epoll 服务器实现

6.1 在 LT 基础上的修改点

ET 模式相比 LT,需要修改两处:

  1. 注册时加上 EPOLLET 标志
  2. 读写都改为非阻塞循环
// epoll_et_server.hpp
#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>
#include <cstring>

// TcpSocket 的 ET 版本,新增非阻塞读写
class TcpSocketET : public TcpSocket {
public:
    TcpSocketET(int fd = -1) : TcpSocket(fd) {}
    
    /**
     * 将这个 socket 设置为非阻塞模式
     */
    bool SetNonBlock() {
        int fl = fcntl(GetFd(), F_GETFL);
        if (fl < 0) { perror("fcntl F_GETFL"); return false; }
        
        int ret = fcntl(GetFd(), F_SETFL, fl | O_NONBLOCK);
        if (ret < 0) { perror("fcntl F_SETFL"); return false; }
        
        return true;
    }
    
    /**
     * 非阻塞接收:循环读取直到 EAGAIN,确保读完所有数据
     * ET 模式必须这样写!
     */
    bool RecvNonBlock(std::string* buf) const {
        buf->clear();
        char tmp[1024 * 10] = {0};
        
        for (;;) {
            ssize_t n = recv(GetFd(), tmp, sizeof(tmp) - 1, 0);
            
            if (n > 0) {
                tmp[n] = '\0';
                *buf += tmp;
                // 如果读到的数据比缓冲区小,说明已经读完了
                if (n < (ssize_t)(sizeof(tmp) - 1)) break;
                // 否则可能还有数据,继续读
            } else if (n == 0) {
                // 对端关闭
                return false;
            } else {
                // n < 0
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    // 缓冲区已空,读完了,这是正常情况
                    break;
                } else if (errno == EINTR) {
                    continue;  // 被信号中断,重试
                } else {
                    perror("recv");
                    return false;
                }
            }
        }
        
        return true;
    }
    
    /**
     * 非阻塞发送:循环发送直到全部发完
     * 如果发送缓冲区满了,等待再发,为了演示简化这里直接 busy retry;生产环境应配合 EPOLLOUT + outbuffer。
     */
    bool SendNonBlock(const std::string& buf) const {
        ssize_t cur_pos = 0;
        ssize_t left = buf.size();
        
        for (;;) {
            ssize_t n = send(GetFd(), buf.data() + cur_pos, left, 0);
            
            if (n >= 0) {
                cur_pos += n;
                left -= n;
                if (left <= 0) break;  // 全部发完
            } else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    // 发送缓冲区满了,等一会儿再发
                    continue;
                } else if (errno == EINTR) {
                    continue;
                } else {
                    perror("send");
                    return false;
                }
            }
        }
        
        return true;
    }
};

/**
 * ET 模式的 Epoll 封装
 */
class EpollET {
public:
    EpollET() {
        epoll_fd_ = epoll_create(10);
        if (epoll_fd_ < 0) { perror("epoll_create"); exit(1); }
    }
    
    ~EpollET() { if (epoll_fd_ >= 0) close(epoll_fd_); }
    
    /**
     * 加入 epoll 监控
     * @param use_et 是否使用 ET 模式
     */
    bool Add(int fd, bool use_et = false) const {
        printf("[EpollET::Add] fd=%d, ET=%s\n", fd, use_et ? "yes" : "no");
        
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = EPOLLIN;
        if (use_et) {
            ev.events |= EPOLLET;  // 加上 ET 标志
        }
        
        int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
        if (ret < 0) { perror("epoll_ctl ADD"); return false; }
        return true;
    }
    
    bool Del(int fd) const {
        printf("[EpollET::Del] fd=%d\n", fd);
        int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
        if (ret < 0) { perror("epoll_ctl DEL"); return false; }
        return true;
    }
    
    /**
     * 等待就绪事件,返回就绪的 fd 列表
     */
    bool Wait(std::vector<int>* output) const {
        output->clear();
        
        struct epoll_event events[1000];
        int n = epoll_wait(epoll_fd_, events, 1000, -1);
        
        if (n < 0) { perror("epoll_wait"); return false; }
        
        for (int i = 0; i < n; ++i) {
            output->push_back(events[i].data.fd);
        }
        
        return true;
    }

private:
    int epoll_fd_;
};

/**
 * ET 模式的 TCP Epoll 服务器
 * 注意:listen_sock 用 LT 模式(简化实现),new_sock 用 ET 模式
 */
class TcpEpollServerET {
public:
    TcpEpollServerET(const std::string& ip, uint16_t port)
        : ip_(ip), port_(port) {}

    bool Start(std::function<void(const std::string&, std::string*)> handler) {
        // 1. 创建并初始化监听 socket
        listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
        int opt = 1;
        setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
        
        struct sockaddr_in addr = {};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port_);
        addr.sin_addr.s_addr = inet_addr(ip_.c_str());
        
        bind(listen_fd_, (struct sockaddr*)&addr, sizeof(addr));
        listen(listen_fd_, 5);
        
        printf("[ET Server] 启动,监听 %s:%d\n", ip_.c_str(), port_);
        
        // 2. 创建 Epoll,将 listen_sock 以 LT 模式加入
        //    注意:listen_sock 不用 ET,因为如果用 ET,大量连接同时到来时,
        //    只会触发一次,可能丢失连接
        EpollET epoll;
        epoll.Add(listen_fd_, false);  // listen_sock: LT 模式
        
        // 3. 事件循环
        for (;;) {
            std::vector<int> ready;
            if (!epoll.Wait(&ready)) continue;
            
            for (int fd : ready) {
                if (fd == listen_fd_) {
                    // 有新连接
                    struct sockaddr_in client = {};
                    socklen_t len = sizeof(client);
                    int new_fd = accept(listen_fd_, (struct sockaddr*)&client, &len);
                    
                    if (new_fd < 0) { perror("accept"); continue; }
                    
                    printf("[ET Server] 新连接 fd=%d, from %s:%d\n",
                           new_fd, inet_ntoa(client.sin_addr), ntohs(client.sin_port));
                    
                    // 关键:new_sock 设为非阻塞,以 ET 模式加入 epoll
                    TcpSocketET new_sock(new_fd);
                    new_sock.SetNonBlock();           // 必须设为非阻塞!!
                    epoll.Add(new_fd, true);          // ET 模式加入 epoll
                    
                } else {
                    // 有数据可读(ET 模式,必须非阻塞循环读完)
                    TcpSocketET client_sock(fd);
                    std::string req, resp;
                    
                    if (!client_sock.RecvNonBlock(&req)) {
                        // 读取失败或对端关闭
                        printf("[ET Server] 客户端断开 fd=%d\n", fd);
                        epoll.Del(fd);
                        close(fd);
                        continue;
                    }
                    
                    printf("[ET Server] fd=%d 收到:%s\n", fd, req.c_str());
                    
                    handler(req, &resp);
                    client_sock.SendNonBlock(resp);
                    
                    printf("[ET Server] fd=%d 响应:%s\n", fd, resp.c_str());
                }
            }
        }
        
        return true;
    }

private:
    std::string ip_;
    uint16_t port_;
    int listen_fd_;
};

七、选择 LT 还是 ET?

7.1 决策树

需要用 epoll?
  ↓
代码复杂度优先(简单优先)?
  → LT 模式(更容易写对)

性能优先,且连接非常活跃(高频短消息)?
  → ET 模式(减少 epoll_wait 触发次数)

参考主流框架的选择?
  Nginx → ET 模式
  Redis → LT 模式(性能够用,代码简单)
  Node.js → LT 模式

7.2 ET 的潜在陷阱

陷阱 1:忘记设置非阻塞

// ❌ ET 模式下如果还是阻塞 fd:
// - 你只 read 一次可能读不空 → 剩余数据滞留,但 ET 不再通知
// - 如果你写循环 read 想“读空”,读到缓冲区空那一下会阻塞等待新数据 → 线程被卡住
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);  // ev.events = EPOLLIN | EPOLLET
ssize_t n = read(fd, buf, sizeof(buf));   

陷阱 2:ET 下 listen_sock 也设置了 ET

// ❌ listen_sock 设置 ET,大量连接同时到来时:
// - ET 只通知一次
// - 你只 accept 一次
// - 剩余连接永远不被处理
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

// ✅ 正确:listen_sock 用 LT,或者 ET 下用非阻塞循环 accept
// 方式1:listen_sock 不加 EPOLLET(保持 LT)
ev.events = EPOLLIN;  // 不加 EPOLLET

// 方式2:ET 下非阻塞循环 accept
while (true) {
    int new_fd = accept(listen_fd, ...);
    if (new_fd < 0) {
        if (errno == EAGAIN) break;  // 没有更多连接了
        break;
    }
    // 处理新连接...
}

陷阱 3:ET 写就绪的处理

// ET 下,发送缓冲区变成"可写"时只通知一次
// 如果没发完,不会再次通知
// 需要在发送未完成时,注册 EPOLLOUT 事件
// 发完后,关闭 EPOLLOUT 监控(避免频繁触发)

// 如果 outbuffer 没发完:
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);  // 修改,加上 EPOLLOUT

// 等 EPOLLOUT 触发,继续发剩余数据
// 发完后取消 EPOLLOUT:
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

八、epoll 的使用场景与性能分析

8.1 最适合 epoll 的场景

epoll 高性能的前提:
  连接数多 且 活跃连接比例低

比如:
  Web 服务器:10000 个连接,每次只有 100 个活跃
  IM 服务器:百万连接,大部分时间只是保持心跳
  这种场景下,epoll 完虐 select/poll

不适合 epoll 的场景:
  连接数少(比如只有 10 个服务端)
  几乎所有连接都很活跃(比如内部 RPC)
  这种场景下,select/poll 和 epoll 差距不大

九、关于"epoll 使用内存映射"的常见误解

9.1 网上的错误说法

很多博客说:

“epoll 使用 mmap 内存映射,内核直接将就绪队列映射到用户态,避免了内存拷贝。”

这个说法是不准确的!

9.2 正确的理解

我们调用 epoll_wait 时传入的 struct epoll_event events[] 数组是在用户空间分配的内存

struct epoll_event events[1000];  // 用户空间
int n = epoll_wait(epfd, events, 1000, -1);
// 内核把就绪事件从内核空间拷贝到 events 数组(用户空间)

内核确实要拷贝数据到用户空间,并没有用 mmap 绕过拷贝。

epoll 的效率来自于:

  1. 只拷贝就绪的 fd 而不是所有 fd(O(就绪数) vs O(总fd数))
  2. 回调机制,不需要轮询

十、epoll 惊群问题(面试加分项)

10.1 什么是惊群问题?

多进程/多线程服务器,多个进程/线程共享同一个 epoll fd,同时等待:

进程1:epoll_wait(epfd, ...)
进程2:epoll_wait(epfd, ...)
进程3:epoll_wait(epfd, ...)

一个连接来了:
  → 内核唤醒所有进程/线程
  → 只有一个进程/线程能 accept 成功
  → 其他进程白白被唤醒,浪费资源

这就是"惊群"——一个事件惊动了所有在睡觉的进程。

10.2 解决方案

// 方案1:EPOLLEXCLUSIVE(Linux 4.5+)
// 使用排他性唤醒,只唤醒一个进程
ev.events = EPOLLIN | EPOLLEXCLUSIVE;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

// 方案2:EPOLLONESHOT
// 每个 fd 只触发一次,处理完后需要重新注册
ev.events = EPOLLIN | EPOLLONESHOT;

// 方案3:Nginx 的解决方案
// 用互斥锁控制哪个 worker 持有 accept 权限
// 同一时刻只有一个 worker 的 epoll 监控 listen_fd

十一、select / poll / epoll 终极对比

比较维度 select poll epoll
接口 三个 fd_set,使用麻烦 pollfd 数组,比 select 好用 三个系统调用,功能分离,最清晰
fd 上限 1024(FD_SETSIZE) 无限制(受系统限制) 无限制(受系统限制)
内存拷贝 每次全量拷贝 fd_set 每次全量拷贝 pollfd 数组 只在 epoll_ctl 时拷贝,不频繁
查找就绪 O(n) 遍历 O(n) 遍历 O(k),就绪链表直接取
工作模式 LT only LT only LT + ET
适用连接数 < 100 < 1000 万级以上
跨平台 全平台 类 Unix Linux 专属
典型应用 教学用 中等规模 Nginx / Redis / Node.js

十二、总结

12.1 核心要点

# 要点 关键细节
1 三个系统调用 create(创建)→ ctl(注册/修改/删除)→ wait(等待)
2 红黑树 + 就绪链表 红黑树管理所有 fd,就绪链表存放就绪 fd
3 回调机制 数据到来时驱动回调,fd 自动进就绪链表
4 LT vs ET LT 重复通知,ET 一次通知;ET 必须非阻塞
5 epoll_wait 遍历到 n 只有 [0, n) 是有效的就绪事件,不要多遍历

12.2 面试答题模板

Q:说说 epoll 和 select 的区别?

select 的三个缺陷:① fd 数量上限 1024;② 每次调用全量拷贝 fd_set 到内核;③ 返回后需要 O(n) 遍历找就绪 fd。

epoll 的解决方案:① 内核红黑树维护所有注册 fd,无数量限制;② epoll_ctl 只在注册时拷贝,不频繁;③ 驱动回调机制,就绪 fd 自动进就绪链表,epoll_wait 直接取,O(k)。

同时 epoll 支持 LT 和 ET 两种工作模式,ET 模式下需配合非阻塞 IO 使用,性能更高。


💬 总结:epoll 是 Linux IO 多路复用的顶点,红黑树 + 就绪链表 + 回调机制的组合彻底解决了 select/poll 的性能瓶颈。LT 和 ET 两种工作模式各有适用场景,ET 虽然高效,但对代码要求更高(必须非阻塞循环读写)。理解了 epoll 的工作原理,你就真正理解了为什么 Nginx 能支撑百万并发。下一篇,我们在 epoll 之上搭建 Reactor 反应堆模式——这是工程级高性能服务器的架构基础,把 epoll 的能力发挥到极致。

👍 点赞、收藏与分享:epoll 是后端和系统开发的必备知识。搞懂了这篇,下一篇 Reactor 会顺理成章!💪🚀

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐