SpringCloud(17)之SpringCloud Stream

这篇具有很好参考价值的文章主要介绍了SpringCloud(17)之SpringCloud Stream。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Spring Cloud Stream介绍

        Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/子语义、使用者组和有状态分区的支持。  

        它可以基于 Spring Boot来创建独立的、可用于生产的  Spring应用程序,Spring Cloud Stream为一些供应商的消息   中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通   过使用 Spring Cloud Stream ,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。目前 Spring Cloud Stream 支持 RabbitMQ  Kafka 自动化配置。

        目前Spring Cloud Stream只适配以下中间件信息:

SpringCloud(17)之SpringCloud Stream,SpringCloud,spring cloud,spring,Stream,分布式消息

二、Spring Cloud Stream 工作流程

        Spring Cloud Stream应用程序由一个与中间件无关的核心组成。应用程序通过在外部代理公开的目的地和代码中的输入/输出参数之间建立绑定来与外部世界通信。建立绑定所需的特定于Broker的详细信息由特定于中间件的Binder实现来处理。

SpringCloud(17)之SpringCloud Stream,SpringCloud,spring cloud,spring,Stream,分布式消息

        通过Stream可以很好的屏蔽各个中间件的API差异,它统一了API,生产者通过OUTPUT向消息中间件发 送消息,此时并不需要关心消息中间件是Kafka还是RabbitMQ,不需要关注他们的API,只需要用到Stream的API,这样可以降低学习成本。消费方通过INPUT消费指定的消息,也不需要关注消息中间件 API,架构图如上图: 

        我们对上图的对象进行说明:

  • Application Core:生产者、消费者;
  • inputs:消费者;
  • ouputs:生产者;
  • Binder:绑定器,主要和消息中间件进行绑定操作;
  • Middleware:消息中间件服务;

     SpringCloud(17)之SpringCloud Stream,SpringCloud,spring cloud,spring,Stream,分布式消息   

        我们项目中真正应用到Stream,只需要按照如上流程图操作即可;

 生产者:

        1:使用Source绑定消息的输出管道。

        2:通过MessageChannel输出消息。

        3:通过@EnableBinding开启binder,将生产者绑定到指定的MQ服务。

消费者:      

        1:通过@EnableBinding绑定到MQ。

        2:通过Sink绑定到输入数据管道。

        3:@StreamListener监听指定管道数据。

 2.1 Spring Cloud Stream 实战

        SpringCloud(17)之SpringCloud Stream,SpringCloud,spring cloud,spring,Stream,分布式消息

        如上图,当用户行程结束,用户需进入支付操作,当用户支付完成时,我们需要更新订单状态,此时我 们可以让支付系统将支付状态发送到MQ中,订单系统订阅MQ消息,根据MQ消息修改订单状态。我们 将使用 SpringCloud Stream实现该功能。

2.1.1 生产者

1)引入依赖 

   hailtaxi-pay 中引入依赖:

        <!--stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

2) 配置MQ服务

 修改 hailtaxi-pay  application.yml 添加如下配置:

server:
  port: 18083
spring:
  application:
    name: hailtaxi-pay
  cloud:
    #Consul配置
    consul:
      host: localhost
      port: 8500
      discovery:
        #注册到Consul中的服务名字
        service-name: ${spring.application.name}
    #Stream
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.211.145
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: payExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

3)消息输出管道绑定 

/***
 * 负责向MQ发送消息
 */
@EnableBinding(Source.class)
public class MessageSender {

    @Resource
    private MessageChannel output;//消息发送管道

    /***
     * 发送消息
     * @param message
     * @return
     */
    public Boolean send(Object message) {
        //消息发送
        boolean bo = output.send(MessageBuilder.withPayload(message).build());
        System.out.println("*******send message: "+message);
        return bo;
    }
}

 参数说明:

Source.class:绑定一个输出消息管道Channel。

MessageChannel:发送消息对象,默认是DirectWithAttributesChannel,发消息在 AbstractMessageChannel中完成。

