RabbitMQ应用问题——消息补偿机制以及代码示例

这篇具有很好参考价值的文章主要介绍了RabbitMQ应用问题——消息补偿机制以及代码示例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ应用问题——消息补偿机制以及代码示例

RabbitMQ应用问题

  • 消息可靠性的保障
    • 消息补偿机制

RabbitMQ应用问题——消息补偿机制以及代码示例,rabbitmq,java-rabbitmq,rabbitmq,java

详细说明

这里使用了简单的代码进行演示,订单的消费者没有写,在订单的消费同时,发送一条增加积分消息到积分队列。

详细流程途中都有注明。

为了更加清楚代码这里进行表明功能。

RabbitMQ应用问题——消息补偿机制以及代码示例,rabbitmq,java-rabbitmq,rabbitmq,java

gitee地址文章来源地址https://www.toymoban.com/news/detail-673574.html

1.创建mq-manager父工程

1.1导入依赖
 <packaging>pom</packaging>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.7</version>
    </dependency>

</dependencies>

2.创建mq-common子模块

2.1导入依赖
<dependencies>
    <!-- mybatis-plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.2</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.6</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
2.2编写实体类
2.2.1Order
package com.qf.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.util.Date;
/*
  { "bizNo": "20200803173145877",
    "status": 1,
    "price": 34.12,
    "goodId": 1002,
    "userId": 100
  }

*/

//订单
@TableName("orders")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    @TableField(value = "biz_no")
    private String bizNo; //业务编号

    @TableField(value = "status")
    private Integer status;

    @TableField(value = "price")
    private BigDecimal price;

    @TableField(value = "create_time")
    private Date createTime;

    @TableField(value = "pay_time")
    private Date payTime;

    @TableField(value = "good_id")
    private Integer goodId;

    @TableField(value = "user_id")
    private Integer userId;

    //exist = false:该属性不使用
    @TableField(value = "num", exist = false)
    private Integer num;
}
2.2.2Integral
package com.qf.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

//积分
@TableName("integral")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Integral{

    @TableId(type = IdType.AUTO)
    private Integer id;
    @TableField("user_id")
    private Integer userId;
    private Long score;
    private String msg;

    @TableField("create_time")
    private Date createTime;
}
2.2.3Msg
package com.qf.entity;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

//消息
@TableName("message")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Msg {

    @TableId(value = "id")
    private String id;

    @TableField(value = "exchange")
    private String exchange;

    @TableField(value = "routing_key")
    private String routingKey;

    @TableField(value = "content")
    private String content; // 消息的内容

    @TableField
    private Integer status; // 消息的状态

    @TableField(value = "try_count")
    private int tryCount; //尝试次数

    @TableField(value = "create_time")
    private Date createTime;
}
2.3编写公共参数
2.3.1IntegralConstant
package com.qf.contant;

//设置系统中的参数
public class IntegralConstant {

    // 积分系统队列
    public final static String INTEGRAL_QUEUE = "integral_queue";

    // 积分系统交换机
    public final static String INTEGRAL_EXCHANGE = "integral_exchange";

    // 积分系统的 routing-key
    public final static String INTEGRAL_ROUTING_KEY= "integral_routing_key";
}
2.3.2DeadConstant
package com.qf.contant;

//死信
public class DeadConstant {
    //死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    //死信路由键
    public static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";
    //死信队列
    public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
}

3.编写mq-order子模块

3.1导入公共模块
<dependencies>
    <dependency>
        <groupId>com.qf</groupId>
        <artifactId>mq-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
3.2编写配置文件
server:
  port: 8080

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

  rabbitmq:
    username: guest
    password: guest
    host: 192.168.25.129
    port: 5672
    publisher-confirm-type: simple
    publisher-returns: true

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    #驼峰形式
    map-underscore-to-camel-case: true
3.3编写启动类
package com.qf;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.qf.mapper")
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class,args);
    }
}
3.4编写OrderController
package com.qf.controller;

