(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

这篇具有很好参考价值的文章主要介绍了(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

消费端限流

(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列,中间件组件实战应用,# RabbitMq,rabbitmq,分布式,中间件

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

1、生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

2、消费端配置限流机制

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
       prefetch: 5

3、消费者监听队列

@Component
public class QosConsumer{
    @RabbitListener(queues = "my_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        // 1.获取消息
        System.out.println(new String(message.getBody()));
        // 2.模拟业务处理
        Thread.sleep(3000);
        // 3.签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
   }
}

利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多

1、生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

端配置不公平分发

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
       prefetch: 1

编写两个消费者

@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception
     {
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
     }
    
    // 消费者2
    @RabbitListener(queues = "my_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception
     {
        //1.获取消息
        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(3000);// 消费者2处理慢
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1、在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {
    private final String EXCHANGE_NAME="my_topic_exchange2";
    private final String QUEUE_NAME="my_queue2";
    // 1.创建交换机
    @Bean("bootExchange2")
    public Exchange getExchange2(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean("bootQueue2")
    public Queue getMessageQueue2(){
        return QueueBuilder
               .durable(QUEUE_NAME)
               .ttl(10000) //队列的每条消息存活10s
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、生产者批量生产消息,测试存活时间

@Test
public void testSendBatch2() throws InterruptedException {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
       rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);
   }
}

设置单条消息存活时间

@Test
public void testSendMessage() {
    //设置消息属性
    MessageProperties messageProperties = new MessageProperties();
    //设置存活时间
    messageProperties.setExpiration("10000");
    // 创建消息对象
    Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
    // 发送消息
    rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意:

1 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration(“10000”);
// 3.创建消息对象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。

优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

1、创建队列和交换机

@Configuration
public class RabbitConfig3 {
    private final String EXCHANGE_NAME="priority_exchange";
    private final String QUEUE_NAME="priority_queue";
    // 1.创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange priorityExchange(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean(QUEUE_NAME)
    public Queue priorityQueue(){
        return QueueBuilder
               .durable(QUEUE_NAME)
                //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
               .maxPriority(10)
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、编写生产者

@Test
public void testPriority() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // i为5时消息的优先级较高
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setPriority(9);
            Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
       } else {
           rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
       }
   }
}

3、编写消费者文章来源地址https://www.toymoban.com/news/detail-610949.html

@Component
public class PriorityConsumer {
    @RabbitListener(queues = "priority_queue")
    public void listenMessage(Message message, Channel channel) throws Exception
     {
        //获取消息
        System.out.println(new String(message.getBody()));
        //手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

到了这里,关于(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 高级篇-rabbitmq的高级特性

         启动MQ 创建Queues:  两种Callback: 1.ReturnCallback:全局callback   2.ComfirmCallback: 发送信息时候设置    执行成功:  监控页面: 模拟失败:  1.投递到交互机失败 2.投递到交换机了,但是没有进入队列     注意:   演示数据是否默认持久化:       重启mq:  1. 交互机、

    2024年02月09日
    浏览(28)
  • RabbitMQ(四):RabbitMQ高级特性

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息可靠性问题:如何确保发送的消息至少被消费—次 延迟消息问题:如何实现消息的延迟投递 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题 高可用问题:如何避免单点的MQ故障而导致的不可用问题

    2024年01月19日
    浏览(35)
  • RabbitMQ之高级特性

    提示:以下是本篇文章正文内容,RabbitMQ 系列学习将会持续更新 官网 :https://www.rabbitmq.com RabbitMQ 消息确定主要分为两部分: 第一种是 消息发送确认 。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。 确认发送的第一步是确认是

    2023年04月10日
    浏览(25)
  • 学习RabbitMQ高级特性

    了解熟悉RabbitMQ的高级特性 1、消息可靠性投递 【confirm 确认模式、return 退回模式】 2、Consumer ACK 【acknowledge】 3、消费端限流 【prefetch】 4、TTL过期时间 【time to live】 5、死信队列 【Dead Letter Exchange】 6、延迟队列 【rabbitmq-delayed-message-exchange】 7、优先级队列 【x-max-priority】

    2024年02月04日
    浏览(33)
  • RabbitMQ消息队列高级特性

    在线上生产环境中,RabbitMQ可能会产生消息丢失或者是投递失败的一个场景,RabbitMQ为了避免这种场景的发生,提供了两种方式来控制消息传递的可靠性。 Confirm确认模式 消息从生产者到MQ的Exchange过程中,如果消息成功到达,则会返回一个ConfirmCallback的确认函数。 Return退回模

    2024年02月12日
    浏览(29)
  • RabbitMQ的高级特性及其特点

    1、应用解耦 提高系统容错性和可维护性 在订单系统中,可以通过远程调用直接调用库存系统,支付系统,物流系统。 但是这三个系统耦合度太高了,因为订单系统下完订单首先去库存系统将库存-1,然后将返回值返回给订单系统,然后通过订单系统的返回结果来在支付系统

    2024年02月08日
    浏览(23)
  • rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性

    消息何去何从 mandatory和immediate是channel.basicPublish方法的两个参数,都有消息传递过程中不可达目的地时将消息返回给生产者的功能。 mandatory参数 true:交换器无法根据自身的类型 和路由键找到符合条件的队列,rabbitmq调用Basic.Return命令将消息返回给生产者 生产者调用channel.

    2024年02月10日
    浏览(29)
  • 4.RabbitMQ高级特性 幂等 可靠消息 等等

    保障消息的成功发出 保障MQ节点的成功接收 发送端收到MQ节点(Broker)确认应答 完善的消息进行补偿机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。 生产者进行接收应答,用来确定这条消息是否正常的发送到了Broker,这种方式也是

    2024年02月11日
    浏览(35)
  • RabbitMQ养成记 (10.高级特性:死信队列,延迟队列)

    这个概念 在其他MQ产品里面也是有的,只不过在Rabbitmq中稍微特殊一点 什么叫私信队列呢? 就是当消息成为 dead message之后,可以重新发到另外一台交换机,这个交换机就是DLX。 注意这里的有翻译歧义, 这里的DLX 指的是 交换机 ,而不是一个队列。 队列的消息长度 到达限制

    2024年02月05日
    浏览(29)
  • RabbitMQ高级特性2 、TTL、死信队列和延迟队列

    设置 消费者 测试 添加多条消息 拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费 Time To Live(存活时间/过期时间)。 当消息到达存活时间后,还没有被消费,会被自动清除。 RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。 可

    2024年01月16日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包