RabbitMQ之TTL机制

这篇具有很好参考价值的文章主要介绍了RabbitMQ之TTL机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ之TTL机制

        在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 

该如何实现?

  • 定期轮询(数据库等)
    • 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
    • 优点:设计实现简单
    • 缺点:需要对数据库进行大量的IO操作,效率低下。
  • Timer
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
            Timer timer = new Timer();
            TimerTask timerTask = new TimerTask() {
                @Override
                public void run() {
                    System.out.println("用户没有付款,交易取消:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
                    timer.cancel();
                }
            };
            System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
            // 10秒后执行timerTask
            timer.schedule(timerTask, 10 * 1000);
    • 缺点
      • Timers没有持久化机制
      • Timers不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)
      • Timers 不能利用线程池,一个timer一个线程
      • Timers没有真正的管理计划
  • ScheduledExecutorService
            SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
            // 线程工厂
            ThreadFactory factory = Executors.defaultThreadFactory();
            // 使用线程池
            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
            System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
            service.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("用户未付款,交易取消:" + format.format(new Date()));
                }// 等待10s 单位秒
            }, 10, TimeUnit.SECONDS);
    • 优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其它任务。
    • 在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议。
  • RabbitMQ:使用TTL
  • Quartz
  • Redis Zset
  • JCronTab
  • SchedulerX
  • 。。。

TTL,Time to Live 的简称,即过期时间。

RabbitMQ 可以对消息和队列两个维度来设置TTL。

   任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。

目前有两种方法可以设置消息的TTL。

  1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息自身进行单独设置,每条消息的TTL 可以不同。

        如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的

1、原生API案例

package com.lagou.rabbitmq.demo;

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.80.121:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<>();
        // 消息队列中消息的过期时间
        arguments.put("x-message-ttl", 10 * 1000);
        // 如果消息队列中没有消费者,则10s后消息过期,队列将会被自动删除
        arguments.put("x-expires", 60 * 1000);

        channel.queueDeclare("queue.ttl.waiting",
                true,
                false,
                false,
                arguments);

        channel.exchangeDeclare("ex.ttl.waiting",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                false,
                null);

        channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentEncoding("utf-8")
                .deliveryMode(2) // 设置消息持久化,2代表设置消息持久化
                .build();

        channel.basicPublish("ex.ttl.waiting", "key.ttl.waiting", properties, "等待的订单号".getBytes(StandardCharsets.UTF_8));

        channel.close();
        connection.close();
    }
}

此外,还可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

  • 如果不设置TTL,则表示此消息不会过期;
  • 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;

        注意理解message-ttlx-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)

2、springboot案例

(1)pom.xml添加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

(2)application.properties添加rabbitmq连接信息

spring.application.name=ttl
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

(3)主入口类

package com.lagou.rabbitmq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemo {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemo07.class, args);
    }
}

(4)RabbitConfig类

package com.lagou.rabbitmq.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queueTTLWaiting() {
        Map<String, Object> props = new HashMap<>();
        // 对于该队列中的消息,设置都等待10s
        props.put("x-message-ttl", 10000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
        return queue;
    }

    @Bean
    public Queue queueWaiting() {
        Queue queue = new Queue("q.pay.waiting", false, false, false);
        return queue;
    }

    @Bean
    public Exchange exchangeTTLWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting", false, false);
        return exchange;
    }

    /**
     * 该交换器使用的时候,需要给每个消息设置有效期
     *
     * @return
     */
    @Bean
    public Exchange exchangeWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
        return exchange;
    }

    @Bean
    public Binding bindingTTLWaiting() {
        return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl - waiting").noargs();
    }

    @Bean
    public Binding bindingWaiting() {
        return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
    }
}

(5)PayController类文章来源地址https://www.toymoban.com/news/detail-423852.html

package com.lagou.rabbitmq.demo.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

@RestController
public class PayController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/pay/queuettl")
    public String sendMessage() {
        rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttlwaiting", "发送了TTL-WAITING-MESSAGE");
        return "queue-ttl-ok";
    }

    @RequestMapping("/pay/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("5000");
        Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok";
    }
}

