1. 生产消费者模型基本概念
生产者消费者模型是一种常用的并发设计模式,它可以解决生产者和消费者之间的速度不匹配、解耦、异步等问题。生产者消费者模型的应用场景有很多,例如Excutor任务执行框架、消息中间件activeMQ、任务的处理时间比较长的情况下等。
生产者消费者模型的基本结构如下:
- 生产者(Producer):负责生成数据或任务,放入缓冲区(Buffer)中。
- 消费者(Consumer):负责从缓冲区中取出数据或任务,进行处理。
- 缓冲区(Buffer):一般是一个有限大小的队列,用来存储生产者生成的数据或任务,同时提供给消费者使用。
生产者消费者模型的核心是缓冲区,它可以平衡生产者和消费者的处理能力,起到一个数据缓存的作用,同时也达到了一个解耦的作用
。缓冲区的实现方式有多种,例如:
- 使用Object的wait()和notify()方法,让生产者和消费者在缓冲区满或空时进行等待和唤醒。
- 使用Semaphore的acquire()和release()方法,让生产者和消费者通过信号量控制缓冲区的访问。
- 使用BlockingQueue阻塞队列,让生产者和消费者通过put()和take()方法自动实现阻塞和唤醒。
- 使用Lock和Condition的await()和signal()方法,让生产者和消费者通过条件变量控制缓冲区的状态。
- 使用PipedInputStream和PipedOutputStream,让生产者和消费者通过管道流进行通信。
生产者消费者模型的应用场景有很多,例如:
- Excutor任务执行框架:通过将任务的提交和任务的执行解耦开来,提交任务的操作相当于生产者,执行任务的操作相当于消费者。
- 消息中间件activeMQ: 双十一的时候,会产生大量的订单,那么不可能同时处理那么多的订单,需要将订单放入一个队列里面,然后由专门的线程处理订单。
- 任务的处理时间比较长的情况下:比如上传附件并处理,那么这个时候可以将用户上传和处理附件分成两个过程,用一个队列暂时存储用户上传的附件,然后立刻返回用户上传成功,然后有专门的线程处理队列中的附件。
生产者消费者模型优点:
- 解耦:生产者和消费者之间不直接通信,而是通过缓冲区来进行通信,降低了代码之间的依赖性,简化了工作负载的管理。
- 复用:生产者和消费者可以独立地进行复用和扩展,增加了代码的可维护性和可扩展性。
调整并发数:生产者和消费者的处理速度可能不一致,可以通过调整并发数来平衡速度差异,提高系统的吞吐量和效率。 - 异步:生产者不需要等待消费者处理完数据才能继续生产,消费者也不需要等待生产者生成数据才能继续消费,通过异步的方式支持高并发,提高系统的响应性和灵活性。
- 支持分布式:生产者和消费者可以运行在不同的机器上,通过分布式的缓冲区来进行通信,增加了系统的可伸缩性和容错性。
2. 基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
基于BlockingQueue的生产者消费者模型,可以封装一个类,这个类就只有简单的插入、删除操作是一个简单的阻塞队列,并且内部用条件变量来实现。
blockQueue.hpp源代码:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
const int gcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(const int cap = gacp)
:_cap = cap
{
pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
pthread_cond_init(&_productorCond, nullptr);
}
bool isFull()
{
return _q.size() == _cap;
}
bool isEmpty()
{
return _q.empty();
}
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
while (isFull())
{
pthread_cond_wait(&_productorCond, &_mutex);
sleep(1); // 每隔1s询问一次队列是否为满,因为消费者可能在这1s中消费
}
_q.push(in); // 队列不满,则可以插入
pthread_cond_signal(&_consumerCond); // 队列某一时刻可能为空,消费者被阻塞,所以需要在这唤醒消费者
// 线程如果醒着,那么再唤醒不会出问题;相反线程阻塞,再次用函数阻塞也没问题
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&mutex);
while (isEmpty())
{
pthread_cond_wait(&_consumerCond, &_mutex);
sleep(1);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_productorCond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_productorCond);
}
private:
queue<T> _q;
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _consumerCond; // 消费者对应的条件变量,若空,则wait
pthread_cond_t _productorCond; // 生产者对应的条件变量,若满,则wait
};
然后就可以实现简单的单生产单消费以及多生产多消费的样例,main文件中的代码如下:
#include "blockQueue.hpp"
#include <time.h>
void *productor(void *args)
{
BlockQueue<int> *q = static_cast<BlockQueue<int>*>(args);
int count = 20;
while (count--)
{
int val = rand() % 5 + 1;
cout << "生产的数据:" << val << endl;
q->push(val);
}
return nullptr;
}
void *consumer(void* args)
{
BlockQueue<int> *q = static_cast<BlockQueue<int>*>(args);
while (true)
{
int val = 0;
q->pop(&val);
cout << "消费的数据:" << val << endl;
usleep(300);
if (q->isEmpty())
break;
}
return nullptr;
}
// 单生产单消费
int main()
{
srand((unsigned int)time(0)); //随机数种子
BlockQueue<int> *q = new BlockQueue<int>;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, q);
pthread_create(&p, nullptr, productor, q);
while (true)
{
sleep(1);
if (q->isEmpty())
break;
}
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete q;
return 0;
}
//多生产多消费
// int main()
// {
// srand((uint64_t)time(nullptr));
// // BlockQueue<int> *bq = new BlockQueue<int>();
// BlockQueue<int> *bq = new BlockQueue<int>();
// // 单生产和单消费 -> 多生产和多消费
// pthread_t c[2], p[3];
// pthread_create(&c[0], nullptr, consumer, bq);
// pthread_create(&c[1], nullptr, consumer, bq);
// pthread_create(&p[0], nullptr, productor, bq);
// pthread_create(&p[1], nullptr, productor, bq);
// pthread_create(&p[2], nullptr, productor, bq);
// pthread_join(c[0], nullptr);
// pthread_join(c[1], nullptr);
// pthread_join(p[0], nullptr);
// pthread_join(p[1], nullptr);
// pthread_join(p[2], nullptr);
// delete bq;
// return 0;
// }
运行结果如下:
基于BlockingQueue的生产者消费者模型是一种常见的多线程设计模式,它有以下几个优点:
- 简化编程:BlockingQueue提供了线程安全的入队和出队操作,无需自己实现同步和锁机制,降低了编程难度和出错风险。
- 提高性能:BlockingQueue支持阻塞和超时机制,可以根据队列的状态自动调整生产者和消费者的状态,避免了无效的等待和轮询,提高了系统的吞吐量和响应速度。
- 增强可扩展性:BlockingQueue可以作为有界队列或无界队列使用,可以根据实际需求调整队列的容量和策略,增加了系统的灵活性和可扩展性。
3. 基于环形队列的生产消费模型
环形队列是一种特殊的队列,它是在队列的基础上添加了一些限制条件,使得队列可以在固定大小的存储空间下进行循环使用。环形队列可以用数组实现,数组中的元素按照一定的顺序排列,并且当队列头或者队列尾指针到达数组的尾部时,会自动从数组的头部开始重新循环使用。环形队列的一个好处是,当队列满时,可以通过覆盖队列头部的元素来继续存储新的元素,这样可以使得队列在一定程度上具有循环使用的能力,节省存储空间。但是在使用环形队列时需要注意一些细节问题,比如队列空、队列满、队列大小等等。
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。但是现在有信号量这个计数器,就很简单的进行多线程间的同步过程,所以环形队列的生产消费者模型内部使用信号量来实现。
基于环形队列的生产消费模型的代码如下:
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
template <class T>
class RingQueue
{
public:
RingQueue(int num = N)
:_ring(num)
,_cap(num)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, _cap);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mutex,nullptr);
pthread_mutex_init(&_p_mutex,nullptr);
}
// 生产
void push(const T &in)
{
P(_space_sem);
Lock(_p_mutex);
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mutex);
V(_data_sem);
}
// 消费
void pop(T &out)
{
P(_data_sem);
Lock(_c_mutex);
out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
private:
vector<T> _ring;
int _cap; // 环形队列容器大小
sem_t _data_sem; // 表示数据量的信号量,只有消费者关心
sem_t _space_sem; // 表示空间量的信号量,只有生产者关心
int _c_step; // 环形队列中消费的位置
int _p_step; // 环形队列中生产的位置
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
然后生产一些任务,这些任务不弄简单的随机数,而弄一些随机出来的加减乘除运算,所以再封装一个类,这个类可以做加减乘除运算,该类的代码如下:
#pragma once
#include <iostream>
using namespace std;
class Task
{
public:
Task()
{}
Task(const int x, const int y, const char op)
:_x(x)
,_y(y)
,_op(op)
,_exitCode(0)
{}
void operator()()
{
switch(_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
break;
case '%':
_result = _x % _y;
break;
default:
break;
}
}
string formatArg() // 输入的格式
{
return to_string(_x) + _op +to_string(_y) + '=';
}
string formatRes() // 输出的格式
{
return to_string(_result) + '(' + to_string(_exitCode) + ')';
}
~Task()
{}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
然后直接用多生产多消费进行验证,多生产多消费的main.cc文件代码如下:
#include "RingQueue.hpp"
#include "task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
using namespace std;
const char *ops = "+-*/%";
void *consumerRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
rq->pop(t);
t();
cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
}
}
void *productorRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
}
}
int main()
{
srand(time(nullptr));
RingQueue<Task> *rq = new RingQueue<Task>();
// 单生产单消费
// pthread_t c, p;
// pthread_create(&c, nullptr, consumerRoutine, rq);
// pthread_create(&p, nullptr, productorRoutine, rq);
// pthread_join(c, nullptr);
// pthread_join(p, nullptr);
pthread_t c[3], p[2];
for (int i = 0; i < 3; i++)
pthread_create(c + i, nullptr, consumerRoutine, rq);
for (int i = 0; i < 2; i++)
pthread_create(p + i, nullptr, productorRoutine, rq);
for (int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for (int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
delete rq;
return 0;
}
最后就会有如下形式的任务被生产者派发,然后由消费者处理问题。
基于环形队列的生产消费模型是一种常见的并发同步模式,它有以下优缺点:
优点:
- 解耦:生产者和消费者不直接交互,而是通过环形队列进行数据传递,降低了两者之间的耦合度。
- 支持并发:生产者和消费者可以同时访问环形队列的不同位置,提高了并发性能。
- 支持忙闲不均:当生产者和消费者的速度不匹配时,环形队列可以缓冲数据,避免数据丢失或阻塞。
缺点:
- 需要额外的空间:环形队列需要预先分配固定大小的空间,可能造成空间浪费或不足。
- 需要额外的同步机制:环形队列需要使用信号量或其他同步机制来控制生产者和消费者之间的协作,增加了编程复杂度。
- 可能出现饥饿或饱和:当环形队列满或空时,生产者或消费者可能会长时间等待,影响系统的响应性。
4. 线程池
Linux线程池是一种管理多个线程的技术,它可以提高程序的性能和资源利用率。Linux线程池的基本思想是:
- 预先创建一定数量的线程,放在一个池中,这些线程称为核心线程。
- 当有新的任务到来时,如果有空闲的核心线程,就分配给它执行;如果没有空闲的核心线程,就将任务放在一个任务队列中,等待有空闲的线程来执行。
- 如果任务队列也满了,就创建新的线程,超过核心线程数量的线程称为非核心线程。
- 如果非核心线程空闲时间超过一定的限制,就销毁这些线程,回收资源。
- 如果核心线程空闲时间超过一定的限制,并且设置了允许回收核心线程的标志,就销毁这些线程,回收资源。
Linux线程池的优点有:
- 降低创建和销毁线程的开销,提高响应速度。
- 控制并发的数量,避免过多的线程竞争,提高系统稳定性。
- 统一管理和调度线程,提高代码的可维护性。
Linux线程池的实现方法有:
- 使用POSIX标准提供的pthread库来创建和管理线程,使用互斥锁和条件变量来实现任务队列和同步机制。
- 使用C++标准库中的std::thread类来创建和管理线程,使用std::queue容器来实现任务队列,使用std::mutex和std::condition_variable来实现同步机制。
- 使用第三方库或框架来实现线程池,例如Boost.Asio、libevent、libuv等。
对于上述所描述的,可以使用互斥锁和条件变量来实现任务队列和同步机制,实现简单的线程池,代码如下:文章来源:https://www.toymoban.com/news/detail-723417.html
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <unistd.h>
#include "task.hpp"
using namespace std;
const static int N = 5;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N)
:_num(num)
,_threads(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
void lockQueue()
{
pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void threadWait()
{
pthread_cond_wait(&_cond, &_mutex);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
void pushTask(const T &t)
{
lockQueue();
_tasks.push(t);
threadWakeup(); // 插入任务后唤醒线程处理任务
unlockQueue();
}
static void *threadRoutine(void *args) // 对于类的内部成员函数,会有默认的this指针,所以可以将这个函数定义在类的外部,
// 或者定义静态成员函数,但静态成员函数不能直接访问类的内部成员。
{
pthread_detach(pthread_self()); // 线程分离,这样子线程就可以自己释放自己的资源
ThreadPool<T> *tp = static_cast<ThreadPool<T>*>(args);
// 对于tp指针来说,他不能访问私有成员,所以可以用一些函数去访问类的私有成员,或者将私有成员暴露出来,属性设置为public
while (true)
{
// 检测有没有任务
tp->lockQueue();
while (tp->isEmpty())
{
tp->threadWait();
}
T t = tp->popTask(); // 拿出队列中的任务
tp->unlockQueue();
//test:放入一些数据
t(); // task任务内部是用该仿函数来处理任务的
cout << "thread handler done, result: " << t.formatRes() << std::endl;
}
}
void start()
{
for (int i = 0; i < _num; ++i)
{
pthread_create(&_threads[i], nullptr, threadRoutine, this);
}
}
~ThreadPool()
{
pthread_cond_destroy(&_cond);
pthread_mutex_destroy(&_mutex);
}
private:
vector<pthread_t> _threads;
int _num;
queue<T> _tasks;
pthread_mutex_t _mutex; // 使用互斥锁和条件变量来实现任务队列和同步机制
pthread_cond_t _cond;
};
#include <memory>
int main()
{
unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->start();
while (true)
{
int x, y;
char op;
cout << "please Enter x> ";
cin >> x;
cout << "please Enter y> ";
cin >> y;
cout << "please Enter op(+-*/%)> ";
cin >> op;
Task t(x, y, op);
tp->pushTask(t);
usleep(500);
}
return 0;
}
当有任务时,该线程池会处理任务,没有任务是则会等待。
文章来源地址https://www.toymoban.com/news/detail-723417.html
到了这里,关于Linux线程同步实例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!