【Linux】深入理解生产者消费者模型

这篇具有很好参考价值的文章主要介绍了【Linux】深入理解生产者消费者模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

生产者 - 消费者模型 Producer-consumer problem 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。

一、为何要使用生产者消费者模型


在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,同理如果消费者的速度大于生产者那么消费者就会经常处理等待状态,所以为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

【Linux】深入理解生产者消费者模型,linux,linux,dubbo

二、生产者消费者模型的理解

1、生产者消费者模型的特点


  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。

  • 两种角色: 生产者和消费者。(通常由进程或线程承担)

  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种数据结构组织起来)

为什么生产者和消费者之间为什么会存在互斥且同步关系?

  • 互斥关系:由于所有的生产者和消费者之间都是访问的同一段缓冲区(临界资源),为了避免出现数据不一致性,我们需要对访问缓冲区的线程进行加锁,于是无论是生产者还是消费者都要竞争同一把锁,所以是互斥关系。

  • 同步关系: 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。为了让生产者和消费者线程之间协同起来就需要有同步关系!

2、生产者消费者模型的优点

  • 解耦 :假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

  • 支持并发 :生产者如果直接调用消费者的某个方法,还有另一个弊端就是由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白浪费时间。

    使用了生产者-消费者模式之后,生产者和消费者可以是两个独立的并发主体。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。

  • 支持忙闲不均:缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

四、基于BlockQueue的生产者消费者模型


在多线程编程中阻塞队列(Block Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
  • 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出

联系: 管道的实现其实就是依据阻塞队列实现的!

【Linux】深入理解生产者消费者模型,linux,linux,dubbo

1、C++实现阻塞队列

// 阻塞队列
#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

// 容量的默认值
const size_t g_cap = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(size_t cap = g_cap)
        :_cap(cap)
    {
    	// 对锁和条件变量进行初始化
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_consumerCond, nullptr);
        pthread_cond_init(&_producerCond, nullptr);
    }

    // 插入数据
    void push(const T& data)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 这里使用while能够防止被误唤醒(例如消费者使用的是broadcast)
        while (_q.size() == _cap)
        {
            // 不满足生产条件,需要进行等待
            pthread_cond_wait(&_producerCond, &_mutex);
        }
        // 插入数据
        _q.push(data);
        // 唤醒队列中的第一个消费者
        pthread_cond_signal(&_consumerCond);
        pthread_mutex_unlock(&_mutex);
    }
    
    
    // 删除数据,out是一个输出型数据表示删除的值,如果不关心删除的值可以传nullptr
    void pop(T* out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 这里使用while能够防止被误唤醒(例如生成者使用的是broadcast)
        while (_q.empty())
        {
        	// 不满足消费条件,需要进行等待
            pthread_cond_wait(&_consumerCond, &_mutex);
        }
        if (out != nullptr)
        {
            *out = _q.front();
        }
        // 删除数据
        _q.pop();
        // 唤醒队列中的第一个生成者
        pthread_cond_signal(&_producerCond);
        pthread_mutex_unlock(&_mutex);
    }
    
    ~BlockQueue()
    {
    	// 对锁和条件变量进行销毁
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumerCond);
        pthread_cond_destroy(&_producerCond);
    }
private:
	// 队列
    std::queue<T> _q;
    // 容量
    size_t _cap;
    // 互斥锁
    pthread_mutex_t _mutex;
    // 生产者条件变量
    pthread_cond_t _consumerCond;
    // 消费者条件变量
    pthread_cond_t _producerCond;
};

2、一些注意事项

判断是否满足生产消费条件时不能用if,而应该用while

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行,有可能会导致产生的数据大于容量上限,或者队列为空还在消费。

  • 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。

为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。


为了测试这份代码,我们先让生产者生成的慢一些,让消费者消费的快一些,我们应该看到:生成者生成一个,消费者消费一个。

#include "BlockQueue.hpp"
#include <cstdlib>
#include <unistd.h>

using namespace std;

void* producer(void* args)
{
    BlockQueue<int>* pbq = static_cast<BlockQueue<int>*> (args);
    srand((unsigned int)time(nullptr));

    while (true)
    {
    	// 让生产者生产的慢一些
        sleep(1);
        // 1.获取数据
        int data = rand() % 100;
        
        // 2.将数据交给阻塞队列
        pbq->push(data);
        cout << "A data is generated, it is :" << data << endl;
    }
    return nullptr;
}

void* consumer(void* args)
{
    BlockQueue<int>* pbq = static_cast<BlockQueue<int>*> (args);

    while (true)
    {
        //sleep(1);  // 让消费者消费的慢一些
        // 1.从阻塞队列中获取数据
        int out;
        pbq->pop(&out);
        // 2.处理数据
        cout << "A data is consumed, it is :" << out << endl;
    }
    return nullptr;
}

