Linux——生产者消费者模型

这篇具有很好参考价值的文章主要介绍了Linux——生产者消费者模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一.为何要使用生产者消费者模型

 二.生产者消费者模型优点

 三.基于BlockingQueue的生产者消费者模型

1.BlockingQueue——阻塞队列

2.实现代码

 四.POSIX信号量

五.基于环形队列的生产消费模型


Linux——生产者消费者模型,linux,linux,运维,服务器

一.为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

 Linux——生产者消费者模型,linux,linux,运维,服务器

 二.生产者消费者模型优点

  • 解耦:生产者和消费者不直接解除,无需关心对方的情况,仅仅自己与缓冲区解除。
  • 支持并发:并发的体现并不在于多个消费者同时从缓冲区中拿数据,也不是多个生产者同时从缓冲区放数据,而是消费者在处理拿到的数据的时候,生产者可以继续向缓冲区放数据。
  • 支持忙闲不均 :当生产者生产过快的时候,可以让生产者慢下来,当消费者消费过快的时候,可以让消费者慢下来。

 三.基于BlockingQueue的生产者消费者模型

 1.BlockingQueue——阻塞队列

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

Linux——生产者消费者模型,linux,linux,运维,服务器

2.实现代码

#include <iostream>
#include <string>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <pthread.h>

using namespace std;

template <class T>
class BlockQueue
{
public:
    BlockQueue(size_t cap)
        : _cap(cap)
    {
        // 初始化条件变量
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }

    void push(T date)
    {
        // 将任务push进去队列,多线程加锁,每一只能一个线程push任务
        pthread_mutex_lock(&_mutex);
        while (_q.size() == _cap) // 如果队列已经满了,生产者要被阻塞
        {
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        _q.push(date);
        // 当push任务成功的时候,需要将唤醒消费者来处理数据
        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }

    T pop()
    {
        // 将任务从队列中拿出来,多线程加锁,每一只能一个线程拿任务
        pthread_mutex_lock(&_mutex);
        // 如果队列是空的就将消费者阻塞
        while (isempty())
        {
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        T tmp = _q.front();
        _q.pop();
        // 成功拿到数据以后,唤醒生产者继续生产任务
        pthread_cond_signal(&_p_cond);
        pthread_mutex_unlock(&_mutex);

        return tmp;
    }

    ~BlockQueue()
    {
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }

private:
    bool isempty()
    {
        return _q.empty();
    }
    bool isfull()
    {
        return _q.size() == _cap;
    }

private:
    queue<T> _q; // 队列
    size_t _cap; // 容量

    pthread_cond_t _c_cond;                             // 消费者条件变量
    pthread_cond_t _p_cond;                             // 生产者条件变量
    pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
};

cp模型:

#include "BlockQueue.hpp"

using namespace std;

// 构建任务
struct Task
{
    Task(int a, int b, char op)
        : _a(a), _b(b), _op(op)
    {
    }

    char _op;      // 运算符
    int _a;        // 运算数1
    int _b;        // 运算数2
    int ret;       // 结果
    int _exitcode; // 退出码
};

void *push_task(void *args)
{
    BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);
    while (1)
    {
        char op_arr[4] = {'+', '-', '*', '/'};
        int a = rand() % 10;
        int b = rand() % 10;
        char op = op_arr[(a * b) % 4];
        // 构建任务结构体
        Task tk(a, b, op);
        // push任务
        pBQ->push(tk);
        printf("%d %c %d =?\n", a, op, b);
        sleep(1);
    }

    return NULL;
}

void *get_task(void *args)
{
    BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);

    while (1)
    {
        // 获取任务并处理
        Task tk = pBQ->pop();
        switch (tk._op)
        {
        case '+':
            tk.ret = tk._a + tk._b;
            break;
        case '-':
            tk.ret = tk._a - tk._b;
            break;
        case '*':
            tk.ret = tk._a * tk._b;
            break;
        case '/':
            if (tk._b == 0)
            {
                exit(-1);
            }
            tk.ret = tk._a / tk._b;
            break;
        default:
            break;
        }
        printf("%d %c %d = %d\n", tk._a, tk._op, tk._b, tk.ret);
        sleep(1);
    }

    return NULL;
}

