响应式编程Reactor API大全(中)

这篇具有很好参考价值的文章主要介绍了响应式编程Reactor API大全(中)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

   <dependencyManagement>
       <dependencies>
           <dependency>
               <groupId>io.projectreactor</groupId>
               <artifactId>reactor-bom</artifactId>
               <version>2023.0.0</version>
               <type>pom</type>
               <scope>import</scope>
           </dependency>
       </dependencies>
   </dependencyManagement>


   <dependencies>
       <dependency>
           <groupId>io.projectreactor</groupId>
           <artifactId>reactor-core</artifactId>
       </dependency>
       <dependency>
           <groupId>io.projectreactor</groupId>
           <artifactId>reactor-test</artifactId>
           <scope>test</scope>
       </dependency>
       <dependency>
           <groupId>org.junit.jupiter</groupId>
           <artifactId>junit-jupiter</artifactId>
           <version>5.7.2</version>
           <scope>test</scope>
       </dependency>

   </dependencies>

22. 使用 Reactor 的 elapsed 方法进行时间测量

elapsed 方法可以用于测量元素发射之间的时间间隔,返回包含时间间隔和元素的元组。

import reactor.core.publisher.Flux;
import java.time.Duration;

public class ReactorElapsedExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> source = Flux.just(1, 2, 3, 4, 5)
                .delayElements(Duration.ofSeconds(1));

        source.elapsed()
                .subscribe(tuple -> {
                    long elapsedTime = tuple.getT1();
                    int value = tuple.getT2();
                    System.out.println("Elapsed Time: " + elapsedTime + "ms, Value: " + value);
                });

        Thread.sleep(23333);
    }
}

23. 使用 Reactor 的 cache 方法进行结果缓存

cache 方法可以用于缓存结果,避免多次计算相同的数据流。

import reactor.core.publisher.Flux;

public class ReactorCacheExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 3)
                .log() //日志
                .cache();

        source.subscribe(System.out::println); // 输出: 1, 2, 3
        source.subscribe(System.out::println); // 输出: 1, 2, 3 直接从缓存中取,日志中显示,未调用request、onNext等方法
    }
}

24. 使用 Reactor 的 reduce 方法进行聚合操作

reduce 方法用于对数据流中的元素进行聚合操作,返回一个包含最终结果的 Mono

import reactor.core.publisher.Flux;

public class ReactorReduceExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 5);

        source.reduce(Integer::sum)
                .subscribe(result -> System.out.println("Sum: " + result)); // 输出: Sum: 15
    }
}

25. 使用 Reactor 的 interval 方法进行周期性操作

interval 方法可以用于创建一个周期性的数据流,用于执行定时任务。

import reactor.core.publisher.Flux;
import java.time.Duration;

public class ReactorIntervalExample {
    public static void main(String[] args) throws InterruptedException {
        Flux.interval(Duration.ofSeconds(1))
                .take(5) // 限制产生的元素数量
                .subscribe(System.out::println);

        Thread.sleep(233333);
    }
}

26. 使用 Reactor 的 onErrorContinue 方法进行错误处理

onErrorContinue 方法允许在发生错误时继续处理数据流,并提供一个处理函数,用于处理错误。

import reactor.core.publisher.Flux;

public class ReactorOnErrorContinueExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);

        // 在发生除零错误时继续处理数据流
        source.map(x -> 10 / x)
                .onErrorContinue((error, value) -> {
                    //10/0触发的异常会在最后打印
                    System.err.println("Error: " + error.getMessage() + ", Value: " + value);
                })
                .subscribe(System.out::println); 
    }
}

28. 使用 Reactor 的 materialize 方法进行错误通知

materialize 方法用于将正常元素和错误信息封装为通知对象,使得错误信息也成为数据流的一部分。

import reactor.core.publisher.Flux;

public class ReactorMaterializeExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);

        // 将正常元素和错误信息封装为通知对象
        source.map(x -> 10 / x)
                .materialize()
                .subscribe(System.out::println);
    }
}

29. 使用 Reactor 的 expand 方法进行递归操作

expand 方法用于对数据流进行递归操作,产生新的元素并加入数据流。

import reactor.core.publisher.Flux;

