Quartz是一个强大的作业调度框架,在这之前我们已经使用Quartz实现了很多自定义的定时任务,如数据库备份,定时发送消息等,并且很多分布式作业调度中心系统均以它为基础实现此类功能。这篇是在单体的Springboot 2.*应用下集成quartz框架,实现动态更改任务执行状态及触发时间等。
初始配置
pom.xml添加依赖 quartz:
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.1</version> </dependency>
SpringBoot集成Quartz使用工厂Bean生成Bean的方式。
我们首先需要配置工厂Bean,负责生成实现了Job接口的类的实例对象Bean。
/**
 * Quartz job factory that passes every newly created job instance through Spring autowiring.
 *
 * <p>The instance itself is created by {@link SpringBeanJobFactory}; this subclass then hands it
 * to the {@link AutowireCapableBeanFactory} obtained from the application context.
 */
@Component("storyJobFactory")
public class StoryJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    /** Bean factory taken from the application context; used to autowire job instances. */
    private transient AutowireCapableBeanFactory beanFactory;

    /**
     * Captures the autowire-capable bean factory of the supplied application context.
     *
     * @param context the application context this component runs in
     * @throws BeansException declared by the {@link ApplicationContextAware} contract
     */
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        this.beanFactory = context.getAutowireCapableBeanFactory();
    }

    /**
     * Creates the job instance via the superclass and autowires it before returning it.
     *
     * @param bundle the fired-trigger bundle describing the job to instantiate
     * @return the autowired job instance
     * @throws Exception if the superclass fails to create the instance
     */
    @Override
    protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
        final Object instance = super.createJobInstance(bundle);
        this.beanFactory.autowireBean(instance);
        return instance;
    }
}
SchedulerConfig
类通过 @Configuration
自动装载配置。SchedulerFactoryBean
是Quartz为我们提供的一个生成Scheduler
实例Bean的工厂类。Scheduler
主要负责调度任务。quartzProperties
配置参数可以自由配置Quartz的一些基本参数属性。
/**
 * Spring configuration that publishes the Quartz {@link SchedulerFactoryBean} together with
 * the {@link Properties} it is configured from.
 */
@Configuration
public class SchedulerConfig {

    @Autowired
    @Qualifier("storyJobFactory")
    private StoryJobFactory storyJobFactory;

