C++11线程池和Linux C++线程对比使用

这篇具有很好参考价值的文章主要介绍了C++11线程池和Linux C++线程对比使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、C++11线程池代码

threadpool.h

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include <future>
#include <functional>
#include <iostream>
#include <queue>
#include <mutex>
#include <memory>
#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif
using namespace std;

void getNow(timeval *tv);
int64_t getNowMs();
#define TNOW getNow()
#define TNOWMS getNowMs()


class ThreadPool
{
protected:
    struct TaskFunc
    {
        TaskFunc(uint64_t expireTime) : _expireTime(expireTime)
        { }
        std::function<void()> _func;
        int64_t _expireTime = 0; //超时的绝对时间
    };
    // 定义一个智能指针类型
    typedef shared_ptr<TaskFunc> TaskFuncPtr;

public:
    ThreadPool();
    // 析构函数停止所有线程
    virtual ~ThreadPool();

    // 初始化线程个数
    bool init(size_t num);

    // 获取线程数
    size_t getThreadNum(){
        std::unique_lock<std::mutex> lock(_mutex);
        return _threads.size();
    }

    // 获取线程池任务数
    size_t getJobNum(){
        std::unique_lock<std::mutex> lock(_mutex);
        return _tasks.size();
    }

    // 停止所有线程,等待所有线程结束
    void stop();

    // 启动所有线程
    bool start();

    // 用线程池启用任务(F是function, Args是参数)
    template <class F, class... Args>
    auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        return exec(0,f,args...);
    }

    // 用线程池启用任务(F是function, Args是参数)
    template <class F, class... Args>
    auto exec(int64_t timeoutMs, F&& f, Args&&... args) ->
    std::future<decltype(f(args...))>
    {
        int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取现在时间
        //定义返回值类型
        using RetType = decltype(f(args...)); // 推导返回值
        // 封装任务
        auto task = std::make_shared<std::packaged_task<RetType()>>
                    (std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        // 封装任务指针,设置过期时间
        TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);
        
        // 具体执行的函数
        fPtr->_func = [task]() { 
            (*task)();
        };
        std::unique_lock<std::mutex> lock(_mutex);
        // 插入任务
        _tasks.push(fPtr);
        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情
        _condition.notify_one();
        //况再去notify
        return task->get_future();;
    }
    // 等待当前任务队列中,所有工作全部结束
    bool waitForAllDone(int millsecond = -1);
protected:
    // 获取任务
    bool get(TaskFuncPtr&task);
    // 线程池是否退出
    bool isTerminate() { return _bTerminate; }
    // 运行线程
    void run();

protected:
    /**
    * 任务队列
    */    
    queue<TaskFuncPtr> _tasks;
    /**
    * 工作线程
    */
    std::vector<std::thread*> _threads;
    std::mutex _mutex;
    std::condition_variable _condition;
    size_t _threadNum;
    bool _bTerminate;
    std::atomic<int> _atomic{ 0 };
};

#endif // _THREADPOOL_H

threadpool.cpp

#include "threadpool.h"

// 构造函数,初始化:线程数=1、是否退出=false
ThreadPool::ThreadPool()
: _threadNum(1), _bTerminate(false){

}

ThreadPool::~ThreadPool(){
    // 退出所有线程
    stop();
}

bool ThreadPool::init(size_t num)
{
    // lock作用域结束会手动释放锁,在析构函数内部
    std::unique_lock<std::mutex> lock(_mutex);
    if (!_threads.empty()){
        return false;
    }
    _threadNum = num;
    return true;
}

void ThreadPool::stop()
{
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // 标识符为true,标识要退出
        _bTerminate = true;
        // 唤醒线程池中所有线程
        _condition.notify_all();
    }

    for (size_t i = 0; i < _threads.size(); i++)
    {
        if(_threads[i]->joinable()){
            _threads[i]->join();
        }
        delete _threads[i];
        _threads[i] = NULL;
    }
    std::unique_lock<std::mutex> lock(_mutex);
    _threads.clear();
}

bool ThreadPool::start()
{
    std::unique_lock<std::mutex> lock(_mutex);
    if (!_threads.empty()){
        return false;
    }
    // 创建线程,将线程添加到容器中
    for (size_t i = 0; i < _threadNum; i++){
        _threads.push_back(new thread(&ThreadPool::run, this));
    }
    return true;
}

