gitee仓库:
1.阻塞队列代码:https://gitee.com/WangZihao64/linux/tree/master/BlockQueue
2.环形队列代码:https://gitee.com/WangZihao64/linux/tree/master/ringqueue
条件变量
概念
概念: 利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使“条件成立”(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。
同步: 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而避免饥饿问题,叫做同步
竞态条件: 因为时序问题,而导致程序异常,我们称之为竞态条件,旨在描述一个系统或者进程的输出依赖于不受控制的事件出现顺序或者出现时机。
为什么存在线程同步?
线程同步使得每个线程都能够访问临界资源,多个线程协同高效完成某些任务。
条件变量如何与互斥锁结合使用?
条件变量是包含一个等待队列的。多个线程可以去竞争一把锁,没有得到锁资源的线程会在锁上继续挂起等待,当拥有锁的线程条件变量满足时,会先释放锁资源,然后进入到条件变量的等待队列去等待(等待其他线程唤醒),这样其他线程就可以获得锁资源**,如果此时唤醒的条件变量满足,该线程可以去唤醒等待队列中的第一个线程,自己释放锁资源,然后让第一个线程重新拥有锁资源**,依次如此,多个线程就是顺序地执行工作。这样就可以实现线程同步的操作
条件变量的接口
条件变量是一个类型为pthread_cond_t
的条件变量,通过定义变量的方式来定义一个条件变量
条件变量的初始化(和锁类似)
- 使用字段
PTHREAD_COND_INITIALIZER
进行初始化,全局或者static不需要初始化和销毁 - pthread_cond_init
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
参数:
restrict cond:要初始化的条件变量
restrict attr:不关心,置空
pthread_cond_destroy——条件变量的销毁
int pthread_cond_destroy(pthread_cond_t *cond);
参数:
restrict cond:要销毁的条件变量
pthread_cond_wait——等待条件变量满足
pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
参数:
restrict cond:在这个条件条件变量下等待
restrict mutex:互斥量
为什么pthread_cond_wait需要互斥量?
条件变量是实现线程同步的一种手段,如果一个线程进入等待队列还不释放锁资源,这样其他线程也不能够得到锁资源,这样唤醒线程的条件变量永远不可能满足,那么这个线程也将一直等待下去。所以一个线程进入等待队列需要释放自己手中的锁资源来实现真正地同步,进入等待区会释放锁,如果被唤醒,又会加锁
唤醒条件变量满足:
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
参数:
cond:第一个函数是唤醒在这个条件变量的等待队列中的所有线程;第二个条件变量是唤醒在这个条件变量的等待队列中的第一个线程
实验:主线程唤醒新线程
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <iostream>
using namespace std;
//创建一个全局的锁
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//创建一个全局的条件变量
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
int tickets=100;
void* start_routine(void* args)
{
long long id=(long long)args;
while(1)
{
pthread_mutex_lock(&lock); //加锁
pthread_cond_wait(&cond,&lock); //进入等待队列,同时释放锁!
cout << "id is" << id << " ticket left" << tickets << endl;
tickets --;
pthread_mutex_unlock(&lock);
}
}
int main()
{
#define NUM 5
pthread_t t[NUM];
//创建多线程
for(int i=0;i<NUM;++i)
{
pthread_create(&t[i], NULL, start_routine, (void*)i+1);
}
while(1)
{
sleep(1);
//唤醒线程
pthread_cond_signal(&cond);
cout << "main thread wakeup a thread" << endl;
}
//线程等待
for(int i=0;i<NUM;++i)
{
pthread_join(t[i],nullptr);
}
}
运行结果如下:
按照排队的次序去执行
生产者消费者模型
概念: 生产者消费者模式就是通过一个容器(缓冲区)来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过一个容器(缓冲区)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点:
- 解耦:生产者和消费者是通过一个共享数据区域来进行通信。而不是直接进行通信,这样两个角色之间的依耐性就降低了(代码层面实现解耦),变成了角色与共享数据区域之间的弱耦合,一个逻辑出错不影响两一个逻辑,二者变得更独立。
- 支持并发:生产者负责生产数据,消费者负责拿数据。生产者生产完数据可以继续生产,大部分时间内是不需要等待消费者消费数据才继续生产。也就是说,在任一时刻,二者都是在正常处理任务的,进度都得以推进。
- 支持忙闲下不均:生产者生产了数据是放进容器中,消费者不必立即消费,可以慢慢地从容器中取数据。容器快要空了,消费者的消费速度就可以降下来,让生产者继续生产。
生产消费模型特征(简记321):
- 3种关系: 生产者与生产者(互斥)、生产者与消费者(同步(主要)和互斥)和消费者与消费者(互斥)
- 两个角色: 生产者和消费者
- 一个交易场所: 容器、共享资源等
**互斥关系:**指进程之间因相互竞争使用独占型资源(互斥资源)所产生的制约关系。
**同步关系:**指进程之间为协同工作需要交换信息、相互等待而产生的制约关系。本题中两个进程之间的制约关系是同步关系,进程B必须在进程A将数据放入缓冲区后才能从缓冲区中读出数据。此外,共享的缓冲区一定是互斥访问的,所以它们也具有互斥关系。
基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
阻塞队列的特点:
- 队列: 使用STL中的queue来实现
- 容量: 阻塞队列的容量,由用户给定,我们也可以提供一个默认的容量
- 互斥量: 为了实现生产者和消费者的同步,我们需要使用条件变量和互斥量来实现同步的操作
- 生产者唤醒和等待的条件变量: 当队列满了,生产者等待条件满足,应该挂起等待,等待消费者唤醒
- 消费者唤醒和等待的条件变量: 当队列为空,消费者等待条件满足,应该挂起等待,等待生产者唤醒
BlockQueue.hpp
对阻塞队列的一些基本操作进行了封装,有以下几个处理动作(可以设置为私有方法):
- 判断队列为空或为满
- 唤醒生产者和唤醒消费者
- 生产者挂起等待和消费者挂起等待
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
const int gmaxcap = 5;
template<class T>
class BlockQueue
{
public:
//构造函数
BlockQueue(const int &maxcap=gmaxcap)
:_maxcap(maxcap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_pcond,nullptr);
pthread_cond_init(&_ccond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
void push(const T& in) //输入型参数 const &
{
//保证阻塞队列的安全 1.加锁
pthread_mutex_lock(&_mutex);
//2.判断
//【细节】:充当条件判断的语法必须是while,不能用if
// 如果消费者只有一个,生产者有10个
// 消费者使用broadcast,同时唤醒,但是我们只缺少一个数据
//所以必须使用while判断,否则会出错
//并且只要是函数调用就可能失败
while(is_full())
{
//【细节】第二个参数,必须是我们正在使用的互斥锁
//该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起
//该函数被唤醒返回的时候,会自动的重新获取你传入的锁
pthread_cond_wait(&_pcond,&_mutex);
}
//走到这里一定没有满
_q.push(in);
//阻塞队列里面一定有数据
//这时我们可以唤醒我们的消费者进行消费
//唤醒线程可以自己设置策略
//【细节这个函数可以放在临界区内部,也可以放在外部】
pthread_cond_signal(&_ccond);
//3.解锁
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_ccond);
}
void pop(T* out) //输出型参数
{
//保证阻塞队列的安全
pthread_mutex_lock(&_mutex);
while(is_empty())
{
pthread_cond_wait(&_ccond,&_mutex);
}
//走到这里一定不为空
*out=_q.front();
_q.pop();
//绝对能保证,阻塞队列里面,至少有一个空的位置
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
private:
bool is_empty()
{
return _q.empty();
}
bool is_full()
{
return _q.size()==_maxcap;
}
queue<T> _q;
int _maxcap; //队列中元素的上限
pthread_mutex_t _mutex; //锁
pthread_cond_t _pcond; //生产者对应的条件变量
pthread_cond_t _ccond; //消费者对应的条件变量
};
test.cpp
#include "BlockQueue.hpp"
#include <time.h>
#include <sys/types.h>
#include <unistd.h>
void* consumer(void* bq_)
{
BlockQueue<int>* bq=(BlockQueue<int>*)bq_;
while(1)
{
//消费活动
int data;
bq->pop(&data);
cout << "消费数据:" << data << endl;
sleep(1);
}
return nullptr;
}
void* productor(void* bq_)
{
BlockQueue<int>* bq=(BlockQueue<int>*)bq_;
while(1)
{
int data=rand()%10+1;
bq->push(data);
cout << "生产数据" << data << endl;
// sleep(1);
}
return nullptr;
}
int main()
{
#define NUM 5
srand((unsigned long)time(nullptr) ^ getpid());
pthread_t c,p;
BlockQueue<int>* bq=new BlockQueue<int>();
pthread_create(&c,nullptr,consumer,bq);
pthread_create(&p,nullptr,productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
运行结果如下:
这个是一个简单的小程序,那么我们来稍微做复杂一点的尝试一下,我们可以给队列中派发任务,而这个任务不再是简单的输出数字,任务是计算加减法,并且我们多一个保存的任务,所以原本的消费者又要多一个职位——生产者
task.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
class CalTask
{
using func_t=std::function<int(int,int,char)>;
// typedef std::function<int(int,int,char)> func_t;
public:
CalTask()
{}
CalTask(int x,int y,char op,func_t func)
:_x(x)
,_y(y)
,_op(op)
,_func(func)
{}
std::string operator()()
{
int res=_func(_x,_y,_op);
char buffer[64];
snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,res);
return buffer;
}
std::string toTaskstring()
{
char buffer[64];
snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _func;
};
class SaveTask
{
using func_t=std::function<void(const std::string&)>;
public:
SaveTask()
{}
SaveTask(std::string& message,func_t func)
:_message(message)
,_func(func)
{}
void operator()()
{
_func(_message);
}
private:
std::string _message;
func_t _func;
};
void save(const std::string& message)
{
const std::string target="log.txt";
FILE* fp=fopen(target.c_str(),"a+");
if(fp==nullptr)
{
perror("fopen");
return ;
}
fputs(message.c_str(),fp);
fputs("\n",fp);
fclose(fp);
}
const std::string oper = "+-*/%";
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
// do nothing
break;
}
return result;
}
BlockQueue.hpp和之前一样
test.cpp
#include "BlockQueue.hpp"
#include <time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include "task.hpp"
//中间的既是生产者,也是消费者
// c:计算
// s:存储
template<class c,class s>
class BlockQueues
{
public:
BlockQueue<c>* c_bq;
BlockQueue<s>* s_bq;
};
void* consumer(void* bqs_)
{
//这里的强制转换不要写错!
BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->c_bq;
BlockQueue<SaveTask>* bq_s=(static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;
while(1)
{
//消费活动
CalTask cal;
bq->pop(&cal);
string res=cal();
cout << "计算完成:" << res << endl;
//生产活动
SaveTask sv(res,save);
bq_s->push(sv);
// sleep(1);
}
return nullptr;
}
void* productor(void* bqs_)
{
BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->c_bq;
while(1)
{
int x=rand()%100;
int y=rand()%10;
int operCode=rand()%oper.size();
CalTask cal(x,y,oper[operCode],mymath);
bq->push(cal);
cout << "生产计算" << cal.toTaskstring() << endl;
sleep(1);
}
return nullptr;
}
//这也是消费者,他把阻塞队列中的任务进行消费,不过是消费到文件上
void* saver(void* bqs_)
{
BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;
while(1)
{
//处理存储的任务
SaveTask save;
save_bq->pop(&save);
save();
cout << "存储成功" << endl;
}
return nullptr;
}
int main()
{
#define NUM 5
srand((unsigned long)time(nullptr) ^ getpid());
pthread_t c,p,s;
// BlockQueue<CalTask>* bq=new BlockQueue<CalTask>();
BlockQueues<CalTask,SaveTask> bqs;
bqs.c_bq=new BlockQueue<CalTask>();
bqs.s_bq=new BlockQueue<SaveTask>();
pthread_create(&c,nullptr,consumer,&bqs);
pthread_create(&p,nullptr,productor,&bqs);
pthread_create(&s,nullptr,saver,&bqs);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
pthread_join(s,nullptr);
delete bqs.c_bq;
delete bqs.s_bq;
return 0;
}
多生产者和多消费者
- 生产者之间需要互斥,也就是生产者和生产者之间需要组内竞争一把锁,消费者也是如此
- 生产者和消费者之间用互斥量和条件变量做到同步和互斥
疑问:多生产多消费者高效在哪里?
虽然在阻塞队列中是串行执行,但是在生产之前和消费之后是并行执行!所以高效并不是在阻塞队列中
信号量
在介绍信号量之前,我们之前代码不足的地方在哪里?
我们每一次操作都要先加锁,再检测,再操作,再解锁,在没有访问之前我们无法得知,所以只能先加锁
**POSIX信号量:**该信号量允许进程和线程同步对共享资源的访问。同时也可以用于实现线程间同步。
疑问:
- 是什么? 信号量本质是一个计数器,描述临界资源的有效个数。申请一个资源就对信号量减1(P操作,必须保证操作的原子性),释放一个资源就对信号量加1(V操作,必须保证操作的原子性)
- 为什么? 临界资源可以看成很多份,互相不冲突且高效
- 怎么用? 可以使用信号量的相关接口,来申请信号量和释放信号量(下面详细介绍)
申请信号量的本质:对临界资源中特定小块资源的预定机制
接口介绍
POSIX信号量相关接口都是在semaphore.h
的头文件中。信号量是一个类型为sem_t
的变量
1.sem_init——初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
- sem:信号量
- pshared:0表示线程间共享,非零表示进程间共享
- value:信号量初始值
返回值:
成功返回0,失败返回-1
2.sem_destroy——销毁信号量
int sem_destroy(sem_t *sem);
参数:
sem:信号量
返回值:
成功返回0,失败返回-1
3.sem_wait——等待信号量
int sem_wait(sem_t *sem);
功能:
等待信号量,会将信号量的值减1
参数:
sem:信号量
返回值:
成功返回0,失败返回-1
4.sem_post——发布信号量
int sem_post(sem_t *sem);
功能:
发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
参数:
sem:信号量
返回值:
成功返回0,失败返回-1
基于环形队列的生产消费模型
环形队列介绍
**环形队列:**环形队列和普通队列的区别就是,这种队列是一种环形的结构(是数组抽象出来的),有一个头指针和一个尾指针维护环中一小段队列。(如下图)
我们可以使用环形队列去模拟生产者消费者模型,如果不为空或不为满的时候,生产者和消费者可以并发执行,但如果为空的时候应该让生产者先执行,如果为满的时候应该让消费者先执行
实现
概述
一个交易场所: 循环队列
两个角色:
- 生产者:需要申请空间资源(P操作),然后释放数据资源(V操作)
- 消费者:需要申请数据资源(P操作),然后释放空间资源(V操作)
三种关系: 生产者与生产者(互斥)、生产者与消费者(同步(主要)和互斥)和消费者与消费者(互斥)
几个变量成员:
- 队列:数组模拟
- 容量:由用户给定
- 空间资源信号量:队列的容量大小
- 数据资源信号量:开始为0
- 生产者的下标位置:开始为0
- 消费者的下标位置:开始为0
以下是基于多生产者多消费者的循环队列
代码如下:
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
using namespace std;
const int gcap=5;
// static const int gcap = 5;
template<class T>
class ringqueue
{
public:
//构造函数
ringqueue(const int &cap = gcap)
:_queue(cap)
,_cap(cap)
,_productorStep(0)
,_consumerStep(0)
{
// _queue.resize(cap);
sem_init(&_spaceSem,0,cap);
sem_init(&_dataSem,0,0);
}
//PV操作
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
//生产者
//多生产者多消费者需要加锁
//多生产者只有一个可以进入环形队列
//多消费者也只有一个可以进入环形队列
//普通情况下【进入环形队列的消费者和生产者】可以并发执行
//不过环形队列空时,只有生产者可以进入
//环形队列满的时候,只有消费者可以进入
//生产者与生产者互斥关系,消费者与生产者互斥与同步关系,消费者与消费者互斥关系
void push(const T& in)
{
P(_spaceSem); // 申请到了空间信号量,意味着,我一定能进行正常的生产
//多消费者多生产者需要对环形队列资源进行加锁
pthread_mutex_lock(&p_mutex);
_queue[_productorStep++]=in;
_productorStep%=_cap;
pthread_mutex_unlock(&p_mutex);
V(_dataSem);
}
//消费者
void pop(T* out)
{
P(_dataSem);
pthread_mutex_lock(&c_mutex);
*out=_queue[_consumerStep++];
_consumerStep%=_cap;
pthread_mutex_unlock(&c_mutex);
V(_spaceSem);
}
//析构函数
~ringqueue()
{
sem_destroy(&_spaceSem);
sem_destroy(&_dataSem);
}
private:
vector<T> _queue; //vector模拟环形队列
int _cap; //容量
sem_t _spaceSem; //生产者要生产,要的是空间资源
sem_t _dataSem; //消费者要消费,要的是数据资源
int _productorStep; //生产者和消费者的下标
int _consumerStep;
pthread_mutex_t c_mutex; //消费者的锁
pthread_mutex_t p_mutex; //生产者的锁
};
注意:这里不需要判断队列是否满了,因为有信号量作计数器,空间信号量资源为0,生产者如果继续申请就会挂起等待。所以,队列中满了这个状态我们不必关心了,有信号量在其中作用文章来源:https://www.toymoban.com/news/detail-437135.html
主函数代码和之前一致文章来源地址https://www.toymoban.com/news/detail-437135.html
到了这里,关于线程同步、生产者消费模型和POSIX信号量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!