基于无锁循环队列的线程池的实现

这篇具有很好参考价值的文章主要介绍了基于无锁循环队列的线程池的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

出处:B站码出名企路

应用场景 

设计实现

等待策略模块

晚绑定

C++ 中的 override关键字

C++中的 default 关键字

C++中的 delete 关键字

C++中的 explicit 关键字

C++中 using 别名技巧

sleep 和 yield的区别

noexcept关键字

volatile关键字

无锁循环队列的设计实现

原子性

内存序


出处:B站码出名企路

个人笔记:因为是跟着b站的教学视频以及文档初步学习,可能存在诸多的理解有误,对大家仅供借鉴,参考,然后是B站up阳哥的视频,我是跟着他学。大家有兴趣的可以到b站搜索。加油,一起学习。我的问题,大家如果看见,希望可以提出指正,谢谢大家。

应用场景 

从理论角度分析?频繁创建销毁线程场景下,利用线程池复用线程,避免CPU浪费在大量线程的创建,销毁操作上。提高性能,充分利用系统资源。

3. 线程池的具体使用场景有哪些?
需要开线程的地方都可以用。耗时任务的单独处理。它的应用场景和多线程的应用场景是有重合的。
eg:
写日志 
curd
计算

设计实现

等待策略模块

  • 第一点,为啥需要封装不同的等待策略?
  • 第二点,有哪些等待策略?
  • 第三点,如何实现?

对于第一个问题,是很容易根据常识得出结论的,就是具体问题,具体分析。具体场景,具体应对。在多线程编程的环境下,不同的应用场景,要求就需要应用不同的等待机制。每种等待策略,都有着它独特之处,有着它的特点。在对应场景下采用合适的等待策略,可以提高响应性和性能。不同的等待策略权衡了性能、资源利用和响应性。

具体有哪些等待策略,忙等待,休眠等待,条件等待,自旋等待,超时等待。

  1. 忙等待:在循环中不停的检查是否满足条件,一旦满足就退出循环。忙等待对于CPU的占据是恐怖的,是持续占用大量的CPU时间。所以它适合做那种需要迅速响应,短暂等待时间的情况。不然占着CPU不做事,浪费了系统资源。
  2. 休眠等待:相当于线程等待挂起,休眠会主动释放掉CPU资源。然后等到定时器触发再唤醒。对于等待时间比较长的时候应该更适合。然后吧。毕竟存在一个线程切换。上下文的保存和恢复需要开销。
  3. 条件等待:利用条件变量wait操作,等到条件满足再唤醒线程。这样看我感觉和休眠等待有所相似之处,比如两者都会释放CPU,挂起等待,也自然都有线程切换的代价。但是不同的是,两者的唤醒机制不一样。相对来说条件触发更灵活,比定时触发可操控性更强。
  4. 超时等待:给条件等待设置一个最长等待时间,超出等待时间做另外处理。可结合其他等待方式。
  5. 自旋等待:自旋等待也是不会主动释放 CPU,持续检查某个条件是否满足。对于等待时间短、期望低延迟的情况比较适用。等久了就会浪费资源。

补充1:主动释放CPU的等待策略共性:都必须线程切换,线程切换也是有代价的,比如每个线程都有上下文。切换时需要保存恢复上下文。所以需要大量切换上下文的场景下,这种方式利用资源的效率反而可能没那么高效。

补充2:占据CPU等待策略共性:忙等和自旋等待,这两货用起来要慎重。因为需要大量占用CPU时间,用的不好就会造成系统资源CPU浪费。利用不充分。

实际往往都是条件变量+锁进行条件等待,我是没太用过自旋锁。

提出一个疑问?自旋锁有哪些使用场景?等我学会了,一定回来回答它。

实现:要达到易扩展,可选择,选择权交给客户端。根据不同的场景选择不同的等待策略。不同的策略之间要解耦合,要达到解除耦合,就要避免大量的if else 判断语句。将选择权交给客户。

策略类基类设计:基类必须要是稳定的,固有的,不变的。扩展时候不能改变基类。这才是良好的设计。所以需要对未来有一定的预测。根据对功能的理解,将固定,共性的功能函数直接放到基类。将不确定,可变(可扩展)的功能函数定义为虚函数(接口函数),晚绑定。

具体策略类设计:继承基类,然后对于接口虚函数进行重写,具体策略具体实现即可。

积累:

虚函数,我们可以理解为接口,它稳定,但天生具有晚绑定,可重写扩展。

此处还想要跟大家讨论一个问题?为何需要虚函数,需要重写?它的出现源于什么?可以从生活角度入手,从各个方面做出回答。

虚:不定,可覆盖,支持变化。任何一类事物中都可能存在特例。凡物哪怕一类,也既有共性,自然也有差异性。辩证统一的看待同一性和差异性。

比如说:车子,有烧汽油的,也有靠电池的。它们都同属于车子这一大类。但是两者工作原理等等存在差异性。可能具有同样的功能接口,但接口的具体实现各不相同。所以需要虚。这就是多态。同一类事物,完成同样的接口功能,所产生的结果,所用的方法都不尽相同。

晚绑定

晚绑定(Late Binding):

  • 概念: 晚绑定是指在程序运行时,才确定调用哪个函数、类或方法。这通常与多态性(Polymorphism)有关,其中具体的实现在运行时动态选择,而不是在编译时静态确定。

  • 例子: 虚函数的实现就是一种晚绑定的例子。在基类定义一个虚函数,在派生类中提供具体实现,而在运行时系统动态选择正确的实现。

看代码吧:看完代码,再在代码中一起感受具体的实现。我觉得设计模式,理解的基础之上不断的通过代码感受它的那种美妙,那种变化的可控性。变化的尽在手掌,将变化关进笼子里面去,不让它乱跑,造成混乱。

//无锁线程池
/**
 * module one: 等待策略的封装, 扩展
 * 4种
*/


