(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制

这篇具有很好参考价值的文章主要介绍了(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ-进阶 死信队列、延迟队列、防丢失机制

死信队列

概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以 将其从当前队列发送到另一个队列中,这个队列就是死信队列。而 在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死 信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制,中间件组件实战应用,# RabbitMq,java-rabbitmq,rabbitmq,java

消息成为死信的情况

  1. 队列消息长度到达限制
  2. 消费者拒签消息,并且不把消息重新放入原队列
  3. 消息到达存活时间未被消费

代码实现

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig2 {
    private final String DEAD_EXCHANGE = "dead_exchange";
    private final String DEAD_QUEUE = "dead_queue";
    private final String NORMAL_EXCHANGE = "normal_exchange";
    private final String NORMAL_QUEUE = "normal_queue";
    // 死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE)
                .durable(true)
                .build();
    }
    // 死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)
                .build();
    }
    // 死信交换机绑定死信队列
    @Bean
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("dead_routing")
                .noargs();
    }
    // 普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(NORMAL_EXCHANGE)
                .durable(true)
                .build();
    }
    // 普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(NORMAL_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
                .deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
                .ttl(10000) // 消息存活10s
                .maxLength(10) // 队列最大长度为10
                .build();
    }
    // 普通交换机绑定普通队列
    @Bean
    public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("my_routing")
                .noargs();
    }
}

测试

1、生产者发送消息

@Test
public void testDlx(){
    // 存活时间过期后变成死信
    //       rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    // 超过队列长度后变成死信
    //       for (int i = 0; i < 20; i++) {
    //           rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    //       }
    // 消息拒签但不返回原队列后变成死信
  rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}


2、

@Component
public class DlxConsumer {
    @RabbitListener(queues = "normal_queue")
    public void listenMessage(Message message, Channel channel) throws IOException {
        // 拒签消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
   }
}

延迟队列

延迟队列介绍

什么是延时队列?

延时队列即就是放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费

(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制,中间件组件实战应用,# RabbitMq,java-rabbitmq,rabbitmq,java

但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果

(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制,中间件组件实战应用,# RabbitMq,java-rabbitmq,rabbitmq,java

延迟交换机主要帮我们解决什么问题

(1)当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, name必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;

(2)就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的

适用场景

(1)商城订单超时未支付,取消订单

(2)使用权限到期前十分钟提醒用户

(3)收益项目,投入后一段时间后产生收益

延迟队列_死信队列_的实现

1、创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、 lombok依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2、编写配置文件

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 123456
   virtual-host: /
  
# 日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'

3、创建队列和交换机

@Configuration
public class RabbitConfig {
    // 订单交换机和队列
    private final String ORDER_EXCHANGE = "order_exchange";
    private final String ORDER_QUEUE = "order_queue";
    // 过期订单交换机和队列
    private final String EXPIRE_EXCHANGE = "expire_exchange";
    private final String EXPIRE_QUEUE = "expire_queue";
    // 过期订单交换机
    @Bean(EXPIRE_EXCHANGE)
    public Exchange deadExchange(){
         return ExchangeBuilder
               .topicExchange(EXPIRE_EXCHANGE)
               .durable(true)
               .build();
   }
    // 过期订单队列
    @Bean(EXPIRE_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
               .durable(EXPIRE_QUEUE)
               .build();
   }
    // 将过期订单队列绑定到交换机
    @Bean
    public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
        return BindingBuilder
               .bind(queue)
               .to(exchange)
               .with("expire_routing")
               .noargs();
   }
    // 订单交换机
    @Bean(ORDER_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
               .topicExchange(ORDER_EXCHANGE)
               .durable(true)
               .build();
   }
    // 订单队列
    @Bean(ORDER_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
               .durable(ORDER_QUEUE)
               .ttl(10000) // 存活时间为10s,模拟30min
               .deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
               .deadLetterRoutingKey("expire_routing") //死信交换机的路由关键字
               .build();
   }
    // 将订单队列绑定到交换机
    @Bean
    public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE) Queue queue){
        return BindingBuilder
               .bind(queue)
               .to(exchange)
               .with("order_routing")
               .noargs();
   }
}

4、编写下单的控制器方法,下单后向订单交换机发送消息

@Test
 public String placeOrder(String orderId){
        System.out.println("处理订单数据...");
        // 将订单id发送到订单队列
        rabbitTemplate.convertAndSend("order_exchange", "order_routing", orderId);
        return "下单成功,修改库存";
   }

5、编写监听死信队列的消费者

// 过期订单消费者
@Component
public class ExpireOrderConsumer {
    // 监听队列
    @RabbitListener(queues = "expire_queue")
    public void listenMessage(String orderId){
        System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
   }
}

