Linux网络设计中的Reactor网络模型与百万级并发
结构说明:以fd作为索引,存放在block中;当一个fd到来时,通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。
数据结构的代码实现如下:
代码语言:javascript
AI代码解释
struct ntyevnt{ int fd;//事件fd char buffer[BUFFER_LENGTH];//缓冲区 int length;//缓存长度 int status;//状态 int events;//事件 void *arg;//callback的参数 int(*callback)(int fd, int events, void* arg);//回调函数 }; struct eventblock{ struct *sock_items;//事件集合 struct eventblock *next;//指向下一个内存块 }; struct reactor{ int epfd;//epoll的文件描述符 int blkcnt;//事件块的数量 struct eventblock *evtblk;//事件块的起始地址 };step 2:实现Reactor容器初始化功能
我们这里使用epoll作为IO多路复用器。 思路:初始化reactor内存块,避免脏数据;创建events和block并初始化,将events添加到block中,将block添加到reactor的链表中管理。
代码语言:javascript
AI代码解释
int ntyreactor_init(struct ntyreactor *reactor) { if (reactor == NULL) return -1; memset(reactor, 0, sizeof(struct ntyreactor)); //创建epoll,作为IO多路复用器 reactor->epfd = epoll_create(1); if (reactor->epfd <= 0) { printf("create epfd in %s error %s\n", __func__, strerror(errno)); return -2; } // 创建事件集 struct ntyevnt *events = (struct ntyevnt *)malloc(MAX_EPOLL_EVENTS * sizeof(struct ntyevnt)); if (events == NULL) { printf("create ntyevnt in %s error %s\n", __func__, strerror(errno)); close(reactor->epfd); return -3; } memset(events, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); //创建事件内存块 struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock)); if (block == NULL) { printf("create eventblock in %s error %s\n", __func__, strerror(errno)); free(events); close(reactor->epfd); return -4; } block->events = events; block->next = NULL; // reactor初始化赋值 reactor->evblks=block; reactor->blkcnt = 1; return 0; }step 3:实现socket初始化功能
定义成一个函数,方便初始化多个监听端口。
代码语言:javascript
AI代码解释
int init_sock(short port) { int ret = 0; int fd = socket(AF_INET, SOCK_STREAM, 0);//创建套字接 if (fd == -1) { printf("create socket in %s error %s\n", __func__, strerror(errno)); return -1; } ret=fcntl(fd, F_SETFL, O_NONBLOCK);//设置非阻塞 if (ret == -1) { printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno)); return -1; } // 设置属性 struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET;// IPV4 server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(port); // 绑定 ret = bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr)); if (ret == -1) { printf("bind() in %s error %s\n", __func__, strerror(errno)); return -1; } //监听 ret = listen(fd, 20); if (ret < 0) { printf("listen failed : %s\n", strerror(errno)); return -1; } printf("listen server port : %d\n", port); return fd; }step 4:实现Reactor动态扩容功能
为了实现高并发,服务器需要监听多个端口。当高并发时需要reactor容器进行扩容管理。 核心思路:找到链表的末端,分别为events和block分配内存并初始化,将events添加到block中,将block添加到reactor的链表中管理。
代码语言:javascript
AI代码解释
int ntyreactor_alloc(struct ntyreactor *reactor) { if (reactor == NULL) return -1; if (reactor->evblks == NULL) return -1; //找到链表末端 struct eventblock *blk = reactor->evblks; while (blk->next != NULL) blk = blk->next; // 创建事件集 struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); if (evs == NULL) { printf("ntyreactor_alloc ntyevent failed\n"); return -2; } memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); // 创建事件块 struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock)); if (block == NULL) { printf("ntyreactor_alloc eventblock failed\n"); return -3; } block->events = evs; block->next = NULL; //实现扩容 blk->next = block; reactor->blkcnt++; return 0; }step 5:实现Reactor索引功能
思路:通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。 例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。
代码语言:javascript
AI代码解释
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) { if (reactor == NULL) return NULL; if (reactor->evblks == NULL) return NULL; // fd所在block序号 int blkidx = sockfd / MAX_EPOLL_EVENTS; while (blkidx >= reactor->blkcnt) { // 扩容 ntyreactor_alloc(reactor); } //找到fd对应block的位置 int i = 0; struct eventblock *blk = reactor->evblks; while (i++ != blkidx && blk != NULL) { blk = blk->next; } // 返回item 地址 return &blk->events[sockfd%MAX_EPOLL_EVENTS]; }step 6:实现设置事件信息功能
将事件的相关信息保存到数据结构中。主要实现填充关键信息到event结构体中。
代码语言:javascript
AI代码解释
void nty_event_set(struct ntyevent *ev,int fd,NCALLBACK callback,void *arg) { ev->fd = fd; ev->events = 0; ev->callback = callback; ev->arg = arg; }step 7:实现IO事件监听功能
这里使用epoll作为IO多路复用器,将事件添加到epoll中监听。 思路:主要是epoll_ctl操作,将事件添加到reactor的event结构体中。
代码语言:javascript
AI代码解释
int nty_event_add(int epfd, int events, struct ntyevent *ev) { // 设置epoll事件信息 struct epoll_event ep_ev = { 0,{ 0} }; ep_ev.data.ptr = ev; ep_ev.events = ev->events = events; // 判断,设置epfd的操作模式 int op; if (ev->status == 1) op = EPOLL_CTL_MOD; else { op = EPOLL_CTL_ADD; ev->status = 1; } // 设置epoll int ret = epoll_ctl(epfd, op, ev->fd, &ep_ev); if (ret < 0) { printf("event add failed [fd=%d], events[%d],ret:%d\n", ev->fd, events,ret); printf("event add failed in %s error %s\n", __func__, strerror(errno)); return -1; } return 0; }step 8:实现IO事件移除功能
由于设置了非阻塞模式,当事件到来时,需要暂时移除监听,避免干扰。
