【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)

这篇具有很好参考价值的文章主要介绍了【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

承接上文

承接上一篇文章【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(上)】我们基本上对层级时间轮算法的基本原理有了一定的认识,本章节就从落地的角度进行分析和介绍如何通过Java进行实现一个属于我们自己的时间轮服务组件,最后,在告诉大家一下,其实时间轮的技术是来源于生活中的时钟。

时间轮演示结构总览

无序列表时间轮

【无序列表时间轮】主要是由LinkedList链表和启动线程、终止线程实现。

遍历定时器中所有节点,将剩余时间为 0s 的任务进行过期处理,在执行一个周期。

  • 无序链表:每一个延时任务都存储在该链表当中(无序存储)。
  • 启动线程: 直接在链表后面push ,时间复杂度 O(1)。
  • 终止线程: 直接在链表中删除节点,时间复杂度 O(1) 。

遍历周期:需要遍历链表中所有节点,时间复杂度 O(n),所以伴随着链表中的元素越来越多,速度也会越来越慢!

无序列表时间轮的长度限制了其适用场景,这里对此进行优化。因此引入了有序列表时间轮。

有序列表时间轮

与无序列表时间轮一样,同样使用链表进行实现和设计,但存储的是绝对延时时间点

  • 启动线程有序插入,比较时间按照时间大小有序插入,时间复杂度O(n),主要耗时在插入操作
  • 终止线程链表中查找任务,删除节点,时间复杂度O(n),主要耗时在插入操作

找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1),从上面的描述「有序列表定时器」的性能瓶颈在于插入时的任务排序,但是换来的就是缩短了遍历周期。

所以我们如果要提高性,就必须要提升一下插入和删除以及检索的性能,因此引入了「树形有序列表时间轮」在「有序列表定时器」的基础上进行优化,以有序树的形式进行任务存储。

树形有序列表时间轮

  • 启动定时器: 有序插入,比较时间按照时间大小有序插入,时间复杂度 O(logn)
  • 终止定时器: 在链表中查找任务,删除节点,时间复杂度 O(logn)
  • 周期清算: 找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1)

层级时间轮

整体流程架构图,如下所示。

对应的原理,在这里就不进行赘述了,之前本人已经有两篇文章对层级式时间轮进行了较为详细的介绍了,有需要的小伙伴,可以直接去前几篇文章去学习,接下来我们进行相关的实现。

时间轮数据模型

时间轮(TimingWheel)是一个存储定时任务的环形队列,数组中的每个元素可以存放一个定时任务列表,其中存放了真正的定时任务,如下图所示。

时间轮的最基本逻辑模型,由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs),所以我们先来设计和定义开发对应的时间轮的轮盘模型。命名为Roulette类。

轮盘抽象类-Roulette

之所以定义这个抽象类

public abstract class Roulette {
	// 链表数据-主要用于存储每个延时任务节点
    List<TimewheelTask> tasks = null;
    // 游标指针索引
    protected int index;
	// 时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格
    protected int capacity;
	// 时间轮轮盘的层级,如果是一级,它的上级就是二级
    protected Integer level;
    private AtomicInteger num = new AtomicInteger(0);

   // 构造器
    public Roulette(int capacity, Integer level) {
        this.capacity = capacity;
        this.level = level;
        this.tasks = new ArrayList<>(capacity);
        this.index = 0;
    }
    // 获取当前下表的索引对应的时间轮的任务
    public TimewheelTask getTask() {
        return tasks.get(index);
    }
   // init初始化操作机制
    public List<TimewheelTask> init() {
        long interval = MathTool.power((capacity + 1), level);
        long add = 0;
        TimewheelTask delayTask = null;
        for (int i = 0; i < capacity; i++) {
            add += interval;
            if (level == 0) {
                delayTask = new DefaultDelayTask(level);
            } else {
                delayTask = new SplitDelayTask(level);
            }
            //已经转换为最小的时间间隔
            delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());
            tasks.add(delayTask);
        }
        return tasks;
    }

   // 索引下标移动
    public void indexAdd() {
        this.index++;
        if (this.index >= capacity) {
            this.index = 0;
        }
    }
   // 添加对应的任务到对应的队列里面
    public void addTask(TimewheelTask task) {
        tasks.add(task);
    }
	// 给子类提供的方法进行实现对应的任务添加功能
    public abstract void addTask(int interval, MyTask task);
}
时间轮盘的熟悉信息介绍

