redisson的延时队列机制简述

这篇具有很好参考价值的文章主要介绍了redisson的延时队列机制简述。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq或者rocketmq的延迟消息;
但是系统中不一定集成了mq,但为了控制分布式下的并发,一般redis都是有集成的;
rediskey过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;

那么用redisson的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List中获取任务;

redisson中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;

注意点,在消费者监听处如果使用thread相关操作因为redisson的默认线程nameredisson-netty会抛异常,我的处理方式是把相关操作都放到自己的线程池中操作.

官方解释是在netty线程中调用同步方法可能会导致超时;
issue:https://github.com/redisson/redisson/issues/3549

异常见源码

org.redisson.command.CommandAsyncService.get(org.redisson.api.RFuture<V>)

版本
redissonredisson-spring-boot-starter-3.17.6.jar
redis:6.2.7

redisson延时任务机制简述

生产者先将任务pushdelay_queue_timeout等待队列中,延迟时间到了,消费者会把任务从timeout队列挪到SANYOU任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;

这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get一下队列,达到订阅队列的目的;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);

这样做的目的:
消费者订阅队列,从delay_queue_timeout等待延迟队列中将已经到达时间的任务挪到真正的任务List队列中,然后再将delay_queue_timeout队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel中;然后客户端监听到这个channel中的消息后,会再次重复上述步骤,让delay_queue_timeout中的任务,可以都放到真正的任务List队列中;

这样有一个好处就是不用一直while扫描等待,客户端的延迟任务时间和delay_queue_timeout中的延迟时间是一样的,可以精准利用cpu,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;

另外由于客户端都是用lua脚本去redis的同一个List队列中获取任务,lua脚本在redis中都是原子任务,而且redis真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);

捞一张图片
redisson的延时队列机制简述,springboot,redis,redis,spring boot

代码Demo


import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {

    @Resource
    private RedissonClient redissonClient;

    //延时队列map
    private final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);


    /**
     * 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务
     */
    @PostConstruct
    public void reScheduleDelayedTasks() {
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
        for (DelayQueueEnum queueEnum : queueEnums) {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        }
    }


    @Override
    public void afterPropertiesSet() {
        // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();

        for (DelayQueueEnum queueEnum : queueEnums) {
            DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());
            if (delayQueueConsumer == null) {
                throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");
            }
            // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
            // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
            RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());

            //消费者初始化队列
            RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            //set到map中方便获取
            delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);
            // 订阅新元素的到来,调用的是takeAsync(),异步执行
            rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
        }
    }

    public RedissonClient getRedissonClient() {
        return redissonClient;
    }

    public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
        return delayQueueMap;
    }
}








import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;


@Slf4j
@Component
public class DelayQueueUtil {

    private static RedissonDelayQueueConfig redissonDelayQueueConfig;

    @Resource
    public void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {
        DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;
    }

    private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
        if(null == redissonDelayQueueConfig) return Collections.emptyMap();
        return redissonDelayQueueConfig.getDelayQueueMap();
    }

    private static RedissonClient getRedissonClient() {
        if(null == redissonDelayQueueConfig) return null;
        return redissonDelayQueueConfig.getRedissonClient();
    }

    /**
     * 添加延迟消息
     */
    public static void addDelayMessage(DelayMessageDTO delayMessage) {
        log.info("delayMessage={}", delayMessage);

        Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");

        delayMessage.setCreateTime(DateUtil.now());
        if(null == delayMessage.getTimeUnit()){
            delayMessage.setTimeUnit(TimeUnit.SECONDS);
        }

        RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
        //移除相同的消息
        rDelayedQueue.remove(delayMessage);

        //添加消息
        rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());
    }


    /**
     * 移除指定队列中的消息
     */
    public static void removeDelayMessage(DelayMessageDTO delayMessage) {
        log.info("取消:delayMessage={}", delayMessage);
        if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {
            log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
            return;
        }

        RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
        rDelayedQueue.remove(delayMessage);
        removeDelayQueue(delayMessage);
    }


    /**
     * 从所有队列中删除消息
     */
    public static void removeDelayQueue(DelayMessageDTO value) {
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
        for (DelayQueueEnum queueEnum : queueEnums) {
            RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);
            delayedQueue.remove(value);
        }
    }



}

