五、Linux C/C++ 对epoll-reactor服务器的百万级高并发实现

这篇具有很好参考价值的文章主要介绍了五、Linux C/C++ 对epoll-reactor服务器的百万级高并发实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言:基于epoll的反应堆模式(reactor)的服务器程序,进行百万并发量的连接测试。通过代码优化,以及服务器与客户端的硬件配置优化,达到百万并发。

一、服务器:epoll-reactor

  • 代码实现
    #include <iostream>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <string.h>
    #include <sys/poll.h>
    #include <unistd.h>
    #include <arpa/inet.h>
    #include <sys/epoll.h>
    #include <sys/time.h>
    
    using namespace std;
    
    #define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
    struct timeval tv_last;
    
    #define BUFFER_LEN 512
    #define CLIENT_MAX_COUNT 1048576
    typedef int (*EPOLL_CALLBACK)(int fd);
    struct CONNECT_ITEM {
    	char rBuffer[BUFFER_LEN] = {0};
    	int rLen = 0;
    	char wBuffer[BUFFER_LEN] = {0};
    	int wLen = 0;
    
    	union {
    		EPOLL_CALLBACK accept_callback = nullptr;
    		EPOLL_CALLBACK recv_callback;
    	}recv_t;
    	EPOLL_CALLBACK send_callback = nullptr;
    }connect_item[CLIENT_MAX_COUNT];
    
    int epfd = 0;
    
    enum _EPOLL_CTRL{
    	ADD,
    	MOD
    };
    
    void setEvent(int fd, EPOLL_EVENTS events, _EPOLL_CTRL ctrl) {
    	epoll_event ev;
    	ev.events = events; //默认水平触发(LT),有(数据)事件就会一直触发,知道全部处理完
    	/*
    		EPOLLET为边沿触发(ET),当有事件发生时只触发一次,
    		比如来数据了,如果一次没有读完,不会再触发了,所以必须全部读完,在进行下一次epoll_wait
    	*/
    	//ev.events = EPOLLIN | EPOLLET;
    	ev.data.fd = fd;
    	epoll_ctl(epfd, ctrl == ADD ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ev);
    }
    
    int recv_cb(int fd) {
    	char* buffer = connect_item[fd].rBuffer;
    	int index = connect_item[fd].rLen;
    
    	int count = recv(fd, buffer + index, BUFFER_LEN - index, 0);
    	
    	if (count == 0) {
    		//printf("disconnect: %d\n", fd);
    
    		epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);		
    		close(fd);
    		
    		return count;
    	}else if (count < 0) {
    		//printf("error\n");
    
    		return count;
    	}
    
    	connect_item[fd].rLen += count; //这里的rLen可能会超出buffer的大小,这里就不做处理了
    	//printf("RECV===>>> clientfd: %d, count: %d, buffer: %s\n", fd, count, buffer);
    
    	//改变该文件描述符的事件类型为EPOLLOUT
    	setEvent(fd, EPOLLOUT, MOD);
    
    	//发送buffer赋值
    	memcpy(connect_item[fd].wBuffer, connect_item[fd].rBuffer, connect_item[fd].rLen);
    	connect_item[fd].wLen = connect_item[fd].rLen;
    
    	//发送了多少,代表处理了多少
    	connect_item[fd].rLen -= connect_item[fd].wLen;
    	
    	return count;
    }
    
    int send_cb(int fd) {
    	char* buffer = connect_item[fd].wBuffer;
    	int index = connect_item[fd].wLen;
    	
    	int count = send(fd, buffer, connect_item[fd].wLen, 0);
    
    	//改变该文件描述符的事件类型为EPOLLIN
    	setEvent(fd, EPOLLIN, MOD);
    
    	return count;
    }
    
    int accept_cb(int fd) {
    	struct sockaddr_in clientaddr;
    	socklen_t len = sizeof(clientaddr);
    	
    	int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    
    	setEvent(clientfd, EPOLLIN, ADD);
    
    	memset(connect_item[clientfd].rBuffer, 0, sizeof(connect_item[clientfd].rBuffer));
    	connect_item[clientfd].rLen = 0;
    	memset(connect_item[clientfd].wBuffer, 0, sizeof(connect_item[clientfd].wBuffer));
    	connect_item[clientfd].wLen = 0;
    	connect_item[clientfd].recv_t.recv_callback = recv_cb;
    	connect_item[clientfd].send_callback = send_cb;
    	
    	//printf("ACCEPT===>>> clientfd:%d\n", clientfd);
    
    	if (clientfd % 1000 == 999) {
    		struct timeval tv_cur;
    		gettimeofday(&tv_cur, NULL);
    		printf("clientfd: %d, time_used: %d\n", clientfd, int(TIME_SUB_MS(tv_cur, tv_last)));
    
    		tv_last = tv_cur;
    	}
    
    	return clientfd;
    }
    
    int init_server(int port) {
    	int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
        sockaddr_in serverAddr;
        memset(&serverAddr, 0, sizeof(serverAddr));
        serverAddr.sin_family = AF_INET;
        serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        serverAddr.sin_port = htons(port);
        bind(listenfd, (sockaddr*)&serverAddr, sizeof(serverAddr));
    
        listen(listenfd, 10);
    
    	return listenfd;
    }
    
    int main() {
        epfd = epoll_create(1); // int size
    	
    	gettimeofday(&tv_last, NULL);
    
    	for (int i = 0; i < 20; i++) {
    		int listenfd = init_server(2048 + i);
    
    		connect_item[listenfd].recv_t.accept_callback = accept_cb;
    		setEvent(listenfd, EPOLLIN, ADD);
    	}
    
    	struct epoll_event events[100000] = {0};
    	while (1) {
    		int nready = epoll_wait(epfd, events, 100000, -1);
    
    		for (int i = 0; i < nready; i++) {
    			int connfd = events[i].data.fd;
    
    		    if (events[i].events & EPOLLIN) {
    				int count = connect_item[connfd].recv_t.recv_callback(connfd);
    			}else if (events[i].events & EPOLLOUT) {
    				connect_item[connfd].send_callback(connfd);
    			}
    		}
    	}
    
    	for (int i = 0; i < 20; i++) {
        	close(i + 3);
    	}
    
        return 0;
    }
    

