多路转接-epoll/Reactor(2)

这篇具有很好参考价值的文章主要介绍了多路转接-epoll/Reactor(2)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

epoll

上次说到了poll,它存在效率问题,因此出现了改进的poll----epoll。

目前epoll是公认的效率最高的多路转接的方案。

快速了解epoll接口

 epoll_create:

多路转接-epoll/Reactor(2),服务器,运维

这个参数其实已经被废弃了。 这个值只要大于0就可以了。

多路转接-epoll/Reactor(2),服务器,运维 这是用来创建一个epoll模型的。

创建成功了就返回一个文件描述符。失败了返回-1

epoll_wait:

多路转接-epoll/Reactor(2),服务器,运维

第一个参数就是epoll_create的返回值。后面俩参数是用户定义的缓冲区,用来返回就绪的文件描述符和事件。这里的timeout的单位是毫秒,跟poll那里一样。

这个接口的返回值是:已经就绪的fd的个数。

epoll_event结构体

多路转接-epoll/Reactor(2),服务器,运维

 这里依旧是以位图的形式传递标记位。

epoll_ctl:

多路转接-epoll/Reactor(2),服务器,运维

第一个参数依旧是epfd,op表示我们要进行哪些操作

多路转接-epoll/Reactor(2),服务器,运维 看名字也能大概看出来,分别是新增,修改和删除。

epoll原理

在Linux内核中,专门为epoll设计了一套独立的模型

多路转接-epoll/Reactor(2),服务器,运维 首先在内核中设计了一颗红黑树,它以fd为键值,其结点的值一般是一个结构体,里面包含了要关心的fd,以及这个fd关心的事件,还有一些链接字段等,然后还会有一个等待队列,如果有fd就绪了,就会将结点链入到这个就绪队列中,等待上层来处理,注意:这个结点既可以在红黑树中,也可以同时在就绪队列中。我们用epoll的时候,OS会将一个回调函数注册到底层,底层一旦就绪,就会自动调用这个回调函数,比如将数据向上交付,交给tcp的接受队列,查找红黑树中的fd,然后构建就绪结点插入到就绪队列中。

  这样,用户只需要从就绪队列中获取就绪结点就可以了。因此我们把这三个组合起来就是epoll模型。

这样回想起来之间了解的接口,比如epoll_create,它创建的就是一套epoll模型和注册底层的回调机制。但是有没有可能创建了多个epoll模型呢?那么多个模型OS也要管理起来,(比如可以记录红黑树的头结点指针和队列的头结点指针,就可以将这两个结构放在一起。)所以内核会创建一个struct file,把它也当作一个文件,struct file中也存在一个指针指向这个模型,OS再将这个文件添加到进程的文件描述符表里,通过fd来找到,这就解释了为什么epoll_create返回的也是一个文件描述符,因为epoll模型也被接入到了struct file中了。

如下

多路转接-epoll/Reactor(2),服务器,运维

到这里,我们发现,select&&poll,他俩跟epoll完全不一样。 

多路转接-epoll/Reactor(2),服务器,运维

所以现在对着图来看epoll的接口,就会发现简单的很多了,epoll_create就是为了创建epoll模型,epoll_ctl就是拿着epfd,进行的操作其实就是在修改红黑树。 

epoll_wait也是拿着epfd,看就绪队列是否有fd就绪。

另外这里的红黑树的性质,像不像我们在使用select/poll时用户维护的数组?性质是一样的,不过这次是由内核维护的。所以epoll是单独设计的一点,还体现在用户不需要再使用额外的数据结构来管理文件描述符和要关心的事件了。

epoll的优势:

1.检测就绪的时间复杂度是O(1)。因为只要看就绪队列是否为空就可以了。

获取就绪fd的时间复杂度是O(n)。

2.fd和event没有上限。

3.epoll的返回值 n,表示有几个fd就绪了,并且在内核中,会把这些结点一个一个弹出。就绪事件是连续的。这样就不需要用户再进行遍历排除非法的fd了,也就是需要浪费用户额外的时间了。

