消费者确认机制
一、一次确认一个消息
这里生产者一次性向rabbitmq发送一百条消息
@GetMapping("/affair/affair")
public String affair(){
long begin=System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
boolean b = Boolean.TRUE.equals(template.invoke(operations -> {
template.convertAndSend("testQueue", "发布的消息");
return template.waitForConfirms(1000);
}));
if(b) {
System.out.println("发布成功");
} else {
System.out.println("发布失败");
}
}
long end=System.currentTimeMillis();
return (end-begin)+"ms";
}
然后消费者一条一条的消费,每次消费时间模拟为0.5秒
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void listen(Message msg, String str, Channel channel) throws IOException, InterruptedException {
Thread.sleep(100);
System.out.println("消息头帧设置的内容" + msg.getMessageProperties().getHeaders());
System.out.println("消息体中明文发布的内容" + str);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}
下面时消费100条消息,消费每条消息花时为0.5s的趋势图
二、一次确认多个消息
yml文件中的配置其中concurrency和prefetch很重要
server:
port: 8021
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-returns: true #设置publisher-returns消息成功则会向发布者发送成功,失败时不会将消息丢弃,而是返回给发布者,发布者根据需求处理
publisher-confirm-type: correlated #开启发布确认模式,具体有三个值,具体的可以去百度
listener:
simple:
prefetch: 10 # 设置qos最大缓存数,预接受10个消息到缓存中
concurrency: 10 # 处理线程数,可以理解为同一时间处理多少个消息
max-concurrency: 20
发送代码是相同的,以下是调整后的接受代码文章来源:https://www.toymoban.com/news/detail-808593.html
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void listen(Message msg, String str) throws IOException, InterruptedException {
Thread.sleep(500);
System.out.println("消息头帧设置的内容" + msg.getMessageProperties().getHeaders());
System.out.println("消息体中明文发布的内容" + str);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}
下面时消费100条消息,消费每条消息花时为0.5s的趋势图,但每次同时有十条消息被消费
文章来源地址https://www.toymoban.com/news/detail-808593.html
到了这里,关于rabbitmq的qos和消费者一次确认多个消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!