    /**
     * Builds the scheduler factory bean registered under the name {@code storySchedulerFactory}.
     *
     * <p>The factory uses {@link #quartzProperties()}, overwrites existing jobs, and creates job
     * instances through the injected {@link StoryJobFactory}.
     *
     * @return the configured scheduler factory bean
     * @throws IOException if the Quartz properties cannot be loaded
     */
    @Bean(name = "storySchedulerFactory")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        final Properties quartzSettings = quartzProperties();
        final SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setJobFactory(storyJobFactory);
        schedulerFactory.setOverwriteExistingJobs(true);
        schedulerFactory.setQuartzProperties(quartzSettings);
        return schedulerFactory;
    }

    /**
     * Loads the Quartz settings from the class-path resource {@code /quartz.properties}.
     *
     * @return the loaded properties
     * @throws IOException if the resource cannot be read
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        final PropertiesFactoryBean loader = new PropertiesFactoryBean();
        loader.setLocation(new ClassPathResource("/quartz.properties"));
        loader.afterPropertiesSet();
        return loader.getObject();
    }
}
新增 quartz.properties
配置文件:
org.quartz.scheduler.instanceId=STORY_ADMIN org.quartz.scheduler.instanceName=StoryAdminQuartzScheduler org.quartz.threadPool.threadCount=5
其他的配置属性可以查相关的文档。
定义一个任务
首先我们来明确几个概念:Scheduler
:主要负责调度任务。Trigger
:负责执行任务的触发器。JobListener
:任务执行的监听器,可以监听到任务执行前、后以及未能成功执行抛出异常。Job
及JobDetail
:被执行的任务,Scheduler
真正调度的对象。
知道了Scheduler
,那么我们需要知道如何启动,停止,变更以及移除一个任务,所以定义任务动态变更的服务类,包含对任务用到的所有操作。
@Service public class StorySchedulerService { private static final String JOB_GROUP = "STORY_JOB_GROUP"; private static final String TRIGGER_GROUP = "STORY_TRIGGER_GROUP"; @Autowired @Qualifier("storySchedulerFactory") private SchedulerFactoryBean schedulerFactory; @Autowired private StoryJobListener storyJobListener; /** * 新增定时任务(默认启动) * @param jobId * @param jobClazz * @param cron * @param startTime */ public void addJob(String jobId, String jobClazz, String cron, Date startTime) { addJob(jobId, jobClazz, cron, startTime, true); } /** * 新增定时任务 * @param jobId * @param jobClazz * @param cron * @param startTime * @param startJob */ @SuppressWarnings({ "rawtypes", "unchecked" }) public void addJob(String jobId, String jobClazz, String cron, Date startTime, boolean startJob) { try { Scheduler scheduler = schedulerFactory.getScheduler(); Class jobClass = Class.forName(jobClazz); JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobId, JOB_GROUP).build();// 设置Job的名字和组 TriggerBuilder<trigger> triggerBuilder = TriggerBuilder.newTrigger(); triggerBuilder.withIdentity(jobId, TRIGGER_GROUP); if(startTime == null || new Date().compareTo(startTime) > 0) { triggerBuilder.startNow(); } else { triggerBuilder.startAt(startTime); } triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron)); CronTrigger trigger = (CronTrigger) triggerBuilder.build(); scheduler.scheduleJob(jobDetail, trigger); scheduler.getListenerManager().addJobListener(storyJobListener); if(!startJob) { pauseJob(jobId); } } catch (Exception e) { throw new RuntimeException(e); } }</trigger> /** * 修改一个任务的触发时间 * @param jobId * @param cron * @param startTime */ public void updateJobCron(String jobId, String cron, Date startTime) { try { Scheduler scheduler = schedulerFactory.getScheduler(); TriggerKey triggerKey = TriggerKey.triggerKey(jobId, TRIGGER_GROUP); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (trigger == null) { return; } String oldTime = 
trigger.getCronExpression(); if (!oldTime.equalsIgnoreCase(cron)) { TriggerBuilder<trigger> triggerBuilder = TriggerBuilder.newTrigger(); triggerBuilder.withIdentity(jobId, TRIGGER_GROUP); if(startTime == null || new Date().compareTo(startTime) > 0) { triggerBuilder.startNow(); } else { triggerBuilder.startAt(startTime); } triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron)); trigger = (CronTrigger) triggerBuilder.build(); scheduler.rescheduleJob(triggerKey, trigger); } } catch (Exception e) { throw new RuntimeException(e); } }</trigger> /** * 移除指定jobId的定时任务 * @param jobId */ public void removeJob(String jobId) { try { Scheduler scheduler = schedulerFactory.getScheduler(); TriggerKey triggerKey = TriggerKey.triggerKey(jobId, TRIGGER_GROUP); scheduler.pauseTrigger(triggerKey); scheduler.unscheduleJob(triggerKey); scheduler.deleteJob(JobKey.jobKey(jobId, JOB_GROUP)); } catch (Exception e) { throw new RuntimeException(e); } } /** * 暂停指定jobId的定时任务 * @param jobId */ public void pauseJob(String jobId) { try { Scheduler scheduler = schedulerFactory.getScheduler(); JobKey jobKey = JobKey.jobKey(jobId, JOB_GROUP); scheduler.pauseJob(jobKey); } catch (Exception e) { throw new RuntimeException(e); } } /** * 恢复指定jobId的定时任务 * @param jobId */ public void resumeJob(String jobId) { try { Scheduler scheduler = schedulerFactory.getScheduler(); JobKey jobKey = JobKey.jobKey(jobId, JOB_GROUP); scheduler.resumeJob(jobKey); } catch (Exception e) { throw new RuntimeException(e); } } /** * 启动所有定时任务 */ public void startJobs() { try { Scheduler sched = schedulerFactory.getScheduler(); sched.start(); } catch (Exception e) { throw new RuntimeException(e); } } /** * 关闭所有定时任务 */ public void shutdownJobs() { try { Scheduler sched = schedulerFactory.getScheduler(); if (!sched.isShutdown()) { sched.shutdown(); } } catch (Exception e) { throw new RuntimeException(e); } } }
定义任务监听器StoryJobListener
:
@Slf4j @Service public class StoryJobListener implements JobListener { public static final String LISTENER_NAME = "StoryJobListener"; @Autowired private ScheduleJobService scheduleJobService; public StoryJobListener(){} @Override public String getName() { return LISTENER_NAME; } /** * 任务执行前 * @param context */ @Override public void jobToBeExecuted(JobExecutionContext context) { String jobName = context.getJobDetail().getKey().toString(); System.out.println("jobToBeExecuted"); System.out.println("Job : " + jobName + " is going to start..."); } /** * 任务被否决 * @param context */ @Override public void jobExecutionVetoed(JobExecutionContext context) { System.out.println("jobExecutionVetoed"); } @Override public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { String jobId = context.getJobDetail().getKey().getName(); String failReason = null; if(jobException != null) { failReason = ExceptionUtils.getStackMsg(jobException); } try { scheduleJobService.updateRuntimeJob(jobId, context.getFireTime(), context.getPreviousFireTime(), context.getNextFireTime(), failReason); } catch (StoryServiceException e) { log.error("updateRuntimeJob {} error", jobId, e); } } }
这里我们在容器启动完成后,立即启动我们的计划任务。
可以看到,所有的Job都是我们从数据库中检索出的任务,自动添加到执行计划中。
@Component @Slf4j public class ScheduleJobStarter implements ApplicationRunner { @Autowired private StorySchedulerService storySchedulerService; @Autowired private ScheduleJobService scheduleJobService; /** * 会在服务启动完成后立即执行 */ @Override public void run(ApplicationArguments args) { log.info("================= 启动所有定时任务 ================"); QueryWrapper<schedulejob> query= new QueryWrapper<>(); query.eq("yn_flag", YNFlagStatusEnum.VALID.getCode()); query.eq("start_job",true); query.orderByAsc("start_time");</schedulejob> List<schedulejob> startJobs = scheduleJobService.list(query); startJobs.stream().forEach(j -> { storySchedulerService.addJob(j.getJobId(), j.getJobClass(), j.getCron(), j.getStartTime()); }); storySchedulerService.startJobs(); } }
ApplicationRunner
接口是spring boot提供给我们的一个接口,能够在容器启动完成之后,执行我们自定义的任务,例如这里我们启动所有的Job。
自定义一个我们自己的Job抽象父类,实现Job接口
/**
 * Common abstract parent for this project's Quartz jobs.
 *
 * <p>Declares no members of its own; concrete jobs extend it and implement {@code execute}.
 */
