Linux 中常用的 IO 模型
进程发起 read/write 调用后,会一直阻塞直到数据就绪并完成拷贝。会一直阻塞进程,除非开个线程。2、非阻塞 IO(Non-blocking IO)IO 操作立即返回,若无数据则返回 EAGAIN 或 EWOULDBLOCK,需轮询。需要有个循环一直读取数据,很耗费CPU资源。3、O 多路复用(select/poll/epoll)单线程可以监听多个文件描述符,任一就绪时通知进程处理。一个客户
一、Linux 中常用的 IO 模型
1、阻塞 IO(Blocking IO)
进程发起 read/write 调用后,会一直阻塞直到数据就绪并完成拷贝。会一直阻塞进程,除非开个线程。
// 读取/发送数据(阻塞操作)
ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);
2、非阻塞 IO(Non-blocking IO)
IO 操作立即返回,若无数据则返回 EAGAIN 或 EWOULDBLOCK,需轮询。需要有个循环一直读取数据,很耗费CPU资源。
//设置文件描述符为非阻塞
int flags = fcntl(fd, F_GETFL, 0);
// 非阻塞读取/写入数据
ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);
send(new_socket, response, strlen(response), 0);
3、IO 多路复用(select/poll/epoll)
单线程可以监听多个文件描述符,任一就绪时通知进程处理。一个客户端连接可以看作是一个文件描述符。
3.1 IO 多路复用(select)
select——可以同时监听多个文件描述符,只要有一个文件描述符有消息,就会返回。然后遍历所有的文件描述符,(通过对每一个socket调用FD_ISSET(fd, &readfds),就能知道是那个文件描述符准备就绪了)。
缺点就是需要遍历所有的文件描述符,并且存在最大数量限制(通常由FD_SETSIZE定义,默认 1024)。每次代码中需要手动维护fd_set,每次调用前需重新初始化和设置需要监控的 fd。仅支持水平触发(LT,Level Triggered):当 fd 就绪后,若未被处理,每次调用select都会再次通知该 fd 就绪。
while(true)
{
fd_set readfds;
FD_ZERO(&readfds); // 初始化文件描述符集合
FD_SET(server_fd, &readfds); // 将服务器socket加入集合
for (int sd : client_sockets) // 将客户端socket加入集合
{
FD_SET(sd, &readfds);
}
// 等待活动(超时时间为NULL,表示无限等待)
int activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
// 检查服务器socket是否有新连接
if (FD_ISSET(server_fd, &readfds))
{
new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))
client_sockets.push_back(new_socket);
}
// 检查客户端socket是否有数据
for (auto it = client_sockets.begin(); it != client_sockets.end(); )
{
if (FD_ISSET(sd, &readfds))
{
// 读取数据
ssize_t valread = read(sd, buffer, BUFFER_SIZE);
//当客户端断开连接时,从select移除该客户端
it = fds.erase(fds.begin() + i);
}
++it;
}
}
3.2 IO 多路复用(poll)
poll——和select是相似的,扩展了可以监听的最大描述符个数。使用pollfd结构体数组(struct pollfd { int fd; short events; short revents; })存储文件描述符及其事件,更灵活。无最大数量限制,可监控任意数量的 fd。并且无需每次重新初始化数组。
缺点就是也需要遍历所有的文件描述符(通过fds[i].revents & POLLIN,知道那个socket是触发的返回,再通过fds[i].fd == server_fd判断是是不是服务器,不是就是客户端了),也仅支持水平触发(LT,Level Triggered)。
std::vector<struct pollfd> fds; //pollfd结构体数组
fds.push_back({server_fd, POLLIN, 0}); // 添加服务器socket到pollfd数组
while (true)
{
// 等待活动(超时时间为-1,表示无限等待)
int activity = poll(fds.data(), fds.size(), -1);
// 检查每个文件描述符
for (size_t i = 0; i < fds.size(); ++i)
{
if (fds[i].revents & POLLIN)
{
if (fds[i].fd == server_fd) // 服务器socket有新连接
{
//有新链接就加入
new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))
fds.push_back({new_socket, POLLIN, 0});
}
else // 客户端socket有数据
{
//当客户端断开连接时,从poll移除该客户端
fds.erase(fds.begin() + i);
}
}
}
3.3 IO 多路复用(epoll)
epoll——是linux特有的比较高效的io多路复用机制。基于内核中的红黑树(存储所有需要监控的 fd)和就绪链表(存储就绪的 fd)实现。采用事件驱动方式,用户态通过epoll_wait直接获取就绪的 fd 列表,无需遍历所有监控的 fd(通过events[i].data.fd判断是哪个socket,events[i].events会字段告诉你具体是什么事件如EPOLLIN)。支持水平触发(LT)和边缘触发(ET)两种模式。适用于高并发场景,是高性能服务器的首选方案。
- LT(Level-Triggered,水平触发):
对于读事件 EPOLLIN,只要socket上有未读完的数据EPOLLIN 就会一直触发;对于写事件 EPOLLOUT,只要socket可写EPOLLOUT 就会一直触发。 - 边缘触发(ET):
对于读事件 EPOLLIN,只有socket上的数据从无到有EPOLLIN 才会触发;对于写事件 EPOLLOUT,只有在socket写缓冲区从不可写变为可写,EPOLLOUT才会触发。(需配合非阻塞 IO 一次性读完)
// epoll事件结构体
struct epoll_event ev, events[MAX_EVENTS];
//将服务器添加到,epoll事件驱动中
ev.events = EPOLLIN;
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1)
{
}
while (true)
{
// 等待事件发生
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1)
{
perror("epoll_wait");
continue;
}
// 处理就绪事件
for (int i = 0; i < nfds; ++i)
{
// 服务器socket有新连接
if (events[i].data.fd == server_fd)
{
new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))
//设置客户端为非阻塞
setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))
// 添加客户端socket到epoll
ev.events = EPOLLIN | EPOLLET; // 使用边缘触发模式
ev.data.fd = new_socket;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1)
{
}
}
else // 客户端socket有数据
{
//当客户端断开连接时,从epoll移除该客户端
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
}
}
3.4 select、poll、epoll差异
| 机制 | select | poll | epoll |
|---|---|---|---|
| 最大 fd 限制 | 默认由FD_SETSIZE限制(通常 1024) | 无,仅受系统内存和进程 fd 上限约束 | 无,仅受系统内存和进程 fd 上限约束 |
| 内核态复制开销 | 每次调用select时,需将全部监控 fd 从用户态复制到内核态(O (n) 开销) | 每次调用poll时,需将全部pollfd数组从用户态复制到内核态(O (n) 开销) | 仅在首次通过epoll_ctl(EPOLL_CTL_ADD)添加 fd 时复制,后续修改 / 查询无需重复复制(O (1) 开销) |
| 就绪 fd 遍历开销 | 内核仅返回 “是否有 fd 就绪”,用户态需遍历全部监控 fd 判断就绪状态(O (n),n 为总监控 fd 数) | 同 select,用户态需遍历全部pollfd数组检查revents字段(O (n)) | 内核直接将就绪 fd 放入 “就绪链表”,用户态通过epoll_wait直接获取就绪列表,无需遍历总 fd(O (k),k 为就绪 fd 数) |
| 触发模式 | 仅支持水平触发(LT,Level Triggered) | 仅支持水平触发(LT) | 支持水平触发(LT,默认)和边缘触发(ET,Edge Triggered) |
| 底层数据结构 | 基于位图(fd_set)实现 | 基于动态数组(struct pollfd)实现 | 基于红黑树(管理总监控 fd)+ 就绪链表(存储就绪 fd)实现 |
| 编程复杂度 | 较低,但需手动维护fd_set(每次调用前需重置) | 中等,pollfd数组可复用,但仍需遍历检查 | 较高(需理解epoll_ctl的添加 / 修改 / 删除逻辑,ET 模式需配合非阻塞 IO) |
| 典型应用场景 | 低并发、fd 数量固定且较少的场景(如简单工具、小规模客户端) | 中等并发、fd 数量适中的场景(如中小型服务) | 高并发、fd 数量庞大的场景(如 Web 服务器 Nginx、缓存 Redis、消息队列 Kafka) |
| 性能瓶颈 | fd 数量超过 1024 后无法使用,且遍历开销随 fd 数增长急剧上升 | 遍历开销随 fd 数增长线性上升,高并发下性能骤降 | 无明显瓶颈,性能随 fd 数增长基本稳定 |
4、信号驱动 IO(Signal-driven IO)
通过 SIGIO 信号异步通知进程数据已就绪,进程再发起读取。用的很少,而且实现相对复杂信号队列可能溢出,不适合高并发场景,了解名字即可。
5、异步 IO(Asynchronous IO)
POSIX AIO(Linux 原生):aio_read / aio_write 立即返回,内核完成所有操作后通知进程。它的底层是基于线程池模拟异步,并非真正的内核级异步,且对网络 IO 的支持不完整,性能不佳,使用复杂,了解即可。
6、异步 IO(io_uring)
真正的异步 IO(io_uring)Linux 5.1+ 引入,性能远超 epoll,支持真正的异步读写(包括网络)是未来高性能 IO 的方向
// 存储客户端连接信息
struct Connection
{
int fd;
char buffer[BUFFER_SIZE];
sockaddr_in addr;
socklen_t addr_len;
};
// 自定义IO操作数据
struct IOUringData
{
enum Type { ACCEPT, READ, WRITE } type;
Connection *conn;
};
// 初始化io_uring
struct io_uring ring;
if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0)
{
printf("io_uring_queue_init failed");
}
// 存储连接的数组
std::vector<Connection*> connections;
// 准备接受连接的请求
auto *accept_conn = new Connection;
accept_conn->fd = server_fd;
accept_conn->addr_len = sizeof(accept_conn->addr);
auto *accept_data = new IOUringData;
accept_data->type = IOUringData::ACCEPT;
accept_data->conn = accept_conn;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(sqe, server_fd,
(struct sockaddr*)&accept_conn->addr,
&accept_conn->addr_len, 0);
io_uring_sqe_set_data(sqe, accept_data);
io_uring_submit(&ring);
while (true)
{
// 等待IO事件完成
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0)
{
perror("io_uring_wait_cqe failed");
break;
}
// 处理完成的事件
IOUringData *data = (IOUringData*)io_uring_cqe_get_data(cqe);
int res = cqe->res;
if (res < 0)
{
std::cerr << "操作失败: " << strerror(-res) << std::endl;
}
else
{
switch (data->type)
{
case IOUringData::ACCEPT:
{
}
case IOUringData::WRITE:
{
auto *read_data = new IOUringData;
read_data->type = IOUringData::READ;
read_data->conn = data->conn;
struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring);
io_uring_prep_recv(read_sqe, data->conn->fd,
data->conn->buffer, BUFFER_SIZE - 1, 0);
io_uring_sqe_set_data(read_sqe, read_data);
io_uring_submit(&ring);
delete data;
break;
}
}
}
// 标记CQE为已处理
io_uring_cqe_seen(&ring, cqe);
}
// 清理资源
for (auto conn : connections)
{
close(conn->fd);
delete conn;
}
io_uring_queue_exit(&ring);
7、异步 IO(iocp)window上的异步IO
真正的异步 IO,所有 IO 操作都是异步的,不会阻塞主线程。通过完成端口队列通知 IO 操作完成。说个冷知识,iocp是windows 1993发布的,epoll是2002年,io_uring是2019年。
//创建异步io
m_privater->m_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
//工作线程
void EventWorkerThread()
{
while (m_privater->m_running)
{
if (!GetQueuedCompletionStatus(m_privater->m_iocp, &lpNumberOfBytesTransferred, (PULONG_PTR) &lpCompletionKey,(LPOVERLAPPED *) &ioContext, INFINITE))
{
}
switch (ioContext->type)
{
case IOType::Read:
{
}
case IOType::Write:
{
}
}
}
//接受函数
void AcceptLoop()
{
while (m_privater->m_running)
{
SOCKET clientSocket = accept(m_privater->m_listenSocket, NULL, NULL);
// 将接入的套接字也设置为非阻塞的
unsigned long ul = 1;
ioctlsocket(clientSocket, FIONBIO, &ul)
//将客户端绑定到iocp
CreateIoCompletionPort((HANDLE)clientSocket, m_privater->m_iocp, 0, 0)
auto ioContext = new IOContext;
ioContext->socket = clientSocket;
ioContext->type = IOType::Read;
// 提交一次异步接受事件,这样只要客户端有数据接受,会在工作线程获取到队列
auto rt = WSARecv(clientSocket, &ioContext->wsaBuf, 1, &nBytes, &dwFlags, &ioContext->overlapped, nullptr);
}
二、补充
1、epoll、io_uring、iocp差异
epoll、io_uring、iocp三种模型是分别是,linux和windows系统中很流行的io模型。基本都支持数十万,甚至百万连接。io_uring、iocp和epoll最大的区别是,上下文切换较少,数据由内核拷贝,效率极高,拥有极致的性能体验。
| 对比项 | epoll | IOCP | io_uring |
|---|---|---|---|
| 模型 | Reactor | Proactor | Proactor |
| 异步程度 | 半异步(就绪通知) | 全异步(完成通知) | 全异步(完成通知) |
| 数据拷贝者 | 应用线程 | 内核 | 内核 |
| 线程模型 | 用户管理 | 内核线程池 | 用户管理 |
| 跨平台 | Linux | Windows | Linux |
2、示例代码
1. 阻塞 IO(Blocking IO)
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#define PORT 8080
#define BUFFER_SIZE 1024
int main()
{
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[BUFFER_SIZE] = {0};
// 创建socket文件描述符
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定socket到端口
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听连接(最大等待队列长度为3)
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
std::cout << "阻塞IO服务器启动,监听端口 " << PORT << std::endl;
while (true)
{
// 接受客户端连接(阻塞操作)
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0)
{
perror("accept");
continue;
}
std::cout << "新客户端连接" << std::endl;
// 读取数据(阻塞操作)
ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);
if (valread < 0)
{
perror("read failed");
close(new_socket);
continue;
}
std::cout << "收到数据: " << buffer << std::endl;
// 发送响应
const char *response = "消息已收到";
send(new_socket, response, strlen(response), 0);
std::cout << "响应已发送" << std::endl;
close(new_socket);
}
return 0;
}
2. 非阻塞 IO(Non-blocking IO)
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#define PORT 8080
#define BUFFER_SIZE 1024
// 设置文件描述符为非阻塞模式
int set_non_blocking(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
return -1;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
{
return -1;
}
return 0;
}
int main()
{
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[BUFFER_SIZE] = {0};
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
// 设置服务器socket为非阻塞
if (set_non_blocking(server_fd) == -1)
{
perror("set non-blocking failed");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
std::cout << "非阻塞IO服务器启动,监听端口 " << PORT << std::endl;
while (true)
{
// 非阻塞接受连接
new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
if (new_socket < 0)
{
// EAGAIN表示当前没有连接,不是真正的错误
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
perror("accept error");
}
// 短暂休眠,避免CPU占用过高
usleep(1000);
continue;
}
std::cout << "新客户端连接" << std::endl;
// 设置客户端socket为非阻塞
if (set_non_blocking(new_socket) == -1)
{
perror("set client socket non-blocking failed");
close(new_socket);
continue;
}
// 非阻塞读取数据
ssize_t valread = read(new_socket, buffer, BUFFER_SIZE);
if (valread < 0)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
perror("read error");
close(new_socket);
}
continue;
}
else if (valread == 0)
{
// 客户端关闭连接
std::cout << "客户端断开连接" << std::endl;
close(new_socket);
continue;
}
std::cout << "收到数据: " << buffer << std::endl;
// 发送响应
const char *response = "消息已收到";
send(new_socket, response, strlen(response), 0);
std::cout << "响应已发送" << std::endl;
close(new_socket);
}
return 0;
}
3. IO 多路复用 - select
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/select.h>
#include <vector>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 10
int main()
{
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[BUFFER_SIZE] = {0};
// 客户端socket集合
std::vector<int> client_sockets;
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
std::cout << "select服务器启动,监听端口 " << PORT << std::endl;
while (true)
{
// 初始化文件描述符集合
fd_set readfds;
FD_ZERO(&readfds);
// 将服务器socket加入集合
FD_SET(server_fd, &readfds);
int max_sd = server_fd;
// 将客户端socket加入集合
for (int sd : client_sockets)
{
FD_SET(sd, &readfds);
if (sd > max_sd)
{
max_sd = sd;
}
}
// 等待活动(超时时间为NULL,表示无限等待)
int activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
if ((activity < 0) && (errno != EINTR))
{
perror("select error");
}
// 检查服务器socket是否有新连接
if (FD_ISSET(server_fd, &readfds))
{
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0)
{
perror("accept");
continue;
}
std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;
// 将新客户端加入集合
if (client_sockets.size() < MAX_CLIENTS)
{
client_sockets.push_back(new_socket);
std::cout << "客户端数量: " << client_sockets.size() << std::endl;
}
else
{
const char *msg = "服务器忙,请稍后再试";
send(new_socket, msg, strlen(msg), 0);
close(new_socket);
std::cout << "服务器已满,拒绝连接" << std::endl;
}
}
// 检查客户端socket是否有数据
for (auto it = client_sockets.begin(); it != client_sockets.end(); )
{
int sd = *it;
if (FD_ISSET(sd, &readfds))
{
// 读取数据
ssize_t valread = read(sd, buffer, BUFFER_SIZE);
if (valread == 0)
{
// 客户端断开连接
getpeername(sd, (struct sockaddr*)&address, (socklen_t*)&addrlen);
std::cout << "客户端断开连接,socket fd: " << sd << std::endl;
close(sd);
it = client_sockets.erase(it);
}
else if (valread > 0)
{
// 处理数据
buffer[valread] = '\0';
std::cout << "收到数据 (fd: " << sd << "): " << buffer << std::endl;
// 发送响应
const char *response = "消息已收到";
send(sd, response, strlen(response), 0);
}
else
{
// 读取错误
perror("read error");
close(sd);
it = client_sockets.erase(it);
}
}
else
{
++it;
}
}
}
return 0;
}
4. IO 多路复用 - poll
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <poll.h>
#include <vector>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 10
int main()
{
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[BUFFER_SIZE] = {0};
// pollfd结构数组
std::vector<struct pollfd> fds;
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
// 添加服务器socket到pollfd数组
fds.push_back({server_fd, POLLIN, 0});
std::cout << "poll服务器启动,监听端口 " << PORT << std::endl;
while (true)
{
// 等待活动(超时时间为-1,表示无限等待)
int activity = poll(fds.data(), fds.size(), -1);
if (activity < 0)
{
perror("poll error");
continue;
}
// 检查每个文件描述符
for (size_t i = 0; i < fds.size(); ++i)
{
if (fds[i].revents & POLLIN)
{
// 服务器socket有新连接
if (fds[i].fd == server_fd)
{
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0)
{
perror("accept");
continue;
}
std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;
// 添加新客户端到pollfd数组
if (fds.size() < MAX_CLIENTS + 1) // +1 是因为包含server_fd
{
fds.push_back({new_socket, POLLIN, 0});
std::cout << "客户端数量: " << fds.size() - 1 << std::endl;
}
else
{
const char *msg = "服务器忙,请稍后再试";
send(new_socket, msg, strlen(msg), 0);
close(new_socket);
std::cout << "服务器已满,拒绝连接" << std::endl;
}
}
// 客户端socket有数据
else
{
ssize_t valread = read(fds[i].fd, buffer, BUFFER_SIZE);
if (valread == 0)
{
// 客户端断开连接
getpeername(fds[i].fd, (struct sockaddr*)&address, (socklen_t*)&addrlen);
std::cout << "客户端断开连接,socket fd: " << fds[i].fd << std::endl;
close(fds[i].fd);
// 从数组中移除
fds.erase(fds.begin() + i);
i--; // 调整索引,因为删除了一个元素
}
else if (valread > 0)
{
// 处理数据
buffer[valread] = '\0';
std::cout << "收到数据 (fd: " << fds[i].fd << "): " << buffer << std::endl;
// 发送响应
const char *response = "消息已收到";
send(fds[i].fd, response, strlen(response), 0);
}
else
{
// 读取错误
perror("read error");
close(fds[i].fd);
fds.erase(fds.begin() + i);
i--;
}
}
}
}
}
return 0;
}
5. IO 多路复用 - epoll
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <vector>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 10
int main()
{
int server_fd, new_socket, epoll_fd;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[BUFFER_SIZE] = {0};
// epoll事件结构体
struct epoll_event ev, events[MAX_EVENTS];
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
// 创建epoll实例
if ((epoll_fd = epoll_create1(0)) == -1)
{
perror("epoll_create1");
exit(EXIT_FAILURE);
}
// 添加服务器socket到epoll
ev.events = EPOLLIN;
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1)
{
perror("epoll_ctl: server_fd");
exit(EXIT_FAILURE);
}
std::cout << "epoll服务器启动,监听端口 " << PORT << std::endl;
while (true)
{
// 等待事件发生
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1)
{
perror("epoll_wait");
continue;
}
// 处理就绪事件
for (int i = 0; i < nfds; ++i)
{
// 服务器socket有新连接
if (events[i].data.fd == server_fd)
{
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0)
{
perror("accept");
continue;
}
std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;
// 设置客户端socket为非阻塞(可选,但通常与epoll配合使用)
if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt for client");
close(new_socket);
continue;
}
// 添加客户端socket到epoll
ev.events = EPOLLIN | EPOLLET; // 使用边缘触发模式
ev.data.fd = new_socket;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &ev) == -1)
{
perror("epoll_ctl: new_socket");
close(new_socket);
continue;
}
}
// 客户端socket有数据
else
{
ssize_t valread = read(events[i].data.fd, buffer, BUFFER_SIZE);
if (valread == 0)
{
// 客户端断开连接
std::cout << "客户端断开连接,socket fd: " << events[i].data.fd << std::endl;
// 从epoll中移除并关闭socket
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
close(events[i].data.fd);
}
else if (valread > 0)
{
// 处理数据
buffer[valread] = '\0';
std::cout << "收到数据 (fd: " << events[i].data.fd << "): " << buffer << std::endl;
// 发送响应
const char *response = "消息已收到";
send(events[i].data.fd, response, strlen(response), 0);
}
else
{
// 读取错误
perror("read error");
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
close(events[i].data.fd);
}
}
}
}
close(epoll_fd);
return 0;
}
6. 信号驱动 IO(Signal-driven IO)
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <errno.h>
#include <vector>
#define PORT 8080
#define BUFFER_SIZE 1024
// 全局变量存储客户端socket
std::vector<int> client_sockets;
char buffer[BUFFER_SIZE];
// 信号处理函数
void sigio_handler(int signo)
{
if (signo != SIGIO)
{
return;
}
// 遍历客户端socket检查事件
for (auto it = client_sockets.begin(); it != client_sockets.end(); )
{
int fd = *it;
ssize_t n = recv(fd, buffer, BUFFER_SIZE - 1, MSG_DONTWAIT);
if (n > 0)
{
buffer[n] = '\0';
std::cout << "收到数据 (fd: " << fd << "): " << buffer << std::endl;
// 发送响应
const char *response = "消息已收到";
send(fd, response, strlen(response), 0);
++it;
}
else if (n == 0 || (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK))
{
// 客户端断开连接或发生错误
std::cout << "客户端断开连接,socket fd: " << fd << std::endl;
close(fd);
it = client_sockets.erase(it);
}
else
{
++it;
}
}
}
int main()
{
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
struct sigaction sa;
// 设置信号处理函数
sa.sa_handler = sigio_handler;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
if (sigaction(SIGIO, &sa, NULL) == -1)
{
perror("sigaction");
exit(EXIT_FAILURE);
}
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
// 设置进程为socket的所有者,以便接收SIGIO信号
if (fcntl(server_fd, F_SETOWN, getpid()) == -1)
{
perror("fcntl F_SETOWN");
exit(EXIT_FAILURE);
}
std::cout << "信号驱动IO服务器启动,监听端口 " << PORT << std::endl;
while (true)
{
// 接受客户端连接(阻塞)
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0)
{
perror("accept");
continue;
}
std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;
// 设置客户端socket为非阻塞
int flags = fcntl(new_socket, F_GETFL, 0);
if (fcntl(new_socket, F_SETFL, flags | O_NONBLOCK) == -1)
{
perror("fcntl F_SETFL O_NONBLOCK");
close(new_socket);
continue;
}
// 设置客户端socket的所有者,并启用信号驱动IO
if (fcntl(new_socket, F_SETOWN, getpid()) == -1)
{
perror("fcntl F_SETOWN for client");
close(new_socket);
continue;
}
flags = fcntl(new_socket, F_GETFL, 0);
if (fcntl(new_socket, F_SETFL, flags | O_ASYNC) == -1)
{
perror("fcntl F_SETFL O_ASYNC");
close(new_socket);
continue;
}
// 添加到客户端列表
client_sockets.push_back(new_socket);
}
return 0;
}
7. 异步 IO(Asynchronous IO)
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <aio.h>
#include <signal.h>
#include <fcntl.h>
#include <vector>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_PENDING 10
// 异步IO控制块结构
struct AsyncIOData
{
int fd;
struct aiocb aio;
char buffer[BUFFER_SIZE];
};
// 存储所有异步IO请求
std::vector<AsyncIOData*> async_requests;
// 信号处理函数 - 处理异步IO完成事件
void aio_completion_handler(int signo, siginfo_t *info, void *context)
{
if (signo != SIGIO || info->si_code != SI_ASYNCIO)
{
return;
}
// 获取完成的异步IO控制块
struct aiocb *aio = (struct aiocb*)info->si_value.sival_ptr;
if (!aio)
{
return;
}
// 查找对应的AsyncIOData
AsyncIOData* data = nullptr;
for (auto req : async_requests)
{
if (&req->aio == aio)
{
data = req;
break;
}
}
if (!data)
{
return;
}
// 检查IO操作结果
int ret = aio_return(aio);
if (ret > 0)
{
// 读取到数据
data->buffer[ret] = '\0';
std::cout << "收到数据 (fd: " << data->fd << "): " << data->buffer << std::endl;
// 发送响应
const char *response = "消息已收到";
send(data->fd, response, strlen(response), 0);
// 重新发起异步读取
memset(&data->aio, 0, sizeof(struct aiocb));
data->aio.aio_fildes = data->fd;
data->aio.aio_buf = data->buffer;
data->aio.aio_nbytes = BUFFER_SIZE - 1;
data->aio.aio_offset = 0;
data->aio.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
data->aio.aio_sigevent.sigev_signo = SIGIO;
data->aio.aio_sigevent.sigev_value.sival_ptr = &data->aio;
if (aio_read(&data->aio) == -1)
{
perror("aio_read failed");
close(data->fd);
// 从列表中移除
for (auto it = async_requests.begin(); it != async_requests.end(); ++it)
{
if (*it == data)
{
delete *it;
async_requests.erase(it);
break;
}
}
}
}
else if (ret == 0)
{
// 客户端断开连接
std::cout << "客户端断开连接,socket fd: " << data->fd << std::endl;
close(data->fd);
// 从列表中移除
for (auto it = async_requests.begin(); it != async_requests.end(); ++it)
{
if (*it == data)
{
delete *it;
async_requests.erase(it);
break;
}
}
}
else
{
// 发生错误
perror("aio error");
close(data->fd);
// 从列表中移除
for (auto it = async_requests.begin(); it != async_requests.end(); ++it)
{
if (*it == data)
{
delete *it;
async_requests.erase(it);
break;
}
}
}
}
int main()
{
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
struct sigaction sa;
// 设置信号处理函数
sa.sa_sigaction = aio_completion_handler;
sa.sa_flags = SA_SIGINFO;
sigemptyset(&sa.sa_mask);
if (sigaction(SIGIO, &sa, NULL) == -1)
{
perror("sigaction");
exit(EXIT_FAILURE);
}
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
std::cout << "异步IO服务器启动,监听端口 " << PORT << std::endl;
while (true)
{
// 接受客户端连接(阻塞)
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0)
{
perror("accept");
continue;
}
std::cout << "新客户端连接,socket fd: " << new_socket << std::endl;
// 创建异步IO数据结构
AsyncIOData *data = new AsyncIOData;
data->fd = new_socket;
memset(&data->aio, 0, sizeof(struct aiocb));
// 设置异步IO参数
data->aio.aio_fildes = new_socket;
data->aio.aio_buf = data->buffer;
data->aio.aio_nbytes = BUFFER_SIZE - 1;
data->aio.aio_offset = 0;
data->aio.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
data->aio.aio_sigevent.sigev_signo = SIGIO;
data->aio.aio_sigevent.sigev_value.sival_ptr = &data->aio;
// 发起异步读取
if (aio_read(&data->aio) == -1)
{
perror("aio_read failed");
close(new_socket);
delete data;
continue;
}
// 添加到请求列表
async_requests.push_back(data);
// 限制最大并发请求数
if (async_requests.size() > MAX_PENDING)
{
AsyncIOData* oldest = async_requests.front();
aio_cancel(oldest->fd, &oldest->aio);
close(oldest->fd);
delete oldest;
async_requests.erase(async_requests.begin());
std::cout << "达到最大连接数,关闭最早的连接" << std::endl;
}
}
// 清理资源
for (auto req : async_requests)
{
close(req->fd);
delete req;
}
close(server_fd);
return 0;
}
8. 异步 IO (io_uring)
#include <iostream>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <liburing.h>
#include <vector>
#include <errno.h>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CONNECTIONS 10
#define QUEUE_DEPTH 32
// 存储客户端连接信息
struct Connection
{
int fd;
char buffer[BUFFER_SIZE];
sockaddr_in addr;
socklen_t addr_len;
};
// 自定义IO操作数据
struct IOUringData
{
enum Type { ACCEPT, READ, WRITE } type;
Connection *conn;
};
int main()
{
int server_fd;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
// 初始化io_uring
struct io_uring ring;
if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0)
{
perror("io_uring_queue_init failed");
exit(EXIT_FAILURE);
}
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, MAX_CONNECTIONS) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
std::cout << "io_uring服务器启动,监听端口 " << PORT << std::endl;
// 存储连接的数组
std::vector<Connection*> connections;
// 准备接受连接的请求
auto *accept_conn = new Connection;
accept_conn->fd = server_fd;
accept_conn->addr_len = sizeof(accept_conn->addr);
auto *accept_data = new IOUringData;
accept_data->type = IOUringData::ACCEPT;
accept_data->conn = accept_conn;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(sqe, server_fd,
(struct sockaddr*)&accept_conn->addr,
&accept_conn->addr_len, 0);
io_uring_sqe_set_data(sqe, accept_data);
io_uring_submit(&ring);
while (true)
{
// 等待IO事件完成
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0)
{
perror("io_uring_wait_cqe failed");
break;
}
// 处理完成的事件
IOUringData *data = (IOUringData*)io_uring_cqe_get_data(cqe);
int res = cqe->res;
if (res < 0)
{
std::cerr << "操作失败: " << strerror(-res) << std::endl;
}
else
{
switch (data->type)
{
case IOUringData::ACCEPT:
{
// 处理新连接
int client_fd = res;
std::cout << "新客户端连接,socket fd: " << client_fd << std::endl;
// 创建新连接对象
auto *new_conn = new Connection;
new_conn->fd = client_fd;
new_conn->addr = data->conn->addr;
new_conn->addr_len = data->conn->addr_len;
connections.push_back(new_conn);
// 准备读取数据
auto *read_data = new IOUringData;
read_data->type = IOUringData::READ;
read_data->conn = new_conn;
struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring);
io_uring_prep_recv(read_sqe, client_fd,
new_conn->buffer, BUFFER_SIZE - 1, 0);
io_uring_sqe_set_data(read_sqe, read_data);
// 重新提交accept请求以接受新连接
struct io_uring_sqe *accept_sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(accept_sqe, server_fd,
(struct sockaddr*)&data->conn->addr,
&data->conn->addr_len, 0);
io_uring_sqe_set_data(accept_sqe, data);
io_uring_submit(&ring);
break;
}
case IOUringData::READ:
{
// 处理读取的数据
ssize_t bytes_read = res;
if (bytes_read == 0)
{
// 客户端断开连接
std::cout << "客户端断开连接,socket fd: " << data->conn->fd << std::endl;
close(data->conn->fd);
// 从连接列表中移除
for (auto it = connections.begin(); it != connections.end(); ++it)
{
if (*it == data->conn)
{
connections.erase(it);
break;
}
}
delete data->conn;
delete data;
}
else
{
data->conn->buffer[bytes_read] = '\0';
std::cout << "收到数据 (fd: " << data->conn->fd << "): " << data->conn->buffer << std::endl;
// 准备发送响应
auto *write_data = new IOUringData;
write_data->type = IOUringData::WRITE;
write_data->conn = data->conn;
const char *response = "消息已收到";
struct io_uring_sqe *write_sqe = io_uring_get_sqe(&ring);
io_uring_prep_send(write_sqe, data->conn->fd,
response, strlen(response), 0);
io_uring_sqe_set_data(write_sqe, write_data);
io_uring_submit(&ring);
delete data;
}
break;
}
case IOUringData::WRITE:
{
// 响应发送完成,准备再次读取
auto *read_data = new IOUringData;
read_data->type = IOUringData::READ;
read_data->conn = data->conn;
struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring);
io_uring_prep_recv(read_sqe, data->conn->fd,
data->conn->buffer, BUFFER_SIZE - 1, 0);
io_uring_sqe_set_data(read_sqe, read_data);
io_uring_submit(&ring);
delete data;
break;
}
}
}
// 标记CQE为已处理
io_uring_cqe_seen(&ring, cqe);
}
// 清理资源
for (auto conn : connections)
{
close(conn->fd);
delete conn;
}
io_uring_queue_exit(&ring);
close(server_fd);
return 0;
}
9. 异步 IO (iocp)
#include <iostream>
#include <winsock2.h>
#include <mswsock.h>
#include <vector>
#include <process.h>
#pragma comment(lib, "ws2_32.lib")
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_THREADS 4
#define MAX_CONNECTIONS 100
// 重叠IO结构
typedef struct _PER_IO_DATA
{
OVERLAPPED overlapped;
WSABUF data_buf;
char buffer[BUFFER_SIZE];
int operation_type; // 1: 接收, 2: 发送
} PER_IO_DATA, *PPER_IO_DATA;
// 每个客户端连接的数据
typedef struct _PER_HANDLE_DATA
{
SOCKET socket;
sockaddr_in client_addr;
} PER_HANDLE_DATA, *PPER_HANDLE_DATA;
// 全局IOCP句柄
HANDLE g_hCompletionPort;
// 工作线程函数
unsigned int __stdcall WorkerThread(LPVOID lpParam)
{
DWORD bytes_transferred;
ULONG_PTR completion_key;
LPOVERLAPPED overlapped;
PPER_HANDLE_DATA per_handle_data;
PPER_IO_DATA per_io_data;
while (true)
{
// 等待IO完成
BOOL success = GetQueuedCompletionStatus(
g_hCompletionPort,
&bytes_transferred,
&completion_key,
&overlapped,
INFINITE
);
if (!success)
{
std::cerr << "GetQueuedCompletionStatus failed: " << WSAGetLastError() << std::endl;
continue;
}
// 无效的完成键,退出线程
if (completion_key == 0)
{
break;
}
per_handle_data = (PPER_HANDLE_DATA)completion_key;
per_io_data = (PPER_IO_DATA)overlapped;
// 客户端断开连接
if (bytes_transferred == 0 &&
(per_io_data->operation_type == 1 || per_io_data->operation_type == 2))
{
std::cout << "客户端断开连接" << std::endl;
// 关闭socket
closesocket(per_handle_data->socket);
free(per_handle_data);
free(per_io_data);
continue;
}
switch (per_io_data->operation_type)
{
case 1: // 接收完成
{
std::cout << "收到数据: " << per_io_data->buffer << std::endl;
// 准备发送响应
per_io_data->operation_type = 2;
const char* response = "消息已收到";
strcpy_s(per_io_data->buffer, BUFFER_SIZE, response);
per_io_data->data_buf.len = strlen(response);
// 异步发送
DWORD bytes_sent;
DWORD flags = 0;
int ret = WSASend(
per_handle_data->socket,
&per_io_data->data_buf,
1,
&bytes_sent,
flags,
&per_io_data->overlapped,
NULL
);
if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
std::cerr << "WSASend failed: " << WSAGetLastError() << std::endl;
closesocket(per_handle_data->socket);
free(per_handle_data);
free(per_io_data);
}
break;
}
case 2: // 发送完成
{
// 准备再次接收数据
per_io_data->operation_type = 1;
per_io_data->data_buf.len = BUFFER_SIZE;
memset(per_io_data->buffer, 0, BUFFER_SIZE);
// 异步接收
DWORD bytes_recv;
DWORD flags = 0;
int ret = WSARecv(
per_handle_data->socket,
&per_io_data->data_buf,
1,
&bytes_recv,
&flags,
&per_io_data->overlapped,
NULL
);
if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
closesocket(per_handle_data->socket);
free(per_handle_data);
free(per_io_data);
}
break;
}
}
}
return 0;
}
// 接受连接的函数
void AcceptConnections(SOCKET listen_socket)
{
while (true)
{
// 为新连接分配数据结构
PPER_HANDLE_DATA per_handle_data = (PPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));
if (!per_handle_data)
{
std::cerr << "内存分配失败" << std::endl;
continue;
}
int addr_len = sizeof(per_handle_data->client_addr);
per_handle_data->socket = accept(
listen_socket,
(sockaddr*)&per_handle_data->client_addr,
&addr_len
);
if (per_handle_data->socket == INVALID_SOCKET)
{
std::cerr << "accept failed: " << WSAGetLastError() << std::endl;
free(per_handle_data);
continue;
}
std::cout << "新客户端连接" << std::endl;
// 将socket与完成端口关联
if (CreateIoCompletionPort(
(HANDLE)per_handle_data->socket,
g_hCompletionPort,
(ULONG_PTR)per_handle_data,
0
) == NULL)
{
std::cerr << "CreateIoCompletionPort failed: " << WSAGetLastError() << std::endl;
closesocket(per_handle_data->socket);
free(per_handle_data);
continue;
}
// 分配IO操作数据结构
PPER_IO_DATA per_io_data = (PPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
if (!per_io_data)
{
std::cerr << "内存分配失败" << std::endl;
closesocket(per_handle_data->socket);
free(per_handle_data);
continue;
}
// 初始化重叠结构
memset(per_io_data, 0, sizeof(PER_IO_DATA));
per_io_data->data_buf.len = BUFFER_SIZE;
per_io_data->data_buf.buf = per_io_data->buffer;
per_io_data->operation_type = 1; // 接收操作
// 开始异步接收
DWORD bytes_recv;
DWORD flags = 0;
int ret = WSARecv(
per_handle_data->socket,
&per_io_data->data_buf,
1,
&bytes_recv,
&flags,
&per_io_data->overlapped,
NULL
);
if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
std::cerr << "WSARecv failed: " << WSAGetLastError() << std::endl;
closesocket(per_handle_data->socket);
free(per_handle_data);
free(per_io_data);
}
}
}
int main()
{
WSADATA wsa_data;
SOCKET listen_socket;
sockaddr_in server_addr;
HANDLE worker_threads[MAX_THREADS];
// 初始化Winsock
if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0)
{
std::cerr << "WSAStartup failed" << std::endl;
return 1;
}
// 创建监听socket
if ((listen_socket = WSASocket(
AF_INET,
SOCK_STREAM,
0,
NULL,
0,
WSA_FLAG_OVERLAPPED // 重叠IO
)) == INVALID_SOCKET)
{
std::cerr << "WSASocket failed: " << WSAGetLastError() << std::endl;
WSACleanup();
return 1;
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
// 绑定
if (bind(listen_socket, (sockaddr*)&server_addr, sizeof(server_addr)) == SOCKET_ERROR)
{
std::cerr << "bind failed: " << WSAGetLastError() << std::endl;
closesocket(listen_socket);
WSACleanup();
return 1;
}
// 监听
if (listen(listen_socket, MAX_CONNECTIONS) == SOCKET_ERROR)
{
std::cerr << "listen failed: " << WSAGetLastError() << std::endl;
closesocket(listen_socket);
WSACleanup();
return 1;
}
std::cout << "IOCP服务器启动,监听端口 " << PORT << std::endl;
// 创建完成端口
g_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (g_hCompletionPort == NULL)
{
std::cerr << "CreateIoCompletionPort failed: " << GetLastError() << std::endl;
closesocket(listen_socket);
WSACleanup();
return 1;
}
// 创建工作线程
for (int i = 0; i < MAX_THREADS; i++)
{
worker_threads[i] = (HANDLE)_beginthreadex(
NULL,
0,
WorkerThread,
NULL,
0,
NULL
);
if (worker_threads[i] == NULL)
{
std::cerr << "创建工作线程失败" << std::endl;
// 清理已创建的资源
for (int j = 0; j < i; j++)
{
CloseHandle(worker_threads[j]);
}
CloseHandle(g_hCompletionPort);
closesocket(listen_socket);
WSACleanup();
return 1;
}
}
// 接受连接
AcceptConnections(listen_socket);
// 清理资源(实际中需要适当的退出机制)
for (int i = 0; i < MAX_THREADS; i++)
{
PostQueuedCompletionStatus(g_hCompletionPort, 0, 0, NULL);
}
WaitForMultipleObjects(MAX_THREADS, worker_threads, TRUE, INFINITE);
for (int i = 0; i < MAX_THREADS; i++)
{
CloseHandle(worker_threads[i]);
}
CloseHandle(g_hCompletionPort);
closesocket(listen_socket);
WSACleanup();
return 0;
}
更多推荐


所有评论(0)