SpringCloudStream集成RabbitMQ实现消息收发

这篇具有很好参考价值的文章主要介绍了SpringCloudStream集成RabbitMQ实现消息收发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、SpringCloudStream

​ SpringCloudStream 是一个构建高扩展和事件驱动的微服务系统的框架,用于连接共有消息系统,官网地址: spring.io/projects/sp… 。整体上是把各种花里胡哨的MQ产品抽象成了一套非常简单的统一的编程框架,以实现事件驱动的编程模型。社区官方实现了RabbitMQ,Apache Kafka,Kafka Stream和Amazon Kinesis这几种产品,而其他还有很多产品比如RocketMQ,都是由产品方自行提供扩展实现。

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

所以可以看到,对于RabbitMQ,使用SpringCloudStream框架算是一种比较成熟的集成方案。但是需要主要注意的是,SpringCloudStream框架集成的版本通常是比RabbitMQ落后几个版本的,使用时需要注意。

​ SpringCloudStream框架封装出了三个最基础的概念来对各种消息中间件提供统一的抽象:

  • Destination Binders:负责集成外部消息系统的组件。
  • Destination Binding:由Binders创建的,负责沟通外部消息系统、消息发送者和消息消费者的桥梁。
  • Message:消息发送者与消息消费者沟通的简单数据结构。

​ 可以看到,这个模型非常简单,使用时也会非常方便。但是简单,意味着SCStream中的各种概念模型,与RabbitMQ的基础概念之间是有比较大的差距的,例如Exchange、Queue这些原生概念,集成到SCStream框架时,都需要注意如何配置,如何转换。

1-1、引入依赖

RabbitMQ的SpringCloudStream支持是由Spring社区官网提供的,所以这也是相当成熟的一种集成方案。但是要注意,SpringCloudStream框架集成的版本通常是比RabbitMQ产品本身落后几个版本的,使用时需要注意。

​ 他的核心依赖也就一个:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <!-- artifactId>spring-cloud-starter-stream-rabbit</artifactId -->
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
复制代码

这两个Maven依赖没有什么特别大的区别,实际上,他们的github代码库是在一起的。仓库地址:github.com/spring-clou…

依赖的版本通常建议使用SpringCloud的整体版本控制。 org.springframework.cloud#spring-cloud-dependencies#Hoxton.SR6,这样各个组件之间的版本比较安全。不建议贸然尝试新版本。

1-2、配置mq相关参数

spring.rabbitmq.addresses=192.168.253.131:5672,192.168.253.132:5672,192.168.253.133:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/mirror
复制代码

1-3、配置启动类

需要在springboot启动类上加上如下注解

@EnableBinding({Source.class, Sink.class})
复制代码

1-4、声明消息收费者

@Component
@EnableBinding(Sink.class)
public class MessageReceiver {
   private Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

   @EventListener
   @StreamListener(Sink.INPUT)
   public void process(Object message) {
        System.out.println("received message : " + message);
        logger.info("received message : {}", message);
    }
}
复制代码

1-5、声明消息发送者

@RestController
@EnableBinding(Source.class)
public class SendMessageController {

   @Autowired
   private Source source;
   
