c++并发编程实战-第4章 并发操作的同步

这篇具有很好参考价值的文章主要介绍了c++并发编程实战-第4章 并发操作的同步。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

等待事件或等待其他条件

坐车案例

想象一种情况:假设晚上坐车外出,如何才能确保不坐过站又能使自己最轻松?

方法一:不睡觉,时刻关注自己的位置

 1 #include <iostream>
 2 #include <thread>
 3 #include <mutex>
 4 using namespace std;
 5 
 6 mutex _mtx;
 7 bool bFlag = false;
 8 void wait_for_flag()
 9 {
10     auto startTime = chrono::steady_clock::now();
11     while (1)
12     {
13         unique_lock<mutex> lock(_mtx);
14         if (bFlag)
15         {
16             auto endTime = chrono::steady_clock::now();
17             double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
18             cout << "wait_for_flag consume : " << dCount << endl;
19             return;
20         }
21     }
22 }
23 
24 void set_flag()
25 {
26     auto startTime = chrono::steady_clock::now();
27     unique_lock<mutex> lock(_mtx);
28     for (int i = 0; i < 5; i++)
29     {
30         lock.unlock();
31         //do something comsume 1000ms
32         this_thread::sleep_for(chrono::milliseconds(1000));
33         lock.lock();
34     }
35 
36     bFlag = true;
37     auto endTime = chrono::steady_clock::now();
38     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
39     cout << "set_flag consume : " << dCount << endl;
40 }
41 
42 int main()
43 {
44     thread th1(wait_for_flag);
45     thread th2(set_flag);
46     th1.join();
47     th2.join();
48     return 0;
49 }

这种方式存在双重浪费:

  • 线程 th1(wait_for_flag)须不断查验标志,浪费原本有用的处理时间,这部分计算资源原本可以留给其他线程使用。
  • 线程 th1(wait_for_flag)每次循环都需要给互斥上锁,导致其他线程无法加锁。如果 th2 此时完成操作,则需要等待 th1 释放互斥才能操作。

程序输出如下:

set_flag consume : 5045.39
wait_for_flag consume : 5045.97

两个线程执行时间相近,但查看任务管理器,发现Debug程序CPU占用率始终保持10%。

c++并发编程实战-第4章 并发操作的同步

方法二:通过设定多个闹钟,每隔一段时间叫醒自己

 1 void wait_for_flag()
 2 {
 3     auto startTime = chrono::steady_clock::now();
 4     unique_lock<mutex> lock(_mtx);
 5     while (!bFlag)
 6     {
 7         lock.unlock();
 8         //设置 500ms 的闹钟
 9         this_thread::sleep_for(chrono::milliseconds(500));    
10         lock.lock();
11     }
12 
13     auto endTime = chrono::steady_clock::now();
14     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
15     cout << "wait_for_flag consume : " << dCount << endl;
16 }

上面代码中引用了 this_thread::sleep_for()函数,如果暂时不满足条件,就让线程休眠。这确有改进,因为线程休眠,所以处理时间不再被浪费(不用熬夜)。但是,还是存在缺陷,休眠间隔时间难以确定。如果设置太短,会导致频繁检验,如果设置太长,又可能导致过度休眠(到站还没响)。如果线程 th2 完成了任务,线程 th1 却没有被及时唤醒,就会导致延迟。

上面的代码将休眠时间设置为500ms,CPU占用率始终为0%,但两个线程的运行时间相差过大。运行结果如下:

set_flag consume : 5061.66
wait_for_flag consume : 5570.77

方法三:让列车员叫醒你(使用 c++提供的同步机制)

若数据存在先后处理关系,线程甲需要等待线程乙完成处理后才能开始操作,那么线程甲则需等待线程乙完成并且触发事件,其中最基本的方式是条件变量。

 1 mutex _mtx;
 2 bool bFlag = false;
 3 condition_variable _cond;    //条件变量
 4 void wait_for_flag()
 5 {
 6     auto startTime = chrono::steady_clock::now();
 7     unique_lock<mutex> lock(_mtx);
 8     _cond.wait(lock, []() {return bFlag; });    //等待
 9 
10     auto endTime = chrono::steady_clock::now();
11     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
12     cout << "wait_for_flag consume : " << dCount << endl;
13 }
14 
15 void set_flag()
16 {
17     auto startTime = chrono::steady_clock::now();
18     unique_lock<mutex> lock(_mtx);
19     for (int i = 0; i < 5; i++)
20     {
21         lock.unlock();
22         //do something comsume 1000ms
23         this_thread::sleep_for(chrono::milliseconds(1000));
24         lock.lock();
25     }
26 
27     bFlag = true;
28     _cond.notify_one();    //通知
29     auto endTime = chrono::steady_clock::now();
30     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
31     cout << "set_flag consume : " << dCount << endl;
32 }

引用条件变量后,两线程执行时间相差不大,程序输出如下:

set_flag consume : 5015.84
wait_for_flag consume : 5016.75

注:上述案例的测试结果可能不尽相同,理解意思即可。

条件变量

C++标准库提供了条件变量的两种实现:

  • std::condition_variable,只能和std::mutex一起使用。(推荐)
  • std::condition_variable_any,只要某一类型符合成为互斥的最低标准,就能与其一起使用。

二者都在标准库的头文件<condition_variable>内声明。

std::condition_variable

构造函数

condition_variable();
~condition_variable();

condition_variable(const condition_variable&) = delete;
condition_variable& operator=(const condition_variable&) = delete;

不支持拷贝、也不支持移动

通知

void notify_one();      //唤醒一个等待者
void notify_all();      //唤醒所有等待者

wait()函数

void wait(unique_lock<mutex>& _Lck);
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred);

参数:

  • _Lck:独占锁,需要多次调用加锁、解锁操作。
  • _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。

含义:

使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。

该函数执行过程如下:

  • 当程序流程执行到wait时,如果指定了_Pred参数,wait会先执行_Pred如果_Pred返回true,wait函数执行完毕,返回,执行后续代码;如果_Pred返回false,先将_Lck解锁并阻塞当前线程,等待其他线程唤醒。如果没有指定_Pred参数,等价于返回false的情况。
  • 后续,其他线程调用notify_one()或notify_all()函数唤醒当前线程。当前线程被唤醒,先将_Lck上锁,如果指定_Pred参数,则先进行检查,根据返回值决定是否阻塞。如果没有指定_Pred参数,wait函数执行完毕,返回,执行后续代码

wait_for()函数

template <class _Rep, class _Period>
cv_status wait_for(
            unique_lock<mutex>& _Lck,
            const chrono::duration<_Rep, _Period>& _Rel_time);