public class ReactorExpandExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 3);

        // 对数据流进行递归操作,每个元素产生两个新元素
        source.expand(value -> Flux.just(value * 2, value * 3))
                .take(22) // 限制产生的元素数量
                .subscribe(System.out::println);

        //原始   新元素 ->新元素 ->新元素...
        //1 2 3   -> 2 3  4 6  6 9 ->4 6  6 9     8 12  12 18    12 18   18 27 -> 8 ...

    }
}

30. 使用 Reactor 的 checkpoint 方法进行调试

checkpoint 方法用于在操作链中设置断点,以便在调试时更容易定位问题。

import reactor.core.publisher.Flux;

public class ReactorCheckpointExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 5);

        // 在操作链中设置断点
        source.checkpoint("Initial Source")
                .map(x -> x * 2)
                .checkpoint("Mapped Source")
                .subscribe(System.out::println);
    }
}
  • 好像没啥用

31. 使用 Reactor 的 groupBy 方法进行分组操作

groupBy 方法用于将数据流中的元素进行分组,返回一个 GroupedFlux

import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;

public class ReactorGroupByExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 10);

        // 将数据流中的元素按奇偶分组
        Flux<GroupedFlux<String, Integer>> groupedFlux = source.groupBy(value -> value % 2 == 0 ? "Even" : "Odd");

        groupedFlux.subscribe(group -> {
            String key = group.key();
            group.subscribe(value -> System.out.println(key + ": " + value));
        });
    }
}

32. 使用 Reactor 的 concatMap 方法进行顺序操作

concatMap 方法用于对数据流中的元素进行顺序操作,并保持元素的相对顺序。

import reactor.core.publisher.Flux;

public class ReactorConcatMapExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 3);

        // 对每个元素进行异步操作,保持相对顺序
        source.concatMap(value -> Flux.just(value * 2).log())
                .subscribe(System.out::println);
    }
}

33. 使用 Reactor 的 block 方法获取结果

在某些情况下,可以使用 block 方法来阻塞等待数据流的完成,并获取最终结果。

import reactor.core.publisher.Flux;

public class ReactorBlockExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 3);

        // 阻塞等待数据流的完成,并获取最终结果
        Integer result = source.reduce((x, y) -> x + y).block();

        System.out.println("Sum: " + result); // 输出: Sum: 6
    }
}

35. 使用 Reactor 的 doFinally 方法进行清理操作

doFinally 方法用于在数据流完成时执行清理操作,无论是正常完成还是发生错误。

import reactor.core.publisher.Flux;

public class ReactorDoFinallyExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 3);

        source
                .doFinally(signalType -> System.out.println("Finally: " + signalType))
                .subscribe(System.out::println);
    }
}

36. 使用 Reactor 的 log 方法进行日志记录

log 方法用于在操作链中添加日志记录,以便更好地了解数据流的处理过程。

import reactor.core.publisher.Flux;

public class ReactorLogExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 3);

        source.log()
                .subscribe(System.out::println);
    }
}

37. 使用 Reactor 的 create 方法创建自定义 Publisher

create 方法用于创建自定义的 FluxMono,通过编程方式发射元素和控制订阅。

import reactor.core.publisher.Flux;

public class ReactorCreate2Example {
    public static void main(String[] args) {
        Flux<Integer> customFlux = Flux.create(emitter -> {
            for (int i = 1; i <= 5; i++) {
                emitter.next(i);
            }
            emitter.complete();
        });

        customFlux.subscribe(System.out::println);
    }
}

38. 使用 Reactor 的 sample 方法进行采样操作

sample 方法用于在固定的时间间隔内从数据流中采样元素。

import reactor.core.publisher.Flux;

import java.time.Duration;

public class ReactorSampleExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)); // 模拟延迟;

        // 在2秒钟采样一个元素
        source.sample(Duration.ofSeconds(2)) //数据源1秒一个,采用2秒一次。会漏掉部分数据
                .subscribe(System.out::println);

        // 阻塞主线程,让采样执行完
        Thread.sleep(233333);
    }
}

41. 使用 Reactor 的 limitRate 方法进行限流

limitRate 方法用于限制数据流的速率,防止快速生产者导致的资源耗尽。

import reactor.core.publisher.Flux;

public class ReactorLimitRateExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 1000).log();

        // 限制数据流的速率为每秒产生100个元素
        source.limitRate(100)  //一次预取100个元素; 第一次 request(100),以后request(75) (100*75=75)
                .subscribe(
                        data -> {
                            // 模拟慢速消费者
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(data);
                        },
                        error -> System.err.println("Error: " + error),
                        () -> System.out.println("Done")
                );
    }
}

