基于多反应堆的高并发服务器【C/C++/Reactor】(中)EventLoop初始化和启动

这篇具有很好参考价值的文章主要介绍了基于多反应堆的高并发服务器【C/C++/Reactor】(中)EventLoop初始化和启动。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

(一)详述EventLoop 

一个eventloop 可以有多少个连接,基于多反应堆的高并发服务器【C/C++/Reactor】,EventLoop初始化,C/C++/Reactor,高并发服务器,基于多反应堆的

这个Dispatcher是一个事件分发模型,通过这个模型,就能够检测对应的文件描述符的事件的时候,可以使用epoll/poll/select,前面说过三选一。另外不管是哪一个底层的检测模型,它们都需要使用一个数据块,这个数据块就叫做DispatcherData。除此之外,还有另外一个部分,因为在这个反应堆模型里边对应一系列的文件描述符,都属于epoll/poll/select,但是这些文件描述符,它们不是固定的。

  1. 可能我们和客户端新建立了一个连接,那么就需要把某个节点就是某个文件描述符添加到这个Dispatcher模型上边
  2. 或者说服务器和客户端断开连接了,对应的这个文件描述符,就需要从Dispatcher对应的检测集合中删除
  3. 还有一种情况就是要修改Dispatcher检测的这些文件描述符对应的事件

一个eventloop 可以有多少个连接,基于多反应堆的高并发服务器【C/C++/Reactor】,EventLoop初始化,C/C++/Reactor,高并发服务器,基于多反应堆的

struct EventLoop{
    bool isQuit;// 开关
    struct Dispatcher* dispatcher;
    void* dispatcherData;
    
    // 任务队列
    struct ChannelElement* head;
    struct ChannelElement* tail;

    // 用于存储channel的map
    struct ChannelMap* channelMap;
    
    // 线程ID,Name,mutex
    pthread_t threadID;
    char threadName[32];
    pthread_mutex_t mutex;
    // 待续写...
};

对于刚才描述的这三种情况,不管是哪一个,都可以把它们称为任务。既然是任务,如果产生了多个这样的任务,就需要把这些任务存储起来,所以对应的就需要有一个任务队列。在C语言里边是没有所谓现成的任务队列可直接拿来使用。C++中就有STL,可以直接拿一个队列来直接用,或者说拿一个list来直接用,在C语言里边,我们只能自己写了...

一个eventloop 可以有多少个连接,基于多反应堆的高并发服务器【C/C++/Reactor】,EventLoop初始化,C/C++/Reactor,高并发服务器,基于多反应堆的

// 处理该节点中的channel的方式
enum ElemType{
    ADD,    // 添加
    DELETE, // 删除
    MODIFY  // 修改
};

// 定义任务队列的节点
struct ChannelElement {
    int type;// 如何处理该节点中的channel
    struct Channel* channel;
    struct ChannelElement* next;
};

既然它是一个任务队列,也就意味着这个队列里边的节点的个数是不固定的,所以我们就需要一个动态的模型,可以实现一个链表。这个链表的节点是什么类型的?是ChannelElement类型.所谓ChannelElement类型。它里边主要其实是一个Channel,还有下一个节点的指针。通过指向下一个节点的指针,就可以把每一个节点连接起来了。当这个任务队列里边有了任务之后,我们就需要通过一个循环,把链表里边所有的节点都读出来。

  •     如果这是一个添加事件的节点,那么我们就把这个文件描述符添加到Dispatcher对应的检测集合中。
  •     如果它是删除的,那么我们就把这个文件描述符从Dispatcher的检测集合中删除。
  •     如果它是修改事件,那么我们就把这个节点在Dispatcher中的事件做一个修改。

这个EventLoop里边有一个任务队列,可以说这个EventLoop它是一个生产者和消费者模型

  • 消费者是谁呢?
    • 就是这个Dispatcher
  • 生产者是谁呢?
    • 生产者有可能是其他的线程,比如说主线程和客户端建立了连接,剩下的事就是通信。如果要通信,就对应一个通信的文件描述符。主线程就把这个任务添加到了子线程对应的这个EventLoop里边。此时在taskQueue里边就多出来了一个节点。在遍历这个任务队列的时候,读到这个节点之后,就需要把当前的这个节点添加到Dispatcher对应的检测模型里边。

另外,在这个EventLoop里边,还有一个ChannelMap,这个ChannelMap也是我们实现的,是通过一个数组来实现的。基于这个ChannelMap,就能够通过一个文件描述符得到对应的那个channel,为什么要得到那个channel呢?因为在这个channel里边有文件描述符,它的读事件写事件对应的回调函数,就是事件触发之后,执行什么样的处理动作。

