学习RabbitMQ高级特性

这篇具有很好参考价值的文章主要介绍了学习RabbitMQ高级特性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目标:

了解熟悉RabbitMQ的高级特性


学习步骤:

高级特性主要分为以下几点, 官网介绍

1、消息可靠性投递 【confirm 确认模式、return 退回模式】
2、Consumer ACK 【acknowledge】
3、消费端限流 【prefetch】
4、TTL过期时间 【time to live】
5、死信队列 【Dead Letter Exchange】
6、延迟队列 【rabbitmq-delayed-message-exchange】
7、优先级队列 【x-max-priority】

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq


前戏:项目搭建

1、创建两个module,一个为生产者,一个为消费者

分别添加如下依赖【或者将依赖放置在父工程下,两个module作为子工程引用即可】

 <dependencies>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
     </dependency>
 </dependencies>

2、 配置RabbitMQ 的基本信息 [application.yml]

spring:
  rabbitmq:
    host: 服务器IP 
    port: 5672 # 端口默认为 5672
    username: guest # 默认账号有guest 密码一致
    password: guest
    virtual-host: /

3、编写配置类RabbitMQConfig,注册队列、交换机、以及绑定关系

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "csnz_queue";
    public static final String EXCHANGE_NAME = "csnz_exchange";

    // 1、注册队列
    @Bean("CSNZQueue")
    public Queue getQueue(){
    	// 使用QueueBuilder构建一个队列,设置队列持久化,以及自动删除。
        return QueueBuilder.durable(QUEUE_NAME).autoDelete().build();
    }
    // 2、注册交换机
    @Bean("CSNZExchange")
    public Exchange getExchange(){
    	// 使用ExchangeBuilder构建一个交换机(类型可选,此处为通配符交换机),设置持久化和自动删除
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    // 3、绑定队列和交换机
    @Bean("CSNZBind")
    public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
    	// 使用BindingBuilder 将刚刚声明的队列和交换机绑定并设置绑定的路由key
        return BindingBuilder.bind(queue).to(exchange).with("csnz.#").noargs();
    }
}

一、消息可靠性投递

RabbitMQ提供了两种模式来控制消息的投递(生产者发送的)可靠性

  • confirm 确认模式
  • return 退回模式
    rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

因为消息投递过程是从 生产者Broker[exchange -> queue] 再到 消费者

两种模式过程:
  • 1:message从producer到exchange成功时会返回一个 confirmCallback
  • 2:message从exchange到queue失败时会返回一个 returnCallback

只要利用这俩个callback就可以控制消息的 可靠性投递了

demo演示

确认模式:

1、在配置中开启 publisher-confirms 为 true
2、在rabbitTemplate定义 confirmCallBack 回调函数

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

	/*
        确认模式:
        1、在配置中开启 publisher-confirms为true
        2、在rabbitTemplate定义confirmCallBack回调函数
     */
    @Test
    public void testConfirm(){
        // 定义confirmCallBack回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            /*
                CorrelationData:相关的配置信息【在convertAndSend重载方法中有包含此信息】
                ack;exchange交换机,是否成功收到了信息。
                cause:失败原因。如果成功接收则此值为null
             */
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    System.out.println("消息成功发送");
                }else{
                    System.out.println("发送失败原因:" + cause);
                    // 重新发起或其他操作
                }

            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testConfirm","测试确认回调模式");
    }

此时,如果发送消息时指定的交换机和路由键都是正确的,即代码块第27行参数正确,则第17行的 ack值为true,执行 消息成功发送到交换机后的逻辑代码
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
这里发送失败就是因为我们把交换机的名称写错了,换成正确的交换机名称就好
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

回退模式:

当消息发送给Exchange后,Exchange路由到queue失败时才会执行 ReturnCallBack