学习打卡day08:响应式编程Reactor API大全(中)文章来源地址https://www.toymoban.com/news/detail-788749.html

到了这里,关于响应式编程Reactor API大全(中)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring5学习笔记—AOP编程

    ✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Spring专栏 ✨特色专栏: MySQL学习 🥭本文内容:Spring5学习笔记—AOP编程 🖥️个人小站 :个人博客,欢迎大家访问 📚个人知识库: 知识库,欢

    2024年02月12日
    浏览(40)
  • Linux高性能服务器编程 学习笔记 第五章 Linux网络编程基础API

    我们将从以下3方面讨论Linux网络API: 1.socket地址API。socket最开始的含义是一个IP地址和端口对(ip,port),它唯一表示了使用TCP通信的一端,本书称其为socket地址。 2.socket基础API。socket的主要API都定义在sys/socket.h头文件中,包括创建socket、命名socket、监听socket、接受连接、发

    2024年02月07日
    浏览(55)
  • 【四万字】网络编程接口 Socket API 解读大全

             Socket 是网络协议栈暴露给编程人员的 API,相比复杂的计算机网络协议,API 对关键操作和配置数据进行了抽象,简化了程序编程。         本文讲述的 socket 内容源自 Linux man。本文主要对各 API 进行详细介绍,从而更好的理解 socket 编程。 遵循 POSIX.1 - 2001、POS

    2024年02月08日
    浏览(45)
  • Java学习笔记21——常用API

    在 java.lang 下,使用不需要导包 被 final 修饰,是最终类,没有子类 执行基本数字运算的方法 没有构造方法,直接用类名访问(被static修饰 )。 Math的常用方法 在 java.lang 下,使用不需要导包 被 final 修饰,是最终类,没有子类 System类包含几个有用的类字段和方法。它不能被

    2024年02月07日
    浏览(46)
  • Spring Boot进阶(66):翻转编程思路,探索Spring Boot响应式编程和WebFlux

            本文将介绍Spring Boot中的响应式编程以及WebFlux的使用。响应式编程是一种编程范式,它强调数据流的异步处理和响应式编程模型,能够提高程序性能和可伸缩性。WebFlux是Spring框架中的一个响应式Web框架,它支持响应式编程模式,能够轻松地处理高并发的Web请求。

    2024年02月09日
    浏览(44)
  • 【微服务】spring webflux响应式编程使用详解

    目录 一、webflux介绍 1.1 什么是webflux 1.2 什么是响应式编程 1.3 webflux特点

    2024年02月08日
    浏览(41)
  • 【Java学习笔记】 68 - 网络——TCP编程、UDP编程

    https://github.com/yinhai1114/Java_Learning_Code/tree/main/IDEA_Chapter21/src 目录 项目代码 网络 一、网络相关概念 1.网络通讯 2.网络 3.IP地址 4.域名 5.端口号 6.网络通讯协议 TCP协议:传输控制协议 UDP协议: 二、InetAddress类 1.相关方法 三、Socket 1.基本介绍 2.TCP网络通信编程 基本介绍 应用案例

    2024年02月04日
    浏览(53)
  • Spring Reactive:响应式编程与WebFlux的深度探索

    🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页 ——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文并茂🦖生动形象🐅简单易学!欢迎大家来踩踩~🌺 🌊 《IDEA开发秘籍专栏》 🐾 学会IDEA常用操作,工作效率翻倍~💐 🌊 《100天精通Golang(基础

    2024年02月09日
    浏览(43)
  • Java学习笔记37——网络编程01

    计算机网络 是指将地理位置不同的具有独立功能的多台计算机及其外部设备,通过通信线路连接起来,在网络操作系统,网络管理软件及网络通信协议的管理和协调下,实现资源共享和信息传递的计算机系统 网络编程 在网络通信协议下,实现网络互连的不同计算机上运行的

    2024年02月07日
    浏览(51)
  • 7.面向对象编程(基础部分)|Java学习笔记

    java 设计者 引入 类与对象(OOP) ,根本原因就是现有的技术,不能完美的解决新的新的需求. 类是抽象的,概念的,代表一类事物,比如人类,猫类…, 即它是数据类型. 对象是具体的,实际的,代表一个具体事物, 即 是实例. 类是对象的模板,对象是类的一个个体,对应一个实例

    2024年02月08日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包