template <class _Rep, class _Period, class _Predicate>
bool wait_for(
        unique_lock<mutex>& _Lck,
        const chrono::duration<_Rep, _Period>& _Rel_time,
        _Predicate _Pred);

参数:

  • _Lck:独占锁
  • _Rel_time:等待所消耗的最大时间
  • _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。
返回值:
  • cv_status:如果在最大时间时间内被唤醒,则wait_for()函数返回cv_status::no_timeout,否则wait_for()函数返回cv_status::timeout。
  • bool:返回_Pred的返回值。
含义:

使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。如果等待时常超过_Rel_time,wait函数将返回。

wait_until()函数

template <class _Clock, class _Duration>
cv_status wait_until(
            unique_lock<mutex>&_Lck,
            const chrono::time_point<_Clock, _Duration>& _Abs_time);

template <class _Clock, class _Duration, class _Predicate>
bool wait_until(
        unique_lock<mutex>&_Lck,
        const chrono::time_point<_Clock, _Duration>&_Abs_time,
        _Predicate _Pred);

参数:

  • _Lck:独占锁
  • _Abs_time:指定停止等待的时间点
  • _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。

返回值:

  • cv_status:如果在指定的时间点之前被唤醒,则wait_until()函数返回cv_status::no_timeout,否则wait_until()函数返回cv_status::timeout。
  • bool:返回_Pred的返回值。

含义:

使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。如果等待时常超过指定的_Abs_time时间点,wait函数将返回。

std::condition_variable_any

std::condition_variable_any与std::condition_variable类似,这里只简单列出成员函数,具体含义可以参考上面的std::condition_variable。

构造函数

condition_variable_any();
~condition_variable_any();

condition_variable_any(const condition_variable_any&) = delete;
condition_variable_any& operator=(const condition_variable_any&) = delete;

不支持拷贝、也不支持移动

通知

void notify_one();
void notify_all();

wait()函数

template <class _Lock>
void wait(_Lock& _Lck);

template <class _Lock, class _Predicate>
void wait(_Lock& _Lck, _Predicate _Pred);

wait_for()函数

template <class _Lock, class _Rep, class _Period>
cv_status wait_for(
            _Lock & _Lck,
            const chrono::duration<_Rep, _Period>& _Rel_time);

template <class _Lock, class _Rep, class _Period, class _Predicate>
bool wait_for(
        _Lock & _Lck,
        const chrono::duration<_Rep, _Period>&_Rel_time,
        _Predicate _Pred);

wait_until()函数

template <class _Lock, class _Clock, class _Duration>
cv_status wait_until(_Lock & _Lck,
            const chrono::time_point<_Clock, _Duration>&_Abs_time)

template <class _Lock, class _Clock, class _Duration, class _Predicate>
bool wait_until(_Lock & _Lck,
        const chrono::time_point<_Clock, _Duration>&_Abs_time,
        _Predicate _Pred)

虚假唤醒

当线程从休眠状态中被唤醒,却发现等待条件未满足时,因而无事,这种情况被称为虚假唤醒。发生虚假唤醒最常见的情况是,多个线程争抢同一个条件,例如:

 1 mutex _mtx;
 2 condition_variable        _cond;
 3 queue<int> _dataQueue;
 4 
 5 void data_preparation_thread()
 6 {
 7     while (true)
 8     {
 9         int _data = rand();
10         {
11             std::lock_guard<mutex> lock(_mtx);
12             _dataQueue.push(_data);
13         }
14         _cond.notify_all();
15         this_thread::sleep_for(chrono::milliseconds(1000));
16     }
17 }
18 
19 void data_processing_thread()
20 {
21     while (true)
22     {
23         std::unique_lock<mutex> lock(_mtx);
24         _cond.wait(lock, []()
25             {
26                 bool bEmpty = _dataQueue.empty();
27                 if (bEmpty)
28                     cout << this_thread::get_id() << " be spurious waken up\n";
29 
30                 return !bEmpty;
31             });
32         int _data = _dataQueue.front();
33         _dataQueue.pop();
34         lock.unlock();
35 
36         cout << "threadID : " << this_thread::get_id() << " data = " << _data << endl;
37     }
38 }
39 
40 int main()
41 {
42     srand(time(NULL));
43 
44     thread th1(data_processing_thread);
45     thread th2(data_processing_thread);
46     thread th3(data_preparation_thread);
47     th1.join();
48     th2.join();
49     th3.join();
50     return 0;
51 }

两个线程竞争队列中的一条数据,总有一个是被虚假唤醒的。

唤醒丢失

 1 void wait_for_flag()
 2 {
 3     unique_lock<mutex> lock(_mtx);
 4     _cond.wait(lock);    //等待
 5 }
 6 
 7 void set_flag()
 8 {
 9     unique_lock<mutex> lock(_mtx);
10     bFlag = true;
11     _cond.notify_one();    //通知
12 }
13 
14 int main()
15 {
16     thread th1(set_flag);
17     thread th2(wait_for_flag);
18     th1.join();
19     th2.join();
20     return 0;
21 }

先执行set_flag()函数,设置数据,然后通知等待线程,此时wait_for_flag()函数还未进入等待状态,导致通知信号丢失,后续又无新的通知信号,导致线程一直处于阻塞状态。

使用 future 等待一次性事件发生

本节介绍std::future类,该类一般用于处理一次性事件,可以获取该事件的返回值。

我们可以在某个线程启动一个目标事件,该目标事件由新的线程去执行,并获取一个std::future对象。之后,该线程执行自己余下的任务,等到未来某一时刻,获取目标事件中执行的结果。

C++标准程序库有两种future,分别由两个类模板实现,其声明都位于标准库的头文件<future>内:

  • std::future:独占future。
  • std::shared_future:共享future。

它们的设计参照了std::unique_ptr和std::shared_ptr。同一目标事件仅仅允许关联唯一一个std::future实例,但可以关联多个std::shared_future实例。大致含义是:目标事件中的返回结果,如果想被其他多个线程访问,则应该使用std::shared_future,否则使用std::future。

std::future

std::future类提供了访问异步操作执行结果的机制。通过std::async、std::packaged_task或std::promise创建的异步操作,这些函数会返回一个std::future对象,该对象中保存了异步操作的执行结果。但是,该类的结果并不是共享的,即,该结果只能访问一次。

构造函数

future();
~future();
future(future&& _Other);
future& operator=(future&& _Right);

future(const future&) = delete;
future& operator=(const future&) = delete;

仅支持移动语义,不支持拷贝。

valid()函数

bool valid() const;

检测当前结果是否就绪。

get()函数

template <class _Ty>
_Ty get();

阻塞,等待future拥有合法结果并返回该结果。结果返回后,释放共享状态,后续调用valid()函数将返回false。若调用该函数前valid()为false,则行为未定义。