class WaitStrategy { //等待策略的基类封装
public:
    virtual void NotifyOne() {}    //cv.notifyone
    virtual void BreakAllWait() {} //cv.notifyall
    virtual bool EmptyWait() = 0;  //cv.wait的定制实现。
    virtual ~WaitStrategy() {}     
};



/**
 * 阻塞等待策略. 这是平常我们用的最多的, 但不一定是最好的
 * override关键字: specify override.
*/
class BlockWaitStrategy: public WaitStrategy{
public:
    BlockWaitStrategy() = default;
    void NotifyOne() override { 
        m_cv.notify_one();
    }
    
    void BreakAllWait() override { 
        m_cv.notify_all();
    }

    bool EmptyWait() override {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock);//等锁
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
};

/**
 * sleep 等待策略. 
 * 存粹的进入sleep休眠状态. 
 * explicit 禁止隐式类型转换
 * using 技巧,给类型起别名
*/

class SleepWaitStrategy: public WaitStrategy {
    using uint = uint64_t; //定制一下,不习惯打数字.
public:
    SleepWaitStrategy() = default;

    explicit SleepWaitStrategy(uint sleep_time_us)
        : m_sleep_time_us(sleep_time_us) {}

    bool EmptyWait() override {
        std::this_thread::sleep_for(std::chrono::microseconds(m_sleep_time_us));
        return true;//休眠空等
    }

    void SetSleepTime(uint sleep_time_us) { 
        m_sleep_time_us = sleep_time_us;
    }

private:
    uint m_sleep_time_us = 10000;
};


//yield 具体原理
/**
 * 效果不同: yield 暂停和恢复执行,保留协程的状态;
 * sleep 暂停线程的执行,不保留线程的状态。
 * yield 用于协程, sleep 用于线程
 * 两个都会让出CPU, 区别在于状态保留.
*/

class YieldWaitStrategy: public WaitStrategy {
    public:
        YieldWaitStrategy(){}
        bool EmptyWait()override{
            //让出cpu,节省资源
            std::this_thread::yield();
            return true;
        }
};


/**
 * timeout等待
 * 实际项目中经常采取的一种方式.
 * 结合了定时器和阻塞等待. 在一定时间内等到的处理和超时等到的处理相互分离。
 * 1. 未超时,等待条件满足了。 2. 超时。
*/

class TimeoutBlockWaitStrategy: public WaitStrategy {
    using uint = uint64_t;
public:
    TimeoutBlockWaitStrategy() = default;
    explicit TimeoutBlockWaitStrategy(uint time_out_ms)
        : m_time_out_ms(time_out_ms) {}
    
    void NotifyOne() override { 
        m_cv.notify_one();
    }
    
    void BreakAllWait() override { 
        m_cv.notify_all();
    }

    bool EmptyWait() override {
        std::unique_lock<std::mutex> lock(m_mutex);
        
        if (m_cv.wait_for(lock, m_time_out_ms) == std::cv_status::timeout) {
            //定时器触发了,睡眠等
            std::cout << "定时器触发了, 超时" << std::endl;
            return false;
        } 

        return true; //条件触发了, 锁等到了
    }

    void SetTimeOut(uint time_out) {
        m_time_out_ms = std::chrono::milliseconds(time_out);
    }

private:
    std::condition_variable m_cv;
    std::mutex m_mutex;
    std::chrono::milliseconds m_time_out_ms; 
    //直接用这个成员,因为可能用到的地方多, 可以少些一点
    //如果用uint 每次都要转换成 milliseconds
};


/**
 * 可扩展等待策略类实现的体会,感悟. 
 * 重写, 覆盖体会 override 可以使得基类中不固定的部分变得可扩展起来.
 * 并且具有可定制性,根据场景选择适合的策略。有点设计模式喔。
 * 是多态,设计模式的实际应用,应用了策略模式。
 * 策略模式的好处,强烈的一种具体算法,策略实现的可定制性。可选择性。
 * 策略模式允许客户端在运行时选择算法的具体实现,而不必修改其代码。
 * 实现方式:父类提供抽象接口,子类指定具体实现.
*/

/**
 * 提出疑问,固定的cv.notifyone() 以及 cv.notifyall() 为何不直接具体化下来,而要抽象接口,可扩展?
 * 因为有些类无需实现这两个函数,但有些类需要实现。
 * 具体问题具体分析,存在特例,定制,变化的功能模块,
 * 我们就可以考虑是否运用多态,使其扩展开放,且不影响外部接口调用
*/

我学习这份代码的过程中,有一些思考感受。在注释中,大家可以参考一下,我觉得还是很有用的。

代码中内涵的各种细节知识点汇总:

/**
 * 阻塞等待策略. 这是平常我们用的最多的, 但不一定是最好的
 * override关键字: specify override. 
*/

C++ 中的 override关键字
  1. override针对基类中虚函数,子类希望对该虚函数进行重写时加上该关键字,可以避免写时失误而重定义新函数或者参数重载;
  2. override不能写在子类非虚函数后面,也不能写在基类中没有的虚函数后面;
  3. 如果我们将某个函数指定为为final,则之后任何尝试覆盖该函数的操作都将引发错误

总之加上override为了明确告诉编译器你的意图,即你打算重写一个基类中的虚函数。

帮助编译器检查你的代码,确保你所声明的函数确实是基类中虚函数的一个覆盖。如果没有正确覆盖,编译器将产生错误。增加可读性。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

C++中的 default 关键字

default关键字就是告诉编译器,让他显示的生成默认构造函数以及特殊成员函数

class MyClass {
public:
    MyClass() = default; // 使用default关键字生成默认构造函数
};

class ExplicitlyDefaulted {
public:
    ExplicitlyDefaulted() = default;
    ExplicitlyDefaulted(const ExplicitlyDefaulted&) = default; 
// 显式声明编译器生成复制构造函数
    ExplicitlyDefaulted& operator=(const ExplicitlyDefaulted&) = default; 
// 显式声明编译器生成赋值运算符
};
C++中的 delete 关键字

