IO多路复用有几种实现方式:select poll和epoll。本篇文章对epoll进行总结理解。
IO多路复用的含义,我个人的理解是通过一个线程实现对多个socket的侦听,epoll与select和poll的区别是epoll效率最高。select的最高管理1024个socket并且是通过轮询的方式实现的管理,管理的socket个数越多,耗时越长。而epoll则没有1024这个限制,并且不是通过轮询的方式实现,这也是epoll应用于高并发的场景的原因所在。
epoll是一种IO事件通知机制。
select | poll | epoll | |
---|---|---|---|
性能 | 随着连接数的增加,性能急剧下降,处理成千上万的并发连接数时,性能很差 | 随着连接数的增加,性能急剧下降,处理成千上万的并发连接数时,性能很差 | 随着连接数的增加,性能基本没有变化 |
连接数 | 一般1024 | 无限制 | 无限制 |
内存拷贝 | 每次调用select拷贝 | 每次调用poll拷贝 | fd首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝 |
数据结构 | bitmap | 数组 | 红黑树 |
内在处理机制 | 线性轮询 | 线性轮询 | FD挂在红黑树,通过事件回调callback |
时间复杂度 | O(n) | O(n) | O(log(n)) |
epoll是IO多路复用的一种实现方式,也是目前主流的高并发实现方案。
epoll的作用
经常看到epoll的作用,也知道他是IO多路复用的一种实现形式,但是由于过往经历使用select比较多,对epoll总是知其然,而不知其所以然。
epoll主要用于对socket进行侦听,实现一个线程对多个socket的管理,相对于select和poll能够有效的减少系统开销,性能稳定。
epoll的API接口
int epoll_create(int size);
功能:该函数生成一个 epoll 专用的文件描述符。
参数size: 用来告诉内核这个监听的数目一共有多大,参数 size 并不是限制了 epoll 所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。自从 linux 2.6.8 之后,size 参数是被忽略的,也就是说可以填只有大于 0 的任意值。返回值:如果成功,返回poll 专用的文件描述符,否者失败,返回-1。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
功能:epoll 的事件注册函数,它不同于 select() 是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。参数epfd: epoll 专用的文件描述符,epoll_create()的返回值参数op: 表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
参数fd: 需要监听的文件描述符参数event: 告诉内核要监听什么事件,struct epoll_event 结构如:events 可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET :将 EPOLL 设为边缘触发(Edge Trigger)模式,这是相对于水平触发(Level Trigger)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里返回值:0表示成功,-1表示失败。
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
功能:等待事件的产生,收集在 epoll 监控的事件中已经发送的事件,类似于 select() 调用。参数epfd: epoll 专用的文件描述符,epoll_create()的返回值参数events: 分配好的 epoll_event 结构体数组,epoll 将会把发生的事件赋值到events 数组中(events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)。参数maxevents: maxevents 告之内核这个 events 有多少个 。参数timeout: 超时时间,单位为毫秒,为 -1 时,函数为阻塞。返回值:
如果成功,表示返回需要处理的事件数目
如果返回0,表示已超时
如果返回-1,表示失败
epoll为什么高效?
说到epoll为什么高效,还是要从IO多路复现的实现历史说起,IO多路复用的实现最初是select,然后select有几个问题:
- 默认的select实现管理的socket数量一般为1024,数量存在限制,虽然可以修改,但是需要重新编译内核
- 每次调用select接口,都会将侦听的fd的数组从用户态内存拷贝到内核态缓冲区;另外当有socket可读或者可写时也会将fd数组从内核态缓冲区拷贝至用户态内存。用户态至内核态或者内核态至用户态数据的拷贝,这样的拷贝对于资源的消耗是很大的。
- 无论是内核态还是用户态由于保存fd的是一个数组,都需要通过轮询的方式遍历fd数组,找到可读或者可写的fd,当fd数量增大时,性能是下降的。
select运行原理示意视频:
select-CSDN直播
针对select存在这样的问题,后续发展出了poll,但是poll相对于select的优化有限,仅仅只改善了select管理socket上线的问题,其余两点都没有进行优化。
再往后就发展了出了epoll,epoll相对于select和poll出现了跨越式的改进,将select涉及的问题都做了响应的改进:
- 管理的socket无上限,而且是通过函数传参的形式指定管理的socket个数,而select是通过头文件中的FD_SIZE来指定的。不言而喻,通过函数传参的方式更灵活。
- epoll内部管理fd的数据结构是红黑树,查找、修改和删除的时间复杂度都很优秀。
- epoll_wait的每次调用不会向select调用一样,每次都会产生用户态到内核态的拷贝,从而减少资源消耗
- 当内核检测到某个fd的可读或者可写事件时,会自动调用该fd的poll回调函数,将该fd的信息拷贝到数组中
- epoll仅会将检测到可读可写的事件fd写入到数组中,传递到用户态内存中,这一点与select是不同的,select是要所有监听的fd的集合拷贝到用户区中。
总结起来就是:
- 管理的socket无上限
- 用户态内存和内核缓冲区内存拷贝次数减少
- 传递出的可读或者可写的事件仅包含这些可读可写的fd,这一点也是与select不同的,select传出的是所有fd的集合。
epoll运行原理示意视频:
epoll-CSDN直播
epoll的触发方式
epoll有两种触发方式,一种是水平触发,一种是边缘触发。
- 水平触发,这种触发方式的含义是只要读缓冲区存在数据,epoll会一直提示该fd有可读事件;当为写缓冲区时,如果写缓冲区空间不满,则epoll_wait会提示用户该fd有可写事件。epoll默认的触发方式是水平触发。
对于读操作,只要缓冲内容不为空,LT模式返回读就绪。
对于写操作,只要缓冲区还不满,LT模式会返回写就绪。
当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在尚没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你。如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
- 边缘触发,只有当缓冲区的状态发生变化的时候才会触发可读可写事件。例如读缓冲区内由无数据变为有数据,只有此种情况下才会触发可读事件,也就是说对于读缓冲区,读缓冲区从数据变为有数据,只会发送一次可读事件,至于读缓冲区内的事件是否读完不太关心,需要用户自己去处理;若为写缓冲区,写缓冲区由不可写入变为可以写入的情况下会触发可写事件,其余情况不会触发该事件。若要修改边沿触发模式,则需要调用epoll_ctl接口修改,在event参数中添加EPOLLET即可。
对于读操作
当缓冲区由不可读变为可读的时候,即缓冲区由空变为不空的时候。
当有新数据到达时,即缓冲区中的待读数据变多的时候。
当缓冲区有数据可读,且应用进程对相应的描述符进行EPOLL_CTL_MOD 修改EPOLLIN事件时。
对于写操作
当缓冲区由不可写变为可写时。
当有旧数据被发送走,即缓冲区中的内容变少的时候。
当缓冲区有空间可写,且应用进程对相应的描述符进行EPOLL_CTL_MOD 修改EPOLLOUT事件时。
当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。
在ET模式下, 缓冲区从不可读变成可读,会唤醒应用进程,缓冲区数据变少的情况,则不会再唤醒应用进程。
对于水平触发和边缘触发更形象的解释:
水平触发:0为无数据,1为有数据。缓冲区有数据则一直为1,则一直触发。
边缘触发发:0为无数据,1为有数据,只要在0变到1的上升沿才触发。
JDK并没有实现边缘触发,Netty重新实现了epoll机制,采用边缘触发方式;另外像Nginx也采用边缘触发。
JDK在Linux已经默认使用epoll方式,但是JDK的epoll采用的是水平触发,而Netty重新实现了epoll机制,采用边缘触发方式,netty epoll transport 暴露了更多的nio没有的配置参数,如 TCP_CORK, SO_REUSEADDR等等;另外像Nginx也采用边缘触发。
1、对于非阻塞socket,如果epoll使用边缘模式检测事件可读,那么一旦触发,一定要一次性把socket上数据收取干净,即循环调用recv函数直到recv出错
bool recvEtMode()
{
// 每次只收取256个字节
char buf[256];
while (true) {
int nRecv = ::recv(clientfd, buf, 256, 0);
if (nRecv == -1) {
if (errno == EWOULDBLOCK) {
return true;
} else if (errno == EINTR) {
continue;
} else {
return false;
}
}
else if (nRecv == 0) {
// 对端关闭了socket
return false;
} else {
inputBuffer.add(buf, (size_t)nRecv);
}
}
return true;
}
2、如果是水平模式,可以根据业务一次性收取固定字节数
下面总结一下两者在编码上需要注意的地方:
1、LT模式下,读事件触发后可以按需收取想要的字节数,不用把本次数据收取干净;
ET模式下,读事件必须把数据收取干净,因为我们不一定再有机会收取数据了。
2、LT模式下,不需要写事件时一定要及时移除,避免不必要地触发且浪费CPU资源。
ET模式下,写事件触发后,如果还需要下一次的写事件触发来驱动任务(例如发送上次剩余的数据),则我们需要继续注册一次检测可写事件
3、LT会导致多次触发,ET优点是触发次数少
epoll代码运行实例
/*************************************************************************
# > File Name:server.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Tue 08 Nov 2022 02:07:34 PM CST
************************************************************************/
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <iostream>
using namespace std;
typedef struct socketinfo
{
int fd; //要操作的文件描述符
int epfd; //红黑树实例
} SocketInfo;
void *acceptConn(void *arg)
{
//printf("acception tid: %ld\n", pthread_self());
cout<<"acception tid: "<<pthread_self()<<endl;
SocketInfo *info = (SocketInfo *)arg;
cout<<"acceptConn 1111111"<<endl;
// 建立新的连接
int cfd = accept(info->fd, NULL, NULL);
cout<<"#########acceptConn cfd : "<<cfd<<endl;
// 将文件描述符设置为非阻塞
// 得到文件描述符的属性
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
// 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
// 通信的文件描述符检测读缓冲区数据的时候设置为边沿模式
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 读缓冲区是否有数据
ev.data.fd = cfd;
int ret = epoll_ctl(info->epfd, EPOLL_CTL_ADD, cfd, &ev);
if (ret == -1)
{
perror("epoll_ctl-accept");
exit(0);
}
free(info);
return NULL;
}
void *communication(void *arg)
{
printf("communication tid: %ld\n", pthread_self());
SocketInfo *info = (SocketInfo *)arg;
int curfd = info->fd;
int epfd = info->epfd;
// 处理通信的文件描述符
// 接收数据
char buf[5];
char temp[1024];
memset(buf, 0, sizeof(buf));
bzero(temp, sizeof(temp));
// 循环读数据
while (1)
{
int len = recv(curfd, buf, sizeof(buf), 0);
if (len == 0)
{
// 非阻塞模式下和阻塞模式是一样的 => 判断对方是否断开连接
printf("客户端断开了连接...\n");
// 将这个文件描述符从epoll模型中删除
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
close(curfd);
break;
}
else if (len > 0)
{
// 通信
// 接收的数据打印到终端
for (int i = 0; i < len; i++)
{
buf[i] = toupper(buf[i]);
}
strncat(temp + strlen(temp), buf, len);
write(STDOUT_FILENO, buf, len);
// 发送数据
// send(curfd, buf, len, 0);
}
else
{
// len == -1
if (errno == EAGAIN)
{
printf("数据读完了...\n");
//发送数据
send(curfd, temp, strlen(temp) + 1, 0);
break;
}
else
{
perror("recv error");
break;
// exit(0); //不能exit因为会结束整个程序
}
}
}
free(info);
return NULL;
}
// server
int main(int argc, const char *argv[])
{
//printf("begin\n");
std::cout<<"begin"<<std::endl;
// 创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
printf("create socket = %d\n", lfd);
if (lfd == -1)
{
perror("socket error");
exit(1);
}
// 绑定
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(9527);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 本地多有的IP
// 127.0.0.1
// inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr.s_addr);
// 设置端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 绑定端口
int ret = bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
if (ret == -1)
{
perror("bind error");
exit(1);
}
// 监听
ret = listen(lfd, 64);
if (ret == -1)
{
perror("listen error");
exit(1);
}
printf("已完成初始化\n");
// 现在只有监听的文件描述符
// 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的epoll
// 创建一个epoll模型
int epfd = epoll_create(100);
if (epfd == -1)
{
perror("epoll_create");
exit(0);
}
// 往epoll实例中添加需要检测的节点, 现在只有监听的文件描述符
struct epoll_event ev;
ev.events = EPOLLIN; // 检测lfd读读缓冲区是否有数据
ev.data.fd = lfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
if (ret == -1)
{
perror("epoll_ctl");
exit(0);
}
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(struct epoll_event);
// 持续检测
while (1)
{
// 调用一次, 检测一次
std::cout<<" epoll_wait "<<std::endl;
int num = epoll_wait(epfd, evs, size, -1);
//printf("==== num: %d\n", num);
std::cout<<"==== num: "<<num<<std::endl;
pthread_t tid;
for (int i = 0; i < num; ++i)
{
// 取出当前的文件描述符
int curfd = evs[i].data.fd;
SocketInfo *info = (SocketInfo *)malloc(sizeof(SocketInfo));
info->fd = curfd;
info->epfd = epfd;
// 判断这个文件描述符是不是用于监听的
printf("curfd = %d, lfd=%d\n", curfd, lfd);
if (curfd == lfd)
{
pthread_create(&tid, NULL, acceptConn, (void *)info);
pthread_detach(tid);
}
else
{
pthread_create(&tid, NULL, communication, (void *)info);
pthread_detach(tid);
}
}
}
return 0;
}
客户端代码
#include <QCoreApplication>
//int main(int argc, char *argv[])
//{
// QCoreApplication a(argc, argv);
// return a.exec();
//}
/*************************************************************************
# > File Name:client.c
# > Author: Jay
# > Mail: billysturate@gmail.com
# > Created Time: Tue 08 Nov 2022 03:10:51 PM CST
************************************************************************/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define MAXLINE 80
#define SERV_PORT 9527
int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0)
{
perror("create failed");
exit(1);
}
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);
int i = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
if (i < 0)
{
perror("connect failed");
exit(1);
}
int num = 0;
printf("服务器连接成功\n");
while (1)
{
sprintf(buf, "hello, world, %d\n", num++);
printf("send msg: %s\n", buf);
write(sockfd, buf, strlen(buf) + 1);
recv(sockfd, buf, sizeof(buf), 0);
printf("recv msg:%s\n", buf);
usleep(10000);
}
recv(sockfd, buf, sizeof(buf), 0);
printf("-------recv msg:%s\n", buf);
printf("-------over-----------\n");
close(sockfd);
return 0;
}
epoll使用步骤介绍
1、创建侦听socket
2、设置socket属性(可选)
3、调用bind listen接口
4、通过epoll_create接口创建epoll实例
5、调用epoll_ctl接口注册侦听socket,注册可读事件
6、在通过accept接口创建子socket后,再次通过epoll_ctl注册子socket的可读事件
7、epoll_wait收到可读可写事件后,进行根据输出参数中的fd值或者事件类型进行区别操作,这里读写操作可以分开。
上面的的例子都是注册的可读事件,下面给出一个即注册可读事件,又注册可写事件的例子:
/**********************server***********************/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <sys/syscall.h>
#define SERVER_CONSUMER "./server_consumer"
#define CLIENT_SERVER "./client_server"
#define N_THREAD 3 //线程数
#define DATA_SIZE 5 //数据大小
#define BUFFER_SIZE 20 //缓冲区大小
int fd[N_THREAD]; //server-consumer-pipe描述符
//定义循环队列缓冲区
typedef struct Queue{
int rear;
int front;
int elem[BUFFER_SIZE];
}Queue;
void initQueue(Queue* q)
{
memset(q, 0, sizeof(Queue));
}
int isEmpty(Queue* q)
{
return q->rear == q->front;
}
int isFull(Queue* q)
{
return (q->rear + 1) % BUFFER_SIZE == q->front;
}
int push(Queue* q, int data)
{
if(isFull(q))
return 0;
q->elem[q->rear] = data;
q->rear = (q->rear + 1) % BUFFER_SIZE;
return 1;
}
int pop(Queue* q, int* data)
{
if(isEmpty(q))
return 0;
*data = q->elem[q->front];
q->front = (q->front + 1) % BUFFER_SIZE;
return 1;
}
//创建消费者任务,其消费者通过id区分
void* consumer(void *arg)
{
int id = *((int*)arg);
char buf[DATA_SIZE] = {0};
while(1){
memset(buf, 0, sizeof(buf));
read(fd[id], buf, sizeof(buf));
sleep(rand() % 3 + 1);
int data;
sscanf(buf, "%d", &data);
printf("id:%d data:%d\n", id, data);
}
return nullptr;
}
int epollserver()
{
//初始化环形队列
Queue buffer;
initQueue(&buffer);
//创建并打开server-consumer-pipe
for(int i = 0; i < N_THREAD; ++i){
char path[128] = {0};
sprintf(path, "%s%d", SERVER_CONSUMER, i);
mkfifo(path, 0666);
fd[i] = open(path, O_RDWR);
}
//打开client-server-pipe(由client创建)
int cs = open(CLIENT_SERVER, O_RDONLY);
//创建N个消费者子线程
pthread_t tid[N_THREAD];
int id[N_THREAD]; //线程标识
for(int i = 0; i < N_THREAD; ++i){
id[i] = i;
pthread_create(&tid[i], NULL, consumer, id + i);
}
//创建epoll实例
int epfd = epoll_create(N_THREAD + 1);
struct epoll_event event[N_THREAD + 1];
for(int i = 0; i < N_THREAD; ++i){
event[i].data.fd = fd[i];
event[i].events = EPOLLOUT; //监听写事件
epoll_ctl(epfd, EPOLL_CTL_ADD, fd[i], event + i);
}
event[N_THREAD].data.fd = cs;
event[N_THREAD].events = EPOLLIN; //监听读事件
epoll_ctl(epfd, EPOLL_CTL_ADD, cs, event + N_THREAD);
//监听epoll,等待事件可读可写的事件返回
struct epoll_event wait_event[N_THREAD + 1];
while(1){
int n = epoll_wait(epfd, wait_event, N_THREAD + 1, -1);
char buf[DATA_SIZE] = {0};
for(int i = 0; i < n; ++i){
if(wait_event[i].data.fd == cs){
memset(buf, 0, sizeof(buf));
read(cs, buf, sizeof(buf));
if(!isFull(&buffer)){
int data;
sscanf(buf, "%d", &data);
push(&buffer, data);
}
}
else{
if(!isEmpty(&buffer)){
int data;
pop(&buffer, &data);
memset(buf, 0, sizeof(buf));
sprintf(buf, "%d", data);
write(wait_event[i].data.fd, buf, sizeof(buf));
}
}
}
}
//等待线程退出
for(int i = 0; i < N_THREAD; ++i){
pthread_join(tid[i], NULL);
}
//关闭文件句柄
for(int i = 0; i < N_THREAD; ++i){
close(fd[i]);
}
return 0;
}
客户端例子:
/**********************client***********************/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <sys/syscall.h>
#define PRODUCER_CLIENT "./producer_client"
#define CLIENT_SERVER "./client_server"
#define N_THREAD 3 //线程数
#define DATA_SIZE 5 //数据大小
#define BUFFER_SIZE 20//缓冲区大小
int cp[N_THREAD]; //client-producer-pipe描述符
//定义缓冲区
typedef struct Queue{
int rear;
int front;
int elem[BUFFER_SIZE];
}Queue;
void* producer(void *arg)
{
int id = *((int*)arg);
char buf[DATA_SIZE] = {0};
while(1){
sleep(rand() % 3 + 1);
int data = rand() % 1000;
sprintf(buf, "%d", data);
write(cp[id], buf, sizeof(buf));
printf("id:%d data:%d\n", id, data);
memset(buf, 0, sizeof(buf));
}
}
int isEmpty(Queue* q)
{
return q->rear == q->front;
}
int isFull(Queue* q)
{
return (q->rear + 1) % BUFFER_SIZE == q->front;
}
void initQueue(Queue* q)
{
memset(q, 0, sizeof(Queue));
}
int push(Queue* q, int data)
{
if(isFull(q))
return 0;
q->elem[q->rear] = data;
q->rear = (q->rear + 1) % BUFFER_SIZE;
return 1;
}
int pop(Queue* q, int* data)
{
if(isEmpty(q))
return 0;
*data = q->elem[q->front];
q->front = (q->front + 1) % BUFFER_SIZE;
return 1;
}
int epoolclient(int argc, char *argv[])
{
Queue buffer;
initQueue(&buffer);
//创建并打开client-producer-pipe
for(int i = 0; i < N_THREAD; ++i){
char path[128] = {0};
sprintf(path, "%s%d", PRODUCER_CLIENT, i);
mkfifo(path, 0666);
cp[i] = open(path, O_RDWR);
}
//创建并打开client-server-pipe
mkfifo(CLIENT_SERVER, 0666);
int cs = open(CLIENT_SERVER, O_WRONLY);
//创建生产者子线程
pthread_t tid[N_THREAD];
int id[N_THREAD]; //线程标识
for(int i = 0; i < N_THREAD; ++i){
id[i] = i;
pthread_create(&tid[i], NULL, producer, id + i);
}
//创建epoll
int epfd = epoll_create(N_THREAD + 1);
struct epoll_event event[N_THREAD + 1];
for(int i = 0; i < N_THREAD; ++i){
event[i].data.fd = cp[i];
event[i].events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, cp[i], event + i);
}
event[N_THREAD].data.fd = cs;
event[N_THREAD].events = EPOLLOUT;
epoll_ctl(epfd, EPOLL_CTL_ADD, cs, event + N_THREAD);
//监听epoll
struct epoll_event wait_event[N_THREAD + 1];
char buf[DATA_SIZE] = {0};
while(1){
int n = epoll_wait(epfd, wait_event, N_THREAD + 1, -1);
for(int i = 0; i < n; ++i){
if(wait_event[i].data.fd == cs){
if(!isEmpty(&buffer)){
int data;
pop(&buffer, &data);
memset(buf, 0, sizeof(buf));
sprintf(buf, "%d", data);
write(cs, buf, sizeof(buf));
}
}
else{
memset(buf, 0, sizeof(buf));
read(wait_event[i].data.fd, buf, sizeof(buf));
if(!isFull(&buffer)){
int data;
sscanf(buf, "%d", &data);
push(&buffer, data);
}
}
}
}
for(int i = 0; i < N_THREAD; ++i){
pthread_join(tid[i], NULL);
}
for(int i = 0; i < N_THREAD; ++i){
close(cp[i]);
}
close(cs);
return 0;
}
epoll与设计模式的关系
待补充
参考链接
epoll详解文章来源:https://www.toymoban.com/news/detail-728154.html
不同的IO多路复用具体实现文章来源地址https://www.toymoban.com/news/detail-728154.html
到了这里,关于Linux之epoll理解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!