Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)

这篇具有很好参考价值的文章主要介绍了Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


本篇基于上篇代码继续改进,很长。关于Reactor的说明在后一篇

1、完善Epoll简单服务器

上面的代码在处理读事件时,用的request数组是临时的,如果有数据没读完,那么下次再来到这里,就没有这些数据了。所以得让每一个fd都有自己的缓冲区。建立一个Connection类,然后有一个map结构,让这个类和每个fd建立映射。Start函数改一下,不管超时还是出错,就只处理数据,处理的部分交给HandlerEvent,改名成LoopOnce,也就是说,Start那里还是有循环,每次循环都去执行L函数,L函数用Wait提取一次,然后处理。

    void Start()
    {
        //1、将listensock添加到epoll中,要先有epoll模型
        bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);//只关心读事件
        assert(r);//可以做别的判断
        (void)r;
        struct epoll_event revs_[gnum];
        int timeout = 1000;
        while(true)
        {
            LoopOnce(timeout);
        }
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中
        bool r = epoller_.AddEvent(sock, EPOLLIN);
        assert(r);
        (void)r;
    }

    void Recver(int fd)
    {
        char request[1024];
        ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
        if (s > 0)
        {
            request[s - 1] = 0; // 对打印格式
            request[s - 2] = 0; // 做一下调整
            std::string response = func_(request);
            send(fd, response.c_str(), response.size(), 0);
        }
        else
        {
            if (s == 0)
                logMessage(Info, "client quit ...");
            else
                logMessage(Warning, "recv error, client quit...");
            close(fd);
            // 将文件描述符移除
            // 在处理异常的时候,fd必须合法才能被处理
            epoller_.DelEvent(fd);
        }
    }

    void LoopOnce(int timeout)
    {
        int n = epoller_.Wait(revs_, gnum, timeout);
        for(int i = 0; i < n; i++)
        {
            int fd = revs_[i].data.fd;
            uint32_t events = revs_[i].events;
            logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
            if(events & EPOLLIN)//判断读事件就绪
            {
                if (fd == listensock_.Fd())
                {
                    // 1、新连接到来
                    Accepter();
                }
                else
                {
                    // 2、读事件
                    Recver(fd);
                }
            }
        }
    }
class Connection
{
public:
    Connection(int fd): fd_(fd)
    {}
     
    ~Connection()
    {}
public:
    int fd_;
    std::string inbuffer_;
    std::string outbuffer_;
};

std::unordered_map<int, Connection*> connections_;

把Start的初始化任务交给InitServer

    void InitServer()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
        epoller_.Create();
        logMessage(Debug, "init server success");
        //为listensock创建对应的connection对象
        Connection* conn = new Connection(listensock_.Fd());
        //将listensock和connection对象添加到connections_
        connections_.insert(std::pair<int, Connection*>(listensock_.Fd(), conn));
        //将listensock添加到epoll中
        bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);
        assert(r);
        (void)r;
    }

    void Start()
    {
        struct epoll_event revs_[gnum];
        int timeout = 1000;
        while(true)
        {
            LoopOnce(timeout);
        }
    }

同样地,Accepter有添加到epoll的fd也要映射上自己的Connection类,Recver那里就可以也改一下了

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中        
        Connection* conn = new Connection(sock);      
        connections_.insert(std::pair<int, Connection*>(sock, conn));
        bool r = epoller_.AddEvent(sock, EPOLLIN);
        assert(r);
        (void)r;
    }

    void Recver(int fd)
    {
        char request[1024];
        ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
        if (s > 0)
        {
            request[s - 1] = 0; // 对打印格式
            request[s - 2] = 0; // 做一下调整
            connections_[fd]->inbuffer_ += request;
            std::string response = func_(request);
            send(fd, response.c_str(), response.size(), 0);
        }
        else
        {
            if (s == 0)
                logMessage(Info, "client quit ...");
            else
                logMessage(Warning, "recv error, client quit...");
            close(fd);
            // 将文件描述符移除
            // 在处理异常的时候,fd必须合法才能被处理
            epoller_.DelEvent(fd);
        }
    }

所有就绪的fd,不只包含我们关心的fd,都要有Connection类。Accepter那里,得到连接后,获取套接字,不直接读取,因为不知道是否有数据,就交给epoll,不过获取套接字后,每个套接字都需要正确读取自己的报文,所以Connection有了两个buffer。

所有就绪的fd,不仅要有Connection类,还要被epoll管理。但这样的代码并不高效,删除的时候要从epoll里删,还要从connections_里删,且代码也不够简洁。

封装并修改一下形式

class Connection
{
public:
    Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
    : fd_(fd), clientip_(clientip), clientport_(clientport)
    {}
     
