1. 模块概述

1.1 核心功能与设计目标

Poller 是 mymuduo 网络库中 I/O 多路复用的抽象基类,它定义了事件轮询的标准接口。Poller 模块的核心设计目标包括:

  1. Channel 管理:管理所有注册的 Channel
  2. 封装 epoll 操作epoll_wait 等待,以及 Channel 需要 Poller 实现 epoll 的添加/监听/删除
  3. 事件轮询epoll_wait 等待并返回就绪的事件列表
  4. 线程安全:确保在正确的线程中操作
                    +----------------+
                    |   EventLoop    |
                    +-------+--------+
                            |
                    +-------v--------+
                    |     Poller     |
                    +-------+--------+
                            |
            +---------------+---------------+
            |               |               |
    +-------v-------+ +-----v-----+ +-------v-------+
    |   Channel     | |  Channel  | |   Channel     |
    |   (fd=3)      | |  (fd=4)   | |   (fd=5)      |
    +---------------+ +-----------+ +---------------+

1.2 设计理念

Poller 采用策略模式(Strategy Pattern)设计,允许在运行时选择不同的 I/O 多路复用实现:

                    +----------------+
                    |     Poller     |  (抽象基类)
                    +-------+--------+
                            |
            +---------------+---------------+
            |                               |
    +-------v-------+               +-------v-------+
    |  EPollPoller  |               |   PollPoller  |
    |   (Linux)     |               |   (通用)      |
    +---------------+               +---------------+

mymuduo 默认使用 EPollPoller(Linux 高性能实现),同时预留了 PollPoller 的扩展接口。

2. 源码

2.1 Poller.h

#pragma once
#include "NonCopyable.h" 
#include "Timestamp.h" 
#include <vector>         // 动态数组(存储活跃 Channel)
#include <unordered_map>  // 哈希表(存储 fd 到 Channel 的映射)
namespace mymuduo {
// 前向声明(避免头文件循环依赖)
class Channel;
class EventLoop;

/**
 * Poller 类:事件轮询器的抽象基类
 * 核心职责:
 *  1. 监听一组 fd 的事件,并返回就绪的 Channel 列表。
 *  2. 管理 Channel 的注册、更新和移除。
 */
class Poller : NonCopyable {
public:
    using ChannelList = std::vector<Channel *>;

    Poller(EventLoop *loop);     // 构造函数(需传入所属 EventLoop)
    virtual ~Poller() = default; // 虚析构函数(允许派生类重写)

    /**
     * @brief 轮询事件(纯虚函数,由派生类实现)
     * @param timeoutMs 超时时间(毫秒)
     * @param activeChannels 输出参数,用于存储就绪的 Channel 列表
     * @return 事件到达的时间戳(Timestamp 类型)
     */
    virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;

    virtual void updateChannel(Channel *channel) = 0; // 更新 Channel 监听的事件(纯虚函数)
    virtual void removeChannel(Channel *channel) = 0; // 移除 Channel(停止监听其 fd)
    bool hasChannel(Channel *channel) const;          // 检查 Channel 是否已被注册
    // 创建默认的 Poller 实例(根据系统选择 epoll 或 poll),放在一个单独的源文件中实现防止基类包含派生类,头文件循环依赖
    static Poller *newDefaultPoller(EventLoop *loop); 
protected:
    using ChannelMap = std::unordered_map<int, Channel*>; 
    ChannelMap channels_;   // 所有被监听的 Channel(键为 fd,值为 Channel*)
private:
    EventLoop *ownerLoop_;  // 所属的 EventLoop(用于检查线程安全性)
};
}  // namespace mymuduo

2.2 Poller.cpp

#include "Poller.h"
#include "Channel.h"
using namespace mymuduo;
Poller::Poller(EventLoop *loop)
    : ownerLoop_(loop) {}
 
bool Poller::hasChannel(Channel *channel) const
{
    auto it = channels_.find(channel->fd());
    return it != channels_.end() && it->second == channel;
}

2.3 EPollPoller.cpp

