前面我们学习的非阻塞IO,虽然能够在数据不就绪的时候处理其他事情,但是还是有一些不方便,而且每次都要为了一个文件描述符而进行等待,所以为了提高IO效率我们还要学习IO多路转接技术。
一、I/O多路转接之select
1、select函数
select
是系统提供的一个多路转接接口。
函数原型:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
功能:
-
select
系统调用可以让我们的程序同时监视多个文件描述符的上的事件是否就绪。 -
select
的核心工作就是等,当监视的多个文件描述符中有一个或多个事件就绪时,select
才会成功返回并将对应文件描述符的就绪事件告知给调用者。
参数说明:
-
nfds
:需要监视的文件描述符中,最大的文件描述符的值+1,例如一个进程打开了0, 1, 2 ,3
四个文件描述符,我们想要对这四个文件描述符都进行监控,我们就需要填写4
。 -
readfds
:输入输出型参数,其类型fd_set
是一个位图结构。- 调用时用户通过设置对应的比特位为1,告知内核需要监视哪些文件描述符的读事件是否就绪,
- 返回时内核通过设置对应的比特位为1,告知用户哪些文件描述符的读事件已经就绪。
-
writefds
:输入输出型参数,其类型fd_set
是一个位图结构。- 调用时用户通过设置对应的比特位为1,告知内核需要监视哪些文件描述符的写事件是否就绪。
- 返回时内核通过设置对应的比特位为1,告知用户哪些文件描述符的写事件已经就绪。
-
exceptfds
:输入输出型参数,其类型fd_set
是一个位图结构。- 调用时用户通过设置对应的比特位为1,告知内核需要监视哪些文件描述符的异常事件是否就绪。
- 返回时内核通过设置对应的比特位为1,告知用户哪些文件描述符的异常事件已经就绪。
-
timeout
:输入输出型参数- 调用时由用户设置
select
函数在其内部的等待数据的时间。 - 返回时表示
timeout
的剩余时间。
- 调用时由用户设置
参数timeout
的值:
-
NULL/nullptr
:select
调用后进行阻塞等待,直到被监视的某个文件描述符上的某个事件就绪才返回。 -
0
:selec
调用后进行非阻塞等待,无论被监视的文件描述符上的事件是否就绪,select
检测后都会立即返回。 -
特定的时间值:
select
调用后在指定的时间内进行阻塞等待,如果被监视的文件描述符上一直没有事件就绪,则在该时间后select
进行超时返回。
timeval
结构体的定义:
返回值说明:
- 如果函数调用成功,则返回有事件就绪的文件描述符个数。
- 如果
timeout
时间耗尽,则返回0。 - 如果函数调用失败,则返回
-1
,同时错误码会被设置。
select
调用失败时,错误码可能被设置为:
-
EBADF
:文件描述符为无效的或该文件已关闭。 -
EINTR
:此调用被信号所中断。 -
EINVAL
:参数不合法,例如nfds为负值。 -
ENOMEM
:核心内存不足。
2、fd_set的相关内容
fd_set
本质也是一个位图,用位图中对应的位来表示要监视的文件描述符fd_set
结构如下:
操作fd_set的相关接口
我们要调用select
函数之前就需要先用fd_set
结构定义出对应的文件描述符集,然后将需要监视的文件描述符添加到文件描述符集当中,这个添加的过程本质就是在进行位操作,但是这个位操作不需要用户进行,系统提供了一组专门的宏接口,用于对fd_set
类型的位图进行各种操作。
void FD_CLR(int fd, fd_set *set); //用来清除文件描述符集set中相关fd的位
int FD_ISSET(int fd, fd_set *set); //用来测试文件描述符集set中相关fd的位是否为真
void FD_SET(int fd, fd_set *set); //用来设置文件描述符集set中相关fd的位
void FD_ZERO(fd_set *set); //用来清除文件描述符集set的全部位
fd_set的大小
我们可以使用下面的代码进行测试fd_set
的大小:
#include <iostream>
#include <sys/select.h>
int main()
{
std::cout << sizeof(fd_set) << std::endl;
return 0;
}
运行结果:
这个位图的上限是
128
∗
8
=
1024
128 * 8 = 1024
128∗8=1024,也就是我们的select
系统调用最多同时等待1024
个文件描述符。
而在Linux下我们可以使用ulimit -a
命令查看我们一个进程最多打开的文件描述符的个数:
显然select
并不能够对一个进程的所有文件描述符都进行等待。
3、如何在代码中高效的使用select函数
在了解如何在代码中高效的使用select
函数之前,我们先来看一看select
的基本使用过程。
- 创建
fd_set
set变量,并使用FD_ZERO()
函数进行初始化,则set用bit位表示是0000,0000。 - 若fd= 5,执行
FD_SET(fd,&set)
后set变为0001,0000
(第5位置为1)。 - 若再加入fd= 2, fd=1,则set变为
0001,0011
。 - 执行
select(6,&set,nullptr,nullptr,nullptr)
阻塞等待。 - 若fd=1,fd=2上都发生可读事件,则
select
返回,此时set变为0000,0011
。注意:没有事件发生的fd=5被清空了。
所以我们会发现select
每次调用都需要我们重新设置要关心的文件描述符,因此select
服务器,使用的时候,需要程序员自己维护一个第三方数组,来进行对已经获得的sock
进行管理!
select使用的基本工作流程
如果我们要实现一个简单的select服务器,该服务器要做的就是读取客户端发来的数据进行回发并打印,那么这个select
服务器的工作流程应该是这样的:
-
先初始化服务器,完成套接字的创建、绑定和监听。
-
定义一个fd_array数组用于保存监听套接字和已经与客户端建立连接的套接字。
-
然后服务器开始循环调用select函数,检测读事件是否就绪,如果就绪则执行对应的操作。
-
每次调用select函数之前,都需要定义一个读文件描述符集readfds,并将fd_array当中的文件描述符依次设置进readfds当中,表示让select帮我们监视这些文件描述符的读事件是否就绪。
-
当select检测到数据就绪时会将读事件就绪的文件描述符设置进readfds当中,此时我们就能够得知哪些文件描述符的读事件就绪了,并对这些文件描述符进行对应的操作。
-
如果读事件就绪的是监听套接字,则调用
accept
函数从底层全连接队列获取已经建立好的连接,并将该连接对应的套接字继续添加到fd_array
数组当中。 -
如果读事件就绪的是与客户端建立连接的套接字,则调用read函数读取客户端发来的数据并进行打印输出。
-
当然,服务器与客户端建立连接的套接字读事件就绪,也可能是因为客户端将连接关闭了,此时服务器应该调用
close
关闭该套接字,并将该套接字从fd_array数组当中清除,因为下一次不需要再监视该文件描述符的读事件了。
说明一下:
-
因为传入select函数的readfds、writefds和exceptfds都是输入输出型参数,当select函数返回时这些参数当中的值已经被修改了,因此每次调用select函数时都需要对其进行重新设置,timeout也是类似的道理。
-
因为每次调用select函数之前都需要对readfds进行重新设置,所以需要定义一个fd_array数组保存与客户端已经建立的若干连接和监听套接字,实际fd_array数组当中的文件描述符就是需要让select监视读事件的文件描述符。
-
我们的select服务器只是读取客户端发来的数据,因此只需要让select帮我们监视特定文件描述符的读事件,如果要同时让select帮我们监视特定文件描述符的读事件和写事件,则需要分别定义readfds和writefds,并定义两个数组分别保存需要被监视读事件和写事件的文件描述符,便于每次调用select函数前对readfds和writefds进行重新设置。
-
服务器刚开始运行时,fd_array数组当中只有监听套接字,因此select第一次调用时只需要监视监听套接字的读事件是否就绪,但每次调用accept获取到新连接后,都会将新连接对应的套接字添加到fd_array当中,因此后续select调用时就需要监视监听套接字和若干连接套接字的读事件是否就绪。
-
由于调用select时还需要传入被监视的文件描述符中最大文件描述符值+1,因此每次在遍历fd_array对readfds进行重新设置时,还需要记录最大文件描述符值。
这其中还有很多细节,让我们边写代码边讲解其中的细节。
4、select服务器
首先我们要对socket
的相关接口进行一下封装,以便于我们后续更好的使用,我们封装的Sock
既能够用于服务器,也能够运用于客户端,当然其中也涉及到了其他的文件和函数,err.h
是错误码文件,logMessage
是一个日志打印函数。
// Sock.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <cerrno>
#include <cstdlib>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include "log.hpp"
#include "err.h"
class Sock
{
public:
Sock()
:_sock(-1)
{}
void Socket()
{
_sock = socket(AF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
logMessage(Fatal, "socket fail: %s", strerror(errno));
exit(SOCKET_ERR);
}
// 设置地址复用
int opt = 1;
if (setsockopt(_sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
logMessage(Fatal, "setsocket fail: %s", strerror(errno));
exit(SETSOCK_ERR);
}
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
socklen_t len = sizeof(local);
memset(&local, 0, len);
local.sin_family = AF_INET;
local.sin_addr.s_addr = INADDR_ANY;
local.sin_port = htons(port);
if (bind(_sock, (struct sockaddr*)&local, len) < 0)
{
logMessage(Fatal, "bind fail : %s", strerror(errno));
exit(BIND_ERR);
}
}
void Listen(int backlog = 32)
{
if (listen(_sock, backlog) < 0)
{
logMessage(Fatal, "listen fail : %s", strerror(errno));
exit(LISTEN_ERR);
}
}
int Accept(std::string* client_ip, uint16_t* client_port)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
memset(&client, 0, len);
int sockfd = accept(_sock, (struct sockaddr*)&client, &len);
if (sockfd < 0)
{
logMessage(Warning, "accept fail : %s", strerror(errno));
}
else
{
// 提取客户端的相关信息
char buf[len];
inet_ntop(AF_INET, &client.sin_addr, buf, len);
*client_ip = buf;
*client_port = ntohs(client.sin_port);
}
// 不保证是正确的通信套接字,由外部自己判断
return sockfd;
}
int Connect(const std::string& server_ip, uint16_t server_port)
{
struct sockaddr_in server;
socklen_t len = sizeof(server);
memset(&server, 0, len);
server.sin_family = AF_INET;
inet_pton(AF_INET, server_ip.c_str(), &server.sin_addr);
server.sin_port = htons(server_port);
// 不保证是正确的通信套接字,由外部自己判断
return connect(_sock, (struct sockaddr*)&server, len);
}
int Fd()
{
return _sock;
}
void Close()
{
if (_sock >= 0)
{
close(_sock);
_sock = -1;
}
}
~Sock()
{
Close();
}
private:
int _sock; // 套接字,对于服务端来说是监听套接字,对于客户端来说是通信套接字
};
// log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
// 日志等级
enum { Debuge = 0, Info, Warning, Error, Fatal, Unkonw };
static std::string toLevelString(int level)
{
switch (level)
{
case Debuge:
return "Debuge";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unkonw";
}
}
static std::string getTime()
{
char buf[128];
time_t timep = time(nullptr);
struct tm stdtm;
localtime_r(&timep, &stdtm);
snprintf(buf, sizeof(buf), "%d-%d-%d %d:%d:%d", stdtm.tm_year + 1900, stdtm.tm_mon + 1, stdtm.tm_mday,
stdtm.tm_hour, stdtm.tm_min, stdtm.tm_sec);
return buf;
}
// 日志打印函数
// 日志格式: [等级] [时间] [进程id] :消息体
void logMessage(int level, const char* format, ...)
{
// 1.形成左边的固定格式
char logLeft[1024];
char logRight[1024];
std::string logLevel = toLevelString(level);
std::string curTime = getTime();
snprintf(logLeft, sizeof(logLeft), "[%s] [%s] [%d] : ", logLevel.c_str(), curTime.c_str(), getpid());
// 2.形成右边的消息体格式
va_list ap;
va_start(ap, format);
vsnprintf(logRight, sizeof(logRight), format, ap);
va_end(ap);
// 3.进行拼接,形成完整的日志 (此处可以根据需要重定向到文件中)
printf("%s%s\n", logLeft, logRight);
}
// err.h
#pragma once
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
SETSOCK_ERR,
BIND_ERR,
LISTEN_ERR,
ACCEPT_ERR,
CONNECT_ERR
};
初始化
- 首先对于服务器,我们需要先进行创建,绑定,监听。
- 然后需要先把数组中所有的位置初始化为无效,并将监听套接字添加到该数组当中,fd_array数组当中保存的就是需要被select监视读事件是否就绪的文件描述符。
#pragma once
#include <iostream>
#include <string>
#include <sys/select.h>
#include <cerrno>
#include <cstring>
#include "log.hpp"
#include "Sock.hpp"
// 设置默认端口号
static const uint16_t default_port = 8080;
// 设置默认无效的文件描述符
static const int default_fd = -1;
class SelectServer
{
using type_t = int;
public:
SelectServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 1.先把数组中所有的位置初始化为无效
for (size_t i = 0; i < _N; i++)
{
_fd_array[i] = default_fd;
}
// 2.把_listen_fd监听套接字设置进_fd_array
_fd_array[0] = _listen_fd.Fd();
}
~SelectServer()
{
_listen_fd.Close();
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
static const int _N = sizeof(fd_set) * 8; // fd_array内元素的个数
type_t _fd_array[_N]; // 文件描述符管理数组
};
运行服务器
-
select服务器运行起来以后就会就不断调用select函数监视读事件是否就绪,每次调用select函数之前都需要重新设置readfds,所以我们应该在每一次调用之前都要先将readfds,和nfds先设置好。
-
具体设置过程就是遍历fd_array数组,将fd_array数组当中的合法文件描述符添加到readfds当中,并同时记录最大的文件描述符值nfds,因为后续调用select函数时需要将nfds作为第一个参数传入。
-
当select函数返回后,如果返回值为0,则说明timeout时间耗尽,此时直接准备进行下一次select调用即可。
-
如果select的返回值为-1,则说明select调用失败,此时也让服务器准备进行下一次select调用,但实际应该进一步判断错误码,根据错误码来判断是否应该继续调用select函数。
-
如果select的返回值大于0,则说明select函数调用成功,此时已经有文件描述符的读事件就绪,接下来就应该对就绪事件进行处理。
class SelectServer
{
using type_t = int;
public:
SelectServer(uint16_t port = default_port)
:_port(port)
{
// ...
}
void Start()
{
while (true)
{
fd_set readfds;
FD_ZERO(&readfds);
// 令nfds等于第一个文件描述符
int nfds = _fd_array[0];
for (size_t i = 0; i < _N; i++)
{
// 设置有效的文件描述符到readfds中
if (_fd_array[i] != default_fd)
{
FD_SET(_fd_array[i], &readfds);
// 寻找最大的nfds
nfds = nfds > _fd_array[i] ? nfds : _fd_array[i];
}
}
// 实际的nfds要 + 1
nfds += 1;
// struct timeval timeout = { 2, 0 }; 为了方便演示,这里我们采用阻塞调用select
int n = select(nfds, &readfds, nullptr, nullptr, nullptr);
switch (n)
{
case -1:
// select出错
logMessage(Warning, "select fail : %s, errno code :%d", strerror(errno), errno);
break;
case 0:
// 没有读事件就绪
logMessage(Debuge, "select timeout : %s, errno code :%d", strerror(errno), errno);
break;
default:
logMessage(Debuge, "有%d个事件发生了!", n);
// 处理就绪的事件
HandleEvent(readfds);
break;
}
}
}
void HandleEvent(fd_set& readfds)
{
// 处理事件...
}
~SelectServer()
{
_listen_fd.Close();
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
static const int _N = sizeof(fd_set) * 8; // fd_array内元素的个数
type_t _fd_array[_N]; // 文件描述符管理数组
};
事件处理
当select检测到有文件描述符的读事件就绪并成功返回后,接下来就应该对就绪事件进行处理了,这里编写一个HandlerEvent函数,当读事件就绪后就调用该函数进行事件处理。
-
在进行事件处理时需要遍历fd_array数组当中的文件描述符,依次判断各个文件描述符对应的读事件是否就绪,如果就绪则需要进行事件处理。
-
当一个文件描述符的读事件就绪后,还需要进一步判断该文件描述符是否是监听套接字,如果是监听套接字的读事件就绪,那么就应该调用accept函数将底层的连接获取上来。但是光光调用accept将连接获取上来还不够,为了下一次调用select函数时能够让select帮我们监视新连接的读事件是否就绪,在连接获取上来后还应该将该连接对应的文件描述符添加到fd_array数组当中,这样在下一次调用select函数前对readfds重新设置时就能将该文件描述符添加进去了。
-
如果是与客户端建立的连接对应的读事件就绪,那么就应该调用recv函数读取客户端发来的数据,如果读取成功则将读到的数据在服务器端进行打印并转发给客户端。如果调用recv函数读取失败或者客户端关闭了连接,那么select服务器也应该调用close函数关闭对应的连接,但此时光光关闭连接也是不够的,还应该将该连接对应的文件描述符从fd_array数组当中清除,否则后续调用的select函数还会帮我们监视该连接的读事件是否就绪,但实际已经不需要了。
class SelectServer
{
using type_t = int;
public:
SelectServer(uint16_t port = default_port)
:_port(port)
{
// ...
}
void Start()
{
// ...
}
void HandleEvent(fd_set& readfds)
{
// 处理就绪事件
for (size_t i = 0; i < _N; i++)
{
if ((_fd_array[i] == _listen_fd.Fd()) && (FD_ISSET(_fd_array[i], &readfds)))
{
// 处理连接事件
Accepter();
}
else if(_fd_array[i] != _listen_fd.Fd() && (FD_ISSET(_fd_array[i], &readfds)))
{
// 处理读事件
ServiceIO(_fd_array[i]);
}
// 可以在HandleEvent里面添加 writefds参数,在这里继续判断处理写事件
// ...
}
// Print是打印_fd_array[]里面的内容,为了我们调试我们的代码。
Print();
}
void Accepter()
{
std::string client_ip;
uint16_t client_port;
int sockfd = _listen_fd.Accept(&client_ip, &client_port);
if (sockfd < 0) return;
// 将新的sockfd设置进_fd_array里面
size_t i = 1;
for (; i < _N; i++)
{
if (_fd_array[i] == default_fd)
{
break;
}
}
if (i >= _N)
{
//_fd_array满了
logMessage(Warning, "_fd_array is full!");
close(sockfd);
}
else
{
_fd_array[i] = sockfd;
}
}
void ServiceIO(int fd)
{
char buf[1024];
ssize_t n = recv(fd, buf, sizeof(buf), 0);
if (n <= 0)
{
if (errno == EINTR)
{
logMessage(Info, "ServiceIO's recv was interrupted by the signal ...");
return;
}
else if (n == 0)
{
logMessage(Info, "ServiceIO's recv reached the end of file, fd %d -> fd %d", fd, default_fd);
}
else
{
logMessage(Warning, "ServiceIO's recv fail: %s , errno code :%d", strerror(errno), errno);
}
// 关闭fd
close(fd);
// 将fd从_fd_array[]中移除
for (size_t i = 0; i < _N; i++)
{
if (_fd_array[i] == fd)
{
_fd_array[i] = default_fd;
break;
}
}
}
else
{
// 处理读到\r\n
buf[n - 2] = 0;
buf[n - 1] = 0;
std::cout << "client# " << buf << std::endl;
std::string respond = buf;
respond += "[Server echo]\n";
// 这里也要进行判断写事件是否就绪,我们先忽略
send(fd, respond.c_str(), respond.size(), 0);
}
}
void Print()
{
// 打印_fd_array[]
std::cout << "_fd_array[] : ";
for (int i = 0; i < _N; i++)
{
if (_fd_array[i] != default_fd)
{
std::cout << _fd_array[i] << " ";
}
}
std::cout << std::endl;
}
~SelectServer()
{
_listen_fd.Close();
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
static const int _N = sizeof(fd_set) * 8; // fd_array内元素的个数
type_t _fd_array[_N]; // 文件描述符管理数组
};
当前我们编写的select
服务器实际还存在一些问题,我们暂时先进行忽略(后面的poll
,epoll
也存在相同的问题):
-
服务器不能直接调用
send
函数,因为我们直接调用send函数时实际也分为“等”和“拷贝”两步,我们的发送缓冲区可能并没有足够的空间供我们进行写入,所以我们也应该将“等”的这个过程交给select函数,因此在每次调用select函数之前,除了需要重新设置readfds还需要重新设置writefds,因此我们还需要一个数组来保存需要被监视写事件是否就绪的文件描述符,当某一文件描述符的写事件就绪时我们才能够调用send函数向客户端发送数据。 -
没有定制协议。代码中读取数据时并没有按照某种规则进行读取,此时就可能造成粘包或者数据读取不全的问题,根本原因就是因为我们没有定制协议,比如HTTP协议规定在读取底层数据时读取到空行就表明读完了一个HTTP报头,此时再根据HTTP报头当中的Content-Length属性得知正文的长度,最终就能够读取到一个完整的HTTP报文,HTTP协议通过这种方式就避免了粘包和数据读取不全的问题。
select服务器测试
运行select
服务器时需要先实例化出一个SelectServer对象,然后让select服务器直接调用进行Start
函数后就可以运行服务器了。
#include <iostream>
#include <memory>
#include "SelectServer.hpp"
int main()
{
std::unique_ptr<SelectServer> up(new SelectServer);
up->Start();
return 0;
}
至此简单的select服务器代码已经编写完毕,我们先使用telnet工具进行测试一下,telnet工具连接我们的服务器以后,此时通过telnet向服务器发送的数据就能够被服务器读到并且打印输出了。
5、select的优缺点
select的优点:
- 可以同时等待多个文件描述符,并且只负责等待,实际的IO操作由accept、recv、send等接口来完成,这些接口在进行IO操作时不会被阻塞。
- select同时等待多个文件描述符,因此可以将“等”的时间重叠,提高了IO的效率。
当然,这也是所有多路转接接口的优点。
select的缺点:
- 每次调用select,都需要手动设置fd集合,从接口使用角度来说也非常不便。
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会变大。
- 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
- select可监控的文件描述符数量有限。
6、select的适用场景
多路转接接口select
、poll
和epoll
,需要在一定的场景下使用,如果场景选择的不适宜,可能会适得其反。
-
多路转接接口一般适用于多连接,且多连接中只有少部分连接比较活跃。因为少量连接比较活跃,也就意味着几乎所有的连接在进行IO操作时,都需要花费大量时间来等待事件就绪,此时使用多路转接接口就可以将这些等的事件进行重叠,提高IO效率。
-
对于多连接中大部分连接都很活跃的场景,其实并不适合使用多路转接。因为每个连接都很活跃,也就意味着任何时刻每个连接上的事件基本都是就绪的,此时根本不需要动用多路转接接口来帮我们进行等待,毕竟使用多路转接接口也是需要花费系统的时间和空间资源的。
多连接中只有少量连接是比较活跃的,比如聊天工具QQ,我们登录QQ后大部分时间其实是没有聊天的,此时服务器端不可能每一个用户都派发一个线程调用一个read函数阻塞等待读事件就绪。
多连接中大部分连接都很活跃,比如企业当中进行数据备份时,两台服务器之间不断在交互数据,这时的连接是特别活跃的,几乎不需要等的过程,也就没必要使用多路转接接口了。
二、I/O多路转接之poll
poll也是系统提供的一个多路转接接口。
poll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,和select的定位是一样的,适用场景也是一样的。
1、poll函数
函数原型:
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数说明:
-
fds
:一个poll函数监视的结构列表,每一个元素包含三部分内容:文件描述符、监视的事件集合、就绪的事件集合。 -
nfds
:表示fds数组的长度。 -
timeout
:表示poll函数的超时时间,单位是毫秒(ms)。
参数timeout
的取值:
- -1:poll调用后进行阻塞等待,直到被监视的某个文件描述符上的某个事件就绪。
- 0:poll调用后进行非阻塞等待,无论被监视的文件描述符上的事件是否就绪,poll检测后都会立即返回。
- 特定的时间值:poll调用后在指定的时间内进行阻塞等待,如果被监视的文件描述符上一直没有事件就绪,则在该时间后poll进行超时返回。
返回值说明:
- 如果函数调用成功,则返回有事件就绪的文件描述符个数。
- 如果timeout时间耗尽,则返回0。
- 如果函数调用失败,则返回-1,同时错误码会被设置。
poll调用失败时,错误码可能被设置为:
-
EFAULT
:fds数组不包含在调用程序的地址空间中。 -
EINTR
:此调用被信号所中断。 -
EINVAL
:nfds值超过RLIMIT_NOFILE
值。 -
ENOMEM
:核心内存不足。
2、struct pollfd结构
struct pollfd结构当中包含三个成员:
struct pollfd
{
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
-
fd
:特定的文件描述符,若设置为负值则忽略events
字段并且revents
字段返回0。 -
events
:需要监视该文件描述符上的哪些事件,由用户设置。 -
revents
:poll函数返回时告知用户该文件描述符上的哪些事件已经就绪,由内核进行设置。
events
和revents
的取值:
事件 | 描述 | 是否可作为输入 | 是否可作为输出 |
---|---|---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
这些取值实际都是以宏的方式进行定义的,它们的二进制序列当中有且只有一个比特位是1,且为1的比特位是各不相同的,我们实际使用时最常用的就是POLLIN
和POLLOUT
。
- 因此在调用poll函数之前,可以通过或(
|
)运算符将要监视的事件添加到events
成员当中。 - 在poll函数返回后,可以通过与(
&
)运算符检测revents
成员中是否包含特定事件,以得知对应文件描述符的特定事件是否就绪。
2、poll服务器
poll的工作流程和select是基本类似的,这里我们也实现一个简单poll服务器,该服务器也只是读取客户端发来的数据打印并进行转发。
初始化
-
首先对于服务器,我们需要先向操作系统申请一块内存用于定义一个
fds
数组,该数组当中的每个位置都是一个struct pollfd
结构,后续调用poll函数时会作为参数进行传入,然后进行创建,绑定,监听套接字。 -
最后需要把fds数组中所有的位置初始化为无效,并将监听套接字添加到该数组当中,表示服务器刚开始运行时只需要监视监听套接字的读事件。
#pragma once
#include <iostream>
#include <string>
#include <poll.h>
#include <cerrno>
#include <cstring>
#include "log.hpp"
#include "Sock.hpp"
// 设置默认端口号
static const uint16_t default_port = 8080;
// 设置默认无效的文件描述符
static const int default_fd = -1;
// 设置默认的无效事件
static const short default_event = 0;
class PollServer
{
public:
PollServer(uint16_t port = default_port, size_t num = 1024)
:_port(port), _num(num), _fd_array(new struct pollfd[_num])
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 1.先把数组中所有的位置初始化为无效
for (size_t i = 0; i < _num; i++)
{
_fd_array[i].fd = default_fd;
_fd_array[i].events = default_event;
}
// 2.把_listen_fd监听套接字设置进_fd_array,并设置关心事件为读事件
_fd_array[0].fd = _listen_fd.Fd();
_fd_array[0].events = POLLIN;
}
~PollServer()
{
_listen_fd.Close();
if (_fd_array)
{
delete[] _fd_array;
_fd_array = nullptr;
}
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
size_t _num; // _fd_array的长度
struct pollfd* _fd_array; // 指向了管理fd的数组
};
运行服务器
- poll服务器就不断调用poll函数监视读事件是否就绪。如果poll函数的返回值大于0,则说明poll函数调用成功,此时已经有文件描述符的读事件就绪,接下来就应该对就绪事件进行处理。
- 如果poll函数的返回值等于0,则说明timeout时间耗尽,此时直接准备进行下一次poll调用即可。
- 如果poll函数的返回值为-1,则说明poll调用失败,此时也让服务器准备进行下一次poll调用,但实际应该进一步判断错误码,根据错误码来判断是否应该继续调用poll函数。
可以看出这里我们使用poll时要比使用select时要简单的多!
class PollServer
{
public:
PollServer(uint16_t port = default_port, size_t num = 1024)
:_port(port), _num(num), _fd_array(new struct pollfd[_num])
{
// ....
}
void Start()
{
while (true)
{
// int timeout = 2000; 为了方便演示这里采用了阻塞调用
int n = poll(_fd_array, _num, -1);
switch (n)
{
case -1:
// poll出错
logMessage(Warning, "poll fail : %s, errno code :%d", strerror(errno), errno);
break;
case 0:
// 没有读事件就绪
logMessage(Debuge, "poll timeout : %s, errno code :%d", strerror(errno), errno);
break;
default:
logMessage(Debuge, "有%d个事件发生了!", n);
HandleEvent();
break;
}
}
}
void HandleEvent()
{
// 处理事件
}
~PollServer()
{
_listen_fd.Close();
if (_fd_array)
{
delete[] _fd_array;
_fd_array = nullptr;
}
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
size_t _num; // _fd_array的长度
struct pollfd* _fd_array; // 指向了管理fd的数组
};
事件处理
当poll检测到有文件描述符的读事件就绪,就会在其对应的struct pollfd
结构中的revents
成员中添加读事件并返回,接下来poll服务器就应该对就绪事件进行处理了,事件处理过程如下:
-
首先遍历fds数组中的每个struct pollfd结构,如果该结构当中的fd有效,且revents当中包含读事件,则说明该文件描述符的读事件就绪,接下来就需要进一步判断该文件描述符是监听套接字还是与客户端建立的套接字。
-
如果是监听套接字的读事件就绪,则调用accept函数将底层建立好的连接获取上来,并将获取到的套接字添加到fds数组当中,表示下一次调用poll函数时需要监视该套接字的读事件,如果数组满了我们可以进行动态扩容。
-
如果是与客户端建立的连接对应的读事件就绪,则调用recv函数读取客户端发来的数据,并将读取到的数据在服务器端进行打印并转发。
-
如果在调用recv函数时发现客户端将连接关闭或recv函数调用失败,则poll服务器也直接关闭对应的连接,并将该连接对应的文件描述符从fds数组当中清除,表示下一次调用poll函数时无需再监视该套接字的读事件。
class PollServer
{
public:
PollServer(uint16_t port = default_port, size_t num = 1024)
:_port(port), _num(num), _fd_array(new struct pollfd[_num])
{
// ...
}
void Start()
{
// ...
}
void HandleEvent()
{
// 处理就绪事件
for (size_t i = 0; i < _num; i++)
{
if (_fd_array[i].fd == default_fd)
{
continue;
}
// 处理读取事件
if (_fd_array[i].revents & POLLIN)
{
// 处理连接事件
if (_fd_array[i].fd == _listen_fd.Fd())
{
Accepter();
}
else
{
// 处理读事件
Recver(_fd_array[i].fd);
}
}
else if (_fd_array[i].revents & POLLOUT)
{
// 处理写事件
}
}
// Print是打印_fd_array[]里面的fd,为了我们调试我们的代码。
Print();
}
void Accepter()
{
std::string client_ip;
uint16_t client_port;
int sockfd = _listen_fd.Accept(&client_ip, &client_port);
if (sockfd < 0) return;
// 将新的sockfd设置进_fd_array里面
size_t i = 1;
for (; i < _num; i++)
{
if (_fd_array[i].fd == default_fd)
{
break;
}
}
if (i >= _num)
{
//_fd_array满了
size_t new_num = _num * 2;
logMessage(Info, "_fd_array is full!, _num:%d -> _num:%d", _num, new_num);
// 进行扩容
struct pollfd* pollfd_array = new struct pollfd[new_num];
for (size_t i = 0; i < new_num; i++)
{
pollfd_array[i].fd = default_fd;
pollfd_array[i].events = default_event;
}
memcpy(pollfd_array, _fd_array, sizeof(struct pollfd) * _num);
delete[] _fd_array;
_fd_array = pollfd_array;
_num = new_num;
}
_fd_array[i].fd = sockfd;
_fd_array[i].events = POLLIN;
//_fd_array[i].events = POLLIN | POLLOUT;
}
void Recver(int fd)
{
char buf[1024];
ssize_t n = recv(fd, buf, sizeof(buf), 0);
if (n <= 0)
{
if (errno == EINTR)
{
logMessage(Info, "Recver's recv was interrupted by the signal ...");
return;
}
else if (n == 0)
{
logMessage(Info, "Recver's recv reached the end of file, fd %d -> fd %d", fd, default_fd);
}
else
{
logMessage(Warning, "Recver's recv fail: %s , errno code :%d", strerror(errno), errno);
}
// 关闭fd
close(fd);
// 将fd从_fd_array[]中移除
for (size_t i = 0; i < _num; i++)
{
if (_fd_array[i].fd == fd)
{
_fd_array[i].fd = default_fd;
_fd_array[i].events = default_event;
break;
}
}
}
else
{
// 处理读到的\r\n
buf[n - 2] = 0;
buf[n - 1] = 0;
std::cout << "client# " << buf << std::endl;
std::string respond = buf;
respond += "[Server echo]\n";
// 这里也要进行判断写事件是否就绪
send(fd, respond.c_str(), respond.size(), 0);
}
}
void Print()
{
// 打印_fd_array[]
std::cout << "_fd_array[] : ";
for (int i = 0; i < _num; i++)
{
if (_fd_array[i].fd != default_fd)
{
std::cout << _fd_array[i].fd << " ";
}
}
std::cout << std::endl;
}
~PollServer()
{
_listen_fd.Close();
if (_fd_array)
{
delete[] _fd_array;
_fd_array = nullptr;
}
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
size_t _num; // _fd_array的长度
struct pollfd* _fd_array; // 指向了管理fd的数组
};
poll服务器测试
运行poll
服务器时需要先实例化出一个PollServer对象,然后让PollServer服务器直接调用进行Start
函数后就可以运行服务器了。
#include <iostream>
#include <memory>
#include "PollServer.hpp"
int main()
{
std::unique_ptr<PollServer> up(new PollServer);
up->Start();
return 0;
}
和select一样,我们也使用telnet工具进行测试一下,测试的流程一样
3、poll的优缺点
poll的优点:
-
struct pollfd
结构当中包含了events
和revents
,相当于将select
的输入输出型参数进行分离,因此在每次调用poll
之前,不需要像select
一样重新对参数进行设置。 - poll可监控的文件描述符数量没有限制。
- 当然,poll也可以同时等待多个文件描述符,能够提高IO的效率。
poll的缺点:
- 和select函数一样,当poll返回后,需要遍历fds数组来获取就绪的文件描述符。
- 每次调用poll,都需要把大量的
struct pollfd
结构从用户态拷贝到内核态,这个开销也会随着poll监视的文件描述符数目的增多而增大。 - 同时每次调用poll都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也变大。
三、I/O多路转接之epoll
epoll初识,epoll也是系统提供的一个多路转接接口。
- epoll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,与select和poll的定位是一样的,适用场景也相同。
- epoll在命名上比poll多了一个e,这个e可以理解成是extend,是为处理大批量句柄而作了改进的poll。
- epoll在2.5.44内核中被引进,它几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
1、epoll的相关系统调用
epoll有三个相关的系统调用,分别是epoll_create
、epoll_ctl
和epoll_wait
。
1、epoll_create函数
int epoll_create(int size);
功能:
-
epoll_create
函数用于创建一个epoll
模型。
参数说明:
-
size
:自从Linux2.6.8之后,size参数是被忽略的,但size的值必须设置为大于0的值。
返回值说明:
-
epoll
模型创建成功返回其对应的文件描述符,否则返回-1
,同时错误码会被设置。
注意: 当不再使用时,必须调用close函数关闭epoll模型对应的文件描述符,当所有引用epoll实例的文件描述符都已关闭时,内核将销毁该实例并释放相关资源。
2、epoll_ctl函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
功能:
-
epoll_ctl
函数用于向指定的epoll模型中注册事件。
参数说明:
-
epfd
:指定的epoll模型。 -
op
:表示具体的动作,用三个宏来表示。 -
fd
:需要监视的文件描述符。 -
event
:需要监视该文件描述符上的哪些事件。
第二个参数op的取值有以下三种:
宏名称 | 功能 |
---|---|
EPOLL_CTL_ADD | 注册新的文件描述符到指定的epoll模型中 |
EPOLL_CTL_MOD | 修改已经注册的文件描述符的监听事件 |
EPOLL_CTL_DEL | 从epoll模型中删除指定的文件描述符 |
返回值说明:
- 函数调用成功返回0,调用失败返回-1,同时错误码会被设置。
3、epoll_wait函数
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
功能:
-
epoll_wait
函数用于收集监视的事件中已经就绪的事件。
参数说明:
-
epfd
:指定的epoll模型。 -
events
:内核会将已经就绪的事件拷贝到events数组当中(events不能是空指针,内核只负责将就绪事件拷贝到该数组中,不会帮我们在用户态中分配内存)。 -
maxevents
:events数组中的元素个数。 -
timeout
:表示epoll_wait函数的超时时间,单位是毫秒(ms)。
参数timeout
的取值:
-
-1
:epoll_wait调用后进行阻塞等待,直到被监视的某个文件描述符上的某个事件就绪。 -
0
:epoll_wait调用后进行非阻塞等待,无论被监视的文件描述符上的事件是否就绪,epoll_wait检测后都会立即返回。 -
特定的时间值:epoll_wait调用后在直到的时间内进行阻塞等待,如果被监视的文件描述符上一直没有事件就绪,则在该时间后epoll_wait进行超时返回。
返回值说明:
- 如果函数调用成功,则返回有事件就绪的文件描述符个数。
- 如果
timeout
时间耗尽,则返回0。 - 如果函数调用失败,则返回
-1
,同时错误码会被设置。
epoll_wait
调用失败时,错误码可能被设置为:
-
EBADF
:传入的epoll模型对应的文件描述符无效。 -
EFAULT
:events指向的数组空间无法通过写入权限访问。 -
EINTR
:此调用被信号所中断。 -
EINVAL
:epfd不是一个epoll模型对应的文件描述符,或传入的maxevents值小于等于0。
2、struct epoll_event结构
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
结构中有两个成员:
- 第一个成员
events
表示的是需要监视的事件。- 当我们使用
epoll_ctl
函数时,通过这个字段告诉操作系统我们要关心那些事件。 - 当我们使用
epoll_wait
函数时,通过这个字段操作系统告诉我们关心的那些事件就绪了。
- 当我们使用
- 第二个成员
data
是一个联合体结构,里面记录的是我们用户的相关数据,当我们在epoll_ctrl
时进行设置,在epoll_wait
时就能够拿到这个数据,一般选择使用该结构当中的fd,这样在epoll_wait
时我们通过第一个字段能够拿到发生了什么事件,通过第二个字段能够拿到了发生的事件是那个文件描述符的。
events
的常用取值如下,这些取值实际也是以宏的方式进行定义的,它们的二进制序列当中有且只有一个比特位是1,且为1的比特位是各不相同的。
宏名称 | 宏功能 |
---|---|
EPOLLIN | 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) |
EPOLLOUT | 表示对应的文件描述符可以写 |
EPOLLPRI | 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) |
EPOLLERR | 表示对应的文件描述符发送错误 |
EPOLLHUP | 表示对应的文件描述符被挂断,即对端将文件描述符关闭了 |
EPOLLET | 将epoll的工作方式设置为边缘触发(Edge Triggered)模式 |
EPOLLONESHOT | 只监听一次事件,当监听完这次事件之后,如果还需要继续监听该文件描述符的话,需要重新将该文件描述符添加到epoll模型中 |
可以看出epoll
不仅能够监视正常的IO事件,也能够监听对应的文件描述符是否发生了错误!
3、epoll的工作原理
1、三大机制
- 红黑树
- 就绪队列
- 回调机制
当某一进程调用epoll_create
方法时,Linux内核会创建一个eventpoll
结构体,这个结构体又会在内核中创建一颗红黑树和一个就绪队列,这个结构体中有两个数据结构成员与epoll的使用方式密切相关。
struct eventpoll
{
// ...
// 红黑树的根节点,这棵树中存储着所有添加到epoll中的需要监视的事件
struct rb_root rbr;
// 就绪队列中则存放着将要通过epoll_wait返回给用户的满足条件的事件
struct list_head rdlist;
// ...
}
-
每一个epoll模型都有一个独立的
eventpoll
结构体,用于存放通过epoll_ctl
方法向epoll模型中添加进来的事件,这些事件都会被挂载在红黑树中,当我们想要增加一个事件,删除一个事件,修改(本质是查找)一个事件时,我们的时间复杂度为 O ( l o g n ) O (logn) O(logn),相比于线性遍历在数据量很大时,我们的增删改的效率依然很高。 -
所有添加到epoll中的事件都会与设备(例如:网卡)驱动程序建立回调关系,也就是说:当响应的事件发生时(比如数据就绪了),会调用这个回调方法,这个回调方法在内核中叫
ep_poll_callback
,它会将发生的事件添加到rdlist
双链表(也就是我们的就绪队列)中。 -
当调用
epoll_wait
检查是否有事件发生时,只需要检查eventpoll
对象中的rdlist
双链表中是否有元素即可,这个操作的时间复杂度是 O ( 1 ) O(1) O(1),如果rdlist
不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。
扩展知识
- 在epoll中,对于每一个事件,都会建立一个
epitem
结构体,红黑树和就绪队列当中的节点分别是基于epitem
结构中的rbn
成员和rdllink
成员的,epitem
结构当中的成员ffd
记录的是指定的文件描述符值,event
成员记录的就是该文件描述符对应的事件。
struct epitem
{
struct rb_node rbn; //红黑树节点
struct list_head rdllink; //双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
- 对于epitem结构当中rbn成员来说,ffd与event的含义是,需要监视ffd上的event事件是否就绪。
- 对于epitem结构当中的rdlink成员来说,ffd与event的含义是,ffd上的event事件已经就绪了。
2、 一些细节补充
- 红黑树:
- 红黑树是一种二叉搜索树,因此必须有键值key,而这里的文件描述符就天然的可以作为红黑树的key值。
- 调用
epoll_ctl
向红黑树当中新增节点时,如果设置了EPOLLONESHOT
选项,当监听完这次事件后,如果还需要继续监听该文件描述符则需要重新将其添加到epoll
模型中,本质就是当设置了EPOLLONESHOT
选项的事件就绪时,操作系统会自动将其从红黑树当中删除。 - 而如果调用epoll_ctl向红黑树当中新增节点时没有设置
EPOLLONESHOT
,那么该节点插入红黑树后就会一直存在,除非用户调用epoll_ctl
将该节点从红黑树当中删除。
-
回调机制
-
对于
select
和poll
来说,操作系统在监视多个文件描述符上的事件是否就绪时,需要让操作系统主动对这多个文件描述符进行轮询检测,这一定会增加操作系统的负担。 -
而对于
epoll
来说,操作系统不需要主动进行事件的检测,当红黑树中监视的事件就绪时,驱动程序会自动调用对应的回调方法,将红黑树中就绪的节点直接链入就绪队列(不需要进行拷贝,直接使用指针操作将节点插入链表中)。 -
采用回调机制最大的好处就是:不再需要操作系统主动对就绪事件进行检测了,当事件就绪时会自动调用对应的回调函数进行处理。
-
其他的细节
-
当不断有监视的事件就绪时,会不断调用回调方法向就绪队列当中插入节点,而上层也会不断调用
epoll_wait
函数从就绪队列当中获取节点,这是典型的生产者消费者模型。 -
由于就绪队列可能会被多个执行流同时访问,因此必须要使用互斥锁对其进行保护,
eventpoll
结构当中的lock
和mtx
就是用于保护临界资源的,因此epoll
本身是线程安全的。 -
eventpoll
结构当中的wq
(wait queue)就是等待队列,当多个执行流想要同时访问同一个epoll模型时,就需要在该等待队列下进行等待。 -
epoll_create
函数在调用完毕,如果成功了会返回一个文件描述符,这是因为Linux的设计理念是:一切皆文件,Linux将epoll
模型(红黑树,就绪队列,回调机制)也看成一个文件,将来我们的epoll
的相关接口一定是进程在进行调用,因此通过进程的task_struct
(PCB)结构一定能够找到一个文件管理结构file_struct
,在file_struct
结构中有一个数组,通过这个数组的下标(本质就是文件描述符)我们能够找到对应的file
结构体,因此通过这个file
结构体我们也就能够找到对应的红黑树和就绪队列了于是就可以对其进行增删改查了!
4、epoll服务器
初始化
- 首先对于服务器,我们需要先进行创建,绑定,监听套接字,然后使用
epoll_create
创建一个epoll
模型,然后使用epoll_ctl
将监听套接字添加到epoll
模型中。
为了简化我们的代码我们也对epoll
模型的系统调用进行一下简单的封装,其类名为Epoller
并放到另外一个头文件中。
// EpollServer.hpp
#pragma once
#include <iostream>
#include "Epoller.hpp"
#include "Sock.hpp"
#include "log.hpp"
#include "err.h"
//设置默认端口号
static const uint16_t default_port = 8080;
class EpollServer
{
public:
EpollServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 创建的epoll模型
_epoller.Create();
// 添加要关心的事件
_epoller.AddEvent(_listen_fd.Fd(), EPOLLIN);
}
~EpollServer()
{
_listen_fd.Close();
_epoller.Close();
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
Epoller _epoller; // epoll模型
static const size_t _num = 64; // _revs的长度
struct epoll_event _revs[_num]; // epoll_wait就绪事件的缓冲区
};
// Epoller.hpp
#pragma once
#include <iostream>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <sys/epoll.h>
#include <unistd.h>
#include "log.hpp"
#include "err.h"
// 默认无效的epfd
static const int default_epfd = -1;
class Epoller
{
public:
Epoller(int epfd = default_epfd)
:_epfd(epfd)
{}
void Create()
{
_epfd = epoll_create(1024);
if (_epfd < 0)
{
logMessage(Fatal, "epoll_create fail: %s, errno code %d", strerror(errno), errno);
exit(EPOLL_CREATE_ERR);
}
}
// 添加事件
bool AddEvent(int fd, uint32_t event)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
if (n < 0)
{
logMessage(Warning, "AddEvent's epoll_ctl fail: %s, errno code %d", strerror(errno), errno);
return false;
}
return true;
}
// 移除事件
bool DelEvent(int fd)
{
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
if (n < 0)
{
logMessage(Warning, "DelEvent's epoll_ctl fail: %s, errno code %d", strerror(errno), errno);
return false;
}
return true;
}
// 等待事件
int Wait(struct epoll_event* revs, int num, int timeout)
{
return epoll_wait(_epfd, revs, num, timeout);
}
void Close()
{
if (_epfd != default_epfd)
{
close(_epfd);
_epfd = default_epfd;
}
}
~Epoller()
{
Close();
}
private:
int _epfd; // epoll模型的fd
};
运行服务器
运行epoll服务器要做的事情和select
,poll
相似,就是不断调用epoll_wait
函数(对应我们封装的Wait
函数),从就绪队列当中获取就绪事件进行处理即可。
class EpollServer
{
public:
EpollServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
_epoller.Create();
bool is_sucess = _epoller.AddEvent(_listen_fd.Fd(), EPOLLIN);
assert(is_sucess);
}
void Start()
{
// 设置超时事件为2s,为了演示的方便这里我们使用阻塞的方式进行演示
// int timeout = 2000;
while (true)
{
int n = _epoller.Wait(_revs, _num, -1);
switch (n)
{
case -1:
logMessage(Warning, "epoll_wait fail : %s, errno code : %d", strerror(errno), errno);
break;
case 0:
logMessage(Info, "timeout...");
break;
default:
logMessage(Info, "有%d个事件就绪了!", n);
HandleEvent(n);
break;
}
}
}
void HandleEvent(int n)
{
// 处理事件
}
~EpollServer()
{
_listen_fd.Close();
_epoller.Close();
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
Epoller _epoller; // epoll模型
static const size_t _num = 64; // _revs的长度
struct epoll_event _revs[_num]; // epoll_wait就绪事件的缓冲区
};
事件处理
如果底层就绪队列当中有就绪事件,那么调用epoll_wait
函数时就会将底层就绪队列中的事件按照线性拷贝的方式拷贝到用户提供的_revs
数组当中,即_revs
输出数组中拷贝的内容,从左向后连续有效的!接下来epoll服务器就应该对就绪事件进行处理了,事件处理过程如下:
-
根据调用
epoll_wait
时得到的返回值,来判断操作系统向_revs
数组中拷贝了多少个struct epoll_event
结构,进而对这些文件描述符上的事件进行处理。 -
对于每一个拷贝上来的
struct epoll_event
结构,如果该结构当中的events
当中包含读事件,则说明该文件描述符对应的读事件就绪,但接下来还需要进一步判断该文件描述符是监听套接字还是与客户端建立的套接字。 -
如果是监听套接字的读事件就绪,则调用
accept
函数将底层建立好的连接获取上来,并调用AddEvent
函数将获取到的套接字添加到epoll模型当中,表示下一次调用Wait
函数时需要监视该套接字上的事件。 -
如果是与客户端建立的连接对应的读事件就绪,则调用
recv
函数读取客户端发来的数据,并将读取到的数据在服务器端进行打印并转发。 -
如果在调用
recv
函数时发现客户端将连接关闭或recv
函数调用失败,则epoll服务器也直接关闭对应的连接,并调用DelEvent
函数将该连接对应的文件描述符从epoll模型中删除,表示下一次调用Wait
函数时无需再监视该套接字上的事件。
class EpollServer
{
public:
EpollServer(uint16_t port = default_port)
:_port(port)
{
// ...
}
void Start()
{
// ...
}
void HandleEvent(int n)
{
for (int i = 0; i < n; i++)
{
uint32_t event = _revs[i].events;
int fd = _revs[i].data.fd;
// 处理读事件
if (event & EPOLLIN)
{
// 1.新的连接事件到来
if (fd == _listen_fd.Fd())
{
Accepter(fd);
}
else
{
// 2.新的读取事件到来
Recver(fd);
}
}
else if (event & EPOLLOUT)
{
// 处理写事件
}
else
{
// 处理其他事件,例如异常事件
}
}
}
void Accepter(int fd)
{
std::string client_ip;
uint16_t client_port;
int sockfd = _listen_fd.Accept(&client_ip, &client_port);
if (sockfd < 0)
{
logMessage(Warning, "accept fail : %s, errno code: %d", strerror(errno), errno);
return;
}
// 将新的连接添加到epoll模型中
_epoller.AddEvent(sockfd, EPOLLIN);
}
void Recver(int fd)
{
char buf[1024];
int n = recv(fd, buf, sizeof(buf), 0);
if (n <= 0)
{
if (errno == EINTR)
{
logMessage(Info, "Recver's recv was interrupted by the signal ... ");
return;
}
else if (n == 0)
{
logMessage(Info, "Recver's recv reached the end of file, fd %d will be remove", fd);
}
else
{
logMessage(Warning, "Recver's recv fail: %s, errno code %d", strerror(errno), errno);
}
// 在处理异常时要先将fd从epoll模型中移除,然后再关闭文件描述符
_epoller.DelEvent(fd);
close(fd);
}
else
{
// 处理\r\n
buf[n - 2] = 0;
buf[n - 1] = 0;
std::cout << "client: " << buf << std::endl;
std::string response(buf);
response += "[server echo]\n";
// 这里其实也应该先进行检查写事件是否就绪
send(fd, response.c_str(), response.size(), 0);
}
}
~EpollServer()
{
// ...
}
private:
Sock _listen_fd; // 监听套接字
uint16_t _port; // 端口号
Epoller _epoller; // epoll模型
static const size_t _num = 64; // _revs的长度
struct epoll_event _revs[_num]; // epoll_wait就绪事件的缓冲区
};
- 这里有一个注意点:在处理异常时要先将fd从epoll模型中移除,然后再关闭文件描述符,不然就有可能fd从epoll模型中移除时访问已经释放的资源。
epoll服务器测试
运行epoll服务器时需要先实例化出一个EpollServer对象,然后让epoll服务器直接调用进行Start
函数后就可以运行服务器了。
#include <iostream>
#include <memory>
#include "EpollServer.hpp"
int main()
{
std::unique_ptr<EpollServer> up(new EpollServer);
up->Start();
return 0;
}
5、epoll的优点
-
接口使用方便:虽然拆分成了三个函数,但是反而使用起来更方便高效。
-
数据拷贝轻量:只在新增监视事件的时候调用
epoll_ctl
将数据从用户拷贝到内核,而select
和poll
每次都需要重新将需要监视的事件从用户拷贝到内核。此外,调用epoll_wait
获取就绪事件时,只会拷贝就绪的事件,不会进行不必要的拷贝操作。 -
优秀的回调机制:避免操作系统主动轮询检测事件就绪,而是采用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中。
-
没有数量限制:监视的文件描述符数目无上限,只要内存允许,就可以一直向红黑树当中新增节点。
一个注意点:
有人说epoll中使用了内存映射机制,内核可以直接将底层就绪队列通过mmap的方式映射到用户态,此时用户就可以直接读取到内核中就绪队列当中的数据,避免了内存拷贝的额外性能开销。
-
这种说法是错误的,实际操作系统并没有做任何映射机制,因为操作系统是不相信任何人的,操作系统不会让用户进程直接访问到内核的数据的,因此用户要获取内核当中的数据,势必还是需要将内核的数据拷贝到用户空间。
-
在
epoll
的内核源码中,epoll_wait
实现的内核代码中调用了__put_user
函数,这个函数的作用就是就是将数据从内核拷贝到用户空间。
epoll与select和poll的不同之处
-
在使用
select
和poll
时,都需要借助第三方数组来维护历史上的文件描述符以及需要监视的事件,这个第三方数组是由用户自己维护的,对该数组的增删改操作都需要用户自己来进行。 -
使用epoll时,不需要用户自己维护所谓的第三方数组,epoll底层的红黑树就充当了这个第三方数组的功能,并且该红黑树的增删改操作都是由内核维护的,用户只需要调用
epoll_ctl
让内核对该红黑树进行对应的操作即可。 -
在使用多路转接接口时,数据流都有两个方向,一个是用户告知内核,一个是内核告知用户。select和poll将这两件事情都交给了同一个函数来完成,而epoll在接口层面上就将这两件事进行了分离,epoll通过调用
epoll_ctl
完成用户告知内核,通过调用epoll_wait
完成内核告知用户。
6、epoll工作方式
epoll有两种工作方式,分别是水平触发工作模式和边缘触发工作模式。
- 水平触发(LT,Level Triggered)
只要底层有事件就绪的事件,或者就绪的事件没有被处理完全,epoll就会一直通知用户,就像数字电路当中的高电平触发一样,只要一直处于高电平状态,则会一直触发,直到我们的电平状态变为了低电平。
epoll默认状态下就是LT工作模式,select
和poll
其实默认状态下也是LT工作模式。
- 由于在LT工作模式下,只要底层有事件就绪就会一直通知用户,因此当epoll检测到底层读事件就绪时,可以不立即进行处理,或者只处理一部分,因为只要底层数据没有处理完,下一次epoll还会通知用户事件就绪。
实验:
在前面我们写的代码中,我们只需要EpollServer.hpp
中的EpollServer
中的Start
函数中的HandleEvent
函数调用给注释掉,即有事件到来了,我们不去处理,当我们循环再次调用epoll_wait
时,由于就绪的事件没有处理,就绪队列中对应的节点没有被清除,所以下面我们使用的是阻塞调用就形同虚设了(因为就绪队列一直有没有被处理的数据)。
然后运行我们修改过的代码,我们应该看到当我们连接服务器时,有一个读事件发生,我们不去处理,服务器一直在给我们通知。
void Start()
{
// int timeout = 2000;
while (true)
{
int n = _epoller.Wait(_revs, _num, -1);
switch (n)
{
case -1:
logMessage(Warning, "epoll_wait fail : %s, errno code : %d", strerror(errno), errno);
break;
case 0:
logMessage(Info, "timeout...");
break;
default:
logMessage(Info, "有%d个事件就绪了!", n);
//HandleEvent(n);
break;
}
}
}
通过这个现象我们能看出epoll
的默认的LT工作模式。
- 边缘触发(ET,Edge Triggered)
只有底层就绪事件数量由无到有或由有到多发生变化的时候,epoll
才会通知用户,就像数字电路当中的上升沿触发一样,只有当电平由低变高的那一瞬间才会触发。
如果要将epoll改为ET工作模式,则需要在添加事件时设置EPOLLET
选项。
-
由于在ET工作模式下,只有底层就绪事件无到有或由有到多发生变化的时候才会通知用户,因此当
epoll
检测到底层读事件就绪时,必须立即进行处理,而且必须全部处理完毕,因为有可能此后底层再也没有事件就绪,那么epoll就再也不会通知用户进行事件处理,此时没有处理完的数据就相当于丢失了。 -
ET工作模式下epoll通知用户的次数一般比LT少,因此ET的性能一般比LT性能更高,Nginx就是默认采用ET模式使用epoll的。
实验:
和刚才的实验同理,这次依然我们需要EpollServer.hpp
中的EpollServer
中的Start
函数中的HandleEvent
函数调用给注释掉,同时将EpollServer
的构造函数中的AddEvent
中的事件添加| EPOLLET
,有事件到来了,我们依然不进行处理,运行代码我们看到epoll
确实只给我们通知了一次!
EpollServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 创建的epoll模型
_epoller.Create();
// 添加要关心的事件
_epoller.AddEvent(_listen_fd.Fd(), EPOLLIN | EPOLLET);
}
LT和ET工作模式的对比
-
通知效率:在ET模式下,一个文件描述符就绪之后,用户不会反复收到通知,一般来说ET比LT的通知效率更高效。
一次通知就是一次系统调用返回,一次返回必定对应一次调用,相比于LT模式ET有效减少系统调用次数,所以ET的这个特点能够提升ET模式的工作效率。 -
数据读写的方式
- 对于ET模式,由无到有或由有到多发生变化,才会通知上层读取数据,如果本次通知的数据上层不全部读取完毕,此时没有处理完的数据就有可能读取不到了,这就倒逼程序员必须一次将本轮数据全部读取完毕。
于是就会产生下面的逻辑链:
所以在ET模式下所有的读取和写入都必须是非阻塞的接口!
ET倒逼程序员尽快取走所有的数据,本质是: 让TCP底层更新出更大的接受窗口,从而在较大概率上提供对方的滑动窗口的大小,提高发送效率! - 在LT模式下,由于当我们数据没有读取完毕时,
epoll
会给我们进行通知,所以我们既可以采用一次读取完毕,或者一次读取一些,在LT模式下我们也能够使用非阻塞接口进行读取而且我们还能使用阻塞接口进行读取,因为当epoll
给我们通知时,说明数据肯定已经就绪了!当然为了IO的效率我们一般选择一次读取完毕。
- 对于ET模式,由无到有或由有到多发生变化,才会通知上层读取数据,如果本次通知的数据上层不全部读取完毕,此时没有处理完的数据就有可能读取不到了,这就倒逼程序员必须一次将本轮数据全部读取完毕。
所以我们的LT模式在使用非阻塞接口并且选择一次读取完毕时,其IO的效率和ET模式是一样的,所以论效率的上限是 ET = LT
的,但是LT模式还能够使用阻塞式接口进行读取,是不是ET模式就没有必要存在了呢?
答案是:不是,在服务器中我们一般使用的还是ET模式。
- 首先:是因为ET的工作效率是比较高的!
- 其次:ET是必须使用非阻塞接口的,如果中间没有使用非阻塞接口很容易将问题的bug暴漏出来进行测试修正,而LT如果中间使用了非阻塞接口不容易将问题暴漏出来,就不容易进行修正。
当然LT也有它的一些适用场景,例如在一个IO量很大又需要实时性很高的场景,我们就可以选择LT进行多次IO,边读取边分析边响应,如果是ET模式就不能够做到及时的响应了。
ET工作模式下应该如何进行读写
因为在ET工作模式下,只有底层就绪事件无到有或由有到多发生变化的时候才会通知用户,这就倒逼用户当读事件就绪时必须一次性将数据全部读取完毕,当写事件就绪时必须一次性将发送缓冲区写满,否则可能再也没有机会进行读写了。
-
因此读数据时必须循环调用recv函数进行读取,写数据时必须循环调用send函数进行写入。
-
当底层读事件就绪时,循环调用recv函数进行读取,直到某次调用recv读取时,实际读取到的字节数小于期望读取的字节数,则说明本次底层数据已经读取完毕了。
-
但有可能最后一次调用recv读取时,刚好实际读取的字节数和期望读取的字节数相等,但此时底层数据也恰好读取完毕了,如果我们再调用recv函数进行读取,那么recv就会因为底层没有数据而被阻塞住。
-
而这里的阻塞是非常严重的,就比如我们这里写的服务器都是单进程的服务器,如果recv被阻塞住,并且此后该数据再也不就绪,那么就相当于我们的服务器挂掉了,因此在ET工作模式下循环调用recv函数进行读取时,必须将对应的文件描述符设置为非阻塞状态。
-
调用send函数写数据时也是同样的道理,需要循环调用send函数进行数据的写入,并且必须将对应的文件描述符设置为非阻塞状态。
强调: ET工作模式下,recv
和send
操作的文件描述符必须设置为非阻塞状态,这是必须的,不是可选的!文章来源:https://www.toymoban.com/news/detail-756782.html
参考资料:
IO多路转接 ——— select、poll、epoll文章来源地址https://www.toymoban.com/news/detail-756782.html
到了这里,关于【Linux】I/O多路转接技术的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!