elastic-job源码(1)- job自动装配

这篇具有很好参考价值的文章主要介绍了elastic-job源码(1)- job自动装配。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

版本:3.1.0-SNAPSHOT
git地址:https://github.com/apache/shardingsphere-elasticjob
 
Maven 坐标
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>
 
Spring.factories配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration
在添加elasticjob-lite-spring-boot-starter启动类的时候,会自动加载ElasticJobLiteAutoConfiguration,接下来看下ElasticJobLiteAutoConfiguration中所做的处理。
 
ElasticJobLiteAutoConfiguration.java
/**
 * ElasticJob-Lite auto configuration.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)


/**
 * elastic job 开关
 * elasticjob.enabled.ture默认为true
 */
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)


/**
 * 导入
 * ElasticJobRegistryCenterConfiguration.class 注册中心配置
 * ElasticJobTracingConfiguration.class job事件追踪配置
 * ElasticJobSnapshotServiceConfiguration.class 快照配置
 */
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})


/**
 * job相关配置信息
 */
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {
    
    @Configuration(proxyBeanMethods = false)
    /**
     * ElasticJobBootstrapConfiguration.class  创建job beans 注入spring容器
     * ScheduleJobBootstrapStartupRunner.class  执行类型为ScheduleJobBootstrap.class 的job开始运行
     */
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }
}
Elastic-job 是利用zookeeper 实现分布式job的功能,所以在自动装配的时候,需要有zookeeper注册中心的配置。
自动装配主要做了4件事事
1.配置zookeeper 客户端信息,启动连接zookeeper.
2.配置事件追踪数据库,用于保存job运行记录
3.解析所有job配置文件,将所有job的bean放置在spring 单例bean中
4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.
 
第一件事:配置zookeeper 客户端信息,启动连接zookeeper.
ZookeeperRegistryCenter.class
public void init() {
    log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            //设置zookeeper 服务器地址
            .connectString(zkConfig.getServerLists())
            //设置重试机制
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
            //设置命名空间,zookeeper节点名称
            .namespace(zkConfig.getNamespace());
    //设置session超时时间
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    //设置连接超时时间
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {
                
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                
                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    //zookeeper 客户端开始启动
    client.start();
    try {
        //zookeeper 客户端一直连接
        if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}

 

第二件事: 配置事件追踪数据库,用于保存job运行记录

ElasticJobTracingConfiguration.java

 

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource")
//spring中注入bean name 为tracingDataSource的job数据库连接信息
public DataSource tracingDataSource(final TracingProperties tracingProperties) {
    //获取elastic-job 数据库配置
    DataSourceProperties dataSource = tracingProperties.getDataSource();
    if (dataSource == null) {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource();
    tracingDataSource.setJdbcUrl(dataSource.getUrl());
    BeanUtils.copyProperties(dataSource, tracingDataSource);
    return tracingDataSource;
}


/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing ataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
    /**
     * dataSource 是业务数据库
     * tracingDataSource 是job数据库
     * 当配置elasticjob.tracing.type = RDB时,如果单独配置job数据库是,默认使用job数据库作为job运行轨迹的记录
     * 但这边同时业务数据库和job追踪数据库同时注入是,mybatis-plus 结合@Table 使用的时候,很有可能找不到正确对应的数据源
     */
    DataSource ds = tracingDataSource;
    if (ds == null) {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds);
}

 

通过elasticjob.tracing.type=RDB的配置开启事件追踪功能,这边job的事件追踪数据源可以和业务数据源配置不一样。

 

第三件事:解析所有job配置文件

ElasticJobBootstrapConfiguration.class

 

public void createJobBootstrapBeans() {
    //获取job配置
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
    //获取单利注册对象
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    //获取注入zookeeper 客户端
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
    //获取job事件追踪
    TracingConfiguration<?> tracingConfig = getTracingConfiguration();
    //构造JobBootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}

重要的是constructJobBootstraps 这个方法,来看下

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
    //遍历配置的每一个job
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        //校验 job class 和 type 都为空抛异常
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                        || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        //校验 job class 和 type 都有 报相互排斥
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                        || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");


        if (null != jobConfigurationProperties.getElasticJobClass()) {
            //通过class 注入job
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            //通过type 注入job
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}
Job 有两种类型的注入,第一种是是class,配置成job的全路径信息注入
 
再来看看registerClassedJob 方法里的内容
private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    //获取job配置
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    //配置job事件追踪
    jobExtraConfigurations(jobConfig, tracingConfig);
    //获取job类型
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    //没有配置cron表达式 就初始化为OneOffJobBootstrap对象,一次性任务
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        //有配置cron表达式 就初始化为ScheduleJobBootstrap对象,定时任务
        //设置bean name
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        //注入ScheduleJobBootstrap对象为单利对象
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}
Class 类型注入的job有两种类型
1.ScheduleJobBootstrap:定时任务类型的job。
2.OneOffJobBootstrap:一定次job类型。
 
看下定义的new ScheduleJobBootstrap 方法
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    //获取job监听器
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // 集成所有操作zookeeper 节点的services,job 监听器
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    //获取当前job名称
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    //zookeeper节点 {namespace}/{jobclassname}/config 放置job配置信息
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // 集成所有操作zookeeper 节点的services
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    //检验job配置
    validateJobProperties();
    //定义job执行器
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    //监听器里注入GuaranteeService
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    //创建定时任务,开始执行
    jobScheduleController = createJobScheduleController();
}

 

看下createJobScheduleController

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    //注册job
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    //注册器开始运行
    registerStartUpInfo();
    return result;
}

看下registerStartUpInfo方法文章来源地址https://www.toymoban.com/news/detail-426170.html

