一、Linux 中常用的 IO 模型

1、阻塞 IO(Blocking IO)

        进程发起 read/write 调用后,会一直阻塞直到数据就绪并完成拷贝。会一直阻塞进程,除非开个线程。

	// 读取/发送数据(阻塞操作)
	ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);

2、非阻塞 IO(Non-blocking IO)

         IO 操作立即返回,若无数据则返回 EAGAIN 或 EWOULDBLOCK,需轮询。需要有个循环一直读取数据,很耗费CPU资源。

	//设置文件描述符为非阻塞
	int flags = fcntl(fd, F_GETFL, 0);

	// 非阻塞读取/写入数据
	ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);
	send(new_socket, response, strlen(response), 0);

3、IO 多路复用(select/poll/epoll)

        单线程可以监听多个文件描述符,任一就绪时通知进程处理。一个客户端连接可以看作是一个文件描述符。

3.1 IO 多路复用(select)

        select——可以同时监听多个文件描述符,只要有一个文件描述符有消息,就会返回。然后遍历所有的文件描述符,(通过对每一个socket调用FD_ISSET(fd, &readfds),就能知道是那个文件描述符准备就绪了)。
        缺点就是需要遍历所有的文件描述符,并且存在最大数量限制(通常由FD_SETSIZE定义,默认 1024)。每次代码中需要手动维护fd_set,每次调用前需重新初始化和设置需要监控的 fd。仅支持水平触发(LT,Level Triggered):当 fd 就绪后,若未被处理,每次调用select都会再次通知该 fd 就绪。

while(true)
{
	fd_set readfds;
	FD_ZERO(&readfds);				// 初始化文件描述符集合
	FD_SET(server_fd, &readfds); 	// 将服务器socket加入集合
	for (int sd : client_sockets)   // 将客户端socket加入集合
	{
		FD_SET(sd, &readfds);
	}
	
	// 等待活动(超时时间为NULL,表示无限等待)
	int activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
	
	// 检查服务器socket是否有新连接
	if (FD_ISSET(server_fd, &readfds)) 
	{
		new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))
		client_sockets.push_back(new_socket);
	}
	
	// 检查客户端socket是否有数据
	for (auto it = client_sockets.begin(); it != client_sockets.end(); ) 
	{
		if (FD_ISSET(sd, &readfds)) 
		{
			// 读取数据
			ssize_t valread = read(sd, buffer, BUFFER_SIZE);
			
			//当客户端断开连接时,从select移除该客户端
			it = fds.erase(fds.begin() + i);
		}
		++it;
	}
}

3.2 IO 多路复用(poll)

        poll——和select是相似的,扩展了可以监听的最大描述符个数。使用pollfd结构体数组(struct pollfd { int fd; short events; short revents; })存储文件描述符及其事件,更灵活。无最大数量限制,可监控任意数量的 fd。并且无需每次重新初始化数组。
     缺点就是也需要遍历所有的文件描述符(通过fds[i].revents & POLLIN,知道那个socket是触发的返回,再通过fds[i].fd == server_fd判断是是不是服务器,不是就是客户端了),也仅支持水平触发(LT,Level Triggered)。

	std::vector<struct pollfd> fds;		    //pollfd结构体数组
    fds.push_back({server_fd, POLLIN, 0});	// 添加服务器socket到pollfd数组
    
    while (true) 
    {
        // 等待活动(超时时间为-1,表示无限等待)
        int activity = poll(fds.data(), fds.size(), -1);

		// 检查每个文件描述符
        for (size_t i = 0; i < fds.size(); ++i) 
        {
            if (fds[i].revents & POLLIN) 
            {
                
				if (fds[i].fd == server_fd) 	// 服务器socket有新连接
                {
                	//有新链接就加入
                	new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))
                	fds.push_back({new_socket, POLLIN, 0});
                }
                else							// 客户端socket有数据
                {
	                //当客户端断开连接时,从poll移除该客户端
					fds.erase(fds.begin() + i);
                }
        }
	}

3.3 IO 多路复用(epoll)

        epoll——是linux特有的比较高效的io多路复用机制。基于内核中的红黑树(存储所有需要监控的 fd)和就绪链表(存储就绪的 fd)实现。采用事件驱动方式,用户态通过epoll_wait直接获取就绪的 fd 列表,无需遍历所有监控的 fd(通过events[i].data.fd判断是哪个socket,events[i].events会字段告诉你具体是什么事件如EPOLLIN)。支持水平触发(LT)和边缘触发(ET)两种模式。适用于高并发场景,是高性能服务器的首选方案。

  1. LT(Level-Triggered,水平触发):
         对于读事件 EPOLLIN,只要socket上有未读完的数据EPOLLIN 就会一直触发;对于写事件 EPOLLOUT,只要socket可写EPOLLOUT 就会一直触发。
  2. 边缘触发(ET):
         对于读事件 EPOLLIN,只有socket上的数据从无到有EPOLLIN 才会触发;对于写事件 EPOLLOUT,只有在socket写缓冲区从不可写变为可写,EPOLLOUT才会触发。(需配合非阻塞 IO 一次性读完)
	// epoll事件结构体
    struct epoll_event ev, events[MAX_EVENTS];
	
	//将服务器添加到,epoll事件驱动中
	ev.events = EPOLLIN;
    ev.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) 
    {
    }
    
	while (true) 
    {
        // 等待事件发生
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) 
        {
            perror("epoll_wait");
            continue;
        }
        
		// 处理就绪事件
        for (int i = 0; i < nfds; ++i) 
        {
            // 服务器socket有新连接
            if (events[i].data.fd == server_fd) 
            {
            	new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))
            	
            	//设置客户端为非阻塞
            	setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))
            	// 添加客户端socket到epoll
                ev.events = EPOLLIN | EPOLLET;  // 使用边缘触发模式
                ev.data.fd = new_socket;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1) 
                {
                }
            }
            else							// 客户端socket有数据
			{
				//当客户端断开连接时,从epoll移除该客户端
				epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
			}
	}

