Springboot整合Quartz数据库方式
Springboot整合Quartz数据库方式
引入依赖
<!-- 定时任务模块 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
spring:
  quartz:
    job-store-type: jdbc # store jobs and triggers in the database
    jdbc:
      initialize-schema: never # do not create the Quartz tables automatically
    properties:
      org:
        quartz:
          scheduler:
            # AUTO derives the instance id from host name and timestamp. Any string is allowed,
            # but it must be unique across all scheduler instances.
            # Stored in qrtz_scheduler_state.INSTANCE_NAME.
            instanceId: AUTO
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore # use this class with Spring Boot > 2.5.6
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate # JDBC delegate used to talk to the database
            # When true, JDBCJobStore treats every JobDataMap value as a string and stores the map as
            # name-value pairs instead of serialized objects in a BLOB column, which avoids
            # class-versioning problems when non-String classes are serialized.
            useProperties: false
            tablePrefix: QRTZ_ # prefix of the Quartz tables
            # Milliseconds a trigger may pass its next fire time before it is considered misfired.
            # Defaults to 60000 (60 seconds) when omitted.
            misfireThreshold: 60000
            # How often (ms) this instance checks in with the rest of the cluster;
            # affects how quickly failed instances are detected.
            clusterCheckinInterval: 5000
            isClustered: true # enable clustering
          threadPool: # worker thread pool (not a connection pool)
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 10
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
创建数据库
解压quartz.jar包,sql脚本位置在org/quartz/impl/jdbcjobstore下,我这选择mysql数据库且使用innodb引擎,对应的脚本文件是tables_mysql_innodb.sql,共计11张表,这些表不需要我们做任何操作,是Quartz框架使用的。
集群节点相互之间不通信,而是通过定时任务持久化加锁的方式来实现集群。
1.qrtz_blob_triggers : 以Blob 类型存储的触发器。
2.qrtz_calendars:存放日历信息, quartz可配置一个日历来指定一个时间范围。
3.qrtz_cron_triggers:存放cron类型的触发器。
4.qrtz_fired_triggers:存放已触发的触发器。
5.qrtz_job_details:存放一个jobDetail信息。
6.qrtz_locks: 存储程序的悲观锁的信息(假如使用了悲观锁)。
7.qrtz_paused_trigger_grps:存放暂停掉的触发器组。
8.qrtz_scheduler_state:调度器状态。
9.qrtz_simple_triggers:简单触发器的信息。
10.qrtz_simprop_triggers:存放 CalendarIntervalTrigger、DailyTimeIntervalTrigger 等触发器的属性信息(Quartz 2.x 的脚本中已没有 qrtz_trigger_listeners 表)。
11.qrtz_triggers:触发器的基本信息。
实体
/**
 * Transfer object describing one Quartz job together with the settings of its trigger.
 *
 * <p>Lombok's {@code @Data} generates the regular accessors. The explicit
 * {@link #getCron()} / {@link #setCron(String)} pair exists because the scheduling
 * service and the controller address the cron expression under the short name
 * {@code cron}, which Lombok would not generate for the {@code cronExpression} field.
 */
@Data
public class QuartzBean {
    /** Identifier of the task record. */
    private String id;
    /** Job name; also used as the trigger name by the scheduling service. */
    private String jobName;
    /** Job group; also used as the trigger group by the scheduling service. */
    private String jobGroup;
    /** Free-text description stored on the job detail. */
    private String jobDescription;
    /** Fully qualified name of the job implementation class. */
    private String jobClass;
    /** Name of the trigger state, e.g. NORMAL or PAUSED. */
    private String jobStatus;
    /** Time at which a simple trigger starts firing. */
    private Date startTime;
    /** Repeat interval of a simple trigger; {@code null} means the job fires only once. */
    private Integer interval;
    /** Time after which a repeating simple trigger stops firing. */
    private Date endTime;
    /** Cron expression of a cron trigger. */
    private String cronExpression;
    /** Data handed to the job at execution time. */
    private JobDataMap jobDataMap;

    /**
     * Alias for {@code getCronExpression()}.
     *
     * @return the cron expression, or {@code null} if none was set
     */
    public String getCron() {
        return cronExpression;
    }

    /**
     * Alias for {@code setCronExpression(String)}.
     *
     * @param cron the cron expression to store
     */
    public void setCron(String cron) {
        this.cronExpression = cron;
    }
}
具体Job类
/**
 * Sample job that prints one line to standard output every time it is fired.
 */
