网络 I/O 的演进,是一场围绕“如何高效管理等待”的核心革命。

从多进程的“人海战术”,到多路复用的“精兵强将”,再到异步 I/O 的“自动化未来”,网络 I/O 模型的演进,其目标始终如一:在有限的资源下,承载更高的并发。

理解这些核心模型,不仅是掌握高并发编程的基石,更是为我们构建下一代高效应用提供关键的架构视野。

Part1  网络 I/O--后端人的"生死线"

1.1、为啥 I/O 是性能瓶颈的 “背锅侠”?

现在的系统早不是 “单机跑个 CRUD” 的时代了 —— 分布式服务要跨机器调用,云原生要处理万级 Pod 通信,即时通讯要扛百万用户在线。这些场景的核心,其实都是 “数据在不同地方流转”,也就是咱们说的 I/O。

你可以把系统想象成一家奶茶店:用户下单(请求)是 “输入”,做奶茶(处理)是 “计算”,递奶茶(响应)是 “输出”。大部分时候,“做奶茶” 很快(CPU 计算快),但 “等牛奶到货(网络传输)”“等用户取餐(客户端响应)” 很慢 —— 这就是 I/O 瓶颈。想让奶茶店效率高,不是雇更多做奶茶的师傅(加 CPU),而是优化 “等货” 和 “取餐” 的流程(优化 I/O 模型)。

1.2、环境

后面的代码示例,主要用 Linux(CentOS/Ubuntu 都行)—— 毕竟生产环境 90% 是 Linux,Windows 咱只聊 IOCP(毕竟 Windows 下搞高并发的少,别杠,杠就是你对)。

Part2  I/O 到底是个啥?

2.1、I/O 的本质

很多人以为 I/O 就是 “读文件”“发网络包”,其实错了 ——I/O 的本质是 “用户态程序让内核帮忙拿数据”。

举个例子:你(用户态程序)想喝可乐(数据),但可乐在超市(硬件设备,比如网卡 / 磁盘),你不能自己去超市(用户态不能直接操作硬件),得让外卖员(内核)去拿。整个过程分两步:

  1. 等可乐到货:外卖员去超市,得等超市有货(数据就绪);
  2. 把可乐给你:外卖员把可乐送上门(数据从内核态拷贝到用户态)。

这两步,就是 I/O 的核心流程 ——“等待数据就绪” 和 “数据拷贝”。所有 I/O 模型的差异,本质上都是 “怎么等” 和 “谁来拷贝” 的区别。

再说说 I/O 的分层:硬件层(超市)→驱动层(超市店员)→系统调用层(外卖平台)→应用层(你)。别觉得复杂,就像你点外卖不用管超市怎么进货,你调用read()函数也不用管网卡怎么收数据 —— 但知道分层,能帮你理解 “为啥内核优化能提升 I/O 性能”(比如外卖平台升级,外卖员跑得更快)。

2.2、I/O 设备分类

I/O 设备分两类,记不住没关系,看类比:

  • 字符设备:像用吸管喝奶茶,数据得 “一口一口按顺序来”(串行传输),比如串口、键盘、鼠标。你不能跳过第一口直接喝第二口,就像键盘输入不能跳过 “a” 直接输 “b”。
  • 块设备:像用勺子挖冰沙,数据按 “一块一块” 来(随机访问),比如磁盘、SSD。你可以先挖中间的,再挖边上的,就像磁盘能直接读第 100 个扇区,不用先读前 99 个。

而咱们聊的网络设备,是个 “特殊选手”:它既不像字符设备那样严格串行,也不像块设备那样能随便随机访问 —— 它的延迟是 “薛定谔的延迟”(可能 1ms 到 1s),还得管理连接状态(比如 TCP 的三次握手、四次挥手)。就像你给异地朋友寄奶茶,不知道快递啥时候到,还得确认对方收没收到 —— 这就是为啥网络 I/O 比文件 I/O 难搞,也更需要高效模型。

2.3、并发 I/O 的 “三大坑”

想让奶茶店同时服务 100 个客户,你会发现三个绕不开的坑:

  1. 资源开销陷阱:如果每个客户来就招一个新师傅(多进程),店里的房租(内存)和工资(CPU)根本扛不住 ——Linux 一个进程默认占 8MB 栈内存,1000 个进程就是 8GB,内存直接爆。
  2. 连接数爆炸:如果客户太多(比如 10 万个),你根本没法给每个客户配一个师傅(进程 / 线程)—— 就像奶茶店最多只能坐 20 人,10 万人挤进来直接倒闭。
  3. 传统方案拉胯:如果用 “一个师傅等一个客户下单(阻塞 I/O)”,其他客户只能排队 —— 我当年用单进程阻塞 I/O 写了个 TCP 服务器,测试的时候同事同时连两个客户端,第二个直接超时,老板看我的眼神都不对了。

这些坑,就是咱们后面要解决的问题 —— 而解决的关键,就是选对 I/O 模型。

Part3  五大 I/O 模型

咱们用 “奶茶店接订单” 的例子,用大白话把五大 I/O 模型讲明白。每个模型都会带代码示例,还有我踩过的坑。

3.1、阻塞 I/O:“等不到奶茶,我就不走”

工作原理:就像你去奶茶店点单,点完后站在柜台前等,期间啥也干不了(刷手机都不行,夸张了,但原理差不多)—— 直到奶茶做好,你拿到手才走。对应到 I/O,就是调用read()后,进程一直阻塞,直到 “数据就绪” 和 “数据拷贝” 都完成。

C++ 代码示例(阻塞 TCP 服务器)

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>


// 用constexpr代替宏,C++风格
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;


int main() {
    // 1. 创建socket:AF_INET=IPv4,SOCK_STREAM=TCP
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket创建失败"); // C++也能用perror,方便查错
        return EXIT_FAILURE;
    }


    // 2. 绑定IP和端口:用sockaddr_in结构体
    sockaddr_in server_addr{}; // 初始化清空,C++11特性
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
    server_addr.sin_port = htons(PORT); // 端口转网络字节序


    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
        perror("bind失败");
        close(server_fd); // 记得关fd,避免泄漏
        return EXIT_FAILURE;
    }


    // 3. 监听连接
    if (listen(server_fd, LISTEN_BACKLOG) == -1) {
        perror("listen失败");
        close(server_fd);
        return EXIT_FAILURE;
    }


    std::cout << "阻塞服务器启动,端口:" << PORT << ",等着接订单..." << std::endl;


    while (true) {
        // 4. accept()阻塞:没客户就一直等
        sockaddr_in client_addr{};
        socklen_t client_addr_len = sizeof(client_addr);
        int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
        if (client_fd == -1) {
            perror("accept失败");
            continue;
        }


        std::cout << "接到客户连接,fd:" << client_fd << std::endl;


        // 5. recv()阻塞:没数据就等
        char buf[BUF_SIZE]{};
        ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
        if (recv_len <= 0) {
            std::cerr << "客户" << client_fd << "断开或读失败" << std::endl;
            close(client_fd);
            continue;
        }


        buf[recv_len] = '\0'; // 加字符串结束符
        std::cout << "客户" << client_fd << "点了:" << buf << std::endl;


        // 6. 发响应
        const char* resp = "你的奶茶好了!\n";
        send(client_fd, resp, strlen(resp), 0);


        // 7. 关闭连接
        close(client_fd);
        std::cout << "客户" << client_fd << "服务结束" << std::endl;
    }


    // 理论上到不了这,但好习惯还是要养成
    close(server_fd);
    return EXIT_SUCCESS;
}

编译运行:g++ blocking_server.cpp -o blocking_server -std=c++11 && ./blocking_server

跑起来你会发现:同时开两个客户端,第二个得等第一个服务完才能连 —— 就像奶茶店只有一个师傅,C++ 再快也没用,阻塞 I/O 天生低效。

优缺点

  • 优点:简单!C++ 新手也能写对,不用处理复杂逻辑;
  • 缺点:低效!一个连接占一个线程,并发一高就歇菜;
  • 适用场景:写个测试工具、简单命令行服务,别用来扛高并发。

3.2、非阻塞 I/O:“我每隔 10 秒问一次,奶茶好了没?”

工作原理:你点完奶茶去旁边喝咖啡,每隔 10 秒回来看一眼 —— 对应到 C++,就是把 socket 设为非阻塞,read()没数据时立刻返回-1,你得循环轮询。

C++ 代码示例(非阻塞 TCP 服务器)

#include
 <iostream>
#include
 <vector>
#include
 <cstring>
#include
 <sys/socket.h>
#include
 <netinet/in.h>
#include
 <unistd.h>
#include
 <fcntl.h>
#include
 <errno.h>
#include
 <chrono>
#include
 <thread>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
