Muduo:(2) EPollPoller 实现 epoll 封装、 fd 事件监听与事件通知
Poller是mymuduo网络库中I/O多路复用的抽象基类,采用策略模式设计,支持多种I/O多路复用实现。核心功能包括事件轮询接口定义、Channel管理、线程安全保证等。默认使用高性能的EPollPoller实现,同时保留PollPoller扩展接口。模块通过Channel注册流程管理文件描述符,使用epoll_wait进行事件等待,并通过fillActiveChannels处理就绪事件。关键
1. 模块概述
1.1 核心功能与设计目标
Poller 是 mymuduo 网络库中 I/O 多路复用的抽象基类,它定义了事件轮询的标准接口。Poller 模块的核心设计目标包括:
- Channel 管理:管理所有注册的 Channel
- 封装 epoll 操作:epoll_wait 等待,以及 Channel 需要 Poller 实现 epoll 的添加/监听/删除
- 事件轮询:epoll_wait 等待并返回就绪的事件列表
- 线程安全:确保在正确的线程中操作
+----------------+
| EventLoop |
+-------+--------+
|
+-------v--------+
| Poller |
+-------+--------+
|
+---------------+---------------+
| | |
+-------v-------+ +-----v-----+ +-------v-------+
| Channel | | Channel | | Channel |
| (fd=3) | | (fd=4) | | (fd=5) |
+---------------+ +-----------+ +---------------+
1.2 设计理念
Poller 采用策略模式(Strategy Pattern)设计,允许在运行时选择不同的 I/O 多路复用实现:
+----------------+
| Poller | (抽象基类)
+-------+--------+
|
+---------------+---------------+
| |
+-------v-------+ +-------v-------+
| EPollPoller | | PollPoller |
| (Linux) | | (通用) |
+---------------+ +---------------+
mymuduo 默认使用 EPollPoller(Linux 高性能实现),同时预留了 PollPoller 的扩展接口。
2. 源码
2.1 Poller.h
#pragma once
#include "NonCopyable.h"
#include "Timestamp.h"
#include <vector> // 动态数组(存储活跃 Channel)
#include <unordered_map> // 哈希表(存储 fd 到 Channel 的映射)
namespace mymuduo {
// 前向声明(避免头文件循环依赖)
class Channel;
class EventLoop;
/**
* Poller 类:事件轮询器的抽象基类
* 核心职责:
* 1. 监听一组 fd 的事件,并返回就绪的 Channel 列表。
* 2. 管理 Channel 的注册、更新和移除。
*/
class Poller : NonCopyable {
public:
using ChannelList = std::vector<Channel *>;
Poller(EventLoop *loop); // 构造函数(需传入所属 EventLoop)
virtual ~Poller() = default; // 虚析构函数(允许派生类重写)
/**
* @brief 轮询事件(纯虚函数,由派生类实现)
* @param timeoutMs 超时时间(毫秒)
* @param activeChannels 输出参数,用于存储就绪的 Channel 列表
* @return 事件到达的时间戳(Timestamp 类型)
*/
virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;
virtual void updateChannel(Channel *channel) = 0; // 更新 Channel 监听的事件(纯虚函数)
virtual void removeChannel(Channel *channel) = 0; // 移除 Channel(停止监听其 fd)
bool hasChannel(Channel *channel) const; // 检查 Channel 是否已被注册
// 创建默认的 Poller 实例(根据系统选择 epoll 或 poll),放在一个单独的源文件中实现防止基类包含派生类,头文件循环依赖
static Poller *newDefaultPoller(EventLoop *loop);
protected:
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap channels_; // 所有被监听的 Channel(键为 fd,值为 Channel*)
private:
EventLoop *ownerLoop_; // 所属的 EventLoop(用于检查线程安全性)
};
} // namespace mymuduo
2.2 Poller.cpp
#include "Poller.h"
#include "Channel.h"
using namespace mymuduo;
Poller::Poller(EventLoop *loop)
: ownerLoop_(loop) {}
bool Poller::hasChannel(Channel *channel) const
{
auto it = channels_.find(channel->fd());
return it != channels_.end() && it->second == channel;
}
2.3 EPollPoller.cpp
#include "EPollPoller.h"
#include "Timestamp.h"
#include "LogStream.h"
#include <unistd.h> // 提供 close() 系统调用
#include <cstring> // 提供 memset 函数
using namespace mymuduo;
const int kNew = -1; // 某个channel还没添加至Poller // channel的成员index_初始化为-1
const int kAdded = 1; // 某个channel已经添加至Poller
const int kDeleted = 2; // 某个channel已经从Poller删除
EPollPoller::EPollPoller(EventLoop *loop)
: Poller(loop), // 调用基类构造函数,传入所属的EventLoop
epollfd_(::epoll_create1(EPOLL_CLOEXEC)), // 创建epoll实例(创建epoll树根节点)
// 自动关闭:通过该 epoll创建的文件描述符会在程序调用 exec系列函数(如 execve())执行新程序时自动关闭。
// 安全隔离:防止子进程继承父进程的 epoll文件描述符,避免意外的资源泄漏或竞争条件。
events_(kInitEventListSize) // 初始化事件列表,默认大小16
{
if (epollfd_ < 0) { // 检查epoll创建是否成功
LOG_ERROR << "epoll_create error: " << errno; // 记录错误日志
exit(-1); // 创建失败则退出程序
}
}
EPollPoller::~EPollPoller() {
::close(epollfd_); // 关闭epoll文件描述符
}
// 核心事件轮询函数
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {
// 记录当前管理的文件描述符总数
LOG_DEBUG << "fd total count: " << channels_.size();
// 调用epoll_wait等待事件发生
// &*events_.begin()获取events_底层数组首地址
/*
* @function: int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
* @param: epfd: epoll模型的文件描述符
* @param: evs: 传出参数, 事件结构体数组(指向struct epoll_event结构体)
* @param: maxevents: 数组的大小
* @param: timeout: 超时时间(-1:阻塞、0:非阻塞、>0:毫秒)
* @return: 成功返回就绪的文件描述符数量, -1 失败
*/
int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
// 保存errno,防止被后续操作覆盖
int saveErrno = errno;
// 获取当前时间戳
Timestamp now(Timestamp::now());
if (numEvents > 0) { // 有事件发生
LOG_DEBUG << numEvents << " events happend";
// 填充活跃的Channel列表
fillActiveChannels(numEvents, activeChannels);
// 如果事件数量等于当前容量,扩容一倍
if (numEvents == events_.size()) {
events_.resize(events_.size() * 2);
}
}else if (numEvents == 0) { // 超时
LOG_DEBUG << " timeout!";
}else { // 出错
if (saveErrno != EINTR) { // 不是被信号中断的错误
errno = saveErrno;
LOG_ERROR << "EPollPoller::poll() error!";
}
}
return now; // 返回事件发生的时间戳
}
void EPollPoller::updateChannel(Channel *channel) {
// 获取Channel当前状态
const int index = channel->index();
// 记录日志:文件描述符、关注的事件和当前状态
LOG_INFO << "update fd= " << channel->fd() << " events= " << channel->events() << " index= " << index;
if (index == kNew || index == kDeleted) { // 新Channel或已删除的Channel
if (index == kNew) { // 新Channel
int fd = channel->fd();
channels_[fd] = channel; // 添加到channels_映射表中
}
channel->set_index(kAdded); // 设置状态为已添加
update(EPOLL_CTL_ADD, channel); // 调用epoll_ctl添加监听
}else{ // 已注册的Channel
int fd = channel->fd();
if (channel->isNoneEvent()) { // 不关注任何事件
update(EPOLL_CTL_DEL, channel); // 从epoll中删除监听
channel->set_index(kDeleted); // 设置状态为已删除
}else { // 关注的事件有变化
update(EPOLL_CTL_MOD, channel); // 修改监听的事件
}
}
}
void EPollPoller::removeChannel(Channel *channel){
int fd = channel->fd();
channels_.erase(fd);
LOG_INFO << "remove fd= " << fd;
int index = channel->index();
if (index == kAdded) update(EPOLL_CTL_DEL, channel);
channel->set_index(kNew);
}
void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const{
for (int i = 0; i < numEvents; ++i){
Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
void EPollPoller::update(int operation, Channel *channel){
epoll_event event;
::memset(&event, 0, sizeof(event));
int fd = channel->fd();
event.events = channel->events();
event.data.ptr = channel;
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0){
if (operation == EPOLL_CTL_DEL){
LOG_ERROR << "epoll_ctl del error: " << errno;
}else{
LOG_ERROR << "epoll_ctl add/mod error: " << errno;
exit(-1);
}
}
}
3. 模块详解
3.1 模块交互
3.1.1 Channel 注册流程
+------------+ +------------+ +------------+ +------------+
| Channel | | EventLoop | | Poller | | epoll |
+-----+------+ +-----+------+ +-----+------+ +-----+------+
| | | |
| enableReading() | | |
|----------------->| | |
| | | |
| | updateChannel() | |
| |----------------->| |
| | | |
| | | epoll_ctl(ADD) |
| | |----------------->|
| | | |
| | | 成功 |
| | |<-----------------|
| | | |
3.1.2 事件等待流程
+------------+ +------------+ +------------+
| EventLoop | | Poller | | epoll |
+-----+------+ +-----+------+ +-----+------+
| | |
| poll() | |
|----------------->| |
| | |
| | epoll_wait() |
| |----------------->|
| | |
| | [阻塞等待] |
| | |
| | 事件就绪 |
| |<-----------------|
| | |
| activeChannels | |
|<-----------------| |
| | |
3.2 更新 epoll 所关心的事件
void EPollPoller::updateChannel(Channel *channel) {
// 1. 获取 Channel 当前记录的状态
const int index = channel->index();
LOG_INFO << "update fd= " << channel->fd()
<< " events= " << channel->events()
<< " index= " << index;
// 2. 分支 A:新 Channel 或 已删除的 Channel
if (index == kNew || index == kDeleted) {
if (index == kNew) {
// 如果是全新的,加入本地map,方便后续查找
int fd = channel->fd();
channels_[fd] = channel;
}
// 状态更新为"已添加"
channel->set_index(kAdded);
// 调用内核:ADD 操作
update(EPOLL_CTL_ADD, channel);
}
// 3. 分支 B:已经在 epoll 中的 Channel
else {
int fd = channel->fd();
if (channel->isNoneEvent()) {
// 情况 B1:Channel 不关心任何事件了(如连接关闭)
// 调用内核:DEL 操作
update(EPOLL_CTL_DEL, channel);
// 状态更新为"已删除"
channel->set_index(kDeleted);
} else {
// 情况 B2:还关心事件,只是类型变了(如从读改为读写)
// 调用内核:MOD 操作
update(EPOLL_CTL_MOD, channel);
}
}
}
void EPollPoller::update(int operation, Channel *channel){
epoll_event event;
::memset(&event, 0, sizeof(event)); // 1. 清零,避免垃圾数据
int fd = channel->fd();
// 2. 设置关注的事件类型(如 EPOLLIN | EPOLLOUT)
event.events = channel->events();
// 3. 【关键】把 Channel 指针存进去,回调时能取回
event.data.ptr = channel;
// 4. 调用系统函数
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0){
if (operation == EPOLL_CTL_DEL){
// 删除失败通常不致命(可能 fd 已经关了)
LOG_ERROR << "epoll_ctl del error: " << errno;
}else{
// 添加/修改失败是致命错误,程序无法继续
LOG_ERROR << "epoll_ctl add/mod error: " << errno;
exit(-1);
}
}
}
updateChannel:根据 Channel 的当前状态,决定调用哪种 epoll_ctl 操作。
update:不关心状态,只负责把参数传给内核。
|
阶段 |
触发者 |
调用链 |
|
内核状态 |
|---|---|---|---|---|
|
1. 连接建立 |
TcpConnection |
enableReading() → updateChannel() |
发现 index_=kNew → 执行 EPOLL_CTL_ADD |
开始监听读 |
|
2. 数据发送 |
TcpConnection |
enableWriting() → updateChannel() |
发现 index_=kAdded → 执行 EPOLL_CTL_MOD |
监听读 + 写 |
|
3. 发送完成 |
TcpConnection |
disableWriting() → updateChannel() |
发现 index_=kAdded → 执行 EPOLL_CTL_MOD |
只监听读 |
|
4. 连接关闭 |
TcpConnection |
close() → updateChannel() |
发现 isNoneEvent() → 执行 EPOLL_CTL_DEL |
停止监听 |
|
5. 对象销毁 |
EventLoop |
removeChannel() |
从map erase,重置 index_=kNew |
清理完毕 |
3.3 事件等待 (poll) 与列表填充
- poll:负责 等(阻塞等待内核通知)。
- fillActiveChannels:负责 收(把内核通知的事件打包好,交给上层处理)。
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {
// 1. 调用内核函数,阻塞等待事件
int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
// 2. 保存 errno,防止被后续日志打印覆盖
int saveErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0) {
// 3. 有事件发生,填充 activeChannels 列表
fillActiveChannels(numEvents, activeChannels);
// 4. 如果事件满了,说明容量不够,扩容一倍(动态适应高并发)
if (numEvents == events_.size()) {
events_.resize(events_.size() * 2);
}
} else if (numEvents == 0) {
// 超时,正常现象
} else {
// 出错了
if (saveErrno != EINTR) { // 如果不是被信号中断,则是真错误
LOG_ERROR << "EPollPoller::poll() error!";
}
}
return now;
}
void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const{
for (int i = 0; i < numEvents; ++i){
// 1. 从内核返回的事件结构中取出我们之前存的 Channel 指针
Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
// 2. 把内核返回的事件类型(如 EPOLLIN)告诉 Channel
channel->set_revents(events_[i].events);
// 3. 加入活跃列表,稍后 EventLoop 会处理它们
activeChannels->push_back(channel);
}
}
poll 唯一作用:调用 epoll_wait,通知对应的 fd 的 Channel,获取就绪事件,并记录时间。
fillActiveChannels:获取内核返回的活跃的 fd,获取对应的 Channel 并通知 revents_ 给对应的 Channel
4. 全流程
模拟一次 客户端发送数据 的全过程:
- 初始化:
- EventLoop 创建 EPollPoller。
- TcpConnection 创建 Channel (fd=5)。
- Channel 初始 index_ = kNew。
- 注册监听:
- TcpConnection 调用 channel->enableReading() 使能监听读事件。
- Channel::update() -> EventLoop::updateChannel() -> Poller::updateChannel()。Channel 调用 epoll 的封装类 Poller 实现监听读事件
- EPollPoller 发现 index_ == kNew。
- 调用 epoll_ctl(ADD, fd=5, ptr=&channel)。
- 设置 channel->index_ = kAdded。
- 事件循环:
- EventLoop::loop() 调用 poller->poll()。
- epoll_wait 阻塞。
- 事件到达:
- 客户端发送数据,内核唤醒 epoll_wait。
- epoll_wait 返回 1,events_[0].data.ptr 指向 &channel,events_[0].events 是 EPOLLIN。
- fillActiveChannels 把 channel 放入 activeChannels 列表,并通知 Channel。
- channel->set_revents(EPOLLIN),Channel 完成事件设置。
- 回调处理:
- EventLoop 遍历 activeChannels,调用 channel->handleEvent()。
- Channel 检查 revents_ 是读事件,调用 readCallback_。
- readCallback_ 实际上是 TcpConnection::handleRead。
- 数据被读取,业务逻辑执行。
更多推荐

所有评论(0)