sylar作者在本站的地址为 这里,也可以查看 作者主页,也有视频教程可以 点击这里。此外,也可以看一下赵路强大佬的关于sylar协程模块的博客 点击这里,我本人在阅读sylar源码的时候也是参考了赵路强大佬的解析 可以点击这里。
各位看官也可以加我的qq和我讨论2511010742
IO 协程调度
sylar大人在协程调度模块中封装了epoll,对于每一个需要监听的文件描述符fd,都支持可读和可写事件。这部分操作是十分复杂的,需要读者对协程调度模块和epoll模型十分了解,接下来我会尽我所能向大家介绍清楚这部分内容。在协程调度模块中,当没有任务执行时就会在idle状态下忙等待,在本节中就利用了idle状态去做一些需要阻塞等待的任务,比如epoll_wait等。下面,我先介绍一下epoll的相关信息。
epoll简介
epoll 是 Linux 系统提供的一种事件通知机制,主要用于处理大量文件描述符的 I/O 事件。它是一种高效的 I/O 多路复用机制,相比于传统的 select 和 poll,epoll 在处理大量连接时有更好的性能。
1. epoll API
1.1 创建 epoll 实例
int epfd = epoll_create(int size);
这个函数用处是创建一个 epoll 实例,size 本意为希望监听的文件描述符的数量,不用太在意它。
1.2 控制 epoll 上的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- epfd 是 epoll 实例的文件描述符。
- op 表示操作类型,有三个值:
EPOLL_CTL_ADD:添加一个文件描述符到 epoll 实例中。
EPOLL_CTL_MOD:修改 epoll 实例上的文件描述符的事件。
EPOLL_CTL_DEL:从 epoll 实例中删除一个文件描述符。 - fd 是要操作的文件描述符。
- event 是一个 struct epoll_event 结构体,用于描述事件类型和携带的数据。
1.3 等待事件发生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- epfd 是 epoll 实例的文件描述符。
- events 是一个数组,用于存储发生的事件。
- maxevents 表示 events 数组的大小,即最多可以存储多少个事件。
- timeout 表示等待事件发生的超时时间,单位是毫秒。传入 -1 表示一直等待,传入 0 表示立即返回,大于 0 表示等待指定的时间。
2. epoll 结构
2.1 eventpoll
epoll 在内核中主要使用两个结构体:struct eventpoll 和 struct epitem。这两个结构体用于管理 epoll 实例中的文件描述符和事件。eventpoll 里成员有很多,我们讲一些常用的。
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
struct file *file;
struct wait_queue_head *wq;
struct wait_queue_entry wait;
wait_queue_t poll_wait;
struct list_head rdllist;
struct list_head ovflist;
struct rb_root_cached rbr;
struct list_head poll_siblings;
struct list_head fasync_list;
unsigned int ovflist_length;
struct ep_pqueue **poll_table;
struct user_struct *user;
unsigned long gen;
wait_queue_head_t proc_wait;
struct wakeup_source __rcu *ws;
unsigned int user_size;
int wakeup_pipe;
};
主要字段包括:
- lock: 用于实现对 eventpoll 结构的自旋锁,确保多线程并发安全。
- mtx: 用于实现互斥锁,确保对 eventpoll 结构的互斥访问。
- file: 与 epoll 实例关联的文件结构。
- wq: 等待队列头,用于进程等待事件发生。
- poll_table: 保存文件描述符到 epitem 的映射表,是一个指针数组。
- rdllist: 就绪链表,用于存储就绪的事件项 (epitem)。
- rbr: 红黑树根节点,用于快速查找文件描述符对应的 epitem。
- 其他字段用于管理等待队列、异步事件、用户信息等。
2.2 epitem
struct epitem 表示 epoll 实例中的一个事件项,它包含了与文件描述符相关的信息和事件状态。
struct epitem {
struct rb_node rbn;
struct list_head rdllink;
struct list_head fllink;
struct file *file;
struct eventpoll *ep;
struct wait_queue_head wq;
struct epoll_event event;
unsigned long last_wakeup;
spinlock_t lock;
};
主要字段包括:
- rbn: 用于将 epitem 插入到 struct eventpoll 的红黑树中,以实现快速查找。
- rdllink: 用于将 epitem 插入到 struct eventpoll 的就绪链表中,以等待用户程序处理。
- fllink: 用于将 epitem 插入到 struct eventpoll 的就绪链表中的尾部。
- file: 关联的文件结构。
- ep: 指向 struct eventpoll 的指针。
- wq: 等待队列头,用于进程等待事件发生。
- event: 存储文件描述符的事件类型和数据。
- last_wakeup: 记录上次唤醒的时间戳,用于避免多次重复唤醒。
- lock: 用于实现对 epitem 结构的自旋锁,确保多线程并发安全。
这两个结构体是 epoll 内核实现中的核心数据结构,通过它们实现了高效的事件管理和处理机制。struct eventpoll 用于维护 epoll 实例的状态,而 struct epitem 用于表示每个文件描述符的事件状态。
epoll_event 结构体用于描述文件描述符上的事件,它是在 epoll 操作中用到的关键数据结构。以下是 epoll_event 结构体的定义:
struct epoll_event {
uint32_t events; // 事件类型,可以是 EPOLLIN、EPOLLOUT、EPOLLERR 等
epoll_data_t data; // 用户数据,可以携带额外信息
};
events:表示事件的类型,可以是以下几个宏的组合:
- EPOLLIN:表示对应的文件描述符可以读取(Readable)。
- EPOLLOUT:表示对应的文件描述符可以写入(Writable)。
- EPOLLRDHUP:表示对端断开连接,或者是写半关闭状态(可读、对端关闭写)。
- EPOLLHUP:表示发生挂起事件,通常表示连接被挂断或发生错误。
- EPOLLERR:表示发生错误,需要通过 errno 获取具体错误信息。
data:是一个联合体,用于携带额外的信息。epoll_data_t 的定义如下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
epoll_data 可以用于携带用户数据,具体使用取决于应用的需要。如果用户程序希望在事件发生时携带一些额外的信息,可以通过设置 data 字段。sylar就使用了其中的 ptr 指针来保存事件上下文信息。
首先,通过 epoll_create 创建一个 epoll 实例,得到一个文件描述符,然后,通过 epoll_ctl 将需要监听的文件描述符注册到 epoll 实例中,并指定关注的事件类型。这个过程会将我们添加的文件描述符添加到 epoll 句柄的红黑树中,然后执行 epoll_wait 函数则会阻塞等待这些文件描述符所关心的事件发生,当有关注的事件发生时,epoll_wait 返回,需要处理的事件都会保存在 events 数组里,遍历并处理这些就绪的事件。
为了高效地管理已经就绪的事件项,epoll 使用双向链表维护一个就绪事件链表。当文件描述符上的事件发生时,对应的事件项会被从红黑树中找到,然后加入到就绪链表中等待用户程序处理。用户程序调用 epoll_wait 进行阻塞等待事件的发生。内核检查红黑树上的文件描述符,如果有就绪的文件描述符,将相应的事件项加入到就绪链表中。epoll_wait 返回时,将就绪链表上的事件项传递给用户程序,用户程序可以遍历链表处理已经就绪的事件。
简单介绍完之后,让我们进入今天的主题,读一下 sylar 这部分源码。
源码分析
对于IO协程调度来说,我们只关心它的 fd、事件类型和对应的回调函数,其中 epoll 负责 fd 和事件之间的联系,回调函数则是需要协程调度去调度执行。sylar 建立了一个 FdContext 结构来保存对应的文件描述符,以及事件信息和回调函数。并将其保存在epoll_event的私有数据指针data.ptr中,当 wait 之后,遍历就绪事件就可以拿到这些信息,从而去执行相应的操作。
调度器在 idle 时会 epoll_wait 所有已经注册的 fd,返回时就可以拿到对应的 FdContext。然后将对应事件的回调函数加入到调度器的任务列表中,在 idle 退出后就可以调度这些回调函数了。下面我们看一下头文件信息。
class IOManager : public Scheduler, public TimerManager {
public:
typedef std::shared_ptr<IOManager> ptr;
typedef RWMutex RWMutexType;
// IO事件
enum Event {
// 无事件
NONE = 0x0,
// 读事件
READ = 0x1,
// 写事件
WRITE = 0x4,
};
private:
// Socket事件上下文类
struct FdContext {
typedef Mutex MutexType;
// 事件上下文类
struct EventContext {
// 这个事件执行的调度器
Scheduler* scheduler = nullptr;
// 事件协程
Fiber::ptr fiber;
// 事件回调函数
std::function<void()> cb;
};
// 获取事件上下文类
EventContext& getContext(Event event);
// 重新设置事件上下文
void resetContext(EventContext& ctx);
// 触发事件
void triggerEvent(Event event);
// 定义一读一写事件
EventContext read, write;
// 事件关联的句柄
int fd = 0;
// 当前描述符所关注事件
Event events = NONE;
// 事件的锁
MutexType mutex;
};
public:
IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = "");
~IOManager();
/**
* @brief 添加事件
* @param[in] fd socket句柄
* @param[in] event 事件类型
* @param[in] cb 事件回调函数
* @return 添加成功返回0,失败返回-1
*/
int addEvent(int fd, Event event, std::function<void()> cb = nullptr);
/**
* @brief 删除事件
* @param[in] fd socket句柄
* @param[in] event 事件类型
* @attention 不会触发事件
*/
bool delEvent(int fd, Event event);
// 取消 fd上的某个事件
bool cancelEvent(int fd, Event event);
// 取消 fd上的所有事件
bool cancelAll(int fd);
// 返回当前IOManager
static IOManager* GetThis();
protected:
void tickle() override;
bool stopping() override;
void idle() override;
void onTimerInsertedAtFront() override;
void contextResize(size_t size);
/**
* @brief 判断是否可以停止
* @param[out] timeout 最近要出发的定时器事件间隔
* @return 返回是否可以停止
*/
bool stopping(uint64_t& timeout);
private:
// epoll文件句柄
int m_epfd = 0;
// pipe 句柄
int m_tickleFds[2];
// 当前等待执行的事件总数
std::atomic<size_t> m_pendingEventCount = {0};
// IOManager的锁
RWMutexType m_mutex;
// socket事件上下文的容器
std::vector<FdContext*> m_fdContexts;
};
可以看到,这里对于每一个文件描述符都关联了相关的事件,以及回调。换句话说就是每一个文件描述符 fd 都可以关注三种事件类型,读写以及无事件,每种事件都可以注册一个专属的回调函数,并且还支持删除和取消事件。
下面来阅读一下 sylar 关于这部分的函数定义,我们先来看一下主体部分。
IOManager::IOManager(size_t threads, bool use_caller, const std::string& name)
:Scheduler(threads, use_caller, name) {
std::cout << "[DEBUG] IOManager create" << std::endl;
// 创建 epoll实例
m_epfd = epoll_create(5000);
//SYLAR_ASSERT(m_epfd > 0);
// 初始化管道
/*
这里 sylar使用管道提醒其他线程有任务来了
*/
int rt = pipe(m_tickleFds);
//SYLAR_ASSERT(!rt);
// 为管道读端注册可读事件
epoll_event event;
memset(&event, 0, sizeof(epoll_event));
// 监听事件,边沿触发
event.events = EPOLLIN | EPOLLET;
event.data.fd = m_tickleFds[0];
rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
//SYLAR_ASSERT(!rt);
rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
//SYLAR_ASSERT(!rt);
contextResize(32);
// 直接开启调度
start();
}
sylar 使用管道来通知有任务要调度
void IOManager::tickle() {
std::cout << "[INFO] IOManager::tickle" << std::endl;
if(!hasIdleThreads()) {
return;
}
// 向管道写段写入数据
int rt = write(m_tickleFds[1], "T", 1);
//SYLAR_ASSERT(rt == 1);
}
接下来是 idle 协程的重写了
void IOManager::idle() {
std::cout << "[DEBUG] IOManager::idle" << std::endl;
const uint64_t MAX_EVENTS = 256;
// 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零
epoll_event* events = new epoll_event[MAX_EVENTS]();
// 使用std::shared_ptr来管理这个动态分配的数组的内存
std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){
// lambda表达式,用于在shared_ptr释放内存时调用
// 删除动态分配的epoll_event数组
delete[] ptr;
});
while(true) {
uint64_t next_timeout = 0;
if(stopping(next_timeout)) {
break;
}
int rt = 0;
do {
static const int MAX_TIMEOUT = 3000;
if(next_timeout != ~0ull) {
next_timeout = (int)next_timeout > MAX_TIMEOUT
? MAX_TIMEOUT : next_timeout;
} else {
next_timeout = MAX_TIMEOUT;
}
rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout);
if(rt < 0 && errno == EINTR) {
// 在中断时不执行任何操作
} else {
break;
}
} while(true);
std::vector<std::function<void()> > cbs;
// 填充过期的回调
listExpiredCb(cbs);
if(!cbs.empty()) {
schedule(cbs.begin(), cbs.end());
cbs.clear();
}
// 上面这一部分关于定时器的内容目前不必理会
for(int i = 0; i < rt; ++i) {
epoll_event& event = events[i];
if(event.data.fd == m_tickleFds[0]) {
uint8_t dummy[256];
while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
continue;
// 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。
}
// 拿到对应 fd的上下文信息
FdContext* fd_ctx = (FdContext*)event.data.ptr;
FdContext::MutexType::Lock lock(fd_ctx->mutex);
if(event.events & (EPOLLERR | EPOLLHUP)) {
// 如果事件是EPOLLERR | EPOLLHUP,就更新事件类型信息
event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
}
// 定义真实发生的事件
int real_events = NONE;
if(event.events & EPOLLIN) {
real_events |= READ;
}
if(event.events & EPOLLOUT) {
real_events |= WRITE;
}
// 以上,确定实际发生的事件
if((fd_ctx->events & real_events) == NONE) {
continue;
}
/*
下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合,
而 real_events 表示实际发生的事件集合。
~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &,
将原始关注事件集合中对应实际发生事件的位清零。
这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。
*/
int left_events = (fd_ctx->events & ~real_events);
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
event.events = EPOLLET | left_events;
/*
接下来,根据剩余的未发生事件集合 left_events 的情况,
确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。
如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD;
否则,操作类型 op 就是 EPOLL_CTL_DEL。
最后,将 event.events 设置为 EPOLLET | left_events,
将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件,
用于后续调用 epoll_ctl 进行修改或删除监听。
*/
int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
if(rt2) {
std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):"
<< rt2 << " (" << errno << ") (" << strerror(errno) << ")";
continue;
}
if(real_events & READ) {
fd_ctx->triggerEvent(READ);
--m_pendingEventCount;
}
if(real_events & WRITE) {
fd_ctx->triggerEvent(WRITE);
--m_pendingEventCount;
}
// 触发相关事件
}
// 退出 idle协程
Fiber::ptr cur = Fiber::GetThis();
auto raw_ptr = cur.get();
cur.reset();
raw_ptr->swapOut();
//raw_ptr->back();
}
std::cout << "[DEBUG] IOManager::idle over" << std::endl;
}
接下来就是剩下的部分了,我会给出全部的代码,并尽我所能给出详细的注释。文章来源:https://www.toymoban.com/news/detail-798936.html
// 获得当前事件上下文
IOManager::FdContext::EventContext& IOManager::FdContext::getContext(IOManager::Event event) {
switch(event) {
case IOManager::READ:
return read;
case IOManager::WRITE:
return write;
default:
//SYLAR_ASSERT2(false, "getContext");
;
}
// 一般是使用无效参数调用函数抛出异常
throw std::invalid_argument("getContext invalid event");
}
// 重置事件上下文信息
void IOManager::FdContext::resetContext(EventContext& ctx) {
ctx.scheduler = nullptr;
ctx.fiber.reset();
ctx.cb = nullptr;
}
// 触发这些事件的回调函数
void IOManager::FdContext::triggerEvent(IOManager::Event event) {
//SYLAR_ASSERT(events & event);
events = (Event)(events & ~event);
EventContext& ctx = getContext(event);
// 将对应的回调或协程加入调度器
if(ctx.cb) {
ctx.scheduler->schedule(&ctx.cb);
} else {
ctx.scheduler->schedule(&ctx.fiber);
}
ctx.scheduler = nullptr;
return;
}
IOManager::IOManager(size_t threads, bool use_caller, const std::string& name)
:Scheduler(threads, use_caller, name) {
std::cout << "[DEBUG] IOManager create" << std::endl;
m_epfd = epoll_create(5000);
//SYLAR_ASSERT(m_epfd > 0);
int rt = pipe(m_tickleFds);
//SYLAR_ASSERT(!rt);
epoll_event event;
memset(&event, 0, sizeof(epoll_event));
// 监听事件,边沿触发
event.events = EPOLLIN | EPOLLET;
event.data.fd = m_tickleFds[0];
rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
//SYLAR_ASSERT(!rt);
rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
//SYLAR_ASSERT(!rt);
contextResize(32);
start();
}
IOManager::~IOManager() {
std::cout << "[INFO] ~IOManager" << std::endl;
stop();
close(m_epfd);
close(m_tickleFds[0]);
close(m_tickleFds[1]);
for(size_t i = 0; i < m_fdContexts.size(); ++i) {
if(m_fdContexts[i]) {
delete m_fdContexts[i];
}
}
}
// 重置 fd上下文容器
void IOManager::contextResize(size_t size) {
m_fdContexts.resize(size);
for(size_t i = 0; i < m_fdContexts.size(); ++i) {
if(!m_fdContexts[i]) {
m_fdContexts[i] = new FdContext;
m_fdContexts[i]->fd = i;
}
}
}
// 向某个文件描述符上添加事件
int IOManager::addEvent(int fd, Event event, std::function<void()> cb) {
FdContext* fd_ctx = nullptr;
RWMutexType::ReadLock lock(m_mutex);
// 如果 m_fdContexts大小不够的话,就扩容
if((int)m_fdContexts.size() > fd) {
fd_ctx = m_fdContexts[fd];
lock.unlock();
} else {
lock.unlock();
RWMutexType::WriteLock lock2(m_mutex);
contextResize(fd * 1.5);
fd_ctx = m_fdContexts[fd];
}
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
// 确定是添加事件(EPOLL_CTL_ADD)还是修改事件(EPOLL_CTL_MOD)。
// 如果 fd_ctx->events 不为零,说明文件描述符已经关注了其他事件,需要进行修改。
int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
epoll_event epevent;
// 边缘触发 + 原有的事件 + 新加入的事件
epevent.events = EPOLLET | fd_ctx->events | event;
// data.ptr 设置为 fd_ctx,用于在事件发生时追溯到对应的文件描述符上下文。
epevent.data.ptr = fd_ctx;
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events="
<< (EPOLL_EVENTS)fd_ctx->events;
return -1;
}
// 更新 IOManager 中待处理事件的计数,并更新文件描述符上下文的关注事件集合。
++m_pendingEventCount;
fd_ctx->events = (Event)(fd_ctx->events | event);
FdContext::EventContext& event_ctx = fd_ctx->getContext(event);
// 设置调度器
event_ctx.scheduler = Scheduler::GetThis();
if(cb) {
event_ctx.cb.swap(cb);
} else {
event_ctx.fiber = Fiber::GetThis();
}
return 0;
}
// 删除事件
bool IOManager::delEvent(int fd, Event event) {
RWMutexType::ReadLock lock(m_mutex);
if((int)m_fdContexts.size() <= fd) {
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
if(!(fd_ctx->events & event)) {
return false;
}
// 计算新的关注事件集合,通过将指定事件从原有集合中去除。
Event new_events = (Event)(fd_ctx->events & ~event);
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = EPOLLET | new_events;
epevent.data.ptr = fd_ctx;
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}
--m_pendingEventCount;
fd_ctx->events = new_events;
FdContext::EventContext& event_ctx = fd_ctx->getContext(event);
fd_ctx->resetContext(event_ctx);
// 获取特定事件类型的上下文信息,并重置该上下文信息。重置的目的是清除可能存在的回调函数和协程等相关信息。
return true;
}
// 取消 fd的某个事件
bool IOManager::cancelEvent(int fd, Event event) {
RWMutexType::ReadLock lock(m_mutex);
if((int)m_fdContexts.size() <= fd) {
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
if(!(fd_ctx->events & event)) {
return false;
}
Event new_events = (Event)(fd_ctx->events & ~event);
int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = EPOLLET | new_events;
epevent.data.ptr = fd_ctx;
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}
// 触发事件,执行相应的回调函数或唤醒关联的协程。
fd_ctx->triggerEvent(event);
--m_pendingEventCount;
return true;
}
// 取消某个描述符上的所有事件
bool IOManager::cancelAll(int fd) {
RWMutexType::ReadLock lock(m_mutex);
if((int)m_fdContexts.size() <= fd) {
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
if(!(fd_ctx->events)) {
return false;
}
int op = EPOLL_CTL_DEL;
epoll_event epevent;
epevent.events = 0;
epevent.data.ptr = fd_ctx;
int rt = epoll_ctl(m_epfd, op, fd, &epevent);
if(rt) {
std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):"
<< rt << " (" << errno << ") (" << strerror(errno) << ")";
return false;
}
if(fd_ctx->events & READ) {
fd_ctx->triggerEvent(READ);
--m_pendingEventCount;
}
if(fd_ctx->events & WRITE) {
fd_ctx->triggerEvent(WRITE);
--m_pendingEventCount;
}
// 如果文件描述符上下文关注了读或写事件,则触发相应的事件,并更新 IOManager 中待处理事件的计数。
//SYLAR_ASSERT(fd_ctx->events == 0);
return true;
}
IOManager* IOManager::GetThis() {
// dynamic_cast 是 C++ 中的一个运算符,用于在运行时执行安全的类型转换。
// 它主要用于进行基类和派生类之间的安全转型。
return dynamic_cast<IOManager*> (Scheduler::GetThis());
}
void IOManager::tickle() {
std::cout << "[INFO] IOManager::tickle" << std::endl;
if(!hasIdleThreads()) {
return;
}
int rt = write(m_tickleFds[1], "T", 1);
//SYLAR_ASSERT(rt == 1);
}
bool IOManager::stopping(uint64_t& timeout) {
timeout = getNextTimer();
return timeout == ~0ull
&& m_pendingEventCount == 0
&& Scheduler::stopping();
}
bool IOManager::stopping() {
uint64_t timeout = 0;
return stopping(timeout);
}
void IOManager::idle() {
std::cout << "[DEBUG] IOManager::idle" << std::endl;
const uint64_t MAX_EVENTS = 256;
// 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零
epoll_event* events = new epoll_event[MAX_EVENTS]();
// 使用std::shared_ptr来管理这个动态分配的数组的内存
std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){
// lambda表达式,用于在shared_ptr释放内存时调用
// 删除动态分配的epoll_event数组
delete[] ptr;
});
while(true) {
uint64_t next_timeout = 0;
// if(SYLAR_UNLIKELY(stopping(next_timeout))) {
// SYLAR_LOG_INFO(g_logger) << "name=" << getName()
// << " idle stopping exit";
// break;
// }
if(stopping(next_timeout)) {
break;
}
int rt = 0;
do {
static const int MAX_TIMEOUT = 3000;
if(next_timeout != ~0ull) {
next_timeout = (int)next_timeout > MAX_TIMEOUT
? MAX_TIMEOUT : next_timeout;
} else {
next_timeout = MAX_TIMEOUT;
}
rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout);
if(rt < 0 && errno == EINTR) {
// 在中断时不执行任何操作
} else {
break;
}
} while(true);
std::vector<std::function<void()> > cbs;
// 填充过期的回调
listExpiredCb(cbs);
if(!cbs.empty()) {
schedule(cbs.begin(), cbs.end());
cbs.clear();
}
for(int i = 0; i < rt; ++i) {
epoll_event& event = events[i];
if(event.data.fd == m_tickleFds[0]) {
uint8_t dummy[256];
while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
continue;
// 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。
}
FdContext* fd_ctx = (FdContext*)event.data.ptr;
FdContext::MutexType::Lock lock(fd_ctx->mutex);
if(event.events & (EPOLLERR | EPOLLHUP)) {
event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events;
}
int real_events = NONE;
if(event.events & EPOLLIN) {
real_events |= READ;
}
if(event.events & EPOLLOUT) {
real_events |= WRITE;
}
// 以上,确定实际发生的事件
if((fd_ctx->events & real_events) == NONE) {
continue;
}
/*
下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合,
而 real_events 表示实际发生的事件集合。
~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &,
将原始关注事件集合中对应实际发生事件的位清零。
这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。
*/
int left_events = (fd_ctx->events & ~real_events);
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
event.events = EPOLLET | left_events;
/*
接下来,根据剩余的未发生事件集合 left_events 的情况,
确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。
如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD;
否则,操作类型 op 就是 EPOLL_CTL_DEL。
最后,将 event.events 设置为 EPOLLET | left_events,
将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件,
用于后续调用 epoll_ctl 进行修改或删除监听。
*/
int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
if(rt2) {
std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", "
<< (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):"
<< rt2 << " (" << errno << ") (" << strerror(errno) << ")";
continue;
}
//SYLAR_LOG_INFO(g_logger) << " fd=" << fd_ctx->fd << " events=" << fd_ctx->events
// << " real_events=" << real_events;
if(real_events & READ) {
fd_ctx->triggerEvent(READ);
--m_pendingEventCount;
}
if(real_events & WRITE) {
fd_ctx->triggerEvent(WRITE);
--m_pendingEventCount;
}
// 触发相关事件
}
Fiber::ptr cur = Fiber::GetThis();
auto raw_ptr = cur.get();
cur.reset();
raw_ptr->swapOut();
//raw_ptr->back();
}
std::cout << "[DEBUG] IOManager::idle over" << std::endl;
}
void IOManager::onTimerInsertedAtFront() {
tickle();
}
总的来说这部分内容还是很抽象的,我的个人能力有限,只能表达出这些。关于这部分内容,我也跑了一些测试。测试效果也如预想一样,这次时间也有点些许的匆忙,这次测试结果,我会在下一节定时器中一起呈上来。文章来源地址https://www.toymoban.com/news/detail-798936.html
到了这里,关于[学习分享]----sylar服务器框架源码阅读--IO协程调度模块的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!