public class ExecuteTask extends QuartzJobBean {

    /**
     * Prints the node label, the current time, the job key and the job description.
     *
     * @param context execution context supplied by Quartz
     * @throws JobExecutionException declared by the base class; not thrown by this code
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String firedAt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        StringBuilder line = new StringBuilder("springboot-quartz-cluster-a")
                .append(">>")
                .append(firedAt)
                .append(":")
                .append(context.getJobDetail().getKey())
                .append("执行中...")
                .append(context.getJobDetail().getDescription());
        System.out.println(line);
    }
}
Service
import com.ibi.common.exception.ServiceException;
import com.ibi.ptd.encyclopedia.core.task.model.QuartzBean;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Service
public class QuartzJobService {
@Resource
private Scheduler scheduler;
/**
 * Lists the scheduled jobs, producing one {@link QuartzBean} per trigger of every job.
 *
 * <p>The job class is reported as its fully qualified name so that the value can be
 * passed back to the create methods, which resolve it with {@code Class.forName}.
 * If the scheduler fails part-way through, the error is logged and the entries
 * collected so far are returned.
 *
 * @return the collected entries; never {@code null}, possibly empty
 */
public List<QuartzBean> getScheduleJobList() {
    List<QuartzBean> list = new ArrayList<>();
    try {
        for (String groupName : scheduler.getJobGroupNames()) {
            for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.<JobKey>groupEquals(groupName))) {
                List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                for (Trigger trigger : triggers) {
                    QuartzBean quartzBean = new QuartzBean();
                    Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
                    // Only cron triggers carry a cron expression; other trigger types leave it unset.
                    if (trigger instanceof CronTrigger) {
                        quartzBean.setCron(((CronTrigger) trigger).getCronExpression());
                    }
                    quartzBean.setStartTime(trigger.getStartTime());
                    quartzBean.setEndTime(trigger.getEndTime());
                    quartzBean.setJobName(jobKey.getName());
                    quartzBean.setJobGroup(jobKey.getGroup());
                    quartzBean.setJobDescription(jobDetail.getDescription());
                    quartzBean.setJobStatus(triggerState.name());
                    // getName() rather than toGenericString(): the latter prefixes "public class ".
                    quartzBean.setJobClass(jobDetail.getJobClass().getName());
                    quartzBean.setJobDataMap(jobDetail.getJobDataMap());
                    list.add(quartzBean);
                }
            }
        }
    } catch (SchedulerException e) {
        log.error("getScheduleJobList获取定时任务列表出错:", e);
    }
    return list;
}
/**
 * Creates a job driven by a simple trigger.
 *
 * <p>When {@code quartzBean.getInterval()} is {@code null} the job fires once at the
 * start time; otherwise it repeats every {@code interval} seconds from the start time
 * until the end time. Job and trigger share the same name and group.
 *
 * @param quartzBean job definition; {@code jobClass} must name a class implementing {@link Job}
 * @throws ServiceException if the class cannot be loaded, is not a {@link Job},
 *                          or the scheduler rejects the job
 */
public void createScheduleSimpleJob(QuartzBean quartzBean) {
    Class<? extends Job> jobClass;
    try {
        // asSubclass verifies the type now instead of failing later when Quartz runs the job.
        jobClass = Class.forName(quartzBean.getJobClass()).asSubclass(Job.class);
    } catch (ClassNotFoundException | ClassCastException e) {
        log.error("createScheduleSimpleJob无执行处理任务的容器:", e);
        throw new ServiceException("无执行处理任务的容器");
    }
    // job
    JobDetail jobDetail = JobBuilder.newJob(jobClass)
            .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
            .setJobData(quartzBean.getJobDataMap())
            .withDescription(quartzBean.getJobDescription())
            .build();
    // trigger: one-shot when no interval is given, repeating otherwise
    Trigger trigger;
    if (quartzBean.getInterval() == null) {
        trigger = TriggerBuilder.newTrigger()
                .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
                .withSchedule(SimpleScheduleBuilder.simpleSchedule())
                .startAt(quartzBean.getStartTime())
                .build();
    } else {
        trigger = TriggerBuilder.newTrigger()
                .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(quartzBean.getInterval()))
                .startAt(quartzBean.getStartTime())
                .endAt(quartzBean.getEndTime())
                .build();
    }
    try {
        scheduler.scheduleJob(jobDetail, trigger);
    } catch (SchedulerException e) {
        log.error("createScheduleSimpleJob创建定时任务出错:", e);
        throw new ServiceException("创建定时任务出错");
    }
}
/**
 * Creates a job driven by a cron trigger.
 *
 * <p>Job and trigger share the same name and group. The trigger is built with the
 * "do nothing" misfire instruction.
 *
 * @param quartzBean job definition; {@code jobClass} must name a class implementing
 *                   {@link Job} and {@code cron} must hold the cron expression
 * @throws ServiceException if the class cannot be loaded, is not a {@link Job},
 *                          or the scheduler rejects the job
 */
