造个轮子-任务调度执行小框架-任务清单执行器实现

这篇具有很好参考价值的文章主要介绍了造个轮子-任务调度执行小框架-任务清单执行器实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

okey,上一篇文章我们提到了,如何实现它的一个清单的一个代理。这里的话我们来捋一捋我们的这个执行流程是啥:
造个轮子-任务调度执行小框架-任务清单执行器实现,手把手教你编写任务执行框架,开发语言,java,架构
所以的话,我们的我们这里今天要做的是这个执行器的一个执行。当然这里的话,我们也是分两个部分,因为这个执行器的话,是分两个部分的,一个是正常的任务执行,还有一个是这个宕机之后,我们对任务的一个恢复的处理。

执行器流程

提交流程

那么在这里的话,我得先说说这个执行器提交的流程,因为这个不说清楚的话,就比较麻烦了。
首先我们先来看到这几个类:
造个轮子-任务调度执行小框架-任务清单执行器实现,手把手教你编写任务执行框架,开发语言,java,架构
然后的话,我们的流程是这样的:

1. ExecuteMagaer 负责创建TaskWrapper
2. TaskWrapper 里面包含了代理对象,执行代理对象的执行方法
3.TaskWraaper 提交到线程池里面去

所以的话,是通过这三个环节,最终任务提交到了我们的这个线程池里面,然后进行执行。

线程池实现

okey,这里的话,我们当然需要去有一个线程池,但是这个线程池的话,有个特点,那就是:

  1. 如果你有ID,那么相同ID的任务排队执行
  2. 如果没有ID,那么就直接异步执行
    这样做的话有啥好处嘛,好处就是,假设这个ID是你的UserID,在用户下单的时候,就算重复下单,由于两次账单是顺序执行的,第一个账单执行完毕之后,改变了状态,假设此时你对商品ID上锁了,那么第二个账单执行的时候,发现商品ID锁住了,就不会继续无脑执行了。主要是实现更加细致的操作。
package com.huterox.todoscheduler.common.impl;


import com.huterox.todoscheduler.common.SerializationUtils;
import com.huterox.todoscheduler.common.TaskManager;
import com.huterox.todoscheduler.config.Configuration;
import com.huterox.todoscheduler.core.wapper.TaskWrapper;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 任务管理器
 * */
public class DefaultTaskManager implements TaskManager, Serializable {

    private final ThreadPoolExecutor executor;
    private final Map<String, BlockingQueue<Runnable>> taskQueues;
    private final Object lock;

    public DefaultTaskManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>());
        taskQueues = new HashMap<>();
        lock = new Object();
    }

    public DefaultTaskManager() {

        executor = new ThreadPoolExecutor(
                Configuration.corePoolSize,
                Configuration.maximumPoolSize,
                Configuration.keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        taskQueues = new HashMap<>();
        lock = new Object();
    }

    @Override
    public void submitTask(TaskWrapper task, String id) {
        if (id == null || id.isEmpty()) {
            executor.execute(task); // 直接执行任务
            //然后保存当前的一个状态
            saveStatus();
        } else {
            synchronized (lock) {
                BlockingQueue<Runnable> queue = taskQueues.computeIfAbsent(id, k -> new LinkedBlockingQueue<>());

                if (queue.isEmpty()) {
                    // 之前没有相同ID的任务在执行,直接提交到线程池执行
                    executor.execute(() -> {
                        try {
                            task.run(); // 执行任务
                        } finally {
                            submitNextTask(id); // 执行完毕后提交下一个任务
                            saveStatus();
                        }
                    });
                } else {
                    // 将任务加入队列中,等待前面的任务执行完毕后再执行
                    queue.offer(() -> {
                        try {
                            task.run(); // 执行任务
                        } finally {
                            submitNextTask(id); // 执行完毕后提交下一个任务
                            saveStatus();
                        }
                    });
                }
            }
        }
    }

    @Override
    public void saveStatus() {
        //保存当前的一个状态
        SerializationUtils.serializeObject(this,"runningTask","task.ser");
    }

    private void submitNextTask(String id) {
        synchronized (lock) {
            BlockingQueue<Runnable> queue = taskQueues.get(id);
            if (queue != null && !queue.isEmpty()) {
                executor.execute(queue.poll()); // 提交下一个任务
            } else {
                taskQueues.remove(id); // 队列为空时移除对应的ID
            }
        }
    }

    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

    public Map<String, BlockingQueue<Runnable>> getTaskQueues() {
        return taskQueues;
    }

    public Object getLock() {
        return lock;
    }

    @Override
    public void shutdown() {
        executor.shutdown();
    }
}

