一种基于springboot、redis的分布式任务引擎的实现(一)

这篇具有很好参考价值的文章主要介绍了一种基于springboot、redis的分布式任务引擎的实现(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 总体思路是,主节点接收到任务请求,将根据任务情况拆分成多个任务块,将任务块标识的主键放入redis。发送redis消息,等待其他节点运行完毕,结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执行任务、结束处理。

1、主节点接收任务请求

    @Override
    public void executeTaskInfo(PrepareDTO prepareDTO) {
        //异常标记
        String taskInfo = prepareDTO.getTaskId();
        //任务分组状态
        String taskStatus = "";
        try {
            log.info("数据准备任务并设定任务执行状态,{}", prepareDTO);
            this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);
            //给redis集合中放计算对象
            log.info("开始放入计算任务:{}", prepareDTO);
            boolean getTaskFlag = this.dataPrepareBo.pushCalculationObject(prepareDTO);
            if (!getTaskFlag) {
                taskStatus = String.format("没有获取数据或计划已取消,%s", taskInfo);
                log.error(taskStatus);
                throw new Exception(taskStatus);
            }
            //发消息执行缓存中任务
            log.info("发消息执行任务:{}", prepareDTO);
            sendMessage(prepareDTO);
            //等待任务执行完毕
            log.info("等待任务执行结果");
            taskStatus = this.getGroupUpLoadTaskFinsh(prepareDTO);
        } catch (Exception e) {//捕获日志
            e.printStackTrace();
            taskStatus = "获取任务状态异常" + e;
            log.info(taskStatus);
            dataPrepareBo.putExceptionMsg2Cache(taskInfo, "数据准备分发计算任务线程异常:" + taskStatus);
        } finally {
            //做任务结束处理
            this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);
        }
    }

2,发送消息

    @Override
    public void sendMessage(String topic, String msg) {
        this.redisTemplate.convertAndSend(topic, msg);
    }

3,节点接收任务,并执行

    public void doUpLoadTask(String msg) throws Exception {
        log.info("开始执行明细任务{}" + msg);
        String taskId = this.getTaskId(msg);
        try {
            Object cancelFlag = this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));
            if(cancelFlag != null && "1".equals(cancelFlag.toString())){
                log.info("本次任务已取消");
                return;
            }
            //上传本机执行信息到redis
            this.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
            //从缓存获取任务,获取任务后启线程执行任务。如果没获取到任务,则本节点任务执行完毕
            //循环获取任务
            this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);
            //处理结束
            this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
        } catch (Exception e) {
			//记录日志
            taskUpldExeLogCDTO.setRunStas("-1");
            String exceptionInfo = this.taskLogUtils.getExceptionInfo(e) ;
            taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);
            throw e;
        } finally {
			//记录日志
            taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());
            if("-1".equals(taskUpldExeLogCDTO.getRunStas())){//异常结束
                this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务异常");
            } else {//正常结束
                taskUpldExeLogCDTO.setRunStas("1");
                this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务正常");
            }
        }
    }