延迟队列_插件实现

在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制,中间件组件实战应用,# RabbitMq,java-rabbitmq,rabbitmq,java

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列
(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制,中间件组件实战应用,# RabbitMq,java-rabbitmq,rabbitmq,java

下载插件

RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列,我们可以从 官网下载到它

https://www.rabbitmq.com/community-plugins.html

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制,中间件组件实战应用,# RabbitMq,java-rabbitmq,rabbitmq,java

选择 .ez 格式的文件下载,下载后放置 RabbitMQ 的安装目录下的 plugins 目录下,如我的路径为

docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez  rabbitmq1:/plugins

docker exec  rabbitmq1  rabbitmq-plugins enable rabbitmq_delayed_message_exchange
RabbitMQ 配置类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Slf4j
public class RabbitConfig3 {

    /**
     * 交换机
     */
    public static final String DELAY_EXCHANGE = "delay_exchange";

    /**
     * 队列
     */
    public static final String DELAY_QUEUE = "delay_queue";

    /**
     * 路由
     */
    public static final String DELAY_KEY = "delay_key";

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }

    /**
     * 直接模式队列1
     */
    @Bean
    public Queue directOneQueue() {
        return new Queue("cundream");
    }
    /**
     * 延时队列交换机
     *
     * @return
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * 延时队列
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE, true);
    }

    /**
     * 给延时队列绑定交换机
     *
     * @return
     */
    @Bean
    public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_KEY).noargs();
    }
}



RabbitMQ 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
@Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(Object object, long millisecond) {
        this.rabbitTemplate.convertAndSend("delay_exchange",
                "delay_key",
                object.toString(),
                message -> {
                    message.getMessageProperties().setHeader("x-delay", millisecond);
                    return message;
                }
        );

    }
}

RabbitMQ 消费者
import cn.hutool.json.JSONUtil;
import com.github.cundream.springbootbuilding.common.rabbitmq.RabbitConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @className: com.github.cundream.springbootbuilding.common.rabbitmq.consumer-> ReceiveDealyConsumer
 * @description:
 * @author: 李村 
 * @createDate:
 */
@Slf4j
@RabbitListener(queuesToDeclare = @Queue(RabbitConst.DELAY_QUEUE))
@Component
public class ReceiveDealyHandler {
    @RabbitHandler
    public void directHandlerManualAck(Object object, Message message, Channel channel) {
        //  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("直接队列1,手动ACK,接收消息:{}", object.toString());
            // 通知 MQ 消息已被成功消费,可以ACK了
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
                // 处理失败,重新压入MQ
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }
}

测试

通过测试,第一条消息在 5s后接收到,第二条消息在 10s后接收到,说明我们的延时队列已经成功

    @RequestMapping(value = "/delayMessage",method = RequestMethod.GET)
    public void delayMessage() {
        String message1 = "这是第一条消息";
        String message2 = "这是第二条消息";
        rabbitMqService.sendDelayMessage(message1, 5000);
        rabbitMqService.sendDelayMessage(message2, 10000);
    }

RabbitMQ防止消息丢失

消息丢失场景

MQ消息丢失场景主要有三个:

  • 消息生产者,发送消息后,rabbitMq服务器没有收到;导致消息丢失
  • rabbitmq收到消息后,没有持久化保存,导致消息丢失
  • 消费者收到消息后,没来得及处理,消费者宕机,导致消息丢失

生产者发送消息没有发送到rabbit交换机

解决方案:消息异步确认机制(confirm机制)

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: 123456
    virtual-host: /
    publisher-confirms: true # 消息异步确认机制(confirm机制)

开启confirm机制后,在生产者每次发送消息,都会调用回调代码;开发人员,需要写回调函数的逻辑,处理发送失败的消息

@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue
     * @param correlationData 发送的消息的信息(交换机,路由,消息体等)
     * @param ack true成功,false失败
     * @param cause 发生错误的信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    	// 失败,一般解决方案,是将发送失败消息,存入定时任务队列;尝试重新发送消息;再多次失败,
    	// 就不再发送,转为人工处理
        if (!ack) {
            log.error("rabbitmq confirm fail,cause:{}", cause);
            // ...... 失败处理逻辑
        }
    }
}

交换机没有发送到队列

解决方案:Return模式,确保消息从交换机发送到队列。

1、开启return模式

#开启 return 机制
spring:
  rabbitmq:
    publisher-returns: true

2、开发回调函数

@Component
public class Sender implements RabbitTemplate.ReturnCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }
 
    //通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    System.out.println("消息主体message: " + message);
		System.out.println("消息replyCode: " + replyCode);
		System.out.println("描述: " + replyText);
		System.out.println("消息使用的交换器exchange: " + exchange);
		System.out.println("消息使用的路由键routing: " + routingKey);
	}
}

