目的:分析xxl-job执行器的注册过程

流程:

  1. 获取执行器中所有被注解(
    @xxlJjob
    )修饰的
    handler
  2. 执行器注册过程
  3. 执行器中任务执行过程

版本:
xxl-job 2.3.1

建议:下载
xxl-job
源码,按流程图
debug
调试,
看堆栈信息并按文章内容理解执行流程

完整流程图:

img

查找Handler任务

部分流程图:

img

首先启动管理台界面(服务
XxlJobAdminApplication
),然后启动项目中给的执行器实例
(SpringBoot)
;

img

这个方法是扫描项目中使用
@xxlJob
注解的所有handler方法。接着往下走

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    //获取该项目中所有的bean,然后遍历
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        //注意点★
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        //没有跳过本次循环继续
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }
    	//获取了当前执行器中所有@xxl-job的方法,获取方法以及对应的初始化和销毁方法
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // regist
            registJobHandler(xxlJob, bean, executeMethod);
        }
    }
}


Spring
案例执行器中有5个
handler
:

img

XxlJobExecutor.registJobHandler()中部分源码

String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}

然后进行遍历注册;开始进行名字判断:

  1. 判断bean名字是否为空
  2. 判断bean是否被注册了(存在了)

loadJobHandler
校验方式会去该方法中查找:当bean注册完成后时存放到
jobHandlerRepository
一个
map
类型中;

private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}

executeMethod.setAccessible(true);
它实现了修改对象访问权限的功能,参数为true,则表示允许调用方在使用反射时忽略Java语言的访问控制检查.

往后走会判断该注解的生命周期方法(
init和destroy
)

  1. 未设置生命周期,则直接开始注册
//注意MethodJobHandler,后面会用到
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
//添加执行器名字及对应的hob方法信息(当前类、方法、init和destroy属性)
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}
  1. 有生命周期,设置init和destroy方法权限
if (xxlJob.init().trim().length() > 0) {
    try {
        initMethod = clazz.getDeclaredMethod(xxlJob.init());
        initMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}
if (xxlJob.destroy().trim().length() > 0) {
    try {
        destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
        destroyMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}

首先检查
@XxlJob
注解中的
init
属性是否存在且不为空。如果存在,则尝试获取该类中名为
init
的方法,并将其设置为
可访问状态
,以便后续调用。

同理,代码接下来也检查了
@XxlJob
注解中的
destroy
属性是否存在且不为空,如果是,则获取该类中名为
destroy
的方法,并设置其为
可访问状态

在这个过程中,如果某个方法不存在或者无法被访问,则会抛出
NoSuchMethodException
异常,并且使用
throw new RuntimeException
将其包装并抛出一个运行时异常。这样做的目的是为了提醒开发人员在任务处理器类中正确地设置
init和destroy
属性,并确保方法名称与属性值一致。

执行器的注册过程

部分流程图:

img

public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

在扫描完执行器中所有的任务后,开始进行执行器注册
XxlJobSpringExecutor中的super.start()
方法。

在初始化执行服务器启动之前,进行了四种操作,初始化日志、初始化
adminBizList
地址(可视化管理台地址)、初始化日志清除、初始化回调线程等。

这里需要注意的是第二步初始化地址,在初始化服务器启动的时候需要用到。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

继续到
initEmbedServer
,开始初始化ip地址和端口等,
需要明白的是,这一步的参数获取方式其实是第一步读取
**XxlJobConfig**
获得的;
进行ip的校验和拼接等操作,开始进行真正的注册。

创建一个
嵌入式的HTTP服务器,
将当前执行器信息(包含应用名称和IP地址端口等)注册到注册中心,注册方式的实现在
ExecutorRegistryThread
中实现。

校验名字和注册中心,如果注册中心不可用,则等待一段时间后重新尝试连接。

// registry
while (!toStop) {
    try {
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;
                } else {
                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
            } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
            }

        }
    } catch (Exception e) {
        if (!toStop) {
            logger.error(e.getMessage(), e);
        }

    }

    try {
        //心跳检测,默认30s
        if (!toStop) {
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        }
    } catch (InterruptedException e) {
        if (!toStop) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
        }
    }
}

开启一个新线程,首先
构建注册参数(包含执行器分组、执行器名字、执行器本地地址及端口号),遍历注册中心地址,开始进行执行器注册,注册方式通过发送http的post请求。

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}


debug
的过程中,
XxlJobRemotingUtil
执行到
int statusCode = connection.getResponseCode();
才会跳转到
JobApiController.api
中的注册地址.

// services mapping
if ("callback".equals(uri)) {
    List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
    return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registryRemove(registryParam);
} else {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}

最后进入到
JobRegistryHelper.registry()
方法中完成数据库的入库和更新操作。

通过更新语句判断该执行器是否注册,结果小于1,那么保存注册器信息,并向注册中心发送一个请求,更新当前执行器所属的应用名称、执行器名称和 IP 地址等信息,否则跳过。

public ReturnT<String> registry(RegistryParam registryParam) {
	//.......
    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            //更新注册表信息
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                //保存执行器注册信息
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh 刷新执行器状态
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}

至此执行器的注册流程分析完成。

执行器中的任务执行过程

img

部分流程图:

img

执行器中的任务流程比较简单,如果执行器启动的话,那么每次执行任务是通过
JobThread
通过
Cron
表达式进行操作的。

通过
handler.execute()
进行执行,是在框架内部通过反射机制调用作业处理器对象
handler
中的
execute()
方法实现的。在这个过程中,handler 对象表示被加载的作业处理器,并且已经调用了
init()
方法进行初始化。

method.invoke()
方法使用反射机制调用指定对象
target
中的方法
method
。在这个方法中,
target
表示作业处理器对象,
method
表示作业处理器中的
execute()
方法。

通过上述方法,获取到
SampleXxlJob.demoJobHandler
的任务,然后开始进行任务逻辑操作。

标签: none

添加新评论