1. epoll 基础

1.1 核心 API

1.1.1 创建 epoll 树 (epoll_create / epoll_create1)

   // 1. 创建epoll树节点epfd
    /* 方法1:
    * @function: int epoll_create(int size);
    * @param: size: 内核监听的最大数量, 只要大于0默认1024
    * @return: 成功返回epoll模型的文件描述符, -1 失败
    */
    int m_epfd = epoll_create(1024);
    if(m_epfd < 0){
		LOG_ERROR("epoll_create() error: %s", strerror(errno));
        std::exit(-1);
    }

    /* 方法2:
    * @function: int epoll_create1(int flags);
    * @param: flags: 标志位:
    *              - 0:默认行为,等同于 epoll_create()
    *              - EPOLL_CLOEXEC:子进程执行exec时关闭epoll模型的文件描述符
    * @return: 成功返回epoll模型的文件描述符, -1 失败
    */
    int epoll_create1(int flags);
  • 功能:在内核中创建一棵红黑树,用于存储需要监控的文件描述符。
  • 函数对比
    • oll_create1(int flags)内核监听数量(现内核忽略此值,只要>0 即可),返回 epoll 文件描述符 (epfd)。
    • epoll_create1(int flags):新版本接口,推荐使用。
      • flags = 0:等同于 epoll_create。
      • flags = EPOLL_CLOEXEC:安全推荐。当进程执行 exec 加载新程序时,自动关闭该 fd,防止 fd 泄露给子进程。
  • 返回值:成功返回 epfd,失败返回 -1。

1.1.2 管理 epoll 节点 (epoll_ctl)

 // 2. 向epoll树上添加节点 lfd
    /*
    * @function: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    * @param: epfd: epoll树的文件描述符
    * @param: op: 操作类型(EPOLL_CTL_ADD:添加、EPOLL_CTL_DEL:删除、EPOLL_CTL_MOD:修改)
    * @param: fd: 要操作的文件描述符(要添加/删除/修改的节点)
    * @param: event: 事件结构体指针(指向struct epoll_event结构体), 设置fd对应的事件
    * @return: 0 成功, -1 失败
    */
    /*
    *   struct epoll_event {
    *       uint32_t events; 
    *       epoll_data_t data;
    *   };
    *       events:
    *           EPOLLIN: 读事件
    *           EPOLLOUT: 写事件
    *
    *       【触发模式说明】
    *       epoll 默认使用 **水平触发(Level-Triggered, LT)** 模式。
    *       - LT 模式:只要文件描述符上有未处理完的事件(例如读缓冲区还有数据),epoll_wait 就会持续返回该 fd。
    *                  编程简单,但可能因频繁唤醒造成性能开销。
    *
    *       若显式指定 **EPOLLET** 标志,则启用 **边沿触发(Edge-Triggered, ET)** 模式。
    *       - ET 模式:仅在 fd 状态发生变化时(如从无数据到有数据)通知一次。
    *                  要求应用程序必须一次性读/写完所有数据(通常配合非阻塞 I/O + 循环 read/write),
    *                  否则可能丢失事件(因为后续 epoll_wait 不会再次提醒,直到下一次状态变化)。
    *                  性能更高,但编程复杂度增加。
    *
    *       【使用建议】
    *       - 监听套接字(lfd)通常使用 LT 模式(默认),因为 accept 一般能立即处理完新连接。
    *       - 客户端连接套接字(cfd)常使用 ET + 非阻塞 I/O,以实现高性能事件驱动(如 muduo、Nginx 等)。
    *
    *       typedef union epoll_data {
    *           void *ptr;  // 当 epoll_wait返回事件时,可以通过 ptr直接获取到与该文件描述符(fd)关联的​​用户自定义对象​​(如 muduo 中的 Channel对象)
    *                       // 如果不使用 ptr,通常需要维护一个 fd → 对象的映射表来查找关联对象。ptr可以直接存储对象指针,省去查找开销。
    *           int fd;     // 常用
    *           uint32_t u32;
    *           uint64_t u64;
    *       } epoll_data_t; // 是个联合体,只能使用其中的一个成员,常用fd
    */
    int lfd = m_listener -> getfd(); //监听套接字
    struct epoll_event event;
    event.events = EPOLLIN;		     // 监听读事件(默认为 LT 模式)
    event.data.fd = lfd; 		     // 要监听的文件描述符
    int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event); // 将监听套接字添加到epoll树上
    if(ret < 0){
		LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
        std::exit(-1); 
    }
  • 功能:对 epoll 树进行增、删、改操作。
  • 参数详解
    • op:操作类型。
      • EPOLL_CTL_ADD:添加新 fd。
      • EPOLL_CTL_DEL:删除 fd。
      • EPOLL_CTL_MOD:修改 fd 的监听事件。
    • event:struct epoll_event 结构体,核心配置。
      • events:监听的事件类型(如 EPOLLIN 读就绪)。
      • data:联合体 (epoll_data_t),用于存储用户数据。
        • data.fd:最常用,存储 fd 本身。
        • data.ptr:高级用法。存储自定义对象指针(如 TcpConn*)。epoll_wait 返回时可直接获取对象,避免通过 fd 查表的开销(O(1) vs O(logN))。

