linux:生产者消费者模型

这篇具有很好参考价值的文章主要介绍了linux:生产者消费者模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

linux:生产者消费者模型,Linux,linux,服务器

个人主页 : 个人主页
个人专栏 : 《数据结构》 《C语言》《C++》《Linux》


前言

本文是对于生产者消费者模型的知识总结


一、生产者消费者模型

生产者消费者模型就是通过一个容器来解决生产者消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过之间的容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接交给容器,消费者不找生产者要数据,而是直接从容器中取数据,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,其中这个容器就是用于生产者和消费者之间解耦的
(强耦合是指两个或多个系统,组件或模块之间存在紧密依赖关系。)

linux:生产者消费者模型,Linux,linux,服务器


特点:

  1. 三种关系:生产者与生产者之间(互斥),消费者与消费者之间(互斥),生产者与消费者之间(互斥 && 同步)
  2. 两种角色:生产者和消费者
  3. 一个交易(通讯)场所:一个容器(一段内存空间)

因为我们是多个线程访问同一个容器,那必然会导致数据不一致的问题,所以我们需要对该临界资源加锁,所以生产者与生产者之间,消费者与消费者之间,生产者与消费者之间都是互斥的。
又因为容器可能为空(满),此时消费者(生产者)还一直在临界区申请锁,又因没有数据(空间)而释放锁,从而不断申请锁释放锁,导致生产者(消费者)的饥饿问题。此时我们就需要生产者与消费者之间的同步。

对于2,3两点,这很好理解不解释。
我们编写生产者消费者模型的本质就是对以上三点的维护。(互斥保证数据安全,同步保证效率)


优点:

  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

对于第一点,不就是生产者与消费者通过容器来解耦提升效率(如果没有这个容器,则生产者生产完数据,就必须等待消费者来接受处理数据,不能立刻继续生产数据)。
对于第二点,当生产者在生产数据时,消费者也同时在处理数据
对于第三点,当生产者生产数据的速度超过消费者的处理能力时,容器可以起到缓存的作用,将多余的数据暂时存储,等待消费者有空闲时再进行处理。如果消费者处理数据的能力超过生产者时,同理。

二、基于阻塞队列的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

  • 队列为空时,从队列中取数据将被阻塞,直到队列中有数据时被唤醒。
  • 队列为满时,向队列中放入数据将被阻塞,直到队列中有数据取出被唤醒。

代码实现

下面是一个单生成单消费模型
linux:生产者消费者模型,Linux,linux,服务器
LockGuard.hpp 文件 将加锁释放锁,交给一个对象处理,当对象创建加锁,对象销毁释放锁

#pragma once
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mutex):_mutex(mutex)
    {}

    void Lock()
    {
        pthread_mutex_lock(_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(_mutex);
    }

    ~Mutex()
    {}
private:
    pthread_mutex_t *_mutex;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex): _lock(mutex)
    {
        _lock.Lock();
    }

    ~LockGuard()
    {
        _lock.UnLock();
    }
private:
    Mutex _lock;
};

Blockqueue.hpp 文件

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"

