muduo网络库4EPollPoller结尾和EventLoop
1.epoll_wait()
poll()函数,负责等待和收集 IO 事件 观察哪一个channel或者说fd上有事件发生了,调用epoll_wait() 方法,将活跃的连接进行添加fillActiveChannels(),
Timestamp EpollPoller::poll(int timeouts,ChannelList* activeChannels){
// 由于频繁调用poll 实际上应该用LOG_DEBUG输出日志更为合理 当遇到并发场景 关闭DEBUG日志提升效率
LOG_DEBUG("func=%s => fd total count:%lu\n", __FUNCTION__, channels_.size());
// numEvents 个文件描述符上有事件发生
int numEvents = ::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeouts);
int saveErrno = errno;
Timestamp now(Timestamp::now());
if(numEvents>0){
LOG_INFO("%d events happend\n", numEvents); // LOG_DEBUG最合理
//添加活跃的连接
fillActiveChannels(numEvents,activeChannels);
//全部活跃了 需要扩容
if(numEvents == events_.size()){
events_.resize(events_.size()*2);
}
}else if(numEvents == 0){
LOG_DEBUG("%s timeout!\n", __FUNCTION__);
}else{
if(saveErrno != EINTR){
errno = saveErrno;
LOG_ERROR("EPollPoller::poll() error!");
}
}
return now;
}
void EpollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const{
for(int i=0;i<numEvents;++i){
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);//EVentLoop就可以拿到他的poller给他返回的所有发生事件的channel列表了
}
}
2.获取线程的PID值
在EpollEvent中,每一个EpollEvent就是一个线程,所以获取EpollEvent对应的线程PID,用系统内部的调用
namespace CurrentThread{
extern __thread int t_cachedTid;
void cacheTid();
inline int tid(){
if(__builtin_expect(t_cachedTid == 0,0)){
cacheTid();
}
return t_cachedTid;
}
}
#include <unistd.h>
#include <sys/syscall.h>
namespace CurrentThread{
__thread int t_cachedTid = 0;
void cacheTid(){
if(t_cachedTid == 0){
//通过linux系统调用获取当前线程的pid值
t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
}
}
}
3.EventLoop模块
* loop()循环中,当有事件发生(包括eventfd可读)时,
* 会调用对应Channel的handleEvent(),而不仅仅是wakeup()
* 唤醒的Channel。wakeup()只是用来唤醒阻塞在poll()上的EventLoop,
* 以便执行pendingFunctors_中的回调。
-
多Reactor模型下的任务调度
1. **主Reactor(mainLoop)接受新连接**:
- 主Reactor通过`accept`系统调用接受新连接,得到connfd。
- 将这个connfd封装成一个Channel,然后创建一个TcpConnection对象。
2. **主Reactor将连接分配给子Reactor(subloop)**:
- 通过轮询算法选择一个子Reactor(subloop),然后将TcpConnection对象交给它。
3. **主Reactor通过`queueInLoop`将任务加入子Reactor的任务队列**:
- 由于子Reactor可能正阻塞在`poller_->poll`上等待事件,所以需要唤醒它。
- `queueInLoop`会先将回调函数(这个回调函数会执行TcpConnection的建立连接等操作)
放入子Reactor的任务队列(`pendingFunctors_`),然后通过`wakeup`唤醒子Reactor(通过向eventfd写数据)。
4. **子Reactor被唤醒后执行`doPendingFunctors`**:
- 子Reactor从`poller_->poll`中返回,然后执行`doPendingFunctors`,从任务队列中取出回调函数并执行。
- 这样,TcpConnection就被转移到子Reactor线程中,后续该连接上的所有I/O事件都由子Reactor处理。
`runInLoop` 会检查线程,如果当前是 EventLoop 线程则立即执行,否则调用 `queueInLoop`。
`queueInLoop` 无论是否在当前线程,都会将回调加入队列,并在必要时唤醒 EventLoop。
class EventLoop:public noncopyable{
public:
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
void loop();//开启事件循环
void quit();//退出事件循环
Timestamp pollReturnTime() const{
return pollReturnTime_;
}
//在当前loop中执行
void runInLoop(Functor cb);
//把cb放入队列中 唤醒loop所在的线程 执行cb
void queueInLoop(Functor cb);
//唤醒subloop所在的线程
void wakeup();
//eventloop的方法 =》 Poller的方法
void updateChannel(Channel *channel);
void removeChannel(Channel *channel);
bool hasChannel(Channel *channel);
//查看当前EventLoop是否在当前线程中
bool isInLoopThread() const{
return threadId_ == CurrentThread::tid();
}
private:
void handleRead();//处理wakeup的
void doPendingFunctors();//执行回调
using ChannelList = std::vector<Channel*>;
std::atomic_bool loopping_;//原子操作 通过CAS实现
std::atomic_bool quit_;// 标识退出loop循环
const pid_t threadId_; //记录当前loop所在的线程id
Timestamp pollReturnTime_;//poller返回发生事件的channels的时间节点
std::unique_ptr<Poller> poller_;
//int eventfd(unsigned int initval, int flags); 通过系统内部的eventfd()唤醒
int wakeupFd_;//当mainloop获取一个新用户的channel时,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
Channel* channelActiveChannel_;
std::atomic_bool callPendingFunctors_;//标识当前loop是否需要执行回调操作
std::vector<Functor> pendingFunctors_; // 存储loop所需要执行的所有回调操作
std::mutex mutex_;//互斥锁 用来保护上面vector容器的线程安全操作
};
-
EventLoop的构造析构函数
创建wakeupfd,用来唤醒subloop
// 防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = nullptr;
// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000; // 10000毫秒 = 10秒钟
int createEventfd(){
int evtfd = ::eventfd(0,EFD_NONBLOCK|EFD_CLOEXEC);
if(evtfd<0){
LOG_FATAL("eventfd error:%d\n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
:looping_(false)
,quit_(false)
,callingPendingFunctors_(false)
,threadId_(CurrentThread::tid())
,poller_(Poller::newDefaultPoller(this))
,wakeupFd_(createEventfd())
,wakeupChannel_(new Channel(this,wakeupFd_)){
LOG_DEBUG("EventLoop created %p in thread %d\n", this, threadId_);
if(t_loopInThisThread){
LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadId_);
}else{
t_loopInThisThread = this;
}
//设置wakeupfd的事件类型以及发生事件后的回调操作
//当eventfd可读时,就会调用EventLoop::handleRead()
wakeupChannel_->setReadCallBack(std::bind(&EventLoop::handleRead,this));
wakeupChannel_->enableReading(); // 每一个EventLoop都将监听wakeupChannel_的EPOLL读事件了
}
EventLoop::~EventLoop(){
wakeupChannel_->disableAll();//给Channel移除所有感兴趣的事件
wakeupChannel_->remove();//把Channel从EventLoop上删除掉
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
-
事件循环
执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:
accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理
mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒
//开启事件循环
void EventLoop::loop(){
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping\n", this);
while (!quit_){
//在每一次事件循环中,poller_->poll会填充activeChannels_
activeChannels_.clear();
//监听两类fd,一种是client的fd,一种是wakeupfd
pollReturnTime_ = poller_->poll(kPollTimeMs,&activeChannels_);
for(Channel* channel:activeChannels_){
//Poller监听哪些channel发生了事件,然后上报给eventloop,通知channel处理相应的事件
channel->handelEvent(pollReturnTime_);
}
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping.\n", this);
looping_ = false;
}
void EventLoop::quit(){
quit_ = true;
if(!isInLoopThread()){
wakeup();
}
}
- 当前loop中执行
在当前loop中执行,就直接执行回调操作,如果不是,把cb放入队列中,唤醒subloop的线程
//在当前loop中执行
void EventLoop::runInLoop(Functor cb){
if(isInLoopThread()){//在当前的loop线程中 执行cb
cb();
}else{
//在非当前loop中执行cb 需要唤醒loop所在线程,执行cb
queueInLoop(cb);
}
}
//把cb放入队列中 唤醒loop所在的线程 执行cb
void EventLoop::queueInLoop(Functor cb){
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}
//唤醒相应的需要执行上面回调操作的loop的线程了
//callingPendingFunctors_ 当前loop正在执行回调,但是loop有了新的回调
if(!isInLoopThread() || callingPendingFunctors_){
wakeup();//唤醒loop所在线程
}
}
-
唤醒线程
向对应subloop所在的线程向wakeupfd写个数据即可,就会唤醒当前线程
//唤醒subloop所在的线程 向wakeupfd写一个数据就行,就有事件可读,当前loop线程就会被唤醒
void EventLoop::wakeup(){
uint64_t one = 1;
size_t n = write(wakeupFd_,&one,sizeof one);
if(n!=sizeof one){
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n",n);
}
}
-
改变Channel状态
channel需要改变自己的状态 更改或者删除注册自己在poller的状态,但是channel无法主动访问poller,所以通过eventloop进行访问
//eventloop的方法 =》 Poller的方法
void EventLoop::updateChannel(Channel *channel){
//channel需要改变自己的状态 更改或者删除注册自己在poller的状态,但是channel无法主动访问poller,所以通过eventloop进行访问
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel){
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel){
poller_->hasChannel(channel);
}
-
处理读
在构造函数中设置了wakeupFd_的事件类型以及发生事件后的回调操作
void EventLoop::handleRead(){
uint64_t one = 1;
ssize_t n = read(wakeupFd_,&one,sizeof one);
if(n!=sizeof(one)){
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8\n", n);
}
}
-
执行回调
当subloop在执行回调操作的时候,是原子操作,如果数量较多,这时mainloop有新的回调函数需要添加进subloop,就会阻塞,造成时延 所以创建一个局部的functors,和pendingFunctors_进行交换这时pendingFunctors_就会被解放
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> locck(mutex_);
functors.swap(pendingFunctors_);
}
for(const Functor &functor:functors){
functor();//执行当前loop所需要执行的回调操作
}
callingPendingFunctors_ =false;
}
更多推荐




所有评论(0)