1.1.3  等待事件 (epoll_wait)

 // 3. 检测
    struct epoll_event evs[1024];
    while(1){
        /*
        * @function: int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
        * @param: epfd: epoll模型的文件描述符
        * @param: evs: 传出参数, 事件结构体数组(指向struct epoll_event结构体)
        * @param: maxevents: 数组的大小
        * @param: timeout: 超时时间(-1:阻塞、0:非阻塞、>0:毫秒)
        * @return: 成功返回就绪的文件描述符数量, -1 失败
        */
        int num = epoll_wait(epfd, evs, sizeof(evs) / sizeof(evs[0]), -1);
        if(num < 0){
            LOG_ERROR("epoll_wait() error: %s", strerror(errno));
            std::exit(-1);
        }
        for(int i = 0; i < num; i++){
            int curfd = evs[i].data.fd;
            if(curfd == lfd){
                // 有新连接
				// 1. 接受新连接
				TcpConn* tcpConn = m_listener->acceptConn();
                if(!tcpConn){
                    // 建立连接失败, 重连
                    LOG_ERROR("acceptConn() error.");
                    continue;
                }
				LOG_INFO("检测到新的连接: %s:%d", tcpConn->getPeerIP().data(), tcpConn->getPeerPort());
				int cfd = tcpConn->getfd();

				// 2.设置 cfd 为非阻塞
				int flag = fcntl(cfd, F_GETFL);
				flag |= O_NONBLOCK;
				fcntl(cfd, F_SETFL, flag);

				// 3. 加入epoll树(边沿模式)
				struct epoll_event ev;
				ev.events = EPOLLIN | EPOLLET;  // 启用边沿触发(ET)模式
				ev.data.fd = cfd;
				int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
				if(ret < 0){
					LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
					std::exit(-1);
				}
				LOG_INFO("新连接已成功加入 epoll 树(ET 模式)");
            }else{
                // 有通信
                LOG_INFO("检测到通信");

                // 【重要】在 ET 模式下,必须循环读取直到返回 EAGAIN/EWOULDBLOCK,
                // 否则剩余数据将不会再次触发 epoll_wait!
                // 示例(此处仅为示意,实际需在对应处理函数中实现):
                // while (true) {
                //     ssize_t n = read(curfd, buf, sizeof(buf));
                //     if (n > 0) { /* 处理数据 */ }
                //     else if (n == 0) { /* 对端关闭 */ break; }
                //     else if (errno == EAGAIN || errno == EWOULDBLOCK) {
                //         break; // 数据已读完
                //     } else {
                //         /* 错误处理 */
                //         break;
                //     }
                // }
            }
        }
    }
  • 功能:阻塞等待,直到有注册的 fd 发生事件。
  • 参数
    • evs:输出参数,内核将就绪的事件填充到此数组。
    • maxevents:数组大小,即一次最多获取多少个事件。
    • timeout:
      • -1:永久阻塞(最常用)。
      • 0:非阻塞,立即返回。
      • >0:超时毫秒数。
  • 返回值:就绪的 fd 数量。

