SpringBoot集成RabbitMQ之ACK确认机制(第三节)

这篇具有很好参考价值的文章主要介绍了SpringBoot集成RabbitMQ之ACK确认机制(第三节)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

开始语

📝简述

🗒️模式NONE

application配置

生产者

消费者

结果验证

🗒️模式AUTO

application配置

生产者

消费者

结果验证

🗒️模式ACK(重点)

application配置

生产者

消费者

结果验证

🗒️生产者确认机制

yml添加配置

修改生产者代码

结果验证

结束语


开始语

一位普通的程序员,慢慢在努力变强!

在此文章之前,想学习前面部分的请看下列列表

RabbitMQ部署方式(第一节)👈

SpringBoot集成RabbitMQ(第二节)👈

📝简述

ACK模式代表的是mq的确认机制,简单来讲就是【生产者】在发送消息的时候,发送成功mq有一个消息收到confirm回调机制,发送失败有一个return回调机制, 【消费者】在接收消息后,在执行消费完消息需要有一个确认机制,要告诉mq,这个消息我消费成功了,请将队列中的消息删除,如果是失败了,你也进行确认、或者拒绝要告诉mq,不然消息会一直存在于队列中

在RabbitMQ消费者中一共有三种模式:

NODE:对于消息的成功和失败都不管,MQ队列中都会将消息删除。(不安全)

AUTO:自动确认模式,对于消息消费成功,MQ队列中的消息将会自动删除,消费失败则会一直对消息进行消费,有没有解决方案,当然是有的,文章中会注明(不稳定,如果消费者不能保证百分百消息成功,auto模式还是不建议使用)

MANUAL:此模式就是对AUTO模式下新增了一个确认机制,消费者对消息的消费成功和失败都需要给出一个消费确认的标识和动作!

开启生产者确认机制:

# 消息发送交换机,开启确认回调模式

publisher-confirm-type: correlated

# 消息发送交换机,开启确认机制,并且返回回调

publisher-returns: true

🗒️模式NONE

application配置

spring:
  application:
    name: rabbitmq-deadLetter
  rabbitmq:
    host: tianyu.com.cn
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 20000
    listener:
      simple:
        acknowledge-mode: none

生产者

@Test
public void workTest() throws InterruptedException {
    for (int i = 1; i <= 10; i++) {
        rabbitTemplate.convertAndSend("work", "[workTest] send 消息发送" + i);
    }
    Thread.sleep(10000);
    System.out.println("模式:"+rabbitProperties.getListener().getSimple().getAcknowledgeMode());
}

 消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;


/**
 * 消费者监听 (第二种模型:工作模式)
 *
 * @author 猿仁
 * @data 2023-01-31 09:38
 */
@Component
@Slf4j
public class WorkCustomer {

