rabbitmq消息异常处理

这篇具有很好参考价值的文章主要介绍了rabbitmq消息异常处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

在使用rabbitmq时,会因为各种原因(网络波动,系统宕机,程序异常等)导致消息发送失败。rabbitmq也提供了相应的处理机制。


提示:以下是本篇文章正文内容,下面案例可供参考

一、rabbitmq消息发送失败处理机制

生产法发送失败
配置回调器。
yml配置开启确认和返回机制
confirm:发送给exchange时的回调,不管是否成功发送给队列。
return:消息没有发送给exchange时的回调。

#成功发送到exchange时的回调
spring.rabbitmq.publisher-confirm-type=correlated
#exchange未发送到队列时回调
spring.rabbitmq.publisher-returns=true

回调函数配置方式

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData 消息相关数据
     * @param ack             交换机是否收到消息
     * @param cause           未收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
        //重发处理,可以入库,死信队列处理....
            log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
        }
    }

    //当消息无法路由的时候触发回调方法
    @Override
    public void returnedMessage(ReturnedMessage returned) {
         //重发处理,可以入库,死信队列处理....
        log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",
                new String(returned.getMessage().getBody()), returned.getExchange(),
                returned.getReplyText(), returned.getRoutingKey(),
                returned.getReplyCode());

    }
}

rabbitmqTemplate在启动时注入:

@Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RabbitTemplate.ConfirmCallback confirmCallback;
    @Autowired
    private RabbitTemplate.ReturnsCallback returnsCallback;
    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnsCallback(returnsCallback);
    }

消费者消费失败后处理
通过自动ack+retry配置+私信队列方式实现

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
#        acknowledge-mode: manual   # 配置该消费者的ack方式为手动
        acknowledge-mode: auto   # 配置该消费者的ack方式为自动
        default-requeue-rejected: false
        #设置消费失败后重发
        retry:
          #重发次数
          max-attempts: 3
          #开启重发
          enabled: true
          # 重试间隔(ms)
          initial-interval: 5000

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME1, durable = “true”, autoDelete = “false”,
arguments = {@Argument(name = “x-dead-letter-exchange”, value = “dead-exchange”),
@Argument(name = “x-dead-letter-routing-key”, value = “dead-routing-key”),
@Argument(name = “x-message-ttl”, value = “1000”,type = “java.lang.Long”)
}),
exchange = @Exchange(value = “first_exchange”, type = ExchangeTypes.DIRECT),
key = “queue_one_key1”))
public void handleMessage1(Message message, Channel channel) throws IOException {
log.info(“OrderConsumer handleMessage {} , error:”, message);
//模拟消费异常,自动进入私信队列
throw new RuntimeException(“抛出异常,模拟消费失败,触发spring-retry”);
}

/**
 * 死信队列消费者
 *
 * @param data
 * @param channel
 * @throws Exception
 */
@RabbitListener(queues = "dead-queue")
public void consumeDL(String data, Channel channel) throws Exception {
    //处理消费失败的消息
    log.info(">>>> 死信队列消费 tag = {},消息内容 : {}", data);

// channel.basicNack(tag, false, false);
}

通过手动ack+私信队列实现(不要配置retry!!!)

spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(String data, Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("接受到队列 confirm.queue 消息:{}", msg);
//        throw new RuntimeException("抛出异常,模拟消费失败,触发spring-retry");
        //模拟消费失败,重发n次后仍然失败。调用basicNack 抛给私信队列处理
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
 //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }

总结

rabbitmq消息失败处理需要谨慎对待,因为容易产生资源消耗殆尽的问题!!!文章来源地址https://www.toymoban.com/news/detail-406881.html

到了这里,关于rabbitmq消息异常处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ怎么处理消息事务

    在 RabbitMQ 中,可以通过以下两种方式实现消息事务: 发送方确认(Publisher Confirms) :这是 RabbitMQ 提供的一种轻量级事务机制。在发送消息之前,发送方可以要求 RabbitMQ 确认消息是否成功投递到交换机(Exchange)中。如果确认失败,发送方可以选择重试或者处理发送失败的情

    2024年02月07日
    浏览(32)
  • RabbitMQ消息堆积方案处理

    在消息队列中,消息堆积是生产环境中的需要考虑的问题,一旦消息产生积压,来不及消费,可能会导致MQ服务器宕机,而解决消息积压有这样一些方案解决: 1.增加消费者数量 可以根据业务情况适当添加多台服务器部署消费者服务实例,消费者数量增加,可以有效提高消息

    2024年02月11日
    浏览(46)
  • [ RabbitMQ 消息队列来处理高并发场景 ]

    目录 首先,需要创建一个 RabbitMQ 的连接和消息通道。 然后,需要创建一个生产者来发送消息到消息队列。 最后,需要创建一个消费者来消费消息队列中的消息。 RabbitMQ 消息队列可以提高代码执行性能,主要体现在以下几个方面:  RabbitMQ 实现保持消息一致性的demo 在上面的

    2024年02月16日
    浏览(39)
  • Android应用集成RabbitMQ消息处理指南

    RabbitMQ官网直通车 — ✈✈✈✈✈✈        最近工作繁忙,好久没有更新博文了。        对于互联网饱和的今天, 如何做到不同系统之间传递信息与通信? 在实际项目中,多个端例如:ios、android、pc、小程序采用从RabbitMQ上获取实时包消息,然后根据此实时包消息来

    2024年02月06日
    浏览(39)
  • 优雅地处理RabbitMQ中的消息丢失

    目录 一、异常处理 二、消息重试机制 三、错误日志记录 四、死信队列 五、监控与告警 优雅地处理RabbitMQ中的消息丢失对于构建可靠的消息系统至关重要。下面将介绍一些优雅处理消息丢失的方案,包括异常处理、重试机制、错误日志记录、死信队列和监控告警等。 一、异

    2024年02月13日
    浏览(26)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(39)
  • 学会RabbitMQ的延迟队列,提高消息处理效率

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 真的好用吗?鲜有人提的 RabbitMQ-RPC模式 前面

    2024年02月14日
    浏览(52)
  • Python 和 RabbitMQ 进行消息传递和处理

    RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准。它的官方客户端提供了多种编程语言的接口,包括 Python、Java 和 Ruby 等。它支持消息的持久化、多种交换机类型、消息通知机制、灵活的路由和安全机制等。 RabbitMQ 是由三部分组成的:生产者、代

    2024年02月15日
    浏览(38)
  • adb远程连接手机,提示异常拒绝处理办法

    解决办法: 1、手机依次打开开发者人员选项、USB调试、仅充电模式下允许ADB调试,然后用USB线把手机和电脑连起来 2、进入adb命令行,输入adb devices 3、输入adb tcpip 5555(这个是从usb模式切换到无线连接,后面的5555为端口) 4、再次输入adb命令进行连接 adb connect 192.168.232.89:5

    2024年02月04日
    浏览(27)
  • RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理

    前言:在生产环境中由于一些不明原因,导致RabbitMQ重启的情况下,在RabbitMQ重启期间生产者投递消息失败,生产者发送的消息会丢失,那这时候就需要去想在极端的情况下,RabbitMQ集群不可用的时候,如果去处理投递失败的消息。 1、在config包里新建一个名为ConfirmConfig的类用

    2024年02月15日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包