Java | 一分钟掌握定时任务 | 9 - PowerJob分布式定时任务

这篇具有很好参考价值的文章主要介绍了Java | 一分钟掌握定时任务 | 9 - PowerJob分布式定时任务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

作者:Mars酱

声明:本文章由Mars酱整理编写,部分内容来源于网络,如有疑问请联系本人。

转载:欢迎转载,转载前先请联系我!

前言

我们选择一套框架或者技术的时候,一定要知道它的特点和功能,不能为了(学习)技术而(选择)技术,那是对产品的不负责任。官方说有类似情况的,可以使用PowJob:

  • 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表、未支付订单超时取消等。
  • 有需要​​全部机器一同执行​​的业务场景:如使用广播执行模式清理集群日志。
  • 有需要​​分布式处理​​的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce 处理器完成任务的分发,调动整个集群加速计算。
  • 有需要​​延迟执行​​某些任务的业务场景:比如订单过期处理等。

架构图

这是官方提供的架构图:

单机处理器、广播处理器、map 处理器和 mapreduce 处理器。,Java,java,分布式,大数据

从架构图颜色可以看出主体就分了两个大块和一小条:调度中心、执行器、Akka。

  • ​调度中心 powerjob-server​​:PowerJob 的设计目标为企业级的分布式任务调度平台,即成为调度中间件,让任意业务线的应用仅需要依赖 powerjob-worker 即可获取任务调度与分布式计算的能力。因此,PowerJob 的理想部署模式为一个公司统一部署 powerjob-server 集群,各业务线应用直接接入使用。
  • ​执行器 powerjob-worker​​:根据以前对定时任务的理解,用过Quartz的话,这里相当于Job这个接口;用过ElasticJob的话,最起码相当于Job接口中的一种,比如SimpleJob接口;用过xxl-job的话,这里也是同理,相当于使用了注解​​@XxlJob​​的方法。因此,所有需要执行的任务,mars酱的理解都需要依赖powerjob-worker的。
  • ​Akka ActorSystem​​:基于Actor模型设计的,专用于构建高度并发、分布式和弹性的工具包,号称单台机器上高达 200 亿条消息/秒。从架构图来看,PowerJob用来做数据交换传输,这么牛逼的中间协议处理者,看来PowerJob团队一定是想往大了搞的。

学习官方的例子

官方提供了一个示例,下载源代码之后,有个powerjob-worker-samples子工程,工程结构是这样:

单机处理器、广播处理器、map 处理器和 mapreduce 处理器。,Java,java,分布式,大数据

工程依赖powerjob-worker-spring-boot-starter,工程文件夹中比较重点的就是processors文件夹了,给出了各种处理器实现的例子。处理器官方按照功能分了几种:

  • ​单机处理器 - BasicProcessor​​:单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。
  • ​广播处理器 - BroadcastProcessor​​:广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在​​BasicProcessor​​ 的基础上额外增加了 ​​preProcess​​ 和 ​​postProcess​​ 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。
  • ​并行处理器 - MapReduceProcessor​​:MapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的首选。
  • ​Map处理器 - MapProcessor​​:对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。

process方法

BasicProcessor 的 process 方法基本上是所有任务需要实现的核心方法,表示需要执行任务的具体业务内容,其方法定义如下:

ProcessResult process(TaskContext context) throws Exception;

TaskContext入参

process参数TaskContext类似一个dto对象,里面定义了框架传递给具体业务内容所需的一些属性,如下:

属性名称 意义/用法
jobId 任务 ID,开发者一般无需关心此参数
instanceId 任务实例 ID,全局唯一,开发者一般无需关心此参数
subInstanceId 子任务实例 ID,秒级任务使用,开发者一般无需关心此参数
taskId 采用链式命名法的 ID,在某个任务实例内唯一,开发者一般无需关心此参数
taskName task 名称,Map/MapReduce 任务的子任务的值为开发者指定,否则为系统默认值,开发者一般无需关心此参数
jobParams 任务参数对于非工作流中的任务其值等同于控制台录入的任务参数; 如果该任务为工作流中的任务且有配置节点参数信息,那么接收到的是节点配置的参数信息
instanceParams 任务实例参数对于非工作流中的任务 其值 等同于 OpenAPI 传递的实例参数,非 OpenAPI 触发的任务则一定为空。 如果该任务为工作流中的任务那么这里实际接收到的是工作流上下文信息,建议使用 getWorkflowContext 方法获取上下文信息
maxRetryTimes Task 的最大重试次数
currentRetryTimes Task 的当前重试次数,和 maxRetryTimes 联合起来可以判断当前是否为该 Task 的最后一次运行机会
subTask 子 Task,Map/MapReduce 处理器专属,开发者调用map方法时传递的子任务列表中的某一个
omsLogger 在线日志,用法同 Slf4J,记录的日志可以直接通过控制台查看,非常便捷和强大!不过使用过程中需要注意频率,滥用在线日志会对 Server 造成巨大的压力
userContext 用户在 PowerJobWorkerConfig 中设置的自定义上下文
workflowContext 工作流WorkflowContext对象