链表数据-主要用于存储每个延时任务节点。

List<TimewheelTask> tasks = null;

tasks也可以改成双向链表 + 数组的结构:即节点存贮的对象中有指针,组成环形,可以通过数组的下标灵活访问每个节点,类似 LinkedHashMap。

游标指针索引

protected int index;

时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格

protected int capacity;

时间轮轮盘的层级,如果是一级,它的上级就是二级

protected Integer level;

init初始化时间轮轮盘对象模型,主要用于分配分配每一个轮盘上面元素的TimewheelTask,用于延时队列的执行任务线程,已经分配对应的每一个节点的延时时间节点数据。

 public List<TimewheelTask> init() {
	   //  那么整个时间轮的总体时间跨度(interval)
        long interval = MathTool.power((capacity + 1), level);
        long add = 0;
        TimewheelTask delayTask = null;
        for (int i = 0; i < capacity; i++) {
            add += interval;
            if (level == 0) {
                delayTask = new ExecuteTimewheelTask(level);
            } else {
                delayTask = new MoveTimewheelTask(level);
            }
            //已经转换为最小的时间间隔
            delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());
            tasks.add(delayTask);
        }
        return tasks;
}
  • 整数a的n次幂:interval,计算跨度,主要是各级别之间属于平方倍数

例如,第一层:20 ,第二层:20^2 ......

    //例如 n=7  二进制 0   1                 1          1
    //a的n次幂 = a的2次幂×a的2次幂  ×   a的1次幂×a的1次幂  ×a
    public static long power(long a, int n) {
        int rtn = 1;
        while (n >= 1) {
            if((n & 1) == 1){
                rtn *= a;
            }
            a *= a;
            n = n >> 1;
        }
        return rtn;
    }
TimeUnitProvider工具类

主要用于计算时间单位操作的转换

public class TimeUnitProvider {
    private static TimeUnit unit = TimeUnit.SECONDS;
    public static TimeUnit getTimeUnit() {
        return unit;
    }
}

代码简介:

  • interval:代表着初始化的延时时间数据值,主要用于不同的层次的出发时间数据
  • for (int i = 0; i < capacity; i++) :代表着进行for循环进行添加对应的延时队列任务到集合中
  • add += interval,主要用于添加对应的延时队列的延时数据值!并且分配给当前轮盘得到所有数据节点。

获取当前下标的索引对应的时间轮的任务节点

public TimewheelTask getTask() {
        return tasks.get(index);
}
层级时间轮的Bucket数据桶

在这里我们建立了一个TimewheelBucket类实现了Roulette轮盘模型,从而进行建立对应的我们的层级时间轮的数据模型,并且覆盖了addTask方法。

public class TimewheelBucket extends Roulette {

    public TimewheelBucket(int capacity, Integer level) {
        super(capacity, level);
    }

    public synchronized void addTask(int interval, MyTask task) {
        interval -= 1;
        int curIndex = interval + this.index;
        if (curIndex >= capacity) {
            curIndex = curIndex - capacity;
        }
        tasks.get(curIndex).addTask(task);
    }
}

添加addTask方法,进行获取计算对应的下标,并且此方法add操作才是对外开发调用的,在这里,我们主要实现了根据层级计算出对应的下标进行获取对应的任务执行调度点,将我们外界BizTask,真正的业务操作封装到这个BizTask模型,交由我们的系统框架进行执行。

     public synchronized void addTask(int interval, BizTask task) {
        interval -= 1;
        int curIndex = interval + this.index;
        if (curIndex >= capacity) {
            curIndex = curIndex - capacity;
        }
        tasks.get(curIndex).addTask(task);
    }
时间轮轮盘上的任务点

我们针对于时间轮轮盘的任务点进行设计和定义对应的调度执行任务模型。一个调度任务点,可以帮到关系到多个BizTask,也就是用户提交上来的业务任务线程对象,为了方便采用延时队列的延时处理模式,再次实现了Delayed这个接口,对应的实现代码如下所示:

Delayed接口
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
TimewheelTask时间轮刻度点
@Getter
public abstract class TimewheelTask implements Delayed {
    private List<BizTask> tasks = new ArrayList<BizTask>();
    private int level;
    private Long delay;
    private long calDelay;
    private TimeUnit calUnit;
    public TimewheelTask(int level) {
        this.level = level;
    }
    public void setDelay(Long delay, TimeUnit unit) {
        this.calDelay=delay;
        this.calUnit=unit;
    }
    public void calDelay() {
        this.delay = TimeUnit.NANOSECONDS.convert(this.calDelay, this.calUnit) + System.nanoTime(); 
    }
    public long getDelay(TimeUnit unit) {
        return this.delay - System.nanoTime();
    }
    public int compareTo(Delayed o) {
        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
    public void addTask(BizTask task) {
        synchronized (this) {
            tasks.add(task);
        }
    }
    public void clear() {
        tasks.clear();
    }
    public abstract void run();
}
  • 业务任务集合:private List<BizTask> tasks = new ArrayList<BizTask>();

    • 层级
    private int level;
    
    • 延时时间
      private Long delay;
    
    • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列
      private long calDelay;
      
    • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列(用于统一化的时间单位)
      private TimeUnit calUnit;
      
添加对应的业务延时任务到轮盘刻度点
    public void addTask(BizTask task) {
        synchronized (this) {
            tasks.add(task);
        }
    }
刻度点的实现类

因为对应的任务可能会需要将下游的业务任务进行升级或者降级,所以我们会针对于执行任务点分为,执行任务刻度点和跃迁任务刻度点两种类型。

  • 执行任务延时队列刻度点
public class ExecuteTimewheelTask extends TimewheelTask {
    public ExecuteTimewheelTask(int level) {
        super(level);
    }
    //到时间执行所有的任务
    public void run() {
        List<BizTask> tasks = getTasks();
        if (CollectionUtils.isNotEmpty(tasks)) {
            tasks.forEach(task -> ThreadPool.submit(task));
        }
    }
}

再次我们就定义执行这些任务的线程池为:

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(10000),
            new MyThreadFactory("executor"), new ThreadPoolExecutor.CallerRunsPolicy());
  • 跃迁任务延时队列刻度点
public class MoveTimewheelTask extends TimewheelTask {
    public MoveTimewheelTask(int level) {
        super(level);
    }
    //跃迁到其他轮盘,将对应的任务
    public void run() {
        List<BizTask> tasks = getTasks();
        if (CollectionUtils.isNotEmpty(tasks)) {
            tasks.forEach(task -> {
                long delay = task.getDelay();
                TimerWheel.adddTask(task,delay, TimeUnitProvider.getTimeUnit());
            });
        }
    }
}

致辞整个时间轮轮盘的数据模型就定义的差不多了,接下来我们需要定义运行在时间轮盘上面的任务模型,BizTask基础模型。

BizTask基础模型
public abstract class BizTask implements Runnable {
     protected long interval;
     protected int index;
     protected long executeTime;
     public BizTask(long interval, TimeUnit unit, int index) {
          this.interval  = interval;
          this.index = index;
          this.executeTime= TimeUnitProvider.getTimeUnit().convert(interval,unit)+TimeUnitProvider.getTimeUnit().convert(System.nanoTime(),TimeUnit.NANOSECONDS);
     }
     public long getDelay() {
          return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);
     }
}

主要针对于任务执行,需要交给线程池去执行,故此,实现了Runnable接口。

  • protected long interval;:跨度操作
  • protected int index;:索引下表,在整个队列里面的下表处理
  • protected long executeTime;:对应的执行时间

其中最重要的便是获取延时时间的操作,主要提供给框架的Delayed接口进行判断是否到执行时间了。

     public long getDelay() {
          return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);
     }
层级时间轮的门面TimerWheel

最后我们要进行定义和设计开发对应的整体的时间轮层级模型。

public class TimerWheel {

    private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();
    //一个轮表示三十秒
    private static int interval = 30;
    private static wheelThread wheelThread;