public void createScheduleCronJob(QuartzBean quartzBean) {
    Class<? extends Job> jobClass;
    try {
        // asSubclass verifies the type now instead of failing later when Quartz runs the job.
        jobClass = Class.forName(quartzBean.getJobClass()).asSubclass(Job.class);
    } catch (ClassNotFoundException | ClassCastException e) {
        log.error("createScheduleCronJob无执行处理任务的容器:", e);
        throw new ServiceException("无执行处理任务的容器");
    }
    // job
    JobDetail jobDetail = JobBuilder.newJob(jobClass)
            .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
            .setJobData(quartzBean.getJobDataMap())
            .withDescription(quartzBean.getJobDescription())
            .build();
    // trigger
    CronTrigger trigger = TriggerBuilder.newTrigger()
            .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
            .withSchedule(CronScheduleBuilder.cronSchedule(quartzBean.getCron())
                    .withMisfireHandlingInstructionDoNothing())
            .build();
    try {
        scheduler.scheduleJob(jobDetail, trigger);
    } catch (SchedulerException e) {
        log.error("createScheduleCronJob创建定时任务出错:", e);
        throw new ServiceException("创建定时任务出错");
    }
}
/**
 * Pauses the job identified by name and group.
 *
 * @param jobName  name of the job
 * @param jobGroup group of the job
 * @throws RuntimeException wrapping the {@link SchedulerException} if pausing fails
 */
public void pauseScheduleJob(String jobName, String jobGroup) {
    JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
    try {
        scheduler.pauseJob(jobKey);
    } catch (SchedulerException e) {
        // Log before rethrowing, as the other scheduler wrappers in this class do.
        log.error("pauseScheduleJob暂停定时任务出错:", e);
        throw new RuntimeException(e);
    }
}
/**
 * Triggers the given job immediately.
 *
 * @param jobName  name of the job
 * @param jobGroup group of the job
 * @throws RuntimeException wrapping the {@link SchedulerException} if triggering fails
 */
public void runJob(String jobName, String jobGroup) {
    try {
        scheduler.triggerJob(JobKey.jobKey(jobName, jobGroup));
    } catch (SchedulerException cause) {
        log.error("runJob执行任务出错:", cause);
        throw new RuntimeException(cause);
    }
}
/**
 * Replaces the simple trigger of an existing job.
 *
 * <p>When {@code quartzBean.getInterval()} is {@code null} the new trigger fires once at
 * the start time; otherwise it repeats every {@code interval} seconds until the end time.
 * The interval is in seconds, matching {@code createScheduleSimpleJob}.
 *
 * @param quartzBean job definition; name and group identify the trigger to replace
 * @throws RuntimeException wrapping the {@link SchedulerException} if rescheduling fails
 */
