单个springboot整合rabbitmq

这篇具有很好参考价值的文章主要介绍了单个springboot整合rabbitmq。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、rabbitmq介绍

1.1 rabbitmq概念

rabbitmq是一种消息中间件,是基于erlang语言开发的AMQP(高级消息队列协议)的开源实现。本质是个队列,FIFO先入先出。

1.1.1 rabbitmq特性:

  • 开源,性能优秀,稳定保障
  • 提供可靠的消息投递模式,返回模式
  • 于Spring AMQP完美整合,API丰富
  • 集群模式丰富
  • 高可用

1.1.2 rabbitmq主要结构

  • 生产者(Producer)

        将数据发送给rabbitmq的程序就是生产者

  • 交换机(Exchange)

        交换机是rabbitmq一个非常重要的部件,一方面它接收来自生产者的消息,另一方面,它将消息推动到队列中。交换机必须明确的知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这些都要由交换机的类型来决定。

  • 队列(Queue)

        队列是rabbitmq内部使用的一种数据结构,尽管消息流经rabbitmq和应用程序,但它们只能存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列中取数据。

  • 消费者(Consumer)

        消费者大多数是一个等待接收消息的程序,这个应用程序可以跟生产者在同一个应用程序,也可以在不同的应用程序上

1.1.3 总结

可以把rabbitmq理解为是一个邮箱,投递邮件的人(生产者)将邮件(消息)放到邮箱以后,邮递员(交换机)会帮我我们送消息(将消息放到指定队列),对方(消费者)收到消息以后,就可以阅读你发送的邮件了

单个springboot整合rabbitmq

二、rabbitmq使用场景

2.1 为什么要用MQ(优点和缺点)

  •  流量削峰

        当系统在一瞬间有一万个请求进来,而我们系统最多只能处理三千条请求,那我们就可以使用mq来进行缓冲,从mq中处理系统所承受的最大限度的请求,等高峰期一过,就将mq积压的请求全部处理掉。

  • 应用解耦

        在一个项目中,有多个系统时会发生验证的耦合。当一个系统有一个重要的基础数据,而其他系统也需要这些数据。如果其他系统挂掉了,还怎么将这些数据发送过去呢?这个时候我们可以将详细发送到MQ上,如果其他系统需要这些基础数据,可以直接去mq消费就可以,不需要关心系统有没有挂掉或者超时的情况

  • 异步处理

        当一个系统收到请求,需要在自己系统处理数据,处理完成后,还需要把数据发送给其他系统,而每个系统接收请求后处理数据时间不一致。如果时同步请求的话,需要等待所有请求处理完之后,才能将结果返回。而异步时非阻塞的,可以直接将结果返回,然后再去处理数据。用mq可以把这个同步通信的过程变成异步通信的过程,从而到达降低延迟的特性。

  • 缺点 
  1. 系统可用性降低,假如mq挂了,就会影响整个项目
  2. 系统复杂度变得更高,需要考虑的问题更多,比如消息丢失,消息重复消费,消息顺序错乱,消息重复发送,消息积压等问题
  3. 容易产生一致性问题。比如A系统保存操作,再向mq发送消息,B系统成功操作数据库,C系统也成功操作,但是当D系统时操作失败,在用MQ没有事务得情况下,就会导致不一致。

2.2 在什么情况下选择rabbitMQ

 常见的四种MQ比较

特 性 ActiveMQ RabbitMQ RocketMQ Kafka
语言 Java Erlang Java Scala
单机吞吐 十万 十万
时效性 ms us ms ms(以内)
可用性 高(主从架构) 高(主从架构) 非常高
(分布式架构)
非常高
(分布式架构)
功能特性 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

2.3 centos搭建mq

  centos7搭建rabbitmq:centos7安装rabbitmq_java-zh的博客-CSDN博客

三、案例(rabbitMQ常见的五种模式讲解)

3.1 导包

<modelVersion>4.0.0</modelVersion>
<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.4.1</version>
   <relativePath/>
</parent>
<properties>
   <java.version>1.8</java.version>
</properties>
<dependencies>
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>3.0.4</version>
 </dependency>
</dependencies>

yml初始配置