交换机、队列、消息没有设置持久化

交换机、队列、消息没有持久化,当rabbitmq的服务重启之后,这些信息就会丢失。

交换机持久化
在声明交换机的时候,设置持久化属性

	/**
	 * 构造参数说明:
	 * 参数1:交换机名称
	 * 参数2:durable:true表示持久化,false表示不持久化
	 * 参数3:autoDelete:true自动删除,false不自动删除
	 */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchangeName", true, false);
    }

队列持久化
在声明队列的时候,设置持久化属性

    public Queue queue() {
    	/**
    	 * @param queueName 队列名称
    	 * @param durable 队列持久化,true持久化,false不持久化
    	 * @param exclusive 是否排他, true不排他,false排他;此处配置一般false
    	 * @param autoDelete 是否自动删除,无生产者,队列自动删除
    	 * @param args 队列参数
    	 */
        return new Queue("queueName", true, false, false, args);
    }

消息持久化

消息的持久化是默认持久的。无需配置

消费者接收到消息没有执行业务逻辑,导致消息丢失

解决方案:手动确认消息机制
配置文件配置

**spring.rabbitmq.listener.simple.acknowledge-mode=manual**
spring:
  rabbitmq:
    host: 127.0.0.1
    #host: 10.106.10.91
    port: 5672
    username: admin
    password: 123456
    virtual-host: pub
    publisher-confirms: true   # 开启发送确认
    publisher-returns: true  # 开启发送失败回退
      #开启ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #采取手动应答
        #concurrency: 1 # 指定最小的消费者数量
        #max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true # 是否支持重试

@Component
public class Consumer {
	@RabbitHandler
	public void consumeMsg(String msg, Channel channel, Message message) throws IOException {
		//拿到消息延迟消费
		try {
			// .... 消费消息业务逻辑

			/**
			 * deliveryTag	消息的随机标签信息
			 * multiple	是否批量;true表示一次性的将小于deliveryTag的值进行ack
			 */
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			
		} catch (InterruptedException e) {
			e.printStackTrace();
			/**
			 * deliveryTag	消息的随机标签信息
			 * multiple	是否批量;true表示一次性的将小于deliveryTag的值进行ack
			 * requeue	被拒绝的消息是否重新入队列
			 */
			channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
		}
	}
}

当业务出现意料之外的一场;消息就会重新回到队列中;会分发到其他正常consumer中进行消费文章来源地址https://www.toymoban.com/news/detail-610858.html

到了这里,关于(五)RabbitMQ-进阶 死信队列、延迟队列、防丢失机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(74)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(55)
  • 【RabbitMQ学习日记】——死信队列与延迟队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候 由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死

    2024年02月06日
    浏览(53)
  • RabbitMQ之TTL+死信队列实现延迟队列

    RabbitMQ是一个流行的消息队列系统,它提供了许多有用的功能,其中之一是TTL(Time To Live)和死信队列。这些功能可以用来实现延迟队列,让我们来看看如何使用它们。 首先,什么是TTL?TTL是消息的存活时间,它可以设置为一个特定的时间段。如果消息在这个时间段内没有被

    2024年02月13日
    浏览(45)
  • RabbitMQ养成记 (10.高级特性:死信队列,延迟队列)

    这个概念 在其他MQ产品里面也是有的,只不过在Rabbitmq中稍微特殊一点 什么叫私信队列呢? 就是当消息成为 dead message之后,可以重新发到另外一台交换机,这个交换机就是DLX。 注意这里的有翻译歧义, 这里的DLX 指的是 交换机 ,而不是一个队列。 队列的消息长度 到达限制

    2024年02月05日
    浏览(38)
  • RabbitMQ高级特性2 、TTL、死信队列和延迟队列

    设置 消费者 测试 添加多条消息 拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费 Time To Live(存活时间/过期时间)。 当消息到达存活时间后,还没有被消费,会被自动清除。 RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。 可

    2024年01月16日
    浏览(41)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(73)
  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(41)
  • rabbitmq基础7——队列和消息过期时间设置、死信队列、延迟队列、优先级队列、回调队列、惰性队列

    这里过一个知识点——过期时间,即对消息或队列设置过期时间(TTL)。一旦消息过期,消费就无法接收到这条消息,这种情况是绝不允许存在的,所以官方就出了一个对策——死信队列,死信队列最初出现的意义就是为了应对消息过期丢失情况的手段之一。 那么过期时间具

    2024年02月03日
    浏览(71)
  • RabbitMQ学习——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月13日
    浏览(73)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包