import com.qf.entity.Order;
import com.qf.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("insertOrder")
    public String insertOrder(@RequestBody Order order){
        orderService.insertOrder(order);
        return "success";
    }

}
3.5编写OrderService
package com.qf.service;

import com.qf.entity.Order;

public interface OrderService {
    /**
     * 插入订单
     * @param order
     */
    void insertOrder(Order order);
}
3.6编写OrderServiceImpl
package com.qf.service.impl;

import com.alibaba.fastjson.JSON;
import com.qf.contant.IntegralConstant;
import com.qf.entity.Integral;
import com.qf.entity.Msg;
import com.qf.entity.Order;
import com.qf.mapper.MsgMapper;
import com.qf.mapper.OrderMapper;
import com.qf.service.OrderService;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@Service
@Transactional
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private MsgMapper msgMapper;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void insertOrder(Order order) {
        //插入订单
        orderMapper.insert(order);

        //插入积分,是把积分信息发送到消息队列中
        Integral integral = new Integral();
        integral.setUserId(order.getUserId());//积分对应的用户id就是下订单的用户id
        integral.setScore(10L);
        integral.setMsg("购物积分");
        integral.setCreateTime(new Date());
        //把积分对象,转换为Json类型,发送到消息队列中
        String integralJson = JSON.toJSONString(integral);

        //创建消息对象,如果消息消费成功了,再去删除对应的消息
        Msg msg = new Msg();
        //分布式环境下,id必须是唯一的,解决方案:百度的uid-generator,美团开源项目Leaf
        String uuid = UUID.randomUUID().toString();
        msg.setId(uuid);
        msg.setExchange(IntegralConstant.INTEGRAL_EXCHANGE);//积分对应的交换机
        msg.setRoutingKey(IntegralConstant.INTEGRAL_ROUTING_KEY);//积分对象的路由的key
        msg.setContent(integralJson);//积分的Json对象
        msg.setStatus(-1);//状态
        msg.setTryCount(0);//尝试次数
        msg.setCreateTime(new Date());//时间
        //插入消息
        msgMapper.insert(msg);

        //发送消息,需要把Msg对象的id(就是uuid)传过来,一旦消息消费成功,还要去Msg对应表中把该消息删除
        CorrelationData correlationData = new CorrelationData(uuid);
        System.out.println("uuid:" + uuid);
        System.out.println("correlationData.getId():" + correlationData.getId());
        //发送
        rabbitTemplate.convertAndSend(
                IntegralConstant.INTEGRAL_EXCHANGE,
                IntegralConstant.INTEGRAL_ROUTING_KEY,
                buildMessage(integralJson,uuid),
                correlationData
        );
    }

    //构建消息
    private Message buildMessage(String body,String messageId){
        //获取MessagePropertiesBuilder对象
        MessagePropertiesBuilder messagePropertiesBuilder = MessagePropertiesBuilder.newInstance();
        //获取MessageProperties对象
        MessageProperties messageProperties = messagePropertiesBuilder.build();
        messageProperties.setMessageId(messageId);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化

        Message message = new Message(body.getBytes(),messageProperties);
        System.out.println("message传递的内容:" + new String(message.getBody()));
        System.out.println("message传递的uuid:" + message.getMessageProperties().getMessageId());

        return message;
    }
}
3.7编写OrderMapper
package com.qf.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Order;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderMapper extends BaseMapper<Order> {
}
3.8编写MsgMapper
package com.qf.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Msg;
import org.springframework.stereotype.Repository;

@Repository
public interface MsgMapper extends BaseMapper<Msg> {
}
3.9编写配置类
3.9.1DeadConfig
package com.qf.config;

