RabbitMQ手动签收消息
本文讲解在 Spring Boot 中使用 RabbitMQ 时,生产者端的发送确认回调(confirm / return)以及消费者端手动签收(manual ack)消息的用法。
1、pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Dependency versions below are inherited from the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/>
    </parent>

    <groupId>com.example.demo</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-demo</name>
    <description>rabbitmq-demo</description>

    <properties>
        <java.version>8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: used by the REST controller that triggers a send. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- AMQP starter: Spring AMQP and the RabbitMQ client. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Lombok: provides the @Slf4j logger used by the producer and consumer. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2、配置文件
server:
  port: 9090
spring:
  application:
    name: rabbit-confirm
  rabbitmq:
    template:
      # mandatory must be true when a return callback is used
      mandatory: true
    # Publisher confirms: correlate each broker confirm with the CorrelationData passed on send
    publisher-confirm-type: correlated
    # Publisher returns: enable the return callback (used together with template.mandatory above)
    publisher-returns: true
    listener:
      simple:
        # Initial number of concurrent consumers
        concurrency: 5
        # Maximum number of concurrent consumers
        max-concurrency: 10
        # Number of messages each consumer may fetch for processing at a time
        prefetch: 20
        # Acknowledge mode: manual, i.e. the listener acknowledges each delivery itself
        acknowledge-mode: manual
    username: zsx242030
    password: zsx242030
    virtual-host: /
3、定义配置类
package com.example.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queue, the topic exchange and the binding between them
 * that the confirm demo publishes to and consumes from.
 */
@Configuration
public class ConfirmConfiguration {

    /** Queue name; the same value is used as the binding's routing key. */
    private static final String CONFIRM_QUEUE_NAME = "confirm.message";

    /** Name of the topic exchange the queue is bound to. */
    private static final String CONFIRM_EXCHANGE_NAME = "exchange-2";

    /**
     * Declares the {@code confirm.message} queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue confirmQueue() {
        return new Queue(CONFIRM_QUEUE_NAME);
    }

    /**
     * Declares the topic exchange named {@code exchange-2}.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange exchange2() {
        return new TopicExchange(CONFIRM_EXCHANGE_NAME);
    }

    /**
     * Binds the {@code confirm.message} queue to {@code exchange-2}
     * using {@code confirm.message} as the routing key.
     *
     * @return the binding definition
     */
    @Bean
    public Binding bindMessage1() {
        Queue queue = confirmQueue();
        TopicExchange exchange = exchange2();
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_QUEUE_NAME);
    }
}
4、定义生产者
package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.LocalDateTime;
@Component
@Slf4j
public class ConfirmProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* exchange到queue成功,则不回调return
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
*/
private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
if (!ack) {
log.error("消息发送失败:correlationData: {},cause: {}", correlationData, cause);
} else {
log.info("消息发送成功:correlationData: {},ack: {}", correlationData, ack);
}
};
private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routeKey) ->
log.error("消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}", exchange, routeKey, replyCode, replyText);
/**
* 发送消息
*
* @param message 消息内容
*/
public void send(String message) {
// 构建回调返回的数据
CorrelationData correlationData = new CorrelationData();
Timestamp time = Timestamp.valueOf(LocalDateTime.now());
correlationData.setId(time + "");
Message message1 = MessageBuilder.withBody(message.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
// 将CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,然后人工处理
.setCorrelationId(correlationData.getId())
.build();
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("exchange-2", "confirm.message", message1, correlationData);
}
}
5、定义消费者
package com.example.demo.config;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Consumes messages from the {@code confirm.message} queue and acknowledges
 * each delivery explicitly on the channel.
 */
@Component
@Slf4j
public class ConfirmConsumer {

    /**
     * Logs the received payload, then acknowledges that single delivery.
     *
     * @param message  the message payload as text
     * @param message1 the raw message, used to read the delivery tag
     * @param channel  the channel the acknowledgement is sent on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "confirm.message", durable = "true"),
                    exchange = @Exchange(value = "exchange-2", type = "topic"),
                    key = "confirm.message"))
    public void receive(String message, Message message1, Channel channel) throws IOException {
        log.info("消费者收到消息:{}", message);

        // basicAck(deliveryTag, multiple):
        //   deliveryTag - the tag carried by each delivered message
        //   multiple    - true acknowledges every delivery with a tag less than or equal to
        //                 this one; false acknowledges only this delivery. Choose as needed.
        final long tag = message1.getMessageProperties().getDeliveryTag();
        final boolean acknowledgeEarlierDeliveries = false;
        channel.basicAck(tag, acknowledgeEarlierDeliveries);
    }
}
6、创建controller调用
package com.example.demo.controller;
import com.example.demo.config.ConfirmProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST entry point that triggers the producer with a fixed demo message.
 */
@RestController
public class ConfirmController {

    /** Payload sent on every request to the demo endpoint. */
    private static final String DEMO_PAYLOAD = "hello confirm message";

    @Resource
    private ConfirmProducer confirmProducer;

    /**
     * Handles {@code GET /confirm-message} by handing the demo payload to the producer.
     */
    @GetMapping(path = "/confirm-message")
    public void confirmMessage() {
        confirmProducer.send(DEMO_PAYLOAD);
    }
}
7、启动类
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Application entry point for the RabbitMQ confirm demo.
 */
@SpringBootApplication
public class RabbitDemoApplication {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(RabbitDemoApplication.class);
        application.run(args);
    }
}
8、测试
启动应用后,在浏览器中访问 http://localhost:9090/confirm-message ,控制台输出如下:
2022-07-05 18:20:43.043 INFO 4492 --- [nectionFactory1] com.example.demo.config.ConfirmProducer : 消息发送成功:correlationData: CorrelationData [id=2022-07-05 18:20:43.025],ack: true
2022-07-05 18:20:43.046 INFO 4492 --- [ntContainer#0-5] com.example.demo.config.ConfirmConsumer : 消费者收到消息:hello confirm message
文章来源:https://www.toymoban.com/news/detail-666590.html
到了这里,关于RabbitMQ手动签收消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!