【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

这篇具有很好参考价值的文章主要介绍了【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一.线程的同步与互斥

死锁问题

死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

互斥

当多线程并发执行并都需要访问临界资源时,因为每个线程都是不同的执行流,这就有可能导致数据不一致问题,为了避免此问题的发生,就需要对这块临界资源增加一些限制,一次只能有一个线程访问临界资源,即线程互斥

那么如何实现互斥?

答案是加锁!

当线程申请所成功,才能向后执行,否则阻塞等待,该进程访问完成后,释放锁,然后其它线程再来申请锁。

注意,在有锁保护的临界区中,线程仍然可以被切换,并且会连带着锁一起被切换,在这期间,其它的线程依旧不能访问临界区,因为它们没有申请到锁,锁仍然被那个线程拿着。

关于锁的一些函数

【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池,Linux,开发语言,linux,生产者消费者模型,同步与互斥,单例模式

同步

上面可能导致一个问题,当在纯互斥的环境下,可能一个线程对锁的竞争能力很强,导致它释放锁后,又马上申请到了锁,这样就一直是这一个线程持有锁,而其它线程无法申请到锁,也就无法访问临界区,造成这些线程的饥饿问题

为了解决这个问题,我们可以让所有阻塞的线程排成一个队,当一个线程释放锁后,就排到队尾,然后由位于队首的线程申请锁,这样就很好地避免了线程的饥饿问题。

在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问
题,叫做同步

如何实现线程同步?

答案是条件变量!

让所有阻塞等待的线程都到条件变量队列下等待,当一个线程释放锁时,就唤醒一个条件变量队列中的线程。

关于条件变量的一些函数

【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池,Linux,开发语言,linux,生产者消费者模型,同步与互斥,单例模式


二.生产者消费者模型

什么是生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过一个共享容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给共享容器,消费者不找生产者要数据,而是直接从共享容器里取,共享容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个共享容器就是用来给生产者和消费者解耦的。

在生产者消费者模型中,生产者有两项任务

  1. 获取数据
  2. 生产数据

消费者也有两项任务:

  1. 消费数据
  2. 处理加工数据

这因为生产者和消费者有着这些任务,当生产者在获取数据的时候,消费者可以消费数据,当消费者在处理加个数据的时候,生产者可以生产数据,而获取数据和处理加工数据都是独立进行的,不需要共享容器的参与,这就提高了生产者和消费者的并发度

321原则

我们可以把生产者消费者模型简单记成 “321” 原则

  • 3是指有三种关系:消费者和消费者(互斥关系),生产者和生产者(互斥关系),消费者和生产者(互斥和同步关系)
  • 2是指有两个角色:生产者和消费者
  • 1是指有一个共享容器

优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池,Linux,开发语言,linux,生产者消费者模型,同步与互斥,单例模式


三.基于阻塞队列的生产者消费者模型

这个需要用到锁和条件变量。

阻塞队列就是生产者和消费者的共享容器,生产者是从数据到阻塞队列中,消费者从阻塞队列中拿数据。

需要注意的是:

  • 当阻塞队列为空时,消费者不可以从阻塞队列中拿数据,此时消费者进入条件变量队列下等待,当消费了一个数据,就可以唤醒一个生产者生产了
  • 当阻塞队列满时,生产者不可以向阻塞队列中生产数据,此时生产者进入条件变量队列下等待,当生产了一个数据,就可以唤醒一个消费者消费了

源码BlockQueue.hpp

#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <queue>

using namespace std;


class thread_data
{
public:
    string name;
    pthread_t tid;
};

static const int defaultcap=5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int cap=defaultcap)
    :_maxcap(cap)
    {   
        //初始化锁和条件变量
        pthread_mutex_init(&mutex,nullptr);
        pthread_cond_init(&_c_cond,nullptr);
        pthread_cond_init(&_p_cond,nullptr);
    }


    void push(const T&data)
    {
        pthread_mutex_lock(&mutex);  //加锁要在判断是否能合法生产前,因为判断也是访问临界资源,并且判断语句不具有原子性
        while(_q.size()>=_maxcap)   //使用while,防止伪唤醒
        {
            pthread_cond_wait(&_p_cond,&mutex);   //当函数成功返回时,锁会被自动释放
        }

        _q.push(data);   //生产一个数据
        pthread_cond_signal(&_c_cond);   //唤醒一个消费者线程
        pthread_mutex_unlock(&mutex);   //解锁
    }

    T pop()
    {
        pthread_mutex_lock(&mutex);
        while(_q.size()<=0)   //同样要先判断是否能合法消费
        {
            pthread_cond_wait(&_c_cond,&mutex);
        }

        T data=_q.front();   //消费一个数据
        _q.pop();
        pthread_cond_signal(&_p_cond);   //唤醒一个生产者线程
        pthread_mutex_unlock(&mutex);   //解锁

        return data;
    }

    ~BlockQueue()
    {
        //销毁锁和条件变量
        pthread_mutex_destroy(&mutex);

        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }
private:
    queue<T> _q;   //阻塞队列
    int _maxcap;   //最大容量

    pthread_mutex_t mutex;  //锁

    pthread_cond_t _c_cond;  //消费者的条件变量
    pthread_cond_t _p_cond;  //生产者的条件变量
};

四.基于环形队列的生产者消费者模型

POSIX信号量

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

 销毁信号量

int sem_destroy(sem_t *sem);

 等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

 发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

环形队列

  • 当环形队列为空或为满时,生产者和消费者才会相遇

  • 消费者不能超过生产者,生产者不能超过消费者一个圈 

  • 生产者关心的是还剩多少剩余空间,消费者关心的是现有多少数据

  • 生产者和消费者访问下标的行为互斥的,所以需要用到锁

【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池,Linux,开发语言,linux,生产者消费者模型,同步与互斥,单例模式

源码RingQueue.hpp

#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <queue>
#include <semaphore.h>

using namespace std;

static const int defaultcap=5;

template<class T>
class RingQueue
{
public:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem)
    {
        sem_post(&sem);
    }

    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap=defaultcap)
    :_cap(cap),_p_step(0),_c_step(0),Ringqueue(cap)
    {
        sem_init(&cdata_sem,0,0);    //初始有0个数据
        sem_init(&pspace_sem,0,_cap);   //初始有所有的空间

        pthread_mutex_init(&c_mutex,nullptr);
        pthread_mutex_init(&p_mutex,nullptr);
    }

    void push(const T&t)
    {
        P(pspace_sem);

        Lock(p_mutex);    //注意P操作在上锁前,因为P操作并不会访问下标,且P操作是原子的
        Ringqueue[_p_step]=t;
        _p_step++;
        _p_step%=_cap;
        Unlock(p_mutex);

        V(cdata_sem);
    }

    void  pop(T*out)
    {
        P(cdata_sem);

        Lock(c_mutex);
        *out=Ringqueue[_c_step];
        _c_step++;
        _c_step%=_cap;
        Unlock(c_mutex);

        V(pspace_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
private:
    vector<T> Ringqueue;   //使用数组模拟循环队列

    int _cap;  //最大容量

    int _p_step;  //生产者下标
    int _c_step;  //消费者下标

    sem_t cdata_sem;   //现有资源信号量
    sem_t pspace_sem;  //剩余空间信号量

    pthread_mutex_t c_mutex;   //消费者的锁
    pthread_mutex_t p_mutex;   //生产者的锁
};

五.基于单例模式的线程池的简单实现

 其实线程池就是利用的生产者消费者模型。

线程池是什么

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。

线程池维护着多个线程,等待着监督管理者分配可并发执行的任务

避免了在处理短时间任务时创建与销毁线程的代价

线程池不仅能够保证内核的充分利用,还能防止过分调度。

可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量

线程池的应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

单例模式

单例模式就是类只能实例化一个对象。

这样可以节省空间。

什么是饿汉模式?什么是懒汉模式?

  • 吃完饭, 立刻洗碗, 这种就是饿汉方式,因为下一顿吃的时候可以立刻拿着碗就能吃饭.
  • 吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式;懒汉方式最核心的思想是 "延时加载",从而能够优化服务器的启动速度

饿汉模式实现单例模式


//单例模式的核心是只能实例化一个对象,所以要禁止生成拷贝构造和赋值重载,以防生成多余的对象
//为了只实例化一个对象,要把构造函数设置成私有,并设置一个专门的函数来获取单例对象
//所以这个函数不可以是普通的类内函数,因为要想调用普通的类内函数,必须要先有对象,但构造函数已经私有了,不可能再创建对象,所以要把这个函数设置成静态的
template <class T>
class Singleton 
{
public:
    static Singleton<T>* GetInstance() //获取单例对象
    {
        return &data;
    }
private:
    Singleton()   //构造函数私有

    Singleton(const Singleton&)=delete;   //禁止生成拷贝构造
    const Singleton& operator=(const Singleton&)=delete;   //禁止生成赋值重载

    ~Singleton()  //析构函数私有
private: 
    static Singleton<T> data;
};

懒汉模式实现单例模式

template <class T>
class Singleton 
{
public:
    static Singleton<T>* GetInstance() //获取单例对象
    {
        if(init==nullptr)
        {
            init=new Singleton<T>();
        }
        return init;
    }
private:
    Singleton()   //构造函数私有

    Singleton(const Singleton&)=delete;   //禁止生成拷贝构造
    const Singleton& operator=(const Singleton&)=delete;   //禁止生成赋值重载

    ~Singleton()  //析构函数私有
private: 
    static Singleton<T>* init;
};

但是其实这样存在一个严重的问题,就是线程不安全,所以要加锁。 

C++类内创建线程须知

C++的类内成员函数是默认传一个参数this指针的,这就不符合线程创建所需要的函数特征,即参数必须是:void*

所以在类内,我们把这个函数声明为 static ,但是声明成 static 就不能访问类内成员了,所以线程创建函数再传一个 this 指针过去

基于懒汉模式的简易线程池源码threadpool.hpp

#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <queue>

using namespace std;

static const int defaultnum = 5;

struct ThreadInfo
{
    pthread_t tid;
    string name;
};

template <class T>
class ThreadPool
{
public:
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }

    void ThreadSleep(pthread_cond_t &cond, pthread_mutex_t &mutex)
    {
        pthread_cond_wait(&cond, &mutex);
    }

    void Wakeup(pthread_cond_t &cond)
    {
        pthread_cond_signal(&cond);
    }

    bool IsEmpty()
    {
        return _tasks.empty();
    }

public:
    static void *HandlerThread(void *args) // 在类内创建线程要声明成static,因为类内函数默认有一个参数this指针,这就不符合创建线程所需函数的特征(参数必须是void*)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        while (true)
        {
            tp->Lock(tp->mutex);
            while (tp->IsEmpty())
            {
                tp->ThreadSleep(tp->cond, tp->mutex);
            }

            T t = tp->pop();
            tp->Unlock(tp->mutex);
            cout << "执行任务..." << endl;
        }
    }

    void Start() // 启动线程池,即创建线程
    {
        for (int i = 0; i < _threads.size(); i++)
        {
            _threads[i].name = "thread-" + to_string(i + 1);
            pthread_create(&_threads[i].tid, nullptr, HandlerThread, this); // 声明成static就不能访问类内成员,所传个this指针过去
        }
    }

    T pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    void Push(const T &t) // 为线程派发任务
    {
        Lock(mutex);
        _tasks.push(t);
        Wakeup(cond);
        Unlock(mutex);
    }

    //基于懒汉模式的单例模式
    static ThreadPool<T> *GetInstace()  //获取单例对象
    {
        if (_tp == nullptr)   //因为只有第一次实例化对象时,_tp才为nullptr,后面每次获取单例对象都不为空了,此时加锁是没有必要的,为了保证效率,这里采用双层判断,如果_tp为空才加锁并实例化对象
        {
            pthread_mutex_lock(&_lock);
            if (_tp == nullptr)
            {
                _tp = new ThreadPool<T>();
                cout << "first create  Instance " << endl;
            }
            pthread_mutex_unlock(&_lock);
        }

        return _tp;
    }

