[学习分享]----sylar服务器框架源码阅读--IO协程调度模块

这篇具有很好参考价值的文章主要介绍了[学习分享]----sylar服务器框架源码阅读--IO协程调度模块。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

sylar作者在本站的地址为 这里,也可以查看 作者主页,也有视频教程可以 点击这里。此外,也可以看一下赵路强大佬的关于sylar协程模块的博客 点击这里,我本人在阅读sylar源码的时候也是参考了赵路强大佬的解析 可以点击这里。
各位看官也可以加我的qq和我讨论2511010742

IO 协程调度

sylar大人在协程调度模块中封装了epoll,对于每一个需要监听的文件描述符fd,都支持可读和可写事件。这部分操作是十分复杂的,需要读者对协程调度模块和epoll模型十分了解,接下来我会尽我所能向大家介绍清楚这部分内容。在协程调度模块中,当没有任务执行时就会在idle状态下忙等待,在本节中就利用了idle状态去做一些需要阻塞等待的任务,比如epoll_wait等。下面,我先介绍一下epoll的相关信息。

epoll简介

epoll 是 Linux 系统提供的一种事件通知机制,主要用于处理大量文件描述符的 I/O 事件。它是一种高效的 I/O 多路复用机制,相比于传统的 select 和 poll,epoll 在处理大量连接时有更好的性能。

1. epoll API

1.1 创建 epoll 实例

int epfd = epoll_create(int size);

这个函数用处是创建一个 epoll 实例,size 本意为希望监听的文件描述符的数量,不用太在意它。

1.2 控制 epoll 上的事件

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • epfd 是 epoll 实例的文件描述符。
  • op 表示操作类型,有三个值:
    EPOLL_CTL_ADD:添加一个文件描述符到 epoll 实例中。
    EPOLL_CTL_MOD:修改 epoll 实例上的文件描述符的事件。
    EPOLL_CTL_DEL:从 epoll 实例中删除一个文件描述符。
  • fd 是要操作的文件描述符。
  • event 是一个 struct epoll_event 结构体,用于描述事件类型和携带的数据。

1.3 等待事件发生

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • epfd 是 epoll 实例的文件描述符。
  • events 是一个数组,用于存储发生的事件。
  • maxevents 表示 events 数组的大小,即最多可以存储多少个事件。
  • timeout 表示等待事件发生的超时时间,单位是毫秒。传入 -1 表示一直等待,传入 0 表示立即返回,大于 0 表示等待指定的时间。

2. epoll 结构

2.1 eventpoll
epoll 在内核中主要使用两个结构体:struct eventpoll 和 struct epitem。这两个结构体用于管理 epoll 实例中的文件描述符和事件。eventpoll 里成员有很多,我们讲一些常用的。

struct eventpoll {
    spinlock_t      lock;
    struct mutex    mtx;
    struct file     *file;
    struct wait_queue_head *wq;
    struct wait_queue_entry wait;
    wait_queue_t    poll_wait;
    struct list_head rdllist;
    struct list_head ovflist;
    struct rb_root_cached rbr;
    struct list_head poll_siblings;
    struct list_head fasync_list;
    unsigned int    ovflist_length;
    struct ep_pqueue **poll_table;
    struct user_struct *user;
    unsigned long   gen;
    wait_queue_head_t proc_wait;
    struct wakeup_source __rcu *ws;
    unsigned int user_size;
    int wakeup_pipe;
};

主要字段包括:

  • lock: 用于实现对 eventpoll 结构的自旋锁,确保多线程并发安全。
  • mtx: 用于实现互斥锁,确保对 eventpoll 结构的互斥访问。
  • file: 与 epoll 实例关联的文件结构。
  • wq: 等待队列头,用于进程等待事件发生。
  • poll_table: 保存文件描述符到 epitem 的映射表,是一个指针数组。
  • rdllist: 就绪链表,用于存储就绪的事件项 (epitem)。
  • rbr: 红黑树根节点,用于快速查找文件描述符对应的 epitem。
  • 其他字段用于管理等待队列、异步事件、用户信息等。

2.2 epitem
struct epitem 表示 epoll 实例中的一个事件项,它包含了与文件描述符相关的信息和事件状态。

struct epitem {
    struct rb_node  rbn;
    struct list_head rdllink;
    struct list_head fllink;
    struct file     *file;
    struct eventpoll *ep;
    struct wait_queue_head wq;
    struct epoll_event event;
    unsigned long   last_wakeup;
    spinlock_t      lock;
};