MessageBuilder.withPayload:构建消息。

        此时大家可能会有一个疑问?如果我们多个channel,在rabbitMQ中就是说我一个服务有多个交换机该怎么办?

        我们来看下 Source.class里面定义的内容是什么,定义的内容如下:

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

        所以说如果此时我们要新的管道的话,我们就可以参考Source来定义新的类,然后OUTPUT就定义新的管道名称,然后再配置文件中我们就定义这个新的管道名称。 

4)消息发送 

  com.itheima.pay.controller.TaxiPayController 中创建支付方法用于发送消息,代码如下:

    /***
     * 支付  http://localhost:18083/pay/wxpay/1
     * @return
     */
    @GetMapping(value = "/wxpay/{id}")
    public TaxiPay pay(@PathVariable(value = "id")String id){
        //支付操作
        TaxiPay taxiPay = new TaxiPay(id,310,3);
        //发送消息
        messageSender.send(taxiPay);
        return taxiPay;
    }

2.1.2 消费者 

1)修改配置 

 修改 hailtaxi-order 的核心配置文件 application.yml ,在文件中配置要监听MQ信息:

server:
  port: 18082
spring:
  application:
    name: hailtaxi-order
  zipkin:
    #zipkin服务地址
    base-url: http://localhost:9411
  sleuth:
    sampler:
      probability: 1  #采样值,0~1之间,1表示全部信息都手机,值越大,效率越低
  cloud:
    #Consul配置
    consul:
      host: localhost
      port: 8500
      discovery:
        #注册到Consul中的服务名字
        service-name: ${spring.application.name}
    #Stream
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.211.145
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: payExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
          group: paygroup #所属分组

2)消息监听 

  hailtaxi-order 中创建消息监听对象 com.itheima.order.mq.MessageReceiver ,代码如下:

@EnableBinding(Sink.class)
public class MessageReceiver {

    @Value("${server.port}")
    private String port;

