响应式编程初探

这篇具有很好参考价值的文章主要介绍了响应式编程初探。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

响应式

响应式系统(Reactive System)

具有以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)响应式系统更加灵活,松耦合,可伸缩

  • 即时响应性
    只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动
  • 回弹性
    系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理
  • 弹性
    系统在不断变化的工作负载之下依然保持即时响应性。 响应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 响应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。
  • 消息驱动
    消息驱动:响应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

响应式编程初探

响应式编程

响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式

变化传递
在命令式编程下,式子 a=b+c,代表a的值是由b和c计算出来的。如果b或c后续有变化,不会影响到a的值。
在响应式编程下,式子 a:=b+c,代表着a的值是由b和c计算出来的。但如果b或者c的值后续有变化,会影响到a的值。

响应式流

Reactive Streams致力于提供一套非阻塞背压的异步流处理标准规范
通过定义一组实体,接口和互操作方法,给出了实现异步非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等

响应式流定义 org.reactivestreams
响应式编程初探

Reactor

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)和 Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)

响应式编程初探

Flux

Flux 是一个能够发出 0 到 N 个元素的标准的 Publisher,它会被一个“错误(error)” 或“完成(completion)”信号终止。因此,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNext,onComplete和onError方法。
响应式编程初探

Mono

Mono 是一种特殊的 Publisher, 它最多发出一个元素,然后终止于一个 onComplete 信号或一个 onError 信号。
响应式编程初探

Scheduler

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。 Scheduler 是一个拥有广泛实现类的抽象接口。Schedulers 类提供的静态方法:

  • 当前线程(Schedulers.immediate())
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()。
  • 弹性线程池(Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源。
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同。
  • 自定义调度器,Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler。

发布订阅

  • publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)。
  • subscribeOn 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。
    在订阅(subscribe)前,只是定义了处理流程,而没有启动发布者。基于此,Reactor 可以使用这些规则来决定如何执行操作链。然后,一旦被订阅了,整个流程就开始工作了。

异常

  • onErrorReturn 返回静态缺省值
  • onErrorResume 捕获并执行一个异常处理方法,动态计算一个候补值,将异常包装后再次抛出
  • onErrorMap 捕获,包装一个新异常后再次抛出
  • doOnError 捕获异常,并继续抛出
  • retry 重试

Spring WebFlux

响应式编程初探

响应式编程初探

SpringWebFlux开发

// 处理请求参数
Mono<String> string = request.body(BodyExtractors.toMono(String.class));
Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class));
Mono<MultiValueMap<String, String>> map = request.formData(); 
// 封装响应参数
Mono<Person> person = ...
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person, Person.class);


public class PersonHandler {
    private final PersonRepository repository;
    public PersonHandler(PersonRepository repository) {
        this.repository = repository;
    }
    public Mono<ServerResponse> listPeople(ServerRequest request) { 
        Flux<Person> people = repository.allPeople();
        return ok().contentType(APPLICATION_JSON).body(people, Person.class);
    }
    public Mono<ServerResponse> createPerson(ServerRequest request) { 
        Mono<Person> person = request.bodyToMono(Person.class);
        return ok().build(repository.savePerson(person));
    }
    public Mono<ServerResponse> getPerson(ServerRequest request) { 
        int personId = Integer.valueOf(request.pathVariable("id"));
        return repository.getPerson(personId)
            .flatMap(person -> ok().contentType(APPLICATION_JSON).bodyValue(person))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
}

PersonRepository repository = ...
PersonHandler handler = new PersonHandler(repository);

RouterFunction<ServerResponse> otherRoute = ...
//接口路由
RouterFunction<ServerResponse> route = route()
    .GET("/person/{id}", accept(APPLICATION_JSON), handler::getPerson) 
    .GET("/person", accept(APPLICATION_JSON), handler::listPeople) 
    .POST("/person", handler::createPerson) 
    .add(otherRoute) 
    .build();


Spring WebFlux 也可以完全复用SpringMVC的注解

@RestController
@RequestMapping("/users")
public class MyRestController {
    private final UserRepository userRepository;
    private final CustomerRepository customerRepository;
    public MyRestController(UserRepository userRepository, CustomerRepository customerRepository) {
        this.userRepository = userRepository;
        this.customerRepository = customerRepository;
    }
    @GetMapping("/{userId}")
    public Mono<User> getUser(@PathVariable Long userId) { return this.userRepository.findById(userId);}

    @GetMapping("/{userId}/customers")
    public Flux<Customer> getUserCustomers(@PathVariable Long userId) {
	return this.userRepository.findById(userId).flatMapMany(this.customerRepository::findByUser);
    }

    @DeleteMapping("/{userId}")
    public Mono<Void> deleteUser(@PathVariable Long userId) { return this.userRepository.deleteById(userId);}
}

SpringWebFlux 使用feign进行微服务调用

<dependency>
            <groupId>com.playtika.reactivefeign</groupId>
            <artifactId>feign-reactor-spring-cloud-starter</artifactId>
            <version>3.1.5</version>
            <type>pom</type>
</dependency>

@EnableReactiveFeignClients


@ReactiveFeignClient(value = "platform-domain-gateway-server")
public interface DomainGateWayClient {

