前言
okey,上一篇文章我们提到了,如何实现它的一个清单的一个代理。这里的话我们来捋一捋我们的这个执行流程是啥:
所以的话,我们的我们这里今天要做的是这个执行器的一个执行。当然这里的话,我们也是分两个部分,因为这个执行器的话,是分两个部分的,一个是正常的任务执行,还有一个是这个宕机之后,我们对任务的一个恢复的处理。
执行器流程
提交流程
那么在这里的话,我得先说说这个执行器提交的流程,因为这个不说清楚的话,就比较麻烦了。
首先我们先来看到这几个类:
然后的话,我们的流程是这样的:
1. ExecuteMagaer 负责创建TaskWrapper
2. TaskWrapper 里面包含了代理对象,执行代理对象的执行方法
3. 将TaskWraaper 提交到线程池里面去
所以的话,是通过这三个环节,最终任务提交到了我们的这个线程池里面,然后进行执行。
线程池实现
okey,这里的话,我们当然需要去有一个线程池,但是这个线程池的话,有个特点,那就是:
- 如果你有ID,那么相同ID的任务排队执行
- 如果没有ID,那么就直接异步执行
这样做的话有啥好处嘛,好处就是,假设这个ID是你的UserID,在用户下单的时候,就算重复下单,由于两次账单是顺序执行的,第一个账单执行完毕之后,改变了状态,假设此时你对商品ID上锁了,那么第二个账单执行的时候,发现商品ID锁住了,就不会继续无脑执行了。主要是实现更加细致的操作。
package com.huterox.todoscheduler.common.impl;
import com.huterox.todoscheduler.common.SerializationUtils;
import com.huterox.todoscheduler.common.TaskManager;
import com.huterox.todoscheduler.config.Configuration;
import com.huterox.todoscheduler.core.wapper.TaskWrapper;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
/**
* 任务管理器
* */
public class DefaultTaskManager implements TaskManager, Serializable {
private final ThreadPoolExecutor executor;
private final Map<String, BlockingQueue<Runnable>> taskQueues;
private final Object lock;
public DefaultTaskManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>());
taskQueues = new HashMap<>();
lock = new Object();
}
public DefaultTaskManager() {
executor = new ThreadPoolExecutor(
Configuration.corePoolSize,
Configuration.maximumPoolSize,
Configuration.keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
taskQueues = new HashMap<>();
lock = new Object();
}
@Override
public void submitTask(TaskWrapper task, String id) {
if (id == null || id.isEmpty()) {
executor.execute(task); // 直接执行任务
//然后保存当前的一个状态
saveStatus();
} else {
synchronized (lock) {
BlockingQueue<Runnable> queue = taskQueues.computeIfAbsent(id, k -> new LinkedBlockingQueue<>());
if (queue.isEmpty()) {
// 之前没有相同ID的任务在执行,直接提交到线程池执行
executor.execute(() -> {
try {
task.run(); // 执行任务
} finally {
submitNextTask(id); // 执行完毕后提交下一个任务
saveStatus();
}
});
} else {
// 将任务加入队列中,等待前面的任务执行完毕后再执行
queue.offer(() -> {
try {
task.run(); // 执行任务
} finally {
submitNextTask(id); // 执行完毕后提交下一个任务
saveStatus();
}
});
}
}
}
}
@Override
public void saveStatus() {
//保存当前的一个状态
SerializationUtils.serializeObject(this,"runningTask","task.ser");
}
private void submitNextTask(String id) {
synchronized (lock) {
BlockingQueue<Runnable> queue = taskQueues.get(id);
if (queue != null && !queue.isEmpty()) {
executor.execute(queue.poll()); // 提交下一个任务
} else {
taskQueues.remove(id); // 队列为空时移除对应的ID
}
}
}
public ThreadPoolExecutor getExecutor() {
return executor;
}
public Map<String, BlockingQueue<Runnable>> getTaskQueues() {
return taskQueues;
}
public Object getLock() {
return lock;
}
@Override
public void shutdown() {
executor.shutdown();
}
}
这里面的代码细节,我就不说了,因为不难,再说篇幅太大了,还有好多东西要说呢。
执行器实现
ok,我们说了这个流程,我们来看到这个执行器是如何实现的。
接口
首先的话,我们是有一个接口的:
package com.huterox.todoscheduler.core.execute;
/**
* 我们核心的调度器,通过TodoListFactory可以得到可以执行的任务清单
* */
public interface ExecuteCore {
void execute(String ListName);
String getClsId();
void run();
//服务器意外宕机之后,恢复这个任务的时候要进行的操作
void repair();
}
看到这个接口的话,有好几个方法,首先是执行提交的,然后是run,这个主要是这个原因:
package com.huterox.todoscheduler.core.wapper;
import com.huterox.todoscheduler.core.enumType.ExecuteType;
import com.huterox.todoscheduler.core.execute.ExecuteCore;
import java.io.Serializable;
public class TaskWrapper implements Runnable, Serializable {
private ExecuteCore executeCore;
private ExecuteType executeType = ExecuteType.Run;
public TaskWrapper() {
}
public ExecuteCore getExecuteCore() {
return executeCore;
}
public ExecuteType getExecuteType() {
return executeType;
}
public void setExecuteType(ExecuteType executeType) {
this.executeType = executeType;
}
public void setExecuteCore(ExecuteCore executeCore) {
this.executeCore = executeCore;
}
public TaskWrapper(ExecuteCore executeCore) {
this.executeCore = executeCore;
}
@Override
public void run() {
if(executeType==ExecuteType.Run){
executeCore.run();
}else if(executeType==ExecuteType.Repair){
executeCore.repair();
}
}
}
这个TaskWrapper是实现了Runable接口,里面有run,所以就索性这样写了。
状态标志
之后的话,我们的项目到这里的话,只是实现了一个正向的过程,就是当项目宕机的时候,我们要尽可能去恢复任务清单的一个执行,或者状态,比如你买东西的接口,后面执行退款代码的时候,服务器宕机了,那么这个时候,我要尽可能去恢复这个宕机退款的执行。当然这里面要考虑的东西要多得多,小项目的话,你要相信第三方组件要比自己写的代码靠谱(狗头)
这里主要是这两个:
package com.huterox.todoscheduler.core.enumType;
import java.io.Serializable;
/**
* 当前的任务清单执行的情况
* */
public enum TodoListStateType implements Serializable {
CreatFailed,
Running,
Fine,
Error,
Repairing;
}
package com.huterox.todoscheduler.core.enumType;
import java.io.Serializable;
public enum TodoItemStateType implements Serializable {
/**
* 需要重新运行启动,适用于强一致的清单
* */
Again,
Running,
Error,
/**
* 运行正常
* */
Fine,
/**
* 只需要执行修复,适用于弱一致的清单
* */
Repairing;
}
执行周期实现
之后的话,就是我们要实现一个完整的执行周期了:
清单代理创建
在执行的时候 ,我们先要去创建这个代理对象。但是这个代理对象的话,也有创建前执行处理器,之后处理器等待。所以这里也要进行一个处理:
@Override
public void execute(String ListName) {
//1. 先拿到可执行清单对象
todoListExBean = TodoListFactory.getInstance(ListName);
//2. 查看返回的结果
if(todoListExBean==null){
//说明创建就失败了,这个失败是清单对象都没有起来
this.running = false;
}else {
if(todoListExBean.getTodoListStateType()== TodoListStateType.CreatFailed){
//创建失败了
if(todoListExBean.getTodoListElementType()== TodoListElementType.NothingConsistency
|| todoListExBean.getTodoListElementType()== TodoListElementType.WeakConsistency
){
//在创建阶段,只要你不是强一致性,那么我就不管你,如果创建都失败了
System.err.println("任务清单创建失败,取消执行");
}else if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
//强一致,这个时候,直接进入失败队列,这个时候也是创建失败,但是这个失败是指
//清单项目创建的时候有问题,清单对象起来了
TodoListFailedList.addFailedWithSerializable(todoListExBean);
this.running = false;
}
}
}
}
清单项执行
之后的 话,才是执行我们的 方法。执行之后结束。
然后对于一个清单项,它其实有这样的几个过程:
- 清单项创建前(在Factory的时候就可以看到)
- 当前清单项执行前
- 当前清单项执行时刻
- 当前清单项执行后
- 清单项执行异常
一个完整的清单周期包括:
- 清单创建前(创建之后是立刻执行的,因此没有执行前这个方法)
- 清单项周期
- 清单结束
- 清单执行异常
@Override
public void run() {
if(!this.running){
System.err.println("执行失败,停止执行该任务清单");
return;
}
todoListExBean.setExTimes(todoListExBean.getExTimes()+1);
todoListExBean.setTodoListStateType(TodoListStateType.Running);
//在这里完成方法任务清单项的执行
//同时在这里完成状态的持久化处理,方便恢复状态
Map<Integer, TodoItemExBean> sortedMap = todoListExBean.getSortedMap();
//开始遍历执行清单项
for(Map.Entry<Integer, TodoItemExBean> entry:sortedMap.entrySet()){
Integer key = entry.getKey();
TodoItemExBean entryValue = entry.getValue();
entryValue.setTodoItemStateType(TodoItemStateType.Running);
//这里开始按照生命周期执行代码
try {
if (entryValue.getTodoItemBeforeRunningHandler()!=null)
{
//执行第一个运行时,运行前的代码
TodoItemBeforeRunningHandler todoItemBeforeRunningHandler = entryValue.
getTodoItemBeforeRunningHandler();
boolean concierge = todoItemBeforeRunningHandler.concierge(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
if(!concierge){
//没有满足通过条件,需要跳过或者终止
if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
System.err.println("任务清单:"+todoListExBean.getTodoListName()
+"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
"方法:"+entryValue.getWrapperMethod().getName()+"未通过运行时执行前Handler"
+"正在前往执行下一个任务项"
);
}else {
//查看当前任务清单类型,如果是那种强一致性的,那就加入失败队列,等待重启
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
//强一致,这个时候,直接进入失败队列
entryValue.setTodoItemStateType(TodoItemStateType.Again);
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else if(
todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
){
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
}
return;
}
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
//这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
todoItemBeforeRunningHandler.repair(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
}
}
//执行运行时刻代码
Method wrapperMethod = entryValue.getWrapperMethod();
wrapperMethod.setAccessible(true);
Parameter[] parameters = wrapperMethod.getParameters();
// 构造参数数组
Object[] argsArray = new Object[parameters.length];
for (int i = 0; i < parameters.length; i++) {
Parameter parameter = parameters[i];
if (parameter.getType() == ListStateWrapper.class) {
// 设置特定参数的值
argsArray[i] = todoListExBean.getStateWrapper();
}else if(parameter.getType() == ItemStateWrapper.class){
argsArray[i] = entryValue.getStateWrapper();
}
}
// 执行方法
Object result = wrapperMethod.invoke(entryValue, argsArray);
//执行后置处理,这个执行流程和前置是一样的
if (entryValue.getTodoItemAfterRunningHandler()!=null){
TodoItemAfterRunningHandler todoItemAfterRunningHandler = entryValue.getTodoItemAfterRunningHandler();
boolean concierge = todoItemAfterRunningHandler.concierge(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
if(!concierge){
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
if(entryValue.getTodoItemElementType()== TodoItemElementType.CONTINUTEITEM){
System.err.println("任务清单:"+todoListExBean.getTodoListName()
+"-Cid:" + todoListExBean.getClsId()+"第"+entryValue.getOrder()+
"方法:"+entryValue.getWrapperMethod().getName()+"未通过运行时执行前Handler"
+"正在前往执行下一个任务项"
);
}else {
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency) {
//强一致,这个时候,直接进入失败队列
entryValue.setTodoItemStateType(TodoItemStateType.Again);
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else if(
todoListExBean.getTodoListElementType()==TodoListElementType.WeakConsistency
){
entryValue.setTodoItemStateType(TodoItemStateType.Repairing);
}
return;
}
//这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
todoItemAfterRunningHandler.repair(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
}
}
} catch (Exception e) {
e.printStackTrace();
//对错误进行处理
entryValue.setTodoItemStateType(TodoItemStateType.Error);
if(entryValue.getTodoItemErrorHandler()!=null){
try {
TodoItemErrorHandler todoItemErrorHandler = entryValue.getTodoItemErrorHandler();
todoItemErrorHandler.concierge(
entryValue.getStateWrapper(),
todoListExBean.getStateWrapper()
);
}catch (Exception e1){
e1.printStackTrace();
//如果这个都执行失败了,那真的没救了
//加入失败列表看看了,只能,如果是一定要执行的话
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else {
System.err.println("任务强制终止");
}
return;
}
}
}
//此时这个任务清单的小项目才算执行正常
entryValue.setTodoItemStateType(TodoItemStateType.Fine);
}
//清单项目是执行完毕了,那么接下来是这个清单的后置处理部分
if(todoListExBean.getTodoListAfterHandler()!=null) {
TodoListAfterHandler todoListAfterHandler = todoListExBean.getTodoListAfterHandler();
try {
boolean concierge = todoListAfterHandler.concierge(todoListExBean.getStateWrapper());
if(!concierge) {
todoListExBean.setTodoListStateType(TodoListStateType.Repairing);
//这个时候由于没有满足条件,那么这个时候要执行对应的恢复函数
todoListAfterHandler.repair(
todoListExBean.getStateWrapper()
);
}
} catch (Exception e) {
e.printStackTrace();
todoListExBean.setTodoListStateType(TodoListStateType.Error);
//对错误进行处理
if(todoListExBean.getTodoListErrorHandler()!=null){
try {
TodoListErrorHandler todoListErrorHandler = todoListExBean.getTodoListErrorHandler();
todoListErrorHandler.concierge(
todoListExBean.getStateWrapper()
);
}catch (Exception e1){
e1.printStackTrace();
if(todoListExBean.getTodoListElementType()==TodoListElementType.StrongConsistency){
todoListExBean.setTodoListStateType(TodoListStateType.Error);
TodoListFailedList.addFailedWithSerializable(todoListExBean);
}else {
System.err.println("任务强制终止");
}
}
}
}
}
todoListExBean.setTodoListStateType(TodoListStateType.Fine);
}
那么这里的核心代码其实就是这一块儿。当然这个家伙的实现还有一部分,关于这个状态恢复的。文章来源:https://www.toymoban.com/news/detail-641438.html
总结
那么这篇博文就到这里,今天还有一篇,这里再重复一次(第二次了),我们的项目地址是:https://gitee.com/Huterox/htodo-sechudling文章来源地址https://www.toymoban.com/news/detail-641438.html
到了这里,关于造个轮子-任务调度执行小框架-任务清单执行器实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!