对应default的还有一个delete,可以禁用特殊成员函数,比如构造,赋值成员函数,写单例就要用到它。

class NoCopy {
public:
    NoCopy(const NoCopy&) = delete; // 显式删除复制构造函数
    NoCopy& operator=(const NoCopy&) = delete; // 显式删除赋值运算符
};
C++中的 explicit 关键字

explicit 禁止隐式类型转换

C++中 using 别名技巧

using 技巧,给类型起别名

using uint = uint64_t; //定制一下,不习惯打数字.
sleep 和 yield的区别

//yield 具体原理
/**
 * 效果不同: yield 暂停和恢复执行,保留协程的状态;
 * sleep 暂停线程的执行,不保留线程的状态。
 * yield 用于协程, sleep 用于线程
 * 两个都会让出CPU, 区别在于状态保留.
*/

/**
 * 可扩展等待策略类实现的体会,感悟. 
 * 重写, 覆盖体会 override 可以使得基类中不固定的部分变得可扩展起来.
 * 并且具有可定制性,根据场景选择适合的策略。有点设计模式喔。
 * 是多态,设计模式的实际应用,应用了策略模式。
 * 策略模式的好处,强烈的一种具体算法,策略实现的可定制性。可选择性。
 * 策略模式允许客户端在运行时选择算法的具体实现,而不必修改其代码。
 * 实现方式:父类提供抽象接口,子类指定具体实现.
*/

/**
 * 提出疑问,固定的cv.notifyone() 以及 cv.notifyall() 为何不直接具体化下来,而要抽象接口,可扩展?
 * 因为有些类无需实现这两个函数,但有些类需要实现。
 * 具体问题具体分析,存在特例,定制,变化的功能模块。
 * 我们就可以考虑是否运用多态,使其扩展开放,且不影响外部接口调用。

* 只要存在变化的可能我们就要抽象接口,变化就是通过抽象接口的继承重写来达到的。在基类显示的明确这些变化接口。把它们都关在一起。不让他们混乱,到处招惹是非。
*/

noexcept关键字

用于声明一个函数不会抛出异常。

volatile关键字

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

告诉编译器你不要优化它。每次都必须去读实际的内存,不要优化去读cache。我很易变,你去读实际内存,不要读缓存,因为可能缓存失效了。多线程场景下。另一个线程修改了它,本线程中的缓存就没用了。

用途: 通常用于多线程或者在中断服务程序中,当一个变量的值可能被多个线程或者中断同时修改时,为了避免编译器对变量进行过度的优化,可以使用volatile。

有一个注意点:C++中的 volatile 跟Java中的volatile有区别,C++中并不能直接用来禁止指令重排。注意区分。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

无锁循环队列的设计实现

这块不太好写,核心是要搞懂入队,出队逻辑。这两货是最重要的函数,是实现的关键。无锁,我们就需要利用CAS。CAS几乎是所有语言并发编程都有的操作,不管那个语言肯定都提供了可以完成CAS的API函数。

CAS:compare and swap. 比较并且交换。和期望的旧值比较,如果这个值没有变,说明当前别的线程没有进行原子操作修改这个变量,就可以swap完成原子更新操作。如果比较发现值变化了。就跟新期望旧值,重新再来。这不就是天然的 do {} while();

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

原子性

为什么操作没有原子性?

因为存在中间层,所有的操作都不是不是CPU直接跟内存交互,直接完成的,而是通过缓存(寄存器)这个中间层间接完成的。这会导致数据不一致性。缓存不一致性。因为缓存的修改可以并没有及时的刷新回内存。

eg如下:对于核心1,2结果跟新到缓存,但是缓存结果并未滴落,跟新到内存。核心3看到的还是Old, 过时的数据,运算的最终结果就会出现问题。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

因为它可以在任何增量值从缓存中滴入内存之前读到内存中的旧的X.

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

什么是原子性,原子操作?

原子性:看到的状态只有开始,结束,没有中间

原子操作:指的是要么处于已完成的状态,要么处于初始状态,而不存在中间状态的操作

如何保证原子性?

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池
基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

原子操作不总是最快

原子操作不总是无锁

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池
基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

上述的无锁指的是硬件实现有无锁。是否需要锁总线。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

内存序
  • 为什么会有内存序的问题?

  • 如何解决读写操作的内存序问题,在达到预期的条件下尽量提高性能?

内存序的问题是因为在多线程环境下存在  CPU指令重排 和 编译器优化重排。

这种重排就可能导致指令执行顺序的不确定性,所以我们需要规定内存序,来指导CPU和编译器进行指令重排,来达到预期的结果。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

我自认为积累还不够,暂时理解不了那个深度。所以我选择先记下来,简单理解常见的内存序用在什么场景下就行。

常用的就那么5个:

memory_order_relaxed:宽松内存序,只关心原子性,对于指令执行顺序不关心。对于数据的最新同步性也不要求。给编译器和CPU自由,你们随意调整。

memory_order_seq_cst:默认内存序(全局内存序),保障指令的执行肯定是顺序的,根本没给编译器和CPU留余地,是最严苛的内存序级别。结果肯定能保证符合预期,但是性能可能有所损耗

memory_order_acquire:获取内存序。用于读取操作,也就是load操作。可以保证结果没问题,性能相对还好点,具体为啥,我不是很懂,往下肯定就是内存屏障。

memory_order_release:释放内存序。用于写操作,也就是store操作。

memory_order_acq_rel:获取释放内存序,用于读和写操作。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

获取内存屏障

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

释放内存屏障

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

