如何保证消息不被重复消费?

这篇具有很好参考价值的文章主要介绍了如何保证消息不被重复消费?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

       在Java中,可以使用消息队列来实现消息的异步处理,其中常用的消息队列有 RabbitMQ、ActiveMQ、Kafka 等。

  为了避免消息被重复消费,可以使用以下几种方法:

  1.消息队列提供的幂等性机制

  常见的消息队列如 Kafka、RocketMQ等提供了幂等性机制,能够确保同一条消息被消费多次时只会产生一次影响。在Kafka中,可以通过设置消息的key来实现幂等性。

  2.消费者自己维护消费记录

  消费者可以在消费一条消息后,将其在数据库中或者内存中记录下来。在消费下一条消息时,先查询是否已经消费过该消息,如果已经消费过,则不再处理。

  3.使用分布式锁

  在消费消息时,可以使用分布式锁来保证同一条消息只会被一个消费者处理。常见的分布式锁实现有 ZooKeeper、Redis 等。

  接下来我们以使用RabbitMQ消息队列为例,演示如何实现消息的幂等性。

  (1)首先,需要使用RabbitMQ的Java客户端库,可以通过Maven引入以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

  (2)接着,可以使用以下代码向RabbitMQ发送消息:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String message = "Hello World!";
String exchangeName = "test_exchange";
String routingKey = "test_routing_key";
String messageId = UUID.randomUUID().toString();

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .messageId(messageId)
        .build();

channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());

System.out.println("Sent message: " + message);

channel.close();
connection.close();

  在发送消息时,使用了 UUID.randomUUID().toString()生成了一个唯一的消息 ID,作为消息的messageId 属性。

  (3)然后,可以使用以下代码消费 RabbitMQ 中的消息,并实现幂等性:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String exchangeName = "test_exchange";
String queueName = "test_queue";
String routingKey = "test_routing_key";

channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String messageId = properties.getMessageId();
        String message = new String(body, StandardCharsets.UTF_8);

        if (!isMessageProcessed(messageId)) {
            processMessage(message);
            saveProcessedMessage(messageId);
        }

        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};

channel.basicConsume(queueName, false, consumer);

System.out.println("Waiting for messages...");

  在消费消息时,首先从消息的properties中获取messageId属性,并使用isMessageProcessed()方法查询该消息是否已经被处理过。如果没有被处理过,则调用processMessage()方法处理该消息,并使用 saveProcessedMessage()方法保存已经处理过的消息。

  在处理完消息后,还需要调用channel.basicAck(envelope.getDeliveryTag(), false)方法确认消息已经被消费。这是因为RabbitMQ是一个消息的投递机制,只有在消费者确认了消息已经被处理后,才会从消息队列中删除该消息。

  以上代码演示了如何在RabbitMQ中实现消息的幂等性。在消费消息时,通过查询已经处理过的消息来避免重复处理。同时,使用了唯一的消息ID来确保同一条消息只会被处理一次。

  笔者开篇列举的3种方法均可保证消息不被重复消费,但需要根据实际情况选择适合自己的方案。文章来源地址https://www.toymoban.com/news/detail-573074.html

到了这里,关于如何保证消息不被重复消费?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ防止消息重复消费、保证异步消息的幂等性

    一、rabbitmq出现消息重复的场景 1、消费成功,没有进行ack,这时 Broker 会重新发送 2、不确认(unack)或 reject 之后,重新排队,Broker 会重新发送 3、消费成功,ack时宕机,没有ack成功,消息由unack变为ready,Broker又重新发送 4、总的来说就是 Broker 发送消息后,消费端收到消息

    2024年02月13日
    浏览(31)
  • JAVA面试题分享一百六十二:Kafka消息重复消费问题?

    消息重复消费的根本原因都在于:已经消费了数据,但是offset没有成功提交。 其中很大一部分原因在于发生了再均衡。 1)消费者宕机、重启等。导致消息已经消费但是没有提交offset。 2)消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生

    2024年02月03日
    浏览(36)
  • RabbitMQ消息丢失、消息重复消费、消息顺序性无法保证、消息积压、一致性问题、系统可用性降低等这些常见问题怎么解决

    该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 1. 消息丢失 问题 :在生产者发送消息到MQ、MQ内部处理、消费者接收消息的任一环节都可能导致消息丢失。 解决方案 : 生产者确认机制 :确保消息

    2024年04月25日
    浏览(27)
  • Kafka 如何保证消息的消费顺序

    我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是: 更改用户会员等级。 根据会员等级计算订单价格。 假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

    2024年02月13日
    浏览(38)
  • kafka 如何保证消息的顺序消费

    在Kafka分布式集群中,要保证消息的顺序消费,您可以采取以下措施: 分区策略 :Kafka的主题可以分为多个分区,每个分区内的消息是有序的。因此,首先要确保生产者将相关的消息发送到同一个分区。这可以通过生产者的分区策略来实现。默认情况下,Kafka会使用基于消息

    2024年02月06日
    浏览(31)
  • Kafka如何保证消息⼀定能被消费

    Kafka 通过多种机制来保证消息一定能被消费,从而实现数据的可靠性和持久性。 以下是一些常见的方法和策略来提高消息的可靠性: 复制机制: Kafka 使用了分区和副本的概念。每个分区可以有多个副本,分布在不同的 Broker 上。当消息写入到一个分区时,它会被复制到该分

    2024年02月12日
    浏览(33)
  • RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?

    consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试) 什么是ACK 消息确认机制 在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者

    2023年04月13日
    浏览(36)
  • 如何避免重复消费消息

    博主介绍: ✌全网粉丝3W+,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验✌ 博主作品: 《Java项目案例》主要基于SpringBoot+MyBatis/MyBatis

    2024年02月10日
    浏览(79)
  • Kafka 如何保证消息消费的全局顺序性

    哈喽大家好,我是咸鱼 今天我们继续来讲一讲 Kafka 当有消息被生产出来的时候,如果没有指定分区或者指定 key ,那么消费会按照【轮询】的方式均匀地分配到所有可用分区中,但不一定按照分区顺序来分配 我们知道,在 Kafka 中消费者可以订阅一个或多个主题,并被分配一

    2024年02月05日
    浏览(35)
  • 【消息队列】聊一下如何避免消息的重复消费

    一条消息在传输过程中,为了保证消息的不丢失,可能会多少量的消息进行重试,这样就可能导致Broker接受到的消息出现重复,如果说下游系统没有针对业务上的处理,那么可能导致同一笔借款或者支付订单出现重复扣款或者重复还款的情况。业务上是不允许出现的。 在MQ

    2024年02月10日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包