Liunx下的消费者与生产者模型与简单线程池的实现

这篇具有很好参考价值的文章主要介绍了Liunx下的消费者与生产者模型与简单线程池的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

本文主要会结束消费者生产者模型,以及简单线程池的实现。

1.消费者与生产者模型

之前我们学了条件变量和互斥等概念。条件变量本质就是一个队列,它会将因为某种条件不满足不能往后执行的线程添加到这个队列中,避免线程做无用功,当条件满足时,会将队列中的线程重新唤醒继续执行。我们接下将利用条件变量和互斥锁实现一个消费者与生产者模型。

消费者与生产者模型是什么呢?

在生活中,工厂将加工好的产品运到超市,超市将商品卖给消费者。这就是一个生产者消费者模型,这样做的好处是消费者能方便买到商品,工厂也能将产品快速输出没有积压货物。这是一种高效的处理方式。在计算机中,也有这种消费者生产者模型,有些线负责从某种渠道拿到数据,这就是生产者;有些线程处理其他线程手中拿到的数据,这就是消费者。我们可以定义一个缓冲区,相当于超市。一边让消费者线程处理数据,一边让生产者线程产生数据。这就是消费者与生产者模型。

消费者与生产者模型的高效性体现在哪里呢?

我们为啥要定义一个缓冲区这么麻烦呢,这个缓冲区是怎么提高效率的呢?其实很简单,负责拿到数据的线程,其实在拿到数据的时候需要处理时间,如果没有缓冲区,这个时候负责处理数据的线程就会陷入等待状态,执行效率就大幅度降低,如果有了这种缓冲区,那么久可以保证消费者线程时时刻刻都会有数据处理。这样就提高了效率。

消费者生产者模型的注意事项

这个缓冲区需要被消费者和生产者看到,这就意味着这个缓冲区就是一个公共资源,也就是临界资源。既然是多线程访问这个临界资源,这里就涉及到多线程的互斥与同步。怎么维护多线程之间的互斥与同步呢?我们分析一下消费者与生产者之间的关系。

首先,生产者与生产者之间的关系:互斥关系;毕竟缓冲区只有一个,只能允许一个线程去访问这个资源。消费者与消费者的关系同理也是互斥。消费者与生产者的关系:互斥和同步。因为不管是消费者还是生产者,只能其中一个去访问临界资源,这就是互斥。只有生产者生产了数据,消费者才能从缓冲区里获得数据消费,这就是同步关系。简单总结一下消费者和生产者模型就是3 2 1原则,即3种关系,2种角色,1种场所。

我们搞明白了上述的道理之后,就可以使用条件变量和互斥锁简单实现一下这个消费者于生产者模型了。我们可以用队列来模拟这个缓冲区,在定义两个函数,一个处理数据一个生产数据,充当消费者和生产者。

BlockQueue.hpp

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
const int gacp=5;
template<class T>
class BlockQueue
{ 
  public:
    BlockQueue(const int cap=gacp )
    :_cap(gacp)
    {
         pthread_mutex_init(&_mutex,nullptr);
         pthread_cond_init(&_consumerCond,nullptr);
         pthread_cond_init(&_productorCond,nullptr);
    }
   ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumerCond);
        pthread_cond_destroy(&_productorCond);
    }
    void push(const T& in)
    {   
         pthread_mutex_lock(&_mutex);
         while(isFull())
         {
            pthread_cond_wait(&_productorCond,&_mutex);
         }
         _q.push(in);
         //if(_q.size()>=_cap/2)
         
         pthread_cond_signal(&_consumerCond);//唤醒消费者
         
         pthread_mutex_unlock(&_mutex);
    }
   bool isFull()
   {
     return _q.size()==_cap;
   }
   bool isEmpty()
   {
     return _q.empty();
   }
    void pop(T*out)
    {
        pthread_mutex_lock(&_mutex);
        while(isEmpty())
        {
            pthread_cond_wait(&_consumerCond,&_mutex);
        }
        *out=_q.front();
        _q.pop();
        
        pthread_cond_signal(&_productorCond);//唤醒生产者
        pthread_mutex_unlock(&_mutex);
    }
  
  