    ~Connection()
    {}
public:
    int fd_;
    std::string inbuffer_;
    std::string outbuffer_;
    std::string clientip_;
    uint16_t clientport_;
};

//...

    void InitServer()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
        epoller_.Create();
        //为listensock创建对应的connection对象
        //将listensock和connection对象添加到connections_       
        //将listensock添加到epoll中
        AddConnection(listensock_.Fd(), EPOLLIN);
        logMessage(Debug, "init server success");
    }

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //1、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        connections_.insert(std::pair<int, Connection*>(fd, conn));
        //2、fd和events写到内核中
        bool r = epoller_.AddEvent(fd, events);
        assert(r);
        (void)r;
        logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中        
        AddConnection(sock, EPOLLIN, clientip, clientport);
    }

    void Recver(int fd)
    {
        char request[1024];
        ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
        if (s > 0)
        {
            request[s - 1] = 0; // 对打印格式
            request[s - 2] = 0; // 做一下调整
            connections_[fd]->inbuffer_ += request;
            std::string response = func_(request);
            send(fd, response.c_str(), response.size(), 0);
        }
        else
        {
            if (s == 0)
                logMessage(Info, "client quit ...");
            else
                logMessage(Warning, "recv error
            // 在处理异常的时候,fd必须合法才能被处理
            epoller_.DelEvent(fd);
        }
    }

2、打造统一的分开处理的体系

现有的Accepter、Recver都是处理写事件的,LoopOnce那里可以加个读事件的判断,但相关的处理函数要怎么写?为了简便,这里再引入回调函数。

const static int gport = 8888;
class Connection;

using func_t = std::function<std::string (std::string)>;
using callback_t = std::function<void(Connection*)>;

class Connection
{
public:
    Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
    : fd_(fd), clientip_(clientip), clientport_(clientport)
    {}
     
    void Register(callback_t recver, callback_t sender, callback_t excepter)
    {
        recver_ = recver;
        sender_ = sender;
        excepter_ = excepter;
    }

    ~Connection()
    {}
public:
    //IO信息
    int fd_;
    std::string inbuffer_;
    std::string outbuffer_;
    //IO处理
    callback_t recver_;
    callback_t sender_; 
    callback_t excepter_;
    //用户信息
    std::string clientip_;
    uint16_t clientport_;
};

Register为注册方法,也就是要使用的方法。在AddConnection函数中,要判断一下,是我们关心的和不是我们关心的,都调用注册方法,但传的参数不一样。

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //2、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        if(fd == listensock_.Fd())
        {
            conn->Register();
        }
        else
        {
            conn->Register();
        }
        connections_.insert(std::pair<int, Connection*>(fd, conn));
        //3、fd和events写到内核中
        bool r = epoller_.AddEvent(fd, events);
        assert(r);
        (void)r;
        logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
    }

Accepter那里,里面有AddConnection函数。当LoopOnce调用Accepter时,这个函数也要用回调函数,这样就是一个类的成员函数要调用另一个类的回调函数。

    void Accepter(Connection* conn)
    {
        (void) conn;//先闲置不用
        std::string clientip;
        uint16_t clientport;
        int sock = listensock_.Accept(&clientip, &clientport);
        if (sock < 0) return ;
        logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
        // 还不能recv,即使有了连接但也不知道有没有数据
        // 只有epoll知道具体情况,所以将sock添加到epoll中        
        AddConnection(sock, EPOLLIN, clientip, clientport);
    }

AddConnection中,Regsiter三个参数都是callback_t类型的,我们可以这样写

        if(fd == listensock_.Fd())
        {
            conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
        }

这样设置,当我们关心的套接字上有事件就绪时,读方法就绑定Accepter。是其它套接字的话

        else
        {
            conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
                           std::bind(&EpollServer::Sender, this, std::placeholders::_1),
                           std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
        }
    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //2、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        if(fd == listensock_.Fd())
        {
            conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
        }
        else
        {
            conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
                           std::bind(&EpollServer::Sender, this, std::placeholders::_1),
                           std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
        }
        connections_.insert(std::pair<int, Connection*>(fd, conn));
        //3、fd和events写到内核中
        bool r = epoller_.AddEvent(fd, events);
        assert(r);
        (void)r;
        logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
    }

Recver和Sender函数要传的参数都是Connection* conn,Sender和Excepter下面再写。这样的设计主要是为了更集中实现功能,代码分明。

在LoopOnce就这样写:

    void LoopOnce(int timeout)
    {
        int n = epoller_.Wait(revs_, gnum, timeout);
        for(int i = 0; i < n; i++)
        {
            int fd = revs_[i].data.fd;
            uint32_t events = revs_[i].events;
            logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
            if(events & EPOLLIN) connections_[fd]->recver_(connections_[fd]);
            if(events & EPOLLOUT) connections_[fd]->sender_(connections_[fd]);
            if((events & EPOLLERR) || (events & EPOLLHUP)) connections_[fd]->excepter_(connections_[fd]);
        }
    }