这是它的源码:

@Getter
@Setter
@ToString
@Slf4j
public class TaskContext {

    private Long jobId;

    private Long instanceId;

    private Long subInstanceId;

    private String taskId;

    private String taskName;
    
    private String jobParams;
    
    private String instanceParams;
    
    private int maxRetryTimes;
    
    private int currentRetryTimes;
    
    private Object subTask;
    
    @JsonIgnore
    private OmsLogger omsLogger;
    
    private Object userContext;
    
    private WorkflowContext workflowContext;

}

返回值 ProcessResult

方法的返回值为 ​​ProcessResult​​,代表了本次任务执行的结果,包含 ​​success​​ 和 ​​msg​​ 两个属性,分别用于传递 Task 是否执行成功和 Task 需要返回的信息。

process方法被谁调用

mars酱选择官方例子中的SimpleProcessor任务,它实现了​​BasicProcessor​​的process方法,跟踪它被调用的地方,找到LightTaskTracker, 它的构造函数把任务提交给线程池调用,这是LightTaskTracker的构造函数:

public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
        super(req, workerRuntime);
        try {
            taskContext = constructTaskContext(req, workerRuntime);
            // 1. 等待处理
            status = TaskStatus.WORKER_RECEIVED;
            // 2. 加载 Processor
            processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo()));
            executeThread = new AtomicReference<>();
            long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L;
            // 3. 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段
            long initDelay = RandomUtils.nextInt(5000, 10000);
            // 4. 上报任务状态
            statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
            // 5. 超时控制
            if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
                if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
                } else {
                    // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s
                    timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
                }
            } else {
                timeoutCheckScheduledFuture = null;
            }
            // 6. 提交任务到线程池
            processFuture = workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit(this::processTask);
        } catch (Exception e) {
            log.error("[TaskTracker-{}] fail to create TaskTracker for req:{} ", instanceId, req);
            destroy();
            throw e;
        }

    }

上面代码的第6步是通过PowerJob的ExecutorManager创建的一个线程池,并提交给线程池去执行,这是ExecutorManager的构造函数:

public ExecutorManager(PowerJobWorkerConfig workerConfig){


        final int availableProcessors = Runtime.getRuntime().availableProcessors();
        // 初始化定时线程池
        ThreadFactory coreThreadFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-core-%d").build();
        coreExecutor =  new ScheduledThreadPoolExecutor(3, coreThreadFactory);

        ThreadFactory lightTaskReportFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-status-check-%d").build();
        
        lightweightTaskStatusCheckExecutor =  new ScheduledThreadPoolExecutor(availableProcessors * 10, lightTaskReportFactory);

        ThreadFactory lightTaskExecuteFactory = new ThreadFactoryBuilder().setNameFormat("powerjob-worker-light-task-execute-%d").build();
        // 这里创建线程池,
        lightweightTaskExecutorService = new ThreadPoolExecutor(availableProcessors * 10,availableProcessors * 10, 120L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>((workerConfig.getMaxLightweightTaskNum() * 2),true), lightTaskExecuteFactory, new ThreadPoolExecutor.AbortPolicy());

    }

构造函数最后一行创建ThreadPoolExecutor,队列使用的ArrayBlockingQueue,失败策略使用的AbortPolicy策略,失败之后抛出异常。

其他任务调度框架

优秀的定时任务框架很多,单体架构的实现可选的不太多,一般也就spring task常用点,分布式架构的可选的很多,可以根据自己的需求选择不同的定时任务框架,以下还有几款名气也不小的:

  • ​big-whale​​:美柚app开源的任务调度框架,提供Spark、Flink等批处理任务的DAG调度和流处理任务的运行管理和状态监控,并具有Yarn应用管理、重复应用检测、大内存应用检测等功能。
  • ​Schedulis​​:微众银行基于 LinkedIn 的开源项目 Azkaban 开发的一款工作流任务调度系统,用于解决金融级场景下,大量批量作业任务的复杂依赖、灵活调度。
  • ​Oozie​​:工作流调度系统,用于管理Apache Hadoop作业。它与Hadoop堆栈的其余部分集成,支持多种类型的Hadoop作业(如Java map-reduce,Streaming map-reduce,Pig,Hive,Sqoop和Distcp)以及系统特定的作业(如Java程序和shell脚本)。