3.4 select、poll、epoll差异

机制 select poll epoll
最大 fd 限制 默认由FD_SETSIZE限制(通常 1024) 无,仅受系统内存和进程 fd 上限约束 无,仅受系统内存和进程 fd 上限约束
内核态复制开销 每次调用select时,需将全部监控 fd 从用户态复制到内核态(O (n) 开销) 每次调用poll时,需将全部pollfd数组从用户态复制到内核态(O (n) 开销) 仅在首次通过epoll_ctl(EPOLL_CTL_ADD)添加 fd 时复制,后续修改 / 查询无需重复复制(O (1) 开销)
就绪 fd 遍历开销 内核仅返回 “是否有 fd 就绪”,用户态需遍历全部监控 fd 判断就绪状态(O (n),n 为总监控 fd 数) 同 select,用户态需遍历全部pollfd数组检查revents字段(O (n)) 内核直接将就绪 fd 放入 “就绪链表”,用户态通过epoll_wait直接获取就绪列表,无需遍历总 fd(O (k),k 为就绪 fd 数)
触发模式 仅支持水平触发(LT,Level Triggered) 仅支持水平触发(LT) 支持水平触发(LT,默认)和边缘触发(ET,Edge Triggered)
底层数据结构 基于位图(fd_set)实现 基于动态数组(struct pollfd)实现 基于红黑树(管理总监控 fd)+ 就绪链表(存储就绪 fd)实现
编程复杂度 较低,但需手动维护fd_set(每次调用前需重置) 中等,pollfd数组可复用,但仍需遍历检查 较高(需理解epoll_ctl的添加 / 修改 / 删除逻辑,ET 模式需配合非阻塞 IO)
典型应用场景 低并发、fd 数量固定且较少的场景(如简单工具、小规模客户端) 中等并发、fd 数量适中的场景(如中小型服务) 高并发、fd 数量庞大的场景(如 Web 服务器 Nginx、缓存 Redis、消息队列 Kafka)
性能瓶颈 fd 数量超过 1024 后无法使用,且遍历开销随 fd 数增长急剧上升 遍历开销随 fd 数增长线性上升,高并发下性能骤降 无明显瓶颈,性能随 fd 数增长基本稳定

4、信号驱动 IO(Signal-driven IO)

        通过 SIGIO 信号异步通知进程数据已就绪,进程再发起读取。用的很少,而且实现相对复杂信号队列可能溢出,不适合高并发场景,了解名字即可。

5、异步 IO(Asynchronous IO)

        POSIX AIO(Linux 原生):aio_read / aio_write 立即返回,内核完成所有操作后通知进程。它的底层是基于线程池模拟异步,并非真正的内核级异步,且对网络 IO 的支持不完整,性能不佳,使用复杂,了解即可。

6、异步 IO(io_uring)

        真正的异步 IO(io_uring)Linux 5.1+ 引入,性能远超 epoll,支持真正的异步读写(包括网络)是未来高性能 IO 的方向

	// 存储客户端连接信息
	struct Connection 
	{
	    int fd;
	    char buffer[BUFFER_SIZE];
	    sockaddr_in addr;
	    socklen_t addr_len;
	};
	
	// 自定义IO操作数据
	struct IOUringData 
	{
	    enum Type { ACCEPT, READ, WRITE } type;
	    Connection *conn;
	};

	// 初始化io_uring
	struct io_uring ring;
    if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) 
    {
        printf("io_uring_queue_init failed");
    }

	// 存储连接的数组
    std::vector<Connection*> connections;
    
    // 准备接受连接的请求
    auto *accept_conn = new Connection;
    accept_conn->fd = server_fd;
    accept_conn->addr_len = sizeof(accept_conn->addr);
    
    auto *accept_data = new IOUringData;
    accept_data->type = IOUringData::ACCEPT;
    accept_data->conn = accept_conn;

    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    io_uring_prep_accept(sqe, server_fd, 
                         (struct sockaddr*)&accept_conn->addr, 
                         &accept_conn->addr_len, 0);
    io_uring_sqe_set_data(sqe, accept_data);
    io_uring_submit(&ring);

	while (true) 
    {
        // 等待IO事件完成
        struct io_uring_cqe *cqe;
        int ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) 
        {
            perror("io_uring_wait_cqe failed");
            break;
        }

        // 处理完成的事件
        IOUringData *data = (IOUringData*)io_uring_cqe_get_data(cqe);
        int res = cqe->res;
        if (res < 0) 
        {
            std::cerr << "操作失败: " << strerror(-res) << std::endl;
        }
        else 
        {
            switch (data->type) 
            {
                case IOUringData::ACCEPT:
                {
                }
                case IOUringData::WRITE:
                {
                    auto *read_data = new IOUringData;
                    read_data->type = IOUringData::READ;
                    read_data->conn = data->conn;

                    struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring);
                    io_uring_prep_recv(read_sqe, data->conn->fd, 
                                      data->conn->buffer, BUFFER_SIZE - 1, 0);
                    io_uring_sqe_set_data(read_sqe, read_data);
                    io_uring_submit(&ring);
                    delete data;
                    break;
                }
            }
        }
        // 标记CQE为已处理
        io_uring_cqe_seen(&ring, cqe);
    }
    // 清理资源
    for (auto conn : connections) 
    {
        close(conn->fd);
        delete conn;
    }
    io_uring_queue_exit(&ring);

7、异步 IO(iocp)window上的异步IO

        真正的异步 IO,所有 IO 操作都是异步的,不会阻塞主线程。通过完成端口队列通知 IO 操作完成。说个冷知识,iocp是windows 1993发布的,epoll是2002年,io_uring是2019年。

	//创建异步io
    m_privater->m_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);