1.1 epoll 使用流程

    // 1. 创建epoll树节点epfd
    /* 方法1:
    * @function: int epoll_create(int size);
    * @param: size: 内核监听的最大数量, 只要大于0默认1024
    * @return: 成功返回epoll模型的文件描述符, -1 失败
    */
    int m_epfd = epoll_create(1024);
    if(m_epfd < 0){
		LOG_ERROR("epoll_create() error: %s", strerror(errno));
        std::exit(-1);
    }

    /* 方法2:
    * @function: int epoll_create1(int flags);
    * @param: flags: 标志位:
    *              - 0:默认行为,等同于 epoll_create()
    *              - EPOLL_CLOEXEC:子进程执行exec时关闭epoll模型的文件描述符
    * @return: 成功返回epoll模型的文件描述符, -1 失败
    */
    int epoll_create1(int flags);


    // 2. 向epoll树上添加节点 lfd
    /*
    * @function: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    * @param: epfd: epoll树的文件描述符
    * @param: op: 操作类型(EPOLL_CTL_ADD:添加、EPOLL_CTL_DEL:删除、EPOLL_CTL_MOD:修改)
    * @param: fd: 要操作的文件描述符(要添加/删除/修改的节点)
    * @param: event: 事件结构体指针(指向struct epoll_event结构体), 设置fd对应的事件
    * @return: 0 成功, -1 失败
    */
    /*
    *   struct epoll_event {
    *       uint32_t events; 
    *       epoll_data_t data;
    *   };
    *       events:
    *           EPOLLIN: 读事件
    *           EPOLLOUT: 写事件
    *
    *       【触发模式说明】
    *       epoll 默认使用 **水平触发(Level-Triggered, LT)** 模式。
    *       - LT 模式:只要文件描述符上有未处理完的事件(例如读缓冲区还有数据),epoll_wait 就会持续返回该 fd。
    *                  编程简单,但可能因频繁唤醒造成性能开销。
    *
    *       若显式指定 **EPOLLET** 标志,则启用 **边沿触发(Edge-Triggered, ET)** 模式。
    *       - ET 模式:仅在 fd 状态发生变化时(如从无数据到有数据)通知一次。
    *                  要求应用程序必须一次性读/写完所有数据(通常配合非阻塞 I/O + 循环 read/write),
    *                  否则可能丢失事件(因为后续 epoll_wait 不会再次提醒,直到下一次状态变化)。
    *                  性能更高,但编程复杂度增加。
    *
    *       【使用建议】
    *       - 监听套接字(lfd)通常使用 LT 模式(默认),因为 accept 一般能立即处理完新连接。
    *       - 客户端连接套接字(cfd)常使用 ET + 非阻塞 I/O,以实现高性能事件驱动(如 muduo、Nginx 等)。
    *
    *       typedef union epoll_data {
    *           void *ptr;  // 当 epoll_wait返回事件时,可以通过 ptr直接获取到与该文件描述符(fd)关联的​​用户自定义对象​​(如 muduo 中的 Channel对象)
    *                       // 如果不使用 ptr,通常需要维护一个 fd → 对象的映射表来查找关联对象。ptr可以直接存储对象指针,省去查找开销。
    *           int fd;     // 常用
    *           uint32_t u32;
    *           uint64_t u64;
    *       } epoll_data_t; // 是个联合体,只能使用其中的一个成员,常用fd
    */
    int lfd = m_listener -> getfd(); //监听套接字
    struct epoll_event event;
    event.events = EPOLLIN;		     // 监听读事件(默认为 LT 模式)
    event.data.fd = lfd; 		     // 要监听的文件描述符
    int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event); // 将监听套接字添加到epoll模型中
    if(ret < 0){
		LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
        std::exit(-1); 
    }

    // 3. 检测
    struct epoll_event evs[1024];
    while(1){
        /*
        * @function: int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
        * @param: epfd: epoll模型的文件描述符
        * @param: evs: 传出参数, 事件结构体数组(指向struct epoll_event结构体)
        * @param: maxevents: 数组的大小
        * @param: timeout: 超时时间(-1:阻塞、0:非阻塞、>0:毫秒)
        * @return: 成功返回就绪的文件描述符数量, -1 失败
        */
        int num = epoll_wait(epfd, evs, sizeof(evs) / sizeof(evs[0]), -1);
        if(num < 0){
            LOG_ERROR("epoll_wait() error: %s", strerror(errno));
            std::exit(-1);
        }
        for(int i = 0; i < num; i++){
            int curfd = evs[i].data.fd;
            if(curfd == lfd){
                // 有新连接
				// 1. 接受新连接
				TcpConn* tcpConn = m_listener->acceptConn();
                if(!tcpConn){
                    // 建立连接失败, 重连
                    LOG_ERROR("acceptConn() error.");
                    continue;
                }
				LOG_INFO("检测到新的连接: %s:%d", tcpConn->getPeerIP().data(), tcpConn->getPeerPort());
				int cfd = tcpConn->getfd();

				// 2.设置 cfd 为非阻塞
				int flag = fcntl(cfd, F_GETFL);
				flag |= O_NONBLOCK;
				fcntl(cfd, F_SETFL, flag);

				// 3. 加入epoll树(边沿模式)
				struct epoll_event ev;
				ev.events = EPOLLIN | EPOLLET;  // 启用边沿触发(ET)模式
				ev.data.fd = cfd;
				int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
				if(ret < 0){
					LOG_ERROR("epoll_ctl() error: %s", strerror(errno));
					std::exit(-1);
				}
				LOG_INFO("新连接已成功加入 epoll 树(ET 模式)");
            }else{
                // 有通信
                LOG_INFO("检测到通信");

                // 【重要】在 ET 模式下,必须循环读取直到返回 EAGAIN/EWOULDBLOCK,
                // 否则剩余数据将不会再次触发 epoll_wait!
                // 示例(此处仅为示意,实际需在对应处理函数中实现):
                // while (true) {
                //     ssize_t n = read(curfd, buf, sizeof(buf));
                //     if (n > 0) { /* 处理数据 */ }
                //     else if (n == 0) { /* 对端关闭 */ break; }
                //     else if (errno == EAGAIN || errno == EWOULDBLOCK) {
                //         break; // 数据已读完
                //     } else {
                //         /* 错误处理 */
                //         break;
                //     }
                // }
            }
        }
    }

