reactor的解析与实现【Linux C epoll】
,recv(fd…) 这些函数,关注的是fd对应的IO(或者连接)是否可读或者可写,这样很难做进一步的开发。所以我们需要epoll去代替我们关注什么时候这些IO可读可写,我们要做的就只是往里面传数据。reactor 是一种事件驱动的设计模式,通过一个 / 多个 “反应器”(Reactor)监听 IO 事件,触发对应的回调(callback),从而实现了IO 操作与业务逻辑的异步分离,大大提高开发效
文章目录
概要
reactor 是一种事件驱动的设计模式,通过一个 / 多个 “反应器”(Reactor)监听 IO 事件,触发对应的回调(callback),从而实现了IO 操作与业务逻辑的异步分离,大大提高开发效率。
在Posix API层面的网络编程,比如accept(fd…),recv(fd…),send(…) 这些函数,关注的是fd对应的IO(或者连接)是否可读或者可写,这样很难做进一步的开发。所以我们需要epoll去代替我们关注什么时候这些IO可读可写,我们要做的就只是往里面传数据。所以就有了基于epoll开发的reactor。
reactor.h
首先我们要明确两件事
- epoll 在检测到有IO就绪时会且只会告诉我们这个IO就绪对应的fd,并告知我们就绪的事件类型,也即EPOLLIN或EPOLLOUT,或两者都有
- reactor需要能够封装fd的事件信息,比如这个fd在可读时执行哪个回调,读出来的信息,将要写入的信。比如我们业务层想要写入哪些信息,就要写到这个buffer里,业务层想要读取哪些信息,就要从这个buffer里读。
--------struct conn
在reactor.c里面需要这么一个结构体列表帮我们管理fd,这是与业务层交互的关键。
struct conn conn_list[CONNECTION_SIZE] = {0};
#define BUFFER_LENGTH 1024
typedef int (*RCALLBACK)(int fd);
struct conn {
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
union {
RCALLBACK recv_callback;
RCALLBACK accept_callback;
} r_action;
};
--------reactor.h源码
#ifndef __REACTOR_H__
#define __REACTOR_H__
#define BUFFER_LENGTH 1024
typedef int (*RCALLBACK)(int fd);
struct conn {
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
union {
RCALLBACK recv_callback;
RCALLBACK accept_callback;
} r_action;
};
#endif
reactor.c
--------初始化监听端口
明确一个监听的端口号,服务端的话本机ip地址会自动选择一个可用的。可多开一些端口避免客户端在大量连接时,客户端的本地端口耗尽。
- 可用端口号范围为1024-65536,前1024个为系统所保留
- 这么做增加了网络通信中五元组的可用范围
int main() {
unsigned short port = 2000;
for (i = 0;i < MAX_PORTS;i ++) {
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
}
}
把创建监听端口的过程抽象成init_server
int init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
servaddr.sin_port = htons(port); // 0-1023,
if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
printf("bind failed: %s\n", strerror(errno));
}
listen(sockfd, 10);
//printf("listen finshed: %d\n", sockfd); // 3
return sockfd;
}
--------fd的回调
listenfd可读时,执行accept回调
int accept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
//printf("accept finshed: %d\n", clientfd);
if (clientfd < 0) {
printf("accept errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
event_register(clientfd, EPOLLIN); // | EPOLLET
return 0;
}
- 在accept回调中,执行accept拿到一个fd,为clientfd,参照reactor的"管理者"struct cnn为fd绑定回调,fd绑定读写空间,抽象出来一个event_register。
- 这里set_event就是让epoll去管理这个id的IO,1为新增fd用于创建fd时,0为修改fd用于执行完fd的回调切换IO的监听状态。就不再贴出来了,可自行翻阅下文源码。
int event_register(int fd, int event) {
if (fd < 0) return -1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd, event, 1);
}
recv_cb 收一组数据
int recv_cb(int fd) {
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if (count == 0) { // disconnect
printf("client disconnect: %d\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished
return 0;
} else if (count < 0) { //
printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);
set_event(fd, EPOLLOUT, 0);
return count;
}
}
send_cb 发一组数据
int send_cb(int fd) {
int count = 0;
if (conn_list[fd].wlength != 0) {
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
}
set_event(fd, EPOLLIN, 0)
return count;
}
--------在mainloop部署epoll
在mainloop中我们要做的就是让epoll不断地去检测是否有IO就绪,epoll就不在这里多说了
int main() {
epfd = epoll_create(1);
}
while (1) { // mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}
if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
}
}
--------reactor.c源码
内容可能跟前文有出入,结合后续的协议理解
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#include "server.h"
#define CONNECTION_SIZE 1048576 // 1024 * 1024
#define MAX_PORTS 20
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int epfd = 0;
struct timeval begin;
struct conn conn_list[CONNECTION_SIZE] = {0};
// fd
int set_event(int fd, int event, int flag) {
if (flag) { // non-zero add
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
} else { // zero mod
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event) {
if (fd < 0) return -1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd, event, 1);
}
// listenfd(sockfd) --> EPOLLIN --> accept_cb
int accept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
//printf("accept finshed: %d\n", clientfd);
if (clientfd < 0) {
printf("accept errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
event_register(clientfd, EPOLLIN); // | EPOLLET
if ((clientfd % 1000) == 0) {
struct timeval current;
gettimeofday(¤t, NULL);
int time_used = TIME_SUB_MS(current, begin);
memcpy(&begin, ¤t, sizeof(struct timeval));
printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);
}
return 0;
}
int recv_cb(int fd) {
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if (count == 0) { // disconnect
printf("client disconnect: %d\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinished
return 0;
} else if (count < 0) { //
printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
conn_list[fd].rlength = count;
//printf("RECV: %s\n", conn_list[fd].rbuffer);
#if 0 // echo
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);
#elif 0
http_request(&conn_list[fd]);
#else
ws_request(&conn_list[fd]);
#endif
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd) {
#if 0
http_response(&conn_list[fd]);
#else
ws_response(&conn_list[fd]);
#endif
int count = 0;
#if 0
if (conn_list[fd].status == 1) {
//printf("SEND: %s\n", conn_list[fd].wbuffer);
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLOUT, 0);
} else if (conn_list[fd].status == 2) {
set_event(fd, EPOLLOUT, 0);
} else if (conn_list[fd].status == 0) {
if (conn_list[fd].wlength != 0) {
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
}
set_event(fd, EPOLLIN, 0);
}
#else
if (conn_list[fd].wlength != 0) {
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
}
set_event(fd, EPOLLIN, 0);
#endif
//set_event(fd, EPOLLOUT, 0);
return count;
}
int init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
servaddr.sin_port = htons(port); // 0-1023,
if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
printf("bind failed: %s\n", strerror(errno));
}
listen(sockfd, 10);
//printf("listen finshed: %d\n", sockfd); // 3
return sockfd;
}
int main() {
unsigned short port = 2000;
epfd = epoll_create(1);
int i = 0;
for (i = 0;i < MAX_PORTS;i ++) {
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
}
gettimeofday(&begin, NULL);
while (1) { // mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
#if 0
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
} else if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
#else
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}
if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
#endif
}
}
}
https://github.com/0voice
更多推荐

所有评论(0)