spring:
 rabbitmq:
    #如果是集群,用,隔开
    host: 192.168.139.128
    #端口,不能写默认端口15672
    port: 5672
    connection-timeout: 15000
    username: zhonghui
    password: 123456
    virtual-host: /
    #none值是禁用发布确认模式,是默认值
    #correlated值是发布消息成功到交换器后会触发回调方法
    #simple有两种效果,一种是和correlated值一样会触发回调方法,
    #另一种是发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步逻辑,
    #要注意的点是waitForConfrimsOrDie方法如果返回false则会关闭Channel,则接下来无法发送消息到broker
    publisher-confirm-type: correlated
    #消息回退确认机制
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: auto

3.2 简单模式

单个springboot整合rabbitmq

  • P:生产者
  • C:消费者 

Producer

@RestController
@AllArgsConstructor
@RequestMapping("/test/mq")
public class SimpleQueueProducer {

    private RabbitTemplate rabbitTemplate;

    // 发送到的队列名称
    public static final String AMQP_SIMPLE_QUEUE = "amqp.simple.queue";

    @GetMapping("/simple/{context}")
    public void sendMessage(@PathVariable String context) {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(AMQP_SIMPLE_QUEUE, context + i);
        }
    }
}

Consumer

@Component
@Slf4j
public class SimpleQueueConsumer {

    @RabbitListener(queuesToDeclare = @Queue(name = SimpleQueueProducer.AMQP_SIMPLE_QUEUE))
    public void consumerSimpleMessage(String context){
        // 通过Message解析对象消息
        log.info("简单模式内容为:{}",context);
    }
}

结果