这样就形成了一整个体系。写事件,读事件,其它事件都有了处理。当服务器启动后,服务器就监听事件,一旦事件就绪,就会根据不同的事件类型来派发事件到不同的Connection中,由Connection来调用对应的函数来处理。

这时候,Start函数就是事件派发器,可以写为Disptcher()。接下来要写Recver、Sender、Excepter。

3、epoll工作模式

select,poll,epoll三个,一旦有事件就绪,如果上层不取,底层就会一直通知事件就绪,这种模式叫做LT模式,水平触发Level Triggered工作模式。epoll默认LT,另有一个ET模式,边缘触发Edge Triggered工作模式,在数据变化时只通知一次,变化就是从无到有,从有到多。ET倒逼程序员必须一次将本轮数据全部读取完毕,怎样保证读完?可以循环读取,直到某次读取的数量比每次要的量少,比如等于0或者小于这个数,就说明读完了;但因为recv/read是默认阻塞的,所以循环读取可能阻塞住,比如读完几次后刚好全部读完,那么下次读取就阻塞了,所以ET模式下,所有的读取和写入都必须是非阻塞的接口。

LT也可以在非阻塞的情况写入,读取,当然也可以在阻塞模式下工作。但LT也不能代替ET,因为代码无法统一起来,而ET只能是非阻塞,ET倒逼程序员写成它自己的形式。ET通知效率 >= LT,IO效率也是一样。

一次通知就是一次系统调用返回,一次返回必定对应一次调用,ET能有效减少系统调用次数。ET倒逼程序员尽快取走数据的本质是让TCP底层更新出更大的接收窗口,以较大概率地增加对方的滑动窗口的大小,提高发送效率。

ET并非能替代LT,ET适合高IO场景,LT能够读一部分就处理一部分,ET必须得读完才行。epoll接口默认LT。

4、ET模式

ET的设置是一个宏,EPOLLET。在我们的InitServer初始化函数中加上这个宏就行。下面也放了AddConnection的代码。

    void InitServer()
    {
        listensock_.Socket();
        listensock_.Bind(port_);
        listensock_.Listen();
        epoller_.Create();
        //为listensock创建对应的connection对象
        //将listensock和connection对象添加到connections_       
        //将listensock添加到epoll中
        AddConnection(listensock_.Fd(), EPOLLIN | EPOLLET);
        logMessage(Debug, "init server success");
    }

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //2、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        if(fd == listensock_.Fd())
        {
            conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
        }
        else
        {
            conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1),
                           std::bind(EpollServer::Sender, this, std::placeholders::_1),
                           std::bind(EpollServer::Excepter, this, std::placeholders::_1));
        }
        connections_.insert(std::pair<int, Connection*>(fd, conn));
        //3、fd和events写到内核中
        bool r = epoller_.AddEvent(fd, events);
        assert(r);
        (void)r;
        logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
    }

除此之外,监听的套接字也得设置成非阻塞,用fcntl接口。写一个Util.hpp

#pragma once

#include <iostream>
#include <unistd.h>
#include <fcntl.h>

class Util
{
public:
    static bool SetNonBlock(int fd)
    {
        int fl = fcntl(fd, F_GETFL);
        if(fl < 0) return false;
        fcntl(fd, F_SETFL, fl | O_NONBLOCK);
        return true;
    }
};

在EpollServer.hpp中

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //1、设置fd非阻塞
        if(events & EPOLLET) Util::SetNonBlock(fd);
        //2、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);

Accepter里

AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport); 

5、继续完善,处理写事件

现在的代码,从InitServer,传listensock_.Fd()到AddConnection中,然后会调用Accetper函数,但只能读一个连接,得改成循环的。给Connection类加一个成员uint32_t events,AddConnection函数中在插入connections_数组前给conn->events赋值,Accepter函数中传过来的参数是conn,通过events来判断是否需要循环。

class Connection
{
public:
    Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
    : fd_(fd), clientip_(clientip), clientport_(clientport)
    {}
     
    void Register(callback_t recver, callback_t sender, callback_t excepter)
    {
        recver_ = recver;
        sender_ = sender;
        excepter_ = excepter;
    }

    ~Connection()
    {}
public:
    //IO信息
    int fd_;
    std::string inbuffer_;
    std::string outbuffer_;
    //IO处理
    callback_t recver_;
    callback_t sender_; 
    callback_t excepter_;
    //用户信息
    std::string clientip_;
    uint16_t clientport_;