二、客户端:multi_port_client_epoll

  • 代码实现:
    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <errno.h>
    #include <netinet/tcp.h>
    #include <arpa/inet.h>
    #include <netdb.h>
    #include <fcntl.h>
    #include <sys/time.h>
    #include <unistd.h>
    
    
    #define MAX_BUFFER		128
    #define MAX_EPOLLSIZE	(384*1024)
    #define MAX_PORT		20
    
    #define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
    
    int isContinue = 0;
    
    static int ntySetNonblock(int fd) {
    	int flags;
    
    	flags = fcntl(fd, F_GETFL, 0);
    	if (flags < 0) return flags;
    	flags |= O_NONBLOCK;
    	if (fcntl(fd, F_SETFL, flags) < 0) return -1;
    	return 0;
    }
    
    static int ntySetReUseAddr(int fd) {
    	int reuse = 1;
    	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
    }
    
    
    
    int main(int argc, char **argv) {
    	if (argc <= 2) {
    		printf("Usage: %s ip port\n", argv[0]);
    		exit(0);
    	}
    
    	const char *ip = argv[1];
    	int port = atoi(argv[2]);
    	int connections = 0;
    	char buffer[128] = {0};
    	int i = 0, index = 0;
    
    	struct epoll_event events[MAX_EPOLLSIZE];
    	
    	int epoll_fd = epoll_create(MAX_EPOLLSIZE);
    	
    	strcpy(buffer, " Data From MulClient\n");
    		
    	struct sockaddr_in addr;
    	memset(&addr, 0, sizeof(struct sockaddr_in));
    	
    	addr.sin_family = AF_INET;
    	addr.sin_addr.s_addr = inet_addr(ip);
    
    	struct timeval tv_begin;
    	gettimeofday(&tv_begin, NULL);
    
    	while (1) {
    		if (++index >= MAX_PORT) index = 0;
    		
    		struct epoll_event ev;
    		int sockfd = 0;
    
    		if (connections < 340000 && !isContinue) {
    			sockfd = socket(AF_INET, SOCK_STREAM, 0);
    			if (sockfd == -1) {
    				perror("socket");
    				goto err;
    			}
    
    			//ntySetReUseAddr(sockfd);
    			addr.sin_port = htons(port+index);
    
    			if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
    				perror("connect");
    				goto err;
    			}
    			ntySetNonblock(sockfd);
    			ntySetReUseAddr(sockfd);
    
    			sprintf(buffer, "Hello Server: client --> %d\n", sockfd);
    			send(sockfd, buffer, strlen(buffer), 0);
    
    			ev.data.fd = sockfd;
    			ev.events = EPOLLIN | EPOLLOUT;
    			epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
    		
    			connections++;
    		}
    		//connections ++;
    		if (connections % 1000 == 999 || connections >= 340000) {
    			struct timeval tv_cur;
    			memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
    			
    			gettimeofday(&tv_begin, NULL);
    
    			int time_used = TIME_SUB_MS(tv_begin, tv_cur);
    			printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);
    
    			int nfds = epoll_wait(epoll_fd, events, connections, 100);
    			for (i = 0;i < nfds;i ++) {
    				int clientfd = events[i].data.fd;
    
    				if (events[i].events & EPOLLOUT) {
    					//sprintf(buffer, "data from %d\n", clientfd);
    					//send(clientfd, buffer, strlen(buffer), 0);
    					//printf("send:%s\n", buffer);
    				}
    				
    				if (events[i].events & EPOLLIN) {
    					char rBuffer[MAX_BUFFER] = {0};				
    					ssize_t length = recv(clientfd, rBuffer, MAX_BUFFER, 0);
    					if (length > 0) {
    						//printf(" RecvBuffer:%s\n", rBuffer);
    
    						if (!strcmp(rBuffer, "quit")) {
    							isContinue = 0;
    						}
    						
    					} else if (length == 0) {
    						printf(" Disconnect clientfd:%d\n", clientfd);
    						connections --;
    						close(clientfd);
    					} else {
    						if (errno == EINTR) continue;
    
    						printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
    						close(clientfd);
    					}
    				}
    				
    				// else {
    				// 	printf(" clientfd:%d, errno:%d\n", clientfd, errno);
    				// 	close(clientfd);
    				// }
    			}
    		}
    
    		usleep(10);
    	}
    
    	return 0;
    
    err:
    	printf("error : %s\n", strerror(errno));
    	return 0;
    }
    

