「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

这篇具有很好参考价值的文章主要介绍了「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

介绍

方案

配置手动确认

使用 「Bean 」 配置RabbitMQ的属性

确定消费、拒绝消费、拒绝消费进入死信队列

模拟生产者发送消息①


介绍

        RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电商交易系统等。以下是消息确认机制的一些常见应用场景和好处:

        1. 确认消息的可靠性

        在 RabbitMQ 中,生产者将消息发送到队列之后就不能再控制该消息的安全性,而消费者需要及时地对该消息进行处理并进行确认,以确保该消息已经被成功消费。使用消息确认机制可以保证消息只会被消费一次,从而确保消息的可靠性。

        2. 防止消息丢失

        在 RabbitMQ 中,当消费者从队列中取出消息之后,消息就被认为是已经消费,如果消费者在消费过程中出现异常导致消费失败,那么该消息就会从队列中被删除,从而导致消息丢失。使用消息确认机制可以避免这种情况的发生,从而保证消息不会丢失。

        3. 避免重复消费

        在 RabbitMQ 中,如果消费者在处理完一个消息之后没有及时确认该消息已经被消费,那么 RabbitMQ 认为该消息未被消费,就会将该消息重新发送给另一个消费者进行消费,从而导致消息重复消费。使用消息确认机制可以避免这种情况的发生,从而保证消息只会被消费一次。

        4. 节约系统资源

        在 RabbitMQ 中,当一个消费者同时处理多个消息时,可能会导致系统资源短缺或者消息被重复消费。使用消息确认机制可以限制消费者一次只处理一个消息,从而提高系统的稳定性和可靠性,同时还可以避免消息被重复消费的问题。

        综上所述,消息确认机制在 RabbitMQ 中的应用场景非常广泛,可以有效地保证消息的可靠性、避免消息丢失和重复消费、节约系统资源等。因此,在实际应用中,推荐使用消息确认机制来确保 RabbitMQ 的高可用和高性能。

方案

        在消息传递系统中,实现消息的可靠性可以通过引入消息确认机制来完成。该机制涉及三个方面:确认消息的发送、确认消息的接收以及拒收消息的处理。以下是这一优化的详细方案:

  1. 确认消息的发送:

    • 发送者在向消息队列发送消息之前,需等待接收到消息队列发出的确认信号。
    • 当消息成功写入消息队列后,消息队列会发送一个确认信号给发送者,表示消息已经被成功接收并保存。
    • 如果发送者在一定时间内未收到确认信号,可以选择重新发送消息或执行其他错误处理逻辑。
  2. 确认消息的接收:

    • 接收者在从消息队列中获取消息后,需发送一个确认信号给消息队列,表示已经成功接收到该消息。
    • 消息队列收到确认信号后,会将该消息标记为已确认,并在需要的情况下进行下一步处理。
    • 如果接收者在一定时间内未发送确认信号,消息队列可以将该消息重新投递给其他接收者或执行其他补救措施。
  3. 拒收消息的处理:

    • 如果接收者无法处理某条消息,可以发送拒收信号给消息队列,表示拒绝接收该消息。
    • 消息队列收到拒收信号后,可以将该消息重新投递给其他接收者或执行其他适当的处理策略。
    • 发送拒收信号的原因可能包括消息格式错误、业务逻辑不符等。

        通过实现消息确认机制,可以提高消息传递的可靠性和稳定性。发送者可以确保消息被正确写入消息队列,接收者可以确保每条消息被成功接收,并且拒收功能可以帮助处理无法处理的消息。

配置手动确认

#自动签收:auto  手动:manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

若要实现手动确认,必须在配置中这样配置,否则消息会被重复消费,还会遇见不可预料的报错结果

使用 「Bean 」 配置RabbitMQ的属性

@Configuration
public class RabbitMqConfig {
    Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        // 确认消息送到交换机(Exchange)回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                assert correlationData != null;
                logger.info("消息确认送到交换机(Exchange),消息的唯一标识符:{}", correlationData.getId());
            } else {
                logger.info("投递失败,错误原因 :{}", cause);
            }
        });
        return rabbitTemplate;
    }
}

生产者发送的消息,不管成功与否都会调用回调函数,确保消息已经成功发送到交换机中

如果设置手动确认,则所有队列中的消息被消费后都需要手动确认,不然不会从队列中移除,第二次重启服务后还会被重复消费,如下图所示:

rabbittemplate 消息确认,消息中间件,java-rabbitmq,rabbitmq,java

确定消费、拒绝消费、拒绝消费进入死信队列

@Configuration
public class SimpleQueueConfig {
    Logger logger = LoggerFactory.getLogger(SimpleQueueConfig.class);
    private static Map<Long, String> list = new HashMap<>();

    @Bean(name = "simpleQueue")
    public Queue queue() {
        Map<String, Object> arguments = new HashMap<>(4);
        arguments.put("x-message-ttl", 20000);
        arguments.put("x-max-length", 1000);
        arguments.put("x-dead-letter-exchange", "dead.exchange");
        arguments.put("x-dead-letter-routing-key", "dead.message");
        return new Queue("simple_queue", true, false, false, arguments);
    }

    @Bean(name = "deadQueue")
    public Queue deadQueue() {
        return new Queue("dead.queue", true, false, false);
    }

    @Bean(name = "deadExchange")
    public Exchange exchange() {
        return new DirectExchange("dead.exchange", true, false);
    }

    @Bean(name = "deadBinding")
    public Binding binding() {
        return BindingBuilder.bind(deadQueue()).to(exchange()).with("dead.message").noargs();
    }

    @RabbitListener(queues = "dead.queue")
    public void readDeadMessage(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        logger.info("接收到的死信消息为:{}", msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = "simple_queue")
    public void readMessage(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println(msg);
        try {
            if (msg.contains("2") || msg.contains("7")) {
                logger.info("拒绝消费,(false)不重回队列,进入死信队列,消息为:{}", msg);
                // 第二个参数若为TRUE,则表示拒绝消费,重回队列让其他消费者消费,也可能自己会再次消费,若为FALSE,则表示不重回队列,将消息发送到死信队列中(前提是该队列绑定了死信队列)
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            } else if (msg.contains("3")) {
                // 消费报了异常
                int i = 1 / 0;
            } else {
                logger.info("确认消费,消息为:{}", msg);
                // 符合消费的条件,确认消费,第二个参数表示,是否批量确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            logger.info("报错消息,拒绝消费,直接丢弃,进入死信队列,消息为:{}", msg);
            // 进入异常方法,拒绝当前消费,第二个参数表示是否批量拒绝,第三个参数表示当前消息是否重回队列顶部,若为FALSE则表示丢弃该消息,但该消息会进入死信队列(前提是该队列绑定了死信队列)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } finally {
            // 限制消费者只有在确认之前最多接收一个未确认的消息
            channel.basicQos(1);
        }
    }

    @RabbitListener(queues = "simple_queue")
    public void readMessageTwo(Message message, Channel channel) throws IOException {
        logger.info("two接收one拒绝的消息为:{}", new String(message.getBody()));
        // 一次只确认一条消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

注意:

channel.basicReject和channel.basicNack的主要区别是:是否可以批量拒绝文章来源地址https://www.toymoban.com/news/detail-719321.html

模拟生产者发送消息①

@SpringBootTest(classes = MqApplication.class)
@RunWith(SpringRunner.class)
public class ProducerSimpleTest {
    @Resource
    RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 1; i <=10; i++) {
            String msg = "消息" + i;
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("simple_queue", (Object) msg, correlationData);
        }
    }
}

到了这里,关于「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(53)
  • RabbitMQ可靠性消息发送(java实现)

    本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客 step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性; step2:Producer发送消息到MQ Broker; step3:Producer收到 broker 返回的确认消息; step4:更改消息记录库的状态(定义三种状态:0待确

    2024年02月04日
    浏览(70)
  • RabbitMQ消息可靠性(一)-- 生产者消息确认

    目录 前言 一、消息确认流程图 二、生产者消息确认 1、publisher-confirm(发送者确认) 2、publisher-return(发送者回执) 三、代码实现 1、修改application.yml 配置 2、ConfirmCallback函数和ReturnCallback函数 在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一

    2024年02月04日
    浏览(73)
  • RabbitMQ发送方确认机制

    RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失: 消息发送到交换机的过程。 消息从交换机发送到队列的过程。 而RabbitMQ提供了类似于回调函数的机制来告诉发送

    2024年02月09日
    浏览(28)
  • RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

    代码位置 消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。 为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。 如果RabbitMQ没有收

    2024年02月10日
    浏览(50)
  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(57)
  • RabbitMq生产者发送消息确认

    一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitM

    2024年02月04日
    浏览(40)
  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(42)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(46)
  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包