Libevent 核心概念与 Memcached 工作线程模型分析
主线程 (初始化全局 event_base。执行其他关键初始化 (stats_init()创建工作线程并进行初始化 (thread_init(settings.num_threads, main_base)承担监听 (listen) 和接受 (accept) 新连接的职责。启动主事件循环 (event_base_loop(main_base, 0)工作线程 (每个工作线程拥有自己独立的 event
Libevent 核心组件
-
event_base
(事件基)-
本质:
event
(事件) 的集合容器,用于管理和存放需要监听的所有事件。 -
核心功能:实现并管理事件的监听循环 (event loop)。
-
-
Event Loop (事件循环)
-
机制:核心循环,持续监听注册在
event_base
中的所有事件。 -
退出控制:
-
event_base_loopexit()
:优雅退出。设置一个超时时间,在超时到达或循环自然结束时退出,并且会等待所有当前激活事件的回调函数执行完毕。 -
event_base_loopbreak()
:立即退出。尽快退出事件循环(通常在下一个事件处理完成后),不等待所有激活事件回调完成,不处理超时。
-
-
-
event
(事件) 的创建与管理-
创建:
-
event_new()
: 初始化一个事件对象,在堆上分配内存。参数指定关联的文件描述符 (fd)、关注的事件类型 (如EV_READ | EV_WRITE | EV_PERSIST
)、回调函数以及回调参数。
-
-
注册监听:
-
event_add()
: 将初始化好的事件注册到指定的event_base
上,使其开始被监听。
-
-
事件状态:
-
Pending (等待状态):事件通过
event_add()
注册到event_base
后,即进入此状态,等待其关注的条件(如 fd 可读/可写)发生。 -
Active (激活状态):当事件关注的条件发生时,事件被激活,其关联的回调函数将被放入待执行队列,由事件循环调度执行。
-
-
事件持久性 (Persistence):
-
持久事件 (
EV_PERSIST
):在event_new()
的what
参数中包含EV_PERSIST
。这类事件在回调函数执行完毕后,自动重新进入 Pending 状态,继续等待下一次事件发生。这是最常用的类型。 -
非持久事件:回调函数执行完毕后,事件状态重置为初始化状态 (未添加)。如需再次监听,必须显式再次调用
event_add()
将其重新注册到event_base
。
-
-
Memcached 源码工作线程模型分析
Memcached 采用主线程 (main thread
) + 多个工作线程 (worker threads
) 的模型处理并发连接。
-
核心流程概述
-
主线程 (
main thread
)-
初始化全局
event_base
(main_base = event_init()
)。 -
执行其他关键初始化 (
stats_init()
,assoc_init()
,conn_init()
,slabs_init()
)。 -
创建工作线程并进行初始化 (
thread_init(settings.num_threads, main_base)
)。 -
承担监听 (
listen
) 和接受 (accept
) 新连接的职责。 -
启动主事件循环 (
event_base_loop(main_base, 0)
)。
-
-
工作线程 (
worker threads
)-
每个工作线程拥有自己独立的
event_base
。 -
使用管道 (
pipe
) 机制与主线程进行通信。 -
职责:处理分配给它的客户端连接上的数据读写、命令解析、数据存取及响应。
-
-
新连接处理流程
-
主线程
accept
:主线程accept
到一个新客户端连接 (client_fd
)。 -
选择工作线程:主线程根据负载均衡策略选择一个工作线程 (
thread[i]
)。 -
封装连接信息:主线程创建一个
CQ_ITEM
结构体,包含新连接的client_fd
、初始状态、事件标志、读缓冲区大小、传输方式等信息。 -
入队与通知:主线程将
CQ_ITEM
放入该工作线程的new_conn_queue
队列,然后通过向该工作线程的管道写入端 (notify_send_fd
) 写入一个字符'c'
来通知工作线程有新连接到达。 -
工作线程唤醒:工作线程的
event_base
监听到其管道读取端 (notify_receive_fd
) 可读 (即有'c'
消息到达),触发其预先注册的thread_libevent_process
回调函数。 -
取出连接信息:在
thread_libevent_process
中,工作线程读取管道消息,识别出'c'
后,从自己的new_conn_queue
队列中pop
出CQ_ITEM
(item = cq_pop(me->new_conn_queue)
。 -
创建连接对象:工作线程调用
conn_new(item->sfd, ...)
函数。-
conn_new
初始化一个conn
结构体 (c
) 代表这个客户端连接。 -
关键步骤:设置
event
并注册到工作线程的event_base
(me->base
):
-
-
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(me->base, &c->event); // 关联到线程的 event_base
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) { // 注册监听
perror("event_add");
return NULL;
}
-
这里设置的回调函数是
event_handler
。 -
监听客户端事件:此时,工作线程的 event_base 开始监听客户端 socket (sfd) 上指定的事件 (event_flags,通常是 EV_READ | EV_PERSIST)。
-
事件触发与处理:当客户端有数据可读 (或可写,取决于注册的事件) 时,工作线程的 event_base激活对应 event,调用 event_handler 函数。
-
状态机驱动:event_handler 函数的核心是调用 drive_machine(c)。drive_machine 是一个状态机,它根据当前连接 c 的状态 (c->state) 执行相应的处理逻辑(解析命令、读取数据、写入响应、改变状态等)。conn 结构体中的 state 字段驱动了整个连接生命周期的处理流程。
关键函数详解
-
thread_init(int nthreads, struct event_base *main_base)
-
为每个工作线程
threads[i]
:-
创建管道 (
pipe(fds)
),设置notify_receive_fd
(读) 和notify_send_fd
(写)。 -
调用
setup_thread(&threads[i])
:-
初始化工作线程独有的
event_base
(me->base = event_init()
)。 -
设置监听管道读取端的事件:
event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); // 关联到线程的 event_base event_add(&me->notify_event, 0); // 开始监听管道
-
回调函数
thread_libevent_process
负责处理主线程的通知(新连接'c'
、锁切换'l'/'g'
等)。
-
-
调用
create_worker(worker_libevent, &threads[i])
:-
内部调用
pthread_create
创建操作系统线程。 -
线程入口函数是
worker_libevent
。
-
-
-
-
worker_libevent(void *arg)
(工作线程入口函数)-
主要逻辑:
LIBEVENT_THREAD *me = arg; // 获取线程上下文 ... // (可能的一些初始化) event_base_loop(me->base, 0); // 启动工作线程自己的事件循环!
-
一旦启动,工作线程就阻塞在
event_base_loop
上,等待其event_base
上注册的事件(管道通知事件notify_event
或后续添加的客户端连接事件c->event
) 发生。
-
-
thread_libevent_process(int fd, short which, void *arg)
-
读取管道消息 (
read(fd, buf, 1)
)。 -
根据
buf[0]
进行分支处理:switch (buf[0]) { case 'c': // 新连接 item = cq_pop(me->new_conn_queue); // 从队列取 CQ_ITEM if (NULL != item) { conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); ... // (资源清理等) } break; case 'l': // 切换到细粒度锁 (Locks) ... break; case 'g': // 切换到全局锁 (Global lock) ... break; }
-
-
event_handler(const int fd, const short which, void *arg)
-
核心逻辑:
conn *c = (conn *)arg; // 获取关联的连接对象 assert(c != NULL); c->which = which; // 记录触发的事件类型 (EV_READ, EV_WRITE 等) // 检查 fd 一致性 (安全措施) if (fd != c->sfd) { ... // 错误处理 (关闭连接) return; } drive_machine(c); // 核心处理:驱动连接状态机 return; // 返回,等待下一次事件
-
总结
Memcached 利用 libevent 高效地实现了主从 Reactor 模型:
-
主 Reactor (Main Thread):使用
main_base
,只负责accept
新连接,并通过管道将新连接均衡分发给工作线程。 -
从 Reactor (Worker Threads):每个工作线程有自己的
event_base
(threads[i].base
)。它监听两种事件:-
管道通知事件 (
notify_event
): 接收主线程分发的新连接指令 ('c'
)。 -
客户端 socket 事件 (
c->event
): 处理已建立连接的客户端请求。event_handler
作为统一入口,最终调用drive_machine
状态机完成具体的命令处理逻辑。
-
-
通信机制:主线程与工作线程通过管道进行通知,通过无锁队列 (
new_conn_queue
) 传递连接信息 (CQ_ITEM
),实现了高效的任务分发。 -
核心处理:客户端连接的读写事件处理最终由
drive_machine
状态机驱动,该状态机根据conn->state
决定当前应执行的操作,是 Memcached 协议处理的核心。
更多推荐
所有评论(0)