private:
  std::queue<T>_q;
  int _cap;
  pthread_mutex_t _mutex;
  pthread_cond_t _consumerCond;//消费者对应条件变量
  pthread_cond_t _productorCond;//生产者对应的条件变量
  


};

用C++queue容器来封装这个阻塞队列作为缓冲区,push行为就是生产者的行为,pop行为就是消费者行为。我们使用条件变量,当队列为满时就不让生产者生产了,这个时候让生产行为处于等待状态,唤醒消费行为。当队列为空时就不让消费者消费了,这个时候让消费行为处于等待状态,唤醒生产行为。

Task.hpp

#include<iostream>
#include<string>
class Task
{
  public:
  Task()
  {

  }
  Task(int x,int y,char op):_x(x),_y(y),_op(op){}
  ~Task(){}
  void operator()()
  {
    switch(_op)
    {
        case '+':
        _ret=_x+_y;
        break;

        case '-':
        _ret=_x-_y;
        break;
       
        case '*':
        _ret=_x*_y;
        break;
        
        case '/':
        { 
            if(_y==0)
            {
                _exitCode=-1;
            }
            else
           _ret=_x/_y;
        }
        break;
        default:
        break;
    }
  }
  std::string formatRet()
  {
     return std::to_string(_ret)+" "+'('+std::to_string(_exitCode)+')';
  }
  std::string formatArg()
  {
    return std::to_string(_x)+" " +_op+" "+std::to_string(_y)+'=';
  }
    private:
    int _x;
    int _y;
    char _op;
    int _ret;
    int _exitCode;


};

这个是封装了一个任务类,模拟要处理的数据。这个任务类会产生一些需要计算的结果。将这些表达式和结果作为数据传入阻塞队列中由生产者和消费者进行处理。

test.cc

#include"BlockQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
 #include<ctime>
#include<unistd.h>
 void *consumer(void*arg)
 {   
    sleep(1);
    BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
    while(1)
    { 
        Task t;
        //1.从blockqueue中获取数据
        bq->pop(&t);
        t();
       //2.结合某种业务逻辑,处理数据
    std::cout<<"consumer data:"<<t.formatArg()<<t.formatRet()<<std::endl;

    }
 }
 void *producter(void*arg)
 {  
    BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
    std::string opers ="+-*/";
    while(1)
    {  
        //1.从某种渠道获取数据
        int x=rand()%10+1;
        int y=rand()%10+1;
        char op=opers[rand()%opers.size()];
        //2.将数据加入blockqueue中,完成生产过程
        Task t(x,y,op);
        bq->push(t);
        std::cout<<"prducter data:"<<t.formatArg()<<"=?"<<std::endl;
    }
 }
 int main()
 {
    //多产生单消费
    srand((uint64_t)time(nullptr));
    BlockQueue<Task>*bq=new BlockQueue<Task>();
    pthread_t  c[2],p[3];
    pthread_create(&c[0],nullptr,consumer,bq);
    pthread_create(&c[1],nullptr,consumer,bq);
    pthread_create(&p[0],nullptr,consumer,bq);
    pthread_create(&p[1],nullptr,producter,bq);
    pthread_create(&p[2],nullptr,producter,bq);
    
    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr);
    pthread_join(p[2],nullptr);
  

    return 0;
 }

这里就是创建线程模拟生产者和生产者在缓冲区中处理数据。

Liunx下的消费者与生产者模型与简单线程池的实现,Liunx操作系统,Liunx,学习,线程,操作系统,线程池

以上就是生产者于消费者模型了,用多线程去模拟消费者和生产者,因为这里只有一份临界资源就是阻塞队列,我们已经对这个阻塞队列进行加锁保护了,同时又维护好了线程之间的同步互斥关系。所以一批线程是没问题的。

2.信号量

信号量之前就提到过,是用来描述临界资源数数目,它的工作机制和买票看电影类似,是一种资源预订机制。它本质就是一个计数器,P操作就是减减,V操作就是加加,PV操作本身是原子的,信号量申请成功表示资源可用,否则表示资源不可用。使用信号量就是相当于把对资源的判断转化成对信号量的申请行为。之前的互斥锁就可以看做是二元信号量,由1到0,再由0到1。信号量也是要被其他线程或者进程所看见的,本质上也是一种临界资源,所以在申请信号量和释放信号量的时候也需要加锁保护。