    uint32_t events;
};

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
    {
        //1、设置fd非阻塞
        if(events & EPOLLET) Util::SetNonBlock(fd);
        //2、构建connection对象,交给connections_管理
        Connection* conn = new Connection(fd, ip, port);
        if(fd == listensock_.Fd())
        {
            conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
        }
        else
        {
            conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1),
                           std::bind(EpollServer::Sender, this, std::placeholders::_1),
                           std::bind(EpollServer::Excepter, this, std::placeholders::_1));
        }
        conn->events = events;
        connections_.insert(std::pair<int, Connection*>(fd, conn));
        //3、fd和events写到内核中
        bool r = epoller_.AddEvent(fd, events);
        assert(r);
        (void)r;
        logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
    }

    void Accepter(Connection* conn)
    {
        do
        {
            std::string clientip;
            uint16_t clientport;
            int sock = listensock_.Accept(&clientip, &clientport);
            if (sock < 0) return;
            logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
            // 还不能recv,即使有了连接但也不知道有没有数据
            // 只有epoll知道具体情况,所以将sock添加到epoll中
            AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);
        } while (conn->events & EPOLLET);//如果是ET模式就循环,不是就退出
    }

是我们需要的,就走Accetper方法,不是就走Recver,Sender,Excepter方法。上面Accepter函数中的Accept函数有返回值,这里的处理就是如果出错就返回,不过现在得处理出错,如果不出错才能继续走AddConnection函数。先在Sock.hpp中改一下,加上err参数,err = errno。

    int Accept(std::string* clientip, uint16_t* clientport, int* err)
    {
        struct sockaddr_in temp;
        socklen_t len = sizeof(temp);
        int sock = accept(_sock, (struct sockaddr*)&temp, &len);
        *err = errno;
        if(sock < 0)
        {
            logMessage(Warning, "accept error, code: %d, errstring: %s", errno, strerror(errno));
        }
        else
        {
            *clientip = inet_ntoa(temp.sin_addr);//这个函数就可以从结构体中拿出ip地址,转换好后返回
            *clientport = ntohs(temp.sin_port);
        }
        return sock;
    }

Accepter函数

    void Accepter(Connection* conn)
    {
        do
        {
            int err = 0;
            std::string clientip;
            uint16_t clientport;
            int sock = listensock_.Accept(&clientip, &clientport, &err);
            if (sock < 0)
            {
                logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
                // 还不能recv,即使有了连接但也不知道有没有数据
                // 只有epoll知道具体情况,所以将sock添加到epoll中
                AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);
            }
            else
            {
                if(err == EAGAIN || err == EWOULDBLOCK) break;//读完了,缓冲区满了
                else if(err == EINTR) continue;//有信号暂时中断,后续还得继续读
                else//异常,本次获取连接失败,继续读下一个连接
                {
                    logMessage(Warning, "errstring: %s, errcode: %d", strerror(err), err);
                    continue;
                }
            }
        } while (conn->events & EPOLLET);//如果是ET模式就循环,不是就退出
        logMessage(Debug, "accepter done ...");
    }

再完成Recver,Sender,Excepter函数

    void Recver(Connection* conn)
    {
        //读取完了本轮数据
        do
        {
            char buffer[bsize];//1024
            ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0);
            if(n > 0)
            {
                buffer[n] = 0;
                conn->inbuffer_ += buffer;
                //根据基本协议,进行数据分析,边读取边分析
                
            }
            else if(n == 0)//另一端关闭了套接字,要关闭连接
            {
                conn->excepter_(conn);//归到异常处理
            }
            else
            {
                if(errno == EAGAIN || errno == EWOULDBLOCK) break;
                else if(errno == EINTR) continue;
                else conn->excepter_(conn);
            }
        } while (conn->events & EPOLLET); 
        //根据基本协议,进行数据分析
    }

