elastic-job源码(1)- job自动装配

科技资讯 投稿 7000 0 评论

elastic-job源码(1)- job自动装配

版本:3.1.0-SNAPSHOT
git地址:https://github.com/apache/shardingsphere-elasticjob
 
Maven 坐标

<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-boot-starter</artifactId> <version>${latest.version}</version> </dependency>

 
Spring.factories配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration
在添加elasticjob-lite-spring-boot-starter启动类的时候,会自动加载ElasticJobLiteAutoConfiguration,接下来看下ElasticJobLiteAutoConfiguration中所做的处理。
 
ElasticJobLiteAutoConfiguration.java

/** * ElasticJob-Lite auto configuration. */ @Configuration(proxyBeanMethods = false @AutoConfigureAfter(DataSourceAutoConfiguration.class /** * elastic job 开关 * elasticjob.enabled.ture默认为true */ @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true /** * 导入 * ElasticJobRegistryCenterConfiguration.class 注册中心配置 * ElasticJobTracingConfiguration.class job事件追踪配置 * ElasticJobSnapshotServiceConfiguration.class 快照配置 */ @Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class} /** * job相关配置信息 */ @EnableConfigurationProperties(ElasticJobProperties.class public class ElasticJobLiteAutoConfiguration { @Configuration(proxyBeanMethods = false /** * ElasticJobBootstrapConfiguration.class 创建job beans 注入spring容器 * ScheduleJobBootstrapStartupRunner.class 执行类型为ScheduleJobBootstrap.class 的job开始运行 */ @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class} protected static class ElasticJobConfiguration { } }

Elastic-job 是利用zookeeper 实现分布式job的功能,所以在自动装配的时候,需要有zookeeper注册中心的配置。
自动装配主要做了4件事事
1.配置zookeeper 客户端信息,启动连接zookeeper.
2.配置事件追踪数据库,用于保存job运行记录
3.解析所有job配置文件,将所有job的bean放置在spring 单例bean中
4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.
 
第一件事:配置zookeeper 客户端信息,启动连接zookeeper.
ZookeeperRegistryCenter.class

public void init( { log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists(; CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder( //设置zookeeper 服务器地址 .connectString(zkConfig.getServerLists( //设置重试机制 .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(, zkConfig.getMaxRetries(, zkConfig.getMaxSleepTimeMilliseconds( //设置命名空间,zookeeper节点名称 .namespace(zkConfig.getNamespace(; //设置session超时时间 if (0 != zkConfig.getSessionTimeoutMilliseconds( { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds(; } //设置连接超时时间 if (0 != zkConfig.getConnectionTimeoutMilliseconds( { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds(; } if (!Strings.isNullOrEmpty(zkConfig.getDigest( { builder.authorization("digest", zkConfig.getDigest(.getBytes(StandardCharsets.UTF_8 .aclProvider(new ACLProvider( { @Override public List<ACL> getDefaultAcl( { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path { return ZooDefs.Ids.CREATOR_ALL_ACL; } }; } client = builder.build(; //zookeeper 客户端开始启动 client.start(; try { //zookeeper 客户端一直连接 if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds( * zkConfig.getMaxRetries(, TimeUnit.MILLISECONDS { client.close(; throw new KeeperException.OperationTimeoutException(; } //CHECKSTYLE:OFF } catch (final Exception ex { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex; } }

 

第二件事: 配置事件追踪数据库,用于保存job运行记录

ElasticJobTracingConfiguration.java

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource"
//spring中注入bean name 为tracingDataSource的job数据库连接信息
public DataSource tracingDataSource(final TracingProperties tracingProperties {
    //获取elastic-job 数据库配置
    DataSourceProperties dataSource = tracingProperties.getDataSource(;
    if (dataSource == null {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource(;
    tracingDataSource.setJdbcUrl(dataSource.getUrl(;
    BeanUtils.copyProperties(dataSource, tracingDataSource;
    return tracingDataSource;
}


/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing ataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB"
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource {
    /**
     * dataSource 是业务数据库
     * tracingDataSource 是job数据库
     * 当配置elasticjob.tracing.type = RDB时,如果单独配置job数据库是,默认使用job数据库作为job运行轨迹的记录
     * 但这边同时业务数据库和job追踪数据库同时注入是,mybatis-plus 结合@Table 使用的时候,很有可能找不到正确对应的数据源
     */
    DataSource ds = tracingDataSource;
    if (ds == null {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds;
}

 

 

ElasticJobBootstrapConfiguration.class

public void createJobBootstrapBeans( {
    //获取job配置
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class;
    //获取单利注册对象
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext applicationContext.getBeanFactory(;
    //获取注入zookeeper 客户端
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class;
    //获取job事件追踪
    TracingConfiguration<?> tracingConfig = getTracingConfiguration(;
    //构造JobBootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig;
}

重要的是constructJobBootstraps 这个方法,来看下

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig { //遍历配置的每一个job for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs(.entrySet( { ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue(; //校验 job class 和 type 都为空抛异常 Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass( || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType(, "Please specific [elasticJobClass] or [elasticJobType] under job configuration."; //校验 job class 和 type 都有 报相互排斥 Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass( || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType(, "[elasticJobClass] and [elasticJobType] are mutually exclusive."; if (null != jobConfigurationProperties.getElasticJobClass( { //通过class 注入job registerClassedJob(entry.getKey(, entry.getValue(.getJobBootstrapBeanName(, singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties; } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType( { //通过type 注入job registerTypedJob(entry.getKey(, entry.getValue(.getJobBootstrapBeanName(, singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties; } } }

Job 有两种类型的注入,第一种是是class,配置成job的全路径信息注入
 
再来看看registerClassedJob 方法里的内容

private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties { //获取job配置 JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName; //配置job事件追踪 jobExtraConfigurations(jobConfig, tracingConfig; //获取job类型 ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass(; //没有配置cron表达式 就初始化为OneOffJobBootstrap对象,一次性任务 if (Strings.isNullOrEmpty(jobConfig.getCron( { Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName, "The property [jobBootstrapBeanName] is required for One-off job."; singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig; } else { //有配置cron表达式 就初始化为ScheduleJobBootstrap对象,定时任务 //设置bean name String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName ? jobBootstrapBeanName : jobConfig.getJobName( + "ScheduleJobBootstrap"; //注入ScheduleJobBootstrap对象为单利对象 singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig; } }

Class 类型注入的job有两种类型
1.ScheduleJobBootstrap:定时任务类型的job。
2.OneOffJobBootstrap:一定次job类型。
 
看下定义的new ScheduleJobBootstrap 方法

public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig { Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null."; this.regCenter = regCenter; //获取job监听器 Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig; // 集成所有操作zookeeper 节点的services,job 监听器 setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(, jobListeners; //获取当前job名称 String jobClassName = JobClassNameProviderFactory.getProvider(.getJobClassName(elasticJob; //zookeeper节点 {namespace}/{jobclassname}/config 放置job配置信息 this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig; // 集成所有操作zookeeper 节点的services schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName(; jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(, jobListeners, findTracingConfiguration(.orElse(null; //检验job配置 validateJobProperties(; //定义job执行器 jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade; //监听器里注入GuaranteeService setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners; //创建定时任务,开始执行 jobScheduleController = createJobScheduleController(; }

看下createJobScheduleController

private JobScheduleController createJobScheduleController( { JobScheduleController result = new JobScheduleController(createScheduler(, createJobDetail(, getJobConfig(.getJobName(; //注册job JobRegistry.getInstance(.registerJob(getJobConfig(.getJobName(, result; //注册器开始运行 registerStartUpInfo(; return result; }

看下registerStartUpInfo方法

public void registerStartUpInfo(final boolean enabled { //开始所有的监听器 listenerManager.startAllListeners(; //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器 leaderService.electLeader(; //{namespace}/{ipservers} 设置enable处理 serverService.persistOnline(enabled; //临时节点 /{namespave}/instances 放置运行服务实例信息 instanceService.persistOnline(; //开启一个异步服务 if (!reconcileService.isRunning( { reconcileService.startAsync(; } }

这里实行的操作:
1.开启所有监听器处理
2.leader选举
3.持久化节点数据
4.开启异步服务
 
第四步:4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.
 

@Override public void run(final String... args { log.info("Starting ElasticJob Bootstrap."; applicationContext.getBeansOfType(ScheduleJobBootstrap.class.values(.forEach(ScheduleJobBootstrap::schedule; log.info("ElasticJob Bootstrap started."; }

获取到所有的定时任务job(ScheduleJobBootstrap类型,执行schedule方法,底层实际使用quartz框架运行定时任务。
 
 
 
 
 

编程笔记 » elastic-job源码(1)- job自动装配

赞同 (38) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