这里面的代码细节,我就不说了,因为不难,再说篇幅太大了,还有好多东西要说呢。

执行器实现

ok,我们说了这个流程,我们来看到这个执行器是如何实现的。

接口

首先的话,我们是有一个接口的:

package com.huterox.todoscheduler.core.execute;

/**
 * 我们核心的调度器,通过TodoListFactory可以得到可以执行的任务清单
 * */
public interface ExecuteCore {

    void execute(String ListName);

    String getClsId();

    void run();

    //服务器意外宕机之后,恢复这个任务的时候要进行的操作
    void repair();



}

看到这个接口的话,有好几个方法,首先是执行提交的,然后是run,这个主要是这个原因:

package com.huterox.todoscheduler.core.wapper;

import com.huterox.todoscheduler.core.enumType.ExecuteType;
import com.huterox.todoscheduler.core.execute.ExecuteCore;

import java.io.Serializable;


public class TaskWrapper implements Runnable, Serializable {

    private ExecuteCore executeCore;

    private ExecuteType executeType = ExecuteType.Run;

    public TaskWrapper() {
    }

    public ExecuteCore getExecuteCore() {
        return executeCore;
    }

    public ExecuteType getExecuteType() {
        return executeType;
    }

    public void setExecuteType(ExecuteType executeType) {
        this.executeType = executeType;
    }

    public void setExecuteCore(ExecuteCore executeCore) {
        this.executeCore = executeCore;
    }

    public TaskWrapper(ExecuteCore executeCore) {
        this.executeCore = executeCore;
    }

    @Override
    public void run() {
        if(executeType==ExecuteType.Run){
            executeCore.run();
        }else if(executeType==ExecuteType.Repair){
            executeCore.repair();
        }

    }
}

这个TaskWrapper是实现了Runable接口,里面有run,所以就索性这样写了。

状态标志

之后的话,我们的项目到这里的话,只是实现了一个正向的过程,就是当项目宕机的时候,我们要尽可能去恢复任务清单的一个执行,或者状态,比如你买东西的接口,后面执行退款代码的时候,服务器宕机了,那么这个时候,我要尽可能去恢复这个宕机退款的执行。当然这里面要考虑的东西要多得多,小项目的话,你要相信第三方组件要比自己写的代码靠谱(狗头)

这里主要是这两个:
造个轮子-任务调度执行小框架-任务清单执行器实现,手把手教你编写任务执行框架,开发语言,java,架构

package com.huterox.todoscheduler.core.enumType;

import java.io.Serializable;

/**
 * 当前的任务清单执行的情况
 * */
public enum TodoListStateType implements Serializable {

    CreatFailed,
    Running,
    Fine,
    Error,
    Repairing;
}

package com.huterox.todoscheduler.core.enumType;

import java.io.Serializable;

public enum TodoItemStateType implements Serializable {

    /**
     * 需要重新运行启动,适用于强一致的清单
     * */
    Again,

    Running,

    Error,

    /**
     * 运行正常
     * */
    Fine,

    /**
     * 只需要执行修复,适用于弱一致的清单
     * */
    Repairing;
}

执行周期实现

之后的话,就是我们要实现一个完整的执行周期了:

清单代理创建