到了这里,关于RabbitMQ之TTL机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实时数仓建设第2问:怎样使用flink sql快速无脑统计当天下单各流程(已发货,确认收货等等)状态的订单数量

    实时统计当天下单各流程状态(已支付待卖家发货,卖家通知物流揽收,待买家收货等等)中的订单数量。 订单表的binlog数据发送到kafka,flink从kafka接受消息进行指标统计。因为每笔订单的状态会发生变化,比如上午为【已支付待卖家发货】,这个时候【已支付待卖家发货】指标

    2024年02月16日
    浏览(31)
  • 【微信小程序源码】独立版云贝餐饮连锁V2_2.3.9源码线传小程序,新增堂食订单,支付打印新增下单时间显示

    百度网盘:https://pan.baidu.com/s/150WZX2gN_QM9nyIQAOpuXA?pwd=gzm0  提取码:gzm0 版本:2.9.4 备注: 不需要上传小程序 云贝餐饮连锁V2、云贝多端餐饮外卖连锁版、自助点单系统、餐饮外卖小程序、微信小程序、餐饮小程序 PHP+MYSQL+小程序 CentOS Linux 7.6.1810 (Core)、运行环境:宝塔 Linux v7

    2024年02月09日
    浏览(39)
  • RabbitMQ 过期时间(TTL)

    TTL,Time to Live的简称,即过期时间,RabbitMQ可以对消息和队列设置TTL。        RabbitMQ支持设置队列的过期时间和消息的过期时间。如果设置队列的过期时间则队列中所有的消息都有相同的过期时间。如果设置消息的过期时间则每条消息的过期时间则可以不同。如两个方法一

    2024年02月14日
    浏览(24)
  • rabbitMQ实现订单超时未支付自动取消订单

    下面展示一些 内联代码片 。 1.配置文件,导入jar包 2. 使用rabbitMQ插件绑定延时队列,插件可自动实现死信队列,无需配置 3. 生产者代码实现, 4. service层代码, 创建订单时,同步发送消息到mq,指定超时时间 5.消费者代码实现,修改订单的状态, 保存未支付订单表中数据

    2024年02月15日
    浏览(32)
  • RabbitMQ之TTL+死信队列实现延迟队列

    RabbitMQ是一个流行的消息队列系统,它提供了许多有用的功能,其中之一是TTL(Time To Live)和死信队列。这些功能可以用来实现延迟队列,让我们来看看如何使用它们。 首先,什么是TTL?TTL是消息的存活时间,它可以设置为一个特定的时间段。如果消息在这个时间段内没有被

    2024年02月13日
    浏览(31)
  • 三十岁成功入职京东啦!

    我是小九小九不爱喝酒: 自己工作5年后,我成功拿到了京东的offer。下面说下我是如何从传统行业到京东的经历,希望能对你有所帮助。 本科我学的是机械电子工程专业,2013年本科毕业后,同学们大多到各研究所从事智能机器人方向的研究。 自己学渣一枚没有走上科研这条

    2024年02月03日
    浏览(27)
  • Web自动化测试进阶:网页中难点之等待机制 —— 强制等待,隐式等待

    为什么要添加等待 避免页面未渲染完成后操作,导致的报错 经常会遇到报错:selenium.common.exceptions.NoSuchElementException: Message: no such element: Unable to locate element: {\\\"method\\\":\\\"xpath\\\",\\\"selector\\\":\\\"//*[text()=\\\'个人中心\\\']\\\"} 页面还在加载时,就在进行查收元素,此时元素还没显示加载出来,而报

    2024年02月05日
    浏览(59)
  • SpringBoot 结合RabbitMQ与Redis实现商品的并发下单【SpringBoot系列12】

    SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见。 程序员每天的CV 与 板砖,也要知其所以然,本系列课程可以帮助初学者学习 SpringBooot 项目开发 与 SpringCloud 微服务系列项目开发 1 项目准备 SpringBoot 整合 RabbitMQ 消息队列【SpringBoot系列11】本文章 基于这个项目来开

    2023年04月19日
    浏览(28)
  • RabbitMQ高级特性2 、TTL、死信队列和延迟队列

    设置 消费者 测试 添加多条消息 拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费 Time To Live(存活时间/过期时间)。 当消息到达存活时间后,还没有被消费,会被自动清除。 RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。 可

    2024年01月16日
    浏览(32)
  • RabbitMQ详解(五):过期时间TTL、死信队列、磁盘监控

    过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。 目前有两种方法可以设置 第一种方法是通过队列属性设置,队列中所有消息都有相同的TTL。 第二种方法是对消息进行单独设置,每条消息TTL可以不同。 当同时

    2024年02月04日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包