RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:在生产环境中由于一些不明原因,导致RabbitMQ重启的情况下,在RabbitMQ重启期间生产者投递消息失败,生产者发送的消息会丢失,那这时候就需要去想在极端的情况下,RabbitMQ集群不可用的时候,如果去处理投递失败的消息。

1、在config包里新建一个名为ConfirmConfig的类用于编写配置交换机、队列、routingkey的代码

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    //交换机
    public static final String EXCHANGE_NAME = "confirm_exchange";

    //队列
    public static final String QUEUE_NAME = "confirm_queue";

    //routingkey
    public static final String ROUTING_KEY = "confirm";

    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //绑定交换机和队列
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
    }

}

2、在controller包里新建一个名为ProducerController的类用于编写充当生产者发送消息的代码

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,
                ConfirmConfig.ROUTING_KEY,message);
        log.info("发送消息内容:{}",message);
    }

}

3、在consumer包里新建一个名为Consumer的类用于编写充当消费者消费消息的代码

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.consumer;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到队列的消息为:{}",msg);
    }

}

4、启动项目,在浏览器地址栏调用发送消息的接口,查看生产者是否运行成功并能发送消息http://localhost:8080/confirm/sendMessage/我是消息

例:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

效果图: RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

5、前言里我们说过,怎么在RabbitMQ宕机的情况下,保证生产者发送的消息不丢失呢,这时候就需要用到回调函数了,交换机本身收到消息后会确认消息,如果交换机没有确认或者确认消息失败,都视为发送消息失败,然后触发回调接口,告诉生产者消息发送失败,这样,消息接收成功与否我们都能通过回调方法返回的消息知道了

(1)在config包里新建一个名为MyCallBack的类用于编写交换机的确认回调方法

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.springbootrqbbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * @PostConstruct注解,在对象加载完依赖注入后执行,它通常都是一些初始化的操作,但初始化可能依赖于注入的其他组件,所以要等依赖全部加载完再执行
     */
    @PostConstruct
    public void init() {
        //把当前实现类MyCallBack注入到RabbitTemplate类的ConfirmCallback接口里面
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调方法
     * 1、第一个参数:correlationData保存回调消息的ID以及相关信息
     * 2、第二个参数:交换机收到消息就返回true,否则返回false
     * 3、第三参数:原因(返回失败的原因,如果成功返回的是null)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id =  correlationData != null ? correlationData.getId() : "";
        if(ack) {
            log.info("交换机已经收到id为{}的消息",id);
        }else {
            log.info("交换机还未收到id为{}的消息,原因为{}",id,cause);
        }

    }
}

6、在上述步骤可得知confirm方法有一个类型为CorrelationData的参数correlationData,这个参数实际上是空的,并没有值,需要生产者发送,correlationData参数才会有值(connfirm方法的其余两个参数ack和cause默认有值)所以我们需要修改生产者的代码

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

 代码如下:

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,
                ConfirmConfig.ROUTING_KEY,message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

7、在配置文件加上以下配置开启交换机确认发布模式

spring.rabbitmq.publisher-confirm-type=correlated

配置文件完整内容如下:

spring.rabbitmq.host=192.168.194.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
#none(禁用发布确认模式,默认值)
#correlated(发布消息成功到交换机后会触发回调方法)
#simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果)
spring.rabbitmq.publisher-confirm-type=correlated

效果图:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

8、启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口,消费者成功消费消息

http://localhost:8080/confirm/sendMessage/我是消息

例:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

效果图: 

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

9、把生产者要发送到的交换机改成不存在的,用以模拟交换机出问题的情景

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME + "1",
                ConfirmConfig.ROUTING_KEY,message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

效果图:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

10、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出了交换机接收消息失败的原因

http://localhost:8080/confirm/sendMessage/我是消息

例:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

效果图: 

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

11、把RoutingKey改成不存在的,用以模拟队列出问题的情景

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,
                ConfirmConfig.ROUTING_KEY + "2",message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

效果图:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

12、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出交换机接收消息成功,但消费者没有消费成功的日志输出,因为RoutingKey错了,交换机没有把消息发送到队列里,队列里没消息,自然消费者也就没有消费到消息了,但这个结果不符合我们的预期,因为这次丢失了消息,丢失消息却没有回馈消息丢失,实际上应该调用回调接口反馈消息丢失,所以我们需要继续往下改进代码。

http://localhost:8080/confirm/sendMessage/我是消息

例:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

效果图: 

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

13、给配置文件加上以下配置,用以回退消息

spring.rabbitmq.publisher-returns=true

配置文件完整内容如下:

spring.rabbitmq.host=192.168.194.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
#none(禁用发布确认模式,默认值)
#correlated(发布消息成功到交换机后会触发回调方法)
#simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果)
spring.rabbitmq.publisher-confirm-type=correlated
#一旦投递消息失败或者路由失败,是否回退消息给生产者
spring.rabbitmq.publisher-returns=true

14、使用RabbitTemplate的内置接口回退消息

代码如下:

package com.ken.springbootrqbbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    /**
     * @PostConstruct注解,在对象加载完依赖注入后执行,它通常都是一些初始化的操作,但初始化可能依赖于注入的其他组件,所以要等依赖全部加载完再执行
     */
    @PostConstruct
    public void init() {
        //把当前实现类MyCallBack注入到RabbitTemplate类的ConfirmCallback接口里面
        rabbitTemplate.setConfirmCallback(this);
        //把当前实现类MyCallBack注入到RabbitTemplate类的ReturnCallback接口里面
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机确认回调方法
     * 1、第一个参数:correlationData保存回调消息的ID以及相关信息
     * 2、第二个参数:交换机收到消息就返回true,否则返回false
     * 3、第三参数:原因(返回失败的原因,如果成功返回的是null)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id =  correlationData != null ? correlationData.getId() : "";
        if(ack) {
            log.info("交换机已经收到id为{}的消息",id);
        }else {
            log.info("交换机还未收到id为{}的消息,原因为{}",id,cause);
        }

    }

    /**
     * 可以在当消息传递过程中不可达目的地时将消息返回给生产者
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息{},被交换机{}退回,退回原因:{},路由routingkey:{}",
                new String(message.getBody()),exchange,replyText,routingKey);
    }

}

15、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机收到消息发不过去队列后把消息回退了,保证了消息不丢失。

http://localhost:8080/confirm/sendMessage/我是消息

例:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq

效果图:

RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理,rabbitmq,rabbitmq,java-rabbitmq文章来源地址https://www.toymoban.com/news/detail-550418.html

到了这里,关于RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【故障公告】被放出的 Bing 爬虫,又被爬宕机的园子

    这些巨头爬虫们现在怎么了?记忆中2022年之前的十几年,园子没有遇到过被巨头爬虫们爬宕机的情况,巨头们都懂得爱护,都懂得控制节奏,都懂得在爬网时控制并发连接数以免给目标网站造成过大压力。 从去年开始,巨头爬虫们开始变了。首先从中文巨头爬虫中的佼佼者

    2023年04月26日
    浏览(31)
  • rabbitmq topic模式设置#通配符情况下 消费者队列未接收消息问题排查解决

    生产者配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.# 消费者代码配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.user 其实以上代码看着没有问题,意思是代码生成一个队列,并把【topic.shcool.user】队列和生产者的【topic_exchange_shcool】exchange绑定,但是生产者发送消息是

    2024年02月11日
    浏览(36)
  • 分布式项目08 redis的扩容,预备redis宕机的哨兵处理 和 最后集大成redis集群

    01.redis扩容 由于redis的容量也是有限的,所以,就会有扩容的操作。也就好像内存中扩容一样。redis有分片这种说法,具体实现操作如下: 第一步:首先在/usr/local/src中去找到redis文件夹。cd /usr/local/src 而后进入redis文件夹。使用的linux命令是 cd redis 。 第二步: 01.使用查看当前

    2024年02月05日
    浏览(28)
  • RabbitMQ系列(7)--RabbitMQ消息应答及消息未应答后重新入队

    概念:消费者消费完一条消息可能需要等待一段时间,但如果这段时间内消费者在未完成消费信息的情况下时就挂掉了,这时候会怎么样?RabbitMQ一旦向消费者传递一条消息,该消息就会被标记为删除,这种情况下消费者挂掉了正在处理的消息就会丢失,为了保证消息在发送

    2024年02月05日
    浏览(25)
  • 【学习日记2023.6.19】 之 RabbitMQ服务异步通信_消息可靠性_死信交换机_惰性队列_MQ集群

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收

    2024年02月11日
    浏览(35)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(38)
  • RabbitMQ系列教程消息中间件技术精讲

    作者:禅与计算机程序设计艺术 消息中间件(Message Queue,MQ)是一种分布式应用间通信的组件。它可以在不同的系统之间传递消息、数据或指令。在现代IT架构中,越来越多的应用需要相互通信,所以出现了消息队列的概念。RabbitMQ是一个开源的AMQP实现,是一个可靠、可扩展

    2024年02月06日
    浏览(34)
  • 【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势

    我们知道 RabbitMq 提供了两种机制,来确保发送端的消息被 brocke 正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势 首先创建一个 SpringBoot 项目,用于后续的演示 springboot 版本为 2.2.1.RELEASE rabbitmq 版本为  3.7.5   依赖配置文件 pom.xml 在 a

    2024年01月18日
    浏览(34)
  • 【腾讯云HAI域探秘】搭建一个永不宕机的本地SD文本生成图像应用--喂饭级高性能应用HAI部署stable diffusion webui 一键文生图

    本次有幸受邀作为新品先锋体验官参加了【 腾讯云HAI域探秘 】活动,沉浸式体验高性能应用服务HAI产品 + Stable Diffusion WebUI部署。 腾讯云高性能应用服务(Hyper Application Inventor, HAI),是一款面向AI、科学计算的GPU应用服务产品,为开发者量身打造的澎湃算力平台。基于腾讯云

    2024年02月06日
    浏览(49)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包