//工作线程
void EventWorkerThread()
{
    while (m_privater->m_running)
    {
        if (!GetQueuedCompletionStatus(m_privater->m_iocp, &lpNumberOfBytesTransferred, (PULONG_PTR) &lpCompletionKey,(LPOVERLAPPED *) &ioContext, INFINITE))
        {
        }
        switch (ioContext->type)
        {
        case IOType::Read:
        {
        }
        case IOType::Write:
        {
        }
    }
}

//接受函数
void AcceptLoop()
{
	while (m_privater->m_running)
    {
        SOCKET clientSocket = accept(m_privater->m_listenSocket, NULL, NULL);

        // 将接入的套接字也设置为非阻塞的
        unsigned long ul = 1;
        ioctlsocket(clientSocket, FIONBIO, &ul)
        
       	//将客户端绑定到iocp
		CreateIoCompletionPort((HANDLE)clientSocket, m_privater->m_iocp, 0, 0)

        auto ioContext = new IOContext;
        ioContext->socket = clientSocket;
        ioContext->type = IOType::Read;
        
        // 提交一次异步接受事件,这样只要客户端有数据接受,会在工作线程获取到队列
        auto rt = WSARecv(clientSocket, &ioContext->wsaBuf, 1, &nBytes, &dwFlags, &ioContext->overlapped, nullptr);
    }

二、补充

1、epoll、io_uring、iocp差异

        epoll、io_uring、iocp三种模型是分别是,linux和windows系统中很流行的io模型。基本都支持数十万,甚至百万连接。io_uring、iocp和epoll最大的区别是,上下文切换较少,数据由内核拷贝,效率极高,拥有极致的性能体验。

对比项 epoll IOCP io_uring
模型 Reactor Proactor Proactor
异步程度 半异步(就绪通知) 全异步(完成通知) 全异步(完成通知)
数据拷贝者 应用线程 内核 内核
线程模型 用户管理 内核线程池 用户管理
跨平台 Linux Windows Linux

2、示例代码

1. 阻塞 IO(Blocking IO)

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() 
{
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[BUFFER_SIZE] = {0};

    // 创建socket文件描述符
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定socket到端口
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听连接(最大等待队列长度为3)
    if (listen(server_fd, 3) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    std::cout << "阻塞IO服务器启动,监听端口 " << PORT << std::endl;

    while (true) 
    {
        // 接受客户端连接(阻塞操作)
        if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) 
        {
            perror("accept");
            continue;
        }

        std::cout << "新客户端连接" << std::endl;

        // 读取数据(阻塞操作)
        ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);
        if (valread < 0) 
        {
            perror("read failed");
            close(new_socket);
            continue;
        }
        
        std::cout << "收到数据: " << buffer << std::endl;

        // 发送响应
        const char *response = "消息已收到";
        send(new_socket, response, strlen(response), 0);
        std::cout << "响应已发送" << std::endl;

        close(new_socket);
    }

    return 0;
}

2. 非阻塞 IO(Non-blocking IO)

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>

#define PORT 8080
#define BUFFER_SIZE 1024

// 设置文件描述符为非阻塞模式
int set_non_blocking(int fd) 
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) 
    {
        return -1;
    }
    
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) 
    {
        return -1;
    }
    
    return 0;
}

int main() 
{
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[BUFFER_SIZE] = {0};

    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    // 设置服务器socket为非阻塞
    if (set_non_blocking(server_fd) == -1) 
    {
        perror("set non-blocking failed");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(server_fd, 3) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    std::cout << "非阻塞IO服务器启动,监听端口 " << PORT << std::endl;

    while (true) 
    {
        // 非阻塞接受连接
        new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
        
        if (new_socket < 0) 
        {
            // EAGAIN表示当前没有连接,不是真正的错误
            if (errno != EAGAIN && errno != EWOULDBLOCK) 
            {
                perror("accept error");
            }
            // 短暂休眠,避免CPU占用过高
            usleep(1000);
            continue;
        }

        std::cout << "新客户端连接" << std::endl;

        // 设置客户端socket为非阻塞
        if (set_non_blocking(new_socket) == -1) 
        {
            perror("set client socket non-blocking failed");
            close(new_socket);
            continue;
        }

        // 非阻塞读取数据
        ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);
        if (valread < 0) 
        {
            if (errno != EAGAIN && errno != EWOULDBLOCK) 
            {
                perror("read error");
                close(new_socket);
            }
            continue;
        }
        else if (valread == 0) 
        {
            // 客户端关闭连接
            std::cout << "客户端断开连接" << std::endl;
            close(new_socket);
            continue;
        }

        std::cout << "收到数据: " << buffer << std::endl;

        // 发送响应
        const char *response = "消息已收到";
        send(new_socket, response, strlen(response), 0);
        std::cout << "响应已发送" << std::endl;

        close(new_socket);
    }

    return 0;
}

3. IO 多路复用 - select

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/select.h>
#include <vector>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 10