思考】什么时候用到了这个ChannelMap了呢?

在实现epoll/select/poll的时候,分别调用了epoll_wait函数/select函数/poll函数,通过遍历内核传出来的这个集合,就得到了触发对应事件的那个文件描述符。但我们现在处理不了,因为得不到对应的channel,我们可以通过EventLoop里边提供的这个ChannelMap就能够得到对应的Channel指针,这个Channel指针就可以调用事件对应的处理函数了。

这三大块之外,还有一些其他的数据成员,比如threadID。因为在当前的服务器里边有多个EventLoop,每个EventLoop都属于一个线程。所以我们可以记录一下这个EventLoop它所对应的那个子线程的线程ID。关于子线程的这个名字肯定是我们给它起的,因为子线程创建出来之后,它只有一个ID,这是系统分配的。关于它的名字,操作系统是没有告诉我们的。

ThreadCondition是条件变量,可用于阻塞线程

思考ThreadMutex这个互斥锁它保护的是什么?

其实它保护的是这个任务队列。为什么要保护任务队列呢?对于这个EventLoop来说,它能够被多少个线程操作呢?

  • 如果是主线程的EventLoop,那就是一个。
  • 如果是子线程的EventLoop,那就有可能是两个,为什么是两个呢?
    • 当前,线程在执行这个EventLoop的时候,它肯定要遍历这个taskQueue吧,也就是当前线程需要读这个任务队列。
    • 除此之外,如果主线程和客户端建立了一个连接,主线程是有可能要把一个任务添加到这个EventLoop,对应的任务队列里边,就是额外的另一个线程了。
    • 如果涉及到两个线程操作,同一块共享资源,那么我们是要通过互斥锁来保护这个共享资源的。如果不保护,肯定就会出现数据混乱。

一个eventloop 可以有多少个连接,基于多反应堆的高并发服务器【C/C++/Reactor】,EventLoop初始化,C/C++/Reactor,高并发服务器,基于多反应堆的

整个项目的结构,在当前这个多反应堆模型的服务器程序里边,它是有多个EventLoop模型的。首先,在主线程里边就有一个EventLoop主线程的,这个EventLoop去检测客户端有没有新的连接到达。如果有新连接,就建立新连接。之后,主线程把这个通信的任务给到线程池里边,把主线程的那个EventLoop也传进来。一定要注意这个EventLoop和主线程的EventLoop是同一个实例。也就是说,线程池里边的这个MainEventLoop和外边这个MainEventLoop它们对应的是同一块内存地址

另外,在线程池里边还有若干个子线程,每个子线程里边都对应一个EventLoop。每个子线程里边的EventLoop它们主要是处理通信的文件描述符相关的操作。这些都是在子线程里边来完成的:

  • 比如说要把一个通信的文件描述符添加到EventLoop里边。
  • 如果服务器和客户端断开连接了,那么就需要把通信的文件描述符从EventLoop里边删除或者要修改这个通信的文件描述符检测的事件

思考】那么,为什么右侧的TcpConnection里边也有一个EventLoop呢?

关于这个TcpConnection,其实它是封装了用于通信的文件描述符,在这个模块里边,是把子线程里边那个EventLoop的地址传递给了TcpConnection。

在每个线程里边,都有一个EventLoop,也就是说EventLoop是属于线程的,不管是主线程还是子线程,里边都有一个EventLoop然后在这个TcpConnection里边,也有一个EventLoop,但是不是说EventLoop属于TcpConnection,而是TcpConnection属于EventLoop

如果TcpConnection,它属于EventLoop,那么这个TcpConnection就属于对应的某一个子线程。EventLoop属于哪个子线程,这个TcpConnection它就属于哪个子线程。它对应的那些任务处理就是和客户端通信,接收数据以及发送数据的操作就在哪个子线程里边来完成。在线程池里边传进来了一个主线程EventLoop,主线程的EventLoop也是一个反应堆实例。

思考】为什么要把主线程的反应堆实例传递给线程池呢?

是因为我们在给线程池做初始化的时候,如果指定线程池的子线程个数为0,此时线程池就没有了,不能工作。