bool ThreadPool::get(TaskFuncPtr& task)
{
    std::unique_lock<std::mutex> lock(_mutex);
    if (_tasks.empty())
    {
        _condition.wait(lock, [this] { return _bTerminate || !_tasks.empty();});
    }

    if (_bTerminate)
        return false;
    
    if (!_tasks.empty())
    {   
        // 取出任务队列中的任务,返回
        task = std::move(_tasks.front());
        _tasks.pop();
        return true;
    }
    return false;
}

void ThreadPool::run() // 执行任务的线程
{
    //调用处理部分
    while (!isTerminate())
    {
        TaskFuncPtr task;
        bool ok = get(task); // 读取任务
        if (ok)
        {
            ++_atomic;
            try
            {
                if (task->_expireTime != 0 && task->_expireTime < TNOWMS ){
                    //超时任务,是否需要处理?
                }
                else{
                    task->_func(); // 执行任务
                }
            }
            catch (...){
            }
            --_atomic;
            //任务都执行完毕了
            std::unique_lock<std::mutex> lock(_mutex);
            if (_atomic == 0 && _tasks.empty()){
                _condition.notify_all(); // 这里只是为了通知waitForAllDone
            }
        }
    }
}

bool ThreadPool::waitForAllDone(int millsecond)
{
    std::unique_lock<std::mutex> lock(_mutex);
    if (_tasks.empty())
        return true;

    if (millsecond < 0){
        _condition.wait(lock, [this] { return _tasks.empty(); });
        return true;
    }
    else{
        // wait和wait_for区别:
        // wait等待条件变量通知返回
        // wait_for:等待一段时间(第二个参数),满足条件或则超时时间返回
        return _condition.wait_for(lock, std::chrono::milliseconds(millsecond),
                [this] { return _tasks.empty(); });
    }
}

int gettimeofday(struct timeval &tv)
{
    #if WIN32
        time_t clock;
        struct tm tm;
        SYSTEMTIME wtm;
        GetLocalTime(&wtm);
        tm.tm_year = wtm.wYear - 1900;
        tm.tm_mon = wtm.wMonth - 1;
        tm.tm_mday = wtm.wDay;
        tm.tm_hour = wtm.wHour;
        tm.tm_min = wtm.wMinute;
        tm.tm_sec = wtm.wSecond;
        tm. tm_isdst = -1;
        clock = mktime(&tm);
        tv.tv_sec = clock;
        tv.tv_usec = wtm.wMilliseconds * 1000;
        return 0;
    #else
        return ::gettimeofday(&tv, 0);
    #endif
}

void getNow(timeval *tv)
{
    #if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX
        int idx = _buf_idx;
        *tv = _t[idx];
        if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc){
            addTimeOffset(*tv, idx);
        }
        else{
            TC_Common::gettimeofday(*tv);
        }
    #else
        gettimeofday(*tv);
    #endif
}

int64_t getNowMs()
{
    struct timeval tv;
    getNow(&tv);
    return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
}

main.cpp

#include <iostream>
#include "threadpool.h"
using namespace std;

void func0(){
    cout << "func0()" << endl;
}

void func1(int a){
    cout << "func1() a=" << a << endl;
}

void func2(int a, string b){
    cout << "func1() a=" << a << ", b=" << b<< endl;
}

void test1() // 简单测试线程池
{
    ThreadPool threadpool;
    threadpool.init(2);
    threadpool.start(); // 启动线程池
    // 假如要执行的任务
    threadpool.exec(1000,func0);
    threadpool.exec(func1, 10);
    threadpool.exec(func2, 20, "darren");
    threadpool.waitForAllDone();
    threadpool.stop();
}

int func1_future(int a)
{
    cout << "func1() a=" << a << endl;
    return a;
}

string func2_future(int a, string b)
{
    cout << "func1() a=" << a << ", b=" << b<< endl;
    return b;
}

void test2() // 测试任务函数返回值
{
    ThreadPool threadpool;
    threadpool.init(10);
    threadpool.start(); // 启动线程池
    // 假如要执行的任务
    std::future<decltype (func1_future(0))> result1 =
    threadpool.exec(func1_future, 10);
    std::future<string> result2 = threadpool.exec(func2_future, 20, "darren");
    // auto result2 = threadpool.exec(func2_future, 20, "darren");
    std::cout << "result1: " << result1.get() << std::endl;
    std::cout << "result2: " << result2.get() << std::endl;
    threadpool.waitForAllDone();
    threadpool.stop();
}