+----------------------------+----------------------------+
|          thread A          |          thread B          |
+----------------------------+----------------------------+
| store A                    |                            |
| inst B                     |                            |
| release store X            |                            |
| store D                    | acquire load X             |
|                            | load A // valid            |
|                            | load D // maybe invalid    |
+----------------------------+----------------------------+

保障释放内存屏障之前的写操作一定是在release store X 之前完成。获取内存屏障之后的读取操作一定是在acquire load  X 之后完成。  这就保障了A线程的写入操作对线程B可见。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

总结:对于性能而言,不论是无锁和有锁。还是内存屏障或者说内存顺序的选择。都并没有绝对的定论。无锁不是一定就更快。内存序也不是一定默认内存序快。在不同的场景选择合适的方式来实现才能达到对性能的极值追求。

接口实现原理:最主要的就是Enqueue和Dequeue两个接口了。

为什么要采用无锁队列? 无锁和有锁的区别是什么?

锁会带来的问题 (频繁线程切换,抢占所带来的损耗)

  • Cache失效

在保存和恢复上下文的过程中还隐藏了额外的开销:Cache中的数据会失效,因为它缓存的是将被换出任务 的数据,这些数据对于新换进的任务是没用的

  • Mutex上下文切换

任务将大量的时间(睡眠,等待,唤醒)浪费在获得保护队列数据的互斥锁,而不是处理队列中的数据上。 保存,恢复上下文。

  • 频繁的动态内存分配和释放

当⼀个任务从堆中分配内存时,标准的内存分配机制会阻 塞所有与这个任务共享地址空间的其它任务(进程中的所有线程)。

有哪些无锁队列的设计方法? 设计思路,对应好处。(对于zmq只写个大致思路,不写具体实现)

  1. 参考zmq:结合数组和链表两者。(每次申请一个chunk大块,chunk节点包括N个元素的数组。这样既有链表的动态分配,大小不受限制的好处。又有减少内存分配次数,提高性能的好处。)
  2. 循环无锁队列:大小固定,支持多写多读,1写多读,多写1读。有多生产者问题,在多生产者场景下如何保证顺序插入性?  很重要。如何保证按照顺序插入数据,生产数据,保证读取的时候数据一定写入了空间,这些是关键点。

具体实现:采取两个多余空间,一个存储头部,另一个存储尾部的循环队列实现。

结构定义

#define CACHELINE_SIZE 64

/*
@第二部分:有界队列,用来存储模板类型 T的元素

该队列存放线程池任务,最常用的接口,入队和出队队列:task 
采用的非循环队列. 轻队列设计,重线程池设计.
*/

template <typename T>
class BoundedQueue {
    using uint = uint64_t;

public:
    BoundedQueue &operator=(const BoundedQueue &other) = delete;
    BoundedQueue(const BoundedQueue &other) = delete;
    //禁止掉拷贝构造和拷贝赋值操作。

    BoundedQueue() = default;
    ~BoundedQueue();
    void BreakAllWait(); //notifyall

    bool Init(uint capacity); //默认等待策略
    bool Init(uint capacity, WaitStrategy *strategy); //可选择等待策略.

    // 入队,实际入队
    bool Enqueue(const T &element);
    // bool Enqueue(T&& element);

    /*
    @ 队列满时,阻塞等待, 条件等待
    */
    bool WaitEnqueue(const T &element);
    // bool WaitEnqueue(T&& element);

    // 出队
    bool Dequeue(T *element);
    bool WaitDequeue(T *element);

    uint Size() {
        return m_tail - m_head - 1;
    }

    bool Empty() {
        return Size() == 0;
    }
    
    void SetWaitStrategy(WaitStrategy *strategy) { //设置等待策略.
        m_wait_strategy.reset(strategy);
    }
   
    uint Head() { return m_head.load(); }
    uint Tail() { return m_tail.load(); }
    uint MaxHead() { return m_max_head.load(); }

private:
    // 队里索引下标
    uint GetIndex(uint num);

    // 指定内存对齐方式, 可提高代码性能和效率
    // atomic 保障原子操作, 无锁.
    alignas(CACHELINE_SIZE) std::atomic<uint> m_head = {0};
    alignas(CACHELINE_SIZE) std::atomic<uint> m_tail = {1};
    alignas(CACHELINE_SIZE) std::atomic<uint> m_max_head = {1}; //最大的head, tail的备份 

    uint m_pool_capacity = 0;         // 记录线程池容量
    T *m_pool = nullptr;              // 线程池数组容器
    std::unique_ptr<WaitStrategy> m_wait_strategy = nullptr;  //等待策略
    volatile bool m_break_all_wait = false; // 标记是否存在等待
};

Init初始化

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

template <typename T>
bool BoundedQueue<T>::Init(uint capacity, WaitStrategy *strategy) {
    m_pool_capacity = capacity + 2; //多出两个空间
    m_pool = reinterpret_cast<T*>(std::calloc(m_pool_capacity, sizeof(T)));
    
    if (m_pool == nullptr) {
        return false;
    }

    for (uint i = 0; i < m_pool_capacity; i ++) {
        new (&m_pool[i]) T(); //定位new.
    }

    m_wait_strategy.reset(strategy);    
    return true;
}

Wait操作

wait操作的逻辑都是一样的,先尝试一次入队或者出队。如果成功则返回。如果失败则陷入等待,如果等待条件触发则再次尝试。  如果等待超时则break。入队或出队失败。

template<typename T>
bool BoundedQueue<T>::WaitEnqueue(const T &element) {
    
    while (!m_break_all_wait) {
        if (Enqueue(element)) { //首次尝试插入
            return true;
        }
        
        // 没有插入成功. 说明队满, 按照等待策略等待
        if (m_wait_strategy->EmptyWait()) { 
            continue;  //如果cond条件触发再次尝试插入
        }

        break; //timeout
    }
    
    return false;
}

Enqueue操作

存在多生产者的问题,多个生产者线程同时要插入数据到循环队列中。

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