using namespace std;
const int CAPACITY = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int cap = CAPACITY):_capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p, nullptr);
        pthread_cond_init(&_c, nullptr);
    }

    bool isFull()
    {
        return _bq.size() == _capacity;
    }

    void Push(const T &in)
    {
        LockGuard mutex(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(isFull())
        {
            pthread_cond_wait(&_p, &_mutex);
        }

        _bq.push(in);
        // 唤醒策略为 生产一个,消费一个
        pthread_cond_signal(&_c);

        //pthread_mutex_unlock(&_mutex);
    }

    bool isEmpty()
    {
        return _bq.size() == 0;
    }

    void Pop(T *out)
    {
        LockGuard mutex(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(isEmpty())
        {
            pthread_cond_wait(&_c, &_mutex);
        }

        *out = _bq.front();
        _bq.pop();
        // 唤醒策略为 消费一个,生产一个
        pthread_cond_signal(&_p);

        //pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p);
        pthread_cond_destroy(&_c);
    }
private:
    queue<T> _bq;
    int _capacity;

    pthread_mutex_t _mutex;
    pthread_cond_t _p;
    pthread_cond_t _c;
};

Task.hpp 文件

#pragma once
#include <string>

const char *opers = "+-*/%";

enum
{
    ok = 0,
    div_zero,
    mod_zero
};

class Task
{
public:
    Task()
    {}

    Task(int x, int y, char op) : _data_x(x), _data_y(y), _oper(op)
    {
        _code = ok;
    }

    void Run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data_x + _data_y;
            break;
        case '-':
            _result = _data_x - _data_y;
            break;
        case '*':
            _result = _data_x * _data_y;
            break;
        case '/':
            {
                if(_data_y == 0)
                {
                    _code = div_zero;
                }
                else
                {
                    _result = _data_x / _data_y;
                }
            }
            break;
        case '%':
            {
                if(_data_y == 0)
                {
                    _code = mod_zero;
                }
                else
                {
                    _result = _data_x % _data_y;
                }
            }
            break;
        default:
            break;
        }
    }

    void operator()()
    {
        Run();
    }

    std::string PrintTask()
    {
        std::string ret = std::to_string(_data_x);
        ret += _oper;
        ret += std::to_string(_data_y);

        ret += "=?";
        return ret;
    }

    std::string PrintResult()
    {
        std::string ret = std::to_string(_data_x);
        ret += _oper;
        ret += std::to_string(_data_y);

        ret += "=";
        if(_code == ok)
        {
            ret += std::to_string(_result);
        }
        else
        {
            ret += "?";
        }

        ret += "[";
        ret += std::to_string(_code);
        ret += "]";

        return ret;
    }

    ~Task()
    {}

private:
    int _data_x;
    int _data_y;
    char _oper;

    int _result;
    int _code; // 错误码
};

Main.cc 文件

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;


void *producer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 产生任务
    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        char oper = opers[rand() % strlen(opers)];

        Task task(x, y, oper);
        cout << "producer: " << task.PrintTask() << endl;
        bq->Push(task);
        sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
        if (bq->isFull())
        {
            Task task;
            bq->Pop(&task);

            task();
            cout << "consumer: " << task.PrintResult() << endl;
            //sleep(1);
        }
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p;
    pthread_create(&p, nullptr, producer, (void *)&bq);

    pthread_t c;
    pthread_create(&c, nullptr, consumer, (void *)&bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

linux:生产者消费者模型,Linux,linux,服务器
linux:生产者消费者模型,Linux,linux,服务器

那如何将这个单生产单消费该为多生产多消费呢?因为多生产多消费本质也是多个线程访问临界资源,那我们单生产和单消费不也是多个线程访问临界资源吗,所以我们不需要对BlockQueue.hpp文件进行修改,只需要在main函数中,创建多个生产者和消费者即可。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;

template <class T>
class ThreadData
{
public:
    ThreadData(pthread_t tid, const string threadname, BlockQueue<T> *bq)
        : _tid(tid), _threadname(threadname), _bq(bq)
    {}

public:
    pthread_t _tid;
    string _threadname;
    BlockQueue<T>* _bq;
};

void *producer(void *args)
{
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 产生任务
    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        char oper = opers[rand() % strlen(opers)];

        Task task(x, y, oper);
        cout << data->_tid << ", " << data->_threadname <<": " << task.PrintTask() << endl;
        data->_bq->Push(task);
        sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
        if (data->_bq->isFull())
        {
            Task task;
            data->_bq->Pop(&task);

            task();
            cout << data->_tid << ", " << data->_threadname << ": " << task.PrintResult() << endl;
            // sleep(1);
        }
    }
}