    @PostMapping("/domain/service")
    Mono<ResponseEntity<Object>> createDomainService(JsonLoadParam param);

}

参考文章
https://www.reactiveprinciples.org/
https://www.reactivemanifesto.org/zh-CN
http://www.reactive-streams.org/
https://www.tutorialspoint.com/concurrency_in_python/concurrency_in_python_reactive_programming.htm
https://link.zhihu.com/?target=https%3A//gist.github.com/staltz/868e7e9bc2a7b8c1f754
https://projectreactor.io/
https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html文章来源地址https://www.toymoban.com/news/detail-415695.html

到了这里,关于响应式编程初探的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java 9 响应式流(Reactive Streams)

    首先出现响应式编程理念,然后出现响应式编程实现,再然后出现响应式规范,响应流主要解决处理元素流的问题—如何将元素流从发布者传递到订阅者,不而不需要发布者阻塞,或者要求订阅者有无限的缓冲区,有限的缓冲区在到达缓冲上界的时候,对到达的元素进行丢弃

    2024年02月21日
    浏览(37)
  • vue3-响应式基础之reactive

    reactive() 还有另一种声明响应式状态的方式,即使用 reactive() API。与将内部值包装在特殊对象中的 ref 不同,reactive() 将使对象本身具有响应性: 「点击按钮+1」 「示例效果」 响应式对象是 JavaScript 代理,其行为就和普通对象一样。不同的是,Vue 能够拦截对响应式对象所有属

    2024年01月16日
    浏览(51)
  • Vue3 reactive丢失响应式问题

    问题描述: 使用 reactive 定义的对象,重新赋值后失去了响应式,改变值视图不会发生变化。 测试代码: 输出结果:   从上述测试代码中,ref 定义的对象有响应式,而 reactive 定义的对象失去了响应式,这是什么原因呢?官网中写到: 如果将一个对象赋值给 ref ,那么这个对

    2024年02月05日
    浏览(40)
  • Vue3中reactive响应式失效的问题

    弹窗内部有一个挑选框,要通过请求接口获取挑选框下面可供选择的数据。 这是一个很简单的情境,我立刻有了自己的思路。如果实现搜索,数据较少可以直接用elementplus自带的filter。如果数据较多,就需要传val,在后台进行搜索,然后分页渲染。我选择的是前者,所以只需

    2024年02月10日
    浏览(37)
  • v3的响应式,ref,reactive和生命周期(简写)

    vue3笔记 1.两种安装方式 (1)直接创建项目vue3 (2)使用vite 组件传值 1.组件传值的用法 从父组件向子组件传值: Son :prop1=\\\"nmb\\\"/Son 2.defineprops函数 defineProps函数在setup标签内不需要引入,可直接使用 defineProps({ prop1:Number, }); 根据传入的值来设置格式 3.prop类型的较验 子组件required:tr

    2024年01月20日
    浏览(28)
  • vue3 reactive包裹数组无法页面无法响应式

    原代码如下: 此时removeItem方法运行,打印出historyAccount的值确实被改变了,但是页面还是没有变化 原因: 如果你直接通过赋值的方式改变 reactive 对象引用的数组,是不会触发视图的更新的,应该使用 Vue 提供的响应式方法来更新响应式数据。  改进:可以利用splice方法删除

    2024年04月09日
    浏览(47)
  • 【Vue3】使用ref与reactive创建响应式对象

    💗💗💗欢迎来到我的博客,你将找到有关如何使用技术解决问题的文章,也会找到某个技术的学习路线。无论你是何种职业,我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章,也欢迎在文章下方留下你的评论和反馈。我期待着与你分享知识、互

    2024年02月21日
    浏览(52)
  • vue3 ref reactive响应式数据 赋值的问题

    doing 遇见就记录,最后更新时间23.8.30 错误示范:直接赋值 以数组为例,对象也是一样的操作。 ref 定义的属性等价于 reactive({value:xxx}) ,所以 reactive、ref 直接重新赋值丢失响应是因为引用地址变了 正确写法 方法1: ref.value ,代码中更为清晰地表明响应式数据 方法2:包一层

    2024年02月10日
    浏览(48)
  • 前端Vue篇之Vue3响应式:Ref和Reactive

    在Vue3中,响应式编程是非常重要的概念,其中 Ref 和 Reactive 是两个关键的API。 Ref : Ref 用于创建一个响应式的基本数据类型,比如数字、字符串等。它将普通的数据变成响应式数据,可以监听数据的变化。使用 Ref 时,我们可以通过 .value 来访问和修改数据的值。 Reactive :

    2024年04月25日
    浏览(50)
  • 避大坑!Vue3中reactive丢失响应式的问题

    在vue3中,我们定义响应式数据无非是ref和reactive。 但是有的小伙伴会踩雷!导致定义的响应式丢失的问题。 大错特错!!! 大错特错!!! 1.ref 定义数据(包括对象)时,都会变成 RefImpl(Ref 引用对象) 类的实例,无论是修改还是重新赋值都会调用 setter,都会经过 reactive 方法

    2023年04月16日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包