   @GetMapping("/send")
   public Object send(String message) {
      MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
      source.output().send(messageBuilder.build());
      return "message sended : "+message;
   }
复制代码

1-6、启动服务测试收发消息

启动服务之后,会在RabbitMQ服务中自动创建topic类型的交换机(scstreamExchange)及一个匹配所有routingKey(#)的队列(scstreamExchange.myinput-1),并且会进行绑定,如下:

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

1-6-1、发送消息

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

1-6-2、接收消息

这里可以看到,当前消费者不光收到了MQ消息,还收到了一些系统事件(received message相关信息)。这些系统事件需要添加@EventListener注解才能接收到。

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

下面去掉@EventListener再次测试一下

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

1-7、使用现有路由及队列发送消息

1-7-1、使用fanout模式

SpringCloudStream在使用的时候默认会创建自己的交换机和队列,如果要使用我们自己已有的,就需要进行一下配置,如一个fanout类型的exchange,绑定了四个队列的模式

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

配置信息:

#-----设置消息生产者
spring.cloud.stream.bindings.output.destination=fanoutExchange
#队列类型
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=fanout
#不用自己创建、用现有
spring.cloud.stream.rabbit.bindings.output.producer.bind-queue=false


#-----设置消息消费者
spring.cloud.stream.bindings.input.destination=fanoutExchange
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=fanout

#接收消息的队列
spring.cloud.stream.bindings.input.group=fanout.q1
#不自动创建队列
spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
#设置queue的名字只有group的名字,不包括destination
spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
spring.cloud.stream.bindings.input.content-type=text/plain
复制代码

通过以上的设置,发送消息的时候就可以给fanoutExchange交换机发送消息,这样和fanoutExchange交换机绑定的队列就都可以收到消息。

需要注意的是,SpringCloudStream中创建交换机和队列的时候,会将交换机的名称作为前缀如下:

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

因此使用我们自己创建的交换机和队列的时候,需要观察一下是否也是按照如上规则创建的,如果队列的前缀没有交换机的名称,则需要加如下配置

spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
复制代码

如下,测试项fanoutExchange发送消息之后,队列fanout.q1就可以收到消息

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

而和fanoutExchange绑定的其他三个队列的消息则仍处于待消费状态,如下

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

1-7-2、使用topic模式

1-7-2-1、配置文件

使用topic或者direct模式的时候,都会使用routingkey,但是使用SpringCloudStream的时候是无法直接穿routingKey的,这就需要在消息发送的时候设置header来进行设置

如使用topic的模式来发送,首先需要修改配置信息,如下:

#--------------使用routingkey------
#-----设置消息生产者
spring.cloud.stream.bindings.output.destination=topicExchange
#队列类型
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=topic
#是否持久化
spring.cloud.stream.rabbit.bindings.output.producer.exchange-durable=true
#不用自己创建、用现有
spring.cloud.stream.rabbit.bindings.output.producer.bind-queue=false
#设置routingkey
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.routingkey

#-----设置消息消费者
spring.cloud.stream.bindings.input.destination=topicExchange
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=topic
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-durable=true
#接收消息的队列
spring.cloud.stream.bindings.input.group=hebei.eco
#不自动创建队列
spring.cloud.stream.rabbit.bindings.input.consumer.bind-queue=false
#设置queue的名字只有group的名字,不包括destination
spring.cloud.stream.rabbit.bindings.input.consumer.queue-name-group-only=true
#设置接收消息的routingkey
spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=*.eco
spring.cloud.stream.bindings.input.content-type=text/plain
复制代码

有routingkey的配置和fanout类型没有routingkey配置不同的有

1、在发送端需要指定routingkey,headers为固定设置,routingkey为具体的key值,如name=zhangsan,则可以设置headers.name

#设置routingkey 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.routingkey
复制代码

2、在消费端配置消费的routingkey,此处配置的routingkey,就可以设置*或者#进行匹配,如下

#设置接收消息的routingkey 
spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=*.eco
复制代码

1-7-2-2、发送端代码修改

如下在MessageBuilder中需要设置Header的key和value,也就是routingkey的key和value值,在上配置中设置的key为routing,则在发送端的代码中header就设置为routingkey

@RestController
@EnableBinding(Source.class)
public class SendMessageController {

   @Autowired
   private Source source;
   
   @GetMapping("/send")
   public Object send(String message,String routingkey) {
//    MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
      MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader("routingkey",routingkey);
      source.output().send(messageBuilder.build());
      return "message sended : "+message;
   }
复制代码

1-7-2-3、测试

首先设置routingkey的值为abcd,这样是无法收到消息的,因为消费端设置的routingkey的值为:*.eco

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

设置routingkey为:abcd.eco,消息就可以正常接收了,如果设置abcd.123.eco,消息就无法接收了,除非将routingkey设置为#.eco

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

二、理解SpringCloudStream都干了什么

2-1、配置RabbitMQ服务

在SpringBoot的autoconfigure包当中,有个 RabbitProperties类,这个类就会解析application.properties中以spring.rabbitmq开头的配置。里面配置了跟RabbitMQ相关的主要参数,包含服务器地址等。里面对每个参数也都提供了默认值。如果不进行配置,默认就是访问本地的RabbitMQ服务。

#这几个是默认配置。  
spring.rabbitmq.host=localhost  
spring.rabbitmq.port=5672  
spring.rabbitmq.username=guest  
spring.rabbitmq.password=guest  
spring.rabbitmq.virtual-host=/
复制代码

2-2、在RabbitMQ中声明Exchange和Queue

既然是要对接RabbitMQ,那么最终还是需要与RabbitMQ服务器进行交互的。从RabbitMQ的管理页面上来看,SCStream帮我们在RabbitMQ的根虚拟机上创建了一个topic类型的scstreamExchange交换机,然后在这个交换机上绑定了一个scstreamExchange.stream队列,绑定的RoutingKey是#。 而程序中的消息发送者是将消息发送到scstreamExchange交换机,然后RabbitMQ将消息转发到scstreamExchange.stream队列,消息接收者从队列接收到消息。这个流程,就是Spring Cloud Stream在背后为我们做的事情。 在这里可以尝试对应RabbitMQ的基础概念以及SCStream框架中的基础概念,整理一下他们之间的对应关系。

​ SCStream框架帮我们屏蔽了与消息中间件的交互细节,开发人员甚至都不需要感知消息中间件的存在,将更多的关注点放到业务处理的细节里。实际上,就我们这个简单的示例,只需要将maven中的spring-cloud-starter-stream-rabbit依赖,换成spring-cloud-starter-stream-kafka,就可以完成与本地Kafka服务的交互,代码不需要做任何的改动。

2-3、常用配置

在RabbitMQ的实现中,所有个性化的属性配置实现都是以spring.cloud.stream.rabbit开头,支持对binder、producer、consumer进行单独配置。

#绑定exchange  
spring.cloud.stream.binding.<bindingName>.destination=fanoutExchange  
#绑定queue  
spring.cloud.stream.binding.<bindingName>.group=myQueue  
#不自动创建queue  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindQueue=false  
#不自动声明exchange(自动声明的exchange都是topic)  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.declareExchange=false  
#队列名只声明组名(前面不带destination前缀)  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.queueNameGroupOnly=true  
#绑定rouytingKey  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindingRoutingKey=myRoutingKey  
#绑定exchange类型  
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.exchangeType=<type>  
#绑定routingKey  
spring.cloud.stream.rabbit.bindings.<bindingName>.producer.routingKeyExpression='myRoutingKey'
复制代码

通过这些配置可以按照RabbitMQ原生的方式进行声明。例如,SCStream自动创建的Exchange都是Topic类型的,如果想要用其他类型的Exchange交换机,就可以手动创建交换机,然后在应用中声明不自动创建交换机。

​ 所有可配置的属性,参见github仓库中的说明。例如,如果需要声明一个Quorum仲裁队列,那么只要给这个Binding配置quorum.enabled属性,值为true就可以了。

Stream队列目前尚不支持。RabbitMQ周边生态的发展肯定是比产品自身的发展速度要慢的,由此也可见,目前阶段,Stream队列离大规模使用还是有一点距离的。

2-4、分组消费模式

分组可以让消息实现负载均衡的策略,例如大并发过来之后,生成端会发送大量消息,而消费端消费速度较慢就可以生成多个分组,然后生产端根据策略向不同的分组发送消息,就可以加快消息的消费速度

2-4-1、配置信息

SCStream中的消费者分组策略,其实整体来看是一种类似于Kafka的分组消费机制。即,不同group的消费者,都会消费到所有的message消息,而在同一个goup中,每个message消息,只会被消费一次。这种分组消费的策略,严格来说,在RabbitMQ中是不存在的,RabbitMQ是通过不同类型的Exchange来实现不同的消费策略。而使用SCStream框架,就可以直接在RabbitMQ中实现这种分组消费的策略

#消息生产者端配置  
#启动发送者分区  
spring.cloud.stream.bindings.output.producer.partitioned=true  
#指定参与消息分区的消费端节点数量  
spring.cloud.stream.bindings.output.producer.partition-count=2  
#只有消费端分区ID为1的消费端能接收到消息  
spring.cloud.stream.bindings.output.producer.partition-key-expression=1  
  
#消息消费者端配置  
#启动消费分区  
spring.cloud.stream.bindings.input.consumer.partitioned=true  
#参与分区的消费端节点个数  
spring.cloud.stream.bindings.input.consumer.instance-count=2  
#设置该实例的消费端分区ID  
spring.cloud.stream.bindings.input.consumer.instance-index=1
复制代码

通过这样的分组策略,当前这个消费者实例就只会消费奇数编号的消息,而偶数编号的消息则不会发送到这个消费者中。**注意:**这并不是说偶数编号的消息就不会被消费,只是不会被当前这个实例消费而已。

SCStream框架虽然实现了这种分组策略机制,但是其实是不太严谨的,当把分区数量和分区ID不按套路分配时,并没有太多的检查和日志信息,但是就是收不到消息。

另外,在@StreamListener注解中还有condition属性也可以配置消费者的分配逻辑,该属性支持一个SPELl表达式,只接收满足条件的消息。

当设置了分组消费的时候,绑定的队列及routingkey就变成了如下关系

spring-cloud-starter-stream-rabbit,java,java-rabbitmq,rabbitmq,java

2-4-2、通过header灵活指定消费分组

上面的配置只设置固定的消费分组,实际场景中显然是不行的,这样就可以通过使用header来进行处理

可以配置headers.routingkey来进行动态发送

spring.cloud.stream.bindings.output.destination=scstreamExchange
#指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
#只有消费端分区ID为1的消费端能接收到消息
#spring.cloud.stream.bindings.output.producer.partition-key-expression=0
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers.routingkey

#这个input就对应Sink.INPUT strem中默认的消费队列
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=myinput
#参与分区的消费端节点个数
spring.cloud.stream.bindings.input.consumer.instance-count=2
#设置该实例的消费端分区ID
spring.cloud.stream.bindings.input.consumer.instance-index=0
#启动消费分区
spring.cloud.stream.bindings.input.consumer.partitioned=true
复制代码

然后在发送端代码就可以通过设置header的routingkey来指定发送的分组了

@GetMapping("/send")
   public Object send(String message,String routingkey) {
//    MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
      MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader("routingkey",routingkey);
      source.output().send(messageBuilder.build());
      return "message sended : "+message;
   }
复制代码

测试:

无法收到消息: http://localhost:8080/send?message=fdsfgg&routingkey=1

可以收到消息: http://localhost:8080/send?message=fdsfgg&routingkey=0文章来源地址https://www.toymoban.com/news/detail-769065.html

到了这里,关于SpringCloudStream集成RabbitMQ实现消息收发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】介绍及消息收发流程

    RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,

    2024年02月07日
    浏览(38)
  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(41)
  • SpringBoot RabbitMQ收发消息、配置及原理

    今天分析SpringBoot通过自动配置集成RabbitMQ的原理以及使用。 RabbitMQ是基于AMQP协议的message broker,所以我们首先要对AMQP做一个简单的了解。 AMQP (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers. AMQP是A

    2024年02月20日
    浏览(39)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(41)
  • 一文带你如何用SpringBoot+RabbitMQ方式来收发消息

    预告了本篇的内容:利用RabbitTemplate和注解进行收发消息,还有一个我临时加上的内容:消息的序列化转换。 本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管

    2024年02月09日
    浏览(40)
  • Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发

    前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费; 1 为什么要使用Rabbitmq: RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点: 异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和

    2024年02月07日
    浏览(52)
  • 最简单的SpringCloudStream集成Kafka教程

    开发中,服务与服务之间通信通常会用到消息中间件,如果我们使用了某一个MQ,那么消息中间件与我们的系统算是高耦合。将来有一天,要替换成另外的MQ,我们的改动就会比较大。为了解决这个问题,我们可以使用Spring Cloud Stream 来整合我们的消息中间件,降低耦合度,使

    2023年04月09日
    浏览(25)
  • Android应用集成RabbitMQ消息处理指南

    RabbitMQ官网直通车 — ✈✈✈✈✈✈        最近工作繁忙,好久没有更新博文了。        对于互联网饱和的今天, 如何做到不同系统之间传递信息与通信? 在实际项目中,多个端例如:ios、android、pc、小程序采用从RabbitMQ上获取实时包消息,然后根据此实时包消息来

    2024年02月06日
    浏览(55)
  • 【SpringBoot笔记29】SpringBoot集成RabbitMQ消息队列

    这篇文章,主要介绍SpringBoot如何集成RabbitMQ消息队列。 目录 一、集成RabbitMQ 1.1、引入amqp依赖 1.2、添加连接信息 1.3、添加RabbitMQ配置类

    2023年04月08日
    浏览(55)
  • Springboot实战16 消息驱动:如何使用 RabbitTemplate 集成 RabbitMQ?

    15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。 今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。 AMQP 规范与 RabbitM

    2024年02月02日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包