wait()函数

void wait() const;

阻塞直至结果变得可用,该函数执行后,valid() == true。

wait_for()函数

template <class _Rep, class _Per>
future_status wait_for(const chrono::duration<_Rep, _Per>& _Rel_time);

等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回。

future_status有如下取值:

  • future_status::ready:共享状态就绪。
  • future_status::timeout:共享状态在经过指定的等待时间内仍未就绪。
  • future_status::deferred:共享状态持有的函数正在延迟运行,结果将在显式请求时计算。

wait_until()函数

template <class _Clock, class _Dur>
future_status wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time);

等待结果,如果在已经到达指定的时间点时仍然无法得到结果,则返回。

share()函数

template <class _Ty>
shared_future<_Ty> share();

将本对象移动到std::shared_future对象。

std::shared_future

类似std::future类,同样提供了访问异步操作执行结果的机制,与std::future类不同的是,该类的结果可以被访问多次(可以连续多次调用get()函数)。

构造函数

shared_future();      //构造函数
~shared_future();     //析构函数

//支持拷贝
shared_future(const shared_future& _Other);
shared_future& operator=(const shared_future& _Right);

//支持移动
shared_future(future<_Ty>&& _Other);
shared_future& operator=(shared_future&& _Right);

支持拷贝,可以复制多个std::shared_future对象指向同一异步结果。每个线程通过自身的std::shared_future对象副本访问共享的异步结果,这一操作是安全的。

valid()函数

bool valid() const;

检测当前结果是否可用。

get()函数

template <class _Ty>
const _Ty& get() const;

阻塞,等待shared_future 拥有合法结果并获取它。若调用此函数前valid()为false,则行为未定义。

等待

void wait() const;

template <class _Rep, class _Per>
future_status wait_for(
    const chrono::duration<_Rep, _Per>&_Rel_time);

template <class _Clock, class _Dur>
future_status wait_until(
    const chrono::time_point<_Clock, _Dur>& _Abs_time);

和std::future相似。

案例-构造std::shared_future对象

 1 std::promise<int> pro;
 2 std::future<int> _fu = pro.get_future();
 3 std::shared_future<int> _sfu = std::move(_fu);        //显示
 4 
 5 std::promise<int> pro;
 6 std::shared_future<int> _sfu = pro.get_future();        //隐式
 7 
 8 std::promise<int> pro;
 9 std::future<int> _fu = pro.get_future();
10 std::shared_future<int> _sfu = _fu.share();        //share函数

std::async()函数-从后台任务返回值

std::async()函数用于构建后台任务,并允许调用者在未来某一时刻获取该任务的返回值。

函数定义

template <class _Fty, class... _ArgTypes>
std::future<...> async(_Fty&& _Fnarg, _ArgTypes&&... _Args);

template <class _Fty, class... _ArgTypes>
std::future<...> async(launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args);
参数:
  • _Policy:该函数执行方式,有如下几种取值:
    • std::launch::async:异步执行。该函数执行后,会启动新线程执行可调用对象。
    • std::launch::deferred:惰性执行。该函数执行后,并不会启动线程,而是等到后续需要获取结果时,由获取值的线程直接执行可调用对象。如果没有调用get()或者wait(),可调用对象不会执行。
    • std::launch::async | std::launch::deferred:由系统自行决定采用其中一个。
  • _Fnarg:可调用对象。
  • _Args:传递给可调用对象的参数包。

返回值:

std::future对象,通过该对象可用获取后台任务的返回值。

参数传递流程

//省略Res_Data类
int Entry(Res_Data data)
{
    cout << "-----------";
    return 5;
}

int main()
{
    Res_Data _data;
    auto _fu = std::async(Entry, _data);
    cout << _fu.get() << endl;
    return 0;
}

输出如下:

008FF9E3  Constractor
008FF6E8  Copy Constractor
008FF33C  Move Constractor
00CE0854  Move Constractor
008FF33C  Destractor
008FF6E8  Destractor
009FDE74  Move Constractor
-----------
009FDE74  Destractor
00CE0854  Destractor
5
008FF9E3  Destractor

结论:一次拷贝,3次移动。

案例

int ThreadEntry()
{
    cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl;
    return 5;
}

int main()
{
    cout << "main threadId : " << std::this_thread::get_id() << endl;
    std::future<int> fu = std::async(ThreadEntry);
    cout << "main thread to do something" << endl;

    cout << fu.get() << endl;            //等待线程结束并获取值,不能重复调用
    //fu.wait();                        //仅等待线程结束

    cout << "main thread end doing something" << endl;

    return 0;
}

如何判断std::async的执行方式

如果在构建异步任务时,不指定std::async的执行方式,那么操作系统会根据当前系统资源自动决定采用 std::launch::async 或 std::launch::deferred,那么,在程序中如何获知当前的执行方式呢?方法如下:

 1 std::future _fu = std::async(ThreadEntry);
 2 future_status status = _fu.wait_for(std::chrono::microseconds(0));//等待0ms
 3 if (status == future_status::deferred)        //延时执行
 4 {
 5     //todo
 6 }
 7 else if (status == future_status::ready || status == future_status::timeout)
 8 {
 9     //todo
10 }

std::future类的wait_for和wait_until成员函数会返回future_status枚举类型,该类型中记录当前的执行方式。

std::async与std::thread的区别

std::thread是创建一个线程执行相关操作,当系统资源紧张时,可能会导致线程创建失败。

std::async是创建一个异步任务,是否创建线程取决于操作系统,当系统资源紧张时,该函数可能不会创建线程,这取决于函数的执行方式。

std::thread不支持获取线程的执行结果,而std::async可以通过std::future获取异步任务的执行结果。

std::packaged_task - 关联future实例和任务

类模板 std::packaged_task 用于包装任何可调用对象,使之能被异步调用。可调用对象执行后,其返回值保存在std::future对象中,可用通过成员函数get_future()函数获取。

若一项庞杂的操作能分解为多个子任务,则可把它们分别包装到多个std::packaged_task实例之中,再传递给任务调度器或线程池。这就隐藏了细节,使任务抽象化,让调度器得以专注处理std::packaged_task<>实例,无须纠缠于形形色色的任务函数。

构造函数

packaged_task();      //默认构造函数
~packaged_task();     //析构函数

template <class _Fty2, ...>
packaged_task(_Fty2&& _Fnarg)    //传入一个可调用对象

//支持移动
packaged_task(packaged_task&& _Other);
packaged_task& operator=(packaged_task&& _Other);

//不支持拷贝
packaged_task(const packaged_task&) = delete;
packaged_task& operator=(const packaged_task&) = delete;