参考了大佬的博文
https://lhalcyon.com/delay-task/index.html文章来源地址https://www.toymoban.com/news/detail-807756.html

到了这里,关于redisson的延时队列机制简述的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • redis实现分布式延时队列

    延时队列是一种特殊的消息队列,它允许将消息在一定的延迟时间后再进行消费。延时队列的主要特点是可以延迟消息的处理时间,以满足定时任务或者定时事件的需求。 总之,延时队列通过延迟消息的消费时间,提供了一种方便、可靠的方式来处理定时任务和定时事件。它

    2024年02月08日
    浏览(45)
  • 【Redis进阶】一文搞懂Redisson的看门狗机制底层实现

    看门狗机制是Redission提供的一种自动延期机制,这个机制使得 Redission提供的分布式锁是可以自动续期的 。 看门狗机制提供的默认超时时间是30*1000毫秒,也就是30秒 如果一个线程获取锁后,运行程序到释放锁所花费的时间大于锁自动释放时间(也就是看门狗机制提供的超时时

    2024年02月16日
    浏览(56)
  • springboot kafka 实现延时队列

    好文推荐: 2.5万字详解23种设计模式 基于Netty搭建websocket集群实现服务器消息推送 2.5万字讲解DDD领域驱动设计 延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。 小编已经做好了 Kafka延时队列的封装,以后只需要一行代码就可以实现kafka延时

    2024年02月03日
    浏览(39)
  • 创建延时队列、springboot配置多个rabbitmq

    type选择fanout (图中已经绑定,红框为绑定过程) (图中已经绑定,红框为绑定过程) 延时队列时间到之后,将消息发送给queue.file_destroy,执行删除文件操作 RabbitConfig配置类 mq1 mq2 application-prod.yaml mq1消费端,发消息给mq2 mq2消费端用于递归删除文件 FileHelper工具类递归删除文件或文

    2024年02月11日
    浏览(41)
  • 【Spring Boot 3】【Redis】集成Redisson

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月23日
    浏览(50)
  • redisson配置类---SpringBoot集成、redis单机和集群模式配置

    1.1:pom.xml 1.2 application.yml配置文件 2-1配置属性类:RedissonProperties.java 2-2redis配置:RedisConfig.java 注:EnableConfigurationPropertiess用法: 2.3:Redisson使用

    2024年02月06日
    浏览(44)
  • Spring Boot + Redis 延时双删功能,实战来了!

    在多线程并发情况下,假设有两个数据库修改请求,为保证数据库与redis的数据一致性,修改请求的实现中需要修改数据库后,级联修改Redis中的数据。 请求一:A修改数据库数据 B修改Redis数据 请求二:C修改数据库数据 D修改Redis数据 并发情况下就会存在A — C — D — B的情况

    2024年02月08日
    浏览(55)
  • springboot自定义注解+aop+redis实现延时双删

    redis作为用的非常多的缓存数据库,在多线程场景下,可能会出现数据库与redis数据不一致的现象 数据不一致的现象:https://blog.csdn.net/m0_73700925/article/details/133447466 这里采用aop+redis来解决这个方法: 删除缓存 更新数据库 延时一定时间,比如500ms 删除缓存 这里之所以要延时一

    2024年01月17日
    浏览(42)
  • SpringBoot + RabbitMQ从延时队列中删除指定的值【RabbitMQ中的basicAck和basicNack的区别以及basicReject又是什么?】

    业务需求是,就是我本来是有一个order-queue队列绑定到了死信队列交换机order-dead-direct-exchange上,然后我的业务是,现在有一个用户下单但是没有付款,order-queue队列写入该条信息并计时24小时后如果用户还是未付款状态则移除到死信队列order-dead-queue中。问题来了,如果在这个

    2024年02月16日
    浏览(42)
  • redis — 基于Spring Boot实现redis延迟队列

    1. 业务场景 延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种: 在各种购物平台上下单,订单超过30分钟未支付

    2024年02月13日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包