三、百万并发测试过程

1、硬件配置

  • 1台服务器:8G运行内存 8核CPU
  • 3台客户端:4G运行内存 4核CPU

这些硬件配置可以通过虚拟机配置。

按照客户端测试用例代码的实现,一台客户端最大可以创建340000个连接,而一台电脑的端口最多有65535个,一个端口建立一个连接,除去系统用的前1024个端口,我们可用的也最多只有65535-1024 = 64511个连接,而我们的客户端代码之所以能达到340000个连接,这是因为在tcp协议中的四元组概念,即(源IP,源port,目的IP,目的port),这个四元组构成一个tcp连接,只要四元组中的任何一个参数不同那么就是一个完全不同的连接,因此在源IP与目的IP都固定的情况下,我们从端口来入手,突破65535的限制,我们可以在服务端创建多个监听socket,分别绑定不同的端口,比如创建了20个监听socket,分别绑定在2048 ~ 2067端口,此时客户端就可以循环绑定这20个端口了,客户端的1个端口就可以在四元组里与服务端建立20个连接(这是因为服务端的目的port有20个),而客户端有1025 ~ 65535的端口可供应用程序使用,因此理论上就能建立(65535-1025) x 20 = 1290200个连接,已经超超过了百万,只不过我们的客户端代码限定了最大340000个连接。