1.2 LT 模式 vs ET 模式

特性

水平触发 (LT, Level-Triggered)

边沿触发 (ET, Edge-Triggered)

默认情况

epoll 默认模式

需显式添加 标志

触发条件

只要缓冲区有数据,每次 wait 都会通知

仅在状态变化时通知一次(如从无数据到有数据)

编程难度

低(读不完下次还会提醒)

高(必须一次性读完,否则数据会滞留)

性能

较高,但频繁唤醒可能有开销

最高,减少系统调用次数

代码中的应用

监听套接字 (lfd)

连接套接字 (cfd)

原因

accept 处理快,LT 足够且稳定

客户端数据量大,ET 减少唤醒,提升吞吐量

2. 高并发编程 

2.1 架构

基于 Linux Epoll + ET (Edge-Triggered) 模式 + 非阻塞 IO 构建,采用 Reactor 设计模式

  • 高并发:单线程即可处理成千上万个连接。
  • 高性能:利用 ET 模式减少内核态与用户态的切换次数。
  • 稳定性:完善的缓冲区管理,处理 TCP 粘包/拆包及背压(Backpressure)。

2.2 核心模块

2.2.1 应用层缓冲区 (Buffer 类)

作用:管理用户态内存缓冲区,解决 TCP 粘包/拆包问题,配合非阻塞 IO 处理 EAGAIN 错误。

// ================= 1. 应用层缓冲区 (Buffer) =================
// 用于解决 TCP 粘包/拆包,以及暂存待发送数据
class Buffer {
public:
    Buffer() : readPos_(0), writePos_(0) {
        buffer_.resize(BUFFER_SIZE);
    }

    // 获取可读字节数
    size_t readableBytes() const { return writePos_ - readPos_; }
    
    // 获取可写字节数
    size_t writableBytes() const { return buffer_.size() - writePos_; }

    // 获取读指针
    const char* peek() const { return begin() + readPos_; }

    // 读取数据后移动指针
    void retrieve(size_t len) { readPos_ += len; }
    void retrieveAll() { readPos_ = 0; writePos_ = 0; }

    // 写入数据
    void append(const char* data, size_t len) {
        if (writableBytes() < len) {
            // 简单扩容策略
            buffer_.resize(writePos_ + len);
        }
        std::copy(data, data + len, begin() + writePos_);
        writePos_ += len;
    }

    // 从 fd 读取数据到 buffer (核心:ET 模式必须循环读)
    ssize_t readFd(int fd, int* saveErrno) {
        char buff[BUFFER_SIZE];
        ssize_t len = -1;
        do {
            len = read(fd, buff, sizeof(buff));
            if (len < 0) {
                *saveErrno = errno;
                // EAGAIN: 数据读完 (非阻塞下); EINTR: 被信号中断
                if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
                    len = 0; 
                }
                break;
            } else if (len == 0) {
                // 对端关闭
                break;
            } else {
                append(buff, len);
            }
        } while (len > 0); // ET 模式关键:直到 read 返回 <0 或 0
        return len;
    }

    // 从 buffer 发送数据到 fd
    ssize_t writeFd(int fd, int* saveErrno) {
        ssize_t len = -1;
        do {
            size_t readSize = readableBytes();
            if (readSize == 0) break;
            
            len = write(fd, peek(), readSize);
            if (len < 0) {
                *saveErrno = errno;
                if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
                    len = 0; // 暂时发不动,下次再发
                }
                break;
            }
            retrieve(len); // 发送成功,移动读指针
        } while (len > 0); // 尝试发送直到缓冲区空或 EAGAIN
        return len;
    }

private:
    std::vector<char> buffer_;
    size_t readPos_;
    size_t writePos_;

    char* begin() { return &*buffer_.begin(); }
    const char* begin() const { return &*buffer_.begin(); }
};

成员/方法

可见性

功能说明

buffer_

private

底层数据存储容器,动态扩容。

readPos_

private

读指针,指向当前未处理数据的起始位置。

writePos_

private

写指针,指向当前可用空间的起始位置。

readableBytes()

public

返回当前缓冲区中可读的数据长度 (writePos_ - readPos_)。

writableBytes()

public

返回当前缓冲区剩余可写空间长度。

peek()

public

返回读指针位置的常量指针,用于读取数据但不移动指针。

retrieve(len)

public

移动读指针,表示前 len 字节数据已处理完毕。

retrieveAll()

public

重置读写指针为 0,清空缓冲区。

