服务器IO复用reactor模式

这篇具有很好参考价值的文章主要介绍了服务器IO复用reactor模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <functional>
#include <future>
#include <algorithm>

const int MAX_CLIENTS = 10;
const int BUFFER_SIZE = 1024;
const int MAX_THREADS = 4;

struct EventData
{
    int clientSocket;
};

class ThreadPool
{
public:
    ThreadPool(size_t numThreads)
    {
        for (size_t i = 0; i < numThreads; ++i)
        {
            threads_.emplace_back([this]
                                  {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

                        if (stop_ && tasks_.empty()) {
                            return;
                        }

                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    //std::cout << "task(); " << std::endl;
                    task();
                } });
        }
    }

    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            stop_ = true;
        }
        condition_.notify_all();

        for (std::thread &thread : threads_)
        {
            thread.join();
        }
    }

    void Enqueue(std::function<void()> func)
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            tasks_.emplace(std::move(func));
        }
        condition_.notify_one();
    }

private:
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;

    std::mutex mutex_;
    std::condition_variable condition_;
    bool stop_ = false;
};

class Reactor
{
public:
    Reactor(ThreadPool &threadPool) : threadPool_(threadPool)
    {
    }

    void Register(int clientSocket)
    {
        std::cout << "Register " << std::endl;
        //std::lock_guard<std::mutex> lock(mutex_);
        std::cout << "Register2 " << std::endl;
        clientSockets_.push_back(clientSocket);
        std::cout << "Socket " << clientSocket << " registered." << std::endl;
    }

    void Remove(int clientSocket)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        clientSockets_.erase(std::remove_if(clientSockets_.begin(), clientSockets_.end(),
                                            [clientSocket](int socket)
                                            { return socket == clientSocket; }),
                             clientSockets_.end());
        close(clientSocket);
        FD_CLR(clientSocket, &readFds);
        std::cout << "Socket " << clientSocket << " removed." << std::endl;
        clientSocket = 0;
    }

    void Run(int mainSocket)
    {
        int maxFd = mainSocket;

        while (true)
        {
            FD_ZERO(&readFds);
            FD_SET(mainSocket, &readFds);

            {
                std::unique_lock<std::mutex> lock(mutex_);

                for (int socket : clientSockets_)
                {
                    if (socket > 0)
                    {
                        FD_SET(socket, &readFds);
                        maxFd = std::max(maxFd, socket);
                    }
                }

                struct timeval timeout;
                timeout.tv_sec = 0;
                timeout.tv_usec = 0.01;

                int selectResult = select(maxFd + 1, &readFds, nullptr, nullptr, &timeout);
                if (selectResult == -1)
                {
                    perror("select");
                    return;
                }
                else if (selectResult == 0)
                {
                    // 没有就绪套接字,继续事件循环
                    continue;
                }

                if (FD_ISSET(mainSocket, &readFds))
                {
                    // 有新的连接请求
                    struct sockaddr_in clientAddress;
                    socklen_t clientAddressLength = sizeof(clientAddress);
                    int clientSocket = accept(mainSocket, (struct sockaddr *)&clientAddress, &clientAddressLength);

                    if (clientSocket == -1)
                    {
                        if (errno == EWOULDBLOCK)
                        {
                            // 没有新连接,继续事件循环
                            continue;
                        }
                        else
                        {
                            perror("accept");
                            break;
                        }
                    }
                    else
                    {
                        std::cout << "Accepted new connection." << std::endl;

                        // 将客户端套接字添加到客户端套接字数组

                        Register(clientSocket);
                        std::cout << "Register1 " << std::endl;
                    }
                }

                // for (int i = 0; i < MAX_CLIENTS; ++i)
                // {
                //     if (FD_ISSET(clientSockets[i], &readFds))
                //     {
                //         readySockets.push_back(clientSockets[i]);
                //     }
                // }
            }

            for (int socket : clientSockets_)
            {
                if (FD_ISSET(socket, &readFds))
                {
                    threadPool_.Enqueue([this, socket]()
                                        {
                    EventData eventData;
                    eventData.clientSocket = socket;
                    ProcessEvent(eventData); });
                }
            }
        }
    }

private:
    void ProcessEvent(EventData eventData)
    {
        // 处理事件,这里只是简单示例,回传客户端的数据
        char buffer[BUFFER_SIZE];
        memset(buffer, 0, sizeof(BUFFER_SIZE));
        ssize_t bytesRead = recv(eventData.clientSocket, buffer, BUFFER_SIZE, 0);
        if (bytesRead > 0)
        {
            send(eventData.clientSocket, buffer, bytesRead, 0);
        }
        else if (bytesRead == 0 || (bytesRead == -1 && errno != EWOULDBLOCK))
        {
            // 连接关闭或出错,移除客户端
            Remove(eventData.clientSocket);
        }
        // 通知事件已处理
        std::cout << "Processed socket " << eventData.clientSocket << "  event data " << buffer << "in Thread: " << std::this_thread::get_id() << std::endl;
    }

private:
    ThreadPool &threadPool_;
    std::mutex mutex_;
    std::vector<int> clientSockets_;
    fd_set readFds;
};

class Server
{
public:
    Server(ThreadPool &threadPool) : reactor_(threadPool)
    {
    }