constexpr int POLL_INTERVAL_MS = 1000; // 轮询间隔1秒
// 设置socket为非阻塞,C++封装成函数,复用性高
bool set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        perror("fcntl F_GETFL失败");
        return false;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl F_SETFL失败");
        return false;
    }
    return true;
}
int main() {
    // 1. 创建并初始化server socket
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket创建失败");
        return EXIT_FAILURE;
    }
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);
    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
        perror("bind失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, LISTEN_BACKLOG) == -1) {
        perror("listen失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    // 关键:设置server socket为非阻塞
    if (!set_nonblocking(server_fd)) {
        close(server_fd);
        return EXIT_FAILURE;
    }
    std::vector<int> client_fds; // 用STL vector存客户端fd,避免裸数组
    std::cout << "非阻塞服务器启动,端口:" << PORT << ",每隔" << POLL_INTERVAL_MS << "ms轮询一次..." << std::endl;
    while (true) {
        // 2. 非阻塞accept:没新连接就返回-1,不阻塞
        sockaddr_in client_addr{};
        socklen_t client_addr_len = sizeof(client_addr);
        int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
        if (client_fd != -1) {
            // 新客户:设置非阻塞并加入列表
            if (set_nonblocking(client_fd)) {
                client_fds.push_back(client_fd);
                std::cout << "接到新客户,fd:" << client_fd << ",当前客户数:" << client_fds.size() << std::endl;
            } else {
                close(client_fd); // 设非阻塞失败,关fd
            }
        } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
            // 不是“暂时没数据”的错误,才报错
            perror("accept异常");
        }
        // 3. 轮询所有客户端,读数据
        for (auto it = client_fds.begin(); it != client_fds.end();) {
            int fd = *it;
            char buf[BUF_SIZE]{};
            ssize_t recv_len = recv(fd, buf, BUF_SIZE - 1, 0);
            if (recv_len > 0) {
                // 读到数据:处理并响应
                buf[recv_len] = '\0';
                std::cout << "客户" << fd << "点了:" << buf << std::endl;
                const char* resp = "你的奶茶好了!\n";
                send(fd, resp, strlen(resp), 0);
                ++it; // 正常,迭代器后移
            } else if (recv_len == 0) {
                // 客户断开
                std::cout << "客户" << fd << "断开连接" << std::endl;
                close(fd);
                it = client_fds.erase(it); // 从vector删除,迭代器要更新
            } else {
                // 读失败:判断是不是“暂时没数据”
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    ++it; // 暂时没数据,继续轮询
                } else {
                    // 真失败,关fd删列表
                    perror(("客户" + std::to_string(fd) + "读失败").c_str());
                    close(fd);
                    it = client_fds.erase(it);
                }
            }
        }
        // 4. 轮询间隔:用std::this_thread::sleep_for,C++风格
        std::this_thread::sleep_for(std::chrono::milliseconds(POLL_INTERVAL_MS));
        std::cout << "轮询结束,等待下一次..." << std::endl;
    }
    close(server_fd);
    return EXIT_SUCCESS;
}

编译运行:g++ nonblocking_server.cpp -o nonblocking_server -std=c++11 && ./nonblocking_server

跑起来你会发现:能同时接多个客户了,但top看 CPU 占用变高 —— 因为你一直在循环轮询,哪怕没客户也在跑,就像你每隔 10 秒问一次奶茶,不累吗?

C++ 踩坑提醒:用vector存客户端 fd 时,erase会返回新迭代器,别直接it++,不然会迭代器失效崩溃 —— 我当年就这么坑过,查了半天才发现。

优缺点

  • 优点:能处理多连接,不用等一个客户完再服务下一个;
  • 缺点:轮询耗 CPU!1000 个客户就每秒轮询 1000 次,CPU 直接拉满;
  • 适用场景:嵌入式设备、短连接低延迟场景,别单独用在高并发服务。

3.3、多路复用 I/O:“让快递员告诉我,谁的奶茶好了”

工作原理:这是高并发的 “明星选手”!相当于雇个调度员(内核),所有客户订单都交给调度员,他告诉你 “哪个客户的奶茶好了”—— 对应到 C++,就是用epoll监控多个 socket,哪个就绪了就处理哪个。

epoll有两种触发模式,C++ 开发者尤其要注意:

  • 水平触发(LT):奶茶到了没拿,调度员一直提醒你 —— 适合新手,不容易丢数据;
  • 边缘触发(ET):奶茶到了只提醒一次,没拿就没了 —— 效率高,但得一次性把数据读完,不然会丢包。

C++ 代码示例(epoll 服务器,水平触发)

#include
 <iostream>
#include
 <cstring>
#include
 <sys/socket.h>
#include
 <netinet/in.h>
#include
 <unistd.h>
#include
 <fcntl.h>
#include
 <sys/epoll.h>
#include
 <errno.h>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
constexpr int MAX_EVENTS = 1024; // 最多监控1024个事件
bool set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        perror("fcntl F_GETFL失败");
        return false;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl F_SETFL失败");
        return false;
    }
    return true;
}
int main() {
    // 1. 创建server socket
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket创建失败");
        return EXIT_FAILURE;
    }
    // 设置SO_REUSEADDR:避免端口占用问题
    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        perror("setsockopt SO_REUSEADDR失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);
    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
        perror("bind失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, LISTEN_BACKLOG) == -1) {
        perror("listen失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    // 2. 创建epoll实例:epoll_create1(0) = 传统epoll_create
    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    // 3. 把server socket加入epoll监控:水平触发(默认)
    epoll_event ev{};
    ev.events = EPOLLIN; // 监控读事件(新连接/客户发数据)
    ev.data.fd = server_fd; // 绑定fd,方便后续识别
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
        perror("epoll_ctl add server_fd失败");
        close(server_fd);
        close(epoll_fd);
        return EXIT_FAILURE;
    }
    std::cout << "epoll服务器启动(水平触发),端口:" << PORT << ",等着调度员通知..." << std::endl;
    epoll_event events[MAX_EVENTS]{}; // 存就绪事件
    while (true) {
        // 4. 等待epoll事件:-1表示一直等,直到有事件
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait失败");
            continue;
        }
        // 5. 处理所有就绪事件
        for (int i = 0; i < nfds; ++i) {
            int fd = events[i].data.fd;
            // 情况1:新连接(server_fd就绪)
            if (fd == server_fd) {
                sockaddr_in client_addr{};
                socklen_t client_addr_len = sizeof(client_addr);
                int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
                if (client_fd == -1) {
                    perror("accept失败");
                    continue;
                }
                // 新客户设为非阻塞(epoll推荐非阻塞,避免卡主)
                if (!set_nonblocking(client_fd)) {
                    close(client_fd);
                    continue;
                }
                // 把新客户加入epoll监控
                ev.events = EPOLLIN; // 水平触发
                ev.data.fd = client_fd;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
                    perror(("epoll_ctl add client_fd " + std::to_string(client_fd) + "失败").c_str());
                    close(client_fd);
                    continue;
                }
                std::cout << "接到新客户,fd:" << client_fd << std::endl;
            }
            // 情况2:客户发数据(client_fd就绪)
            else {
                char buf[BUF_SIZE]{};
                ssize_t recv_len = recv(fd, buf, BUF_SIZE - 1, 0);
                if (recv_len > 0) {
                    // 处理数据
                    buf[recv_len] = '\0';
                    std::cout << "客户" << fd << "点了:" << buf << std::endl;
                    const char* resp = "你的奶茶好了!\n";
                    send(fd, resp, strlen(resp), 0);
                } else if (recv_len == 0) {
                    // 客户断开:从epoll删除并关fd
                    std::cout << "客户" << fd << "断开连接" << std::endl;
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                    close(fd);
                } else {
                    // 读失败
                    perror(("客户" + std::to_string(fd) + "读失败").c_str());
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                    close(fd);
                }
            }
        }
    }
    // 理论上到不了这
    close(server_fd);
    close(epoll_fd);
    return EXIT_SUCCESS;
}

编译运行:g++ epoll_lt_server.cpp -o epoll_lt_server -std=c++11 && ./epoll_lt_server

跑起来你会发现:CPU 占用率极低 —— 因为不用轮询了,内核只在有事件的时候通知你,Nginx、Redis 的底层就是这逻辑。

C++ 进阶:边缘触发(ET)改造

只需改两处:

  1. 加入 epoll 时,事件设为EPOLLIN | EPOLLET;
  2. 读数据时循环读,直到recv返回EAGAIN:
// 情况2:客户发数据(ET模式)
else {
    char buf[BUF_SIZE]{};
    ssize_t recv_len;
    // 循环读,直到没数据(ET模式必须读完)
    while ((recv_len = recv(fd, buf, BUF_SIZE - 1, 0)) > 0) {
        buf[recv_len] = '\0';
        std::cout << "客户" << fd << "点了:" << buf << std::endl;
        const char* resp = "你的奶茶好了!\n";
        send(fd, resp, strlen(resp), 0);
        memset(buf, 0, BUF_SIZE); // 清空缓冲区
    }
    if (recv_len == 0) {
        // 客户断开
        std::cout << "客户" << fd << "断开连接" << std::endl;
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
        close(fd);
    } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
        // 不是“暂时没数据”的错误
        perror(("客户" + std::to_string(fd) + "读失败").c_str());
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
        close(fd);
    }
}

3.4、异步 I/O:“奶茶做好了,直接送我家”

