Springboot集成Quartz实现动态定时任务

Quartz是一个强大的作业调度框架,在这之前我们已经使用Quartz实现了很多自定义的定时任务,如数据库备份,定时发送消息等,并且很多分布式作业调度中心系统均以它为基础实现此类功能。这篇是在单体的Springboot 2.*应用下集成quartz框架,实现动态更改任务执行状态及触发时间等。

初始配置

pom.xml添加依赖 quartz:

<dependency>
	<groupId>org.quartz-scheduler</groupId>
	<artifactId>quartz</artifactId>
	<version>2.3.1</version>
</dependency>

SpringBoot集成Quartz使用工厂Bean生成Bean的方式。
我们首先需要配置工厂Bean,负责生成实现了Job接口的类的实例对象Bean。

@Component("storyJobFactory")
public class StoryJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

	private transient AutowireCapableBeanFactory beanFactory;

	@Override
	public void setApplicationContext(ApplicationContext context) throws BeansException {
		beanFactory = context.getAutowireCapableBeanFactory();
	}

	@Override
	protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
		final Object job = super.createJobInstance(bundle);
		beanFactory.autowireBean(job);
		return job;
	}
}

SchedulerConfig类通过 @Configuration 自动装载配置。
SchedulerFactoryBean是Quartz为我们提供的一个生成Scheduler实例Bean的工厂类。Scheduler主要负责调度任务。
quartzProperties配置参数可以自由配置Quartz的一些基本参数属性。

@Configuration
public class SchedulerConfig {
	
	@Autowired
	@Qualifier("storyJobFactory")
    private StoryJobFactory storyJobFactory;
	
	@Bean(name="storySchedulerFactory")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(quartzProperties());
        factory.setOverwriteExistingJobs(true);
        factory.setJobFactory(storyJobFactory);
        return factory;
    }
	
	@Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactory = new PropertiesFactoryBean();
        propertiesFactory.setLocation(new ClassPathResource("/quartz.properties"));
        propertiesFactory.afterPropertiesSet();
        return propertiesFactory.getObject();
    }
}

新增 quartz.properties 配置文件:

org.quartz.scheduler.instanceId=STORY_ADMIN
org.quartz.scheduler.instanceName=StoryAdminQuartzScheduler
org.quartz.threadPool.threadCount=5

其他的配置属性可以查相关的文档。


定义一个任务

首先我们来明确几个概念:
Scheduler:主要负责调度任务。
Trigger:负责执行任务的触发器。
JobListener:任务执行的监听器,可以监听到任务执行前、后以及未能成功执行抛出异常。
JobJobDetail:被执行的任务,Scheduler真正调度的对象。

知道了Scheduler,那么我们需要要知道如何启动,停止,变更以及移除一个任务,所以定义任务动态变更的服务类,包含对任务用到的所有操作。

@Service
public class StorySchedulerService {

	private static final String JOB_GROUP = "STORY_JOB_GROUP";
	private static final String TRIGGER_GROUP = "STORY_TRIGGER_GROUP";
	
	@Autowired
	@Qualifier("storySchedulerFactory")
	private SchedulerFactoryBean schedulerFactory;

	@Autowired
	private StoryJobListener storyJobListener;

	/**
	 * 新增定时任务(默认启动)
	 * @param jobId
	 * @param jobClazz
	 * @param cron
	 * @param startTime
	 */
	public void addJob(String jobId, String jobClazz, String cron, Date startTime) {
		addJob(jobId, jobClazz, cron, startTime, true);
	}
	