主要字段包括:

  • rbn: 用于将 epitem 插入到 struct eventpoll 的红黑树中,以实现快速查找。
  • rdllink: 用于将 epitem 插入到 struct eventpoll 的就绪链表中,以等待用户程序处理。
  • fllink: 用于将 epitem 插入到 struct eventpoll 的就绪链表中的尾部。
  • file: 关联的文件结构。
  • ep: 指向 struct eventpoll 的指针。
  • wq: 等待队列头,用于进程等待事件发生。
  • event: 存储文件描述符的事件类型和数据。
  • last_wakeup: 记录上次唤醒的时间戳,用于避免多次重复唤醒。
  • lock: 用于实现对 epitem 结构的自旋锁,确保多线程并发安全。

这两个结构体是 epoll 内核实现中的核心数据结构,通过它们实现了高效的事件管理和处理机制。struct eventpoll 用于维护 epoll 实例的状态,而 struct epitem 用于表示每个文件描述符的事件状态。

epoll_event 结构体用于描述文件描述符上的事件,它是在 epoll 操作中用到的关键数据结构。以下是 epoll_event 结构体的定义:

struct epoll_event {
    uint32_t events;  // 事件类型,可以是 EPOLLIN、EPOLLOUT、EPOLLERR 等
    epoll_data_t data; // 用户数据,可以携带额外信息
};

events:表示事件的类型,可以是以下几个宏的组合:

  • EPOLLIN:表示对应的文件描述符可以读取(Readable)。
  • EPOLLOUT:表示对应的文件描述符可以写入(Writable)。
  • EPOLLRDHUP:表示对端断开连接,或者是写半关闭状态(可读、对端关闭写)。
  • EPOLLHUP:表示发生挂起事件,通常表示连接被挂断或发生错误。
  • EPOLLERR:表示发生错误,需要通过 errno 获取具体错误信息。

data:是一个联合体,用于携带额外的信息。epoll_data_t 的定义如下:

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

epoll_data 可以用于携带用户数据,具体使用取决于应用的需要。如果用户程序希望在事件发生时携带一些额外的信息,可以通过设置 data 字段。sylar就使用了其中的 ptr 指针来保存事件上下文信息。

首先,通过 epoll_create 创建一个 epoll 实例,得到一个文件描述符,然后,通过 epoll_ctl 将需要监听的文件描述符注册到 epoll 实例中,并指定关注的事件类型。这个过程会将我们添加的文件描述符添加到 epoll 句柄的红黑树中,然后执行 epoll_wait 函数则会阻塞等待这些文件描述符所关心的事件发生,当有关注的事件发生时,epoll_wait 返回,需要处理的事件都会保存在 events 数组里,遍历并处理这些就绪的事件。

为了高效地管理已经就绪的事件项,epoll 使用双向链表维护一个就绪事件链表。当文件描述符上的事件发生时,对应的事件项会被从红黑树中找到,然后加入到就绪链表中等待用户程序处理。用户程序调用 epoll_wait 进行阻塞等待事件的发生。内核检查红黑树上的文件描述符,如果有就绪的文件描述符,将相应的事件项加入到就绪链表中。epoll_wait 返回时,将就绪链表上的事件项传递给用户程序,用户程序可以遍历链表处理已经就绪的事件。

简单介绍完之后,让我们进入今天的主题,读一下 sylar 这部分源码。

源码分析

对于IO协程调度来说,我们只关心它的 fd、事件类型和对应的回调函数,其中 epoll 负责 fd 和事件之间的联系,回调函数则是需要协程调度去调度执行。sylar 建立了一个 FdContext 结构来保存对应的文件描述符,以及事件信息和回调函数。并将其保存在epoll_event的私有数据指针data.ptr中,当 wait 之后,遍历就绪事件就可以拿到这些信息,从而去执行相应的操作。

调度器在 idle 时会 epoll_wait 所有已经注册的 fd,返回时就可以拿到对应的 FdContext。然后将对应事件的回调函数加入到调度器的任务列表中,在 idle 退出后就可以调度这些回调函数了。下面我们看一下头文件信息。

class IOManager : public Scheduler, public TimerManager {
public:
    typedef std::shared_ptr<IOManager> ptr;
    typedef RWMutex RWMutexType;