import com.qf.constant.DeadConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadConfig {

    //创建队列
    @Bean
    public Queue createDeadQueue(){
        return new Queue(DeadConstant.DEAD_LETTER_QUEUE);
    }

    //创建交换机
    @Bean
    public DirectExchange createDeadExchange(){
        //交换机默认持久化
        return new DirectExchange(DeadConstant.DEAD_LETTER_EXCHANGE);
    }

    //绑定:交换机中的消息可以发送到不同的队列
    @Bean
    public Binding bindingDeadQueue(){
        //需要设置routingKey
        return BindingBuilder.bind(createDeadQueue()).to(createDeadExchange())
                .with(DeadConstant.DEAD_LETTER_ROUTING_KEY);//和发送消息时的routingKey一致
    }
}
3.9.2IntegralConfig
package com.qf.config;

import com.qf.constant.DeadConstant;
import com.qf.constant.IntegralConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

//路由模式
@Configuration
public class IntegralConfig {

    //创建队列
    @Bean
    public Queue createIntegralQueue(){

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DeadConstant.DEAD_LETTER_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DeadConstant.DEAD_LETTER_ROUTING_KEY);

        return new Queue(IntegralConstant.INTEGRAL_QUEUE,true,false,false,arguments);
    }

    //创建交换机
    @Bean
    public DirectExchange createIntegralExchange(){
        //交换机默认持久化
        return new DirectExchange(IntegralConstant.INTEGRAL_EXCHANGE);
    }

    //绑定:交换机中的消息可以发送到不同的队列
    @Bean
    public Binding bindingIntegralQueue(){
        //需要设置routingKey
        return BindingBuilder.bind(createIntegralQueue()).to(createIntegralExchange())
                .with(IntegralConstant.INTEGRAL_ROUTING_KEY);//和发送消息时的routingKey一致
    }

}
3.9.3PublisherConfirmAndReturnConfig
package com.qf.config;

import com.qf.entity.Msg;
import com.qf.mapper.MsgMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;


@Slf4j
@Configuration
public class PublisherConfirmAndReturnConfig implements
        RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MsgMapper msgMapper;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        //判断
        if(ack){
            log.info("已到达broker");
            log.info("correlationData id is {}",correlationData.getId());
            //删除存到数据库中的消息
            //msgMapper.deleteById(correlationData.getId());//通过id删除
            //设置删除条件
            HashMap<String, Object> map = new HashMap<>();
            map.put("id",correlationData.getId());
            map.put("status",-1);
            //多条件删除
            msgMapper.deleteByMap(map);

        }else{
            log.info("没有到达broker,实际上消息已经保存到mysql中,也可以保存到redis中");
        }
    }

    //return机制,该方法比confirm先执行,只要未到达队列的时候才执行
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息未到达队列");
        //如果消息到达队列不执行以下代码

        //消息已经从数据库被删除了
        //考虑人工干预,获取消息信息进行保存
        String exchange = returnedMessage.getExchange();
        String routingKey = returnedMessage.getRoutingKey();
        Message message = returnedMessage.getMessage();

        //创建消息对象,如果消息消费成功了,再去删除对应的消息
        Msg msg = new Msg();

        msg.setId(message.getMessageProperties().getMessageId());
        msg.setExchange(exchange);//积分对应的交换机
        msg.setRoutingKey(routingKey);//积分对象的路由的key
        msg.setContent(new String(message.getBody()));//积分的Json对象
        msg.setStatus(-2);//状态可以设置为和之前不一样
        msg.setTryCount(0);//尝试次数
        msg.setCreateTime(new Date());//时间
        //插入消息
        msgMapper.insert(msg);
        //做进一步处理:给管理员发邮件,发短信....

    }
}

4.创建mq-integral子模块

4.1导入依赖
<dependencies>
    <dependency>
        <groupId>com.qf</groupId>
        <artifactId>mq-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    
    <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.4</version>
        </dependency>