int main()
{
    BlockQueue<Task> BQ(5);
    pthread_t tid_c[4];
    pthread_t tid_p[4];
    srand(time(nullptr));
    // push
    pthread_create(&tid_c[0], NULL, push_task, &BQ);
    pthread_create(&tid_c[1], NULL, push_task, &BQ);
    pthread_create(&tid_c[2], NULL, push_task, &BQ);
    pthread_create(&tid_c[3], NULL, push_task, &BQ);
    // get
    pthread_create(&tid_p[0], NULL, get_task, &BQ);
    pthread_create(&tid_p[1], NULL, get_task, &BQ);
    pthread_create(&tid_p[2], NULL, get_task, &BQ);
    pthread_create(&tid_p[3], NULL, get_task, &BQ);

    pthread_join(tid_c[0], NULL);
    pthread_join(tid_c[1], NULL);
    pthread_join(tid_c[2], NULL);
    pthread_join(tid_c[3], NULL);
    pthread_join(tid_p[0], NULL);
    pthread_join(tid_p[1], NULL);
    pthread_join(tid_p[2], NULL);
    pthread_join(tid_p[3], NULL);

    return 0;
}

测试结果:

Linux——生产者消费者模型,linux,linux,运维,服务器

 四.POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

定义信号量:

sem_t sem;

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  1. pshared:0表示线程间共享,非零表示进程间共享。
  2. value:信号量初始值。

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1。

int sem_wait(sem_t *sem); //P()

发布信号量:

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

int sem_post(sem_t *sem);//V()

说明:

  • 信号量本身就是一个计数器,用来描述可用资源的数目。
  • 信号量机制就像是我们看电影买票一样,是对资源的预定机制。
  • 只有申请到信号量才能对共享资源访问。
  • 只要我们申请信号量成功了,将来我们一定可以访问临界资源,就像看电影,我们只要买到了电影票,不管我们去不去电影院,电影院里一定有我们的位置。

五.基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性。

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

代码:

RingQueue.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
#include "mutex.hpp"
#include "Task.hpp"
using namespace std;

const size_t size = 5;

template <class T>
class RingQueue
{
    void P(sem_t &sem) // 申请信号量
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem) // 释放信号量
    {
        sem_post(&sem);
    }

public:
    RingQueue(int cap = size)
        : _cap(cap), _index_space(0), _index_date(0)
    {
        // 初始化信号量
        sem_init(&_sem_date, 0, 0);    // 数据信号量初始化为0
        sem_init(&_sem_space, 0, cap); // 空间信号量初始化为容量大小
        // 初始化锁
        pthread_mutex_init(&_mutex, nullptr);
        _rq.resize(_cap);
    }

    void push(const T date)
    {
        // 申请空间信号量
        P(_sem_space);
        {
            MutexGuard lock(&_mutex);
            _rq[_index_date++] = date;
            _index_date %= _cap;
        }
        // 释放数据信号量
        V(_sem_date);
    }

    T pop()
    {
        // 申请数据信号量
        P(_sem_date);
        T tmp;
        {
            MutexGuard lock(&_mutex);
            tmp = _rq[_index_space++];
            _index_space %= _cap;
        }
        // 释放空间信号量
        V(_sem_space);
        return tmp;
    }

    ~RingQueue()
    {
        // 释放信号量和互斥锁
        sem_destroy(&_sem_date);
        sem_destroy(&_sem_space);
        pthread_mutex_destroy(&_mutex);
    }

private:
    vector<T> _rq;
    size_t _cap; // 容量

    sem_t _sem_space; // 记录环形队列的空间信号量
    sem_t _sem_date;  // 记录环形队列的数据信号量

    size_t _index_space; // 生产者的生产位置
    size_t _index_date;  // 消费者的消费位置

    pthread_mutex_t _mutex; // 容量
};

mutex.hpp:

class Mutex
{
public:
    Mutex(pthread_mutex_t *mutex)
        : _mutex(mutex)
    {
    }

    void lock()
    {
        pthread_mutex_lock(_mutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(_mutex);
    }
    ~Mutex()
    {
    }

private:
    pthread_mutex_t *_mutex;
};

class MutexGuard
{
public:
    MutexGuard(pthread_mutex_t *mutex)
        : _mutex(mutex)
    {
        _mutex.lock();
    }
    ~MutexGuard()
    {
        _mutex.unlock();
    }

private:
    Mutex _mutex;
};

Task.hpp:

#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>

struct Task
{
    Task(int a = 1, int b = 1, char op = '+')
        : _a(a), _b(b), _op(op)
    {
    }

    void run()
    {
        switch (_op)
        {
        case '+':
            _ret = _a + _b;
            break;
        case '-':
            _ret = _a - _b;
            break;
        case '*':
            _ret = _a * _b;
            break;
        case '/':
            if (_b == 0)
            {
                _exitcode = -1;
                exit(1);
            }
            _ret = _a / _b;
            break;
        default:
            break;
        }
    }

    void showtask()
    {
        printf("producer:%d %c %d = ?\n", _a, _op, _b);
    }