基于无锁循环队列的线程池的实现,基础组件,课程实践,开发语言,c++,中间件,无锁线程池

第一个CAS:是为了保证原子性的单个操作。这个不需要管顺序性。只需要先获取存储空间。

第二个CAS:其实不只是为了保证原子性,更是为了保证真正的顺序插入。以保障读写一致性。否则读Dqueue那边不清楚到底插入了几个元素了。而顺序插入则一定可以保证当前的m_max_head,也就是最大读取下标之前所有空间的元素已经全部完成插入。这一点很重要,是这个无锁循环队列实现的关键。大家结合代码逻辑认知体会。

template<typename T>
bool BoundedQueue<T>::Enqueue(const T &element) {
    uint new_tail = 0;  //用于存储new_tail
    uint old_tail = m_tail.load(std::memory_order_acquire); //获取旧值
    uint old_max_head = 0; //临时存储old_max_head

    do {    
        new_tail = old_tail + 1; 
        
        if (GetIndex(new_tail) == GetIndex(m_head.load(std::memory_order_acquire))) {
            return false; //队列满 插入失败. 后续陷入等待.
        }
    } while (!m_tail.compare_exchange_weak(old_tail, new_tail,
                                           std::memory_order_acq_rel, 
                                           std::memory_order_relaxed)); //保障空间申请的原子性
    
    m_pool[GetIndex(old_tail)] = element; //旧(就)地插入

    do {
        old_max_head = old_tail;   
    } while (!m_max_head.compare_exchange_weak(old_tail, new_tail,
                                               std::memory_order_acq_rel, 
                                               std::memory_order_relaxed)); //更新max_head. 

    m_wait_strategy->NotifyOne(); //通知消费
    return true;
}

Dequeue操作

上面那个Enqueue读懂这个Dequeue完全OK.

template<typename T>
bool BoundedQueue<T>::Dequeue(T *element) {
    uint new_head = 0;  //存储最新head
    uint old_head = m_head.load(std::memory_order_acquire); //加载存储旧head

    do {
        new_head = old_head + 1; //空间移除, 计算新头
        
        //是否满足在最大出队下标以内
        //此时不用 m_tail 判断队列空,
        //原因是: 可能和入队操作冲突, 先申请了空间, 元素还没插入, 入队操作还未彻底完成
        if (new_head == m_max_head.load(std::memory_order_acquire)) {
            return false; //队空
        }

        //用element传出参数传出pop的数据, 也就是new_head中的数据
        *element = m_pool[GetIndex(new_head)];

    } while (!m_head.compare_exchange_weak(old_head, new_head,
                                           std::memory_order_acq_rel,
                                           std::memory_order_relaxed)); 
                                            
    m_wait_strategy->NotifyOne(); //通知生产.
    return true;
}

线程池结构设计实现

就是一个生产者消费者模型嘛。

class ThreadPool
{
public:
    explicit ThreadPool(std::size_t thread_num,
                        std::size_t max_task_num = 1000) : stop_(false) {

        // 初始化失败抛出异常
        if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
            throw std::runtime_error("Task queue init failed");
        }

        // 存放多个 std::thread线程对象
        workers_.reserve(thread_num);

