前言:基于epoll的反应堆模式(reactor)的服务器程序,进行百万并发量的连接测试。通过代码优化,以及服务器与客户端的硬件配置优化,达到百万并发。
一、服务器:epoll-reactor
- 代码实现
#include <iostream> #include <sys/socket.h> #include <netinet/in.h> #include <string.h> #include <sys/poll.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <sys/time.h> using namespace std; #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) struct timeval tv_last; #define BUFFER_LEN 512 #define CLIENT_MAX_COUNT 1048576 typedef int (*EPOLL_CALLBACK)(int fd); struct CONNECT_ITEM { char rBuffer[BUFFER_LEN] = {0}; int rLen = 0; char wBuffer[BUFFER_LEN] = {0}; int wLen = 0; union { EPOLL_CALLBACK accept_callback = nullptr; EPOLL_CALLBACK recv_callback; }recv_t; EPOLL_CALLBACK send_callback = nullptr; }connect_item[CLIENT_MAX_COUNT]; int epfd = 0; enum _EPOLL_CTRL{ ADD, MOD }; void setEvent(int fd, EPOLL_EVENTS events, _EPOLL_CTRL ctrl) { epoll_event ev; ev.events = events; //默认水平触发(LT),有(数据)事件就会一直触发,知道全部处理完 /* EPOLLET为边沿触发(ET),当有事件发生时只触发一次, 比如来数据了,如果一次没有读完,不会再触发了,所以必须全部读完,在进行下一次epoll_wait */ //ev.events = EPOLLIN | EPOLLET; ev.data.fd = fd; epoll_ctl(epfd, ctrl == ADD ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ev); } int recv_cb(int fd) { char* buffer = connect_item[fd].rBuffer; int index = connect_item[fd].rLen; int count = recv(fd, buffer + index, BUFFER_LEN - index, 0); if (count == 0) { //printf("disconnect: %d\n", fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); close(fd); return count; }else if (count < 0) { //printf("error\n"); return count; } connect_item[fd].rLen += count; //这里的rLen可能会超出buffer的大小,这里就不做处理了 //printf("RECV===>>> clientfd: %d, count: %d, buffer: %s\n", fd, count, buffer); //改变该文件描述符的事件类型为EPOLLOUT setEvent(fd, EPOLLOUT, MOD); //发送buffer赋值 memcpy(connect_item[fd].wBuffer, connect_item[fd].rBuffer, connect_item[fd].rLen); connect_item[fd].wLen = connect_item[fd].rLen; //发送了多少,代表处理了多少 connect_item[fd].rLen -= connect_item[fd].wLen; return count; } int send_cb(int fd) { char* buffer = connect_item[fd].wBuffer; int index = connect_item[fd].wLen; int count = send(fd, buffer, connect_item[fd].wLen, 0); //改变该文件描述符的事件类型为EPOLLIN setEvent(fd, EPOLLIN, MOD); return count; } int accept_cb(int fd) { struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len); setEvent(clientfd, EPOLLIN, ADD); memset(connect_item[clientfd].rBuffer, 0, sizeof(connect_item[clientfd].rBuffer)); connect_item[clientfd].rLen = 0; memset(connect_item[clientfd].wBuffer, 0, sizeof(connect_item[clientfd].wBuffer)); connect_item[clientfd].wLen = 0; connect_item[clientfd].recv_t.recv_callback = recv_cb; connect_item[clientfd].send_callback = send_cb; //printf("ACCEPT===>>> clientfd:%d\n", clientfd); if (clientfd % 1000 == 999) { struct timeval tv_cur; gettimeofday(&tv_cur, NULL); printf("clientfd: %d, time_used: %d\n", clientfd, int(TIME_SUB_MS(tv_cur, tv_last))); tv_last = tv_cur; } return clientfd; } int init_server(int port) { int listenfd = socket(AF_INET, SOCK_STREAM, 0); sockaddr_in serverAddr; memset(&serverAddr, 0, sizeof(serverAddr)); serverAddr.sin_family = AF_INET; serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); serverAddr.sin_port = htons(port); bind(listenfd, (sockaddr*)&serverAddr, sizeof(serverAddr)); listen(listenfd, 10); return listenfd; } int main() { epfd = epoll_create(1); // int size gettimeofday(&tv_last, NULL); for (int i = 0; i < 20; i++) { int listenfd = init_server(2048 + i); connect_item[listenfd].recv_t.accept_callback = accept_cb; setEvent(listenfd, EPOLLIN, ADD); } struct epoll_event events[100000] = {0}; while (1) { int nready = epoll_wait(epfd, events, 100000, -1); for (int i = 0; i < nready; i++) { int connfd = events[i].data.fd; if (events[i].events & EPOLLIN) { int count = connect_item[connfd].recv_t.recv_callback(connfd); }else if (events[i].events & EPOLLOUT) { connect_item[connfd].send_callback(connfd); } } } for (int i = 0; i < 20; i++) { close(i + 3); } return 0; }
二、客户端:multi_port_client_epoll
- 代码实现:
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> #include <errno.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <netdb.h> #include <fcntl.h> #include <sys/time.h> #include <unistd.h> #define MAX_BUFFER 128 #define MAX_EPOLLSIZE (384*1024) #define MAX_PORT 20 #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) int isContinue = 0; static int ntySetNonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL, 0); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } static int ntySetReUseAddr(int fd) { int reuse = 1; return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)); } int main(int argc, char **argv) { if (argc <= 2) { printf("Usage: %s ip port\n", argv[0]); exit(0); } const char *ip = argv[1]; int port = atoi(argv[2]); int connections = 0; char buffer[128] = {0}; int i = 0, index = 0; struct epoll_event events[MAX_EPOLLSIZE]; int epoll_fd = epoll_create(MAX_EPOLLSIZE); strcpy(buffer, " Data From MulClient\n"); struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip); struct timeval tv_begin; gettimeofday(&tv_begin, NULL); while (1) { if (++index >= MAX_PORT) index = 0; struct epoll_event ev; int sockfd = 0; if (connections < 340000 && !isContinue) { sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { perror("socket"); goto err; } //ntySetReUseAddr(sockfd); addr.sin_port = htons(port+index); if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) { perror("connect"); goto err; } ntySetNonblock(sockfd); ntySetReUseAddr(sockfd); sprintf(buffer, "Hello Server: client --> %d\n", sockfd); send(sockfd, buffer, strlen(buffer), 0); ev.data.fd = sockfd; ev.events = EPOLLIN | EPOLLOUT; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev); connections++; } //connections ++; if (connections % 1000 == 999 || connections >= 340000) { struct timeval tv_cur; memcpy(&tv_cur, &tv_begin, sizeof(struct timeval)); gettimeofday(&tv_begin, NULL); int time_used = TIME_SUB_MS(tv_begin, tv_cur); printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used); int nfds = epoll_wait(epoll_fd, events, connections, 100); for (i = 0;i < nfds;i ++) { int clientfd = events[i].data.fd; if (events[i].events & EPOLLOUT) { //sprintf(buffer, "data from %d\n", clientfd); //send(clientfd, buffer, strlen(buffer), 0); //printf("send:%s\n", buffer); } if (events[i].events & EPOLLIN) { char rBuffer[MAX_BUFFER] = {0}; ssize_t length = recv(clientfd, rBuffer, MAX_BUFFER, 0); if (length > 0) { //printf(" RecvBuffer:%s\n", rBuffer); if (!strcmp(rBuffer, "quit")) { isContinue = 0; } } else if (length == 0) { printf(" Disconnect clientfd:%d\n", clientfd); connections --; close(clientfd); } else { if (errno == EINTR) continue; printf(" Error clientfd:%d, errno:%d\n", clientfd, errno); close(clientfd); } } // else { // printf(" clientfd:%d, errno:%d\n", clientfd, errno); // close(clientfd); // } } } usleep(10); } return 0; err: printf("error : %s\n", strerror(errno)); return 0; }
三、百万并发测试过程
1、硬件配置
- 1台服务器:8G运行内存 8核CPU
- 3台客户端:4G运行内存 4核CPU
这些硬件配置可以通过虚拟机配置。
按照客户端测试用例代码的实现,一台客户端最大可以创建340000个连接,而一台电脑的端口最多有65535个,一个端口建立一个连接,除去系统用的前1024个端口,我们可用的也最多只有65535-1024 = 64511个连接,而我们的客户端代码之所以能达到340000个连接,这是因为在tcp协议中的四元组概念,即(源IP,源port,目的IP,目的port),这个四元组构成一个tcp连接,只要四元组中的任何一个参数不同那么就是一个完全不同的连接,因此在源IP与目的IP都固定的情况下,我们从端口来入手,突破65535的限制,我们可以在服务端创建多个监听socket,分别绑定不同的端口,比如创建了20个监听socket,分别绑定在2048 ~ 2067端口,此时客户端就可以循环绑定这20个端口了,客户端的1个端口就可以在四元组里与服务端建立20个连接(这是因为服务端的目的port有20个),而客户端有1025 ~ 65535的端口可供应用程序使用,因此理论上就能建立(65535-1025) x 20 = 1290200个连接,已经超超过了百万,只不过我们的客户端代码限定了最大340000个连接。
三台客户端就可以实现百万级并发测试。我们接下来只演示一个客户端的并发测试,另外两台操作一样。
2、测试流程
(1) 在服务器上编译运行 目录一 的服务器代码:
g++ -o epoll-reactor epoll-reactor.cpp
./epoll-reactor
(2) 在客户端上编译运行 目录二 的客户端代码:
gcc -o mul_port_client_epoll mul_port_client_epoll.c
./mul_port_client_epoll 192.168.1.20 2048
192.168.1.20 2048 是服务器的地址与端口(根据自己的电脑传入自己的地址)
(3) 运行结果如图:
文章来源:https://www.toymoban.com/news/detail-828223.html
- 先看客户端提示:Too many open files,这是因为系统设置的最多可打开的文件描述符太少了,通过ulimit -a命令查看open files的限制大小为1024,最多可以打开1024个文件描述符,所以会提示connections只有999个,就报错了。通过ulimit -n 1048576 将其改为百万级的可打开的文件描述符个数(1048576为1024x1024)。如图:
- 再次在客户端运行测试用例,服务端也运行起来:
- 我们发现客户端仍然只创建了999个(实际比这多,只是代码每1000个连接打印一次)连接,就出错了,不过提示的错误不一样了:Connection refused(连接被拒绝),这是服务器拒绝了客户端的连接,我们看服务器的提示信息:Segmentation fault (core dumped),出现了段错误,仔细看代码没有内存越界的问题,可能也是open files的问题,我们也通过命令ulimit -n 1048576 将其也改为百万级,再次运行测试:
- 这次达到了19999个连接后,出现了Cannot assign requested address(不能分配请求的地址),这是因为linux系统还有一个参数限制了我们连接的数目,在/etc/sysctl.conf文件里有这么一个参数:net.ipv4.ip_local_port_range = 1024 2048,可供应用程序使用的端口范围是1024 ~ 2048,但是为什么客户端能创建19999个连接才挂掉呢,按照端口范围最多有2048-1024 = 1024个连接,这是因为我们前面所说到的四元组概念,我们的服务端提供了20个端口可供客户端去绑定,因此最多可创建20 x 1024 = 20480个连接,所以客户端达到了19999个连接后挂掉了。所以我们需要修改客户端的/etc/sysctl.conf文件里的这个参数net.ipv4.ip_local_port_range = 1024 65535。修改完成后,执行 sudo sysctl -p让其生效,如图:
- 如果出现cannot stat /proc/sys/net/nf_conntrack_max: No such file or directory的错误,执行 sudo modprobe ip_conntrack命令,然后再次执行sudo sysctl -p命令,让其生效。我们再次运行测试:
- 这次客户端连接数量达到了62999,然后服务器挂掉了,废话不多说,直接给出答案:这是因为客户端的/etc/sysctl.conf文件里还有一个参数:fs.file-max = 65535,这个参数代表的是文件描述符的最大值是65535,所以上图的sockfd只达到了63002(实际比63002多,因为测试用例的代码没有打印出来),我们修改fs.file-max = 1048576,保存退出后,再次执行sudo sysctl -p命令使其生效。然后我们再次测试,如下图:如果服务端再次出现段错误,并且客户端出现连接被拒绝,这是因为服务端的fs.file-max参数值没有被修改,我们也将其修改为1048576。
(在这里我提示一点,我在测试的过程中,不小心将fs.file-max改成了1024,改的太小了,以至于系统的任何编辑文件的命令都无法执行了,这样系统就不能用了。但是还有补救办法:执行 echo 65535 > /proc/sys/fs/file-max命令,然后再次打开/etc/sysctl.conf文件修改fs.file-max参数。)
我们再次运行测试:
文章来源地址https://www.toymoban.com/news/detail-828223.html
- 这次终于达到了340000个连接,我们的测试流程到这里也就结束了,想要达到百万级并发,我们只需要再准备两台电脑作为客户端,重复上述的操作即可。
到了这里,关于五、Linux C/C++ 对epoll-reactor服务器的百万级高并发实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!