rabbitMQ引入死信队列

这篇具有很好参考价值的文章主要介绍了rabbitMQ引入死信队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、基本概念

1、死信定义

        指的是,从队列当中取出来的消息,到达消费方后,因为某些原因导致消息并没有被正常消费掉,这些没有被后续处理的消息就是“死信”,而保存死信的队列,就是死信队列。

2、死信出现的场景举例

        为了保证订单业务的消息数据不丢失,需要使用死信队列机制,在消息消费发生异常的时候,将消息给投入到死信队列当中;

          用户在商城下单成功并点进去准备支付,超过指定时间未支付时,消息自动失效成为死信(消息超时情况);

3、死信的来源

        消息TTL过期;

        队列已经到达最大长度,数据无法再添加到MQ;

        消息被拒绝了;

4、死信架构图

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

分析:

对于消息生产者而言,只需要关注将消息发送给交换机即可;

而对于普通消费者C1而言,需要关注普通交换机、普通队列、死信交换机的相关信息,要做两次绑定操作(普通交换机和普通队列绑定,普通队列和死信队列绑定),难点在于---普通队列怎么与死信交换机进行绑定?

而消费者C2也是一个普通消费者,专注于死信队列当中消息的处理,需要关注死信交换机、死信队列的信息,在死信交换机和死信队列绑定之后,从队列当中拿到死信进行处理;

二、代码部分

(0)提前准备工具类

封装获取MQ的connection方法,以及释放资源的方法

public class AMQPUtils {
    //用于获取客户端和MQ绑定的连接对象
    public static Connection getConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/test");//用于隔离资源的虚拟主机
        factory.setHost("MQServer的ip地址");
        factory.setPort(5672);
        factory.setUsername("zhangsan");
        factory.setPassword("1234");
        Connection connection = factory.newConnection();
        return connection;
    }

    //释放资源
    public static void close(Channel channel,Connection connection) throws Exception{
        channel.close();
        connection.close();
    }
}

(一)模拟消息超时情况

1、消息发布者

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

public class Provider {
    public static void main(String[] args) throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个普通交换机
        String normalExchange = "normal_Exchange";
        channel.exchangeDeclare(normalExchange,"direct");
        String key = "zhangsan";
        //设置消息的ttl时间为5s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
        //发布消息-发布多条消息验证
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish(normalExchange,key,properties,("message---->"+i).getBytes());
        }
        //释放资源
        AMQPUtils.close(channel,connection);
    }
}

2、消费者C1

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

代码:

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //提前准备一些名字
        String normalExchange = "normal_exchange";
        String deadExchange = "dead_exchange";
        String normalQueue = "normal_queue";
        String deadQueue = "dead_queue";
        String normal_key = "zhangsan";
        String dead_key = "lisi";
        //声明普通交换机
        channel.exchangeDeclare(normalExchange,"direct");
        //声明死信交换机
        channel.exchangeDeclare(deadExchange,"direct");
        //设置普通队列当中需要携带的其他信息(死信交换机、死信队列、路由key)
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",deadExchange);
        params.put("x-dead-letter-routing-key",dead_key);
        //声明普通队列
        channel.queueDeclare(normalQueue,false,false,false,params);

        //声明死信队列
        channel.queueDeclare(deadQueue,false,false,false,null);

        //binding
        //将普通交换机和普通队列绑定
        channel.queueBind(normalQueue,normalExchange,normal_key);
        //将将死信交换机和死信队列绑定
        channel.queueBind(deadQueue,deadExchange,dead_key);

        //消费消息
        channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C1消费的消息是---->"+new String(body));
            }
        });
    }
}

注意:开启C1接受普通交换机的消息之后,关闭C1,让普通队列当中的消息超过超时时间,成为死信,后被死信交换机路由进入dead_queue当中,下图所示的就是消息超时之后进入死信队列:

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

点击dead_queue可以查看具体的死信来源、交换机、路由key等信息;

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

此时:注意此时死信消息是保存在MQ当中的;

3、消费者c2

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

消费者C2要去消费死信队列当中的消息:

public class Consumer2 {
    public static void main(String[] args)throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //准备一些名字
        String deadExchange = "dead_exchange";
        String deadQueue = "dead_queue";
        String key = "lisi";
        //声明死信交换机
        channel.exchangeDeclare(deadExchange,"direct");
        //声明死信队列
        channel.queueDeclare(deadQueue,false,false,false,null);
        //交换机和队列绑定
        channel.queueBind(deadQueue,deadExchange,key);
        //消费死信消息
        channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2消费了死信----->"+new String(body));
            }
        });
    }
}