分析数据可以全读完再分析,也可以边读边分析,这就需要有协议规定,协议在之前有写过简单的代码。现在有3种情况会调用异常处理函数Excpter,Recver函数读的时候异常, Sender函数发送数据出现异常,LoopOnce里也有。这样情况有些多,代码写起来也不够好。改一下

    void LoopOnce(int timeout)
    {
        int n = epoller_.Wait(revs_, gnum, timeout);
        for(int i = 0; i < n; i++)
        {
            int fd = revs_[i].data.fd;
            uint32_t events = revs_[i].events;
            logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
            //下面这句就是把所有异常情况都转化为Recver和Sender去调用异常函数
            if((events & EPOLLERR) || (events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);
            //下面这两个也要改一下,要保证连接存在
            if((events & EPOLLIN) && ConnIsExists(fd))
                connections_[fd]->recver_(connections_[fd]);
            if((events & EPOLLOUT) && ConnIsExists(fd)) 
                connections_[fd]->sender_(connections_[fd]);
        }
    }

    bool ConnIsExists(int fd)
    {
        return connections_.find(fd) != connections_.end();
    }

6、引入自定义协议,处理写事件

用之前的Util.hpp和Protocol.hpp,有序列化反序列化,所以Makefile里得加上-ljsoncpp

epollserver:Main.cc
	g++ -o $@ $^ -ljsoncpp -std=c++11
.PHONY:clean
clean:
	rm -f epollserver

Util.hpp

#pragma once

#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <vector>
#include <cstdlib>
using namespace std;

class Util
{
public:
    static bool SetNonBlock(int fd)
    {
        int fl = fcntl(fd, F_GETFL);
        if(fl < 0) return false;
        fcntl(fd, F_SETFL, fl | O_NONBLOCK);
        return true;
    }

    static bool StringSplit(const std::string& str, const std::string& sep, std::vector<std::string>* result)
    {
        size_t start = 0;
        while(start < str.size())
        {
            auto pos = str.find(sep, start);
            if(pos == std::string::npos) break;
            result->push_back(str.substr(start, pos - start));
            start = pos + sep.size();
        }
        if(start < str.size()) result->push_back(str.substr(start));
        return true;
    }

    static int toInt(const std::string& s)
    {
        return atoi(s.c_str());
    }
};

Protocol.hpp

#pragma once

#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <jsoncpp/json/json.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "Util.hpp"

//#define MESELF 1

//给网络版本计算机定制协议
namespace protocol_ns
{
    #define SEP " "
    #define SEP_LEN strlen(SEP)//不能用sizeof
    #define HEADER_SEP "\r\n"
    #define HEADER_SEP_LEN strlen("\r\n")
    //"长度"\r\n"_x_op_y"\r\n
    //假设报文是这样的: "7"\r\n"10 + 20"\r\n,这就相当于报头 + 有效载荷
    //请求/响应 = 报头\r\n有效载荷\r\n,只是请求和响应的有效载荷不同

    std::string AddHeader(const std::string &str)
    {
        std::cout << "AddHeader 之前:\n"
                  << str << std::endl;
        std::string s = std::to_string(str.size());
        s += HEADER_SEP;
        s += str;
        s += HEADER_SEP;
        std::cout << "AddHeader 之hou:\n"
                  << s << std::endl;
        return s;
    }

    std::string RemoveHeader(const std::string& str, int len)
    {
        std::cout << "RemoveHeader 之前:\n"
                  << str << std::endl;
        std::string res = str.substr(str.size() - HEADER_SEP_LEN - len, len);
        std::cout << "RemoveHeader 之后:\n"
                  << str << std::endl;
        return res;
    }

    int ReadPackage(int sock, std::string& inbuffer, std::string* package)
    {
        std::cout << "ReadPackage inbuffer 之前:\n"
                  << inbuffer << std::endl;
        //读取 ———— 字符串"7"\r\n"10 + 20"\r\n
        char buffer[1024];
        ssize_t s = recv(sock, buffer, sizeof(buffer - 1), 0);
        if(s <= 0) return -1;
        buffer[s] = 0;
        inbuffer += buffer;//此时inbuffer里就有了这样的字符串: "7"\r\n"10 + 20"\r\n
        //分析
        auto pos = inbuffer.find(HEADER_SEP);
        if(pos == std::string::npos) return 0;//没找到\r\n那么就不是正确的字符串,不动inbuffer里的内容,退出
        std::string lenStr = inbuffer.substr(0, pos);//获取头部字符串7
        int len = Util::toInt(lenStr);//得到了长度7,也就是有效载荷长度
        int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有报文的字符串长度就是这个
        if(inbuffer.size() < targetPackagelen) return 0;
        //提取报文有效载荷
        *package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符号的工作交给RemoveHeader
        inbuffer.erase(0, targetPackagelen);//只有到这里才改变inbuffer里的内容,从inbuffer里直接移除整个报文
        std::cout << "ReadPackage inbuffer 之后:\n"
                  << inbuffer << std::endl;
        return len;//len就是有效载荷的长度
    }

    class Request
    {
    public:
        Request() {}//为无参构造而准备的,这样就是一个无参一个有参
        Request(int x, int y, char op) 
        : _x(x), _y(y), _op(op)
        {}

        bool Serialize(std::string* outstr)//序列化:结构体转字符串
        {
            *outstr = "";
#ifdef MYSELF
            std::string x_string = std::to_string(_x);
            std::string y_string = std::to_string(_y);
            // 手动序列化
            *outstr = x_string + SEP + _op + SEP + y_string;
#else
            Json::Value root;//Value是一个万能对象,接受任何一个kv类型
            root["x"] = _x;
            root["y"] = _y;//所有放进去的会自动转为string类型
            root["op"] = _op;
            //Json::FastWriter writer;//FastWriter用来序列化,把结构化的数据转为字符串类型
            Json::StyledWriter writer;
            *outstr = writer.write(root);
#endif
            return true;
        }

        bool Deserialize(const std::string& instr)//反序列化:字符串转结构体
        {
#ifdef MYSELF
            std::vector<std::string> result;
            Util::StringSplit(instr, SEP, &result);
            if (result.size() != 3)
                return false;
            _x = Util::toInt(result[0]);
            _y = Util::toInt(result[2]);
            if (result[1].size() == 1)
                return false;   // 协议规定
            _op = result[1][0]; // 因为是字符,所以只要一个符号即可
            std::cout << "_x: \n"
                      << _x << "_y: \n"
                      << _y << "_op: " << _op << std::endl;
#else
            Json::Value root;
            Json::Reader reader;//Reader用来反序列化
            reader.parse(instr, root);
            _x = root["x"].asInt();//拿到的是字符串,要转成int类型
            _y = root["y"].asInt();
            //_op虽然是char,但它在计算机里就是整数,序列化时放进root的就是整数类型,反序列化时转成int类型,然后编译器会根据char类型自动解释成char类型
            _op = root["op"].asInt();
#endif
            return true;
        }

        ~Request() {}
    public:
        int _x;
        int _y;
        char _op;
    };

    class Response
    {
    public:
        Response() {}
        Response(int result, int code)
        : _result(result), _code(code)
        {}

        bool Serialize(std::string* outstr)
        {
            *outstr = "";
#ifdef MYSELF
            std::string res_string = std::to_string(_result);
            std::string code_string = std::to_string(_code);
            // 手动序列化
            *outstr = res_string + SEP + code_string;
#else
            Json::Value root;
            root["result"] = _result;
            root["code"] = _code;
            //Json::FastWriter writer;
            Json::StyledWriter writer;
            *outstr = writer.write(root);
#endif
            return true;
        }

        bool Deserialize(const std::string& instr)
        {
#ifdef MYSELF
            std::vector<std::string> result;
            Util::StringSplit(instr, SEP, &result);
            if (result.size() != 2)
                return false;
            _result = Util::toInt(result[0]);
            _code = Util::toInt(result[1]);
            std::cout << "_result: \n"
                      << _result << "_code: " << _code << std::endl;
#else
            Json::Value root;
            Json::Reader reader;
            reader.parse(instr, root);
            _result = root["result"].asInt();
            _code = root["code"].asInt();
#endif
            return true;
        }

        ~Response() {}
    public:
        int _result;
        int _code;//0表示计算成功,剩余的数字就是各种非法操作的错误码
    };
}

ReadPackage改一下,之前是接收并分析,现在只做分析

    int ParsePackage(std::string& inbuffer, std::string* package)
    {
        std::cout << "ReadPackage inbuffer 之前:\n"
                  << inbuffer << std::endl;
        //分析
        auto pos = inbuffer.find(HEADER_SEP);
        if(pos == std::string::npos) return 0;//没找到\r\n那么就不是正确的字符串,不动inbuffer里的内容,退出
        std::string lenStr = inbuffer.substr(0, pos);//获取头部字符串7
        int len = Util::toInt(lenStr);//得到了长度7,也就是有效载荷长度
        int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有报文的字符串长度就是这个
        if(inbuffer.size() < targetPackagelen) return 0;
        //提取报文有效载荷
        *package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符号的工作交给RemoveHeader
        inbuffer.erase(0, targetPackagelen);//只有到这里才改变inbuffer里的内容,从inbuffer里直接移除整个报文
        std::cout << "ReadPackage inbuffer 之后:\n"
                  << inbuffer << std::endl;
        return len;//len就是有效载荷的长度
    }

继续写EpollServer.hpp中的Recver和Sender函数

    void Recver(Connection* conn)
    {
        //读取完了本轮数据
        do
        {
            char buffer[bsize];//1024
            ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0);
            if(n > 0)
            {
                buffer[n] = 0;
                conn->inbuffer_ += buffer;
                //根据基本协议,进行数据分析,边读取边分析;
                std::string requestStr;
                int n = ParsePackage(conn->inbuffer_, &requestStr);
                //看ParsePackage
                //n为0表示没有不合理字符串或者inbuffer剩下的不够规定的长度,不用判断
                if(n > 0)//保证读到了完整的请求
                {
                    //回调函数在Main.cc中
                    //这边先反序列化,再交给回调函数
                    //上面改成using func_t = std::function<void(const Request&)>;
                    requestStr = RemoveHeader(requestStr, n);
                    Request req;
                    req.Deserialize(requestStr);
                    func_(req);//交给回调函数处理
                }
            }
            else if(n == 0)//另一端关闭了套接字,要关闭连接
            {
                conn->excepter_(conn);//归到异常处理
            }
            else
            {
                if(errno == EAGAIN || errno == EWOULDBLOCK) break;
                else if(errno == EINTR) continue;
                else conn->excepter_(conn);
            }
        } while (conn->events & EPOLLET); 
    }

