网络 I/O 面试必考点:从多进程多线程到异步 I/O 与多路复用
网络 I/O 的演进,是一场围绕“如何高效管理等待”的核心革命。
从多进程的“人海战术”,到多路复用的“精兵强将”,再到异步 I/O 的“自动化未来”,网络 I/O 模型的演进,其目标始终如一:在有限的资源下,承载更高的并发。
理解这些核心模型,不仅是掌握高并发编程的基石,更是为我们构建下一代高效应用提供关键的架构视野。
Part1 网络 I/O--后端人的"生死线"
1.1、为啥 I/O 是性能瓶颈的 “背锅侠”?
现在的系统早不是 “单机跑个 CRUD” 的时代了 —— 分布式服务要跨机器调用,云原生要处理万级 Pod 通信,即时通讯要扛百万用户在线。这些场景的核心,其实都是 “数据在不同地方流转”,也就是咱们说的 I/O。
你可以把系统想象成一家奶茶店:用户下单(请求)是 “输入”,做奶茶(处理)是 “计算”,递奶茶(响应)是 “输出”。大部分时候,“做奶茶” 很快(CPU 计算快),但 “等牛奶到货(网络传输)”“等用户取餐(客户端响应)” 很慢 —— 这就是 I/O 瓶颈。想让奶茶店效率高,不是雇更多做奶茶的师傅(加 CPU),而是优化 “等货” 和 “取餐” 的流程(优化 I/O 模型)。
1.2、环境
后面的代码示例,主要用 Linux(CentOS/Ubuntu 都行)—— 毕竟生产环境 90% 是 Linux,Windows 咱只聊 IOCP(毕竟 Windows 下搞高并发的少,别杠,杠就是你对)。
Part2 I/O 到底是个啥?
2.1、I/O 的本质
很多人以为 I/O 就是 “读文件”“发网络包”,其实错了 ——I/O 的本质是 “用户态程序让内核帮忙拿数据”。
举个例子:你(用户态程序)想喝可乐(数据),但可乐在超市(硬件设备,比如网卡 / 磁盘),你不能自己去超市(用户态不能直接操作硬件),得让外卖员(内核)去拿。整个过程分两步:
- 等可乐到货:外卖员去超市,得等超市有货(数据就绪);
- 把可乐给你:外卖员把可乐送上门(数据从内核态拷贝到用户态)。
这两步,就是 I/O 的核心流程 ——“等待数据就绪” 和 “数据拷贝”。所有 I/O 模型的差异,本质上都是 “怎么等” 和 “谁来拷贝” 的区别。
再说说 I/O 的分层:硬件层(超市)→驱动层(超市店员)→系统调用层(外卖平台)→应用层(你)。别觉得复杂,就像你点外卖不用管超市怎么进货,你调用read()函数也不用管网卡怎么收数据 —— 但知道分层,能帮你理解 “为啥内核优化能提升 I/O 性能”(比如外卖平台升级,外卖员跑得更快)。
2.2、I/O 设备分类
I/O 设备分两类,记不住没关系,看类比:
- 字符设备:像用吸管喝奶茶,数据得 “一口一口按顺序来”(串行传输),比如串口、键盘、鼠标。你不能跳过第一口直接喝第二口,就像键盘输入不能跳过 “a” 直接输 “b”。
- 块设备:像用勺子挖冰沙,数据按 “一块一块” 来(随机访问),比如磁盘、SSD。你可以先挖中间的,再挖边上的,就像磁盘能直接读第 100 个扇区,不用先读前 99 个。
而咱们聊的网络设备,是个 “特殊选手”:它既不像字符设备那样严格串行,也不像块设备那样能随便随机访问 —— 它的延迟是 “薛定谔的延迟”(可能 1ms 到 1s),还得管理连接状态(比如 TCP 的三次握手、四次挥手)。就像你给异地朋友寄奶茶,不知道快递啥时候到,还得确认对方收没收到 —— 这就是为啥网络 I/O 比文件 I/O 难搞,也更需要高效模型。
2.3、并发 I/O 的 “三大坑”
想让奶茶店同时服务 100 个客户,你会发现三个绕不开的坑:
- 资源开销陷阱:如果每个客户来就招一个新师傅(多进程),店里的房租(内存)和工资(CPU)根本扛不住 ——Linux 一个进程默认占 8MB 栈内存,1000 个进程就是 8GB,内存直接爆。
- 连接数爆炸:如果客户太多(比如 10 万个),你根本没法给每个客户配一个师傅(进程 / 线程)—— 就像奶茶店最多只能坐 20 人,10 万人挤进来直接倒闭。
- 传统方案拉胯:如果用 “一个师傅等一个客户下单(阻塞 I/O)”,其他客户只能排队 —— 我当年用单进程阻塞 I/O 写了个 TCP 服务器,测试的时候同事同时连两个客户端,第二个直接超时,老板看我的眼神都不对了。
这些坑,就是咱们后面要解决的问题 —— 而解决的关键,就是选对 I/O 模型。
Part3 五大 I/O 模型
咱们用 “奶茶店接订单” 的例子,用大白话把五大 I/O 模型讲明白。每个模型都会带代码示例,还有我踩过的坑。
3.1、阻塞 I/O:“等不到奶茶,我就不走”
工作原理:就像你去奶茶店点单,点完后站在柜台前等,期间啥也干不了(刷手机都不行,夸张了,但原理差不多)—— 直到奶茶做好,你拿到手才走。对应到 I/O,就是调用read()后,进程一直阻塞,直到 “数据就绪” 和 “数据拷贝” 都完成。

C++ 代码示例(阻塞 TCP 服务器):
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
// 用constexpr代替宏,C++风格
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
int main() {
// 1. 创建socket:AF_INET=IPv4,SOCK_STREAM=TCP
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket创建失败"); // C++也能用perror,方便查错
return EXIT_FAILURE;
}
// 2. 绑定IP和端口:用sockaddr_in结构体
sockaddr_in server_addr{}; // 初始化清空,C++11特性
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
server_addr.sin_port = htons(PORT); // 端口转网络字节序
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
perror("bind失败");
close(server_fd); // 记得关fd,避免泄漏
return EXIT_FAILURE;
}
// 3. 监听连接
if (listen(server_fd, LISTEN_BACKLOG) == -1) {
perror("listen失败");
close(server_fd);
return EXIT_FAILURE;
}
std::cout << "阻塞服务器启动,端口:" << PORT << ",等着接订单..." << std::endl;
while (true) {
// 4. accept()阻塞:没客户就一直等
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
if (client_fd == -1) {
perror("accept失败");
continue;
}
std::cout << "接到客户连接,fd:" << client_fd << std::endl;
// 5. recv()阻塞:没数据就等
char buf[BUF_SIZE]{};
ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
if (recv_len <= 0) {
std::cerr << "客户" << client_fd << "断开或读失败" << std::endl;
close(client_fd);
continue;
}
buf[recv_len] = '\0'; // 加字符串结束符
std::cout << "客户" << client_fd << "点了:" << buf << std::endl;
// 6. 发响应
const char* resp = "你的奶茶好了!\n";
send(client_fd, resp, strlen(resp), 0);
// 7. 关闭连接
close(client_fd);
std::cout << "客户" << client_fd << "服务结束" << std::endl;
}
// 理论上到不了这,但好习惯还是要养成
close(server_fd);
return EXIT_SUCCESS;
}
编译运行:g++ blocking_server.cpp -o blocking_server -std=c++11 && ./blocking_server
跑起来你会发现:同时开两个客户端,第二个得等第一个服务完才能连 —— 就像奶茶店只有一个师傅,C++ 再快也没用,阻塞 I/O 天生低效。
优缺点:
- 优点:简单!C++ 新手也能写对,不用处理复杂逻辑;
- 缺点:低效!一个连接占一个线程,并发一高就歇菜;
- 适用场景:写个测试工具、简单命令行服务,别用来扛高并发。
3.2、非阻塞 I/O:“我每隔 10 秒问一次,奶茶好了没?”
工作原理:你点完奶茶去旁边喝咖啡,每隔 10 秒回来看一眼 —— 对应到 C++,就是把 socket 设为非阻塞,read()没数据时立刻返回-1,你得循环轮询。