packaged_task的实例化与std::function类似,比如:

int ThreadEntry(string& str, double dValue, char* pBuf);

std::packaged_task<int(string&, double, char*)> task(ThreadEntry);

valid()函数

bool valid() const;

检查任务对象是否拥有合法的可调用对象。

swap()函数

void swap(packaged_task& _Other);

交换std::packaged_task中包装的可调用对象和与之关联的std::future对象。

get_future()函数

future<_Ret> get_future();

返回与之关联的std::future对象。

operator()()函数

void operator()(_ArgTypes... _Args);

调用被包装的可调用对象,可调用对象执行完后,将返回值保存到std::future对象中,并使std::future对象变为可用状态。

make_ready_at_thread_exit()函数

void make_ready_at_thread_exit(_ArgTypes... _Args);

该函数会调用被包装的可调用对象,可调用对象执行完后,将返回值保存到std::future对象中,但不会立马让std::future对象变为可用,而是等到该线程结束后才使其可用。

例如:

 1 int ThreadEntry()
 2 {
 3     cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl;
 4     std::this_thread::sleep_for(std::chrono::milliseconds(5000));
 5     cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl;
 6     return 5;
 7 }
 8 
 9 int main()
10 {
11     std::packaged_task<int(void)> task(ThreadEntry);
12     std::future<int> _fu = task.get_future();
13 
14     task.make_ready_at_thread_exit(); //执行该函数后,_fu未就绪,调用get将出错
15     //task();   //_fu立马就绪,调用get()不会出错
16     cout << "value = " << _fu.get() << endl;
17     return 0;
18 }

正确用法:

 1 int main()
 2 {
 3     std::packaged_task<int(void)> task(ThreadEntry);
 4     std::future<int> _fu = task.get_future();
 5 
 6     thread th1([&]() {task.make_ready_at_thread_exit(); });
 7     th1.detach();
 8     
 9     cout << "value = " << _fu.get() << endl;
10     return 0;
11 }

reset()函数

void reset();

抛弃之前的std::future对象和执行结果,重新构建新的std::future对象。不影响可调用对象。

案例-在线程间传递任务

 1 std::mutex m;
 2 std::deque<std::packaged_task<void()>> tasks;
 3 
 4 void gui_thread()
 5 {
 6     while (...)   
 7     {
 8         std::packaged_task<void()> task;
 9         {
10             std::lock_guard<std::mutex> lk(m);
11             if (tasks.empty())    
12                 continue;
13             task = std::move(tasks.front());    
14             tasks.pop_front();
15         }
16         task();
17         //其他操作
18     }
19 }
20 
21 template<typename Func>
22 std::future<void> post_task_for_gui_thread(Func f)
23 {
24     std::packaged_task<void()> task(f);  
25     std::future<void> res = task.get_future();  
26     std::lock_guard<std::mutex> lk(m);
27     tasks.push_back(std::move(task));    
28     return res;    
29 }

std::promise-创建std::promise

std::promise给出了一种异步求值的方法,能与某个std::future对象关联,延后读出需要求取的值。配对的std::promise和std::future可实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的promise设定关联的值,使future准备就绪。

 1 void setValue(int nData, std::promise<int>& _pro)
 2 {
 3     cout << "set value threadID : " << this_thread::get_id() << endl;
 4     this_thread::sleep_for(chrono::seconds(3));        //休息3秒
 5     _pro.set_value( nData * 2 + 5 );
 6 }
 7 
 8 void getValue(std::future<int>& _fu)
 9 {
10     cout << "get value threadID : " << this_thread::get_id() << endl;
11     cout << "data = " << _fu.get() << endl;
12 }
13 
14 int main()
15 {
16     std::promise<int> pro;
17     std::future fu = pro.get_future();
18 
19     thread th1(setValue, 10, std::ref(pro));
20     thread th2(getValue, std::ref(fu));
21 
22     th1.join();
23     th2.join();
24     return 0;
25 }

构造函数

promise();
~promise();

template <class _Alloc>
promise(allocator_arg_t, const _Alloc& _Al);

//移动构造
promise(promise&& _Other);
promise& operator=(promise&& _Other);

//无拷贝
promise(const promise&) = delete;
promise& operator=(const promise&) = delete;

swap()函数

void swap(promise& _Other);

交换2个std::promise对象的内容,以及与之关联的std::future对象。

get_future()函数

future<_Ty> get_future();

获取std::future对象。

set_value()函数

void set_value(const _Ty& _Val);

void set_value(_Ty&& _Val);

_Val存储到共享状态,并使其进入就绪状态。如果std::promise对象在销毁时还未设置值,会用异常代替。

set_value_at_thread_exit()函数

void set_value_at_thread_exit(const _Ty& _Val);

void set_value_at_thread_exit(_Ty&& _Val);

_Val存储到共享状态,在线程结束时,使共享状态进入就绪状态。

set_exception()函数

void set_exception(exception_ptr _Exc);

将异常指针_Excl存储到共享状态,并使其进入就绪状态。

set_exception_at_thread_exit()函数

void set_exception_at_thread_exit(exception_ptr _Exc);

将异常指针_Excl存储到共享状态,在线程结束时,使共享状态进入就绪状态。

案例

 1 void process_connections(vector<ConnectPackage>& connections)
 2 {
 3     while (!done(connections))
 4     {
 5         for (auto itr : connections)
 6         {
 7             if (itr->has_incoming_data())        //有数据传入,接收
 8             {
 9                 data_packet data = connection->incoming();
10                 std::promise<payload_type>& p = connection->get_promise(data.id);
11                 p.set_value(data.payload);
12             }
13 
14             if (connection->has_outgoing_data())   //有数据需向外传递
15             {
16                 outgoing_packet data = connection->top_of_outgoing_queue();
17                 connection->send(data.payload);
18                 data.promise.set_value(true);
19             }
20         }
21     }
22 }

将异常保存到future中

在允许多线程程序时,如果有异常情况发生,异常值如何向外传递是我们需要考虑的一个问题,例如:

 1 double square_root(double x)
 2 {
 3     if (x < 0)
 4         throw std::out_of_range("x < 0");
 5     
 6     return sqrt(x);
 7 }
 8 
 9 int main()
10 {
11     std::future<double> f = std::async(square_root, -1);
12     double dVal = f.get();
13     return 0;
14 }

std::future实例化为double类型,因此f.get()将会返回一个double类型的值,那异常该如何传递呢?

c++规定,若经由std::async()调用的函数抛出异常,则会被保存到future中,代替本该设定的值,future随之进入就绪状态,等到其成员函数get()被调用,存储在内的异常即被重新抛出。假如我们把任务函数包装在std::packaged_task对象内,也依然如是。若包装的任务函数在执行时抛出异常,则会代替本应求得的结果,被保存到future内并使其准备就绪。只要调用get(),该异常就会被再次抛出。