为了能够保证线程池能够工作,也就传进来了一个主线程的反应堆实例,在没有子线程的情况下,那么就借用主线程的反应堆实例来完成对应的这一系列的任务处理。在此时,客户端和服务器建立连接之后,得到了用于通信的文件描述符,这个通信的文件描述符被TcpConnection封装起来了。我们就需要把这个TcpConnection放到一个反应堆模型里边,就是放到主线程的EventLoop里边,这样客户端和服务器的通信操作也就能实现了。这种比较极端的情况,对于程序来说,它就是一个单反应堆模型。

  • 如果在创建线程池的时候指定这个子线程的个数大于0,那么就是一个多反应堆的服务器模型。
  • 如果在创建线程池的时候指定线程的个数等于0,此时就是一个单反应堆的服务器模型。

>>总结

(1)反应堆模型中的EventLoop介绍 

  • 详细介绍了反应堆模型中的EventLoop,包括它的主要作用、如何被多个线程操作,以及它与任务队列和Dispatcher检测模型的关系。对于理解反应堆模型的工作原理至关重要。

(2)主线程与子线程的交互过程

  • 主线程与子线程通过任务队列进行交互,子线程处理通信相关的文件描述符操作
  • 主线程它只能负责和客户端建立连接,如果这个连接建立好了,剩下的事情都是需要由这个子线程来完成的。所以主线程肯定不会给你去处理任务队列里边的节点。在主线程里边,其实它是有一个反应堆模型的,在当前的这个子线程里边也有一个反应堆模型。每个反应堆模型里边都有一个Dispatcher。关于这个Dispatcher就是epoll、poll、或者select,所以主线程去处理的话,这个任务就放到主线程的那个Dispatcher里边了,这样很显然是不对的。故在子线程的任务队列里边有了任务之后,还需要交给子线程的Dispatcher去处理。
  • 因此这个节点的处理,还需要判断当前线程到底是什么线程。如果它是主线程不能让它去处理,如果是子线程,直接让它去处理

(3)EventLoop与任务队列的关系

  • 当任务队列中有任务时,会通过循环遍历链表,将任务添加到Dispatcher对应的检测模型中。

(4)文件描述符的管理

  • 新连接建立或断开时,或要修改Dispatcher检测的这些文件描述符对应的事件时,文件描述符的添加、删除和修改操作都与EventLoop紧密相关。

 (二)EventLoop 的初始化

  •  EventLoop .h
// 初始化
struct EventLoop* eventLoopInit();
struct EventLoop* eventLoopInitEx(const char* threadName);
  •  EventLoop .c
// 初始化
struct EventLoop* eventLoopInit() {
    return eventLoopInitEx(NULL);
}

struct EventLoop* eventLoopInitEx(const char* threadName) {
    struct EventLoop* evLoop = (struct EventLoop*)malloc(sizeof(struct EventLoop));
    evLoop->isQuit = false; // 没有运行
    evLoop->dispatcher = &EpollDispatcher;
    evLoop->dispatcherData = evLoop->dispatcher->init(); 
    
    // 任务队列(链表)
    evLoop->head = evLoop->tail = NULL;

    // 用于存储channel的map
    evLoop->channelMap = channelMapInit(128);

    evLoop->threadId = pthread_self(); // 当前线程ID
    strcpy(evLoop->threadName,threadName == NULL ? "MainThread" : threadName); // 线程的名字
    pthread_mutex_init(&evLoop->mutex, NULL); 

    ...(待续写)
    return evLoop;
}

 (二)启动 EventLoop 

  •  EventLoop .h
// 启动反应堆模型
int eventLoopRun(struct EventLoop* evLoop);
  •  EventLoop .c

参数有效性检查 启动函数要求传入一个有效的指针。通过断言,我们可以确保指针的有效性。如果无效,程序将抛出异常。

模型提取与循环EventLoop 中取出模型后,需要让它进行while循环。这个循环中,可调用的是epoll模型epoll_wait函数/poll模型poll函数/select模型select函数。但这个调用不是直接的,而是通过dispatcher指针的dispatch函数。

线程安全检查 在执行dispatch函数时,会检查当前线程的ID与保存的线程ID是否匹配。如果不匹配,则视为异常情况,返回-1 

动态任务函数理解dispatch指向的任务函数是动态的,根据初始化的底层lO模型(如epoll、poll或select)不同,dispatch函数内部调用的函数也不同

文件描述符处理 每次调用 select/poll/epoll_wait 时,会得到一些被激活的文件描述符。通过for循环,对这些文件描述符进行处理