append(data, len)

public

将数据写入缓冲区末尾,自动扩容空间不足时。

readFd(fd, errno)

public

核心方法。循环从 fd 读取数据到缓冲区。

必须循环 read,直到返回 -1 且 errno == EAGAIN,确保取空内核缓冲区。

writeFd(fd, errno)

public

核心方法。循环将缓冲区数据写入 fd。

必须循环 write,若遇到 EAGAIN 则停止,保留剩余数据待下次发送。

begin()

private

返回 vector 底层数组首地址,供内部使用。

2.2.2 连接对象 (Connection 类)

作用:封装单个客户端连接的状态、文件描述符及对应的收发缓冲区。

// ================= 2. 连接对象 (Connection) =================
class Connection {
public:
    Connection(int fd) : fd_(fd), isClosed_(false) {}
    ~Connection() {
        if (fd_ > 0) close(fd_);
    }

    int getFd() const { return fd_; }
    bool isClosed() const { return isClosed_; }
    void setClosed() { isClosed_ = true; }

    Buffer& inputBuffer() { return inBuf_; }
    Buffer& outputBuffer() { return outBuf_; }

    // 处理业务逻辑 (示例:Echo Server)
    void handleRequest() {
        // 简单 Echo 逻辑:收到什么发回什么
        size_t n = inBuf_.readableBytes();
        if (n > 0) {
            outBuf_.append(inBuf_.peek(), n);
            inBuf_.retrieveAll();
        }
    }

private:
    int fd_;
    bool isClosed_;
    Buffer inBuf_;   // 接收缓冲区
    Buffer outBuf_;  // 发送缓冲区
};

2.2.3 服务器核心 (EpollServer 类)

作用:实现 Reactor 模式,管理事件循环、连接生命周期及事件分发。

// ================= 3. Epoll 服务器核心 (EpollServer) =================
class EpollServer {
public:
    EpollServer() : epfd_(-1), listenfd_(-1) {}
    ~EpollServer() {
        if (epfd_ >= 0) close(epfd_);
        if (listenfd_ >= 0) close(listenfd_);
    }

    bool init(uint16_t port) {
        // 1. 创建监听 socket
        listenfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (listenfd_ < 0) { LOG_ERROR("socket create error"); return false; }

        // 2. 地址复用
        int opt = 1;
        setsockopt(listenfd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

        // 3. 绑定
        struct sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        if (bind(listenfd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
            LOG_ERROR("bind error"); return false;
        }

        // 4. 监听
        if (listen(listenfd_, 1024) < 0) {
            LOG_ERROR("listen error"); return false;
        }

        // 5. 设置监听 fd 为非阻塞 (ET 模式必须)
        setNonBlocking(listenfd_);

        // 6. 创建 epoll
        epfd_ = epoll_create1(0);
        if (epfd_ < 0) { LOG_ERROR("epoll_create error"); return false; }

        // 7. 将监听 fd 加入 epoll (监听 fd 通常用 LT 或 ET 均可,这里统一用 ET)
        addFd(listenfd_, EPOLLIN | EPOLLET);
        
        LOG_INFO("Server started on port %d", port);
        return true;
    }

    void start() {
        struct epoll_event events[MAX_EVENTS];
        while (true) {
            // 8. 等待事件
            int nfds = epoll_wait(epfd_, events, MAX_EVENTS, -1);
            if (nfds < 0) {
                if (errno == EINTR) continue; // 被信号中断,继续
                LOG_ERROR("epoll_wait error");
                break;
            }

            for (int i = 0; i < nfds; ++i) {
                int curFd = events[i].data.fd;
                uint32_t revents = events[i].events;

                if (curFd == listenfd_) {
                    handleAccept();
                } else if (revents & EPOLLIN) {
                    handleRead(curFd);
                } else if (revents & EPOLLOUT) {
                    handleWrite(curFd);
                } else if (revents & (EPOLLHUP | EPOLLERR)) {
                    closeConnection(curFd);
                }
            }
        }
    }

private:
    // 设置非阻塞
    void setNonBlocking(int fd) {
        int flag = fcntl(fd, F_GETFL);
        fcntl(fd, F_SETFL, flag | O_NONBLOCK);
    }

    // 添加/修改 epoll 事件
    void modFd(int fd, uint32_t events) {
        struct epoll_event ev{};
        ev.events = events;
        ev.data.fd = fd;
        epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev);
    }

    void addFd(int fd, uint32_t events) {
        struct epoll_event ev{};
        ev.events = events;
        ev.data.fd = fd;
        epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
    }

    void delFd(int fd) {
        epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);
    }

    // 处理新连接
    void handleAccept() {
        struct sockaddr_in clientAddr{};
        socklen_t len = sizeof(clientAddr);
        // ET 模式 accept 也必须循环,直到返回 EAGAIN
        while (true) {
            int connFd = accept(listenfd_, (struct sockaddr*)&clientAddr, &len);
            if (connFd < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break; // 连接处理完毕
                }
                LOG_ERROR("accept error");
                break;
            }
            // 设置新连接为非阻塞
            setNonBlocking(connFd);
            // 加入 epoll,监听读事件 + ET 模式
            addFd(connFd, EPOLLIN | EPOLLET);
            // 创建连接对象
            connections_[connFd] = std::make_unique<Connection>(connFd);
            LOG_INFO("New connection: fd=%d, IP=%s", connFd, inet_ntoa(clientAddr.sin_addr));
        }
    }

    // 处理读事件 (ET 模式核心)
    void handleRead(int fd) {
        auto it = connections_.find(fd);
        if (it == connections_.end()) return;
        
        Connection* conn = it->second.get();
        int saveErrno = 0;
        // 循环读取,直到 EAGAIN
        ssize_t n = conn->inputBuffer().readFd(fd, &saveErrno);
        
        if (n < 0) {
            // 真正错误
            closeConnection(fd);
        } else if (n == 0) {
            // 对端关闭
            closeConnection(fd);
        } else {
            // 有数据,处理业务
            conn->handleRequest();
            // 如果有数据要回写,且当前未监听 OUT 事件,则修改监听
            if (conn->outputBuffer().readableBytes() > 0) {
                // 注意:ET 模式下,如果 write 可能阻塞,必须监听 EPOLLOUT
                modFd(fd, EPOLLIN | EPOLLOUT | EPOLLET);
                handleWrite(fd); // 尝试立即发送
            }
        }
    }

    // 处理写事件
    void handleWrite(int fd) {
        auto it = connections_.find(fd);
        if (it == connections_.end()) return;

        Connection* conn = it->second.get();
        int saveErrno = 0;
        ssize_t n = conn->outputBuffer().writeFd(fd, &saveErrno);

        if (n < 0) {
            closeConnection(fd);
        } else {
            // 如果发送缓冲区空了,不再监听写事件,只监听读事件 (减少唤醒)
            if (conn->outputBuffer().readableBytes() == 0) {
                modFd(fd, EPOLLIN | EPOLLET);
            }
        }
    }

    // 关闭连接
    void closeConnection(int fd) {
        auto it = connections_.find(fd);
        if (it != connections_.end()) {
            LOG_INFO("Close connection: fd=%d", fd);
            delFd(fd);
            connections_.erase(it); // unique_ptr 自动析构,关闭 fd
        }
    }

private:
    int epfd_;
    int listenfd_;
    std::unordered_map<int, std::unique_ptr<Connection>> connections_;
};

成员/方法

可见性

功能说明

ET 模式关键点

epfd_

private

epoll 实例的文件描述符。

内核事件表句柄。

listenfd_

private

监听套接字文件描述符。

用于 accept 新连接。

connections_

private

管理所有活跃连接对象 (fd -> Connection*)。

确保连接对象生命周期与 fd 一致。

init(port)

public

创建 socket,绑定端口,监听,创建 epoll,注册监听 fd。

监听 fd 也设为 非阻塞 + ET

start()

public

主事件循环。调用 epoll_wait 并分发事件。

阻塞等待事件,唤醒后遍历就绪数组。

setNonBlocking(fd)

private

利用 fcntl 设置 fd 为 O_NONBLOCK

ET 模式必须配合非阻塞 IO

addFd(fd, events)

private

封装 epoll_ctl(ADD),添加监听事件。

新连接默认只监听 EPOLLIN | EPOLLET

modFd(fd, events)

private

封装 epoll_ctl(MOD),修改监听事件。

动态切换 EPOLLOUT,避免空转。

delFd(fd)

private

封装 epoll_ctl(DEL),从 epoll 移除 fd。

关闭连接前必须移除。

handleAccept()

private

处理新连接事件。循环 accept 直到 EAGAIN

必须循环 accept,防止连接积压。

handleRead(fd)

private

处理读事件。调用 Buffer::readFd 读取数据。

读取后若需回复,注册 EPOLLOUT 并尝试发送。

handleWrite(fd)

private

处理写事件。调用 Buffer::writeFd 发送数据。

发送完毕后,注销 EPOLLOUT,只监听读。

closeConnection(fd)

private

关闭连接。从 epoll 移除,从 map 删除,关闭 fd。

确保资源彻底释放,防止野指针。

2.2 类图

2.3 源码

#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <memory>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>

// ================= 配置常量 =================
const int MAX_EVENTS = 1024;
const int LISTEN_PORT = 9999;
const int BUFFER_SIZE = 4096;