	/**
	 * 新增定时任务
	 * @param jobId
	 * @param jobClazz
	 * @param cron
	 * @param startTime
	 * @param startJob
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	public void addJob(String jobId, String jobClazz, String cron, Date startTime, boolean startJob) {
		try {
			Scheduler scheduler = schedulerFactory.getScheduler();
			Class jobClass = Class.forName(jobClazz);
			JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobId, JOB_GROUP).build();// 设置Job的名字和组
			TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
			triggerBuilder.withIdentity(jobId, TRIGGER_GROUP);
			if(startTime == null || new Date().compareTo(startTime) > 0) {
				triggerBuilder.startNow();
			} else {
				triggerBuilder.startAt(startTime);
			}
			triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
			CronTrigger trigger = (CronTrigger) triggerBuilder.build();
			scheduler.scheduleJob(jobDetail, trigger);
			scheduler.getListenerManager().addJobListener(storyJobListener);
			if(!startJob) {
				pauseJob(jobId);
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * 修改一个任务的触发时间
	 * @param jobId
	 * @param cron
	 * @param startTime
	 */
	public void updateJobCron(String jobId, String cron, Date startTime) {
		try {
			Scheduler scheduler = schedulerFactory.getScheduler();
			TriggerKey triggerKey = TriggerKey.triggerKey(jobId, TRIGGER_GROUP);
			CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
			if (trigger == null) {
				return;
			}
			String oldTime = trigger.getCronExpression();
			if (!oldTime.equalsIgnoreCase(cron)) {
				TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
				triggerBuilder.withIdentity(jobId, TRIGGER_GROUP);
				if(startTime == null || new Date().compareTo(startTime) > 0) {
					triggerBuilder.startNow();
				} else {
					triggerBuilder.startAt(startTime);
				}
				triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
				trigger = (CronTrigger) triggerBuilder.build();
				scheduler.rescheduleJob(triggerKey, trigger);
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 移除指定jobId的定时任务
	 * @param jobId
	 */
	public void removeJob(String jobId) {
		try {
			Scheduler scheduler = schedulerFactory.getScheduler();
			TriggerKey triggerKey = TriggerKey.triggerKey(jobId, TRIGGER_GROUP);
			scheduler.pauseTrigger(triggerKey);
			scheduler.unscheduleJob(triggerKey);
			scheduler.deleteJob(JobKey.jobKey(jobId, JOB_GROUP));
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 暂停指定jobId的定时任务
	 * @param jobId
	 */
	public void pauseJob(String jobId) {
		try {
			Scheduler scheduler = schedulerFactory.getScheduler();
			JobKey jobKey = JobKey.jobKey(jobId, JOB_GROUP);  
			scheduler.pauseJob(jobKey);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 恢复指定jobId的定时任务
	 * @param jobId
	 */
	public void resumeJob(String jobId) {
		try {
			Scheduler scheduler = schedulerFactory.getScheduler();
			JobKey jobKey = JobKey.jobKey(jobId, JOB_GROUP);  
			scheduler.resumeJob(jobKey);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
	
	/**
	 * 启动所有定时任务
	 */
	public void startJobs() {
		try {
			Scheduler sched = schedulerFactory.getScheduler();
			sched.start();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * 关闭所有定时任务
	 */
	public void shutdownJobs() {
		try {
			Scheduler sched = schedulerFactory.getScheduler();
			if (!sched.isShutdown()) {
				sched.shutdown();
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
}

定义任务监听器StoryJobListener

@Slf4j
@Service
public class StoryJobListener implements JobListener {
	public static final String LISTENER_NAME = "StoryJobListener";
	
	@Autowired
	private ScheduleJobService scheduleJobService;

	public StoryJobListener(){}

	@Override
	public String getName() {
		return LISTENER_NAME;
	}

	/**
	 * 任务执行前
	 * @param context
	 */
	@Override
	public void jobToBeExecuted(JobExecutionContext context) {
		String jobName = context.getJobDetail().getKey().toString();
		System.out.println("jobToBeExecuted");
		System.out.println("Job : " + jobName + " is going to start...");
	}

	/**
	 * 任务被否决
	 * @param context
	 */
	@Override
	public void jobExecutionVetoed(JobExecutionContext context) {
		System.out.println("jobExecutionVetoed");
	}

	@Override
	public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
		String jobId = context.getJobDetail().getKey().getName();
		String failReason = null;
		if(jobException != null) {
			failReason = ExceptionUtils.getStackMsg(jobException);
		}
		try {
			scheduleJobService.updateRuntimeJob(jobId, context.getFireTime(), context.getPreviousFireTime(), context.getNextFireTime(), failReason);
		} catch (StoryServiceException e) {
			log.error("updateRuntimeJob {} error", jobId, e);
		}
	}
}

这里我们在容器启动完成后,立即启动我们的计划任务。
可以看到,所有的Job都是我们从数据库中检索出的任务,自动添加到执行计划中。

@Component
@Slf4j
public class ScheduleJobStarter implements ApplicationRunner {

    @Autowired
    private StorySchedulerService storySchedulerService;

    @Autowired
    private ScheduleJobService scheduleJobService;

    /**
     * 会在服务启动完成后立即执行
     */
    @Override
    public void run(ApplicationArguments args) {
        log.info("================= 启动所有定时任务 ================");
        
        QueryWrapper<ScheduleJob> query= new QueryWrapper<>();
        query.eq("yn_flag", YNFlagStatusEnum.VALID.getCode());
        query.eq("start_job",true);
        query.orderByAsc("start_time");
		
        List<ScheduleJob> startJobs = scheduleJobService.list(query);
        startJobs.stream().forEach(j -> {
            storySchedulerService.addJob(j.getJobId(), j.getJobClass(), j.getCron(), j.getStartTime());
        });
        storySchedulerService.startJobs();
    }
}

ApplicationRunner 接口是spring boot提供给我们的一个接口,能够在容器启动完成之后,执行我们自定义的任务,例如这里我们启动所有的Job。

自定义一个我们自己的Job抽象父类,实现Job接口

public abstract class BaseJob implements Job {

}

我们第一个具体的Job实现类

@Slf4j
@ScheduleAnnotation(jobId = "StoryDemoJob", jobName = "框架演示Job")
public class StoryDemoJob extends BaseJob {

	@Override
	public void execute(JobExecutionContext context){
		log.info("STORY-ADMIN: 你好!");
	}
}

可以看到这里用到了@ScheduleAnnotation注解,这是一个自定义的注解,用来在我们的任务上标识这是一个Job,后面我们用于扫描所有的任务,在页面上生成下拉选项

@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@Documented
public @interface ScheduleAnnotation {

	/**
	 * 任务编号,全局必须唯一
	 * @return
	 */
	String jobId() default "";

	/**
	 * 任务名称
	 * 
	 * @return
	 */
	String jobName() default "";

	/**
	 * 任务是否废弃
	 * 
	 * @return
	 */
	boolean discard() default false;
}

动态增删变更任务

既然要动态的变更任务,那我们就将任务保存在数据库中,没此项目启动,就不用再次配置。定义我们的PO类,上面的用到的ScheduleJob便是出自这里:

/**
 * 定时任务
 */
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName("st_schedule_job")
public class ScheduleJob extends BaseEntity {
    private static final long serialVersionUID = 1L;

    //JobID
    private String jobId;
    //Job名称
    private String jobName;
    //cron表达式
    private String cron;
    //启动状态
    private Boolean startJob;
    //方法
    private String jobClass;
    //开始时间
    private Date startTime;
    //触发时间
    private Date fireTime;
    //上次触发时间
    private Date lastFireTime;
    //下次触发时间
    private Date nextFireTime;
    //Job描述
    private String jobDesc;
    //失败原因
    private String failReason;
    //有效标志
    private String ynFlag;
    //创建人
    private String creator;
    //修改人
    private String editor;
    //创建时间
    private Date createdTime;
    //修改时间
    private Date modifiedTime;
}

其次就是增删改查了,简单列举了增删改的方法,可以看到在持久化的同时也动态的修改了计划任务。

@Slf4j
@RestController
@RequestMapping("/sysmgr/schedulejob")
public class ScheduleJobController {

    @Autowired
    ScheduleJobService scheduleJobService;

    @Autowired
    private StorySchedulerService storySchedulerService;
	/**
	 * 保存
	 * @param scheduleJob
	 * @return
	 */
	@RequestMapping(value="/save",method = {RequestMethod.POST})
	public Result save(@RequestBody ScheduleJob scheduleJob){
		boolean startJob = scheduleJob.getStartJob() != null ? scheduleJob.getStartJob().booleanValue() : false;

		Date currentDate= DateUtils.currentDate();
		scheduleJob.setEditor(UserContext.getCurrentUser().getAccount());
		scheduleJob.setModifiedTime(currentDate);
		if(scheduleJob.getId()!=null){
			scheduleJobService.updateById(scheduleJob);

			if(startJob) {
				storySchedulerService.updateJobCron(scheduleJob.getJobId(), scheduleJob.getCron(), scheduleJob.getStartTime());
				storySchedulerService.resumeJob(scheduleJob.getJobId());
			} else {
				storySchedulerService.pauseJob(scheduleJob.getJobId());
			}

		}else{
			scheduleJob.setCreator(UserContext.getCurrentUser().getAccount());
			scheduleJob.setCreatedTime(currentDate);
			scheduleJob.setYnFlag(YNFlagStatusEnum.VALID.getCode());
			scheduleJobService.save(scheduleJob);

			storySchedulerService.addJob(scheduleJob.getJobId(), scheduleJob.getJobClass(), scheduleJob.getCron(), scheduleJob.getStartTime(), startJob);
		}
		return new Result(true,null,null, Constants.TOKEN_CHECK_SUCCESS);
	}

	/**
	 * 删除
	 * @param scheduleJob
	 * @return
	 */
	@RequestMapping(value="/delete",method = {RequestMethod.POST})
	public Result dropById(@RequestBody ScheduleJob scheduleJob){
		Result result ;
		if(scheduleJob.getId()!=null){
			ScheduleJob delScheduleJob= new ScheduleJob();
			delScheduleJob.setId(scheduleJob.getId());
			delScheduleJob.setYnFlag("0");
			delScheduleJob.setEditor(UserContext.getCurrentUser().getAccount());
			delScheduleJob.setModifiedTime(Date.from(Instant.now()));
			result=new Result(scheduleJobService.updateById(delScheduleJob),null,null,Constants.TOKEN_CHECK_SUCCESS);

			storySchedulerService.removeJob(scheduleJob.getJobId());
		}else{
			result = new Result(false, "", null ,Constants.PARAMETERS_MISSING);
		}
		return result;
	}

	/**
     * 获取下拉选项
     * @return
     */
	@RequestMapping(value="/job-options",method = {RequestMethod.POST,RequestMethod.GET})
	public Result getJobCombo() {
		List<ScheduleJob> jobList= this.findScheduleJobCombo();
		return new Result(true, "", jobList ,Constants.TOKEN_CHECK_SUCCESS);
	}
	
	private List<ScheduleJob> findScheduleJobCombo() {
        List<ScheduleJob> vos = Lists.newArrayList();
        List<Class<?>> clsList = AnnotationUtils.getClasses("com.story.storyadmin.scheduler");
        if(CollectionUtils.isNotEmpty(clsList)) {
            clsList.stream().forEach(c -> {
                Annotation[] annotations = c.getAnnotations();
                if (annotations != null && annotations.length > 0) {
                    if (annotations[0] instanceof ScheduleAnnotation) {
                        ScheduleAnnotation annotation = (ScheduleAnnotation) annotations[0];
                        ScheduleJob vo = new ScheduleJob();
                        vo.setJobId(annotation.jobId());
                        vo.setJobName(annotation.jobName());
                        vo.setJobClass(c.getName());
                        vos.add(vo);
                    }
                }
            });
        }
        return vos;
    }
}

关于任务的持久化,这里省略了一些方法,当然核心的东西,我们已经实现了。

任务列表
任务列表

发表评论

电子邮件地址不会被公开。 必填项已用*标注