    // IO事件
    enum Event {
        // 无事件
        NONE = 0x0,
        // 读事件
        READ = 0x1,
        // 写事件
        WRITE = 0x4,
    };
private:
    // Socket事件上下文类
    struct FdContext {
        typedef Mutex MutexType;

        // 事件上下文类
        struct EventContext {
            // 这个事件执行的调度器
            Scheduler* scheduler = nullptr;
            // 事件协程
            Fiber::ptr fiber;
            // 事件回调函数
            std::function<void()> cb;
        };

        // 获取事件上下文类
        EventContext& getContext(Event event);

		// 重新设置事件上下文
        void resetContext(EventContext& ctx);

        // 触发事件
        void triggerEvent(Event event);

		// 定义一读一写事件
        EventContext read, write;

        // 事件关联的句柄
        int fd = 0;

        // 当前描述符所关注事件
        Event events = NONE;

        // 事件的锁
        MutexType mutex;
    };

public:
    IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = "");

    ~IOManager();

    /**
     * @brief 添加事件
     * @param[in] fd socket句柄
     * @param[in] event 事件类型
     * @param[in] cb 事件回调函数
     * @return 添加成功返回0,失败返回-1
     */
    int addEvent(int fd, Event event, std::function<void()> cb = nullptr);

    /**
     * @brief 删除事件
     * @param[in] fd socket句柄
     * @param[in] event 事件类型
     * @attention 不会触发事件
     */
    bool delEvent(int fd, Event event);
	
	// 取消 fd上的某个事件
    bool cancelEvent(int fd, Event event);

	// 取消 fd上的所有事件
    bool cancelAll(int fd);

    // 返回当前IOManager
    static IOManager* GetThis();
protected:
    void tickle() override;
    bool stopping() override;
    void idle() override;
    void onTimerInsertedAtFront() override;

    void contextResize(size_t size);

    /**
     * @brief 判断是否可以停止
     * @param[out] timeout 最近要出发的定时器事件间隔
     * @return 返回是否可以停止
     */
    bool stopping(uint64_t& timeout);
private:
    // epoll文件句柄
    int m_epfd = 0;
    // pipe 句柄
    int m_tickleFds[2];
    // 当前等待执行的事件总数
    std::atomic<size_t> m_pendingEventCount = {0};
    // IOManager的锁
    RWMutexType m_mutex;
    // socket事件上下文的容器
    std::vector<FdContext*> m_fdContexts;

};

可以看到,这里对于每一个文件描述符都关联了相关的事件,以及回调。换句话说就是每一个文件描述符 fd 都可以关注三种事件类型,读写以及无事件,每种事件都可以注册一个专属的回调函数,并且还支持删除和取消事件。

下面来阅读一下 sylar 关于这部分的函数定义,我们先来看一下主体部分。

IOManager::IOManager(size_t threads, bool use_caller, const std::string& name)
    :Scheduler(threads, use_caller, name) {

    std::cout << "[DEBUG]  IOManager create" << std::endl;

	// 创建 epoll实例
    m_epfd = epoll_create(5000);
    //SYLAR_ASSERT(m_epfd > 0);
	
	// 初始化管道
	/*
		这里 sylar使用管道提醒其他线程有任务来了
	*/
    int rt = pipe(m_tickleFds);
    //SYLAR_ASSERT(!rt);

	// 为管道读端注册可读事件
    epoll_event event;
    memset(&event, 0, sizeof(epoll_event));
    // 监听事件,边沿触发
    event.events = EPOLLIN | EPOLLET;
    event.data.fd = m_tickleFds[0];

    rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
    //SYLAR_ASSERT(!rt);

    rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
    //SYLAR_ASSERT(!rt);

    contextResize(32);

	// 直接开启调度
    start();
}

sylar 使用管道来通知有任务要调度

void IOManager::tickle() {
    std::cout << "[INFO]   IOManager::tickle" << std::endl;
    if(!hasIdleThreads()) {
        return;
    }
    // 向管道写段写入数据
    int rt = write(m_tickleFds[1], "T", 1);
    //SYLAR_ASSERT(rt == 1);
}

接下来是 idle 协程的重写了