1.信号量的接口

初始化信号量的函数sem_init

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数说明:sem:需要初始化的信号量。pshared:传入0值表示线程间共享,传入非零值表示进程间共享。value:信号量的初始值(计数器的初始值)。

销毁信号量的函数sem_destroy

int sem_destroy(sem_t *sem);

参数说明:sem:需要销毁的信号量。

等待信号量的函数sem_wait(相当于P操作

int sem_wait(sem_t *sem);

sem:需要等待的信号量。

发布信号量(释放信号量)函数sem_pos(相当于V操作

int sem_post(sem_t *sem);

sem:需要发布的信号量。

2.使用环形队列模拟生产者消费者模型

这个信号量是对临界资源数目的描述,也就是说在某些情况下临界资源是可以拆成一份份的来访问。我们使用环形队列模拟生产者消费者模型时,就可以使用信号量。循环队列每个位置可以看成一份临界资源,为保证消费者和生产者能安全高效的生产的数据,我们规定生产者先生产者数据,消费者再消费数据,同事生产者不能套圈生产数据,不然之前生产的数据可能还没有被消费就会被新数据覆盖,造成数据丢失。消费者也不能追上生产者,因为它没有数据需要消费还可能导致一些异常情况出现。

Liunx下的消费者与生产者模型与简单线程池的实现,Liunx操作系统,Liunx,学习,线程,操作系统,线程池

生产消费关系分析

1.生产者和消费者关系的资源不一样,生产者关心空间,消费者关心数据。2.只要信号量不为0,表示资源可用,线程可访问。3.环形队列我们只要访问不同的区域,生产行为和消费行为是可以同时进行的。4.当队列为空的时生产者先行,当队列为满的时候消费者先行。生产者不能套圈消费者,消费者不能超过生产者。

Liunx下的消费者与生产者模型与简单线程池的实现,Liunx操作系统,Liunx,学习,线程,操作系统,线程池

当我们将资源整体使用的时候就优先考虑互斥锁,当资源可以分成多份使用的时候就优先考虑信号量。

RingQueue.h

#pragma once

#include<iostream>
#include<vector>
#include <semaphore.h>

static const int N=5;

template<class T>
class RingQueue
{
private:
void P(sem_t& s)
{
   sem_wait(&s);                                                                                                                
}
void V(sem_t& s)
{
     sem_post(&s);
}
public:
void push(const T&in)
{
    //生产
    P(_space_sem);
    //一定有对应的空间资源,不用判断 
    _ring[_p_step++]=in;
    _p_step%=_cap;
    V(_data_sem);

     

}
void pop(T*out)
{
   //消费
   P(_data_sem);
   *out=_ring[_c_step++];
   _c_step%=_cap;
   V(_space_sem);



}
RingQueue(int num=N):_ring(num),_cap(num)
{
      sem_init(&_data_sem,0,0);
      sem_init(&_space_sem,0,num);
      _c_step=_p_step=0;
}
~RingQueue()
{
       sem_destroy(&_space_sem);
       sem_destroy(&_data_sem);
}


private:
   std::vector<int>_ring; 
   int _cap;//环形队列容量
   sem_t _data_sem ;//数据信号量,消费者关心
   sem_t _space_sem ;//空间信号量,生产者关心
   int _c_step;//消费位置
   int _p_step;//生产位置
};

test.cc

#include<pthread.h>
#include<memory>
#include<unistd.h>
#include<sys/types.h>
#include"RingQueue.hpp"
using namespace std;

void *consumer(void *arg)
{
  RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
  while(1)
  {
    int data=0;
    rq->pop(&data);
    cout<<"consumer done: "<<data<<endl;

  }
}
void *productor(void *arg)
{
 RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
  while(1)
  {
    int data=rand()%10+1;
    rq->push(data);
    cout<<"productor done: "<<data<<endl;
    sleep(1);
  }
}
int main()
{   
    srand(time(nullptr));

    RingQueue<int>*rq=new RingQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,rq);
    pthread_create(&p,nullptr,productor,rq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
}

Liunx下的消费者与生产者模型与简单线程池的实现,Liunx操作系统,Liunx,学习,线程,操作系统,线程池

因为这里是单生产单消费者,所以对信号量的访问都没有做加锁保护,我们可以将其改造成多生产者多消费者模型。

Task.hpp

#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }
       //模拟处理过程所花费的时间
        usleep(10000);
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

将之前的模拟计算任务的头文件拿过来,充当数据分配给线程处理。

RingQueue

#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

static const int N = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }
    void V(sem_t &s)
    {
        sem_post(&s);
    }
    void Lock(pthread_mutex_t &m)
    {
        pthread_mutex_lock(&m);
    }
    void Unlock(pthread_mutex_t &m)
    {
        pthread_mutex_unlock(&m);
    }

public:
    RingQueue(int num = N) : _ring(num), _cap(num)
    {
        sem_init(&_data_sem, 0, 0);
        sem_init(&_space_sem, 0, num);
        _c_step = _p_step = 0;

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }
    // 生产
    void push(const T &in)
    {  
        //先申请信号量,进行资源分配,再加锁效率高
        
        P(_space_sem);  
        Lock(_p_mutex); 
        _ring[_p_step++] = in;
        _p_step %= _cap;
        Unlock(_p_mutex);
        V(_data_sem);
    }
    // 消费
    void pop(T *out)
    {  //先申请信号量,进行资源分配,再加锁效率高
        P(_data_sem);
        Lock(_c_mutex); 
        *out = _ring[_c_step++];
        _c_step %= _cap;
        Unlock(_c_mutex);
        V(_space_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);

        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

private:
    std::vector<T> _ring;
    int _cap;         // 环形队列容器大小
    sem_t _data_sem;  // 消费者关心 数据信号量
    sem_t _space_sem; // 生产者关心 空间信号量
    int _c_step;      // 消费位置
    int _p_step;      // 生产位置

    pthread_mutex_t _c_mutex;//消费者锁
    pthread_mutex_t _p_mutex;//生产者锁
};

这里是多线程并发访问就需要加锁处理,消费者与生产者之间的关系可以通过信号量来维护,消费者与消费者,生产者与生产者的关系是互斥的,需要加锁来访问资源确保任意一个资源只能由一个线程访问,申请资源进行资源的分配,所以这里就定义了两把锁,分别作用与生产者和消费者。

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>

using namespace std;

const char *ops = "+-*/%";

void *consumerRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        Task t;
        rq->pop(&t);
        t();
        cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
    }
}

