RabbitMQ高级特性2 、TTL、死信队列和延迟队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ高级特性2 、TTL、死信队列和延迟队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MQ高级特性

1.削峰

设置 消费者

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

测试 添加多条消息

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

TTL

Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

可以在管理台新建队列、交换机,绑定

1.图形化操作

添加队列

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

添加交换机

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

将交换机和对应的队列进行绑定

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

时间结束 , 消息失效

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

2.代码实现

配置 生产者

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

@Configuration
public class TopicMqTtlConfig {


    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Value("${mq.queue.name1}")
    private String QUEUENAME1;
    @Value("${mq.queue.name2}")
    private String QUEUENAME2;
    // 1
    // . 交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    // 2。 队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue = QueueBuilder.nonDurable(QUEUENAME1)
                .withArgument("x-message-ttl",30000)//过期时间30秒
                .withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃
                .build();
        return queue;
    }
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2)
                .withArgument("x-message-ttl",300000000)//过期时间30秒
                .build();
        return queue2;
    }
    // 3. 交换机和队列进行绑定
    @Bean("binding1")
    public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs();
        return binding1;
    }
    @Bean("binding2")
    public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs();
        return binding2;
    }
}

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

测试

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

添加成功 ttl1只接收10条

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

时间过期

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

消息在什么情况下会成为死信?(面试会问)

1.队列消息长度到最大的限制

最大的长度设置为10当第11条消息进来的时候就会成为死信

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)

设置消费者为手动签收的状态

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定交换机的方式是什么?

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

// 1.  交换机  :正常的交换机   死信交换机

// 2.队列  :正常的  死信

//3.绑定   正常ex - 正常的que

正常的que和死信交换机

死信ex-死信queue

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

2.代码实现

@Configuration
public class TopicMqDeadConfig {
    @Value("${mq1.exchange.name1}")
    private String EXCHANGENAME;
    @Value("${mq1.exchange.name2}")
    private String DEADEXCHANGE;
    @Value("${mq1.queue.name1}")
    private String QUEUENAME1;
    @Value("${mq1.queue.name2}")
    private String QUEUENAME2;
    // 声明正常交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    //  正常队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue = QueueBuilder.nonDurable(QUEUENAME1)
                .withArgument("x-message-ttl",30000)//过期时间30秒
                .withArgument("x-dead-letter-exchange",DEADEXCHANGE)
                .withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定
                //.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃
                .build();
        return queue;
    }
    // 交换机和队列进行绑定
    @Bean("binding1")
    public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs();
        return binding1;
    }


    // 声明死信交换机
    @Bean("ex2")
    public Exchange getDeadExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build();
        return exchange;
    }
    //死信队列
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2)
                .build();
        return queue2;
    }
    // 死信交换机和死信队列进行绑定
    @Bean("binding2")
    public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs();
        return binding2;
    }


}

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

测试

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

如果程序出现错误 拒绝签收

监听正常队列

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

发送消息 启动测试

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

总结:

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

3. 消息成为死信的三种情况:

        1. 队列消息长度到达限制;

        2. 消费者拒接消费消息,并且不重回队列;

        3. 原队列存在消息过期设置,消息到达超时时间未被消费;

 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  • 1. 下单后,30分钟未支付,取消订单,回滚库存
  • 2. 新用户注册成功7天后,发送短信问候。

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

实现方式:

1. 定时器

2. 死信队列

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

1.配置

添加依赖

  <!--2. rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>




        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <!--nacos 配置中心-->
        <!--配置中心-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>


        <!-- application  bootstrap  -->


        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>


        <!-- nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>




        <dependency>
            <groupId>com.example</groupId>
            <artifactId>sys-comm</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

 RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

修改配置

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

2.代码实现

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

创建实体类

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

发送消息 测试

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

过期后放入死信队列

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

添加依赖

 <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.16</version>
        </dependency>

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

将json数据转化为对象

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

获取成功

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

3.连接数据库

创建表

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

创建测试类
@RestController
@RequestMapping("order")
public class OrderController {
    @Value("${mq1.exchange.name1}")
    private String EXCHANGENAME;
    //
    @Resource
    private RabbitTemplate rabbitTemplate;
    @GetMapping
    public Result aaa(TabOrder order){
     //1. 消息 存放到mq里面
        String s = JSONUtil.toJsonStr(order);
        // openfeign  --      数据添加到数据库里面
        rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s);
        return Result.success(s);
    }
}

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

监听normal
import javax.annotation.Resource;
@Component
public class XiaoFeng implements ChannelAwareMessageListener {
    @Resource
    private TabOrderMapper orderMapper;
    @Override
    @RabbitListener(queues = "test_queue_normal")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(2000);// 20s
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        // 将字符串转化为 对象


        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            TabOrder order = JSONUtil.toBean(s, TabOrder.class);
            // 将订单的信息 报讯到数据库里面
            int insert = orderMapper.insert(order);
            channel.basicAck(deliveryTag,true); //
        }catch(Exception e){
            //long deliveryTag, boolean multiple, boolean requeue
            System.out.println("拒绝签收消息");
            channel.basicNack(deliveryTag,true,false);// 死信消息
        }
    }
}

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

监听dead
@Component
public class YanChi implements ChannelAwareMessageListener {


    @Resource
    private TabOrderMapper orderMapper;
    @Override
    @RabbitListener(queues = "test_queue_dead")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(2000);// 20s
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        // 将字符串转化为 对象
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            TabOrder order = JSONUtil.toBean(s, TabOrder.class);
            //  order 的状态
            TabOrder tabOrder = orderMapper.selectById(order.getId());
            if(tabOrder.getStatus()==1){
                // 取消
                tabOrder.setStatus(3);
            }
            orderMapper.updateById(tabOrder);
            channel.basicAck(deliveryTag,true); //
        }catch(Exception e){
            //long deliveryTag, boolean multiple, boolean requeue
            System.out.println("拒绝签收消息");
            channel.basicNack(deliveryTag,true,false);// 死信消息
        }
    }
}

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

测试

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式

成功

RabbitMQ高级特性2 、TTL、死信队列和延迟队列,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-793272.html

到了这里,关于RabbitMQ高级特性2 、TTL、死信队列和延迟队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(43)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(59)
  • RabbitMQ:概念和安装,简单模式,工作,发布确认,交换机,死信队列,延迟队列,发布确认高级,其它知识,集群

    1.1.1.什么是MQ MQ(message queue:消息队列) ,从字面意思上看,本质是个 队列 , FIFO 先入先出 ,只不过队列中存放的 内容是message 而已 ,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “逻辑解耦+物理解耦” 的消息通信服

    2024年01月20日
    浏览(75)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(53)
  • RabbitMQ详解(五):过期时间TTL、死信队列、磁盘监控

    过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。 目前有两种方法可以设置 第一种方法是通过队列属性设置,队列中所有消息都有相同的TTL。 第二种方法是对消息进行单独设置,每条消息TTL可以不同。 当同时

    2024年02月04日
    浏览(43)
  • 【RabbitMQ学习日记】——死信队列与延迟队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候 由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死

    2024年02月06日
    浏览(56)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(60)
  • (五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 概念 在MQ中,当消息成为死信(Dead message)后,消息中间件可以 将其从当前队列发送到另一个队列中,这个队列就是死信队列。而 在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死 信交换机(Dead Letter Exchange,简称DLX)。死信交

    2024年02月15日
    浏览(54)
  • 【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?

    * entire channel rather than each consumer * @throws java.io.IOException if an error is encountered */ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; prefetchSize:0,单条消息大小限制,0代表不限制 prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个

    2024年04月27日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包