    /**
     * 消费者1
     *
     * @param data Body响应内容
     * @return void
     * @author 猿仁
     * @date 2023/1/31 9:38
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void work1(String data) {
        log.info("[work1]消费者消费成功success:{}",data);
    }

    /**
     * 消费者2
     *
     * @param data Body响应内容
     * @return void
     * @author 猿仁
     * @date 2023/1/31 9:38
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void work2(String data) {
        // log.info("[work2]消费者消费成功success:{}",data);
        try {
            int a = 0 / 0;
        }catch (Exception e){
            log.error("[work2]消费者消费失败fail:{}",data);
            throw e;
        }
    }
}

结果验证

channel.basicack(message.getmessageproperties().getdeliverytag(),false);,RabbitMQ系列,java-rabbitmq,rabbitmq,spring boot

结果解析:

从上面途中可以看出,此模式针对成功和失败的消息都是直接成功,只要消费者接受到消息(不管消费者是否有异常),都当作是消费成功处理。

优点L:此模式很难被阻塞,消费能力不足,多开几个消费来消费即可。

缺点:失败的消息被丢弃了,在现实开发中不允许丢弃消息的(比如:发货队列,某个账号在通知进销存系统进行发货时消息发送了,但是处理异常直接丢失了,此时用户不知道自己的货还没有发出,没有没有短信和物流,货未发就没有接下来的一系列消息)

🗒️模式AUTO

application配置

spring:
  application:
    name: rabbitmq-deadLetter
  rabbitmq:
    host: tianyu.com.cn
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 20000
    listener:
      simple:
        acknowledge-mode: auto # 此处开启了,没有设置死信,过期时间,最好设置一下异常不会回归队列的配置,不然会出现一直重新消费的问题。配置如下:
        default-requeue-rejected: false # 是否将失败消息回归队列

生产者

/**
 * auto模式
 */
@Test
public void autoTest() throws InterruptedException {
    // 开启ack模式 完全消费,队列中无消息
    rabbitTemplate.convertAndSend("auto_no_err", "测试auto消费者模式");
    // 未开启ack模式 结果是队列中还存在一条等待被消费的消息
    rabbitTemplate.convertAndSend("auto_err", "测试auto消费者模式");
    Thread.sleep(3000);
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * none消费者模式
 *
 * @author tianyu.Ge
 * @date 2023/2/6 12:55
 */

@Component
@Slf4j
public class AutoCustomer {

    /**
     * @param data    Body响应内容
     * @param headers 请求头
     * @param channel 通道
     * @param message 消息
     * @return void
     * @author 猿仁
     * @date 2023/1/31 9:38
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "auto_no_err", durable = "true", autoDelete = "false"))
    public void autoNoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) {
        System.out.println("获取到了通道[autoNoErr]的数据!" + data);
        log.info("Payload:  {}", data);
        log.info("Headers:  {}", headers);
        log.info("Channel:  {}", channel);
        log.info("Message:  {}", message);
    }

    /**
     * @param data    Body响应内容
     * @param headers 请求头
     * @param channel 通道
     * @param message 消息
     * @return void
     * @author 猿仁
     * @date 2023/1/31 9:38
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "auto_err", durable = "true", autoDelete = "false"))
    public void autoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) {
        int a = 0 / 0;
        System.out.println("获取到了通道[autoErr]的数据!" + data);
        log.info("Payload:  {}", data);
        log.info("Headers:  {}", headers);
        log.info("Channel:  {}", channel);
        log.info("Message:  {}", message);
    }

}

结果验证

channel.basicack(message.getmessageproperties().getdeliverytag(),false);,RabbitMQ系列,java-rabbitmq,rabbitmq,spring boot

 

结果分析:

生产者发送了两条消息,一条正常,一条异常,那么在开启了default-requeue-rejected: false这个回归队列配置,消息只要是消费者接收了,就当做消费成功,不关心你消费者是否在消费的途中出现异常,队列都将会删除队列中对应的消息。 如果没有配置default-requeue-rejected那么消费者出现异常,消息会重回队列,然后由消费者重新进行消费,导致一直重复消费!

🗒️模式ACK(重点)

 application配置

spring:
  application:
    name: rabbitmq-ack
  rabbitmq:
    host: tianyu.com.cn
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 2000000
    listener:
      direct:
        # 采用手动应答
        acknowledge-mode: manual
      simple:
        # 指定最大的消费者数量
        max-concurrency: 50
        # 指定最小的消费者数量
        concurrency: 1
        # 采用手动应答
        acknowledge-mode: manual
        retry:
          # 是否开启重试机制
          enabled: true
          # 默认是3,是一共三次,而不是重试三次,三次包含了第一执行,所以只重试了两次
          max-attempts: 3
          # 重试间隔时间。毫秒
          initial-interval: 2000
        default-requeue-rejected: false

生产者

/**
 * ack模式
 */
@Test
public void ackTest() throws InterruptedException {
    // 开启ack模式 完全消费,队列中无消息
    rabbitTemplate.convertAndSend("ack_no_err", "测试ack消费者模式");
    // 未开启ack模式 结果是队列中还存在一条等待被消费的消息
    rabbitTemplate.convertAndSend("ack_err", "测试ack消费者模式");
    Thread.sleep(6000);
    System.out.println("一共执行"+ AckCustomer.count +"次!");
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * 手动确认模式 消费者模式
 *
 * @author tianyu.Ge
 * @date 2023/2/6 12:55
 */

@Component
@Slf4j
public class AckCustomer {

    // 重试的次数
    public static int count = 0;

    /**
     * @param data    Body响应内容
     * @param headers 请求头
     * @param channel 通道
     * @param message 消息
     * @return void
     * @author 猿仁
     * @date 2023/1/31 9:38
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "ack_no_err", durable = "true", autoDelete = "false"))
    public void ackNoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) throws IOException {
        System.out.println("获取到了通道[ackNoErr]的数据!" + data);
        log.info("Payload:  {}", data);
        log.info("Headers:  {}", headers);
        log.info("Channel:  {}", channel);
        log.info("Message:  {}", message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    /**
     * @param data    Body响应内容
     * @param headers 请求头
     * @param channel 通道
     * @param message 消息
     * @return void
     * @author 猿仁
     * @date 2023/1/31 9:38
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "ack_err", durable = "true", autoDelete = "false"))
    public void ackErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) throws IOException {
        try {
            ++count;
            int a = 0 / 0;
            /**
             * 参数1:消息标签
             * 参数2:是否批量确认,属于一个队列中的消息,全部确认,false:只确认当前消息
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e) {
            log.error("[ackErr]消费者出现异常:{}",e.getMessage());
            /**
             * 【否认策略】
             *
             *  参数1:消息标签
             *  参数2:是否批量处理 true:批量  (通道中A\B\C\D接收到第一个那么后面的不管成没成功都会被应答,不安全,只有在确保通道中的消息百分百消费成功时才可使用),false:只确认当前消息
             *  参数3:被拒绝的消息是否回归队列 true:回归,false:丢弃 【注意】:如果只有一个消费者的话,true将导致无限循坏, 应该改为false:并且通知mq丢弃或者不处理
             */
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            if(count == 3){
                // 当执行最后一次的时候,失败了,那么直接丢弃,从队列中删除
                log.info("[ackErr]一共执行{}次,还是失败,开启确认失败!",count);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            }
            throw e;
        }
        System.out.println("获取到了通道[ackErr]的数据!" + data);
        log.info("Payload:  {}", data);
        log.info("Headers:  {}", headers);
        log.info("Channel:  {}", channel);
        log.info("Message:  {}", message);
    }

}

结果验证

channel.basicack(message.getmessageproperties().getdeliverytag(),false);,RabbitMQ系列,java-rabbitmq,rabbitmq,spring boot

结果分析:

开启ack模式,那么不管消费者是否执行成功或者失败,都需要给予一个消息的确认,成功就确认成功,失败就确认失败,如果没有确认,那么消息一直会存在队列,并且没有确认ack的消息会再unacked中显示数量。

🗒️生产者确认机制

yml添加配置

spring:
  application:
    name: rabbitmq-ack
  rabbitmq:
    # 消息发送交换机,开启确认回调模式
    publisher-confirm-type: correlated
    # 消息发送交换机,开启确认机制,并且返回回调
    publisher-returns: true
#    template:
#      # 指定消息在没有被队列接收时是否强行退回还是直接丢弃:ReturnCallback.returnedMessage消息未送达回调(true)
#      mandatory: true

修改生产者代码

/**
 * ack模式之生产者确认机制
 */
@Test
public void ackPublisherTest() throws InterruptedException {
    // 生产者消息确认机制开启
    rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
        if (ack) {
            System.out.println("ConfirmCallback:correlationData:  " + correlationData);
            System.out.println("ConfirmCallback:correlationData.body:  " + new String(correlationData.getReturnedMessage().getBody()));
            System.out.println("ConfirmCallback:ack:  " + ack);
            System.out.println("ConfirmCallback:cause:  " + cause);
        }else {
            System.out.println("没有ack,又是怎样的,猿友们有空可以研究研究");
        }
    });
    rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
        System.out.println("ReturnCallback:  " + message);
        System.out.println("ReturnCallback:  " + replyCode);
        System.out.println("ReturnCallback:  " + replyText);
        System.out.println("ReturnCallback:  " + exchange);
        System.out.println("ReturnCallback:  " + routingKey);
    });
    rabbitTemplate.setMandatory(true);

    // 开启ack模式 完全消费,队列中无消息
    CorrelationData correlationData = new CorrelationData();
    correlationData.setReturnedMessage(new Message("测试ack消费者模式".getBytes(), new MessageProperties()));
    correlationData.setId("A");
    rabbitTemplate.convertAndSend("ack_no_err", "测试ack消费者模式", (Message message) -> {
        // 可以配置一些request请求参数
        // message.getMessageProperties().setHeader("token", "123-213-we-123-sd-ad2-");
        //message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
        return message;
    }, correlationData);


    // 未开启ack模式 结果是队列中还存在一条等待被消费的消息
    CorrelationData correlationData1 = new CorrelationData();
    correlationData1.setReturnedMessage(new Message("测试ack消费者模式".getBytes(), new MessageProperties()));
    correlationData1.setId("B");
    rabbitTemplate.convertAndSend("ack_err", "测试ack消费者模式", (Message message) -> {
        return message;
    }, correlationData1);


    // 没有的队列名称 这条将会被ReturnCallback监听到,因为没有队列ack_err_123,消息不可达
    rabbitTemplate.convertAndSend("ack_err_123", "测试ack消费者模式");

    Thread.sleep(6000);
    System.out.println("一共执行" + AckCustomer.count + "次!");
}

结果验证

 channel.basicack(message.getmessageproperties().getdeliverytag(),false);,RabbitMQ系列,java-rabbitmq,rabbitmq,spring boot

结果分析:

开启消息回调,那么生产者在发送消息的时候就可以捕捉消息是否发送成功,发送成功会进入ConfirmCallback回调代码块,消息发送失败会进入 ReturnCallback代码块。到这里就完成了消息的确认机制,保证了消息可靠性!

结束语

温馨提示:如有问题,可在下方留言,作者看到了会第一时间回复!

本章节完成了,各位正在努力的程序员们,如果你们觉得本文章对您有用的话,你学到了一些东西,希望猿友们点赞+关注,支持一下猿仁!
持续更新中…文章来源地址https://www.toymoban.com/news/detail-765073.html

到了这里,关于SpringBoot集成RabbitMQ之ACK确认机制(第三节)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ初级篇:生产者与消费者关系、消息确认机制(ACK)、交换器与队列进行消息路由和存储

    在RabbitMQ中,生产者(Producer) 负责发送消息 ,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则 负责处理接收到的这些消息 。在RabbitMQ中,生产者和消费者之间使用 交换器(Exchange)和队列(Queue)进行消息路由和存储 。生产者将消息发送到

    2024年02月01日
    浏览(41)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(70)
  • 8. springboot + rabbitmq 消息发布确认机制

    在 RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。 消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,

    2023年04月25日
    浏览(60)
  • kafka ack确认机制

    Kafka使用ACK(Acknowledgment)确认机制来确保消息在生产者和消费者之间的可靠传递。这个机制确保消息在被认为已成功发送或处理之前不会被丢失。Kafka的ACK确认机制有三个级别: acks=0: 这是最快速的确认级别,也是最不可靠的。生产者发送消息后不会等待任何确认,直接将

    2024年03月16日
    浏览(39)
  • 【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势

    我们知道 RabbitMq 提供了两种机制,来确保发送端的消息被 brocke 正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势 首先创建一个 SpringBoot 项目,用于后续的演示 springboot 版本为 2.2.1.RELEASE rabbitmq 版本为  3.7.5   依赖配置文件 pom.xml 在 a

    2024年01月18日
    浏览(43)
  • 使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息

    下载地址:link 1.下载完成放到rabbitmq安装目录plugins下 2.执行命令启用插件 3.重启mq Exchanges - add a new exchange - type 出现x-delayed-message即安装成功

    2024年02月11日
    浏览(40)
  • SpringBoot集成常用第三方框架-RabbitMQ

    作者主页:编程指南针 作者简介:Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容:Java项目、Python项目、前端项目、人工智能与大数据、简历模板、学习资料、面试题库

    2024年01月17日
    浏览(57)
  • RabbitMQ的ack和nack机制

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 当生产者的发送消息到exchange,并路由到对应的队列后,MQ主动push消息到channel,由应用线程从channel中获取消息。 主动ACK是指在MQ主动push到channel中后,channel立马自动的给到MQ ack响应,然后MQ删除消息。

    2024年02月11日
    浏览(40)
  • 【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认

                                                                       💧 【 R a b b i t M Q 教程】第三章—— R a b b i t M Q − 发布确认 color{#FF1493}{【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认} 【 R abbi tMQ 教程】第三章 —— R abbi tMQ − 发布确认

    2024年02月08日
    浏览(86)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包