spring boot rabbitmq 如何保持顺序消费

这篇具有很好参考价值的文章主要介绍了spring boot rabbitmq 如何保持顺序消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ 是一个消息代理和队列功能的开源实现,可以帮助构建分布式应用程序。Spring Boot 集成 RabbitMQ 可以方便地在应用程序中使用消息队列,保持顺序消费可以通过以下方式来实现:

  1. 单线程消费:使用一个线程消费消息,因为 RabbitMQ 的队列是有序的,所以保证单线程的消费能够保证消息的顺序。需要注意的是,单线程消费可能影响整体的性能。

  2. 有序分片消费:将消息队列按照一定的规则进行分割,每个分片使用一个线程消费,这样可以减少单线程消费的性能影响。保证消息有序性的关键是要确保分片规则是有序的。

  3. 使用 RabbitMQ 提供的优先级队列:优先级队列会按照消息的优先级进行排序,可以通过设置优先级来保证消息的顺序。缺点是需要将队列中的所有消息都进行排序,因此可能会影响整体性能。

  4. 使用 RabbitMQ 提供的插件:RabbitMQ 提供了插件来实现有序消费,比如 rabbitmq_delayed_message_exchange 插件可以延迟消息投递,保证消息的有序性。此外,还有 RabbitMQ Stream 插件等。

如果实现有序分片消费?

要实现有序分片消费,可以先将消息队列按照一定的规则(如消息 ID、时间戳等)分成多个分片,然后每个分片使用一个单独的消费者线程消费消息。要保证消息的顺序,需要在分片规则上做额外的处理,确保分片规则是有序的,然后让每个消费者只消费自己所负责分片的消息。

以下是实现有序分片消费的代码示例:

首先定义一个分片规则,例如按照消息 ID 的 hash 值分片:

int numShards = 10; // 分成 10 个分片
public int getShardIndex(String messageId) {
    int hash = Math.abs(messageId.hashCode());
    return hash % numShards;
}

然后创建多个消费者线程,每个线程只负责消费自己所负责的分片:

@RabbitListener(queues = "myQueue")
public void processMessage(Message message) {
    String messageId = extractMessageId(message);

    int shardIndex = getShardIndex(messageId);
    if (shardIndex == myShardIndex) {
        // 处理消息逻辑
    }
}

可以使用 Spring Boot 提供的 @RabbitListener 注解来监听消息队列。在消费消息时,先从消息中提取出消息 ID,然后根据分片规则计算出当前消费者线程负责的分片编号,如果当前线程负责的分片与消息所在分片相同,则处理该消息。这样每个消费者线程只会消费自己负责的分片,就能保证消息的有序性。

下面是一个完整的示例,包括消费者类、消息发送者类和一个测试用例:

消息消费者类:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

@Component
public class MyConsumer {
    private int myShardIndex;
    private int numShards = 10;

    private AtomicInteger counter = new AtomicInteger(0);

    public MyConsumer() {
        // 假设从配置文件中读取 myShardIndex
        myShardIndex = 3;
    }

    @RabbitListener(queues = "myQueue")
    public void processMessage(Message message) {
        String messageId = extractMessageId(message);

        int shardIndex = getShardIndex(messageId);
        if (shardIndex == myShardIndex) {
            int count = counter.getAndIncrement();
            System.out.println("Consumer " + myShardIndex + " received message " + message.getBody() + " (" + count + ")");
        }
    }

    private int getShardIndex(String messageId) {
        int hash = Math.abs(messageId.hashCode());
        return hash % numShards;
    }

    private String extractMessageId(Message message) {
        // 假设 message 的 messageId 在 messageProperties 的 headers 中
        return message.getMessageProperties().getHeaders().get("messageId").toString();
    }
}

消息发送者类:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class MySender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendMessage() {
        String messageId = UUID.randomUUID().toString();
        String message = "Hello, RabbitMQ";

        rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, msg -> {
            msg.getMessageProperties().getHeaders().put("messageId", messageId);
            return msg;
        });
    }
}

测试用例:

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import static org.junit.jupiter.api.Assertions.*;

@ExtendWith(SpringExtension.class)
@SpringBootTest
public class MyConsumerTest {
    @Autowired
    private MySender sender;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSharding() throws InterruptedException {
        // 发送消息
        for (int i = 0; i < 100; i++) {
            sender.sendMessage();
        }

        // 等待消息被消费完毕
        Thread.sleep(5000);

        // 检查是否有所有 shard 都有消息被消费到
        for (int i = 0; i < 10; i++) {
            int count = (int) rabbitTemplate.receiveAndConvert("myQueue", 10000);
            assertTrue(count > 0, "Shard " + i + " has not received any message");
        }

        // 清空队列中的消息
        while (rabbitTemplate.receiveAndConvert("myQueue") != null) {}
    }
}