    public static void adddTask(BizTask task, Long time, TimeUnit unit) {
        if(task == null){
            return;
        }
        long intervalTime = TimeUnitProvider.getTimeUnit().convert(time, unit);
        if(intervalTime < 1){
            ThreadPool.submit(task);
            return;
        }
        Integer[] wheel = getWheel(intervalTime,interval);
        TimewheelBucket taskList = cache.get(wheel[0]);
        if (taskList != null) {
            taskList.addTask(wheel[1], task);
        } else {
            synchronized (cache) {
                if (cache.get(wheel[0]) == null) {
                    taskList = new TimewheelBucket(interval-1, wheel[0]);
                    wheelThread.add(taskList.init());
                    cache.putIfAbsent(wheel[0],taskList);
                }
            }
            taskList.addTask(wheel[1], task);
        }
    }
    static{
        interval = 30;
        wheelThread = new wheelThread();
        wheelThread.setDaemon(false);
        wheelThread.start();
    }
    private static Integer[] getWheel(long intervalTime,long baseInterval) {
        //转换后的延时时间
        if (intervalTime < baseInterval) {
            return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};
        } else {
            return getWheel(intervalTime,baseInterval,baseInterval, 1);
        }
    }

    private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {
         long nextInterval = baseInterval * interval;
        if (intervalTime < nextInterval) {
            return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};
        } else {
            return getWheel(intervalTime,baseInterval,nextInterval, (p+1));
        }
    }

    static class wheelThread extends Thread {
        DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();

        public DelayQueue<TimewheelTask> getQueue() {
            return queue;
        }

        public void add(List<TimewheelTask> tasks) {
            if (CollectionUtils.isNotEmpty(tasks)) {
                tasks.forEach(task -> add(task));
            }
        }

        public void add(TimewheelTask task) {
            task.calDelay();
            queue.add(task);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    TimewheelTask task = queue.take();
                    int p = task.getLevel();
                    long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));
                    TimewheelBucket timewheelBucket = cache.get(p);
                    synchronized (timewheelBucket) {
                        timewheelBucket.indexAdd();
                        task.run();
                        task.clear();
                    }
                    task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());
                    task.calDelay();
                    queue.add(task);
                } catch (InterruptedException e) {

                }
            }
        }
    }
}
TimerWheel的模型定义
private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();

一个轮表示30秒的整体跨度。

private static int interval = 30;

创建整体驱动的执行线程

private static wheelThread wheelThread;

 static{
        interval = 30;
        wheelThread = new wheelThread();
        wheelThread.setDaemon(false);
        wheelThread.start();
}

    static class wheelThread extends Thread {
        DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();
        public DelayQueue<TimewheelTask> getQueue() {
            return queue;
        }
        public void add(List<TimewheelTask> tasks) {
            if (CollectionUtils.isNotEmpty(tasks)) {
                tasks.forEach(task -> add(task));
            }
        }
        public void add(TimewheelTask task) {
            task.calDelay();
            queue.add(task);
        }
        @Override
        public void run() {
            while (true) {
                try {
                    TimewheelTask task = queue.take();
                    int p = task.getLevel();
                    long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));
                    TimewheelBucket timewheelBucket = cache.get(p);
                    synchronized (timewheelBucket) {
                        timewheelBucket.indexAdd();
                        task.run();
                        task.clear();
                    }
                    task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());
                    task.calDelay();
                    queue.add(task);
                } catch (InterruptedException e) {

                }
            }
   }

获取对应的时间轮轮盘模型体系
    private static Integer[] getWheel(long intervalTime,long baseInterval) {
        //转换后的延时时间
        if (intervalTime < baseInterval) {
            return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};
        } else {
            return getWheel(intervalTime,baseInterval,baseInterval, 1);
        }
    }

    private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {
         long nextInterval = baseInterval * interval;
        if (intervalTime < nextInterval) {
            return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};
        } else {
            return getWheel(intervalTime,baseInterval,nextInterval, (p+1));
        }
    }

到这里相信大家,基本上应该是了解了如何去实现对应的时间轮盘的技术实现过程,有兴趣希望整个完整源码的,可以联系我哦。谢谢大家!文章来源地址https://www.toymoban.com/news/detail-404562.html

