自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

这篇具有很好参考价值的文章主要介绍了自定义一个简单的Task调度器、任务循环调度器、TaskScheduler。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

  自从接触异步(async await  Task)操作后,始终都不明白,这个Task调度的问题。

  接触Quartz.net已经很久了,只知道它实现了一套Task调度的方法,自己跟着Quartz.net源代码写了遍,调试后我算是明白了Task调度的一部分事(自定义一个简单的Task调度器、任务循环调度器、TaskScheduler )。

  春风来不远,只在屋东头。

  理解Task运行,请参考大佬文章 https://www.cnblogs.com/artech/p/task_scheduling.html ,推荐大佬的书。

  直到我看Quartz.net源代码中的任务调度 “QueuedTaskScheduler”,我才搞明白了,如何写一个简单的任务调度器,或者说线程如何执行代码,才不会造成死循环,CPU吃满等问题,下面代码有的直接从quartz.net copy过来的。

BlockingCollection类

  微软文档 https://learn.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview

  个人博客,中文解释通俗易懂 https://www.cnblogs.com/gl1573/p/14595985.html

  BlockingCollection 提供一个很重要的“阻塞”功能。

TaskScheduler类

  TaskScheduler 直译过来:表示一个对象,该对象处理将任务排队到线程上的低级工作。

  该类为抽象类,其真正意义在于“对Task任务的编排”

基于TaskScheduler类实现自定义的“Task队列调度器”

  源代码,我的仓库 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler

  1.定义一个存储Task的队列容器,使用BlockingCollection容器来添加Task,为什么使用BlockingCollection,后面会解释

        /// <summary>The collection of tasks to be executed on our custom threads.</summary>
        private readonly BlockingCollection<Task> _blockingTaskQueue;

  2.定义CancellationTokenSource变量,用于释放。通常就是调用 CancellationToken.ThrowIfCancellationRequested() ,抛出一个 “OperationCanceledException”的异常,使正在执行的Task任务停止。

  3.创建Thread数组,用于存储创建出的Thread

        /// <summary>The threads used by the scheduler to process work.</summary>
        private readonly Thread[] _threads;

   4.自定义一个类QueuedTaskScheduler,继承 “TaskScheduler”,“IDisposable”

public class QueuedTaskScheduler: System.Threading.Tasks.TaskScheduler, IDisposable

    实现构造函数

        public QueuedTaskScheduler(int threadCount)
        {
            _threadCount = threadCount;
            _blockingTaskQueue = new BlockingCollection<Task>();

            // create threads
            _threads = new Thread[threadCount];
            for (int i = 0; i < threadCount; i++)
            {
                _threads[i] = new Thread(ThreadBasedDispatchLoop)
                {
                    Priority = ThreadPriority.Normal,
                    IsBackground = true,
                    Name = $"threadName ({i})"
                };
            }

            // start
            foreach (var thread in _threads)
            {
                thread.Start();
            }
        }

    在构造函数中创建,并启动“Thread”,构造函数接收一个“线程数量的参数”,控制开启的线程数。

    Thread中实现的委托为“ThreadBasedDispatchLoop”,其表达意思是“基于循环的调度”。

  5.重点来了,具体看下“ThreadBasedDispatchLoop”方法的实现