更改在n > 0的判断后

Main.cc

#include "EpollServer.hpp"
#include <memory>

//用之前网络计算器的计算函数
Response CalculateHelper(const Request& req)
{
    Response resp(0, 0);
    switch(req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
        if(req._y == 0) resp._code = 1;
        else resp._result = req._x / req._y;
        break;
    case '%':
        if(req._y == 0) resp._code = 2;
        else resp._result = req._x / req._y;
        break; 
    default:
        resp._code = 3;
        break; 
    }
    return resp;
}

void Calculate(const Request& req)
{
    Response resp = CalculateHelper(req);
    //序列化,当然这里放到EpollServer.hpp更好
    std::string sendStr;
    resp.Serialize(&sendStr);
    sendStr = AddHeader(sendStr);
    //序列化后发送出去
    
}

int main()
{
    std::unique_ptr<EpollServer> svr(new EpollServer(Calculate));
    svr->InitServer();
    svr->Disptcher();
    return 0;
}

epoll中关于fd的读取,一般要常设置,也就是一直要让epoll关心;关于fd的写入,则是按需设置,不能常设置,只有需要发送的时候才设置。发送的对象就是Connection中的outbuffer。

//EpollServer.hpp
using func_t = std::function<void(Connection *, const Request&)>;
//...
func_(conn, req);//Recver函数中
//Main.cc
void Calculate(Connection* conn, const Request& req)
{
    Response resp = CalculateHelper(req);
    //序列化,当然这里放到EpollServer.hpp更好
    std::string sendStr;
    resp.Serialize(&sendStr);
    //序列化后发送出去
    conn->outbuffer_ += sendStr;
    //开启对写事件的关心
}