1、在配置中开启 publisher-returns 为 true
2、设置ReturnCallBack
3、设置Exchange处理消息的模式:
  • 如果消息没路由到queue
    • 1、丢弃消息(默认)即 rabbitTemplate.setMandatory(false);
    • 2、 返回给消息发送方 ReturnCallBack 即 rabbitTemplate.setMandatory(true);

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

	@Test
    public void testReturn(){
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        // 设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            /*
                Message:消息对象
                replyCode:错误码
                replyText:错误信息
                exchange:交换机
                routingKey:路由键
             */
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println(message);
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"error.testReturn","测试回退模式");
    }

此时,如果发送消息时指定的交换机和路由键发生找不到的情况,即代码块第20行参数错误
情况一:没写第四行设置交换机处理失败消息的模式为true,则不会执行第15行的代码,因为消息失败默认是丢弃模式

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
因为默认此值就是false
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
情况二:设置交换机处理失败消息的模式为true,则会执行第15行的代码块,消息发送失败时,消息会通过回调返回,此时就可以查看消息发送失败的具体原因
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

二、Consumer Ack

Ack为Acknowledge,顾名思义,指的是消费者收到消息后的确认模式。

分为三种模式
  • 自动确认:acknowledge=“none”(默认)
    • 当消息一旦被consumer接收了,则自动确认收到,并移除消息缓存中的信息
  • 手动确认:acknowledge=“manual”
    • 手动ACK
    • 手动NACK
  • 根据异常情况判断是否确认:acknowledge=“auto”
实际业务情况:

一般不会使用 自动确认模式,因为收到消息后,很可能在进行业务处理时出现异常,造成数据丢失。真就啪一下没了。
一般都是使用 手动确认模式

  • 即在业务处理成功后,调用 channel.basicAck() 进行手动确认,会发送给 broker 一个应答,代表消息处理成功
  • 如果在进行业务处理时发生异常,则调用 channel.basicNack() 方法【如果设置了重回队列,broker 就会将没有成功处理的消息重新发送。否则将该消息从队列中剔除】。

demo演示

自动确认模式:

1、定义一个监听器:AckListener 实现 MessageListener 接口
2、在onMessage方法上绑定要监听的队列
@Component
public class AckListener implements MessageListener { 
    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message) throws Exception {
        System.out.println(new String(message.getBody()));
    }
}

测试
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

手动确认模式:

1、设置手动接收:acknowledge-mode: manual
2、定义一个监听器:AckListener 实现 ChannelAwareMessageListener 接口 :(因为此接口才有返回channel参数)
3、在onMessage方法上绑定要监听的队列
4、消息成功处理:调用 channel.basicAck() 接收
5、消息处理失败:调用channel.basicNack()拒绝接收,让broker重新发送给consumer

application.yml配置文件

# 配置RabbitMQ 的基本信息 IP 端口 username pass
spring:
  rabbitmq:
    host: 服务器IP
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual

监听器:AckListener

@Component
public class AckListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener

    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000); // 模拟业务时间
        // 传递标签:该字段为MQ server 用于消息确认的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1、接收转换消息
            System.out.println(new String(message.getBody()));
            // 2、处理业务逻辑
            Thread.sleep(1000); // 模拟业务时间
            // 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
            // ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag,true,true);
        }
    }
}
测试正常情况下的手动接收代码:

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

测试异常情况下的手动接收代码(在处理业务逻辑时加上错误即可):可以看见消息一直被重新入队进行消费

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

三、消费端限流

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

限流机制
  • 设置手动接收:acknowledge-mode: manual
  • 设置消费端每次消费消息的条数
    • prefetch = 1
    • 表示消费端每次从MQ拉取一条消息来消费,直至手动确认消费完毕后,才会继续拉取下一条数据
  • 监听器类实现 ChannelAwareMessageListener 接口
    • 消息成功处理:调用 channel.basicAck() 接收
    • 处理失败:调用channel.basicNack()拒绝接收,让broker重新发送给consumer

demo演示

