CompletableFuture、ListenableFuture高级用列

这篇具有很好参考价值的文章主要介绍了CompletableFuture、ListenableFuture高级用列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

CompletableFuture
链式

 public static void main(String[] args) throws Exception {
    	 CompletableFuture<Integer> thenCompose = T1()
        .thenCompose(Compress::T2)
        .thenCompose(Compress::T3);
	    Integer result = thenCompose.get();
	    System.out.println(result);
    	 
    }
    // 假设这些是异步操作,并返回CompletableFuture<Integer>
    public static CompletableFuture<Integer> T1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 1;
        });
    }

    public static CompletableFuture<Integer> T2(int valueFromT1) {
        return CompletableFuture.supplyAsync(() -> {
            // 使用上一步的结果进行计算
            int result = valueFromT1 * 2;
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return result;
        });
    }

    public static CompletableFuture<Integer> T3(int valueFromT2) {
        return CompletableFuture.supplyAsync(() -> {
            // 使用上一步的结果进行计算
            int finalResult = valueFromT2 + 10;
            return finalResult;
        });
    }

异步操作集合对象入库

public void saveUsers(List<User> users) throws InterruptedException, ExecutionException {
     // 将大集合分成若干个小集合,每个大小为100(具体分片大小根据实际需求调整)
     List<List<User>> partitions = users.stream()
             .collect(Collectors.groupingBy(it -> users.indexOf(it) / 100))
             .values().stream()
             .collect(Collectors.toList());

     List<CompletableFuture<Void>> futures = new ArrayList<>();
     for (List<User> partition : partitions) {
         CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
             partition.forEach(user -> {
                 userRepository.save(user); // 假设这是你的保存方法
             });
         });
         futures.add(future);
     }

     // 等待所有任务完成
     CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
 }

CompletableFuture
异常

public static void saveTut(List<User> users) throws InterruptedException, ExecutionException{
        // 将大集合分成若干个小集合
        ThreadLocal<AtomicInteger> threadLocal = ThreadLocal.withInitial(() -> new AtomicInteger(0));
        List<List<User>> partitions = Lists.partition(users, 10);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<User> partition : partitions) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            		 partition.forEach(user -> {
                         AtomicInteger value = threadLocal.get();
                         value.set(user.getId());
                         value.incrementAndGet();
                    });
                
            });

            // 添加异常处理器
            future.exceptionally(ex -> {
                // 记录异常信息
            	//System.out.println("===="+threadLocal.get().intValue());
                return null;
            });

            futures.add(future);
        }

        // 等待所有任务完成,并处理整体完成时的异常
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allFutures.thenAccept(v -> {
            System.out.println("All save tasks completed.");
        }).exceptionally(ex -> {
            // 记录整体完成时的异常
        	//System.err.println(ex.getMessage());
            return null;
        });

        // 确保阻塞直到所有异步操作完成
        allFutures.get();
} 

ListenableFuture文章来源地址https://www.toymoban.com/news/detail-786034.html

public class ListProcessingExample  {

    private static final ListeningExecutorService EXECUTOR_SERVICE = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
 
    public static void main(String[] args) throws InterruptedException {
        List<User> users = Arrays.asList(new User(1), new User(2), new User(3));
        //processUsersAsync001(users);
    }
    
    public static void processUsersAsync001(List<User> users) throws InterruptedException {
        // 将用户列表分为3个分区(根据实际情况调整分区数量)
        int partitionSize = (int) Math.ceil((double) users.size() / 3);//线程数
        List<List<User>> partitions = Lists.partition(users, partitionSize);

        List<ListenableFuture<Void>> futures = new ArrayList<>();

        for (List<User> partition : partitions) {
            ListenableFuture<Void> future = EXECUTOR_SERVICE.submit(() -> {
                partition.forEach(user -> {
                    System.out.println("Processing user: " + user.getId());
                });
                return null;
            });

            Futures.addCallback(future, new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    System.out.println("Successfully processed a batch of users.");
                }

                @Override
                public void onFailure(Throwable t) {
                    System.err.println("Error processing a batch of users, error: " + t.getMessage());
                }
            }, MoreExecutors.directExecutor());