C++ 代码示例(非阻塞 TCP 服务器):
#include
<iostream>
#include
<vector>
#include
<cstring>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<unistd.h>
#include
<fcntl.h>
#include
<errno.h>
#include
<chrono>
#include
<thread>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
constexpr int POLL_INTERVAL_MS = 1000; // 轮询间隔1秒
// 设置socket为非阻塞,C++封装成函数,复用性高
bool set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL);
if (flags == -1) {
perror("fcntl F_GETFL失败");
return false;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL失败");
return false;
}
return true;
}
int main() {
// 1. 创建并初始化server socket
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket创建失败");
return EXIT_FAILURE;
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
perror("bind失败");
close(server_fd);
return EXIT_FAILURE;
}
if (listen(server_fd, LISTEN_BACKLOG) == -1) {
perror("listen失败");
close(server_fd);
return EXIT_FAILURE;
}
// 关键:设置server socket为非阻塞
if (!set_nonblocking(server_fd)) {
close(server_fd);
return EXIT_FAILURE;
}
std::vector<int> client_fds; // 用STL vector存客户端fd,避免裸数组
std::cout << "非阻塞服务器启动,端口:" << PORT << ",每隔" << POLL_INTERVAL_MS << "ms轮询一次..." << std::endl;
while (true) {
// 2. 非阻塞accept:没新连接就返回-1,不阻塞
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
if (client_fd != -1) {
// 新客户:设置非阻塞并加入列表
if (set_nonblocking(client_fd)) {
client_fds.push_back(client_fd);
std::cout << "接到新客户,fd:" << client_fd << ",当前客户数:" << client_fds.size() << std::endl;
} else {
close(client_fd); // 设非阻塞失败,关fd
}
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
// 不是“暂时没数据”的错误,才报错
perror("accept异常");
}
// 3. 轮询所有客户端,读数据
for (auto it = client_fds.begin(); it != client_fds.end();) {
int fd = *it;
char buf[BUF_SIZE]{};
ssize_t recv_len = recv(fd, buf, BUF_SIZE - 1, 0);
if (recv_len > 0) {
// 读到数据:处理并响应
buf[recv_len] = '\0';
std::cout << "客户" << fd << "点了:" << buf << std::endl;
const char* resp = "你的奶茶好了!\n";
send(fd, resp, strlen(resp), 0);
++it; // 正常,迭代器后移
} else if (recv_len == 0) {
// 客户断开
std::cout << "客户" << fd << "断开连接" << std::endl;
close(fd);
it = client_fds.erase(it); // 从vector删除,迭代器要更新
} else {
// 读失败:判断是不是“暂时没数据”
if (errno == EAGAIN || errno == EWOULDBLOCK) {
++it; // 暂时没数据,继续轮询
} else {
// 真失败,关fd删列表
perror(("客户" + std::to_string(fd) + "读失败").c_str());
close(fd);
it = client_fds.erase(it);
}
}
}
// 4. 轮询间隔:用std::this_thread::sleep_for,C++风格
std::this_thread::sleep_for(std::chrono::milliseconds(POLL_INTERVAL_MS));
std::cout << "轮询结束,等待下一次..." << std::endl;
}
close(server_fd);
return EXIT_SUCCESS;
}
编译运行:g++ nonblocking_server.cpp -o nonblocking_server -std=c++11 && ./nonblocking_server
跑起来你会发现:能同时接多个客户了,但top看 CPU 占用变高 —— 因为你一直在循环轮询,哪怕没客户也在跑,就像你每隔 10 秒问一次奶茶,不累吗?
C++ 踩坑提醒:用vector存客户端 fd 时,erase会返回新迭代器,别直接it++,不然会迭代器失效崩溃 —— 我当年就这么坑过,查了半天才发现。
优缺点:
- 优点:能处理多连接,不用等一个客户完再服务下一个;
- 缺点:轮询耗 CPU!1000 个客户就每秒轮询 1000 次,CPU 直接拉满;
- 适用场景:嵌入式设备、短连接低延迟场景,别单独用在高并发服务。
3.3、多路复用 I/O:“让快递员告诉我,谁的奶茶好了”
工作原理:这是高并发的 “明星选手”!相当于雇个调度员(内核),所有客户订单都交给调度员,他告诉你 “哪个客户的奶茶好了”—— 对应到 C++,就是用epoll监控多个 socket,哪个就绪了就处理哪个。

