等待事件或等待其他条件
坐车案例
方法一:不睡觉,时刻关注自己的位置
// Method 1: busy-wait — the waiting thread repeatedly locks the mutex and
// polls the flag, burning CPU the entire time it waits.
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>   // FIX: was missing — std::chrono is used throughout
using namespace std;

mutex _mtx;
bool bFlag = false;

// Spin until bFlag becomes true, then report how long the wait took (ms).
void wait_for_flag()
{
    auto startTime = chrono::steady_clock::now();
    while (1)
    {
        unique_lock<mutex> lock(_mtx);
        if (bFlag)
        {
            auto endTime = chrono::steady_clock::now();
            double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
            cout << "wait_for_flag consume : " << dCount << endl;
            return;
        }
    }
}

// Simulate ~5 s of work (5 x 1000 ms, releasing the mutex while "working"),
// then raise the flag and report the elapsed time.
void set_flag()
{
    auto startTime = chrono::steady_clock::now();
    unique_lock<mutex> lock(_mtx);
    for (int i = 0; i < 5; i++)
    {
        lock.unlock();
        // do something consuming 1000 ms
        this_thread::sleep_for(chrono::milliseconds(1000));
        lock.lock();
    }

    bFlag = true;
    auto endTime = chrono::steady_clock::now();
    double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
    cout << "set_flag consume : " << dCount << endl;
}

