【linux】线程同步+基于BlockingQueue的生产者消费者模型

这篇具有很好参考价值的文章主要介绍了【linux】线程同步+基于BlockingQueue的生产者消费者模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

喜欢的点赞,收藏,关注一下把!【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

1.线程同步

在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢?

讲个小故事。
假设学校有一个VIP学霸自习室,这个自习室有一个特点,里面的环境巨好,但是只有一张桌椅板凳,只允许一个人进去学习。要去这个自习室学习学校规定必须是先到先得,这个自习室推出的时候在门口打了一根钉子挂了一把钥匙,早上谁先来谁拿这把钥匙进去把门反锁学习,此时他就互斥式的访问了这个自习室,你在自习室学习了两小时,你想上个厕所,此时你把自习室门打开,看到大家三五成群的议论着谁先来的,当你出来时大家都看着你,你就想上个厕所马上就回来,此时你把钥匙放在口袋里上厕所去了,在你上厕所期间别人也进不去这个自习室,就叫做什么,当你这个线程执行流被切走的时候你不是一个人走的,你是抱着钥匙走了,然后你上完厕所回来把门一开进去再反锁又学了两小时,此时门口人越来越多,你不好意思了,你出来把钥匙挂在墙上,你此时是距离钥匙最近的,当你正想离开时你想这么走是不是有点遗憾,好不容易起一大早。不行还得在学一会,你又拿着钥匙进去又学了一小时,这叫什么,这是你当前离这个资源最近,竞争能力最强你此时又拿到这个钥匙又进去反锁学习了。然后你饿了想去吃饭,开门放钥匙想着不知道一会什么时候在到你于是你又拿钥匙进去了,其他同学就看到这个人出来挂钥匙又拿钥匙再进去,因为你最近,所以大家就看到一个人在疯狂的开门放钥匙拿钥匙关门,这样这间自习室就没有创造价值!

此时我们把其他在门口的人长时间得不到锁资源而无法访问公共资源的人,处于门口这些同学叫做处于饥饿状态。一个线程频繁申请锁资源而导致其他线程得不到锁资源这就叫做饥饿问题。

这个同学错了吗?他没错,学校就是这样规定的啊,所以他没错,但是不合理。学校经过大量同学的反应于是出了一个规定:1.所有在门口等待的同学必须排队,2.从自习室出来的同学不能立马申请锁必须先排到当前队列的尾部然后到你的时候才能拿钥匙。

所以当你拿到钥匙进去学习在出来把钥匙挂墙上,你还想拿,老师马上阻止你,干什么呢?去排队去。所有同学都是这样出来之后不能在拿钥匙必须去排队。此时就在数据安全的情况下,让我们的同学按照一定的顺序访问锁的资源。这就叫做线程同步

线程同步的本质:当我们在进行临界资源安全访问的前提下,让多个线程按照一定的顺序进行资源访问,这就是线程同步!

为了支持同步因此又有了条件变量!
在继续谈条件变量接口和使用之前,我们先谈一谈一个比较关键的概念。
生产者消费者模型。生产消费者模型谈完我们在谈条件变量,两个概念谈完我们把它们结合在一起最后写一份基于Blcokqueue的生产者消费者模型。

2.生产者消费者模型

我们以生活中的例子来帮我们理解什么是生产消费模型
我们在生活中经常要去超市购买东西。
学生—消费者
超市是生产者吗?
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
咋一看好像是,但并不是,超市只是一个将东西聚集起来分发的交易场所,
各种商品的供货商才是生产者
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
那如果你想买一根火腿肠,你为什么不能供货商哪里去买呢?
你说供货商你给我生产一根火腿肠我给你一块钱你卖给我,供货商是不会卖给你的,他启动机器生产都不值这个价。就靠这样一人一次卖几根似的卖货,他亏死了,机器还要维护员工还要养。一般现实生活中这些工厂都比较距离人员较远,而超市距离人员较近。这样就由超市把各种需求结合在一起然后去找供货商拿货并且拿货量比较大。所以超市就充当了中间商。
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
现实生活供货商把商品放到超市,消费者再去超市销售,二者间接通过超市进行交易。
你今天去超市买火腿肠,那这个供货商在干什么呢?
他可能在放假或者生产等等。
那供货商在生产,消费者在干什么呢?
有可能吃火腿肠或者玩等等。
换句话说因为有超市的存在,我们的生产过程和消费过程就互相不怎么干扰了。
按计算机语言来说就是生产的过程和消费的过程 — 解耦
超市是一个临时保存产品的场所 — 缓冲区

正是因为有缓冲区的存在,生产出来的数据可以暂时放到超市里可以供消费者一段时间的消费,同样超市提供一大批展架可以供生产者生产。所以有了缓冲区可以让生产消费的步调并不怎么一致!

