C++:设计一个线程安全的队列

这篇具有很好参考价值的文章主要介绍了C++:设计一个线程安全的队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


c++ 线程安全队列,C/C++,多线程,c++,单元测试,ThreadSanitizer,pthread

1. 目的

串行的程序只用到单个 CPU 核心, 希望加速整个程序, 考虑使用多线程加速。典型情况下可以找到生产者、消费者,两个角色之间通过队列进行数据交互:

  • 生产者负责把数据放入队列Q
  • 消费者负责从队列取出数据Q

要求队列是线程安全的,即:不能有读写冲突等。

2. 实现?验证!

这里并不给出具体实现, 主要原因是网络上有太多的“实现”,也许很强大,但是否正确则有待验证,反倒是怎样验证正确性,总是被忽略:

  • 新手小白,或者“算法工程师”们,往往没怎么写过合格的单元测试
  • 验证也许只是粗略跑一下,Thread Sanitizer 这样的有力武器没有被用上

makefile

我是在 Linux 下验证的, 用的 makefile 如下, 重点是 tsan 的设定, 以及 gtest 的配置:

SANITIZER_OPTION=-fsanitize=thread -fno-omit-frame-pointer
#SANITIZER_OPTION=

all:
	clang++ test_queue.cpp -I. -g `pkg-config --cflags --libs gtest gtest_main` ${SANITIZER_OPTION}

Queue 类的 public 成员

template<typename T>
class Queue
{
public:
    Queue(unsigned int max_size = 0);
    ~Queue();
    void push(const T& elem);
    T pop();
    bool empty();
    size_t size();

其中:

  • Queue是模板类,这样可以支持任意数据类型作为队列元素(但队列中所有元素类型需要相同)
  • 所有成员函数都不能是 const 的, 尤其是 empty 和 size 函数, 原因是当前线程调用它们时,其他线程可能立即改变队列成员,需要 mutex 锁住, 对于 mutex 的操作导致函数不再是 const 的
  • 支持设定队列最大元素数量,如果没指定, 看似用0,实际表示“无限”

单元测试

如下是基于 GoogleTest 和 Queue 的 ADT 给出的单元测试代码。
如果你基于上述 Queue 类的定义, 能通过如下单元测试, 那么程序的正确性应该说比较高了。这部分代码的价值比 Queue 本身的价值要更高, 但往往被人们忽略:

#include <gtest/gtest.h>
#include <digimon/queue.hpp>
#include <shadow/queue.hpp>
#include <unistd.h>

using namespace digimon;
//using namespace Shadow;

TEST(Queue, SingleThread)
{
    Queue<int> q;
    EXPECT_EQ(q.empty(), true);

    q.push(1);
    q.push(2);
    EXPECT_EQ(q.empty(), false);
    
    int x = q.pop();
    EXPECT_EQ(x, 1);

    x = q.pop();
    EXPECT_EQ(x, 2);
}

class ThreadData
{
public:
    ThreadData() {}
    ThreadData(Queue<int>* _q, int _start, int _end) :
        q(_q), start(_start), end(_end)
    {}

public:
    Queue<int>* q;
    int start;
    int end;
};

class ConsumerThreadData
{
public:
    ConsumerThreadData(Queue<int>* _q, int _start, int _end) :
        q(_q), start(_start), end(_end), sum(0)
    {
        pthread_mutex_init(&mutex, NULL);
    }
    ~ConsumerThreadData()
    {
        pthread_mutex_destroy(&mutex);
    }

public:
    Queue<int>* q;
    int start;
    int end;
    int sum;
    pthread_mutex_t mutex;
};

static void* producer(void* _thread_data)
{
    ThreadData* thread_data = (ThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        thread_data->q->push(i);
    }

    return NULL;
}

TEST(Queue, MultiThread_MultiProducer)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ThreadData thread_data2(&q, 0, 10);
    pthread_create(&t2, NULL, producer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), false);
    EXPECT_EQ(q.size(), 20);

    int sum = 0;
    while (!q.empty())
    {
        int x = q.pop();
        sum += x;
    }
    int expected_sum = 90;
    EXPECT_EQ(expected_sum, sum);
}

static void* consumer(void* _thread_data)
{
    ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        int x = thread_data->q->pop();
        thread_data->sum += x;
        std::cout << x << std::endl;
    }

    return NULL;
}

TEST(Queue, MultiThread_SingleProducer_SingleConsumer)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 10);
    pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), true);
    EXPECT_EQ(q.size(), 0);
}

static void* producer_slow(void* _thread_data)
{
    ThreadData* thread_data = (ThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        sleep(1);
        thread_data->q->push(i);
    }

    return NULL;
}

TEST(Queue, MultiThread_Consumer_Meaningless_Grab_Mutex)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 3);
    pthread_create(&t1, NULL, producer_slow, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 3);
    pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), true);
    EXPECT_EQ(q.size(), 0);
    EXPECT_EQ(thread_data2.sum, 3);
}

static void* consumer_slow(void* _thread_data)
{
    ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        EXPECT_EQ(thread_data->q->size(), 5);
        int x = thread_data->q->pop();
        thread_data->sum += x;
        sleep(1);
        std::cout << x << std::endl;
    }

    return NULL;
}