void IOManager::idle() {
    std::cout << "[DEBUG]  IOManager::idle" << std::endl;
    const uint64_t MAX_EVENTS = 256;
    // 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零
    epoll_event* events = new epoll_event[MAX_EVENTS]();
    // 使用std::shared_ptr来管理这个动态分配的数组的内存
    std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){
        // lambda表达式,用于在shared_ptr释放内存时调用
        // 删除动态分配的epoll_event数组
        delete[] ptr;
    });

    while(true) {
        uint64_t next_timeout = 0;
        if(stopping(next_timeout)) {
            break;
        }

        int rt = 0;
        do {
            static const int MAX_TIMEOUT = 3000;
            if(next_timeout != ~0ull) {
                next_timeout = (int)next_timeout > MAX_TIMEOUT
                                ? MAX_TIMEOUT : next_timeout;
            } else {
                next_timeout = MAX_TIMEOUT;
            }
            rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout);
            if(rt < 0 && errno == EINTR) {
                // 在中断时不执行任何操作
            } else {
                break;
            }
        } while(true);
		
        std::vector<std::function<void()> > cbs;
        // 填充过期的回调
        listExpiredCb(cbs);
        if(!cbs.empty()) {
            schedule(cbs.begin(), cbs.end());
            cbs.clear();
        }
		// 上面这一部分关于定时器的内容目前不必理会
		

        for(int i = 0; i < rt; ++i) {
            epoll_event& event = events[i];
            if(event.data.fd == m_tickleFds[0]) {
                uint8_t dummy[256];
                while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
                continue;
                // 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。
            }
			
			// 拿到对应 fd的上下文信息
            FdContext* fd_ctx = (FdContext*)event.data.ptr;
            FdContext::MutexType::Lock lock(fd_ctx->mutex);
            if(event.events & (EPOLLERR | EPOLLHUP)) {
            	// 如果事件是EPOLLERR | EPOLLHUP,就更新事件类型信息
                event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
            }
            // 定义真实发生的事件
            int real_events = NONE;
            if(event.events & EPOLLIN) {
                real_events |= READ;
            }
            if(event.events & EPOLLOUT) {
                real_events |= WRITE;
            }
            // 以上,确定实际发生的事件

            if((fd_ctx->events & real_events) == NONE) {
                continue;
            }

            /*
            下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合,
            而 real_events 表示实际发生的事件集合。
            ~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &,
            将原始关注事件集合中对应实际发生事件的位清零。
            这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。
            */
            int left_events = (fd_ctx->events & ~real_events);
            int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
            event.events = EPOLLET | left_events;

            /*
            接下来,根据剩余的未发生事件集合 left_events 的情况,
            确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。
            如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD;
            否则,操作类型 op 就是 EPOLL_CTL_DEL。

            最后,将 event.events 设置为 EPOLLET | left_events,
            将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件,
            用于后续调用 epoll_ctl 进行修改或删除监听。
            */
            int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
            if(rt2) {
                std::cout << "[ERROR]  epoll_ctl(" << m_epfd << ", "
                    << (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):"
                    << rt2 << " (" << errno << ") (" << strerror(errno) << ")";
                continue;
            }

            if(real_events & READ) {
                fd_ctx->triggerEvent(READ);
                --m_pendingEventCount;
            }
            if(real_events & WRITE) {
                fd_ctx->triggerEvent(WRITE);
                --m_pendingEventCount;
            }
            // 触发相关事件
        }

		// 退出 idle协程
        Fiber::ptr cur = Fiber::GetThis();
        auto raw_ptr = cur.get();
        cur.reset();

        raw_ptr->swapOut();
        //raw_ptr->back();
    }
    std::cout << "[DEBUG]  IOManager::idle over" << std::endl;
}

接下来就是剩下的部分了,我会给出全部的代码,并尽我所能给出详细的注释。

// 获得当前事件上下文
IOManager::FdContext::EventContext& IOManager::FdContext::getContext(IOManager::Event event) {
    switch(event) {
        case IOManager::READ:
            return read;
        case IOManager::WRITE:
            return write;
        default:
            //SYLAR_ASSERT2(false, "getContext");
            ;
    }
    // 一般是使用无效参数调用函数抛出异常
    throw std::invalid_argument("getContext invalid event");
}

// 重置事件上下文信息
void IOManager::FdContext::resetContext(EventContext& ctx) {
    ctx.scheduler = nullptr;
    ctx.fiber.reset();
    ctx.cb = nullptr;
}