那有没有写代码的时候,就没有正儿八经的解耦,也没有提供缓存区的代码呢?
有,函数调用!
调用方:生产了数据
形成变量:变量暂时保存数据
目标函数:消费了数据
比如说main调用fun,main在调用fun的时候,main函数在干什么呢?
等待!什么都不做只能等fun函数调用结束返回。main和fun是强耦合的关系。
就比如不存在超市的情况,学生直接去找供货商去买火腿肠,这个工厂没有存货只能开机器生产,在生产的时候,学生只能等待!

下面我们结合上面的例子谈一谈生产消费模式
超市能被学生去消费,也能被供货商去生产。那超市是什么呢?
超市 — 共享资源
比如供货商正在给超市架子上放火腿肠,学生就过来拿了。那能拿到吗?
不一定!放火腿肠,要么就放,要么不放,不存在中间状态!如果有中间状态!那正在放时,他当前有没有在放就直接决定了我能不能拿成功。所以他有没有放是不确定的

所以生产和消费在并发的访问同一份资源的时候,就可能存在同时访问的问题,就类似抢票的造成数据不一致问题。
生活中这种情况很少存在,但是当两个线程就有可能,比如一个线程正在放hello和world,另一个线程想拿world,但还没有放完另一个线程就拿了,你怎么保证拿到的就是world?
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
下面我们就生产者和消费者的关系来谈一谈

生产者和生产者之间什么关系呢?
生活中他俩是竞争关系!!为什么呢?超市货架就那么多,放了你王中王就放不了双汇

消费者和消费者之间什么关系呢?
生活中他俩是竞争关系!比如世界末日超市就一根火腿肠,你俩不就是竞争关系了

生产者和消费者之间什么关系呢?
超市里有一个架子,生产者正在生产的时候你消费者能拿吗?答案是不能。就如刚才hello和world的问题。所以生产者和消费者之间首先要保存数据安全!只允许一个人去访问这份资源。还有比如你今天去买火腿肠但是超市火腿肠卖完了又被告知供货商放假了,然后第二天又去超市问有没有火腿肠超市还说没有,这样连续问了一个月,都告诉你没有,那么你这是在干什么呢?当前你在检测这个火腿肠的就绪状态,但是你一直再问都没有得到你想要的结果,第一这是浪费你的时间,本来没有货你还来问,第二也浪费了超市的时间,可能消费者并不是只有你一个都来问。本来超市可以去找供货商的。同理供货商来问超市要不要火腿肠,超市说没客人就不要你们就等等,供货商就一直在询问,同样也在浪费供货商的时间。它们都没错,首先我们保证了资源的安全,只允许一个人去访问,但是呢并不合理!假如你去超市买火腿肠,但超市暂时没有,超市把你微信留下来等有了再告诉你,然后你再来,对供货商也是一样的做法,这样不就合理了吗,生产一部分消费一部分。让我们的生产和消费协同起来。

总结:
生产者和生产者之间:互斥
消费者和消费者之间:互斥
生产者和消费者之间:互斥&&同步

下面我们把上面零零散散的知识做一个总结
生产者消费者模型其实只需要记住记住一个原则就行了
"321"原则
3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥(保证共享资源的安全性)&&同步)
2种角色:生产者线程,消费者线程
1个交易场所:一段特定结构的缓冲区

未来只要我们想写生产消费模型,我们本质工作其实就是维护321原则

生产消费模型的特点:
1.生产线程和消费线程进行解耦
2.支持生产和消费的一段时间的忙闲不均的问题
3.提高效率

生产消费提高效率,但是生产者和消费者存在互斥关系。可能生产者生产一个数据我们消费者然后去拿,由于互斥的存在,生产者只能等待,同理消费者来消费的时候发现超市没有商品,消费者只能等生产者来生产,换句话说只要我们维持严格的互斥关系,就退化成生产一个消费一个,消费一个生产一个,这样能保证是高效的吗?我们所说的高效到底体现到哪里?我们等下说。

到目前为止生产消费原理我们告一段段落,目前线程我们有了,互斥我们也有了,接下来我们谈谈如何实现同步!

当我们有了生产消费的概念,有了多线程需要协同的要求,然后再学习条件变量让我们多线程协同的技术时,才更合理!

条件变量

  • 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

就好比今天在抢票,与抢票相对的就是放票,如果不放票,但是这些抢票的人也不怎么什么时候放票,其他人是不是只能疯狂的申请锁判断票数是否大于0,大于0就抢,小于0就推出,所有人都在进行刷票。

  • 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中在将该线程唤醒。这种情况就需要用到条件变量。

