生产者消费者模型 C++ 版

这篇具有很好参考价值的文章主要介绍了生产者消费者模型 C++ 版。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基础生产者消费者模型

网上一般教程是使用std::queue,定义消费者 Consumer ,定义Producter类,在main函数里面加锁保证线程安全。
本片文章,实现一个线程安全的队列 threadsafe_queue,只在队列内部加锁。如此可适配,多生产者多消费者的场景

线程安全的队列 threadsafe_queue

#pragma once
#include<mutex>
#include <condition_variable>
#include<queue>
//最大产品数量
#define MAX_SIZE 20
#include <iostream>
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}
    
    void wait_and_pop(T& value)  // 2
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
        std::cout << "Consumer pop : " << value << std::endl;
        data_cond.notify_all();
    }
    std::shared_ptr<T> wait_and_pop()  // 3
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty(); });  // 4
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        data_cond.notify_all();
        return res;
    }
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        data_cond.notify_all();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();  // 5
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        data_cond.notify_all();
        return res;
    }
    void push(T new_value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] {return data_queue.size() < MAX_SIZE; });
        std::cout << "Producter push : " << new_value << std::endl;
        data_queue.push(std::move(new_value));

        data_cond.notify_all();  // 1
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};


消费者

Consumer 头文件

#pragma once
#include"threadsafe_queue.h"
/*
基础版 消费者生产者模型
此文件为 消费者
*/

class Consumer
{
public:
	Consumer(threadsafe_queue<int>& queue);
	~Consumer();
	void start();

private:
	threadsafe_queue<int>& m_pQueue;
};

cpp文件

#include "Consumer.h"
#include <iostream>
#include<windows.h>
Consumer::Consumer(threadsafe_queue<int> &queue)
	: m_pQueue(queue)
{

}

Consumer::~Consumer()
{
}


void Consumer::start()
{
	while (true)
	{
		int value;
		m_pQueue.wait_and_pop(value);
		Sleep(10);
	}
}
 

生产者

#pragma once
#include"threadsafe_queue.h"
class Producter
{
public:
	Producter(threadsafe_queue<int>& queue,int i);
	~Producter();
	void start();

private:
	threadsafe_queue<int>& m_pQueue;
	int index;
};
#include "Producter.h"
#include<stdlib.h>
#include <iostream>
#include<windows.h>
Producter::Producter(threadsafe_queue<int>& queue, int i)
	: m_pQueue(queue),index(i)
{

}

Producter::~Producter()
{
}


void Producter::start()
{
	int i = 0;//为了测试是哪个线程,加入了哪些数据用的
	while (true)
	{
		//int data = rand();
		m_pQueue.push(i*2+ index);// index =0 为偶数生产者,只生产偶数,index =1则是生产奇数
		Sleep(100);
		i++;
	}
}

运行结果如下:
生产者消费者模型 C++ 版,C++,c++,算法

控制消费者有序消费

优先队列做缓存
头文件

#pragma once
#include<mutex>
#include <condition_variable>
#include <iostream>
#include <algorithm>
#include <vector>
#include <queue>

//最大产品数量
#define MAX_SIZE 20
/*


*/
template<typename T, class _Container = std::vector<T>, class _Pr = std::less<typename _Container::value_type>>
class threadsafe_priority_queue
{
public:
    threadsafe_priority_queue();
    ~threadsafe_priority_queue();

    void push(T new_value);
    bool empty() const;

    void wait_and_pop(T& value);

    std::shared_ptr<T> wait_and_pop();

    bool try_pop(T& value);

    std::shared_ptr<T> try_pop();


    void wait_and_pop(T& value, bool (*func)(const T&, int), int curIndex);

    std::shared_ptr<T> wait_and_pop(bool (*func)(const T&, int), int curIndex);

    bool try_pop(T& value, bool (*func)(const T&, int), int curIndex);

    std::shared_ptr<T> try_pop( bool (*func)(const T&, int), int curIndex);


private:
    mutable std::mutex mut;
    std::priority_queue<T, _Container, _Pr> data_queue;
    std::condition_variable data_cond;

};

实现

#include "threadsafe_priority_queue.h"
#include <iomanip>
template<typename T, class _Container, class _Pr >
threadsafe_priority_queue<T, _Container, _Pr>::threadsafe_priority_queue()
{}

template<typename T, class _Container, class _Pr >
threadsafe_priority_queue<T, _Container, _Pr>::~threadsafe_priority_queue()
{
}


template<typename T, class _Container, class _Pr >
void threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop(T& value)  // 2
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });
    value = std::move(data_queue.front());
    data_queue.pop();
    std::cout << "Consumer pop : " << value << std::endl;
    data_cond.notify_all();
}