在进入代码前,可以先了解一下cmake

CMake

cmake是一个自动生成makefile的工具。

在CentOS下可以用

sudo yum install -y cmake

 来安装Cmake,如果是Ubuntu的话把yum改成apt就好了。

cmake --version

这个命令来查看cmake的版本。 

关于使用

首先要创建一个文件:CMakeLists.txt

多路转接-epoll/Reactor(2),服务器,运维

因为我的CentOS只能安装到2.8.12.2这样的版本,所以在这里我选择最低的版本为2.8这样的,写好后,我们执行

cmake .

多路转接-epoll/Reactor(2),服务器,运维

然后就会生成一大堆文件,包括makefile,

多路转接-epoll/Reactor(2),服务器,运维 vim一下这个Makefile

多路转接-epoll/Reactor(2),服务器,运维

发现已经帮我们写好了

我们可以直接make

多路转接-epoll/Reactor(2),服务器,运维

接下来就跟我们熟悉的一样了。

所以其实makefile已经有取代方案了,今后我们面对复杂的项目,使用CMake会简单很多。 

epoll的工作模式 

  epoll有两种工作模式:

LT(Level Triggered): 水平触发

一般epoll的默认工作模式就是LT模式。

当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait
仍然会立刻返回并通知socket读事件就绪.
直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
支持阻塞读写和非阻塞读写

也就是说,在LT模式下,如果上层不及时处理数据,LT会一直通知,直到数据全部被处理。 

ET(Edge Triggered):边缘触发

当epoll检测到socket上事件就绪时, 必须立刻处理.
如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,
epoll_wait 不会再返回了.
也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.
只支持非阻塞的读写
也就是说只有因为新增而引起的变化时,ET才会通知。
一般普遍的认为ET的工作效率要比LT要高一些。
ET的效率高主要是它的 通知效率高。
因为ET模式下,只会通知一遍,因此在ET模式下,每次通知,必须把数据全部读走,一般是用循环读取,直到读取的字节数少于预期读取的字节数,或者读取出错才停止。另外因为fd是默认阻塞的,所以在ET模式下,要讲fd设置成非阻塞的,避免服务器因为读取被挂起。(这里有可能会成为面试题的)
ET的效率更高,不仅仅是通知效率更高(因为通知一次就要全部读走),它的IO效率也要更高一些,但是IO效率更高要如何理解呢?
我们以TCP协议为例,如果我们能一次将数据全部读走,那么我们就能给对方通知一个更大的窗口,从概率上让对方能一次发送更多的数据,因此IO效率也可能就更高。
但是话说回来,LT的效率一定比ET要低吗?如果在LT模式下,我们也是循环读取,在通知第一次的时候就将数据全部读走的话,不就和ET一样了吗?所以这个时候ET就不一定比LT高效了。

epollecho服务简单实现 

首先对epoll进行简单的封装

Epoller.hpp

#pragma once

#include "nocopy.hpp"
#include "Log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>

class Epoller : nocopy
{
    static const int size = 128;
public:
    Epoller()
    {
        _epfd = epoll_create(size);
        if(_epfd == -1)
        {
            lg(Error,"epoll_create error: %s",strerror(errno));
        }
        else
        {
            lg(Info,"epoll_create success: %d",_epfd);
        }
    }

    int Epollwait(struct epoll_event revents[],int num)
    {
        int n = epoll_wait(_epfd,revents,num,-1); // -1表示阻塞等待
        return n;
    }

    int EpollUpdata(int oper,int sock,uint32_t event)
    {
        int n = 0;
        if(event == EPOLL_CTL_DEL) // 这里对于删除操作单独处理了
        {
            n = epoll_ctl(_epfd,oper,sock,nullptr);
            if(n != 0)
            {
                lg(Error,"epoll_ctl delete error");
            }
        }
        else 
        {
            // EPOLL_CTL_ADD || EPOLL_CTL_MOD
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock; // 为了方便后期它就绪时,我们知道是哪个fd。
            n = epoll_ctl(_epfd,oper,sock,&ev); // 本质上是对红黑树进行插入或修改结点
            if(n != 0)
            {
                lg(Error,"epoll_ctl error");
            }
        }

        return n;
    }

