RabbitMQ应用问题——消息补偿机制以及代码示例
RabbitMQ应用问题
- 消息可靠性的保障
- 消息补偿机制
详细说明
这里使用了简单的代码进行演示,订单的消费者没有写,在订单的消费同时,发送一条增加积分消息到积分队列。
详细流程途中都有注明。
为了更加清楚代码这里进行表明功能。
文章来源:https://www.toymoban.com/news/detail-673574.html
gitee地址文章来源地址https://www.toymoban.com/news/detail-673574.html
1.创建mq-manager父工程
1.1导入依赖
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
2.创建mq-common子模块
2.1导入依赖
<dependencies>
<!-- mybatis-plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2.2编写实体类
2.2.1Order
package com.qf.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.util.Date;
/*
{ "bizNo": "20200803173145877",
"status": 1,
"price": 34.12,
"goodId": 1002,
"userId": 100
}
*/
//订单
@TableName("orders")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
@TableField(value = "biz_no")
private String bizNo; //业务编号
@TableField(value = "status")
private Integer status;
@TableField(value = "price")
private BigDecimal price;
@TableField(value = "create_time")
private Date createTime;
@TableField(value = "pay_time")
private Date payTime;
@TableField(value = "good_id")
private Integer goodId;
@TableField(value = "user_id")
private Integer userId;
//exist = false:该属性不使用
@TableField(value = "num", exist = false)
private Integer num;
}
2.2.2Integral
package com.qf.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
//积分
@TableName("integral")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Integral{
@TableId(type = IdType.AUTO)
private Integer id;
@TableField("user_id")
private Integer userId;
private Long score;
private String msg;
@TableField("create_time")
private Date createTime;
}
2.2.3Msg
package com.qf.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
//消息
@TableName("message")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Msg {
@TableId(value = "id")
private String id;
@TableField(value = "exchange")
private String exchange;
@TableField(value = "routing_key")
private String routingKey;
@TableField(value = "content")
private String content; // 消息的内容
@TableField
private Integer status; // 消息的状态
@TableField(value = "try_count")
private int tryCount; //尝试次数
@TableField(value = "create_time")
private Date createTime;
}
2.3编写公共参数
2.3.1IntegralConstant
package com.qf.contant;
//设置系统中的参数
public class IntegralConstant {
// 积分系统队列
public final static String INTEGRAL_QUEUE = "integral_queue";
// 积分系统交换机
public final static String INTEGRAL_EXCHANGE = "integral_exchange";
// 积分系统的 routing-key
public final static String INTEGRAL_ROUTING_KEY= "integral_routing_key";
}
2.3.2DeadConstant
package com.qf.contant;
//死信
public class DeadConstant {
//死信交换机
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
//死信路由键
public static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";
//死信队列
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
}
3.编写mq-order子模块
3.1导入公共模块
<dependencies>
<dependency>
<groupId>com.qf</groupId>
<artifactId>mq-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
3.2编写配置文件
server:
port: 8080
spring:
datasource:
username: root
password: root
url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
rabbitmq:
username: guest
password: guest
host: 192.168.25.129
port: 5672
publisher-confirm-type: simple
publisher-returns: true
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#驼峰形式
map-underscore-to-camel-case: true
3.3编写启动类
package com.qf;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("com.qf.mapper")
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class,args);
}
}
3.4编写OrderController
package com.qf.controller;
import com.qf.entity.Order;
import com.qf.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("order")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("insertOrder")
public String insertOrder(@RequestBody Order order){
orderService.insertOrder(order);
return "success";
}
}
3.5编写OrderService
package com.qf.service;
import com.qf.entity.Order;
public interface OrderService {
/**
* 插入订单
* @param order
*/
void insertOrder(Order order);
}
3.6编写OrderServiceImpl
package com.qf.service.impl;
import com.alibaba.fastjson.JSON;
import com.qf.contant.IntegralConstant;
import com.qf.entity.Integral;
import com.qf.entity.Msg;
import com.qf.entity.Order;
import com.qf.mapper.MsgMapper;
import com.qf.mapper.OrderMapper;
import com.qf.service.OrderService;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;
@Service
@Transactional
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private MsgMapper msgMapper;
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public void insertOrder(Order order) {
//插入订单
orderMapper.insert(order);
//插入积分,是把积分信息发送到消息队列中
Integral integral = new Integral();
integral.setUserId(order.getUserId());//积分对应的用户id就是下订单的用户id
integral.setScore(10L);
integral.setMsg("购物积分");
integral.setCreateTime(new Date());
//把积分对象,转换为Json类型,发送到消息队列中
String integralJson = JSON.toJSONString(integral);
//创建消息对象,如果消息消费成功了,再去删除对应的消息
Msg msg = new Msg();
//分布式环境下,id必须是唯一的,解决方案:百度的uid-generator,美团开源项目Leaf
String uuid = UUID.randomUUID().toString();
msg.setId(uuid);
msg.setExchange(IntegralConstant.INTEGRAL_EXCHANGE);//积分对应的交换机
msg.setRoutingKey(IntegralConstant.INTEGRAL_ROUTING_KEY);//积分对象的路由的key
msg.setContent(integralJson);//积分的Json对象
msg.setStatus(-1);//状态
msg.setTryCount(0);//尝试次数
msg.setCreateTime(new Date());//时间
//插入消息
msgMapper.insert(msg);
//发送消息,需要把Msg对象的id(就是uuid)传过来,一旦消息消费成功,还要去Msg对应表中把该消息删除
CorrelationData correlationData = new CorrelationData(uuid);
System.out.println("uuid:" + uuid);
System.out.println("correlationData.getId():" + correlationData.getId());
//发送
rabbitTemplate.convertAndSend(
IntegralConstant.INTEGRAL_EXCHANGE,
IntegralConstant.INTEGRAL_ROUTING_KEY,
buildMessage(integralJson,uuid),
correlationData
);
}
//构建消息
private Message buildMessage(String body,String messageId){
//获取MessagePropertiesBuilder对象
MessagePropertiesBuilder messagePropertiesBuilder = MessagePropertiesBuilder.newInstance();
//获取MessageProperties对象
MessageProperties messageProperties = messagePropertiesBuilder.build();
messageProperties.setMessageId(messageId);
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
Message message = new Message(body.getBytes(),messageProperties);
System.out.println("message传递的内容:" + new String(message.getBody()));
System.out.println("message传递的uuid:" + message.getMessageProperties().getMessageId());
return message;
}
}
3.7编写OrderMapper
package com.qf.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Order;
import org.springframework.stereotype.Repository;
@Repository
public interface OrderMapper extends BaseMapper<Order> {
}
3.8编写MsgMapper
package com.qf.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Msg;
import org.springframework.stereotype.Repository;
@Repository
public interface MsgMapper extends BaseMapper<Msg> {
}
3.9编写配置类
3.9.1DeadConfig
package com.qf.config;
import com.qf.constant.DeadConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadConfig {
//创建队列
@Bean
public Queue createDeadQueue(){
return new Queue(DeadConstant.DEAD_LETTER_QUEUE);
}
//创建交换机
@Bean
public DirectExchange createDeadExchange(){
//交换机默认持久化
return new DirectExchange(DeadConstant.DEAD_LETTER_EXCHANGE);
}
//绑定:交换机中的消息可以发送到不同的队列
@Bean
public Binding bindingDeadQueue(){
//需要设置routingKey
return BindingBuilder.bind(createDeadQueue()).to(createDeadExchange())
.with(DeadConstant.DEAD_LETTER_ROUTING_KEY);//和发送消息时的routingKey一致
}
}
3.9.2IntegralConfig
package com.qf.config;
import com.qf.constant.DeadConstant;
import com.qf.constant.IntegralConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
//路由模式
@Configuration
public class IntegralConfig {
//创建队列
@Bean
public Queue createIntegralQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DeadConstant.DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key", DeadConstant.DEAD_LETTER_ROUTING_KEY);
return new Queue(IntegralConstant.INTEGRAL_QUEUE,true,false,false,arguments);
}
//创建交换机
@Bean
public DirectExchange createIntegralExchange(){
//交换机默认持久化
return new DirectExchange(IntegralConstant.INTEGRAL_EXCHANGE);
}
//绑定:交换机中的消息可以发送到不同的队列
@Bean
public Binding bindingIntegralQueue(){
//需要设置routingKey
return BindingBuilder.bind(createIntegralQueue()).to(createIntegralExchange())
.with(IntegralConstant.INTEGRAL_ROUTING_KEY);//和发送消息时的routingKey一致
}
}
3.9.3PublisherConfirmAndReturnConfig
package com.qf.config;
import com.qf.entity.Msg;
import com.qf.mapper.MsgMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
@Slf4j
@Configuration
public class PublisherConfirmAndReturnConfig implements
RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Resource
private RabbitTemplate rabbitTemplate;
@Autowired
private MsgMapper msgMapper;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
//判断
if(ack){
log.info("已到达broker");
log.info("correlationData id is {}",correlationData.getId());
//删除存到数据库中的消息
//msgMapper.deleteById(correlationData.getId());//通过id删除
//设置删除条件
HashMap<String, Object> map = new HashMap<>();
map.put("id",correlationData.getId());
map.put("status",-1);
//多条件删除
msgMapper.deleteByMap(map);
}else{
log.info("没有到达broker,实际上消息已经保存到mysql中,也可以保存到redis中");
}
}
//return机制,该方法比confirm先执行,只要未到达队列的时候才执行
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息未到达队列");
//如果消息到达队列不执行以下代码
//消息已经从数据库被删除了
//考虑人工干预,获取消息信息进行保存
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
Message message = returnedMessage.getMessage();
//创建消息对象,如果消息消费成功了,再去删除对应的消息
Msg msg = new Msg();
msg.setId(message.getMessageProperties().getMessageId());
msg.setExchange(exchange);//积分对应的交换机
msg.setRoutingKey(routingKey);//积分对象的路由的key
msg.setContent(new String(message.getBody()));//积分的Json对象
msg.setStatus(-2);//状态可以设置为和之前不一样
msg.setTryCount(0);//尝试次数
msg.setCreateTime(new Date());//时间
//插入消息
msgMapper.insert(msg);
//做进一步处理:给管理员发邮件,发短信....
}
}
4.创建mq-integral子模块
4.1导入依赖
<dependencies>
<dependency>
<groupId>com.qf</groupId>
<artifactId>mq-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
</dependencies>
4.2编写配置文件
server:
port: 8081
spring:
datasource:
username: root
password: root
url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
rabbitmq:
username: guest
password: guest
host: 192.168.25.134
port: 5672
listener:
simple:
retry:
enabled: true #开启消息重试
max-attempts: 3 #最大重试次数
initial-interval: 2000ms #每次重试的时间间隔
multiplier: 2 #每次重试时间乘以当前倍数
#重试机制必须是自动ack,才能放到死信队列中
acknowledge-mode: auto
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#驼峰形式
map-underscore-to-camel-case: true
4.3编写启动类
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("com.qf.mapper")
public class IntegralApplication {
public static void main(String[] args) {
SpringApplication.run(IntegralApplication.class,args);
}
}
4.4编写IntegralImpl
package com.qf.service.impl;
import com.alibaba.fastjson.JSON;
import com.qf.constant.IntegralConstant;
import com.qf.entity.Integral;
import com.qf.mapper.IntegralMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Service
public class IntegralImpl {
@Autowired
public IntegralMapper integralMapper;
@Autowired
private StringRedisTemplate stringRedisTemplate;
// //第一种方式
// @RabbitListener(
// bindings = @QueueBinding(
// value = @Queue(name = IntegralConstant.INTEGRAL_QUEUE,
// durable = "true"
// ),
// key = {IntegralConstant.INTEGRAL_ROUTING_KEY},
// exchange = @Exchange(name = IntegralConstant.INTEGRAL_EXCHANGE, durable = "true")
// ))
// public void insertIntegral(Message message){
// //积分信息
// String integralJson = new String(message.getBody());
// System.out.println(integralJson);
//
// //类型转换
// Integral integral = JSON.parseObject(integralJson, Integral.class);
// System.out.println(integral);
// //插入数据库
// integralMapper.insert(integral);
// }
// //第二种方式
// @RabbitListener(queues = IntegralConstant.INTEGRAL_QUEUE)
// public void insertIntegral(Message message) throws InterruptedException {
// //积分信息
// String integralJson = new String(message.getBody());
// System.out.println(integralJson);
//
// //运行成功
// //类型转换
// Integral integral = JSON.parseObject(integralJson, Integral.class);
// System.out.println(integral);
// //插入数据库
// integralMapper.insert(integral);
//
// //测试消息重复消费
// //直到方法执行完毕,才会ack
// Thread.sleep(500000);
//
// //测试消息重试
// //System.out.println("当前系统时间:"+System.currentTimeMillis());
// //运行失败
// //throw new RuntimeException("消息消费异常...");
// }
@RabbitListener(queues = IntegralConstant.INTEGRAL_QUEUE)
public void receiveIntegralMessage(Message message){
//获取要被消息的消息id
String messageId = message.getMessageProperties().getMessageId();
//判断,如果redis中没有这个消息id的key,则是第一次消费该消息
if(!stringRedisTemplate.hasKey(messageId)){
//获取积分信息
String integralJson = new String(message.getBody());
//类型转换
Integral integral = JSON.parseObject(integralJson, Integral.class);
//插入到数据库
integralMapper.insert(integral);
//使用hutool工具类生成随机数
int randomInt = RandomUtil.randomInt(10, 100);
//往redis中也存一份
stringRedisTemplate.opsForValue().setIfAbsent(messageId,
String.valueOf(randomInt),600, TimeUnit.SECONDS);
}
}
4.5编写IntegralMapper
package com.qf.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Integral;
import org.springframework.stereotype.Repository;
@Repository
public interface IntegralMapper extends BaseMapper<Integral> {
}
5.创建mq-compensate子工程
5.1导入依赖
<dependencies>
<dependency>
<groupId>com.qf</groupId>
<artifactId>mq-order</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
4.2编写配置文件
server:
port: 8082
spring:
datasource:
username: root
password: root
url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
rabbitmq:
username: guest
password: guest
host: 192.168.25.134
port: 5672
publisher-confirm-type: simple
publisher-returns: true
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#驼峰形式
map-underscore-to-camel-case: true
4.3编写启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class CompensateApplication {
public static void main(String[] args) {
SpringApplication.run(CompensateApplication.class,args);
}
}
4.4编写MassageCompensateTask
package com.qf.task;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.qf.contant.IntegralConstant;
import com.qf.entity.Msg;
import com.qf.mapper.MsgMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.List;
/**
*
* 消息补偿:隔一段时间,去数据库查询未消费掉的消息,再次执行
*/
@Configuration
public class MassageCompensateTask {
@Autowired
private MsgMapper msgMapper;
@Resource
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "10 * * * * ?")
public void compensateTask(){
//设置查询条件
QueryWrapper<Msg> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("status","-2");
queryWrapper.lt("try_count","3");
//查询消息
List<Msg> msgList = msgMapper.selectList(queryWrapper);
//判断
if(msgList.size() > 0){
for(Msg msg : msgList){
System.out.println("数据库中的消息id:" + msg.getId());
//发送
rabbitTemplate.convertAndSend(
msg.getExchange(),
msg.getRoutingKey(),
buildMessage(msg.getContent(),msg.getId()),
new CorrelationData(msg.getId())
);
//设置尝试次数
msg.setTryCount(msg.getTryCount() + 1);
//修改数据库中的消息
msgMapper.updateById(msg);
}
}
}
//构建消息
private Message buildMessage(String body, String messageId){
//获取MessagePropertiesBuilder对象
MessagePropertiesBuilder messagePropertiesBuilder = MessagePropertiesBuilder.newInstance();
//获取MessageProperties对象
MessageProperties messageProperties = messagePropertiesBuilder.build();
messageProperties.setMessageId(messageId);
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
Message message = new Message(body.getBytes(),messageProperties);
System.out.println("message传递的内容:" + new String(message.getBody()));
System.out.println("message传递的uuid:" + message.getMessageProperties().getMessageId());
return message;
}
}
到了这里,关于RabbitMQ应用问题——消息补偿机制以及代码示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!