template<typename T, class _Container, class _Pr >
std::shared_ptr<T> threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop()  // 3
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 4
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

template<typename T, class _Container, class _Pr >
bool threadsafe_priority_queue<T, _Container, _Pr>::try_pop(T& value)
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
        return false;
    value = std::move(data_queue.front());
    data_queue.pop();
    data_cond.notify_all();
    return true;
}

template<typename T, class _Container, class _Pr >
std::shared_ptr<T> threadsafe_priority_queue<T, _Container, _Pr>::try_pop()
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
        return std::shared_ptr<T>();  // 5
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

template<typename T, class _Container, class _Pr >
void threadsafe_priority_queue<T, _Container, _Pr>::push(T new_value)
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] {return data_queue.size() < MAX_SIZE; });
    std::cout.flags(std::ios::left);  //左对齐
    std::cout << "Producter push : " << new_value << std::endl;
    data_queue.push(std::move(new_value));

    data_cond.notify_all();  // 1
}

template<typename T, class _Container, class _Pr >
bool threadsafe_priority_queue<T, _Container, _Pr>::empty() const
{
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
}



template<typename T, class _Container, class _Pr >
void threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop(T& value, bool (*func)(const T&, int), int curIndex )
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this, func, curIndex] {
        if (!data_queue.empty() && func(data_queue.top(), curIndex))
        {
            return true;
        }
        if (!data_queue.empty())
            std::cout << "top value: " << data_queue.top() << " ,current index : " << curIndex << std::endl;
        return false;
        });
    value = std::move(data_queue.top());
    data_queue.pop();
   //右对齐
    std::cout << std::setw(100) << setiosflags(std::ios::right)<< "Consumer pop : " << value << std::endl;
    data_cond.notify_all();
}

template<typename T, class _Container, class _Pr >
std::shared_ptr<T> threadsafe_priority_queue<T, _Container, _Pr>::wait_and_pop(bool (*func)(const T&, int), int curIndex)
{
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this, func, curIndex] {
        if (!data_queue.empty() && func(data_queue.front(), curIndex))
        {
            return true;
        }
        return false; 
        });  // 4
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

template<typename T, class _Container, class _Pr >
bool threadsafe_priority_queue<T, _Container, _Pr>::try_pop(T& value, bool (*func)(const T&, int), int curIndex)
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty() || func(data_queue.front(), curIndex))
    {
        return std::shared_ptr<T>();
    }
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}


template<typename T, class _Container, class _Pr >
std::shared_ptr<T>  threadsafe_priority_queue<T, _Container, _Pr>::try_pop(bool (*func)(const T&, int), int curIndex)
{
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty() || func(data_queue.front(), curIndex))
    {
        return std::shared_ptr<T>();
    }
       
    std::shared_ptr<T> res(
        std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    data_cond.notify_all();
    return res;
}

生产者

#include "threadsafe_priority_queue.h"
class Producter2
{
public:
	Producter2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue, int i);
	~Producter2();
	void start();

private:
	threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& m_pQueue;
	int index;
};

Producter2::Producter2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue, int i)
	: m_pQueue(queue), index(i)
{

}

Producter2::~Producter2()
{
}


void Producter2::start()
{
	int i = 0;
	while (i < 200)
	{
		//int data = rand();
		m_pQueue.push(i * 3 + index);
		Sleep(30);
		i++;
	}
}

消费者

 #include "threadsafe_priority_queue.h"

class Consumer2
{
public:
	Consumer2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue);
	~Consumer2();
	void start();

private:
	threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& m_pQueue;
	int m_curIndex=-1;
};

Consumer2::Consumer2(threadsafe_priority_queue<int, std::vector<int>, std::greater<int>>& queue)
	: m_pQueue(queue)
{

}

Consumer2::~Consumer2()
{
}

bool compare( const int & nextIndex,int curindex)
{
	return nextIndex == (curindex + 1);
}

void Consumer2::start()
{
	int i = 0;
	while (i < 200)
	{
		int value;
		m_pQueue.wait_and_pop(value, compare, m_curIndex);
		m_curIndex++;
		Sleep(10);
		i++;
	}
}

main函数


#include <iostream>
#include "Producter.h"
#include "Consumer.h"
# include<thread>
#include"threadsafe_queue.h"
#include"threadsafe_queue.cpp"
#include<windows.h>
#include "threadsafe_priority_queue.h"
#include "threadsafe_priority_queue.cpp"

int main()
{
	threadsafe_priority_queue<int, std::vector<int>, std::greater<int>> queue;
	Consumer2 aonsumer(queue);
	Producter2 producter(queue,0);
	Producter2 producter2(queue, 1);
	Producter2 producter3(queue, 2);
	std::thread tProducter3(&Producter2::start, &producter3);
	std::thread tProducter2(&Producter2::start, &producter2);
	std::thread tProducter1(&Producter2::start, &producter);
	std::thread tConsumer(&Consumer2::start, &aonsumer);
	
	tConsumer.join();
	tProducter3.join();
	tProducter2.join();
	tProducter1.join();
	return 0;

}

生产者消费者模型 C++ 版,C++,c++,算法
如何停止消费者?
采用毒丸策略
多线程生产者、消费者模式中,如何停止消费者文章来源地址https://www.toymoban.com/news/detail-637754.html

到了这里,关于生产者消费者模型 C++ 版的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 线程同步--生产者消费者模型

    条件变量是 线程间共享的全局变量 ,线程间可以通过条件变量进行同步控制 条件变量的使用必须依赖于互斥锁以确保线程安全,线程申请了互斥锁后,可以调用特定函数 进入条件变量等待队列(同时释放互斥锁) ,其他线程则可以通过条件变量在特定的条件下唤醒该线程( 唤醒后线

    2024年01月19日
    浏览(27)
  • Linux——生产者消费者模型

    目录 一.为何要使用生产者消费者模型  二.生产者消费者模型优点  三.基于BlockingQueue的生产者消费者模型 1.BlockingQueue——阻塞队列 2.实现代码  四.POSIX信号量 五.基于环形队列的生产消费模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者

    2024年02月08日
    浏览(33)
  • 【设计模式】生产者消费者模型

    带你轻松理解生产者消费者模型!生产者消费者模型可以说是同步与互斥最典型的应用场景了!文末附有模型简单实现的代码,若有疑问可私信一起讨论。 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过

    2023年04月17日
    浏览(26)
  • 【Linux】深入理解生产者消费者模型

    生产者 - 消费者模型 Producer-consumer problem 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。 在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,同理如果消费

    2024年02月06日
    浏览(26)
  • 【Linux】线程安全-生产者消费者模型

    1个线程安全的队列:只要保证先进先出特性的数据结构都可以称为队列 这个队列要保证互斥(就是保证当前只有一个线程对队列进行操作,其他线程不可以同时来操作),还要保证同步,当生产者将队列中填充满了之后要通知消费者来进行消费,消费者消费之后通知生产者

    2024年02月10日
    浏览(31)
  • Java中的生产者/消费者模型

    生产者-消费者模型(Producer-Consumer problem)是一个非常经典的多线程并发协作的模型。 比如某个模块负责生产数据,而另一个模块负责处理数据。产生数据的模块就形象地被称为生产者;而处理数据的模块,则被称为消费者。 生产者和消费者在同一段时间内共用同一个存储空

    2024年02月07日
    浏览(34)
  • 生产者消费者模型(基于go实现)

    基于 Channel 编写一个简单的单线程生产者消费者模型: 队列: 队列长度 10,队列元素类型为 int 生产者: 每 1 秒往队列中放入一个类型为 int 的元素,消费者: 每一秒从队列中获取一个元素并打印。 基于 Channel 编写一个简单的单线程生产者消费者模型: 队列: 队列长度

    2024年02月11日
    浏览(24)
  • 基于互斥锁的生产者消费者模型

    生产者消费者模型 是一种常用的 并发编程模型 ,用于 解决多线程或多进程环境下的协作问题 。该模型包含两类角色: 生产者和消费者 。 生产者负责生成数据 ,并将数据存放到共享的缓冲区中。 消费者则从缓冲区中获取数据 并进行处理。生产者和消费者之间通过共享的

    2024年02月12日
    浏览(31)
  • Linux——生产者消费者模型和信号量

    目录 ​​​​​​​ 基于BlockingQueue的生产者消费者模型 概念 条件变量的第二个参数的作用  锁的作用 生产者消费者模型的高效性 生产者而言,向blockqueue里面放置任务 消费者而言,从blockqueue里面拿取任务: 总结 完整代码(不含存储数据的线程) 完整代码(含存储线程)  信

    2024年02月07日
    浏览(31)
  • 基于 BlockQueue(阻塞队列) 的 生产者消费者模型

    阻塞队列(Blocking Queue) 是一种特殊类型的队列,它具有阻塞操作的特性。在并发编程中,阻塞队列可以用于实现线程间的安全通信和数据共享。 阻塞队列的 主要特点 是: 当 队列为空时 ,消费者线程尝试从队列中获取(出队)元素时会被阻塞,直到有新的元素被添加到队

    2024年02月12日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包