RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)

这篇具有很好参考价值的文章主要介绍了RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


SpringAMQP

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

1.SpringBoot 的支持

  • SpringBoot 已经提供了对 AMQP 协议完全支持的 spring-boot-starter-amqp 依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。
https://spring.io/projects/spring-amqp

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

2.RabbitTemplate

  • RabbitTemplate 是 SpringBoot AMQP 提供的快速发 RabbitMQ 消息的模板类,与 RestTemplate 有类似之处,意指方便、简单、快速的发 RabbitMQ 消息。
@Slf4j
@Component
public class ClientReportTopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String ROUTING_KEY = "report";

    public void send(String param) {
        rabbitTemplate.send(TopicConst.CLIENT_REPORT_TOPIC, ROUTING_KEY, new Message(param.getBytes(), new MessageProperties()));
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

send:将消息发送到指定的交换机和路由键中。

convertAndSend:将Java对象转换为消息,然后将其发送到指定的交换机和路由键中。

sendAndReceive:发送一个请求消息并接收一个响应消息。

convertSendAndReceive:将Java对象转换为请求消息,发送请求消息,并接收响应消息。

convertSendAndReceiveAsType:将Java对象转换为请求消息,发送请求消息,并接收响应消息,并将响应消息转换为指定类型的Java对象。

convertSendAndReceiveAsType:将Java对象转换为请求消息,发送请求消息,并接收响应消息,并将响应消息转换为指定类型的Java对象。

sendWithMessagePostProcessor:发送消息,并在发送之前进行处理。

execute:执行Rabbit操作并返回一个结果。

receive:从队列接收一条消息。

receiveAndConvert:从队列接收一条消息,并将其转换为Java对象。

receiveAndReply:从队列接收一条请求消息,并发送一个响应消息。

convertSendAndReceiveAsType:将Java对象转换为请求消息,发送请求消息,并接收响应消息,并将响应消息转换为指定类型的Java对象。

convertSendAndReceiveAsType:将Java对象转换为请求消息,发送请求消息,并接收响应消息,并将响应消息转换为指定类型的Java对象。

convertSendAndReceiveAndReplyHeader:将Java对象转换为请求消息,并发送请求消息。接收到请求消息后,将其转换为响应消息,并设置响应消息的头信息。

convertAndSend:将Java对象转换为消息,并发送消息。

convertAndSend:将Java对象转换为消息,并发送消息。在发送之前,先对消息进行处理。

convertAndSend:将Java对象转换为消息,并发送消息。在发送之前,先对消息进行处理,并指定响应消息的类型。

convertAndSend:将Java对象转换为消息,并发送消息。在发送之前,先对消息进行处理,并指定响应消息的类型和交换机。

send:将消息发送到指定的交换机和路由键中。

send:将消息发送到指定的交换机和路由键中。在发送之前,先对消息进行处理。

send:将消息发送到指定的交换机和路由键中。在发送之前,先对消息进行处理,并指定响应消息的类型。

sendAndReceive:发送一个请求消息并接收一个响应消息。

sendAndReceive:发送一个请求消息并接收一个响应消息。在发送之前,先对消息进行处理。

sendAndReceive:发送一个请求消息并接收一个响应消息。在发送之前,先对消息进行处理,并指定响应消息的类型。

sendAndReceive:发送一个请求消息并接收一个响应消息。在发送之前,先对消息进行处理,并指定响应消息的类型和交换机。

setConnectionFactory:设置RabbitMQ连接工厂。

getConnectionFactory:获取RabbitMQ连接工厂。

setExchange:设置默认的交换机。

getExchange:获取默认的交换机。

setRoutingKey:设置默认的路由键。

getRoutingKey:获取默认的路由键。

setQueue:设置默认的队列。

getQueue:获取默认的队列。

setMandatory:设置消息是否强制路由到队列。

isMandatory:检查消息是否强制路由到队列。

setReplyTimeout:设置接收响应消息的超时时间。

getReplyTimeout:获取接收响应消息的超时时间。

setChannelTransacted:设置通道是否应该在事务中使用。

isChannelTransacted:检查通道是否应该在事务中使用。

setConfirmCallback:设置确认回调。

getConfirmCallback:获取确认回调。

setReturnCallback:设置返回回调。

getReturnCallback:获取返回回调。

setBeforePublishPostProcessor:设置发布之前的后处理器。

getBeforePublishPostProcessor:获取发布之前的后处理器。

setAfterReceivePostProcessor:设置接收后的后处理器。

getAfterReceivePostProcessor:获取接收后的后处理器。

setUsePublisherConnection:设置是否应该使用发布者连接。

isUsePublisherConnection:检查是否应该使用发布者连接。

setApplicationContext:设置应用程序上下文。

3.@RabbitListener(终极监听方案)

使用此方案做监听消息功能,就可以把之前的 SimpleMessageListenerContainer 进行监听的方案舍弃掉了,就是这么的喜新厌旧,不过之前的 SimpleMessageListenerContainer 也不是一无是处,学过之后可以更好的理解内部的一些逻辑。

@RabbitListener 的特点:

  • RabbitListener 是 SpringBoot 架构中监听消息的终极方案。
  • RabbitListener 使用注解声明,对业务代码无侵入。
  • RabbitListener 可以在 SpringBoot 配置文件中进行配置。

@RabbitListener 本身是 Java 中的注解,可以搭配其他注解一起使用:

  • @Exchange:自动声明 Exchange。
  • @Queue:自动声明队列。
  • @QueueBinding:自动声明绑定关系。
package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage1(String msg){
        System.out.println("consume1接收到的消息:"+msg);
    }
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage2(String msg){
        System.out.println("consume2接收到的消息:"+msg);
    }
}

