Libevent 核心组件

  • event_base (事件基)

    • 本质:event (事件) 的集合容器,用于管理和存放需要监听的所有事件。

    • 核心功能:实现并管理事件的监听循环 (event loop)。

  • Event Loop (事件循环)

    • 机制:核心循环,持续监听注册在 event_base 中的所有事件。

    • 退出控制:

      • event_base_loopexit()优雅退出。设置一个超时时间,在超时到达或循环自然结束时退出,并且会等待所有当前激活事件的回调函数执行完毕。

      • event_base_loopbreak()立即退出。尽快退出事件循环(通常在下一个事件处理完成后),不等待所有激活事件回调完成,不处理超时。

  • event (事件) 的创建与管理

    • 创建:

      • event_new(): 初始化一个事件对象,在堆上分配内存。参数指定关联的文件描述符 (fd)、关注的事件类型 (如 EV_READ | EV_WRITE | EV_PERSIST)、回调函数以及回调参数。

    • 注册监听:

      • event_add(): 将初始化好的事件注册到指定的 event_base 上,使其开始被监听。

    • 事件状态:

      • Pending (等待状态):事件通过 event_add() 注册到 event_base 后,即进入此状态,等待其关注的条件(如 fd 可读/可写)发生。

      • Active (激活状态):当事件关注的条件发生时,事件被激活,其关联的回调函数将被放入待执行队列,由事件循环调度执行。

    • 事件持久性 (Persistence):

      • 持久事件 (EV_PERSIST):在 event_new() 的 what 参数中包含 EV_PERSIST。这类事件在回调函数执行完毕后,自动重新进入 Pending 状态,继续等待下一次事件发生。这是最常用的类型。

      • 非持久事件:回调函数执行完毕后,事件状态重置为初始化状态 (未添加)。如需再次监听,必须显式再次调用 event_add() 将其重新注册到 event_base

Memcached 源码工作线程模型分析

Memcached 采用主线程 (main thread) + 多个工作线程 (worker threads) 的模型处理并发连接。

  • 核心流程概述

  • 主线程 (main thread)

    • 初始化全局 event_base (main_base = event_init())

    • 执行其他关键初始化 (stats_init()assoc_init()conn_init()slabs_init())。

    • 创建工作线程并进行初始化 (thread_init(settings.num_threads, main_base))。

    • 承担监听 (listen) 和接受 (accept) 新连接的职责。

    • 启动主事件循环 (event_base_loop(main_base, 0))。

  • 工作线程 (worker threads)

    • 每个工作线程拥有自己独立的 event_base

    • 使用管道 (pipe) 机制与主线程进行通信。

    • 职责:处理分配给它的客户端连接上的数据读写、命令解析、数据存取及响应。

  • 新连接处理流程

    • 主线程 accept:主线程 accept 到一个新客户端连接 (client_fd)。

    • 选择工作线程:主线程根据负载均衡策略选择一个工作线程 (thread[i])。

    • 封装连接信息:主线程创建一个 CQ_ITEM 结构体,包含新连接的 client_fd、初始状态、事件标志、读缓冲区大小、传输方式等信息。

    • 入队与通知:主线程将 CQ_ITEM 放入该工作线程的 new_conn_queue 队列,然后通过向该工作线程的管道写入端 (notify_send_fd) 写入一个字符 'c' 来通知工作线程有新连接到达。

    • 工作线程唤醒:工作线程的 event_base 监听到其管道读取端 (notify_receive_fd) 可读 (即有 'c' 消息到达),触发其预先注册的 thread_libevent_process 回调函数。

    • 取出连接信息:在 thread_libevent_process 中,工作线程读取管道消息,识别出 'c' 后,从自己的 new_conn_queue 队列中 pop 出 CQ_ITEM (item = cq_pop(me->new_conn_queue)

    • 创建连接对象:工作线程调用 conn_new(item->sfd, ...) 函数。

      • conn_new 初始化一个 conn 结构体 (c) 代表这个客户端连接。

      • 关键步骤:设置 event 并注册到工作线程的 event_base (me->base)

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(me->base, &c->event); // 关联到线程的 event_base
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) { // 注册监听
    perror("event_add");
    return NULL;
}

  • 这里设置的回调函数是 event_handler

  • 监听客户端事件:此时,工作线程的 event_base 开始监听客户端 socket (sfd) 上指定的事件 (event_flags,通常是 EV_READ | EV_PERSIST)。

  • 事件触发与处理:当客户端有数据可读 (或可写,取决于注册的事件) 时,工作线程的 event_base激活对应 event,调用 event_handler 函数。

  •  状态机驱动:event_handler 函数的核心是调用 drive_machine(c)drive_machine 是一个状态机,它根据当前连接 c 的状态 (c->state) 执行相应的处理逻辑(解析命令、读取数据、写入响应、改变状态等)。conn 结构体中的 state 字段驱动了整个连接生命周期的处理流程。