监听器类LimitListener

@Component
public class LimitListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener

    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000); // 模拟业务时间
        // 传递标签:该字段为MQ server 用于消息确认的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1、接收转换消息
            System.out.println(new String(message.getBody()));
            // 2、处理业务逻辑
            Thread.sleep(1000); // 模拟业务时间
//            int i = 1/0;
            // 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
            // ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

消费端:application.yml配置文件

# 配置RabbitMQ 的基本信息 IP 端口 username pass
spring:
  rabbitmq:
    host: 服务器IP
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        prefetch: 1

生产者测试代码

循环发送数据
@Test
    public void testSend(){
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        // 设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            /*
                Message:消息对象
                replyCode:错误码
                replyText:错误信息
                exchange:交换机
                routingKey:路由键
             */
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(message);
            }
        });
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testSend","测试限流-手动接收:"+i);
        }
    }

消息发送完毕后,观察MQ工作台

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

消费者测试代码

    @Test
    public void testLimitAck(){
        System.out.println("执行 限流Ack模式");
        while(true){

        }
    }

观察消费端控制台打印

可以看到消息是一条一条消费的(设置了业务时间为2秒),这就是所谓的限流消费。

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

四、TTL

TTL 称为 time to live,也就是存活时间,也称过期时间

它的用处是当消息到达存活时间后,如果还没有被消费掉,则会自动清除。此用途也经常被用来做订单的延时付款。

  • 可以对消息设置过期时间使用参数:expiration,单位:毫秒,且当该消息在队列头部时,才会单独判断这一消息是否过期
  • 可以对整个队列所有消息设置过期时间时间一到,队列内全部消息清空使用参数:x-message-ttl,单位:毫秒)
  • 如果两者都进行了设置,则以时间短的为主。(因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的)

且RabbitMQ不保证消息会在精确的TTL时间后立即被删除。这是因为RabbitMQ使用一种基于时间戳的方式来检查消息的过期时间,并且该方式是有一定的误差的

问题一:如果一个rabbitmq队列中同时设置了A消息过期时间,以及队列总体消息过期时间,且A消息设置的过期时间比较短,那么是A先过期还是消息总体一起过期,A消息的过期时间是否会被设置的总体过期时间所覆盖

答:
A 消息会先过期,而不是队列中所有消息一起过期。因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的。当 A 消息过期时,它会被从队列中删除,而不受队列总体消息过期时间的影响。队列总体消息过期时间只会影响那些没有设置过期时间的消息。因此,A 消息的过期时间不会被设置的总体过期时间所覆盖。

问题二:如果一个rabbitmq队列中同时设置了A消息过期时间,以及队列总体消息过期时间,且A消息设置的过期时间比较长,那么A会不会比队列中的其他消息后过期

答:
A 消息会比队列中的其他消息后过期。因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的。当队列中的消息的过期时间早于 A 消息的过期时间时,这些消息会先被删除,而 A 消息会继续存在于队列中,直到其过期时间到达后才会被删除。因此,A 消息会比队列中的其他消息后过期。

问题三:rabbitmq设置队列过期时间 和 设置队列消息过期时间的区别

答:
设置队列过期时间,是指在队列空闲一段时间之后(即没有消费者消费该队列,也没有新消息进入该队列),队列会自动被删除。这个过期时间是应用于整个队列的,而不是具体某一条消息。

设置队列消息过期时间,是指在每一条消息入队时,设置消息的过期时间,当消息在队列中等待时间超过其过期时间时,该消息会被自动删除。这个过期时间是应用于具体某一条消息的,而不是整个队列。

设置队列消息过期使用x-message-ttl参数,而设置队列过期使用x-expires参数
在实际应用中,根据不同的需求,我们可以选择设置队列过期时间或设置队列消息过期时间。如果我们希望在一段时间内没有消费者消费该队列时,自动删除该队列,那么可以设置队列过期时间。如果我们希望在一条消息在队列中存活的时间超过一定时间后自动被删除,那么可以设置队列消息过期时间。