int main()
{
    // test1(); // 简单测试线程池
    test2(); // 测试任务函数返回值
    cout << "Hello World!" << endl;
    return 0;
}

编译

all:main

main:main.cpp
	g++ -o main main.cpp threadpool.cpp -std=c++11 -pthread

clean:
	rm -rf main

2、Linux C++线程池

pthreadPool.h

#ifndef __PTHREADPOOL_H_
#define __PTHREADPOOL_H_
#include <vector>
#include <atomic>
#include <pthread.h>
#include <iostream>
#include <list>
#include <unistd.h>
using namespace std;

struct student
{
    char name[256];
    unsigned int age;
    int id;
};

class pthreadPool
{
public:
    pthreadPool();
    ~pthreadPool();

public:
    bool createPthread(int threadNUm = 5);
    void stopAll();
    void call();
    void inMsgRecvQueueAndSignal(char* buf);

private:
    static void* threadFunc(void* threadData);
    void clearMsgRecvQueue();
    void msgDispose(char* jobbuf);
private:
    struct pthreadItem
    {
        bool isruning;
        pthreadPool* _pThis;
        pthread_t _Handle;
        pthreadItem(pthreadPool* pthis):_pThis(pthis),isruning(false){}
        ~pthreadItem(){}
    };
private:
    static pthread_cond_t   m_pthreadCond;      // 条件变量
    static pthread_mutex_t  m_pthreadMutex;     // 互斥量
    static bool             m_shutdown;         // 线程退出标识
    
    int                     m_iThreadNum;       // 要创建的线程数
    time_t                  m_iLastTime;        // 上次线程不够用,时间记录
    atomic<int>             m_iRunThreadNum;    // 正在运行线程数量 原子操作

    vector<pthreadItem*>    m_vThread;          // 线程容器
    list<char*>             m_msgRecvQueue;     // 消息队列
    int                     m_iRecvQueueCount;  // 收消息队列大小
};

#endif // !__PTHREADPOOL_

pthreadPool.cpp

#include "pthreadPool.h"

pthread_cond_t pthreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t pthreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
bool pthreadPool::m_shutdown = false;

pthreadPool::pthreadPool()
{
    // 运行的线程数为0,时间为0,消息数0
    m_iRunThreadNum = 0;
    m_iLastTime = 0;
    m_iRecvQueueCount = 0;
}

pthreadPool::~pthreadPool()
{
    clearMsgRecvQueue();
}

bool pthreadPool::createPthread(int threadNum)
{
    if(!threadNum) return false;
    m_iThreadNum =  threadNum;
    
    pthreadItem* item;
    int errCode = 0;
    for(int i=0;i<threadNum;i++){
        item = new pthreadItem(this);
        errCode = pthread_create(&(item->_Handle),NULL,pthreadPool::threadFunc,item);
        if(errCode!=0){
            cout<<"线程创建失败:"<<i<<endl;
            return false;
        }
        m_vThread.push_back(item);
    }

    vector<pthreadItem*>::iterator iter;
lblfor:
    // goto语句作用,为了所有线程都正常启动,并且卡在pthread_cond_wait()这
    for(iter = m_vThread.begin();iter!=m_vThread.end();iter++){
        if((*iter)->isruning == false){
            usleep(100*1000);   // 单位是微秒,100毫秒
            goto lblfor;
        }
    }
    return true;
}

void* pthreadPool::threadFunc(void* threadData)
{
    pthreadItem* pThread = (pthreadItem*)threadData;
    pthreadPool* pPoll = pThread->_pThis;
    int errCode = 0;
    pthread_t tid = pthread_self();
    while(true){
        // 拿锁
        errCode = pthread_mutex_lock(&m_pthreadMutex);
        if(errCode!=0){
            cout<<"pthread_mutex_lock fail threadFunc errCode"<<errCode<<endl;
            return (void*)0;
        }

        while((pPoll->m_msgRecvQueue.size() == 0) && m_shutdown == false){
            if(pThread->isruning == false)
                pThread->isruning = true;
            
            // 整个程序初始化的时候,保证所有线程都卡在这里
            // 当线程走到这里,就会释放锁,处于未激发状态
            // 一旦被激发,就会去拿锁
            pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
        }

        // 判断线程退出条件
        if(m_shutdown){
            pthread_mutex_unlock(&m_pthreadMutex);
            break;
        }

        // 走到这里可以去消息处理
        // 返回第一个元素,没有检查是否存在,走下来就说明有消息
        char* jobbuf = pPoll->m_msgRecvQueue.front();
        pPoll->m_msgRecvQueue.pop_front(); 
        // 消息队列数减1
        --pPoll->m_iRecvQueueCount;

        // 可以解锁了
        pthread_mutex_unlock(&m_pthreadMutex);
        // 能走到这里表示有消息,并且线程正在处理这个消息
        // 正在工作的线程数加1,原子操作
        ++pPoll->m_iRunThreadNum;
        // 消息处理
        //cout<<"消息处理开始:"<<tid<<endl;
        //sleep(3);
        //cout<<"消息处理结束:"<<tid<<endl;
        // 消息处理函数
        pPoll->msgDispose(jobbuf);
        // 消息处理结束
        //释放消息内存,运行线程数--
        ++pPoll->m_iRunThreadNum;
    }
    return (void*)0;
}