int main()
{
    thread th1(wait_for_flag);
    thread th2(set_flag);
    th1.join();
    th2.join();
    return 0;
}
set_flag consume : 5045.39 wait_for_flag consume : 5045.97
方法二:通过设定多个闹钟,每隔一段时间叫醒自己
1 void wait_for_flag() 2 { 3 auto startTime = chrono::steady_clock::now(); 4 unique_lock<mutex> lock(_mtx); 5 while (!bFlag) 6 { 7 lock.unlock(); 8 //设置 500ms 的闹钟 9 this_thread::sleep_for(chrono::milliseconds(500)); 10 lock.lock(); 11 } 12 13 auto endTime = chrono::steady_clock::now(); 14 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 15 cout << "wait_for_flag consume : " << dCount << endl; 16 }
上面代码中调用了 this_thread::sleep_for() 函数:如果暂时不满足条件,就让线程先休眠一段时间,之后再重新检查条件。
set_flag consume : 5061.66 wait_for_flag consume : 5570.77
方法三:让列车员叫醒你(使用 c++提供的同步机制)
1 mutex _mtx; 2 bool bFlag = false; 3 condition_variable _cond; //条件变量 4 void wait_for_flag() 5 { 6 auto startTime = chrono::steady_clock::now(); 7 unique_lock<mutex> lock(_mtx); 8 _cond.wait(lock, []() {return bFlag; }); //等待 9 10 auto endTime = chrono::steady_clock::now(); 11 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 12 cout << "wait_for_flag consume : " << dCount << endl; 13 } 14 15 void set_flag() 16 { 17 auto startTime = chrono::steady_clock::now(); 18 unique_lock<mutex> lock(_mtx); 19 for (int i = 0; i < 5; i++) 20 { 21 lock.unlock(); 22 //do something comsume 1000ms 23 this_thread::sleep_for(chrono::milliseconds(1000)); 24 lock.lock(); 25 } 26 27 bFlag = true; 28 _cond.notify_one(); //通知 29 auto endTime = chrono::steady_clock::now(); 30 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 31 cout << "set_flag consume : " << dCount << endl; 32 }
set_flag consume : 5015.84 wait_for_flag consume : 5016.75
条件变量
std::condition_variable
condition_variable(); ~condition_variable(); condition_variable(const condition_variable&) = delete; condition_variable& operator=(const condition_variable&) = delete;
void notify_one(); //唤醒一个等待者 void notify_all(); //唤醒所有等待者
void wait(unique_lock<mutex>& _Lck); void wait(unique_lock<mutex>& _Lck, _Predicate _Pred);
参数:
wait
template <class _Rep, class _Period> cv_status wait_for( unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time); template <class _Rep, class _Period, class _Predicate> bool wait_for( unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred);
参数:
wait_for
template <class _Clock, class _Duration> cv_status wait_until( unique_lock<mutex>&_Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time); template <class _Clock, class _Duration, class _Predicate> bool wait_until( unique_lock<mutex>&_Lck, const chrono::time_point<_Clock, _Duration>&_Abs_time, _Predicate _Pred);
参数:
返回值:
含义:
std::condition_variable_any
condition_variable_any(); ~condition_variable_any(); condition_variable_any(const condition_variable_any&) = delete; condition_variable_any& operator=(const condition_variable_any&) = delete;
void notify_one(); void notify_all();
template <class _Lock> void wait(_Lock& _Lck); template <class _Lock, class _Predicate> void wait(_Lock& _Lck, _Predicate _Pred);
template <class _Lock, class _Rep, class _Period> cv_status wait_for( _Lock & _Lck, const chrono::duration<_Rep, _Period>& _Rel_time); template <class _Lock, class _Rep, class _Period, class _Predicate> bool wait_for( _Lock & _Lck, const chrono::duration<_Rep, _Period>&_Rel_time, _Predicate _Pred);
template <class _Lock, class _Clock, class _Duration> cv_status wait_until(_Lock & _Lck, const chrono::time_point<_Clock, _Duration>&_Abs_time) template <class _Lock, class _Clock, class _Duration, class _Predicate> bool wait_until(_Lock & _Lck, const chrono::time_point<_Clock, _Duration>&_Abs_time, _Predicate _Pred)
虚假唤醒
1 mutex _mtx; 2 condition_variable _cond; 3 queue<int> _dataQueue; 4 5 void data_preparation_thread() 6 { 7 while (true) 8 { 9 int _data = rand(); 10 { 11 std::lock_guard<mutex> lock(_mtx); 12 _dataQueue.push(_data); 13 } 14 _cond.notify_all(); 15 this_thread::sleep_for(chrono::milliseconds(1000)); 16 } 17 } 18 19 void data_processing_thread() 20 { 21 while (true) 22 { 23 std::unique_lock<mutex> lock(_mtx); 24 _cond.wait(lock, []() 25 { 26 bool bEmpty = _dataQueue.empty(); 27 if (bEmpty) 28 cout << this_thread::get_id() << " be spurious waken up\n"; 29 30 return !bEmpty; 31 }); 32 int _data = _dataQueue.front(); 33 _dataQueue.pop(); 34 lock.unlock(); 35 36 cout << "threadID : " << this_thread::get_id() << " data = " << _data << endl; 37 } 38 } 39 40 int main() 41 { 42 srand(time(NULL)); 43 44 thread th1(data_processing_thread); 45 thread th2(data_processing_thread); 46 thread th3(data_preparation_thread); 47 th1.join(); 48 th2.join(); 49 th3.join(); 50 return 0; 51 }
唤醒丢失
1 void wait_for_flag() 2 { 3 unique_lock<mutex> lock(_mtx); 4 _cond.wait(lock); //等待 5 } 6 7 void set_flag() 8 { 9 unique_lock<mutex> lock(_mtx); 10 bFlag = true; 11 _cond.notify_one(); //通知 12 } 13 14 int main() 15 { 16 thread th1(set_flag); 17 thread th2(wait_for_flag); 18 th1.join(); 19 th2.join(); 20 return 0; 21 }
使用 future 等待一次性事件发生
std::future
构造函数
future(); ~future(); future(future&& _Other); future& operator=(future&& _Right); future(const future&) = delete; future& operator=(const future&) = delete;
valid()函数
bool valid() const;
get()函数
template <class _Ty> _Ty get();
wait()函数
void wait() const;
wait_for()函数
template <class _Rep, class _Per> future_status wait_for(const chrono::duration<_Rep, _Per>& _Rel_time);
wait_until()函数
template <class _Clock, class _Dur> future_status wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time);
share()函数
template <class _Ty> shared_future<_Ty> share();
std::shared_future
构造函数
shared_future(); //构造函数 ~shared_future(); //析构函数 //支持拷贝 shared_future(const shared_future& _Other); shared_future& operator=(const shared_future& _Right); //支持移动 shared_future(future<_Ty>&& _Other); shared_future& operator=(shared_future&& _Right);
valid()函数
bool valid() const;
get()函数
template <class _Ty> const _Ty& get() const;
等待
void wait() const; template <class _Rep, class _Per> future_status wait_for( const chrono::duration<_Rep, _Per>&_Rel_time); template <class _Clock, class _Dur> future_status wait_until( const chrono::time_point<_Clock, _Dur>& _Abs_time);
案例-构造std::shared_future对象
// Three ways to obtain a std::shared_future<int> from a promise.
// FIX: the original redeclared pro/_fu/_sfu with the same names three times
// in one scope, which would not compile; each example now uses distinct names.

// 1) explicitly: move an existing future into a shared_future
std::promise<int> pro1;
std::future<int> fu1 = pro1.get_future();
std::shared_future<int> sfu1 = std::move(fu1);

// 2) implicitly: a future rvalue converts to shared_future
std::promise<int> pro2;
std::shared_future<int> sfu2 = pro2.get_future();

// 3) via future::share() — leaves fu3 invalid afterwards
std::promise<int> pro3;
std::future<int> fu3 = pro3.get_future();
std::shared_future<int> sfu3 = fu3.share();
std::async()函数-从后台任务返回值
函数定义
template <class _Fty, class... _ArgTypes> std::future<...> async(_Fty&& _Fnarg, _ArgTypes&&... _Args); template <class _Fty, class... _ArgTypes> std::future<...> async(launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args);
参数传递流程
//省略Res_Data类 int Entry(Res_Data data) { cout << "-----------"; return 5; } int main() { Res_Data _data; auto _fu = std::async(Entry, _data); cout << _fu.get() << endl; return 0; }
输出如下:
008FF9E3 Constractor 008FF6E8 Copy Constractor 008FF33C Move Constractor 00CE0854 Move Constractor 008FF33C Destractor 008FF6E8 Destractor 009FDE74 Move Constractor ----------- 009FDE74 Destractor 00CE0854 Destractor 5 008FF9E3 Destractor
案例
// std::async demo: launch ThreadEntry asynchronously, keep working on the
// main thread, then block in get() for the result.
int ThreadEntry()
{
    cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl;
    return 5;
}

int main()
{
    cout << "main threadId : " << std::this_thread::get_id() << endl;

    std::future<int> fu = std::async(ThreadEntry);
    cout << "main thread to do something" << endl;

    // get() waits for the task and retrieves its value — callable only once.
    cout << fu.get() << endl;
    //fu.wait(); // wait() merely blocks until the task finishes, no value

    cout << "main thread end doing something" << endl;
    return 0;
}
如何判断std::async的执行方式
1 std::future _fu = std::async(ThreadEntry); 2 future_status status = _fu.wait_for(std::chrono::microseconds(0));//等待0ms 3 if (status == future_status::deferred) //延时执行 4 { 5 //todo 6 } 7 else if (status == future_status::ready || status == future_status::timeout) 8 { 9 //todo 10 }
std::async与std::thread的区别
std::packaged_task - 关联future实例和任务
构造函数
packaged_task(); //默认构造函数 ~packaged_task(); //析构函数 template <class _Fty2, ...> packaged_task(_Fty2&& _Fnarg) //传入一个可调用对象 //支持移动 packaged_task(packaged_task&& _Other); packaged_task& operator=(packaged_task&& _Other); //不支持拷贝 packaged_task(const packaged_task&) = delete; packaged_task& operator=(const packaged_task&) = delete;
int ThreadEntry(string& str, double dValue, char* pBuf); std::packaged_task<int(string&, double, char*)> task(ThreadEntry);
valid()函数
bool valid() const;
swap()函数
void swap(packaged_task& _Other);
get_future()函数
future<_Ret> get_future();
operator()()函数
void operator()(_ArgTypes... _Args);
make_ready_at_thread_exit()函数
void make_ready_at_thread_exit(_ArgTypes... _Args);
1 int ThreadEntry() 2 { 3 cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl; 4 std::this_thread::sleep_for(std::chrono::milliseconds(5000)); 5 cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl; 6 return 5; 7 } 8 9 int main() 10 { 11 std::packaged_task<int(void)> task(ThreadEntry); 12 std::future<int> _fu = task.get_future(); 13 14 task.make_ready_at_thread_exit(); //执行该函数后,_fu未就绪,调用get将出错 15 //task(); //_fu立马就绪,调用get()不会出错 16 cout << "value = " << _fu.get() << endl; 17 return 0; 18 }
正确用法:
1 int main() 2 { 3 std::packaged_task<int(void)> task(ThreadEntry); 4 std::future<int> _fu = task.get_future(); 5 6 thread th1([&]() {task.make_ready_at_thread_exit(); }); 7 th1.detach(); 8 9 cout << "value = " << _fu.get() << endl; 10 return 0; 11 }
reset()函数
void reset();
案例-在线程间传递任务
1 std::mutex m; 2 std::deque<std::packaged_task<void()>> tasks; 3 4 void gui_thread() 5 { 6 while (...) 7 { 8 std::packaged_task<void()> task; 9 { 10 std::lock_guard<std::mutex> lk(m); 11 if (tasks.empty()) 12 continue; 13 task = std::move(tasks.front()); 14 tasks.pop_front(); 15 } 16 task(); 17 //其他操作 18 } 19 } 20 21 template<typename Func> 22 std::future<void> post_task_for_gui_thread(Func f) 23 { 24 std::packaged_task<void()> task(f); 25 std::future<void> res = task.get_future(); 26 std::lock_guard<std::mutex> lk(m); 27 tasks.push_back(std::move(task)); 28 return res; 29 }
std::promise-创建std::promise
1 void setValue(int nData, std::promise<int>& _pro) 2 { 3 cout << "set value threadID : " << this_thread::get_id() << endl; 4 this_thread::sleep_for(chrono::seconds(3)); //休息3秒 5 _pro.set_value( nData * 2 + 5 ); 6 } 7 8 void getValue(std::future<int>& _fu) 9 { 10 cout << "get value threadID : " << this_thread::get_id() << endl; 11 cout << "data = " << _fu.get() << endl; 12 } 13 14 int main() 15 { 16 std::promise<int> pro; 17 std::future fu = pro.get_future(); 18 19 thread th1(setValue, 10, std::ref(pro)); 20 thread th2(getValue, std::ref(fu)); 21 22 th1.join(); 23 th2.join(); 24 return 0; 25 }
构造函数
promise(); ~promise(); template <class _Alloc> promise(allocator_arg_t, const _Alloc& _Al); //移动构造 promise(promise&& _Other); promise& operator=(promise&& _Other); //无拷贝 promise(const promise&) = delete; promise& operator=(const promise&) = delete;
swap()函数
void swap(promise& _Other);
get_future()函数
future<_Ty> get_future();
set_value()函数
void set_value(const _Ty& _Val); void set_value(_Ty&& _Val);
set_value_at_thread_exit()函数
void set_value_at_thread_exit(const _Ty& _Val); void set_value_at_thread_exit(_Ty&& _Val);
set_exception()函数
void set_exception(exception_ptr _Exc);
set_exception_at_thread_exit()函数
void set_exception_at_thread_exit(exception_ptr _Exc);
案例
1 void process_connections(vector<ConnectPackage>& connections) 2 { 3 while (!done(connections)) 4 { 5 for (auto itr : connections) 6 { 7 if (itr->has_incoming_data()) //有数据传入,接收 8 { 9 data_packet data = connection->incoming(); 10 std::promise<payload_type>& p = connection->get_promise(data.id); 11 p.set_value(data.payload); 12 } 13 14 if (connection->has_outgoing_data()) //有数据需向外传递 15 { 16 outgoing_packet data = connection->top_of_outgoing_queue(); 17 connection->send(data.payload); 18 data.promise.set_value(true); 19 } 20 } 21 } 22 }
将异常保存到future中
// An exception thrown inside the async task is stored in the future and
// re-thrown from get() on the calling thread.
double square_root(double x)
{
    if (x < 0)
        throw std::out_of_range("x < 0");

    return sqrt(x);
}

int main()
{
    std::future<double> f = std::async(square_root, -1);
    double dVal = f.get(); // re-throws the stored std::out_of_range here
    return 0;
}
1 try 2 { 3 pro.set_value(dVal); 4 } 5 catch (...) 6 { 7 pro.set_exception(std::make_exception_ptr(std::logic_error("error"))); 8 }
限时等待
时钟周期-std::ratio
template<std::intmax_t Num, std::intmax_t Denom = 1> class ratio;
ratio<2> //代表2秒 ratio<60> //代表1分钟 ratio<60*60> //代表1小时 ratio<1,1000> //代表1毫秒 ratio<1,1000000> //代表1微秒
时钟类
high_resolution_clock 被定义为:
using high_resolution_clock = steady_clock;
时长类
template<class Rep, class Period = std::ratio<1,1>> class duration;
// Predefined duration types (conceptual listing; Rep stands for the
// implementation-chosen integer representation).
// FIX: the comma between Rep and ratio<...> was missing in every line.
typedef duration<Rep, ratio<3600,1>>       hours;        // hours
typedef duration<Rep, ratio<60,1>>         minutes;      // minutes
typedef duration<Rep, ratio<1,1>>          seconds;      // seconds
typedef duration<Rep, ratio<1,1000>>       milliseconds; // milliseconds
typedef duration<Rep, ratio<1,1000000>>    microseconds; // microseconds
typedef duration<Rep, ratio<1,1000000000>> nanoseconds;  // nanoseconds
std::this_thread::sleep_for(std::chrono::seconds(3)); //休眠3秒
std::chrono::duration_cast时间转换
1 std::chrono::milliseconds ms(54802); 2 std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms); 3 4 cout << ms.count() << endl; //54802 5 cout << s.count() << endl; //54
count()函数
1 std::chrono::seconds s = std::chrono::seconds(1) * 5; 2 cout << s.count() << endl; //5
时间点类
template <class _Clock, class _Duration = typename _Clock::duration> class time_point;
// time_point_cast: truncate "now" to day granularity and print the number of
// whole days since the system clock's epoch.
int main()
{
    using days_type = std::chrono::duration<int, std::ratio<60 * 60 * 24>>;

    auto today = std::chrono::time_point_cast<days_type>(std::chrono::system_clock::now());

    std::cout << today.time_since_epoch().count() << std::endl;
}
std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500);
接受超时时限的函数
1 std::chrono::milliseconds ms(500); 2 this_thread::sleep_for(ms); 3 4 auto tp = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500); 5 this_thread::sleep_until(tp);
运用同步操作简化代码
利用future进行函数式编程
// Functional-style parallel quicksort: splice the first element out as the
// pivot, partition the remainder, sort the lower half on another thread via
// std::async, and the upper half recursively on this one. Splicing moves
// nodes between lists, so elements are never copied.
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if (input.empty())
        return input;

    // Move the first element out to serve as the pivot.
    std::list<T> sorted;
    sorted.splice(sorted.begin(), input, input.begin());
    const T& pivot = *sorted.begin();

    // After partition, everything < pivot precedes `split`.
    auto split = std::partition(input.begin(), input.end(),
                                [&pivot](const T& v) { return v < pivot; });

    std::list<T> lows;
    lows.splice(lows.begin(), input, input.begin(), split);

    // Lower half asynchronously, upper half recursively on this thread.
    std::future<std::list<T>> lowsDone =
        std::async(parallel_quick_sort<T>, std::move(lows));
    std::list<T> highs = parallel_quick_sort(std::move(input));

    sorted.splice(sorted.end(), highs);
    sorted.splice(sorted.begin(), lowsDone.get());
    return sorted;
}
使用消息传递进行同步
符合并发技术规约的后续风格并发
// Run func on a detached thread and return an experimental future for its
// result. Value or exception is published at thread exit, so the future only
// becomes ready once the worker thread has fully finished.
template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())> spawn_async(Func&& func)
{
    using Result = decltype(std::declval<Func>()());

    std::experimental::promise<Result> pro;
    auto fu = pro.get_future();

    // Move the promise and a decayed copy of func into the thread body.
    auto body = [_pro = std::move(pro), _func = std::decay_t<Func>(func)]() mutable
    {
        try
        {
            _pro.set_value_at_thread_exit(_func());
        }
        catch (...)
        {
            _pro.set_exception_at_thread_exit(std::current_exception());
        }
    };

    std::thread th(body);
    th.detach();
    return fu;
}
后续函数的连锁调用
1 void process_login(std::string const& username, std::string const& password) 2 { 3 try 4 { 5 user_id const id = backend.authenticate_user(username, password); //验证身份 6 user_data const info_to_display = backend.request_current_info(id); //请求账户信息 7 update_display(info_to_display); //更新显示 8 } 9 catch (std::exception& e) 10 { 11 display_error(e); 12 } 13 }
1 std::future<void> process_login(std::string const& username, std::string const& password) 2 { 3 auto cbk = [=]() 4 { 5 try 6 { 7 const user_id id = backend.authenticate_user(username, password); //验证身份 8 const user_data info_to_display = backend.request_current_info(id); //请求账户信息 9 update_display(info_to_display); //更新显示 10 } 11 catch (std::exception& e) 12 { 13 display_error(e); 14 } 15 }; 16 17 return std::async(std::launch::async, cbk); 18 }
文章来源地址https://www.toymoban.com/news/detail-709985.html
1 std::experimental::future<void> process_login(std::string const& username, std::string const& password) 2 { 3 //验证身份 4 auto cbk1 = [=]() 5 { 6 return backend.authenticate_user(username, password); 7 }; 8 9 //请求账户信息 10 auto cbk2 = [](std::experimental::future<user_id> id) 11 { 12 return backend.request_current_info(id.get()); 13 }; 14 15 //更新显示 16 auto cbk3 = [](std::experimental::future<user_data> info_to_display) 17 { 18 try { 19 update_display(info_to_display.get()); 20 } 21 catch (std::exception& e) { 22 display_error(e); 23 } 24 }; 25 26 return spawn_async(cbk1).then(cbk2).then(cbk3); 27 }
1 std::experimental::future<void> process_login(std::string const& username, std::string const& password) 2 { 3 auto cbk1 = [](std::experimental::future<user_id> id) 4 { 5 return backend.async_request_current_info(id.get()); 6 }; 7 8 auto cbk2 = [](std::experimental::future<user_data> info_to_display) 9 { 10 try 11 { 12 update_display(info_to_display.get()); 13 } 14 catch (std::exception& e) 15 { 16 display_error(e); 17 } 18 }; 19 20 return backend.async_authenticate_user(username, password).then(cbk1).then(cbk2); 21 }
等待多个future全部就绪
1 std::future<FinalResult> process_data(std::vector<MyData>& vec) 2 { 3 size_t const chunk_size = whatever; 4 std::vector<std::future<ChunkResult>> results; 5 6 for (auto begin = vec.begin(), end = vec.end(); beg != end;) 7 { 8 const size_t remaining_size = end - begin; 9 const size_t this_chunk_size = std::min(remaining_size, chunk_size); 10 results.push_back(std::async(process_chunk, begin, begin + this_chunk_size)); 11 begin += this_chunk_size; 12 } 13 14 //新开线程,不断检查future是否就绪 15 auto cbk = [all_results = std::move(results)]() 16 { 17 std::vector<ChunkResult> v; 18 for (auto& f : all_results) 19 { 20 v.push_back(f.get()); 21 } 22 }; 23 return std::async(cbk); 24 }
std::experimental::when_all()函数
1 std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec) 2 { 3 size_t const chunk_size = whatever; 4 std::vector<std::experimental::future<ChunkResult>> results; 5 for (auto begin = vec.begin(), end = vec.end(); beg != end;) 6 { 7 const size_t remaining_size = end - begin; 8 const size_t this_chunk_size = std::min(remaining_size, chunk_size); 9 results.push_back(spawn_async(process_chunk, begin, begin + this_chunk_size)); 10 begin += this_chunk_size; 11 } 12 13 auto cbk = [](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results) 14 { 15 std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get(); 16 std::vector<ChunkResult> v; 17 v.reserve(all_results.size()); 18 19 for (auto& f : all_results) 20 { 21 v.push_back(f.get()); 22 } 23 return gather_results(v); 24 }; 25 26 return std::experimental::when_all(results.begin(), results.end()).then(cbk); 27 }
等待多个future中任意一个就绪
std::experimental::when_any()函数
1 struct DoneCheck 2 { 3 std::shared_ptr<std::experimental::promise<FinalResult>> final_result; 4 5 DoneCheck(std::shared_ptr<std::experimental::promise<FinalResult>> final_result_) : final_result(std::move(final_result_)) {} 6 7 void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param) 8 { 9 auto results = results_param.get(); 10 const MyData* ready_result = results.futures[results.index].get(); 11 12 if (ready_result) 13 final_result->set_value(process_found_value(*ready_result)); 14 else 15 { 16 results.futures.erase(results.futures.begin() + results.index); 17 if (!results.futures.empty()) 18 { 19 std::experimental::when_any(results.futures.begin(), results.futures.end()).then(std::move(*this)); 20 } 21 else 22 { 23 final_result->set_exception(std::make_exception_ptr(std::runtime_error("Not found"))); 24 } 25 } 26 } 27 }; 28 29 std::experimental::future<FinalResult> find_and_process_value(std::vector<MyData>& data) 30 { 31 const unsigned concurrency = std::thread::hardware_concurrency(); 32 const unsigned num_tasks = (concurrency > 0) ? concurrency : 2; 33 34 std::vector<std::experimental::future<MyData*>> results; 35 36 auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks; 37 38 auto chunk_begin = data.begin(); 39 40 std::shared_ptr<std::atomic<bool>> done_flag = std::make_shared<std::atomic<bool>>(false); 41 42 for (unsigned i = 0; i < num_tasks; ++i) 43 { 44 auto chunk_end = (i < (num_tasks - 1)) ? 
chunk_begin + chunk_size : data.end(); 45 46 auto cbk = [=]() 47 { 48 for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end); ++entry) 49 { 50 if (matches_find_criteria(*entry)) 51 { 52 *done_flag = true; 53 return &*entry; 54 } 55 } 56 return (MyData*)nullptr; 57 }; 58 59 std::experimental::future<MyData*> _fu = spawn_async(cbk); 60 results.push_back(std::move(_fu)); 61 chunk_begin = chunk_end; 62 } 63 64 std::shared_ptr<std::experimental::promise<FinalResult>> final_result = std::make_shared<std::experimental::promise<FinalResult>>(); 65 66 std::experimental::when_any(results.begin(), results.end()).then(DoneCheck(final_result)); 67 return final_result->get_future(); 68 }
void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param)
1 auto results = results_param.get(); 2 const MyData* ready_result = results.futures[results.index].get();
基本的线程闩类std::experimental::latch
1 void foo() 2 { 3 unsigned const thread_count = 5; 4 std::experimental::latch done(thread_count); 5 my_data data[thread_count]; 6 std::vector<std::future<void>> threads; 7 8 for (unsigned i = 0; i < thread_count; ++i) 9 { 10 auto cbk = [&, i]() 11 { 12 data[i] = make_data(i); 13 done.count_down(); 14 do_more_stuff(); 15 }; 16 17 std::future<void> _fu = std::async(std::launch::async, cbk); 18 threads.push_back(std::move(_fu)); 19 } 20 21 done.wait(); 22 process_data(data, thread_count); 23 }
基本的线程卡类std::experimental::barrier
1 void process_data(data_source& source, data_sink& sink) 2 { 3 const unsigned concurrency = std::thread::hardware_concurrency(); 4 const unsigned num_threads = (concurrency > 0) ? concurrency : 2; //线程个数 5 6 std::experimental::barrier sync(num_threads); 7 std::vector<joining_thread> threads(num_threads); 8 9 std::vector<data_chunk> chunks; 10 result_block result; 11 12 for (unsigned i = 0; i < num_threads; ++i) 13 { 14 auto cbk = [&, i]() 15 { 16 while (!source.done()) 17 { 18 if (!i) //在0号线程上做分割 19 { 20 data_block current_block = source.get_next_data_block(); 21 chunks = divide_into_chunks(current_block, num_threads); //分成num_threads块 22 } 23 24 sync.arrive_and_wait(); //其他线程等待0号线程分割完 25 result.set_chunk(i, num_threads, process(chunks[i])); //各个线程进行相关处理 26 sync.arrive_and_wait(); //等待所有线程处理完 27 28 if (!i) 29 { 30 sink.write_data(std::move(result)); //由0号线程负责处理写入操作 31 } 32 } 33 }; 34 35 threads[i] = joining_thread(cbk); //创建std::thread线程,会自动调用join() 36 } 37 }
std::experimental::flex_barrier
1 void process_data(data_source& source, data_sink& sink) 2 { 3 const unsigned concurrency = std::thread::hardware_concurrency(); 4 const unsigned num_threads = (concurrency > 0) ? concurrency : 2; 5 std::vector<data_chunk> chunks; 6 7 auto split_source = [&] () 8 { 9 if (!source.done()) 10 { 11 data_block current_block = source.get_next_data_block(); 12 chunks = divide_into_chunks(current_block, num_threads); 13 } 14 }; 15 16 //先对数据进行分割 17 split_source(); 18 19 result_block result; 20 21 auto cbk1 = [&]() 22 { 23 sink.write_data(std::move(result)); 24 split_source(); 25 return -1; 26 }; 27 28 //等到线程都到达时,其中一个线程会调用cbk1补全函数 29 std::experimental::flex_barrier sync(num_threads, cbk1); 30 31 std::vector<joining_thread> threads(num_threads); 32 for (unsigned i = 0; i < num_threads; ++i) 33 { 34 auto cbk2 = [&, i] 35 { 36 while (!source.done()) 37 { 38 result.set_chunk(i, num_threads, process(chunks[i])); //各个线程进行相关处理 39 sync.arrive_and_wait(); //等待所有线程就位(就位后其中一个线程会调用补全函数cbk1) 40 } 41 }; 42 threads[i] = joining_thread(cbk2); //创建std::thread线程,会自动调用join() 43 } 44 }
Copyright
文章来源:https://www.toymoban.com/news/detail-709985.html
到了这里,关于c++并发编程实战-第4章 并发操作的同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!