ThreadBasedDispatchLoop实现

         /// <summary>The dispatch loop run by all threads in this scheduler.</summary>
        private void ThreadBasedDispatchLoop()
        {
            _taskProcessingThread.Value = true;

            try
            {
                // If a thread abort occurs, we'll try to reset it and continue running.
                while (true)
                {
                    try
                    {
                        // For each task queued to the scheduler, try to execute it.
                        foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
                        {
                            TryExecuteTask(task);
                        }
                    }
                    catch (ThreadAbortException)
                    {
                        // If we received a thread abort, and that thread abort was due to shutting down
                        // or unloading, let it pass through.  Otherwise, reset the abort so we can
                        // continue processing work items.
                        if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
                        {
#pragma warning disable SYSLIB0006
                            Thread.ResetAbort();
#pragma warning restore SYSLIB0006
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // If the scheduler is disposed, the cancellation token will be set and
                // we'll receive an OperationCanceledException.  That OCE should not crash the process.
            }
            finally
            {
                _taskProcessingThread.Value = false;
            }
        }

    在外层套一层try catch捕获 CancellationTokenSource 变量,取消操作(CancellationTokenSource.Cancel())产生的异常,并且忽略该异常。

    其中使用while(true),无限循环执行,?????奇了怪了,为什么以前写代码时,while(true)写了,会直接把CPU吃满,程序搞奔溃呢????自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

    关键点就在于自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

    当_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行时,如果_blockingTaskQueue容器中没有元素时,执行就会被“阻塞”,这种阻塞不会造成或者造成很小的资源浪费。

    当_blockingTaskQueue有值时,阻塞就会停止,_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行,返回一个Task对象,然后开始执行 TryExecuteTask(task) ,执行Task。

  6.继承 “TaskScheduler”后需要实现的几个方法

    

GetScheduledTasks

         protected override IEnumerable<Task>? GetScheduledTasks()
        {
            return _blockingTaskQueue.ToList();
        }

    GetScheduledTasks 返回需要被调度的 Tasks

QueueTask

         protected override void QueueTask(Task task)
        {
            // QueuedTaskScheduler 释放时,禁止向队列中添加Task
            if (_disposeCancellation.IsCancellationRequested)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            _blockingTaskQueue.Add(task);
        }

    QueueTask 将排队等候的Task加入到 “_blockingTaskQueue”队列变量中

TryExecuteTaskInline

         protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If we're already running tasks on this threads, enable inlining
            return _taskProcessingThread.Value && TryExecuteTask(task);
        }

    意思是,参数task是否在此线程上运行,请查看ThreadBasedDispatchLoop方法。

    “ThreadLocal<bool>” 该类型变量声明生命周期跟随 “构造函数”中启动的线程,且每一个线程单独一个变量,值存储在线程上。

    自此自定义“Task调度器”完成。

启动,运行QueuedTaskScheduler

  自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

  1. 创建 QueuedTaskScheduler ,其中用于执行Task的线程数为 1

  2.创建 Task ,并将其加入到指定的 Task调度器中。

  3.调试一下

    A. 创建 QueuedTaskScheduler ,创建 线程 “Thread” ,并启动线程

    自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

     B. 调试过程 

      自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

       当 _blockingTaskQueue没有Task时,执行到 _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token) 就会阻塞。

自此 自定义TaskScheduler完成。

我的源代码 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler 文章来源地址https://www.toymoban.com/news/detail-448259.html

到了这里,关于自定义一个简单的Task调度器、任务循环调度器、TaskScheduler的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Airflow大揭秘:如何让大数据任务调度变得简单高效?

    介绍:Airflow是一个开源的、用于创建、调度和监控数据管道的工作流平台。这个平台使用Python编写,并通过有向无环图(Directed Acyclic Graph, DAG)来管理任务流程,使得用户不需要知道业务数据的具体内容,只需设置任务之间的依赖关系,即可实现任务的自动调度。 在具体应

    2024年01月20日
    浏览(47)
  • 使用C语言构建一个独立栈协程和共享栈协程的任务调度系统

    使用了标准库头文件 setjmp.h 中的 setjmp 和 longjmp 两个函数,构建了一个简单的查询式协作多任务系统,支持 独立栈 和 共享栈 两种任务。 其中涉及到获取和设置栈的地址操作,因此还需要根据不同平台提供获取和设置栈的地址操作(一般是汇编语言,因为涉及到寄存器) 该

    2024年02月19日
    浏览(48)
  • vue 简单实验 自定义组件 综合应用 传参数 循环

    1.代码   2.运行结果

    2024年02月11日
    浏览(54)
  • 鸿蒙LiteOs读源码教程+向LiteOS中添加一个简单的基于线程运行时的短作业优先调度策略

    【找到了一种简单易懂的实验实现方式,适合基础较薄弱的同学,见第二部分】 最终效果如下: 依次创建了3个任务线程,以One、Two、Three指代,时间分别为15秒、30秒、10秒。 如果按生成顺序输出应该是:One-Two-Three,但我们修改了OsPriQueueEnqueue函数,由原先的“先进先出”,

    2024年02月05日
    浏览(37)
  • Spring Task(定时任务)框架

    Spring Task 是Spring框架提供的任务调度工具,可以按照约定的时间自动执行某个代码逻辑。 应用场景: 信用卡每月还款提醒 银行贷款每月还款提醒 火车票售票系统处理未支付订单 入职纪念日为用户发送通知 等等… (只要是需要定时处理当达到场景都可以使用Spring Task) cr

    2024年02月05日
    浏览(44)
  • python如何定义一个简单的队列

    哈哈,其实根本不需要用 类 (class)去定义一个 队列 (queue),因为我没有想到 list 结构竟然这么万能,它本身就是一个活生生的 队列 。 1.定义一个队列,其实就是一个列表哈哈,说明一下,队头在前面,队尾在后面。 即,这个队列a对应如下图示: 2.入队 insert tail ,这个大家

    2023年04月08日
    浏览(43)
  • 频繁设置CGroup触发linux内核bug导致CGroup running task不调度

    1. 说明 1 本篇是实际工作中linux上碰到的一个问题,一个使用了CGroup的进程处于R状态但不执行,也不退出,还不能kill,经过深入挖掘才发现是Cgroup的内核bug 2发现该bug后,去年给RedHat提交过漏洞,但可惜并未通过,不知道为什么,这里就发我博客公开了 3 前面的2个帖子《极简

    2023年04月15日
    浏览(58)
  • Verilog HDL系统任务说明语句task

    task说明语句 如果传给任务的变量和任务完成后接受任务的变量已定义,就可以用一条语句启动任务,任务完成以后控制传回启动的过程。 1.1任务的定义 定义任务的语法如下: 1.2任务的调用及变量的传递 任务调用: 1.3例子 用两种不同的方法设计一个功能相同的模块,完成

    2024年02月03日
    浏览(44)
  • Windows任务计划程序Task Scheduler笔记

    Windows任务计划程序已经存在许多年了,原来在微软的TechNet上有详细的操作介绍的,现在发现网站改版,原来的介绍居然搜索不到了,微软的平台上出现这种事情,也是比较吃惊了。 添加任务计划有两种方式,一种是通过图形界面进行,一种是使用脚本或者编程的方式进行。

    2023年04月13日
    浏览(50)
  • 基于Spring Task框架的定时任务处理

    1.1 介绍 Spring Task 是Spring框架提供的任务调度工具,可以按照约定的时间自动执行某个代码逻辑。 定位: 定时任务框架 作用: 定时自动执行某段Java代码 应用场景: 1). 信用卡每月还款提醒 2). 银行贷款每月还款提醒 3). 火车票售票系统处理未支付订单 强调: 只要是需要定时

    2024年01月23日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包