接下来我们见见条件变量的接口然后再举例子理解一下

如果你想使用一个条件变量,你首先需要定义个条件变量
条件变量也是一个数据类型,是pthread库给我们提供的

pthread_cond_t     //数据类型

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
使用前要初始化
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
cond:要初始化的条件变量
attr:NULL

不用了就销毁
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
如果条件变量是静态或者全局的我们如下初始化,就和互斥锁一样的做法。
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

目前我们访问临界资源的写法,就像抢票逻辑那样

先加锁
再判断 —>生产和消费条件是否满足
最后解锁

如果条件不满足,就不要再申请,而是将自己阻塞挂起!

如何挂起?条件变量给我们提供了这样的函数
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释

同样我们知道未来这个票数大于0了,我就可以去抢了,是不是也要把我叫醒啊,所以我们条件变量必然匹配了一个唤醒函数

将在指定条件变量下等待的线程尽快唤醒
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
唤醒一批
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
唤醒一个
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

接下来我们举个例子理解一下条件变量

今天有很多人去面试,面试地点是一个公司的会议室。当前面试官正在面试一个人,当前能不能很多人都去这个会议室让面试官面试呢?当然不能,这个面试官是一份临界资源必须要被互斥的访问,所以一次只能有一人到房间去面试。但是这家公司组织的特别不好,当前一个人面试完了让下一个人去面试的时候,大家都说我先来的你先面我,这么多人在无序的情况对这份临界资源展开竞争了,这个面试官在里面面试他也不清楚,只能是根据就近原则。面试官再次面试完一个人,大家又是一窝蜂的说我先来的。可能面试管是一个脸盲,前一个人觉得自己面的不好,但是他是最近的所以他又去面试去了,导致整个面试就他一个人在疯狂面试!到底面试效果并不好!
后来呢,有一个非常厉害的hr,这个hr管理能力很强,hr立了一个牌子叫做等待区,
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
让所有等待面试的人都给我区等待区去排队等,我只会按顺序的从等待区叫下一个面试的人。
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
此时这个等待区就是我们的条件变量
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
换句话说未来所有应聘者去等待面试时,都必须去条件变量下去等,面试官去唤醒某个人时,一定也是通过条件变量来唤醒等待的线程。

这个例子我们的理解是:当条件不满足的时候,我们线程必须去某些定义好的条件变量下进行等待!

接下来我们在以一张图从内核中理解这个条件变量
条件变量就是一个数据类型,假设它里面有一个状态属性,一个队列
,我们也有很多的线程
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
当线程申请某个资源但条件不就绪的情况下,这些线程都去队列下排队
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
当条件满足时,就可以唤醒等待的线程,拿到CPU上开始调度
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
所以我们可以认为条件变量带一个队列,条件不满足时,线程都去排队等待。

所以我们刚刚学了两个接口
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
下面我们先写一个简单的案例用一用条件变量

#include <iostream>
#include <pthread.h>
#include <cstdio>
#include <string>
#include <unistd.h>

int ticket = 1000;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

using namespace std;

void *GetTicket(void *args)
{
    string name = static_cast<const char *>(args);
    while (true)
    {
        pthread_mutex_lock(&lock);
        // 线程进来都先排队
        pthread_cond_wait(&cond, &lock);//为什么要有lock,后面就说
        if (ticket > 0)
        {
            usleep(1000);
            cout << name << " 正在进行抢票: " << ticket << endl;
            ticket--;
            pthread_mutex_unlock(&lock);
        }
        else
        {
            pthread_mutex_unlock(&lock);
            break;
        }
    }
}

int main()
{
    pthread_t t[5];
    for (int i = 0; i < 5; ++i)
    {
        char *name = new char[64];
        snprintf(name, 64, "thread %d", i + 1);
        pthread_create(t+i, nullptr, GetTicket, name);
    }

    while (true)
    {
        sleep(1);
        // 主线程一秒唤醒一个线程
        pthread_cond_signal(&cond);
        std::cout << "main thread wakeup one thread..." << std::endl;
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_join(t[i], nullptr);
    }

    return 0;
}

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
加了条件变量,我们的线程就按照一定的顺序进行访问同一份资源了。因为他们都在条件变量下排队的。

    while (true)
    {
        sleep(1);
        // 一秒唤醒一个线程
        //pthread_cond_signal(&cond);
        //唤醒一批线程
        pthread_cond_broadcast(&cond);
        std::cout << "main thread wakeup one thread..." << std::endl;
    }

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

现在我们左手有生产者消费者模型,右手有互斥和同步,接下来我们怎么把它们结合在一起呢?
所以我们接下来写一份基于BlockingQueue的生产者消费者模型