对于std::promise类,可以调用调用成员函数set_exception()设置异常,例如:

1 try
2 {
3     pro.set_value(dVal);
4 }
5 catch (...)
6 {
7     pro.set_exception(std::make_exception_ptr(std::logic_error("error")));
8 }

另一个方法是,直接销毁与std::future对象关联的std::promise对象或std::packaged_task对象。如果关联的future未能准备就绪,无论销毁两者中的哪一个,其析构函数都会将异常std::future_error存储为异步任务的状态数据,它的值是错误代码是std::future_errc::broken_promise。

限时等待

时钟周期-std::ratio

时间周期是指时钟的计时单位,每隔多长时间计数一次,用std::chrono::ratio来描述,该类定义如下:

template<std::intmax_t Num, std::intmax_t Denom = 1>
class ratio;

std::chrono::ratio是一个比例类,其中Num代表分子,Denom代表分母。用于描述时间时,默认单位是秒,即:

ratio<2>          //代表2秒
ratio<60>         //代表1分钟
ratio<60*60>      //代表1小时
ratio<11000>    //代表1毫秒
ratio<11000000> //代表1微妙

时钟类

就C++标准库而言,时钟(clock)是时间信息的来源。具体来说,每种时钟都是一个类,提供4项关键信息:

  • 当前时刻。
  • 时间值的类型(int、double)。
  • 该时钟的计时单元的长度(std::chrono::duration)。
  • 计时速率是否恒定,即能否将该时钟视为恒稳时钟(steady clock)。

c++给出下列几种时钟:

  • std::chrono::system_clock:
  • std::chrono::steady_clock:
  • std::high_resolution_clock:

其中,high_resolution_clock 被定义为:

using high_resolution_clock = steady_clock;

时钟类中都提供了一个静态成员函数now(),用于返回当前时刻。另一个静态成员is_steady,用于判断当前始终的计时速率是否恒定且无法调整,如果is_steady=true,则该时钟为恒定时钟。通常,std::chrono::system_clock类不是恒稳时钟,而std::chrono::steady_clock是一个恒定时钟。

时长类

c++标准库用std::chrono::duration<>类描述时长,该类定义如下:

template<class Rep, class Period = std::ratio<1,1>>
class duration;

其中,Rep代表时钟数的类型;Period代表时钟周期,描述每一个计时周期代表多少秒。c++标准库也定义了一些常用的时长:

typedef duration<Rep ratio<3600,1>> hours;              //小时
typedef duration<Rep ratio<60,1>> minutes;              //分钟
typedef duration<Rep ratio<1,1>> seconds;               //
typedef duration<Rep ratio<1,1000>> milliseconds;       //毫秒
typedef duration<Rep ratio<1,1000000>> microseconds;    //微妙
typedef duration<Rep ratio<1,1000000000>> nanoseconds;  //纳秒

使用也非常方便,如果需要使线程休眠3秒,代码如下:

std::this_thread::sleep_for(std::chrono::seconds(3));    //休眠3秒

std::chrono::duration_cast时间转换

如果需要在两个时间周期转换,可以使用std::chrono::duration_cast<>进行显示转换,例如:

1 std::chrono::milliseconds ms(54802);
2 std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);
3 
4 cout << ms.count() << endl;    //54802
5 cout << s.count() << endl;     //54

强转将导致结果被截断。

count()函数

时长类支持算术运算,我们将时长乘或除以一个数值(数值应与时长类的计数类型相符或对两个时长进行加减,就能得出一个新时长。可以通过count()函数获取该计时单位的数量:

1 std::chrono::seconds s = std::chrono::seconds(1) * 5;
2 cout << s.count() << endl;      //5

时间点类

在c++标准库中,用std::chrono::time_point<>表示时间点,时间点是一个时间跨度,始于一个称为时钟纪元的特定时刻(一般是1970年1月1日0时0分0秒),终于该时间点本身。

template <class _Clock, class _Duration = typename _Clock::duration>
class time_point;

其中,_Clock表示所参考的时钟,_Duration表示计时单元。

std::chrono::time_point类内提供了一个成员函数time_since_epoch(),该函数返回从时钟纪元到该时间点的时长跨度,下面例子是计算当前时间距1970年1月1日相隔多少天:

1 int main()
2 {
3     using days_type = std::chrono::duration<int, std::ratio<60 * 60 * 24>>;
4 
5     std::chrono::time_point<std::chrono::system_clock, days_type> today = 
6         std::chrono::time_point_cast<days_type>(std::chrono::system_clock::now());
7     
8     cout << today.time_since_epoch().count() << endl;
9 }

可以对某个时间进行加减操作,从而计算出新的时间点,例如:

std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500);

上面代码计算出未来500毫秒时刻,我们可以用它来指定绝对超时时刻。在前面的案例中,也给出了计算某段代码执行时长的例子。

接受超时时限的函数

在c++标准库中,以_for结尾的,一般对应一个时长;以_until()结尾的,一般对应一个时间点。

1 std::chrono::milliseconds ms(500);
2 this_thread::sleep_for(ms);
3 
4 auto tp = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500);
5 this_thread::sleep_until(tp);

前面介绍的几个类中,都有包含_for和_until()结尾的函数,例如:std::this_thread、std::condition_variable、std::future、std::unique_lock等。

运用同步操作简化代码

注:该小节整理的不是很好,有部分内容我自己也没做过测试,大伙粗略看一下(或者跳过)。

利用future进行函数式编程

函数式编程是指一种编程风格,函数调用的结果完全取决于参数,而不依赖任何外部状态。若我们以相同的参数调用同一个函数两次,结果会完全一致。有一个函数类型叫“纯函数”,它的含义是:它产生的作用被完全限制在返回值上,而不会改动任何外部状态。这非常契合并发编程,只要共享数据没有改动,就不会引发条件竞争,因而无须动用互斥保护。

下面例子给出了快速排序并发版本:(大概理解一下就好,实际效率非常低)

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if (input.empty()) return input;

    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    const T& pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(), [&pivot](const T& t) { return t < pivot; });
    std::list<T> lower_part;
    lower_part.splice(lower_part.begin(), input, input.begin(), divide_point);

    std::future<std::list<T>> fu = std::async(parallel_quick_sort<T>, std::move(lower_part));
    auto new_higher = parallel_quick_sort(std::move(input));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), fu.get());
    return result;
}

使用消息传递进行同步