int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p1;
    ThreadData<Task> data1(p1, "product-1", &bq);
    pthread_create(&p1, nullptr, producer, (void *)&data1);

    pthread_t p2;
    ThreadData<Task> data2(p2, "product-2", &bq);
    pthread_create(&p2, nullptr, producer, (void *)&data2);

    pthread_t c1;
    ThreadData<Task> data3(c1, "consumer-1", &bq);
    pthread_create(&c1, nullptr, consumer, (void *)&data3);
    pthread_t c2;
    ThreadData<Task> data4(c2, "consumer-2", &bq);
    pthread_create(&c2, nullptr, consumer, (void *)&data4);

    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);

    return 0;
}

linux:生产者消费者模型,Linux,linux,服务器
linux:生产者消费者模型,Linux,linux,服务器


总结

以上就是我对于线程同步的总结。

linux:生产者消费者模型,Linux,linux,服务器文章来源地址https://www.toymoban.com/news/detail-852542.html

到了这里,关于linux:生产者消费者模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Linux】生产者消费者模型代码实现和信号量

    一定要先理解生产者消费者模型的原理~ 文章目录 一、生产者消费者模型实现代码 二、信号量 1.基于环形队列的生产者消费者模型 总结 下面我们实现基于阻塞队列的生产消费模型: 在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其

    2024年02月11日
    浏览(29)
  • 【linux】线程同步+基于BlockingQueue的生产者消费者模型

    喜欢的点赞,收藏,关注一下把! 在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢? 讲个小故事。 假设学校有一个VIP学霸自习室,这个自习室有

    2024年02月03日
    浏览(88)
  • 【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(29)
  • Linux之信号量 | 消费者生产者模型的循环队列

    目录 一、信号量 1、概念 2、信号量操作函数 二、基于环形队列的生产者消费者模型 1、模型分析 2、代码实现 1、单生产单消费的生产者消费者模型 2、多生产多消费的生产者消费者模型 引入:前面我们讲到了,对临界资源进行访问时,为了保证数据的一致性,我们需要对临

    2024年04月17日
    浏览(29)
  • 『Linux』第九讲:Linux多线程详解(四)_ 生产者消费者模型

    「前言」文章是关于Linux多线程方面的知识,上一篇是 Linux多线程详解(三),今天这篇是 Linux多线程详解(四),内容大致是生产消费者模型,讲解下面开始! 「归属专栏」Linux系统编程 「主页链接」个人主页 「笔者」枫叶先生(fy) 「枫叶先生有点文青病」「每篇一句」

    2024年02月07日
    浏览(33)
  • 【Linux】线程同步 -- 条件变量 | 生产者消费者模型 | 自旋锁 |读写锁

    举一个例子: 学生去超市消费的时候,与厂家生产的时候,两者互不相冲突。 生产的过程与消费的过程 – 解耦 临时的保存产品的场所(超时) – 缓冲区 模型总结“321”原则: 3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥[保证共享资

    2024年02月14日
    浏览(27)
  • 《Linux从练气到飞升》No.29 生产者消费者模型

    🕺作者: 主页 我的专栏 C语言从0到1 探秘C++ 数据结构从0到1 探秘Linux 菜鸟刷题集 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇 码字不易,你的👍点赞🙌收藏❤️关注对我真的很重要,有问题可在评论区提出,感谢阅读!!!

    2024年02月05日
    浏览(35)
  • Linux之【多线程】生产者与消费者模型&BlockQueue(阻塞队列)

    举个例子:学生要买东西,一般情况下都会直接联系厂商,因为买的商品不多,对于供货商来说交易成本太高,所以有了交易场所超市这个媒介的存在。目的就是为了集中需求,分发产品。 消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,厂商可以继续生产

    2024年02月02日
    浏览(27)
  • 线程池-手写线程池Linux C简单版本(生产者-消费者模型)

    本线程池采用C语言实现 线程池的场景: 当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池 线程池的一般特点: 线程池通常是一个生产者-消费者模型 生产者线程用于发布任务,任务通常保存在任务队列中 线程池作为消费者,

    2024年02月14日
    浏览(32)
  • 【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型

    ​🌠 作者:@阿亮joy. 🎆 专栏: 《学会Linux》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于

    2023年04月08日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包