int main() 
{
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[BUFFER_SIZE] = {0};
    
    // 客户端socket集合
    std::vector<int> client_sockets;

    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(server_fd, 3) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    std::cout << "select服务器启动,监听端口 " << PORT << std::endl;

    while (true) 
    {
        // 初始化文件描述符集合
        fd_set readfds;
        FD_ZERO(&readfds);
        
        // 将服务器socket加入集合
        FD_SET(server_fd, &readfds);
        int max_sd = server_fd;

        // 将客户端socket加入集合
        for (int sd : client_sockets) 
        {
            FD_SET(sd, &readfds);
            if (sd > max_sd) 
            {
                max_sd = sd;
            }
        }

        // 等待活动(超时时间为NULL,表示无限等待)
        int activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
        
        if ((activity < 0) && (errno != EINTR)) 
        {
            perror("select error");
        }

        // 检查服务器socket是否有新连接
        if (FD_ISSET(server_fd, &readfds)) 
        {
            if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) 
            {
                perror("accept");
                continue;
            }

            std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;

            // 将新客户端加入集合
            if (client_sockets.size() < MAX_CLIENTS) 
            {
                client_sockets.push_back(new_socket);
                std::cout << "客户端数量: " << client_sockets.size() << std::endl;
            }
            else 
            {
                const char *msg = "服务器忙,请稍后再试";
                send(new_socket, msg, strlen(msg), 0);
                close(new_socket);
                std::cout << "服务器已满,拒绝连接" << std::endl;
            }
        }

        // 检查客户端socket是否有数据
        for (auto it = client_sockets.begin(); it != client_sockets.end(); ) 
        {
            int sd = *it;
            
            if (FD_ISSET(sd, &readfds)) 
            {
                // 读取数据
                ssize_t valread = read(sd, buffer, BUFFER_SIZE);
                
                if (valread == 0) 
                {
                    // 客户端断开连接
                    getpeername(sd, (struct sockaddr*)&address, (socklen_t*)&addrlen);
                    std::cout << "客户端断开连接,socket fd: " << sd << std::endl;
                    
                    close(sd);
                    it = client_sockets.erase(it);
                }
                else if (valread > 0) 
                {
                    // 处理数据
                    buffer[valread] = '\0';
                    std::cout << "收到数据 (fd: " << sd << "): " << buffer << std::endl;
                    
                    // 发送响应
                    const char *response = "消息已收到";
                    send(sd, response, strlen(response), 0);
                }
                else 
                {
                    // 读取错误
                    perror("read error");
                    close(sd);
                    it = client_sockets.erase(it);
                }
            }
            else 
            {
                ++it;
            }
        }
    }

    return 0;
}

4. IO 多路复用 - poll

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <poll.h>
#include <vector>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 10

int main() 
{
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[BUFFER_SIZE] = {0};
    
    // pollfd结构数组
    std::vector<struct pollfd> fds;

    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(server_fd, 3) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 添加服务器socket到pollfd数组
    fds.push_back({server_fd, POLLIN, 0});

    std::cout << "poll服务器启动,监听端口 " << PORT << std::endl;

    while (true) 
    {
        // 等待活动(超时时间为-1,表示无限等待)
        int activity = poll(fds.data(), fds.size(), -1);
        
        if (activity < 0) 
        {
            perror("poll error");
            continue;
        }

        // 检查每个文件描述符
        for (size_t i = 0; i < fds.size(); ++i) 
        {
            if (fds[i].revents & POLLIN) 
            {
                // 服务器socket有新连接
                if (fds[i].fd == server_fd) 
                {
                    if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) 
                    {
                        perror("accept");
                        continue;
                    }

                    std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;

                    // 添加新客户端到pollfd数组
                    if (fds.size() < MAX_CLIENTS + 1)  // +1 是因为包含server_fd
                    {
                        fds.push_back({new_socket, POLLIN, 0});
                        std::cout << "客户端数量: " << fds.size() - 1 << std::endl;
                    }
                    else 
                    {
                        const char *msg = "服务器忙,请稍后再试";
                        send(new_socket, msg, strlen(msg), 0);
                        close(new_socket);
                        std::cout << "服务器已满,拒绝连接" << std::endl;
                    }
                }
                // 客户端socket有数据
                else 
                {
                    ssize_t valread = read(fds[i].fd, buffer, BUFFER_SIZE);
                    
                    if (valread == 0) 
                    {
                        // 客户端断开连接
                        getpeername(fds[i].fd, (struct sockaddr*)&address, (socklen_t*)&addrlen);
                        std::cout << "客户端断开连接,socket fd: " << fds[i].fd << std::endl;
                        
                        close(fds[i].fd);
                        // 从数组中移除
                        fds.erase(fds.begin() + i);
                        i--;  // 调整索引,因为删除了一个元素
                    }
                    else if (valread > 0) 
                    {
                        // 处理数据
                        buffer[valread] = '\0';
                        std::cout << "收到数据 (fd: " << fds[i].fd << "): " << buffer << std::endl;
                        
                        // 发送响应
                        const char *response = "消息已收到";
                        send(fds[i].fd, response, strlen(response), 0);
                    }
                    else 
                    {
                        // 读取错误
                        perror("read error");
                        close(fds[i].fd);
                        fds.erase(fds.begin() + i);
                        i--;
                    }
                }
            }
        }
    }

    return 0;
}

5. IO 多路复用 - epoll

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <vector>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10

int main() 
{
    int server_fd, new_socket, epoll_fd;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[BUFFER_SIZE] = {0};
    
    // epoll事件结构体
    struct epoll_event ev, events[MAX_EVENTS];

    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(server_fd, 3) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 创建epoll实例
    if ((epoll_fd = epoll_create1(0)) == -1) 
    {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    // 添加服务器socket到epoll
    ev.events = EPOLLIN;
    ev.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) 
    {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }

    std::cout << "epoll服务器启动,监听端口 " << PORT << std::endl;

    while (true) 
    {
        // 等待事件发生
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) 
        {
            perror("epoll_wait");
            continue;
        }

        // 处理就绪事件
        for (int i = 0; i < nfds; ++i) 
        {
            // 服务器socket有新连接
            if (events[i].data.fd == server_fd) 
            {
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) 
                {
                    perror("accept");
                    continue;
                }

                std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;

                // 设置客户端socket为非阻塞(可选,但通常与epoll配合使用)
                if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
                {
                    perror("setsockopt for client");
                    close(new_socket);
                    continue;
                }

                // 添加客户端socket到epoll
                ev.events = EPOLLIN | EPOLLET;  // 使用边缘触发模式
                ev.data.fd = new_socket;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1) 
                {
                    perror("epoll_ctl: new_socket");
                    close(new_socket);
                    continue;
                }
            }
            // 客户端socket有数据
            else 
            {
                ssize_t valread = read(events[i].data.fd, buffer, BUFFER_SIZE);
                
                if (valread == 0) 
                {
                    // 客户端断开连接
                    std::cout << "客户端断开连接,socket fd: " << events[i].data.fd << std::endl;
                    
                    // 从epoll中移除并关闭socket
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
                    close(events[i].data.fd);
                }
                else if (valread > 0) 
                {
                    // 处理数据
                    buffer[valread] = '\0';
                    std::cout << "收到数据 (fd: " << events[i].data.fd << "): " << buffer << std::endl;
                    
                    // 发送响应
                    const char *response = "消息已收到";
                    send(events[i].data.fd, response, strlen(response), 0);
                }
                else 
                {
                    // 读取错误
                    perror("read error");
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
                    close(events[i].data.fd);
                }
            }
        }
    }

    close(epoll_fd);
    return 0;
}

