手写Muduo网络库核心代码3--详解CurrentThread类、Eventloop类
为了保证线程安全和事件处理的正确性,必须确保每个 Channel 的事件只能由其所属的 subReactor 所在线的线程来处理。即Muduo 通过 mainReactor 监听新连接,并将连接分发给多个 subReactor,每个 subReactor 在独立线程中处理其 Channel 的 I/O 事件。为了确保事件只能由所属线程处理,必须借助 CurrentThread 获取线程身份,实现线
CurrentThread
Muduo 网络库采用 Reactor 多线程模型,通常由一个 mainReactor 和多个 subReactors 构成:
- mainReactor 运行在 I/O 主线程,负责监听 accept 新的客户端连接。
- 当有新连接到来时,mainReactor 会通过负载均衡策略(如轮询)将该连接分发给某个 subReactor。
- 每个 subReactor 运行在独立的 I/O 工作线程 中,负责管理一组 Channel,监听并处理这些 Channel 上的 I/O 事件。
为了保证线程安全和事件处理的正确性,必须确保每个 Channel 的事件只能由其所属的 subReactor 所在线的线程来处理。这就要求:
- 每个 subReactor(即每个 I/O 线程)必须能够识别自己所在线程的身份;
- 在跨线程调度任务时(如从其他线程向某个 subReactor 投递任务),必须能定位到目标 subReactor 的线程。
为此,Muduo 提供了 CurrentThread 工具类,用于:
- 获取当前线程的唯一 ID(tid);
- 判断当前代码是否运行在某个特定的 EventLoop(即 subReactor)所在线程中;
- 实现线程安全的“谁创建,谁处理”原则。
即Muduo 通过 mainReactor 监听新连接,并将连接分发给多个 subReactor,每个 subReactor 在独立线程中处理其 Channel 的 I/O 事件。为了确保事件只能由所属线程处理,必须借助 CurrentThread 获取线程身份,实现线程绑定与安全调度。
头文件
#pragma once
#include <unistd.h>
#include <sys/syscall.h>
namespace CurrentThread
{
extern __thread int t_cachedTid;
void cacheTid();
inline int tid()
{
if(__builtin_expect(t_cachedTid==0,0))//先看 t_cachedTid 有没有缓存过(是否为 0)。
{
cacheTid();
}
return t_cachedTid;
}
}
extern 表示:“这个变量的定义在别处”。它只是一个声明,不是定义。告诉编译器:“别担心,链接时会找到它的定义”。
__thread 这是关键!它表示这个变量是 线程局部存储。意思就是一个变量在同一个进程内的多个线程中,每个线程都有自己的副本,每个线程修改这个变量都不会影响到其他的线程。
所以 extern __thread int t_cachedTid; 的意思是定义一个整数变量,名字叫 t_cachedTid,且这个变量是线程局部存储的。所以这个变量用来缓存当前的线程的ID。
__builtin_expect(t_cachedTid == 0, 0):GCC 编译器提供的内置函数(built-in function)。它的作用是:告诉编译器“你期望 expression 的值等于 expected_value”。这是一种分支预测优化。
也就是说:
“我告诉你编译器:t_cachedTid == 0 这个条件几乎总是 false,也就是说,t_cachedTid 通常已经有值了,不需要调用 cacheTid()。”即兄弟,t_cachedTid == 0 这个条件几乎不会成立,你放心大胆地假设它为 false,直接跳过 cacheTid()。”
为什么要有这个?
现代 CPU 是“流水线”工作的,它会提前猜测接下来要执行哪条指令,提前加载和执行。
- 如果猜对了 → 极快
- 如果猜错了 → 流水线清空,性能损失巨大
所以,让 CPU “猜得准” 非常重要。
源文件
调用系统接口获取线程ID.
#include"CurrentThread.h"
namespace CurrentThread
{
__thread int t_cachedTid=0;
void cacheTid()
{
if(t_cachedTid==0)
{
t_cachedTid=static_cast<pid_t>(syscall(SYS_gettid));//通过系统调用拿到当前线程id
}
}
}
EventLoop
EventLoop模块是一个事件循环的实现。它在一个线程中运行,不断地:
- 等待:使用操作系统提供的I/O多路复用机制(如 epoll poll 等)等待文件描述符上的事件发生(如可读、可写、错误)。
- 分发:当事件发生时,EventLoop 分发这些事件到预先注册的处理函数(回调函数)。
- 执行:调用相应的回调函数来处理这些I/O事件。
可以把它想象成一个永不结束的 while 循环
主要参与
- I/O 事件循环:
- 与 EPollPoller)模块协作,注册、修改、删除对文件描述符上感兴趣的事件(读、写等)。
- 在循环中调用 Poller 的 poll() 方法,获取当前活跃的事件。
- 将活跃的事件分发给对应的 Channel 对象进行处理。
- 事件分发:
- EventLoop 本身不直接处理事件细节,它将事件分发给与 fd 关联的 Channel 对象。
- Channel 对象保存了 fd、感兴趣的事件、以及对应的回调函数(如 readCallback, writeCallback)。EventLoop 通知 Channel 有事件发生,Channel 再调用相应的回调。
- 跨线程任务执行(线程安全的事件分发):
- 提供 runInLoop 和 queueInLoop 方法。
- 允许其他线程向 EventLoop 所在线程提交任务(函数对象)。
- 当任务被提交时,如果不在 EventLoop 线程,会通过 wakeUp 机制(通常是写一个字节到 eventfd 或 pipe)唤醒阻塞的 poll 调用,然后在下一次循环中执行这些任务。这是实现线程安全通信的关键。
所以说 时间循环类 主要包含了两个大模块 Channel Poller(epoll的抽象)
头文件
#pragma once
//事件循环类,主要包含两个大模块,一个是channel一个是poller(poll和epoll的抽象类)
#include<functional>
#include<vector>
#include<atomic>
#include"noncopyable.h"
#include"Timestamp.h"
#include"CurrentThread.h"
#include<memory>
#include<mutex>
class Channel;
class Poller;
//EventLoop 是与线程强绑定的,每个线程最多一个 EventLoop。
//如果允许拷贝,会导致多个对象指向同一个事件循环,引发资源竞争和逻辑混乱。
class Eventloop:noncopyable
{
public:
using Functor=std::function<void()>;
Eventloop();
~Eventloop();
void loop();//开启事件循环
void quit();//退出事件循环
Timestamp pollReturnTime() const {return pollReturnTime_;}
void runInLoop(Functor cb);//在当前loop中执行cb
void queueInLoop(Functor cb);//把cb放进队列中,唤醒loop所在线程执行cb
void wakeup();//用来唤醒loop所在线程的
//下面这三个EventLoop的方法,用来调用poller的方法
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
//判断Eventloop对象是否在自己的线程里面
bool isInLoopThread() const {return threadId_ == CurrentThread::tid();}
private:
void handleRead();
void doPendingFunctors();
using ChannelList=std::vector<Channel*>;
std::atomic_bool looping_;//原子操作,通过CAS实现的
std::atomic_bool quit_;//标识退出loop循环(客户端断开之后,关闭事件循环),从boost库转成C++,实现更好地跨平台
const pid_t threadId_;//记录当前loop所在的线程id
Timestamp pollReturnTime_;//poller返回发生事件的channels的时间点
std::unique_ptr<Poller> poller_;
int wakeupFd_;//主要作用,当mainloop获取一个新用户的channel,通过轮循算法选择一个subloop,通过该成员唤醒subloop处理channel
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
std::atomic_bool callingPendingFunctors_;//标识当前loop是否有需要回执的回调操作
std::vector<Functor> pendingFunctors_;//存储loop需要执行的所有回调操作
std::mutex mutex_;//互斥锁,保护上面vector容器的线程安全操作
};
成员函数
1、提前声明
class Channel;
class Poller;
告诉编译器 Channel 和 Poller 是类名,但不包含它们的完整定义。
这样会减少头文件依赖,提高编译速度。因为 Eventloop 中只使用了 Channel* 指针和 Poller* 指针,不需要知道它们的完整结构。完整定义会在 .cpp 文件中通过 #include "Channel.h" 和 #include "Poller.h" 引入。
2、启动/退出事件循环
void loop();
void quit();
启动事件循环。这是一个核心方法,会进入一个无限循环,所有 I/O 事件、用户任务都在这个循环中被处理。
客户端断开连接后,可能需要关闭对应的 EventLoop。设置 quit_ = true,通知 loop() 循环退出。
3、获取最近一次 poll 调用返回的时间点。
Timestamp pollReturnTime() const {return pollReturnTime_;}
用于计算事件处理延迟、定时器精度、调试性能问题。例如:某个定时器应该在 10ms 后触发,但 poll 阻塞了 20ms,说明系统负载高。
4、线程函数
void runInLoop(Functor cb); // 在当前loop中执行cb
void queueInLoop(Functor cb); // 把cb放进队列中
提供一种线程安全的方式,在正确的线程中执行代码。比如前面提到的Muduo库的设计原则就是 one loop per thread ,如果调用者在当前 Loop 所在线程,则立即执行 cb;否则,将 cb 放入任务队列(queueInLoop)。
比如我们要实现跨线程任务提交。 TcpServer 在 mainLoop 接受到新连接,需要让某个 subLoop 处理这个连接,就要通过 queueInLoop 把 TcpConnection 的初始化任务发过去。
将回调 cb 加入 pendingFunctors_ 队列,并通过 wakeup() 唤醒 EventLoop 所在线程(如果它正在 poll 阻塞)。
5、线程唤醒
void wakeup();//用来唤醒loop所在线程的
像前面说的一样,poll/epoll 可能长时间阻塞等待事件。如果另一个线程向它提交了任务(queueInLoop),必须有一种机制“唤醒”它,否则任务会被延迟很久。
我们向 wakeupFd_ 写一个字节,打破 poll/epoll 的阻塞状态,让 EventLoop 能够处理新加入的任务。
6、Channel管理代理
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
这三个方法是 EventLoop 对外提供的接口,用于管理 Channel:
- updateChannel: 通知 Poller 更新某个 Channel 的关注事件(如从只读变为可写)。
- removeChannel: 通知 Poller 移除对某个 Channel 的监听。
- hasChannel: 检查某个 Channel 是否已被管理。
Channel 不能直接操作 Poller,它通过 EventLoop 作为中介来注册/注销事件。这是典型的中介者模式,保证了模块间的解耦。
7、判断Eventloop对象是否在自己的线程里面
bool isInLoopThread() const {return threadId_ == CurrentThread::tid();}
和上面的线程函数联动使用,Muduo 遵循 "one loop per thread" 模型,即每个线程最多一个 EventLoop,所有操作必须在该线程中执行。这个函数用于断言(assertInLoopThread())防止误用。
8、处理唤醒事件
void handleRead();
专门处理 wakeupFd_ 上的读事件(读取一个字节,清空缓冲区)。
9、执行待处理的回调
void doPendingFunctors();
遍历 pendingFunctors_ 队列,执行所有待处理的回调。
成员变量
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;
wakeupFd_: 一个特殊的文件描述符(eventfd 或 pipe 的写端),用于线程间通信。
wakeupChannel_: 包装 wakeupFd_ 的 Channel,监听其可读事件,回调为 handleRead。
当其他线程调用 queueInLoop 时,会向 wakeupFd_ 写数据,触发 wakeupChannel_ 的读事件,从而唤醒 EventLoop。
这里使用unique_ptr::使用智能指针方便管理且wakeupChannel_ 是 EventLoop 独有的资源。它只属于这一个 EventLoop 实例,不会被其他对象共享
std::atomic_bool callingPendingFunctors_;
表示是否正在执行 pendingFunctors_ 队列。用于控制是否需要加锁。原子类型保证了在多线程的情况下,该变量的安全性。
std::vector<Functor> pendingFunctors_;
std::mutex mutex_;
- pendingFunctors_: 存放其他线程通过 queueInLoop 提交的任务。
- mutex_: 保护 pendingFunctors_ 的线程安全(因为多个线程可能同时添加任务)。
- queueInLoop 时需要加锁;doPendingFunctors 时会交换向量内容以减少锁持有时间。
源文件
#include"Eventloop.h"
#include"Logger.h"
#include"Poller.h"
#include"Channel.h"
#include<sys/eventfd.h>
#include<unistd.h>
#include<fcntl.h>
#include<errno.h>
#include<memory>
//防止一个线程创建多个EventLoop对象
__thread Eventloop *t_loopInThisThread=nullptr;
//定义默认的Poller,IO复用接口的超时时间
const int kPollTimeMs=10000;
int createEventfd()//创建wakeupfd用来通知睡觉的subreactor起来处理新来的channel
{
int evfd=::eventfd(0,EFD_NONBLOCK | EFD_CLOEXEC);
if(evfd<0)
{
LOG_ERROR("eventfd error:%d \n",errno);
}
return evfd;
}
Eventloop::Eventloop()
:looping_(false)
,quit_(false)
,callingPendingFunctors_(false)
,threadId_(CurrentThread::tid())
,poller_(Poller::newDefaultPoller(this))
,wakeupFd_(createEventfd())
,wakeupChannel_(new Channel(this,wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n",this,threadId_);
if(t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread %d \n",t_loopInThisThread,threadId_);
}
else
{
t_loopInThisThread=this;
}
//设置wakeupfd_的事件类型以及发生该事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&Eventloop::handleRead,this));
//每一个eventloop都将监听wakeupchannel的EPOLLIN读事件了
wakeupChannel_->enableReading();
}
Eventloop::~Eventloop()
{
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread=nullptr;
}
//开启事件循环,相当于调度底层poller开始事件分发器,开始监听事件
void Eventloop::loop()
{
looping_=true;
quit_=false;
LOG_INFO("Eventloop &p start looping \n",this);
while(!quit_)
{
activeChannels_.clear();
pollReturnTime_=poller_->poll(kPollTimeMs,&activeChannels_);
for(Channel *channel:activeChannels_)//遍历发生事件的channel
{
//Poller能监听到哪些channel发生事件了,然后上报给Eventloop,然后通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
//执行当前Eventloop事件循环需要处理的回调操作
doPendingFunctors();
}
LOG_INFO("Eventloop %p stop looping \n",this);
looping_=false;
}
void Eventloop::quit()
{
quit_=true;
if(!isInLoopThread())
{
wakeup();//怕在其他线程中quit
}
}
void Eventloop::runInLoop(Functor cb)
{
if(isInLoopThread())//在当前的loop线程中执行cb
{
cb();
}
else//在非当前线程中执行cb,就需要唤醒loop所在线程
{
queueInLoop(cb);
}
}
void Eventloop::queueInLoop(Functor cb)
{
std::unique_lock<std::mutex>lock(mutex_);
pendingFunctors_.emplace_back(cb);
//唤醒相应的,需要执行上面回调操作的loop的线程了,第二个是当前loop在执行回调,但是loop又有了新的回调
if(!isInLoopThread() || callingPendingFunctors_)
{
wakeup();//唤醒loop所在线程
}
}
void Eventloop::handleRead()
{
uint64_t one =1;
ssize_t n=read(wakeupFd_,&one,sizeof one);
if(n!=sizeof one)
{
LOG_ERROR("Eventloop::handleRead() reads %d bytes instead of 8",n);
}
}
void Eventloop::wakeup()//用来唤醒loop所在线程的,向wakeupfd_写一个数据,wakeupchannel就发生读事件,当前loop线程就会唤醒
{
uint64_t one=1;
ssize_t n=write(wakeupFd_,&one,sizeof one);
if(n!=sizeof one)
{
LOG_ERROR("Eventloop::wakeup() writes %lu bytes instead of 8\n",n);
}
}
//下面这三个EventLoop的方法,用来调用poller的方法
void Eventloop::updateChannel(Channel* channel)
{
poller_->updateChannel(channel);
}
void Eventloop::removeChannel(Channel* channel)
{
poller_->removeChannel(channel);
}
bool Eventloop::hasChannel(Channel* channel)
{
return poller_->hasChannel(channel);
}
void Eventloop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_=true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor &functor:functors)
{
functor();//执行当前loop需要执行的回调
}
callingPendingFunctors_=false;
}
1、创建描述符用于线程间唤醒
int createEventfd()
{
int evfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evfd < 0)
{
LOG_ERROR("eventfd error:%d \n", errno);
}
return evfd;
}
系统调用介绍:
eventfd 是 Linux 的一个系统调。它创建一个“事件文件描述符”,内部维护一个 64 位无符号整数计数器(uint64_t),用于传递事件或信号。
- 本质:一个内核对象,通过文件描述符暴露给用户空间。
- 用途:主要用于线程间或进程间的事件通知,比如唤醒等待线程、传递信号量式事件。
#include <sys/eventfd.h> int eventfd(unsigned int initval, int flags);
initval:初始值(通常为 0)。
flags:可选标志位,常见如下:
- EFD_CLOEXEC:执行 exec 时自动关闭(推荐使用)。
- EFD_NONBLOCK:设置为非阻塞模式。
- EFD_SEMAPHORE:以信号量语义读取(每次读取减 1,而不是清零)。
返回值:成功返回文件描述符,失败返回 -1。
写操作
//向 eventfd 写入一个 uint64_t 类型的值 uint64_t val = 1; write(eventfd_fd, &val, sizeof(uint64_t));
写入的值会加到内部计数器上。
写操作会使 eventfd 变为“就绪”状态,触发 epoll_wait 等待的线程。读操作
//从 eventfd 读取一个 uint64_t 值 uint64_t val; read(eventfd_fd, &val, sizeof(uint64_t));
读取后,内部计数器被清零(除非使用 EFD_SEMAPHORE 模式)。
在 EFD_SEMAPHORE 模式下,每次只减 1。
2、构造函数
Eventloop::Eventloop()
:looping_(false),
quit_(false),
callingPendingFunctors_(false),
threadId_(CurrentThread::tid()),//记录创建线程ID(用于线程安全检查)
poller_(Poller::newDefaultPoller(this)),//创建默认的 Poller 实现(epoll/poll)
wakeupFd_(createEventfd()),//创建唤醒文件描述符
wakeupChannel_(new Channel(this, wakeupFd_)) //将唤醒fd封装为Channel
{
// 线程唯一性检查
if(t_loopInThisThread) {
LOG_FATAL("Another EventLoop exists...");
}
else {
t_loopInThisThread = this;
}
// 设置唤醒通道
wakeupChannel_->setReadCallback(std::bind(&Eventloop::handleRead, this));
wakeupChannel_->enableReading();
}
线程唯一性检查是为了防止一个线程有多个EventLoop
- 检查 t_loopInThisThread 是否已存在。
- 如果存在,说明当前线程已经有了一个 EventLoop,这是严重错误,直接 FATAL 终止程序。
- 否则,把自己注册为当前线程的 EventLoop。
设置换新那个机制的回调
- setReadCallback:当 wakeupFd_ 可读时,调用 handleRead()。
- enableReading():将 wakeupChannel_ 的读事件(EPOLLIN)注册到 Poller 中。
这样,一旦有人向 wakeupFd_ 写数据,poller_->poll() 就会返回,触发 handleRead。
3、析构函数
Eventloop::~Eventloop()
{
wakeupChannel_->disableAll(); // 停止监听所有事件
wakeupChannel_->remove(); // 从 Poller 中移除
::close(wakeupFd_); // 关闭文件描述符
t_loopInThisThread = nullptr; // 解除线程绑定
}
- 必须先 disableAll 和 remove,否则 Poller 还持有 Channel 指针,可能导致野指针。
- 关闭 wakeupFd_,释放系统资源。
- 将 t_loopInThisThread 置空,允许该线程未来再创建新的 EventLoop(虽然通常不会)。
4、事件循环核心loop()
void Eventloop::loop()
{
looping_=true;//事件循环已经开始运行
quit_=false;//重置退出标志。确保在调用 loop() 前,退出状态是“未请求退出”。quit() 方法会设置 quit_ = true,这里是为了“重新开始”。
LOG_INFO("Eventloop &p start looping \n",this);
while(!quit_)
{
activeChannels_.clear();
pollReturnTime_=poller_->poll(kPollTimeMs,&activeChannels_);
for(Channel *channel:activeChannels_)//遍历发生事件的channel
{
//Poller能监听到哪些channel发生事件了,然后上报给Eventloop,然后通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
//执行当前Eventloop事件循环需要处理的回调操作
doPendingFunctors();
}
LOG_INFO("Eventloop %p stop looping \n",this);
looping_=false;
}
开启事件循环,相当于调度底层poller开始事件分发器,开始监听事件。
pollReturnTime_=poller_->poll(kPollTimeMs,&activeChannels_);
这段代码调用的就是EPollPoller中的poll方法(该方法是等待事件发生后将发生事件的channel添加到活跃channel列表activeChannels_中)
for (Channel *channel : activeChannels_)
{
channel->handleEvent(pollReturnTime_);
}
接着遍历所有“活跃”的 Channel(即发生了 I/O 事件的 socket),让它们处理自己的事件。
doPendingFunctors();
接着执行通过 queueInLoop() 提交的跨线程任务。
每个线程有一个 EventLoop。
所有 I/O 操作(读、写、连接、关闭)必须在 对应的 EventLoop 所在线程 中执行。
如果你在线程 A 想操作线程 B 的连接,你不能直接调用,必须通过 queueInLoop 把任务“投递”过去。
这就是 runInLoop / queueInLoop + wakeup 机制的意义:实现线程安全的异步调用。
我们下面看看这个函数是怎么实现的
void Eventloop::doPendingFunctors() { std::vector<Functor> functors;////创建一个局部变量 functors,用来临时存放要执行的任务。 callingPendingFunctors_=true;//表示正在执行doPendingFunctors() { std::unique_lock<std::mutex> lock(mutex_); functors.swap(pendingFunctors_); }//这个大括号限定作用域,由智能指针管理的锁出了这个作用域之后立马析构 for(const Functor &functor:functors) { functor();//执行当前loop需要执行的回调 } callingPendingFunctors_=false; }
对代码的解释:
这个公共区域 pendingFunctors_ 可能会有其他线程一直往里面添加函数,如果我直接拿这个数组来进行执行回调,会阻塞住其他线程向这个数组的写入,所以我每次拿一个空的数组functors直接与pendingFunctors_这个数组的资源进行置换,然后去执行我这个functors数组中置换过来的任务,也不会影响到其他线程对这个数组的写入,std::unique_lock<std::mutex> lock(mutex_);这个锁的作用也就是为了防止我在进行资源置换的时候其他线程写入导致的线程安全问题。
callingPendingFunctors_=true;这是为了配合 queueInLoop() 中的一个关键判断,在queueInLoop()中需要判断是否唤醒线程,条件就是当Eventloop不在当前的线程中或者是正在执行doPendingFunctors()时,都唤醒,前者不必多说,后者是一个优化设计,就是如果我当前在执行置换过来的functors数组中的任务的时(还没执行完,callingPendingFunctors_=true),此时又有新的任务被添加到了pendingFunctors_这个数组中,如果我在queueInLoop()中唤醒了线程(),给正在忙碌的线程‘埋下一个伏笔’:也就是说 “等你忙完,你会去 poll,那时我会让你发现有个事件(wakeupFd 可读),让你的 poll 立刻返回,从而无缝进入下一轮循环处理新任务”。如果不唤醒的话,epoll就会阻塞直到下一次有channel上的事件发生才会重新进入一个事件循环,才会再次处理这个doPendingFunctors(),这样就会导致效率低下。
5、退出事件循环
void Eventloop::quit()
{
quit_=true;
if(!isInLoopThread())
{
wakeup();
}
}
首先,设置退出标志,它的作用是 请求退出,但不会立即中断当前正在执行的任务,后面在loop()中有while(!quit)进行检查。
判断是否在非本线程调用,因为 quit() 可能被任何线程调用,而 EventLoop 的主循环可能正在 poll() 阻塞,不知道有人想让它退出。
跨线程调用时唤醒,打破 poll() 的阻塞,让 loop() 能尽快检查 quit_ 标志
6、runInLoop
void Eventloop::runInLoop(Functor cb)
{
if(isInLoopThread())//在当前的loop线程中执行cb
{
cb();
}
else//在非当前线程中执行cb,就需要唤醒loop所在线程
{
queueInLoop(cb);
}
}
如果当前就是 EventLoop 线程:直接调用 cb(),无需任何同步。因为 EventLoop 线程是“主人”,它有权直接执行任务。
如果在其他线程(比如主线程、IO 线程、定时器线程):不能直接执行 cb(),因为:
cb 可能操作 EventLoop 相关资源(如 Channel、TimerQueue)。直接执行会导致数据竞争(data race)。所以必须把任务交给 EventLoop 线程自己执行。
这里也是one loop per thread模型的体现
7、queueInLoop()
void Eventloop::queueInLoop(Functor cb)
{
std::unique_lock<std::mutex>lock(mutex_);
pendingFunctors_.emplace_back(cb);
//唤醒相应的,需要执行上面回调操作的loop的线程了,第二个是当前loop在执行回调,但是loop又有了新的回调
if(!isInLoopThread() || callingPendingFunctors_)
{
wakeup();//唤醒loop所在线程
}
}
pendingFunctors_ 是一个被多线程共享的 std::vector<Functor>。必须加锁,防止多个线程同时 push_back 导致数据竞争。
下面的判断这里很关键!!!我已经在上面的loop()函数中解释了。
8、唤醒机制
void EventLoop::wakeup()//发送唤醒信号
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8\n", n);
}
}
void EventLoop::handleRead()//处理唤醒信号
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);///读取8字节
if (n != sizeof one)//正常情况下就是8
{
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8", n);
}
}
handleRead() 的本质:消费一个 I/O 事件,清空缓冲区,即每次写进来的8个字节,每次调用read就会清空,一次 write 对应一次 read,不会累积。
wakeupChannel 是在 EventLoop 构造时就注册到 Poller(如 epoll)里的,只要 wakeupFd_ 可读,epoll_wait 就会返回,并自动调用 wakeupChannel 的回调函数 handleRead。
9、转发函数
void Eventloop::updateChannel(Channel* channel)
{
poller_->updateChannel(channel);
}
void Eventloop::removeChannel(Channel* channel)
{
poller_->removeChannel(channel);
}
bool Eventloop::hasChannel(Channel* channel)
{
return poller_->hasChannel(channel);
}
“updateChannel 不是‘调用 epoll_ctl’,而是‘请求 EventLoop 在合适的时间调用 epoll_ctl’。”
“在事件驱动系统中,‘谁调用’比‘调用什么’更重要。”
感谢阅读!
更多推荐
所有评论(0)