void pthreadPool::msgDispose(char* jobbuf)
{
    pthread_t tid = pthread_self();
    struct student* stu = (struct student*)jobbuf;

    cout<<"tid:"<<tid<<" name:"<<stu->name<<" age:"<<stu->age
    <<" id:"<<stu->id<<endl;

    if(stu!=NULL){
        delete stu;
        stu = NULL;
    }
}

void pthreadPool::call()
{
    // 唤醒一个等待该条件的线程,也可能是多个,也就是可以唤醒卡在pthread_cond_wait
    int errCode = pthread_cond_signal(&m_pthreadCond);
    if(errCode!=0){
        cout<<"call fail"<<endl;
        return;
    }
    // 如果工作线程数==开辟线程数需要扩容
    if(m_iRunThreadNum == m_iThreadNum)
    {
        time_t currentime = time(NULL);
        if(currentime-m_iLastTime >10)
        {
            m_iLastTime = currentime;
            cout<<"Call()发现线程池中当前空闲线程数量为0,需要考虑扩容"<<endl;
        }
    }
    return;
}

void pthreadPool::stopAll()
{
    if(m_shutdown)
        return;
    m_shutdown = true;
    int errCode = pthread_cond_broadcast(&m_pthreadCond);
    if(errCode!=0){
        cout<<"stopAll faile"<<endl;
        return;
    }

    // 等待所有线程结束
    vector<pthreadItem*>::iterator iter;
    for(iter=m_vThread.begin();iter!=m_vThread.end();iter++){
        pthread_join((*iter)->_Handle,NULL);
        if((*iter))
            delete *iter;
    }
    m_vThread.clear();
    pthread_cond_destroy(&m_pthreadCond);
    pthread_mutex_destroy(&m_pthreadMutex);
    cout<<"成功返回,线程池中线程全部正常退出"<<endl;
    return;
}

void pthreadPool::clearMsgRecvQueue()
{
    while(!m_msgRecvQueue.empty())
    {
        char* buf = m_msgRecvQueue.front();
        m_msgRecvQueue.pop_front();
        if(buf!=NULL){
            delete buf;
            buf = NULL;
        }
    }
}

void pthreadPool::inMsgRecvQueueAndSignal(char* buf)
{
    // 先互斥住
    int errCode = pthread_mutex_lock(&m_pthreadMutex);
    if(errCode!=0){
        cout<<"inMsgRecvQueueAndSignal faile lock"<<endl;
    }
    m_msgRecvQueue.push_back(buf);
    ++m_iRecvQueueCount;
    errCode = pthread_mutex_unlock(&m_pthreadMutex);
    if(errCode!=0){
        cout<<"inMsgRecvQueueAndSignal faile unlock"<<endl;
    }
    // 激发线程做事
    call();
    return;
}

pthreadPool.cpp

#include "pthreadPool.h"

int main()
{
    pthreadPool* pool = new pthreadPool();
    pool->createPthread(6);
    for(int i=0;i<1000;i++){
        struct student* stu = new student();
        sprintf(stu->name,"name-%d",i);
        stu->age = i;
        stu->id = 1000+i;
        pool->inMsgRecvQueueAndSignal((char*)stu);
    }
    pool->stopAll();
    if(pool!=NULL){
        delete pool;
        pool = NULL;
    }
    pthread_exit(0);
}

编译

all:pthreadPool

pthreadPool:pthreadPool.h pthreadPool.cpp pthreadPoolText.cpp
	g++ -o pthreadPool pthreadPool.cpp pthreadPoolText.cpp -pthread -std=c++11

文章来源地址https://www.toymoban.com/news/detail-486135.html

