流程:
- 获取执行器中所有被注解(
- 执行器注册过程
- 执行器中任务执行过程
@xxlJjob
修饰的handler
xxl-job 2.3.1
xxl-job源码,按流程图debug
调试,看堆栈信息并按文章内容理解执行流程。
完整流程图:
查找Handler任务
部分流程图:
XxlJobAdminApplication,然后启动项目中给的执行器实例(SpringBoot
;
@xxlJob注解的所有handler方法。接着往下走
private void initJobHandlerMethodRepository(ApplicationContext applicationContext {
if (applicationContext == null {
return;
}
//获取该项目中所有的bean,然后遍历
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true;
for (String beanDefinitionName : beanDefinitionNames {
Object bean = applicationContext.getBean(beanDefinitionName;
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(,
new MethodIntrospector.MetadataLookup<XxlJob>( {
//注意点★
@Override
public XxlJob inspect(Method method {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class;
}
};
} catch (Throwable ex {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex;
}
//没有跳过本次循环继续
if (annotatedMethods==null || annotatedMethods.isEmpty( {
continue;
}
//获取了当前执行器中所有@xxl-job的方法,获取方法以及对应的初始化和销毁方法
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet( {
Method executeMethod = methodXxlJobEntry.getKey(;
XxlJob xxlJob = methodXxlJobEntry.getValue(;
// regist
registJobHandler(xxlJob, bean, executeMethod;
}
}
}
在Spring
案例执行器中有5个handler
:
String name = xxlJob.value(;
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass(;
String methodName = executeMethod.getName(;
if (name.trim(.length( == 0 {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .";
}
if (loadJobHandler(name != null {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.";
}
然后进行遍历注册;开始进行名字判断:
- 判断bean名字是否为空
- 判断bean是否被注册了(存在了
loadJobHandler校验方式会去该方法中查找:当bean注册完成后时存放到jobHandlerRepository
一个map
类型中;
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(;
public static IJobHandler loadJobHandler(String name{
return jobHandlerRepository.get(name;
}
executeMethod.setAccessible(true;
它实现了修改对象访问权限的功能,参数为true,则表示允许调用方在使用反射时忽略Java语言的访问控制检查。init和destroy
未设置生命周期,则直接开始注册
//注意MethodJobHandler,后面会用到 registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod; //添加执行器名字及对应的hob方法信息(当前类、方法、init和destroy属性 public static IJobHandler registJobHandler(String name, IJobHandler jobHandler{ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler; return jobHandlerRepository.put(name, jobHandler; }
- 有生命周期,设置init和destroy方法权限
if (xxlJob.init(.trim(.length( > 0 { try { initMethod = clazz.getDeclaredMethod(xxlJob.init(; initMethod.setAccessible(true; } catch (NoSuchMethodException e { throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] ."; } } if (xxlJob.destroy(.trim(.length( > 0 { try { destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy(; destroyMethod.setAccessible(true; } catch (NoSuchMethodException e { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] ."; } }
首先检查
@XxlJob
注解中的init
属性是否存在且不为空。如果存在,则尝试获取该类中名为init
的方法,并将其设置为可访问状态,以便后续调用。@XxlJob注解中的
destroy
属性是否存在且不为空,如果是,则获取该类中名为destroy
的方法,并设置其为可访问状态。NoSuchMethodException异常,并且使用
throw new RuntimeException
将其包装并抛出一个运行时异常。这样做的目的是为了提醒开发人员在任务处理器类中正确地设置init和destroy
属性,并确保方法名称与属性值一致。执行器的注册过程
部分流程图:
public void afterSingletonsInstantiated( { // init JobHandler Repository /*initJobHandlerRepository(applicationContext;*/ // init JobHandler Repository (for method initJobHandlerMethodRepository(applicationContext; // refresh GlueFactory GlueFactory.refreshInstance(1; // super start try { super.start(; } catch (Exception e { throw new RuntimeException(e; } }
在扫描完执行器中所有的任务后,开始进行执行器注册
XxlJobSpringExecutor中的super.start(
方法。adminBizList地址(可视化管理台地址、初始化日志清除、初始化回调线程等。
这里需要注意的是第二步初始化地址,在初始化服务器启动的时候需要用到。
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken throws Exception { // fill ip port port = port>0?port: NetUtil.findAvailablePort(9999; ip = (ip!=null&&ip.trim(.length(>0?ip: IpUtil.getIp(; // generate address if (address==null || address.trim(.length(==0 { String ip_port_address = IpUtil.getIpPort(ip, port; // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address; } // accessToken if (accessToken==null || accessToken.trim(.length(==0 { logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken."; } // start embedServer = new EmbedServer(; embedServer.start(address, port, appname, accessToken; }
继续到
initEmbedServer
,开始初始化ip地址和端口等,需要明白的是,这一步的参数获取方式其实是第一步读取**XxlJobConfig**
获得的;进行ip的校验和拼接等操作,开始进行真正的注册。嵌入式的HTTP服务器,将当前执行器信息(包含应用名称和IP地址端口等注册到注册中心,注册方式的实现在
ExecutorRegistryThread
中实现。// registry while (!toStop { try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(, appname, address; for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList( { try { ReturnT<String> registryResult = adminBiz.registry(registryParam; if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode( { registryResult = ReturnT.SUCCESS; logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}; break; } else { logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}; } } catch (Exception e { logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e; } } } catch (Exception e { if (!toStop { logger.error(e.getMessage(, e; } } try { //心跳检测,默认30s if (!toStop { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT; } } catch (InterruptedException e { if (!toStop { logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage(; } } }
开启一个新线程,首先构建注册参数(包含执行器分组、执行器名字、执行器本地地址及端口号,遍历注册中心地址,开始进行执行器注册,注册方式通过发送http的post请求。
@Override public ReturnT<String> registry(RegistryParam registryParam { return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class; }
在
debug
的过程中,XxlJobRemotingUtil
执行到int statusCode = connection.getResponseCode(;
才会跳转到JobApiController.api
中的注册地址。// services mapping if ("callback".equals(uri { List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class; return adminBiz.callback(callbackParamList; } else if ("registry".equals(uri { RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class; return adminBiz.registry(registryParam; } else if ("registryRemove".equals(uri { RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class; return adminBiz.registryRemove(registryParam; } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +" not found."; }
最后进入到
JobRegistryHelper.registry(
方法中完成数据库的入库和更新操作。通过更新语句判断该执行器是否注册,结果小于1,那么保存注册器信息,并向注册中心发送一个请求,更新当前执行器所属的应用名称、执行器名称和 IP 地址等信息,否则跳过。
public ReturnT<String> registry(RegistryParam registryParam { //....... // async execute registryOrRemoveThreadPool.execute(new Runnable( { @Override public void run( { //更新注册表信息 int ret = XxlJobAdminConfig.getAdminConfig(.getXxlJobRegistryDao(.registryUpdate(registryParam.getRegistryGroup(, registryParam.getRegistryKey(, registryParam.getRegistryValue(, new Date(; if (ret < 1 { //保存执行器注册信息 XxlJobAdminConfig.getAdminConfig(.getXxlJobRegistryDao(.registrySave(registryParam.getRegistryGroup(, registryParam.getRegistryKey(, registryParam.getRegistryValue(, new Date(; // fresh 刷新执行器状态 freshGroupRegistryInfo(registryParam; } } }; return ReturnT.SUCCESS; }
至此执行器的注册流程分析完成。
执行器中的任务执行过程
部分流程图:
JobThread通过
Cron
表达式进行操作的。handler.execute(进行执行,是在框架内部通过反射机制调用作业处理器对象
handler
中的execute(
方法实现的。在这个过程中,handler 对象表示被加载的作业处理器,并且已经调用了init(
方法进行初始化。method.invoke( 方法使用反射机制调用指定对象
target
中的方法method
。在这个方法中,target
表示作业处理器对象,method
表示作业处理器中的execute(
方法。SampleXxlJob.demoJobHandler的任务,然后开始进行任务逻辑操作。