TEST(Queue, LimitedQueueSize)
{
    Queue<int> q(5);

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 5);
    pthread_create(&t2, NULL, consumer_slow, (void*)&thread_data2);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), false);
    EXPECT_EQ(q.size(), 5);
}

3. 实现 Queue 类的方案

可以基于 C++ 11 实现, 不过据说 C++11 的 thread 在华为手机上有问题,传闻中 pthread 能消除问题;
于是乎还有另一个选择: C++03 + pthread 实现 Queue 类。

Windows 平台上可以使用 windows-pthreads 库, 它是基于 Windows threads 模拟实现了 PThread 和 Semaphore 接口。(完)文章来源地址https://www.toymoban.com/news/detail-629680.html

到了这里,关于C++:设计一个线程安全的队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C++实现一个线程安全的map

    本文是使用ChatCPT生成的,最终的代码使用起来没问题。代码是通过两轮对话完善的,后面把对话合并后跑不出理想效果就没尝试了。 c++11实现一个线程安全的map,使用方法与std::map保持一致,实现[]运算符 以下是一个简单的线程安全的map实现,可以使用[]运算符来访问和修改

    2024年02月03日
    浏览(38)
  • C++类设计:一个比较复杂的日志类 支持多线程、文件切换、信息缓存(源码)

    初级代码游戏的专栏介绍与文章目录-CSDN博客 github位置:codetoys/ctfc.git src/env/myLog.h和.cpp           这个类功能细节比较多,因为是长期使用的版本,累积了各种功能。之前介绍的两个是我的测试代码用的版本,非常简单,那两个在这里: C++类设计:设计一个日志类(源码

    2024年04月10日
    浏览(40)
  • (C++) 如何设计一个安全的pop函数

    在经典数据结构, stack 和 queue 中有一个重要的函数那就是 pop() 表示弹出线性顶部的一个元素。 而在各种语言的标准数据结构中也自然有这些数据结构和对应的函数。 在C++中,pop()无返回,且对空对象pop()行为未定义。 空对象未定义可以理解,但是为什么不返回顶部元素呢?

    2024年03月13日
    浏览(37)
  • 现代C++编程实战25-两个单元测试库:C++里如何进行单元测试

    你好,我是吴咏炜。 单元测试已经越来越成为程序员工作密不可分的一部分了。在 C++ 里,我们当然也是可以很方便地进行单元测试的。今天,我就来介绍两个单元测试库:一个是 Boost.Test [1],一个是 Catch2 [2]。 单元测试库有很多,我选择 Boost 的原因我在上一讲已经说过:“

    2024年02月07日
    浏览(41)
  • 【C++】 排序算法合集 && 单元测试

    排序算法 是《数据结构与算法》中最基本的算法之一。 十种常见排序算法可以分为两大类: 比较类排序 :通过比较来决定元素间的相对次序,时间复杂度为 O(nlogn)~O(n²)。 非比较类排序 :不通过比较来决定元素间的相对次序,其时间复杂度可以突破 O(nlogn),以线性时间运

    2024年03月15日
    浏览(44)
  • 使用C++构建安全队列

    STL的容器不是线程安全的,我们经常会有需求要求数据结构线程安全,比如写生产者消费者模型的时候,就要求队列线程安全。 利用std::queue和C++线程标准库的一些组件(mutex,condition_variable),可以写一个线程安全的队列ConcurrenceQueue。 需要4个函数 push,入队; pop,出队并返

    2024年02月08日
    浏览(32)
  • C++轻量级单元测试框架

    单元测试是构建稳定、高质量的程序、服务或系统的必不可少的一环。通过单元测试,我们可以在开发过程中及时发现和修复代码中的问题,提高代码的质量和可维护性。同时,单元测试也可以帮助我们更好地理解代码的功能和实现细节,从而更好地进行代码重构和优化。

    2023年04月25日
    浏览(72)
  • C++ 手写一个线程池

    本专栏已在我的个人站点中完成更新升级,可点击这里直达。 本专栏不再更新,不要购买!如有需要,请前往我的自建站点中购买, 价格更实惠 、 内容更丰富 、 并且继续保持更新 。 已购买该专栏的同学,可点击这里查看后续调整方案。 更多说明,可点击这里查看。

    2024年02月13日
    浏览(43)
  • C++单元测试Gtest+Stub攻略

    笔者环境为linux环境(deepin),以下均在此环境进行 Gtest源码链接 Stub源码链接 StubExt源码链接 Stub的使用方法在cpp-stub/README_zh.md中有讲解 StubExt的使用方法在 cpp-stub-ext/ README.md中有讲解 StubExt可支持Lambda表达式进行打桩写Gtest时如果想获取一个固定的返回值或者出参十分好用 搭建环

    2024年02月10日
    浏览(51)
  • C++ 多线程:实现一个功能完整的线程池

            今天我们来聊一聊异步编程的知识。在分布式系统中,一个功能完整的线程池类是一切代码的前提。 一个『合格』的线程池该具备哪些功能? 首先,很自然地想到『线程池类里该有个线程对象的集合,然后可以初始化线程对象的个数、创建线程对象、及启动线程主

    2024年02月04日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包