三台客户端就可以实现百万级并发测试。我们接下来只演示一个客户端的并发测试,另外两台操作一样。

2、测试流程

(1) 在服务器上编译运行 目录一 的服务器代码:

g++ -o epoll-reactor epoll-reactor.cpp
./epoll-reactor

(2) 在客户端上编译运行 目录二 的客户端代码:

gcc -o mul_port_client_epoll mul_port_client_epoll.c
./mul_port_client_epoll 192.168.1.20 2048

192.168.1.20 2048 是服务器的地址与端口(根据自己的电脑传入自己的地址)

(3) 运行结果如图:
百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言

  • 先看客户端提示:Too many open files,这是因为系统设置的最多可打开的文件描述符太少了,通过ulimit -a命令查看open files的限制大小为1024,最多可以打开1024个文件描述符,所以会提示connections只有999个,就报错了。通过ulimit -n 1048576 将其改为百万级的可打开的文件描述符个数(1048576为1024x1024)。如图:
    百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言
  • 再次在客户端运行测试用例,服务端也运行起来:
    百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言
  • 我们发现客户端仍然只创建了999个(实际比这多,只是代码每1000个连接打印一次)连接,就出错了,不过提示的错误不一样了:Connection refused(连接被拒绝),这是服务器拒绝了客户端的连接,我们看服务器的提示信息:Segmentation fault (core dumped),出现了段错误,仔细看代码没有内存越界的问题,可能也是open files的问题,我们也通过命令ulimit -n 1048576 将其也改为百万级,再次运行测试:
    百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言
  • 这次达到了19999个连接后,出现了Cannot assign requested address(不能分配请求的地址),这是因为linux系统还有一个参数限制了我们连接的数目,在/etc/sysctl.conf文件里有这么一个参数:net.ipv4.ip_local_port_range = 1024 2048,可供应用程序使用的端口范围是1024 ~ 2048,但是为什么客户端能创建19999个连接才挂掉呢,按照端口范围最多有2048-1024 = 1024个连接,这是因为我们前面所说到的四元组概念,我们的服务端提供了20个端口可供客户端去绑定,因此最多可创建20 x 1024 = 20480个连接,所以客户端达到了19999个连接后挂掉了。所以我们需要修改客户端的/etc/sysctl.conf文件里的这个参数net.ipv4.ip_local_port_range = 1024 65535。修改完成后,执行 sudo sysctl -p让其生效,如图:
    百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言
  • 如果出现cannot stat /proc/sys/net/nf_conntrack_max: No such file or directory的错误,执行 sudo modprobe ip_conntrack命令,然后再次执行sudo sysctl -p命令,让其生效。我们再次运行测试:
    百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言
  • 这次客户端连接数量达到了62999,然后服务器挂掉了,废话不多说,直接给出答案:这是因为客户端的/etc/sysctl.conf文件里还有一个参数:fs.file-max = 65535,这个参数代表的是文件描述符的最大值是65535,所以上图的sockfd只达到了63002(实际比63002多,因为测试用例的代码没有打印出来),我们修改fs.file-max = 1048576,保存退出后,再次执行sudo sysctl -p命令使其生效。然后我们再次测试,如下图:如果服务端再次出现段错误,并且客户端出现连接被拒绝,这是因为服务端的fs.file-max参数值没有被修改,我们也将其修改为1048576。
    百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言
    在这里我提示一点,我在测试的过程中,不小心将fs.file-max改成了1024,改的太小了,以至于系统的任何编辑文件的命令都无法执行了,这样系统就不能用了。但是还有补救办法:执行 echo 65535 > /proc/sys/fs/file-max命令,然后再次打开/etc/sysctl.conf文件修改fs.file-max参数。

我们再次运行测试:
百万级并发需要大服务器吗,LinuxC/C++,网络IO,服务器,linux,c++,c语言文章来源地址https://www.toymoban.com/news/detail-828223.html

  • 这次终于达到了340000个连接,我们的测试流程到这里也就结束了,想要达到百万级并发,我们只需要再准备两台电脑作为客户端,重复上述的操作即可。