public void updateScheduleSimpleJob(QuartzBean quartzBean) {
    // key of the trigger being replaced
    TriggerKey triggerKey = TriggerKey.triggerKey(quartzBean.getJobName(), quartzBean.getJobGroup());
    // new trigger: one-shot when no interval is given, repeating otherwise
    Trigger trigger;
    if (quartzBean.getInterval() == null) {
        trigger = TriggerBuilder.newTrigger()
                .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
                .withSchedule(SimpleScheduleBuilder.simpleSchedule())
                .startAt(quartzBean.getStartTime())
                .build();
    } else {
        trigger = TriggerBuilder.newTrigger()
                .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
                // Seconds, not minutes: the same field is scheduled in seconds on creation.
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(quartzBean.getInterval()))
                .startAt(quartzBean.getStartTime())
                .endAt(quartzBean.getEndTime())
                .build();
    }
    // swap the trigger on the existing job
    try {
        scheduler.rescheduleJob(triggerKey, trigger);
    } catch (SchedulerException e) {
        log.error("updateScheduleSimpleJob更新定时任务出错:", e);
        throw new RuntimeException(e);
    }
}
/**
 * Creates a cron job, or updates its cron trigger if a trigger with the same name
 * and group already exists.
 *
 * <p>If the existing trigger is a cron trigger with the same expression, nothing is
 * changed. An update replaces only the trigger, so {@code jobClass} takes effect only
 * when the job is created. A {@link SchedulerException} raised while looking up the
 * trigger is logged and not rethrown.
 *
 * @param jobClass     job implementation; when {@code null} the previously hard-coded
 *                     {@code ExecuteTask} class is used
 * @param jobName      job name, also used as trigger name and job description
 * @param jobGroupName job group, also used as trigger group
 * @param jobCron      cron expression
 */
public void addOrUpdateJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName,
        String jobCron) {
    QuartzBean quartzBean = new QuartzBean();
    // Honour the caller's class; fall back to the old default so callers passing null keep working.
    quartzBean.setJobClass(jobClass != null
            ? jobClass.getName()
            : "com.ibi.ptd.encyclopedia.core.task.execute.ExecuteTask");
    quartzBean.setJobName(jobName);
    quartzBean.setJobGroup(jobGroupName);
    quartzBean.setJobDescription(jobName);
    quartzBean.setCron(jobCron);
    quartzBean.setJobDataMap(new JobDataMap());
    try {
        Trigger existing = scheduler.getTrigger(TriggerKey.triggerKey(jobName, jobGroupName));
        if (existing == null) {
            createScheduleCronJob(quartzBean);
            return;
        }
        // The stored trigger may not be a cron trigger, so check the type before comparing.
        if (existing instanceof CronTrigger
                && ((CronTrigger) existing).getCronExpression().equals(jobCron)) {
            return;
        }
        updateScheduleCronJob(quartzBean);
    } catch (SchedulerException e) {
        log.error("addOrUpdateJob创建或更新定时任务出错:", e);
    }
}
/**
 * Replaces the cron trigger of an existing job with one built from the given expression.
 *
 * <p>The new trigger uses the "do nothing" misfire instruction, the same one
 * {@code createScheduleCronJob} applies, so an update does not change the misfire policy.
 *
 * @param quartzBean job definition; name and group identify the trigger to replace,
 *                   {@code cron} holds the new expression
 * @throws RuntimeException wrapping the {@link SchedulerException} if rescheduling fails
 */
public void updateScheduleCronJob(QuartzBean quartzBean) {
    // key of the trigger being replaced
    TriggerKey triggerKey = TriggerKey.triggerKey(quartzBean.getJobName(), quartzBean.getJobGroup());
    // new trigger
    CronTrigger trigger = TriggerBuilder.newTrigger()
            .withIdentity(quartzBean.getJobName(), quartzBean.getJobGroup())
            .withSchedule(CronScheduleBuilder.cronSchedule(quartzBean.getCron())
                    .withMisfireHandlingInstructionDoNothing())
            .build();
    // swap the trigger on the existing job
    try {
        scheduler.rescheduleJob(triggerKey, trigger);
    } catch (SchedulerException e) {
        log.error("updateScheduleCronJob更新定时任务出错:", e);
        throw new RuntimeException(e);
    }
}
/**
 * Deletes the job identified by name and group.
 *
 * @param jobName  name of the job
 * @param jobGroup group of the job
 * @throws RuntimeException wrapping the {@link SchedulerException} if deletion fails
 */
public void deleteScheduleJob(String jobName, String jobGroup) {
    try {
        scheduler.deleteJob(JobKey.jobKey(jobName, jobGroup));
    } catch (SchedulerException cause) {
        log.error("deleteScheduleJob删除定时任务出错:", cause);
        throw new RuntimeException(cause);
    }
}
/**
 * Returns the state name of the trigger identified by name and group.
 *
 * <p>State names and their meaning as listed by the original author:
 * BLOCKED (blocked), COMPLETE (complete), ERROR (error), NONE (does not exist),
 * NORMAL (normal), PAUSED (paused).
 *
 * @param jobName  trigger name
 * @param jobGroup trigger group
 * @return the name of the trigger state
 * @throws RuntimeException wrapping the {@link SchedulerException} if the lookup fails
 */
