总体思路是,主节点接收到任务请求,将根据任务情况拆分成多个任务块,将任务块标识的主键放入redis。发送redis消息,等待其他节点运行完毕,结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执行任务、结束处理。
1、主节点接收任务请求
@Override
public void executeTaskInfo(PrepareDTO prepareDTO) {
//异常标记
String taskInfo = prepareDTO.getTaskId();
//任务分组状态
String taskStatus = "";
try {
log.info("数据准备任务并设定任务执行状态,{}", prepareDTO);
this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);
//给redis集合中放计算对象
log.info("开始放入计算任务:{}", prepareDTO);
boolean getTaskFlag = this.dataPrepareBo.pushCalculationObject(prepareDTO);
if (!getTaskFlag) {
taskStatus = String.format("没有获取数据或计划已取消,%s", taskInfo);
log.error(taskStatus);
throw new Exception(taskStatus);
}
//发消息执行缓存中任务
log.info("发消息执行任务:{}", prepareDTO);
sendMessage(prepareDTO);
//等待任务执行完毕
log.info("等待任务执行结果");
taskStatus = this.getGroupUpLoadTaskFinsh(prepareDTO);
} catch (Exception e) {//捕获日志
e.printStackTrace();
taskStatus = "获取任务状态异常" + e;
log.info(taskStatus);
dataPrepareBo.putExceptionMsg2Cache(taskInfo, "数据准备分发计算任务线程异常:" + taskStatus);
} finally {
//做任务结束处理
this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);
}
}
2,发送消息
@Override
public void sendMessage(String topic, String msg) {
this.redisTemplate.convertAndSend(topic, msg);
}
3,节点接收任务,并执行
public void doUpLoadTask(String msg) throws Exception {
log.info("开始执行明细任务{}" + msg);
String taskId = this.getTaskId(msg);
try {
Object cancelFlag = this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));
if(cancelFlag != null && "1".equals(cancelFlag.toString())){
log.info("本次任务已取消");
return;
}
//上传本机执行信息到redis
this.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
//从缓存获取任务,获取任务后启线程执行任务。如果没获取到任务,则本节点任务执行完毕
//循环获取任务
this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);
//处理结束
this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
} catch (Exception e) {
//记录日志
taskUpldExeLogCDTO.setRunStas("-1");
String exceptionInfo = this.taskLogUtils.getExceptionInfo(e) ;
taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);
throw e;
} finally {
//记录日志
taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());
if("-1".equals(taskUpldExeLogCDTO.getRunStas())){//异常结束
this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务异常");
} else {//正常结束
taskUpldExeLogCDTO.setRunStas("1");
this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务正常");
}
}
}
4,开启线程执行任务
@Override
public CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {
List<Future> futureList = new ArrayList<>();
//开始执行明细任务处理
ThreadPoolTaskExecutor taskTransferExecutor = ToolUtil.getExecutor("engine-file-tasks-pool-", Math.min(parallelProcessNum,10), 8);
ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());
for(int i = 0 ; i < parallelProcessNum ; i++) {
DoGroupUpLoadTaskThread doGroupUpLoadTaskThread = new DoGroupUpLoadTaskThread(taskId
, redisTemplate, calculationBo, null, null);
Future<?> future = taskTransferExecutor.submit(doGroupUpLoadTaskThread);
futureList.add(future);
}
if (!CollectionUtil.isEmpty(futureList)) {
futureList.forEach(f -> {
try {
f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
}
log.info("本节点执行分组任务执行完毕{}", taskId + ":" + GroupConstant.IDENTITY);
return null;
}
5,线程执行明细文章来源:https://www.toymoban.com/news/detail-667404.html
@Override
public ResponseDTO call() throws Exception {
//执行任务
while(true) {
FilterTableUniqueDTO filterTableUniqueDTO = (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
log.debug("取出任务:" + filterTableUniqueDTO);
if(null == filterTableUniqueDTO) {
break ;
}
long lastNum = this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
log.info("生成文件剩余任务数量:" + lastNum);
// 处理任务
calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);
}
return null;
}
以上是主要入口总体思路涉及代码,详细实现整理起来涉及内容比较繁多,将在第二部分分享。文章来源地址https://www.toymoban.com/news/detail-667404.html
到了这里,关于一种基于springboot、redis的分布式任务引擎的实现(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!