int main()
{
    pthread_t tp, tc;
    BlockQueue<int> bq;
    pthread_create(&tp, nullptr, producer, &bq);
    pthread_create(&tc, nullptr, consumer, &bq);

    pthread_join(tp, nullptr);
    pthread_join(tc, nullptr);
    return 0;
}

可以看到结果符合我们的预期:

【Linux】深入理解生产者消费者模型,linux,linux,dubbo

接下来我们让生成者生成快一些,让消费者消费慢一些,我们应该看到生产者先把阻塞队列塞满,然后消费者消费一个,生产者生成一个。
(将上述代码的void* producer(void* args)函数中的sleep注释掉,将void* consumer(void* args)函数中的sleep注释去掉)

【Linux】深入理解生产者消费者模型,linux,linux,dubbo

五、基于环形队列的生产者消费者模型

1、信号量的原理

  • 我们知道一把互斥锁只能对一份临界资源进行保护,当我们对加锁的资源使用时相当于将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。

  • 实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流进行同时访问,此时并不会出现数据不一致等问题。

  • 信号量(信号灯)本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理。

  • 每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。

  • 信号量的工作机制类似于我们看电影买票,是一种资源的预订机制!不管线程对这份资源是用还是不用,这份资源一定是有的!如果申请信号量失败,则线程会被挂起等待,直到有资源可以使用才会自动被唤醒。

  • 如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。


信号量的PV操作:

P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。

V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。

此外,PV操作是原子操作,只有这样才能保证信号量的线程安全


2、POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

使用下面的函数需要包含头文件 : #include <semaphore.h>,并链接库-lpthread

初始化信号量函数

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数说明:

  • sem:需要初始化的信号量。
  • pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
  • value:计数器的初始值。

返回值说明:

  • 初始化信号量成功返回0,失败返回-1。

销毁信号量

int sem_destroy(sem_t *sem);

参数说明:

  • sem:需要销毁的信号量。

返回值说明:

  • 销毁信号量成功返回0,失败返回-1。

等待信号量
功能:等待信号量,会将信号量的值减1

int sem_wait(sem_t *sem); 

参数说明:

  • sem:需要等待的信号量。

返回值说明:

  • 等待信号量成功返回0,信号量的值减1。
  • 等待信号量失败返回-1,信号量的值保持不变。

发布信号量

int sem_post(sem_t *sem);

参数说明:

  • sem:需要发布的信号量。

返回值说明:

  • 发布信号量成功返回0,信号量的值加一。
  • 发布信号量失败返回-1,信号量的值保持不变。

3、基于环形队列的生产消费模型

在阻塞队列中,我们将队列作为整体使用,生产者和消费者在同一时刻只能有一个人进行访问,但是在环形队列里面我们可以发现,生产者和消费者关心的内容是不一样的!

【Linux】深入理解生产者消费者模型,linux,linux,dubbo

  • 生产者关心空间,消费者关心的是数据,环形队列只要生产者和消费者访问不同的区域,生产和消费行为可以同时并发进行。

那么它们什么时候会访问同一块区域呢?

  1. 刚开始时,数据为空,空间为满,生产者和消费者指向同一个位置,存在竞争关系,这时我们应该让生产者先运行!
    【Linux】深入理解生产者消费者模型,linux,linux,dubbo

  2. 数据为满,空间为空,生产者和消费者指向同一个位置,存在竞争关系,这时我们应该让消费者先运行!

【Linux】深入理解生产者消费者模型,linux,linux,dubbo

4、代码实现

#include <vector>
#include <semaphore.h>
#include <pthread.h>

template<class T>
class RingQueue
{
public:
	//	构造函数设置默认环形队列的大小是5 
    RingQueue(int cap = 5)
        :_cap(cap),  _ring(cap),_c_step(0), _p_step(0)
    {
    	// 刚开始时,数据信号量为0
        sem_init(&_data_sem, 0, 0);
        // 刚开始时,空间信号量为cap
        sem_init(&_space_sem, 0, cap);
        pthread_mutex_init(&_c_step_mtx, nullptr);
        pthread_mutex_init(&_p_step_mtx, nullptr);
    }
    
    // 插入数据
    void push(const T& data)
    {
        // 生产者申请空间资源,P操作
        sem_wait(&_space_sem);
        // 信号量申请成功,必定有资源可以使用,具体是哪一个资源由程序员分配资源
        //_p_step是临界资源
        pthread_mutex_lock(&_p_step_mtx);
        _ring[_p_step++] = data;
        // 防止越界
        _p_step %= _cap;
        pthread_mutex_unlock(&_p_step_mtx);
        // 释放对方关心的信号量,增加了一个数据,V操作
        sem_post(&_data_sem);
    }