工作原理:你在网上点奶茶,备注 “做好了放门口”,然后该干啥干啥 —— 对应到 C++,就是调用libaio的异步接口,内核帮你完成 “等数据 + 拷贝数据”,完事了通知你。

C++ 代码示例(libaio 异步文件写)

#include
 <iostream>
#include
 <cstring>
#include
 <fcntl.h>
#include
 <libaio.h>
#include
 <unistd.h>
#include
 <errno.h>
constexpr int BUF_SIZE = 1024;
constexpr int IO_EVENTS = 1;
// RAII封装io_context_t:自动销毁,避免内存泄漏
class AioContext {
public:
    AioContext(int max_events) : ctx_{} {
        if (io_setup(max_events, &ctx_) != 0) {
            throw std::runtime_error("io_setup失败:" + std::string(strerror(errno)));
        }
    }
    ~AioContext() {
        io_destroy(ctx_); // 析构时自动销毁
    }
    // 禁止拷贝赋值,避免double free
    AioContext(const AioContext&) = delete;
    AioContext& operator=(const AioContext&) = delete;
    // 提供访问器
    io_context_t& get() { return ctx_; }
private:
    io_context_t ctx_;
};
int main() {
    try {
        // 1. 创建AioContext:RAII管理,不用手动destroy
        AioContext aio_ctx(IO_EVENTS);
        // 2. 打开文件:O_DIRECT要求缓冲区对齐,libaio推荐
        int fd = open("test_aio.txt", O_WRONLY | O_CREAT | O_DIRECT, 0644);
        if (fd == -1) {
            throw std::runtime_error("open失败:" + std::string(strerror(errno)));
        }
        // 3. 分配对齐的缓冲区:O_DIRECT要求缓冲区地址是扇区大小的倍数(通常4096)
        void* buf = nullptr;
        if (posix_memalign(&buf, 4096, BUF_SIZE) != 0) {
            close(fd);
            throw std::runtime_error("posix_memalign失败:" + std::string(strerror(errno)));
        }
        std::memcpy(buf, "C++异步写数据:奶茶做好了,直接送上门!", BUF_SIZE);
        // 4. 设置异步写请求
        io_event events[IO_EVENTS]{};
        iocb iocb{};
        iocb* iocbs[IO_EVENTS] = {&iocb};
        // 初始化iocb:异步写,从文件偏移0开始
        io_prep_pwrite(&iocb, fd, buf, BUF_SIZE, 0);
        // 绑定用户数据,方便后续识别(C++可以传任意指针)
        io_set_event_data(&iocb, reinterpret_cast<void*>("异步写请求完成!"));
        // 5. 提交异步请求
        int ret = io_submit(aio_ctx.get(), IO_EVENTS, iocbs);
        if (ret != IO_EVENTS) {
            free(buf);
            close(fd);
            throw std::runtime_error("io_submit失败:" + std::string(strerror(errno)));
        }
        std::cout << "异步写请求已提交,我先去干别的了..." << std::endl;
        // 6. 等待请求完成:io_getevents会阻塞到有事件
        ret = io_getevents(aio_ctx.get(), IO_EVENTS, IO_EVENTS, events, nullptr);
        if (ret < 0) {
            free(buf);
            close(fd);
            throw std::runtime_error("io_getevents失败:" + std::string(strerror(errno)));
        }
        // 7. 处理完成的请求
        for (int i = 0; i < ret; ++i) {
            io_event& ev = events[i];
            // 获取用户数据:C++强制类型转换
            const char* msg = reinterpret_cast<const char*>(ev.data);
            std::cout << "收到通知:" << msg << std::endl;
            std::cout << "写了 " << ev.res << " 字节数据到文件" << std::endl;
            // 检查是否有错误
            if (ev.res2 != 0) {
                std::cerr << "异步写错误:" << strerror(-ev.res2) << std::endl;
            }
        }
        // 8. 清理资源
        free(buf);
        close(fd);
    } catch (const std::exception& e) {
        // C++异常处理:比C的if-else更优雅
        std::cerr << "出错了:" << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

编译运行

  1. 先装 libaio:sudo yum install libaio-devel(CentOS)或sudo apt install libaio-dev(Ubuntu);
  1. 编译:g++ aio_demo.cpp -o aio_demo -std=c++11 -laio && ./aio_demo。

C++ 特性亮点:用AioContext类实现 RAII,自动管理io_context_t,避免忘记io_destroy导致的资源泄漏;用std::exception处理错误,比 C 的perror更优雅。

优缺点

  • 优点:性能天花板!全程不用用户态参与 “等” 和 “拷贝”,CPU 利用率极低;
  • 缺点:libaio对网络 I/O 支持有限,C++ 代码复杂度高;
  • 适用场景:数据库存储引擎(MySQL InnoDB)、分布式文件系统,需要 “极致吞吐量” 的场景。

3.5、信号驱动 I/O:“奶茶好了,给我打个电话”

工作原理:你点奶茶时留电话,奶茶做好了店员打电话 —— 对应到 C++,就是给 socket 注册SIGIO信号,内核数据就绪后发信号,你在信号处理函数里处理数据。

C++ 代码示例(信号驱动 I/O 服务器)

#include
 <iostream>
#include
 <cstring>
#include
 <sys/socket.h>
#include
 <netinet/in.h>
#include
 <unistd.h>
#include
 <fcntl.h>
#include
 <signal.h>
#include
 <errno.h>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
int server_fd; // 全局变量,信号处理函数要访问(C++也能用全局,别滥用就行)
char buf[BUF_SIZE]{};
// 信号处理函数:必须是extern "C",避免C++名字修饰
extern "C" void sigio_handler(int sig) {
    if (sig != SIGIO) {
        return;
    }
    // 接受新连接
    sockaddr_in client_addr{};
    socklen_t client_addr_len = sizeof(client_addr);
    int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
    if (client_fd == -1) {
        perror("sigio_handler accept失败");
        return;
    }
    std::cout << "接到客户连接,fd:" << client_fd << std::endl;
    // 读数据
    ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
    if (recv_len > 0) {
        buf[recv_len] = '\0';
        std::cout << "客户" << client_fd << "点了:" << buf << std::endl;
        const char* resp = "你的奶茶好了!\n";
        send(client_fd, resp, strlen(resp), 0);
    }
    close(client_fd);
    memset(buf, 0, BUF_SIZE); // 清空缓冲区
}
int main() {
    // 1. 创建server socket
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket创建失败");
        return EXIT_FAILURE;
    }
    // 设置SO_REUSEADDR
    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        perror("setsockopt SO_REUSEADDR失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);
    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
        perror("bind失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, 5) == -1) {
        perror("listen失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    // 2. 设置socket属主:让内核知道给哪个进程发信号
    if (fcntl(server_fd, F_SETOWN, getpid()) == -1) {
        perror("fcntl F_SETOWN失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    // 3. 启用信号驱动:设置O_ASYNC标志
    int flags = fcntl(server_fd, F_GETFL);
    if (flags == -1) {
        perror("fcntl F_GETFL失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (fcntl(server_fd, F_SETFL, flags | O_ASYNC) == -1) {
        perror("fcntl F_SETFL O_ASYNC失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    // 4. 注册SIGIO信号处理函数
    struct sigaction sa{};
    sa.sa_handler = sigio_handler;
    sigemptyset(&sa.sa_mask); // 清空信号掩码
    sa.sa_flags = 0;
    if (sigaction(SIGIO, &sa, nullptr) == -1) {
        perror("sigaction失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    std::cout << "信号驱动I/O服务器启动,等着接电话(SIGIO)..." << std::endl;
    // 主进程可以干别的,比如循环sleep
    while (true) {
        sleep(1);
    }
    close(server_fd);
    return EXIT_SUCCESS;
}

编译运行:g++ sigio_server.cpp -o sigio_server -std=c++11 && ./sigio_server

C++ 踩坑提醒:信号处理函数必须加extern "C",否则 C++ 编译器会对函数名进行修饰,导致sigaction找不到函数 —— 我当年没加这个,调试了两小时才发现。

优缺点

  • 优点:不用轮询,比非阻塞 I/O 高效;
  • 缺点:信号风暴风险高(多个 socket 同时就绪),不好处理大量连接;
  • 适用场景:嵌入式设备、低并发低延迟场景,现在用得少。

3.6、五大 I/O 模型对比

用一张表总结:

I/O 模型

等待数据就绪

数据拷贝

效率

适用场景

C++ 实现难点

奶茶店类比

阻塞 I/O

进程阻塞

进程阻塞

测试工具

简单,无难点

站着等奶茶

非阻塞 I/O

轮询

进程阻塞

嵌入式设备

vector 迭代器失效、轮询 CPU 高

每隔 10 秒问一次

多路复用 I/O

内核通知

进程阻塞

Web 服务器

ET 模式循环读、惊群效应

调度员通知

异步 I/O

内核处理

内核处理

极高

数据库

RAII 资源管理、libaio 对齐

奶茶送上门

信号驱动 I/O

信号通知

进程处理

小众场景

extern "C"、信号安全

打电话通知

核心结论:C++ 写高并发服务,优先用epoll(水平触发入门,边缘触发进阶),别用阻塞 I/O 扛并发 —— 除非你想让老板骂你。

Part4  传统并发 I/O 方案

在epoll出来之前,C++ 开发者都是用 “多进程多线程” 处理并发 —— 就像奶茶店客户多了,雇一堆师傅。咱们聊聊这两种方案的 C++ 实现和坑。

4.1、多进程 I/O:“一个客户一个师傅”

工作原理:主进程accept后fork子进程,子进程处理客户,主进程继续等新客户 —— 对应到 C++,就是用fork()创建子进程,注意fork后子进程会复制父进程的文件描述符。

C++ 代码示例(多进程服务器)

#include
 <iostream>
#include
 <cstring>
#include
 <sys/socket.h>
#include
 <netinet/in.h>
#include
 <unistd.h>
#include
 <sys/wait.h>
#include
 <signal.h>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
// 信号处理函数:回收僵尸进程
extern "C" void sigchld_handler(int sig) {
    // waitpid(-1, nullptr, WNOHANG):非阻塞回收所有僵尸进程
    while (waitpid(-1, nullptr, WNOHANG) > 0);
}
void handle_client(int client_fd) {
    std::cout << "子进程" << getpid() << "服务客户,fd:" << client_fd << std::endl;
    char buf[BUF_SIZE]{};
    while (true) {
        ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
        if (recv_len <= 0) {
            std::cout << "子进程" << getpid() << ":客户" << client_fd << "断开" << std::endl;
            break;
        }
        buf[recv_len] = '\0';
        std::cout << "子进程" << getpid() << ":客户" << client_fd << "点了:" << buf << std::endl;
        const char* resp = "你的奶茶好了!\n";
        send(client_fd, resp, strlen(resp), 0);
        memset(buf, 0, BUF_SIZE);
    }
    close(client_fd);
    exit(EXIT_SUCCESS); // 子进程处理完退出
}
int main() {
    // 注册SIGCHLD信号处理函数,回收僵尸进程
    struct sigaction sa{};
    sa.sa_handler = sigchld_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = SA_RESTART; // 重启被信号中断的系统调用
    if (sigaction(SIGCHLD, &sa, nullptr) == -1) {
        perror("sigaction SIGCHLD失败");
        return EXIT_FAILURE;
    }
    // 创建server socket
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket创建失败");
        return EXIT_FAILURE;
    }
    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        perror("setsockopt SO_REUSEADDR失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);
    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
        perror("bind失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, LISTEN_BACKLOG) == -1) {
        perror("listen失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    std::cout << "多进程服务器启动,主进程PID:" << getpid() << ",端口:" << PORT << std::endl;
    while (true) {
        // accept阻塞
        sockaddr_in client_addr{};
        socklen_t client_addr_len = sizeof(client_addr);
        int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
        if (client_fd == -1) {
            perror("accept失败");
            continue;
        }
        // fork子进程
        pid_t pid = fork();
        if (pid == -1) {
            perror("fork失败");
            close(client_fd);
            continue;
        } else if (pid == 0) {
            // 子进程:关闭主进程的server_fd(子进程不用监听)
            close(server_fd);
            handle_client(client_fd);
        } else {
            // 父进程:关闭client_fd(交给子进程处理)
            close(client_fd);
            std::cout << "父进程" << getpid() << ":创建子进程" << pid << "服务客户" << std::endl;
        }
    }
    close(server_fd);
    return EXIT_SUCCESS;
}

编译运行:g++ multiprocess_server.cpp -o multiprocess_server -std=c++11 && ./multiprocess_server

C++ 踩坑提醒:fork后子进程会复制父进程的所有 fd,所以子进程要关server_fd,父进程要关client_fd,不然会 fd 泄漏 —— 我当年忘了关,导致服务器跑一会儿就 “too many open files”。

优缺点

  • 优点:稳定!进程隔离,一个子进程崩了不影响主进程;
  • 缺点:资源开销大!Linux 一个进程占 8MB 栈内存,1000 个进程就是 8GB;
  • 适用场景:稳定性优先的场景(比如数据库主进程)。

4.2、多线程 I/O:“一个师傅服务多个客户”

工作原理:主进程accept后创建线程,线程处理客户 —— 对应到 C++,就是用std::thread创建线程,注意线程安全(用std::mutex保护共享资源)。

C++ 代码示例(多线程服务器)

#include
 <iostream>
#include
 <cstring>
#include
 <sys/socket.h>
#include
 <netinet/in.h>
#include
 <unistd.h>
#include
 <thread>
#include
 <mutex>
#include
 <vector>
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 1024;
constexpr int LISTEN_BACKLOG = 5;
// 全局互斥锁:保护cout输出,避免多线程打印乱码
std::mutex cout_mutex;
void handle_client(int client_fd) {
    // 线程局部存储:每个线程有独立的缓冲区,避免共享数据竞争
    thread_local char buf[BUF_SIZE]{};
    {
        // 加锁打印,避免乱码
        std::lock_guard<std::mutex> lock(cout_mutex);
        std::cout << "线程" << std::this_thread::get_id() << "服务客户,fd:" << client_fd << std::endl;
    }
    while (true) {
        ssize_t recv_len = recv(client_fd, buf, BUF_SIZE - 1, 0);
        if (recv_len <= 0) {
            std::lock_guard<std::mutex> lock(cout_mutex);
            std::cout << "线程" << std::this_thread::get_id() << ":客户" << client_fd << "断开" << std::endl;
            break;
        }
        buf[recv_len] = '\0';
        {
            std::lock_guard<std::mutex> lock(cout_mutex);
            std::cout << "线程" << std::this_thread::get_id() << ":客户" << client_fd << "点了:" << buf << std::endl;
        }
        const char* resp = "你的奶茶好了!\n";
        send(client_fd, resp, strlen(resp), 0);
        memset(buf, 0, BUF_SIZE);
    }
    close(client_fd);
}
int main() {
    // 创建server socket
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket创建失败");
        return EXIT_FAILURE;
    }
    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        perror("setsockopt SO_REUSEADDR失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);
    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
        perror("bind失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, LISTEN_BACKLOG) == -1) {
        perror("listen失败");
        close(server_fd);
        return EXIT_FAILURE;
    }
    std::cout << "多线程服务器启动,主线程ID:" << std::this_thread::get_id() << ",端口:" << PORT << std::endl;
    // 存线程对象,避免线程析构时崩溃
    std::vector<std::thread> threads;
    while (true) {
        sockaddr_in client_addr{};
        socklen_t client_addr_len = sizeof(client_addr);
        int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
        if (client_fd == -1) {
            perror("accept失败");
            continue;
        }
        // 创建线程:用std::thread,传递client_fd
        // 注意:C++11后可以用移动语义,避免拷贝
        threads.emplace_back([client_fd]() {
            handle_client(client_fd);
        });
        // 分离线程:避免主线程join,线程结束后自动回收资源
        threads.back().detach();
    }
    close(server_fd);
    return EXIT_SUCCESS;
}

编译运行:g++ multithread_server.cpp -o multithread_server -std=c++11 -pthread && ./multithread_server

C++ 特性亮点

  • 用std::mutex和std::lock_guard保护cout,避免多线程打印乱码;
  • 用thread_local给每个线程分配独立缓冲区,避免数据竞争;
  • 用std::vector存std::thread,用emplace_back和移动语义,效率更高。

踩坑提醒:std::thread创建后必须join或detach,不然析构时会调用terminate崩溃 —— 我当年忘了detach,服务跑一会儿就崩,查了半天才发现。

优缺点

  • 优点:比多进程省资源!线程共享进程内存,上下文切换成本低;
  • 缺点:线程安全问题!多个线程共享数据要加锁,不然会竞态条件;
  • 适用场景:资源优先的场景(比如 Web 服务),但要处理线程安全。

Part5  epoll+线程池--王者架构

传统多进程多线程的瓶颈是 “一个连接一个执行单元”,C++ 要突破这个瓶颈,就得用 “epoll + 线程池”—— 这是现在最主流的高并发架构。

5.1、C++ 线程池实现

先写一个通用线程池,用std::queue做任务队列,std::mutex和std::condition_variable做同步:

// thread_pool.h
#ifndef
 THREAD_POOL_H
#define
 THREAD_POOL_H
#include
 <iostream>
#include
 <vector>
#include
 <queue>
#include
 <functional>
#include
 <thread>
#include
 <mutex>
#include
 <condition_variable>
#include
 <atomic>
#include
 <stdexcept>
// 线程池类:C++11及以上,支持任意参数的任务
class ThreadPool {
public:
    // 构造函数:指定线程数
    explicit ThreadPool(size_t thread_num) 
        : thread_num_(thread_num), is_running_(true) {
        if (thread_num == 0) {
            throw std::invalid_argument("线程数不能为0");
        }
        start();
    }
    // 析构函数:停止线程池
    ~ThreadPool() {
        stop();
    }
    // 禁止拷贝赋值
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    // 提交任务:支持任意参数,返回future(可选)
    template<typename F, typename... Args>
    void submit(F&& f, Args&&... args) {
        // 包装任务为void()类型
        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            // 如果线程池已停止,不接受新任务
            if (!is_running_) {
                throw std::runtime_error("线程池已停止,无法提交任务");
            }
            tasks_.emplace(std::move(task));
        }
        // 唤醒一个等待的线程
        cond_var_.notify_one();
    }
private:
    // 启动线程池
    void start() {
        for (size_t i = 0; i < thread_num_; ++i) {
            // 创建线程,执行worker函数
            threads_.emplace_back([this]() {
                worker();
            });
        }
    }
    // 停止线程池
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            is_running_ = false;
        }
        // 唤醒所有等待的线程
        cond_var_.notify_all();
        // 等待所有线程退出
        for (auto& thread : threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }
    // 线程工作函数:循环取任务执行
    void worker() {
        while (is_running_) {
            std::function<void()> task;
            // 加锁取任务
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // 等待条件:线程池运行中且任务队列不为空
                cond_var_.wait(lock, [this]() {
                    return !is_running_ || !tasks_.empty();
                });
                // 如果线程池已停止且任务队列为空,退出
                if (!is_running_ && tasks_.empty()) {
                    return;
                }
                // 取任务(移动语义,避免拷贝)
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            // 执行任务
            try {
                task();
            } catch (const std::exception& e) {
                std::cerr << "线程" << std::this_thread::get_id() << "执行任务失败:" << e.what() << std::endl;
            }
        }
    }
private:
    size_t thread_num_; // 线程数
    std::vector<std::thread> threads_; // 线程列表
    std::queue<std::function<void()>> tasks_; // 任务队列
    std::mutex mutex_; // 保护任务队列的互斥锁
    std::condition_variable cond_var_; // 条件变量,用于线程唤醒
    std::atomic<bool> is_running_; // 是否运行中(原子变量,线程安全)
};
#endif
 // THREAD_POOL_H

5.2、epoll + 线程池:10 万并发 Echo 服务器

用上面的线程池,结合 epoll 边缘触发,实现高并发 Echo 服务器:

#include
 <iostream>
#include
 <cstring>
#include
 <sys/socket.h>
#include
 <netinet/in.h>
#include
 <unistd.h>
#include
 <fcntl.h>
#include
 <sys/epoll.h>
#include
 <errno.h>
#include
 <memory> // 智能指针
#include
 "thread_pool.h"
constexpr int PORT = 8080;
constexpr int BUF_SIZE = 4096;
constexpr int MAX_EVENTS = 102400; // 支持10万连接
constexpr int THREAD_NUM = 8; // 线程数=CPU核心数*2
// 客户端数据:用struct封装,配合智能指针
struct ClientData {
    int fd;
    char buf[BUF_SIZE];
    int buf_len;
    ClientData(int fd_) : fd(fd_), buf_len(0) {
        memset(buf, 0, BUF_SIZE);
    }
    ~ClientData() {
        // 析构时关闭fd,避免泄漏
        close(fd);
        std::cout << "客户" << fd << "资源已释放" << std::endl;
    }
};
// 设置非阻塞
bool set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        perror("fcntl F_GETFL失败");
        return false;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl F_SETFL失败");
        return false;
    }
    return true;
}
// 任务函数:处理客户端数据(Echo)
void handle_client_data(std::shared_ptr<ClientData> data) {
    // 用shared_ptr管理ClientData,自动释放
    std::cout << "线程" << std::this_thread::get_id() << "处理客户" << data->fd << ",数据长度:" << data->buf_len << std::endl;
    std::cout << "客户" << data->fd << "点了:" << std::string(data->buf, data->buf_len) << std::endl;
    // 发送响应
    send(data->fd, data->buf, data->buf_len, 0);
}
int main() {
    try {
        // 1. 创建线程池:8个线程
        ThreadPool thread_pool(THREAD_NUM);
        std::cout << "线程池创建成功,线程数:" << THREAD_NUM << std::endl;
        // 2. 创建server socket
        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (server_fd == -1) {
            throw std::runtime_error("socket创建失败:" + std::string(strerror(errno)));
        }
        // 设置SO_REUSEADDR和SO_REUSEPORT:避免端口占用,均匀分发连接
        int reuse = 1;
        if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
            close(server_fd);
            throw std::runtime_error("setsockopt SO_REUSEADDR失败:" + std::string(strerror(errno)));
        }
        if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)) == -1) {
            close(server_fd);
            throw std::runtime_error("setsockopt SO_REUSEPORT失败:" + std::string(strerror(errno)));
        }
        // 绑定端口
        sockaddr_in server_addr{};
        server_addr.sin_family = AF_INET;
        server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        server_addr.sin_port = htons(PORT);
        if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
            close(server_fd);
            throw std::runtime_error("bind失败:" + std::string(strerror(errno)));
        }
        // 监听
        if (listen(server_fd, 1024) == -1) {
            close(server_fd);
            throw std::runtime_error("listen失败:" + std::string(strerror(errno)));
        }
        // 设置非阻塞
        if (!set_nonblocking(server_fd)) {
            close(server_fd);
            throw std::runtime_error("set_nonblocking server_fd失败");
        }
        // 3. 创建epoll实例
        int epoll_fd = epoll_create1(0);
        if (epoll_fd == -1) {
            close(server_fd);
            throw std::runtime_error("epoll_create1失败:" + std::string(strerror(errno)));
        }
        // 4. 把server_fd加入epoll:边缘触发
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = server_fd;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
            close(server_fd);
            close(epoll_fd);
            throw std::runtime_error("epoll_ctl add server_fd失败:" + std::string(strerror(errno)));
        }
        std::cout << "Echo服务器启动,端口:" << PORT << ",支持最大连接:" << MAX_EVENTS << std::endl;
        epoll_event events[MAX_EVENTS]{};
        while (true) {
            // 5. 等待epoll事件
            int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
            if (nfds == -1) {
                if (errno == EINTR) {
                    // 被信号中断,继续等待
                    continue;
                }
                throw std::runtime_error("epoll_wait失败:" + std::string(strerror(errno)));
            }
            // 6. 处理所有就绪事件
            for (int i = 0; i < nfds; ++i) {
                int fd = events[i].data.fd;
                // 情况1:新连接(server_fd就绪)
                if (fd == server_fd) {
                    while (true) {
                        sockaddr_in client_addr{};
                        socklen_t client_addr_len = sizeof(client_addr);
                        int client_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
                        if (client_fd == -1) {
                            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                                // 没有更多连接,退出循环
                                break;
                            } else {
                                perror("accept失败");
                                break;
                            }
                        }
                        std::cout << "接到新客户,fd:" << client_fd << std::endl;
                        // 设置非阻塞+边缘触发
                        if (!set_nonblocking(client_fd)) {
                            close(client_fd);
                            continue;
                        }
                        ev.events = EPOLLIN | EPOLLET;
                        ev.data.fd = client_fd;
                        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
                            perror(("epoll_ctl add client_fd " + std::to_string(client_fd) + "失败").c_str());
                            close(client_fd);
                            continue;
                        }
                    }
                }
                // 情况2:客户发数据(client_fd就绪)
                else {
                    if (events[i].events & EPOLLIN) {
                        // 用shared_ptr管理ClientData,自动释放
                        auto data = std::make_shared<ClientData>(fd);
                        // 边缘触发:循环读数据,直到EAGAIN
                        while (true) {
                            ssize_t recv_len = recv(fd, data->buf + data->buf_len, BUF_SIZE - data->buf_len - 1, 0);
                            if (recv_len > 0) {
                                data->buf_len += recv_len;
                                // 缓冲区满了,先提交任务
                                if (data->buf_len >= BUF_SIZE - 1) {
                                    thread_pool.submit(handle_client_data, data);
                                    // 重新创建一个data,继续读
                                    data = std::make_shared<ClientData>(fd);
                                }
                            } else if (recv_len == 0) {
                                // 客户断开:从epoll删除
                                std::cout << "客户" << fd << "断开连接" << std::endl;
                                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                                // data会自动析构,关闭fd
                                goto next_event;
                            } else {
                                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                                    // 没有更多数据,提交任务
                                    if (data->buf_len > 0) {
                                        thread_pool.submit(handle_client_data, data);
                                    }
                                    break;
                                } else {
                                    // 读失败
                                    perror(("客户" + std::to_string(fd) + "读失败").c_str());
                                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
                                    // data会自动析构,关闭fd
                                    goto next_event;
                                }
                            }
                        }
                    }
                }
next_event:;
            }
        }
        // 理论上到不了这
        close(server_fd);
        close(epoll_fd);
    } catch (const std::exception& e) {
        std::cerr << "服务器异常:" << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

编译运行

  1. 把thread_pool.h和echo_server.cpp放同一目录;
  1. 编译:g++ echo_server.cpp -o echo_server -std=c++11 -pthread && ./echo_server;
  1. 压测验证:wrk -t8 -c10000 -d30s http://127.0.0.1:8080(需先装 wrk)。

跑起来你会发现:1 万并发下 CPU 占用低于 50%,吞吐量能到几万 QPS—— 这就是 C+++epoll + 线程池的威力!

5.3、异步 I/O

异步 I/O 是 “终极方案”,但现在还有些坑:

  • Linux 的libaio对网络 I/O 支持不算完美,很多场景下还是用 “epoll + 线程池” 模拟异步(伪异步);
  • 编程复杂度高:异步逻辑是 “事件驱动”,代码会变成 “回调地狱”—— 比如你要先异步读数据,读完再异步解析,解析完再异步写响应,回调嵌套能有好几层。

建议

  • 如果你是新手,先掌握 “epoll + 线程池”—— 这方案足够支撑大部分高并发场景(比如 Nginx 就是 epoll + 线程池);
  • 如果你需要 “极致性能”(比如数据库存储引擎),再研究异步 I/O(比如 IO_URING);

Part6  实战:手写"10万并发 Echo服务器"

光说不练假把式,咱们用 C++ 手写 “10 万并发 Echo 服务器”:基于epoll边缘触发+线程池,解决惊群、粘包问题,用 C++ 特性(智能指针、RAII、异常处理)规避内存泄漏与崩溃风险。

6.1、需求定义:Echo 服务器核心目标

  • 功能:客户端发送任意数据,服务器原封不动返回(Echo);
  • 性能:支持 10 万并发连接,单 Core CPU 占用≤50%,吞吐量≥5 万 QPS;
  • 稳定性:客户端异常断连不崩溃、数据粘包能正确解析、资源自动回收。

6.2、架构设计:C++ 风格高并发架构

沿用 “epoll 主进程 + 线程池” 架构,但用 C++ 特性优化:

模块

C++ 实现要点

主进程

epoll边缘触发监控 socket,SO_REUSEPORT避惊群,智能指针管理客户端资源

线程池

std::thread+std::mutex+std::condition_variable,RAII 自动停止线程

任务队列

std::queue std::function<void()>,支持任意参数任务,无需手动绑定函数参数

数据处理

自定义 “包头 + 包体” 结构解粘包,std::string管理动态数据

6.3、C++ 代码实现:核心逻辑

6.3.1、线程池实现(ThreadPool.h)

用 C++11 + 特性实现通用线程池,支持任意任务类型:

#ifndef
 THREAD_POOL_H
#define
 THREAD_POOL_H
#include
 <iostream>
#include
 <vector>
#include
 <queue>
#include
 <functional>
#include
 <thread>
#include
 <mutex>
#include
 <condition_variable>
#include
 <atomic>
#include
 <stdexcept>
#include
 <utility>
class ThreadPool {
public:
    // 构造:指定线程数,默认CPU核心数*2
    explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency() * 2)
        : is_running_(true) {
        if (thread_num == 0) {
            throw std::invalid_argument("线程数不能为0(当前CPU核心数:" + 
                std::to_string(std::thread::hardware_concurrency()) + ")");
        }
        start(thread_num);
    }
    // 析构:自动停止线程池
    ~ThreadPool() { stop(); }
    // 禁止拷贝赋值(避免线程资源重复释放)
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    // 提交任务:支持任意参数(完美转发)
    template<typename F, typename... Args>
    void submit(F&& f, Args&&... args) {
        // 包装任务为无参函数(std::bind+完美转发)
        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::lock_guard<std::mutex> lock(mtx_);
            if (!is_running_) {
                throw std::runtime_error("线程池已停止,无法提交新任务");
            }
            tasks_.emplace(std::move(task));
        }
        // 唤醒一个等待的线程
        cond_.notify_one();
    }
private:
    // 启动线程
    void start(size_t thread_num) {
        for (size_t i = 0; i < thread_num; ++i) {
            threads_.emplace_back([this]() {
                worker(); // 线程执行函数
            });
        }
        std::cout << "线程池启动:" << thread_num << "个线程" << std::endl;
    }
    // 停止线程池
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            is_running_ = false;
        }
        // 唤醒所有线程(避免线程卡在wait)
        cond_.notify_all();
        // 等待所有线程退出
        for (auto& t : threads_) {
            if (t.joinable()) {
                t.join();
            }
        }
        std::cout << "线程池停止:所有线程已回收" << std::endl;
    }
    // 线程工作函数:循环取任务执行
    void worker() {
        const auto thread_id = std::this_thread::get_id();
        std::cout << "线程启动:" << thread_id << std::endl;
        while (is_running_) {
            std::function<void()> task;
            // 加锁取任务(unique_lock支持手动解锁)
            std::unique_lock<std::mutex> lock(mtx_);
            // 等待条件:线程池运行中 且 任务队列非空
            cond_.wait(lock, [this]() {
                return !is_running_ || !tasks_.empty();
            });
            // 线程池停止且任务为空:退出
            if (!is_running_ && tasks_.empty()) {
                break;
            }
            // 取任务(移动语义,避免拷贝开销)
            task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock(); // 解锁,让其他线程取任务
            // 执行任务(捕获异常,避免单个任务崩溃导致线程退出)
            try {
                task();
            } catch (const std::exception& e) {
                std::cerr << "线程" << thread_id << "执行任务失败:" << e.what() << std::endl;
            }
        }
        std::cout << "线程停止:" << thread_id << std::endl;
    }
