【关于Linux中----生产消费模型】

这篇具有很好参考价值的文章主要介绍了【关于Linux中----生产消费模型】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


一、生产消费模型

1.1概念的引入

众所周知,在多线程环境里,避免不了会出现多个执行流访问同一块共享资源的情况。但是,如果一个执行流长时间地持有锁,或者它抢占锁的能力更强,就很有可能导致其他执行流出现饥饿问题。所以,我们急切地需要一种模式来尽可能地平衡多个执行流访问公共资源的公平性,于是就出现了“生产消费模型”。

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

1.2 321原则

生产消费模型中的“交易场所”(阻塞队列)就是一块共享资源。既然是共享资源,就有可能同时被多个执行流访问,所以就必须想办法将其保护起来。

首先要知道生产者和生产者之间是互斥(竞争)关系,消费者和消费者之间也是如此,生产者和消费者之间j既有互斥关系(为了保证数据安全)又有同步关系(为了保证双方效率)。

以上内容可以总结为321原则

3种关系:生产者和生产者(互斥),消费者和消费者(互斥)生产者和消费者(互斥&&同步)
2中角色:生产者线程和消费者线程
1个交易场所:一段特定结构的缓冲区

所以,生产消费模型本质上就是维护好321原则,也就保证了共享资源的安全性和双方的效率。

由此也可以总结出生产消费模型特点:

  • 解耦----生产消费双方都通过共享资源提供和使用数据
  • 支持生产消费忙闲不均----共享资源具有缓存数据的能力
  • 提高效率

虽然共享资源具有缓存数据的能力,但是当生产者没有向缓冲区中写入数据时,消费者就只能等待数据;同样的道理,当缓冲区已经存满时,生产者就只能等消费者取走数据。所以,生产消费模型的所谓“提高效率”的特点到底体现在哪里?(这个问题下文中解释)


二、条件变量

2.1概念的引入

由于需要保证生产消费模型中个共享资源的安全,所以生产消费者之间一定有互斥关系,所以二者在访问共享资源时一定要持有锁,并且判断可不可以向其中写入或读取数据,然后再解锁。
所以,当生产者将数据放入缓冲区,直至缓冲区被填满,它下一次在向其中放入数据时就会先申请锁,再判断可不可以放入数据,不能放入数据,最后解锁离开。但是由于生产者申请锁的能力较强,就会一直先于消费者去访问共享资源,这就又导致了消费者的饥饿问题。
为了解决这个问题,就必须使用条件变量(一种数据类型):

【关于Linux中----生产消费模型】

跟互斥锁的接口很相像,使用前要先定义一个条件变量,再初始化,使用完之后释放条件变量,当然也可以直接将其定义为全局的。

下面再回到上面说的“导致消费者饥饿”的问题,我们可以加一个条件变量,让生产者和消费者都在满足条件的时候,才可以访问公共资源,这里就要用到一个接口:
【关于Linux中----生产消费模型】

这个接口的作用是,在不满足条件时,将某一线程挂起。

既然有挂起,当然需要另一个接口在线程满足条件时,将其唤醒:
【关于Linux中----生产消费模型】

2.2理解条件变量

还是使用上文中的例子,多个线程访问资源时,可能会出现一个线程频繁访问,而其他线程从未得到过访问机会的情况。
为了解决这个问题,需要设置一个条件变量,对访问资源的线程要做出限制,只有满足条件的线程才能访问资源。

也就是说,当条件不满足时,线程必须去某些定义好的条件变量上进行等待。

其中,当线程不满足条件时,会调用==pthread_cond_wait()这个接口,将自己放在该条件变量的等待队列中排队;当满足条件时,会调用pthread_cond_signal()==这个接口,将自己唤醒去访问资源。

2.3条件变量的使用

先来模拟一个场景:
两个线程进行抢票(不包含主线程),抢票过程暂时忽略,主线程每隔两秒唤醒一个条件变量下等待的线程。使用到的锁和条件变量直接使用默认初始值,并将其定义为全局变量。
那么,当程序运行起来时,一定会看到两个线程在抢票,并且一定会有先后顺序(虽然我们现在并不清楚谁先谁后),模拟代码如下:

#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>

int tickets=1000;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

void* start_routine(void* args)
{
    std::string name=static_cast<const char*>(args);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond,&mutex);
        std::cout<<name<<"->"<<tickets<<std::endl;
        tickets--;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    //通过条件变量来控制线程执行
    pthread_t t1,t2;
    pthread_create(&t1,nullptr,start_routine,(void*)"thread 1");
    pthread_create(&t2,nullptr,start_routine,(void*)"thread 2");

    while(true)
    {
        sleep(2);
        pthread_cond_signal(&cond);//唤醒该条件变量下等待的线程
        std::cout<<"main thread weakup one thread..."<<std::endl;
    }

    pthread_join(t1,nullptr);
     pthread_join(t2,nullptr);
    return 0;
}

Makefile内容如下:

Cond:Cond.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f Cond

运行结果如下:

[sny@VM-8-12-centos threadcon]$ ./Cond
main thread weakup one thread...thread 2->
1000
main thread weakup one thread...
thread 1->999
main thread weakup one thread...
thread 2->998
main thread weakup one thread...
thread 1->997
main thread weakup one thread...
thread 2->996
^C

可以看到,两个线程抢票时,是由明显的先后顺序的,因为两个线程在条件变量下是时刻在排队的。各位读者也可以复制粘贴代码自己运行一下试试。

上述唤醒线程是一个线程,当然上图中还有可以唤醒某一条件变量下的所有线程的接口,读者们可以自己试试。


三、基于BlockingQueue的生产者消费者模型

3.1BlockingQueue的介绍

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

【关于Linux中----生产消费模型】

3.2C++ queue模拟阻塞队列的生产消费模型

阻塞队列封装之后的代码如下:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>

static const int gmaxcap=5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap):_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }

    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        //1.判断
        if(is_full())
        {//生产条件不满足,无法生产,生产者进行等待
            pthread_cond_wait(&_pcond,&_mutex);
        }
        //2.这里一定没有满
        _q.push(in);
        //3.走到这里,阻塞队列里一定有数据了
        pthread_cond_signal(&_ccond);//唤醒消费者
        pthread_mutex_unlock(&_mutex);
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        //1.判断
        if(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        //2.这里一定不为空
        *out=_q.front();
        _q.pop();
        //3.走到这里,阻塞队列一定有位置了
        pthread_cond_signal(&_pcond);//唤醒生产者
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    std::queue<T> _q;
    int _maxcap;//队列最大容量
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;//消费者对应的条件变量
};

模拟生产消费场景额代码如下:

#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

void* consumer(void* bq_)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(bq_);
    while(true)
    {
        //消费
        int data;
        bq->pop(&data);
        std::cout<<"消费数据: "<<data<<std::endl;
    }
    return nullptr;
}