</dependencies>
4.2编写配置文件
server:
  port: 8081

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

  rabbitmq:
    username: guest
    password: guest
    host: 192.168.25.134
    port: 5672
    listener:
      simple:
        retry:
          enabled: true #开启消息重试
          max-attempts: 3 #最大重试次数
          initial-interval: 2000ms #每次重试的时间间隔
          multiplier: 2 #每次重试时间乘以当前倍数
        #重试机制必须是自动ack,才能放到死信队列中
        acknowledge-mode: auto

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    #驼峰形式
    map-underscore-to-camel-case: true
4.3编写启动类
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.qf.mapper")
public class IntegralApplication {
    public static void main(String[] args) {
        SpringApplication.run(IntegralApplication.class,args);
    }
}
4.4编写IntegralImpl
package com.qf.service.impl;

import com.alibaba.fastjson.JSON;
import com.qf.constant.IntegralConstant;
import com.qf.entity.Integral;
import com.qf.mapper.IntegralMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@Service
public class IntegralImpl {

    @Autowired
    public IntegralMapper integralMapper;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

//    //第一种方式    
//    @RabbitListener(
//            bindings = @QueueBinding(
//                    value = @Queue(name = IntegralConstant.INTEGRAL_QUEUE,
//                    durable = "true"
//            ),
//            key = {IntegralConstant.INTEGRAL_ROUTING_KEY},
//            exchange = @Exchange(name = IntegralConstant.INTEGRAL_EXCHANGE, durable = "true")
//    ))
//    public void insertIntegral(Message message){
//        //积分信息
//        String integralJson = new String(message.getBody());
//        System.out.println(integralJson);
//
//        //类型转换
//        Integral integral = JSON.parseObject(integralJson, Integral.class);
//        System.out.println(integral);
//        //插入数据库
//        integralMapper.insert(integral);
//    }

//    //第二种方式  
//    @RabbitListener(queues = IntegralConstant.INTEGRAL_QUEUE)
//    public void insertIntegral(Message message) throws InterruptedException {
//        //积分信息
//        String integralJson = new String(message.getBody());
//        System.out.println(integralJson);
//
//        //运行成功
//        //类型转换
//        Integral integral = JSON.parseObject(integralJson, Integral.class);
//        System.out.println(integral);
//        //插入数据库
//        integralMapper.insert(integral);
//
//        //测试消息重复消费
//        //直到方法执行完毕,才会ack
//        Thread.sleep(500000);
//
//        //测试消息重试
//        //System.out.println("当前系统时间:"+System.currentTimeMillis());
//        //运行失败
//        //throw new RuntimeException("消息消费异常...");
//    }

    
    @RabbitListener(queues = IntegralConstant.INTEGRAL_QUEUE)
    public void receiveIntegralMessage(Message message){
        //获取要被消息的消息id
        String messageId = message.getMessageProperties().getMessageId();
        //判断,如果redis中没有这个消息id的key,则是第一次消费该消息
        if(!stringRedisTemplate.hasKey(messageId)){
            //获取积分信息
            String integralJson = new String(message.getBody());
            //类型转换
            Integral integral = JSON.parseObject(integralJson, Integral.class);
            //插入到数据库
            integralMapper.insert(integral);
            //使用hutool工具类生成随机数
            int randomInt = RandomUtil.randomInt(10, 100);
            //往redis中也存一份
            stringRedisTemplate.opsForValue().setIfAbsent(messageId,
                    String.valueOf(randomInt),600, TimeUnit.SECONDS);

        }

}
4.5编写IntegralMapper
package com.qf.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qf.entity.Integral;
import org.springframework.stereotype.Repository;

@Repository
public interface IntegralMapper extends BaseMapper<Integral> {
}

5.创建mq-compensate子工程

5.1导入依赖
<dependencies>
    <dependency>
        <groupId>com.qf</groupId>
        <artifactId>mq-order</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
4.2编写配置文件
server:
  port: 8082

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/mq?serverTimezone=Asia/Shanghai&characterEncoding=utf8
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