2023-06-20 20:01:43.198  INFO 10440 --- [ntContainer#3-1] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成1
2023-06-20 20:01:43.198  INFO 10440 --- [ntContainer#3-9] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成0
2023-06-20 20:01:43.198  INFO 10440 --- [tContainer#3-10] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成3
2023-06-20 20:01:43.266  INFO 10440 --- [ntContainer#3-8] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成2
2023-06-20 20:01:43.266  INFO 10440 --- [ntContainer#3-7] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成4
2023-06-20 20:01:43.266  INFO 10440 --- [ntContainer#3-6] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成5
2023-06-20 20:01:43.268  INFO 10440 --- [ntContainer#3-4] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成7
2023-06-20 20:01:43.268  INFO 10440 --- [ntContainer#3-3] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成6
2023-06-20 20:01:43.268  INFO 10440 --- [ntContainer#3-2] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成9
2023-06-20 20:01:43.268  INFO 10440 --- [ntContainer#3-5] c.m.m.controller.SimpleQueueConsumer     : 简单模式内容为:mq的简单模式已经完成8

注解含义

  • @RabbitListener:一个监听器,主要作用时监听队列是否有消息,用于类和方法上,
  • @queuesToDeclare:将队列绑到默认交换机上
  • @Queue:队列注解,value为队列名称

3.3 Work模式

单个springboot整合rabbitmq

Producer

@RestController
@Slf4j
@AllArgsConstructor
@RequestMapping("/test/mq")
public class WorkProducer {
    private RabbitTemplate rabbitTemplate;

    // 发送到的队列名称
    public static final String AMQP_WORK_QUEUE = "amqp.work.queue";
    
    @GetMapping("/work/{context}")
    public void sendMessage(@PathVariable String context) {

        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(AMQP_WORK_QUEUE,context+i);
        }
    }
}

Consumer

@Component
@Slf4j
public class WorkConsumer {

    @RabbitListener(queuesToDeclare = @Queue(value = WorkProducer.AMQP_WORK_QUEUE))
    public void consumerSimpleMessage(String context){
        try {
            // 假设业务逻辑执行了一秒
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("work1模式内容为:{}",context);
    }
}
@Component
@Slf4j
public class WorkConsumer2 {
    @RabbitListener(queuesToDeclare = @Queue(value = WorkProducer.AMQP_WORK_QUEUE))
    public void consumerSimpleMessage2(String context){
        log.info("work2模式内容为:{}",context);
    }

}

结果

2023-06-21 09:06:07.075  INFO 21492 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的work模式已经完成1
2023-06-21 09:06:07.076  INFO 21492 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的work模式已经完成3
2023-06-21 09:06:07.076  INFO 21492 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的work模式已经完成5
2023-06-21 09:06:07.076  INFO 21492 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的work模式已经完成7
2023-06-21 09:06:07.077  INFO 21492 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的work模式已经完成9
2023-06-21 09:06:08.053  INFO 21492 --- [ntContainer#4-1] com.mq.mqcloud.controller.WorkConsumer   : work1模式内容为:mq的work模式已经完成0
2023-06-21 09:06:09.054  INFO 21492 --- [ntContainer#4-1] com.mq.mqcloud.controller.WorkConsumer   : work1模式内容为:mq的work模式已经完成2
2023-06-21 09:06:10.055  INFO 21492 --- [ntContainer#4-1] com.mq.mqcloud.controller.WorkConsumer   : work1模式内容为:mq的work模式已经完成4
2023-06-21 09:06:11.057  INFO 21492 --- [ntContainer#4-1] com.mq.mqcloud.controller.WorkConsumer   : work1模式内容为:mq的work模式已经完成6
2023-06-21 09:06:12.059  INFO 21492 --- [ntContainer#4-1] com.mq.mqcloud.controller.WorkConsumer   : work1模式内容为:mq的work模式已经完成8

解析

消费者1和消费者2获取到的消息数量是相同的,一个是消费奇数号消息,一个是消费偶数号消息,不管work1和work2效率是否一致,消费消息都是相同的,这样看起来有点不合理,work2消费完成以后已经空闲下来了,所以我们修改一下配置文件

listener:
  simple:
    acknowledge-mode: auto
    #每次领取二个消息,消费以后再领取
    prefetch: 2

修改完配置文件,增加属性以后,重新运行,我们可以看见,这个时候work2消费了8条,work1消费了2条


2023-06-21 09:48:06.838  INFO 21956 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的simple模式已经完成3
2023-06-21 09:48:06.838  INFO 21956 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的simple模式已经完成4
2023-06-21 09:48:06.841  INFO 21956 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的simple模式已经完成5
2023-06-21 09:48:06.841  INFO 21956 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的simple模式已经完成7
2023-06-21 09:48:06.841  INFO 21956 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的simple模式已经完成8
2023-06-21 09:48:06.843  INFO 21956 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的simple模式已经完成6
2023-06-21 09:48:06.843  INFO 21956 --- [ntContainer#5-1] com.mq.mqcloud.controller.WorkConsumer2  : work2模式内容为:mq的simple模式已经完成9
2023-06-21 09:48:07.828  INFO 21956 --- [ntContainer#4-1] com.mq.mqcloud.controller.WorkConsumer   : work1模式内容为:mq的simple模式已经完成0
2023-06-21 09:48:08.830  INFO 21956 --- [ntContainer#4-1] com.mq.mqcloud.controller.WorkConsumer   : work1模式内容为:mq的simple模式已经完成1

这是利用prefetch属性来实现的,如果使用prefetch,我们可以同通过增加线程处理,下面时对应的配置文件。


    listener:
      simple:
        ##manual:手动处理 auto:自动处理
        acknowledge-mode: auto
        #消费端监听个数(即@RabbitListenter开启几个线程去处理)
        concurrency: 10
        #消费端监听的最大个数
        max-concurrency: 10
        #每次领取二个消息,消费以后再领取
        prefetch: 2
        #消费不成功的消息,拒绝入队
        default-requeue-rejected: true  
        retry:
          #开启消息重试
          enabled: true 
          #重试次数.
          max-attempts: 4
          #重试最大间隔时间
          max-interval: 10000 
          #重试初始间隔时间
          initial-interval: 2000 

3.4 订阅模式(Fanout消息模型)

x:交换机

单个springboot整合rabbitmq

1、一个生产者,多个消费者

2、每个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的(一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会被消费)

Producer

@RestController
@AllArgsConstructor
@RequestMapping("/test/mq")
public class FanoutExchangeProducer {

    private RabbitTemplate rabbitTemplate;

    public static final String EXCHANGE_NAME = "exchange.fanout";

    public static final String EXCHANGE_QUEUE_1 = "exchange.fanout.queue_1";

    public static final String EXCHANGE_QUEUE_2 = "exchange.fanout.queue_2";


    /**
     * 扇形交换机,订阅模式
     *
     * @param context
     */
    @GetMapping("/fanout/{context}")
    public void sendMessage(@PathVariable String context) {
        rabbitTemplate.convertAndSend(FanoutExchangeProducer.EXCHANGE_NAME, "", context);
    }

}

Consumer 

@Component
@Slf4j
public class FanoutExchangeConsumer {


    /**
     * 订阅模式,扇形交换机
     * @param context
     */
    @RabbitListener(bindings =
    @QueueBinding(exchange = @Exchange(value = FanoutExchangeProducer.EXCHANGE_NAME,type = ExchangeTypes.FANOUT),
                  value = @Queue(value = FanoutExchangeProducer.EXCHANGE_QUEUE_1)
    ))
    @RabbitHandler
    public void exchangeFanoutQueue1(String context){
        log.info("通道1接收到的消息为:{}",context);
    }

    /**
     * 订阅模式,扇形交换机
     *
     * @param context
     */
    @RabbitListener(bindings =
    @QueueBinding(exchange = @Exchange(value = FanoutExchangeProducer.EXCHANGE_NAME,type = ExchangeTypes.FANOUT)  ,
                  value = @Queue(value = FanoutExchangeProducer.EXCHANGE_QUEUE_2)
    ))
    @RabbitHandler
    public void exchangeFanoutQueue2(String context){
        log.info("通道2接收到的消息为:{}",context);
    }
    
}

结果

消息放到了交换机以后,通道1和通道2都接收到了消息(相当于通道1和通道2都跟这个交换机订阅了消息)

2023-06-21 11:20:43.262  INFO 4696 --- [ntContainer#4-1] c.m.m.controller.FanoutExchangeConsumer  : 通道1接收到的消息为:mq的fanout模式(订阅模式)
2023-06-21 11:20:43.262  INFO 4696 --- [ntContainer#3-1] c.m.m.controller.FanoutExchangeConsumer  : 通道2接收到的消息为:mq的fanout模式(订阅模式)

3.5 路由模式(Direct消息模型)

路由消息模型是交换机根据routingKey进行消息投递的,每个队列都有自己专属的routingKey,生产者发送消息时,指定交换机和rountingKey,消息到了交换机之后,交换机通过routingKey将消息投递到指定队列

 单个springboot整合rabbitmq

 Producer

@RestController
@AllArgsConstructor
@RequestMapping("/test/mq")
public class DirectExchangeProducer {

    private RabbitTemplate rabbitTemplate;

    public static final String EXCHANGE_DIRECT = "exchange.direct";

    public static final String EXCHANGE_DIRECT_QUEUE_1 = "exchange.direct.queue_1";

    public static final String EXCHANGE_DIRECT_QUEUE_2 = "exchange.direct.queue_2";

    public static final String EXCHANGE_DIRECT_ROUTING_KEY_1 = "exchange.direct.routing.1";

    public static final String EXCHANGE_DIRECT_ROUTING_KEY_2 = "exchange.direct.routing.2";


    @GetMapping("/direct")
    public void sendMessageDirect(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
        if (1 == routingkey) {
            rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, EXCHANGE_DIRECT_ROUTING_KEY_1, context + routingkey);
        } else if (2 == routingkey) {
            rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, EXCHANGE_DIRECT_ROUTING_KEY_2, context + routingkey);
        } else {
            System.out.println("数据非法");
        }
    }
}

Consumer

@Component
@Slf4j
public class DirectExchangeConsumer {

    @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(value = DirectExchangeProducer.EXCHANGE_DIRECT, type = ExchangeTypes.DIRECT),
                value = @Queue(value = DirectExchangeProducer.EXCHANGE_DIRECT_QUEUE_1),
                key = DirectExchangeProducer.EXCHANGE_DIRECT_ROUTING_KEY_1))
    public void exchangeDirectRoutingKey1(String context,  Message message) {
        log.info("key1:" + message.getMessageProperties().getReceivedRoutingKey());
        log.info("路由器模式1 接收到的消息为:{}", context);
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = DirectExchangeProducer.EXCHANGE_DIRECT, type = ExchangeTypes.DIRECT),
            value = @Queue(value = DirectExchangeProducer.EXCHANGE_DIRECT_QUEUE_2),
            key = DirectExchangeProducer.EXCHANGE_DIRECT_ROUTING_KEY_2))
    public void exchangeDirectRoutingKey2(String context,  Message message) {
        log.info("key2:" + message.getMessageProperties().getReceivedRoutingKey());
        log.info("路由器模式2 接收到的消息为:{}", context);
    }
    
}

结果

单个springboot整合rabbitmq

2023-06-22 14:48:49.642  INFO 22476 --- [ntContainer#2-2] c.m.m.controller.DirectExchangeConsumer  : key2:exchange.direct.routing.2
2023-06-22 14:48:49.643  INFO 22476 --- [ntContainer#2-2] c.m.m.controller.DirectExchangeConsumer  : 路由器模式2 接收到的消息为:内容2
2023-06-22 14:48:56.289  INFO 22476 --- [ntContainer#1-1] c.m.m.controller.DirectExchangeConsumer  : key1:exchange.direct.routing.1
2023-06-22 14:48:56.289  INFO 22476 --- [ntContainer#1-1] c.m.m.controller.DirectExchangeConsumer  : 路由器模式1 接收到的消息为:内容1
2023-06-22 14:48:59.037  INFO 22476 --- [ntContainer#2-2] c.m.m.controller.DirectExchangeConsumer  : key2:exchange.direct.routing.2
2023-06-22 14:48:59.038  INFO 22476 --- [ntContainer#2-2] c.m.m.controller.DirectExchangeConsumer  : 路由器模式2 接收到的消息为:内容2

3.6  主题模式(Topic模式,通配符模式)

在Topic模式中,Routingkey不再时固定的字符,而是有了通配符,交换机可以进行模糊匹配队列

RoutingKey一般由一个或者多个单词组成,多个单词以"."分割

"*":匹配一个单词,就只有一个单词 比如topic.*  可以匹配到topic.AB topic.AC
"#":匹配一个或者多个单词 比如topic.#  可以配到到topic.AB也可以匹配到topic.AB.AC

单个springboot整合rabbitmq

 Producer

@RestController
@AllArgsConstructor
@RequestMapping("/test/mq")
public class TopicExchangeProducer {

    private RabbitTemplate rabbitTemplate;

    public static final String EXCHANGE_TOPIC = "exchange.topic";

    public static final String EXCHANGE_TOPIC_QUEUE_1 = "exchange.topic.queue_1";

    public static final String EXCHANGE_TOPIC_QUEUE_2 = "exchange.topic.queue_2";

    //只要包含了routingkey都会匹配到
    public static final String EXCHANGE_TOPIC_ROUTING_KEY_1 = "#.routingkey.#";

    //routingkey开头,后面接了一个单词的都会匹配到,比如routing.AB
    public static final String EXCHANGE_TOPIC_ROUTING_KEY_2 = "routingkey.*";

    public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";

    //如果case_key_2这样写,那么绑定case_key_1的队列一样会接收到,因为case_key_2也一样和key1匹配上
    public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";


    @GetMapping("/topic")
    public void sendMessageDirect(@RequestParam("context") String context, @RequestParam("routingkey") Integer routingkey) {
        if (1 == routingkey) {
            rabbitTemplate.convertAndSend(EXCHANGE_TOPIC, EXCHANGE_TOPIC_CASE_KEY_1, context + routingkey);
        } else if (2 == routingkey) {
            rabbitTemplate.convertAndSend(EXCHANGE_TOPIC, EXCHANGE_TOPIC_CASE_KEY_2, context + routingkey);
        } else {
            System.out.println("数据非法");
        }
    }
}

Consumer

@Component
@Slf4j
public class TopicExchangeConsumer {


    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = TopicExchangeProducer.EXCHANGE_TOPIC,  type = ExchangeTypes.TOPIC),
            value = @Queue(value = TopicExchangeProducer.EXCHANGE_TOPIC_QUEUE_1),
            key = TopicExchangeProducer.EXCHANGE_TOPIC_ROUTING_KEY_1))
    @RabbitHandler
    public void exchangeTopicRoutingKey1(String context, Message message) {
        System.out.println("key1:"+message.getMessageProperties().getReceivedRoutingKey());
        log.info("主题模式1:内容为:{}", context);
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = TopicExchangeProducer.EXCHANGE_TOPIC, type = ExchangeTypes.TOPIC),
            value = @Queue(value = TopicExchangeProducer.EXCHANGE_TOPIC_QUEUE_2),
            key = ExchangeProducer.EXCHANGE_TOPIC_ROUTING_KEY_2))
    @RabbitHandler
    public void exchangeTopicRoutingKey2(String context,  Message message) {
        System.out.println("key2:"+message.getMessageProperties().getReceivedRoutingKey());
        log.info("主题模式2:内容为:{}", context);
    }

}

结果1

key1:topic.routingkey.case1
2023-06-22 15:32:10.554  INFO 26060 --- [ntContainer#8-1] c.m.m.controller.TopicExchangeConsumer   : 主题模式1:内容为:内容1

结果2 

因为第二个key也会被第一个匹配到,所以会产生两个结果文章来源地址https://www.toymoban.com/news/detail-496968.html

key1:routingkey.case2
key2:routingkey.case2
2023-06-22 15:32:25.451  INFO 26060 --- [ntContainer#8-1] c.m.m.controller.TopicExchangeConsumer   : 主题模式1:内容为:内容2
2023-06-22 15:32:25.451  INFO 26060 --- [ntContainer#9-1] c.m.m.controller.TopicExchangeConsumer   : 主题模式2:内容为:内容2

到了这里,关于单个springboot整合rabbitmq的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot 整合 RabbitMQ

    由于有的 Idea 不选择插线无法创建 Spring Boot 项目,这里我们先随便选一个插件,大家也可以根据需求选择~~ 把版本改为 2.7.14 引入这两个依赖: 配置 application.yml文件 Config 类 : RabbitMQConfig 测试类: RabbitMQConfigTests 结果 当我们启动 测试类 之后就可以发现我们的 rabbitmq 界面里的

    2024年02月10日
    浏览(38)
  • SpringBoot整合RabbitMQ(基础)

    一.环境准备 1、在pom文件中引入对应的依赖: 2、在application.yml配置文件中配置RabbitMQ: 二、整合 点对点,简单模式 ①配置文件中声明队列 ②创建生产者 消息发送成功后,在web管理页面查看: 可以看到对应队列中产生了消息 ③创建消费者 启动项目,可以看到消息成功消费:

    2024年02月11日
    浏览(37)
  • SpringBoot整合RabbitMQ

    🙈作者简介:练习时长两年半的Java up主 🙉个人主页:程序员老茶 🙊 ps:点赞👍是免费的,却可以让写博客的作者开心好久好久😎 📚系列专栏:Java全栈,计算机系列(火速更新中) 💭 格言:种一棵树最好的时间是十年前,其次是现在 🏡动动小手,点个关注不迷路,感

    2024年02月21日
    浏览(45)
  • SpringBoot 整合RabbitMQ

    2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出

    2024年02月15日
    浏览(52)
  • RabbitMQ整合Springboot

    目录 一、配置 二、使用 (1)创建普通交换机 (2) 创建普通队列 (3)绑定 交换机--队列 (4)创建带有死信交换机的队列 (5)生产者 (6)消费者 (7)Message对象 (8)延时队列优化(死信实现延时,有缺陷) 三、Rabbitmq插件实现延迟队列(重点) 四、发布确认 (1)确认回调

    2024年02月15日
    浏览(41)
  • SpringBoot整合实现RabbitMQ

    本文大纲 一.RabbitMQ介绍 二.RabbitMQ的工作原理 2.1 RabbitMQ的基本结构 2.2 组成部分说明 2.3 生产者发送消息流程 2.4 消费者接收消息流程 三.SpringBoot 整合实现RabbitMQ 3.1创建mq-rabbitmq-producer(生产者)发送消息 3.1.1pom.xml中添加相关的依赖 3.1.2 配置application.yml 3.1.3 配置RabbitMQ常量类

    2024年02月17日
    浏览(44)
  • springboot整合rabbitmq死信队列

    什么是死信 需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置: 从上面的逻辑图中,可以发现大致的思路: .1. 消息队列分为正常交换机、正常消息队列;以及死信交换机和死信队列。 2. 正常队列针对死信信息,需要将数据 重新 发送至死信交换机

    2024年02月11日
    浏览(47)
  • RabbitMQ与SpringBoot整合实践

    作者:禅与计算机程序设计艺术 2020年是一个转折点,现代化的信息社会已经开启了数字化进程,越来越多的人开始接受信息技术作为工作的一部分。相较于传统的技术岗位,人工智能、大数据、云计算领域的软件工程师更加需要具备实际项目应用能力、高超的计算机和通信

    2024年02月09日
    浏览(37)
  • SpringBoot 整合 RabbitMQ demo

    Rabbit Windows安装教程 本文只做Demo案例的分享,具体知识需自行百度 1.application.properties 配置Rabbit的基本信息 2.pom文件 导入两个maven依赖 第一个是SpringBoot集成的rabbit 第二个是web依赖,用来方便发消息 3.Rabbit配置类 这里用的是主题模式,也就是模糊匹配的模式 这里设置的是只

    2024年02月17日
    浏览(43)
  • SpringBoot整合RabbitMQ(最新笔记)

    1.2 创建工程并导入依赖 我们使用的springboot版本为2.5.6,其他都是根据 spring-boot-starter-parent 自动选择版本 引入以下工程即可 spring-boot-starter-test 用于测试 junit 用于单元测试 spring-boot-starter-amqp SpringBoot和RabbitMQ的整合方案 1.2 创建配置文件并配置 SpringBoot配置文件名称为 applica

    2024年02月05日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包