到了这里,关于【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 队列——“数据结构与算法”

    各位CSDN的uu们你们好呀,又好久不见啦,最近有点摆烂,甚是惭愧!!!!今天,小雅兰的内容是队列,下面,让我们进入队列的世界吧!!! 队列 队列的概念及结构 队列:只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有先进先出FIF

    2024年02月06日
    浏览(44)
  • 算法与数据结构-队列

      队列跟栈一样,也是一种操作受限的线性表数据结构。不过,队列是先进者先出。   栈只支持两个基本操作:入栈 push()和出栈 pop()。队列跟栈非常相似,支持的操作也很有限,最基本的操作也是两个:入队 enqueue(),放一个数据到队列尾部;出队 dequeue(),从队列头部取

    2024年02月12日
    浏览(43)
  • 数据结构与算法:队列

    在上篇文章讲解了栈之后,本篇也对这一章进行收尾,来到队列! 队列(Queue)就像是排队买票的人群。想象一下你去电影院看电影,人们在售票窗口形成一条线(队列)等待购票。队列遵循一个很重要的原则:先来先服务(First In, First Out,简称FIFO)。这意味着最先到达并

    2024年02月22日
    浏览(47)
  • 【数据结构和算法】--队列

    队列是只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有 先进先出 FIFO(First In First Out) 的原则。 入队列 :进行 插入操作的一端称为队尾 。 出队列 :进行 删除操作的一端称为队头 。 队列结构联想起来也非常简单,如其名,队列就相当于

    2024年02月05日
    浏览(42)
  • 算法与数据结构(四)--队列

    队列是另一种特殊的表,这种表只在表首(也称为队首)进行删除操作,只在表尾进行插入操作。 队列的修改是按 先进先出 的规则进行的,所以队列又称为先进先出表,First In First Out,简称FIFO表。映射到生活中就是排队的队伍。 如示意图所示,a(1)就是队首元素,a(n)就是队

    2024年02月15日
    浏览(49)
  • 数据结构与算法04:队列

    目录 什么是队列? 循环队列 双端队列 阻塞队列 队列的应用场景 每日一练 在 上一篇文章 中讲述了栈:先进后出就是栈,队列刚好相反, 先进先出的数据结构就是队列 ,还是拿纸箱子来举例:队列可以理解为一个没有底的纸箱子,往箱子里面放书,一本一本叠上去,但是

    2024年02月06日
    浏览(71)
  • Python数据结构与算法-数据结构(列表、栈、队列、链表)

    数据结构是指相互之间存在这一种或者多种关系的数据元素的集合和该集合中元素之间的关系组成。 简单来说,数据结构就是设计数据以何种方式组织并存储在计算机中。 比如:列表、集合与字典等都是一种数据结构。 N.Wirth:“程序=数据结构+算法” 数据结构按照其 逻辑结

    2024年02月08日
    浏览(53)
  • 数据结构之栈、队列——算法与数据结构入门笔记(四)

    本文是算法与数据结构的学习笔记第四篇,将持续更新,欢迎小伙伴们阅读学习 。有不懂的或错误的地方,欢迎交流 栈是一种线性数据结构,其 只允许在固定的一端进行插入和删除 元素操作。进行数据插入和删除操作的一端称为栈顶 (Top), 另一端称为栈底 (Bottom)。栈中的

    2024年02月08日
    浏览(39)
  • 数据结构与算法-双端队列

    Gitee上开源的数据结构与算法代码库:数据结构与算法Gitee代码库 双端队列、队列、栈对比 定义 特点 队列 一端删除(头)另一端添加(尾) First In First Out 栈 一端删除和添加(顶) Last In First Out 双端队列 两端都可以删除、添加 优先级队列 优先级高者先出队 延时队列 根据

    2024年02月13日
    浏览(40)
  • 数据结构与算法:栈和队列

    栈是一种后入先出(LIFO)的线性逻辑存储结构。只允许在栈顶进行进出操作。 基本操作包括:入栈(push)/出栈(pop)/获取栈顶元素(peek)。 栈的实现主要有两种: 1. 数组实现,即顺序栈 2. 链表实现,即链式栈 无论是以数组还是以链表实现,入栈、出栈的时间复杂度都是

    2024年02月11日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包