#include "EPollPoller.h"
#include "Timestamp.h"
#include "LogStream.h"
#include <unistd.h>     // 提供 close() 系统调用
#include <cstring>      // 提供 memset 函数
using namespace mymuduo;
const int kNew = -1;    // 某个channel还没添加至Poller          // channel的成员index_初始化为-1
const int kAdded = 1;   // 某个channel已经添加至Poller
const int kDeleted = 2; // 某个channel已经从Poller删除

EPollPoller::EPollPoller(EventLoop *loop)
    : Poller(loop),                             // 调用基类构造函数,传入所属的EventLoop
      epollfd_(::epoll_create1(EPOLL_CLOEXEC)), // 创建epoll实例​(创建epoll树根节点)
        // ​自动关闭​​:通过该 epoll创建的文件描述符会在程序调用 exec系列函数(如 execve())​​执行新程序时自动关闭​​。
        // ​​安全隔离​​:防止子进程继承父进程的 epoll文件描述符,避免意外的资源泄漏或竞争条件。
      events_(kInitEventListSize)               // 初始化事件列表,默认大小16
{
    if (epollfd_ < 0) {                               // 检查epoll创建是否成功
        LOG_ERROR << "epoll_create error: " << errno; // 记录错误日志
        exit(-1);                                     // 创建失败则退出程序
    }
}
 
EPollPoller::~EPollPoller() {
    ::close(epollfd_); // 关闭epoll文件描述符
}

// 核心事件轮询函数
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {
    // 记录当前管理的文件描述符总数
    LOG_DEBUG << "fd total count: " << channels_.size();
    
    // 调用epoll_wait等待事件发生
    // &*events_.begin()获取events_底层数组首地址
    /*
        * @function: int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
        * @param: epfd: epoll模型的文件描述符
        * @param: evs: 传出参数, 事件结构体数组(指向struct epoll_event结构体)
        * @param: maxevents: 数组的大小
        * @param: timeout: 超时时间(-1:阻塞、0:非阻塞、>0:毫秒)
        * @return: 成功返回就绪的文件描述符数量, -1 失败
    */
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
    
    // 保存errno,防止被后续操作覆盖
    int saveErrno = errno;
    // 获取当前时间戳
    Timestamp now(Timestamp::now());

    if (numEvents > 0) { // 有事件发生
        LOG_DEBUG << numEvents << " events happend";
        // 填充活跃的Channel列表
        fillActiveChannels(numEvents, activeChannels);
        // 如果事件数量等于当前容量,扩容一倍
        if (numEvents == events_.size()) {
            events_.resize(events_.size() * 2);
        }
    }else if (numEvents == 0) { // 超时
        LOG_DEBUG << " timeout!";
    }else { // 出错
        if (saveErrno != EINTR) { // 不是被信号中断的错误
            errno = saveErrno;
            LOG_ERROR << "EPollPoller::poll() error!";
        }
    }
    return now; // 返回事件发生的时间戳
}

void EPollPoller::updateChannel(Channel *channel) {
    // 获取Channel当前状态
    const int index = channel->index();
    // 记录日志:文件描述符、关注的事件和当前状态
    LOG_INFO << "update fd= " << channel->fd()  << " events= " << channel->events()  << " index= " << index;
 
    if (index == kNew || index == kDeleted) { // 新Channel或已删除的Channel
        if (index == kNew) {            // 新Channel
            int fd = channel->fd();
            channels_[fd] = channel;    // 添加到channels_映射表中
        }
        channel->set_index(kAdded);     // 设置状态为已添加
        update(EPOLL_CTL_ADD, channel); // 调用epoll_ctl添加监听
    }else{                              // 已注册的Channel
        int fd = channel->fd();
        if (channel->isNoneEvent()) {       // 不关注任何事件
            update(EPOLL_CTL_DEL, channel); // 从epoll中删除监听
            channel->set_index(kDeleted);   // 设置状态为已删除
        }else { // 关注的事件有变化
            update(EPOLL_CTL_MOD, channel); // 修改监听的事件
        }
    }
}
void EPollPoller::removeChannel(Channel *channel){
    int fd = channel->fd();
    channels_.erase(fd);
    LOG_INFO << "remove fd= " << fd;
    int index = channel->index();
    if (index == kAdded) update(EPOLL_CTL_DEL, channel);
    channel->set_index(kNew);
}