这个示例中,MyConsumer 类处理来自 "myQueue" 队列的消息,并根据消息的 messageId 对消息进行分片。如果消息对应的 shard 索引和当前实例的 shard 索引相同,则处理消息。否则忽略该消息。

MySender 类负责发送消息到 "myExchange" 交换器,交换器将消息路由到 "myRoutingKey" 绑定的队列中。这里通过设置消息的 messageId,来模拟产生不同的 shard 索引。

MyConsumerTest 测试用例会发送 100 条消息到队列中,并等待 5 秒钟,然后检查所有的 shard 是否都收到了消息。如果有 shard 没有收到消息,则测试失败。文章来源地址https://www.toymoban.com/news/detail-709251.html

到了这里,关于spring boot rabbitmq 如何保持顺序消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 如何保证消息的消费顺序

    我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是: 更改用户会员等级。 根据会员等级计算订单价格。 假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

    2024年02月13日
    浏览(38)
  • kafka 如何保证消息的顺序消费

    在Kafka分布式集群中,要保证消息的顺序消费,您可以采取以下措施: 分区策略 :Kafka的主题可以分为多个分区,每个分区内的消息是有序的。因此,首先要确保生产者将相关的消息发送到同一个分区。这可以通过生产者的分区策略来实现。默认情况下,Kafka会使用基于消息

    2024年02月06日
    浏览(31)
  • Kafka 如何保证消息消费的全局顺序性

    哈喽大家好,我是咸鱼 今天我们继续来讲一讲 Kafka 当有消息被生产出来的时候,如果没有指定分区或者指定 key ,那么消费会按照【轮询】的方式均匀地分配到所有可用分区中,但不一定按照分区顺序来分配 我们知道,在 Kafka 中消费者可以订阅一个或多个主题,并被分配一

    2024年02月05日
    浏览(35)
  • Kafka面试】Kafka如何保证消费的顺序性?

    消费者组的某个消费者可能负责消费 一个topic的多个分区 。每个分区都维护了偏移量(都是从0开始的),在消息存储时按照一定的策略来找到不同的分区进行存储,消费同样如此,并不能保证消息的顺序性。 要想保证顺序性,可以只提供一个分区,或者相同的业务只在一个

    2024年02月15日
    浏览(30)
  • RabbitMQ如何保证顺序性

    顺序性 : 消息的顺序性是指消费者消费到消息和发送者发布的消息的顺序是一致的 举个例子,不考虑消息重复的情况下,如果生产者发布的消息分别为msg1、msg2、msg3 那么消费者必然也是按照 msg1、msg2、msg3 的顺序来消费的 目前很多资料显示RabbitMQ消息能够保障顺序性,这是

    2024年02月13日
    浏览(26)
  • kafka 分布式的情况下,如何保证消息的顺序消费?

    目录 一、什么是分布式 二、kafka介绍 三、消息的顺序消费 四、如何保证消息的顺序消费   分布式是指将计算任务分散到多个计算节点上进行并行处理的一种计算模型。在分布式系统中,多台计算机通过网络互联,共同协作完成任务。每个计算节点都可以独立运行,并且可以

    2024年02月10日
    浏览(42)
  • 如何保证RabbitMQ消息的顺序性

    针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;比如:在写入消息队列的数据做唯一标示,消费消 息时,根据唯一标识判断是否消费过;假设你有个系统,消费一条消息就往数据库里插入一条数据,

    2024年02月07日
    浏览(26)
  • 【RabbitMQ】RabbitMQ如何确认消息被消费、以及保证消息的幂等

    目录 一、如何保证消息被消费 二、如何保证消息幂等性 RabbitMQ提供了消息补偿机制来保证消息被消费,当一条消费被发送后,到达队列后发给消费者。消费者消费成功后会给MQ服务器的队列发送一个确认消息,此时会有一个回调检测服务监听该接收确认消息的队列,然将消费

    2024年02月16日
    浏览(31)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(36)
  • 【云原生进阶之PaaS中间件】第四章RabbitMQ-4.3-如何保证消息的可靠性投递与消费

            根据RabbitMQ的工作模式,一条消息从生产者发出,到消费者消费,需要经历以下4个步骤: 生产者将消息发送给RabbitMQ的Exchange交换机; Exchange交换机根据Routing key将消息路由到指定的Queue队列; 消息在Queue中暂存,等待消费者消费消息; 消费者从Queue中取出消息消费

    2024年03月11日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包