Rabbitmq消息不丢失

这篇具有很好参考价值的文章主要介绍了Rabbitmq消息不丢失。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:
1,生产者不丢数据
2,MQ服务器不丢数据
3,消费者不丢数据
保证消息不丢失有两种实现方式:
1,开启事务模式
2,消息确认模式
说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

1.消息确认

消息持久化
如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化
Exchange
声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)
Queue
声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)
message
发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认
有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认
有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?
要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

2.消息确认业务封装

service-mq修改配置
开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!

spring:
  rabbitmq:
    host: 192.168.121.140
    port: 5672
    username: admin
    password: admin
    publisher-confirms-type: correlated  #交换机的确认
    publisher-returns: true  #队列的确认
    listener:
      simple:
        acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

搭建rabbit-util模块
由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
搭建方式如:
pom.xml

    <dependencies>
        <!--rabbitmq消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--rabbitmq 协议-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
    </dependencies>

4.2.4 封装发送端消息确认

/**
 * @Description 消息发送确认
 * <p>
 * ConfirmCallback  只确认消息是否正确到达 Exchange 中
 * ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行
 * <p>
 * 1. 如果消息没有到exchange,则confirm回调,ack=false
 * 2. 如果消息到达exchange,则confirm回调,ack=true
 * 3. exchange到queue成功,则不回调return
 * 4. exchange到queue失败,则回调return
 * 
 */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送成功:" + JSON.toJSONString(correlationData));
        } else {
            log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化对象输出
        System.out.println("消息主体: " + new String(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }

 }

封装消息发送

@Service
public class RabbitService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  发送消息
     * @param exchange 交换机
     * @param routingKey 路由键
     * @param message 消息
     */
    public boolean sendMessage(String exchange, String routingKey, Object message) {
 
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }
   
}

2.1 发送确认消息测试

消息发送端

@RestController
@RequestMapping("/mq")
public class MqController {


   @Autowired
   private RabbitService rabbitService;


   /**
    * 消息发送
    */
   //http://localhost:8282/mq/sendConfirm
   @GetMapping("sendConfirm")
   public Result sendConfirm() {
     
      rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");
      return Result.ok();
   }
}

消息接收端

@Component
public class ConfirmReceiver {

@SneakyThrows
@RabbitListener(bindings=@QueueBinding(
        value = @Queue(value = "queue.confirm",autoDelete = "false"),
        exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),
        key = {"routing.confirm"}))
public void process(Message message, Channel channel){
    System.out.println("RabbitListener:"+new String(message.getBody()));

        // false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
}
}

测试:http://localhost:8282/mq/sendConfirm

2.2 消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制
模块中添加依赖

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- spring2.X集成redis所需common-pool2-->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>

自定义一个实体类来接收消息

@Data
public class GmallCorrelationData extends CorrelationData {

    //  消息主体
    private Object message;
    //  交换机
    private String exchange;
    //  路由键
    private String routingKey;
    //  重试次数
    private int retryCount = 0;
    //  消息类型  是否是延迟消息
    private boolean isDelay = false;
    //  延迟时间
    private int delayTime = 10;
}

修改发送方法

//  封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){
    //  将发送的消息 赋值到 自定义的实体类
    GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
    //  声明一个correlationId的变量
    String correlationId = UUID.randomUUID().toString().replaceAll("-","");
    gmallCorrelationData.setId(correlationId);
    gmallCorrelationData.setExchange(exchange);
    gmallCorrelationData.setRoutingKey(routingKey);
    gmallCorrelationData.setMessage(msg);

    //  发送消息的时候,将这个gmallCorrelationData 对象放入缓存。
    redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);
    //  调用发送消息方法
    //this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);
    //  默认返回true
    return true;
}

发送失败调用重发方法  MQProducerAckConfig 类中修改
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    //  ack = true 说明消息正确发送到了交换机
    if (ack){
        System.out.println("哥们你来了.");
        log.info("消息发送到了交换机");
    }else {
        //  消息没有到交换机
        log.info("消息没发送到交换机");
        //  调用重试发送方法
        this.retrySendMsg(correlationData);
    }
}

@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {
    System.out.println("消息主体: " + new String(message.getBody()));
    System.out.println("应答码: " + code);
    System.out.println("描述:" + codeText);
    System.out.println("消息使用的交换器 exchange : " + exchange);
    System.out.println("消息使用的路由键 routing : " + routingKey);

    //  获取这个CorrelationData对象的Id  spring_returned_message_correlation
    String correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    //  因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据
    String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);
    //  消息没有到队列的时候,则会调用重试发送方法
    GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);
    //  调用方法  gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.
    this.retrySendMsg(gmallCorrelationData);
}