void *productorRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 100;
        char op = ops[(x + y) % strlen(ops)];
        Task t(x, y, op);
        rq->push(t);
        cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
    }
}

int main()
{
    srand(time(nullptr));
    RingQueue<Task> *rq = new RingQueue<Task>();
    
    pthread_t c[3], p[2];
    for (int i = 0; i < 3; i++)
        {
            pthread_create(c + i, nullptr, consumerRoutine, rq);
        }
    for (int i = 0; i < 2; i++)
        { 
         pthread_create(p + i, nullptr, productorRoutine, rq);
        }

    for (int i = 0; i < 3; i++)
        {
            pthread_join(c[i], nullptr);
        }
    for (int i = 0; i < 2; i++)
        {
            pthread_join(p[i], nullptr);
        }

    delete rq;
    return 0;
}

Liunx下的消费者与生产者模型与简单线程池的实现,Liunx操作系统,Liunx,学习,线程,操作系统,线程池

简单总结一下:当我们将临界资源整体使用后,优先考虑互斥锁,当临界资源可以被拆分使用的时候就要考虑信使用号量。我们在信号量之后加锁比较合适,这样提前将资源分配好,这样锁申请成功之后就能直接使用临界资源了,一定程度上提高了效率。同时消费者和生产者还有一个高效点,就是在处理的数据的过程中是没有枷锁的吗,比如消费者函数在处理计算表达式的时候就没有加锁的,只有访问队列的时候才触及加锁,同样的生产者函数,产生计算表达式的式的时候也没有加锁。这才是高效的地方。

3.简单实现线程池

什么是线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。简单来数线程池就是提前构建一批线程处理任务,这样免去了构建线程的开销,有任务推送即可马上处理。下面我们简单模拟一下线程池的实现。