6. 信号驱动 IO(Signal-driven IO)

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <errno.h>
#include <vector>

#define PORT 8080
#define BUFFER_SIZE 1024

// 全局变量存储客户端socket
std::vector<int> client_sockets;
char buffer[BUFFER_SIZE];

// 信号处理函数
void sigio_handler(int signo) 
{
    if (signo != SIGIO) 
    {
        return;
    }

    // 遍历客户端socket检查事件
    for (auto it = client_sockets.begin(); it != client_sockets.end(); ) 
    {
        int fd = *it;
        ssize_t n = recv(fd, buffer, BUFFER_SIZE - 1, MSG_DONTWAIT);
        
        if (n > 0) 
        {
            buffer[n] = '\0';
            std::cout << "收到数据 (fd: " << fd << "): " << buffer << std::endl;
            
            // 发送响应
            const char *response = "消息已收到";
            send(fd, response, strlen(response), 0);
            ++it;
        }
        else if (n == 0 || (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) 
        {
            // 客户端断开连接或发生错误
            std::cout << "客户端断开连接,socket fd: " << fd << std::endl;
            close(fd);
            it = client_sockets.erase(it);
        }
        else 
        {
            ++it;
        }
    }
}

int main() 
{
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    struct sigaction sa;

    // 设置信号处理函数
    sa.sa_handler = sigio_handler;
    sa.sa_flags = 0;
    sigemptyset(&sa.sa_mask);
    
    if (sigaction(SIGIO, &sa, NULL) == -1) 
    {
        perror("sigaction");
        exit(EXIT_FAILURE);
    }

    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(server_fd, 3) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 设置进程为socket的所有者,以便接收SIGIO信号
    if (fcntl(server_fd, F_SETOWN, getpid()) == -1) 
    {
        perror("fcntl F_SETOWN");
        exit(EXIT_FAILURE);
    }

    std::cout << "信号驱动IO服务器启动,监听端口 " << PORT << std::endl;

    while (true) 
    {
        // 接受客户端连接(阻塞)
        if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) 
        {
            perror("accept");
            continue;
        }

        std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;

        // 设置客户端socket为非阻塞
        int flags = fcntl(new_socket, F_GETFL, 0);
        if (fcntl(new_socket, F_SETFL, flags | O_NONBLOCK) == -1) 
        {
            perror("fcntl F_SETFL O_NONBLOCK");
            close(new_socket);
            continue;
        }

        // 设置客户端socket的所有者,并启用信号驱动IO
        if (fcntl(new_socket, F_SETOWN, getpid()) == -1) 
        {
            perror("fcntl F_SETOWN for client");
            close(new_socket);
            continue;
        }

        flags = fcntl(new_socket, F_GETFL, 0);
        if (fcntl(new_socket, F_SETFL, flags | O_ASYNC) == -1) 
        {
            perror("fcntl F_SETFL O_ASYNC");
            close(new_socket);
            continue;
        }

        // 添加到客户端列表
        client_sockets.push_back(new_socket);
    }

    return 0;
}

7. 异步 IO(Asynchronous IO)

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <aio.h>
#include <signal.h>
#include <fcntl.h>
#include <vector>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_PENDING 10

// 异步IO控制块结构
struct AsyncIOData 
{
    int fd;
    struct aiocb aio;
    char buffer[BUFFER_SIZE];
};

// 存储所有异步IO请求
std::vector<AsyncIOData*> async_requests;

// 信号处理函数 - 处理异步IO完成事件
void aio_completion_handler(int signo, siginfo_t *info, void *context) 
{
    if (signo != SIGIO || info->si_code != SI_ASYNCIO) 
    {
        return;
    }

    // 获取完成的异步IO控制块
    struct aiocb *aio = (struct aiocb*)info->si_value.sival_ptr;
    if (!aio) 
    {
        return;
    }

    // 查找对应的AsyncIOData
    AsyncIOData* data = nullptr;
    for (auto req : async_requests) 
    {
        if (&req->aio == aio) 
        {
            data = req;
            break;
        }
    }

    if (!data) 
    {
        return;
    }

    // 检查IO操作结果
    int ret = aio_return(aio);
    if (ret > 0) 
    {
        // 读取到数据
        data->buffer[ret] = '\0';
        std::cout << "收到数据 (fd: " << data->fd << "): " << data->buffer << std::endl;
        
        // 发送响应
        const char *response = "消息已收到";
        send(data->fd, response, strlen(response), 0);
        
        // 重新发起异步读取
        memset(&data->aio, 0, sizeof(struct aiocb));
        data->aio.aio_fildes = data->fd;
        data->aio.aio_buf = data->buffer;
        data->aio.aio_nbytes = BUFFER_SIZE - 1;
        data->aio.aio_offset = 0;
        data->aio.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
        data->aio.aio_sigevent.sigev_signo = SIGIO;
        data->aio.aio_sigevent.sigev_value.sival_ptr = &data->aio;

        if (aio_read(&data->aio) == -1) 
        {
            perror("aio_read failed");
            close(data->fd);
            // 从列表中移除
            for (auto it = async_requests.begin(); it != async_requests.end(); ++it) 
            {
                if (*it == data) 
                {
                    delete *it;
                    async_requests.erase(it);
                    break;
                }
            }
        }
    }
    else if (ret == 0) 
    {
        // 客户端断开连接
        std::cout << "客户端断开连接,socket fd: " << data->fd << std::endl;
        close(data->fd);
        // 从列表中移除
        for (auto it = async_requests.begin(); it != async_requests.end(); ++it) 
        {
            if (*it == data) 
            {
                delete *it;
                async_requests.erase(it);
                break;
            }
        }
    }
    else 
    {
        // 发生错误
        perror("aio error");
        close(data->fd);
        // 从列表中移除
        for (auto it = async_requests.begin(); it != async_requests.end(); ++it) 
        {
            if (*it == data) 
            {
                delete *it;
                async_requests.erase(it);
                break;
            }
        }
    }
}