3.基于BlockingQueue的生产者消费者模型

BlockingQueue是一个阻塞队列,首先它是一个队列,既然是一个队列就有为空的情况,同时我们对队列设定一个上限。这时当队列为满为空就要约束生产和消费应该阻塞住不应该在生产和消费了。这种我们就称之为BlockQueue。

未来生产者一定是向BlockQueue里放数据,此时BlockQueue就是一段特定结构的缓冲区,消费者一定是向BlockQueu里取数据。

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
要写生产者和消费者模式必须满足321原则,不过我们刚开始学,我们先写单生产者和单消费者维护它们之间的互斥和同步关系。后面代码写完我们在推而广之变成多生产者和多消费者。

3种关系,先写单生成,单消费
2种角色,生产者线程,消费者线程
1个交易场所,BlockQueue阻塞队列

站在编程的角度,线程A往队列中放,线程B往队列中拿。这个队列就是两个线程的共享资源。线程A放的时候线程B不能拿,线程B拿的时候线程A不能放。队列满的时候生产者线程A就不能生产了,要想办法让线程A去等待,队列空的时候消费者线程B也不能拿了,也要想办法让线程B去等待。因此我们所学互斥锁和条件变量都是需要的。
今天我们直接用C++的queue充当我们的阻塞队列。

上层调用逻辑大的框架我们先写出来

#include"BlockQueue.hpp"

void* productor(void* args)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);

    while(true)
    {
        //生产活动,不断向阻塞队列中放
    }
    return nullptr;

}

void* consumer(void* args)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        //消费活动,不断向阻塞队列中取
    }
	return nullptr;
}

int main()
{
    BlockQueue<int>* bq=new BlockQueue<int>();

    pthread_t p,c;
    //两个线程看到同一个阻塞队列
    pthread_create(&p,nullptr,productor,bq);
    pthread_create(&c,nullptr,consumer,bq);

    pthread_join(p,nullptr);
    pthread_join(p,nullptr);

    delete bq;

    return 0;
}

阻塞队列大的逻辑框架

#pragma onec

#include<iostream>
#include<queue>
#include<pthread.h>

using namespace std;

const int maxcapacity=5;
 
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& _capacity=maxcapacity)
        :_capacity(capacity)
    {
        
    }

    //生产者放数据
    void push(const T& in)//输入型参数,const &
    {

    }

    //消费者拿数据
    void pop(T* out)//输出型参数,* //输入输出型 &
    {

    }

    ~BlockQueue()
    {

    }

private:
    queue<T> _q;
    int _capacity;//不能让阻塞队列无限扩容,所以给一个最大容量表示队列的上限
    pthread_mutex_t _mutex;//阻塞队列是一个共享资源,所以需要一把锁把它保护起来
    //生产者对应的条件变量
    pthread_cond_t _pcond;//队列满了,一定要让生产者在对应的条件变量下休眠
    //消费者对应的条件变量
    pthread_cond_t _ccond;//队列空了,让消费者也在对应条件变量下休眠
};

接下来把代码写完

#pragma onec

#include<iostream>
#include<queue>
#include<pthread.h>

using namespace std;

const int maxcapacity=5;
 
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& capacity=maxcapacity)
        :_capacity(capacity)
    {
        //构造时初始化
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }

    //生产者放数据
    void push(const T& in)//输入型参数,const &
    {
        //放之前先加锁保护共享资源,在加锁和解锁之间就是安全的临界资源
        pthread_mutex_lock(&_mutex);
        //1.判满
        if(is_full())//bug?
        {
            //因为生产条件不满足,无法生产,此时我们的生产者进行等待
            pthread_cond_wait(&_pcond,&_mutex);//_muext?
        }
        //2.走到这里一定是没有满
        _q.push(in);
        //3.绝对能保证,阻塞队列里面一定有数据
        pthread_cond_signal(&_ccond);//唤醒消费者,这里可以有一定策略,比如说满足三分之一在唤醒
        pthread_mutex_unlock(&_mutex);

    }

    //消费者拿数据
    void pop(T* out)//输出型参数,* //输入输出型 &
    {
        //这里也要加锁,因为要保证访问同一份资源是安全,所以用的是同一把锁
        pthread_mutex_lock(&_mutex);
        //1.判空
        if(is_empty())//bug?
        {
            pthread_cond_wait(&_ccond,&_mutex);//_mutex?
        }
        //2.走到这里我们能保证,一定不为空
        *out=_q.front();
        _q.pop();
        //3.绝对能保证,阻塞队列里面至少有一个空的位置
        pthread_cond_signal(&_pcond);//这里可以有一定策略
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        //析构时销毁
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

private:
    bool is_full()
    {
        return _q.size()==_capacity;
    }

    bool is_empty()
    {
        return _q.empty();
    }

private:
    queue<T> _q;
    int _capacity;//不能让阻塞队列无限扩容,所以给一个最大容量表示队列的上限
    pthread_mutex_t _mutex;//阻塞队列是一个共享资源,所以需要一把锁把它保护起来
    //生产者对应的条件变量
    pthread_cond_t _pcond;//队列满了,一定要让生产者在对应的条件变量下休眠
    //消费者对应的条件变量
    pthread_cond_t _ccond;//队列空了,让消费者也在对应条件变量下休眠
};
#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>