void* productor(void* bq_)
{
    BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(bq_);
    while(true)
    {
        //生产
        int data=rand()%10+1;//模拟生产的数据
        bq->push(data);
        std::cout<<"生产数据: "<<data<<std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<int>* bq=new BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

Makefile内容如下:

MainCp:MainCp.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f MainCp

需要注意,在代码中,我们设置生产者每生产一个数据就休眠一秒,而对消费者不做限制。所以,如果最后的执行结果是“消费者每一次消费的数据都是生产者最后一次生产的数据”的话,就证明了二者的共享资源是一个阻塞队列!

运行结果如下:

[sny@VM-8-12-centos blockqueue]$ ./MainCp
生产数据: 10
消费数据: 10
生产数据: 6
消费数据: 6
生产数据: 1
消费数据: 1
^C

可见,结果和预测相同!

下面对代码中的几个细节做出解释:
①调用pthread_cond_wait()接口时,第二个参数一定是正在使用的互斥锁

这是因为,某一个线程因为不满足条件时,要将自己挂起等待。但是不能影响其他线程申请锁,所以,这个接口会以原子性的方式将锁释放,并将该线程挂起。
②pthread_cond_wait()这个函数在被唤醒返回的时候,会自动重新获取线程所传入的锁,以便可以继续向下执行。
③判断是否满足条件时,不应该用if,而应该用while。

因为当多个线程同时被唤醒,但最终只有一个线程可以访问资源时,就会存在异常或伪唤醒的情况,所以先后才能醒来之后,必须再判断一次是否满足条件。
所以,要对上面的代码稍作改动:

void push(const T& in)
{
    pthread_mutex_lock(&_mutex);
    //1.判断
    while(is_full())
    {//生产条件不满足,无法生产,生产者进行等待
        pthread_cond_wait(&_pcond,&_mutex);
    }
    //2.这里一定没有满
    _q.push(in);
    //3.走到这里,阻塞队列里一定有数据了
    pthread_cond_signal(&_ccond);//唤醒消费者
    pthread_mutex_unlock(&_mutex);
}

判空的时候亦是如此!

④pthread_cond_signal()可以放在临界资源内部,也可以放在外部,只要能线程被唤醒即可。

3.3对生产消费任务的模拟封装

生产者和消费者的任务是不确定的,可能是各种各样的“业务”,所以,两者的任务应该是各种类型的。
接下来,就模拟一个小小的任务(包括加减乘除取模四种运算),新增一个Task.hpp文件,内容如下:

#pragma once
#include <iostream>
#include <functional>
#include <cstdio>

class Task
{
    using func_t =std::function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func)
    {}
    std::string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    func_t _callback;
    char _op;
};

将MainCp中代码稍作修改,如下:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>

const std::string oper="+-*/%";

int mymath(int x,int y,char op)
{
    int result=0;
    switch(op)
    {
    case '+':
        result= x+y;
        break;
    case '-':
        result= x-y;
        break;
    case '*':
        result= x*y;
        break;
    case '/':
        if(y==0)
        {
            std::cerr<<"div zero error!"<<std::endl;
            result=-1;
        } 
        else
            result=x/y;
        break;
    case '%':
        if(y==0)
        {
            std::cerr<<"mod zero error!"<<std::endl;
            result=-1;
        } 
        else
            result=x%y;
        break;
    default:
        break;
    }
    return result;
}

void* consumer(void* bq_)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(bq_);
    while(true)
    {
        //消费
        Task t;
        bq->pop(&t);
        std::cout<<"消费任务: "<<t()<<std::endl;
    }
    return nullptr;
}