    ~Epoller()
    {
        if(_epfd >= 0)
            close(_epfd);
    }
private:
    int _epfd;
    int _timeout;
};

 EpollServer.hpp

#pragma once

#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Log.hpp"
#include "Epoller.hpp"
#include "nocopy.hpp"

uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);

class EpollServer : nocopy
{
    const int num = 64;
public:
    EpollServer(uint16_t port = 8080)
        :_port(port)
        ,_listensocket_ptr(new Sock())
        ,_epoller_ptr(new Epoller())
        {}
    
    void Init()
    {
        _listensocket_ptr->Socket();
        _listensocket_ptr->Bind(_port);
        _listensocket_ptr->Listen();

        lg(Info,"create listen socket success :%d\n",_listensocket_ptr->Fd());
    }

    void Start()
    {
        // 首先将listensock添加到epoll中(还有其关心的事件),也就是添加到内核的红黑树中
        _epoller_ptr->EpollUpdata(EPOLL_CTL_ADD,_listensocket_ptr->Fd(),EVENT_IN);
        struct epoll_event revs[num]; // num是我们自己定的,不受接口限制
        while(true)
        {
            int n = _epoller_ptr->Epollwait(revs,num);
            if(n > 0)
            {
                // 有事件就绪了
                lg(Debug,"event happend, fd is : %d",revs[0].data.fd);
                Dispatcher(revs,n);
            }
            else if(n == 0)
            {
                lg(Info,"time out ....\n");
            }
            else 
            {
                lg(Error,"epoll_wait eroor\n");
            }
        }
    }

    void Dispatcher(struct epoll_event* revs,int num)
    {
        for(int i = 0; i < num; ++i)
        {
            uint32_t events = revs[i].events;
            int fd = revs[i].data.fd;
            if(events & EVENT_IN)
            {
                if(fd == _listensocket_ptr->Fd())
                {
                    //新链接
                    Accepter();
                }
                else 
                {
                    // 普通读取
                    Recver(fd);
                }
            }
            else // 这里只考虑读取事件了
            {}
        }
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = _listensocket_ptr->Accept(&clientip,&clientport);
        if(sock < 0)
        {
            lg(Error,"Accept error, fd is : %d",sock);
        }

        // 获取到链接后,不能直接read,而是将fd添加进epoll
        _epoller_ptr->EpollUpdata(EPOLL_CTL_ADD,sock,EVENT_IN);
        lg(Info,"get a new link, clientip : %s, clientport: %d",clientip.c_str(),clientport);
    }
    
    void Recver(int fd)
    {
        // 这里依旧只是一个demo,并没有处理到位
        char buffer[1024];
        ssize_t n = read(fd,buffer,sizeof(buffer) - 1);
        if(n > 0)
        {
            buffer[n] = 0;
            std::cout << "get a massge: " << buffer << std::endl;
            //然后回显给对方
            std::string echo_str = "server echo $ ";
            echo_str += buffer;
            write(fd,echo_str.c_str(),echo_str.size());
        }
        else if(n == 0)
        {
            lg(Info,"client quit, me too,close fd is: %d",fd);
            // 这里关闭时有一个细节,要先从epoll中移除
            // 再close,避免先close导致fd非法再从epoll中移除而报错
            _epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);
            close(fd);
        }
        else 
        {
            lg(Warning,"recver error,close ..");
            _epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);
            close(fd);
        }
    }

    ~EpollServer()
    {
        _listensocket_ptr->Close();
    }

private:
    std::unique_ptr<Sock> _listensocket_ptr;
    std::unique_ptr<Epoller> _epoller_ptr;
    uint16_t _port;
};

