C++校招项目推荐:高性能协程+RPC项目,一个项目打通后端8大核心技术
本文介绍了高性能协程+RPC框架TinyRPC的实现原理与部署指南。主要内容包括:1. 环境配置与项目构建步骤,推荐使用DevContainer快速搭建开发环境;2. 性能测试方法与实测数据,分析影响QPS的关键因素;3. 核心架构解析,重点阐述m:n协程模型的实现,包括寄存器切换、Hook系统调用等关键技术;4. Reactor事件驱动机制与TCP连接管理;5. 项目难点剖析,如协程上下文切换、
内容来源:程度员老廖
一、项目运行指南
1.1 环境要求
|
项目 |
要求 |
|---|---|
|
操作系统 |
Linux64位(推荐Ubuntu20.04+/CentOS7+) |
|
编译器 |
g++支持C++11 |
|
依赖库 |
protobuf >= 3.19.4, tinyxml |
|
构建工具 |
make或cmake |
1.2 一键搭建开发环境(推荐DevContainer)
项目自带DevContainer配置,最简单的方式:
# 如果使用VS Code/Cursor,直接打开项目,选择"Reopen in Container"
# DevContainer会自动安装所有依赖
项目源码领取:C++校招项目推荐:高性能协程+RPC项目,一个项目打通后端8大核心技术
1.3 手动安装步骤
第一步:安装protobuf
# 下载protobuf 3.19.4
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protobuf-cpp-3.19.4.tar.gz
tar xzf protobuf-cpp-3.19.4.tar.gz
cd protobuf-3.19.4
./configure
make -j$(nproc)
sudo make install
sudo ldconfig
第二步:安装tinyxml
# 方式一:通过git克隆(推荐)
git clone git://git.code.sf.net/p/tinyxml/git tinyxml
cd tinyxml
# 编译所有源文件
make -j4
# 用ar命令将.o文件打包成静态库
ar cr libtinyxml.a *.o
# 安装库文件和头文件到系统路径
sudo cp libtinyxml.a /usr/lib/
sudo mkdir -p /usr/include/tinyxml
sudo cp *.h /usr/include/tinyxml
cd ..
第三步:编译TinyRPC
cd tinyrpc
mkdir -p bin lib obj
# 生成protobuf桩文件
cd testcases
protoc --cpp_out=./ test_tinypb_server.proto
cd ..
# 编译(二选一)
# 方式一:makefile
make -j4
sudo make install
# 方式二:cmake
mkdir build
sudo ./build.sh
第四步:启动服务验证
# 启动TinyPB RPC服务
cd bin
nohup ./test_tinypb_server ../conf/test_tinypb_server.xml &
# 验证服务是否启动
ps -elf | grep test_tinypb_server
netstat -tln | grep 20000
# 使用客户端访问服务端
./test_tinypb_server_client
# 期望输出:Send to tinyrpc server 127.0.0.1:20000, requeset body:
# 期望输出:Success get response frrom tinyrpc server 127.0.0.1:20000, response body: res_info: "OK" age: 100100111
# 启动HTTP服务
nohup ./test_http_server ../conf/test_http_server.xml &
# 测试HTTP接口
curl -X GET 'http://127.0.0.1:19999/qps?id=1'
# 期望输出:QPSHttpServlet Echo Success!! Your id is,1
1.4 常见问题排查
|
问题 |
可能原因 |
解决方案 |
|---|---|---|
|
编译报错找不到protobuf |
protobuf未正确安装 |
检查/usr/include/google/protobuf和/usr/lib/libprotobuf.a |
|
启动后端口未监听 |
配置文件路径错误 |
检查xml配置中的ip和port |
|
curl测试无响应 |
服务未正确启动 |
查看nohup.out日志排查 |
|
coredump |
协程栈溢出 |
增大配置中的coroutine_stack_size |
二、性能测试指南
2.1 测试工具安装
# 安装wrk压测工具
git clone https://github.com/wg/wrk.git
cd wrk
make -j$(nproc)
sudo cp wrk /usr/local/bin/
2.2 HTTP Echo QPS测试
# 确保HTTP服务已启动且关闭日志(提升性能)
# 在xml配置中设置rpc_log_level为NONE
# 基础测试:1000并发,持续30秒
wrk -c 1000 -t 8 -d 30 --latency 'http://127.0.0.1:19999/qps?id=1'
# 高压测试:5000并发
wrk -c 5000 -t 8 -d 30 --latency 'http://127.0.0.1:19999/qps?id=1'
# 极限测试:10000并发
wrk -c 10000 -t 8 -d 30 --latency 'http://127.0.0.1:19999/qps?id=1'
2.3 不同IO线程数对比测试
修改配置文件conf/test_http_server.xml的iothread_num分别为1、4、8、16,重启服务后分别测试。
注意:性能数据与机器配置强相关,以下是两组不同环境下的实测数据。
原作者参考结果(CentOS虚拟机,4核6G,日志关闭):
|
IO线程数 |
1000并发 |
2000并发 |
5000并发 |
10000并发 |
|---|---|---|---|---|
|
1 |
27K QPS |
26K QPS |
20K QPS |
20K QPS |
|
4 |
140K QPS |
130K QPS |
123K QPS |
118K QPS |
|
8 |
135K QPS |
120K QPS |
100K QPS |
100K QPS |
|
16 |
125K QPS |
127K QPS |
123K QPS |
118K QPS |
实测结果(Ubuntu云服务器,AMD EPYC 7K83 8核15G,日志关闭,4个IO线程):
|
并发连接数 |
QPS |
平均延迟 |
P99延迟 |
|---|---|---|---|
|
1000 |
82K QPS |
20.76ms |
484ms |
|
2000 |
82K QPS |
13.41ms |
43ms |
|
5000 |
82K QPS |
10.08ms |
38ms |
|
10000 |
79K QPS |
12.45ms |
54ms |
开启DEBUG日志时同环境实测仅1.5W QPS,日志对性能影响约5倍。
2.4 性能分析要点
为什么两台机器QPS差距这么大?
性能测试的绝对值跟CPU型号、虚拟化方案、内核版本、编译优化等因素都有关。原作者14W QPS是在CentOS裸虚拟机上测的,云服务器虽然核心数多但单核性能受虚拟化开销影响。做性能测试不要纠结绝对值,重要的是理解影响性能的因素和优化方向。
为什么开DEBUG日志后QPS暴跌5倍?
生活类比:你每炒一道菜(处理一个请求)都停下来写一页日记(写日志),炒菜速度当然暴跌。虽然是异步日志(先记脑子里再写),但DEBUG级别产生的日志量太大,光往buffer里塞字符串的CPU开销就很可观。生产环境一般设INFO或WARN级别。
为什么并发连接数从1000增到10000,QPS几乎不变?
这说明瓶颈不在连接管理上,而在CPU处理能力上。epoll管理1000个fd和10000个fd开销差别不大(红黑树O(logN)),真正限制QPS的是每个请求的处理耗时。这也是Reactor+协程架构的优势——连接数增加不会线性增加开销。
三、项目架构深度分析
3.1 整体架构鸟瞰
┌────────────────────────────────────────────────────────┐
│ 应用层(用户代码) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ HTTP Servlet │ │ RPC Service │ │ RPC Client │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
├─────────┼──────────────────┼──────────────────┼────────┤
│ RPC调用封装层 │
│ ┌──────────────────────────────────────────────────┐ │
│ │ TinyPbRpcChannel / TinyPbRpcAsyncChannel │ │
│ └──────────────────────────────────────────────────┘ │
├────────────────────────────────────────────────────────┤
│ 协议编解码层 │
│ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ HTTP Codec │ │ TinyPB Codec │ │
│ └─────────────────┘ └─────────────────────────┘ │
├────────────────────────────────────────────────────────┤
│ 网络传输层 │
│ ┌──────────────────────────────────────────────────┐ │
│ │ TcpServer / TcpConnection / TcpClient │ │
│ └──────────────────────────────────────────────────┘ │
├────────────────────────────────────────────────────────┤
│ 核心引擎层 │
│ ┌────────────┐ ┌────────────┐ ┌──────────────────┐ │
│ │ Reactor │ │ Coroutine │ │ Async Logger │ │
│ │ (epoll) │ │ (m:n模型) │ │ (生产者-消费者) │ │
│ └────────────┘ └────────────┘ └──────────────────┘ │
└────────────────────────────────────────────────────────┘
源码目录结构
tinyrpc/
├── comm/ # 公共工具:日志、配置、线程池
│ ├── log.h/cc # 异步日志系统
│ ├── config.h/cc # XML配置解析
│ └── ...
├── coroutine/ # 协程模块(★核心难点★)
│ ├── coroutine.h/cc # 协程封装(Resume/Yield)
│ ├── coctx.h # 协程上下文(14个寄存器)
│ ├── coctx_swap.S # 汇编实现的上下文切换
│ ├── coroutine_hook.h/cc # 系统调用Hook
│ ├── coroutine_pool.h/cc # 协程池
│ └── memory.h/cc # 协程栈内存管理
├── net/ # 网络模块
│ ├── reactor.h/cc # Reactor事件循环
│ ├── fd_event.h/cc # 文件描述符事件封装
│ ├── timer.h/cc # 定时器
│ ├── tcp/ # TCP模块
│ │ ├── tcp_server.h/cc # TCP服务端
│ │ ├── tcp_connection.h/cc # TCP连接管理
│ │ ├── io_thread.h/cc # IO线程及线程池
│ │ └── ...
│ ├── http/ # HTTP协议模块
│ │ ├── http_codec.h/cc # HTTP编解码
│ │ ├── http_servlet.h/cc # Servlet接口
│ │ └── ...
│ └── tinypb/ # TinyPB自定义协议模块
│ ├── tinypb_codec.h/cc # TinyPB编解码
│ ├── tinypb_rpc_channel.h/cc # 阻塞式RPC调用
│ ├── tinypb_rpc_async_channel.h/cc # 非阻塞式RPC调用
│ └── ...
├── testcases/ # 测试用例
├── conf/ # 配置文件
└── generator/ # 代码生成脚手架
3.2 协程模块(核心难点)
3.2.1 什么是协程?为什么需要协程?
先提出问题: 传统网络服务器处理并发连接有什么困难?
传统方案一:多线程/多进程模型
生活类比:一个餐厅来了10000个客人,为每个客人专门请一个服务员。问题是:
-
10000个服务员的工资(内存开销)太贵——每个线程默认占8MB栈空间
-
服务员之间互相抢路(上下文切换开销)——线程切换需要陷入内核态
-
大多数时候服务员在等厨房做菜(IO等待),白白占着位置
传统方案二:异步回调模型(如Node.js)
生活类比:只请一个超级服务员,他用便签纸记录:"3号桌的菜好了就端过去,5号桌要加水"。问题是:
-
便签纸越写越多,越来越难看懂(回调地狱)
-
稍不留神便签纸就乱了(代码难以维护)
协程方案:用同步的代码,达到异步的性能!
生活类比:请4个服务员(IO线程),但每个服务员都会"分身术"(协程)。当他在等3号桌的菜时(IO等待),他的分身会自动去服务5号桌。菜好了他的分身自动回来继续3号桌的服务。从每桌客人的角度看,服务员一直在服务自己(同步体验);从餐厅老板的角度看,4个人干了10000个人的活(异步性能)。
3.2.2 协程的本质:寄存器切换
先提出问题: 所谓"分身术"到底是怎么实现的?CPU怎么知道下次从哪里继续执行?
答案: 保存和恢复CPU寄存器的状态。
函数执行到一半时,CPU的状态完全由几个关键寄存器决定:
-
RSP(栈顶指针):当前在栈的哪个位置
-
RBP(栈底指针):当前函数栈帧的底部
-
RIP(指令指针):下一条要执行的指令在哪
-
RDI/RSI:函数参数
TinyRPC使用14个寄存器来完整保存协程状态:
// tinyrpc/coroutine/coctx.h
struct coctx {
void* regs[14]; // 14个寄存器的"快照"
};
enum {
kRBP = 6, // 栈底指针
kRDI = 7, // 第一个参数
kRSI = 8, // 第二个参数
kRETAddr = 9, // 返回地址(即下次从哪继续执行)
kRSP = 13, // 栈顶指针
};
生活类比:你正在看一本书(执行协程A),看到第50页时需要去做饭(切换到协程B)。你会夹一个书签在第50页(保存寄存器状态)。做完饭回来,翻到书签那一页继续看(恢复寄存器状态)。coctx就是那个书签,14个regs就是书签上记录的14个关键信息。
深入理解:regs[14]的内存布局
汇编代码里的偏移量(104、48、72...)都是从这个数组算出来的:
regs[14] 数组内存布局(每格8字节,总共112字节)
┌─────────┬─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐
│ regs[0] │ regs[1] │ regs[2] │ regs[3] │ regs[4] │ regs[5] │ regs[6] │
│ r15 │ r14 │ r13 │ r12 │ r9 │ r8 │ rbp │
│ 偏移 0 │ 偏移 8 │ 偏移 16 │ 偏移 24 │ 偏移 32 │ 偏移 40 │ 偏移 48 │
├─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ regs[7] │ regs[8] │ regs[9] │ regs[10]│ regs[11]│ regs[12]│ regs[13]│
│ rdi │ rsi │ retAddr │ rdx │ rcx │ rbx │ rsp │
│ 偏移 56 │ 偏移 64 │ 偏移 72 │ 偏移 80 │ 偏移 88 │ 偏移 96 │ 偏移 104│
└─────────┴─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘
汇编里 "movq %rax, 104(%rdi)" → 104 = kRSP(13) × 8
汇编里 "movq %rbp, 48(%rdi)" → 48 = kRBP(6) × 8
所有偏移量都是 数组下标 × 8字节 算出来的,没有魔法。
深入理解:为什么是这14个寄存器?
x86-64有16个通用寄存器,但不是每个都需要保存。根据System V AMD64 ABI(Linux函数调用约定):
|
类别 |
寄存器 |
个数 |
为什么要保存 |
|---|---|---|---|
|
Callee-saved |
rbx, rbp, r12-r15 |
6 |
ABI规定函数返回时这些值必须不变。协程切换相当于"假装函数返回了",不保存这6个,上层代码直接崩 |
|
栈和指令指针 |
rsp, retAddr(rip) |
2 |
rsp决定"在哪个栈上执行",retAddr决定"从哪行代码继续"。缺一个都无法恢复 |
|
参数/临时 |
rdi, rsi, rdx, rcx, r8, r9 |
6 |
正常函数调用中这些是caller-saved。但协程不是正常调用——setCallBack要通过regs[kRDI]传this指针给CoFunction,且切换可能发生在任意时刻,不保存会导致恢复后寄存器残留别人的值 |
|
不需要保存 |
rax, r10, r11 |
-- |
rax是返回值/临时变量(汇编里直接当暂存器用了),r10/r11是纯scratch寄存器,切换完会被重新赋值 |
最终:6 + 2 + 6 = 14个寄存器,每个8字节,一个协程上下文仅112字节。对比线程切换要保存的内核栈、浮点寄存器、TLB状态——量级差了两个数量级。
3.2.3 汇编级别的上下文切换
; tinyrpc/coroutine/coctx_swap.S——来自腾讯libco
coctx_swap:
; ===== 第一阶段:保存当前协程的寄存器 =====
; rdi指向当前协程的coctx(第一个参数)
leaq (%rsp),%rax ; 把当前栈顶地址存到rax
movq %rax, 104(%rdi) ; 保存rsp(regs[13])
movq %rbx, 96(%rdi) ; 保存rbx(regs[12])
movq %rcx, 88(%rdi) ; 保存rcx(regs[11])
; ... 保存其他寄存器 ...
movq %rbp, 48(%rdi) ; 保存rbp(regs[6])
; ===== 第二阶段:恢复目标协程的寄存器 =====
; rsi指向目标协程的coctx(第二个参数)
movq 48(%rsi), %rbp ; 恢复rbp
movq 104(%rsi), %rsp ; 恢复rsp(栈切换!)
; ... 恢复其他寄存器 ...
leaq 8(%rsp), %rsp ; 调整栈顶
pushq 72(%rsi) ; 把返回地址压栈
movq 64(%rsi), %rsi ; 恢复rsi
ret ; ret指令会跳转到栈顶地址执行
生活类比:两个人在同一台电脑上轮流玩游戏。玩家A把游戏存档(保存寄存器),然后玩家B读档继续玩(恢复寄存器)。coctx_swap就是"存档+读档"这两步操作的原子过程。
关键点:为什么只需要约38行汇编?
因为协程切换是用户态操作,不需要陷入内核。相比线程切换需要:保存/恢复所有寄存器→切换内核栈→刷新TLB→切换地址空间,协程切换只需要保存/恢复14个寄存器,快了几个数量级。
深入理解:恢复阶段最后三步(最精妙的部分)
movq 104(%rsi), %rsp恢复栈指针后,rsp指向目标协程的栈。此时栈顶放的是上次call coctx_swap压入的旧返回地址,但我们要用coctx里保存的返回地址(regs[9])来替换它:
第①步:leaq 8(%rsp), %rsp
rsp += 8,跳过栈上旧的返回地址(我们不要它)
┌────────────┐
│ ...上层栈帧... │ ← rsp 现在指向这里
├────────────┤
│ 旧的返回地址(已跳过) │
└────────────┘
第②步:pushq 72(%rsi)
72 = kRETAddr(9) × 8,即coctx里保存的返回地址
pushq = rsp减8,把值写到栈顶
┌────────────┐
│ ...上层栈帧... │
├────────────┤
│ regs[9]的返回地址 │ ← rsp 指向这里(新压入的)
└────────────┘
第③步:ret
ret = pop栈顶 → rip,CPU跳转到regs[9]保存的地址继续执行
也就是目标协程上次Yield时的下一条指令
效果:用coctx里的返回地址替换了栈上的旧地址,ret跳转过去。切换完成。
为什么用leaq+pushq而不是直接覆盖栈顶?因为x86的mov不支持内存到内存操作,直接覆盖需要一个额外寄存器做中转。而leaq+pushq只用rsp自己就完成了,不污染任何其他寄存器——手工汇编中"能少用一个寄存器就少用一个"。
3.2.4 Resume和Yield:协程切换的两个方向
Resume(唤醒): 主协程 → 目标协程 "老板说:你去干活"
Yield(让出): 目标协程 → 主协程 "打工人说:我干不动了,先歇会"
核心代码解析:
// 从主协程切换到目标协程
void Coroutine::Resume(Coroutine* co) {
// 安全检查:只有主协程才能Resume其他协程
if (t_cur_coroutine != t_main_coroutine) {
return; // 必须由"老板"来分配任务
}
t_cur_coroutine = co; // 标记当前执行的协程
coctx_swap(&(t_main_coroutine->m_coctx), &(co->m_coctx));
// ↑ 保存主协程状态 ↑ 恢复目标协程状态
}
// 从目标协程切换回主协程
void Coroutine::Yield() {
Coroutine* co = t_cur_coroutine;
t_cur_coroutine = t_main_coroutine; // 切回主协程
coctx_swap(&(co->m_coctx), &(t_main_coroutine->m_coctx));
// ↑ 保存当前协程状态 ↑ 恢复主协程状态
}
生活类比:一个老板(主协程)管理多个员工(子协程)。
-
Resume:老板对员工A说"你继续干活",然后老板自己去休息
-
Yield:员工A说"这个任务需要等材料(IO),我先歇着",然后控制权回到老板手上,老板可以安排其他员工干活
3.2.5 Hook技术:让旧代码自动异步化
先提出问题: 如果用户代码里写了read(fd, buf, count),这是阻塞调用,会卡住整个线程,怎么办?
答案: Hook(钩子)——偷梁换柱,把系统调用替换成协程版本。
// 原理:利用dlsym获取原始系统调用的地址
#define HOOK_SYS_FUNC(name) \
name##_fun_ptr_t g_sys_##name##_fun = (name##_fun_ptr_t)dlsym(RTLD_NEXT, #name);
HOOK_SYS_FUNC(read); // g_sys_read_fun指向真正的read系统调用
HOOK_SYS_FUNC(write);
HOOK_SYS_FUNC(connect);
HOOK_SYS_FUNC(accept);
HOOK_SYS_FUNC(sleep);
Hook版的read完整流程(7步):
// tinyrpc/coroutine/coroutine_hook.cc 第66-108行
ssize_t read_hook(int fd, void *buf, size_t count) {
// ① 主协程检查——主协程不能Yield,直接走原版
if (Coroutine::IsMainCoroutine()) return g_sys_read_fun(fd, buf, count);
// ② 设为非阻塞——Hook的前提是非阻塞IO
fd_event->setNonBlock();
// ③ 先尝试直接读
ssize_t n = g_sys_read_fun(fd, buf, count);
if (n > 0) return n; // 有数据就直接返回
// ④ 没数据(EAGAIN),注册epoll监听
toEpoll(fd_event, READ);
// ⑤ 让出协程,切回主协程
Coroutine::Yield();
// ⑥ 被Resume唤醒,取消epoll监听
fd_event->delListenEvents(READ);
fd_event->clearCoroutine();
// ⑦ 真正读数据
return g_sys_read_fun(fd, buf, count);
}
关键细节:
-
①主协程检查:主协程不能Yield(Yield是跳回主协程,已经在主协程了往哪跳?),漏掉这个检查会死循环
-
②setNonBlock:如果fd是阻塞的,read会直接卡住线程;设非阻塞后read立刻返回EAGAIN,才能走后面的epoll+Yield流程
-
③先尝试读:如果数据已经到了就不需要走后面的流程,减少不必要的epoll注册和协程切换
生活类比:你去银行办业务(read),发现柜台没人(数据未就绪)。
-
传统做法:你在柜台前傻等(阻塞IO),其他人都被你挡着了
-
Hook做法:你拿了个号(注册epoll),然后去旁边喝咖啡(Yield),叫到号了再回来(Resume),完全不耽误其他人
端到端时序图:从用户调read()到数据返回
用户代码 read_hook() epoll/Reactor 网络
│ │ │ │
│ read(fd,buf,n) │ │ │
│─────────────────────→│ │ │
│ │ setNonBlock │ │
│ │ g_sys_read_fun() │ │
│ │ 返回EAGAIN │ │
│ │ │ │
│ │ toEpoll(fd,READ) ───→│ 注册fd可读监听 │
│ │ Coroutine::Yield()──→│ │
│ │ ┌───────────────┐ │ │
│ │ │ 协程A被挂起 │ │ │
│ │ │ CPU去处理 │ │ │
│ │ │ 其他协程B,C.. │ │ │
│ │ └───────────────┘ │ │
│ │ │ ←── 数据到达 ─────│
│ │ │ epoll_wait返回 │
│ │ ←─ Resume(协程A) ────│ │
│ │ delListenEvents │ │
│ │ g_sys_read_fun() ────→│ │
│ │ 返回n字节 │ │
│ ←── 返回n ──────────│ │ │
│ 继续执行... │ │ │
用户代码只看到:调了read()→拿到了数据。完全感知不到中间的Yield/Resume。
connect_hook——比read更复杂的场景
connect_hook需要额外处理超时:非阻塞connect返回EINPROGRESS后,有两种情况可能唤醒协程:
// tinyrpc/coroutine/coroutine_hook.cc 第191-263行(简化)
int connect_hook(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
fd_event->setNonBlock();
int n = g_sys_connect_fun(sockfd, addr, addrlen);
if (n == 0) return 0; // 立即连上
if (errno != EINPROGRESS) return n; // 真的失败
toEpoll(fd_event, WRITE); // 连接完成时fd变为可写
// 同时注册超时定时器
bool is_timeout = false;
auto timeout_cb = [&is_timeout, cur_cor]() {
is_timeout = true;
Coroutine::Resume(cur_cor); // 超时也唤醒协程
};
timer->addTimerEvent(timeout_event);
Coroutine::Yield(); // 等连接完成或超时
// 唤醒后用is_timeout区分原因
timer->delTimerEvent(timeout_event); // 清理定时器
if (is_timeout) { errno = ETIMEDOUT; return -1; }
return 0;
}
精妙之处:epoll回调(连接成功)和定时器回调(超时)都可能Resume同一个协程,用is_timeout标志位区分唤醒原因。唤醒后必须同时清理epoll事件和定时器,否则可能被二次唤醒。
3.2.6 m:n线程-协程模型
先提出问题: 1:n模型(一个线程对应多个协程)有什么缺陷?
答案: 如果某个协程的业务逻辑特别耗时(比如复杂计算),同一线程上的其他协程都得等它。
生活类比:1:n模型=一个快递员负责一整栋楼。如果他被10楼的大件快递耽误了,1-9楼全部得等。 m:n模型=4个快递员共同负责一整栋楼。10楼的大件耽误了一个快递员,其他3个继续送1-9楼。
1:n vs m:n的直观对比:
======= 1:n模型 =======
IO线程1: [epoll] → Resume(C1) → Resume(C2) → Resume(C3)
↑ C3很慢,C1/C2被堵
IO线程2: [epoll] → Resume(C4) ← 很闲,帮不上忙
IO线程3: [epoll] → Resume(C5) ← 很闲,帮不上忙
问题:C3阻塞了IO线程1上的所有协程,线程2和3干瞪眼。
======= m:n模型 =======
全局队列: [C1, C2, C3, C4, C5]
IO线程1: pop→C1→处理完→pop→C3→C3很慢→Yield→C3回到队列
IO线程2: pop→C2→处理完→pop→C4→处理完→pop→C3→继续C3的工作
IO线程3: pop→C5→处理完→队列空了→epoll_wait等新事件
结果:C3很慢?没关系,它Yield后回到队列,
IO线程2有空了就接着处理。没有任何线程被堵死。
TinyRPC的实现:
// 全局协程任务队列,所有IO线程共享
// tinyrpc/net/reactor.h 第124-135行
class CoroutineTaskQueue {
std::queue<FdEvent*> m_task;
Mutex m_mutex; // 多线程共享,必须加锁
};
// Reactor事件循环中,SubReactor从全局队列取协程执行
// tinyrpc/net/reactor.cc 第233-244行
if (m_reactor_type != MainReactor) {
while (1) {
FdEvent* ptr = CoroutineTaskQueue::GetCoroutineTaskQueue()->pop();
if (ptr) {
ptr->setReactor(this); // ★ 关键:更新Reactor指针
Coroutine::Resume(ptr->getCoroutine());
} else {
break;
}
}
}
关键细节:ptr->setReactor(this)
协程C上次在IO线程1上执行,现在被IO线程2取出来了。setReactor(this)把fd关联的Reactor更新为IO线程2的。如果不更新,下次read_hook注册epoll时会注册到IO线程1的Reactor上。IO线程1收到事件后Resume协程C——但IO线程2可能还在操作协程C的数据。同一个协程被两个线程同时操作 = 数据竞争 = coredump。
first_coroutine优化:减少锁争用
// tinyrpc/net/reactor.cc 第294-304行
if (ptr->getCoroutine()) {
if (!first_coroutine) {
first_coroutine = ptr->getCoroutine(); // 第一个就绪协程不入队
continue;
}
CoroutineTaskQueue::GetCoroutineTaskQueue()->push(ptr); // 其余入全局队列
}
// 循环开头直接Resume
if (first_coroutine) {
Coroutine::Resume(first_coroutine); // 省掉push+pop各一次锁操作
first_coroutine = NULL;
}
每次epoll_wait返回可能有多个就绪事件。第一个直接在当前线程Resume,省掉两次锁操作。高并发下每秒几十万次协程调度,这个优化的累积效果很可观。
注意事项:m:n模型下,同一个协程可能被不同IO线程Resume,因此:
-
不能依赖thread_local变量——协程上次在线程1执行,这次可能在线程2,thread_local的值跟着线程走不跟着协程走
-
协程池必须加互斥锁——多个IO线程可能同时调getCoroutineInstanse()
-
TinyRPC保证:一个协程在任意时刻只会被一个线程调度(通过从epoll摘除fd→入队→另一个线程pop→重新注册epoll的流转来保证)
3.2.7 协程池:避免频繁创建销毁
先提出问题: 协程的创建过程(分配栈内存、初始化coctx、设回调)看起来不复杂,为什么要搞一个池子?
不用池子时,每个请求的开销:
每个请求到来:
① 分配128KB栈内存 ← 内核需要建立虚拟地址映射
② new Coroutine对象 ← 堆分配
③ setCallBack初始化coctx ← 纯计算,很快
④ 执行业务...
请求处理完:
⑤ 释放128KB栈内存 ← 归还内存
⑥ delete Coroutine对象 ← 堆释放
14万QPS = 每秒执行14万次 ①②⑤⑥ = CPU全在做内存管理
真正的性能杀手:缺页中断(Page Fault)
不管用malloc还是mmap分配内存,Linux的按需分配(Demand Paging)策略都一样:内核只给虚拟地址,物理内存在首次写入时才分配。每次写入未映射的页面触发一次缺页中断,陷入内核态分配物理页。
128KB栈 = 32个4KB页面。一个新协程最坏触发32次缺页中断。14万QPS × 最多32次 = 每秒最多448万次缺页中断,CPU基本在内核态打转。
用了池子之后:
初始化阶段(只做一次):
Memory一次性分配 pool_size × 128KB
第一轮使用触发缺页,之后物理页就一直在了
运行阶段(每个请求):
① getCoroutineInstanse() 从数组取一个 ← O(N)遍历,纯用户态
② setCallBack重置coctx ← 纯计算
③ 执行业务...
④ returnCoroutine() 标记为空闲 ← 改一个bool
没有内存分配/释放,没有缺页中断
源码实现:
// tinyrpc/coroutine/coroutine_pool.cc 第46-73行
Coroutine::ptr CoroutinePool::getCoroutineInstanse() {
Mutex::Lock lock(m_mutex); // m:n模型下多线程共享,必须加锁
for (int i = 0; i < m_pool_size; ++i) {
if (!m_free_cors[i].first->getIsInCoFunc() && !m_free_cors[i].second) {
m_free_cors[i].second = true;
Coroutine::ptr cor = m_free_cors[i].first;
lock.unlock();
return cor;
}
}
// 池中没有可用协程,扩容
m_memory_pool.push_back(std::make_shared<Memory>(m_stack_size, m_pool_size));
return std::make_shared<Coroutine>(m_stack_size, m_memory_pool.back()->getBlock());
}
为什么优先复用"用过的"协程? 遍历顺序从i=0开始,先被使用过的协程排在前面。它们的栈内存已经被写入过,物理页早就分配好了,是"热"内存。后面的可能从未被使用,虚拟地址有了但物理页还没有,一用就触发缺页中断。
关于栈内存分配方式: 当前代码使用malloc(memory.cc第15行)。原版libco使用mmap,好处是可以配合mprotect在栈底设guard page——栈溢出时触发SIGSEGV而不是默默踩坏别人的内存。当前用malloc做不到这一点,这是一个可优化的方向。
生活类比:共享单车站(协程池)。需要骑车时从站点拿一辆(getCoroutineInstanse),用完还回去(returnCoroutine),而不是每次都买新的再扔掉。优先拿"热车"(最近用过的,物理内存已分配),而不是"冷车"(从未使用,首次骑要触发缺页分配物理页)。
3.3 Reactor模块
3.3.1 什么是Reactor模式?
先提出问题: 一个服务端需要同时监听成千上万个连接的IO事件,怎么高效处理?
答案: Reactor模式——一个"调度中心"统一监听所有事件,事件就绪后分发给对应的处理器。
生活类比:医院的导诊台(Reactor)。所有病人(连接)到了先去导诊台登记,导诊台统一安排:"3号去内科,5号去外科"。而不是让每个科室的医生自己出来招揽病人。
3.3.2 TinyRPC的MainReactor+SubReactor架构
┌─────────────────┐
│ MainReactor │
│ (主线程) │
│ 职责:accept │
│ 新连接 │
└────────┬────────┘
│ 分发新连接
┌──────────────┼──────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ SubReactor │ │ SubReactor │ │ SubReactor │
│ IO线程1 │ │ IO线程2 │ │ IO线程3 │
│ epoll_wait │ │ epoll_wait │ │ epoll_wait │
│ 处理读写 │ │ 处理读写 │ │ 处理读写 │
└────────────┘ └────────────┘ └────────────┘
↕ ↕ ↕
┌──────────────────────────────────────────┐
│ 全局协程任务队列(加锁) │
│ 协程A 协程B 协程C 协程D ... │
└──────────────────────────────────────────┘
-
MainReactor:只负责accept新连接,accept本身也是在协程中完成的
-
SubReactor:负责已建立连接的读写事件处理,每个IO线程一个SubReactor
-
全局协程任务队列:就绪的协程放入队列,任何空闲的IO线程都可以取出执行(详见3.2.6 m:n模型)
3.3.3 事件循环核心流程
Reactor::loop()是整个框架的调度引擎(tinyrpc/net/reactor.cc第210-367行),每次循环做5件事:
Reactor::loop() 单次循环:
① 从全局队列pop就绪协程并Resume(仅SubReactor)
→ 配合m:n模型,从CoroutineTaskQueue取其他线程投递的协程
→ 注意:pop后要setReactor(this)更新Reactor指针
② 执行pending_tasks
→ 其他线程投递的异步任务(如注册新fd),先放进队列,这里统一执行
③ epoll_wait等待事件
→ 阻塞直到有IO事件就绪、定时器到期、或被wakeup唤醒
④ 事件分发
→ wakeup fd → 读掉eventfd的数据
→ timer fd → 直接执行定时回调
→ 有关联协程 → 第一个直接Resume(first_coroutine优化),后续入全局队列
→ 有回调函数 → 放入pending_tasks
⑤ 处理跨线程投递的fd添加/删除
→ 将pending_add_fds和pending_del_fds统一应用到epoll
这5步的执行顺序不能随意调换:先处理全局队列(可能有其他线程投递的紧急任务)→再执行pending任务→然后才epoll_wait等新事件→处理新事件后更新epoll状态。
3.3.4 eventfd唤醒机制
// 创建(tinyrpc/net/reactor.cc 第52行)
m_wake_fd = eventfd(0, EFD_NONBLOCK);
// 唤醒(tinyrpc/net/reactor.cc 第122-133行)
void Reactor::wakeup() {
if (!m_is_looping) return;
uint64_t tmp = 1;
g_sys_write_fun(m_wake_fd, &tmp, 8); // 往eventfd写个1
}
生活类比:你在午睡(epoll_wait阻塞),手机闹钟响了(wakeup),你醒来看看有什么新任务。eventfd就是那个闹钟——跨线程安全地唤醒正在epoll_wait的线程。
为什么用eventfd而不是pipe? pipe需要两个fd(一读一写),eventfd只需要一个,更节约资源。eventfd还支持信号量语义(EFD_SEMAPHORE),比pipe更适合这种"通知"场景。
3.4 TCP模块
3.4.1 TcpServer启动流程
1. 创建IOThreadPool(N个IO线程,每个线程一个SubReactor)
2. 创建MainReactor
3. 创建accept协程,绑定MainAcceptCorFunc
4. Resume accept协程 → 开始监听新连接
5. 启动所有IO线程(sem_post)
6. MainReactor进入loop
MainAcceptCorFunc循环:
└→ accept(hook版)→ 无新连接则Yield
└→ 有新连接 → 创建TcpConnection → 分配到某个IO线程
└→ 将连接的协程加入该IO线程的Reactor
3.4.2 时间轮(TimeWheel)连接管理
TinyRPC使用时间轮来管理空闲连接的超时清理。
生活类比:停车场按小时收费。每过一个小时,管理员检查最外圈的停车位,超时的车就被拖走(关闭连接)。新来的车或者续费的车会被放到最内圈(刷新计时)。这比给每辆车设置一个独立闹钟(每个连接一个定时器)要高效得多。
3.5 协议与序列化
3.5.1 TinyPB协议格式
┌───────┬────────┬──────────────┬─────────────┬───────────────────┬──────────┬──────────────┬──────────┬─────────┬───────────┬─────┐
│ start │ pk_len │ msg_req_len │ msg_req │ service_name_len │ service │ err_code │ err_info │ pb_data │ check_num │ end │
│ 0x02 │ 4B │ 4B │ 变长 │ 4B │ 变长 │ 4B │ 变长 │ 变长 │ 4B │0x03 │
└───────┴────────┴──────────────┴─────────────┴───────────────────┴──────────┴──────────────┴──────────┴─────────┴───────────┴─────┘
最小包大小:1+4+4+4+4+4+4+1 = 26字节
3.5.2 为什么自定义协议而不只用HTTP?
|
对比项 |
HTTP |
TinyPB |
|---|---|---|
|
格式 |
文本协议 |
二进制协议 |
|
解析效率 |
需要解析文本Header |
直接按偏移量读取 |
|
包体大小 |
Header较大 |
最小26字节 |
|
适用场景 |
浏览器/外部调用 |
内部服务间通信 |
3.6 异步日志模块
生产者(多个IO线程) 消费者(日志线程)
│ │
├─ pushRpcLog(msg) ──→ buffer │
├─ pushAppLog(msg) ──→ buffer │
│ │ 定时flush
│ ├──→ 写入RPC日志文件
│ └──→ 写入APP日志文件
特点:
-
异步写入:IO线程只负责往buffer塞日志,不做磁盘IO
-
滚动日志:跨天或文件过大自动新建文件
-
崩溃保护:程序异常退出前flush日志
-
双日志流:RPC框架日志和应用业务日志分开
四、项目难点深度解析
难点一:协程上下文切换的正确性(★★★★★)
难在哪里? 汇编级别的寄存器操作,任何一个寄存器保存/恢复的顺序错误,都会导致不可预测的coredump。
关键理解点:
1.栈对齐:x86-64 ABI要求栈指针16字节对齐,否则SSE指令会Segfault
top = reinterpret_cast<char*>((reinterpret_cast<unsigned long>(top)) & -16LL);
2.首次Resume的巧妙设计:通过预设寄存器值,让coctx_swap的ret跳转到CoFunction
m_coctx.regs[kRETAddr] = reinterpret_cast<char*>(CoFunction);
m_coctx.regs[kRDI] = reinterpret_cast<char*>(this); // CoFunction的参数
3.CoFunction结束后自动Yield:防止协程执行完毕后"跑飞"
void CoFunction(Coroutine* co) {
co->m_call_back(); // 执行用户回调
Coroutine::Yield(); // 回调结束后必须让出,否则CPU会执行到非法地址
}
难点二:Hook与协程的联动(★★★★★)
难在哪里? 需要在hook函数中正确地与epoll和协程交互,时序必须精确。
以connect_hook为例,要处理的边界情况:
-
连接立即成功(返回0)→直接返回
-
连接进行中(errno==EINPROGRESS)→注册写事件+超时定时器→Yield
-
Resume后需要判断:是因为连接成功被唤醒?还是因为超时被唤醒?
-
超时定时器和epoll事件必须正确清理,否则可能二次唤醒
// 超时回调和epoll回调都可能Resume协程
// 必须用is_timeout标志区分唤醒原因
auto timeout_cb = [&is_timeout, cur_cor]() {
is_timeout = true;
Coroutine::Resume(cur_cor);
};
难点三:m:n模型下的线程安全(★★★★☆)
难在哪里? 同一个协程可能在线程A中Yield,在线程B中Resume。
必须注意的问题:
-
thread_local变量在不同线程中值不同。协程被线程B Resume后,看到的thread_local是线程B的
-
CoroutineTaskQueue必须加互斥锁
-
FdEvent的Reactor指针需要在跨线程调度时更新(ptr->setReactor(this))
-
全局队列的锁粒度需要平衡:锁太粗影响并发,锁太细容易出bug
TinyRPC的优化: 第一个就绪协程直接在当前线程Resume,避免入队出队的锁开销
难点四:内存管理与生命周期(★★★★☆)
难在哪里? 跨线程、跨协程的对象,谁来负责释放?
典型场景:非阻塞RPC调用
// 这些对象在线程A创建,在线程B的新协程中使用
std::shared_ptr<queryAgeReq> rpc_req = std::make_shared<queryAgeReq>();
std::shared_ptr<queryAgeRes> rpc_res = std::make_shared<queryAgeRes>();
// 必须调用saveCallee预留引用计数!
// 否则rpc_req/rpc_res可能在函数返回后被析构,线程B访问就coredump
async_channel->saveCallee(rpc_controller, rpc_req, rpc_res, closure);
原则: 所有跨协程/跨线程传递的对象,一律使用shared_ptr,并确保引用计数正确。
难点五:epoll+协程的协作调度(★★★★☆)
难在哪里? epoll事件、协程状态、定时器三者的交互逻辑复杂。
Reactor::loop的关键调度逻辑:
1.先处理全局队列中的就绪协程(可能来自其他线程)
2.执行pending_tasks(fd的添加/删除等)
3.epoll_wait等待事件
4.收到事件后:
-
wakeup fd→消费唤醒数据
-
timer fd→直接执行定时回调
-
有关联协程→第一个直接Resume,后续的入全局队列
-
无关联协程但有回调→放入pending_tasks
5.处理跨线程投递的fd添加/删除
这个顺序不能随意调换,否则会出现:协程被错误唤醒、事件丢失、死锁等问题。
更多推荐



所有评论(0)