4,开启线程执行任务

    @Override
    public CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {
        List<Future> futureList = new ArrayList<>();
        //开始执行明细任务处理
        ThreadPoolTaskExecutor taskTransferExecutor = ToolUtil.getExecutor("engine-file-tasks-pool-", Math.min(parallelProcessNum,10), 8);
        ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());
        for(int i = 0 ; i < parallelProcessNum ; i++) {
            DoGroupUpLoadTaskThread doGroupUpLoadTaskThread = new DoGroupUpLoadTaskThread(taskId
                    , redisTemplate, calculationBo, null, null);
            Future<?> future = taskTransferExecutor.submit(doGroupUpLoadTaskThread);
            futureList.add(future);
        }

        if (!CollectionUtil.isEmpty(futureList)) {
            futureList.forEach(f -> {
                try {
                    f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        log.info("本节点执行分组任务执行完毕{}", taskId + ":" + GroupConstant.IDENTITY);
        return null;
    }

5,线程执行明细

    @Override
    public ResponseDTO call() throws Exception {
        //执行任务
        while(true) {
            FilterTableUniqueDTO filterTableUniqueDTO = (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
            log.debug("取出任务:" + filterTableUniqueDTO);
            if(null == filterTableUniqueDTO) {
                break ;
            }
            long lastNum = this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
            log.info("生成文件剩余任务数量:" + lastNum);
//           处理任务
            calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);
        }
        return null;
    }

以上是主要入口总体思路涉及代码,详细实现整理起来涉及内容比较繁多,将在第二部分分享。文章来源地址https://www.toymoban.com/news/detail-667404.html

到了这里,关于一种基于springboot、redis的分布式任务引擎的实现(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot 定时任务,分布式下幂等性如何解决

    在分布式环境下,定时任务的幂等性问题需要考虑多个节点之间的数据一致性和事务处理。 一种解决方法是使用分布式锁来保证同一时间只有一个节点能够执行该任务。具体实现可以使用Redis或Zookeeper等分布式协调工具提供的分布式锁功能。 另一种解决方法是使用消息队列来

    2024年02月11日
    浏览(21)
  • SpringBoot使用Redis实现分布式缓存

    ✅作者简介:2022年 博客新星 第八 。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏:SpringBoot 框架从入门到精通 ✨特色专栏:国学周更-心性养成之路 🥭本文内容:SpringBoot使用

    2023年04月09日
    浏览(26)
  • SpringBoot 定时任务 @Scheduled 集群环境优化 (使用分布式锁, 注解形式)

    SpringBoot提供了 Schedule模块完美支持定时任务的执行 在实际开发中由于项目部署在分布式或集群服务器上 会导致定时任务多次触发 因此,使用redis分布锁机制可以有效避免多次执行定时任务   核心方法是org.springframework.data.redis.core包下的  setIfAbsent() 方法 返回值为布尔类型

    2024年02月15日
    浏览(27)
  • springboot3 redis 实现分布式锁

    分布式锁介绍 分布式锁是一种在分布式系统中用于控制不同节点上的进程或线程对共享资源进行互斥访问的技术机制。 在分布式环境中,多个服务可能同时访问和操作共享资源,如数据库、文件系统等。为了保持数据的一致性和完整性,需要确保在同一时刻只有一个服务能

    2024年04月16日
    浏览(23)
  • XXL-JOB分布式任务调度平台搭建以及和SpringBoot整合应用

    XXL-JOB 是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 可以前往 Gitee 地址进行下载使用:   代码结构如下: 运行 SQL 文件至本地数据库: 修改 xxl-job-admin 模块的 yml 文件

    2023年04月21日
    浏览(30)
  • SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】

    上一篇实现了单体应用下如何上锁,这一篇主要说明如何在分布式场景下上锁 上一篇地址:加锁 需要注意的点是: 在上锁和释放锁的过程中要保证 原子性操作 核心是上锁和解锁的过程 关于解锁使用脚本参考:SET key value [EX seconds] [PX milliseconds] [NX|XX] 3.1 一个服务按照多个端口同时

    2023年04月10日
    浏览(34)
  • 2、基于redis实现分布式锁

    借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。 多个客户端同时获取锁(setnx) 获取成功,执行业务逻辑,执行完成释放锁(del) 其他客户端等

    2024年02月15日
    浏览(44)
  • 基于 Redis 实现分布式限流

    分布式限流是指通过将限流策略嵌入到分布式系统中,以控制流量或保护服务,保证系统在高并发访问情况下不被过载。 分布式限流可以防止系统因大量请求同时到达导致压力过大而崩溃,从而提高系统的稳定性和可靠性。同时,它可以使得业务资源能够更好地分配,提高系

    2024年02月12日
    浏览(27)
  • 基于Mongodb分布式锁简单实现,解决定时任务并发执行问题

    我们日常开发过程,会有一些定时任务的代码来统计一些系统运行数据,但是我们应用有需要部署多个实例,传统的通过配置文件来控制定时任务是否启动又太过繁琐,而且还经常出错,导致一些异常数据的产生 网上有很多分布式锁的实现方案,基于redis、zk、等有很多,但

    2023年04月18日
    浏览(33)
  • 自定义注解,基于redis实现分布式锁

    1.1、注解的基础知识 实现自定义注解其实很简单,格式基本都差不多。也就参数可能变一变。 @Retention:取值决定了注解在什么时候生效,一般都是取运行时,也就是RetentionPolicy.RUNTIME。 @Target:决定了这个注解可以使用在哪些地方,可以取方法,字段,类等。 注解这就定义

    2024年02月08日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包