【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略

这篇具有很好参考价值的文章主要介绍了【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略

说明

最近在研究RabbitMQ如何实现延时队列时发现消息进入死信队列的情况之一就是当消息数量超过队列设置的最大长度时会被丢入死信队列,看到这时我就产生了一个疑问,到底是最后插入的消息还是最早插入的消息会被丢入死信队列呢?遗憾的是看了几篇博客都是模棱两可的答案,还有的说超过数量后该消息会被放入死信队列,看完之后还是对这个问题将信将疑。所以我决定去探究一下正确答案

答案

RabbitMQ官方文档

遇事不决肯定是先看官方文档最靠谱啦,在官网中扒拉了半天终于找到说明这个问题的页面了,就是上面引用的链接,重点如下:
【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略
翻译过来就是:在RabbitMQ中,当消息的数量或大小达到限制后,默认的操作是删除最旧的消息或将最旧的消息放入死信队列,这取决于该队列是否配置了死信队列。 我们可以通过使用overflow配置指定的处理策略,如果overflow被设置为reject-publishreject-publish-dlx,那么会将最新插入的消息丢弃。如果该队列开启了confirm机制,发布者会收到nack的信息,如果一个消息被路由到多个队列,只要其中一个队列拒绝发布者就会收到nack消息,但是没被拒绝的队列可以正确接收到消息。reject-publishreject-publish-dlx的区别是后者还会将拒绝的消息放入死信队列。

验证

下面我们使用demo来验证一下各个策略的现象

默认策略(drop-head)

  1. application.yml配置如下:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: username
    password: password
    virtual-host: /test
    publisher-confirm-type: correlated # 配置启用confirm机制
  1. 使用RabbitMQConfig创建业务队列和对应死信队列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.business.exchange";
    public static final String DELAY_QUEUE_NAME = "delay.business.queue";
    public static final String DELAY_QUEUE_ROUTING_KEY = "delay.business.queue.routingKey";
    public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
    public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
    public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.queue.routingKey";
    
    // 声明延迟队列交换机
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }
    
    // 声明死信队列交换机
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    }
    
    // 声明延时队列
    @Bean("delayQueue")
    public Queue delayQueue(){
        HashMap<String, Object> map = new HashMap<>();
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
        // 设置该队列最大消息数
        map.put("x-max-length", 10);
        map.put("x-overflow", "reject-publish");
        return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(map).build();
    }
    
    // 声明死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue(){
        return new Queue(DEAD_LETTER_QUEUE_NAME);
    }
    
    // 声明延时队列的绑定关系
    @Bean
    public Binding delayBinding(@Qualifier("delayExchange") DirectExchange directExchange,
                                @Qualifier("delayQueue") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with(DELAY_QUEUE_ROUTING_KEY);
    }
    
    // 声明死信队列的绑定关系
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterExchange") DirectExchange directExchange,
                                     @Qualifier("deadLetterQueue") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY);
    }
}

注意,这里我们并没有设置overflow参数,所以采用的是默认配置
3. 创建消费者监听死信队列