在执行的时候 ,我们先要去创建这个代理对象。但是这个代理对象的话,也有创建前执行处理器,之后处理器等待。所以这里也要进行一个处理:

    @Override
    public void execute(String ListName) {
        //1. 先拿到可执行清单对象
        todoListExBean = TodoListFactory.getInstance(ListName);
        //2. 查看返回的结果
        if(todoListExBean==null){
            //说明创建就失败了,这个失败是清单对象都没有起来
            this.running = false;
        }else {
            if(todoListExBean.getTodoListStateType()== TodoListStateType.CreatFailed){
                //创建失败了
                if(todoListExBean.getTodoListElementType()== TodoListElementType.NothingConsistency
                    || todoListExBean.getTodoListElementType()== TodoListElementType.WeakConsistency
                ){
                    //在创建阶段,只要你不是强一致性,那么我就不管你,如果创建都失败了
                    System.err.println("任务清单创建失败,取消执行");
                }else if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
                    //强一致,这个时候,直接进入失败队列,这个时候也是创建失败,但是这个失败是指
                    //清单项目创建的时候有问题,清单对象起来了
                    TodoListFailedList.addFailedWithSerializable(todoListExBean);
                    this.running = false;
                }
            }
        }
    }

清单项执行

之后的 话,才是执行我们的 方法。执行之后结束。
然后对于一个清单项,它其实有这样的几个过程:

  1. 清单项创建前(在Factory的时候就可以看到)
  2. 当前清单项执行前
  3. 当前清单项执行时刻
  4. 当前清单项执行后
  5. 清单项执行异常