private:
    ThreadPool(int num = defaultnum)   //构造函数私有
        : _threads(num)
    {
        pthread_mutex_init(&mutex, nullptr);
        pthread_cond_init(&cond, nullptr);
    }

    ThreadPool(const ThreadPool<T> &) = delete;   //禁止生成拷贝构造
    const ThreadPool<T> & operator=(const ThreadPool<T>&) = delete;  //禁止生成赋值重载

    ~ThreadPool()   //析构函数私有
    {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }

private:
    vector<ThreadInfo> _threads;
    queue<T> _tasks; // 线程的任务

    pthread_mutex_t mutex;
    pthread_cond_t cond;

    static pthread_mutex_t _lock;   //为了实例单例对象时的线程安全

    static ThreadPool<T> *_tp;
};

//类内静态变量一般在类外初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;   

template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;

 六.常见的锁

其实上面我们用到的都是互斥锁,还有一些其它常见的锁。

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁:把频繁申请访问临界区的锁,称为自旋锁

上文使用的互斥锁。如果申请锁不成功,那就一直被阻塞挂起,直到锁被释放,这种适合,访问临界区时间长的场景

而自旋锁是即使你申请失败了,也不会被阻塞挂起,而是一直访问,这种适合访问临界区时间短的场景

linux中也有一批关于自旋锁的接口:

【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池,Linux,开发语言,linux,生产者消费者模型,同步与互斥,单例模式

【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池,Linux,开发语言,linux,生产者消费者模型,同步与互斥,单例模式 用法都和互斥锁的类似。


🐬🤖本篇文章到此就结束了, 若有错误或是建议的话,欢迎小伙伴们指出;🕊️👻

😄😆希望小伙伴们能支持支持博主啊,你们的支持对我很重要哦;🥰🤩

😍😁谢谢你的阅读。😸😼

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=27ajwbqqwluss文章来源地址https://www.toymoban.com/news/detail-821077.html

到了这里,关于【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Linux】生产者消费者模型代码实现和信号量

    一定要先理解生产者消费者模型的原理~ 文章目录 一、生产者消费者模型实现代码 二、信号量 1.基于环形队列的生产者消费者模型 总结 下面我们实现基于阻塞队列的生产消费模型: 在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其

    2024年02月11日
    浏览(40)
  • 【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(45)
  • 【linux】线程同步+基于BlockingQueue的生产者消费者模型

    喜欢的点赞,收藏,关注一下把! 在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢? 讲个小故事。 假设学校有一个VIP学霸自习室,这个自习室有

    2024年02月03日
    浏览(99)
  • Linux之信号量 | 消费者生产者模型的循环队列

    目录 一、信号量 1、概念 2、信号量操作函数 二、基于环形队列的生产者消费者模型 1、模型分析 2、代码实现 1、单生产单消费的生产者消费者模型 2、多生产多消费的生产者消费者模型 引入:前面我们讲到了,对临界资源进行访问时,为了保证数据的一致性,我们需要对临

    2024年04月17日
    浏览(40)
  • 『Linux』第九讲:Linux多线程详解(四)_ 生产者消费者模型

    「前言」文章是关于Linux多线程方面的知识,上一篇是 Linux多线程详解(三),今天这篇是 Linux多线程详解(四),内容大致是生产消费者模型,讲解下面开始! 「归属专栏」Linux系统编程 「主页链接」个人主页 「笔者」枫叶先生(fy) 「枫叶先生有点文青病」「每篇一句」

    2024年02月07日
    浏览(45)
  • 《Linux从练气到飞升》No.29 生产者消费者模型

    🕺作者: 主页 我的专栏 C语言从0到1 探秘C++ 数据结构从0到1 探秘Linux 菜鸟刷题集 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇 码字不易,你的👍点赞🙌收藏❤️关注对我真的很重要,有问题可在评论区提出,感谢阅读!!!

    2024年02月05日
    浏览(42)
  • 【Linux】线程同步 -- 条件变量 | 生产者消费者模型 | 自旋锁 |读写锁

    举一个例子: 学生去超市消费的时候,与厂家生产的时候,两者互不相冲突。 生产的过程与消费的过程 – 解耦 临时的保存产品的场所(超时) – 缓冲区 模型总结“321”原则: 3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥[保证共享资

    2024年02月14日
    浏览(37)
  • Linux之【多线程】生产者与消费者模型&BlockQueue(阻塞队列)

    举个例子:学生要买东西,一般情况下都会直接联系厂商,因为买的商品不多,对于供货商来说交易成本太高,所以有了交易场所超市这个媒介的存在。目的就是为了集中需求,分发产品。 消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,厂商可以继续生产

    2024年02月02日
    浏览(46)
  • 线程池-手写线程池Linux C简单版本(生产者-消费者模型)

    本线程池采用C语言实现 线程池的场景: 当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池 线程池的一般特点: 线程池通常是一个生产者-消费者模型 生产者线程用于发布任务,任务通常保存在任务队列中 线程池作为消费者,

    2024年02月14日
    浏览(48)
  • 【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型

    ​🌠 作者:@阿亮joy. 🎆 专栏: 《学会Linux》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于

    2023年04月08日
    浏览(64)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包