RabbitMQ手动ACK与死信队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ手动ACK与死信队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。

默认情况下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以在实际项目中会使用手动Ack。

1、手动应答

  1. Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
  2. Channel.basicNack (用于否定确认)
  3. Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

消费者端的配置,相关属性值改为自己的:

server.port=8082
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=lonewalker
#密码
spring.rabbitmq.password=XX
#配置虚拟机
spring.rabbitmq.virtual-host=demo
#设置消费端手动 ack   none不确认  auto自动确认  manual手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

修改消费代码:请勿复制使用,会卡死

package com.example.consumer.service;

import com.alibaba.fastjson.JSONObject;
import com.example.consumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @description:
 * @author: LoneWalker
 * @create: 2022-04-04
 **/
@Service
@Slf4j
public class ConsumerService {


    @RabbitListener(queues ="publisher.addUser")
    public void addUser(String userStr,Channel channel,Message message){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("我一直在重试");
            int a = 1/0;
            User user = JSONObject.parseObject(userStr,User.class);
            log.info(user.toString());
            //手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            //手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
            try {
                channel.basicNack(deliveryTag,false,true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

先启动发布者发送消息,查看控制台:有一条消息待消费·

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

启动消费端,因为代码中有除0,所以会报错,这里就会出现一条unacked消息:

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

因为设置的是将消息重新请求,所以它会陷入死循环

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

防止出现这种情况,可以将basicNack最后一个参数改为false,让消息进去死信队列

2、什么是死信队列

说简单点就是备胎队列,而死信的来源有以下几种:

  1. 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

3、配置死信队列

一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

从控制台将之前的交换机都删除,然后修改代码。

首先看一下发布者的配置代码:

package com.example.publisher.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author LoneWalker
 * @date 2023/4/8
 * @description
 */
@Slf4j
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        //设置给rabbitTemplate
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /************ 正常配置 ******************/
    /**
     * 正常交换机,开启持久化
     */
    @Bean
    DirectExchange normalExchange() {
        return new DirectExchange("normalExchange", true, false);
    }

    @Bean
    public Queue normalQueue() {
        // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。
        // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        Map<String, Object> args = deadQueueArgs();
        // 队列设置最大长度
        args.put("x-max-length", 5);
        return new Queue("normalQueue", true, false, false, args);
    }

    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = deadQueueArgs();
        // 队列设置消息过期时间 60 秒
        args.put("x-message-ttl", 60 * 1000);
        return new Queue("ttlQueue", true, false, false, args);
    }

    @Bean
    Binding normalRouteBinding() {
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with("normalRouting");
    }

    @Bean
    Binding ttlRouteBinding() {
        return BindingBuilder.bind(ttlQueue())
                .to(normalExchange())
                .with("ttlRouting");
    }


    /**************** 死信配置 *****************/
    /**
     * 死信交换机
     */
    @Bean
    DirectExchange deadExchange() {
        return new DirectExchange("deadExchange", true, false);
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue deadQueue() {
        return new Queue("deadQueue", true, false, false);
    }

    @Bean
    Binding deadRouteBinding() {
        return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with("deadRouting");
    }

    /**
     * 转发到 死信队列,配置参数
     */
    private Map<String, Object> deadQueueArgs() {
        Map<String, Object> map = new HashMap<>();
        // 绑定该队列到死信交换机
        map.put("x-dead-letter-exchange", "deadExchange");
        map.put("x-dead-letter-routing-key", "deadRouting");
        return map;
    }


    /**
     * 消息成功到达交换机会触发
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机收到消息成功:" + correlationData.getId());
        }else {
            log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
        }
    }

    /**
     * 消息未成功到达队列会触发
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    }
}

properties

server.port=8081
#rabbitmq服务ip
spring.rabbitmq.host=localhost
#rabbitmq端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=用户名改为自己的
#密码
spring.rabbitmq.password=密码改为自己的
#虚拟机
spring.rabbitmq.virtual-host=demo

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

发送消息:

@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void addUser(User user) {

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData);
    }
}

4、模拟场景

4.1消息处理异常

文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:

package com.example.consumer.service;

import com.alibaba.fastjson.JSONObject;
import com.example.consumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @description:
 * @author: LoneWalker
 * @create: 2022-04-04
 **/
@Service
@Slf4j
public class ConsumerService {


    @RabbitListener(queues ="normalQueue")
    public void addUser(String userStr,Channel channel,Message message){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            int a = 1/0;
            User user = JSONObject.parseObject(userStr,User.class);
            log.info(user.toString());
            //手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            //手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
            try {
                channel.basicNack(deliveryTag,false,false);
            } catch (IOException ex) {
                throw new RuntimeException("消息处理失败");
            }
        }
    }
}

注意basicNack的第三个参数,设置为false后就不会重新请求。

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

4.2队列达到最大长度

配置上面的代码已经有过了:

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

4.3消息TTL过期

过期时间TTL表示可以对消息设置预期的时间,超过这个时间就删除或者放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

rabbitmq设置手动ack,RabbitMQ单排日记,java-rabbitmq,rabbitmq,java,原力计划

死信队列中的消息处理和正常的队列没什么区别,就不赘述了。文章来源地址https://www.toymoban.com/news/detail-521461.html

到了这里,关于RabbitMQ手动ACK与死信队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ初级篇:生产者与消费者关系、消息确认机制(ACK)、交换器与队列进行消息路由和存储

    在RabbitMQ中,生产者(Producer) 负责发送消息 ,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则 负责处理接收到的这些消息 。在RabbitMQ中,生产者和消费者之间使用 交换器(Exchange)和队列(Queue)进行消息路由和存储 。生产者将消息发送到

    2024年02月01日
    浏览(34)
  • 微服务: 05-rabbitmq设置重试次数并设置死信队列

    目录 1. 上文传送门:  2. 前言简介:  2.1 问: 消费端重复循环异常如何解决? 2.2 为什么要使用死信队列 2.3 案例思路 - ps: 以下案例经过测试(思路一/二实现原理一样) - 2.3.1 思路一  - 2.3.2 思路二 3. 案例代码 3.1 简单介绍案例 3.2 声明交换机 队列 以及绑定路由键 3.3 修改配置文件

    2024年02月17日
    浏览(31)
  • RabbitMQ的ack和nack机制

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 当生产者的发送消息到exchange,并路由到对应的队列后,MQ主动push消息到channel,由应用线程从channel中获取消息。 主动ACK是指在MQ主动push到channel中后,channel立马自动的给到MQ ack响应,然后MQ删除消息。

    2024年02月11日
    浏览(27)
  • 浅谈RabbitMQ消费端ACK和限流

    消费者 ACK 和 消费端限流 ack指  Acknowledge ,确认。 表示消费端收到消息后的确认方式。 有三种确认方式: • 自动确认:acknowledge=\\\" none \\\" • 手动确认:acknowledge=\\\" manual \\\" • 根据异常情况确认:acknowledge=\\\" auto \\\",(这种方式使用麻烦,不作讲解) 其中自动确认是指,当消息一

    2024年02月19日
    浏览(25)
  • rabbitmq基础7——队列和消息过期时间设置、死信队列、延迟队列、优先级队列、回调队列、惰性队列

    这里过一个知识点——过期时间,即对消息或队列设置过期时间(TTL)。一旦消息过期,消费就无法接收到这条消息,这种情况是绝不允许存在的,所以官方就出了一个对策——死信队列,死信队列最初出现的意义就是为了应对消息过期丢失情况的手段之一。 那么过期时间具

    2024年02月03日
    浏览(51)
  • RabbitMQ消息可靠性投递与ACK确认机制

    什么是消息的可靠性投递 保证消息百分百发送到消息队列中去 保证MQ节点成功接收消息 消息发送端需要接收到MQ服务端接收到消息的确认应答 完善的消息补偿机制,发送失败的消息可以再感知并二次处理 RabbitMQ消息投递路径 生产者–交换机–队列–消费者 通过两个节点控制

    2024年02月20日
    浏览(33)
  • SpringBoot集成RabbitMQ之ACK确认机制(第三节)

    目录 开始语 📝简述 🗒️模式NONE application配置 生产者 消费者 结果验证 🗒️模式AUTO application配置 生产者 消费者 结果验证 🗒️模式ACK(重点) application配置 生产者 消费者 结果验证 🗒️生产者确认机制 yml添加配置 修改生产者代码 结果验证 结束语 一位普通的程序员,

    2024年02月04日
    浏览(27)
  • RabbitMQ-ack、nack、reject、unacked

    如果队列使用的是手动ack,但在接收消息后不做任何ack处理,RabbitMQ会把消息标记为 unacked ,unacked状态的消息不会被消费,并且占用RabbirMQ资源,只有当消费者channel断开或者服务器重启,消息才会重新回到ready状态被其他消费者消费。 确认签收后,消息从队列中删除。 自动

    2024年02月15日
    浏览(27)
  • RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

    代码位置 消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。 为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。 如果RabbitMQ没有收

    2024年02月10日
    浏览(39)
  • 使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息

    下载地址:link 1.下载完成放到rabbitmq安装目录plugins下 2.执行命令启用插件 3.重启mq Exchanges - add a new exchange - type 出现x-delayed-message即安装成功

    2024年02月11日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包