我们可以用vector容器作为存分线程的载体,在vector中存放一批线程,同时将需要需要处理的任务放置在任务队列中,我们可以将先将推送的任务添加至任务队列中,再将任务队列中线程取出分配给线程。这样就能维护一个线程池了。同时如果任务队列中无任务了,就需要将线程池中处理任务的逻辑给休眠,等到有任务时在将其换新进行任务的处理。实际上,这个线程池就相当于消费者,缓冲区是封装在线程池中。这也是消费者与生产者模型。生产者我们可以直接自定义的导入任务放入线程池中。

ThreaPool.hpp

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"

const static int N = 5;

template <class T>
class ThreadPool
{
public:
    ThreadPool(int num = N) : _num(num)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    pthread_mutex_t* getlock()
    {
        return &_lock;
    }
    void threadWait()
    {
        pthread_cond_wait(&_cond, &_lock);
    }
    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    bool isEmpty()
    {
        return _tasks.empty();
    }
    T popTask()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }
    static void threadRoutine(void *args)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        while (true)
        {
            // 1. 检测有没有任务
            // 2. 有:处理
            // 3. 无:等待
            //必定加锁
            T t;
            {
                LockGuard lockguard(tp->getlock());
                while (tp->isEmpty())
                {
                    tp->threadWait();
                }
                t = tp->popTask(); // 从公共区域拿到私有区域
            }
           
            t();
            std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
            
        }
    }
    void init()
    {
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(Thread(i, threadRoutine, this));
        }
    }
    void start()
    {
        for (auto &t : _threads)
        {
            t.run();
        }
    }
    void check()
    {
        for (auto &t : _threads)
        {
            std::cout << t.threadname() << " running..." << std::endl;
        }
    }
    void pushTask(const T &t)
    {
        LockGuard lockgrard(&_lock);
        _tasks.push(t);
        threadWakeup();
    }
    ~ThreadPool()
    {
        for (auto &t : _threads)
        {
            t.join();
        }
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

private:
    std::vector<Thread> _threads;//线程池
    int _num;

    std::queue<T> _tasks; //任务队列

    pthread_mutex_t _lock;
    pthread_cond_t _cond;
};

接下来就是对一些的简单封装了,比如简单封装一下互斥锁和线程以及任务。

lockGuard.hpp

#pragma once
#include <iostream>
#include <pthread.h>

class Mutex // 自己不维护锁,有外部传入
{
public:
    Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmutex);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *_pmutex;
};

class LockGuard // 自己不维护锁,有外部传入
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        _mutex.lock();
    }
    ~LockGuard()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

Task

#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }

        usleep(100000);
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>

class Thread
{
public:
    typedef enum
    {
        NEW = 0,
        RUNNING,
        EXITED
    } ThreadStatus;
    typedef void (*func_t)(void *);

public:
    Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args)
    {
        char name[128];
        snprintf(name, sizeof(name), "thread-%d", num);
        _name = name;
    }
    int status() { return _status; }
    std::string threadname() { return _name; }
    pthread_t threadid()
    {
        if (_status == RUNNING)
            return _tid;
        else
        {
            return 0;
        }
    }
  
    static void *runHelper(void *args)
    {
        Thread *ts = (Thread*)args; // 就拿到了当前对象
        (*ts)();
        return nullptr;
    }
    void operator ()() //仿函数
    {
        if(_func != nullptr) _func(_args);
    }
    void run()
    {
        int n = pthread_create(&_tid, nullptr, runHelper, this);
        if(n != 0) exit(1);
        _status = RUNNING;
    }
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        if( n != 0)
        {
            std::cerr << "main thread join thread " << _name << " error" << std::endl;
            return;
        }
        _status = EXITED;
    }
    ~Thread()
    {
    }

private:
    pthread_t _tid;
    std::string _name;
    func_t _func; // 线程未来要执行的回调
    void *_args;
    ThreadStatus _status;
};

接着就是调用逻辑了。

#include"ThreadPool.hpp"
#include<memory>