void* productor(void* args)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);

    while(true)
    {
        //生产活动
        int data=rand()%10+1;//在这里先用随机数.构建一个数据
        bq->push(data);
        cout<<"生产数据: "<<data<<endl;
        sleep(1);//生产的慢一些
    }
	return nullptr;
}

void* consumer(void* args)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        //消费活动
        int data;
        bq->pop(&data);
        cout<<"消费数据: "<<data<<endl;
    }
	return nullptr;
}

int main()
{
    //随机数种子
    srand((unsigned int)time(nullptr));

    BlockQueue<int>* bq=new BlockQueue<int>();

    pthread_t p,c;
    //两个线程看到同一个阻塞队列
    pthread_create(&p,nullptr,productor,bq);
    pthread_create(&c,nullptr,consumer,bq);

    pthread_join(p,nullptr);
    pthread_join(p,nullptr);

    delete bq;

    return 0;
}

如何证明这是一份生产者消费者模型呢?
我们先让生产者慢一点生产,生产一个消费一个
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
在让消费者慢一点,看到生产一堆,而消费者只能消费一个,不过消费的是历史数据,消费之后生产者才能继续生产

void* consumer(void* args)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        //消费活动
        int data;
        bq->pop(&data);
        cout<<"消费数据: "<<data<<endl;
        sleep(1);//消费的慢一些
    }
}

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

代码写了测试也都通过了,但是这份代码还有很多细节需要我们雕琢的地方。

以生产为例
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
细节一

首先加锁然后最后才是解锁,在判断满的生产条件不满足被挂起,但是挂起的时候可是在临界区中被挂起,如果我挂起期间还持有锁,那其他线程也进不来。
因此pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
a.该函调用的时候,会以原子性的方式将锁释放,并将自己挂起
b.该函数在被唤醒返回的时候,会自动的重新获取你传入的锁

如果当前醒来锁没有获取成功,你也必须是处于竞争锁的状态,直到获取锁成功了这个函数才会返回。换言之只要这个函数返回了这个锁一定获取成功了。

细节二

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

当前判断生产条件不满足就把自己挂起,但是这有个问题pthread_cond_wait这是一个函数,只要是函数就有调用失败的可能。
另外还存在伪唤醒的情况,假设只有一个消费者,十个生产者。只消费了一个但是却唤醒了一批。但是你这里是if判断,都去push肯定是有问题的。
因此充当条件判断的语法必须是while,不能用if

    void push(const T& in)//输入型参数,const &
    {
        //放之前先加锁保护共享资源,在加锁和解锁之间就是安全的临界资源
        pthread_mutex_lock(&_mutex);
        //1.判满
        while(is_full())
        {
            //因为生产条件不满足,无法生产,此时我们的生产者进行等待
            pthread_cond_wait(&_pcond,&_mutex);
        }
        //2.走到这里一定是没有满
        _q.push(in);
        //3.绝对能保证,阻塞队列里面一定有数据
        pthread_cond_signal(&_ccond);//唤醒消费者,这里可以有一定策略,比如说满足三分之一在唤醒
        pthread_mutex_unlock(&_mutex);

    }

    //消费者拿数据
    void pop(T* out)//输出型参数,* //输入输出型 &
    {
        //这里也要加锁,因为要保证访问同一份资源是安全,所以用的是同一把锁
        pthread_mutex_lock(&_mutex);
        //1.判空
        while(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        //2.走到这里我们能保证,一定不为空
        *out=_q.front();
        _q.pop();
        //3.绝对能保证,阻塞队列里面至少有一个空的位置
        pthread_cond_signal(&_pcond);//这里可以有一定策略
        pthread_mutex_unlock(&_mutex);
    }

细节三
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
pthread_cond_signal这个函数,可以放在临界区内部,也可以放在外部。
也就是说这个唤醒可以放在解锁之前也可以放在解锁之后。但是一般建议放在里面。
因为不关心谁拿到锁,只关心有人生产消费。

下面我们修改一下代码,这样生产消费数据太low了,所以我们写了模板可以放任意内容。我们写一个任务。

//Task.hpp
#pragma once
#include <iostream>
#include <functional>
#include <string>
using namespace std;

class Task
{
    typedef function<int(int, int,char)> func_t;

public:
    Task(){}

    Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    {}

    // 把任务返回去可以看到
    string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }

    // 把生产的任务也打印出来
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;         // 对应+-*/%操作
    func_t _callback; // 回调函数
};