void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const{
    for (int i = 0; i < numEvents; ++i){
        Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);
    }
}
 
void EPollPoller::update(int operation, Channel *channel){
    epoll_event event;
    ::memset(&event, 0, sizeof(event));
 
    int fd = channel->fd();
 
    event.events = channel->events();
    event.data.ptr = channel;
 
    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0){
        if (operation == EPOLL_CTL_DEL){
            LOG_ERROR << "epoll_ctl del error: " << errno;
        }else{
            LOG_ERROR << "epoll_ctl add/mod error: " << errno;
            exit(-1);
        }
    }
}

3. 模块详解

3.1 模块交互

3.1.1 Channel 注册流程

+------------+     +------------+     +------------+     +------------+
|  Channel   |     | EventLoop  |     |   Poller   |     |   epoll    |
+-----+------+     +-----+------+     +-----+------+     +-----+------+
      |                  |                  |                  |
      | enableReading()  |                  |                  |
      |----------------->|                  |                  |
      |                  |                  |                  |
      |                  | updateChannel()  |                  |
      |                  |----------------->|                  |
      |                  |                  |                  |
      |                  |                  | epoll_ctl(ADD)   |
      |                  |                  |----------------->|
      |                  |                  |                  |
      |                  |                  |     成功         |
      |                  |                  |<-----------------|
      |                  |                  |                  |

3.1.2 事件等待流程

+------------+     +------------+     +------------+
| EventLoop  |     |   Poller   |     |   epoll    |
+-----+------+     +-----+------+     +-----+------+
      |                  |                  |
      |   poll()         |                  |
      |----------------->|                  |
      |                  |                  |
      |                  |  epoll_wait()    |
      |                  |----------------->|
      |                  |                  |
      |                  |    [阻塞等待]     |
      |                  |                  |
      |                  |   事件就绪        |
      |                  |<-----------------|
      |                  |                  |
      | activeChannels   |                  |
      |<-----------------|                  |
      |                  |                  |

3.2 更新 epoll 所关心的事件

void EPollPoller::updateChannel(Channel *channel) {
    // 1. 获取 Channel 当前记录的状态
    const int index = channel->index();
    
    LOG_INFO << "update fd= " << channel->fd()  
             << " events= " << channel->events()  
             << " index= " << index;
 
    // 2. 分支 A:新 Channel 或 已删除的 Channel
    if (index == kNew || index == kDeleted) {
        
        if (index == kNew) {            
            // 如果是全新的,加入本地map,方便后续查找
            int fd = channel->fd();
            channels_[fd] = channel;    
        }
        
        // 状态更新为"已添加"
        channel->set_index(kAdded);     
        
        // 调用内核:ADD 操作
        update(EPOLL_CTL_ADD, channel); 
    } 
    // 3. 分支 B:已经在 epoll 中的 Channel
    else {                              
        int fd = channel->fd();
        
        if (channel->isNoneEvent()) {       
            // 情况 B1:Channel 不关心任何事件了(如连接关闭)
            // 调用内核:DEL 操作
            update(EPOLL_CTL_DEL, channel); 
            // 状态更新为"已删除"
            channel->set_index(kDeleted);   
        } else { 
            // 情况 B2:还关心事件,只是类型变了(如从读改为读写)
            // 调用内核:MOD 操作
            update(EPOLL_CTL_MOD, channel); 
        }
    }
}

void EPollPoller::update(int operation, Channel *channel){
    epoll_event event;
    ::memset(&event, 0, sizeof(event));  // 1. 清零,避免垃圾数据
 
    int fd = channel->fd();
 
    // 2. 设置关注的事件类型(如 EPOLLIN | EPOLLOUT)
    event.events = channel->events();    
    
    // 3. 【关键】把 Channel 指针存进去,回调时能取回
    event.data.ptr = channel;            
 
    // 4. 调用系统函数
    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0){
        if (operation == EPOLL_CTL_DEL){
            // 删除失败通常不致命(可能 fd 已经关了)
            LOG_ERROR << "epoll_ctl del error: " << errno;
        }else{
            // 添加/修改失败是致命错误,程序无法继续
            LOG_ERROR << "epoll_ctl add/mod error: " << errno;
            exit(-1);
        }
    }
}

