Quartz是一个强大的作业调度框架,在这之前我们已经使用Quartz实现了很多自定义的定时任务,如数据库备份,定时发送消息等,并且很多分布式作业调度中心系统均以它为基础实现此类功能。这篇是在单体的Springboot 2.*应用下集成quartz框架,实现动态更改任务执行状态及触发时间等。
初始配置
pom.xml添加依赖 quartz:
<dependency>
<groupid>org.quartz-scheduler</groupid>
<artifactid>quartz</artifactid>
<version>2.3.1</version>
</dependency>
SpringBoot集成Quartz使用工厂Bean生成Bean的方式。
我们首先需要配置工厂Bean,负责生成实现了Job接口的类的实例对象Bean。
@Component("storyJobFactory")
public class StoryJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
beanFactory = context.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}
SchedulerConfig
类通过 @Configuration
自动装载配置。SchedulerFactoryBean
是Quartz为我们提供的一个生成Scheduler
实例Bean的工厂类。Scheduler
主要负责调度任务。quartzProperties
配置参数可以自由配置Quartz的一些基本参数属性。
@Configuration
public class SchedulerConfig {
@Autowired
@Qualifier("storyJobFactory")
private StoryJobFactory storyJobFactory;
@Bean(name="storySchedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(quartzProperties());
factory.setOverwriteExistingJobs(true);
factory.setJobFactory(storyJobFactory);
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactory = new PropertiesFactoryBean();
propertiesFactory.setLocation(new ClassPathResource("/quartz.properties"));
propertiesFactory.afterPropertiesSet();
return propertiesFactory.getObject();
}
}
新增 quartz.properties
配置文件:
org.quartz.scheduler.instanceId=STORY_ADMIN
org.quartz.scheduler.instanceName=StoryAdminQuartzScheduler
org.quartz.threadPool.threadCount=5
其他的配置属性可以查相关的文档。
定义一个任务
首先我们来明确几个概念:Scheduler
:主要负责调度任务。Trigger
:负责执行任务的触发器。JobListener
:任务执行的监听器,可以监听到任务执行前、后以及未能成功执行抛出异常。Job
及JobDetail
:被执行的任务,Scheduler
真正调度的对象。
知道了Scheduler
,那么我们需要要知道如何启动,停止,变更以及移除一个任务,所以定义任务动态变更的服务类,包含对任务用到的所有操作。
@Service
public class StorySchedulerService {
private static final String JOB_GROUP = "STORY_JOB_GROUP";
private static final String TRIGGER_GROUP = "STORY_TRIGGER_GROUP";
@Autowired
@Qualifier("storySchedulerFactory")
private SchedulerFactoryBean schedulerFactory;
@Autowired
private StoryJobListener storyJobListener;
/**
* 新增定时任务(默认启动)
* @param jobId
* @param jobClazz
* @param cron
* @param startTime
*/
public void addJob(String jobId, String jobClazz, String cron, Date startTime) {
addJob(jobId, jobClazz, cron, startTime, true);
}
/**
* 新增定时任务
* @param jobId
* @param jobClazz
* @param cron
* @param startTime
* @param startJob
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void addJob(String jobId, String jobClazz, String cron, Date startTime, boolean startJob) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
Class jobClass = Class.forName(jobClazz);
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobId, JOB_GROUP).build();// 设置Job的名字和组
TriggerBuilder<trigger> triggerBuilder = TriggerBuilder.newTrigger();
triggerBuilder.withIdentity(jobId, TRIGGER_GROUP);
if(startTime == null || new Date().compareTo(startTime) > 0) {
triggerBuilder.startNow();
} else {
triggerBuilder.startAt(startTime);
}
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
CronTrigger trigger = (CronTrigger) triggerBuilder.build();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.getListenerManager().addJobListener(storyJobListener);
if(!startJob) {
pauseJob(jobId);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}</trigger>
/**
* 修改一个任务的触发时间
* @param jobId
* @param cron
* @param startTime
*/
public void updateJobCron(String jobId, String cron, Date startTime) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobId, TRIGGER_GROUP);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(cron)) {
TriggerBuilder<trigger> triggerBuilder = TriggerBuilder.newTrigger();
triggerBuilder.withIdentity(jobId, TRIGGER_GROUP);
if(startTime == null || new Date().compareTo(startTime) > 0) {
triggerBuilder.startNow();
} else {
triggerBuilder.startAt(startTime);
}
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
trigger = (CronTrigger) triggerBuilder.build();
scheduler.rescheduleJob(triggerKey, trigger);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}</trigger>
/**
* 移除指定jobId的定时任务
* @param jobId
*/
public void removeJob(String jobId) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobId, TRIGGER_GROUP);
scheduler.pauseTrigger(triggerKey);
scheduler.unscheduleJob(triggerKey);
scheduler.deleteJob(JobKey.jobKey(jobId, JOB_GROUP));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 暂停指定jobId的定时任务
* @param jobId
*/
public void pauseJob(String jobId) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
JobKey jobKey = JobKey.jobKey(jobId, JOB_GROUP);
scheduler.pauseJob(jobKey);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 恢复指定jobId的定时任务
* @param jobId
*/
public void resumeJob(String jobId) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
JobKey jobKey = JobKey.jobKey(jobId, JOB_GROUP);
scheduler.resumeJob(jobKey);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 启动所有定时任务
*/
public void startJobs() {
try {
Scheduler sched = schedulerFactory.getScheduler();
sched.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 关闭所有定时任务
*/
public void shutdownJobs() {
try {
Scheduler sched = schedulerFactory.getScheduler();
if (!sched.isShutdown()) {
sched.shutdown();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
定义任务监听器StoryJobListener
:
@Slf4j
@Service
public class StoryJobListener implements JobListener {
public static final String LISTENER_NAME = "StoryJobListener";
@Autowired
private ScheduleJobService scheduleJobService;
public StoryJobListener(){}
@Override
public String getName() {
return LISTENER_NAME;
}
/**
* 任务执行前
* @param context
*/
@Override
public void jobToBeExecuted(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().toString();
System.out.println("jobToBeExecuted");
System.out.println("Job : " + jobName + " is going to start...");
}
/**
* 任务被否决
* @param context
*/
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
System.out.println("jobExecutionVetoed");
}
@Override
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
String jobId = context.getJobDetail().getKey().getName();
String failReason = null;
if(jobException != null) {
failReason = ExceptionUtils.getStackMsg(jobException);
}
try {
scheduleJobService.updateRuntimeJob(jobId, context.getFireTime(), context.getPreviousFireTime(), context.getNextFireTime(), failReason);
} catch (StoryServiceException e) {
log.error("updateRuntimeJob {} error", jobId, e);
}
}
}
这里我们在容器启动完成后,立即启动我们的计划任务。
可以看到,所有的Job都是我们从数据库中检索出的任务,自动添加到执行计划中。
@Component
@Slf4j
public class ScheduleJobStarter implements ApplicationRunner {
@Autowired
private StorySchedulerService storySchedulerService;
@Autowired
private ScheduleJobService scheduleJobService;
/**
* 会在服务启动完成后立即执行
*/
@Override
public void run(ApplicationArguments args) {
log.info("================= 启动所有定时任务 ================");
QueryWrapper<schedulejob> query= new QueryWrapper<>();
query.eq("yn_flag", YNFlagStatusEnum.VALID.getCode());
query.eq("start_job",true);
query.orderByAsc("start_time");</schedulejob>
List<schedulejob> startJobs = scheduleJobService.list(query);
startJobs.stream().forEach(j -> {
storySchedulerService.addJob(j.getJobId(), j.getJobClass(), j.getCron(), j.getStartTime());
});
storySchedulerService.startJobs();
}
}
ApplicationRunner
接口是spring boot提供给我们的一个接口,能够在容器启动完成之后,执行我们自定义的任务,例如这里我们启动所有的Job。
自定义一个我们自己的Job抽象父类,实现Job接口
public abstract class BaseJob implements Job {
}
我们第一个具体的Job实现类
@Slf4j
@ScheduleAnnotation(jobId = "StoryDemoJob", jobName = "框架演示Job")
public class StoryDemoJob extends BaseJob {
@Override
public void execute(JobExecutionContext context){
log.info("STORY-ADMIN: 你好!");
}
}
可以看到这里用到了@ScheduleAnnotation
注解,这是一个自定义的注解,用来在我们的任务上标识这是一个Job,后面我们用于扫描所有的任务,在页面上生成下拉选项。
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@Documented
public @interface ScheduleAnnotation {
/**
* 任务编号,全局必须唯一
* @return
*/
String jobId() default "";
/**
* 任务名称
*
* @return
*/
String jobName() default "";
/**
* 任务是否废弃
*
* @return
*/
boolean discard() default false;
}
动态增删变更任务
既然要动态的变更任务,那我们就将任务保存在数据库中,没此项目启动,就不用再次配置。定义我们的PO类,上面的用到的ScheduleJob
便是出自这里:
/**
* 定时任务
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName("st_schedule_job")
public class ScheduleJob extends BaseEntity {
private static final long serialVersionUID = 1L;
//JobID
private String jobId;
//Job名称
private String jobName;
//cron表达式
private String cron;
//启动状态
private Boolean startJob;
//方法
private String jobClass;
//开始时间
private Date startTime;
//触发时间
private Date fireTime;
//上次触发时间
private Date lastFireTime;
//下次触发时间
private Date nextFireTime;
//Job描述
private String jobDesc;
//失败原因
private String failReason;
//有效标志
private String ynFlag;
//创建人
private String creator;
//修改人
private String editor;
//创建时间
private Date createdTime;
//修改时间
private Date modifiedTime;
}
其次就是增删改查了,简单列举了增删改的方法,可以看到在持久化的同时也动态的修改了计划任务。
@Slf4j
@RestController
@RequestMapping("/sysmgr/schedulejob")
public class ScheduleJobController {
@Autowired
ScheduleJobService scheduleJobService;
@Autowired
private StorySchedulerService storySchedulerService;
/**
* 保存
* @param scheduleJob
* @return
*/
@RequestMapping(value="/save",method = {RequestMethod.POST})
public Result save(@RequestBody ScheduleJob scheduleJob){
boolean startJob = scheduleJob.getStartJob() != null ? scheduleJob.getStartJob().booleanValue() : false;
Date currentDate= DateUtils.currentDate();
scheduleJob.setEditor(UserContext.getCurrentUser().getAccount());
scheduleJob.setModifiedTime(currentDate);
if(scheduleJob.getId()!=null){
scheduleJobService.updateById(scheduleJob);
if(startJob) {
storySchedulerService.updateJobCron(scheduleJob.getJobId(), scheduleJob.getCron(), scheduleJob.getStartTime());
storySchedulerService.resumeJob(scheduleJob.getJobId());
} else {
storySchedulerService.pauseJob(scheduleJob.getJobId());
}
}else{
scheduleJob.setCreator(UserContext.getCurrentUser().getAccount());
scheduleJob.setCreatedTime(currentDate);
scheduleJob.setYnFlag(YNFlagStatusEnum.VALID.getCode());
scheduleJobService.save(scheduleJob);
storySchedulerService.addJob(scheduleJob.getJobId(), scheduleJob.getJobClass(), scheduleJob.getCron(), scheduleJob.getStartTime(), startJob);
}
return new Result(true,null,null, Constants.TOKEN_CHECK_SUCCESS);
}
/**
* 删除
* @param scheduleJob
* @return
*/
@RequestMapping(value="/delete",method = {RequestMethod.POST})
public Result dropById(@RequestBody ScheduleJob scheduleJob){
Result result ;
if(scheduleJob.getId()!=null){
ScheduleJob delScheduleJob= new ScheduleJob();
delScheduleJob.setId(scheduleJob.getId());
delScheduleJob.setYnFlag("0");
delScheduleJob.setEditor(UserContext.getCurrentUser().getAccount());
delScheduleJob.setModifiedTime(Date.from(Instant.now()));
result=new Result(scheduleJobService.updateById(delScheduleJob),null,null,Constants.TOKEN_CHECK_SUCCESS);
storySchedulerService.removeJob(scheduleJob.getJobId());
}else{
result = new Result(false, "", null ,Constants.PARAMETERS_MISSING);
}
return result;
}
/**
* 获取下拉选项
* @return
*/
@RequestMapping(value="/job-options",method = {RequestMethod.POST,RequestMethod.GET})
public Result getJobCombo() {
List<schedulejob> jobList= this.findScheduleJobCombo();
return new Result(true, "", jobList ,Constants.TOKEN_CHECK_SUCCESS);
}
private List<schedulejob> findScheduleJobCombo() {
List<schedulejob> vos = Lists.newArrayList();
List<class<?>> clsList = AnnotationUtils.getClasses("com.story.storyadmin.scheduler");
if(CollectionUtils.isNotEmpty(clsList)) {
clsList.stream().forEach(c -> {
Annotation[] annotations = c.getAnnotations();
if (annotations != null && annotations.length > 0) {
if (annotations[0] instanceof ScheduleAnnotation) {
ScheduleAnnotation annotation = (ScheduleAnnotation) annotations[0];
ScheduleJob vo = new ScheduleJob();
vo.setJobId(annotation.jobId());
vo.setJobName(annotation.jobName());
vo.setJobClass(c.getName());
vos.add(vo);
}
}
});
}
return vos;
}
}
关于任务的持久化,这里省略了一些方法,当然核心的东西,我们已经实现了。