demo演示

1、单独对一个消息设置过期时间,用到MessagePostProcessor后置处理器

    @Test
    public void testOneTtl(){
        // 消息后置处理器,可以设置一些参数
        MessagePostProcessor processor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置message的信息
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testTTL","测试TTL过期:",processor);
    }

返回观察rabbitMQ控制台,是已经进了1条数据了

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

让我们等他个5秒,看他拜拜了没

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

2、对整个队列所有消息设置过期时间, 首先需要修改我们之前声明队列的地方(生产者),给它加点参数

在这里新增了一个args参数,作用是声明队列消息TTL以及过期时间

    // 1、注册队列
    @Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }

其他的就不需要修改了,直接写测试

    @Test
    public void testTtl(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testTTL","测试TTL过期:"+i);
        }
    }

返回观察rabbitMQ控制台,是已经进了10条数据了

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

让我们等他个5秒,看他拜拜了没

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

官网还告诉我们:The original expiry time of a message is preserved if it is requeued (for example due to the use of an AMQP method that features a requeue parameter, or due to a channel closure).

我知道你们英语四级没过🤭,把翻译也丢过来了:

如果消息被重新排队(例如,由于使用了具有requeue参数的AMQP方法,或者由于通道关闭),则保留消息的原始到期时间。

也就是说如果 rabbitMQ设置A消息过期时间为10秒 此时5秒后A消息因为某种原因被重新排队 那么A消息的剩余过期时间会被重置为10秒,而不是5秒,因为消息过期时间是从消息第一次被发送到队列开始计算的,而不是从消息第一次被消费开始计算的。

五、死信队列(弥补RabbitMQ3.0以前支持的immediate参数的功能)

概述:在所有MQ产品里,此队列都叫死信队列,在RabbitMQ中也不例外,但是它又有点特殊,因为只有RabbitMQ才有交换机的概念,所有在RabbitMQ中又称死信队列为 DLX(Dead Letter Exchange 死信交换机),

当消息称为死信息后,可以被重新发送到另一个交换机,此时另一个交换机就称之为死信交换机(DLX)。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

消息变成死信的三种情况:

  • 1、队列消息长度到达上限,导致消息被丢弃
  • 2、消费者拒绝接收消息,并且不让消息重新入队
  • 3、消息设置的TTL已经到达 超时时间 而仍未被消费

来自官方提示:Note that expiration of a queue will not dead letter the messages in it.
请注意,队列到期不会使其中的消息成为死信。(是队列到期,而不是队列消息到期)

如何将队列绑定至 死信交换机?

To set the dead letter exchange for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
要为队列设置死信交换,请在声明队列时指定可选的x-dead-letter-exchange参数。该值必须是同一虚拟主机中的exchange名称
You may also specify a routing key to be used when dead-lettering messages. If this is not set, the message’s own routing keys will be used.
您还可以指定在死信消息时使用的路由关键字。如果没有设置,将使用消息自己的路由关键字 args.put(“x-dead-letter-routing-key”, “some-routing-key”);

队列设置参数:

  • 1、x-dead-letter-exchange
  • 2、x-dead-letter-routing-key
注意:只有当原来的队列绑定了死信交换机后,原队列发生消息变成死信消息,此消息才会被死信交换机重新路由到死信队列

demo演示

1、再创建一套DLX队列和交换机

	@Bean("DLXQueue")
    public Queue DLXQueue(){
        return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DLXExchange")
    public Exchange DLXExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    @Bean
    public Binding bindDLX(){
        return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
    }

2、原有的队列添加参数,让他绑定DLX交换机和DLX路由键

    @Bean("CSNZQueue")
    public Queue getQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }

3、发送一条没设置过期时间的信息和一条设置10秒过期时间的信息

    @Test
    public void testDLX(){
        // 消息后置处理器,可以设置一些参数
        MessagePostProcessor processor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置message的信息
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testDLX","测试DLX",  processor);
    }

4、观察RabbitMQ控制台情况

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

发现一开始 原队列中有两条信息,过了十秒后其中设置了过期时间的那条信息转移到了DLX队列,所以最后是各自一条信息

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

5、测试队列设置整体消息过期时间,是否会发生信息转移情况

队列设置参数:

  • x-message-ttl
    @Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }
观察控制台,发现数据照样转移

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

6、再试试 队列消息长度到达上限,导致消息被丢弃变成死信的情况

队列设置参数:

  • x-max-length
  @Bean("CSNZQueue")
  public Queue getQueue(){
      // 设置过期时间参数
      HashMap<String, Object> args = new HashMap<>();
      args.put("x-max-length",5);
      args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
      args.put("x-dead-letter-routing-key","DLX.#");
      return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
  }
直接测试10条数据,只有5条在原队列,另外5条去了死信队列

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

7、测试 设置队列到期时间,是不是跟官网讲的一样 => 队列到期不会使其中的消息成为死信

队列设置参数:

  • x-expires
修改原队列,设置队列的过期时间,我这设置队列的过期时间为20秒!以及配置死信队列,让原队列绑定死信交换机
	@Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-expires",20000);
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }
	// 2、注册交换机
    @Bean("CSNZExchange")
    public Exchange getExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
	//  return ExchangeBuilder.directExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    // 3、绑定队列和交换机
    @Bean("CSNZBind")
    public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("Delay.#").noargs();
    }

    /*
        以下声明 死信交换机
    */
    @Bean("DLXQueue")
    public Queue DLXQueue(){
        return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DLXExchange")
    public Exchange DLXExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    @Bean
    public Binding bindDLX(){
        return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
    }
编写一个测试方法 => 测试队列过期是否其中的信息会成为死信
    @Test
    public void testDelayMessageWithExpires(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"Delay.testDelay","测试队列过期是否其中的信息会成为死信",message -> {
            return message;
        });
    }
运行测试方法,结果如下,原队列中有一条我们刚刚录入的信息

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

等待二十秒,看是否此信息会从原队列 去往死信队列

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

发现原队列确实过期自动删除了,但是原队列中的信息并没有去往死信队列,证明官网没骗人😁
队列到期不会使其中的消息成为死信。(是队列到期,而不是队列消息到期)

六、延迟队列

所谓延迟队列,就是消息进入队列之后不会立即被消费掉,而是等到指定时间后,才会被消费

相关场景:某多多下单后,30分钟内客户未支付,则自动取消此订单,库存回滚。

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

但是RabbitMQ中并没有提供延迟队列这一功能,有两种方式可以实现:

  • 1、靠 TTL + 死信队列 组合实现延迟队列的效果。

    • 缺点:两个交换机、两个队列,且RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,但是如果第一个消息设置的过期时间很长,而后续消息设置的过期时间很短,则会导致后续的消息一直不会被过期处理(不会按时过期)
      rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
  • 2、使用 rabbitmq-delayed-message-exchange 插件来实现(RabbitMQ 3.5.7及以上的版本、Erlang/OPT 18.0及以上)

    • 优点: 只需要一个交换机、一个队列,且不会出现只判断第一个消息的情况,会根据过期时间优先处理。
      rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

demo演示

1、靠 TTL + 死信队列 组合实现延迟队列的效果

