Linux基于多线程和任务队列实现生产消费模型

这篇具有很好参考价值的文章主要介绍了Linux基于多线程和任务队列实现生产消费模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、生产者消费者模型

二、代码实现模型

2.1 BlockQueue.hpp

2.2 MainCP.cc

2.3 执行结果

三、效率优势


一、生产者消费者模型

Linux基于多线程和任务队列实现生产消费模型,Linux,linux

将上述图片逻辑转换成代码逻辑就是,一批线程充当生产者角色,一批线程充当消费者角色,仓库是生产者和消费者获取的公共资源!下面我想用321原则来解释这个模型。

既然是公共资源,那么我们就需要考虑线程安全问题!

3指的是三者之间的关系!当一个生产者向仓库生产资源的时候,消费者不可以进入仓库取资源!同样,当一个生产者向仓库放入资源的时候,其他生产者不能同时也向同一个位置放入资源,也就是说生产者要等上一个生产者走出仓库后进入仓库!同样消费者与消费者也是这样的关系!也就是说生产者与生产者互斥,生产者与消费者互斥,消费者与消费者互斥

当消费者与生产者完成自己的操作后,会提醒对方前来进行对方的操作!也就是说,生产者与消费者同步

2指的是2中角色:生产者和消费者

1指的是存储资源的容器(仓库):一段特定结构的缓冲区(队列、栈、链表)


二、代码实现模型

2.1 BlockQueue.hpp

#pragma once
#include<iostream>
#include<pthread.h>
#include<queue>

template <class T>
class BlockQueue
{
public:
    static const int gmaxcap = 5;

     BlockQueue(const int maxcap = gmaxcap):_capacity(maxcap)
     {
        //初始化
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
     }

    void push(const T& in)
    {
        //加锁互斥
        pthread_mutex_lock(&_mutex);

        //细节:为什么这里用while循环判断? 
        //-->有一种可能性,当生产者很多而消费者只有一个,第一次条件满足进入阻塞后,消费者处理了 
        // 一个任务
        //这样就会同时唤醒一批生产者,如果只是if,他们不会再次判断而是直接“同时!”进行后面push的 
        //逻辑!这样不是线程安全的

        while(is_full())
        {
            pthread_cond_wait(&_pcond,&_mutex);
        }
        //进行到这表示一定有空余空间存数据
        _q.push(in);

        //唤醒消费者
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        
        //和上面逻辑一样
        while(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        //到这表示一定有数据
        *out = _q.front();
        _q.pop();
        //唤醒生产者
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size() == _capacity;
    }
private:
    std::queue<T> _q; //存储任务队列
    int _capacity;
    pthread_mutex_t _mutex; //一把锁 -> 3互斥原则
    pthread_cond_t _pcond; //生产者条件变量
    pthread_cond_t _ccond; //消费者条件变量
};


2.2 MainCP.cc

#include "BlockQueue.hpp"
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <functional>

// 任务对象
class Task
{
public:
    //==typedef
    using func_t = std::function<double(int, int, char)>; 

    Task() {}

    Task(func_t callback, int x = 0, int y = 0, char op = '+') : _x(x), _y(y), _op(op), _callback(callback)
    {
    }
    // 仿函数
    std::string operator()()
    {
        double ret = _callback(_x, _y, _op);
        char buffer[64];
        snprintf(buffer, sizeof buffer, "%d %c %d = %lf", _x, _op, _y, ret);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;
    func_t _callback; // 回调函数
};

//处理数据函数
double calculator(int x, int y, char op)
{
    double ret = 0.0;
    switch (op)
    {
    case '+':
        ret = x + y;
        break;
    case '-':
        ret = x - y;
        break;
    case '*':
        ret = x * y;
        break;
    case '/':
        if (y == 0)
            ret = 0;
        else
            ret = (double)x / y;
        break;
    case '%':
        if (y == 0)
            ret = 0;
        else
            ret = x % y;
        break;
    default:
        break;
    }
    return ret;
}

// 生产者任务
void *producer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        // 1.获取数据
        const char *str = "+-*/%";
        int x = rand() % 10 + 1;
        int y = rand() % 5 + 1;
        char op = str[rand() % 5];

        // 2.构建任务对象&传送对于的处理方法
        Task t(calculator, x, y, op);

        // 3.存入队列
        bq->push(t);
        std::cout << "生产任务: " << x << " " << op << " " << y << " = ?" << std::endl;
        sleep(1);
    }
}

