前言:
- 这里的多线程主要指算法部署时所涉及的多线程内容,对于其他多线程知识需要自行补充
- 常用组件有thread、mutex、promise、future、condition_variable
- 启动线程,thread,以及join、joinable、detach、类函数启动为线程
- 生产者消费者模式
- 队列溢出的问题,生产太快,消费太慢。如何实现溢出限制
- 生产者如何拿到消费反馈
- RAII+接口模式的生产者消费者封装,以及多batch的体现
1、线程:
本单元是https://blog.csdn.net/zhuangtu1999/article/details/130903594?spm=1001.2014.3001.5501#t1的实例化单元,更加注重在实战中如何运用线程知识
上图是我们之前的一个例子,可以看到如果没有t.join(),那么线程是会崩掉的,因为他的生命周期在main()函数里面,所以当main结束之后他也会析构掉,但线程里面还在执行,所以会出现错误。
那如果要是没有启动线程就join呢?
可以看到也是会出错的。
所以可以发现:当线程启动了,就必须join,但如果线程没有启动,那就一定不能join,那这样肯定是很麻烦的,所以引申出来了下面的joinable概念:
joinable:
if (t1.joinable()) { t1.join(); }
这样的话无论线程有没有启动,退出的时候都可以正常退出。
detach
detach是分离线程,取消管理权,使线程变成野线程。但这个通常不建议使用
如果一个线程一旦detach之后,线程就交给了系统作管理,当程序结束后自动退出
可以看到在延迟了500毫秒之后,worker down这条语句来不及打印,就会直接退出。
参数传递:
在传递引用类型时,传入参数应用ref
类函数:
如果我们正常想搞一个类函数私有线程和方法,那我们首先想到的是可以这样做:
class Infer{
public:
Infer(){
worker_thread = thread(Infer_worker);
}
private:
thread worker_thread;
void Infer_worker(){
}
};
但这样做会报错:
显示他不是一个静态函数,但如果我们想要把这个函数加上static
这样做的确可以成功运行,但每次都要加上sellf很麻烦。
可以通过取地址方式,保留this参数,这样做可以避免static无法使用this->的操作。
2、生产者消费者模式
生产者:
queue<string> qjobs_; void video_capture(){ int pic_id = 0 ; while (true) { /* code */ char name[100]; sprintf(name ,"PIC - %d" , pic_id++); printf("生产了一个新图片:%s\n" , name ); qjobs_.push(name); this_thread::sleep_for(chrono::milliseconds(1000)); } }
消费者:
void infer_worker(){ while (true) { if (!qjobs_.empty()) { /* code */auto pic = qjobs_.front(); qjobs_.pop(); printf("消费掉一个图片:%s \n" , pic.c_str()); this_thread::sleep_for(chrono::milliseconds(1000)); } this_thread::yield(); } }
结果就是这样:
由于生产一个图片需要1ms,消费一张图片需要1ms,所以二者刚好是收支平衡的状态。
但这个设计到了一个共享资源访问的问题,queue不是线程安全的。
这就是设计到了mutex(https://blog.csdn.net/zhuangtu1999/article/details/130917521?spm=1001.2014.3001.5501)
我们设计了一个mutex然后在创建和消费的时候都加上🔓:
mutex lock_;
void video_capture(){
int pic_id = 0 ;
while (true)
{
/* code */
{
lock_guard<mutex> l(lock_);
char name[100];
sprintf(name ,"PIC - %d" , pic_id++);
printf("生产了一个新图片:%s\n" , name );
qjobs_.push(name);
this_thread::sleep_for(chrono::milliseconds(2000));
}
}
}
void infer_worker(){
while (true)
{
if (!qjobs_.empty())
{
{
lock_guard<mutex> l(lock_);
auto pic = qjobs_.front();
qjobs_.pop();
printf("消费掉一个图片:%s \n" , pic.c_str());
}
this_thread::sleep_for(chrono::milliseconds(1000));
}
this_thread::yield();
}
}
这就变成了一个原子操作。
condtion_varible:
如果生产太快,消费太慢,队列就会不断的增长,如果存储的是图片,可能就会造成显存爆炸:
所以设置一个上限就很有必要,这时就需要<codition_variable>这个头文件顾名思义,这个意思是条件变量。
if (qjobs_.size() > limite) { /* code */wait(); } qjobs_.push(name); //如果队列满了,就先不生产,等有空间了再生产
如果队列满了,就先不生产,等有空间了再生产,那就在push前面做一个判断。
如何通知函数,让wait退出,条件:消费掉一个就可以去通知
condition_variable cv_; void video_capture(){ int pic_id = 0 ; while (true) { /* code */ { unique_lock<mutex> l(lock_); char name[100]; sprintf(name ,"PIC - %d" , pic_id++); printf("生产了一个新图片:%s , qjob_.size = %d \n" , name , qjobs_.size() ); if (qjobs_.size() > limit) { // /* code */wait(); //如果队列满了,就先不生产,等有空间了再生产 //如何通知函数,让wait退出,条件:消费掉一个就可以去通知 cv_.wait(l, [& ]( ){ return qjobs_.size() < limit; //return false 继续等待 //return true 跳出等待 }); } qjobs_.push(name); //如果队列满了,就先不生产,等有空间了再生产 } this_thread::sleep_for(chrono::milliseconds(500)); } }
要注意:这里把lock_guard 换成了 unique_lock 这是为了能提前解锁。
而我们之前所说的队列满了就不生产实现了,还有通知没有实现。
void infer_worker(){ while (true) { if (!qjobs_.empty()) { { lock_guard<mutex> l(lock_); auto pic = qjobs_.front(); qjobs_.pop(); printf("消费掉一个图片:%s \n" , pic.c_str()); cv_.notify_one(); } this_thread::sleep_for(chrono::milliseconds(1000)); } this_thread::yield(); } }
就是在消费掉过后使用notify_one函数
wait的流程:
一旦进入wait()则解锁,解锁时video的线程停止运行,而infer的线程则获得了锁,正常进行
一旦退出wait()则加锁,加锁后video和infer又恢复为原子操作。
notify_one() 是 condition_variable 类的一个成员函数,它的作用是唤醒一个正在等待 condition_variable 的线程,使其获得锁并继续执行。它只会唤醒一个等待的线程,所以如果有多个线程在等待,那么只有一个线程会被唤醒,其他线程还是处于等待状态。使用 notify_one() 的方法是:在线程中使用 std::unique_lock 对象来锁定 condition_variable,然后调用 condition_variable 的 wait() 方法来等待,当线程需要唤醒其他线程时,调用 notify_one() 来唤醒等待的线程。
notify_one:此时调用notify_one会随机唤醒一个阻塞的线程,而其余的线程将仍然处于阻塞状态,等待下一次唤醒。
notify_all:调用notify_all则会唤醒所有线程,线程会争抢锁,当然只有一个线程会获得到锁,而其余未获得锁的线程也将不再阻塞,而是进入到类似轮询的状态,等待锁资源释放后再去争抢。假如同时有10个线程阻塞在wait方法上,则需要调用10次notify_one,而仅仅只需要调用1次notify_all
生产者如何拿到消费反馈:
也就是在我拿到了infer后的结果,我想将其送回到生产者该怎么办?
普通模式:
detection -> infer
face -> infer
feather -> push
异步模式:
detection -> push
face -> push
feather -> push
对于普通模式而言,每个线程的上下文都要去初始化,而且调用什么的也很费时间 , 所以使用异步操作才是更好的高性能选择。
这里需要用到future库
struct Job{ shared_ptr<promise<string>> pro ; string input; };
因为本次操作的是promise类型的,所以需要重新定义一个结构体
void video_capture(){ int pic_id = 0 ; while (true) { Job jobs; /* code */ { unique_lock<mutex> l(lock_); char name[100]; sprintf(name ,"PIC - %d" , pic_id++); printf("生产了一个新图片:%s , qjob_.size = %d \n" , name , qjobs_.size() ); // if (qjobs_.size() > limit) // { // /* code */wait(); //如果队列满了,就先产,等有空间了再生产 //如何通知函数,让wait退出,条件:消费掉一个就可以去通知 cv_.wait(l, [& ]( ){ return qjobs_.size() < limit; //return false 继续等待 //return true 跳出等待 }); // } jobs.pro.reset(new promise<string>());//promise就是说未来必须要返回一个结果 jobs.input = name; qjobs_.push(jobs); //如果队列满了,就先不生产,等有空间了再生产 } auto result = jobs.pro->get_future().get(); printf("%s-------->result:%s \n" ,jobs.input.c_str(), result .c_str()); this_thread::sleep_for(chrono::milliseconds(400)); } }
因为是jobs.pro.reset(new promise<string>())的变量,所以代表着一定会返还一个结果。
void infer_worker(){ while (true) { if (!qjobs_.empty()) { { unique_lock<mutex> l(lock_); auto pjobs_ = qjobs_.front(); qjobs_.pop(); cv_.notify_one(); printf("消费掉一个图片:%s \n" , pjobs_.input.c_str()); auto new_pic = pjobs_.input+"------infer"; pjobs_.pro->set_value(new_pic); } this_thread::sleep_for(chrono::milliseconds(1000)); } this_thread::yield(); } }
所以在infer里加上了
auto new_pic = pjobs_.input+"------infer"; pjobs_.pro->set_value(new_pic);
最后在生产者中
auto result = jobs.pro->get_future().get(); printf("%s-------->result:%s \n" ,jobs.input.c_str(), result .c_str());
但这部分要放在锁外,要不然会因为一直等待infer传回结果,而此时infer所在线程不被执行。造成死锁文章来源:https://www.toymoban.com/news/detail-613247.html
文章来源地址https://www.toymoban.com/news/detail-613247.html
到了这里,关于基础篇:多线程所需知识:的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!