public String getScheduleJobStatus(String jobName, String jobGroup) {
    TriggerKey key = TriggerKey.triggerKey(jobName, jobGroup);
    try {
        return scheduler.getTriggerState(key).name();
    } catch (SchedulerException cause) {
        log.error("getScheduleJobStatus获取任务状态出错:", cause);
        throw new RuntimeException(cause);
    }
}
/**
 * Tells whether a job with the given name and group exists in the scheduler.
 *
 * @param jobName  name of the job
 * @param jobGroup group of the job
 * @return the result of the scheduler's existence check
 * @throws RuntimeException wrapping the {@link SchedulerException} if the check fails
 */
public Boolean checkExistsScheduleJob(String jobName, String jobGroup) {
    boolean exists;
    try {
        exists = scheduler.checkExists(JobKey.jobKey(jobName, jobGroup));
    } catch (SchedulerException cause) {
        log.error("checkExistsScheduleJob检查任务是否存在出错:", cause);
        throw new RuntimeException(cause);
    }
    return exists;
}
}
Controller
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ibi.common.result.R;
import com.ibi.common.utils.StringUtils;
import com.ibi.ptd.encyclopedia.common.enums.EnumStatus;
import com.ibi.ptd.encyclopedia.core.domain.entity.EncyModelConfig;
import com.ibi.ptd.encyclopedia.core.service.IEncyModelConfigService;
import com.ibi.ptd.encyclopedia.core.task.execute.ExecuteTask;
import com.ibi.ptd.encyclopedia.core.task.model.QuartzBean;
import org.quartz.JobDataMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
@RestController
@RequestMapping("/task")
public class QuartzJobController {
@Autowired
private IEncyModelConfigService iEncyModelConfigService;
@Autowired
private QuartzJobService quartzJobService;
/**
 * Returns the scheduled jobs as reported by the job service.
 *
 * @return success response wrapping the job list
 */
@GetMapping("get")
public R<List<QuartzBean>> getQuartzList() {
    List<QuartzBean> jobs = quartzJobService.getScheduleJobList();
    return R.ok(jobs);
}
/**
 * Demo endpoint that schedules {@link ExecuteTask} with a repeating simple trigger:
 * it starts 10 seconds from now, ends 50 seconds from now and uses an interval of 3.
 *
 * @param jobName        name of the job
 * @param jobGroup       group of the job
 * @param jobDescription description stored on the job
 * @return success response with the text {@code SUCCESS}
 */
@GetMapping("createSimpleJob")
public R<String> createSimpleJob(String jobName, String jobGroup, String jobDescription) {
    QuartzBean quartzBean = new QuartzBean();
    quartzBean.setJobClass(ExecuteTask.class.getName());
    quartzBean.setJobName(jobName);
    // the init endpoint passes null as the group
    quartzBean.setJobGroup(jobGroup);
    quartzBean.setJobDescription(jobDescription);
    JobDataMap map = new JobDataMap();
    map.put("serviceId", "1001");
    quartzBean.setJobDataMap(map);
    // Take "now" once so that start and end are offsets from the same instant.
    long now = System.currentTimeMillis();
    quartzBean.setStartTime(new Date(now + 10_000L));
    // The end time goes to setEndTime; it used to overwrite the start time.
    quartzBean.setEndTime(new Date(now + 50_000L));
    quartzBean.setInterval(3);
    quartzJobService.createScheduleSimpleJob(quartzBean);
    return R.ok("SUCCESS");
}
/**
 * Demo endpoint that schedules {@link ExecuteTask} with the fixed cron
 * expression {@code *}{@code /10 * * * * ?}.
 *
 * @param jobName        name of the job
 * @param jobGroup       group of the job
 * @param jobDescription description stored on the job
 * @return success response with the text {@code SUCCESS}
 */
@GetMapping("/createCronJob")
public R<String> createCronJob(String jobName, String jobGroup, String jobDescription) {
    JobDataMap jobData = new JobDataMap();
    jobData.put("serviceId", "1001");

    QuartzBean definition = new QuartzBean();
    definition.setJobClass(ExecuteTask.class.getName());
    definition.setJobName(jobName);
    definition.setJobGroup(jobGroup);
    definition.setJobDescription(jobDescription);
    definition.setCron("*/10 * * * * ?");
    definition.setJobDataMap(jobData);

    quartzJobService.createScheduleCronJob(definition);
    return R.ok("SUCCESS");
}
/**
 * Schedules one cron job per model configuration that has auto-update enabled,
 * is in the RUN status and has a non-blank cron expression.
 *
 * <p>The configuration id is the job name and the group is {@code null}. A job that
 * already exists gets its cron trigger replaced instead of being created again, so
 * the endpoint can be called repeatedly.
 *
 * @return empty success response
 */
@GetMapping("/initSchedule")
public R<Void> initSchedule() {
    LambdaQueryWrapper<EncyModelConfig> lqw = Wrappers.lambdaQuery();
    lqw.eq(EncyModelConfig::getAutoUpdate, 1);
    lqw.eq(EncyModelConfig::getStatus, EnumStatus.RUN.getIndex());
    lqw.isNotNull(EncyModelConfig::getTaskCron);
    // load the configurations that carry a cron expression and schedule them
    List<EncyModelConfig> modelConfigs = iEncyModelConfigService.list(lqw);
    for (EncyModelConfig config : modelConfigs) {
        if (StringUtils.isBlank(config.getTaskCron())) {
            continue;
        }
        QuartzBean quartzBean = new QuartzBean();
        // class that executes the scheduled task
        quartzBean.setJobClass(ExecuteTask.class.getName());
        quartzBean.setJobName(config.getId().toString());
        quartzBean.setJobGroup(null);
        quartzBean.setJobDescription(config.getName());
        quartzBean.setCron(config.getTaskCron());
        JobDataMap map = new JobDataMap();
        map.putAll(BeanUtil.beanToMap(config));
        quartzBean.setJobDataMap(map);
        // Creating a job that already exists is rejected, so reschedule it instead.
        if (quartzJobService.checkExistsScheduleJob(quartzBean.getJobName(), quartzBean.getJobGroup())) {
            quartzJobService.updateScheduleCronJob(quartzBean);
        } else {
            quartzJobService.createScheduleCronJob(quartzBean);
        }
    }
    return R.ok();
}
/**
 * Deletes the job identified by name and group.
 *
 * @param jobName  name of the job
 * @param jobGroup group of the job
 * @return success response with the text {@code SUCCESS}
 */
@GetMapping(value = "/delete")
public R<String> delete(String jobName, String jobGroup) {
    quartzJobService.deleteScheduleJob(jobName, jobGroup);
    R<String> response = R.ok("SUCCESS");
    return response;
}
/**
 * Reports whether a job with the given name and group exists.
 *
 * @param jobName  name of the job
 * @param jobGroup group of the job
 * @return success response when the job exists, failure response otherwise
 */
@GetMapping(value = "check")
public R<String> check(String jobName, String jobGroup) {
    boolean exists = quartzJobService.checkExistsScheduleJob(jobName, jobGroup);
    if (!exists) {
        return R.fail("不存在定时任务:" + jobName);
    }
    return R.ok("存在定时任务:" + jobName);
}
}
使用效果
启动多个服务
QRTZ_SCHEDULER_STATE
表里有 2 条数据,每个 Scheduler 实例都在 QRTZ_SCHEDULER_STATE
表里有自己的唯一ID,例如以 hostname+time 标识;
添加任务
调用 /createCronJob
接口,创建一个 Cron 任务
QRTZ_CRON_TRIGGERS
表里会添加一条数据
会看到有一个服务持续执行任务,停掉一个切换到另一服务,都停掉重新启动一个会从数据库中拉起任务执行。由此可知:
Quartz
定时任务集群服务调度,并不像其他中间件一样进行集群中不同实例之间的负载均衡,而是在其中一台上面持续执行;一旦监测到有服务实例停止运行后,任务会切换至正常运行的服务实例上继续执行。这样无论集群中有多少应用实例,定时任务每次只会被触发一次,同时我们拥有了一个具备故障转移能力的分布式任务调度集群。
删除任务
解决报错
org.quartz.SchedulerConfigException: DataSource name not set.
原因分析: spring-boot-starter-quartz 2.5.6 之前使用org.quartz.impl.jdbcjobstore.JobStoreTX定义quartz的默认数据源支持,即如下配置:
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
解决方案:升级 spring-boot-starter-quartz > 2.5.6 的版本后将不再支持此方式进行配置默认数据源,需改为如下配置:
org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
参考文章: