1.epoll_wait()

poll()函数,负责等待和收集 IO 事件  观察哪一个channel或者说fd上有事件发生了,调用epoll_wait() 方法,将活跃的连接进行添加fillActiveChannels(),

 Timestamp EpollPoller::poll(int timeouts,ChannelList* activeChannels){
    // 由于频繁调用poll 实际上应该用LOG_DEBUG输出日志更为合理 当遇到并发场景 关闭DEBUG日志提升效率
    LOG_DEBUG("func=%s => fd total count:%lu\n", __FUNCTION__, channels_.size());
    // numEvents 个文件描述符上有事件发生
    int numEvents = ::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeouts);
    
    int saveErrno = errno;

    Timestamp now(Timestamp::now());

    if(numEvents>0){
        LOG_INFO("%d events happend\n", numEvents); // LOG_DEBUG最合理

        //添加活跃的连接
        fillActiveChannels(numEvents,activeChannels);

        //全部活跃了  需要扩容
        if(numEvents == events_.size()){
            events_.resize(events_.size()*2);
        }
    }else if(numEvents == 0){
        LOG_DEBUG("%s timeout!\n", __FUNCTION__);
    }else{
        if(saveErrno != EINTR){
            errno = saveErrno;
            LOG_ERROR("EPollPoller::poll() error!");
        }
    }

    return now;
}
void EpollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const{
     for(int i=0;i<numEvents;++i){
        Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);//EVentLoop就可以拿到他的poller给他返回的所有发生事件的channel列表了
     }
}

2.获取线程的PID值

在EpollEvent中,每一个EpollEvent就是一个线程,所以获取EpollEvent对应的线程PID,用系统内部的调用

namespace CurrentThread{
    extern __thread int t_cachedTid;

    void cacheTid();

    inline int tid(){
        if(__builtin_expect(t_cachedTid == 0,0)){
            cacheTid();
        }
        return t_cachedTid;
    }
}
#include <unistd.h>
#include <sys/syscall.h>
namespace CurrentThread{
    __thread int t_cachedTid = 0;

    void cacheTid(){
        if(t_cachedTid == 0){
            //通过linux系统调用获取当前线程的pid值
            t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
        }
    }
}

3.EventLoop模块

 * loop()循环中,当有事件发生(包括eventfd可读)时,

 * 会调用对应Channel的handleEvent(),而不仅仅是wakeup()

 * 唤醒的Channel。wakeup()只是用来唤醒阻塞在poll()上的EventLoop,

 * 以便执行pendingFunctors_中的回调。

  • 多Reactor模型下的任务调度

1. **主Reactor(mainLoop)接受新连接**:

   - 主Reactor通过`accept`系统调用接受新连接,得到connfd。

   - 将这个connfd封装成一个Channel,然后创建一个TcpConnection对象。

2. **主Reactor将连接分配给子Reactor(subloop)**:

   - 通过轮询算法选择一个子Reactor(subloop),然后将TcpConnection对象交给它。

3. **主Reactor通过`queueInLoop`将任务加入子Reactor的任务队列**:

   - 由于子Reactor可能正阻塞在`poller_->poll`上等待事件,所以需要唤醒它。

   - `queueInLoop`会先将回调函数(这个回调函数会执行TcpConnection的建立连接等操作)

   放入子Reactor的任务队列(`pendingFunctors_`),然后通过`wakeup`唤醒子Reactor(通过向eventfd写数据)。

4. **子Reactor被唤醒后执行`doPendingFunctors`**:

   - 子Reactor从`poller_->poll`中返回,然后执行`doPendingFunctors`,从任务队列中取出回调函数并执行。

   - 这样,TcpConnection就被转移到子Reactor线程中,后续该连接上的所有I/O事件都由子Reactor处理。

   `runInLoop` 会检查线程,如果当前是 EventLoop 线程则立即执行,否则调用 `queueInLoop`。

   `queueInLoop` 无论是否在当前线程,都会将回调加入队列,并在必要时唤醒 EventLoop。

class EventLoop:public noncopyable{
public:
    using Functor = std::function<void()>;

    EventLoop();
    ~EventLoop();

    void loop();//开启事件循环
    void quit();//退出事件循环

    Timestamp pollReturnTime() const{
        return pollReturnTime_;
    }

    //在当前loop中执行
    void runInLoop(Functor cb);
    //把cb放入队列中 唤醒loop所在的线程 执行cb
    void queueInLoop(Functor cb);

    //唤醒subloop所在的线程
    void wakeup();

    //eventloop的方法 =》 Poller的方法
    void updateChannel(Channel *channel);
    void removeChannel(Channel *channel);
    bool hasChannel(Channel *channel);

    //查看当前EventLoop是否在当前线程中
    bool isInLoopThread() const{
        return threadId_ == CurrentThread::tid();
    }
private:

    void handleRead();//处理wakeup的
    void doPendingFunctors();//执行回调

    using ChannelList = std::vector<Channel*>;

    std::atomic_bool loopping_;//原子操作 通过CAS实现
    std::atomic_bool quit_;// 标识退出loop循环

    const pid_t threadId_; //记录当前loop所在的线程id

    Timestamp pollReturnTime_;//poller返回发生事件的channels的时间节点
    std::unique_ptr<Poller> poller_;

    //int eventfd(unsigned int initval, int flags); 通过系统内部的eventfd()唤醒
    int wakeupFd_;//当mainloop获取一个新用户的channel时,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel
    std::unique_ptr<Channel> wakeupChannel_;