    /****
     * 消息监听
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void receive(String message) {
        System.out.println("消息监听(增加用户积分、修改订单状态)-->" + message+"-->port:"+port);
    }
}

参数说明:

Sink.class:绑定消费者管道。

@StreamListener(Sink.INPUT):监听消息配置,指定了消息为application中的input


1.3 消息分组

         消息分组有2个好处,分别是集群合理消费、数据持久化。

 1.3.1集群消费下的分组

1)分组的意义

        分组在项目中是有非常重大的意义,通常应用于消息并发高、消息堆积的场景,这些场景服务消费方通 常会做集群操作,一旦做集群操作,我们又需要项目中的消费者合理消费,比如用户打车支付完成后, 我们需要增加用户积分同时修改订单状态,如果集群环境中有2台服务器都执行该消费操作,此时用户  积分会增加两次,就会造成非幂等问题。 

SpringCloud(17)之SpringCloud Stream,SpringCloud,spring cloud,spring,Stream,分布式消息 

        此时集群中相同服务应该属于同一个组,同一个组中只允许有一个足节点消费某一个信息,这样就可以 避免费幂等问题的出现。

2)分组实战 

        新增一个 hailtaxi-order消费者节点:

SpringCloud(17)之SpringCloud Stream,SpringCloud,spring cloud,spring,Stream,分布式消息  

        此时运行起来,  18082  18182 节点会同时消费所有数据。 

        修改 hailtaxi-order 的核心配置文件 application.yml ,添加分组: 

SpringCloud(17)之SpringCloud Stream,SpringCloud,spring cloud,spring,Stream,分布式消息 

        此时再次测试,可以发现消费者不会重复消费数据。 

1.3.2 数据持久化

        我们把分组去掉,停掉 hailtaxi-order 服务,然后请求 http://localhost:18083/pay/wxpay/1 送数据,发送完数据后,再启动 hailtaxi-order服务,此时发现没有数据可以消费,这是因为数据没 有持久化,是一种广播模式,如果需要数据持久化,得给每个消费节点添加group组即可。文章来源地址https://www.toymoban.com/news/detail-839007.html

到了这里,关于SpringCloud(17)之SpringCloud Stream的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot 如何使用 Spring Cloud Stream 处理事件

    在分布式系统中,事件驱动架构(Event-Driven Architecture,EDA)已经成为一种非常流行的架构模式。事件驱动架构将系统中的各个组件连接在一起,以便它们可以相互协作,响应事件并执行相应的操作。SpringBoot 也提供了一种方便的方式来处理事件——使用 Spring Cloud Stream。 Spr

    2024年02月10日
    浏览(49)
  • 《微服务实战》 第十六章 Spring cloud stream应用

    第十六章 Spring cloud stream应用 第十五章 RabbitMQ 延迟队列 第十四章 RabbitMQ应用 https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来bindin

    2024年02月06日
    浏览(41)
  • 实战:Spring Cloud Stream消息驱动框架整合rabbitMq

    相信很多同学都开发过WEB服务,在WEB服务的开发中一般是通过缓存、队列、读写分离、削峰填谷、限流降级等手段来提高服务性能和保证服务的正常投用。对于削峰填谷就不得不用到我们的MQ消息中间件,比如适用于大数据的kafka,性能较高支持事务活跃度高的rabbitmq等等,MQ的

    2024年02月08日
    浏览(46)
  • Spring Cloud Gateway 超时、CORS配置 | Spring Cloud 17

    Spring Cloud Gateway 可以为所有路由配置 Http 超时(响应和连接) ,并为每个特定路由覆盖设置。 配置全局 http 超时: connect-timeout 必须 以毫秒为单位 指定。 response-timeout 必须指定为 java.time.Duration 使用示例:

    2024年02月14日
    浏览(55)
  • 最新版 !快速掌握 JDK17 + springboot3 + springcloud Alibaba : 10、Seata 整合实现分布式事务

    上一节成功启动了seata,传送门: https://blog.csdn.net/qq_16089135/article/details/133989446 1.1 官方文档 中文文档 Seata 是什么 1.2 模式分类 AT :基于支持本地 ACID 事务的关系型数据库。 Java 应用,通过 JDBC 访问数据库。 整体机制:二阶段提交。 一阶段:业务数据和回滚日志记录在同一

    2024年02月06日
    浏览(54)
  • 【Spring Cloud】Spring Cloud Alibaba-- 分布式事务Seata原理

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能与简单易用的分布式事务服务,为用户提供了 AT、TCC、SAGA 和 XA 几种不同的事务模式: AT模式:无侵入式的分布式事务解决方案,适合不希望对业务进行改造的场景,但由于需要添加全局事务锁,对影响高并发系统的

    2024年02月08日
    浏览(78)
  • Spring Cloud Stream 4.0.4 rabbitmq 发送消息多function

    注意当多个消费者时,需要添加配置项:spring.cloud.function.definition 启动日志 交换机名称对应: spring.cloud.stream.bindings.demo-in-0.destination配置项的值 队列名称是交换机名称+分组名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 问题总结 问题一 解决办法: 查看配置是否正确: spring

    2024年02月19日
    浏览(45)
  • Spring Cloud Stream解密:流式数据在微服务中的魔力

    欢迎来到我的博客,代码的世界里,每一行都是一个故事 在微服务的大舞台上,数据流就像一曲美妙的交响乐,而Spring Cloud Stream正是指挥家,将音符有序地传递给每个微服务。在这篇文章中,我们将揭开Spring Cloud Stream的神秘面纱,一起探索在微服务体系结构中如何通过流式

    2024年02月20日
    浏览(41)
  • 微服务SpringCloud教程——Spring Cloud是什么

    Spring Cloud 是一款基于 Spring Boot 实现的微服务框架。Spring Cloud 源自 Spring 社区,主要由 Pivotal 和 Netflix 两大公司提供技术迭代和维护。 随着微服务的火爆流行,国内外各大互联网公司都相继分享了他们在微服务架构中,针对不同场景出现的各种问题的解决方案和开源框架。

    2024年02月15日
    浏览(48)
  • 实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq

    前面的博文我们介绍并实战演示了Spring Cloud Stream整合rabbitmq,其中主要介绍了如何使用和配置完成消息中间件的集成。但是,在实际的生产环境中可能会用到多个消息中间件,又或者是由于业务改变需要更换消息中间件,在这些情况下我们的Spring Cloud Stream框架可以完全兼容多

    2024年02月08日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包