int main()
{
    std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
    tp->init();
    tp->start();
    while(1)
    {
     int x, y;
        char op;
        std::cout << "please Enter x> ";
        std::cin >> x;
        std::cout << "please Enter y> ";
        std::cin >> y;
        std::cout << "please Enter op(+-*/%)> ";
        std::cin >> op;

        Task t(x, y, op);
        tp->pushTask(t);

        // 充当生产者, 从读取数据,构建成为任务,推送给线程池
         sleep(1);
    
    }
}

Liunx下的消费者与生产者模型与简单线程池的实现,Liunx操作系统,Liunx,学习,线程,操作系统,线程池

以上就是对线程池的简单实现,这将之前写过的东西都给串起来了。

4.补充说明

STL中的容器是否是线程安全的?不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.

其他常见的几种锁

悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁等),当其他线程想要访问数据时,被阻塞挂起。
乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。

以上内容如有问题,欢迎指正!文章来源地址https://www.toymoban.com/news/detail-545588.html

到了这里,关于Liunx下的消费者与生产者模型与简单线程池的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • linux:生产者消费者模型

    个人主页 : 个人主页 个人专栏 : 《数据结构》 《C语言》《C++》《Linux》 本文是对于生产者消费者模型的知识总结 生产者消费者模型就是通过一个容器来解决生产者消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过之间的容器来进行通讯,所以生产者

    2024年04月15日
    浏览(43)
  • Linux——生产者消费者模型

    目录 一.为何要使用生产者消费者模型  二.生产者消费者模型优点  三.基于BlockingQueue的生产者消费者模型 1.BlockingQueue——阻塞队列 2.实现代码  四.POSIX信号量 五.基于环形队列的生产消费模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者

    2024年02月08日
    浏览(46)
  • rabbitmq消费者与生产者

    在第一次学习rabbitmq的时候,遇到了许多不懂得 第一步导包 第二步新增生产者 在这里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填写自己的队列名称,如果你的为”/“则填写\\\'\\\'/\\\'\\\' 第三步新增消费者 消息获取成功 注意如果你用的云服务器需要打开这两个端口 5672 15672 如果你使

    2024年02月11日
    浏览(46)
  • 多线程之生产者消费者

    目的是回顾多线程的几个api 多生产者+多消费者+共享池

    2024年02月07日
    浏览(52)
  • kafka生产者消费者练习

    需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中 生产的数据格式: 造数据 {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    浏览(46)
  • LabVIEW建立生产者消费者

    LabVIEW建立生产者消费者 生产者/消费者设计模式由并行循环组成,这些循环分为两类:生产者循环和消费者循环。生产者循环和消费者循环间的通信可以使用队列或通道连线来实现。 队列 LabVIEW内置的队列操作VI可在函数选板数据通信队列操作( Functions Data Communication  Que

    2024年02月07日
    浏览(38)
  • 【JavaEE】生产者消费者模式

    作者主页: paper jie_博客 本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。 本文于《JavaEE》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造,将基础知识一网打尽,希望可以帮到读者们哦。 其他专栏:《MySQL》《C语言》

    2024年02月05日
    浏览(43)
  • 线程同步--生产者消费者模型

    条件变量是 线程间共享的全局变量 ,线程间可以通过条件变量进行同步控制 条件变量的使用必须依赖于互斥锁以确保线程安全,线程申请了互斥锁后,可以调用特定函数 进入条件变量等待队列(同时释放互斥锁) ,其他线程则可以通过条件变量在特定的条件下唤醒该线程( 唤醒后线

    2024年01月19日
    浏览(43)
  • SpringCloud深入理解 | 生产者、消费者

    💗wei_shuo的个人主页 💫wei_shuo的学习社区 🌐Hello World ! Spring Cloud是一组用于构建分布式系统和微服务架构的开源框架和工具集合。它是在Spring生态系统的基础上构建的,旨在简化开发人员构建分布式系统和微服务应用程序的过程 SpringCloud生态 Spring Cloud NetFlix Spring Cloud Net

    2024年02月14日
    浏览(30)
  • 【设计模式】生产者消费者模型

    带你轻松理解生产者消费者模型!生产者消费者模型可以说是同步与互斥最典型的应用场景了!文末附有模型简单实现的代码,若有疑问可私信一起讨论。 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过

    2023年04月17日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包