public abstract class BaseJob implements Job {
}
我们第一个具体的Job实现类
/**
 * Demonstration job that writes a greeting to the log each time it is executed.
 */
@Slf4j
@ScheduleAnnotation(jobId = "StoryDemoJob", jobName = "框架演示Job")
public class StoryDemoJob extends BaseJob {

    /**
     * Logs the greeting message.
     *
     * @param context execution context supplied by the scheduler (unused)
     */
    @Override
    public void execute(JobExecutionContext context) {
        log.info("STORY-ADMIN: 你好!");
    }
}
可以看到这里用到了@ScheduleAnnotation
注解,这是一个自定义的注解,用来在我们的任务上标识这是一个Job,后面我们用于扫描所有的任务,在页面上生成下拉选项。
/**
 * Marks a type or method as a schedulable job and carries its descriptive metadata.
 *
 * <p>Retained at runtime so that it can be read reflectively.
 */
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface ScheduleAnnotation {

    /**
     * Job identifier; must be globally unique.
     *
     * @return the job identifier, empty by default
     */
    String jobId() default "";

    /**
     * Job name.
     *
     * @return the job name, empty by default
     */
    String jobName() default "";

    /**
     * Whether the job has been discarded.
     *
     * @return {@code true} if the job is discarded; {@code false} by default
     */
    boolean discard() default false;
}
动态增删变更任务
既然要动态地变更任务,那我们就将任务保存在数据库中,这样每次项目启动,就不用再次配置。定义我们的PO类,上面用到的ScheduleJob
便是出自这里:
/** * 定时任务 */ @Data @EqualsAndHashCode(callSuper = true) @Accessors(chain = true) @TableName("st_schedule_job") public class ScheduleJob extends BaseEntity { private static final long serialVersionUID = 1L; //JobID private String jobId; //Job名称 private String jobName; //cron表达式 private String cron; //启动状态 private Boolean startJob; //方法 private String jobClass; //开始时间 private Date startTime; //触发时间 private Date fireTime; //上次触发时间 private Date lastFireTime; //下次触发时间 private Date nextFireTime; //Job描述 private String jobDesc; //失败原因 private String failReason; //有效标志 private String ynFlag; //创建人 private String creator; //修改人 private String editor; //创建时间 private Date createdTime; //修改时间 private Date modifiedTime; }
其次就是增删改查了,简单列举了增删改的方法,可以看到在持久化的同时也动态的修改了计划任务。
@Slf4j @RestController @RequestMapping("/sysmgr/schedulejob") public class ScheduleJobController { @Autowired ScheduleJobService scheduleJobService; @Autowired private StorySchedulerService storySchedulerService; /** * 保存 * @param scheduleJob * @return */ @RequestMapping(value="/save",method = {RequestMethod.POST}) public Result save(@RequestBody ScheduleJob scheduleJob){ boolean startJob = scheduleJob.getStartJob() != null ? scheduleJob.getStartJob().booleanValue() : false; Date currentDate= DateUtils.currentDate(); scheduleJob.setEditor(UserContext.getCurrentUser().getAccount()); scheduleJob.setModifiedTime(currentDate); if(scheduleJob.getId()!=null){ scheduleJobService.updateById(scheduleJob); if(startJob) { storySchedulerService.updateJobCron(scheduleJob.getJobId(), scheduleJob.getCron(), scheduleJob.getStartTime()); storySchedulerService.resumeJob(scheduleJob.getJobId()); } else { storySchedulerService.pauseJob(scheduleJob.getJobId()); } }else{ scheduleJob.setCreator(UserContext.getCurrentUser().getAccount()); scheduleJob.setCreatedTime(currentDate); scheduleJob.setYnFlag(YNFlagStatusEnum.VALID.getCode()); scheduleJobService.save(scheduleJob); storySchedulerService.addJob(scheduleJob.getJobId(), scheduleJob.getJobClass(), scheduleJob.getCron(), scheduleJob.getStartTime(), startJob); } return new Result(true,null,null, Constants.TOKEN_CHECK_SUCCESS); } /** * 删除 * @param scheduleJob * @return */ @RequestMapping(value="/delete",method = {RequestMethod.POST}) public Result dropById(@RequestBody ScheduleJob scheduleJob){ Result result ; if(scheduleJob.getId()!=null){ ScheduleJob delScheduleJob= new ScheduleJob(); delScheduleJob.setId(scheduleJob.getId()); delScheduleJob.setYnFlag("0"); delScheduleJob.setEditor(UserContext.getCurrentUser().getAccount()); delScheduleJob.setModifiedTime(Date.from(Instant.now())); result=new Result(scheduleJobService.updateById(delScheduleJob),null,null,Constants.TOKEN_CHECK_SUCCESS); 
storySchedulerService.removeJob(scheduleJob.getJobId()); }else{ result = new Result(false, "", null ,Constants.PARAMETERS_MISSING); } return result; } /** * 获取下拉选项 * @return */ @RequestMapping(value="/job-options",method = {RequestMethod.POST,RequestMethod.GET}) public Result getJobCombo() { List<schedulejob> jobList= this.findScheduleJobCombo(); return new Result(true, "", jobList ,Constants.TOKEN_CHECK_SUCCESS); } private List<schedulejob> findScheduleJobCombo() { List<schedulejob> vos = Lists.newArrayList(); List<class<?>> clsList = AnnotationUtils.getClasses("com.story.storyadmin.scheduler"); if(CollectionUtils.isNotEmpty(clsList)) { clsList.stream().forEach(c -> { Annotation[] annotations = c.getAnnotations(); if (annotations != null && annotations.length > 0) { if (annotations[0] instanceof ScheduleAnnotation) { ScheduleAnnotation annotation = (ScheduleAnnotation) annotations[0]; ScheduleJob vo = new ScheduleJob(); vo.setJobId(annotation.jobId()); vo.setJobName(annotation.jobName()); vo.setJobClass(c.getName()); vos.add(vo); } } }); } return vos; } }
关于任务的持久化,这里省略了一些方法,当然核心的东西,我们已经实现了。

