基于redisson实现延时队列解耦业务

这篇具有很好参考价值的文章主要介绍了基于redisson实现延时队列解耦业务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

      今天跟大家分享的是一个基于redisson实现的延时队列,有个初版的封装工具,使用者只用关心延时时间到了取到的数据处理(或者之前处理,到时间只做剩下的业务),废话不多说,直接上货。


一、业务场景

      这里是对物联网设备做数据模拟上报。看下原型转化后的需求界面吧。
基于redisson实现延时队列解耦业务,架构,工具,高效开发分享,redisson延时队列,延时队列实现

二、实现思路

1、实现其实有很多方案:

  1. 用timer实现
  2. 用java提供的队列实现
  3. redis实现
  4. redission实现

      最简单的直接用timer都可以做,我是想到这个延时队列以后还有其他场景使用,让其他开发小伙伴只用关心业务,所以基于redisson实现,封装延时队列工具类。

2、业务流程图

我自己画的简单流程图:
基于redisson实现延时队列解耦业务,架构,工具,高效开发分享,redisson延时队列,延时队列实现

三、核心代码

1.redisson引入与配置

      这个我之前有写,这里就不重复了

2.延时队列工具

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * redisson实现的延时队列
 *
 *
 * @author zwmac
 */
@Slf4j
@Component
public class RedissonDelayQueue {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加任务到延时队列里面
     *
     * @param queueName 队列名称
     * @param data      数据
     * @param delayTime 延时时间,单位秒
     */
    public void addTaskToDelayQueue(String queueName,JSONObject data,Long delayTime) {
        if(StringUtils.isNotBlank(queueName)){
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(data, delayTime, TimeUnit.SECONDS);

        }
    }

    /**
     * 删除延时队列
     * @param queueName 队列名称
     */
    public void delDelayQueue(String queueName) {
        if(StringUtils.isNotBlank(queueName)){
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

            blockingDeque.clear();
            blockingDeque.delete();
            delayedQueue.clear();
            delayedQueue.destroy();

        }
    }

    /**
     * 判断队列是否存在
     * @param queueName 队列名称
     * @return true 存在,false 不存在
     */
    public boolean hasQueue(String queueName) {
        RBlockingDeque<JSONObject> blockingDeque =  redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        if (blockingDeque.isExists() && delayedQueue.isExists() && !delayedQueue.isEmpty()){
            return true;
        }
        return false;
    }