    ChannelList activeChannels_;
    Channel* channelActiveChannel_;

    std::atomic_bool callPendingFunctors_;//标识当前loop是否需要执行回调操作
    std::vector<Functor> pendingFunctors_; // 存储loop所需要执行的所有回调操作
    std::mutex mutex_;//互斥锁 用来保护上面vector容器的线程安全操作
 };
  • EventLoop的构造析构函数

创建wakeupfd,用来唤醒subloop

// 防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = nullptr;

// 定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000; // 10000毫秒 = 10秒钟
int createEventfd(){
    int evtfd = ::eventfd(0,EFD_NONBLOCK|EFD_CLOEXEC);

    if(evtfd<0){
        LOG_FATAL("eventfd error:%d\n", errno);
    }

    return evtfd;
}
EventLoop::EventLoop()
    :looping_(false)
    ,quit_(false)
    ,callingPendingFunctors_(false)
    ,threadId_(CurrentThread::tid())
    ,poller_(Poller::newDefaultPoller(this))
    ,wakeupFd_(createEventfd())
    ,wakeupChannel_(new Channel(this,wakeupFd_)){
        LOG_DEBUG("EventLoop created %p in thread %d\n", this, threadId_);

        if(t_loopInThisThread){
            LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadId_);
        }else{
            t_loopInThisThread = this;
        }

        //设置wakeupfd的事件类型以及发生事件后的回调操作
        //当eventfd可读时,就会调用EventLoop::handleRead()
        wakeupChannel_->setReadCallBack(std::bind(&EventLoop::handleRead,this));
        wakeupChannel_->enableReading(); // 每一个EventLoop都将监听wakeupChannel_的EPOLL读事件了
    }
EventLoop::~EventLoop(){
    wakeupChannel_->disableAll();//给Channel移除所有感兴趣的事件
    wakeupChannel_->remove();//把Channel从EventLoop上删除掉
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}
  • 事件循环

         执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:
         accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理
         mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒

//开启事件循环
void EventLoop::loop(){
    looping_ = true;
    quit_ = false;

    LOG_INFO("EventLoop %p start looping\n", this);

    while (!quit_){
        //在每一次事件循环中,poller_->poll会填充activeChannels_
        activeChannels_.clear();
        
        //监听两类fd,一种是client的fd,一种是wakeupfd
        pollReturnTime_ = poller_->poll(kPollTimeMs,&activeChannels_);

        for(Channel* channel:activeChannels_){
            //Poller监听哪些channel发生了事件,然后上报给eventloop,通知channel处理相应的事件
            channel->handelEvent(pollReturnTime_);
        }
        doPendingFunctors();
    }

    LOG_INFO("EventLoop %p stop looping.\n", this);
    looping_ = false;
}

void EventLoop::quit(){
    quit_ = true;
    if(!isInLoopThread()){
        wakeup();
    }
}
  • 当前loop中执行

在当前loop中执行,就直接执行回调操作,如果不是,把cb放入队列中,唤醒subloop的线程

//在当前loop中执行
void EventLoop::runInLoop(Functor cb){
    if(isInLoopThread()){//在当前的loop线程中 执行cb
        cb();
    }else{
        //在非当前loop中执行cb 需要唤醒loop所在线程,执行cb
        queueInLoop(cb);
    }
}

//把cb放入队列中 唤醒loop所在的线程 执行cb
void EventLoop::queueInLoop(Functor cb){
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }

    //唤醒相应的需要执行上面回调操作的loop的线程了
    //callingPendingFunctors_ 当前loop正在执行回调,但是loop有了新的回调
    if(!isInLoopThread() || callingPendingFunctors_){
        wakeup();//唤醒loop所在线程
    }
}
  • 唤醒线程

向对应subloop所在的线程向wakeupfd写个数据即可,就会唤醒当前线程

//唤醒subloop所在的线程  向wakeupfd写一个数据就行,就有事件可读,当前loop线程就会被唤醒
void EventLoop::wakeup(){
    uint64_t one = 1;
    size_t n = write(wakeupFd_,&one,sizeof one);
    if(n!=sizeof one){
        LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n",n);
    }
}
  • 改变Channel状态

channel需要改变自己的状态 更改或者删除注册自己在poller的状态,但是channel无法主动访问poller,所以通过eventloop进行访问

//eventloop的方法 =》 Poller的方法
void EventLoop::updateChannel(Channel *channel){
    //channel需要改变自己的状态 更改或者删除注册自己在poller的状态,但是channel无法主动访问poller,所以通过eventloop进行访问
    poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel){
    poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel){
    poller_->hasChannel(channel);
}
  • 处理读

在构造函数中设置了wakeupFd_的事件类型以及发生事件后的回调操作

void EventLoop::handleRead(){
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_,&one,sizeof one);
    if(n!=sizeof(one)){
         LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8\n", n);
    }
}
  • 执行回调

当subloop在执行回调操作的时候,是原子操作,如果数量较多,这时mainloop有新的回调函数需要添加进subloop,就会阻塞,造成时延 所以创建一个局部的functors,和pendingFunctors_进行交换这时pendingFunctors_就会被解放

void EventLoop::doPendingFunctors(){
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        std::unique_lock<std::mutex> locck(mutex_);
        functors.swap(pendingFunctors_);
    }

    for(const Functor &functor:functors){
        functor();//执行当前loop所需要执行的回调操作
    }
    callingPendingFunctors_  =false;
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