在读取这里,以TCP为例,因为TCP是面向字节流的,我们并不能保证每次读取都是能读到完整的数据, 如果当前缓冲区里只有半个请求,我们读吗?读了之后放哪呢?在这里我们并没有处理,因此会在Ractor中解决。

我们可以通过单独设计一个类来进行防拷贝

nocopy.hpp

#pragma once

class nocopy
{
public: 
    nocopy(){}
    nocopy(const nocopy &) = delete;
    const nocopy& operator=(const nocopy &) = delete;
};

Reactor

 Reactor模式是一种事件驱动的并发编程模型,它解决了在高并发环境下处理大量客户端请求的问题。

  Reactor其实是一种半同步半异步的模型,同步是因为调用epoll,要自己等,异步体现在它可以进行回调处理。

代码:

首先我们先对设置非阻塞的函数进行一个封装

Comm.hpp

#pragma once

#include <unistd.h>
#include <fcntl.h>

void SetNonBlockOrDie(int sock)
{
    int fl = fcntl(sock,F_GETFL);
    if(fl < 0)
        exit(4);
    fcntl(sock,F_SETFL,fl | O_NONBLOCK);
}

TcpServer.hpp

#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <unordered_map>
#include <functional>
#include "Comm.hpp"
#include "Log.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"

class Connection;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

const static int g_buffer_size = 128;

using func_t = std::function<void(std::weak_ptr<Connection>)>;
using except_func = std::function<void(std::weak_ptr<Connection>)>;

// 对每个链接进行管理
class Connection
{
public:
    Connection(int sock)
        : _sock(sock)
    {
    }

    void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

    int SockFd()
    {
        return _sock;
    }

    void AppendInBuffer(const std::string &info)
    {
        _inbuffer += info;
    }

    void AppendOutBuffer(const std::string &info)
    {
        _outbuffer += info;
    }

    std::string &InBuffer()
    {
        return _inbuffer;
    }

    std::string &OutBuffer()
    {
        return _outbuffer;
    }

    void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr)
    {
        _tcp_server_ptr = tcp_server_ptr;
    }

    ~Connection()
    {
    }

private:
    int _sock;
    std::string _inbuffer; // 输入缓冲区,但是无法接收二进制流
    std::string _outbuffer;

public:
    // 回调方法
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;

    // 回指指针
    std::weak_ptr<TcpServer> _tcp_server_ptr;

    std::string _ip;
    uint16_t _port;
};

class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{
    static const int num = 64;

public:
    TcpServer(uint16_t port, func_t OnMessage)
        : _port(port),
          _OnMessage(OnMessage),
          _quit(true),
          _epoller_ptr(new Epoller()),
          _listensock_ptr(new Sock())
    {
    }

    void Init()
    {
        _listensock_ptr->Socket();
        SetNonBlockOrDie(_listensock_ptr->Fd());
        _listensock_ptr->Bind(_port);
        _listensock_ptr->Listen();
        lg(Info, "create listen socket success: %d", _listensock_ptr->Fd());
        Addconnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
    }