  rabbitmq:
    username: guest
    password: guest
    host: 192.168.25.134
    port: 5672
    publisher-confirm-type: simple
    publisher-returns: true

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    #驼峰形式
    map-underscore-to-camel-case: true
4.3编写启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class CompensateApplication {
    public static void main(String[] args) {
        SpringApplication.run(CompensateApplication.class,args);
    }
}
4.4编写MassageCompensateTask
package com.qf.task;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.qf.contant.IntegralConstant;
import com.qf.entity.Msg;
import com.qf.mapper.MsgMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.util.List;

/**
 *
 * 消息补偿:隔一段时间,去数据库查询未消费掉的消息,再次执行
 */

@Configuration
public class MassageCompensateTask {

    @Autowired
    private MsgMapper msgMapper;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Scheduled(cron = "10 * * * * ?")
    public void compensateTask(){
        //设置查询条件
        QueryWrapper<Msg> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("status","-2");
        queryWrapper.lt("try_count","3");
        //查询消息
        List<Msg> msgList = msgMapper.selectList(queryWrapper);
        //判断
        if(msgList.size() > 0){
            for(Msg msg : msgList){

                System.out.println("数据库中的消息id:" + msg.getId());
                //发送
                rabbitTemplate.convertAndSend(
                        msg.getExchange(),
                        msg.getRoutingKey(),
                        buildMessage(msg.getContent(),msg.getId()),
                        new CorrelationData(msg.getId())
                );

                //设置尝试次数
                msg.setTryCount(msg.getTryCount() + 1);
                //修改数据库中的消息
                msgMapper.updateById(msg);

            }
        }
    }

    //构建消息
    private Message buildMessage(String body, String messageId){
        //获取MessagePropertiesBuilder对象
        MessagePropertiesBuilder messagePropertiesBuilder = MessagePropertiesBuilder.newInstance();
        //获取MessageProperties对象
        MessageProperties messageProperties = messagePropertiesBuilder.build();
        messageProperties.setMessageId(messageId);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化

        Message message = new Message(body.getBytes(),messageProperties);
        System.out.println("message传递的内容:" + new String(message.getBody()));
        System.out.println("message传递的uuid:" + message.getMessageProperties().getMessageId());

        return message;
    }
}

到了这里,关于RabbitMQ应用问题——消息补偿机制以及代码示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(40)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(74)
  • rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解

    使用RabbitMQ提供的原生客户端API进行交互。这是使用RabbitMQ的基础。 1.1、maven依赖 1.2、基础编程模型 1.首先创建连接,获取Channel 2.声明Exchange-可选 3、声明queue 4、声明Exchange与Queue的绑定关系-可选 总结:说白了,就是声明一个交换机和队列,然后进行绑定,至于Channel和连接

    2024年02月10日
    浏览(44)
  • Rabbitmq消息积压问题如何解决以及如何进行限流

    一、增加处理能力 优化系统架构、增加服务器资源、采用负载均衡等手段,以提高系统的处理能力和并发处理能力。通过增加服务器数量或者优化代码,确保系统能够及时处理所有的消息。 二、异步处理 将消息的处理过程设计为异步执行,即接收到消息立即返回响应,然后

    2024年02月11日
    浏览(26)
  • 简单的RabbitMQ集成Springboot实现订单异步发送功能示例以及RabbitMQ的相关问题

    引入RabbitMQ的依赖,在pom.xml文件中添加以下代码: 在application.properties文件中配置RabbitMQ的相关信息: 创建消息队列,并定义消息接收者: 定义消息发送者: 在需要发送订单信息的地方调用OrderSender的send方法即可: RabbitMQ是一个开源的消息中间件,主要用于实现应用之间的异

    2024年02月09日
    浏览(25)
  • rabbitmq消息可靠性之消息回调机制

    rabbitmq消息可靠性之消息回调机制 rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制 生产者确认机制 消息持久

    2024年02月08日
    浏览(38)
  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(31)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(32)
  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(31)
  • RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

    代码位置 消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。 为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。 如果RabbitMQ没有收

    2024年02月10日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包