其他优秀的开源任务调度框架,大家可以去github或者gitee上搜索并学习一下。一分钟掌握定时任务,完结。文章来源地址https://www.toymoban.com/news/detail-822002.html

到了这里,关于Java | 一分钟掌握定时任务 | 9 - PowerJob分布式定时任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式定时任务

    本文引用了谷粒商城的课程 定时任务是我们系统里面经常要用到的一些功能。如每天的支付订单要与支付宝进行对账操作、每个月定期进行财务汇总、在服务空闲时定时统计当天所有信息数据等。 定时任务有个非常流行的框架Quartz和Java原生API的Timer类。Spring框架也可以支持

    2023年04月15日
    浏览(55)
  • 分布式任务调度,定时任务的处理方案

    适用场景: Spring 定时任务是 Spring 框架提供的一种轻量级的任务调度方案,它的特点是简单易用、轻量级。Spring 定时任务的执行是在 单个节点 上进行的,如果需要分布式任务调度,需要自己实现相应的解决方案。 1.导入依赖版本自己控制 2.启动类加上@EnableScheduling 3.编写业

    2023年04月14日
    浏览(64)
  • 架构师系列- 定时任务(一)- 单机和分布式定时任务比较

    定时任务概述 在很多应用中我们都是需要执行一些定时任务的,比如定时发送短信,定时统计数据,在实际使用中我们使用什么定时任务框架来实现我们的业务,定时任务使用中会遇到哪些坑,如何最大化的提高定时任务的性能。 我们这里主要介绍单机和分布式两大类的解

    2024年04月27日
    浏览(36)
  • java中定时任务 schedule 分布式下没有锁住 时间不同步 执行滞后 相对时间 系统时间 spring springboot

    java.util.Timer计时器可以进行:管理任务延迟执行(“如1000ms后执行任务”),及周期性执行(“如每500ms执行一次该任务”)。 但是,Timer存在一些缺陷,应考虑使用ScheduledThreadPoolExecutor代替,Timer对调度的支持是基于绝对时间,而不是相对时间的,由此任务对系统时钟的改变是敏感

    2024年02月10日
    浏览(48)
  • 分布式定时任务调度框架Quartz

    Quartz是一个定时任务调度框架,比如你遇到这样的问题: 比如淘宝的待支付功能,后台会在你生成订单后24小时后,查看订单是否支付,未支付则取消订单 比如vip的每月自动续费功能 … 想定时在某个时间,去做某件事 Quartz是一套轻量级的任务调度框架,只需要定义了 Job(

    2024年02月04日
    浏览(45)
  • 使用shedlock实现分布式定时任务锁【防止task定时任务重复执行】

    第一步:引入shedlock相关依赖 ShedLock还可以使用Mongo,Redis,Hazelcast,ZooKeeper等外部存储进行协调,例如使用redis则引入下面的包 第二步:创建数据库表结构,数据库表的脚本如下: 第三步:添加shedlock配置类 (定时任务防重复执行的配置类) 第四步:在启动类上添加启动注

    2024年02月10日
    浏览(42)
  • 分布式定时任务调度xxl-job

    Quartz中最重要的三个对象:Job(作业)、Trigger(触发器)、Scheduler(调度器)。 xxl-job的调度原理:调度线程在一个while循环中不断地获取一定数量的即将触发的Trigger,拿到绑定的Job,包装成工作线程执行。 当然,不管在任何调度系统中,底层都是线程模型。如果要自己写一个

    2024年03月10日
    浏览(55)
  • 分布式定时任务-XXL-JOB-教程+实战

    1.定时任务认识 1.1.什么是定时任务 定时任务是按照指定时间周期运行任务。使用场景为在某个固定时间点执行,或者周期性的去执行某个任务,比如:每天晚上24点做数据汇总,定时发送短信等。 1.2.常见定时任务方案 While + Sleep : 通过循环加休眠的方式定时执行 Timer和Time

    2024年02月16日
    浏览(48)
  • Springboot 定时任务,分布式下幂等性如何解决

    在分布式环境下,定时任务的幂等性问题需要考虑多个节点之间的数据一致性和事务处理。 一种解决方法是使用分布式锁来保证同一时间只有一个节点能够执行该任务。具体实现可以使用Redis或Zookeeper等分布式协调工具提供的分布式锁功能。 另一种解决方法是使用消息队列来

    2024年02月11日
    浏览(35)
  • 基于Mongodb分布式锁简单实现,解决定时任务并发执行问题

    我们日常开发过程,会有一些定时任务的代码来统计一些系统运行数据,但是我们应用有需要部署多个实例,传统的通过配置文件来控制定时任务是否启动又太过繁琐,而且还经常出错,导致一些异常数据的产生 网上有很多分布式锁的实现方案,基于redis、zk、等有很多,但

    2023年04月18日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包