// ================= 日志宏 =================
#define LOG_INFO(fmt, ...) printf("[INFO] " fmt "\n", ##__VA_ARGS__)
#define LOG_ERROR(fmt, ...) printf("[ERROR] " fmt "\n", ##__VA_ARGS__)

// ================= 1. 应用层缓冲区 (Buffer) =================
// 用于解决 TCP 粘包/拆包,以及暂存待发送数据
class Buffer {
public:
    Buffer() : readPos_(0), writePos_(0) {
        buffer_.resize(BUFFER_SIZE);
    }

    // 获取可读字节数
    size_t readableBytes() const { return writePos_ - readPos_; }
    
    // 获取可写字节数
    size_t writableBytes() const { return buffer_.size() - writePos_; }

    // 获取读指针
    const char* peek() const { return begin() + readPos_; }

    // 读取数据后移动指针
    void retrieve(size_t len) { readPos_ += len; }
    void retrieveAll() { readPos_ = 0; writePos_ = 0; }

    // 写入数据
    void append(const char* data, size_t len) {
        if (writableBytes() < len) {
            // 简单扩容策略
            buffer_.resize(writePos_ + len);
        }
        std::copy(data, data + len, begin() + writePos_);
        writePos_ += len;
    }

    // 从 fd 读取数据到 buffer (核心:ET 模式必须循环读)
    ssize_t readFd(int fd, int* saveErrno) {
        char buff[BUFFER_SIZE];
        ssize_t len = -1;
        do {
            len = read(fd, buff, sizeof(buff));
            if (len < 0) {
                *saveErrno = errno;
                // EAGAIN: 数据读完 (非阻塞下); EINTR: 被信号中断
                if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
                    len = 0; 
                }
                break;
            } else if (len == 0) {
                // 对端关闭
                break;
            } else {
                append(buff, len);
            }
        } while (len > 0); // ET 模式关键:直到 read 返回 <0 或 0
        return len;
    }

    // 从 buffer 发送数据到 fd
    ssize_t writeFd(int fd, int* saveErrno) {
        ssize_t len = -1;
        do {
            size_t readSize = readableBytes();
            if (readSize == 0) break;
            
            len = write(fd, peek(), readSize);
            if (len < 0) {
                *saveErrno = errno;
                if (*saveErrno == EAGAIN || *saveErrno == EINTR) {
                    len = 0; // 暂时发不动,下次再发
                }
                break;
            }
            retrieve(len); // 发送成功,移动读指针
        } while (len > 0); // 尝试发送直到缓冲区空或 EAGAIN
        return len;
    }

private:
    std::vector<char> buffer_;
    size_t readPos_;
    size_t writePos_;

    char* begin() { return &*buffer_.begin(); }
    const char* begin() const { return &*buffer_.begin(); }
};

// ================= 2. 连接对象 (Connection) =================
class Connection {
public:
    Connection(int fd) : fd_(fd), isClosed_(false) {}
    ~Connection() {
        if (fd_ > 0) close(fd_);
    }

    int getFd() const { return fd_; }
    bool isClosed() const { return isClosed_; }
    void setClosed() { isClosed_ = true; }

    Buffer& inputBuffer() { return inBuf_; }
    Buffer& outputBuffer() { return outBuf_; }

    // 处理业务逻辑 (示例:Echo Server)
    void handleRequest() {
        // 简单 Echo 逻辑:收到什么发回什么
        size_t n = inBuf_.readableBytes();
        if (n > 0) {
            outBuf_.append(inBuf_.peek(), n);
            inBuf_.retrieveAll();
        }
    }

private:
    int fd_;
    bool isClosed_;
    Buffer inBuf_;   // 接收缓冲区
    Buffer outBuf_;  // 发送缓冲区
};

// ================= 3. Epoll 服务器核心 (EpollServer) =================
class EpollServer {
public:
    EpollServer() : epfd_(-1), listenfd_(-1) {}
    ~EpollServer() {
        if (epfd_ >= 0) close(epfd_);
        if (listenfd_ >= 0) close(listenfd_);
    }