int main() 
{
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    struct sigaction sa;

    // 设置信号处理函数
    sa.sa_sigaction = aio_completion_handler;
    sa.sa_flags = SA_SIGINFO;
    sigemptyset(&sa.sa_mask);
    
    if (sigaction(SIGIO, &sa, NULL) == -1) 
    {
        perror("sigaction");
        exit(EXIT_FAILURE);
    }

    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(server_fd, 3) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    std::cout << "异步IO服务器启动,监听端口 " << PORT << std::endl;

    while (true) 
    {
        // 接受客户端连接(阻塞)
        if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) 
        {
            perror("accept");
            continue;
        }

        std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;

        // 创建异步IO数据结构
        AsyncIOData *data = new AsyncIOData;
        data->fd = new_socket;
        memset(&data->aio, 0, sizeof(struct aiocb));

        // 设置异步IO参数
        data->aio.aio_fildes = new_socket;
        data->aio.aio_buf = data->buffer;
        data->aio.aio_nbytes = BUFFER_SIZE - 1;
        data->aio.aio_offset = 0;
        data->aio.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
        data->aio.aio_sigevent.sigev_signo = SIGIO;
        data->aio.aio_sigevent.sigev_value.sival_ptr = &data->aio;

        // 发起异步读取
        if (aio_read(&data->aio) == -1) 
        {
            perror("aio_read failed");
            close(new_socket);
            delete data;
            continue;
        }

        // 添加到请求列表
        async_requests.push_back(data);

        // 限制最大并发请求数
        if (async_requests.size() > MAX_PENDING) 
        {
            AsyncIOData* oldest = async_requests.front();
            aio_cancel(oldest->fd, &oldest->aio);
            close(oldest->fd);
            delete oldest;
            async_requests.erase(async_requests.begin());
            std::cout << "达到最大连接数,关闭最早的连接" << std::endl;
        }
    }

    // 清理资源
    for (auto req : async_requests) 
    {
        close(req->fd);
        delete req;
    }
    close(server_fd);

    return 0;
}

8. 异步 IO (io_uring)

#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <liburing.h>
#include <vector>
#include <errno.h>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CONNECTIONS 10
#define QUEUE_DEPTH 32

// 存储客户端连接信息
struct Connection 
{
    int fd;
    char buffer[BUFFER_SIZE];
    sockaddr_in addr;
    socklen_t addr_len;
};

// 自定义IO操作数据
struct IOUringData 
{
    enum Type { ACCEPT, READ, WRITE } type;
    Connection *conn;
};