    void Addconnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, except_func except_cb,
                       const std::string &ip = "0.0.0.0", uint16_t port = 0)
    {
        // 1.给sock建立一个Connection对象,并插入到map中
        std::shared_ptr<Connection> new_connection(new Connection(sock));
        new_connection->SetWeakPtr(shared_from_this()); // shared_from_this()是返回当前对象的shared_ptr,目的是设置回指
        new_connection->SetHandler(recv_cb, send_cb, except_cb);
        new_connection->_ip = ip;
        new_connection->_port = port;

        // 2.添加到unordered_map中
        _connections.insert(std::make_pair(sock, new_connection));

        // 3.添加对应的fd和事件到epoll中
        _epoller_ptr->EpollUpdata(EPOLL_CTL_ADD, sock, event);
    }

    // 链接管理器
    void Accepter(std::weak_ptr<Connection> conn)
    {
        auto connection = conn.lock();
        while (true)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len); // :: 表示使用系统原生的接口
            if (sock > 0)
            {
                uint16_t peerport = ntohs(peer.sin_port);
                char ipbuf[128];
                inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));
                lg(Debug, "get a new client,get info[%s : %d],sockfd : %d", ipbuf, peerport, sock);

                SetNonBlockOrDie(sock);
                // listensock只要设置recv_cb,而其他sock读写异常都要关心设置
                Addconnection(sock, EVENT_IN,
                              std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                              std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                              std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
                              ipbuf, peerport);
            }
            else
            {
                if (errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                    break; // 说明是真出错了
            }
        }
    }

    // 事件管理器
    // 这里不需要关心数据的格式,服务器只要IO数据即可,有没有读完,报文的细节交给上层
    void Recver(std::weak_ptr<Connection> conn)
    {
        if (conn.expired())
            return;
        auto connection = conn.lock();
        int sock = connection->SockFd();
        while (true) // 循环读取数据,因为是ET模式
        {
            char buffer[g_buffer_size];
            memset(buffer, 0, sizeof(buffer));
            int n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 虽然这里flags是0,但是sock已经设置成非阻塞了
            if (n > 0)
            {
                connection->AppendInBuffer(buffer);
            }
            else if (n == 0)
            {
                lg(Info, "sockfd : %d,client info %s:%d,quit...", connection->SockFd(), connection->_ip.c_str(), connection->_port);
                connection->_except_cb(connection);
                return;
            }
            else
            {
                if (errno = EWOULDBLOCK)
                    break;
                if (errno == EINTR)
                    continue;
                else
                {
                    lg(Warning, "sockfd: %d,client info %s:%d recv error...", connection->SockFd(), connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        // 数据读上来了,但是不一定完整,交给上层处理
        _OnMessage(connection);
    }

    // 发送这里会麻烦一些
    void Sender(std::weak_ptr<Connection> conn)
    {
        if (conn.expired())
            return;
        auto connection = conn.lock();
        auto &outbuffer = connection->OutBuffer();
        while (true)
        {
            ssize_t n = send(connection->SockFd(), outbuffer.c_str(), sizeof(outbuffer), 0); // 同理,不会再阻塞了
            if (n > 0)
            {
                outbuffer.erase(0, n);
                if (outbuffer.empty())
                    break;
            }
            else if (n == 0)
            {
                return;
            }
            else
            {
                if (errno = EWOULDBLOCK)
                    break;
                if (errno == EINTR)
                    continue;
                else
                {
                    lg(Warning, "sockfd: %d,client info %s:%d send error...", connection->SockFd(), connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        // 如果发送缓冲区被打满了,可能还没发完就退出循环了
        // 此时才来判断是否要关心写事件就绪
        if(!outbuffer.empty())
        {
            // 开启对写事件的关心
            EnableEvent(connection->SockFd(),true,true);
        }
        else 
        {
            // 关闭对写事件的关心
            EnableEvent(connection->SockFd(),true,false);
        }
    }

    void Excepter(std::weak_ptr<Connection> conn)
    {
        if(conn.expired());
        auto connection = conn.lock();

        int fd = connection->SockFd();
        lg(Warning,"Excepter hander sockfd: %d,client info %s : %d, excepter hander",
            fd,connection->_ip.c_str(),connection->_port);

        // 1.移除对该fd的关心
        _epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);

        // 2.再关闭fd
        close(fd);
        lg(Debug,"close fd done %d  ...",fd);

        // 3.再从unordered_map中移除
        _connections.erase(fd);
    }

    void EnableEvent(int sock,bool readable,bool writeable)
    {
        uint32_t events = 0;
        events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
        _epoller_ptr->EpollUpdata(EPOLL_CTL_MOD,sock,events);
    }

    bool IsConnectionSafe(int fd)
    {
        auto iter = _connections.find(fd);
        if(iter == _connections.end())
            return false;
        else 
            return true;
    }

    void Dispatcher(int timeout)
    {
        int n = _epoller_ptr->Epollwait(revs,num,timeout);
        for(int i = 0; i < n; ++i)
        {
            uint32_t event = revs[i].events;
            int sock = revs[i].data.fd;

            //这里只处理读写事件
            if((event & EPOLLIN) && IsConnectionSafe(sock))
            {
                if(_connections[sock]->_recv_cb)
                    _connections[sock]->_recv_cb(_connections[sock]);
            }
            if((event & EPOLLOUT) && IsConnectionSafe(sock))
            {
                if(_connections[sock]->_send_cb)
                    _connections[sock]->_send_cb(_connections[sock]);
            }
        }
    }

    void Loop()
    {
        _quit = false;

        while(!_quit)
        {
            Dispatcher(-1); // 阻塞式

        }

        _quit = true;
    }

    void PrintConnection()
    {
        std::cout << "_connections fd list: ";
        for (auto &connection : _connections)
        {
            std::cout << connection.second->SockFd() << ", ";
            std::cout << "inbuffer: " << connection.second->InBuffer().c_str();
        }
        std::cout << std::endl;
    }

    ~TcpServer()
    {}

private:
    std::shared_ptr<Epoller> _epoller_ptr;                             // 内核
    std::shared_ptr<Sock> _listensock_ptr;                             // 监听socket
    std::unordered_map<int, std::shared_ptr<Connection>> _connections; // 哈希表管理链接
    struct epoll_event revs[num];
    uint16_t _port;
    bool _quit;
    // 处理上层信息(跟上层的应用场景有关)
    func_t _OnMessage;
};

其中在写,也就是Sender那里要说明:

select/poll/epoll因为发送缓冲区经常有空间,因此写事件是经常就绪的,如果我们对它进行EPOLLOUT设置关心,EPOLLOUT几乎每次都就绪,server经常返回,浪费CPU资源,

因此:对于读事件,我们设置常关心,对于写事件,按需关心 -> 也就是直接写入,如果写入完成,那么就结束了,不需要关心,如果因为一些原因(比如发送缓冲区满了)而没写完,也就是outbuffer里面还有数据,此时再设置关心,等下次写完了,再去掉关心。 

Main.cc

#include <iostream>
#include <functional>
#include <memory>
#include "Log.hpp"
#include "TcpServer.hpp"  // 只处理IO
// 这里可以把之前的网络计算器搬过来,用来当作上层应用


// for debug
void DefaultOnmessage(std::weak_ptr<Connection> conn)
{
    if(conn.expired()) return;
    auto connection_ptr = conn.lock();

    std::cout << "上层得到了数据: " << connection_ptr->InBuffer() << std::endl;

    // 在这里可以加入多线程(线程池),那么主线程只负责读取数据,在这里交给其他线程处理数据

    //std::string response_str = ?
    // if(response_str.empty()) return;
    // lg(Debug, "%s", response_str.c_str());
    // response_str 发送出去
    // connection_ptr->AppendOutBuffer(response_str);
    // 正确的理解发送?
    // connection_ptr->_send_cb(connection_ptr);
    
    auto tcpserver = connection_ptr->_tcp_server_ptr.lock();
    tcpserver->Sender(connection_ptr);
}

int main()
{
    std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080,DefaultOnmessage));
    epoll_svr->Init();
    epoll_svr->Loop();

    return 0;
}

总之我们的服务仅仅只是处理数据,将读取上来,而并没有做处理,我们也可以加入上层应用,比如之前的网络版本计算器,现在没有加入,导致读取上来的数据是堆积在一起的。

多路转接-epoll/Reactor(2),服务器,运维

在大型服务器中,我们还可以加入多线程,主线程专门只做连接,和读取数据,其他线程处理数据。

还可以加入链接管理机制,比如用一个小堆,里面放入每个链接的链接时长,对于长时间没有相应的链接,我们可以判定超时而将它关闭,如果在超时前发送了消息,那么就重置它的超时时间。 

总之Reactor就是一个反应堆,它是一个半同步半异步模型,说是同步,体现在它也要进行等待,它参与了这个过程,异步体现在它可以将数据交给其他线程处理,然后只将结果返回。

就像打地鼠游戏一样,玩家监视着很多地洞,哪个洞冒出了地鼠,我们玩家就要去处理。 文章来源地址https://www.toymoban.com/news/detail-849426.html

到了这里,关于多路转接-epoll/Reactor(2)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 服务器(I/O)之多路转接

    1、阻塞等待:在内核将数据准备好之前,系统调用会一直等待。所有的套接字,默认都是阻塞方式。 2、非阻塞等待:如果内核没有将数据准备好,系统调用仍然会返回,并且会返回EWUOLDBLOCK或者EAGAIN错误码。 3、信号驱动:内核将数据准备好的时候,使用SIGIO信号通知应用程

    2024年02月09日
    浏览(62)
  • epoll多路复用_并发服务器

    应用程序: 驱动程序:

    2024年02月15日
    浏览(58)
  • 计算机网络编程 | 多路I/O转接服务器

    欢迎关注博主 Mindtechnist 或加入【Linux C/C++/Python社区】一起学习和分享Linux、C、C++、Python、Matlab,机器人运动控制、多机器人协作,智能优化算法,滤波估计、多传感器信息融合,机器学习,人工智能等相关领域的知识和技术。 专栏:《网络编程》 多路IO转接服务器也叫做多

    2024年02月12日
    浏览(52)
  • 多路IO—POll函数,epoll服务器开发流程

    \\\"在计算机网络编程中,多路IO技术是非常常见的一种技术。其中,Poll函数和Epoll函数是最为常用的两种多路IO技术。这两种技术可以帮助服务器端处理多个客户端的并发请求,提高了服务器的性能。本文将介绍Poll和Epoll函数的使用方法,并探讨了在服务器开发中使用这两种技

    2024年02月06日
    浏览(43)
  • Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)

    本篇基于上篇代码继续改进,很长。关于Reactor的说明在后一篇 上面的代码在处理读事件时,用的request数组是临时的,如果有数据没读完,那么下次再来到这里,就没有这些数据了。所以得让每一个fd都有自己的缓冲区。建立一个Connection类,然后有一个map结构,让这个类和每

    2024年01月20日
    浏览(59)
  • Linux学习记录——사십사 高级IO(6)--- Epoll型服务器(3)(Reactor)

    看完前两篇再看这篇,本篇将会写Reactor EpollServer.hpp中创建一个函数HandlerRequest,用它来做Recver函数的数据处理,也就是数据分析。 改一下回调函数,不向外暴露Connection类。 Main.cc中就不需要两个函数,一个计算函数就可以 处理数据那里再加上最后的步骤 回到Recver函数,调用

    2024年01月20日
    浏览(46)
  • Linux学习记录——사십오 高级IO(6)--- Epoll型服务器(3)(Reactor)

    看完前两篇再看这篇,本篇将会写Reactor EpollServer.hpp中创建一个函数HandlerRequest,用它来做Recver函数的数据处理,也就是数据分析。 改一下回调函数,不向外暴露Connection类。 Main.cc中就不需要两个函数,一个计算函数就可以 处理数据那里再加上最后的步骤 回到Recver函数,调用

    2024年01月23日
    浏览(59)
  • 【TCP服务器的演变过程】使用IO多路复用器epoll实现TCP服务器

    手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节的基础上,将IO多路复用机制select改为更高效的IO多路复用机制epoll,使用epoll管理每

    2024年01月17日
    浏览(70)
  • Linux多路IO复用技术——epoll详解与一对多服务器实现

    本文详细介绍了Linux中epoll模型的优化原理和使用方法,以及如何利用epoll模型实现简易的一对多服务器。通过对epoll模型的优化和相关接口的解释,帮助读者理解epoll模型的工作原理和优缺点,同时附带代码实现和图解说明。

    2024年02月05日
    浏览(44)
  • TCP服务器的演变过程:使用epoll构建reactor网络模型实现百万级并发(详细代码)

    手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节介绍了如何使用epoll开发高效的服务器,本节将介绍使用epoll构建reactor网络模型,实

    2024年02月01日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包