private:
    std::vector<std::thread> threads_;       // 线程列表
    std::queue<std::function<void()>> tasks_;// 任务队列
    std::mutex mtx_;                         // 保护任务队列的互斥锁
    std::condition_variable cond_;           // 唤醒线程的条件变量
    std::atomic<bool> is_running_;           // 线程池运行状态(原子变量,线程安全)
};
#endif
 // THREAD_POOL_H

6.3.2、Echo 服务器主逻辑(echo_server.cpp)

包含粘包处理、epoll 监控、客户端数据管理:

#include
 <iostream>
#include
 <cstring>
#include
 <sys/socket.h>
#include
 <netinet/in.h>
#include
 <unistd.h>
#include
 <fcntl.h>
#include
 <sys/epoll.h>
#include
 <errno.h>
#include
 <memory>   // std::shared_ptr
#include
 <string>
#include
 <endian.h> // std::endian::big
#include
 "ThreadPool.h"
// 配置参数(constexpr替代
#define
,类型安全)
constexpr int PORT = 8080;
constexpr int MAX_EVENTS = 102400; // 支持10万并发连接
constexpr int BUF_SIZE = 4096;
constexpr int HEADER_LEN = 4;      // 包头长度:4字节(存储包体长度)
// 1. 粘包处理:固定长度包头
struct FixedHeader {
    uint32_t body_len; // 包体长度(网络字节序:大端)
};
// 2. 客户端数据结构体(用shared_ptr管理,自动释放)
struct ClientData {
    int fd;                     // 客户端文件描述符
    std::string recv_buf;       // 接收缓冲区(动态扩容,避免固定大小限制)
    sockaddr_in client_addr;    // 客户端地址(可选,用于日志)
    ClientData(int fd_, const sockaddr_in& addr_) 
        : fd(fd_), client_addr(addr_) {}
    // 析构:自动关闭fd(RAII)
    ~ClientData() {
        close(fd);
        std::cout << "客户端" << fd << "资源释放(IP:" 
                  << inet_ntoa(client_addr.sin_addr) << ")" << std::endl;
    }
    // 禁止拷贝(避免fd重复关闭)
    ClientData(const ClientData&) = delete;
    ClientData& operator=(const ClientData&) = delete;
};
// 3. 工具函数:设置socket为非阻塞
bool set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        std::cerr << "fcntl F_GETFL失败:" << strerror(errno) << std::endl;
        return false;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        std::cerr << "fcntl F_SETFL失败:" << strerror(errno) << std::endl;
        return false;
    }
    return true;
}
// 4. 工具函数:ET模式循环读数据(直到EAGAIN)
ssize_t read_until_eagain(int fd, std::string& buf) {
    char tmp[BUF_SIZE];
    ssize_t total_read = 0;
    while (true) {
        ssize_t n = recv(fd, tmp, BUF_SIZE, 0);
        if (n > 0) {
            buf.append(tmp, n);
            total_read += n;
        } else if (n == 0) {
            // 客户端主动断连
            return 0;
        } else {
            // 读出错:判断是否为“暂时无数据”
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return total_read; // 正常:无更多数据
            } else {
                std::cerr << "recv失败:" << strerror(errno) << "(fd:" << fd << ")" << std::endl;
                return -1; // 错误:需要关闭连接
            }
        }
    }
}
// 5. 任务函数:处理客户端数据(解粘包+Echo)
void handle_client(std::shared_ptr<ClientData> client) {
    const auto fd = client->fd;
    auto& recv_buf = client->recv_buf;
    // 步骤1:循环读数据(ET模式)
    ssize_t read_len = read_until_eagain(fd, recv_buf);
    if (read_len == 0) {
        std::cout << "客户端" << fd << "主动断连" << std::endl;
        return;
    } else if (read_len == -1) {
        return; // 读出错,已在read_until_eagain打印日志
    }
    // 步骤2:解粘包(包头+包体)
    while (recv_buf.size() >= HEADER_LEN) {
        // 2.1 解析包头(网络字节序转主机字节序)
        FixedHeader header;
        std::memcpy(&header.body_len, recv_buf.data(), HEADER_LEN);
        uint32_t body_len = ntohl(header.body_len); // 大端转主机序
        // 2.2 检查包体是否完整
        if (recv_buf.size() < HEADER_LEN + body_len) {
            break; // 包体不完整,等待下一次数据
        }
        // 2.3 提取包体
        std::string body = recv_buf.substr(HEADER_LEN, body_len);
        // 2.4 移除已处理的包头+包体
        recv_buf.erase(0, HEADER_LEN + body_len);
        // 步骤3:Echo响应(原封不动返回)
        std::cout << "客户端" << fd << "收到数据:" << body << "(长度:" << body_len << ")" << std::endl;
        send(fd, recv_buf.data() - HEADER_LEN - body_len, HEADER_LEN + body_len, 0);
    }
}
// 6. 服务器初始化:创建socket+绑定+监听
int init_server_socket() {
    // 6.1 创建TCP socket
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        throw std::runtime_error("socket创建失败:" + std::string(strerror(errno)));
    }
    // 6.2 设置SO_REUSEADDR+SO_REUSEPORT(避惊群+端口复用)
    int reuse = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        close(server_fd);
        throw std::runtime_error("setsockopt SO_REUSEADDR失败:" + std::string(strerror(errno)));
    }
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)) == -1) {
        close(server_fd);
        throw std::runtime_error("setsockopt SO_REUSEPORT失败:" + std::string(strerror(errno)));
    }
    // 6.3 绑定IP+端口
    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
    server_addr.sin_port = htons(PORT);
    if (bind(server_fd, reinterpret_cast<sockaddr*>(&server_addr), sizeof(server_addr)) == -1) {
        close(server_fd);
        throw std::runtime_error("bind失败:" + std::string(strerror(errno)));
    }
    // 6.4 监听(backlog=1024,排队等待的连接数)
    if (listen(server_fd, 1024) == -1) {
        close(server_fd);
        throw std::runtime_error("listen失败:" + std::string(strerror(errno)));
    }
    // 6.5 设置非阻塞(epoll推荐非阻塞socket)
    if (!set_nonblocking(server_fd)) {
        close(server_fd);
        throw std::runtime_error("set_nonblocking server_fd失败");
    }
    std::cout << "服务器socket初始化成功:fd=" << server_fd << ",端口=" << PORT << std::endl;
    return server_fd;
}
int main() {
    try {
        // 1. 初始化线程池(默认CPU核心数*2)
        ThreadPool pool;
        // 2. 初始化服务器socket
        int server_fd = init_server_socket();
        // 3. 创建epoll实例
        int epoll_fd = epoll_create1(0);
        if (epoll_fd == -1) {
            close(server_fd);
            throw std::runtime_error("epoll_create1失败:" + std::string(strerror(errno)));
        }
        // 4. 添加server_fd到epoll(边缘触发+读事件)
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLET; // ET模式+读事件
        ev.data.fd = server_fd;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
            close(server_fd);
            close(epoll_fd);
            throw std::runtime_error("epoll_ctl add server_fd失败:" + std::string(strerror(errno)));
        }
        std::cout << "Echo服务器启动成功!支持最大并发:" << MAX_EVENTS 
                  << ",压测命令:wrk -t8 -c10000 -d30s http://127.0.0.1:" << PORT << std::endl;
        // 5. 循环处理epoll事件
        epoll_event events[MAX_EVENTS]{};
        while (true) {
            // 5.1 等待事件(-1:永久阻塞,直到有事件)
            int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
            if (nfds == -1) {
                if (errno == EINTR) {
                    continue; // 被信号中断(如Ctrl+C),继续循环
                }
                throw std::runtime_error("epoll_wait失败:" + std::string(strerror(errno)));
            }
            // 5.2 处理所有就绪事件
            for (int i = 0; i < nfds; ++i) {
                int fd = events[i].data.fd;
                // 情况1:新连接(server_fd触发事件)
                if (fd == server_fd) {
                    while (true) {
                        sockaddr_in client_addr{};
                        socklen_t addr_len = sizeof(client_addr);
                        // 接受新连接(ET模式需循环accept,避免漏连接)
                        int conn_fd = accept(server_fd, reinterpret_cast<sockaddr*>(&client_addr), &addr_len);
                        if (conn_fd == -1) {
                            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                                break; // 无更多连接,退出循环
                            } else {
                                std::cerr << "accept失败:" << strerror(errno) << std::endl;
                                break;
                            }
                        }
                        std::cout << "接到新连接:fd=" << conn_fd << ",IP=" << inet_ntoa(client_addr.sin_addr) << std::endl;
                        // 设置新连接为非阻塞+ET模式
                        if (!set_nonblocking(conn_fd)) {
                            close(conn_fd);
                            continue;
                        }
                        ev.events = EPOLLIN | EPOLLET;
                        ev.data.fd = conn_fd;
                        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
                            std::cerr << "epoll_ctl add conn_fd=" << conn_fd << "失败:" << strerror(errno) << std::endl;
                            close(conn_fd);
                            continue;
                        }
                        // 用shared_ptr管理客户端数据(加入epoll时创建,避免后期遗漏)
                        auto client = std::make_shared<ClientData>(conn_fd, client_addr);
                        // 注意:epoll_data_t无法直接存shared_ptr,需用全局map映射fd→shared_ptr(简化版暂省略,实际项目需实现)
                        // 简化处理:此处暂不存储,后续通过fd在任务中获取(实际项目需用std::unordered_map<int, std::shared_ptr<ClientData>>)
                    }
                }
                // 情况2:客户端数据(conn_fd触发事件)
                else {
                    if (events[i].events & EPOLLIN) {
                        // 从全局map获取client(简化版:暂用fd创建,实际项目需从map中取)
                        sockaddr_in dummy_addr{};
                        auto client = std::make_shared<ClientData>(fd, dummy_addr);
                        // 提交任务到线程池
                        pool.submit(handle_client, client);
                    }
                    // 情况3:写事件(如发送缓冲区满,实际项目需处理)
                    if (events[i].events & EPOLLOUT) {
                        std::cerr << "客户端" << fd << "触发写事件(未处理)" << std::endl;
                    }
                }
            }
        }
        // 理论上不会执行到此处(循环永久运行)
        close(server_fd);
        close(epoll_fd);
    } catch (const std::exception& e) {
        // 统一异常处理:打印错误并退出
        std::cerr << "服务器异常退出:" << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

6.3.3、编译与运行

1)、编译指令(需 GCC 7.0+,支持 C++11):