/**
 * 重试发送方法
 * @param correlationData   父类对象  它下面还有个子类对象 GmallCorrelationData
 */
private void retrySendMsg(CorrelationData correlationData) {
    //  数据类型转换  统一转换为子类处理
    GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;
    //  获取到重试次数 初始值 0
    int retryCount = gmallCorrelationData.getRetryCount();
    //  判断
    if (retryCount>=3){
        //  不需要重试了
        log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));
    } else {
        //  变量更新
        retryCount+=1;
        //  重新赋值重试次数 第一次重试 0->1 1->2 2->3
        gmallCorrelationData.setRetryCount(retryCount);
        System.out.println("重试次数:\t"+retryCount);

        //  更新缓存中的数据
        this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);

        //  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法
            this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);
   
    }
}

测试:只需修改(错误信息)
Rabbitmq消息不丢失,# RabbitMQ,rabbitmq,ruby,分布式
Rabbitmq消息不丢失,# RabbitMQ,rabbitmq,ruby,分布式文章来源地址https://www.toymoban.com/news/detail-646294.html

到了这里,关于Rabbitmq消息不丢失的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式消息队列RabbitMQ-Linux下服务搭建,面试完腾讯我才发现这些知识点竟然没掌握全

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 5.修改配置文件 这里面修改{loopback_users, [“guest”]}改为{loopback_users, []} {application, rabbit, %% - - erlang - - [{description, “RabbitMQ”}, {id, “RabbitMQ”}, {vsn, “3.6.5”}, {modules, [‘background_gc’,‘delegate’,‘delegate_sup’,‘dtree’,‘file_han

    2024年04月14日
    浏览(41)
  • RabbitMQ 消息丢失的场景,如何保证消息不丢失?

    第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。 1.针对生

    2024年02月11日
    浏览(39)
  • Rabbitmq消息不丢失

    消息的不丢失,在MQ角度考虑,一般有三种途径: 1,生产者不丢数据 2,MQ服务器不丢数据 3,消费者不丢数据 保证消息不丢失有两种实现方式: 1,开启事务模式 2,消息确认模式 说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采

    2024年02月13日
    浏览(30)
  • RabbitMQ如何避免丢失消息

    消息从生产到消费,要经历三个阶段,分别是生产、队列转发与消费,每个环节都可能丢失消息。 以下以RabbitMQ为例,来说明各个阶段会产生的问题以及解决方式。在说明之前,先回顾一下RabbitMQ的一个基本架构图 1. 网络问题 外界环境问题导致:发生网络丢包、网络故障等造

    2024年02月10日
    浏览(30)
  • RabbitMQ解决消息丢失

    目录 1.开启发布确认模式 1.1单个确认发布 1.2批量确认发布 1.3异步确认发布 1.4处理异步未确认的消息 1.5 三种发布方式对比 1.6发布确认高级  2.消息持久化 2.1队列持久化 2.2消息持久化  3.消费手动确认 单次消息确认 批量消息确认  确认消息失败重新入队         上次我

    2023年04月27日
    浏览(22)
  • RabbitMQ如何保证消息不丢失

    观察整个 RabbitMQ 消息发送过程: 从上述流程我们可以得知:消息从生产者到达消费者,经过两次网络传输,并且在 RabbitMQ 服务器中进行路由。 因此我们能知道整个流程中可能会出现三种消息丢失场景: 生产者发送消息到 RabbitMQ 服务器的过程中出现消息丢失。 可能是网络波

    2024年02月21日
    浏览(29)
  • 如何保证 RabbitMQ 消息不丢失?

      第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。 第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。   方

    2024年02月08日
    浏览(27)
  • RabbitMQ如何保证消息不丢失?

    1、什么 情况会导致消息丢失 ? ​​​​​​          a.发送 时丢失:                    生产者发送的消息未送达exchange                     消息到达exchange 后未到达 queue         b.MQ宕机, queue 将消息丢失         c.consumer接收到消息后未消费就宕机

    2024年02月02日
    浏览(38)
  • RabbitMQ消息丢失的场景,MQ消息丢失解决方案

    第一种 : (生产者) 生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种 : (服务端) RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种 : (消费者) 消费端弄丢了数据。刚消费到,还没处理

    2024年02月08日
    浏览(36)
  • RabbitMQ如何保证消息不丢失呢?

    RabbitMQ 是一个流行的消息队列系统,用于在分布式应用程序之间传递消息。要确保消息不会丢失,可以采取以下一些措施: 持久化消息: RabbitMQ 允许你将消息标记为持久化的。这意味着消息将被写入磁盘,即使 RabbitMQ 服务器崩溃,也能够在恢复后重新发送消息。要使消息持

    2024年02月07日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包