CSP(通信式串行线程):线程间没有共享数据,只负责接收消息,然后根据消息做出响应。比如去ATM机取钱,取款人需要先插入银行卡,之后输入密码,验证通过后输入金额,确认并等待机器吐钱。如果用线程来完成这个功能,则会有一个插卡线程,检测有没有插入银行卡,只有等卡插入了,才会通知输入线程显示并接收用户输入的密码,验证通过后,才会通知取钱线程,这样一环套一环的效果,没有完成前一项,就无法切换至下一项,这就是CSP。

符合并发技术规约的后续风格并发

并发技术规约在名字空间std::experimental内,提供了std::experimental::promise和std::experimental::packaged_task,二者与原始版本的差异是都返回std::experimental::future实例。

std::experimental::future提供了一个新的关键特性-then()后续。所谓后续是指,一旦future就绪,该函数立马执行某个函数。

并发技术规约中并没有提供与std::async()等价的函数,我们可以自己实现一个spawn_async函数,功能和std::async()函数类似:

template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())> spawn_async(Func&& func)
{
    std::experimental::promise<decltype(std::declval<Func>()())> pro;
    auto fu = pro.get_future();

    auto cbk = [_pro = std::move(pro), _func = std::decay_t<Func>(func)]() mutable 
    {
        try
        {
            _pro.set_value_at_thread_exit(_func());
        }
        catch (...)
        {
            _pro.set_exception_at_thread_exit(std::current_exception());
        }
    };

    std::thread th(cbk);
    th.detach();
    return fu;
}

后续函数的连锁调用

假定有一系列耗时的任务需要执行,而且,为了让主线程抽身执行其他任务,我们想按异步方式执行这些任务。例如,当用户登录应用程序时,我们就需向后端服务器发送信息以验证身份;完成身份验证之后,我们需再次向后端服务器请求其账户信息;最后,一旦取得了相关信息,就做出更新并显式呈现。

串行方式顺序执行代码如下:

 1 void process_login(std::string const& username, std::string const& password)
 2 {
 3     try 
 4     {
 5         user_id const id = backend.authenticate_user(username, password);        //验证身份
 6         user_data const info_to_display = backend.request_current_info(id);        //请求账户信息
 7         update_display(info_to_display);        //更新显示
 8     }
 9     catch (std::exception& e) 
10     {
11         display_error(e);
12     }
13 }

如果该代码在UI线程中执行,相关请求可能会较慢从而阻塞UI线程。

并行执行如下:

 1 std::future<void> process_login(std::string const& username, std::string const& password)
 2 {
 3     auto cbk = [=]()
 4     {
 5         try
 6         {
 7             const user_id id = backend.authenticate_user(username, password);        //验证身份
 8             const user_data info_to_display = backend.request_current_info(id);        //请求账户信息
 9             update_display(info_to_display);        //更新显示
10         }
11         catch (std::exception& e)
12         {
13             display_error(e);
14         }
15     };
16 
17     return std::async(std::launch::async, cbk);
18 }

 文章来源地址https://www.toymoban.com/news/detail-709985.html

大多数程序员可能会这样书写,实际上这也存在浪费。全部代码集中到后台的某一线程上运行,该后台线程仍会消耗大量资源以逐一完成各任务,因而发生阻塞。若这样的任务很多,那将导致大量线程无所事事,它们只是在空等。

我们需要按照后续函数的方式,将任务接合,形成调用链,每完成一个任务,就执行下一个。使用then()改进代码:

 1 std::experimental::future<void> process_login(std::string const& username, std::string const& password)
 2 {
 3     //验证身份
 4     auto cbk1 = [=]()
 5     {
 6         return backend.authenticate_user(username, password);
 7     };
 8 
 9     //请求账户信息
10     auto cbk2 = [](std::experimental::future<user_id> id)
11     {
12         return backend.request_current_info(id.get());
13     };
14 
15     //更新显示
16     auto cbk3 = [](std::experimental::future<user_data> info_to_display)
17     {
18         try {
19             update_display(info_to_display.get());
20         }
21         catch (std::exception& e) {
22             display_error(e);
23         }
24     };
25 
26     return spawn_async(cbk1).then(cbk2).then(cbk3);
27 }

登录流程被拆分成一系列任务,每个任务各自作为上一个任务的后续函数,接合成调用链。

最后作者还提供了一个写法,但我没理解的太懂,先贴代码,后续补充:

 1 std::experimental::future<void> process_login(std::string const& username, std::string const& password)
 2 {
 3     auto cbk1 = [](std::experimental::future<user_id> id)
 4     {
 5         return backend.async_request_current_info(id.get());
 6     };
 7 
 8     auto cbk2 = [](std::experimental::future<user_data> info_to_display)
 9     {
10         try
11         {
12             update_display(info_to_display.get());
13         }
14         catch (std::exception& e)
15         {
16             display_error(e);
17         }
18     };
19 
20     return backend.async_authenticate_user(username, password).then(cbk1).then(cbk2);
21 }

等待多个future全部就绪

假定有大量数据需要处理,而且每项数据都能独立完成,此时我们会想到生成一组异步任务分别处理数据,最后通过future等待所有任务完成。但是,有可能我们需要将所有的结果汇总起来,则需要等待所有线程将任务完成,为此可能需要新开一个线程,让该线程不断检查future是否就绪,代码如下:

 1 std::future<FinalResult> process_data(std::vector<MyData>& vec)
 2 {
 3     size_t const chunk_size = whatever;
 4     std::vector<std::future<ChunkResult>> results;
 5 
 6     for (auto begin = vec.begin(), end = vec.end(); beg != end;) 
 7     {
 8         const size_t  remaining_size = end - begin;
 9         const size_t  this_chunk_size = std::min(remaining_size, chunk_size);
10         results.push_back(std::async(process_chunk, begin, begin + this_chunk_size));
11         begin += this_chunk_size;
12     }
13 
14     //新开线程,不断检查future是否就绪
15     auto cbk = [all_results = std::move(results)]()
16     {
17         std::vector<ChunkResult> v;
18         for (auto& f : all_results)
19         {
20             v.push_back(f.get());
21         }
22     };
23     return std::async(cbk);
24 }

std::experimental::when_all()函数

可以使用std::experimental::when_all()函数避免,该函数传入若干个需要等待的future对象,它在内部会生成并返回一个总future对象,等到传入的future全部就绪,总future也随之就绪。

std::experimental::when_all()函数产生一个新的future,将传入的多个future全部包装在内。