epoll有两种触发模式,C++ 开发者尤其要注意:
- 水平触发(LT):奶茶到了没拿,调度员一直提醒你 —— 适合新手,不容易丢数据;
- 边缘触发(ET):奶茶到了只提醒一次,没拿就没了 —— 效率高,但得一次性把数据读完,不然会丢包。
C++ 代码示例(epoll 服务器,水平触发):
#include
<iostream>
#include
<cstring>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<unistd.h>
#include
<fcntl.h>
#include
<sys/epoll.h>
#include
<errno.h>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
constexpr int MAX_EVENTS = 1024; // 最多监控1024个事件
bool set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL);
if (flags == -1) {
perror("fcntl F_GETFL失败");
return false;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL失败");
return false;
}
return true;
}
int main() {
// 1. 创建server socket
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket创建失败");
return EXIT_FAILURE;
}
// 设置SO_REUSEADDR:避免端口占用问题
int reuse = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
perror("setsockopt SO_REUSEADDR失败");
close(server_fd);
return EXIT_FAILURE;
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
perror("bind失败");
close(server_fd);
return EXIT_FAILURE;
}
if (listen(server_fd, LISTEN_BACKLOG) == -1) {
perror("listen失败");
close(server_fd);
return EXIT_FAILURE;
}
// 2. 创建epoll实例:epoll_create1(0) = 传统epoll_create
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1失败");
close(server_fd);
return EXIT_FAILURE;
}
// 3. 把server socket加入epoll监控:水平触发(默认)
epoll_event ev{};
ev.events = EPOLLIN; // 监控读事件(新连接/客户发数据)
ev.data.fd = server_fd; // 绑定fd,方便后续识别
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
perror("epoll_ctl add server_fd失败");
close(server_fd);
close(epoll_fd);
return EXIT_FAILURE;
}
std::cout << "epoll服务器启动(水平触发),端口:" << PORT << ",等着调度员通知..." << std::endl;
epoll_event events[MAX_EVENTS]{}; // 存就绪事件
while (true) {
// 4. 等待epoll事件:-1表示一直等,直到有事件
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait失败");
continue;
}
// 5. 处理所有就绪事件
for (int i = 0; i < nfds; ++i) {
int fd = events[i].data.fd;
// 情况1:新连接(server_fd就绪)
if (fd == server_fd) {
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
if (client_fd == -1) {
perror("accept失败");
continue;
}
// 新客户设为非阻塞(epoll推荐非阻塞,避免卡主)
if (!set_nonblocking(client_fd)) {
close(client_fd);
continue;
}
// 把新客户加入epoll监控
ev.events = EPOLLIN; // 水平触发
ev.data.fd = client_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
perror(("epoll_ctl add client_fd " + std::to_string(client_fd) + "失败").c_str());
close(client_fd);
continue;
}
std::cout << "接到新客户,fd:" << client_fd << std::endl;
}
// 情况2:客户发数据(client_fd就绪)
else {
char buf[BUF_SIZE]{};
ssize_t recv_len = recv(fd, buf, BUF_SIZE - 1, 0);
if (recv_len > 0) {
// 处理数据
buf[recv_len] = '\0';
std::cout << "客户" << fd << "点了:" << buf << std::endl;
const char* resp = "你的奶茶好了!\n";
send(fd, resp, strlen(resp), 0);
} else if (recv_len == 0) {
// 客户断开:从epoll删除并关fd
std::cout << "客户" << fd << "断开连接" << std::endl;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
} else {
// 读失败
perror(("客户" + std::to_string(fd) + "读失败").c_str());
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
}
}
}
}
// 理论上到不了这
close(server_fd);
close(epoll_fd);
return EXIT_SUCCESS;
}
编译运行:g++ epoll_lt_server.cpp -o epoll_lt_server -std=c++11 && ./epoll_lt_server
跑起来你会发现:CPU 占用率极低 —— 因为不用轮询了,内核只在有事件的时候通知你,Nginx、Redis 的底层就是这逻辑。
C++ 进阶:边缘触发(ET)改造:
只需改两处:
- 加入 epoll 时,事件设为EPOLLIN | EPOLLET;
- 读数据时循环读,直到recv返回EAGAIN:
// 情况2:客户发数据(ET模式)
else {
char buf[BUF_SIZE]{};
ssize_t recv_len;
// 循环读,直到没数据(ET模式必须读完)
while ((recv_len = recv(fd, buf, BUF_SIZE - 1, 0)) > 0) {
buf[recv_len] = '\0';
std::cout << "客户" << fd << "点了:" << buf << std::endl;
const char* resp = "你的奶茶好了!\n";
send(fd, resp, strlen(resp), 0);
memset(buf, 0, BUF_SIZE); // 清空缓冲区
}
if (recv_len == 0) {
// 客户断开
std::cout << "客户" << fd << "断开连接" << std::endl;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
// 不是“暂时没数据”的错误
perror(("客户" + std::to_string(fd) + "读失败").c_str());
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
}
}
3.4、异步 I/O:“奶茶做好了,直接送我家”
工作原理:你在网上点奶茶,备注 “做好了放门口”,然后该干啥干啥 —— 对应到 C++,就是调用libaio的异步接口,内核帮你完成 “等数据 + 拷贝数据”,完事了通知你。
C++ 代码示例(libaio 异步文件写):
#include
<iostream>
#include
<cstring>
#include
<fcntl.h>
#include
<libaio.h>
#include
<unistd.h>
#include
<errno.h>
constexpr int BUF_SIZE = 1024;
constexpr int IO_EVENTS = 1;
// RAII封装io_context_t:自动销毁,避免内存泄漏
class AioContext {
public:
AioContext(int max_events) : ctx_{} {
if (io_setup(max_events, &ctx_) != 0) {
throw std::runtime_error("io_setup失败:" + std::string(strerror(errno)));
}
}
~AioContext() {
io_destroy(ctx_); // 析构时自动销毁
}
// 禁止拷贝赋值,避免double free
AioContext(const AioContext&) = delete;
AioContext& operator=(const AioContext&) = delete;
// 提供访问器
io_context_t& get() { return ctx_; }
private:
io_context_t ctx_;
};
int main() {
try {
// 1. 创建AioContext:RAII管理,不用手动destroy
AioContext aio_ctx(IO_EVENTS);
// 2. 打开文件:O_DIRECT要求缓冲区对齐,libaio推荐
int fd = open("test_aio.txt", O_WRONLY | O_CREAT | O_DIRECT, 0644);
if (fd == -1) {
throw std::runtime_error("open失败:" + std::string(strerror(errno)));
}
// 3. 分配对齐的缓冲区:O_DIRECT要求缓冲区地址是扇区大小的倍数(通常4096)
void* buf = nullptr;
if (posix_memalign(&buf, 4096, BUF_SIZE) != 0) {
close(fd);
throw std::runtime_error("posix_memalign失败:" + std::string(strerror(errno)));
}
std::memcpy(buf, "C++异步写数据:奶茶做好了,直接送上门!", BUF_SIZE);
// 4. 设置异步写请求
io_event events[IO_EVENTS]{};
iocb iocb{};
iocb* iocbs[IO_EVENTS] = {&iocb};
// 初始化iocb:异步写,从文件偏移0开始
io_prep_pwrite(&iocb, fd, buf, BUF_SIZE, 0);
// 绑定用户数据,方便后续识别(C++可以传任意指针)
io_set_event_data(&iocb, reinterpret_cast<void*>("异步写请求完成!"));
// 5. 提交异步请求
int ret = io_submit(aio_ctx.get(), IO_EVENTS, iocbs);
if (ret != IO_EVENTS) {
free(buf);
close(fd);
throw std::runtime_error("io_submit失败:" + std::string(strerror(errno)));
}
std::cout << "异步写请求已提交,我先去干别的了..." << std::endl;
// 6. 等待请求完成:io_getevents会阻塞到有事件
ret = io_getevents(aio_ctx.get(), IO_EVENTS, IO_EVENTS, events, nullptr);
if (ret < 0) {
free(buf);
close(fd);
throw std::runtime_error("io_getevents失败:" + std::string(strerror(errno)));
}
// 7. 处理完成的请求
for (int i = 0; i < ret; ++i) {
io_event& ev = events[i];
// 获取用户数据:C++强制类型转换
const char* msg = reinterpret_cast<const char*>(ev.data);
std::cout << "收到通知:" << msg << std::endl;
std::cout << "写了 " << ev.res << " 字节数据到文件" << std::endl;
// 检查是否有错误
if (ev.res2 != 0) {
std::cerr << "异步写错误:" << strerror(-ev.res2) << std::endl;
}
}
// 8. 清理资源
free(buf);
close(fd);
} catch (const std::exception& e) {
// C++异常处理:比C的if-else更优雅
std::cerr << "出错了:" << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
编译运行:
- 先装 libaio:sudo yum install libaio-devel(CentOS)或sudo apt install libaio-dev(Ubuntu);
- 编译:g++ aio_demo.cpp -o aio_demo -std=c++11 -laio && ./aio_demo。
C++ 特性亮点:用AioContext类实现 RAII,自动管理io_context_t,避免忘记io_destroy导致的资源泄漏;用std::exception处理错误,比 C 的perror更优雅。
优缺点:
- 优点:性能天花板!全程不用用户态参与 “等” 和 “拷贝”,CPU 利用率极低;
- 缺点:libaio对网络 I/O 支持有限,C++ 代码复杂度高;
- 适用场景:数据库存储引擎(MySQL InnoDB)、分布式文件系统,需要 “极致吞吐量” 的场景。
3.5、信号驱动 I/O:“奶茶好了,给我打个电话”
工作原理:你点奶茶时留电话,奶茶做好了店员打电话 —— 对应到 C++,就是给 socket 注册SIGIO信号,内核数据就绪后发信号,你在信号处理函数里处理数据。
C++ 代码示例(信号驱动 I/O 服务器):
#include
<iostream>
#include
<cstring>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<unistd.h>
#include
<fcntl.h>
#include
<signal.h>
#include
<errno.h>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
int server_fd; // 全局变量,信号处理函数要访问(C++也能用全局,别滥用就行)
char buf[BUF_SIZE]{};
// 信号处理函数:必须是extern "C",避免C++名字修饰
extern "C" void sigio_handler(int sig) {
if (sig != SIGIO) {
return;
}
// 接受新连接
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
if (client_fd == -1) {
perror("sigio_handler accept失败");
return;
}
std::cout << "接到客户连接,fd:" << client_fd << std::endl;
// 读数据
ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
if (recv_len > 0) {
buf[recv_len] = '\0';
std::cout << "客户" << client_fd << "点了:" << buf << std::endl;
const char* resp = "你的奶茶好了!\n";
send(client_fd, resp, strlen(resp), 0);
}
close(client_fd);
memset(buf, 0, BUF_SIZE); // 清空缓冲区
}
int main() {
// 1. 创建server socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket创建失败");
return EXIT_FAILURE;
}
// 设置SO_REUSEADDR
int reuse = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
perror("setsockopt SO_REUSEADDR失败");
close(server_fd);
return EXIT_FAILURE;
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
perror("bind失败");
close(server_fd);
return EXIT_FAILURE;
}
if (listen(server_fd, 5) == -1) {
perror("listen失败");
close(server_fd);
return EXIT_FAILURE;
}
// 2. 设置socket属主:让内核知道给哪个进程发信号
if (fcntl(server_fd, F_SETOWN, getpid()) == -1) {
perror("fcntl F_SETOWN失败");
close(server_fd);
return EXIT_FAILURE;
}
// 3. 启用信号驱动:设置O_ASYNC标志
int flags = fcntl(server_fd, F_GETFL);
if (flags == -1) {
perror("fcntl F_GETFL失败");
close(server_fd);
return EXIT_FAILURE;
}
if (fcntl(server_fd, F_SETFL, flags | O_ASYNC) == -1) {
perror("fcntl F_SETFL O_ASYNC失败");
close(server_fd);
return EXIT_FAILURE;
}
// 4. 注册SIGIO信号处理函数
struct sigaction sa{};
sa.sa_handler = sigio_handler;
sigemptyset(&sa.sa_mask); // 清空信号掩码
sa.sa_flags = 0;
if (sigaction(SIGIO, &sa, nullptr) == -1) {
perror("sigaction失败");
close(server_fd);
return EXIT_FAILURE;
}
std::cout << "信号驱动I/O服务器启动,等着接电话(SIGIO)..." << std::endl;
// 主进程可以干别的,比如循环sleep
while (true) {
sleep(1);
}
close(server_fd);
return EXIT_SUCCESS;
}
编译运行:g++ sigio_server.cpp -o sigio_server -std=c++11 && ./sigio_server
C++ 踩坑提醒:信号处理函数必须加extern "C",否则 C++ 编译器会对函数名进行修饰,导致sigaction找不到函数 —— 我当年没加这个,调试了两小时才发现。
优缺点:
- 优点:不用轮询,比非阻塞 I/O 高效;
- 缺点:信号风暴风险高(多个 socket 同时就绪),不好处理大量连接;
- 适用场景:嵌入式设备、低并发低延迟场景,现在用得少。
3.6、五大 I/O 模型对比
用一张表总结:
|
I/O 模型 |
等待数据就绪 |
数据拷贝 |
效率 |
适用场景 |
C++ 实现难点 |
奶茶店类比 |
|
阻塞 I/O |
进程阻塞 |
进程阻塞 |
低 |
测试工具 |
简单,无难点 |
站着等奶茶 |
|
非阻塞 I/O |
轮询 |
进程阻塞 |
中 |
嵌入式设备 |
vector 迭代器失效、轮询 CPU 高 |
每隔 10 秒问一次 |
|
多路复用 I/O |
内核通知 |
进程阻塞 |
高 |
Web 服务器 |
ET 模式循环读、惊群效应 |
调度员通知 |
|
异步 I/O |
内核处理 |
内核处理 |
极高 |
数据库 |
RAII 资源管理、libaio 对齐 |
奶茶送上门 |
|
信号驱动 I/O |
信号通知 |
进程处理 |
中 |
小众场景 |
extern "C"、信号安全 |
打电话通知 |
核心结论:C++ 写高并发服务,优先用epoll(水平触发入门,边缘触发进阶),别用阻塞 I/O 扛并发 —— 除非你想让老板骂你。
Part4 传统并发 I/O 方案
在epoll出来之前,C++ 开发者都是用 “多进程多线程” 处理并发 —— 就像奶茶店客户多了,雇一堆师傅。咱们聊聊这两种方案的 C++ 实现和坑。
4.1、多进程 I/O:“一个客户一个师傅”
工作原理:主进程accept后fork子进程,子进程处理客户,主进程继续等新客户 —— 对应到 C++,就是用fork()创建子进程,注意fork后子进程会复制父进程的文件描述符。
C++ 代码示例(多进程服务器):
#include
<iostream>
#include
<cstring>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<unistd.h>
#include
<sys/wait.h>
#include
<signal.h>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
// 信号处理函数:回收僵尸进程
extern "C" void sigchld_handler(int sig) {
// waitpid(-1, nullptr, WNOHANG):非阻塞回收所有僵尸进程
while (waitpid(-1, nullptr, WNOHANG) > 0);
}
void handle_client(int client_fd) {
std::cout << "子进程" << getpid() << "服务客户,fd:" << client_fd << std::endl;
char buf[BUF_SIZE]{};
while (true) {
ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
if (recv_len <= 0) {
std::cout << "子进程" << getpid() << ":客户" << client_fd << "断开" << std::endl;
break;
}
buf[recv_len] = '\0';
std::cout << "子进程" << getpid() << ":客户" << client_fd << "点了:" << buf << std::endl;
const char* resp = "你的奶茶好了!\n";
send(client_fd, resp, strlen(resp), 0);
memset(buf, 0, BUF_SIZE);
}
close(client_fd);
exit(EXIT_SUCCESS); // 子进程处理完退出
}
int main() {
// 注册SIGCHLD信号处理函数,回收僵尸进程
struct sigaction sa{};
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART; // 重启被信号中断的系统调用
if (sigaction(SIGCHLD, &sa, nullptr) == -1) {
perror("sigaction SIGCHLD失败");
return EXIT_FAILURE;
}
// 创建server socket
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket创建失败");
return EXIT_FAILURE;
}
int reuse = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
perror("setsockopt SO_REUSEADDR失败");
close(server_fd);
return EXIT_FAILURE;
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
perror("bind失败");
close(server_fd);
return EXIT_FAILURE;
}
if (listen(server_fd, LISTEN_BACKLOG) == -1) {
perror("listen失败");
close(server_fd);
return EXIT_FAILURE;
}
std::cout << "多进程服务器启动,主进程PID:" << getpid() << ",端口:" << PORT << std::endl;
while (true) {
// accept阻塞
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
if (client_fd == -1) {
perror("accept失败");
continue;
}
// fork子进程
pid_t pid = fork();
if (pid == -1) {
perror("fork失败");
close(client_fd);
continue;
} else if (pid == 0) {
// 子进程:关闭主进程的server_fd(子进程不用监听)
close(server_fd);
handle_client(client_fd);
} else {
// 父进程:关闭client_fd(交给子进程处理)
close(client_fd);
std::cout << "父进程" << getpid() << ":创建子进程" << pid << "服务客户" << std::endl;
}
}
close(server_fd);
return EXIT_SUCCESS;
}
编译运行:g++ multiprocess_server.cpp -o multiprocess_server -std=c++11 && ./multiprocess_server
C++ 踩坑提醒:fork后子进程会复制父进程的所有 fd,所以子进程要关server_fd,父进程要关client_fd,不然会 fd 泄漏 —— 我当年忘了关,导致服务器跑一会儿就 “too many open files”。
优缺点:
- 优点:稳定!进程隔离,一个子进程崩了不影响主进程;
- 缺点:资源开销大!Linux 一个进程占 8MB 栈内存,1000 个进程就是 8GB;
- 适用场景:稳定性优先的场景(比如数据库主进程)。
4.2、多线程 I/O:“一个师傅服务多个客户”
工作原理:主进程accept后创建线程,线程处理客户 —— 对应到 C++,就是用std::thread创建线程,注意线程安全(用std::mutex保护共享资源)。
C++ 代码示例(多线程服务器):
#include
<iostream>
#include
<cstring>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<unistd.h>
#include
<thread>
#include
<mutex>
#include
<vector>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
// 全局互斥锁:保护cout输出,避免多线程打印乱码
std::mutex cout_mutex;
void handle_client(int client_fd) {
// 线程局部存储:每个线程有独立的缓冲区,避免共享数据竞争
thread_local char buf[BUF_SIZE]{};
{
// 加锁打印,避免乱码
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << "线程" << std::this_thread::get_id() << "服务客户,fd:" << client_fd << std::endl;
}
while (true) {
ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
if (recv_len <= 0) {
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << "线程" << std::this_thread::get_id() << ":客户" << client_fd << "断开" << std::endl;
break;
}
buf[recv_len] = '\0';
{
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << "线程" << std::this_thread::get_id() << ":客户" << client_fd << "点了:" << buf << std::endl;
}
const char* resp = "你的奶茶好了!\n";
send(client_fd, resp, strlen(resp), 0);
memset(buf, 0, BUF_SIZE);
}
close(client_fd);
}
int main() {
// 创建server socket
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket创建失败");
return EXIT_FAILURE;
}
int reuse = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
perror("setsockopt SO_REUSEADDR失败");
close(server_fd);
return EXIT_FAILURE;
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
perror("bind失败");
close(server_fd);
return EXIT_FAILURE;
}
if (listen(server_fd, LISTEN_BACKLOG) == -1) {
perror("listen失败");
close(server_fd);
return EXIT_FAILURE;
}
std::cout << "多线程服务器启动,主线程ID:" << std::this_thread::get_id() << ",端口:" << PORT << std::endl;
// 存线程对象,避免线程析构时崩溃
std::vector<std::thread> threads;
while (true) {
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
if (client_fd == -1) {
perror("accept失败");
continue;
}
// 创建线程:用std::thread,传递client_fd
// 注意:C++11后可以用移动语义,避免拷贝
threads.emplace_back([client_fd]() {
handle_client(client_fd);
});
// 分离线程:避免主线程join,线程结束后自动回收资源
threads.back().detach();
}
close(server_fd);
return EXIT_SUCCESS;
}
编译运行:g++ multithread_server.cpp -o multithread_server -std=c++11 -pthread && ./multithread_server
C++ 特性亮点:
- 用std::mutex和std::lock_guard保护cout,避免多线程打印乱码;
- 用thread_local给每个线程分配独立缓冲区,避免数据竞争;
- 用std::vector存std::thread,用emplace_back和移动语义,效率更高。
踩坑提醒:std::thread创建后必须join或detach,不然析构时会调用terminate崩溃 —— 我当年忘了detach,服务跑一会儿就崩,查了半天才发现。
优缺点:
- 优点:比多进程省资源!线程共享进程内存,上下文切换成本低;
- 缺点:线程安全问题!多个线程共享数据要加锁,不然会竞态条件;
- 适用场景:资源优先的场景(比如 Web 服务),但要处理线程安全。
Part5 epoll+线程池--王者架构
传统多进程多线程的瓶颈是 “一个连接一个执行单元”,C++ 要突破这个瓶颈,就得用 “epoll + 线程池”—— 这是现在最主流的高并发架构。
5.1、C++ 线程池实现
先写一个通用线程池,用std::queue做任务队列,std::mutex和std::condition_variable做同步:
// thread_pool.h
#ifndef
THREAD_POOL_H
#define
THREAD_POOL_H
#include
<iostream>
#include
<vector>
#include
<queue>
#include
<functional>
#include
<thread>
#include
<mutex>
#include
<condition_variable>
#include
<atomic>
#include
<stdexcept>
// 线程池类:C++11及以上,支持任意参数的任务
class ThreadPool {
public:
// 构造函数:指定线程数
explicit ThreadPool(size_t thread_num)
: thread_num_(thread_num), is_running_(true) {
if (thread_num == 0) {
throw std::invalid_argument("线程数不能为0");
}
start();
}
// 析构函数:停止线程池
~ThreadPool() {
stop();
}
// 禁止拷贝赋值
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// 提交任务:支持任意参数,返回future(可选)
template<typename F, typename... Args>
void submit(F&& f, Args&&... args) {
// 包装任务为void()类型
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
{
std::lock_guard<std::mutex> lock(mutex_);
// 如果线程池已停止,不接受新任务
if (!is_running_) {
throw std::runtime_error("线程池已停止,无法提交任务");
}
tasks_.emplace(std::move(task));
}
// 唤醒一个等待的线程
cond_var_.notify_one();
}
private:
// 启动线程池
void start() {
for (size_t i = 0; i < thread_num_; ++i) {
// 创建线程,执行worker函数
threads_.emplace_back([this]() {
worker();
});
}
}
// 停止线程池
void stop() {
{
std::lock_guard<std::mutex> lock(mutex_);
is_running_ = false;
}
// 唤醒所有等待的线程
cond_var_.notify_all();
// 等待所有线程退出
for (auto& thread : threads_) {
if (thread.joinable()) {
thread.join();
}
}
}
// 线程工作函数:循环取任务执行
void worker() {
while (is_running_) {
std::function<void()> task;
// 加锁取任务
{
std::unique_lock<std::mutex> lock(mutex_);
// 等待条件:线程池运行中且任务队列不为空
cond_var_.wait(lock, [this]() {
return !is_running_ || !tasks_.empty();
});
// 如果线程池已停止且任务队列为空,退出
if (!is_running_ && tasks_.empty()) {
return;
}
// 取任务(移动语义,避免拷贝)
task = std::move(tasks_.front());
tasks_.pop();
}
// 执行任务
try {
task();
} catch (const std::exception& e) {
std::cerr << "线程" << std::this_thread::get_id() << "执行任务失败:" << e.what() << std::endl;
}
}
}
private:
size_t thread_num_; // 线程数
std::vector<std::thread> threads_; // 线程列表
std::queue<std::function<void()>> tasks_; // 任务队列
std::mutex mutex_; // 保护任务队列的互斥锁
std::condition_variable cond_var_; // 条件变量,用于线程唤醒
std::atomic<bool> is_running_; // 是否运行中(原子变量,线程安全)
};
#endif
// THREAD_POOL_H
5.2、epoll + 线程池:10 万并发 Echo 服务器
用上面的线程池,结合 epoll 边缘触发,实现高并发 Echo 服务器:
#include
<iostream>
#include
<cstring>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<unistd.h>
#include
<fcntl.h>
#include
<sys/epoll.h>
#include
<errno.h>
#include
<memory> // 智能指针
#include
"thread_pool.h"
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 4096;
constexpr int MAX_EVENTS = 102400; // 支持10万连接
constexpr int THREAD_NUM = 8; // 线程数=CPU核心数*2
// 客户端数据:用struct封装,配合智能指针
struct ClientData {
int fd;
char buf[BUF_SIZE];
int buf_len;
ClientData(int fd_) : fd(fd_), buf_len(0) {
memset(buf, 0, BUF_SIZE);
}
~ClientData() {
// 析构时关闭fd,避免泄漏
close(fd);
std::cout << "客户" << fd << "资源已释放" << std::endl;
}
};
// 设置非阻塞
bool set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL);
if (flags == -1) {
perror("fcntl F_GETFL失败");
return false;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL失败");
return false;
}
return true;
}
// 任务函数:处理客户端数据(Echo)
void handle_client_data(std::shared_ptr<ClientData> data) {
// 用shared_ptr管理ClientData,自动释放
std::cout << "线程" << std::this_thread::get_id() << "处理客户" << data->fd << ",数据长度:" << data->buf_len << std::endl;
std::cout << "客户" << data->fd << "点了:" << std::string(data->buf, data->buf_len) << std::endl;
// 发送响应
send(data->fd, data->buf, data->buf_len, 0);
}
int main() {
try {
// 1. 创建线程池:8个线程
ThreadPool thread_pool(THREAD_NUM);
std::cout << "线程池创建成功,线程数:" << THREAD_NUM << std::endl;
// 2. 创建server socket
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
throw std::runtime_error("socket创建失败:" + std::string(strerror(errno)));
}
// 设置SO_REUSEADDR和SO_REUSEPORT:避免端口占用,均匀分发连接
int reuse = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
close(server_fd);
throw std::runtime_error("setsockopt SO_REUSEADDR失败:" + std::string(strerror(errno)));
}
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)) == -1) {
close(server_fd);
throw std::runtime_error("setsockopt SO_REUSEPORT失败:" + std::string(strerror(errno)));
}
// 绑定端口
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
close(server_fd);
throw std::runtime_error("bind失败:" + std::string(strerror(errno)));
}
// 监听
if (listen(server_fd, 1024) == -1) {
close(server_fd);
throw std::runtime_error("listen失败:" + std::string(strerror(errno)));
}
// 设置非阻塞
if (!set_nonblocking(server_fd)) {
close(server_fd);
throw std::runtime_error("set_nonblocking server_fd失败");
}
// 3. 创建epoll实例
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
close(server_fd);
throw std::runtime_error("epoll_create1失败:" + std::string(strerror(errno)));
}
// 4. 把server_fd加入epoll:边缘触发
epoll_event ev{};
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
close(server_fd);
close(epoll_fd);
throw std::runtime_error("epoll_ctl add server_fd失败:" + std::string(strerror(errno)));
}
std::cout << "Echo服务器启动,端口:" << PORT << ",支持最大连接:" << MAX_EVENTS << std::endl;
epoll_event events[MAX_EVENTS]{};
while (true) {
// 5. 等待epoll事件
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
if (errno == EINTR) {
// 被信号中断,继续等待
continue;
}
throw std::runtime_error("epoll_wait失败:" + std::string(strerror(errno)));
}
// 6. 处理所有就绪事件
for (int i = 0; i < nfds; ++i) {
int fd = events[i].data.fd;
// 情况1:新连接(server_fd就绪)
if (fd == server_fd) {
while (true) {
sockaddr_in client_addr{};
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
if (client_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有更多连接,退出循环
break;
} else {
perror("accept失败");
break;
}
}
std::cout << "接到新客户,fd:" << client_fd << std::endl;
// 设置非阻塞+边缘触发
if (!set_nonblocking(client_fd)) {
close(client_fd);
continue;
}
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
perror(("epoll_ctl add client_fd " + std::to_string(client_fd) + "失败").c_str());
close(client_fd);
continue;
}
}
}
// 情况2:客户发数据(client_fd就绪)
else {
if (events[i].events & EPOLLIN) {
// 用shared_ptr管理ClientData,自动释放
auto data = std::make_shared<ClientData>(fd);
// 边缘触发:循环读数据,直到EAGAIN
while (true) {
ssize_t recv_len = recv(fd, data->buf + data->buf_len, BUF_SIZE - data->buf_len - 1, 0);
if (recv_len > 0) {
data->buf_len += recv_len;
// 缓冲区满了,先提交任务
if (data->buf_len >= BUF_SIZE - 1) {
thread_pool.submit(handle_client_data, data);
// 重新创建一个data,继续读
data = std::make_shared<ClientData>(fd);
}
} else if (recv_len == 0) {
// 客户断开:从epoll删除
std::cout << "客户" << fd << "断开连接" << std::endl;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
// data会自动析构,关闭fd
goto next_event;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有更多数据,提交任务
if (data->buf_len > 0) {
thread_pool.submit(handle_client_data, data);
}
break;
} else {
// 读失败
perror(("客户" + std::to_string(fd) + "读失败").c_str());
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
// data会自动析构,关闭fd
goto next_event;
}
}
}
}
}
next_event:;
}
}
// 理论上到不了这
close(server_fd);
close(epoll_fd);
} catch (const std::exception& e) {
std::cerr << "服务器异常:" << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
编译运行:
- 把thread_pool.h和echo_server.cpp放同一目录;
- 编译:g++ echo_server.cpp -o echo_server -std=c++11 -pthread && ./echo_server;
- 压测验证:wrk -t8 -c10000 -d30s http://127.0.0.1:8080(需先装 wrk)。
跑起来你会发现:1 万并发下 CPU 占用低于 50%,吞吐量能到几万 QPS—— 这就是 C+++epoll + 线程池的威力!
5.3、异步 I/O
异步 I/O 是 “终极方案”,但现在还有些坑:
- Linux 的libaio对网络 I/O 支持不算完美,很多场景下还是用 “epoll + 线程池” 模拟异步(伪异步);
- 编程复杂度高:异步逻辑是 “事件驱动”,代码会变成 “回调地狱”—— 比如你要先异步读数据,读完再异步解析,解析完再异步写响应,回调嵌套能有好几层。
建议:
- 如果你是新手,先掌握 “epoll + 线程池”—— 这方案足够支撑大部分高并发场景(比如 Nginx 就是 epoll + 线程池);
- 如果你需要 “极致性能”(比如数据库存储引擎),再研究异步 I/O(比如 IO_URING);
Part6 实战:手写"10万并发 Echo服务器"
光说不练假把式,咱们用 C++ 手写 “10 万并发 Echo 服务器”:基于epoll边缘触发+线程池,解决惊群、粘包问题,用 C++ 特性(智能指针、RAII、异常处理)规避内存泄漏与崩溃风险。
6.1、需求定义:Echo 服务器核心目标
- 功能:客户端发送任意数据,服务器原封不动返回(Echo);
- 性能:支持 10 万并发连接,单 Core CPU 占用≤50%,吞吐量≥5 万 QPS;
- 稳定性:客户端异常断连不崩溃、数据粘包能正确解析、资源自动回收。
6.2、架构设计:C++ 风格高并发架构
沿用 “epoll 主进程 + 线程池” 架构,但用 C++ 特性优化:
|
模块 |
C++ 实现要点 |
|
主进程 |
epoll边缘触发监控 socket,SO_REUSEPORT避惊群,智能指针管理客户端资源 |
|
线程池 |
std::thread+std::mutex+std::condition_variable,RAII 自动停止线程 |
|
任务队列 |
std::queue std::function<void()>,支持任意参数任务,无需手动绑定函数参数 |
|
数据处理 |
自定义 “包头 + 包体” 结构解粘包,std::string管理动态数据 |
6.3、C++ 代码实现:核心逻辑
6.3.1、线程池实现(ThreadPool.h)
用 C++11 + 特性实现通用线程池,支持任意任务类型:
#ifndef
THREAD_POOL_H
#define
THREAD_POOL_H
#include
<iostream>
#include
<vector>
#include
<queue>
#include
<functional>
#include
<thread>
#include
<mutex>
#include
<condition_variable>
#include
<atomic>
#include
<stdexcept>
#include
<utility>
class ThreadPool {
public:
// 构造:指定线程数,默认CPU核心数*2
explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency() * 2)
: is_running_(true) {
if (thread_num == 0) {
throw std::invalid_argument("线程数不能为0(当前CPU核心数:" +
std::to_string(std::thread::hardware_concurrency()) + ")");
}
start(thread_num);
}
// 析构:自动停止线程池
~ThreadPool() { stop(); }
// 禁止拷贝赋值(避免线程资源重复释放)
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// 提交任务:支持任意参数(完美转发)
template<typename F, typename... Args>
void submit(F&& f, Args&&... args) {
// 包装任务为无参函数(std::bind+完美转发)
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
{
std::lock_guard<std::mutex> lock(mtx_);
if (!is_running_) {
throw std::runtime_error("线程池已停止,无法提交新任务");
}
tasks_.emplace(std::move(task));
}
// 唤醒一个等待的线程
cond_.notify_one();
}
private:
// 启动线程
void start(size_t thread_num) {
for (size_t i = 0; i < thread_num; ++i) {
threads_.emplace_back([this]() {
worker(); // 线程执行函数
});
}
std::cout << "线程池启动:" << thread_num << "个线程" << std::endl;
}
// 停止线程池
void stop() {
{
std::lock_guard<std::mutex> lock(mtx_);
is_running_ = false;
}
// 唤醒所有线程(避免线程卡在wait)
cond_.notify_all();
// 等待所有线程退出
for (auto& t : threads_) {
if (t.joinable()) {
t.join();
}
}
std::cout << "线程池停止:所有线程已回收" << std::endl;
}
// 线程工作函数:循环取任务执行
void worker() {
const auto thread_id = std::this_thread::get_id();
std::cout << "线程启动:" << thread_id << std::endl;
while (is_running_) {
std::function<void()> task;
// 加锁取任务(unique_lock支持手动解锁)
std::unique_lock<std::mutex> lock(mtx_);
// 等待条件:线程池运行中 且 任务队列非空
cond_.wait(lock, [this]() {
return !is_running_ || !tasks_.empty();
});
// 线程池停止且任务为空:退出
if (!is_running_ && tasks_.empty()) {
break;
}
// 取任务(移动语义,避免拷贝开销)
task = std::move(tasks_.front());
tasks_.pop();
lock.unlock(); // 解锁,让其他线程取任务
// 执行任务(捕获异常,避免单个任务崩溃导致线程退出)
try {
task();
} catch (const std::exception& e) {
std::cerr << "线程" << thread_id << "执行任务失败:" << e.what() << std::endl;
}
}
std::cout << "线程停止:" << thread_id << std::endl;
}
private:
std::vector<std::thread> threads_; // 线程列表
std::queue<std::function<void()>> tasks_;// 任务队列
std::mutex mtx_; // 保护任务队列的互斥锁
std::condition_variable cond_; // 唤醒线程的条件变量
std::atomic<bool> is_running_; // 线程池运行状态(原子变量,线程安全)
};
#endif
// THREAD_POOL_H
6.3.2、Echo 服务器主逻辑(echo_server.cpp)
包含粘包处理、epoll 监控、客户端数据管理:
#include
<iostream>
#include
<cstring>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<unistd.h>
#include
<fcntl.h>
#include
<sys/epoll.h>
#include
<errno.h>
#include
<memory> // std::shared_ptr
#include
<string>
#include
<endian.h> // std::endian::big
#include
"ThreadPool.h"
// 配置参数(constexpr替代
#define
,类型安全)
constexpr int PORT = 8080;
constexpr int MAX_EVENTS = 102400; // 支持10万并发连接
constexpr int BUF_SIZE = 4096;
constexpr int HEADER_LEN = 4; // 包头长度:4字节(存储包体长度)
// 1. 粘包处理:固定长度包头
struct FixedHeader {
uint32_t body_len; // 包体长度(网络字节序:大端)
};
// 2. 客户端数据结构体(用shared_ptr管理,自动释放)
struct ClientData {
int fd; // 客户端文件描述符
std::string recv_buf; // 接收缓冲区(动态扩容,避免固定大小限制)
sockaddr_in client_addr; // 客户端地址(可选,用于日志)
ClientData(int fd_, const sockaddr_in& addr_)
: fd(fd_), client_addr(addr_) {}
// 析构:自动关闭fd(RAII)
~ClientData() {
close(fd);
std::cout << "客户端" << fd << "资源释放(IP:"
<< inet_ntoa(client_addr.sin_addr) << ")" << std::endl;
}
// 禁止拷贝(避免fd重复关闭)
ClientData(const ClientData&) = delete;
ClientData& operator=(const ClientData&) = delete;
};
// 3. 工具函数:设置socket为非阻塞
bool set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL);
if (flags == -1) {
std::cerr << "fcntl F_GETFL失败:" << strerror(errno) << std::endl;
return false;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
std::cerr << "fcntl F_SETFL失败:" << strerror(errno) << std::endl;
return false;
}
return true;
}
// 4. 工具函数:ET模式循环读数据(直到EAGAIN)
ssize_t read_until_eagain(int fd, std::string& buf) {
char tmp[BUF_SIZE];
ssize_t total_read = 0;
while (true) {
ssize_t n = recv(fd, tmp, BUF_SIZE, 0);
if (n > 0) {
buf.append(tmp, n);
total_read += n;
} else if (n == 0) {
// 客户端主动断连
return 0;
} else {
// 读出错:判断是否为“暂时无数据”
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return total_read; // 正常:无更多数据
} else {
std::cerr << "recv失败:" << strerror(errno) << "(fd:" << fd << ")" << std::endl;
return -1; // 错误:需要关闭连接
}
}
}
}
// 5. 任务函数:处理客户端数据(解粘包+Echo)
void handle_client(std::shared_ptr<ClientData> client) {
const auto fd = client->fd;
auto& recv_buf = client->recv_buf;
// 步骤1:循环读数据(ET模式)
ssize_t read_len = read_until_eagain(fd, recv_buf);
if (read_len == 0) {
std::cout << "客户端" << fd << "主动断连" << std::endl;
return;
} else if (read_len == -1) {
return; // 读出错,已在read_until_eagain打印日志
}
// 步骤2:解粘包(包头+包体)
while (recv_buf.size() >= HEADER_LEN) {
// 2.1 解析包头(网络字节序转主机字节序)
FixedHeader header;
std::memcpy(&header.body_len, recv_buf.data(), HEADER_LEN);
uint32_t body_len = ntohl(header.body_len); // 大端转主机序
// 2.2 检查包体是否完整
if (recv_buf.size() < HEADER_LEN + body_len) {
break; // 包体不完整,等待下一次数据
}
// 2.3 提取包体
std::string body = recv_buf.substr(HEADER_LEN, body_len);
// 2.4 移除已处理的包头+包体
recv_buf.erase(0, HEADER_LEN + body_len);
// 步骤3:Echo响应(原封不动返回)
std::cout << "客户端" << fd << "收到数据:" << body << "(长度:" << body_len << ")" << std::endl;
send(fd, recv_buf.data() - HEADER_LEN - body_len, HEADER_LEN + body_len, 0);
}
}
// 6. 服务器初始化:创建socket+绑定+监听
int init_server_socket() {
// 6.1 创建TCP socket
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
throw std::runtime_error("socket创建失败:" + std::string(strerror(errno)));
}
// 6.2 设置SO_REUSEADDR+SO_REUSEPORT(避惊群+端口复用)
int reuse = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
close(server_fd);
throw std::runtime_error("setsockopt SO_REUSEADDR失败:" + std::string(strerror(errno)));
}
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)) == -1) {
close(server_fd);
throw std::runtime_error("setsockopt SO_REUSEPORT失败:" + std::string(strerror(errno)));
}
// 6.3 绑定IP+端口
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
server_addr.sin_port = htons(PORT);
if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
close(server_fd);
throw std::runtime_error("bind失败:" + std::string(strerror(errno)));
}
// 6.4 监听(backlog=1024,排队等待的连接数)
if (listen(server_fd, 1024) == -1) {
close(server_fd);
throw std::runtime_error("listen失败:" + std::string(strerror(errno)));
}
// 6.5 设置非阻塞(epoll推荐非阻塞socket)
if (!set_nonblocking(server_fd)) {
close(server_fd);
throw std::runtime_error("set_nonblocking server_fd失败");
}
std::cout << "服务器socket初始化成功:fd=" << server_fd << ",端口=" << PORT << std::endl;
return server_fd;
}
int main() {
try {
// 1. 初始化线程池(默认CPU核心数*2)
ThreadPool pool;
// 2. 初始化服务器socket
int server_fd = init_server_socket();
// 3. 创建epoll实例
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
close(server_fd);
throw std::runtime_error("epoll_create1失败:" + std::string(strerror(errno)));
}
// 4. 添加server_fd到epoll(边缘触发+读事件)
epoll_event ev{};
ev.events = EPOLLIN | EPOLLET; // ET模式+读事件
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
close(server_fd);
close(epoll_fd);
throw std::runtime_error("epoll_ctl add server_fd失败:" + std::string(strerror(errno)));
}
std::cout << "Echo服务器启动成功!支持最大并发:" << MAX_EVENTS
<< ",压测命令:wrk -t8 -c10000 -d30s http://127.0.0.1:" << PORT << std::endl;
// 5. 循环处理epoll事件
epoll_event events[MAX_EVENTS]{};
while (true) {
// 5.1 等待事件(-1:永久阻塞,直到有事件)
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
if (errno == EINTR) {
continue; // 被信号中断(如Ctrl+C),继续循环
}
throw std::runtime_error("epoll_wait失败:" + std::string(strerror(errno)));
}
// 5.2 处理所有就绪事件
for (int i = 0; i < nfds; ++i) {
int fd = events[i].data.fd;
// 情况1:新连接(server_fd触发事件)
if (fd == server_fd) {
while (true) {
sockaddr_in client_addr{};
socklen_t addr_len = sizeof(client_addr);
// 接受新连接(ET模式需循环accept,避免漏连接)
int conn_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &addr_len);
if (conn_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 无更多连接,退出循环
} else {
std::cerr << "accept失败:" << strerror(errno) << std::endl;
break;
}
}
std::cout << "接到新连接:fd=" << conn_fd << ",IP=" << inet_ntoa(client_addr.sin_addr) << std::endl;
// 设置新连接为非阻塞+ET模式
if (!set_nonblocking(conn_fd)) {
close(conn_fd);
continue;
}
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
std::cerr << "epoll_ctl add conn_fd=" << conn_fd << "失败:" << strerror(errno) << std::endl;
close(conn_fd);
continue;
}
// 用shared_ptr管理客户端数据(加入epoll时创建,避免后期遗漏)
auto client = std::make_shared<ClientData>(conn_fd, client_addr);
// 注意:epoll_data_t无法直接存shared_ptr,需用全局map映射fd→shared_ptr(简化版暂省略,实际项目需实现)
// 简化处理:此处暂不存储,后续通过fd在任务中获取(实际项目需用std::unordered_map<int, std::shared_ptr<ClientData>>)
}
}
// 情况2:客户端数据(conn_fd触发事件)
else {
if (events[i].events & EPOLLIN) {
// 从全局map获取client(简化版:暂用fd创建,实际项目需从map中取)
sockaddr_in dummy_addr{};
auto client = std::make_shared<ClientData>(fd, dummy_addr);
// 提交任务到线程池
pool.submit(handle_client, client);
}
// 情况3:写事件(如发送缓冲区满,实际项目需处理)
if (events[i].events & EPOLLOUT) {
std::cerr << "客户端" << fd << "触发写事件(未处理)" << std::endl;
}
}
}
}
// 理论上不会执行到此处(循环永久运行)
close(server_fd);
close(epoll_fd);
} catch (const std::exception& e) {
// 统一异常处理:打印错误并退出
std::cerr << "服务器异常退出:" << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
6.3.3、编译与运行
1)、编译指令(需 GCC 7.0+,支持 C++11):
# 编译线程池+服务器(链接pthread,因使用std::thread)
g++ echo_server.cpp -o echo_server -std=c++11 -pthread -O2
2)、临时调大文件描述符限制(Linux 默认限制较低,无法支持 10 万连接):
# 临时生效(当前终端):允许最大102400个文件描述符
ulimit -n 102400
# 验证:查看当前限制
ulimit -n
3)、启动服务器:
./echo_server
4)、高并发压测(用 wrk,避免用 ab):
# 8线程、1万并发连接、压测30秒
wrk -t8 -c10000 -d30s http://127.0.0.1:8080
压测结果参考:
1 万并发下,CPU 占用≤40%,吞吐量≥6 万 QPS,无数据丢失或连接断连。
6.4、C++ I/O 新技术
6.4.1、IO_URING:Linux 新一代异步 I/O(C++ 封装)
IO_URING 是 Linux 5.1 + 的异步 I/O 框架,性能远超 epoll/libaio,C++ 中用 RAII 封装资源:
#include
<iostream>
#include
<cstring>
#include
<fcntl.h>
#include
<liburing.h>
#include
<unistd.h>
#include
<stdexcept>
#include
<string>
// RAII封装IO_URING:自动初始化/销毁
class IoUring {
public:
explicit IoUring(size_t entries) {
// 初始化IO_URING(entries:提交队列大小)
int ret = io_uring_queue_init(entries, &ring_, 0);
if (ret < 0) {
throw std::runtime_error("io_uring_queue_init失败:" +
std::string(strerror(-ret)) + "(需Linux 5.1+)");
}
}
~IoUring() {
// 销毁IO_URING(RAII:自动释放资源)
io_uring_queue_exit(&ring_);
}
// 禁止拷贝赋值
IoUring(const IoUring&) = delete;
IoUring& operator=(const IoUring&) = delete;
// 获取提交队列项(SQE)
io_uring_sqe* get_sqe() {
io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (!sqe) {
throw std::runtime_error("io_uring_get_sqe失败:提交队列已满");
}
return sqe;
}
// 提交请求到内核
void submit() {
int ret = io_uring_submit(&ring_);
if (ret < 0) {
throw std::runtime_error("io_uring_submit失败:" + std::string(strerror(-ret)));
}
}
// 等待请求完成并获取结果(CQE)
io_uring_cqe* wait_cqe() {
io_uring_cqe* cqe = nullptr;
int ret = io_uring_wait_cqe(&ring_, &cqe);
if (ret < 0) {
throw std::runtime_error("io_uring_wait_cqe失败:" + std::string(strerror(-ret)));
}
return cqe;
}
// 标记CQE已处理
void mark_cqe_seen(io_uring_cqe* cqe) {
io_uring_cqe_seen(&ring_, cqe);
}
private:
io_uring ring_; // IO_URING核心结构体
};
// IO_URING异步写文件示例
int main() {
try {
// 1. 初始化IO_URING(提交队列大小=8)
IoUring uring(8);
// 2. 打开文件(O_DIRECT:避免页缓存,提升异步性能)
int fd = open("uring_demo.txt", O_WRONLY | O_CREAT | O_DIRECT, 0644);
if (fd < 0) {
throw std::runtime_error("open失败:" + std::string(strerror(errno)));
}
// 3. 分配对齐缓冲区(O_DIRECT要求缓冲区地址是扇区大小的倍数,通常4096)
void* buf = nullptr;
int ret = posix_memalign(&buf, 4096, 4096);
if (ret != 0) {
close(fd);
throw std::runtime_error("posix_memalign失败:" + std::string(strerror(ret)));
}
// 4. 准备数据
std::string data = "C++ IO_URING异步写测试:10万并发无压力!";
std::memcpy(buf, data.data(), data.size());
// 5. 设置异步写请求
io_uring_sqe* sqe = uring.get_sqe();
// 初始化写请求:fd、缓冲区、长度、文件偏移
io_uring_prep_write(sqe, fd, buf, data.size(), 0);
// 设置用户数据(用于识别请求结果)
io_uring_sqe_set_data(sqe, const_cast<char*>(data.c_str()));
// 6. 提交请求到内核
uring.submit();
std::cout << "IO_URING异步写请求已提交,数据:" << data << std::endl;
// 7. 等待请求完成
io_uring_cqe* cqe = uring.wait_cqe();
if (cqe->res < 0) {
throw std::runtime_error("异步写失败:" + std::string(strerror(-cqe->res)));
}
// 8. 处理结果
std::string req_data = static_cast<char*>(io_uring_cqe_get_data(cqe));
std::cout << "请求完成:" << req_data << "(实际写入字节:" << cqe->res << ")" << std::endl;
// 9. 清理资源
uring.mark_cqe_seen(cqe);
free(buf);
close(fd);
} catch (const std::exception& e) {
std::cerr << "IO_URING示例异常:" << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
编译运行:
# 需安装liburing-dev(Ubuntu:sudo apt install liburing-dev)
g++ uring_demo.cpp -o uring_demo -std=c++11 -luring
./uring_demo
6.4.2、C++20 协程:用户态轻量级线程
C++20 引入协程(Coroutine),配合std::asio可实现 “异步代码同步写”,避免回调地狱。以下是std::asio协程 Echo 服务器示例:
#include
<iostream>
#include
<asio.hpp>
#include
<asio/co_spawn.hpp>
#include
<asio/detached.hpp>
#include
<asio/io_context.hpp>
#include
<asio/ip/tcp.hpp>
#include
<asio/signal_set.hpp>
#include
<asio/write.hpp>
using asio::ip::tcp;
namespace this_coro = asio::this_coro;
// 协程:处理单个客户端连接
asio::awaitable<void> handle_client(tcp::socket socket) {
try {
char data[1024];
while (true) {
// 异步读(协程暂停,直到数据到达)
std::size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
// 打印数据
std::cout << "客户端" << socket.remote_endpoint() << "收到:"
<< std::string(data, n) << std::endl;
// 异步写(Echo响应,协程暂停,直到写完成)
co_await asio::async_write(socket, asio::buffer(data, n), asio::use_awaitable);
}
} catch (const std::exception& e) {
std::cerr << "客户端" << socket.remote_endpoint() << "断开:" << e.what() << std::endl;
}
}
// 协程:接受新连接
asio::awaitable<void> accept_connections(tcp::acceptor acceptor) {
while (true) {
// 异步接受连接(协程暂停,直到有新连接)
tcp::socket socket = co_await acceptor.async_accept(asio::use_awaitable);
std::cout << "接到新连接:" << socket.remote_endpoint() << std::endl;
// 启动新协程处理客户端,不阻塞当前协程
asio::co_spawn(acceptor.get_executor(),
handle_client(std::move(socket)),
asio::detached); // detached:协程独立运行,无需等待
}
}
int main() {
try {
// 1. 创建IO上下文(协程调度核心)
asio::io_context io_context(1);
// 2. 注册信号处理(Ctrl+C退出)
asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](const std::error_code&, int) {
io_context.stop();
std::cout << "服务器收到退出信号,正在清理..." << std::endl;
});
// 3. 创建 acceptor(监听8080端口)
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8080));
// 4. 启动协程接受连接
asio::co_spawn(io_context, accept_connections(std::move(acceptor)), asio::detached);
// 5. 运行IO上下文(调度协程)
std::cout << "C++20协程Echo服务器启动:端口8080" << std::endl;
io_context.run();
} catch (const std::exception& e) {
std::cerr << "服务器异常:" << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
编译运行:
# 需GCC 10+或Clang 12+,支持C++20
g++ coro_echo.cpp -o coro_echo -std=c++20 -lpthread
./coro_echo
核心优势:
- 协程栈仅几 KB,一个进程可创建上百万个协程;
- 代码同步风格,无回调嵌套,可读性远超异步回调;
- std::asio自动调度协程,无需手动管理线程。
总结
核心心法:
- 资源管理优先用 RAII:线程池、IO_URING、socket 都用类封装,析构时自动释放,避免内存 / 文件描述符泄漏。
- 高并发用 “epoll + 线程池” 或 “协程”:中小并发(1 万级)用线程池,超大规模(10 万 +)用协程或 IO_URING。
- 错误处理用 try-catch:C++ 异常比 C 的if-errno更集中,便于定位问题。
- 避免手动管理内存:用std::shared_ptr/std::unique_ptr管理动态资源,杜绝double free。
附录:C++ 网络编程工具链
|
工具 |
用途 |
常用命令 / 代码示例 |
|
wrk |
高并发压测 |
wrk -t8 -c10000 -d30s http://127.0.0.1:8080 |
|
tcpdump |
网络抓包(查粘包 / 断连问题) |
tcpdump -i eth0 port 8080 -w echo.pcap |
|
perf |
CPU 性能分析(查线程池瓶颈) |
perf top -p $(pidof echo_server) |
|
std::asio |
C++ 异步网络库(协程 /epoll 封装) |
asio::io_context io; tcp::socket sock(io); |
|
liburing |
IO_URING C++ 封装库 |
IoUring uring(8); uring.submit(); |
整理了一波干货文章✨ 直接覆盖Linux C/C++全成长赛道,不管你是刚入坑的小白想找对方向,还是摸爬滚打的老炮想突破瓶颈,都能挖到适配的内容~ 主打一个帮大家避坑、少走弯路,技术升级一路开挂!
✨ 先破局:C++到底值不值得冲?
是不是总刷到“劝退C++”的帖子,转头又刷到大厂核心岗疯抢?别内耗!别纠结!速看《为什么很多人劝退学C++,但大厂核心岗位还是要C++?》,帮你理清思路,放心冲就完事,主打一个不踩认知坑~
✨ 后端党集合|按大厂标准打怪升级
想深耕Linux C/C++后端,却对着一堆知识点一脸懵?《【大厂标准】Linux C/C++后端进阶学习路线》直接帮你捋顺学习脉络,搭好能力框架,进阶路上不慌不忙,主打一个高效上分!
✨ 音视频爱好者速来|入门不懵圈
对音视频流媒体开发狠狠心动,但一上手就卡壳?《音视频流媒体高级开发 - 学习路线》把核心技术拆得明明白白,帮你快速搭好知识框架,轻松开启学习模式,新手也能快速入门~
✨ Qt选手狂喜|桌面/嵌入式通吃
不管是想搞桌面应用,还是深耕嵌入式开发,《C++ Qt学习路线一条龙!(桌面开发 & 嵌入式开发)》直接拿捏~ 从入门到实战一步到位,轻松搞定项目落地,不用自己瞎琢磨,主打一个省心!
✨ 硬核玩家专属|深挖Linux内核
想钻透操作系统底层,突破技术天花板?《Linux内核学习指南,硬核修炼手册》分享超实用的学习小技巧,适合愿意沉下心深耕的宝子,主打一个硬核提升!
✨ 面试冲刺|八股文速查神器
备战技术面试,需要快速复盘知识点?《C++高频八股文面试题1000题(三)》直接当复习手册用,帮你巩固核心考点,面试时气场拉满,不慌不乱,主打一个稳稳上岸!
更多推荐



所有评论(0)