加上写事件是要对内核做操作,在EpollServer.hpp的EpollServer类中中再添加一个函数专门做这个事,不过Main.cc中传过来的参数只有Connection类的,所有这个类还得添加一个成员来调用这个函数

class EpollServer;

//...
EpollServer* R;

//AddConnection里conn->events = events后
conn->R = this;//函数属于EpollServer类,this就是这个类

//...
void Calculate(Connection* conn, const Request& req)
{
    Response resp = CalculateHelper(req);
    //序列化,当然这里放到EpollServer.hpp更好
    std::string sendStr;
    resp.Serialize(&sendStr);
    //序列化后发送出去
    conn->outbuffer_ += sendStr;
    //开启对写事件的关心
    conn->R->EnableReadWrite(conn, true, true);
}

开启后,Epoll底层会调用Sender函数来发送。

    void Sender(Connection* conn)
    {
        do
        {
            ssize_t n = send(conn->fd_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
            //体现按需思路
            if(n > 0)//发送成功,发送了局部或全部
            {
                conn->outbuffer_.erase(0, n);
                if(conn->outbuffer_.empty())//把数据发完了
                {
                    EnableReadWrite(conn, true, false);//去掉对写事件的关心
                    break;
                }
                else//没发完,也就是发送了局部
                {
                    EnableReadWrite(conn, true, true);//继续
                }
            }
            else
            {
                //和Accepter里的一样的解释
                if(errno == EAGAIN || errno == EWOULDBLOCK) break;
                else if(errno == EINTR) continue;
                else
                {
                    conn->excepter_(conn);
                    break;
                }
            }
        } while (conn->events & EPOLLET);//ET模式就一直循环,不是就退出
        
    }

通常初次设置对写事件的关心,发送缓冲区是空的会,因此立马触发一次对应的fd的就绪,此时epoll底层会自动调用回调函数,从而使用Sender函数。

实现EnableReadWrite函数

    bool EnableReadWrite(Connection* conn, bool readable, bool writeable)
    {
        conn->events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
        //修改Epoll.hpp中的AddEvent函数为AddModEvent,传入一个op,用来实现Add和Mod两个功能
        return epoller_.AddModEvent(conn->fd_, conn->events, EPOLL_CTL_MOD);
    }

    bool AddModEvent(int fd, uint32_t events, int op)
    {
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = fd;//属于用户的数据,epoll底层不对该数据做任何修改,为了给未来就绪返回
        int n = epoll_ctl(epfd_, op, fd, &ev);
        if(n < 0)
        {
            logMessage(Fatal, "epoll_ctl error, code: %d, errstring: %s", errno, strerror(errno));
            return false;
        }
        return true;
    }

这样就完成了服务器的拉取工作,也就是有数据来了,可以返回结果。

现在这样的代码是没问题的,但也有些复杂,我们希望暴露在外面的更少,所有工作都在底层完成,上层不需要关心,只调用接口就好。

本篇gitee

下一篇继续写。

结束。文章来源地址https://www.toymoban.com/news/detail-808020.html

到了这里,关于Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Linux多路IO复用技术——epoll详解与一对多服务器实现

    本文详细介绍了Linux中epoll模型的优化原理和使用方法,以及如何利用epoll模型实现简易的一对多服务器。通过对epoll模型的优化和相关接口的解释,帮助读者理解epoll模型的工作原理和优缺点,同时附带代码实现和图解说明。

    2024年02月05日
    浏览(41)
  • 多路IO—POll函数,epoll服务器开发流程

    \\\"在计算机网络编程中,多路IO技术是非常常见的一种技术。其中,Poll函数和Epoll函数是最为常用的两种多路IO技术。这两种技术可以帮助服务器端处理多个客户端的并发请求,提高了服务器的性能。本文将介绍Poll和Epoll函数的使用方法,并探讨了在服务器开发中使用这两种技

    2024年02月06日
    浏览(39)
  • IO模型之epoll实现服务器客户端收发

     epoll.ser epoll.cri result      

    2024年02月13日
    浏览(54)
  • 【TCP服务器的演变过程】使用IO多路复用器epoll实现TCP服务器

    手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节的基础上,将IO多路复用机制select改为更高效的IO多路复用机制epoll,使用epoll管理每

    2024年01月17日
    浏览(66)
  • 多路转接高性能IO服务器|select|poll|epoll|模型详细实现

    那么这里博主先安利一下一些干货满满的专栏啦! Linux专栏 https://blog.csdn.net/yu_cblog/category_11786077.html?spm=1001.2014.3001.5482 操作系统专栏 https://blog.csdn.net/yu_cblog/category_12165502.html?spm=1001.2014.3001.5482 手撕数据结构 https://blog.csdn.net/yu_cblog/category_11490888.html?spm=1001.2014.3001.5482 去仓库获

    2024年02月15日
    浏览(59)
  • 手撕测试tcp服务器效率工具——以epoll和io_uring对比为例

    服务器的性能测试主要包括2部分: 并发量。能容纳多大的连接 效率。在不崩坏的情况下能对报文的处理效率。 本文主要进行效率测试,看看基于epoll模型和io_uring模型的tcp服务器,谁的效率更高。 测试思路 客户端(一个或多个)大量地向服务器发送报文,测试服务器的处理

    2024年01月18日
    浏览(99)
  • 【Linux】高级IO --- 多路转接,select,poll,epoll

    所有通过捷径所获取的快乐,无论是金钱、性还是名望,最终都会给自己带来痛苦 1. 后端服务器最常用的网络IO设计模式其实就是Reactor,也称为反应堆模式,Reactor是单进程,单线程的,但他能够处理多客户端向服务器发起的网络IO请求,正因为他是单执行流,所以他的成本就

    2024年02月09日
    浏览(60)
  • Linux学习记录——사십 高级IO(1)

    其它IO类型的实现在这篇之后的三篇 input,output。调用read或recv接口时,如果对方长时间不向我方接收缓冲区拷贝数据,我们的进程就只能阻塞,这是读取条件不满足。阻塞的时间成本最后会体现在用户上。因此可以说,IO = 等 + 数据拷贝。高效IO则是单位事件内,等的比重越

    2024年01月21日
    浏览(43)
  • Linux网络编程:多路I/O转接服务器(select poll epoll)

    文章目录: 一:select 1.基础API  select函数 思路分析 select优缺点 2.server.c 3.client.c 二:poll 1.基础API  poll函数  poll优缺点 read函数返回值 突破1024 文件描述符限制 2.server.c 3.client.c 三:epoll 1.基础API epoll_create创建   epoll_ctl操作  epoll_wait阻塞 epoll实现多路IO转接思路 epoll优缺点

    2024年02月11日
    浏览(52)
  • 五、Linux C/C++ 对epoll-reactor服务器的百万级高并发实现

    前言:基于epoll的反应堆模式(reactor)的服务器程序,进行百万并发量的连接测试。通过代码优化,以及服务器与客户端的硬件配置优化,达到百万并发。 代码实现 代码实现: 1台服务器:8G运行内存 8核CPU 3台客户端:4G运行内存 4核CPU 这些硬件配置可以通过虚拟机配置。 按照

    2024年02月20日
    浏览(84)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包