1、C++11线程池代码
threadpool.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <future>
#include <functional>
#include <iostream>
#include <queue>
#include <mutex>
#include <memory>
#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif
using namespace std;
void getNow(timeval *tv);
int64_t getNowMs();
#define TNOW getNow()
#define TNOWMS getNowMs()
class ThreadPool
{
protected:
struct TaskFunc
{
TaskFunc(uint64_t expireTime) : _expireTime(expireTime)
{ }
std::function<void()> _func;
int64_t _expireTime = 0; //超时的绝对时间
};
// 定义一个智能指针类型
typedef shared_ptr<TaskFunc> TaskFuncPtr;
public:
ThreadPool();
// 析构函数停止所有线程
virtual ~ThreadPool();
// 初始化线程个数
bool init(size_t num);
// 获取线程数
size_t getThreadNum(){
std::unique_lock<std::mutex> lock(_mutex);
return _threads.size();
}
// 获取线程池任务数
size_t getJobNum(){
std::unique_lock<std::mutex> lock(_mutex);
return _tasks.size();
}
// 停止所有线程,等待所有线程结束
void stop();
// 启动所有线程
bool start();
// 用线程池启用任务(F是function, Args是参数)
template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
return exec(0,f,args...);
}
// 用线程池启用任务(F是function, Args是参数)
template <class F, class... Args>
auto exec(int64_t timeoutMs, F&& f, Args&&... args) ->
std::future<decltype(f(args...))>
{
int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取现在时间
//定义返回值类型
using RetType = decltype(f(args...)); // 推导返回值
// 封装任务
auto task = std::make_shared<std::packaged_task<RetType()>>
(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
// 封装任务指针,设置过期时间
TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);
// 具体执行的函数
fPtr->_func = [task]() {
(*task)();
};
std::unique_lock<std::mutex> lock(_mutex);
// 插入任务
_tasks.push(fPtr);
// 唤醒阻塞的线程,可以考虑只有任务队列为空的情
_condition.notify_one();
//况再去notify
return task->get_future();;
}
// 等待当前任务队列中,所有工作全部结束
bool waitForAllDone(int millsecond = -1);
protected:
// 获取任务
bool get(TaskFuncPtr&task);
// 线程池是否退出
bool isTerminate() { return _bTerminate; }
// 运行线程
void run();
protected:
/**
* 任务队列
*/
queue<TaskFuncPtr> _tasks;
/**
* 工作线程
*/
std::vector<std::thread*> _threads;
std::mutex _mutex;
std::condition_variable _condition;
size_t _threadNum;
bool _bTerminate;
std::atomic<int> _atomic{ 0 };
};
#endif // _THREADPOOL_H
threadpool.cpp
#include "threadpool.h"
// 构造函数,初始化:线程数=1、是否退出=false
ThreadPool::ThreadPool()
: _threadNum(1), _bTerminate(false){
}
ThreadPool::~ThreadPool(){
// 退出所有线程
stop();
}
bool ThreadPool::init(size_t num)
{
// lock作用域结束会手动释放锁,在析构函数内部
std::unique_lock<std::mutex> lock(_mutex);
if (!_threads.empty()){
return false;
}
_threadNum = num;
return true;
}
void ThreadPool::stop()
{
{
std::unique_lock<std::mutex> lock(_mutex);
// 标识符为true,标识要退出
_bTerminate = true;
// 唤醒线程池中所有线程
_condition.notify_all();
}
for (size_t i = 0; i < _threads.size(); i++)
{
if(_threads[i]->joinable()){
_threads[i]->join();
}
delete _threads[i];
_threads[i] = NULL;
}
std::unique_lock<std::mutex> lock(_mutex);
_threads.clear();
}
bool ThreadPool::start()
{
std::unique_lock<std::mutex> lock(_mutex);
if (!_threads.empty()){
return false;
}
// 创建线程,将线程添加到容器中
for (size_t i = 0; i < _threadNum; i++){
_threads.push_back(new thread(&ThreadPool::run, this));
}
return true;
}
bool ThreadPool::get(TaskFuncPtr& task)
{
std::unique_lock<std::mutex> lock(_mutex);
if (_tasks.empty())
{
_condition.wait(lock, [this] { return _bTerminate || !_tasks.empty();});
}
if (_bTerminate)
return false;
if (!_tasks.empty())
{
// 取出任务队列中的任务,返回
task = std::move(_tasks.front());
_tasks.pop();
return true;
}
return false;
}
void ThreadPool::run() // 执行任务的线程
{
//调用处理部分
while (!isTerminate())
{
TaskFuncPtr task;
bool ok = get(task); // 读取任务
if (ok)
{
++_atomic;
try
{
if (task->_expireTime != 0 && task->_expireTime < TNOWMS ){
//超时任务,是否需要处理?
}
else{
task->_func(); // 执行任务
}
}
catch (...){
}
--_atomic;
//任务都执行完毕了
std::unique_lock<std::mutex> lock(_mutex);
if (_atomic == 0 && _tasks.empty()){
_condition.notify_all(); // 这里只是为了通知waitForAllDone
}
}
}
}
bool ThreadPool::waitForAllDone(int millsecond)
{
std::unique_lock<std::mutex> lock(_mutex);
if (_tasks.empty())
return true;
if (millsecond < 0){
_condition.wait(lock, [this] { return _tasks.empty(); });
return true;
}
else{
// wait和wait_for区别:
// wait等待条件变量通知返回
// wait_for:等待一段时间(第二个参数),满足条件或则超时时间返回
return _condition.wait_for(lock, std::chrono::milliseconds(millsecond),
[this] { return _tasks.empty(); });
}
}
int gettimeofday(struct timeval &tv)
{
#if WIN32
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm. tm_isdst = -1;
clock = mktime(&tm);
tv.tv_sec = clock;
tv.tv_usec = wtm.wMilliseconds * 1000;
return 0;
#else
return ::gettimeofday(&tv, 0);
#endif
}
void getNow(timeval *tv)
{
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX
int idx = _buf_idx;
*tv = _t[idx];
if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc){
addTimeOffset(*tv, idx);
}
else{
TC_Common::gettimeofday(*tv);
}
#else
gettimeofday(*tv);
#endif
}
int64_t getNowMs()
{
struct timeval tv;
getNow(&tv);
return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
}
main.cpp
#include <iostream>
#include "threadpool.h"
using namespace std;
void func0(){
cout << "func0()" << endl;
}
void func1(int a){
cout << "func1() a=" << a << endl;
}
void func2(int a, string b){
cout << "func1() a=" << a << ", b=" << b<< endl;
}
void test1() // 简单测试线程池
{
ThreadPool threadpool;
threadpool.init(2);
threadpool.start(); // 启动线程池
// 假如要执行的任务
threadpool.exec(1000,func0);
threadpool.exec(func1, 10);
threadpool.exec(func2, 20, "darren");
threadpool.waitForAllDone();
threadpool.stop();
}
int func1_future(int a)
{
cout << "func1() a=" << a << endl;
return a;
}
string func2_future(int a, string b)
{
cout << "func1() a=" << a << ", b=" << b<< endl;
return b;
}
void test2() // 测试任务函数返回值
{
ThreadPool threadpool;
threadpool.init(10);
threadpool.start(); // 启动线程池
// 假如要执行的任务
std::future<decltype (func1_future(0))> result1 =
threadpool.exec(func1_future, 10);
std::future<string> result2 = threadpool.exec(func2_future, 20, "darren");
// auto result2 = threadpool.exec(func2_future, 20, "darren");
std::cout << "result1: " << result1.get() << std::endl;
std::cout << "result2: " << result2.get() << std::endl;
threadpool.waitForAllDone();
threadpool.stop();
}
int main()
{
// test1(); // 简单测试线程池
test2(); // 测试任务函数返回值
cout << "Hello World!" << endl;
return 0;
}
编译
all:main
main:main.cpp
g++ -o main main.cpp threadpool.cpp -std=c++11 -pthread
clean:
rm -rf main
2、Linux C++线程池
pthreadPool.h
#ifndef __PTHREADPOOL_H_
#define __PTHREADPOOL_H_
#include <vector>
#include <atomic>
#include <pthread.h>
#include <iostream>
#include <list>
#include <unistd.h>
using namespace std;
struct student
{
char name[256];
unsigned int age;
int id;
};
class pthreadPool
{
public:
pthreadPool();
~pthreadPool();
public:
bool createPthread(int threadNUm = 5);
void stopAll();
void call();
void inMsgRecvQueueAndSignal(char* buf);
private:
static void* threadFunc(void* threadData);
void clearMsgRecvQueue();
void msgDispose(char* jobbuf);
private:
struct pthreadItem
{
bool isruning;
pthreadPool* _pThis;
pthread_t _Handle;
pthreadItem(pthreadPool* pthis):_pThis(pthis),isruning(false){}
~pthreadItem(){}
};
private:
static pthread_cond_t m_pthreadCond; // 条件变量
static pthread_mutex_t m_pthreadMutex; // 互斥量
static bool m_shutdown; // 线程退出标识
int m_iThreadNum; // 要创建的线程数
time_t m_iLastTime; // 上次线程不够用,时间记录
atomic<int> m_iRunThreadNum; // 正在运行线程数量 原子操作
vector<pthreadItem*> m_vThread; // 线程容器
list<char*> m_msgRecvQueue; // 消息队列
int m_iRecvQueueCount; // 收消息队列大小
};
#endif // !__PTHREADPOOL_
pthreadPool.cpp
#include "pthreadPool.h"
pthread_cond_t pthreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t pthreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
bool pthreadPool::m_shutdown = false;
pthreadPool::pthreadPool()
{
// 运行的线程数为0,时间为0,消息数0
m_iRunThreadNum = 0;
m_iLastTime = 0;
m_iRecvQueueCount = 0;
}
pthreadPool::~pthreadPool()
{
clearMsgRecvQueue();
}
bool pthreadPool::createPthread(int threadNum)
{
if(!threadNum) return false;
m_iThreadNum = threadNum;
pthreadItem* item;
int errCode = 0;
for(int i=0;i<threadNum;i++){
item = new pthreadItem(this);
errCode = pthread_create(&(item->_Handle),NULL,pthreadPool::threadFunc,item);
if(errCode!=0){
cout<<"线程创建失败:"<<i<<endl;
return false;
}
m_vThread.push_back(item);
}
vector<pthreadItem*>::iterator iter;
lblfor:
// goto语句作用,为了所有线程都正常启动,并且卡在pthread_cond_wait()这
for(iter = m_vThread.begin();iter!=m_vThread.end();iter++){
if((*iter)->isruning == false){
usleep(100*1000); // 单位是微秒,100毫秒
goto lblfor;
}
}
return true;
}
void* pthreadPool::threadFunc(void* threadData)
{
pthreadItem* pThread = (pthreadItem*)threadData;
pthreadPool* pPoll = pThread->_pThis;
int errCode = 0;
pthread_t tid = pthread_self();
while(true){
// 拿锁
errCode = pthread_mutex_lock(&m_pthreadMutex);
if(errCode!=0){
cout<<"pthread_mutex_lock fail threadFunc errCode"<<errCode<<endl;
return (void*)0;
}
while((pPoll->m_msgRecvQueue.size() == 0) && m_shutdown == false){
if(pThread->isruning == false)
pThread->isruning = true;
// 整个程序初始化的时候,保证所有线程都卡在这里
// 当线程走到这里,就会释放锁,处于未激发状态
// 一旦被激发,就会去拿锁
pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
}
// 判断线程退出条件
if(m_shutdown){
pthread_mutex_unlock(&m_pthreadMutex);
break;
}
// 走到这里可以去消息处理
// 返回第一个元素,没有检查是否存在,走下来就说明有消息
char* jobbuf = pPoll->m_msgRecvQueue.front();
pPoll->m_msgRecvQueue.pop_front();
// 消息队列数减1
--pPoll->m_iRecvQueueCount;
// 可以解锁了
pthread_mutex_unlock(&m_pthreadMutex);
// 能走到这里表示有消息,并且线程正在处理这个消息
// 正在工作的线程数加1,原子操作
++pPoll->m_iRunThreadNum;
// 消息处理
//cout<<"消息处理开始:"<<tid<<endl;
//sleep(3);
//cout<<"消息处理结束:"<<tid<<endl;
// 消息处理函数
pPoll->msgDispose(jobbuf);
// 消息处理结束
//释放消息内存,运行线程数--
++pPoll->m_iRunThreadNum;
}
return (void*)0;
}
void pthreadPool::msgDispose(char* jobbuf)
{
pthread_t tid = pthread_self();
struct student* stu = (struct student*)jobbuf;
cout<<"tid:"<<tid<<" name:"<<stu->name<<" age:"<<stu->age
<<" id:"<<stu->id<<endl;
if(stu!=NULL){
delete stu;
stu = NULL;
}
}
void pthreadPool::call()
{
// 唤醒一个等待该条件的线程,也可能是多个,也就是可以唤醒卡在pthread_cond_wait
int errCode = pthread_cond_signal(&m_pthreadCond);
if(errCode!=0){
cout<<"call fail"<<endl;
return;
}
// 如果工作线程数==开辟线程数需要扩容
if(m_iRunThreadNum == m_iThreadNum)
{
time_t currentime = time(NULL);
if(currentime-m_iLastTime >10)
{
m_iLastTime = currentime;
cout<<"Call()发现线程池中当前空闲线程数量为0,需要考虑扩容"<<endl;
}
}
return;
}
void pthreadPool::stopAll()
{
if(m_shutdown)
return;
m_shutdown = true;
int errCode = pthread_cond_broadcast(&m_pthreadCond);
if(errCode!=0){
cout<<"stopAll faile"<<endl;
return;
}
// 等待所有线程结束
vector<pthreadItem*>::iterator iter;
for(iter=m_vThread.begin();iter!=m_vThread.end();iter++){
pthread_join((*iter)->_Handle,NULL);
if((*iter))
delete *iter;
}
m_vThread.clear();
pthread_cond_destroy(&m_pthreadCond);
pthread_mutex_destroy(&m_pthreadMutex);
cout<<"成功返回,线程池中线程全部正常退出"<<endl;
return;
}
void pthreadPool::clearMsgRecvQueue()
{
while(!m_msgRecvQueue.empty())
{
char* buf = m_msgRecvQueue.front();
m_msgRecvQueue.pop_front();
if(buf!=NULL){
delete buf;
buf = NULL;
}
}
}
void pthreadPool::inMsgRecvQueueAndSignal(char* buf)
{
// 先互斥住
int errCode = pthread_mutex_lock(&m_pthreadMutex);
if(errCode!=0){
cout<<"inMsgRecvQueueAndSignal faile lock"<<endl;
}
m_msgRecvQueue.push_back(buf);
++m_iRecvQueueCount;
errCode = pthread_mutex_unlock(&m_pthreadMutex);
if(errCode!=0){
cout<<"inMsgRecvQueueAndSignal faile unlock"<<endl;
}
// 激发线程做事
call();
return;
}
pthreadPool.cpp
#include "pthreadPool.h"
int main()
{
pthreadPool* pool = new pthreadPool();
pool->createPthread(6);
for(int i=0;i<1000;i++){
struct student* stu = new student();
sprintf(stu->name,"name-%d",i);
stu->age = i;
stu->id = 1000+i;
pool->inMsgRecvQueueAndSignal((char*)stu);
}
pool->stopAll();
if(pool!=NULL){
delete pool;
pool = NULL;
}
pthread_exit(0);
}
编译
all:pthreadPool
pthreadPool:pthreadPool.h pthreadPool.cpp pthreadPoolText.cpp
g++ -o pthreadPool pthreadPool.cpp pthreadPoolText.cpp -pthread -std=c++11
文章来源地址https://www.toymoban.com/news/detail-486135.html
文章来源:https://www.toymoban.com/news/detail-486135.html
到了这里,关于C++11线程池和Linux C++线程对比使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!