    bool Init(int port)
    {
        mainSocket_ = socket(AF_INET, SOCK_STREAM, 0);
        if (mainSocket_ == -1)
        {
            std::cerr << "creat socket err" << std::endl;
            return false;
        }

        int opt = 1;
        if (setsockopt(mainSocket_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
        {
            std::cerr << "setsockopt err" << std::endl;
            close(mainSocket_);
            return false;
        }

        int flags = fcntl(mainSocket_, F_GETFL, 0);
        if (flags == -1)
        {
            std::cerr << "Error getting socket flags." << std::endl;
            close(mainSocket_);
            return false;
        }
        if (fcntl(mainSocket_, F_SETFL, flags | O_NONBLOCK) == -1)
        {
            std::cerr << "Error setting socket to non-blocking mode." << std::endl;
            close(mainSocket_);
            return false;
        }

        struct sockaddr_in serverAddress;
        serverAddress.sin_family = AF_INET;
        serverAddress.sin_addr.s_addr = INADDR_ANY;
        serverAddress.sin_port = htons(port);

        if (bind(mainSocket_, (struct sockaddr *)&serverAddress, sizeof(serverAddress)) == -1)
        {
            perror("bind");
            return false;
        }

        if (listen(mainSocket_, MAX_CLIENTS) == -1)
        {
            perror("listen");
            return false;
        }

        port_ = port;

        std::cout << "server init ok, listening on port: " << port_ << "。" << std::endl;

        return true;
    }

    void Run()
    {
        std::thread reactorThread([&]()
                                  { reactor_.Run(mainSocket_); });

        // 等待Reactor线程结束
        reactorThread.join();
    }

private:
    int mainSocket_;
    int port_;
    Reactor reactor_;
};

int main(int argc, char *argv[])
{
    if (argc < 2)
    {
        std::cerr << "please input the port of server。" << std::endl;
        return -1;
    }

    int port = atoi(argv[1]);
    ThreadPool threadPool(MAX_THREADS); // 创建线程池
    Server server(threadPool);          // 将线程池传递给服务器构造函数

    if (!server.Init(port))
    {
        std::cerr << "int server failed :" << port << std::endl;
        return 1;
    }

    server.Run();

    return 0;
}

  • 调试: Linux下nc命令作为客户端:
    nc 127.0.0.1 7777

文章来源地址https://www.toymoban.com/news/detail-694081.html

到了这里,关于服务器IO复用reactor模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)

    本篇基于上篇代码继续改进,很长。关于Reactor的说明在后一篇 上面的代码在处理读事件时,用的request数组是临时的,如果有数据没读完,那么下次再来到这里,就没有这些数据了。所以得让每一个fd都有自己的缓冲区。建立一个Connection类,然后有一个map结构,让这个类和每

    2024年01月20日
    浏览(55)
  • 【高并发服务器 02】——线程池与IO多路复用

    线程池的好处 :所有的池都是为了事先把资源准备好,在后续用的时候可以更加方便的拿到这个资源—— 不用去申请、释放资源 什么时候用线程池 ? IO事务并发较高 :人在杭州,但是数据库在北京,想要查询数据库,需要通过互联网建立TCP三次握手,频繁地创建和销毁线

    2024年03月23日
    浏览(49)
  • TCP服务器的演变过程:IO多路复用机制select实现TCP服务器

    手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节的基础上,将并发的实现改为IO多路复用机制,使用select管理每个新接入的客户端连

    2024年02月03日
    浏览(55)
  • 【TCP服务器的演变过程】使用IO多路复用器epoll实现TCP服务器

    手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节的基础上,将IO多路复用机制select改为更高效的IO多路复用机制epoll,使用epoll管理每

    2024年01月17日
    浏览(66)
  • IO多路复用中select的TCP服务器模型和poll服务模型

    服务器端 客户端 poll客户端

    2024年02月12日
    浏览(48)
  • Linux多路IO复用技术——epoll详解与一对多服务器实现

    本文详细介绍了Linux中epoll模型的优化原理和使用方法,以及如何利用epoll模型实现简易的一对多服务器。通过对epoll模型的优化和相关接口的解释,帮助读者理解epoll模型的工作原理和优缺点,同时附带代码实现和图解说明。

    2024年02月05日
    浏览(41)
  • 架构篇19:单服务器高性能模式-Reactor与Proactor

    上篇介绍了单服务器高性能的 PPC 和 TPC 模式,它们的优点是实现简单,缺点是都无法支撑高并发的场景,尤其是互联网发展到现在,各种海量用户业务的出现,PPC 和 TPC 完全无能为力。今天我将介绍可以应对高并发场景的单服务器高性能架构模式:Reactor 和 Proactor。 PPC 模式

    2024年01月25日
    浏览(36)
  • 使用IO多路复用select完成TCP循环服务器接收客户端消息并打印

    服务器       客户端     结果    

    2024年02月12日
    浏览(47)
  • 【网络进阶】服务器模型Reactor与Proactor

    在高并发编程和网络连接的消息处理中,通常可分为两个阶段:等待消息就绪和消息处理。当使用默认的阻塞套接字时(例如每个线程专门处理一个连接),这两个阶段往往是合并的。因此,处理套接字的线程需要等待消息就绪,这在高并发场景下导致线程频繁地休眠和唤醒

    2024年02月01日
    浏览(44)
  • 【Linux】高级IO --- Reactor网络IO设计模式

    人其实很难抵制诱惑,人只能远离诱惑,所以千万不要高看自己的定力。 1. 多路转接接口select poll epoll所做的工作其实都是事件通知,只向上层通知事件到来,处理就绪事件的工作并不由这些API来完成,这些接口在进行事件通知时,有没有自己的策略呢? 其实是有的,在网络

    2024年02月09日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包