4.RabbitConfig—rabbitmq配置类

声明式实现(推荐)

@Slf4j
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "exchange.cat.dog";
    public static final String EXCHANGE_DLX  = "exchange.dlx";
    public static final String QUEUE_NAME    = "queue.cat";
    public static final String QUEUE_DLX     = "queue.dlx";
    public static final String KEY_NAME      = "key.yingduan";
    public static final String KEY_DLX       = "#";

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
        return connectionFactory;
    }

    @Bean
    RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    Exchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    Binding binding() {
        // 目的地名称、目的地类型、绑定交换机、绑定 key、参数
        return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
    }

     //死信队列机制  死信队列需要在创建 Queue 时指定对应属性:
     @Bean
    Queue queue() {
          // 配置声明队列时使用的参数
          Map<String, Object> args = new HashMap<>(1);
          // 设置死信队列指向的交换机
          args.put("x-dead-letter-exchange", EXCHANGE_DLX);
         return new Queue(QUEUE_NAME, true, false, false, args);
    }

}

注意,以上配置再启动 SpringBoot 并不会立马创建交换机、队列、绑定,SpringBoot AMQP 有懒加载,需要等到使用 connection 时才会创建。什么是使用 connection 呢?

  • 比如创建 connection
@Bean
ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setUsername("admin");
  connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
  connectionFactory.createConnection();
  return connectionFactory;
}

  • 再比如监听了队列
@RabbitListener(queues = {"test"})
void test() {
  log.info("【测试监听消息】");
}

SpringBoot集成RabbitMQ 案例

配置

导入maven坐标
<!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
yml配置
spring:
  rabbitmq:
    addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672
    username: admin
    password: admin
    #开启消息确认模式,新版本已经弃用
    #publisher-confirms: true
    #开启消息送达提示
    publisher-returns: true
    # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
    publisher-confirm-type: correlated
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto #确认模式
        prefetch: 1 #限制每次发送一条数据。
        concurrency: 3 #同一个队列启动几个消费者
        max-concurrency: 3 #启动消费者最大数量
        #重试策略相关配置
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000

RabbitMQ 参数配置说明

spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #账号
    password: guest #密码
    virtualHost:    #链接的虚拟主机
    addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
    requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
    publisherConfirms: true  #发布确认机制是否启用
    #确认消息已发送到交换机(Exchange)
    #publisher-confirm-type参数有三个可选值:
    #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。
    #CORRELATED:消息从生产者发送到交换机后触发回调方法。
    #NONE(默认):关闭发布确认模式。
    #publisher-confirm-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:true
    publisherReturns: true #发布返回是否启用
    connectionTimeout: #链接超时。单位ms。0表示无穷大不超时
    ### ssl相关
    ssl:
      enabled: #是否支持ssl
      keyStore: #指定持有SSL certificate的key store的路径
      keyStoreType: #key store类型 默认PKCS12
      keyStorePassword: #指定访问key store的密码
      trustStore: #指定持有SSL certificates的Trust store
      trustStoreType: #默认JKS
      trustStorePassword: #访问密码
      algorithm: #ssl使用的算法,例如,TLSv1.1
      verifyHostname: #是否开启hostname验证
    ### cache相关
    cache:
      channel: 
        size: #缓存中保持的channel数量
        checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
      connection:
        mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION
        size: #缓存的连接数,只有是CONNECTION模式时生效
    ### listener
    listener:
       type: #两种类型,SIMPLE,DIRECT
       ## simple类型
       simple:
         concurrency: #最小消费者数量
         maxConcurrency: #最大的消费者数量
         transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量
         missingQueuesFatal: #是否停止容器当容器中的队列不可用
         ## 与direct相同配置部分
         autoStartup: #是否自动启动容器
         acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
         prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
         defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
         idleEventInterval: #container events发布频率,单位ms
         ##重试机制
         retry: 
           stateless: #有无状态
           enabled:  #是否开启
           maxAttempts: #最大重试次数,默认3
           initialInterval: #重试间隔
           multiplier: #对于上一次重试的乘数
           maxInterval: #最大重试时间间隔
       direct:
         consumersPerQueue: #每个队列消费者数量
         missingQueuesFatal:
         #...其余配置看上方公共配置
     ## template相关
     template:
       mandatory: #是否启用强制信息;默认false
       receiveTimeout: #`receive()`接收方法超时时间
       replyTimeout: #`sendAndReceive()`超时时间
       exchange: #默认的交换机
       routingKey: #默认的路由
       defaultReceiveQueue: #默认的接收队列
       ## retry重试相关
       retry: 
         enabled: #是否开启
         maxAttempts: #最大重试次数
         initialInterval: #重试间隔
         multiplier: #失败间隔乘数
         maxInterval: #最大间隔


1.基本消息队列

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

1、创建队列

  • 访问接口:http://localhost:15672,账号密码都为guest

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

2、发布消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads() {
        String queue="MqTest1";
        String message="message1";
        rabbitTemplate.convertAndSend(queue,message);
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

3、接受消息

package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage(String msg){
        System.out.println("接收到的消息:"+msg);
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

2.工作消息队列(Work Queue)

  • 可以提高消息处理速度,避免队列消息堆积

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

1、发布消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads() {
        String queue="MqTest1";
        String message="message1";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(queue,message);
        }
    }
 
}

2、接受消息

package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage1(String msg){
        System.out.println("consume1接收到的消息:"+msg);
    }
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage2(String msg){
        System.out.println("consume2接收到的消息:"+msg);
    }
}

3、控制台输出结果

consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1

4、消息预取问题

  • 但是此时有一个问题就是消息预取,比如队列有10条消息,两个消费者各自直接先预取5个消息,如果一个消费者接受消息的速度慢,一个快,就会导致一个消费者已经完成工作,另一个还在慢慢处理,会造成消息堆积消费者身上,要解决这个问题需要在yml文件配置相关配置
  rabbitmq:
    host: 43.140.244.236
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能取一个,处理完才能取下一个消息

3.发布订阅模式之模式(Fanout)

exchange是交换机,负责消息路由,但不存储消息,路由失败则消息丢失

生产者将消息发送到fanout交换器
  • fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

1、Fanout配置类(@Bean声明)

package com.rabbitmqdemoconsumer.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class FanountConfig {
    //交换机声明
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("FanountExchange");
    }
    //声明队列1
    @Bean
    public Queue Fanount_Qeueue1(){
        return new Queue("Fanount_Qeueue1");
    }
    //声明队列2
    @Bean
    public Queue Fanount_Qeueue2(){
        return new Queue("Fanount_Qeueue2");
    }
    //绑定交换机和队列
    @Bean
    public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange);
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

2、发送消息

首先发送10条消息,经过交换机转发到队列
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="FanountExchange";
        String message="message";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

3、接受消息

 //监听交换机Fanount_Qeueue1
    @RabbitListener(queues = "Fanount_Qeueue1")
    public void listenFanountQeueue1(String msg){
        System.out.println("Fanount_Qeueue1接收到的消息:"+msg);
    }
    //监听交换机Fanount_Qeueue2
    @RabbitListener(queues = "Fanount_Qeueue2")
    public void listenFanountQeueue2(String msg){
        System.out.println("Fanount_Qeueue2接收到的消息:"+msg);
    }

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

