Quartz的执行流程分析
一、QuartzSchedulerThread 的 run方法大致阐述
先说一下run方法的执行时机:
当 Quartzscheduler执行start方法 时,方法体中有一句
schedThread.togglePause(false); ,接着就会调用 QuartzSchedulerThread 下的 togglePause 方法,将 paused 置为 false ,在此之后, QuartzSchedulerThread 下的 run 方法开始真正运行 /**通知主处理循环在下一个可能的点暂停 */ void togglePause(boolean pause) { synchronized (sigLock) { paused = pause; if (paused) { signalSchedulingChange(0); } else { sigLock.notifyAll(); } } }复制代码 public void run() { int acquiresFailed = 0; // 这里就是判断调度器是否该停止,如果没有收到信号的话,这个调度器是一直处于循环之中的 while (!halted.get()) { try { // 这里是检查我们是否应该暂停 synchronized (sigLock) { // 我们在初始化的时候,paused 是置为 true的, // 因此在上文中,我们才说 // 当 Quartzscheduler 执行 start方法时调用togglePause, // 将 paused 置为false,run 方法才开始运行 // 也是因为此处的判断 while (paused && !halted.get()) { try { sigLock.wait(1000L); }catch (InterruptedException ignore) {} acquiresFailed = 0; } if (halted.get()) { break;} } // 如果从作业存储中读取一直失败(例如数据库关闭或重新启动) // 就会等待一段时间~ if (acquiresFailed > 1) { try { //这里就是计算延迟时间 long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) {} } // 从线程池拿出空闲可利用的线程数量 // 这里多谈一嘴 blockForAvailableThreads()方法 // 它是一个阻塞式方法,直到至少有一个可用线程。 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { List triggers; long now = System.currentTimeMillis(); // 清除信号调度变更 clearSignaledSchedulingChange(); try { //如果可用线程数量足够那么就是30后再次扫描, //acquireNextTriggers方法的三个参数的意思分别是: //idleWaitTime :为如果没有的再次扫描的时间,默认是 // private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L; 30秒 //Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()) :这里的意思就是一次最多能取几个出来 //batchTimeWindow :默认是0,同样是一个时间范围, //如果有两个任务只差一两秒,而执行线程数量满足及batchTimeWindow时间也满足的情况下就会两个都取出来 // 具体的方法的执行,后文再看~ triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()){ //... } //在获取到 triggers 触发器不为空后, //trigger列表是以下次执行时间排序查出来的 if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); //取出集合中最早执行的触发器 //获取它的下一个触发时间 long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; // 判断距离执行时间是否大于2 毫秒 while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } //判断是不是距离触发事件最近的, if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // 没有的话,就进行阻塞,稍后进行执行 now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) {} } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to "executing" List bndles = new ArrayList(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { //开始根据需要执行的trigger从数据库中获取相应的JobDetail 同时这一步也更新了 triggers 的状态,稍后会讲到~ List res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers "" + triggers + """, se); for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } //将查询到的结果封装成为 TriggerFiredResult for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); //TriggerFiredBundle用于将执行时数据从 JobStore 返回到QuartzSchedulerThread 。 TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } JobRunShell shell = null; try { //把任务封装成JobRunShell线程任务, //JobRunShell extends SchedulerListenerSupport implements Runnable 是实现了 Runnable 接口的 //然后放到线程池中跑动。 shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } // 别看这里是个if判断 // 但是这里就是将 obshell 放进线程池执行的地方 // 利用的就是boolean runInThread(Runnable runnable); 方法 // 这个方法的作用就是 在下一个可用的Thread中执行给定Runnable if (qsRsrcs.getThreadPool().runInThread(shell) == false) { getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { // .... } } // .... } // while (!halted) // .... }复制代码二、一些细节2.1、先获取线程池中的可用线程数量
(若没有可用的会阻塞,直到有可用的); int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); 复制代码2.2、获取 30m 内要执行的 trigger
(即 acquireNextTriggers )
我们来看一看 acquireNextTriggers 方法
首先说 acquireNextTriggers 具体实现是在 JobStoreSupport 中,同时 quartz 与数据库关联的实现大都在 JobStoreSupport 中,当然更具体的SQL执行还是在 DriverDelegate 接口下的。
acquireNextTriggers 做了哪些事情呢?
我们看看这两个方法:
首先看第一个 acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
主要就是获取下一个 30m内可执行的triggers的触发器,在里面 JobStoreSupport 从数据库取出 triggers 时是按照 nextFireTime 排序的
更具体的就需要大家点进方法去看啦~另外里面还包含triggers状态的变更,属于是更加细节化的东西。
第二个就是获取到触发的触发记录~
然后在执行 executeInNonManagedTXLock 时,是需要先获得锁,之后再在提交时释放锁的。
待直到获取的trigger中最先执行的trigger在2ms内; if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { //... } }复制代码2.3、triggersFired(triggers)
List res = qsRsrcs.getJobStore().triggersFired(triggers);
这一步看着只是获取了 List 对象,实际上在 triggersFired(triggers) 方法中隐藏了很多东西~
首先查询,确保触发器没有被删除、暂停或完成...,就更新 firedTrigger 的 status=STATE_EXECUTING ;代码的注释上还说,如果没有这些就会将状态该为 deleted
另外就是更新触发触发器: 更新trigger下一次触发的时间; 更新trigger的状态:
如果下一次的执行时间为空,状态则改为 STATE_COMPLETE
在执行 executeInNonManagedTXLock 方法时,提交前先获得锁, transOwner = getLockHandler().obtainLock(conn, lockName);
最后是释放锁: commitConnection(conn); 2.4、创建JobRunShell,放进线程池执行
针对每个要执行的trigger,创建JobRunShell,并放入线程池执行:
然后由execute:执行job
更详细的看不下去啦~
来源:https://juejin.cn/post/7131671438901116935
升级投屏体验,易投熊T1入门级无线HDMI传输值得一试万物互联时代,各屏幕间的互联互通也会越来越重要,公司会议室的投屏方法比较过时,家庭电视也又有点跟不上节奏了,今天笔者带来的评测则是一款能够变废为宝的无线HDMI传输小物件易投熊T1
重回2198元,6400万四摄8GB128GB,友商高素质真全面屏手机如果要用两个字来形容目前的手机圈,我觉得用厮杀来形容最合适不过了。因为,产品的同质化非常严重,举个简单多了例子,预算5000元左右,你可以买到的骁龙888手机相当多,它们基本都会具
号称年轻人的第一把键盘?小米生态链又来搞事情了前言号称年轻人的第一把客制化机械键盘!听到这个Slogan是不是觉得特别耳熟。最近米物就发布了他们的米物ART系列机械键盘,众筹时的价格仅需369元!68键佳达隆G黄Pro轴,可以
王者荣耀皮肤曝光,天竺公主自带异域风情,女儿国国王甄姬绝美相信小伙伴们都知道,王者荣耀最近要和西游记联动,可能是因为皮肤即将上线最近有不少相关的爆料,赶紧来看看吧!首先就是最确定的皮肤,和西游记联动的唐僧皮肤。新英雄金蝉即将上线,大家从1
乡土散文火炉(甘肃80后农村孩子的记忆)绿蚁新醅酒,红泥小火炉。晚来天欲雪,能饮一杯无?每当读起这首耳熟能详的古诗,我便会想起童年的火炉,那暖暖的气息永远氤氲在我的脑海里。上世纪八十年代后期,偏僻的小山村还是落后至极,几
散文人生应该积极向上人生应该积极向上,让自己拥有自己的一片天空,也让内心拥有自己的一片天地。这时想起来,只有静静地想起来,竟也变得有些痴迷了。这就是心太热烈了,也太淡漠了,所以才会想起来,想起来自己过
东方叶子香飘万里中国是茶的故乡。在历史上,中国的茶叶和丝绸等商品通过丝绸之路走向世界,这条路也成了连通中国与世界的和平之路文明之路。实际上,在中国与世界的交往史中,丝绸之路是最著名的,却不是唯一的
原神早柚培养攻略,大剑配萝莉,挖矿加跑图,功能性拉满大佬们一直都在说胡桃池和武器池都很不错,有很重要的原因是,这次胡桃池中一起up的四星角色猫猫和早柚都是很香的,上次写了迪奥娜的攻略,这次就说一下萝莉团中目前最新的角色,早柚的培养攻
宝博看电竞赛Faker已成为自由人随着2021年冬季转会期的已经开始,Lck最近宣布了一系列官方公告,很多选手已成为自由人。然而,除了hle的核心chovy和deft,以及DK的核心Canyon和showmaker
LOL半价海克斯皮肤哪个值得兑换?S11EDG夺冠之后,海克斯科技皮肤兑换需要的宝石也减半了。而宝石这东西很稀有,可能攒很久才能攒够一个皮肤的,那么哪些皮肤是值得兑换的呢?值得兑换的卡萨丁波比鳄鱼乌鸦龙龟猪妹人马薇
九寨秋韵如诗如画来源人民网航拍九寨沟景区一角。梁枫摄秋天的九寨沟景区如一幅油画绚丽的海子橙红的秋叶蜿蜒的栈道暖暖的阳光穿过树枝,从秋叶间透出斑驳的光影,洒落在海子上,风一吹,泛起点点涟漪。丝毫不减