            futures.add(future);
        }

        // 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)
        EXECUTOR_SERVICE.shutdown();
        EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    /**
     * 增加成功回调获取数据
     */
    public static void processUsersAsync002(List<User> users) throws InterruptedException {
        // 将用户列表分为3个分区(根据实际情况调整分区数量)
        int partitionSize = (int) Math.ceil((double) users.size() / 3);
        List<List<User>> partitions = Lists.partition(users, partitionSize);

        List<ListenableFuture<ProcessedUserResult>> futures = new ArrayList<>();

        for (List<User> partition : partitions) {
            ListenableFuture<ProcessedUserResult> future = EXECUTOR_SERVICE.submit(() -> {
                ProcessedUserResult result = new ProcessedUserResult();
                partition.forEach(user -> {
                    System.out.println("Processing user: " + user.getId());
                    result.successfulIds.add(user.getId());
                });
                return result;
            });

            Futures.addCallback(future, new FutureCallback<ProcessedUserResult>() {
                @Override
                public void onSuccess(ProcessedUserResult result) {
                    System.out.println("Successfully processed users with IDs: " + result.successfulIds);
                }

                @Override
                public void onFailure(Throwable t) {
                    System.err.println("Error processing a batch of users, error: " + t.getMessage());
                }
            }, MoreExecutors.directExecutor());

            futures.add(future);
        }

        // 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)
        EXECUTOR_SERVICE.shutdown();
        EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    
    
    
    static class ProcessedUserResult {

        // 用于存储成功处理的用户ID列表
        private List<Integer> successfulIds = new ArrayList<>();
        // 用于存储失败处理的用户ID列表
        private List<Integer> failedIds = new ArrayList<>();

        public void addSuccessfulId(int userId) {
            this.successfulIds.add(userId);
        }

        public List<Integer> getSuccessfulIds() {
            return successfulIds;
        }
        public void addFailedId(int userId) {
            this.failedIds.add(userId);
        }
        public List<Integer> getFailedIds() {
            return failedIds;
        }

    }

    static class User {
        private int id;

        public User(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }
}

到了这里,关于CompletableFuture、ListenableFuture高级用列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java的CompletableFuture,Java的多线程开发

    如下图: 以后用到再加 get() 和 join() 方法区别? 都可以阻塞线程 —— 等所有任务都执行完了再执行后续代码。 anyOf() 和 allOf() 的区别? 无返回值 推荐: 开启多线程——无返回值的——阻塞 :test06 有返回值 推荐:开启多线程——有返回值的,返回一个新的List——阻塞—

    2024年02月06日
    浏览(45)
  • 【Java 8 新特性】Java CompletableFuture supplyAsync()详解

    supplyAsync()是 Java 8 引入的 CompletableFuture 静态方法。 supplyAsync() 默认完成在 ForkJoinPool.commonPool() 或指定 Executor 中异步执行的任务。 方法声明:supplyAsync(Supplier supplier) 需要将Supplier作为任务传递给 supplyAsync() 方法。 默认情况下,此任务将在 ForkJoinPool.commonPool() 中异步完成执行,

    2024年02月22日
    浏览(56)
  • 「Java」《深入解析Java多线程编程利器:CompletableFuture》

    多线程编程是指在一个程序中同时执行多个线程来提高系统的并发性和响应性。在现代计算机系统中,多线程编程已经成为开发者日常工作的一部分。以下是对多线程编程需求和挑战的介绍: 需求: 提高系统的性能:通过同时执行多个线程,可以利用多核处理器的优势,实

    2024年02月11日
    浏览(50)
  • CompletableFuture:Java中的异步编程利器

    前言: 在秋招的面试中,面试官问了很多关于异步编程相关的知识点,朋友最近也和我聊到了这个话题,因此今天咱们来讨论讨论这个知识点! 随着现代软件系统的日益复杂,对于非阻塞性和响应性的需求也在不断增加。Java为我们提供了多种工具和技术来满足这些需求,其

    2024年02月04日
    浏览(37)
  • Java组合式异步编程CompletableFuture

    CompletableFuture是Java 8中引入的一个功能强大的Future实现类,它的字面翻译是“可完成的Future”。 CompletableFuture对并发编程进行了增强,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发。 CompletableFuture实现了两个接口,一个

    2024年04月09日
    浏览(38)
  • 【Java8新特性--->异步处理】CompletableFuture

    一、引入 假设一个商品详情页需要以下操作: 查询展示商品的基本信息耗时:0.5s 查询展示商品的销售信息耗时:0.7s 查询展示商品的图片信息耗时:1s 查询展示商品销售属性耗时:0.3s 查询展示商品规格属性耗时:1.5s 查询展示商品详情信息耗时:1s 即使每个查询时间耗时不

    2024年02月06日
    浏览(43)
  • 【Java笔记+踩坑汇总】Java基础+进阶+JavaWeb+SSM+SpringBoot+瑞吉外卖+SpringCloud+黑马旅游+谷粒商城+学成在线+MySQL高级篇+设计模式+面试题汇总+源码

    本文是“Java学习路线”专栏的导航文章,目标是为Java工程师提供一套 完整的Java学习路线 。 目录 0.摘要/资料/代码整理 1.Java基础+进阶 2.MySQL,JavaWeb,Mybatis,前端 3.Git 4.SSM(Spring,SpringMVC,Mybatis)框架 5.Maven高级 6.Springboot,MybatisPlus,JPA框架 7.瑞吉外卖、Redis、Nginx、Linux、mysql主从复制

    2024年02月16日
    浏览(242)
  • 【Java笔记+踩坑汇总】Java基础+进阶+JavaWeb+SSM+SpringBoot+瑞吉外卖+SpringCloud+黑马旅游+谷粒商城+学成在线+MySQL高级篇+设计模式+常见面试题+源码

    本文是“Java学习路线”专栏的导航文章,目标是为Java工程师提供一套 完整的Java学习路线 。 目录 0.摘要/资料/代码整理 1.Java基础+进阶 2.MySQL,JavaWeb,Mybatis,前端 3.Git 4.SSM(Spring,SpringMVC,Mybatis)框架 5.Maven高级 6.Springboot,MybatisPlus,JPA框架 7.瑞吉外卖、Redis、Nginx、Linux、mysql主从复制

    2024年02月06日
    浏览(76)
  • 从 Future 到 CompletableFuture:简化 Java 中的异步编程

    在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。 在这篇

    2024年02月11日
    浏览(34)
  • CompletableFuture与线程池:Java 8中的高效异步编程搭配

    摘要:在Java 8中,CompletableFuture和线程池的结合使用为程序员提供了一种高效、灵活的异步编程解决方案。本文将深入探讨CompletableFuture和线程池结合使用的优势、原理及实际应用案例,帮助读者更好地理解并掌握这一技术。 随着多核处理器的普及,应用程序的性能和响应能

    2024年02月07日
    浏览(62)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包