// 触发这些事件的回调函数
void IOManager::FdContext::triggerEvent(IOManager::Event event) {
    //SYLAR_ASSERT(events & event);

    events = (Event)(events & ~event);
    EventContext& ctx = getContext(event);
    // 将对应的回调或协程加入调度器
    if(ctx.cb) {
        ctx.scheduler->schedule(&ctx.cb);
    } else {
        ctx.scheduler->schedule(&ctx.fiber);
    }
    ctx.scheduler = nullptr;
    return;
}

IOManager::IOManager(size_t threads, bool use_caller, const std::string& name)
    :Scheduler(threads, use_caller, name) {

    std::cout << "[DEBUG]  IOManager create" << std::endl;

    m_epfd = epoll_create(5000);
    //SYLAR_ASSERT(m_epfd > 0);

    int rt = pipe(m_tickleFds);
    //SYLAR_ASSERT(!rt);

    epoll_event event;
    memset(&event, 0, sizeof(epoll_event));
    // 监听事件,边沿触发
    event.events = EPOLLIN | EPOLLET;
    event.data.fd = m_tickleFds[0];

    rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
    //SYLAR_ASSERT(!rt);

    rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
    //SYLAR_ASSERT(!rt);

    contextResize(32);

    start();
}

IOManager::~IOManager() {
    std::cout << "[INFO]   ~IOManager" << std::endl;
    stop();
    close(m_epfd);
    close(m_tickleFds[0]);
    close(m_tickleFds[1]);

    for(size_t i = 0; i < m_fdContexts.size(); ++i) {
        if(m_fdContexts[i]) {
            delete m_fdContexts[i];
        }
    }
}

// 重置 fd上下文容器
void IOManager::contextResize(size_t size) {
    m_fdContexts.resize(size);

    for(size_t i = 0; i < m_fdContexts.size(); ++i) {
        if(!m_fdContexts[i]) {
            m_fdContexts[i] = new FdContext;
            m_fdContexts[i]->fd = i;
        }
    }
}

// 向某个文件描述符上添加事件
int IOManager::addEvent(int fd, Event event, std::function<void()> cb) {
    FdContext* fd_ctx = nullptr;
    RWMutexType::ReadLock lock(m_mutex);
    // 如果 m_fdContexts大小不够的话,就扩容
    if((int)m_fdContexts.size() > fd) {
        fd_ctx = m_fdContexts[fd];
        lock.unlock();
    } else {
        lock.unlock();
        RWMutexType::WriteLock lock2(m_mutex);
        contextResize(fd * 1.5);
        fd_ctx = m_fdContexts[fd];
    }

    FdContext::MutexType::Lock lock2(fd_ctx->mutex);

    // 确定是添加事件(EPOLL_CTL_ADD)还是修改事件(EPOLL_CTL_MOD)。
    // 如果 fd_ctx->events 不为零,说明文件描述符已经关注了其他事件,需要进行修改。
    int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    epoll_event epevent;
    // 边缘触发 + 原有的事件 + 新加入的事件
    epevent.events = EPOLLET | fd_ctx->events | event;
    // data.ptr 设置为 fd_ctx,用于在事件发生时追溯到对应的文件描述符上下文。
    epevent.data.ptr = fd_ctx;

    int rt = epoll_ctl(m_epfd, op, fd, &epevent);
    if(rt) {
        std::cout << "[ERROR]  epoll_ctl(" << m_epfd << ", "
            << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
            << rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events="
            << (EPOLL_EVENTS)fd_ctx->events;
        return -1;
    }

    // 更新 IOManager 中待处理事件的计数,并更新文件描述符上下文的关注事件集合。
    ++m_pendingEventCount;
    fd_ctx->events = (Event)(fd_ctx->events | event);
    FdContext::EventContext& event_ctx = fd_ctx->getContext(event);
	// 设置调度器
    event_ctx.scheduler = Scheduler::GetThis();
    if(cb) {
        event_ctx.cb.swap(cb);
    } else {
        event_ctx.fiber = Fiber::GetThis();
    }
    return 0;
}

