【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL

这篇具有很好参考价值的文章主要介绍了【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

生产者端

目录结构

导入依赖

修改yml

业务逻辑

        队列消息过期

        消息单独过期


   

        TTL(Time To Live)存活时间。表示当消息由生产端存入MQ当中的存活时间,当时间到达的时候还未被消息就会被自动清除。RabbitMQ可以对消息单独设置过期时间也可以对整个队列(并不是队列,而是队列中的消息)设置过期时间。

生产者端

目录结构

【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL

导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

修改yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    #三个类型:none默认不开启确认回调 correlated开启确认回调
    #simple也会确认回调 还会调用waitForConfirms()方法或waitForConfirmsOrDie()方法
    publisher-confirm-type: correlated # 开启确认回调
    publisher-returns: true # 开启退回回调

业务逻辑

        队列消息过期

        第一段代码即是定义交换机与队列的名称并使其进行绑定,仅是一个配置类的效果。第二段代码就是生产者产生消息的方法,只需要在意for循环里面的逻辑即可。而图1即是创建出来的队列以及生产的10条消息,在10s会自动删除。因为在配置类中已经定义了TTL。

/**
 * 定义交换机与队列的Bean 并且使之绑定
 */
@Component
public class RabbitMQConfig {

    public static final String TTL_EXCHANGE_NAME = "ttl_exchange_name";
    public static final String TTL_QUEUE_NAME = "ttl_queue_name";

    @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.topicExchange(TTL_EXCHANGE_NAME).durable(true).build();
    }

    //配置队列的时候顺带上ttl()方法 其内部对MQ设置了参数"x-message-ttl"
    //注意这里的单位是毫秒 所以我写的参数为10000毫秒即10秒
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(TTL_QUEUE_NAME).ttl(10000).build();
    }

    @Bean
    public Binding ttl(@Qualifier("ttlExchange") Exchange exchange,
                       @Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}
@SpringBootTest
@RunWith(SpringRunner.class)
class RabbitmqProducerApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testTTL(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if(b) System.out.println("交换机成功接受到了消息");
                else System.out.println("消息失败原因" + s);
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("队列接受不到交换机的消息进行了失败回调");
            }
        });
        // 以上代码只是保证消息传递的可靠性 与TTL无关
        for(int i = 0; i < 10; ++i){
            rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,"test.heHe","HelloWorld");
        }
    }
}

【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL

                                                                 图1

        消息单独过期

         这里的配置类还是如上相同:队列中的消息10s自动过期,再对其中一条消息进行处理就可以更好的明白这两种过期的区别:其中一条消息设置了5s自动过期,通过图2可以发现队列中有11条消息,当5s后变为10条消息,再过了5s后就没有消息。

@SpringBootTest
@RunWith(SpringRunner.class)
class RabbitmqProducerApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testTTL(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if(b) System.out.println("交换机成功接受到了消息");
                else System.out.println("消息失败原因" + s);
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("队列接受不到交换机的消息进行了失败回调");
            }
        });
        // 以上代码只是保证消息传递的可靠性 与TTL无关
        // 消息的后处理对象 设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");//设置消息对象5s后过期
                return message;
            }
        };
        //消息单独5s过期
        rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,"test.heHe","HelloWorld",messagePostProcessor);
        //队列中的消息全体10s过期
        for(int i = 0; i < 10; ++i){
            rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,"test.heHe","HelloWorld");
        }
    }
}

【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL

                                                                 图2

        到这真的就以为结束了吗,当我把for循环的10条消息放到单独过期的消息上面,发现了新大陆:一开始的消息也是如图2所示为11条,但是但是,过了5s后并不会消除一条消息,而是过了10s后将11条全部删除了。于是我猜测这个消息的存放队列就好似一个栈,虽然先生产的消息的生存时间短,但是当别的消息压在自己头上的时候是出不去的,而是必须等自己为栈顶元素才可以出栈!文章来源地址https://www.toymoban.com/news/detail-431874.html

到了这里,关于【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot整合rabbitmq 实现消息发送和消费

    Spring Boot提供了RabbitMQ的自动化配置,使得整合RabbitMQ变得非常容易。 首先,需要在pom.xml文件中引入amqp-client和spring-boot-starter-amqp依赖: 接下来需要在application.properties文件中配置RabbitMQ连接信息: 然后编写消息发送者: 其中,my-exchange和my-routing-key是需要自己定义的交换机和

    2024年02月07日
    浏览(42)
  • 消息队列——spring和springboot整合rabbitmq

    目录 spring整合rabbitmq——生产者 rabbitmq配置文件信息 倒入生产者工程的相关代码 简单工作模式 spring整合rabbitmq——消费者 spring整合rabbitmq——配置详解 SpringBoot整合RabbitMQ——生产者  SpringBoot整合RabbitMQ——消费者   使用原生amqp来写应该已经没有这样的公司了 创建两个工程

    2024年02月16日
    浏览(54)
  • springboot 整合rabbitMq保证消息一致性方案

    RabbitMQ是一种开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准,可用于在应用程序之间传递消息。RabbitMQ最初由LShift开发,现在由Pivotal Software维护。 RabbitMQ可以在多个平台上运行,包括Windows、Mac OS X和各种Linux发行版。它提供了多种编程语言的客户端库,如Java、

    2024年02月02日
    浏览(40)
  • 【SpringBoot】 整合RabbitMQ 保证消息可靠性传递

    生产者端 目录结构 导入依赖 修改yml 业务逻辑 测试结果         在publisher-confirm-type中有三个确认消息接受类型:none、correlated、simple。         publisher-confirm-type: none 表示 禁用发布确认模式 。是 默认值 。使用此模式之后,不管消息有没有发送到Broker(RabbitMQ)都不会

    2024年02月10日
    浏览(50)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(54)
  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(57)
  • SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的发送方:生产者 消息的接收方:消费者 同步消息:发送方发送消息到接收方,接收方有所回应后才能够进行下一次的消息发送 异步消息:不需要接收方回应就可以进行下一步的发送 什么是消息队列? 当此时有很多个用户同时访问服务器,需要服务器进行操作,但此

    2024年04月27日
    浏览(52)
  • SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

    1、simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端

    2024年02月02日
    浏览(47)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(48)
  • Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端&服务器端消息收发

    前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费; 1 为什么要使用Rabbitmq: RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点: 异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和

    2024年02月07日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包