上述代码可以改为如下:

 1 std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec)
 2 {
 3     size_t const chunk_size = whatever;
 4     std::vector<std::experimental::future<ChunkResult>> results;
 5     for (auto begin = vec.begin(), end = vec.end(); beg != end;) 
 6     {
 7         const size_t remaining_size = end - begin;
 8         const size_t this_chunk_size = std::min(remaining_size, chunk_size);
 9         results.push_back(spawn_async(process_chunk, begin, begin + this_chunk_size));        
10         begin += this_chunk_size;
11     }
12 
13     auto cbk = [](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results)
14     {
15         std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get();
16         std::vector<ChunkResult> v;
17         v.reserve(all_results.size());
18 
19         for (auto& f : all_results)
20         {
21             v.push_back(f.get());
22         }
23         return gather_results(v);
24     };
25 
26     return std::experimental::when_all(results.begin(), results.end()).then(cbk);
27 }

等待多个future中任意一个就绪

假定,我们依据某些具体条件,从庞大的数据集中查找值。不过,若存在多个值同时满足要求,则选取其中任意一个皆可。为此,我们可以生成多个线程,它们分别查找数据集的子集;若有线程找到了符合条件的值,就设立标志示意其他线程停止查找,并设置最终结果的值。

std::experimental::when_any()函数

我们可以采用std::experimental::when_any()函数统筹众多future,它生成一个新的future返回给调用者,只要原来的future中至少有一个准备就绪,则该新future也随之就绪。新future中包含了一个std::experimental::when_any_result<>内部实例,该实例由一个序列和一个索引值组成,其中序列包含传入的全体future,索引值则指明哪个future就绪。

代码如下:

 1 struct DoneCheck
 2 {
 3     std::shared_ptr<std::experimental::promise<FinalResult>> final_result;
 4 
 5     DoneCheck(std::shared_ptr<std::experimental::promise<FinalResult>> final_result_) : final_result(std::move(final_result_)) {}
 6 
 7     void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param)
 8     {
 9         auto results = results_param.get();
10         const MyData* ready_result = results.futures[results.index].get();
11 
12         if (ready_result)
13             final_result->set_value(process_found_value(*ready_result));
14         else
15         {
16             results.futures.erase(results.futures.begin() + results.index);
17             if (!results.futures.empty())
18             {
19                 std::experimental::when_any(results.futures.begin(), results.futures.end()).then(std::move(*this));
20             }
21             else
22             {
23                 final_result->set_exception(std::make_exception_ptr(std::runtime_error("Not found")));
24             }
25         }
26     }
27 };
28 
29 std::experimental::future<FinalResult> find_and_process_value(std::vector<MyData>& data)
30 {
31     const unsigned  concurrency = std::thread::hardware_concurrency();
32     const unsigned  num_tasks = (concurrency > 0) ? concurrency : 2;
33 
34     std::vector<std::experimental::future<MyData*>> results;
35 
36     auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
37 
38     auto chunk_begin = data.begin();
39 
40     std::shared_ptr<std::atomic<bool>> done_flag = std::make_shared<std::atomic<bool>>(false);
41 
42     for (unsigned i = 0; i < num_tasks; ++i)
43     {
44         auto chunk_end = (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
45 
46         auto cbk = [=]()
47         {
48             for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end); ++entry)
49             {
50                 if (matches_find_criteria(*entry))
51                 {
52                     *done_flag = true;
53                     return &*entry;
54                 }
55             }
56             return (MyData*)nullptr;
57         };
58 
59         std::experimental::future<MyData*> _fu = spawn_async(cbk);
60         results.push_back(std::move(_fu));
61         chunk_begin = chunk_end;
62     }
63 
64     std::shared_ptr<std::experimental::promise<FinalResult>> final_result = std::make_shared<std::experimental::promise<FinalResult>>();
65 
66     std::experimental::when_any(results.begin(), results.end()).then(DoneCheck(final_result));
67     return final_result->get_future();
68 }

上面代码别看了,要命...(书上的更要命0_0),下面这句最要紧:

void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param)

返回的新future中保存的是一个std::experimental::when_any_result类型的实例,调用get函数可以获取future序列和索引值,如下:

1 auto results = results_param.get();
2 const MyData* ready_result = results.futures[results.index].get();

上面代码中,results.futures是future序列,results.index是索引值,results.futures[results.index]是就绪的future。

基本的线程闩类std::experimental::latch

std::experimental::latch的构造函数接收唯一一个参数,在构建该类对象时,我们需通过这个参数设定其计数器的初值。接下来,每当等待的目标事件发生时,我们就在线程闩对象上调用count_down(),一旦计数器减到0,它就进入就绪状态。若我们要等待线程闩的状态变为就绪,则在其上调用wait()。若需检查其是否已经就绪,则调用is_ready()。最后,假如我们要使计数器减持,同时要等待它减到0,则应该调用count_down_and_wait()。

例子如下:

 1 void foo() 
 2 {
 3     unsigned const thread_count = 5;
 4     std::experimental::latch done(thread_count);
 5     my_data data[thread_count];
 6     std::vector<std::future<void>> threads;
 7 
 8     for (unsigned i = 0; i < thread_count; ++i)
 9     {
10         auto cbk = [&, i]()
11         {
12             data[i] = make_data(i);
13             done.count_down();
14             do_more_stuff();
15         };
16 
17         std::future<void> _fu = std::async(std::launch::async, cbk);
18         threads.push_back(std::move(_fu));
19     }
20 
21     done.wait();
22     process_data(data, thread_count);
23 }

基本的线程卡类std::experimental::barrier

并发技术规约提出了两种线程卡:

  • std::experimental::barrier
  • std::experimental:: flex_barrier

假定有一组线程在协同处理某些数据,各线程相互独立,分别处理数据,因此操作过程不必同步。但是,只有在全部线程都完成各自的处理后,才可以操作下一项数据或开始后续处理,线程卡就是为了应对这种场景。为了同步一组线程,我们创建线程卡,并指定参与同步的线程数目。线程在完成自身的处理后,就运行到线程卡处,通过在线程卡对象上调用arrive_and_wait()等待同步组的其他线程。只要组内最后一个线程也运行至此,所有线程即被释放,线程卡会自我重置。接着,线程组视具体情况各自继续,或处理下一项数据,或进行下一阶段的处理。

代码如下:

 1 void process_data(data_source& source, data_sink& sink)
 2 {
 3     const unsigned concurrency = std::thread::hardware_concurrency();
 4     const unsigned num_threads = (concurrency > 0) ? concurrency : 2;            //线程个数
 5 
 6     std::experimental::barrier sync(num_threads);
 7     std::vector<joining_thread> threads(num_threads);
 8 
 9     std::vector<data_chunk> chunks;
10     result_block result;
11 
12     for (unsigned i = 0; i < num_threads; ++i)
13     {
14         auto cbk = [&, i]()
15         {
16             while (!source.done())
17             {
18                 if (!i) //在0号线程上做分割
19                 {
20                     data_block current_block = source.get_next_data_block();    
21                     chunks = divide_into_chunks(current_block, num_threads);    //分成num_threads块
22                 }
23 
24                 sync.arrive_and_wait();        //其他线程等待0号线程分割完
25                 result.set_chunk(i, num_threads, process(chunks[i]));        //各个线程进行相关处理
26                 sync.arrive_and_wait();        //等待所有线程处理完
27 
28                 if (!i) 
29                 {
30                     sink.write_data(std::move(result));        //由0号线程负责处理写入操作
31                 }
32             }
33         };
34 
35         threads[i] = joining_thread(cbk);            //创建std::thread线程,会自动调用join()
36     }
37 }