// 删除事件
bool IOManager::delEvent(int fd, Event event) {
    RWMutexType::ReadLock lock(m_mutex);
    if((int)m_fdContexts.size() <= fd) {
        return false;
    }
    FdContext* fd_ctx = m_fdContexts[fd];
    lock.unlock();

    FdContext::MutexType::Lock lock2(fd_ctx->mutex);
    if(!(fd_ctx->events & event)) {
        return false;
    }

    // 计算新的关注事件集合,通过将指定事件从原有集合中去除。
    Event new_events = (Event)(fd_ctx->events & ~event);
    int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
    epoll_event epevent;
    epevent.events = EPOLLET | new_events;
    epevent.data.ptr = fd_ctx;

    int rt = epoll_ctl(m_epfd, op, fd, &epevent);
    if(rt) {
        std::cout << "[ERROR]  epoll_ctl(" << m_epfd << ", "
            << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
            << rt << " (" << errno << ") (" << strerror(errno) << ")";
        return false;
    }

    --m_pendingEventCount;
    fd_ctx->events = new_events;
    FdContext::EventContext& event_ctx = fd_ctx->getContext(event);
    fd_ctx->resetContext(event_ctx);
    // 获取特定事件类型的上下文信息,并重置该上下文信息。重置的目的是清除可能存在的回调函数和协程等相关信息。

    return true;
}

// 取消 fd的某个事件
bool IOManager::cancelEvent(int fd, Event event) {
    RWMutexType::ReadLock lock(m_mutex);
    if((int)m_fdContexts.size() <= fd) {
        return false;
    }
    FdContext* fd_ctx = m_fdContexts[fd];
    lock.unlock();

    FdContext::MutexType::Lock lock2(fd_ctx->mutex);
    if(!(fd_ctx->events & event)) {
        return false;
    }

    Event new_events = (Event)(fd_ctx->events & ~event);
    int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
    epoll_event epevent;
    epevent.events = EPOLLET | new_events;
    epevent.data.ptr = fd_ctx;

    int rt = epoll_ctl(m_epfd, op, fd, &epevent);
    if(rt) {
        std::cout << "[ERROR]  epoll_ctl(" << m_epfd << ", "
            << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
            << rt << " (" << errno << ") (" << strerror(errno) << ")";
        return false;
    }

    // 触发事件,执行相应的回调函数或唤醒关联的协程。
    fd_ctx->triggerEvent(event);
    --m_pendingEventCount;
    return true;
}

// 取消某个描述符上的所有事件
bool IOManager::cancelAll(int fd) {
    RWMutexType::ReadLock lock(m_mutex);
    if((int)m_fdContexts.size() <= fd) {
        return false;
    }
    FdContext* fd_ctx = m_fdContexts[fd];
    lock.unlock();

    FdContext::MutexType::Lock lock2(fd_ctx->mutex);
    if(!(fd_ctx->events)) {
        return false;
    }    

    int op = EPOLL_CTL_DEL;
    epoll_event epevent;
    epevent.events = 0;
    epevent.data.ptr = fd_ctx;

    int rt = epoll_ctl(m_epfd, op, fd, &epevent);
    if(rt) {
        std::cout << "[ERROR]  epoll_ctl(" << m_epfd << ", "
            << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
            << rt << " (" << errno << ") (" << strerror(errno) << ")";
        return false;
    }

    if(fd_ctx->events & READ) {
        fd_ctx->triggerEvent(READ);
        --m_pendingEventCount;
    }
    if(fd_ctx->events & WRITE) {
        fd_ctx->triggerEvent(WRITE);
        --m_pendingEventCount;
    }
    // 如果文件描述符上下文关注了读或写事件,则触发相应的事件,并更新 IOManager 中待处理事件的计数。

    //SYLAR_ASSERT(fd_ctx->events == 0);
    return true;
}

IOManager* IOManager::GetThis() {
    // dynamic_cast 是 C++ 中的一个运算符,用于在运行时执行安全的类型转换。
    // 它主要用于进行基类和派生类之间的安全转型。
    return dynamic_cast<IOManager*> (Scheduler::GetThis());
}

void IOManager::tickle() {
    std::cout << "[INFO]   IOManager::tickle" << std::endl;
    if(!hasIdleThreads()) {
        return;
    }
    int rt = write(m_tickleFds[1], "T", 1);
    //SYLAR_ASSERT(rt == 1);
}

bool IOManager::stopping(uint64_t& timeout) {
    timeout = getNextTimer();
    return timeout == ~0ull
        && m_pendingEventCount == 0
        && Scheduler::stopping();
}

bool IOManager::stopping() {
    uint64_t timeout = 0;
    return stopping(timeout);
}

