目录
开始语
📝简述
🗒️模式NONE
application配置
生产者
消费者
结果验证
🗒️模式AUTO
application配置
生产者
消费者
结果验证
🗒️模式ACK(重点)
application配置
生产者
消费者
结果验证
🗒️生产者确认机制
yml添加配置
修改生产者代码
结果验证
结束语
开始语
一位普通的程序员,慢慢在努力变强!
在此文章之前,想学习前面部分的请看下列列表
RabbitMQ部署方式(第一节)👈
SpringBoot集成RabbitMQ(第二节)👈
📝简述
ACK模式代表的是mq的确认机制,简单来讲就是【生产者】在发送消息的时候,发送成功mq有一个消息收到confirm回调机制,发送失败有一个return回调机制, 【消费者】在接收消息后,在执行消费完消息需要有一个确认机制,要告诉mq,这个消息我消费成功了,请将队列中的消息删除,如果是失败了,你也进行确认、或者拒绝要告诉mq,不然消息会一直存在于队列中
在RabbitMQ消费者中一共有三种模式:
NODE:对于消息的成功和失败都不管,MQ队列中都会将消息删除。(不安全)
AUTO:自动确认模式,对于消息消费成功,MQ队列中的消息将会自动删除,消费失败则会一直对消息进行消费,有没有解决方案,当然是有的,文章中会注明(不稳定,如果消费者不能保证百分百消息成功,auto模式还是不建议使用)
MANUAL:此模式就是对AUTO模式下新增了一个确认机制,消费者对消息的消费成功和失败都需要给出一个消费确认的标识和动作!
开启生产者确认机制:
# 消息发送交换机,开启确认回调模式
publisher-confirm-type: correlated
# 消息发送交换机,开启确认机制,并且返回回调
publisher-returns: true
🗒️模式NONE
application配置
spring:
application:
name: rabbitmq-deadLetter
rabbitmq:
host: tianyu.com.cn
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 20000
listener:
simple:
acknowledge-mode: none
生产者
@Test
public void workTest() throws InterruptedException {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("work", "[workTest] send 消息发送" + i);
}
Thread.sleep(10000);
System.out.println("模式:"+rabbitProperties.getListener().getSimple().getAcknowledgeMode());
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消费者监听 (第二种模型:工作模式)
*
* @author 猿仁
* @data 2023-01-31 09:38
*/
@Component
@Slf4j
public class WorkCustomer {
/**
* 消费者1
*
* @param data Body响应内容
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue("work"))
public void work1(String data) {
log.info("[work1]消费者消费成功success:{}",data);
}
/**
* 消费者2
*
* @param data Body响应内容
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue("work"))
public void work2(String data) {
// log.info("[work2]消费者消费成功success:{}",data);
try {
int a = 0 / 0;
}catch (Exception e){
log.error("[work2]消费者消费失败fail:{}",data);
throw e;
}
}
}
结果验证
结果解析:
从上面途中可以看出,此模式针对成功和失败的消息都是直接成功,只要消费者接受到消息(不管消费者是否有异常),都当作是消费成功处理。
优点L:此模式很难被阻塞,消费能力不足,多开几个消费来消费即可。
缺点:失败的消息被丢弃了,在现实开发中不允许丢弃消息的(比如:发货队列,某个账号在通知进销存系统进行发货时消息发送了,但是处理异常直接丢失了,此时用户不知道自己的货还没有发出,没有没有短信和物流,货未发就没有接下来的一系列消息)
🗒️模式AUTO
application配置
spring:
application:
name: rabbitmq-deadLetter
rabbitmq:
host: tianyu.com.cn
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 20000
listener:
simple:
acknowledge-mode: auto # 此处开启了,没有设置死信,过期时间,最好设置一下异常不会回归队列的配置,不然会出现一直重新消费的问题。配置如下:
default-requeue-rejected: false # 是否将失败消息回归队列
生产者
/**
* auto模式
*/
@Test
public void autoTest() throws InterruptedException {
// 开启ack模式 完全消费,队列中无消息
rabbitTemplate.convertAndSend("auto_no_err", "测试auto消费者模式");
// 未开启ack模式 结果是队列中还存在一条等待被消费的消息
rabbitTemplate.convertAndSend("auto_err", "测试auto消费者模式");
Thread.sleep(3000);
}
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* none消费者模式
*
* @author tianyu.Ge
* @date 2023/2/6 12:55
*/
@Component
@Slf4j
public class AutoCustomer {
/**
* @param data Body响应内容
* @param headers 请求头
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "auto_no_err", durable = "true", autoDelete = "false"))
public void autoNoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) {
System.out.println("获取到了通道[autoNoErr]的数据!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
}
/**
* @param data Body响应内容
* @param headers 请求头
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "auto_err", durable = "true", autoDelete = "false"))
public void autoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) {
int a = 0 / 0;
System.out.println("获取到了通道[autoErr]的数据!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
}
}
结果验证
结果分析:
生产者发送了两条消息,一条正常,一条异常,那么在开启了default-requeue-rejected: false这个回归队列配置,消息只要是消费者接收了,就当做消费成功,不关心你消费者是否在消费的途中出现异常,队列都将会删除队列中对应的消息。 如果没有配置default-requeue-rejected那么消费者出现异常,消息会重回队列,然后由消费者重新进行消费,导致一直重复消费!
🗒️模式ACK(重点)
application配置
spring:
application:
name: rabbitmq-ack
rabbitmq:
host: tianyu.com.cn
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 2000000
listener:
direct:
# 采用手动应答
acknowledge-mode: manual
simple:
# 指定最大的消费者数量
max-concurrency: 50
# 指定最小的消费者数量
concurrency: 1
# 采用手动应答
acknowledge-mode: manual
retry:
# 是否开启重试机制
enabled: true
# 默认是3,是一共三次,而不是重试三次,三次包含了第一执行,所以只重试了两次
max-attempts: 3
# 重试间隔时间。毫秒
initial-interval: 2000
default-requeue-rejected: false
生产者
/**
* ack模式
*/
@Test
public void ackTest() throws InterruptedException {
// 开启ack模式 完全消费,队列中无消息
rabbitTemplate.convertAndSend("ack_no_err", "测试ack消费者模式");
// 未开启ack模式 结果是队列中还存在一条等待被消费的消息
rabbitTemplate.convertAndSend("ack_err", "测试ack消费者模式");
Thread.sleep(6000);
System.out.println("一共执行"+ AckCustomer.count +"次!");
}
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 手动确认模式 消费者模式
*
* @author tianyu.Ge
* @date 2023/2/6 12:55
*/
@Component
@Slf4j
public class AckCustomer {
// 重试的次数
public static int count = 0;
/**
* @param data Body响应内容
* @param headers 请求头
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "ack_no_err", durable = "true", autoDelete = "false"))
public void ackNoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) throws IOException {
System.out.println("获取到了通道[ackNoErr]的数据!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
* @param data Body响应内容
* @param headers 请求头
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "ack_err", durable = "true", autoDelete = "false"))
public void ackErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) throws IOException {
try {
++count;
int a = 0 / 0;
/**
* 参数1:消息标签
* 参数2:是否批量确认,属于一个队列中的消息,全部确认,false:只确认当前消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e) {
log.error("[ackErr]消费者出现异常:{}",e.getMessage());
/**
* 【否认策略】
*
* 参数1:消息标签
* 参数2:是否批量处理 true:批量 (通道中A\B\C\D接收到第一个那么后面的不管成没成功都会被应答,不安全,只有在确保通道中的消息百分百消费成功时才可使用),false:只确认当前消息
* 参数3:被拒绝的消息是否回归队列 true:回归,false:丢弃 【注意】:如果只有一个消费者的话,true将导致无限循坏, 应该改为false:并且通知mq丢弃或者不处理
*/
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
if(count == 3){
// 当执行最后一次的时候,失败了,那么直接丢弃,从队列中删除
log.info("[ackErr]一共执行{}次,还是失败,开启确认失败!",count);
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
throw e;
}
System.out.println("获取到了通道[ackErr]的数据!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
}
}
结果验证
结果分析:
开启ack模式,那么不管消费者是否执行成功或者失败,都需要给予一个消息的确认,成功就确认成功,失败就确认失败,如果没有确认,那么消息一直会存在队列,并且没有确认ack的消息会再unacked中显示数量。
🗒️生产者确认机制
yml添加配置
spring: application: name: rabbitmq-ack rabbitmq: # 消息发送交换机,开启确认回调模式 publisher-confirm-type: correlated # 消息发送交换机,开启确认机制,并且返回回调 publisher-returns: true # template: # # 指定消息在没有被队列接收时是否强行退回还是直接丢弃:ReturnCallback.returnedMessage消息未送达回调(true) # mandatory: true
修改生产者代码
/**
* ack模式之生产者确认机制
*/
@Test
public void ackPublisherTest() throws InterruptedException {
// 生产者消息确认机制开启
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
if (ack) {
System.out.println("ConfirmCallback:correlationData: " + correlationData);
System.out.println("ConfirmCallback:correlationData.body: " + new String(correlationData.getReturnedMessage().getBody()));
System.out.println("ConfirmCallback:ack: " + ack);
System.out.println("ConfirmCallback:cause: " + cause);
}else {
System.out.println("没有ack,又是怎样的,猿友们有空可以研究研究");
}
});
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
System.out.println("ReturnCallback: " + message);
System.out.println("ReturnCallback: " + replyCode);
System.out.println("ReturnCallback: " + replyText);
System.out.println("ReturnCallback: " + exchange);
System.out.println("ReturnCallback: " + routingKey);
});
rabbitTemplate.setMandatory(true);
// 开启ack模式 完全消费,队列中无消息
CorrelationData correlationData = new CorrelationData();
correlationData.setReturnedMessage(new Message("测试ack消费者模式".getBytes(), new MessageProperties()));
correlationData.setId("A");
rabbitTemplate.convertAndSend("ack_no_err", "测试ack消费者模式", (Message message) -> {
// 可以配置一些request请求参数
// message.getMessageProperties().setHeader("token", "123-213-we-123-sd-ad2-");
//message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
return message;
}, correlationData);
// 未开启ack模式 结果是队列中还存在一条等待被消费的消息
CorrelationData correlationData1 = new CorrelationData();
correlationData1.setReturnedMessage(new Message("测试ack消费者模式".getBytes(), new MessageProperties()));
correlationData1.setId("B");
rabbitTemplate.convertAndSend("ack_err", "测试ack消费者模式", (Message message) -> {
return message;
}, correlationData1);
// 没有的队列名称 这条将会被ReturnCallback监听到,因为没有队列ack_err_123,消息不可达
rabbitTemplate.convertAndSend("ack_err_123", "测试ack消费者模式");
Thread.sleep(6000);
System.out.println("一共执行" + AckCustomer.count + "次!");
}
结果验证
结果分析:
开启消息回调,那么生产者在发送消息的时候就可以捕捉消息是否发送成功,发送成功会进入ConfirmCallback回调代码块,消息发送失败会进入 ReturnCallback代码块。到这里就完成了消息的确认机制,保证了消息可靠性!
结束语
温馨提示:如有问题,可在下方留言,作者看到了会第一时间回复!文章来源:https://www.toymoban.com/news/detail-765073.html
本章节完成了,各位正在努力的程序员们,如果你们觉得本文章对您有用的话,你学到了一些东西,希望猿友们点个赞+关注,支持一下猿仁!
持续更新中…文章来源地址https://www.toymoban.com/news/detail-765073.html
到了这里,关于SpringBoot集成RabbitMQ之ACK确认机制(第三节)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!