string oper="+-*/%";

// 回调函数
int mymath(int x, int y, char op)
{
    int result=0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
        {
            if (y == 0)
            {
                cout << "div zero error" << endl;
                result = -1;
            }
            else
            {
                result = x / y;
            }
        }
        break;
    case '%':
        {
            if (y == 0)
            {
                cout << "mod zero error" << endl;
                result = -1;
            }
            else
            {
                result = x % y;
            }
        }
        break;
    default:
        break;
    }
    return result;
}
#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>
#include"Task.hpp"

void* productor(void* args)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);

    while(true)
    {
        //生产活动
        int x=rand()%10+1;
        int y=rand()%5;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        bq->push(t);
        cout<<"生产任务: "<<t.toTaskString()<<endl;

        sleep(1);//生产的慢一些
    }

}

void* consumer(void* args)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);

    while(true)
    {
        Task t;
        bq->pop(&t);
        cout<<"消费任务: "<<t()<<endl;
    }

}

int main()
{
    //随机数种子
    srand((unsigned int)time(nullptr));
    BlockQueue<Task>* bq=new BlockQueue<Task>();
    
    pthread_t p,c;
    //两个线程看到同一个阻塞队列
    pthread_create(&p,nullptr,productor,bq);
    pthread_create(&c,nullptr,consumer,bq);

    pthread_join(p,nullptr);
    pthread_join(p,nullptr);

    delete bq;

    return 0;
}

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

现在我还想把需求变一变,我让一个线程来生产派发任务,另一个线程来消费处理任务,再来一个线程记录任务结果,将结果记录在文件中!该怎么办呢?
再来一个阻塞队列!
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言

#pragma once
#include <iostream>
#include <functional>
#include <string>
using namespace std;

class CallTask
{
    typedef function<int(int, int, char)> func_t;

public:
    CallTask() {}

    CallTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    {
    }

    // 把任务返回去可以看到
    string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

    // 把生产的任务也打印出来
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;         // 对应+-*/%操作
    func_t _callback; // 回调函数
};

string oper = "+-*/%";

// 回调函数
int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            cout << "div zero error" << endl;
            result = -1;
        }
        else
        {
            result = x / y;
        }
    }
    break;
    case '%':
    {
        if (y == 0)
        {
            cout << "mod zero error" << endl;
            result = -1;
        }
        else
        {
            result = x % y;
        }
    }
    break;
    default:
        break;
    }
    return result;
}

class SaveTask
{
    typedef function<void(string)> func_t;
public:
    SaveTask(){}

    SaveTask(string messages,func_t func):_messages(messages),_func(func)
    {}

    void operator()()
    {
        _func(_messages);
    }

private:
    string _messages;
    func_t _func;
};

void Save(string messages)
{
    string target="./log.txt";
    FILE* fp=fopen(target.c_str(),"a+");
    if(!fp)
    {
        cout<<"fopen error"<<endl;
        return;
    }
    fprintf(fp,"%s\n",messages.c_str());
    fclose(fp);
}
#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>
#include"Task.hpp"

//C 计算
//S 存储
template<class C,class S>
class BlockQueues
{
public:
    BlockQueue<C>* c_bq;
    BlockQueue<S>* s_bq;
};

void* productor(void* args)
{
    BlockQueue<CallTask>* _c_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->c_bq;

    while(true)
    {
        //生产活动
        int x=rand()%10+1;
        int y=rand()%5;
        char op=oper[rand()%oper.size()];
        CallTask t(x,y,op,mymath);
        _c_bq->push(t);
        cout<<"productor thread, 生产计算任务: "<<t.toTaskString()<<endl; 

        sleep(1);//生产的慢一些
    }

}

void* consumer(void* args)
{
    BlockQueue<CallTask>* _c_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->c_bq;
    BlockQueue<SaveTask>* _s_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->s_bq;


    while(true)
    {
        CallTask t;
        _c_bq->pop(&t);
        cout<< "cal thread, 完成计算任务: "<<t()<<endl;

        SaveTask s(t(),Save);
        _s_bq->push(s);
        cout<< "cal thread, 推送存储任务完成..." <<t()<<endl;

        //sleep(1);//消费的慢一些
    }

}