关键函数详解

  • thread_init(int nthreads, struct event_base *main_base)

    • 为每个工作线程 threads[i]

      1. 创建管道 (pipe(fds)),设置 notify_receive_fd (读) 和 notify_send_fd (写)。

      2. 调用 setup_thread(&threads[i])

        • 初始化工作线程独有的 event_base (me->base = event_init())

        • 设置监听管道读取端的事件:

          event_set(&me->notify_event, me->notify_receive_fd,
                   EV_READ | EV_PERSIST, thread_libevent_process, me);
          event_base_set(me->base, &me->notify_event); // 关联到线程的 event_base
          event_add(&me->notify_event, 0); // 开始监听管道
        • 回调函数 thread_libevent_process 负责处理主线程的通知(新连接 'c'、锁切换 'l'/'g' 等)。

      3. 调用 create_worker(worker_libevent, &threads[i])

        • 内部调用 pthread_create 创建操作系统线程。

        • 线程入口函数是 worker_libevent

  • worker_libevent(void *arg) (工作线程入口函数)

    • 主要逻辑:

      LIBEVENT_THREAD *me = arg; // 获取线程上下文
      ... // (可能的一些初始化)
      event_base_loop(me->base, 0); // 启动工作线程自己的事件循环!
    • 一旦启动,工作线程就阻塞在 event_base_loop 上,等待其 event_base 上注册的事件(管道通知事件 notify_event 或后续添加的客户端连接事件 c->event) 发生。

  • thread_libevent_process(int fd, short which, void *arg)

    • 读取管道消息 (read(fd, buf, 1))。

    • 根据 buf[0] 进行分支处理:

      switch (buf[0]) {
      case 'c': // 新连接
          item = cq_pop(me->new_conn_queue); // 从队列取 CQ_ITEM
          if (NULL != item) {
              conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                 item->read_buffer_size, item->transport, me->base);
              ... // (资源清理等)
          }
          break;
      case 'l': // 切换到细粒度锁 (Locks)
          ...
          break;
      case 'g': // 切换到全局锁 (Global lock)
          ...
          break;
      }
  • event_handler(const int fd, const short which, void *arg)

    • 核心逻辑:

      conn *c = (conn *)arg; // 获取关联的连接对象
      assert(c != NULL);
      c->which = which; // 记录触发的事件类型 (EV_READ, EV_WRITE 等)
      // 检查 fd 一致性 (安全措施)
      if (fd != c->sfd) {
          ... // 错误处理 (关闭连接)
          return;
      }
      drive_machine(c); // 核心处理:驱动连接状态机
      return; // 返回,等待下一次事件

总结

Memcached 利用 libevent 高效地实现了主从 Reactor 模型:

  1. 主 Reactor (Main Thread):使用 main_base,只负责 accept 新连接,并通过管道将新连接均衡分发给工作线程。

  2. 从 Reactor (Worker Threads):每个工作线程有自己的 event_base (threads[i].base)。它监听两种事件:

    • 管道通知事件 (notify_event): 接收主线程分发的新连接指令 ('c')。

    • 客户端 socket 事件 (c->event): 处理已建立连接的客户端请求。event_handler 作为统一入口,最终调用 drive_machine 状态机完成具体的命令处理逻辑。

  3. 通信机制:主线程与工作线程通过管道进行通知,通过无锁队列 (new_conn_queue) 传递连接信息 (CQ_ITEM),实现了高效的任务分发。

  4. 核心处理:客户端连接的读写事件处理最终由 drive_machine 状态机驱动,该状态机根据 conn->state 决定当前应执行的操作,是 Memcached 协议处理的核心。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