监听器监听死信队列
@Component
public class DelayListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener

    @Override
    @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        // 传递标签:该字段为MQ server 用于消息确认的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1、接收转换消息
            System.out.println(new String(message.getBody()));
            // 2、处理业务逻辑
            Thread.sleep(1000); // 模拟业务时间
            // 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
            // ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag,true,true);
        }
    }
}
注册正常队列、死信队列,并让正常队列绑定死信交换机,且设置正常队列的过期时间为10秒
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "csnz_queue";
    public static final String EXCHANGE_NAME = "csnz_exchange";
    public static final String DLX_QUEUE_NAME = "DLX_QUEUE";
    public static final String DLX_EXCHANGE_NAME = "DLX_EXCHANGE";
    // 1、注册队列
    @Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",10000);
        args.put("x-max-length",5);
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }
    // 2、注册交换机
    @Bean("CSNZExchange")
    public Exchange getExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    // 3、绑定队列和交换机
    @Bean("CSNZBind")
    public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("csnz.#").noargs();
    }

    /*
        以下声明 死信交换机
     */
    @Bean("DLXQueue")
    public Queue DLXQueue(){
        return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DLXExchange")
    public Exchange DLXExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    @Bean
    public Binding bindDLX(){
        return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
    }
}
编写测试类

生产者端:

    @Test
    public void testDelay() throws InterruptedException {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testDelay","测试延迟队列");
        for (int i = 0; i < 15; i++) {
            System.out.println(i);
            Thread.sleep(1000);
        }
    }

消费者端:

    @Test
    public void testDelay(){
        System.out.println("Delay模式");
        while(true){

        }
    }
运行即可发现,发送消息过了10秒后,消息会到死信队列中,此时再在死信队列中执行逻辑代码即可实现延迟队列功能。

2、使用 rabbitmq-delayed-message-exchange 插件来实现(RabbitMQ 3.5.7及以上的版本、Erlang/OPT 18.0及以上)

RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送

插件官网下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases(记得适配版本号)
下载完成后将其解压在plugins文件夹下
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

运行cmd切换到rabbitMQ的sbin目录下执行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
重启一下rabbitMQ服务
使用管理员运行CMD

rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq
去RabbitMQ的控制台看看,插件有没有加载成功
rabbittemplate.setmandatory,# RabbitMQ,java-rabbitmq,rabbitmq

注册队列,交换机

    @Bean("DelayQueue")
    public Queue DelayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DelayExchange")
    public CustomExchange DelayExchange(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type","direct");
        return new CustomExchange(DELAY_EXCHANGE_NAME," x-delayed-message",true,true,args);
    }
    @Bean
    public Binding bindDelay(){
        return BindingBuilder.bind(DelayQueue()).to(DelayExchange()).with("Delay.#").noargs();
    }

生产者代码

    @Test
    public void testDelayMessage(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME,"Delay.testDelay","测试延迟插件队列",message -> {
            // setHeader 为延时时间 不填及为及时发送
            message.getMessageProperties().setDelay(5000);
            return message;
        });
    }

最终实现的效果就是发送的消息得等到5秒后才进入延迟队列,此时一步到位
此时需要注意的是,延迟队列插件的实现方式是通过在消息发布时设置消息的过期时间来实现的,因此在发送消息时,其实是MQ自动将消息的过期时间设置为当前时间加上延迟时间。

七、优先级队列(3.5.0以上版本)

人如其名,优先级队列即优先级比较高,优先被消费

一般通过x-max-priority 参数设置优先级队列的最大值

官方推荐参数设置:支持有限数量的优先级:255。建议使用1到10之间的值,数字越大表示优先级越高,没有设置优先级的消息被视为优先级为0

队列需要设置为优先级队列,消息需要设置优先级(在MQ出现消息堆积情况下、及消费速度小于生产速度,优先级才有意义文章来源地址https://www.toymoban.com/news/detail-761629.html

交换机设置最大优先级参数x-max-priority
    @Bean("DelayExchange")
    public Exchange DelayExchange(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type","topic");
        args.put("x-max-priority", 10);
        return new CustomExchange(DELAY_EXCHANGE_NAME," x-delayed-message",true,true,args);
    }
测试发送的消息设置优先级setPriority 级别 ,这里可以去掉设置优先级,然后多发几条,最后发现有设置优先级的消息最先被消费
    @Test
    public void testFirstMessage(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME,"Delay.testFirst","测试延迟插件队列",message -> {
            // setPriority 为设置此条数据的优先级
            message.getMessageProperties().setPriority(5);
            return message;
        });
    }

