ZLMediaKit源码分析(一)服务启动
ZLMediaKit源码分析(二)推流创建
ZLMediaKit源码分析(三)拉流创建
ZLMediaKit源码分析(四)重封装
ZLMediaKit源码分析(五)代理服务
ZLMediaKit源码分析(六)http 点播
服务创建流程
- 创建TcpServer实例,其父类Server同时被实例化。
- EventPollerPool初始化线程池。批量创建EventPoller,每个EventPoller对应一个线程,对应一个CPU核。该线程池线程数量对应CPU核数或者由命令行参数-t指定。
- 注册TcpServer::_on_create_socket回调,该回调创建Socekt,并且绑定EventPoller。
- 启动TcpServer,此时该TcpServer尚未绑定Socket。
- 注册TcpServer::_session_alloc回调,该回调函数创建Session,SessionHelper。
5.1. Session,SessionHelper一一对应,SessionHelper操作全局静态变量session_map,管理session。
5.2. 注册SessionHelper::_on_create_socket回调,该回调也创建Socket,但是该回调创建的Socket没有Poller负载相关操作,其poller继承自父。 - 启动TcpServer,首先TcpServer::setupEvent()创建Socket。
socket绑定的poller,是在TcpServer的父类Server初始化的时候返回的。 - TcpServer::setupEvent()为socket注册两个回调函数。
7.1. Socket::_on_before_accept,在接受客户端链接请求之前,创建socket,此时绑定的poller有负载考虑。
7.2. Socket::_on_accept,接受客户端链接请求。使用Socket::_on_before_accept创建的socket,调用步骤4注册的TcpServer::_session_alloc回调创建session。并为该socket绑定数据可读回调。 - 启动监听服务。
8.1. 添加客户端请求事件。
8.2. 注册客户端请求响应回调,主要调用Socket::onAccept()。 - 客户端响应处理。
9.1. 接受客户端请求accept()。
9.2. 调用Socket::_on_before_accept回调,创建socket,并考虑到poller负载。
9.3. 如果8.2. 创建失败,则继承父_poller,创建socket。一般这里不执行。
9.4. 调用Socket::_on_accept接受客户端请求,核心调用回调TcpServer::onAcceptConnection(),创建客户端session,注册数据可读事件和回调。
9.5. 注册客户端可读事件,绑定数据可读回调。 - 根据poller数量,批量clone服务。
继承关系
创建TcpServer
创建TcpServer并调用start().
server/main.cpp
uint16_t rtspPort = mINI::Instance()[Rtsp::kPort];
auto rtspSrv = std::make_shared<TcpServer>();;
if (rtspPort) { rtspSrv->start<RtspSession>(rtspPort); }
TcpServer()初始化做了两件重要的事情:
- 创建EventPollerPool。
EventPollerPool是TcpServer父类Server初始化的时候创建的。会根据cpu核数count或者命令行参数-t count创建count个poller实例,并对应启动线程。 - 注册Socket创建回调TcpServer::_on_create_socket。该回调有两大用途:
Server启动时,优先创建socket。
Server accept客户端链接请求时,创建客户端socket。
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// main() 函数中 auto rtspSrv = std::make_shared<TcpServer>();;
// poller默认为空
// 初始化父类 Server(poller)
TcpServer::TcpServer(const EventPoller::Ptr &poller) : Server(poller) {
setOnCreateSocket(nullptr);
}
Server::Server() 初始化
3rdpart/ZLToolKit/src/Network/Server.cpp
Server::Server(EventPoller::Ptr poller) {
// EventPoller::Ptr Server::_poller;
// EventPollerPool::Instance() 会创建线程
_poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller();
}
创建EventPoller和线程
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
EventPollerPool::EventPollerPool() {
// 添加多个EventPoller并创建线程,
auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true, s_enable_cpu_affinity);
NoticeCenter::Instance().emitEvent(kOnStarted, *this, size);
InfoL << "EventPoller created size: " << size;
}
EventPollerPool继承自TaskExecutorGetterImp,addPoller()的实现在父类中。
addPoller会根据cpu核数count或者命令行参数-t count创建count个poller实例,并对应启动线程。
3rdpart/ZLToolKit/src/Thread/TaskExecutor.cpp
size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread, bool enable_cpu_affinity) {
auto cpus = thread::hardware_concurrency();
size = size > 0 ? size : cpus;
for (size_t i = 0; i < size; ++i) {
auto full_name = name + " " + to_string(i);
EventPoller::Ptr poller(new EventPoller(full_name, (ThreadPool::Priority) priority));
poller->runLoop(false, register_thread);
poller->async([i, cpus, full_name, enable_cpu_affinity]() {
setThreadName(full_name.data());
if (enable_cpu_affinity) {
setThreadAffinity(i % cpus);
}
});
// TaskExecutorGetterImp::_threads 成员变量
// std::vector<TaskExecutor::Ptr> _threads;
_threads.emplace_back(std::move(poller));
}
return size;
}
创建epoll实例。
EventPoller::EventPoller(std::string name, ThreadPool::Priority priority) {
_name = std::move(name);
_priority = priority;
SockUtil::setNoBlocked(_pipe.readFD());
SockUtil::setNoBlocked(_pipe.writeFD());
#if defined(HAS_EPOLL)
_epoll_fd = epoll_create(EPOLL_SIZE);
if (_epoll_fd == -1) {
throw runtime_error(StrPrinter << "Create epoll fd failed: " << get_uv_errmsg());
}
SockUtil::setCloExec(_epoll_fd);
#endif //HAS_EPOLL
_logger = Logger::Instance().shared_from_this();
_loop_thread_id = this_thread::get_id();
//添加内部管道事件
if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) {
throw std::runtime_error("Add pipe fd to poller failed");
}
}
创建线程。
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
// poller->runLoop(false, register_thread);
void EventPoller::runLoop(bool blocked, bool ref_self) {
// 默认blocked为false
if (blocked) {
ThreadPool::setPriority(_priority);
lock_guard<mutex> lck(_mtx_running);
_loop_thread_id = this_thread::get_id();
if (ref_self) {
s_current_poller = shared_from_this();
}
_sem_run_started.post();
_exit_flag = false;
uint64_t minDelay;
#if defined(HAS_EPOLL)
struct epoll_event events[EPOLL_SIZE];
while (!_exit_flag) {
minDelay = getMinDelay();
startSleep();//用于统计当前线程负载情况
int ret = epoll_wait(_epoll_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);
sleepWakeUp();//用于统计当前线程负载情况
if (ret <= 0) {
//超时或被打断
continue;
}
for (int i = 0; i < ret; ++i) {
struct epoll_event &ev = events[i];
int fd = ev.data.fd;
auto it = _event_map.find(fd);
if (it == _event_map.end()) {
epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
continue;
}
auto cb = it->second;
try {
(*cb)(toPoller(ev.events));
} catch (std::exception &ex) {
ErrorL << "Exception occurred when do event task: " << ex.what();
}
}
}
#else
// select
......
#endif //HAS_EPOLL
} else {
_loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);
_sem_run_started.wait();
}
}
根据负载获取EventPoller
声明:
3rdpart/ZLToolKit/src/Poller/EventPoller.h
class EventPollerPool : public std::enable_shared_from_this<EventPollerPool>, public TaskExecutorGetterImp {
public:
......
/**
* 根据负载情况获取轻负载的实例
* 如果优先返回当前线程,那么会返回当前线程
* 返回当前线程的目的是为了提高线程安全性
* @param prefer_current_thread 是否优先获取当前线程
*/
EventPoller::Ptr getPoller(bool prefer_current_thread = true);
......
}
实现:
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
EventPoller::Ptr EventPollerPool::getPoller(bool prefer_current_thread) {
auto poller = EventPoller::getCurrentPoller();
// prefer_current_thread == true
// _prefer_current_thread
if (prefer_current_thread && _prefer_current_thread && poller) {
return poller;
}
// 执行了下面
return dynamic_pointer_cast<EventPoller>(getExecutor());
}
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
// static
EventPoller::Ptr EventPoller::getCurrentPoller() {
return s_current_poller.lock();
}
根据负载返回TaskExecutor。
3rdpart/ZLToolKit/src/Thread/TaskExecutor.cpp
TaskExecutor::Ptr TaskExecutorGetterImp::getExecutor() {
auto thread_pos = _thread_pos;
if (thread_pos >= _threads.size()) {
thread_pos = 0;
}
TaskExecutor::Ptr executor_min_load = _threads[thread_pos];
auto min_load = executor_min_load->load();
for (size_t i = 0; i < _threads.size(); ++i, ++thread_pos) {
if (thread_pos >= _threads.size()) {
thread_pos = 0;
}
auto th = _threads[thread_pos];
auto load = th->load();
if (load < min_load) {
min_load = load;
executor_min_load = th;
}
if (min_load == 0) {
break;
}
}
_thread_pos = thread_pos;
return executor_min_load;
}
注册TcpServer::_on_create_socket回调
调用:
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// main() 函数中 auto rtspSrv = std::make_shared<TcpServer>();;
// 所以poller默认为空
// 初始化父类 Server(poller)
TcpServer::TcpServer(const EventPoller::Ptr &poller) : Server(poller) {
setOnCreateSocket(nullptr);
}
实现:
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// cb 传入为nullptr
void TcpServer::setOnCreateSocket(Socket::onCreateSocket cb) {
if (cb) {
_on_create_socket = std::move(cb);
} else {
// 走这里
// 注册回调函数, 只在这里注册
// 调用该回调函数时,poller不为空
// TcpServer::_on_create_socket
_on_create_socket = [](const EventPoller::Ptr &poller) {
return Socket::createSocket(poller, false);
};
}
for (auto &pr : _cloned_server) {
// cb 传入为nullptr
pr.second->setOnCreateSocket(cb);
}
}
创建Socket
../3rdpart/ZLToolKit/src/Network/Socket.cpp
Socket::Ptr Socket::createSocket(const EventPoller::Ptr &poller, bool enable_mutex){
return std::make_shared<Socket>(poller, enable_mutex);
}
如果传入poller参数为空,则寻找最轻负载poller绑定socket。
../3rdpart/ZLToolKit/src/Network/Socket.cpp
Socket::Socket(const EventPoller::Ptr &poller, bool enable_mutex) :
_mtx_sock_fd(enable_mutex), _mtx_event(enable_mutex),
_mtx_send_buf_waiting(enable_mutex), _mtx_send_buf_sending(enable_mutex){
_poller = poller;
if (!_poller) {
_poller = EventPollerPool::Instance().getPoller();
}
setOnRead(nullptr);
setOnErr(nullptr);
setOnAccept(nullptr);
setOnFlush(nullptr);
setOnBeforeAccept(nullptr);
setOnSendResult(nullptr);
}
TcpServer::start()
该函数启动TcpServer服务。主要做两件事情:
- 注册TcpServer::_session_alloc回调。
a) 为SessionType(对应为RtspSession)申请内存空间。绑定socket。
b) 调用SocketHelper::setOnCreateSocket(),注册创建socket的回调函数,实际调用TcpServer::_on_create_socket。Session无setOnCreateSocket()实现,继承自SocketHelper。
c) 创建SessionHelper,绑定server和session。Session和SessionHelper一一对应。SessionHelper通过全局变量SessionMap管理session。 - 调用start_l(port, host, backlog)启动监听任务。
注册TcpServer::_session_alloc
3rdpart/ZLToolKit/src/Network/TcpServer.h
class TcpServer : public Server {
public:
using Ptr = std::shared_ptr<TcpServer>;
......
// if (rtspPort) { rtspSrv->start<RtspSession>(rtspPort); }
// RtspSession等同于SessionType, RtspSession由Session派生
template<typename SessionType>
void start(uint16_t port, const std::string &host = "::", uint32_t backlog = 1024) {
// Session创建器,通过它创建不同类型的服务器
// 注册创建session的回调
// std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> TcpServer::_session_alloc;
_session_alloc = [](const TcpServer::Ptr &server, const Socket::Ptr &sock) {
// RtspSession等同于SessionType, RtspSession由Session派生
auto session = std::make_shared<SessionType>(sock);
// 首先调用TcpServer::setOnCreateSocket()
// 然后再调用Socket::createSocket(),只注册回调函数
session->setOnCreateSocket(server->_on_create_socket);
return std::make_shared<SessionHelper>(server, session);
};
start_l(port, host, backlog);
}
private:
......
Socket::Ptr _socket;
std::shared_ptr<Timer> _timer;
Socket::onCreateSocket _on_create_socket;
std::unordered_map<SessionHelper *, SessionHelper::Ptr> _session_map;
std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> _session_alloc;
};
RtspSession继承自Session继承自SocketHelper。
3rdpart/ZLToolKit/src/Network/Socket.cpp
void SocketHelper::setOnCreateSocket(Socket::onCreateSocket cb){
if (cb) {
// 走这里,此时cb不为空,TcpServer::_on_create_socket
_on_create_socket = std::move(cb);
} else {
_on_create_socket = [](const EventPoller::Ptr &poller) {
return Socket::createSocket(poller, false);
};
}
}
SocketHelper::_on_create_socket 实际为TcpServer::_on_create_socket,在Socket::onAccept()->Socket::createSocket()中执行。
且当Socket::onAccept()->Socket::_on_before_accept创建失败的时候执行,继承父poller,没有poller负载考虑。
创建SessionHelper管理Session
3rdpart/ZLToolKit/src/Network/Server.cpp
SessionHelper::SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session) {
_server = server;
_session = std::move(session);
//记录session至全局的map,方便后面管理
// 这个是个静态变量, 只初始化一次。
_session_map = SessionMap::Instance().shared_from_this();
_identifier = _session->getIdentifier();
_session_map->add(_identifier, _session);
}
TcpServer::start_l()
缩进太深了,专门分析一下。
TcpServer::start_l()
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::start_l(uint16_t port, const std::string &host, uint32_t backlog) {
// 1. 创建socket,并绑定回调
// 创建TCPServer::_socket
// 注册TCPServer::_socket回调函数setOnBeforeAccept,setOnAccept
setupEvent();
// 2. 启动监听
if (!_socket->listen(port, host.c_str(), backlog)) {
//创建tcp监听失败,可能是由于端口占用或权限问题
string err = (StrPrinter << "Listen on " << host << " " << port << " failed: " << get_uv_errmsg(true));
throw std::runtime_error(err);
}
// 3. Timer
//新建一个定时器定时管理这些tcp会话
weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
_timer = std::make_shared<Timer>(2.0f, [weak_self]() -> bool {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->onManagerSession();
return true;
}, _poller);
// 4. 循环创建多个TcpServer
EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor);
if (poller == _poller || !poller) {
return;
}
auto &serverRef = _cloned_server[poller.get()];
if (!serverRef) {
serverRef = onCreatServer(poller);
}
if (serverRef) {
serverRef->cloneFrom(*this);
}
});
InfoL << "TCP server listening on [" << host << "]: " << port;
}
TcpServer::setupEvent() 创建监听Socket
setupEvent()有两个主要作用:
- 根据poller创建对应的Socket。
- 为Socket注册两个回调,在响应客户端请求是调用。
a) setOnBeforeAccept() 创建客户端socket。根据poller最轻负载创建Socket。
b) setOnAccept() 接收客户端数据。
此次调用setupEvent(),TcpServer::_poller由TcpServer::TcpServer()->Server::Server()->EventPollerPool::Instance().getPoller(true)创建。
第二次调用setupEvent()在EventPollerPool::Instance().for_each()批量clone的时候,TcpServer::TcpServer()->Server::Server()->EventPollerPool::Instance()->TaskExecutorGetterImp::addPoller()已经批量创建了poller。
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::setupEvent() {
// Socket::Ptr TCPServer::_socket;
// TcpServer::_poller继承父类Server protect变量,此时不为空
_socket = createSocket(_poller);
weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
// 回调函数1
// 实际是为Socket::_on_before_accept赋值,回调函数。
// 该回调函数内调用TcpServer::onBeforeAcceptConnection(poller)
_socket->setOnBeforeAccept([weak_self](const EventPoller::Ptr &poller) -> Socket::Ptr {
if (auto strong_self = weak_self.lock()) {
return strong_self->onBeforeAcceptConnection(poller);
}
return nullptr;
});
// 回调函数2
// 实际是为Socket::_on_accept赋值。
// 该回调内调用TcpServer::onAcceptConnection()
_socket->setOnAccept([weak_self](Socket::Ptr &sock, shared_ptr<void> &complete) {
if (auto strong_self = weak_self.lock()) {
auto ptr = sock->getPoller().get();
auto server = strong_self->getServer(ptr);
ptr->async([server, sock, complete]() {
//该tcp客户端派发给对应线程的TcpServer服务器
server->onAcceptConnection(sock);
});
}
});
}
TcpServer创建Socket
- 调用TcpServer::createSocket(),
- 调用回调函数TcpServer::_on_create_socket(poller),TcpServer::setOnCreateSocekt()注册该回调为Socket::createSocket()。
- 调用Socket::createSocket(), 构造Socket::Socket(const EventPoller::Ptr &poller, bool enable_mutex),返回Socket::Ptr。
参考:TcpServer::TcpServer(){setOnCreateSocket(nullptr);}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// 此时poller不为空,是TcpServer初始化的时候,继承自Server
Socket::Ptr TcpServer::createSocket(const EventPoller::Ptr &poller) {
// TcpServer::setOnCreateSoket()注册TcpServer::_on_create_socket
// 实际为Socket::createSocket(),
return _on_create_socket(poller);
}
注册Socket::setOnBeforeAccept()回调
接收请求之前创建Socket,考虑负载均衡。
调用:
3rdpart/ZLToolKit/src/Network/Server.cpp
void TcpServer::setupEvent() {
......
_socket = createSocket(_poller);
......
// 回调函数1
// 实际是为Socket::_on_before_accept赋值,回调函数。
// 该回调函数内调用TcpServer::onBeforeAcceptConnection(poller)
_socket->setOnBeforeAccept([weak_self](const EventPoller::Ptr &poller) -> Socket::Ptr {
if (auto strong_self = weak_self.lock()) {
// TcpServer::onBeforeAcceptConnection()
return strong_self->onBeforeAcceptConnection(poller);
}
return nullptr;
});
.....
}
为Socket::_on_before_accept赋值:
3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnBeforeAccept(onCreateSocket cb){
LOCK_GUARD(_mtx_event);
if (cb) {
// 走这里
_on_before_accept = std::move(cb);
} else {
_on_before_accept = [](const EventPoller::Ptr &poller) {
return nullptr;
};
}
}
该回调根据poller负载,创建Socket。
参考TcpServer::setupEvent()->TcpServer::createSocket()。
EventPollerPool::Instance().getPoller(false) 会根据poller负载,返回最空闲的poller。
参考:TcpServer::TcpServer()->Server::Server()->EventPollerPool::Instance().getPoller()->TaskExecutorGetterImp::getExecutor()
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
Socket::Ptr TcpServer::onBeforeAcceptConnection(const EventPoller::Ptr &poller) {
// _poller 只做判断使用
assert(_poller->isCurrentThread());
//此处改成自定义获取poller对象,防止负载不均衡
return createSocket(EventPollerPool::Instance().getPoller(false));
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
Socket::Ptr TcpServer::createSocket(const EventPoller::Ptr &poller) {
return _on_create_socket(poller);
}
注册Socket::setOnAccept()回调
实际上就是为Socket::_on_accept赋值。该回调会在Socket::onAccept()中调用,接受客户端请求。
参数sock由Socket::_on_before_accept()回调创建,该回调根据负载分配一个负载最轻的poller。Socket::_on_before_accept由TcpServer::setupEvent()->Socket::setOnBeforeAccept()设置。
该回调内有重要的函数调用server->onAcceptConnection(sock),该函数有以下两个主要操作:
- 调用TcpServer::_session_alloc创建session,并添加该session至全局静态变量_session_map中。
- 为sock设置数据回调,该回调实际调用Session::onRecv()上抛数据。
调用:
3rdpart/ZLToolKit/src/Network/Server.cpp
void TcpServer::setupEvent() {
......
_socket = createSocket(_poller);
......
// 回调函数2
// 实际是为Socket::_on_accept赋值。
// 该回调内调用TcpServer::onAcceptConnection()
// sock实际为Socket::_on_before_accept根据poller负载创建。
_socket->setOnAccept([weak_self](Socket::Ptr &sock, shared_ptr<void> &complete) {
if (auto strong_self = weak_self.lock()) {
auto ptr = sock->getPoller().get();
// 逆向寻找server
auto server = strong_self->getServer(ptr);
ptr->async([server, sock, complete]() {
//该tcp客户端派发给对应线程的TcpServer服务器
server->onAcceptConnection(sock);
});
}
});
......
}
为Socket::_on_accept赋值:
3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnAccept(onAcceptCB cb) {
LOCK_GUARD(_mtx_event);
if (cb) {
// 走这里
_on_accept = std::move(cb);
} else {
_on_accept = [](Socket::Ptr &sock, shared_ptr<void> &complete) {
WarnL << "Socket not set accept callback, peer fd: " << sock->rawFD();
};
}
}
创建并管理Session
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
// Server接收到客户端连接请求,上抛peer Socket至此。
// 一个请求一个Session
Session::Ptr TcpServer::onAcceptConnection(const Socket::Ptr &sock) {
assert(_poller->isCurrentThread());
weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
//创建一个Session;这里实现创建不同的服务会话实例
// SessionHelper
auto helper = _session_alloc(std::dynamic_pointer_cast<TcpServer>(shared_from_this()), sock);
auto session = helper->session();
//把本服务器的配置传递给Session
session->attachServer(*this);
//_session_map::emplace肯定能成功
// 记录 裸指针和智能指针
auto success = _session_map.emplace(helper.get(), helper).second;
assert(success == true);
weak_ptr<Session> weak_session = session;
// 回调函数1
//会话接收数据事件
sock->setOnRead([weak_session](const Buffer::Ptr &buf, struct sockaddr *, int) {
//获取会话强应用
auto strong_session = weak_session.lock();
if (!strong_session) {
return;
}
try {
strong_session->onRecv(buf);
} catch (SockException &ex) {
strong_session->shutdown(ex);
} catch (exception &ex) {
strong_session->shutdown(SockException(Err_shutdown, ex.what()));
}
});
SessionHelper *ptr = helper.get();
// 回调函数2
//会话接收到错误事件
sock->setOnErr([weak_self, weak_session, ptr](const SockException &err) {
//在本函数作用域结束时移除会话对象
//目的是确保移除会话前执行其onError函数
//同时避免其onError函数抛异常时没有移除会话对象
onceToken token(nullptr, [&]() {
//移除掉会话
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
assert(strong_self->_poller->isCurrentThread());
if (!strong_self->_is_on_manager) {
//该事件不是onManager时触发的,直接操作map
strong_self->_session_map.erase(ptr);
} else {
//遍历map时不能直接删除元素
strong_self->_poller->async([weak_self, ptr]() {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->_session_map.erase(ptr);
}
}, false);
}
});
//获取会话强应用
auto strong_session = weak_session.lock();
if (strong_session) {
//触发onError事件回调
strong_session->onError(err);
}
});
return session;
}
TcpServer::_session_alloc()在TcpSerer::start()中注册。
3rdpart/ZLToolKit/src/Network/TcpServer.h
class TcpServer : public Server {
public:
......
template<typename SessionType>
void start(uint16_t port, const std::string &host = "::", uint32_t backlog = 1024) {
// Session创建器,通过它创建不同类型的服务器
// 注册创建session的回调
// std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> TcpServer::_session_alloc;
_session_alloc = [](const TcpServer::Ptr &server, const Socket::Ptr &sock) {
// RtspSession等同于SessionType, RtspSession由Session派生
auto session = std::make_shared<SessionType>(sock);
// 首先调用TcpServer::setOnCreateSocket()
// 然后再调用Socket::createSocket(),只注册回调函数
session->setOnCreateSocket(server->_on_create_socket);
return std::make_shared<SessionHelper>(server, session);
};
start_l(port, host, backlog);
}
private:
......
std::function<SessionHelper::Ptr(const TcpServer::Ptr &server, const Socket::Ptr &)> _session_alloc;
};
调用TcpServer::_session_alloc()创建Session(包含Socket::_on_before_accept()创建爱你的Socket,其poller负载最轻),并返回SessionHelper。
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
//创建一个Session;这里实现创建不同的服务会话实例
// SessionHelper
auto helper = _session_alloc(std::dynamic_pointer_cast<TcpServer>(shared_from_this()), sock);
auto session = helper->session();
//把本服务器的配置传递给Session
session->attachServer(*this);
3rdpart/ZLToolKit/src/Network/Server.cpp
const Session::Ptr &SessionHelper::session() const {
return _session;
}
(map<SessionHelper*,SessionHelper::Ptr>) TcpServer::_session_map,记录裸指针和智能指针的对应关系。
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
//_session_map::emplace肯定能成功
// 记录 裸指针和智能指针
auto success = _session_map.emplace(helper.get(), helper).second;
assert(success == true);
注册Socket::_on_read回调
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
Session::Ptr TcpServer::onAcceptConnection(const Socket::Ptr &sock) {
省略session创建部分
......
//会话接收数据事件
sock->setOnRead([weak_session](const Buffer::Ptr &buf, struct sockaddr *, int) {
//获取会话强应用
auto strong_session = weak_session.lock();
if (!strong_session) {
return;
}
try {
strong_session->onRecv(buf);
} catch (SockException &ex) {
strong_session->shutdown(ex);
} catch (exception &ex) {
strong_session->shutdown(SockException(Err_shutdown, ex.what()));
}
});
......
}
实现:
3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnRead(onReadCB cb) {
LOCK_GUARD(_mtx_event);
if (cb) {
// 走这里
_on_read = std::move(cb);
} else {
_on_read = [](const Buffer::Ptr &buf, struct sockaddr *, int) {
WarnL << "Socket not set read callback, data ignored: " << buf->size();
};
}
}
RtspSession继承自Session。
该回调实际调用纯虚函数Session::onRevc(),实际调用RtspSession::onRecv()。
3rdpart/ZLToolKit/src/Network/Server.h
class Session : public std::enable_shared_from_this<Session>, public SocketHelper {
public:
using Ptr = std::shared_ptr<Session>;
Session(const Socket::Ptr &sock);
~Session() override = default;
/**
* 接收数据入口
* @param buf 数据,可以重复使用内存区,不可被缓存使用
*/
virtual void onRecv(const Buffer::Ptr &buf) = 0;
......
};
Socket::listen(ip, port)
3rdpart/ZLToolKit/src/Network/Socket.cpp
bool Socket::listen(uint16_t port, const string &local_ip, int backlog) {
int sock = SockUtil::listen(port, local_ip.data(), backlog);
if (sock == -1) {
return false;
}
return listen(makeSock(sock, SockNum::Sock_TCP));
}
调用系统接口SockUtil::listen()。
3rdpart/ZLToolKit/src/Network/sckutil.cpp
int SockUtil::listen(const uint16_t port, const char *local_ip, int back_log) {
int fd = -1;
int family = support_ipv6() ? (is_ipv4(local_ip) ? AF_INET : AF_INET6) : AF_INET;
if ((fd = (int)socket(family, SOCK_STREAM, IPPROTO_TCP)) == -1) {
WarnL << "Create socket failed: " << get_uv_errmsg(true);
return -1;
}
// 设置多路复用
setReuseable(fd, true, false);
setNoBlocked(fd);
setCloExec(fd);
if (bind_sock(fd, local_ip, port, family) == -1) {
close(fd);
return -1;
}
//开始监听
if (::listen(fd, back_log) == -1) {
WarnL << "Listen socket failed: " << get_uv_errmsg(true);
close(fd);
return -1;
}
return fd;
}
调用Socket::listen():
3rdpart/ZLToolKit/src/Network/Socket.cpp
// sock为系统接口socket() 返回套接字ID
bool Socket::listen(const SockFD::Ptr &sock){
closeSock();
weak_ptr<SockFD> weak_sock = sock;
weak_ptr<Socket> weak_self = shared_from_this();
_enable_recv = true;
// EventPoller::Ptr Socket::_poller;
// epoll实例addEvent绑定套接字
int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error,
[weak_self, weak_sock](int event) { // 注册回调函数
auto strong_self = weak_self.lock();
auto strong_sock = weak_sock.lock();
if (!strong_self || !strong_sock) {
return;
}
// 事件回调创建新链接,这是回调中的具体执行函数。
strong_self->onAccept(strong_sock, event);
}
);
if (result == -1) {
return false;
}
LOCK_GUARD(_mtx_sock_fd);
_sock_fd = sock;
return true;
}
Socket::closeSock()
3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::closeSock() {
_con_timer = nullptr;
_async_con_cb = nullptr;
LOCK_GUARD(_mtx_sock_fd);
_sock_fd = nullptr;
}
epoll添加请求链接事件
epoll绑定套接字。
PollEventCB注册在Socket::listen(const SockFD::Ptr &sock)。
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
TimeTicker();
if (!cb) {
WarnL << "PollEventCB is empty";
return -1;
}
if (isCurrentThread()) {
#if defined(HAS_EPOLL)
struct epoll_event ev = {0};
ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;
ev.data.fd = fd;
int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
if (ret == 0) {
_event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
}
return ret;
#else
#ifndef _WIN32
//win32平台,socket套接字不等于文件描述符,所以可能不适用这个限制
if (fd >= FD_SETSIZE || _event_map.size() >= FD_SETSIZE) {
WarnL << "select() can not watch fd bigger than " << FD_SETSIZE;
return -1;
}
#endif
auto record = std::make_shared<Poll_Record>();
record->event = event;
record->call_back = std::move(cb);
_event_map.emplace(fd, record);
return 0;
#endif //HAS_EPOLL
}
async([this, fd, event, cb]() {
// 注册的回调,在Socket::listen(const SockFD::Ptr &sock)中。
addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb)));
});
return 0;
}
同步事件。
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
// 如果调用该函数的线程就是本对象的轮询线程,那么may_sync为true时就是同步执行任务
// 默认为true
Task::Ptr EventPoller::async(TaskIn task, bool may_sync) {
return async_l(std::move(task), may_sync, false);
}
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
Task::Ptr EventPoller::async_l(TaskIn task, bool may_sync, bool first) {
TimeTicker();
if (may_sync && isCurrentThread()) {
task();
return nullptr;
}
auto ret = std::make_shared<Task>(std::move(task));
{
lock_guard<mutex> lck(_mtx_task);
if (first) {
_list_task.emplace_front(ret);
} else {
_list_task.emplace_back(ret);
}
}
//写数据到管道,唤醒主线程
_pipe.write("", 1);
return ret;
}
请求响应Socket::onAccept()
接受客户端线程。
- accept() 接受客户端请求socket client connect()。
- 创建客户端socket,这个地方有两步操作:
a) 优先执行Socket::_on_before_accept,根据poller负载创建socket。
b) 保底执行Socket::createSocket(_poller, false),根据父poller创建socket。 - setPeerSock()设置peer socket ip port
- 调用Socket::_on_accept创建session,并注册数据上抛回调Socket::_on_read。
Socket::_on_accept注册在TcpServer::setupEvent()->Socket::setOnAccept()。 - Socket::attachEvent() 主要执行以下操作:
a) 获取共享缓冲区;
b) 注册数据可读事件;
c) 数据上抛回调:i.recvFrom()接收数据;ii.执行Socket::onRead()->Socket::_on_read上抛数据。
3rdpart/ZLToolKit/src/Network/Socket.cpp
int Socket::onAccept(const SockFD::Ptr &sock, int event) noexcept {
int fd;
while (true) {
// 读取回调
if (event & EventPoller::Event_Read) {
do {
// 此时只获取客户端fd,没有获取客户端IP
// connsockfd = accept(listenfd, (struct sockaddr *)&clientaddr, &len);
fd = (int)accept(sock->rawFd(), nullptr, nullptr);
} while (-1 == fd && UV_EINTR == get_uv_error(true));
if (fd == -1) {
int err = get_uv_error(true);
if (err == UV_EAGAIN) {
//没有新连接
return 0;
}
auto ex = toSockException(err);
emitErr(ex);
ErrorL << "Accept socket failed: " << ex.what();
return -1;
}
SockUtil::setNoSigpipe(fd);
SockUtil::setNoBlocked(fd);
SockUtil::setNoDelay(fd);
SockUtil::setSendBuf(fd);
SockUtil::setRecvBuf(fd);
SockUtil::setCloseWait(fd);
SockUtil::setCloExec(fd);
// 优先执行这里创建 Socket,考虑负载均衡
// 只创建Socket使用。
Socket::Ptr peer_sock;
try {
//此处捕获异常,目的是防止socket未accept尽,epoll边沿触发失效的问题
LOCK_GUARD(_mtx_event);
//拦截Socket对象的构造
// 该_poller只用于判断,创建Socket用最轻负载的poller
peer_sock = _on_before_accept(_poller);
} catch (std::exception &ex) {
ErrorL << "Exception occurred when emit on_before_accept: " << ex.what();
close(fd);
continue;
}
// _on_before_accept(_poller)失败才会走这里
// 这里继承父poller,没有负载均衡。
if (!peer_sock) {
//此处是默认构造行为,也就是子Socket共用父Socket的poll线程并且关闭互斥锁
peer_sock = Socket::createSocket(_poller, false);
}
//设置好fd,以备在onAccept事件中可以正常访问该fd
// 此时获取客户端IP和port
// int fd 转换为SockFD::Ptr &peer_sock_fd
auto peer_sock_fd = peer_sock->setPeerSock(fd);
// 实际先执行_on_accept(peer_sock, completed);
// 后执行attachEvent
shared_ptr<void> completed(nullptr, [peer_sock, peer_sock_fd](void *) {
try {
//然后把该fd加入poll监听(确保先触发onAccept事件然后再触发onRead等事件)
if (!peer_sock->attachEvent(peer_sock_fd)) {
//加入poll监听失败,触发onErr事件,通知该Socket无效
peer_sock->emitErr(SockException(Err_eof, "add event to poller failed when accept a socket"));
}
} catch (std::exception &ex) {
ErrorL << "Exception occurred: "<< ex.what();
}
});
try {
//此处捕获异常,目的是防止socket未accept尽,epoll边沿触发失效的问题
LOCK_GUARD(_mtx_event);
//先触发onAccept事件,此时应该监听该Socket的onRead等事件
// Socket::setOnAccept() 设定的回调函数
_on_accept(peer_sock, completed);
} catch (std::exception &ex) {
ErrorL << "Exception occurred when emit on_accept: " << ex.what();
continue;
}
} // end if
if (event & EventPoller::Event_Error) {
auto ex = getSockErr(sock);
emitErr(ex);
ErrorL << "TCP listener occurred a err: " << ex.what();
return -1;
}
}
}
accept()接受客户端请求
系统接口。
创建socket
创建客户端socket,这个地方有两步操作:
a) 优先执行Socket::_on_before_accept,根据poller负载创建socket。
b) 保底执行Socket::createSocket(_poller, false),根据父poller创建socket。
Socket::_on_before_accept
根据poller负载创建socket。
Socket::createSocket()
Socket::createSocket() 继承父poller创建socket,逻辑上不执行该函数,该函数没有poller负载的考虑。除非Socket::_on_before_accept创建socket失败。
SocketHelper::_on_create_socket 实际调用TcpServer::_on_create_socket。
Socket::_on_before_accept实际也调用的是TcpServer::_on_create_socket。
3rdpart/ZLToolKit/src/Network/Socket.cpp
Socket::Ptr SocketHelper::createSocket(){
// 使用父_poller,没有负载的考虑
return _on_create_socket(_poller);
}
设置socket ip port
3rdpart/ZLToolKit/src/Network/Socket.cpp
SockFD::Ptr Socket::setPeerSock(int fd) {
closeSock();
// 该函数初始化的客户端ip和port
auto sock = makeSock(fd, SockNum::Sock_TCP);
LOCK_GUARD(_mtx_sock_fd);
_sock_fd = sock;
return sock;
}
Socket::_on_accept接受请求
Socket::_on_accept核心调用TcpServer::onAcceptConnection():
- 创建session;
- 注册数据上抛回调Socket::_on_read,实际调用Session::onRecv()->RtspSession::onRecv()。
赋值:
3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnAccept(onAcceptCB cb) {
LOCK_GUARD(_mtx_event);
if (cb) {
// 走了这里
_on_accept = std::move(cb);
} else {
_on_accept = [](Socket::Ptr &sock, shared_ptr<void> &complete) {
WarnL << "Socket not set accept callback, peer fd: " << sock->rawFD();
};
}
}
参考:TcpServer::start_l()->TcpServer::setupEvent()->Socket::setOnAccept()。
3rdpart/ZLToolKit/src/Network/Server.cpp
void TcpServer::setupEvent() {
......
_socket = createSocket(_poller);
......
// 回调函数2
// 实际是为Socket::_on_accept赋值。
// 该回调内调用TcpServer::onAcceptConnection()
// sock实际为Socket::_on_before_accept根据poller负载创建。
_socket->setOnAccept([weak_self](Socket::Ptr &sock, shared_ptr<void> &complete) {
if (auto strong_self = weak_self.lock()) {
auto ptr = sock->getPoller().get();
// 逆向寻找server
auto server = strong_self->getServer(ptr);
ptr->async([server, sock, complete]() {
//该tcp客户端派发给对应线程的TcpServer服务器
server->onAcceptConnection(sock);
});
}
});
......
}
Socket::attachEvent() 绑定数据可读回调
Socket::attachEvent() 主要有以下三个功能:
- 获取共享缓冲区;
- 注册数据可读事件;
- 数据上抛回调;
i.recvFrom()接收数据;
ii.执行Socket::onRead()->Socket::_on_read上抛数据。
3rdpart/ZLToolKit/src/Network/Socket.cpp
bool Socket::attachEvent(const SockFD::Ptr &sock) {
weak_ptr<Socket> weak_self = shared_from_this();
weak_ptr<SockFD> weak_sock = sock;
_enable_recv = true;
_read_buffer = _poller->getSharedBuffer();
auto is_udp = sock->type() == SockNum::Sock_UDP;
int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, weak_sock, is_udp](int event) {
auto strong_self = weak_self.lock();
auto strong_sock = weak_sock.lock();
if (!strong_self || !strong_sock) {
return;
}
if (event & EventPoller::Event_Read) {
strong_self->onRead(strong_sock, is_udp);
}
if (event & EventPoller::Event_Write) {
strong_self->onWriteAble(strong_sock);
}
if (event & EventPoller::Event_Error) {
strong_self->emitErr(getSockErr(strong_sock));
}
});
return -1 != result;
}
申请共享缓冲区。
3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
BufferRaw::Ptr EventPoller::getSharedBuffer() {
auto ret = _shared_buffer.lock();
if (!ret) {
//预留一个字节存放\0结尾符
ret = BufferRaw::create();
ret->setCapacity(1 + SOCKET_DEFAULT_BUF_SIZE);
_shared_buffer = ret;
}
return ret;
}
注册数据可读事件
addEvent()数据可读事件。
数据上抛回调
recvFrom()接收数据;执行Socket::onRead()->Socket::_on_read上抛数据。
Socket::onRead() 使用数据上抛回调Socket::_on_read 接收数据并且上抛。
3rdpart/ZLToolKit/src/Network/Socket.cpp
ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept{
ssize_t ret = 0, nread = 0;
auto sock_fd = sock->rawFd();
auto data = _read_buffer->data();
//最后一个字节设置为'\0'
auto capacity = _read_buffer->getCapacity() - 1;
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
while (_enable_recv) {
do {
nread = recvfrom(sock_fd, data, capacity, 0, (struct sockaddr *)&addr, &len);
} while (-1 == nread && UV_EINTR == get_uv_error(true));
if (nread == 0) {
if (!is_udp) {
emitErr(SockException(Err_eof, "end of file"));
} else {
WarnL << "Recv eof on udp socket[" << sock_fd << "]";
}
return ret;
}
if (nread == -1) {
auto err = get_uv_error(true);
if (err != UV_EAGAIN) {
if (!is_udp) {
emitErr(toSockException(err));
} else {
WarnL << "Recv err on udp socket[" << sock_fd << "]: " << uv_strerror(err);
}
}
return ret;
}
if (_enable_speed) {
// 更新接收速率
_recv_speed += nread;
}
ret += nread;
data[nread] = '\0';
//设置buffer有效数据大小
_read_buffer->setSize(nread);
//触发回调
LOCK_GUARD(_mtx_event);
try {
//此处捕获异常,目的是防止数据未读尽,epoll边沿触发失效的问题
// 该回调在Socket::setOnAccept()->TcpServer::onAcceptConnection()->Socket::setOnRead()设置为RtspSession::onRecv()。
_on_read(_read_buffer, (struct sockaddr *)&addr, len);
} catch (std::exception &ex) {
ErrorL << "Exception occurred when emit on_read: " << ex.what();
}
}
return 0;
}
Socket::setOnAccept()->TcpServer::onAcceptConnection()->Socket::setOnRead()注册Socket::_on_read。
3rdpart/ZLToolKit/src/Network/Socket.cpp
void Socket::setOnRead(onReadCB cb) {
LOCK_GUARD(_mtx_event);
if (cb) {
// 走这里
_on_read = std::move(cb);
} else {
_on_read = [](const Buffer::Ptr &buf, struct sockaddr *, int) {
WarnL << "Socket not set read callback, data ignored: " << buf->size();
};
}
}
RtspSession继承自Session。
该回调实际调用纯虚函数Session::onRevc(),实际调用RtspSession::onRecv()。
3rdpart/ZLToolKit/src/Network/Server.h
class Session : public std::enable_shared_from_this<Session>, public SocketHelper {
public:
using Ptr = std::shared_ptr<Session>;
Session(const Socket::Ptr &sock);
~Session() override = default;
/**
* 接收数据入口
* @param buf 数据,可以重复使用内存区,不可被缓存使用
*/
virtual void onRecv(const Buffer::Ptr &buf) = 0;
......
};
EventPollerPool::Instance().for_each()
循环创建服务器实例
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::start_l(uint16_t port, const std::string &host, uint32_t backlog) {
……
// for_each()继承自父类
//一个线程有一个TaskExecutor,现在是用这个函数处理每个 TaskExecutor
EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
// 如果poller 与当前poller相同,则返回
EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor);
if (poller == _poller || !poller) {
return;
}
// std::unordered_map<const EventPoller *, Ptr> _cloned_server;
// clone (TcpServer)serverRef,相当于有多个server
// 这里返回serverRef=null
auto &serverRef = _cloned_server[poller.get()];
// 如果serverRef查找失败
// 则调用onCreatServer()创建TcpServer,并绑定poller。
if (!serverRef) {
// 此时poller已创建完毕,poller不为空
serverRef = onCreatServer(poller);
}
if (serverRef) {
serverRef->cloneFrom(*this);
}
});
InfoL << "TCP server listening on [" << host << "]: " << port;
}
for_each()实现,继承自父类。
3rdpart/ZLToolKit/src/Thread/TaskExecutor.cpp
void TaskExecutorGetterImp::for_each(const function<void(const TaskExecutor::Ptr &)> &cb) {
for (auto &th : _threads) {
// 该回调即为EventPollerPool::Instance().for_each()注册的lambda
cb(th);
}
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
TcpServer::Ptr TcpServer::onCreatServer(const EventPoller::Ptr &poller) {
return std::make_shared<TcpServer>(poller);
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::cloneFrom(const TcpServer &that) {
if (!that._socket) {
throw std::invalid_argument("TcpServer::cloneFrom other with null socket");
}
// 创建Socket::Ptr TcpServer::_socket
setupEvent();
_on_create_socket = that._on_create_socket;
_session_alloc = that._session_alloc;
// 克隆一个相同fd的Socket对象
// 拷贝socket 包含SockFD,并listen。
_socket->cloneFromListenSocket(*(that._socket));
weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
_timer = std::make_shared<Timer>(2.0f, [weak_self]() -> bool {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->onManagerSession();
return true;
}, _poller);
this->mINI::operator=(that);
_parent = &that;
}
setupEvent() 参考:TcpServer::start()->TcpServer::start_l()->TcpServer::setupEvent()。文章来源:https://www.toymoban.com/news/detail-508841.html
3rdpart/ZLToolKit/src/Network/Socket.cpp
bool Socket::cloneFromListenSocket(const Socket &other) {
auto sock = cloneSockFD(other);
if (!sock) {
return false;
}
return listen(sock);
}
3rdpart/ZLToolKit/src/Network/Socket.cpp
SockFD::Ptr Socket::cloneSockFD(const Socket &other) {
SockFD::Ptr sock;
{
LOCK_GUARD(other._mtx_sock_fd);
if (!other._sock_fd) {
WarnL << "sockfd of src socket is null";
return nullptr;
}
sock = std::make_shared<SockFD>(*(other._sock_fd), _poller);
}
return sock;
}
3rdpart/ZLToolKit/src/Network/TcpServer.cpp
void TcpServer::onManagerSession() {
assert(_poller->isCurrentThread());
onceToken token([&]() {
_is_on_manager = true;
}, [&]() {
_is_on_manager = false;
});
// std::unordered_map<SessionHelper *, SessionHelper::Ptr> _session_map;
for (auto &pr : _session_map) {
//遍历时,可能触发onErr事件(也会操作_session_map)
try {
pr.second->session()->onManager();
} catch (exception &ex) {
WarnL << ex.what();
}
}
}
–END–
参考:
https://blog.csdn.net/yudaichenydc/article/details/127997301文章来源地址https://www.toymoban.com/news/detail-508841.html
到了这里,关于ZLMediaKit源码分析(一)服务启动的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!