到了这里,关于五、Linux C/C++ 对epoll-reactor服务器的百万级高并发实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • TCP服务器的演变过程:使用epoll构建reactor网络模型实现百万级并发(详细代码)

    手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节介绍了如何使用epoll开发高效的服务器,本节将介绍使用epoll构建reactor网络模型,实

    2024年02月01日
    浏览(54)
  • Linux多路IO复用技术——epoll详解与一对多服务器实现

    本文详细介绍了Linux中epoll模型的优化原理和使用方法,以及如何利用epoll模型实现简易的一对多服务器。通过对epoll模型的优化和相关接口的解释,帮助读者理解epoll模型的工作原理和优缺点,同时附带代码实现和图解说明。

    2024年02月05日
    浏览(33)
  • Linux学习记录——사십삼 高级IO(4)--- Epoll型服务器(1)

    poll依然需要OS去遍历所有fd。一个进程去多个特定的文件中等待,只要有一个就绪,就使用select/poll系统调用,让操作系统把所有文件遍历一遍,哪些就绪就加上哪些fd,再返回。一旦文件太多了,遍历效率就显而易见地低。epoll是为处理大批量句柄而作了改进的poll,句柄就是

    2024年01月18日
    浏览(41)
  • Linux网络编程:多路I/O转接服务器(select poll epoll)

    文章目录: 一:select 1.基础API  select函数 思路分析 select优缺点 2.server.c 3.client.c 二:poll 1.基础API  poll函数  poll优缺点 read函数返回值 突破1024 文件描述符限制 2.server.c 3.client.c 三:epoll 1.基础API epoll_create创建   epoll_ctl操作  epoll_wait阻塞 epoll实现多路IO转接思路 epoll优缺点

    2024年02月11日
    浏览(42)
  • day-08 基于Linux的网络编程(套接字和标准I/O、分离I/O流、epoll、多线程服务器)

    标准I/O函数(stdio)是在C语言中用于进行输入和输出操作的库函数 。它们包括了一组标准的输入和输出函数,如printf、scanf、fopen、fclose等。标准I/O函数具有以下优点: 简单易用 :标准I/O函数提供了简洁的接口,使得输入和输出操作变得简单易用。开发人员无需自行处理底层

    2024年02月09日
    浏览(46)
  • epoll实现并发服务器

    epoll 是Linux操作系统提供的一种事件通知机制,用于高效处理大量文件描述符上的事件。它是一种基于内核的I/O事件通知接口,可以用于实现高性能的并发服务器和异步I/O操作。 与传统的事件通知机制(如 select 和 poll )相比, epoll 具有更高的性能和扩展性。它采用了一种基

    2024年02月09日
    浏览(39)
  • 服务器IO复用reactor模式

    调试: Linux下nc命令作为客户端: nc 127.0.0.1 7777

    2024年02月10日
    浏览(32)
  • epoll并发服务器的实现

    1.实现并发通信的三种方式 ​ 实现并发通信主要有三种方式: 多进程服务器 、 多路复用服务器 (I/O复用)、 多线程服务器 多进程服务器 ​ 多进程服务器指的是利用不同进程处理来自不同客户端发来的连接请求,进程之间以轮转的方式运行,由于各个进程之间轮转运行的时

    2024年02月03日
    浏览(42)
  • epoll多路复用_并发服务器

    应用程序: 驱动程序:

    2024年02月15日
    浏览(43)
  • 【网络进阶】服务器模型Reactor与Proactor

    在高并发编程和网络连接的消息处理中,通常可分为两个阶段:等待消息就绪和消息处理。当使用默认的阻塞套接字时(例如每个线程专门处理一个连接),这两个阶段往往是合并的。因此,处理套接字的线程需要等待消息就绪,这在高并发场景下导致线程频繁地休眠和唤醒

    2024年02月01日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包