到了这里,关于C++11线程池和Linux C++线程对比使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • python-16-线程池和进程池python并发编程

    Python ThreadPoolExecutor线程池 线程池的基本原理是什么? 利用Python快速实现一个线程池,非常简单 Python并发编程专题 一、为什么要引入并发编程? 场景1:一个网络爬虫,按顺序爬取花了1小时,采用并发下载减少到20分钟! 场景2:一个APP应用,优化前每次打开页面需要3秒,采

    2024年02月04日
    浏览(38)
  • 【C++】—— C++11之线程库

    前言: 在本期,我将给大家介绍的是 C++11 中新引进的知识,即关于 线程库 的相关知识。 目录 (一)线程库的介绍 1、线程库的由来 2、线程库的简单介绍 (二)线程函数参数 (三)原子性操作库 (四)lock_guard与unique_lock 1、mutex的种类 2、lock_guard 3、unique_lock (五)condi

    2024年02月11日
    浏览(30)
  • 【C++】C++11:线程库和包装器

    C++11最后一篇文章 文章目录 前言 一、线程库 二、包装器和绑定 总结 上一篇文章中我们详细讲解了lambda表达式的使用,我们今天所用的线程相关的知识会大量的用到lambda表达式,所以对lambda表达式还模糊不清的可以先将上一篇文章看明白。 一、线程库 在 C++11 之前,涉及到

    2024年02月10日
    浏览(45)
  • 【C++入门到精通】 线程库 | thread类 C++11 [ C++入门 ]

    当讨论现代编程语言的特性时,C++11无疑是一个不容忽视的里程碑。在前一篇文章中,我们深入探讨了Lambda表达式在C++11中的引入和应用。本文将继续探索C++11的强大功能,具体而言,我们这篇文章将聚焦于线程库和其中的thread类。 线程在多任务处理中起着至关重要的作用,它

    2024年02月04日
    浏览(42)
  • 【C++】C++11线程库 和 C++IO流

    春风若有怜花意,可否许我再少年。 1. C++11的线程库实际封装了windows和linux底层的原生线程库接口,在不同的操作系统下运行时,C++11线程库可以通过条件编译的方式来适配的使用不同的接口,比如在linux下,就用封装POSIX线程库的接口来进行多线程编程,在windows下,就用封装

    2024年02月07日
    浏览(32)
  • C++之C++11 thread线程示例(一百三十八)

    简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏: Audio工程师进阶系列 【 原创干货持续更新中…… 】🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药. 更多原创,欢迎关注:An

    2023年04月15日
    浏览(45)
  • Linux环境使用VSCode调试简单C++代码

    本文将通过演示一个简单C++代码的编译调试过程,介绍在VSCode中如何使用Linux环境下的GCC C++编译器(g++)和GDB 调试器(gdb)。 关于GCC、g++、gcc、gdb,这里不做详细介绍,如果感兴趣可以参考另一篇文章:详解C/C++代码的预处理、编译、汇编、链接全过程,里面有详细的解释。 看懂

    2024年02月04日
    浏览(50)
  • 【C#】通过C#代码创建IIS应用程序池和网站

    代码 参考了其他人的代码,做了一些优化,很多代码用不上 例子 参考 自写C#添加网站应用程序池虚拟目录实现网站管理 » 江湖人士 使用命令行在 Windows 11 上启用 IIS Web 服务器-云东方 用C# 代码咋管理IIS呢?_零零妖的博客-CSDN博客 C#IIS网站应用程序池列表添加修改删除 - -

    2024年02月13日
    浏览(43)
  • 【C++】 C++11(右值引用,移动语义,bind,包装器,lambda,线程库)

    C++11是C++语言的一次重大更新版本,于2011年发布,它包含了一些非常有用的新特性,为开发人员提供了更好的编程工具和更好的编程体验,使得编写高效、可维护、安全的代码更加容易。 一些C++11的新增特性包括: 强制类型枚举,使得枚举类型的通常行为更加可靠和容易控制

    2024年02月10日
    浏览(42)
  • 【Linux-day11-线程的创建与同步】

    线程的概念 线程是进程内部的一条执行序列或执行路径,一个进程可以包含多条线程。 进程与线程的区别 进程是资源分配的最小单位,线程是 CPU 调度的最小单位 进程有自己的独立地址空间,线程共享进程中的地址空间 进程的创建消耗资源大,线程的创建相对较小 进程的

    2024年02月09日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包