# 编译线程池+服务器(链接pthread,因使用std::thread)
g++ echo_server.cpp -o echo_server -std=c++11 -pthread -O2

2)、临时调大文件描述符限制(Linux 默认限制较低,无法支持 10 万连接):

# 临时生效(当前终端):允许最大102400个文件描述符
ulimit -n 102400
# 验证:查看当前限制
ulimit -n

3)、启动服务器

./echo_server

4)、高并发压测(用 wrk,避免用 ab):

# 8线程、1万并发连接、压测30秒
wrk -t8 -c10000 -d30s http://127.0.0.1:8080

压测结果参考

1 万并发下,CPU 占用≤40%,吞吐量≥6 万 QPS,无数据丢失或连接断连。

6.4、C++ I/O 新技术

6.4.1、IO_URING:Linux 新一代异步 I/O(C++ 封装)

IO_URING 是 Linux 5.1 + 的异步 I/O 框架,性能远超 epoll/libaio,C++ 中用 RAII 封装资源:

#include
 <iostream>
#include
 <cstring>
#include
 <fcntl.h>
#include
 <liburing.h>
#include
 <unistd.h>
#include
 <stdexcept>
#include
 <string>
// RAII封装IO_URING:自动初始化/销毁
class IoUring {
public:
    explicit IoUring(size_t entries) {
        // 初始化IO_URING(entries:提交队列大小)
        int ret = io_uring_queue_init(entries, &ring_, 0);
        if (ret < 0) {
            throw std::runtime_error("io_uring_queue_init失败:" + 
                std::string(strerror(-ret)) + "(需Linux 5.1+)");
        }
    }
    ~IoUring() {
        // 销毁IO_URING(RAII:自动释放资源)
        io_uring_queue_exit(&ring_);
    }
    // 禁止拷贝赋值
    IoUring(const IoUring&) = delete;
    IoUring& operator=(const IoUring&) = delete;
    // 获取提交队列项(SQE)
    io_uring_sqe* get_sqe() {
        io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
        if (!sqe) {
            throw std::runtime_error("io_uring_get_sqe失败:提交队列已满");
        }
        return sqe;
    }
    // 提交请求到内核
    void submit() {
        int ret = io_uring_submit(&ring_);
        if (ret < 0) {
            throw std::runtime_error("io_uring_submit失败:" + std::string(strerror(-ret)));
        }
    }
    // 等待请求完成并获取结果(CQE)
    io_uring_cqe* wait_cqe() {
        io_uring_cqe* cqe = nullptr;
        int ret = io_uring_wait_cqe(&ring_, &cqe);
        if (ret < 0) {
            throw std::runtime_error("io_uring_wait_cqe失败:" + std::string(strerror(-ret)));
        }
        return cqe;
    }
    // 标记CQE已处理
    void mark_cqe_seen(io_uring_cqe* cqe) {
        io_uring_cqe_seen(&ring_, cqe);
    }
private:
    io_uring ring_; // IO_URING核心结构体
};
// IO_URING异步写文件示例
int main() {
    try {
        // 1. 初始化IO_URING(提交队列大小=8)
        IoUring uring(8);
        // 2. 打开文件(O_DIRECT:避免页缓存,提升异步性能)
        int fd = open("uring_demo.txt", O_WRONLY | O_CREAT | O_DIRECT, 0644);
        if (fd < 0) {
            throw std::runtime_error("open失败:" + std::string(strerror(errno)));
        }
        // 3. 分配对齐缓冲区(O_DIRECT要求缓冲区地址是扇区大小的倍数,通常4096)
        void* buf = nullptr;
        int ret = posix_memalign(&buf, 4096, 4096);
        if (ret != 0) {
            close(fd);
            throw std::runtime_error("posix_memalign失败:" + std::string(strerror(ret)));
        }
        // 4. 准备数据
        std::string data = "C++ IO_URING异步写测试:10万并发无压力!";
        std::memcpy(buf, data.data(), data.size());
        // 5. 设置异步写请求
        io_uring_sqe* sqe = uring.get_sqe();
        // 初始化写请求:fd、缓冲区、长度、文件偏移
        io_uring_prep_write(sqe, fd, buf, data.size(), 0);
        // 设置用户数据(用于识别请求结果)
        io_uring_sqe_set_data(sqe, const_cast<char*>(data.c_str()));
        // 6. 提交请求到内核
        uring.submit();
        std::cout << "IO_URING异步写请求已提交,数据:" << data << std::endl;
        // 7. 等待请求完成
        io_uring_cqe* cqe = uring.wait_cqe();
        if (cqe->res < 0) {
            throw std::runtime_error("异步写失败:" + std::string(strerror(-cqe->res)));
        }
        // 8. 处理结果
        std::string req_data = static_cast<char*>(io_uring_cqe_get_data(cqe));
        std::cout << "请求完成:" << req_data << "(实际写入字节:" << cqe->res << ")" << std::endl;
        // 9. 清理资源
        uring.mark_cqe_seen(cqe);
        free(buf);
        close(fd);
    } catch (const std::exception& e) {
        std::cerr << "IO_URING示例异常:" << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

编译运行

# 需安装liburing-dev(Ubuntu:sudo apt install liburing-dev)
g++ uring_demo.cpp -o uring_demo -std=c++11 -luring
./uring_demo

6.4.2、C++20 协程:用户态轻量级线程

C++20 引入协程(Coroutine),配合std::asio可实现 “异步代码同步写”,避免回调地狱。以下是std::asio协程 Echo 服务器示例:

#include
 <iostream>
#include
 <asio.hpp>
#include
 <asio/co_spawn.hpp>
#include
 <asio/detached.hpp>
#include
 <asio/io_context.hpp>
#include
 <asio/ip/tcp.hpp>
#include
 <asio/signal_set.hpp>
#include
 <asio/write.hpp>
using asio::ip::tcp;
namespace this_coro = asio::this_coro;
// 协程:处理单个客户端连接
asio::awaitable<void> handle_client(tcp::socket socket) {
    try {
        char data[1024];
        while (true) {
            // 异步读(协程暂停,直到数据到达)
            std::size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
            
            // 打印数据
            std::cout << "客户端" << socket.remote_endpoint() << "收到:" 
                      << std::string(data, n) << std::endl;
            
            // 异步写(Echo响应,协程暂停,直到写完成)
            co_await asio::async_write(socket, asio::buffer(data, n), asio::use_awaitable);
        }
    } catch (const std::exception& e) {
        std::cerr << "客户端" << socket.remote_endpoint() << "断开:" << e.what() << std::endl;
    }
}
// 协程:接受新连接
asio::awaitable<void> accept_connections(tcp::acceptor acceptor) {
    while (true) {
        // 异步接受连接(协程暂停,直到有新连接)
        tcp::socket socket = co_await acceptor.async_accept(asio::use_awaitable);
        std::cout << "接到新连接:" << socket.remote_endpoint() << std::endl;
        
        // 启动新协程处理客户端,不阻塞当前协程
        asio::co_spawn(acceptor.get_executor(), 
            handle_client(std::move(socket)), 
            asio::detached); //  detached:协程独立运行,无需等待
    }
}
int main() {
    try {
        // 1. 创建IO上下文(协程调度核心)
        asio::io_context io_context(1);
        // 2. 注册信号处理(Ctrl+C退出)
        asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&](const std::error_code&, int) {
            io_context.stop();
            std::cout << "服务器收到退出信号,正在清理..." << std::endl;
        });
        // 3. 创建 acceptor(监听8080端口)
        tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 8080));
        // 4. 启动协程接受连接
        asio::co_spawn(io_context, accept_connections(std::move(acceptor)), asio::detached);
        // 5. 运行IO上下文(调度协程)
        std::cout << "C++20协程Echo服务器启动:端口8080" << std::endl;
        io_context.run();
    } catch (const std::exception& e) {
        std::cerr << "服务器异常:" << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

编译运行

# 需GCC 10+或Clang 12+,支持C++20
g++ coro_echo.cpp -o coro_echo -std=c++20 -lpthread
./coro_echo

核心优势

  • 协程栈仅几 KB,一个进程可创建上百万个协程;
  • 代码同步风格,无回调嵌套,可读性远超异步回调;
  • std::asio自动调度协程,无需手动管理线程。

总结

核心心法:

  1. 资源管理优先用 RAII:线程池、IO_URING、socket 都用类封装,析构时自动释放,避免内存 / 文件描述符泄漏。
  2. 高并发用 “epoll + 线程池” 或 “协程”:中小并发(1 万级)用线程池,超大规模(10 万 +)用协程或 IO_URING。
  3. 错误处理用 try-catch:C++ 异常比 C 的if-errno更集中,便于定位问题。
  4. 避免手动管理内存:用std::shared_ptr/std::unique_ptr管理动态资源,杜绝double free。

附录:C++ 网络编程工具链

工具

用途

常用命令 / 代码示例

wrk

高并发压测

wrk -t8 -c10000 -d30s http://127.0.0.1:8080

tcpdump

网络抓包(查粘包 / 断连问题)

tcpdump -i eth0 port 8080 -w echo.pcap

perf

CPU 性能分析(查线程池瓶颈)

perf top -p $(pidof echo_server)

std::asio

C++ 异步网络库(协程 /epoll 封装)

asio::io_context io; tcp::socket sock(io);

liburing

IO_URING C++ 封装库

IoUring uring(8); uring.submit();

整理了一波干货文章✨ 直接覆盖Linux C/C++全成长赛道,不管你是刚入坑的小白想找对方向,还是摸爬滚打的老炮想突破瓶颈,都能挖到适配的内容~ 主打一个帮大家避坑、少走弯路,技术升级一路开挂!

✨ 先破局:C++到底值不值得冲?

是不是总刷到“劝退C++”的帖子,转头又刷到大厂核心岗疯抢?别内耗!别纠结!速看《为什么很多人劝退学C++,但大厂核心岗位还是要C++?》,帮你理清思路,放心冲就完事,主打一个不踩认知坑~

✨ 后端党集合|按大厂标准打怪升级

想深耕Linux C/C++后端,却对着一堆知识点一脸懵?《【大厂标准】Linux C/C++后端进阶学习路线》直接帮你捋顺学习脉络,搭好能力框架,进阶路上不慌不忙,主打一个高效上分!

✨ 音视频爱好者速来|入门不懵圈

对音视频流媒体开发狠狠心动,但一上手就卡壳?《音视频流媒体高级开发 - 学习路线》把核心技术拆得明明白白,帮你快速搭好知识框架,轻松开启学习模式,新手也能快速入门~

✨ Qt选手狂喜|桌面/嵌入式通吃

不管是想搞桌面应用,还是深耕嵌入式开发,《C++ Qt学习路线一条龙!(桌面开发 & 嵌入式开发)》直接拿捏~ 从入门到实战一步到位,轻松搞定项目落地,不用自己瞎琢磨,主打一个省心!

✨ 硬核玩家专属|深挖Linux内核

想钻透操作系统底层,突破技术天花板?《Linux内核学习指南,硬核修炼手册》分享超实用的学习小技巧,适合愿意沉下心深耕的宝子,主打一个硬核提升!

✨ 面试冲刺|八股文速查神器

备战技术面试,需要快速复盘知识点?《C++高频八股文面试题1000题(三)》直接当复习手册用,帮你巩固核心考点,面试时气场拉满,不慌不乱,主打一个稳稳上岸!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