控制台结果:

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

此时存储在死信队列当中的消息已经被C2消费了!

(二)模拟队列达到最大长度

请提前在MQ的控制台上,将情况1当中设置的队列给删除;

1、消息发布者

public class Provider {
    public static void main(String[] args) throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个普通交换机
        String normalExchange = "normal_exchange";
        channel.exchangeDeclare(normalExchange,"direct");
        String key = "zhangsan";
        //发布消息-发布多条消息验证
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
        }
        //释放资源
        AMQPUtils.close(channel,connection);
    }
}

2、消费者C1

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //提前准备一些名字
        String normalExchange = "normal_exchange";
        String deadExchange = "dead_exchange";
        String normalQueue = "normal_queue";
        String deadQueue = "dead_queue";
        String normal_key = "zhangsan";
        String dead_key = "lisi";
        //声明普通交换机
        channel.exchangeDeclare(normalExchange,"direct");
        //声明死信交换机
        channel.exchangeDeclare(deadExchange,"direct");
        //设置普通队列当中需要携带的其他信息(死信交换机、死信队列、路由key)
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",deadExchange);
        params.put("x-dead-letter-routing-key",dead_key);
        //设置普通队列的最大长度
        params.put("x-max-length",6);

        //声明普通队列
        channel.queueDeclare(normalQueue,false,false,false,params);

        //声明死信队列
        channel.queueDeclare(deadQueue,false,false,false,null);

        //binding
        //将普通交换机和普通队列绑定
        channel.queueBind(normalQueue,normalExchange,normal_key);
        //将将死信交换机和死信队列绑定
        channel.queueBind(deadQueue,deadExchange,dead_key);

        //消费消息
        channel.basicConsume(normalQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C1消费的消息是---->"+new String(body));
            }
        });
       
    }
}

开启C1,然后关闭,再启动消息生产者,结果:

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

3、消费者C2

public class Consumer2 {
    public static void main(String[] args)throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //准备一些名字
        String deadExchange = "dead_exchange";
        String deadQueue = "dead_queue";
        String key = "lisi";
        //声明死信交换机
        channel.exchangeDeclare(deadExchange,"direct");
        //声明死信队列
        channel.queueDeclare(deadQueue,false,false,false,null);
        //交换机和队列绑定
        channel.queueBind(deadQueue,deadExchange,key);
        //消费死信消息
        channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2消费了死信----->"+new String(body));
            }
        });
    }
}

结果:

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

死信队列当中的消息已经被消费了;

(三)模拟消息被拒绝

1、消息发布者

public class Provider {
    public static void main(String[] args) throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明一个普通交换机
        String normalExchange = "normal_exchange";
        channel.exchangeDeclare(normalExchange,"direct");
        String key = "zhangsan";
        //发布消息-发布多条消息验证
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish(normalExchange,key,null,("message---->"+i).getBytes());
        }
        //释放资源
        AMQPUtils.close(channel,connection);
    }
}

2、消费者C1

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection connection = AMQPUtils.getConnection();
        final Channel channel = connection.createChannel();
        //提前准备一些名字
        String normalExchange = "normal_exchange";
        String deadExchange = "dead_exchange";
        String normalQueue = "normal_queue";
        String deadQueue = "dead_queue";
        String normal_key = "zhangsan";
        String dead_key = "lisi";
        //声明普通交换机
        channel.exchangeDeclare(normalExchange,"direct");
        //声明死信交换机
        channel.exchangeDeclare(deadExchange,"direct");
        //设置普通队列当中需要携带的其他信息(死信交换机、死信队列、路由key)
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",deadExchange);
        params.put("x-dead-letter-routing-key",dead_key);

        //声明普通队列
        channel.queueDeclare(normalQueue,false,false,false,params);

        //声明死信队列
        channel.queueDeclare(deadQueue,false,false,false,null);

        //binding
        //将普通交换机和普通队列绑定
        channel.queueBind(normalQueue,normalExchange,normal_key);
        //将将死信交换机和死信队列绑定
        channel.queueBind(deadQueue,deadExchange,dead_key);

        //消费消息--注意关闭自动确认
        channel.basicConsume(normalQueue,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /*System.out.println("C1消费的消息是---->"+new String(body));*/
                //模拟消息被拒绝--把所有消息都拒绝
                channel.basicReject(envelope.getDeliveryTag(),false);
            }
        });
      
    }
}