int main() 
{
    int server_fd;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    
    // 初始化io_uring
    struct io_uring ring;
    if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) 
    {
        perror("io_uring_queue_init failed");
        exit(EXIT_FAILURE);
    }

    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) 
    {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置socket选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) 
    {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    // 绑定
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) 
    {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    // 监听
    if (listen(server_fd, MAX_CONNECTIONS) < 0) 
    {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    std::cout << "io_uring服务器启动,监听端口 " << PORT << std::endl;

    // 存储连接的数组
    std::vector<Connection*> connections;

    // 准备接受连接的请求
    auto *accept_conn = new Connection;
    accept_conn->fd = server_fd;
    accept_conn->addr_len = sizeof(accept_conn->addr);
    
    auto *accept_data = new IOUringData;
    accept_data->type = IOUringData::ACCEPT;
    accept_data->conn = accept_conn;

    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    io_uring_prep_accept(sqe, server_fd, 
                         (struct sockaddr*)&accept_conn->addr, 
                         &accept_conn->addr_len, 0);
    io_uring_sqe_set_data(sqe, accept_data);
    io_uring_submit(&ring);

    while (true) 
    {
        // 等待IO事件完成
        struct io_uring_cqe *cqe;
        int ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) 
        {
            perror("io_uring_wait_cqe failed");
            break;
        }

        // 处理完成的事件
        IOUringData *data = (IOUringData*)io_uring_cqe_get_data(cqe);
        int res = cqe->res;
        
        if (res < 0) 
        {
            std::cerr << "操作失败: " << strerror(-res) << std::endl;
        }
        else 
        {
            switch (data->type) 
            {
                case IOUringData::ACCEPT:
                {
                    // 处理新连接
                    int client_fd = res;
                    std::cout << "新客户端连接,socket fd: " << client_fd << std::endl;

                    // 创建新连接对象
                    auto *new_conn = new Connection;
                    new_conn->fd = client_fd;
                    new_conn->addr = data->conn->addr;
                    new_conn->addr_len = data->conn->addr_len;
                    connections.push_back(new_conn);

                    // 准备读取数据
                    auto *read_data = new IOUringData;
                    read_data->type = IOUringData::READ;
                    read_data->conn = new_conn;

                    struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring);
                    io_uring_prep_recv(read_sqe, client_fd, 
                                      new_conn->buffer, BUFFER_SIZE - 1, 0);
                    io_uring_sqe_set_data(read_sqe, read_data);

                    // 重新提交accept请求以接受新连接
                    struct io_uring_sqe *accept_sqe = io_uring_get_sqe(&ring);
                    io_uring_prep_accept(accept_sqe, server_fd, 
                                        (struct sockaddr*)&data->conn->addr, 
                                        &data->conn->addr_len, 0);
                    io_uring_sqe_set_data(accept_sqe, data);

                    io_uring_submit(&ring);
                    break;
                }
                
                case IOUringData::READ:
                {
                    // 处理读取的数据
                    ssize_t bytes_read = res;
                    if (bytes_read == 0) 
                    {
                        // 客户端断开连接
                        std::cout << "客户端断开连接,socket fd: " << data->conn->fd << std::endl;
                        close(data->conn->fd);
                        
                        // 从连接列表中移除
                        for (auto it = connections.begin(); it != connections.end(); ++it) 
                        {
                            if (*it == data->conn) 
                            {
                                connections.erase(it);
                                break;
                            }
                        }
                        delete data->conn;
                        delete data;
                    }
                    else 
                    {
                        data->conn->buffer[bytes_read] = '\0';
                        std::cout << "收到数据 (fd: " << data->conn->fd << "): " << data->conn->buffer << std::endl;

                        // 准备发送响应
                        auto *write_data = new IOUringData;
                        write_data->type = IOUringData::WRITE;
                        write_data->conn = data->conn;

                        const char *response = "消息已收到";
                        struct io_uring_sqe *write_sqe = io_uring_get_sqe(&ring);
                        io_uring_prep_send(write_sqe, data->conn->fd, 
                                          response, strlen(response), 0);
                        io_uring_sqe_set_data(write_sqe, write_data);

                        io_uring_submit(&ring);
                        delete data;
                    }
                    break;
                }
                
                case IOUringData::WRITE:
                {
                    // 响应发送完成,准备再次读取
                    auto *read_data = new IOUringData;
                    read_data->type = IOUringData::READ;
                    read_data->conn = data->conn;

                    struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring);
                    io_uring_prep_recv(read_sqe, data->conn->fd, 
                                      data->conn->buffer, BUFFER_SIZE - 1, 0);
                    io_uring_sqe_set_data(read_sqe, read_data);

                    io_uring_submit(&ring);
                    delete data;
                    break;
                }
            }
        }

        // 标记CQE为已处理
        io_uring_cqe_seen(&ring, cqe);
    }

    // 清理资源
    for (auto conn : connections) 
    {
        close(conn->fd);
        delete conn;
    }
    io_uring_queue_exit(&ring);
    close(server_fd);

    return 0;
}

9. 异步 IO (iocp)

#include <iostream>
#include <winsock2.h>
#include <mswsock.h>
#include <vector>
#include <process.h>

#pragma comment(lib, "ws2_32.lib")

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_THREADS 4
#define MAX_CONNECTIONS 100

// 重叠IO结构
typedef struct _PER_IO_DATA 
{
    OVERLAPPED overlapped;
    WSABUF data_buf;
    char buffer[BUFFER_SIZE];
    int operation_type;  // 1: 接收, 2: 发送
} PER_IO_DATA, *PPER_IO_DATA;

// 每个客户端连接的数据
typedef struct _PER_HANDLE_DATA 
{
    SOCKET socket;
    sockaddr_in client_addr;
} PER_HANDLE_DATA, *PPER_HANDLE_DATA;

// 全局IOCP句柄
HANDLE g_hCompletionPort;

// 工作线程函数
unsigned int __stdcall WorkerThread(LPVOID lpParam) 
{
    DWORD bytes_transferred;
    ULONG_PTR completion_key;
    LPOVERLAPPED overlapped;
    PPER_HANDLE_DATA per_handle_data;
    PPER_IO_DATA per_io_data;

    while (true) 
    {
        // 等待IO完成
        BOOL success = GetQueuedCompletionStatus(
            g_hCompletionPort,
            &bytes_transferred,
            &completion_key,
            &overlapped,
            INFINITE
        );

        if (!success) 
        {
            std::cerr << "GetQueuedCompletionStatus failed: " << WSAGetLastError() << std::endl;
            continue;
        }

        // 无效的完成键,退出线程
        if (completion_key == 0) 
        {
            break;
        }

        per_handle_data = (PPER_HANDLE_DATA)completion_key;
        per_io_data = (PPER_IO_DATA)overlapped;

        // 客户端断开连接
        if (bytes_transferred == 0 && 
            (per_io_data->operation_type == 1 || per_io_data->operation_type == 2)) 
        {
            std::cout << "客户端断开连接" << std::endl;
            
            // 关闭socket
            closesocket(per_handle_data->socket);
            free(per_handle_data);
            free(per_io_data);
            continue;
        }

        switch (per_io_data->operation_type) 
        {
            case 1:  // 接收完成
            {
                std::cout << "收到数据: " << per_io_data->buffer << std::endl;

                // 准备发送响应
                per_io_data->operation_type = 2;
                const char* response = "消息已收到";
                strcpy_s(per_io_data->buffer, BUFFER_SIZE, response);
                per_io_data->data_buf.len = strlen(response);
                
                // 异步发送
                DWORD bytes_sent;
                DWORD flags = 0;
                int ret = WSASend(
                    per_handle_data->socket,
                    &per_io_data->data_buf,
                    1,
                    &bytes_sent,
                    flags,
                    &per_io_data->overlapped,
                    NULL
                );

                if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) 
                {
                    std::cerr << "WSASend failed: " << WSAGetLastError() << std::endl;
                    closesocket(per_handle_data->socket);
                    free(per_handle_data);
                    free(per_io_data);
                }
                break;
            }

            case 2:  // 发送完成
            {
                // 准备再次接收数据
                per_io_data->operation_type = 1;
                per_io_data->data_buf.len = BUFFER_SIZE;
                memset(per_io_data->buffer, 0, BUFFER_SIZE);
                
                // 异步接收
                DWORD bytes_recv;
                DWORD flags = 0;
                int ret = WSARecv(
                    per_handle_data->socket,
                    &per_io_data->data_buf,
                    1,
                    &bytes_recv,
                    &flags,
                    &per_io_data->overlapped,
                    NULL
                );

                if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) 
                {
                    std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
                    closesocket(per_handle_data->socket);
                    free(per_handle_data);
                    free(per_io_data);
                }
                break;
            }
        }
    }

    return 0;
}

