当前位置 博文首页 > 文章内容

    C++ 多线程编程建议之 C++ 对多线程/并发的支持(下)

    作者:shunshunshun18 栏目:未分类 时间:2021-10-12 18:10:14

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    前言:

    本文承接前文  C++ 对多线程/并发的支持(上) ,翻译自 C++ 之父 Bjarne Stroustrup 的 C++ 之旅(A Tour of C++)一书的第 13 章 Concurrency。本文将继续介绍 C++ 并发中的 future/promise,packaged_task 以及 async() 的用法。

    1、通信任务

    标准库还在头文件 <future> 中提供了一些机制,能够让编程人员基于更高的抽象层次任务来开发,而不是直接使用低层的线程、锁:

    • future promise:用于从任务(另一个线程)中返回一个值
    • packaged_task:帮助启动任务,封装了 future promise,并且建立两者之间的关联
    • async() :像调用一个函数那样启动一个任务。形式最简单,但也最强大!

    1.1 future 和 promise

    future promise 可以在两个任务之间传值,而无需显式地使用锁,实现了高效地数据传输。其基本想法很简单:当一个任务向另一个任务传值时,把值放入 promise,通过特定的实现,使得值可以通过与之关联的 future 读出(一般谁启动了任务,谁从 future 中取结果)。

    假如有一个 future<X>fx,我们可以通过 get() 获取类型 X 的值:

    X v = fx.get(); // if necessary, wait for the value to get computed
    
    

    如果值还没有计算出,则调用 get() 的线程阻塞,直到有值返回。如果值无法计算出,get()可能抛出异常。

    promise 的主要目的是提供一个简单的“put”的操作(set_value set_exception),和 future get() 相呼应。

    如果你有一个 promise,需要发送一个类型为 X 的结果到一个 future,你要么传递一个值,要么传递一个异常。举个例子:

    void f(promise<X>& px) // 一个任务:把结果放入 px
    {
        try {
            X res;
            // 计算 res 的值
            px.set_value(res);
        }
        catch(...) { // 如果无法计算 res 的值
            px.set_exception(current_exception()); // 传异常到 future 的线程
        }
    }
    
    
    

    current_exception() 即捕获到的异常。

    要处理通过 future 传递的异常,get() 的调用者必须在什么地方捕获,例如:

    void g(future<X>& fx) // 一个任务;从 fx 提取结果
    {
        try {
            X v = fx.get(); // 如有必要,等待值计算完成
            // 使用 v
        }
        catch(...){ // 无法计算 v
            // 错误处理
        }
    }
    
    

    如果 g() 不需要自己处理错误,代码可以进一步简化:

    void g(future<X>& fx) // 一个任务;从 fx 提取结果
    {
        X v = fx.get(); // 如有必要,等待值计算完成
        // 使用 v
    }
    
    

    思考:future 和 promise 是怎么关联起来的?

    1.2 packaged_task

    如何把 future 放入一个需要结果的任务,并且把与之关联的、产生结果的 promise 放入线程?packaged_task 可以简化任务的设置,关联 future/promisepackaged_task 封装了把返回值或异常放入 promise 的操作,并且调用 packaged_task get_future() 方法,可以得到一个与 promise 关联的 future。举个例子,我们可以设置两个任务,借助标准库的 accumulate() 分别累加 vector<double> 的前后部分:

    double accum (double* beg, double* end, double init) // 计算以 init 为初值,[beg,end) 的和
    {
        return accumulate(beg,end,init);
    }
    
    double comp2(vector<double>& v)
    {
        using Task_type = double(double*,double*,double); // 任务的类型
    
        packaged_task<Task_type> pt0 {accum}; // 打包任务(即 accum)
        packaged_task<Task_type> pt1 {accum};
    
        future<double> f0 {pt0.get_future()}; // 取得 pt0 的 future
        future<double> f1 {pt1.get_future()}; // 取得 pt1 的 future
    
        double* first = &v[0];
        thread t1{move(pt0),first,first+v.size()/2,0};          // 为 pt0 启动线程
        thread t2{move(pt1),first+v.size()/2,first+v.size(),0}; // 为 pt1 启动线程
    
        return f0.get() + f1.get();
    }
    

    packaged_task 模板以任务的类型(Task_type,double(double*,double*,double) 的别名)作为其模板参数,以任务(accum)作为其构造函数的参数。move() 操作是必要的,因为 packaged_task 不可拷贝(只能移动)。packaged_task 不可拷贝是因为它是一个资源处理程序(resource handler),拥有 promise 的所有权,并且(间接地)负责与之关联的任务可能拥有的资源。

    请注意,这里的代码没有显式地使用锁:我们能够专注于要完成的任务,而不是来管理它们通信的机制。这两个任务在不同的线程中执行,具有了潜在的并发性。

    1.3 async()

    我在本章所追求的思路,最简单,但也非常强大:把任务看成是一个恰巧可能和其他任务同时运行的函数。这并不是 C++ 标准库所支持的唯一模型,但它能很好地满足各类广泛的需求。其他更微妙、棘手的模型,如依赖于共享内存的编程风格也可以根据实际需要使用。

    要启动潜在异步执行的任务,我们可以用 async():

    double comp4(vector<double>& v) // 如果 v 足够大,派生多个任务
    {
        if(v.size()<10000) // 犯得着用并发吗?
            return accum(v.begin(),v.end(),0);
        
        auto v0 = &v[0];
        auto sz = v.size();
        
        auto f0 = async(accum,v0,v0+sz/4,0.0);
        auto f1 = async(accum,v0+sz/4,v0+sz/2,0.0);
        auto f2 = async(accum,v0+sz/2,v0+sz*3/4,0.0);
        auto f3 = async(accum,v0+sz*3/4,v0+sz,0.0);
        
        return f0.get()+f1.get()+f2.get()+f3.get(); // 收集 4 部分的结果,求和
    }
    
    

    大体上,async() 把“调用部分”和“获取结果部分“分离开来,并且将两者和实际执行的任务分离。使用 async() 你不需要考虑线程、锁;你只要从任务(潜在地、异步地计算结果)的角度去考虑就可以了。async() 也有明显的限制:使用了共享资源、需要上锁的任务无法使用 async() ,你甚至不知道会用到多少线程,这完全是由 async() 决定的,它会根据调用时系统可用资源的情况,决定使用多少线程。例如,async() 在决定使用几个线程前,会检查有多少核心(处理器)空闲。

    示例代码中的猜测计算开销和启动线程的相对开销(v.size()<10000)只是一个很原始、粗略的性能估计。这里不适合展开讨论怎么去管理线程,但这个估计仅仅是一个简单(可能很烂)的猜测。

    请注意,async()不仅仅是专门用于并行计算、提高性能的机制。例如,它也能用于派生任务,从用户获取输入,让“主程序”忙其他事情。

    2、建议

    使用并发改善响应性和吞吐量
    尽可能在最高级别的抽象上工作(比如优先考虑 asyncpackaged_task 而不是 threadmutex
    考虑使用进程作为线程的替代方案
    标准库的并发支持是类型安全的
    内存模型把多数程序员从考虑机器架构的工作中解放出来
    内存模型使得内存的表现和我们的预期基本一致
    原子操作为无锁编程提供了可能性
    把无锁编程留给专家
    有时顺序操作比起并发更简单、更快
    避免数据竞争(不受控地同时访问可变数据)
    std::thread 是类型安全的系统线程接口
    join() 等待一个线程结束
    尽量避免显式共享数据
    unique_lock 管理 mutexes
    lock() 一次性获取多个锁
    condition_variable 管理线程之间的通信
    从(可以并行执行的)任务的角度思考,而非线程
    不要低估“简单性”的价值
    选择 packaged_task future,而不是直接使用 thread mutex
    promise 返回结果,从 future 获取结果
    packaged_task 处理任务抛出的异常或返回值
    packaged_task future 来表示对外部服务的请求,以及等待其回复
    async() 启动简单的任务

    上一篇:  C++ 对多线程/并发的支持(上)