void* saver(void* args)
{
    BlockQueue<SaveTask>* _s_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->s_bq;

    while(true)
    {
        SaveTask s;
        _s_bq->pop(&s);
        s();
        cout<<"save thread, 保存任务完成..."<<endl;
    }
}

int main()
{
    //随机数种子
    srand((unsigned int)time(nullptr));

    //写一个类把两个阻塞队列都传过去
    BlockQueues<CallTask,SaveTask>* bq=new BlockQueues<CallTask,SaveTask>();
    bq->c_bq=new BlockQueue<CallTask>();
    bq->s_bq=new BlockQueue<SaveTask>();


    pthread_t p,c,s;
    pthread_create(&p,nullptr,productor,bq);
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&s,nullptr,saver,bq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    pthread_join(s,nullptr);

    delete bq->c_bq;
    delete bq->s_bq;
    return 0;
}

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
到目前为止,我们写的都是一个生产者一个消费者,我想写多生产者多消费者直接把我们的代码改成多个生产者多个消费者可以吗?

#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>
#include"Task.hpp"

void* productor(void* args)
{
    BlockQueue<CallTask>* _c_bq=static_cast<BlockQueue<CallTask>*>(args);


    while(true)
    {
        //生产活动
        int x=rand()%10+1;
        int y=rand()%5;
        char op=oper[rand()%oper.size()];
        CallTask t(x,y,op,mymath);
        _c_bq->push(t);
        cout<<"productor thread, 生产计算任务: "<<t.toTaskString()<<endl; 

        sleep(1);//生产的慢一些
    }

}

void* consumer(void* args)
{
    BlockQueue<CallTask>* _c_bq=static_cast<BlockQueue<CallTask>*>(args);



    while(true)
    {
        //消费活动
        CallTask t;
        _c_bq->pop(&t);
        cout<< "cal thread, 完成计算任务: "<<t()<<endl;
    }

}

int main()
{
    //随机数种子
    srand((unsigned int)time(nullptr));

    BlockQueue<CallTask>* bq=new BlockQueue<CallTask>();


    pthread_t p[3],c[2];
    for(int i=0;i<3;++i)
    {
        pthread_create(p+i,nullptr,productor,bq);
    }
    
    for(int i=0;i<2;++i)
    {
        pthread_create(c+i,nullptr,consumer,bq);
    }

    for(int i=0;i<3;++i)
    {
        pthread_join(p[i],nullptr);
    }
    
    for(int i=0;i<2;++i)
    {
        pthread_join(c[i],nullptr);
    }

    return 0;
}

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
当然可以,不管是多个生产者还是消费者进入生产和消费的代码之前都必须先加锁,都要先去竞争锁,只有持有锁的才能进入阻塞队列当中,阻塞队列中同时永远只有一个线程再跑!

接下来我们的问题就变成了,在一个阻塞队列中有多个生产者,有多个消费者,但是因为锁的存在,所以任何一个时刻只有一个执行流在队列中放或者拿。
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
1.那你创建多线程生产和消费的意义是什么??
2.生产消费模型高效在哪里?

【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
对于生产者它构建任务从那来的?
今天我们写的代码下x,y是为了测试随便写的,可是未来这个x,y大概率是从数据库、网络、外设拿来的用户数据,然后构建任务push到任务队列中,生产者线程构建任务是要花费时间的!!

对于消费者线程来说,把任务pop从任务队列中拿出来,今天我们的加减乘除很简单,有没有可能未来后序我们处理任务非常耗时!!

把任务拿出来多个消费者线程的高效体现在一个线程从任务队列中拿出任务之后做计算,那另一个线程也可以拿出然后继续在计算,然后再另一个线程拿在计算,所以高效并不是体现在从队列中拿数据高效,而是让一个或多个线程可以同时并发的计算多个任务,在计算的同时还不影响其他线程从任务队列中拿任务的过程。
同样的生产者线程获取任务的成本很高,你这个线程获取成功了你可以把任务放到任务队列中,但你这个线程在放任务的同时,其他线程可以同步的并发的从外部中拿任务。

所以生产者消费者模型高效在哪里,答案是生产者消费者模式并不高效在队列中拿放,而是在生产之前和消费之后,让线程并行执行!!
同样生产者消费者的意义也不再队列中,而是在放之前同时生产,拿之后同时消费。

接下来我们找找看我们的代码有没有不足的地方
【linux】线程同步+基于BlockingQueue的生产者消费者模型,Linux,linux,java,开发语言
1.一个线程,在操作临界资源的时候,必须临界资源是满足条件的!
2.可是,公共资源是否满足生产或者消费条件,我们无法直接得知(我们不能事先得知(在没有访问之前无法得知))
3.只能先加锁,再检测,再操作,再解锁
为什么要先加锁呢?因为你要检测的本质也是在访问临界资源!

