当前位置 博文首页 > 文章内容

    CompletableFuture实现异步获取结果并且等待所有异步任务完成

    作者: 栏目:未分类 时间:2020-07-23 18:00:19

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    直接上代码:

    import com.google.common.collect.Lists;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.function.BiConsumer;
    
    /**
     * CompletableFuture的AllOf功能测试,等待所有任务执行完
     *
     */
    public class CompletableFutureAllOfTest {
    
        public static void main(String[] args) throws Exception {
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4,
                    100L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(10));
    
            method1(executor);
            method2(executor);
            method3(executor);
        }
    
        /**
         * 拆解写法
         * @param executor
         */
        public static void method1 (ExecutorService executor) {
            long start = System.currentTimeMillis();
            // 定义第一个任务
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
                return "cf1";
            }, executor);
    
            cf1.whenComplete(new BiConsumer<String, Throwable>() {
                @Override
                public void accept(String t, Throwable u) {
                    System.out.println("hello " + t);
                }
            });
    
            // 定义第二个任务
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
                return "cf2";
            }, executor);
    
            cf2.whenComplete(new BiConsumer<String, Throwable>() {
                @Override
                public void accept(String t, Throwable u) {
                    System.out.println("hello " + t);
                }
            });
            // 开始等待所有任务执行完成
            CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2);
            System.out.println("start block");
            all.join();
            System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
        }
    
        /**
         * 合并写法
         * @param executor
         */
        public static void method2 (ExecutorService executor) {
            List<String> testList = Lists.newArrayList();
            testList.add("cf1");
            testList.add("cf2");
            long start = System.currentTimeMillis();
            CompletableFuture<Void> all = null;
            for (String str : testList) {
                // 定义任务
                CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(5000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                    return str;
                }, executor);
    
                cf.whenComplete(new BiConsumer<String, Throwable>() {
                    @Override
                    public void accept(String t, Throwable u) {
                        System.out.println("hello " + t);
                    }
                });
                all = CompletableFuture.allOf(cf);
            }
            System.out.println("start block");
            // 开始等待所有任务执行完成
            all.join();
            System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
        }
    
        /**
         * 通过Java8的stream实现,非常简洁
         * @param executor
         */
        @SuppressWarnings("rawtypes")
        public static void method3 (ExecutorService executor) {
            List<String> testList = Lists.newArrayList();
            testList.add("cf1");
            testList.add("cf2");
            long start = System.currentTimeMillis();
            CompletableFuture[] cfArr = testList.stream().
                    map(t -> CompletableFuture
                            .supplyAsync(() -> pause(t), executor)
                            .whenComplete((result, th) -> {
                                System.out.println("hello" + result);
                            })).toArray(CompletableFuture[]::new);
            // 开始等待所有任务执行完成
            System.out.println("start block");
            CompletableFuture.allOf(cfArr).join();
            System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
        }
    
        public static String pause (String name) {
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            return name;
        }
    
    }

    参考: https://blog.csdn.net/jianjun200607/article/details/84027273

             https://blog.csdn.net/jianjun200607/article/details/83996833

             https://www.jianshu.com/p/6bac52527ca4