updateChannel:根据 Channel 的当前状态,决定调用哪种 epoll_ctl 操作。

update:不关心状态,只负责把参数传给内核。

阶段

触发者

调用链

updateChannel 的动作

内核状态

1. 连接建立

TcpConnection

enableReading() → updateChannel()

发现 index_=kNew → 执行 EPOLL_CTL_ADD

开始监听读

2. 数据发送

TcpConnection

enableWriting() → updateChannel()

发现 index_=kAdded → 执行 EPOLL_CTL_MOD

监听读 + 写

3. 发送完成

TcpConnection

disableWriting() → updateChannel()

发现 index_=kAdded → 执行 EPOLL_CTL_MOD

只监听读

4. 连接关闭

TcpConnection

close() → updateChannel()

发现 isNoneEvent() → 执行 EPOLL_CTL_DEL

停止监听

5. 对象销毁

EventLoop

removeChannel()

从map erase,重置 index_=kNew

清理完毕

3.3 事件等待 (poll) 与列表填充

  • poll:负责 (阻塞等待内核通知)。
  • fillActiveChannels:负责 (把内核通知的事件打包好,交给上层处理)。
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {
    // 1. 调用内核函数,阻塞等待事件
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
    
    // 2. 保存 errno,防止被后续日志打印覆盖
    int saveErrno = errno;
    Timestamp now(Timestamp::now());

    if (numEvents > 0) { 
        // 3. 有事件发生,填充 activeChannels 列表
        fillActiveChannels(numEvents, activeChannels);
        // 4. 如果事件满了,说明容量不够,扩容一倍(动态适应高并发)
        if (numEvents == events_.size()) {
            events_.resize(events_.size() * 2);
        }
    } else if (numEvents == 0) { 
        // 超时,正常现象
    } else { 
        // 出错了
        if (saveErrno != EINTR) { // 如果不是被信号中断,则是真错误
            LOG_ERROR << "EPollPoller::poll() error!";
        }
    }
    return now;
}

void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const{
    for (int i = 0; i < numEvents; ++i){
        // 1. 从内核返回的事件结构中取出我们之前存的 Channel 指针
        Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
        // 2. 把内核返回的事件类型(如 EPOLLIN)告诉 Channel
        channel->set_revents(events_[i].events);
        // 3. 加入活跃列表,稍后 EventLoop 会处理它们
        activeChannels->push_back(channel);
    }
}

poll 唯一作用:调用 epoll_wait,通知对应的 fd 的 Channel,获取就绪事件,并记录时间。

fillActiveChannels:获取内核返回的活跃的 fd,获取对应的 Channel 并通知 revents_ 给对应的 Channel

4. 全流程

模拟一次 客户端发送数据 的全过程:

  1. 初始化
    • EventLoop 创建 EPollPoller。
    • TcpConnection 创建 Channel (fd=5)。
    • Channel 初始 index_ = kNew。
  2. 注册监听
    • TcpConnection 调用 channel->enableReading() 使能监听读事件。
    • Channel::update() -> EventLoop::updateChannel() -> Poller::updateChannel()。Channel 调用 epoll 的封装类 Poller 实现监听读事件
    • EPollPoller 发现 index_ == kNew。
    • 调用 epoll_ctl(ADD, fd=5, ptr=&channel)。
    • 设置 channel->index_ = kAdded。
  3. 事件循环
    • EventLoop::loop() 调用 poller->poll()。
    • epoll_wait 阻塞。
  4. 事件到达
    • 客户端发送数据,内核唤醒 epoll_wait。
    • epoll_wait 返回 1,events_[0].data.ptr 指向 &channel,events_[0].events 是 EPOLLIN。
    • fillActiveChannels 把 channel 放入 activeChannels 列表,并通知 Channel
    • channel->set_revents(EPOLLIN),Channel 完成事件设置
  5. 回调处理
    • EventLoop 遍历 activeChannels,调用 channel->handleEvent()。
    • Channel 检查 revents_ 是读事件,调用 readCallback_。
    • readCallback_ 实际上是 TcpConnection::handleRead。
    • 数据被读取,业务逻辑执行。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