注意:先启动C1,然后关闭,启动消息发布者,结果如下:

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

这10条消息目前都保存在MQ当中,然后再启动C1,把消息全部拒绝掉,让它们成为死信:

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

点击dead_queue,去查看死信队列当中的一些信息:

rabbitMQ引入死信队列,java-rabbitmq,rabbitmq,java

3、消费者C2

public class Consumer2 {
    public static void main(String[] args)throws Exception {
        Connection connection = AMQPUtils.getConnection();
        Channel channel = connection.createChannel();
        //准备一些名字
        String deadExchange = "dead_exchange";
        String deadQueue = "dead_queue";
        String key = "lisi";
        //声明死信交换机
        channel.exchangeDeclare(deadExchange,"direct");
        //声明死信队列
        channel.queueDeclare(deadQueue,false,false,false,null);
        //交换机和队列绑定
        channel.queueBind(deadQueue,deadExchange,key);
        //消费死信消息
        channel.basicConsume(deadQueue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2消费了死信----->"+new String(body));
            }
        });
    }
}

启动消费者C2,消费死信队列当中的消息!

三、小结

        死信队列的出现,是为了保存因为特殊原因无法被消费的消息,避免消息直接失效!这些消息通过rabbitMQ的死信队列机制,可以保存在MQ服务的死信队列当中,等待被其他的消费者进行处理!

 需要注意的是:

       只有针对消息的设置会放在消息发布方进行,队列等操作,因为发布方无法自己决定消息被路由到哪个队列,只能决定把消息交给哪个交换机,以及给定路由规则;

      对于消息消费方而言,需要确定交换机、消息队列,已经完成 交换机和队列的绑定操作,所以针对于队列的设置都是放在消费方完成的!

        文章来源地址https://www.toymoban.com/news/detail-788056.html

到了这里,关于rabbitMQ引入死信队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(53)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(47)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • Rabbitmq死信队列及延时队列实现

    问题:什么是延迟队列 我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。 但RabbitMq中并 没有提供延迟队列功能 。那么RabbitMQ如何实现延迟队列 通过:死信队列 + RabbitMQ的TTL特性实现。 实现原理 给一个普通带有过期功能的队列绑定一

    2024年02月15日
    浏览(48)
  • RabbitMQ: 死信队列

    其实就是一个普通的队列,绑定号私信交换机,不给ttl,给上匹配的路由,等待交换机发送消息。 1.在消费者里的RabbitMQConfig配置类里,创建队列,给它加参数 第四个参数,就是放入这个队列,的一些属性参数 也就是这两个位置 对应Java代码里好像少个参数,排他性,是指,

    2024年02月09日
    浏览(44)
  • RabbitMQ——死信队列

    死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。 死信的来源 在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于: 消息被拒绝( basic.reject 或 bas

    2024年03月14日
    浏览(55)
  • RabbitMQ进阶——死信队列

    在消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如:格式错误、设备故障等,这些未成功处理的消息就被称为“死信”。 为了避免这些未成

    2024年04月13日
    浏览(33)
  • RabbitMQ 死信队列实现

    死信队列使用注解实现 报错: 可以使用注解的方式来绑定 死信队列,但是还是会报上面的错误,继续修改 参数试试 java - How to set x-dead-letter-exchange in Rabbit? - Stack Overflow 但是使用注解绑定的话好像又不生效了,问题原因,tmd将死信参数绑到交换机上了,c 修改代码  至于问题

    2024年02月02日
    浏览(35)
  • rabbitmq的死信队列

    目录 成为死信的条件  消息TTL过期   队列达到最大长度  消息被拒 延迟队列  延迟队列使用场景  消息设置 TTL 队列设置 TTL  两者区别   producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息 进行消费,但某些时候由于特定的 原因导致 queue 中的某些消

    2024年02月12日
    浏览(37)
  • RabbitMQ:死信队列

    📃个人主页:不断前进的皮卡丘 🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记 🔥个人专栏:消息中间件 队列中不能被消费的消息称为死信队列 有时候因为特殊原因,可能导致队列中的某些信息无法被消费

    2024年02月02日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包