import com.rabbitmq.client.Channel;
import com.whs.edws.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Slf4j
@Component
public class MaxLengthConsumer {

    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
    public void receive(Message message, Channel channel) throws IOException {
        String s = new String(message.getBody());
        log.info("死信队列消费者接收到消息:" + s);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
  1. 创建测试方法发送消息
@Test
    void maxLengthTestPublish(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 相关配置信息
             * @param ack             消息队列是否成功收到消息
             * @param cause           错误原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    logger.info("消息发送成功:" + correlationData.getId());
                } else {
                    logger.error("消息发送失败:" + correlationData.getId());
                    logger.error("错误原因:" + cause);
                }
            }
        });
        for (int i = 0; i < 11; i++) {
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(String.valueOf(i));
            rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUTING_KEY, String.valueOf(i), correlationData);
        }
    }
  1. 运行结果
    看上面的代码可知,我们设置了队列大小为10,但是我们向队列发送了11条消息,最后日志打印如下:
    2023-07-18 02:37:52.941 INFO 24308 --- [ntContainer#1-1] com.edws.rabbitmq.MaxLengthConsumer : 死信队列消费者接收到消息:0
    和官方文档说的一样,默认最旧的一条消息被放入死信队列

reject-publish策略

reject-publish策略的验证代码只需在默认策略的基础上加上配置即可,我们在定义队列的时候加上配置

// 指定超过队列长度后的策略
map.put("x-overflow", "reject-publish");

执行方法,打印的结果为:

2023-07-18 02:45:07.242  INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 消息发送失败:10
2023-07-18 02:45:07.242  INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 错误原因:null

通过日志可以看到,最新插入的消息被丢弃了。至于cause为什么是null,我没找到原因,如果了解的朋友可以在评论里讨论一下

reject-publish-dlx策略

reject-publish-dlx策略的代码也是只需要在默认代码中加一行配置即可

// 指定超过队列长度后的策略
map.put("x-overflow", "reject-publish-dlx");

打印结果如下:

2023-07-18 02:49:13.246  INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 消息发送失败:10
2023-07-18 02:49:13.246  INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 错误原因:null
2023-07-18 02:49:13.252  INFO 10488 --- [ntContainer#1-1] com.whs.edws.rabbitmq.MaxLengthConsumer  : 死信队列消费者接收到消息:10

通过日志可以看出,最新的一条消息被拒绝且被放入死信队列文章来源地址https://www.toymoban.com/news/detail-575323.html

到了这里,关于【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 前缀和+单调双队列+贪心:LeetCode2945:找到最大非递减数组的长度

    C++算法:前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 单调双队列 贪心 给你一个下标从 0 开始的整数数组 nums 。 你可以执行任意次操作。每次操作中,你需要选择一个 子数组 ,并将这个子数组用它所包含元素的 和 替换。比方说,给定数组是 [1,3,

    2024年02月03日
    浏览(29)
  • rabbitMq 针对于当前监听的队列,来控制消费者并发数量,不影响其他队列,代码示例

    concurrentConsumers 和 maxConcurrentConsumers 属性的具体含义如下: concurrentConsumers:指定同时运行的消费者数量,默认为1。 maxConcurrentConsumers:指定允许的最大并发消费者数量,默认为1。 因此,在上述示例中,设置了 concurrentConsumers 为 5,maxConcurrentConsumers 为 10,意味着 RabbitMQ 容器

    2024年02月05日
    浏览(46)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(74)
  • RabbitMq消息模型-队列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 队列消息特点: 消息不会丢失 并且 有先进先出的顺序。 消息接收是有顺序的,不是随机的,仅有一个消费者能拿到数据,而且不同消费者拿不到同一份数据。 基本模型: SimpleQueue 在上图的模型中,有以下几个概念: P:为生产

    2024年02月09日
    浏览(46)
  • 【RabbitMQ】消息队列-RabbitMQ篇章

    RabbitMQ是一个开源的 遵循AMQP协议 实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中 存储消息,转发消息 ,具有 高可用 , 高可扩性 , 易用性 等特征。 1.1、RabbitMQ—使用场景 一般场景 像一般的下订单业务如下图: 将订单信息写入数据库成功后,发

    2024年02月12日
    浏览(46)
  • 【RabbitMQ笔记10】消息队列RabbitMQ之死信队列的介绍

    这篇文章,主要介绍消息队列RabbitMQ之死信队列。 目录 一、RabbitMQ死信队列 1.1、什么是死信队列 1.2、设置过期时间TTL 1.3、配置死信交换机和死信队列(代码配置) (1)设置队列过期时间 (2)设置单条消息过期时间 (3)队列设置死信交换机 (4)配置的基本思路 1.4、配置

    2024年02月16日
    浏览(77)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(52)
  • RabbitMQ 消息中间件 消息队列

    RabbitMQ 1、RabbitMQ简介 RabbiMQ是⽤Erang开发的,集群⾮常⽅便,因为Erlang天⽣就是⼀⻔分布式语⾔,但其本身并 不⽀持负载均衡。支持高并发,支持可扩展。支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 2、RabbitMQ 特点 可

    2024年02月03日
    浏览(63)
  • 3.精通RabbitMQ—消息队列、RabbitMQ

    RabbitMQ面试题 (总结最全面的面试题) 入门RabbitMQ消息队列,看这篇文章就够了 消息队列 是一种基于 队列 ,用于解决 不同进程或应用 之间 通讯 的 消息中间件 。 支持多种 消息传递模式 ,如 队列模型 、 发布/订阅模型 等。 业务解耦 :通过 发布/订阅 模式,减少系统的 耦

    2024年02月15日
    浏览(74)
  • 消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

    Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成 。 我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务 。 轮训分发消

    2024年02月21日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包