    /**
     * 队列消费者
     * @param consumer 消费者
     * @param queueName 队列名称
     */
    public void queueConsumer( Consumer consumer, String queueName){
        new Thread(() -> {
            while (true){
                try {
                    JSONObject data = this.takeFromDelayQueue(queueName);
                    if (data != null){
                        //消费接口
                        consumer.accept(data);
                        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
                        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
                        if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
                            //所有数据已经轮训完毕,删除队列
                            this.delDelayQueue(queueName);
                            //结束线程
                            log.info("队列名称:{},延时元素消费完成,退出释放线程",queueName);
                            break;
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //退出,释放线程
                    log.info("队列名称:{},退出线程释放,原因:{}",queueName,e.getMessage());
                    break;
                }

            }

        },queueName + "-Customer").start();
    }

    /**
     * 从延时队列里面取出数据
     * @param queueName 队列名称
     * @return 队列元素json对象
     * @throws Exception 异常
     */
    public JSONObject takeFromDelayQueue(String queueName) throws Exception {
        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        JSONObject jsonObject = null;
        try {
            //log.info("--队列名称:{},blockingDeque数量:{},delayedQueue数量:{}",queueName,blockingDeque.size(),delayedQueue.size());
            if (blockingDeque.isExists()){
                log.info("--出队列前--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
                jsonObject = blockingDeque.take();
                log.info("--出队列后--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
            }
            /** 这里处理早了,还没有消费就销毁了,会导致消费数据差一条
             if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
                //所有数据已经轮训完毕,删除队列
                this.delDelayQueue(queueName);
                //结束线程
                //Thread.currentThread().interrupt();
                throw new RuntimeException("所有数据已经轮训完毕,删除队列");
            }**/
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return jsonObject;
    }
}

      里面有关于线程销毁注释了一段,有兴趣的可以看看,为什么销毁不在那里处理,当然原因我也写在注释里了的。

2.使用

@Resource
  private RedissonDelayQueue redissonDelayQueue;

private MessageInfo historyReceive(JSONObject jsonObject, String identify) {
    //从ES查询设备的历史数据
    List<JSONObject> historyData = searchHisFromEs(nbDeviceId, startTime, endTime,logMarkId);

    //查询该设备是否有重放队列在执行
    String hisRetryQueueKey = "hisRetryQueueKey-" + nbDeviceId;
    if(redissonDelayQueue.hasQueue(hisRetryQueueKey)){
      //有重放队列在执行,删除原队列
      redissonDelayQueue.delDelayQueue(hisRetryQueueKey);
    }

    //放到延时队列
    if (CollectionUtil.isNotEmpty(historyData)) {
      queueConsumer(redissonDelayQueue,nbDeviceId,logMarkId,identify,hisRetryQueueKey);
      for (int i = 0; i < historyData.size(); i++) {
        JSONObject data = historyData.get(i);
        Long interval = 2L;
        if (i > 0){
          interval = Long.valueOf(intervalTime * i) + interval;
        }
        redissonDelayQueue.addTaskToDelayQueue(hisRetryQueueKey,data,interval);
      }
    }

    return new MessageInfo(0, "success");
  }
/**延时数据业务处理
**/
private void queueConsumer(RedissonDelayQueue redissonDelayQueue, String nbDeviceId, String logMarkId, String identify, String hisRetryQueueKey) {
    //消费延时队列数据
    redissonDelayQueue.queueConsumer(data -> {
      //重放数据做数据重新组织后,直接放到解析完成的队列
      log.info("时间:{}---重放数据:{}", DateUtil.now(),data);
      //业务处理
      

    },hisRetryQueueKey);
  }

      我这里是在从延时队列取到元素后做的一些业务操作,如果没有一些下游级联操作,其实可以在放入队列的for循环里做,真正到时间了,再做一些简单的业务也可以。
      可以看出,现在使用就只需要处理for循环放入延时队列,queueConsumer消费处理延时到期的业务。

3.效果

基于redisson实现延时队列解耦业务,架构,工具,高效开发分享,redisson延时队列,延时队列实现文章来源地址https://www.toymoban.com/news/detail-671482.html


总结

  1. 解耦,让开发只用关注业务
  2. 基于redisson不用太关注redis底层实现,这里可以理解就是2个队列,一个未到期队列、一个到期队列,随着时间的推移redisson帮我们实现从未到期移动数据到到期,我们只用管从到期取到数据的操作
  3. 封装还很粗糙,还有进步空间
    就分享到这,希望能帮到大家,uping!

到了这里,关于基于redisson实现延时队列解耦业务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Rabbitmq死信队列及延时队列实现

    问题:什么是延迟队列 我们常说的延迟队列是指消息进入队列后不会被立即消费,只有达到指定时间后才能被消费。 但RabbitMq中并 没有提供延迟队列功能 。那么RabbitMQ如何实现延迟队列 通过:死信队列 + RabbitMQ的TTL特性实现。 实现原理 给一个普通带有过期功能的队列绑定一

    2024年02月15日
    浏览(48)
  • Redisson实现简单消息队列:优雅解决缓存清理冲突

    在项目中,缓存是提高应用性能和响应速度的关键手段之一。然而,当多个模块在短时间内发布工单并且需要清理同一个接口的缓存时,容易引发缓存清理冲突,导致缓存失效的问题。为了解决这一难题,我们采用Redisson的消息队列功能,实现了一个简单而高效的消息队列,

    2024年02月16日
    浏览(38)
  • RabbitMQ如何实现延时队列

    RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列。 1. 延时队列的概念 延

    2024年02月16日
    浏览(39)
  • springboot kafka 实现延时队列

    好文推荐: 2.5万字详解23种设计模式 基于Netty搭建websocket集群实现服务器消息推送 2.5万字讲解DDD领域驱动设计 延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。 小编已经做好了 Kafka延时队列的封装,以后只需要一行代码就可以实现kafka延时

    2024年02月03日
    浏览(39)
  • Java实现Redis延时队列

    “如何实现Redis延时队列”这个面试题应该也是比较常见的,解答如下: 使用sortedset(有序集合) ,拿时间戳作为 score ,消息内容作为key 调用 zadd 来生产消息,消费者用zrangebyscore 指令获取 N 秒之前的数据轮询进行处理。 Java实现Redis延时队列,首先要了解何为延时队列,即

    2024年02月20日
    浏览(42)
  • redis实现分布式延时队列

    延时队列是一种特殊的消息队列,它允许将消息在一定的延迟时间后再进行消费。延时队列的主要特点是可以延迟消息的处理时间,以满足定时任务或者定时事件的需求。 总之,延时队列通过延迟消息的消费时间,提供了一种方便、可靠的方式来处理定时任务和定时事件。它

    2024年02月08日
    浏览(45)
  • Golang RabbitMQ实现的延时队列

    之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。 延迟队列是一种特殊类型的消息队列 ,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景

    2024年02月10日
    浏览(52)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • RabbitMQ延时队列的实现原理和应用实例

    TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。 目前有两种方法可以设置消息的 TTL: 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间; 第二种方法是对消息本身进行单独设置,每条消息

    2024年02月05日
    浏览(57)
  • 【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)

    承接上一篇文章【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(上)】我们基本上对层级时间轮算法的基本原理有了一定的认识,本章节就从落地的角度进行分析和介绍如何通过Java进行实现一个属于我们自

    2023年04月08日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包