std::experimental::flex_barrier

std::experimental::flex_barrier是std::experimental::latch的灵活版本,二者的不同之处在于,前者具备另一个构造函数,其参数既接收线程的数目,还接收补全函数。只要全部线程都运行到线程卡处,该函数就会在其中一个线程上运行(并且是唯一一个)。

代码如下:

 1 void process_data(data_source& source, data_sink& sink)
 2 {
 3     const unsigned concurrency = std::thread::hardware_concurrency();
 4     const unsigned num_threads = (concurrency > 0) ? concurrency : 2;
 5     std::vector<data_chunk> chunks;
 6 
 7     auto split_source = [&] ()
 8     {
 9         if (!source.done()) 
10         {
11             data_block current_block = source.get_next_data_block();
12             chunks = divide_into_chunks(current_block, num_threads);
13         }
14     };
15 
16     //先对数据进行分割
17     split_source();
18 
19     result_block result;
20 
21     auto cbk1 = [&]()
22     {
23         sink.write_data(std::move(result));
24         split_source();
25         return -1;
26     };
27 
28     //等到线程都到达时,其中一个线程会调用cbk1补全函数
29     std::experimental::flex_barrier sync(num_threads, cbk1);
30 
31     std::vector<joining_thread> threads(num_threads);
32     for (unsigned i = 0; i < num_threads; ++i) 
33     {
34         auto cbk2 = [&, i]
35         {
36             while (!source.done())
37             {
38                 result.set_chunk(i, num_threads, process(chunks[i]));    //各个线程进行相关处理
39                 sync.arrive_and_wait();        //等待所有线程就位(就位后其中一个线程会调用补全函数cbk1)
40             }
41         };
42         threads[i] = joining_thread(cbk2);        //创建std::thread线程,会自动调用join()
43     }
44 }

Copyright

本文参考至《c++并发编程实战》 第二版,作者:安东尼·威廉姆斯。本人阅读后添加了自己的理解并整理,方便后续查找,可能存在错误,欢迎大家指正,感谢!

 

 

到了这里,关于c++并发编程实战-第4章 并发操作的同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《C++并发编程实战》读书笔记(4):原子变量

    标准原子类型的定义位于头文件 atomic 内。原子操作的关键用途是取代需要互斥的同步方式,但假设原子操作本身也在内部使用了互斥,就很可能无法达到期望的性能提升。有三种方法来判断一个原子类型是否属于无锁数据结构: 所有标准原子类型( std::atomic_flag 除外,因为

    2024年02月10日
    浏览(39)
  • C++ 并发编程实战 第二章 线程管控

    线程通过构建 std::thread 对象而自动启动 ,该对象指明线程要运行的任务。 对应复杂的任务,可以使用函数对象。 一旦启动了线程,我们就需明确是要等待它结束(与之汇合 join() ),还是任由它独自运行(与之分离 detach() ) ❗❗❗ 同一个线程的 .join() 方法不能被重复调用

    2023年04月08日
    浏览(42)
  • 《C++并发编程实战》读书笔记(1):线程管控

    包含头文件 thread 后,通过构建 std::thread 对象启动线程,任何可调用类型都适用于 std::thread 。 启动线程后,需要明确是等待它结束、还是任由它独自运行: 调用成员函数 join() 会先等待线程结束,然后隶属于该线程的任何存储空间都会被清除, std::thread 对象不再关联到已结

    2024年02月10日
    浏览(41)
  • 《C++并发编程实战》读书笔记(2):线程间共享数据

    在C++中,我们通过构造 std::mutex 的实例来创建互斥量,调用成员函数 lock() 对其加锁,调用 unlock() 解锁。但通常更推荐的做法是使用标准库提供的类模板 std::lock_guard ,它针对互斥量实现了RAII手法:在构造时给互斥量加锁,析构时解锁。两个类都在头文件 mutex 里声明。 假设

    2024年02月10日
    浏览(43)
  • C++并发编程 | 原子操作std::atomic

    目录 1、原子操作std::atomic相关概念 2、不加锁情况 3、加锁情况  4、原子操作 5、总结 原子操作: 更小的代码片段,并且该片段必定是连续执行的,不可分割。 1.1 原子操作std::atomic与互斥量的区别 1) 互斥量 :类模板,保护一段共享代码段,可以是一段代码,也可以是一个

    2023年04月26日
    浏览(37)
  • c++并发编程实战-第3章 在线程间共享数据

    多线程之间共享数据,最大的问题便是数据竞争导致的异常问题。多个线程操作同一块资源,如果不做任何限制,那么一定会发生错误。例如: 输出: 显然,上面的输出结果存在问题。出现错误的原因可能是: 某一时刻, th1线程获得CPU时间片,将g_nResource从100增加至200后时

    2024年02月08日
    浏览(38)
  • 并发编程 --- 信号量线程同步

    上文编码技巧 --- 同步锁对象的选定中,提到了在C#中,让线程同步有两种方式: 锁(lock、Monitor等) 信号量(EventWaitHandle、Semaphore、Mutex) 加锁是最常用的线程同步的方法,就不再讨论,本篇主要讨论使用信号量同步线程。 实际上,再C#中 EventWaitHandle 、 Semaphore 、 Mutex 都是

    2024年02月16日
    浏览(42)
  • 16 Go并发编程(三): Go并发的传统同步机制

    Go 传统同步机制 在《Go并发编程初探》中我们提到同步概念,所谓同步是相对异步而言,即串行相对于并行。 在学习Go通信机制时我们知道管道其实就是并发单元同步方式的一种,基于CSP并发模型,Go在语言原语上使管道作为核心设计,这是Go的设计哲学,也是Go所提倡的同步

    2023年04月08日
    浏览(38)
  • 并发编程-模式篇(同步模式之保护性暂停)

    即 Guarded Suspension,用在一个线程等待另一个线程的执行结果 要点: 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者) JDK 中,join 的实现、Future 的实现,采用的就

    2024年02月07日
    浏览(35)
  • 【并发编程】线程池多线程异步去分页调用其他服务接口获取海量数据

    前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。 刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些

    2024年02月13日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包