springboot整合rabbitmq死信队列

这篇具有很好参考价值的文章主要介绍了springboot整合rabbitmq死信队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

springboot整合rabbitmq死信队列

什么是死信
说道死信,可能大部分观众大姥爷会有懵逼的想法,什么是死信?

死信队列,俗称DLX,翻译过来的名称为Dead Letter Exchange 死信交换机。

当消息限定时间内未被消费,成为 Dead Message后,可以被重新发送到另一个交换机中,发挥其应有的价值!

需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置:

从上面的逻辑图中,可以发现大致的思路:
springboot整合rabbitmq死信队列,消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

.1. 消息队列分为正常交换机、正常消息队列;以及死信交换机和死信队列。
2. 正常队列针对死信信息,需要将数据 重新 发送至死信交换机中。

死信使用的场景
  1. 消息被拒绝
  2. 消息ttl过期
  3. 队列达到最大长度

这三种场景就会成为死信,然后放入死信交换机

import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {

    //正常交换机的名字
    public final static String  EXCHANGE\_NAME = "exchange\_name";
    //正常队列的名字
    public final static String QUEUE\_NAME="queue\_name";
    //死信交换机的名字
    public final static String  EXCHANGE\_DEAD = "exchange\_dead";
    //死信队列的名字
    public final static String QUEUE\_DEAD="queue\_dead";
    //死信路由key
    public final static String DEAD\_KEY="dead.key";




    //创建正常交换机
    @Bean(EXCHANGE\_NAME)
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)
                //持久化 mq重启后数据还在
                .durable(true)
                .build();
    }



    //创建正常队列
    @Bean(QUEUE\_NAME)
    public Queue queue(){
        //正常队列和死信进行绑定 转发到 死信队列,配置参数
        Map<String,Object>map=getMap();
        return new Queue(QUEUE\_NAME,true,false,false,map);
    }

    //正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射
    @Bean
    public Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,
                           @Qualifier(EXCHANGE\_NAME) Exchange exchange){
        return BindingBuilder.bind(queue)
                .to(exchange)
                //路由规则
                .with("app.#")
                .noargs();
    }

    //创建死信队列
    @Bean(QUEUE\_DEAD)
    public Queue queueDead(){
        return new Queue(QUEUE\_DEAD,true,false,false);
    }

    //创建死信交换机
    @Bean(EXCHANGE\_DEAD)
    public Exchange exchangeDead(){
        return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD)
                .durable(true) //持久化 mq重启后数据还在
                .build();
    }


    //绑定死信队列和死信交换机
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(queueDead())
                .to(exchangeDead())
                //路由规则 正常路由key
                .with(DEAD\_KEY)
                .noargs();
    }

    /\*\*
      获取死信的配置信息
     \*
     \*\*/
    public Map<String,Object>getMap(){
        //3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。
        //方式一
        Map<String,Object> map=new HashMap<>(16);
        //死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
        map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);
        //死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
        map.put("x-dead-letter-routing-key", DEAD\_KEY);
        //方式二
        //消息的过期时间,单位:毫秒;达到时间 放入死信队列
        // map.put("x-message-ttl",5000);
        //方式三
        //队列最大长度,超过该最大值,则将从队列头部开始删除消息;放入死信队列一条数据
        // map.put("x-max-length",3);
        return map;
    }


}

配置文件信息

spring:
  rabbitmq:
    host: 192.168.23.135
    port: 5672
    username: admin
    password: admin
    #虚拟主机
    virtual-host: dmg-a
    listener:
      simple:
        #自动ack
        acknowledge-mode: auto
        retry:
          #最大重试次数
          max-attempts: 3
          #开启重试
          enabled: true

引入 rabbitmq 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者

@RestController
@RequestMapping("p")
public class TestController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/test")
    public String test(){
        //正常交换机 正常路由键 正常消息内容
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE\_NAME
        ,"app.test","我是生产者");
        return "aa";
    }
}

//消费者

@Component
public class Xf {


    //监听正常队列名称
    @RabbitListener(queues = {RabbitmqConfig.QUEUE\_NAME})
    public void normal(String payload, Message message, Channel channel) throws IOException {
        System.out.println("正常消息:"+payload);
        long tag=message.getMessageProperties().getDeliveryTag();
        try{
           // int i=1/0;
            //手动签收
            channel.basicAck(tag,true);
        }catch (RuntimeException runtimeException){
            //出现异常 删除消息 放入死信队列
            channel.basicReject(tag,false);
        }
    }

监听死信队列名称

 @RabbitListener(queues = {RabbitmqConfig.QUEUE\_DEAD})
    public void dead(String payload, Message message, Channel channel) throws IOException {
        System.out.println("死信队列:"+payload);
        //删除消息 放入数据库 人工处理
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
    }
}

springboot整合rabbitmq死信队列,消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

springboot整合rabbitmq死信队列,消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq文章来源地址https://www.toymoban.com/news/detail-677747.html

到了这里,关于springboot整合rabbitmq死信队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包