基于springboot实现的rabbitmq消息确认

这篇具有很好参考价值的文章主要介绍了基于springboot实现的rabbitmq消息确认。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

详细

一、运行效果

rabbithandler手动确认消息,WEB系列,java-rabbitmq,spring boot,rabbitmq

二、实现过程

①、引入rabbitmq包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
②、修改application.properties配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
③、定义exchange和queue,并将queue绑定在exchange上
package com.mm.springbootrabbitmqconfirmdemo.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean(name = "confirmQueue")
    public Queue confirmQueue(){
        return  new Queue("confirmQueue",true,false,false);
    }
 
    @Bean(name = "confirmExchange")
    public FanoutExchange confirmExchange(){
        return new FanoutExchange("confirmExchange");
    }
 
    @Bean
    public Binding confirmFanoutExchangeAndQueue(@Qualifier("confirmExchange") FanoutExchange confirmExchange,
                                                 @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange);
    }
 
}
④、消息发送确认

发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。

消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。

消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。

我们可以利用这两个Callback来确保消息的100%送达。

1、 ConfirmCallback确认模式

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

package com.mm.springbootrabbitmqconfirmdemo.service;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause){
        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
 
    }
 
 
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationDataackcause

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。

  • ack:消息投递到broker 的状态,true表示成功。

  • cause:表示投递失败的原因。

但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

com.mm.springbootrabbitmqconfirmdemo.service;

lombok.extern.slf4j.;
org.springframework.amqp.core.Message;
org.springframework.amqp.rabbit.core.RabbitTemplate;
org.springframework.stereotype.;

ReturnCallbackService  RabbitTemplate.ReturnCallback returnedMessageMessage message, replyCode, String replyText, String exchange, String routingKey.info, replyCode, replyText, exchange, routingKey;

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

下边是具体的消息发送,在rabbitTemplate中设置 Confirm 和 Return 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id 为10000000000

⑤、消息发送确认

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
     
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
  
        try {
            log.info("小富收到消息:{}", msg);
  
            //TODO 具体业务
             
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  
        }  catch (Exception e) {
             
            if (message.getMessageProperties().getRedelivered()) {
                 
                log.error("消息已重复处理失败,拒绝再次接收...");
                 
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                 
                log.error("消息即将再次返回队列处理...");
                 
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }
}

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple)

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

3、basicReject

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

三、项目结构图

rabbithandler手动确认消息,WEB系列,java-rabbitmq,spring boot,rabbitmq

四、补充

1、别忘确认消息

这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。

2、消息无限投递

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
  
        try {
            log.info("消费者 2 号收到:{}", msg);
  
            int a = 1 / 0;
  
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  
        } catch (Exception e) {
  
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

3、重复消费

如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis将消息持久化,通过再消息中的唯一性属性校验。

可以看到使用了 RabbitMQ 以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:

  • 消息生产者 - > rabbitmq服务器(消息发送失败)

  • rabbitmq服务器自身故障导致消息丢失

  • 消息消费者 - > rabbitmq服务(消费消息失败)文章来源地址https://www.toymoban.com/news/detail-734682.html

到了这里,关于基于springboot实现的rabbitmq消息确认的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息

    下载地址:link 1.下载完成放到rabbitmq安装目录plugins下 2.执行命令启用插件 3.重启mq Exchanges - add a new exchange - type 出现x-delayed-message即安装成功

    2024年02月11日
    浏览(30)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(28)
  • RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认

    SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致; 生产者给交换机发送消息后、若是不管了,则会出现消息丢失; 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;

    2024年02月14日
    浏览(36)
  • 【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)

    这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。 目录 一、防止消息丢失 1.1、消息确认机制(生产者) (1)生产者丢失消息 (2)生产者消息确认机制 1.2、消息确认机制(消费者) (1)消费者丢失消息

    2024年02月02日
    浏览(44)
  • rabbitmq的消息确认和消息回退

    前面章节我们至少知道了 rabbitmq 的几个核心组件, 比如 exchange queue 和 routing key 还有 java 编程方面的 channel 和 connection 但是这些还不够运用于生产环境 消息确认机制( message confirm ) 消息 return 机制 开始confirm机制, 生产者消息投递到exchange, exchange就会立即ack给生产者, 如果无法投

    2024年02月01日
    浏览(26)
  • 消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列

    2024年02月21日
    浏览(31)
  • RabbitMQ消息确认

    目录 1. 消息确认作用 2 开发示例 2.1 生产者确认 2.2 消费者确认 保证消息的可靠性主要依靠三种机制:一个是消息的持久化,一个是事务机制,一个就是消息的确认机制。 1)消息持久化 消息持久化是将消息写入本地文件,如果rabbitmq故障退出,在重启时会从本地文件系统读

    2024年02月13日
    浏览(31)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(32)
  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(31)
  • Rabbitmq的消息确认

    配置文件 消息从生产者到交换机 无论消息是否到交换机ConfirmCallback都会触发。 消息从交换机到队列 只有消息没到达队列才会触发ReturnsCallback 消息从队列到消费者 (ACK) 消息默认是自动确认的(手动确认需配置文件开启),无论消息是否被成功消费都会被确认,确认后消息

    2024年02月14日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包