// 启动反应堆模型
int eventLoopRun(struct EventLoop* evLoop) {
    assert(evLoop != NULL);
    // 取出事件分发和检测模型
    struct Dispatcher* dispatcher = evLoop->dispatcher;
    // 比较线程ID是否正常
    if(evLoop->threadID != pthread_self()) {
        return -1;
    }
    // 循环进行事件处理
    while(!evLoop->isQuit) {
        /*
            dispatch指向的任务函数其实是动态的,由于在初始化的时候指向的是底层的
            IO模型用的是epoll模型,那么dispatcher->dispatch(evLoop,2);
            就是调用epollDispatch,里头的epoll_wait函数会随之被调用,每
            调用一次epoll_wait函数,就会得到一些被激活的文件描述符
            然后通过for循环,对被激活的文件描述符做一系列的处理 
            如果是poll,就是调用pollDispatch,里头的poll函数会随之被调用,每
            调用一次poll函数,就会得到一些被激活的文件描述符
            然后通过for循环,对被激活的文件描述符做一系列的处理 
            如果是select,就是调用selectDispatch,里头的select函数会随之被调用,每
            调用一次select函数,就会得到一些被激活的文件描述符
            然后通过for循环,对被激活的文件描述符做一系列的处理 
        */
        dispatcher->dispatch(evLoop,2); // 超时时长 2s
        ...(待续写)
    }
    return 0;   
}

详解注释:

dispatch指向的任务函数其实是动态的:由于在初始化的时候指向的是底层的IO模型用的是epoll模型,那么dispatcher->dispatch(evLoop,2);就是调用epollDispatch,里头的epoll_wait函数会随之被调用,每调用一次epoll_wait函数,就会得到一些被激活的文件描述符然后通过for循环,对被激活的文件描述符做一系列的处理 

static int epollDispatch(struct EventLoop* evLoop,int timeout) {
    struct EpollData* data = (struct EpollData*)evLoop->dispatcherData;
    int count = epoll_wait(data->epfd,data->events,Max,timeout * 1000);
    for(int i=0;i<count;++i) {
        int events = data->events[i].events;
        int fd = data->events[i].data.fd;
        if(events & EPOLLERR || events & EPOLLHUP) {
            // 对方断开了连接,删除 fd
            // epollRemove(&evLoop->channels[fd],evLoop);
            continue;
        }
        if(events & EPOLLIN) {
            ...(待续写)
        }
        if(events & EPOLLOUT) {
            ...(待续写)
        }
    }
    return 0;
}

如果是poll,就是调用pollDispatch,里头的poll函数会随之被调用,每调用一次poll函数,就会得到一些被激活的文件描述符,然后通过for循环,对被激活的文件描述符做一系列的处理 

static int pollDispatch(struct EventLoop* evLoop,int timeout) {
    struct PollData* data = (struct PollData*)evLoop->dispatcherData;
    int count = poll(data->fds,data->maxfd + 1,timeout * 1000);
    if(count == -1) {
        perror("poll");
        exit(0);
    }
    for(int i=0;i<=data->maxfd;++i) {
        if(data->fds[i].fd == -1) {
            continue;
        }
        if(data->fds[i].revents & POLLIN) {
            ...(待续写)
        }
        if(data->fds[i].revents & POLLOUT) {
            ...(待续写)
        }
    }
    return 0;
}

如果是select,就是调用selectDispatch,里头的select函数会随之被调用,每调用一次select函数,就会得到一些被激活的文件描述符,然后通过for循环,对被激活的文件描述符做一系列的处理 

static int selectDispatch(struct EventLoop* evLoop,int timeout) {
    struct SelectData* data = (struct SelectData*)evLoop->dispatcherData;
    struct timeval val;
    val.tv_sec = timeout;
    val.tv_usec = 0;
    fd_set rdtmp = data->readSet;
    fd_set wrtmp = data->writeSet;
    int count = select(Max,&rdtmp,&wrtmp,NUll,&val);
    if(count == -1) {
        perror("select");
        exit(0);
    }
    for(int i=0;i<Max;++i) { 
        if(FD_ISSET(i,&rdtmp)) {
            ...(待续写)
        }
        if(FD_ISSET(i,&wrtmp)) {
            ...(待续写)
        }
    }
    return 0;
}

总结 :反应堆模型的启动流程涉及多个关键环节,从参数有效性检查到线程安全检查,再到动态任务函数的执行。理解这一流程对于维护和优化系统的稳定性至关重要。 文章来源地址https://www.toymoban.com/news/detail-818726.html

到了这里,关于基于多反应堆的高并发服务器【C/C++/Reactor】(中)EventLoop初始化和启动的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包