    bool init(uint16_t port) {
        // 1. 创建监听 socket
        listenfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (listenfd_ < 0) { LOG_ERROR("socket create error"); return false; }

        // 2. 地址复用
        int opt = 1;
        setsockopt(listenfd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

        // 3. 绑定
        struct sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        if (bind(listenfd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
            LOG_ERROR("bind error"); return false;
        }

        // 4. 监听
        if (listen(listenfd_, 1024) < 0) {
            LOG_ERROR("listen error"); return false;
        }

        // 5. 设置监听 fd 为非阻塞 (ET 模式必须)
        setNonBlocking(listenfd_);

        // 6. 创建 epoll
        epfd_ = epoll_create1(0);
        if (epfd_ < 0) { LOG_ERROR("epoll_create error"); return false; }

        // 7. 将监听 fd 加入 epoll (监听 fd 通常用 LT 或 ET 均可,这里统一用 ET)
        addFd(listenfd_, EPOLLIN | EPOLLET);
        
        LOG_INFO("Server started on port %d", port);
        return true;
    }

    void start() {
        struct epoll_event events[MAX_EVENTS];
        while (true) {
            // 8. 等待事件
            int nfds = epoll_wait(epfd_, events, MAX_EVENTS, -1);
            if (nfds < 0) {
                if (errno == EINTR) continue; // 被信号中断,继续
                LOG_ERROR("epoll_wait error");
                break;
            }

            for (int i = 0; i < nfds; ++i) {
                int curFd = events[i].data.fd;
                uint32_t revents = events[i].events;

                if (curFd == listenfd_) {
                    handleAccept();
                } else if (revents & EPOLLIN) {
                    handleRead(curFd);
                } else if (revents & EPOLLOUT) {
                    handleWrite(curFd);
                } else if (revents & (EPOLLHUP | EPOLLERR)) {
                    closeConnection(curFd);
                }
            }
        }
    }

private:
    // 设置非阻塞
    void setNonBlocking(int fd) {
        int flag = fcntl(fd, F_GETFL);
        fcntl(fd, F_SETFL, flag | O_NONBLOCK);
    }

    // 添加/修改 epoll 事件
    void modFd(int fd, uint32_t events) {
        struct epoll_event ev{};
        ev.events = events;
        ev.data.fd = fd;
        epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev);
    }

    void addFd(int fd, uint32_t events) {
        struct epoll_event ev{};
        ev.events = events;
        ev.data.fd = fd;
        epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
    }

    void delFd(int fd) {
        epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);
    }

    // 处理新连接
    void handleAccept() {
        struct sockaddr_in clientAddr{};
        socklen_t len = sizeof(clientAddr);
        // ET 模式 accept 也必须循环,直到返回 EAGAIN
        while (true) {
            int connFd = accept(listenfd_, (struct sockaddr*)&clientAddr, &len);
            if (connFd < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break; // 连接处理完毕
                }
                LOG_ERROR("accept error");
                break;
            }
            // 设置新连接为非阻塞
            setNonBlocking(connFd);
            // 加入 epoll,监听读事件 + ET 模式
            addFd(connFd, EPOLLIN | EPOLLET);
            // 创建连接对象
            connections_[connFd] = std::make_unique<Connection>(connFd);
            LOG_INFO("New connection: fd=%d, IP=%s", connFd, inet_ntoa(clientAddr.sin_addr));
        }
    }

    // 处理读事件 (ET 模式核心)
    void handleRead(int fd) {
        auto it = connections_.find(fd);
        if (it == connections_.end()) return;
        
        Connection* conn = it->second.get();
        int saveErrno = 0;
        // 循环读取,直到 EAGAIN
        ssize_t n = conn->inputBuffer().readFd(fd, &saveErrno);
        
        if (n < 0) {
            // 真正错误
            closeConnection(fd);
        } else if (n == 0) {
            // 对端关闭
            closeConnection(fd);
        } else {
            // 有数据,处理业务
            conn->handleRequest();
            // 如果有数据要回写,且当前未监听 OUT 事件,则修改监听
            if (conn->outputBuffer().readableBytes() > 0) {
                // 注意:ET 模式下,如果 write 可能阻塞,必须监听 EPOLLOUT
                modFd(fd, EPOLLIN | EPOLLOUT | EPOLLET);
                handleWrite(fd); // 尝试立即发送
            }
        }
    }

    // 处理写事件
    void handleWrite(int fd) {
        auto it = connections_.find(fd);
        if (it == connections_.end()) return;

        Connection* conn = it->second.get();
        int saveErrno = 0;
        ssize_t n = conn->outputBuffer().writeFd(fd, &saveErrno);

        if (n < 0) {
            closeConnection(fd);
        } else {
            // 如果发送缓冲区空了,不再监听写事件,只监听读事件 (减少唤醒)
            if (conn->outputBuffer().readableBytes() == 0) {
                modFd(fd, EPOLLIN | EPOLLET);
            }
        }
    }

    // 关闭连接
    void closeConnection(int fd) {
        auto it = connections_.find(fd);
        if (it != connections_.end()) {
            LOG_INFO("Close connection: fd=%d", fd);
            delFd(fd);
            connections_.erase(it); // unique_ptr 自动析构,关闭 fd
        }
    }

private:
    int epfd_;
    int listenfd_;
    std::unordered_map<int, std::unique_ptr<Connection>> connections_;
};

// ================= 4. 主函数 =================
int main() {
    EpollServer server;
    if (!server.init(LISTEN_PORT)) {
        return -1;
    }
    server.start();
    return 0;
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