    void pop(T* out)
    {
    	//  消费者者申请数据资源,P操作
        sem_wait(&_data_sem);
        
        pthread_mutex_lock(&_c_step_mtx);
        *out = _ring[_c_step++];
        _c_step %= _cap;
        pthread_mutex_unlock(&_c_step_mtx);
        
		// 释放对方关心的信号量,增加了一个空间,V操作
        sem_post(&_space_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);
    }
private:
    int _cap;               // 容量
    std::vector<T> _ring;   // 容器
    sem_t _data_sem;        // 数据信号量
    sem_t _space_sem;       // 空间信号量
    size_t _c_step;         // 消费者位置
    size_t _p_step;         // 生产者位置

    pthread_mutex_t _c_step_mtx; // _c_step对应的锁
    pthread_mutex_t _p_step_mtx; // _p_step对应的锁
};

相关说明

  • 生产者 / 消费者每次生产数据后_p_step / _c_step都会进行++,标记下一次生产/消费数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。

  • 尽管我们已经通过信号量保证了生产者和消费者大部分情况下在该环形队列可以让生产者和消费者并发的执行,但是由于生产者和生产者,消费者和消费者存在竞争关系,所以我们还需要两把锁, _c_step_mtx, _p_step_mtx来保证它们之间的竞争关系。文章来源地址https://www.toymoban.com/news/detail-739135.html

到了这里,关于【Linux】深入理解生产者消费者模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Linux——生产者消费者模型和信号量

    目录 ​​​​​​​ 基于BlockingQueue的生产者消费者模型 概念 条件变量的第二个参数的作用  锁的作用 生产者消费者模型的高效性 生产者而言,向blockqueue里面放置任务 消费者而言,从blockqueue里面拿取任务: 总结 完整代码(不含存储数据的线程) 完整代码(含存储线程)  信

    2024年02月07日
    浏览(40)
  • Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(46)
  • 【Linux】cp问题,生产者消费者问题代码实现

    生产者消费者模式就是通过一个容器 来解决生产者和消费者的强耦合问题 。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞

    2024年02月04日
    浏览(35)
  • Linux操作系统实验:生产者和消费者问题

    “生产者消费者”问题是一个著名的同时性编程问题的集合。通过编写经典的”生产者消费者”问题的实验,读者可以进一步熟悉 Linux 中多线程编程,并且掌握用信号量处理线程间的同步互斥问题。 VMware Workstation Pro “生产者消费者”问题描述如下。 有一个有限缓冲区和两

    2024年02月06日
    浏览(55)
  • 【linux】线程同步+基于BlockingQueue的生产者消费者模型

    喜欢的点赞,收藏,关注一下把! 在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢? 讲个小故事。 假设学校有一个VIP学霸自习室,这个自习室有

    2024年02月03日
    浏览(98)
  • 【Linux】生产者消费者模型代码实现和信号量

    一定要先理解生产者消费者模型的原理~ 文章目录 一、生产者消费者模型实现代码 二、信号量 1.基于环形队列的生产者消费者模型 总结 下面我们实现基于阻塞队列的生产消费模型: 在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其

    2024年02月11日
    浏览(38)
  • 【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(43)
  • Linux之信号量 | 消费者生产者模型的循环队列

    目录 一、信号量 1、概念 2、信号量操作函数 二、基于环形队列的生产者消费者模型 1、模型分析 2、代码实现 1、单生产单消费的生产者消费者模型 2、多生产多消费的生产者消费者模型 引入:前面我们讲到了,对临界资源进行访问时,为了保证数据的一致性,我们需要对临

    2024年04月17日
    浏览(38)
  • 『Linux』第九讲:Linux多线程详解(四)_ 生产者消费者模型

    「前言」文章是关于Linux多线程方面的知识,上一篇是 Linux多线程详解(三),今天这篇是 Linux多线程详解(四),内容大致是生产消费者模型,讲解下面开始! 「归属专栏」Linux系统编程 「主页链接」个人主页 「笔者」枫叶先生(fy) 「枫叶先生有点文青病」「每篇一句」

    2024年02月07日
    浏览(43)
  • 【Linux】线程同步 -- 条件变量 | 生产者消费者模型 | 自旋锁 |读写锁

    举一个例子: 学生去超市消费的时候,与厂家生产的时候,两者互不相冲突。 生产的过程与消费的过程 – 解耦 临时的保存产品的场所(超时) – 缓冲区 模型总结“321”原则: 3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥[保证共享资

    2024年02月14日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包