一、功能說明
SpringBoot的定時任務(wù)的加強工具,實現(xiàn)對SpringBoot原生的定時任務(wù)進行動態(tài)管理,完全兼容原生@Scheduled注解,無需對原本的定時任務(wù)進行修改
二、快速使用
具體的功能已經(jīng)封裝成SpringBoot-starter即插即用
com.github.guoyixing spring-boot-starter-super-scheduled 0.3.1
三、實現(xiàn)原理
1、動態(tài)管理實現(xiàn)
(1) 配置管理介紹
@Component("superScheduledConfig") publicclassSuperScheduledConfig{ /** *執(zhí)行定時任務(wù)的線程池 */ privateThreadPoolTaskSchedulertaskScheduler; /** *定時任務(wù)名稱與定時任務(wù)回調(diào)鉤子的關(guān)聯(lián)關(guān)系容器 */ privateMapnameToScheduledFuture=newConcurrentHashMap<>(); /** *定時任務(wù)名稱與定時任務(wù)需要執(zhí)行的邏輯的關(guān)聯(lián)關(guān)系容器 */ privateMap nameToRunnable=newConcurrentHashMap<>(); /** *定時任務(wù)名稱與定時任務(wù)的源信息的關(guān)聯(lián)關(guān)系容器 */ privateMap nameToScheduledSource=newConcurrentHashMap<>(); /*普通的get/sets省略*/ }
(2) 使用后處理器攔截SpringBoot原本的定時任務(wù)
實現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文
實現(xiàn)BeanPostProcessor接口,將這個類標記為后處理器,后處理器會在每個bean實例化之后執(zhí)行
使用@DependsOn注解強制依賴SuperScheduledConfig類,讓SpringBoot實例化SuperScheduledPostProcessor類之前先實例化SuperScheduledConfig類
主要實現(xiàn)邏輯在postProcessAfterInitialization()方法中
@DependsOn({"superScheduledConfig"}) @Component @Order publicclassSuperScheduledPostProcessorimplementsBeanPostProcessor,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateApplicationContextapplicationContext; /** *實例化bean之前的操作 *@parambeanbean實例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessBeforeInitialization(Objectbean,StringbeanName)throwsBeansException{ returnbean; } /** *實例化bean之后的操作 *@parambeanbean實例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessAfterInitialization(Objectbean, StringbeanName)throwsBeansException{ //1.獲取配置管理器 SuperScheduledConfigsuperScheduledConfig=applicationContext.getBean(SuperScheduledConfig.class); //2.獲取當前實例化完成的bean的所有方法 Method[]methods=bean.getClass().getDeclaredMethods(); //循環(huán)處理對每個方法逐一處理 if(methods.length>0){ for(Methodmethod:methods){ //3.嘗試在該方法上獲取@Scheduled注解(SpringBoot的定時任務(wù)注解) Scheduledannotation=method.getAnnotation(Scheduled.class); //如果無法獲取到@Scheduled注解,就跳過這個方法 if(annotation==null){ continue; } //4.創(chuàng)建定時任務(wù)的源屬性 //創(chuàng)建定時任務(wù)的源屬性(用來記錄定時任務(wù)的配置,初始化的時候記錄的是注解上原本的屬性) ScheduledSourcescheduledSource=newScheduledSource(annotation,method,bean); //對注解上獲取到源屬性中的屬性進行檢測 if(!scheduledSource.check()){ thrownewSuperScheduledException("在"+beanName+"Bean中"+method.getName()+"方法的注解參數(shù)錯誤"); } //生成定時任務(wù)的名稱(id),使用beanName+“.”+方法名 Stringname=beanName+"."+method.getName(); //將以key-value的形式,將源數(shù)據(jù)存入配置管理器中,key:定時任務(wù)的名稱value:源數(shù)據(jù) superScheduledConfig.addScheduledSource(name,scheduledSource); try{ //5.將原本SpringBoot的定時任務(wù)取消掉 clearOriginalScheduled(annotation); }catch(Exceptione){ thrownewSuperScheduledException("在關(guān)閉原始方法"+beanName+method.getName()+"時出現(xiàn)錯誤"); } } } //最后bean保持原有返回 returnbean; } /** *修改注解原先的屬性 *@paramannotation注解實例對象 *@throwsException */ privatevoidclearOriginalScheduled(Scheduledannotation)throwsException{ changeAnnotationValue(annotation,"cron",Scheduled.CRON_DISABLED); changeAnnotationValue(annotation,"fixedDelay",-1L); changeAnnotationValue(annotation,"fixedDelayString",""); changeAnnotationValue(annotation,"fixedRate",-1L); changeAnnotationValue(annotation,"fixedRateString",""); changeAnnotationValue(annotation,"initialDelay",-1L); changeAnnotationValue(annotation,"initialDelayString",""); } /** *獲取SpringBoot的上下文 *@paramapplicationContextSpringBoot的上下文 */ @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(3) 使用ApplicationRunner初始化自定義的定時任務(wù)運行器
實現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文
使用@DependsOn注解強制依賴threadPoolTaskScheduler類
實現(xiàn)ApplicationRunner接口,在所有bean初始化結(jié)束之后,運行自定義邏輯
主要實現(xiàn)邏輯在run()方法中
@DependsOn("threadPoolTaskScheduler") @Component publicclassSuperScheduledApplicationRunnerimplementsApplicationRunner,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); privateApplicationContextapplicationContext; /** *定時任務(wù)配置管理器 */ @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *定時任務(wù)執(zhí)行線程 */ @Autowired privateThreadPoolTaskSchedulerthreadPoolTaskScheduler; @Override publicvoidrun(ApplicationArgumentsargs){ //1.定時任務(wù)配置管理器中緩存定時任務(wù)執(zhí)行線程 superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler); //2.獲取所有定時任務(wù)源數(shù)據(jù) MapnameToScheduledSource=superScheduledConfig.getNameToScheduledSource(); //逐一處理定時任務(wù) for(Stringname:nameToScheduledSource.keySet()){ //3.獲取定時任務(wù)源數(shù)據(jù) ScheduledSourcescheduledSource=nameToScheduledSource.get(name); //4.獲取所有增強類 String[]baseStrengthenBeanNames=applicationContext.getBeanNamesForType(BaseStrengthen.class); //5.創(chuàng)建執(zhí)行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); //配置執(zhí)行控制器 runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //6.逐一處理增強類(增強器實現(xiàn)原理后面具體分析) List points=newArrayList<>(baseStrengthenBeanNames.length); for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //7.將增強器代理成point ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //創(chuàng)建代理 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //8.所有的points連成起來 points.add(proxy); } //將point形成調(diào)用鏈 runnable.setChain(newChain(points)); //將執(zhí)行邏輯封裝并緩存到定時任務(wù)配置管理器中 superScheduledConfig.addRunnable(name,runnable::invoke); try{ //8.啟動定時任務(wù) ScheduledFuture>schedule=ScheduledFutureFactory.create(threadPoolTaskScheduler ,scheduledSource,runnable::invoke); //將線程回調(diào)鉤子存到任務(wù)配置管理器中 superScheduledConfig.addScheduledFuture(name,schedule); logger.info(df.format(LocalDateTime.now())+"任務(wù)"+name+"已經(jīng)啟動..."); }catch(Exceptione){ thrownewSuperScheduledException("任務(wù)"+name+"啟動失敗,錯誤信息:"+e.getLocalizedMessage()); } } } @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(4) 進行動態(tài)管理
@Component publicclassSuperScheduledManager{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *修改Scheduled的執(zhí)行周期 * *@paramnamescheduled的名稱 *@paramcroncron表達式 */ publicvoidsetScheduledCron(Stringname,Stringcron){ //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedDelay * *@paramnamescheduled的名稱 *@paramfixedDelay上一次執(zhí)行完畢時間點之后多長時間再執(zhí)行 */ publicvoidsetScheduledFixedDelay(Stringname,LongfixedDelay){ //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedDelay(fixedDelay); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedRate * *@paramnamescheduled的名稱 *@paramfixedRate上一次開始執(zhí)行之后多長時間再執(zhí)行 */ publicvoidsetScheduledFixedRate(Stringname,LongfixedRate){ //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedRate(fixedRate); addScheduled(name,scheduledSource); } /** *查詢所有啟動的Scheduled */ publicListgetRunScheduledName(){ Set names=superScheduledConfig.getNameToScheduledFuture().keySet(); returnnewArrayList<>(names); } /** *查詢所有的Scheduled */ publicList getAllSuperScheduledName(){ Set names=superScheduledConfig.getNameToRunnable().keySet(); returnnewArrayList<>(names); } /** *終止Scheduled * *@paramnamescheduled的名稱 */ publicvoidcancelScheduled(Stringname){ ScheduledFuturescheduledFuture=superScheduledConfig.getScheduledFuture(name); scheduledFuture.cancel(true); superScheduledConfig.removeScheduledFuture(name); logger.info(df.format(LocalDateTime.now())+"任務(wù)"+name+"已經(jīng)終止..."); } /** *啟動Scheduled * *@paramnamescheduled的名稱 *@paramscheduledSource定時任務(wù)的源信息 */ publicvoidaddScheduled(Stringname,ScheduledSourcescheduledSource){ if(getRunScheduledName().contains(name)){ thrownewSuperScheduledException("定時任務(wù)"+name+"已經(jīng)被啟動過了"); } if(!scheduledSource.check()){ thrownewSuperScheduledException("定時任務(wù)"+name+"源數(shù)據(jù)內(nèi)容錯誤"); } scheduledSource.refreshType(); Runnablerunnable=superScheduledConfig.getRunnable(name); ThreadPoolTaskSchedulertaskScheduler=superScheduledConfig.getTaskScheduler(); ScheduledFuture>schedule=ScheduledFutureFactory.create(taskScheduler,scheduledSource,runnable); logger.info(df.format(LocalDateTime.now())+"任務(wù)"+name+"已經(jīng)啟動..."); superScheduledConfig.addScheduledSource(name,scheduledSource); superScheduledConfig.addScheduledFuture(name,schedule); } /** *以cron類型啟動Scheduled * *@paramnamescheduled的名稱 *@paramcroncron表達式 */ publicvoidaddCronScheduled(Stringname,Stringcron){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *以fixedDelay類型啟動Scheduled * *@paramnamescheduled的名稱 *@paramfixedDelay上一次執(zhí)行完畢時間點之后多長時間再執(zhí)行 *@paraminitialDelay第一次執(zhí)行的延遲時間 */ publicvoidaddFixedDelayScheduled(Stringname,LongfixedDelay,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedDelay(fixedDelay); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次執(zhí)行的延遲時間只能傳入一個參數(shù)"); } addScheduled(name,scheduledSource); } /** *以fixedRate類型啟動Scheduled * *@paramnamescheduled的名稱 *@paramfixedRate上一次開始執(zhí)行之后多長時間再執(zhí)行 *@paraminitialDelay第一次執(zhí)行的延遲時間 */ publicvoidaddFixedRateScheduled(Stringname,LongfixedRate,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedRate(fixedRate); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次執(zhí)行的延遲時間只能傳入一個參數(shù)"); } addScheduled(name,scheduledSource); } /** *手動執(zhí)行一次任務(wù) * *@paramnamescheduled的名稱 */ publicvoidrunScheduled(Stringname){ Runnablerunnable=superScheduledConfig.getRunnable(name); runnable.run(); } }
2、增強接口實現(xiàn)
增強器實現(xiàn)的整體思路與SpringAop的思路一致,實現(xiàn)沒有Aop復(fù)雜
(1) 增強接口
@Order(Ordered.HIGHEST_PRECEDENCE) publicinterfaceBaseStrengthen{ /** *前置強化方法 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執(zhí)行的方法對象 *@paramargs方法參數(shù) */ voidbefore(Objectbean,Methodmethod,Object[]args); /** *后置強化方法 *出現(xiàn)異常不會執(zhí)行 *如果未出現(xiàn)異常,在afterFinally方法之后執(zhí)行 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執(zhí)行的方法對象 *@paramargs方法參數(shù) */ voidafter(Objectbean,Methodmethod,Object[]args); /** *異常強化方法 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執(zhí)行的方法對象 *@paramargs方法參數(shù) */ voidexception(Objectbean,Methodmethod,Object[]args); /** *Finally強化方法,出現(xiàn)異常也會執(zhí)行 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執(zhí)行的方法對象 *@paramargs方法參數(shù) */ voidafterFinally(Objectbean,Methodmethod,Object[]args); }
(2) 代理抽象類
publicabstractclassPoint{ /** *定時任務(wù)名 */ privateStringsuperScheduledName; /** *抽象的執(zhí)行方法,使用代理實現(xiàn) *@paramrunnable定時任務(wù)執(zhí)行器 */ publicabstractObjectinvoke(SuperScheduledRunnablerunnable); /*普通的get/sets省略*/ }
(3) 調(diào)用鏈類
publicclassChain{ privateListlist; privateintindex=-1; /** *索引自增1 */ publicintincIndex(){ return++index; } /** *索引還原 */ publicvoidresetIndex(){ this.index=-1; } }
(4) cglib動態(tài)代理實現(xiàn)
使用cglib代理增強器,將增強器全部代理成調(diào)用鏈節(jié)點Point
publicclassRunnableBaseInterceptorimplementsMethodInterceptor{ /** *定時任務(wù)執(zhí)行器 */ privateSuperScheduledRunnablerunnable; /** *定時任務(wù)增強類 */ privateBaseStrengthenstrengthen; @Override publicObjectintercept(Objectobj,Methodmethod,Object[]args,MethodProxymethodProxy)throwsThrowable{ Objectresult; //如果執(zhí)行的是invoke()方法 if("invoke".equals(method.getName())){ //前置強化方法 strengthen.before(obj,method,args); try{ //調(diào)用執(zhí)行器中的invoke()方法 result=runnable.invoke(); }catch(Exceptione){ //異常強化方法 strengthen.exception(obj,method,args); thrownewSuperScheduledException(strengthen.getClass()+"中強化執(zhí)行時發(fā)生錯誤",e); }finally{ //Finally強化方法,出現(xiàn)異常也會執(zhí)行 strengthen.afterFinally(obj,method,args); } //后置強化方法 strengthen.after(obj,method,args); }else{ //直接執(zhí)行方法 result=methodProxy.invokeSuper(obj,args); } returnresult; } publicRunnableBaseInterceptor(Objectobject,SuperScheduledRunnablerunnable){ this.runnable=runnable; if(BaseStrengthen.class.isAssignableFrom(object.getClass())){ this.strengthen=(BaseStrengthen)object; }else{ thrownewSuperScheduledException(object.getClass()+"對象不是BaseStrengthen類型"); } } publicRunnableBaseInterceptor(){ } }
(5) 定時任務(wù)執(zhí)行器實現(xiàn)
publicclassSuperScheduledRunnable{ /** *原始的方法 */ privateMethodmethod; /** *方法所在的bean */ privateObjectbean; /** *增強器的調(diào)用鏈 */ privateChainchain; publicObjectinvoke(){ Objectresult; //索引自增1 if(chain.incIndex()==chain.getList().size()){ //調(diào)用鏈中的增強方法已經(jīng)全部執(zhí)行結(jié)束 try{ //調(diào)用鏈索引初始化 chain.resetIndex(); //增強器全部執(zhí)行完畢,執(zhí)行原本的方法 result=method.invoke(bean); }catch(IllegalAccessException|InvocationTargetExceptione){ thrownewSuperScheduledException(e.getLocalizedMessage()); } }else{ //獲取被代理后的方法增強器 Pointpoint=chain.getList().get(chain.getIndex()); //執(zhí)行增強器代理 //增強器代理中,會回調(diào)方法執(zhí)行器,形成調(diào)用鏈,逐一運行調(diào)用鏈中的增強器 result=point.invoke(this); } returnresult; } /*普通的get/sets省略*/ }
(6) 增強器代理邏輯
com.gyx.superscheduled.core.SuperScheduledApplicationRunner類中的代碼片段
//創(chuàng)建執(zhí)行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //用來存放增強器的代理對象 Listpoints=newArrayList<>(baseStrengthenBeanNames.length); //循環(huán)所有的增強器的beanName for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //獲取增強器的bean對象 ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //將增強器代理成Point節(jié)點 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //增強器的代理對象緩存到list中 points.add(proxy); } //將增強器代理實例的集合生成調(diào)用鏈 //執(zhí)行控制器中設(shè)置調(diào)用鏈 runnable.setChain(newChain(points));
審核編輯:劉清
-
處理器
+關(guān)注
關(guān)注
68文章
19882瀏覽量
234943 -
控制器
+關(guān)注
關(guān)注
114文章
17088瀏覽量
184076 -
增強器
+關(guān)注
關(guān)注
1文章
48瀏覽量
8497 -
SpringBoot
+關(guān)注
關(guān)注
0文章
175瀏覽量
390
原文標題:SpringBoot 定時任務(wù)動態(tài)管理通用解決方案
文章出處:【微信號:AndroidPush,微信公眾號:Android編程精選】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
【API】多pk分組&用戶場景&通用定時任務(wù)api已經(jīng)發(fā)布
Linux系統(tǒng)定時任務(wù)Crond
求一種可網(wǎng)絡(luò)化管理和配置機頂盒的網(wǎng)絡(luò)解決方案
SpringBoot如何實現(xiàn)動態(tài)增刪啟停定時任務(wù)

Python定時任務(wù)的實現(xiàn)方式
如何在SpringBoot項目中實現(xiàn)動態(tài)定時任務(wù)
說說Spring定時任務(wù)如何大規(guī)模企業(yè)級運用
解析Golang定時任務(wù)庫gron設(shè)計和原理
SpringBoot如何實現(xiàn)定時任務(wù)(下)

SpringBoot如何實現(xiàn)定時任務(wù)(上)

Spring Boot中整合兩種定時任務(wù)的方法

如何動態(tài)添加修改刪除定時任務(wù)?
python定時任務(wù)實踐

評論