一个完整的清单周期包括:

  1. 清单创建前(创建之后是立刻执行的,因此没有执行前这个方法)
  2. 清单项周期
  3. 清单结束
  4. 清单执行异常
 @Override
    public void run() {

        if(!this.running){
            System.err.println("执行失败,停止执行该任务清单");
            return;
        }

        todoListExBean.setExTimes(todoListExBean.getExTimes()+1);
        todoListExBean.setTodoListStateType(TodoListStateType.Running);
        //在这里完成方法任务清单项的执行
        //同时在这里完成状态的持久化处理,方便恢复状态
        Map<Integer, TodoItemExBean> sortedMap = todoListExBean.getSortedMap();

        //开始遍历执行清单项
        for(Map.Entry<Integer, TodoItemExBean> entry:sortedMap.entrySet()){
            Integer key = entry.getKey();
            TodoItemExBean entryValue = entry.getValue();
            entryValue.setTodoItemStateType(TodoItemStateType.Running);
            //这里开始按照生命周期执行代码
            try {
                if (entryValue.getTodoItemBeforeRunningHandler()!=null)
                {
                    //执行第一个运行时,运行前的代码
                    TodoItemBeforeRunningHandler todoItemBeforeRunningHandler = entryValue.
                            getTodoItemBeforeRunningHandler();
                    boolean concierge = todoItemBeforeRunningHandler.concierge(
                            entryValue.getStateWrapper(),
                            todoListExBean.getStateWrapper()
                    );
                    if(!concierge){

                        //没有满足通过条件,需要跳过或者终止
                        if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
                            System.err.println("任务清单:"+todoListExBean.getTodoListName()
                                    +"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
                                    "方法:"+entryValue.getWrapperMethod().getName()+"未通过运行时执行前Handler"
                                    +"正在前往执行下一个任务项"
                                    );
                        }else {
                            //查看当前任务清单类型,如果是那种强一致性的,那就加入失败队列,等待重启
                            if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
                                //强一致,这个时候,直接进入失败队列
                                entryValue.setTodoItemStateType(TodoItemStateType.Again);
                                TodoListFailedList.addFailedWithSerializable(todoListExBean);
                            }else if(
                                todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
                            ){
                                entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                            }
                            return;
                        }
                        entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                        //这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
                        todoItemBeforeRunningHandler.repair(
                                entryValue.getStateWrapper(),
                                todoListExBean.getStateWrapper()
                        );
                    }
                }

                //执行运行时刻代码
                Method wrapperMethod = entryValue.getWrapperMethod();
                wrapperMethod.setAccessible(true);
                Parameter[] parameters = wrapperMethod.getParameters();

                // 构造参数数组
                Object[] argsArray = new Object[parameters.length];
                for (int i = 0; i < parameters.length; i++) {
                    Parameter parameter = parameters[i];
                    if (parameter.getType() == ListStateWrapper.class) {
                        // 设置特定参数的值
                        argsArray[i] = todoListExBean.getStateWrapper();
                    }else if(parameter.getType() == ItemStateWrapper.class){
                        argsArray[i] = entryValue.getStateWrapper();
                    }
                }
                // 执行方法
                Object result = wrapperMethod.invoke(entryValue, argsArray);

                //执行后置处理,这个执行流程和前置是一样的
                if (entryValue.getTodoItemAfterRunningHandler()!=null){
                    TodoItemAfterRunningHandler todoItemAfterRunningHandler = entryValue.getTodoItemAfterRunningHandler();
                    boolean concierge = todoItemAfterRunningHandler.concierge(
                            entryValue.getStateWrapper(),
                            todoListExBean.getStateWrapper()
                    );
                    if(!concierge){
                        entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                        if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
                            System.err.println("任务清单:"+todoListExBean.getTodoListName()
                                    +"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
                                    "方法:"+entryValue.getWrapperMethod().getName()+"未通过运行时执行前Handler"
                                    +"正在前往执行下一个任务项"
                            );
                        }else {
                            if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
                                //强一致,这个时候,直接进入失败队列
                                entryValue.setTodoItemStateType(TodoItemStateType.Again);
                                TodoListFailedList.addFailedWithSerializable(todoListExBean);
                            }else if(
                                    todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
                            ){
                                entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
                            }
                            return;
                        }
                        //这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
                        todoItemAfterRunningHandler.repair(
                                entryValue.getStateWrapper(),
                                todoListExBean.getStateWrapper()
                        );
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                //对错误进行处理
                entryValue.setTodoItemStateType(TodoItemStateType.Error);
                if(entryValue.getTodoItemErrorHandler()!=null){
                    try {
                        TodoItemErrorHandler todoItemErrorHandler = entryValue.getTodoItemErrorHandler();
                        todoItemErrorHandler.concierge(
                                entryValue.getStateWrapper(),
                                todoListExBean.getStateWrapper()
                        );
                    }catch (Exception e1){
                        e1.printStackTrace();
                        //如果这个都执行失败了,那真的没救了
                        //加入失败列表看看了,只能,如果是一定要执行的话
                        if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
                            TodoListFailedList.addFailedWithSerializable(todoListExBean);
                        }else {
                            System.err.println("任务强制终止");
                        }
                        return;
                    }
                }
            }
            //此时这个任务清单的小项目才算执行正常
            entryValue.setTodoItemStateType(TodoItemStateType.Fine);
        }
        //清单项目是执行完毕了,那么接下来是这个清单的后置处理部分
        if(todoListExBean.getTodoListAfterHandler()!=null) {
            TodoListAfterHandler todoListAfterHandler = todoListExBean.getTodoListAfterHandler();
            try {
                boolean concierge = todoListAfterHandler.concierge(todoListExBean.getStateWrapper());
                if(!concierge) {

                    todoListExBean.setTodoListStateType(TodoListStateType.Repairing);
                    //这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
                    todoListAfterHandler.repair(
                            todoListExBean.getStateWrapper()
                    );
                }
            } catch (Exception e) {
                e.printStackTrace();
                todoListExBean.setTodoListStateType(TodoListStateType.Error);
                //对错误进行处理
                if(todoListExBean.getTodoListErrorHandler()!=null){
                    try {
                        TodoListErrorHandler todoListErrorHandler = todoListExBean.getTodoListErrorHandler();
                        todoListErrorHandler.concierge(
                                todoListExBean.getStateWrapper()
                        );
                    }catch (Exception e1){
                        e1.printStackTrace();
                        if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
                            todoListExBean.setTodoListStateType(TodoListStateType.Error);
                            TodoListFailedList.addFailedWithSerializable(todoListExBean);
                        }else {
                            System.err.println("任务强制终止");
                        }
                    }
                }
            }
        }

        todoListExBean.setTodoListStateType(TodoListStateType.Fine);
    }

那么这里的核心代码其实就是这一块儿。当然这个家伙的实现还有一部分,关于这个状态恢复的。

总结

那么这篇博文就到这里,今天还有一篇,这里再重复一次(第二次了),我们的项目地址是:https://gitee.com/Huterox/htodo-sechudling文章来源地址https://www.toymoban.com/news/detail-641438.html