void* productor(void* bq_)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(bq_);
    while(true)
    {
        //生产
        int x=rand()%10+1;//模拟生产的数据
        int y=rand()%5;
        int operCode=rand()%oper.size();
        Task t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"生产任务: "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<Task>* bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

最终执行结果如下:

[sny@VM-8-12-centos blockqueue]$ ./MainCp
生产任务: 6 + 1 = ?
消费任务: 6 + 1 = 7
生产任务: 9 % 0 = ?
mod zero error!
消费任务: 9 % 0 = -1
生产任务: 1 % 1 = ?
消费任务: 1 % 1 = 0
^C

四、遗留问题

下面来回答一下上文中没有回答的问题
创建多个生产者和消费者的意义是什么?生产消费模型的高效体现在哪里?

首先,我们需要知道,对于生产者而言,它的任务可能来自于各个地方,包括数据库、网络等等,它获取任务和构建任务都是需要花费时间的

其次,对于消费者而言,它从任务队列中取出任务,后续还要执行任务,也是需要花费时间的。

而生产消费模型可以让消费者在执行任务的时候,生产者也在生产任务。也可以保证一个消费者或生产者在执行或生成任务时,其他的生产者或消费者也可以进行同样的操作。可以实现在生产之前,消费之后,让线程并发执行

以上就是生产消费模型的左右内容!


本篇完,青山不改,绿水长流!文章来源地址https://www.toymoban.com/news/detail-427073.html

到了这里,关于【关于Linux中----生产消费模型】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Linux】深入理解生产者消费者模型

    生产者 - 消费者模型 Producer-consumer problem 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。 在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,同理如果消费

    2024年02月06日
    浏览(26)
  • 【Linux】多线程 --- 线程同步与互斥+生产消费模型

    人生总是那么痛苦吗?还是只有小时候是这样? —总是如此 1. 假设现在有一份共享资源tickets,如果我们想让多个线程都对这个资源进行操作,也就是tickets- -的操作,但下面两份代码分别出现了不同的结果,上面代码并没有出现问题,而下面代码却出现了票为负数的情况,这

    2024年02月06日
    浏览(36)
  • Linux基于多线程和任务队列实现生产消费模型

    目录 一、生产者消费者模型 二、代码实现模型 2.1 BlockQueue.hpp 2.2 MainCP.cc 2.3 执行结果 三、效率优势 将上述图片逻辑转换成代码逻辑就是,一批线程充当生产者角色,一批线程充当消费者角色,仓库是生产者和消费者获取的公共资源!下面我想用 321原则 来解释这个模型。 既

    2024年02月09日
    浏览(36)
  • Linux——生产者消费者模型和信号量

    目录 ​​​​​​​ 基于BlockingQueue的生产者消费者模型 概念 条件变量的第二个参数的作用  锁的作用 生产者消费者模型的高效性 生产者而言,向blockqueue里面放置任务 消费者而言,从blockqueue里面拿取任务: 总结 完整代码(不含存储数据的线程) 完整代码(含存储线程)  信

    2024年02月07日
    浏览(32)
  • Linux入门之多线程|线程的同步|生产消费模型

    文章目录 一、多线程的同步 1.概念 2.条件变量 2.1条件变量概念 2.2条件变量接口 1.条件变量初始化 2.等待条件满足 3.唤醒等待 3.销毁条件变量 2.3条件变量demo 二、生产消费模型 1.生产消费模型 2.基于BlockQueue的生产者消费者模型 3.基于C++用条件变量和互斥锁实现一个生产消费模

    2024年02月09日
    浏览(27)
  • 【linux】POSIX信号量+基于环形队列的生产消费模型

    喜欢的点赞,收藏,关注一下把! 上篇文章最后我们基于BlockQueue生产者消费者模型写了代码,测试什么的都通过了。最后我们说代码还有一些不足的地方,由这些不足从而引入了我们接下来要学的信号量! 我们在看一看不足的地方 1.一个线程,在操作临界资源的时候,必须

    2024年02月01日
    浏览(34)
  • 【Linux】多线程02 --- 线程的同步互斥问题及生产消费模型

    🍎 作者: 阿润菜菜 📖 专栏: Linux系统编程 线程同步互斥问题是指多线程程序中,如何保证共享资源的正确访问和线程间的协作。 因为线程互斥是实现线程同步的基础和前提,我们先讲解线程互斥问题。 在多线程中,假设我们有一个黄牛抢票的代码,其中有一份共享资源

    2024年02月08日
    浏览(31)
  • 【linux】线程同步+基于BlockingQueue的生产者消费者模型

    喜欢的点赞,收藏,关注一下把! 在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢? 讲个小故事。 假设学校有一个VIP学霸自习室,这个自习室有

    2024年02月03日
    浏览(89)
  • 【Linux】详解线程第三篇——线程同步和生产消费者模型

    本篇线程同步的内容是完全基于线程互斥来讲的,如果屏幕前的你对于线程互斥还不是很了解的话,可以看看我上一篇博客:【Linux】详解线程第二篇——用黄牛抢陈奕迅演唱会门票的例子来讲解【 线程互斥与锁 】 上篇线程互斥中重点讲了互斥锁,虽然解决了多线程并发导

    2024年02月07日
    浏览(29)
  • 【Linux】生产者消费者模型代码实现和信号量

    一定要先理解生产者消费者模型的原理~ 文章目录 一、生产者消费者模型实现代码 二、信号量 1.基于环形队列的生产者消费者模型 总结 下面我们实现基于阻塞队列的生产消费模型: 在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其

    2024年02月11日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包