// 接受连接的函数
void AcceptConnections(SOCKET listen_socket) 
{
    while (true) 
    {
        // 为新连接分配数据结构
        PPER_HANDLE_DATA per_handle_data = (PPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));
        if (!per_handle_data) 
        {
            std::cerr << "内存分配失败" << std::endl;
            continue;
        }

        int addr_len = sizeof(per_handle_data->client_addr);
        per_handle_data->socket = accept(
            listen_socket,
            (sockaddr*)&per_handle_data->client_addr,
            &addr_len
        );

        if (per_handle_data->socket == INVALID_SOCKET) 
        {
            std::cerr << "accept failed: " << WSAGetLastError() << std::endl;
            free(per_handle_data);
            continue;
        }

        std::cout << "新客户端连接" << std::endl;

        // 将socket与完成端口关联
        if (CreateIoCompletionPort(
            (HANDLE)per_handle_data->socket,
            g_hCompletionPort,
            (ULONG_PTR)per_handle_data,
            0
        ) == NULL) 
        {
            std::cerr << "CreateIoCompletionPort failed: " << WSAGetLastError() << std::endl;
            closesocket(per_handle_data->socket);
            free(per_handle_data);
            continue;
        }

        // 分配IO操作数据结构
        PPER_IO_DATA per_io_data = (PPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
        if (!per_io_data) 
        {
            std::cerr << "内存分配失败" << std::endl;
            closesocket(per_handle_data->socket);
            free(per_handle_data);
            continue;
        }

        // 初始化重叠结构
        memset(per_io_data, 0, sizeof(PER_IO_DATA));
        per_io_data->data_buf.len = BUFFER_SIZE;
        per_io_data->data_buf.buf = per_io_data->buffer;
        per_io_data->operation_type = 1;  // 接收操作

        // 开始异步接收
        DWORD bytes_recv;
        DWORD flags = 0;
        int ret = WSARecv(
            per_handle_data->socket,
            &per_io_data->data_buf,
            1,
            &bytes_recv,
            &flags,
            &per_io_data->overlapped,
            NULL
        );

        if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) 
        {
            std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
            closesocket(per_handle_data->socket);
            free(per_handle_data);
            free(per_io_data);
        }
    }
}

int main() 
{
    WSADATA wsa_data;
    SOCKET listen_socket;
    sockaddr_in server_addr;
    HANDLE worker_threads[MAX_THREADS];

    // 初始化Winsock
    if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0) 
    {
        std::cerr << "WSAStartup failed" << std::endl;
        return 1;
    }

    // 创建监听socket
    if ((listen_socket = WSASocket(
        AF_INET,
        SOCK_STREAM,
        0,
        NULL,
        0,
        WSA_FLAG_OVERLAPPED  // 重叠IO
    )) == INVALID_SOCKET) 
    {
        std::cerr << "WSASocket failed: " << WSAGetLastError() << std::endl;
        WSACleanup();
        return 1;
    }

    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    // 绑定
    if (bind(listen_socket, (sockaddr*)&server_addr, sizeof(server_addr)) == SOCKET_ERROR) 
    {
        std::cerr << "bind failed: " << WSAGetLastError() << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    // 监听
    if (listen(listen_socket, MAX_CONNECTIONS) == SOCKET_ERROR) 
    {
        std::cerr << "listen failed: " << WSAGetLastError() << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    std::cout << "IOCP服务器启动,监听端口 " << PORT << std::endl;

    // 创建完成端口
    g_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (g_hCompletionPort == NULL) 
    {
        std::cerr << "CreateIoCompletionPort failed: " << GetLastError() << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    // 创建工作线程
    for (int i = 0; i < MAX_THREADS; i++) 
    {
        worker_threads[i] = (HANDLE)_beginthreadex(
            NULL,
            0,
            WorkerThread,
            NULL,
            0,
            NULL
        );

        if (worker_threads[i] == NULL) 
        {
            std::cerr << "创建工作线程失败" << std::endl;
            // 清理已创建的资源
            for (int j = 0; j < i; j++) 
            {
                CloseHandle(worker_threads[j]);
            }
            CloseHandle(g_hCompletionPort);
            closesocket(listen_socket);
            WSACleanup();
            return 1;
        }
    }

    // 接受连接
    AcceptConnections(listen_socket);

    // 清理资源(实际中需要适当的退出机制)
    for (int i = 0; i < MAX_THREADS; i++) 
    {
        PostQueuedCompletionStatus(g_hCompletionPort, 0, 0, NULL);
    }

    WaitForMultipleObjects(MAX_THREADS, worker_threads, TRUE, INFINITE);

    for (int i = 0; i < MAX_THREADS; i++) 
    {
        CloseHandle(worker_threads[i]);
    }

    CloseHandle(g_hCompletionPort);
    closesocket(listen_socket);
    WSACleanup();

    return 0;
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