    void showresult()
    {
        printf("consumer:%d %c %d = %d(exitcode:%d)\n", _a, _op, _b, _ret, _exitcode);
    }

    ~Task() {}

private:
    int _a;
    int _b;
    char _op;
    int _ret;

    int _exitcode = 0;
};

pthread.cc:文章来源地址https://www.toymoban.com/news/detail-713361.html

#include "RingQueue.hpp"

void *run_p(void *args)
{
    char ops[4] = {'+', '-', '*', '/'};
    RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);
    while (1)
    {
        int a = rand() % 100;
        int b = rand() % 100;
        int op = ops[(a * b) % 4];
        Task tk(a, b, op);

        RQ->push(tk);
        tk.showtask();
        sleep(1);
    }
}
void *run_c(void *args)
{
    RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);
    while (1)
    {
        Task tk = RQ->pop();
        tk.run();
        tk.showresult();
        sleep(1);
    }
}

int main()
{
    RingQueue<Task> *RQ = new RingQueue<Task>(5);
    srand(time(0));
    pthread_t tid_c[5];
    pthread_t tid_p[5];

    for (int i = 0; i < 5; i++)
    {
        pthread_create(&tid_c[i], nullptr, run_c, RQ);
        pthread_create(&tid_p[i], nullptr, run_p, RQ);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(tid_c[i], nullptr);
        pthread_join(tid_p[i], nullptr);
    }

    delete RQ;

    return 0;
}

到了这里,关于Linux——生产者消费者模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Linux】生产者消费者模型代码实现和信号量

    一定要先理解生产者消费者模型的原理~ 文章目录 一、生产者消费者模型实现代码 二、信号量 1.基于环形队列的生产者消费者模型 总结 下面我们实现基于阻塞队列的生产消费模型: 在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其

    2024年02月11日
    浏览(29)
  • 【linux】线程同步+基于BlockingQueue的生产者消费者模型

    喜欢的点赞,收藏,关注一下把! 在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢? 讲个小故事。 假设学校有一个VIP学霸自习室,这个自习室有

    2024年02月03日
    浏览(88)
  • 【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(29)
  • Linux之信号量 | 消费者生产者模型的循环队列

    目录 一、信号量 1、概念 2、信号量操作函数 二、基于环形队列的生产者消费者模型 1、模型分析 2、代码实现 1、单生产单消费的生产者消费者模型 2、多生产多消费的生产者消费者模型 引入:前面我们讲到了,对临界资源进行访问时,为了保证数据的一致性,我们需要对临

    2024年04月17日
    浏览(29)
  • 『Linux』第九讲:Linux多线程详解(四)_ 生产者消费者模型

    「前言」文章是关于Linux多线程方面的知识,上一篇是 Linux多线程详解(三),今天这篇是 Linux多线程详解(四),内容大致是生产消费者模型,讲解下面开始! 「归属专栏」Linux系统编程 「主页链接」个人主页 「笔者」枫叶先生(fy) 「枫叶先生有点文青病」「每篇一句」

    2024年02月07日
    浏览(33)
  • 【Linux】线程同步 -- 条件变量 | 生产者消费者模型 | 自旋锁 |读写锁

    举一个例子: 学生去超市消费的时候,与厂家生产的时候,两者互不相冲突。 生产的过程与消费的过程 – 解耦 临时的保存产品的场所(超时) – 缓冲区 模型总结“321”原则: 3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥[保证共享资

    2024年02月14日
    浏览(27)
  • 《Linux从练气到飞升》No.29 生产者消费者模型

    🕺作者: 主页 我的专栏 C语言从0到1 探秘C++ 数据结构从0到1 探秘Linux 菜鸟刷题集 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇 码字不易,你的👍点赞🙌收藏❤️关注对我真的很重要,有问题可在评论区提出,感谢阅读!!!

    2024年02月05日
    浏览(35)
  • Linux之【多线程】生产者与消费者模型&BlockQueue(阻塞队列)

    举个例子:学生要买东西,一般情况下都会直接联系厂商,因为买的商品不多,对于供货商来说交易成本太高,所以有了交易场所超市这个媒介的存在。目的就是为了集中需求,分发产品。 消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,厂商可以继续生产

    2024年02月02日
    浏览(27)
  • 线程池-手写线程池Linux C简单版本(生产者-消费者模型)

    本线程池采用C语言实现 线程池的场景: 当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池 线程池的一般特点: 线程池通常是一个生产者-消费者模型 生产者线程用于发布任务,任务通常保存在任务队列中 线程池作为消费者,

    2024年02月14日
    浏览(32)
  • 【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型

    ​🌠 作者:@阿亮joy. 🎆 专栏: 《学会Linux》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于

    2023年04月08日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包