        for (size_t i = 0; i < thread_num; ++i) {
            // 使用一个 lambda表达式来创建每个线程
            // 功能是 从任务队列中获取任务,并执行任务的函数对象
            workers_.emplace_back([this] {
                while(!stop_) {
                    std::function<void()> task;
                    if (task_queue_.WaitDequeue(&task)){
                        task();
                    }
                } 
            });
        }

    }

    template <typename F, typename... Args>
    auto Enqueue(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type> {

        using return_type = typename std::result_of<F(Args...)>::type;

        // 函数 f和其参数args, 打包成一个 std::packaged_task对象,放入任务队列
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        // 并返回一个与该任务关联的 std::future对象
        std::future<return_type> res = task->get_future();

        if (stop_) {
            return std::future<return_type>();
        }

        task_queue_.Enqueue([task]()
                            { (*task)(); });
        return res;
    }

    inline ~ThreadPool() {

        if (stop_.exchange(true)) {
            return;
        }

        task_queue_.BreakAllWait();

        for (std::thread &worker : workers_) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers_;
    BoundedQueue<std::function<void()>> task_queue_;
    std::atomic_bool stop_;
};

此处思路不难,就是开启线程,不断从任务队列中WaitDeueue处理任务罢了。

如果说有难点也是Enqueue的再封装,用到的语法很是新奇,我没咋用过。核心也就是打包一个函数任务塞入任务队列,等待消费者线程wockers消化。并且以future对象形式返回任务执行结果。

如下:源码,完整版。

#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <iostream>
#include <chrono>
#include <atomic>

#include <vector>
#include <string>
#include <queue>
#include <future>
#include <sstream>

#include <cstdint>
#include <cstdlib>
#include <memory>
#include <utility>


#define CACHELINE_SIZE 64


//无锁线程池
/**
 * module one: 等待策略的封装, 扩展
 * 4种
*/


class WaitStrategy { //等待策略的基类封装
public:
    virtual void NotifyOne() {}    //cv.notifyone
    virtual void BreakAllWait() {} //cv.notifyall
    virtual bool EmptyWait() = 0;  //cv.wait的定制实现。
    virtual ~WaitStrategy() {}     
};



/**
 * 阻塞等待策略. 这是平常我们用的最多的, 但不一定是最好的
 * override关键字: specify override.
*/
class BlockWaitStrategy: public WaitStrategy{
public:
    BlockWaitStrategy() = default;
    void NotifyOne() override { 
        m_cv.notify_one();
    }
    
    void BreakAllWait() override { 
        m_cv.notify_all();
    }

    bool EmptyWait() override {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock);//等锁
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
};

/**
 * sleep 等待策略. 
 * 存粹的进入sleep休眠状态. 
 * explicit 禁止隐式类型转换
 * using 技巧,给类型起别名
*/

class SleepWaitStrategy: public WaitStrategy {
    using uint = uint64_t; //定制一下,不习惯打数字.
public:
    SleepWaitStrategy() = default;

    explicit SleepWaitStrategy(uint sleep_time_us)
        : m_sleep_time_us(sleep_time_us) {}

    bool EmptyWait() override {
        std::this_thread::sleep_for(std::chrono::microseconds(m_sleep_time_us));
        return true;//休眠空等
    }

    void SetSleepTime(uint sleep_time_us) { 
        m_sleep_time_us = sleep_time_us;
    }

private:
    uint m_sleep_time_us = 10000;
};


//yield 具体原理
/**
 * 效果不同: yield 暂停和恢复执行,保留协程的状态;
 * sleep 暂停线程的执行,不保留线程的状态。
 * yield 用于协程, sleep 用于线程
 * 两个都会让出CPU, 区别在于状态保留.
*/

class YieldWaitStrategy: public WaitStrategy {
    public:
        YieldWaitStrategy() {}
        bool EmptyWait() override {
            //让出cpu,节省资源
            std::this_thread::yield();
            return true;
        }
};


/**
 * timeout等待
 * 实际项目中经常采取的一种方式.
 * 结合了定时器和阻塞等待. 在一定时间内等到的处理和超时等到的处理相互分离。
 * 1. 未超时,等待条件满足了。 2. 超时。
*/

class TimeoutBlockWaitStrategy: public WaitStrategy {
    using uint = uint64_t;
public:
    TimeoutBlockWaitStrategy() = default;
    explicit TimeoutBlockWaitStrategy(uint time_out_ms)
        : m_time_out_ms(time_out_ms) {}
    
    void NotifyOne() override { 
        m_cv.notify_one();
    }
    
    void BreakAllWait() override { 
        m_cv.notify_all();
    }

    bool EmptyWait() override {
        std::unique_lock<std::mutex> lock(m_mutex);
        
        if (m_cv.wait_for(lock, m_time_out_ms) == std::cv_status::timeout) {
            //定时器触发了,睡眠等
            std::cout << "定时器触发了, 超时" << std::endl;
            return false;
        } 

        return true; //条件触发了, 锁等到了
    }

    void SetTimeOut(uint time_out) {
        m_time_out_ms = std::chrono::milliseconds(time_out);
    }

private:
    std::condition_variable m_cv;
    std::mutex m_mutex;
    std::chrono::milliseconds m_time_out_ms; 
    //直接用这个成员,因为可能用到的地方多, 可以少些一点
    //如果用uint 每次都要转换成 milliseconds
};


/**
 * 可扩展等待策略类实现的体会,感悟. 
 * 重写, 覆盖体会 override 可以使得基类中不固定的部分变得可扩展起来.
 * 并且具有可定制性,根据场景选择适合的策略。有点设计模式喔。
 * 是多态,设计模式的实际应用,应用了策略模式。
 * 策略模式的好处,强烈的一种具体算法,策略实现的可定制性。可选择性。
 * 策略模式允许客户端在运行时选择算法的具体实现,而不必修改其代码。
 * 实现方式:父类提供抽象接口,子类指定具体实现.
*/

/**
 * 提出疑问,固定的cv.notifyone() 以及 cv.notifyall() 为何不直接具体化下来,而要抽象接口,可扩展?
 * 因为有些类无需实现这两个函数,但有些类需要实现。
 * 具体问题具体分析,存在特例,定制,变化的功能模块,
 * 我们就可以考虑是否运用多态,使其扩展开放,且不影响外部接口调用
*/


/*
@第二部分:有界队列,用来存储模板类型 T的元素

该队列存放线程池任务,最常用的接口,入队和出队队列:task 
采用的非循环队列. 轻队列设计,重线程池设计.
*/

template <typename T>
class BoundedQueue {
    using uint = uint64_t;

public:
    BoundedQueue &operator=(const BoundedQueue &other) = delete;
    BoundedQueue(const BoundedQueue &other) = delete;
    //禁止掉拷贝构造和拷贝赋值操作。

    BoundedQueue() = default;
    ~BoundedQueue();
    void BreakAllWait(); //notifyall

    bool Init(uint capacity); //默认等待策略
    bool Init(uint capacity, WaitStrategy *strategy); //可选择等待策略.

    // 入队,实际入队
    bool Enqueue(const T &element);
    // bool Enqueue(T&& element);

    /*
    @ 队列满时,阻塞等待, 条件等待
    */
    bool WaitEnqueue(const T &element);
    // bool WaitEnqueue(T&& element);

    // 出队
    bool Dequeue(T *element);
    bool WaitDequeue(T *element);

    uint Size() {
        return m_tail - m_head - 1;
    }

    bool Empty() {
        return Size() == 0;
    }
    
    void SetWaitStrategy(WaitStrategy *strategy) { //设置等待策略.
        m_wait_strategy.reset(strategy);
    }
   
    uint Head() { return m_head.load(); }
    uint Tail() { return m_tail.load(); }
    uint MaxHead() { return m_max_head.load(); }

private:
    // 队里索引下标
    uint GetIndex(uint num);

    // 指定内存对齐方式, 可提高代码性能和效率
    // atomic 保障原子操作, 无锁.
    alignas(CACHELINE_SIZE) std::atomic<uint> m_head = {0};
    alignas(CACHELINE_SIZE) std::atomic<uint> m_tail = {1};
    alignas(CACHELINE_SIZE) std::atomic<uint> m_max_head = {1}; //最大的head, tail的备份 

    uint m_pool_capacity = 0;         // 记录线程池容量
    T *m_pool = nullptr;              // 线程池数组容器
    std::unique_ptr<WaitStrategy> m_wait_strategy = nullptr;  //等待策略
    volatile bool m_break_all_wait = false; // 标记是否存在等待
};

template <typename T>
inline uint64_t BoundedQueue<T>::GetIndex(uint num) {
    return num - (num / m_pool_capacity) * m_pool_capacity;
}

template <typename T>
inline void BoundedQueue<T>::BreakAllWait() { //唤醒所有
    m_break_all_wait = 1;
    m_wait_strategy->BreakAllWait();  
}
template <typename T>
inline bool BoundedQueue<T>::Init(uint capacity) {
    return Init(capacity, new SleepWaitStrategy());
}

template <typename T>
bool BoundedQueue<T>::Init(uint capacity, WaitStrategy *strategy) {
    m_pool_capacity = capacity + 2; //多出两个空间
    m_pool = reinterpret_cast<T*>(std::calloc(m_pool_capacity, sizeof(T)));
    
    if (m_pool == nullptr) {
        return false;
    }

    for (uint i = 0; i < m_pool_capacity; i ++) {
        new (&m_pool[i]) T(); //定位new.
    }

    m_wait_strategy.reset(strategy);    
    return true;
}

template <typename T>
BoundedQueue<T>::~BoundedQueue() {
    
    if (m_wait_strategy) { //唤醒所有, 都该销毁了
        m_wait_strategy->BreakAllWait(); 
    }

    if (m_pool) { //析构对象释放内存.
        for (uint i = 0; i < m_pool_capacity; i++) {
            m_pool[i].~T();  //显示析构
        }
        std::free(m_pool);
    }
}

template<typename T>
bool BoundedQueue<T>::WaitEnqueue(const T &element) {
    
    while (!m_break_all_wait) {
        if (Enqueue(element)) { //首次尝试插入
            return true;
        }
        
        // 没有插入成功. 说明队满, 按照等待策略等待
        if (m_wait_strategy->EmptyWait()) { 
            continue;  //如果cond条件触发再次尝试插入
        }

        break; //timeout
    }
    
    return false;
}


template<typename T>
bool BoundedQueue<T>::WaitDequeue(T *element) {
    
    while (!m_break_all_wait) {
        
        if (Dequeue(element)) { //先尝试出队
            return true;
        }
        
        if (m_wait_strategy->EmptyWait()) { //出队失败, 等
            continue; //条件触发, 再次尝试
        }

        break;  //timeout
    }

    return false;
}


/**
 * push 入队逻辑 先申请空间,拿到空间,后插入元素。 
 * 注意:只是申请空间必须保证原子性,顺序性。
 * 空间原子申请到了,顺序了,元素插入顺序不所谓,不影响最终结果
 * 注意,完成 tail+'1' 操作拿到空间,并且完成了 元素放入空间,插入操作并未结束.
 * 必须保证了m_max_head的跟新操作,而且m_max_head的跟新也必须顺序. 
 * 原因, 保证数据拷贝进入之后才允许消费者线程将其出队
 * 呼应 dequeue操作的 new_head < m_max_head 才出队
*/

template<typename T>
bool BoundedQueue<T>::Enqueue(const T &element) {
    uint new_tail = 0;  //用于存储new_tail
    uint old_tail = m_tail.load(std::memory_order_acquire); //获取旧值
    uint old_max_head = 0; //临时存储old_max_head

    do {    
        new_tail = old_tail + 1; 
        
        if (GetIndex(new_tail) == GetIndex(m_head.load(std::memory_order_acquire))) {
            return false; //队列满 插入失败. 后续陷入等待.
        }
    } while (!m_tail.compare_exchange_weak(old_tail, new_tail,
                                           std::memory_order_acq_rel, 
                                           std::memory_order_relaxed)); //保障空间申请的原子性
    
    m_pool[GetIndex(old_tail)] = element; //旧(就)地插入

    do {
        old_max_head = old_tail;   
    } while (!m_max_head.compare_exchange_weak(old_tail, new_tail,
                                               std::memory_order_acq_rel, 
                                               std::memory_order_relaxed)); //更新max_head. 

    m_wait_strategy->NotifyOne(); //通知消费
    return true;
}


/**
 * pop 逻辑是 head 先加一 再pop new_head. head 本来就是队首元素的前一位.
 * head 是空队头
*/

template<typename T>
bool BoundedQueue<T>::Dequeue(T *element) {
    uint new_head = 0;  //存储最新head
    uint old_head = m_head.load(std::memory_order_acquire); //加载存储旧head

    do {
        new_head = old_head + 1; //空间移除, 计算新头
        
        //是否满足在最大出队下标以内
        //此时不用 m_tail 判断队列空,
        //原因是: 可能和入队操作冲突, 先申请了空间, 元素还没插入, 入队操作还未彻底完成
        if (new_head == m_max_head.load(std::memory_order_acquire)) {
            return false; //队空
        }

        //用element传出参数传出pop的数据, 也就是new_head中的数据
        *element = m_pool[GetIndex(new_head)];

    } while (!m_head.compare_exchange_weak(old_head, new_head,
                                           std::memory_order_acq_rel,
                                           std::memory_order_relaxed)); 
                                            
    m_wait_strategy->NotifyOne(); //通知生产.
    return true;
}



/*
@第三部分: threadPool实现,对外接口,将任务提交到一个任务队列
然后使用多个线程来并发处理这些任务

微信公众号 《码出名企路》  获取视频,文档,代码,入圈,与小伙伴一起学习

*/

class ThreadPool
{
public:
    explicit ThreadPool(std::size_t thread_num,
                        std::size_t max_task_num = 1000) : stop_(false) {

        // 初始化失败抛出异常
        if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
            throw std::runtime_error("Task queue init failed");
        }

        // 存放多个 std::thread线程对象
        workers_.reserve(thread_num);

        for (size_t i = 0; i < thread_num; ++i) {
            // 使用一个 lambda表达式来创建每个线程
            // 功能是 从任务队列中获取任务,并执行任务的函数对象
            workers_.emplace_back([this] {
                while(!stop_) {
                    std::function<void()> task;
                    if (task_queue_.WaitDequeue(&task)){
                        task();
                    }
                } 
            });
        }

    }

    template <typename F, typename... Args>
    auto Enqueue(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type> {

        using return_type = typename std::result_of<F(Args...)>::type;

        // 函数 f和其参数args, 打包成一个 std::packaged_task对象,放入任务队列
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        // 并返回一个与该任务关联的 std::future对象
        std::future<return_type> res = task->get_future();

        if (stop_) {
            return std::future<return_type>();
        }

        task_queue_.Enqueue([task]()
                            { (*task)(); });
        return res;
    }

    inline ~ThreadPool() {

        if (stop_.exchange(true)) {
            return;
        }

        task_queue_.BreakAllWait();

        for (std::thread &worker : workers_) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers_;
    BoundedQueue<std::function<void()>> task_queue_;
    std::atomic_bool stop_;
};

/*
@ 第四部分:线程池的测试用例

1,封装线程池的等待策略:4
2,有界队列:保持task:封装了等待策略对象,用来选择不同的wait
3,threadpool,对外提供接口,封装有界队列对象,用来入队

微信公众号 《码出名企路》

*/

class Test_ThreadPool
{

public:
    void test() {
        ThreadPool thread_pool(4);
        std::vector<std::future<std::string>> results;

        for (int i = 0; i < 8; i++) {
            results.emplace_back(
                thread_pool.Enqueue(
                    [i]() {
                        std::ostringstream ss;
                        ss << "hello world"<< i;
                        std::cout<< ss.str() << std::endl;
                        return ss.str(); 
                    }
                )
            );
        }

        for (auto &&result : results) {
            std::cout << "result: " << result.get() << std::endl;
        }
    }
};

int main()
{
    Test_ThreadPool test_;
    test_.test();

    return 0;
}

这篇我的理解还很浅。因为以前没咋用过,所以有点知识的堆砌,大家理解,以后肯定会改进。刚学,大家感兴趣的可以去b站搜索阳哥,他给了更详细的文档,只是要点费用,但是不多,都是学知识嘛。慢慢积累。文章来源地址https://www.toymoban.com/news/detail-820019.html

到了这里,关于基于无锁循环队列的线程池的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于线程池的TCP套接字通信

    还是只改变 server.cpp 其中 main 函数, 也就是 主线程中的处理流程 : 创建监听的套接字 绑定IP和端口 设置监听 创建线程池实例对象 添加监听任务 acceptConn 主线程退出 监听任务函数的处理流程 如下: 阻塞等待并接受客户端连接 检测有客户端连接时, 添加通信任务 worker 释放资源

    2024年01月23日
    浏览(62)
  • linux线程池、基于线程池的单例模式、读者写者问题

    线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

    2024年02月03日
    浏览(37)
  • 【Linux】线程终结篇:线程池以及线程池的实现

    linux线程完结 文章目录 前言 一、线程池的实现 二、了解性知识 1.其他常见的各种锁 2.读者写者问题 总结 什么是线程池呢? 线程池一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行

    2024年02月12日
    浏览(35)
  • Linux基于多线程和任务队列实现生产消费模型

    目录 一、生产者消费者模型 二、代码实现模型 2.1 BlockQueue.hpp 2.2 MainCP.cc 2.3 执行结果 三、效率优势 将上述图片逻辑转换成代码逻辑就是,一批线程充当生产者角色,一批线程充当消费者角色,仓库是生产者和消费者获取的公共资源!下面我想用 321原则 来解释这个模型。 既

    2024年02月09日
    浏览(46)
  • SpringBoot线程池和Java线程池的实现原理

    @ 目录 使用默认的线程池 方式一:通过 @Async 注解调用 方式二:直接注入 ThreadPoolTaskExecutor 线程池默认配置信息 SpringBoot 线程池的实现原理 覆盖默认的线程池 管理多个线程池 JAVA常用的四种线程池 newCachedThreadPool newFixedThreadPool newScheduledThreadPool newSingleThreadExecutor Java 线程池中

    2023年04月11日
    浏览(47)
  • Java 线程池的原理与实现

    最近在学习线程池、内存控制等关于提高程序运行性能方面的编程技术,线程池就是其中之一,一提到线程,我们会想到以前《操作系统》的生产者与消费者,信号量,同步控制等等。 一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议 :在阅读本文前,先理

    2024年02月13日
    浏览(40)
  • SpringBoot线程池和Java线程池的用法和实现原理

    @ 目录 使用默认的线程池 方式一:通过 @Async 注解调用 方式二:直接注入 ThreadPoolTaskExecutor 线程池默认配置信息 SpringBoot 线程池的实现原理 覆盖默认的线程池 管理多个线程池 JAVA常用的四种线程池 newCachedThreadPool newFixedThreadPool newScheduledThreadPool newSingleThreadExecutor Java 线程池中

    2023年04月11日
    浏览(39)
  • 【Linux】简单线程池的设计与实现 -- 单例模式

    线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而 线程池维护着多个线程,等待着监督管理者分配可并发执行的任务 。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

    2024年02月12日
    浏览(45)
  • 【Java系列】多线程案例学习——基于阻塞队列实现生产者消费者模型

    个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习JavaEE的一点学习心得,欢迎大家在评论区交流讨论💌 什么是阻塞式队列(有两点): 第一点:当队列满的时候

    2024年02月04日
    浏览(57)
  • 【并发编程】无锁环形队列Disruptor并发框架使用

    Disruptor 是苹国外厂本易公司LMAX开发的一个高件能列,研发的初夷是解决内存队列的延识问顾在性能测试中发现竟然与10操作处于同样的数量级),基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCn演讲后,获得了业界关注,201年,企业应用软件专家Martin Fower专门撰

    2024年02月14日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包