到了这里,关于学习RabbitMQ高级特性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ的高级特性及其特点

    1、应用解耦 提高系统容错性和可维护性 在订单系统中,可以通过远程调用直接调用库存系统,支付系统,物流系统。 但是这三个系统耦合度太高了,因为订单系统下完订单首先去库存系统将库存-1,然后将返回值返回给订单系统,然后通过订单系统的返回结果来在支付系统

    2024年02月08日
    浏览(35)
  • RabbitMQ——高级特性(SpringBoot实现)

    本篇文章的内容与我之前如下这篇文章一样,只是使用技术不同,本篇文章使用SpringBoot实现RabbitMQ的高级特性! RabbitMQ——高级特性_小曹爱编程!的博客-CSDN博客 RabbitMQ——高级特性:1、RabbitMQ高级特性;2、RabbitMQ应用问题;3、RabbitMQ集群搭建 https://blog.csdn.net/weixin_62993347/

    2023年04月21日
    浏览(30)
  • rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性

    消息何去何从 mandatory和immediate是channel.basicPublish方法的两个参数,都有消息传递过程中不可达目的地时将消息返回给生产者的功能。 mandatory参数 true:交换器无法根据自身的类型 和路由键找到符合条件的队列,rabbitmq调用Basic.Return命令将消息返回给生产者 生产者调用channel.

    2024年02月10日
    浏览(46)
  • 4.RabbitMQ高级特性 幂等 可靠消息 等等

    保障消息的成功发出 保障MQ节点的成功接收 发送端收到MQ节点(Broker)确认应答 完善的消息进行补偿机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。 生产者进行接收应答,用来确定这条消息是否正常的发送到了Broker,这种方式也是

    2024年02月11日
    浏览(45)
  • RabbitMQ养成记 (10.高级特性:死信队列,延迟队列)

    这个概念 在其他MQ产品里面也是有的,只不过在Rabbitmq中稍微特殊一点 什么叫私信队列呢? 就是当消息成为 dead message之后,可以重新发到另外一台交换机,这个交换机就是DLX。 注意这里的有翻译歧义, 这里的DLX 指的是 交换机 ,而不是一个队列。 队列的消息长度 到达限制

    2024年02月05日
    浏览(41)
  • RabbitMQ高级特性2 、TTL、死信队列和延迟队列

    设置 消费者 测试 添加多条消息 拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费 Time To Live(存活时间/过期时间)。 当消息到达存活时间后,还没有被消费,会被自动清除。 RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。 可

    2024年01月16日
    浏览(43)
  • rabbitmq | rabbitTemplate的convertAndSend部分源码解析

    在RabbitMQ中,事务是一种确保消息发送的可靠性的机制。Spring AMQP提供了对RabbitMQ事务的抽象,而 RabbitTemplate 作为Spring AMQP的核心组件,提供了许多简化消息发送的方法。在这篇博客中,我们将深入探讨RabbitMQ事务机制的源码实现,以及Spring封装的 RabbitTemplate 的使用。 RabbitMQ的

    2024年01月19日
    浏览(31)
  • Springboot实战16 消息驱动:如何使用 RabbitTemplate 集成 RabbitMQ?

    15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。 今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。 AMQP 规范与 RabbitM

    2024年02月02日
    浏览(50)
  • RabbitMQ高级特性解析:消息投递的可靠性保证与消费者ACK机制探究

    学习RabbitMQ高级特性,涵盖消息的持久化、确认模式、退回模式以及消费者ACK机制等方面,助您构建高可靠性的消息队列系统。

    2024年01月16日
    浏览(66)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(103)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包