public void registerStartUpInfo(final boolean enabled) {
    //开始所有的监听器
    listenerManager.startAllListeners();
    //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器
    leaderService.electLeader();
    //{namespace}/{ipservers} 设置enable处理
    serverService.persistOnline(enabled);
    //临时节点   /{namespave}/instances 放置运行服务实例信息
    instanceService.persistOnline();
    //开启一个异步服务
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
这里实行的操作:
1.开启所有监听器处理
2.leader选举
3.持久化节点数据
4.开启异步服务
 
第四步:4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.
 
@Override
public void run(final String... args) {
    log.info("Starting ElasticJob Bootstrap.");
    applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
    log.info("ElasticJob Bootstrap started.");
}
获取到所有的定时任务job(ScheduleJobBootstrap类型),执行schedule方法,底层实际使用quartz框架运行定时任务。
 
 
 
 
 

到了这里,关于elastic-job源码(1)- job自动装配的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【转载】elastic-job链接zookeeper报错KeeperErrorCode = OperationTimeout

    org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout 报错 新同事本地启项目,springboot启动报这个错,原文如下: 排查过程: 网上有说防火墙的(试了不行);有说zk版本的(这边还是第三方包间接依赖的,且有包冲突,试了去掉一个没好使); 解决方案: 修

    2024年02月12日
    浏览(36)
  • 定时任务特辑 | Quartz、xxl-job、elastic-job、Cron四个定时任务框架对比,和Spring Boot集成实战

    专栏集锦,大佬们可以收藏以备不时之需: Spring Cloud 专栏: Python 专栏: Redis 专栏: TensorFlow 专栏: Logback 专栏: 量子计算: 量子计算 | 解密著名量子算法Shor算法和Grover算法 AI机器学习实战: AI机器学习实战 | 使用 Python 和 scikit-learn 库进行情感分析 AI机器学习 | 基于lib

    2024年02月05日
    浏览(56)
  • SpringBoot源码-自动装配

      springboot的核心注解@SpringBootApplication 接着看 @SpringBootApplication 注解 截图: 代码:  接着看红框的注解 @EnableAutoConfiguration 截图: 代码:  接着看红框的 AutoConfigurationImportSelector.class 这个类 截图: 接着看接口 DeferredImportSelector 的实现 截图: 在这个DeferredImportSelector类中,

    2024年02月08日
    浏览(59)
  • 一文足够,SpringBoot自动装配底层源码

    目录 自动装配原理 开始深入源码 总结自动装配原理 首先明白一个概念,什么是自动装配? 我们在项目中建一个yaml或者properties文件,里面配置一些参数,如redis,在pom中引入启动器,之后就能用redis,自动把这些集成到spring中,这就是自动装配。 先来提前剧透: 加载spring.

    2023年04月13日
    浏览(43)
  • 【Spring Boot 源码学习】自动装配流程源码解析(上)

    《Spring Boot 源码学习系列》 上篇博文,笔者带大家从整体上了解了AutoConfigurationImportSelector 自动装配逻辑的核心功能及流程,由于篇幅有限,更加细化的功能及流程详解还没有介绍。本篇开始将从其源码入手,重点解析细化后的自动装配流程源码。 在开始本篇的内容介绍之前

    2024年02月14日
    浏览(41)
  • 【Spring Boot 源码学习】自动装配流程源码解析(下)

    《Spring Boot 源码学习系列》 上篇博文,笔者带大家了解了自动装配流程中有关自动配置加载的流程; 本篇将介绍自动装配流程剩余的内容,包含了自动配置组件的排除和过滤、触发自动配置事件。 在开始本篇的内容介绍之前,我们先来看看往期的系列文章【有需要的朋友,

    2024年02月11日
    浏览(36)
  • Spring Boot源码解析 - 自动装配原理

    Spring Boot 自动装配是 Spring Boot 框架的一个关键特性,它的目标是让开发者能够快速构建 Spring 应用程序,减少繁琐的配置工作。   @SpringApplication 从启动类 @SpringApplication 注解入手, @SpringBootApplication 是一个组合注解,它是 Spring Boot 框架中常用的一个主要注解之一。它结合了

    2024年01月19日
    浏览(44)
  • Springboot中SpringSecurity自动装配原理,源码级别绝对详细

    (1)Springboot有一个自动配置类 SecurityFilterAutoConfiguration , SecurityFilterAutoConfiguration 只要当项目中引入了SpringSecurity的相关jar包就会被自动加载。装载这个类是干嘛的呢? (2)如下图, SecurityFilterAutoConfiguration 自动配置类主要用于,当存在名字叫做\\\"springSecurityFilterChain\\\"的bea

    2024年02月05日
    浏览(51)
  • SpringBoot源码解读与原理分析(六)WebMvc场景的自动装配

    了解了SpringBoot的自动装配机制之后,研究一个常见且实用的场景:当项目整合SpringWebMvc后SpringBoot的自动装配都做了什么? 2.6.1 WebMvcAutoConfiguration 引入spring-boot-starter-web依赖后,SpringBoot会进行WebMvc的自动装配,处理的核心是一个叫WebMvcAutoConfiguration的自动配置类。 由以上源码

    2024年02月21日
    浏览(51)
  • CaffeineCache+Redis 接入系统做二层缓存思路实现(借鉴 mybatis 二级缓存、自动装配源码)

    现在手上有个系统写操作比较少,很多接口都是读操作,也就是写多读少,性能上遇到瓶颈了,正所谓前人栽树、后人乘凉,原先系统每次都是查数据库的,性能比较低,如果先查 redis,redis 没数据再查数据库的话,但是还可以更快,那就是使用内存查询,依次按照内存、

    2024年02月09日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包