🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
👉POSIX信号量👈
POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步。信号量分为二元信号量和多远信号量。二元信号量(Binary Semaphore),即:计数器维护的 value 只有 0 和 1 着两种可能,以此来实现互斥,所以也称为互斥信号量(互斥锁)。1 表示可以访问资源,0 表示不可以访问资源。多元信号量是 value 大于 2 的信号量,以原子的方式对该值进行加加和减减操作。
深入理解信号量
初始化信号量
- sem:是 sem_t 类型变量的地址,也就是信号量。
- pshared:0 表示线程间共享,非零表示进程间共享。
- value:信号量本质是一个计数器,value 就是信号量的初始值。
注:信号量本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。
销毁信号量
等待信号量
- P 操作:等待信号量,会将信号量的值减 1。
发布信号量
- V 操作:发布信号量,表示资源使用完毕,可以归还资源了并将信号量值加 1。
注:P 操作和 V 操作都是原子操作。
👉基于环形队列的生产者消费者模型👈
空间资源和数据资源
- 生产者关注的是空间资源,只有当环形队列中有空间,生产者才能进行生产。消费者关注的是数据资源,只有当环形队列中有数据,消费者才能进行消费。
- 通过信号量来描述环形队列中的空间资源(space_sem)和数据资源(data_sem)。space_sem 的初始值是为环形队列的容量上限,因为刚开始时环形队列当中全是空间。data_sem 的初始值是 0,因为刚开始环形队列中没有数据。
申请和释放资源
生产者申请空间资源,释放数据资源。
对于生产者来说,生产者每次生产数据前都需要申请 space_sem。
- 如果 space_sem 不为 0,则申请信号量成功,生产者可以进行生产。
- 如果 space_sem 为 0,则申请信号量失败,生产者需要在 space_sem 的等待队列中进行阻塞等待,直到环形队列中有新的空间资源,才能被唤醒。
当生产者生产数据后,应该释放 data_sem。
- 虽然生产者在进行生产前是对 space_sem 进行的 P 操作,但是生产结束后,并不是对 space_sem 进行 V 操作,而是对 data_sem 进行 V 操作。
- 当生产者生产完数据后,环形队列中就会多了一个数据资源,因此我们要对 data_sem 进行 V 操作。
消费者申请数据资源,释放空间资源。
对于消费者来说,消费者每次消费数据前都要申请 data_sem。
- 如果 data_sem 不为 0,则申请信号量成功,消费者可以进行消费。
- 如果 data_sem 为 0,则申请信号量失败,消费者需要在 data_sem 的等待队列中进行阻塞等待,直到环形队列中有新的数据资源,才能被唤醒。
当消费者消费完数据后,需要释放 space_sem。
- 虽然消费者在进行消费前是对 data_sem 进行的 P 操作,但是消费结束后,并不是对 data_sem 进行 V 操作,而是对 space_sem 进行 V 操作。
- 当消费者消费完数据后,环形队列中就会多了一个空间资源,因此我们要对 space_sem 进行 V 操作。
两个规则
在基于环形队列的生产者和消费者模型当中,生产者和消费者必须遵守两个规则。
第一个规则:生产者和消费者不能对同一个位置进行访问。
生产者和消费者对环形队列进行访问时
- 如果生产者和消费者访问的是环形队列中的同一个位置,那么就相当于生产者和消费者同时对一块临界资源进行访问,这样将可能会出现数据不一致问题。
- 如果生产者和消费者访问的是环形队列中的不同位置,那么生产者和消费就可以并发地进行生产数据和消费数据,并不会出现数据不一致的问题。
如果保证生产者和消费者不会对环形队列的同一个位置进行访问?
- 当生产者和消费者将要对环形队列的同一个位置进行访问时,此时的环形队列要么为满,要么为空。
- 当环形队列为满时,space_sem 为 0,生产者不能对环形队列进行访问,需要在 space_sem 的等待队列中进行阻塞等待。
- 当环形队列为空时,data_sem 为 0,消费者不能对环形队列进行访问,需要在 data_sem 的等待队列中进行等待。
- 通过信号量就保证了当生产者和消费者指向环形队列的同一个位置时,生产和消费的串行化过程。同时也保证了当生产者和消费者执行的不是同一个位置时,生产者和消费者可以并发地进行生产和消费,以提高效率。
第二规则:生产者不能将消费者套圈,消费者不能超过生产者。
- 如果生产者将消费者套圈了,那么就会出现这样的情况:消费者还没有将生产者之前生产的数据消费掉,该数据就被覆盖掉了,这很显然是不允许的。所以当生产者生产了一圈后,再次遇到消费者时,生产者就不能再进行生产了,需要等消费者消费数据后,才能进行生产。
- 如果消费者超过了生产者,那么就会出现这样的情况:消费者会将之前已经消费过的废弃数据再消费一次,这也是不允许的。所以当消费者消费一圈后,再次遇到生产者,消费者就不能再进行消费了,需要等生产者生产数据后,才能进行消费。
- 很明显,第二个规则也是通过信号量来保证的。
代码实现
Sem 是对信号量的封装,其构造函数是对信号量进行初始化,析构函数是对信号量进行销毁,同时也将等待信号量和发布信号量分别封装成 P 操作和 V 操作。
// sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_
#include <semaphore.h>
class Sem
{
public:
Sem(int value)
{
sem_init(&_sem, 0, value);
}
~Sem()
{
sem_destroy(&_sem);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
private:
sem_t _sem;
};
#endif
环形队列 RingQueue 是生产者和消费者模型当中的交易场所,我们可以通过 STL 中的 vector 来模拟实现。
// ringQueue.hpp
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
const int DefaultCapacity = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = DefaultCapacity)
: _rq(cap)
, _cap(cap)
, _p_pos(0)
, _c_pos(0)
, _space_sem(cap)
, _data_sem(0)
{}
~RingQueue()
{}
// 生产数据(生产者调用)
void Push(const T& in)
{
_space_sem.P();
_rq[_p_pos++] = in;
_p_pos %= _cap;
_data_sem.V();
}
// 消费数据(消费者调用)
void Pop(T& out)
{
_data_sem.P();
out = _rq[_c_pos++];
_c_pos %= _cap;
_space_sem.V();
}
private:
std::vector<T> _rq;
int _cap;
int _p_pos; // 生产位置
int _c_pos; // 消费位置
Sem _space_sem; // 空间资源
Sem _data_sem; // 数据资源
};
#endif
相关说明:
- 当不指定环形队列的大小时,将会使用缺省值 DefaultCapacity 作为环形队列的容量上限。
- 生产者每次生产时,会将生产的数据放在下标为 _p_pos 的位置上;消费者每次消费时,会取出下标为 _c_pos 的位置上的数据进行消费。
- 生产者和消费者生产和消费结束后,需要对 _p_pos 和 _c_pos 进行加加操作,以标记下一次放入数据的位置和取数据的位置,最好还需要对下标进行模除操作,以达到环形效果。
- _p_pos 只会由生产者线程更新,_c_pos 只会由消费者线程更新,所以对它们的访问不需要进行保护,因此它们的更新操作可以放在 V 操作之后。
单生产者单消费者
// Test.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ringQueue.hpp"
void* Consumer(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*)(args);
while(true)
{
int data;
rq->Pop(data);
std::cout << "Comsumer: " << data << std::endl;
}
return nullptr;
}
void* Productor(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*)(args);
while(true)
{
sleep(1);
int data = rand() % 100 + 1;
rq->Push(data);
std::cout << "Productor: " << data << std::endl;
}
return nullptr;
}
int main()
{
srand((unsigned int)time(nullptr));
RingQueue<int>* rq = new RingQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, (void*)rq);
pthread_create(&p, nullptr, Productor, (void*)rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete rq;
return 0;
}
多生产者多消费者
有了多个生产者和多消费者,就会存在生产者和生产者之间的竞争关系、消费者和消费者之间的竞争关系,那么我们就需要对临界资源(下标)进行加锁保护。
// RingQueue.hpp
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
const int DefaultCapacity = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = DefaultCapacity)
: _rq(cap)
, _cap(cap)
, _p_pos(0)
, _c_pos(0)
, _space_sem(cap)
, _data_sem(0)
{
pthread_mutex_init(&_plock, nullptr);
pthread_mutex_init(&_clock, nullptr);
}
~RingQueue()
{
pthread_mutex_destroy(&_plock);
pthread_mutex_destroy(&_clock);
}
// 生产数据(生产者调用)
void Push(const T& in)
{
// 先申请信号量:先将资源分发给线程
_space_sem.P();
pthread_mutex_lock(&_plock);
// 一定是竞争成功的生产者线程 -- 就一个!
_rq[_p_pos++] = in;
_p_pos %= _cap;
pthread_mutex_unlock(&_plock);
_data_sem.V();
}
// 消费数据(消费者调用)
void Pop(T& out)
{
_data_sem.P();
pthread_mutex_lock(&_clock);
// 一定是竞争成功的消费者线程 -- 就一个!
out = _rq[_c_pos++];
_c_pos %= _cap;
pthread_mutex_unlock(&_clock);
_space_sem.V();
}
private:
std::vector<T> _rq;
int _cap;
int _p_pos; // 生产位置
int _c_pos; // 消费位置
Sem _space_sem; // 空间资源
Sem _data_sem; // 数据资源
pthread_mutex_t _plock;
pthread_mutex_t _clock;
};
#endif
注意:_p_pos 和 c_pos 的更新需要再加锁和解锁之间。如果它们的更新不在加锁和解锁之间,将可能会出现这样的情况:线程 A 释放了锁彬没来得及将下标进行更新,然后线程 B 就获得了锁并执行到更新下标的地方,这样就有可能会出现数据不一致的问题!
// Test.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ringQueue.hpp"
void* Consumer(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*)(args);
while(true)
{
sleep(1);
int data;
rq->Pop(data);
std::cout << "Comsumer: " << data << " [" << pthread_self() << "]" << std::endl;
}
return nullptr;
}
void* Productor(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*)(args);
while(true)
{
int data = rand() % 100 + 1;
rq->Push(data);
std::cout << "Productor: " << data << " [" << pthread_self() << "]" << std::endl;
}
return nullptr;
}
int main()
{
srand((unsigned int)time(nullptr));
RingQueue<int>* rq = new RingQueue<int>();
pthread_t c[3], p[2];
pthread_create(c, nullptr, Consumer, (void*)rq);
pthread_create(c+1, nullptr, Consumer, (void*)rq);
pthread_create(c+2, nullptr, Consumer, (void*)rq);
pthread_create(p, nullptr, Productor, (void*)rq);
pthread_create(p+1, nullptr, Productor, (void*)rq);
for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);
for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);
delete rq;
return 0;
}
多生产多消费的意义
生产的本质是将私有的任务或数据放入到公共空间中,消费的本质是将公共空间中的任务或数据变成私有。生产和消费并不是简单地将任务或数据放入到交易场所或从交易场所中取出,还需要考虑数据或任务放入到交易场所前和拿到任务或数据之后的处理,这两个过程是最耗费时间的。所以,多生产多消费的意义就在于能够并发地生产和处理任务。
信号量的意义
信号量本质是一个计数器,那这个计数器的意义是什么呢?计数器的意义就是不用进入临界区,就可以得知临界资源的情况,甚至可以减少临界区内部的判断!而在基于阻塞队列的生产者和消费者模型中,需要申请锁,然后进行临界资源是否满足的判断再进行访问,最后释放锁,需要进行判断的原因就是我们并不清楚临界资源的情况!而信号量要提前预设资源的情况,在进行 PV 操作的过程中,我们在临界区外部就能得知临界资源的情况。文章来源:https://www.toymoban.com/news/detail-400673.html
👉总结👈
本篇博客主要讲解了信号量、信号量的相关函数以及基于环形队列的生产者消费者模型等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️文章来源地址https://www.toymoban.com/news/detail-400673.html
到了这里,关于【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!