总而言之就是,因为我们在操作临界资源的时候,有可能不就绪,但是我们无法提前得知,所以只能先加锁,在检测,根据检测结果,决定下一步怎么走!这是我们刚才写代码的逻辑。

那有没有一种方法在实际操作的时候就把对应的资源情况得知呢?
下一篇文章细说!文章来源地址https://www.toymoban.com/news/detail-778600.html

到了这里,关于【linux】线程同步+基于BlockingQueue的生产者消费者模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 线程同步--生产者消费者模型--单例模式线程池

    条件变量是 线程间共享的全局变量 ,线程间可以通过条件变量进行同步控制 条件变量的使用必须依赖于互斥锁以确保线程安全,线程申请了互斥锁后,可以调用特定函数 进入条件变量等待队列(同时释放互斥锁) ,其他线程则可以通过条件变量在特定的条件下唤醒该线程( 唤醒后线

    2024年01月20日
    浏览(45)
  • 线程同步、生产者消费模型和POSIX信号量

    gitee仓库: 1.阻塞队列代码:https://gitee.com/WangZihao64/linux/tree/master/BlockQueue 2.环形队列代码:https://gitee.com/WangZihao64/linux/tree/master/ringqueue 概念 : 利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待\\\"条件变量的条件成立\\\"而挂起;另一个线程使“

    2024年02月03日
    浏览(47)
  • 【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

    死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 当多线程并发执行并都需要访问临界资源时,因为每个线程都是不同的执行流,这就有可能 导致数据不一致问题 ,为了避免此问题的发生

    2024年01月24日
    浏览(48)
  • 【Linux系统编程:线程】 线程控制 -- 创建、终止、等待、分离 | 线程互斥与同步 | 互斥量与条件变量 | 生产者消费者模型 | 线程池 | STL/智能指针与线程安全 | 读者写者模型

    写在前面 本文重点: 了解线程概念,理解线程与进程区别与联系。 学会线程控制,线程创建,线程终止,线程等待。 了解线程分离与线程安全。 学会线程同步。 学会使用互斥量,条件变量,posix 信号量,以及读写锁。 理解基于读写锁的读者写者问题。 一、线程概念 💦

    2024年02月04日
    浏览(70)
  • 12.3用信号量进行线程同步——生产者与消费者问题

    1.shell程序设计 2.内存管理 3.链接库 4.文件操作

    2024年02月04日
    浏览(40)
  • 【Linux学习】多线程——信号量 | 基于环形队列的生产者消费者模型 | 自旋锁 | 读写锁

    🐱作者:一只大喵咪1201 🐱专栏:《Linux学习》 🔥格言: 你只管努力,剩下的交给时间! 之前在学习进程间通信的时候,本喵简单的介绍过一下信号量,今天在这里进行详细的介绍。 这是之前写的基于阻塞队列的生产者消费者模型中向阻塞队列中push任务的代码。 上面代码

    2024年02月07日
    浏览(56)
  • 【Linux】线程安全-生产者消费者模型

    1个线程安全的队列:只要保证先进先出特性的数据结构都可以称为队列 这个队列要保证互斥(就是保证当前只有一个线程对队列进行操作,其他线程不可以同时来操作),还要保证同步,当生产者将队列中填充满了之后要通知消费者来进行消费,消费者消费之后通知生产者

    2024年02月10日
    浏览(46)
  • 线程池-手写线程池Linux C简单版本(生产者-消费者模型)

    本线程池采用C语言实现 线程池的场景: 当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池 线程池的一般特点: 线程池通常是一个生产者-消费者模型 生产者线程用于发布任务,任务通常保存在任务队列中 线程池作为消费者,

    2024年02月14日
    浏览(48)
  • 『Linux』第九讲:Linux多线程详解(四)_ 生产者消费者模型

    「前言」文章是关于Linux多线程方面的知识,上一篇是 Linux多线程详解(三),今天这篇是 Linux多线程详解(四),内容大致是生产消费者模型,讲解下面开始! 「归属专栏」Linux系统编程 「主页链接」个人主页 「笔者」枫叶先生(fy) 「枫叶先生有点文青病」「每篇一句」

    2024年02月07日
    浏览(44)
  • Linux之【多线程】生产者与消费者模型&BlockQueue(阻塞队列)

    举个例子:学生要买东西,一般情况下都会直接联系厂商,因为买的商品不多,对于供货商来说交易成本太高,所以有了交易场所超市这个媒介的存在。目的就是为了集中需求,分发产品。 消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,厂商可以继续生产

    2024年02月02日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包