void IOManager::idle() {
    std::cout << "[DEBUG]  IOManager::idle" << std::endl;
    const uint64_t MAX_EVENTS = 256;
    // 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零
    epoll_event* events = new epoll_event[MAX_EVENTS]();
    // 使用std::shared_ptr来管理这个动态分配的数组的内存
    std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){
        // lambda表达式,用于在shared_ptr释放内存时调用
        // 删除动态分配的epoll_event数组
        delete[] ptr;
    });

    while(true) {
        uint64_t next_timeout = 0;
        // if(SYLAR_UNLIKELY(stopping(next_timeout))) {
        //     SYLAR_LOG_INFO(g_logger) << "name=" << getName()
        //                              << " idle stopping exit";
        //     break;
        // }

        if(stopping(next_timeout)) {
            break;
        }

        int rt = 0;
        do {
            static const int MAX_TIMEOUT = 3000;
            if(next_timeout != ~0ull) {
                next_timeout = (int)next_timeout > MAX_TIMEOUT
                                ? MAX_TIMEOUT : next_timeout;
            } else {
                next_timeout = MAX_TIMEOUT;
            }
            rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout);
            if(rt < 0 && errno == EINTR) {
                // 在中断时不执行任何操作
            } else {
                break;
            }
        } while(true);

        std::vector<std::function<void()> > cbs;
        // 填充过期的回调
        listExpiredCb(cbs);
        if(!cbs.empty()) {
            schedule(cbs.begin(), cbs.end());
            cbs.clear();
        }

        for(int i = 0; i < rt; ++i) {
            epoll_event& event = events[i];
            if(event.data.fd == m_tickleFds[0]) {
                uint8_t dummy[256];
                while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
                continue;
                // 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。
            }

            FdContext* fd_ctx = (FdContext*)event.data.ptr;
            FdContext::MutexType::Lock lock(fd_ctx->mutex);
            if(event.events & (EPOLLERR | EPOLLHUP)) {
                event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
            }
            int real_events = NONE;
            if(event.events & EPOLLIN) {
                real_events |= READ;
            }
            if(event.events & EPOLLOUT) {
                real_events |= WRITE;
            }
            // 以上,确定实际发生的事件

            if((fd_ctx->events & real_events) == NONE) {
                continue;
            }

            /*
            下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合,
            而 real_events 表示实际发生的事件集合。
            ~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &,
            将原始关注事件集合中对应实际发生事件的位清零。
            这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。
            */
            int left_events = (fd_ctx->events & ~real_events);
            int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
            event.events = EPOLLET | left_events;

            /*
            接下来,根据剩余的未发生事件集合 left_events 的情况,
            确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。
            如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD;
            否则,操作类型 op 就是 EPOLL_CTL_DEL。

            最后,将 event.events 设置为 EPOLLET | left_events,
            将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件,
            用于后续调用 epoll_ctl 进行修改或删除监听。
            */
            int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
            if(rt2) {
                std::cout << "[ERROR]  epoll_ctl(" << m_epfd << ", "
                    << (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):"
                    << rt2 << " (" << errno << ") (" << strerror(errno) << ")";
                continue;
            }

            //SYLAR_LOG_INFO(g_logger) << " fd=" << fd_ctx->fd << " events=" << fd_ctx->events
            //                         << " real_events=" << real_events;
            if(real_events & READ) {
                fd_ctx->triggerEvent(READ);
                --m_pendingEventCount;
            }
            if(real_events & WRITE) {
                fd_ctx->triggerEvent(WRITE);
                --m_pendingEventCount;
            }
            // 触发相关事件
        }

        Fiber::ptr cur = Fiber::GetThis();
        auto raw_ptr = cur.get();
        cur.reset();

        raw_ptr->swapOut();
        //raw_ptr->back();
    }
    std::cout << "[DEBUG]  IOManager::idle over" << std::endl;
}

void IOManager::onTimerInsertedAtFront() {
    tickle();
}

总的来说这部分内容还是很抽象的,我的个人能力有限,只能表达出这些。关于这部分内容,我也跑了一些测试。测试效果也如预想一样,这次时间也有点些许的匆忙,这次测试结果,我会在下一节定时器中一起呈上来。文章来源地址https://www.toymoban.com/news/detail-798936.html

到了这里,关于[学习分享]----sylar服务器框架源码阅读--IO协程调度模块的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包