4.路由模式(Direct)

  • 会将消息根据规则路由到指定的队列
生产者将消息发送到direct交换器

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

1、声明(基于@RabbitListener声明)

package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    /**
     * 绑定交换机和队列,并为key赋值
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "DirectQueue1"),
        exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("listenDirectQueue1接收到的消息:"+msg);
    }
 
 
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "DirectQueue2"),
        exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
        key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("listenDirectQueue2接收到的消息:"+msg);
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

2、发送给blue

发送消息
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

 
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="DirectExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"blue",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

3、发送给red

发送消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="DirectExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"red",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

5.主题模式(Topic)

生产者将消息发送到 topic交换器

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

Queue与Exchange指定BindingKey可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

1、声明

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "TopicQueue1"),
        exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
        key = {"china.#"}
    ))
public void listenTopicQueue1(String msg){
    System.out.println("listenTopicQueue1接收到的消息:"+msg);
}
 
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "TopicQueue2"),
    exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
    key = {"#.news"}
))
public void listenTopicQueue2(String msg){
    System.out.println("listenTopicQueue2接收到的消息:"+msg);
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

2、发送消息(测试1)

package com.rabbitmqdemo;
 
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="TopicExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"china.news",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot

3、发送消息(测试2)

package com.rabbitmqdemo;
 
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="TopicExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"china.weather",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息队列,java-rabbitmq,rabbitmq,spring boot文章来源地址https://www.toymoban.com/news/detail-841962.html

到了这里,关于RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MQ-消息队列-RabbitMQ

    MQ(Message Queue) 消息队列 ,是基础数据结构中“ 先进先出 ”的一种 数据结构 。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由

    2024年02月09日
    浏览(52)
  • RabbitMQ入门 消息队列快速入门 SpringAMQP WorkQueue 队列和交换机 Fanout Direct exchange RAbbitMQ单体部署

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年04月08日
    浏览(71)
  • SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)

    SpringAMQP 是基于 RabbitMQ 封装的一套模板,并且还利用 SpringBoot 对其实现了自动装配,使用起来非常方便。 SpringAmqp 的官方地址:https://spring.io/projects/spring-amqp 说明 : 1.Spring AMQP 是对 Spring 基于 AMQP 的消息收发解决方案,它是一个抽象层,不依赖于特定的 AMQP Broker 实现和客户端

    2024年04月13日
    浏览(40)
  • MQ消息队列(主要介绍RabbitMQ)

    消息队列概念:是在消息的传输过程中保存消息的容器。 作用:异步处理、应用解耦、流量控制..... RabbitMQ:     SpringBoot继承RabbitMQ步骤:         1.加入依赖          2.配置         3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)         4.创建队列、

    2024年02月11日
    浏览(54)
  • 消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    1)什么是 MQ MQ (message queue),从字面意思上看, 本质是个队列,FIFO 先入先出 ,只不过队列中存放的内容是 message 而已,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “ 逻辑解耦 + 物理解耦” 的消息通信服务 。 使用了

    2024年02月20日
    浏览(47)
  • RaabitMQ(三) - RabbitMQ队列类型、死信消息与死信队列、懒队列、集群模式、MQ常见消息问题

    这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。 经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。 Durability有两个选项,Durable和Transient。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。但是同时,由于需

    2024年02月14日
    浏览(203)
  • MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型

    书接上文,展示一下五种模型我使用的是spring could 微服务的框架 文章说明:         本文章我会分享总结5种实用的rabbitMQ的实用模型 1、hello world简单模型 2、work queues工作队列 3、Publish/Subscribe发布订阅模型 4、Routing路由模型 5、Topics 主题模型 (赠送) 6、消息转换器 Rabbi

    2024年02月05日
    浏览(56)
  • mq 消息队列 mqtt emqx ActiveMQ RabbitMQ RocketMQ

    十几年前,淘宝的notify,借鉴ActiveMQ。京东的ActiveMQ集群几百台,后面改成JMQ。 Linkedin的kafka,因为是scala,国内很多人不熟。淘宝的人把kafka用java写了一遍,取名metaq,后来再改名RocketMQ。 总的来说,三大原因,语言、潮流、生态。 MQ这种东西,当你的消息量不大的时候,用啥

    2024年02月12日
    浏览(53)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(103)
  • SpringCloud实用篇4——MQ RabbitMQ SpringAMQP

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月13日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包