到了这里,关于造个轮子-任务调度执行小框架-任务清单执行器实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Mybatis执行器(Executor)

    Executor Executor是MyBatis的核心接口之一,其中定义了数据库操作的基本方法。在实际应用中经常涉及的SqlSession接口的功能,都是基于Executor接口实现的。 BaseExecutor BaseExecutor是一个实现了Executor接口的抽象类,它实现了Executor 接口的大部分方法。BaseExecutor 中主要提供了缓存管理和

    2024年04月16日
    浏览(23)
  • Junit执行器Runner探索之旅

    单元测试是每个程序员必备的技能,而Runner是每个单元测试类必有属性。本文通过解读Junit源码,介绍junit中每个执行器的使用方法,让读者在单元测试时,可以灵活的使用Runner执行器。 在今年的敏捷团队建设中,京东物流通过Suite执行器实现了一键自动化单元测试。Juint除了

    2024年02月08日
    浏览(27)
  • PgSQL-执行器机制-Unique算子

    PgSQL-执行器机制-Unique算子 PgSQL中输出去重的元组有多种方法,比如通过HashAgg或者GroupAgg。这里我们介绍第三种方法,通过Unique算子来完成这个功能。当然语句上可以是:select distinct(id1) from t; 执行器执行算子的函数都是ExecXXX,其中XXX代表某个算子。Unique算子的执行是由函数

    2024年02月07日
    浏览(23)
  • 机械臂速成小指南(五):末端执行器

    👨‍🏫🥰🥳需要机械臂相关资源的同学可以在评论区中留言哦🤖😽🦄 指南目录📖: 🎉🎉机械臂速成小指南(零点五):机械臂相关资源🎉🎉 机械臂速成小指南(零):指南主要内容及分析方法 机械臂速成小指南(一):机械臂发展概况 机械臂速成小指南(二):

    2024年02月03日
    浏览(28)
  • 【PostgreSQL内核学习(二十三)—— 执行器(ExecEndPlan)】

    声明 :本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。 本文主要参考了 postgresql-10.1 的开源代码和《OpenGauss数据库源码解析》和《PostgresSQL数据库内核分析》一书   在这三

    2024年01月17日
    浏览(38)
  • 【PostgreSQL内核学习(二十一)—— 执行器(InitPlan)】

    声明 :本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。 本文主要参考了 postgresql-10.1 的开源代码和《OpenGauss数据库源码解析》和《PostgresSQL数据库内核分析》一书   在【

    2024年01月16日
    浏览(35)
  • Camunda 7.x 系列【53】Job 执行器

    有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 2.7.9 本系列Camunda 版本 7.19.0 源码地址:https://gitee.com/pearl-organization/camunda-study-demo Job Executor 即任务执行器,是 Camunda 中的一个调度组件,负责执行异步后台作业。 Job 表示 Job Executor 执行的某一作业,例如,在定

    2024年02月09日
    浏览(27)
  • xxl-job执行器无法自动注册

    问题描述 在springboot项目里配置了xxl-job2.3.0,但是执行器无法自动注册 yaml配置如下: 执行器无法自动注册到xxl-job-admin 排查过程 经过debug发现,是spring没有加载xxlJobExecutor这个Bean debug流程(SpringApplication.run()–SpringApplication.refreshContext()–SpringApplication.refresh() --SpringApplication

    2024年02月16日
    浏览(24)
  • 【源码分析】XXL-JOB的执行器的注册流程

    目的:分析xxl-job执行器的注册过程 流程: 获取执行器中所有被注解( @xxlJjob )修饰的 handler 执行器注册过程 执行器中任务执行过程 版本: xxl-job 2.3.1 建议:下载 xxl-job 源码,按流程图 debug 调试, 看堆栈信息并按文章内容理解执行流程 。 完整流程图: 部分流程图: 首先启

    2023年04月22日
    浏览(31)
  • 【微软】【ICLR 2022】TAPEX:通过学习神经 SQL 执行器进行表预训练

    重磅推荐专栏: 《大模型AIGC》;《课程大纲》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经验分享,

    2024年02月05日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包