// 消费者任务
void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        Task t;

        // 1.获取任务
        bq->pop(&t);

        // 2.执行仿函数
        std::cout << "消费任务: " << t() << std::endl;
    }
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());

    pthread_t c, p;

    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, producer, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

2.3 执行结果

Linux基于多线程和任务队列实现生产消费模型,Linux,linux


三、效率优势

上面的是单个线程,可以拓展到多生产者多消费者!但是我们疑惑的是这个模型在访问资源的时候,都是互斥的!他们只有一个能进入到临界区,这样怎么是高效的?原来这个模型的高效不是在访问临界资源,而是对于每个线程可以独立的准备数据!我们上述实现的数据来源以及数据处理是简单的,但是后面数据可能来源于网络或者磁盘文件,这个过程每个线程都可以同时在线获取或者处理!这个过程如果串行是低效率的,但是多线程可以大大提高IO效率!这就是生产者消费者模型高效的原因!它可以将IO数据分散给多个线程并行处理,极大地提高了效率!文章来源地址https://www.toymoban.com/news/detail-701561.html

到了这里,关于Linux基于多线程和任务队列实现生产消费模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(42)
  • Linux之【多线程】生产者与消费者模型&BlockQueue(阻塞队列)

    举个例子:学生要买东西,一般情况下都会直接联系厂商,因为买的商品不多,对于供货商来说交易成本太高,所以有了交易场所超市这个媒介的存在。目的就是为了集中需求,分发产品。 消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,厂商可以继续生产

    2024年02月02日
    浏览(40)
  • 【linux】POSIX信号量+基于环形队列的生产消费模型

    喜欢的点赞,收藏,关注一下把! 上篇文章最后我们基于BlockQueue生产者消费者模型写了代码,测试什么的都通过了。最后我们说代码还有一些不足的地方,由这些不足从而引入了我们接下来要学的信号量! 我们在看一看不足的地方 1.一个线程,在操作临界资源的时候,必须

    2024年02月01日
    浏览(39)
  • 【Linux系统编程二十九】基于信号量的环形队列生产消费模型

    当共享资源被当成整体使用时,则共享资源的数量要么是1,要么是0。 当被访问使用时,共享资源的数量就为0,当没有被使用时,数量就为1。 共享资源是可以被分成多份来使用的,只不过不同的线程访问的是该共享资源的不同的区域,它是允许多个线程并发访问的,只不过

    2024年01月22日
    浏览(48)
  • 【linux】线程同步+基于BlockingQueue的生产者消费者模型

    喜欢的点赞,收藏,关注一下把! 在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢? 讲个小故事。 假设学校有一个VIP学霸自习室,这个自习室有

    2024年02月03日
    浏览(98)
  • 【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型

    ​🌠 作者:@阿亮joy. 🎆 专栏: 《学会Linux》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于

    2023年04月08日
    浏览(62)
  • Python多线程Thread——生产者消费者模型 python队列与多线程——生产者消费者模型

    下面面向对象的角度看线程 那么你可以试试看能不能用面向对象的方法实现生产者消费者模型吧。

    2024年02月09日
    浏览(52)
  • 多线程(初阶七:阻塞队列和生产者消费者模型)

    目录 一、阻塞队列的简单介绍 二、生产者消费者模型 1、举个栗子: 2、引入生产者消费者模型的意义: (1)解耦合 (2)削峰填谷 三、模拟实现阻塞队列 1、阻塞队列的简单介绍 2、实现阻塞队列 (1)实现普通队列 (2)加上线程安全 (3)加上阻塞功能 3、运用阻塞队列

    2024年02月05日
    浏览(38)
  • 【Linux】多线程 --- 线程同步与互斥+生产消费模型

    人生总是那么痛苦吗?还是只有小时候是这样? —总是如此 1. 假设现在有一份共享资源tickets,如果我们想让多个线程都对这个资源进行操作,也就是tickets- -的操作,但下面两份代码分别出现了不同的结果,上面代码并没有出现问题,而下面代码却出现了票为负数的情况,这

    2024年02月06日
    浏览(43)
  • [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

    网络层与传输层 内置于操作系统的内核中 ,网络层一般使用 ip 协议,传输层常用协议为 Tcp 协议和 Udp 协议, Tcp 协议和 Udp 协议拥有各自的特点和应用场景: sockaddr_in 结构体用于存储网络通信主机进程的 ip 和端口号等信息 小项目的完整文件的gittee链接 Tcp 服务器架构: 序列反序列

    2024年02月22日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包