任务服务 - 969251639/study GitHub Wiki

这里简单地用 Quartz 实现了任务的三种调度方式:立即执行、定时执行,以及按 Cron 表达式自定义执行。

/**
 * Static helpers around the Quartz {@link Scheduler} obtained from a single
 * {@link StdSchedulerFactory}.
 *
 * <p>Every job registered here runs {@link ExecuteTimeJob}; the {@link JobEntity}
 * passed in is stored in the job's data map under {@link TaskConstant#JOB_TIME_KEY},
 * and both the job and its trigger are keyed by the entity's name and group.
 */
public class SchedulerUtils {
	private static final Logger logger = LoggerFactory.getLogger(SchedulerUtils.class);
	private static final SchedulerFactory sf = new StdSchedulerFactory();

	/** Utility class; not instantiable. */
	private SchedulerUtils(){}

	/**
	 * Returns the scheduler provided by the shared factory.
	 *
	 * @return the scheduler handed out by the factory
	 * @throws SchedulerException if the factory cannot provide a scheduler
	 */
	public static Scheduler getScheduler() throws SchedulerException {
		return sf.getScheduler();
	}

	/**
	 * Starts the scheduler when it reports that it is shut down or has not been started yet.
	 *
	 * @throws SchedulerException if the scheduler cannot be obtained or started
	 */
	public static void startUp() throws SchedulerException {
		Scheduler scheduler = sf.getScheduler();
		if(scheduler.isShutdown() || !scheduler.isStarted()) {
			scheduler.start();
		}
	}

	/**
	 * Registers a job whose trigger starts immediately, using a simple schedule
	 * with no repeat configured.
	 *
	 * @param jobEntity supplies the job/trigger name and group and is stored as job data
	 * @throws SchedulerException if the scheduler rejects the job or trigger
	 */
	public static void addNowJob(JobEntity jobEntity) throws SchedulerException {
		JobDetail jobDetail = newJobDetail(jobEntity);
		Trigger trigger = newTriggerBuilder(jobEntity)
			.startNow()
			.withSchedule(SimpleScheduleBuilder.simpleSchedule())
			.build();
		getScheduler().scheduleJob(jobDetail, trigger);
		logger.info("添加立即执行任务成功, param: " + jobEntity);
	}

	/**
	 * Registers a job whose trigger starts at {@code jobEntity.getTriggerTime()},
	 * using a simple schedule with no repeat configured.
	 *
	 * @param jobEntity supplies the job/trigger name, group and trigger time and is stored as job data
	 * @throws SchedulerException if the scheduler rejects the job or trigger
	 */
	public static void addTimeJob(JobEntity jobEntity) throws SchedulerException {
		JobDetail jobDetail = newJobDetail(jobEntity);
		Trigger trigger = newTriggerBuilder(jobEntity)
			.startAt(jobEntity.getTriggerTime())
			.withSchedule(SimpleScheduleBuilder.simpleSchedule())
			.build();
		getScheduler().scheduleJob(jobDetail, trigger);
		logger.info("添加定时任务成功, param: " + jobEntity);
	}

	/**
	 * Registers a job driven by the cron expression in {@code jobEntity.getCron()}.
	 *
	 * @param jobEntity supplies the job/trigger name, group and cron expression and is stored as job data
	 * @throws SchedulerException if the scheduler rejects the job or trigger
	 */
	public static void addCronJob(JobEntity jobEntity) throws SchedulerException {
		JobDetail jobDetail = newJobDetail(jobEntity);
		Trigger trigger = newTriggerBuilder(jobEntity)
			.withSchedule(CronScheduleBuilder.cronSchedule(jobEntity.getCron()))
			.build();
		getScheduler().scheduleJob(jobDetail, trigger);
		logger.info("添加Cron任务成功, param: " + jobEntity);
	}

	/**
	 * Cancels a task: pauses and unschedules the trigger keyed by the entity's
	 * name and group, then deletes the job with the same key.
	 *
	 * @param jobEntity supplies the name and group identifying the trigger and job
	 * @throws SchedulerException if any of the scheduler operations fails
	 */
	public static void removeJob(JobEntity jobEntity) throws SchedulerException {
		Scheduler sched = getScheduler();
		TriggerKey triggerKey = TriggerKey.triggerKey(jobEntity.getName(), jobEntity.getGroup());
		sched.pauseTrigger(triggerKey);
		sched.unscheduleJob(triggerKey);
		JobKey jobKey = JobKey.jobKey(jobEntity.getName(), jobEntity.getGroup());
		sched.deleteJob(jobKey);
		logger.info("取消任务成功, param: " + jobEntity);
	}

	/** Builds the ExecuteTimeJob detail for the entity and attaches the entity as job data. */
	private static JobDetail newJobDetail(JobEntity jobEntity) {
		JobDetail jobDetail = JobBuilder.newJob(ExecuteTimeJob.class)
				.withIdentity(jobEntity.getName(), jobEntity.getGroup()).build();
		jobDetail.getJobDataMap().put(TaskConstant.JOB_TIME_KEY, jobEntity);
		return jobDetail;
	}

	/** Starts a trigger builder keyed by the entity's name and group (same key as the job). */
	private static TriggerBuilder<Trigger> newTriggerBuilder(JobEntity jobEntity) {
		return TriggerBuilder.newTrigger()
			.withIdentity(jobEntity.getName(), jobEntity.getGroup());
	}
}

新增两张表,分别记录任务信息和任务的执行结果

-- Scheduled-task definition table: one row per task, keyed by an auto-increment task_id.
-- Column meanings, as given by the COMMENT strings below:
--   status        : 0 = waiting to be executed, 1 = executed
--   schedule_type : 0 = timer (fixed time), 1 = cron expression, 2 = execute immediately
--   schedule_time : cron expression or time string; -1 means execute immediately
--   execute_time  : time at which the scheduler actually ran the task
--   task_data     : task payload (nullable)
CREATE TABLE `task_job` (
  `task_id` int NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `task_type` int(11) NOT NULL COMMENT '任务类型',
  `name` varchar(50) NOT NULL COMMENT '任务名称',
  `status` varchar(2) NOT NULL COMMENT '状态(0.待执行,1.已执行)',
  `remark` varchar(50) DEFAULT NULL COMMENT '备注说明',
  `schedule_type` varchar(1) NOT NULL COMMENT '0: 定时器,1: cron表达式,2: 立即执行',
  `execute_time` datetime DEFAULT NULL COMMENT '调度器真正执行的时间',
  `schedule_time` varchar(50) NOT NULL COMMENT '调度时间,cron表达式或者时间字符, -1为立即执行',
  `execute_content` varchar(1000) NOT NULL COMMENT '执行内容',
  `create_time` datetime NOT NULL COMMENT '创建日期',
  `task_data` varchar(1000) DEFAULT NULL COMMENT '任务数据',
  PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务信息表';

-- Execution-result table for scheduled tasks.
-- STATUS, per its COMMENT string: 0 = waiting, 1 = success, 2 = failure, 3 = cancelled.
-- NOTE(review): task_id is varchar(32) here while task_job.task_id is declared int;
-- confirm which type is intended before joining the two tables on this column.
CREATE TABLE `task_result` (
  `result_id` varchar(32) NOT NULL COMMENT '主键ID',
  `task_id` varchar(32) NOT NULL COMMENT '任务主键',
  `STATUS` int(11) NOT NULL COMMENT '执行结果,0.待执行,1.成功,2.失败,3.取消',
  `result_content` varchar(500) DEFAULT NULL COMMENT '结果说明',
  `execute_time` datetime DEFAULT NULL COMMENT '执行时间',
  PRIMARY KEY (`result_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='定时任务执行结果表';

项目启动时需要扫描上面的task_job表,将上次未执行(未到执行时间,或者到了执行时间但没有执行)的任务重新加入到定时调度器中

/**
 * Startup hook that starts the scheduler and re-registers tasks read from
 * {@link TaskJobService}: every task still in status {@code TO_BE_EXECUTED},
 * plus cron tasks already marked {@code EXECUTED}.
 *
 * <p>A task is only registered when the scheduler has no job with the same
 * task id and group yet.
 */
@Configuration
public class TaskApplicationStartup implements ApplicationRunner {
	private static final Logger logger = LoggerFactory.getLogger(TaskApplicationStartup.class);

	@Autowired
	private TaskJobService taskJobService;

	/**
	 * Starts the scheduler and restores tasks into it.
	 *
	 * <p>A {@link SchedulerException} raised for one task is logged and does not
	 * stop the remaining tasks from being processed.
	 *
	 * @param args application arguments (unused)
	 * @throws Exception if the scheduler cannot be started or a query fails
	 */
	public void run(ApplicationArguments args) throws Exception {
		SchedulerUtils.startUp();

		// Tasks that have not been executed yet.
		TaskJobVo pendingQuery = new TaskJobVo();
		pendingQuery.setStatus(TaskStatus.TO_BE_EXECUTED);
		List<TaskJobVo> pendingTasks = taskJobService.queryList(pendingQuery);
		for(TaskJobVo t : pendingTasks) {
			try {
				restorePendingTask(t);
			}catch (SchedulerException e) {
				logger.error("job提交到调度器异常, param: " + t, e);
			}
		}

		// Cron tasks are registered again even though they are already marked as executed.
		TaskJobVo executedCronQuery = new TaskJobVo();
		executedCronQuery.setStatus(TaskStatus.EXECUTED);
		executedCronQuery.setScheduleType(TaskConstant.CRON_SCHEDULE);
		List<TaskJobVo> executedCronTasks = taskJobService.queryList(executedCronQuery);
		for(TaskJobVo ct : executedCronTasks) {
			try {
				restoreCronTask(ct);
			}catch (SchedulerException e) {
				// Same isolation as the pending loop: one failing task must not abort the others.
				logger.error("job提交到调度器异常, param: " + ct, e);
			}
		}
	}

	/** Registers one pending task according to its schedule type, unless it is already scheduled. */
	private void restorePendingTask(TaskJobVo t) throws SchedulerException {
		String scheduleType = t.getScheduleType();
		if(TaskConstant.NOW_SCHEDULE.equals(scheduleType)) {
			if(!isScheduled(t, TaskConstant.NOW_GROUP)) {
				JobEntity jobEntity = genJobEntity(t, TaskConstant.NOW_GROUP, TaskConstant.NOW_SCHEDULE, null, null);
				SchedulerUtils.addNowJob(jobEntity);
			}
		}else if(TaskConstant.TIME_SCHEDULE.equals(scheduleType)) {
			if(!isScheduled(t, TaskConstant.TIME_GROUP)) {
				Date scheduleDate = DateUtils.strToDate(t.getScheduleTime(), FormatEnum.YYYYMMDDHMS.getDesc());
				Date currentDate = DateUtils.getNowTime();
				long minute = DateUtils.compareDateOnMinute(scheduleDate, currentDate);
				JobEntity jobEntity = genJobEntity(t, TaskConstant.TIME_GROUP, TaskConstant.TIME_SCHEDULE, scheduleDate, null);
				if(minute > 0) {
					SchedulerUtils.addTimeJob(jobEntity);
				}else {
					// The scheduled time is not later than now: skip the task and only report it.
					logger.warn("未执行的定时器的执行时间小于当前系统时间, param: " + jobEntity);
				}
			}
		}else if(TaskConstant.CRON_SCHEDULE.equals(scheduleType)) {
			restoreCronTask(t);
		}
	}

	/** Registers a cron task unless a job with its id already exists in the cron group. */
	private void restoreCronTask(TaskJobVo t) throws SchedulerException {
		if(!isScheduled(t, TaskConstant.CRON_GROUP)) {
			JobEntity jobEntity = genJobEntity(t, TaskConstant.CRON_GROUP, TaskConstant.CRON_SCHEDULE, null, t.getScheduleTime());
			SchedulerUtils.addCronJob(jobEntity);
		}
	}

	/**
	 * Tells whether the scheduler already holds a job for this task in the given group.
	 * The group must be the one the job is registered under (the value passed to
	 * {@link #genJobEntity}), otherwise the lookup can never match.
	 */
	private boolean isScheduled(TaskJobVo t, String group) throws SchedulerException {
		return SchedulerUtils.getScheduler().getJobDetail(JobKey.jobKey(t.getTaskId(), group)) != null;
	}

	/**
	 * Builds the {@link JobEntity} handed to the scheduler for a stored task.
	 *
	 * @param t            stored task; its task data JSON is parsed into a {@link TaskMsg}
	 * @param group        scheduler group for the job and trigger
	 * @param scheduleType schedule type of the task (not used by this method)
	 * @param triggerTime  start time for timed tasks, otherwise {@code null}
	 * @param cron         cron expression for cron tasks, otherwise {@code null}
	 * @return the populated job entity, named after the task id
	 */
	private JobEntity genJobEntity(TaskJobVo t, String group, String scheduleType, Date triggerTime, String cron) {
		JobEntity jobEntity = new JobEntity();
		TaskMsg taskMsg = JsonUtils.fromJson(t.getTaskData(), TaskMsg.class);
		jobEntity.setTaskMsg(taskMsg);
		jobEntity.setName(t.getTaskId());
		jobEntity.setTaskId(t.getTaskId());
		jobEntity.setExecuteType(t.getExecuteType());
		jobEntity.setGroup(group);
		jobEntity.setTriggerTime(triggerTime);
		jobEntity.setCron(cron);
		// A recognised task type overrides the execute type copied from the task above.
		int taskType = t.getTaskType().intValue();
		if(TaskType.WECHAT.intValue() == taskType) {
			jobEntity.setExecuteType(TaskConstant.WECHAT_EXECUTE_TYPE);
		}else if(TaskType.SMS.intValue() == taskType) {
			jobEntity.setExecuteType(TaskConstant.SMS_EXECUTE_TYPE);
		}else if(TaskType.PHONE.intValue() == taskType) {
			jobEntity.setExecuteType(TaskConstant.PHONE_EXECUTE_TYPE);
		}else if(TaskType.ChoseQ.intValue() == taskType){
			jobEntity.setExecuteType(TaskConstant.CHOSEQ_EXECUTE_TYPE);
		}else if(TaskType.DrawPrize.intValue() == taskType){
			jobEntity.setExecuteType(TaskConstant.DRAW_PRIZE_TYPE);
		}
		return jobEntity;
	}
}

调度器实现

/**
 * Quartz job that runs the business executor for a {@link JobEntity}, records the
 * outcome through {@link TaskJobService} and publishes the updated {@link TaskMsg}
 * as JSON to the {@code CUSTOMER_EDIT_TASK_SAAS} destination.
 */
public class ExecuteTimeJob implements Job {
	private static final Logger logger = LoggerFactory.getLogger(ExecuteTimeJob.class);

	/**
	 * Executes the task stored in the job data map under {@code TaskConstant.JOB_TIME_KEY}.
	 *
	 * <p>When no executor can be created for the task, the message is published with
	 * status {@code EXECUTE_FAIL}. Failures while persisting or publishing the
	 * outcome are logged and not rethrown.
	 *
	 * @param context Quartz execution context carrying the job detail
	 * @throws JobExecutionException declared by the {@link Job} contract
	 */
	@Override
	public void execute(JobExecutionContext context)
			throws JobExecutionException {
		JobEntity jobEntity = (JobEntity)
				context.getJobDetail().getJobDataMap().get(TaskConstant.JOB_TIME_KEY);
		TaskMsg taskMsg = jobEntity.getTaskMsg();

		// Call back into the business logic.
		JobExecutor jobExecutor = JobExecutorFactory.createJobExecutor(jobEntity);
		if(jobExecutor == null) {
			// NOTE(review): this path does not update the task_job/task_result status; confirm whether it should.
			try {
				taskMsg.setStatus(Integer.parseInt(TaskStatus.EXECUTE_FAIL));
				publish(taskMsg);
			} catch (Exception e) {
				logger.error("发送任务状态消息失败", e);
			}
			return;
		}

		ExecuteResult executeResult = jobExecutor.execute(jobEntity);
		try {
			TaskJobService taskJobService = SpringUtil.getBean(BeanNameConstants.TASK_JOB_SERVICE, TaskJobService.class);
			String resultStatus = executeResult.isSuccess() ? TaskStatus.EXECUTE_SUCCESS : TaskStatus.EXECUTE_FAIL;
			TaskUpdateStatus taskUpdateStatus = new TaskUpdateStatus();
			taskUpdateStatus.setTaskId(jobEntity.getTaskId());
			taskUpdateStatus.setResultContent(executeResult.getMsg());
			taskUpdateStatus.setTaskJobStatus(TaskStatus.EXECUTED);
			taskUpdateStatus.setTaskResultStatus(resultStatus);
			taskMsg.setStatus(Integer.parseInt(resultStatus));
			taskJobService.updateStatus(taskUpdateStatus);
			publish(taskMsg);
		} catch (Exception e) {
			logger.error("更新任务状态失败", e);
		}
	}

	/** Publishes the task message as JSON, so both outcomes use the same payload format. */
	private void publish(TaskMsg taskMsg) throws Exception {
		Productor product = SpringUtil.getBean(BeanNameConstants.PRODUCTOR, Productor.class);
		product.sendMessage(JmsDestination.getDestination(DestinationConstants.CUSTOMER_EDIT_TASK_SAAS),
				JsonUtils.toJson(taskMsg));
	}
}

任务执行器

/**
 * Business callback run for a scheduled task.
 */
public interface JobExecutor {
	/**
	 * Runs the business logic for the given task.
	 *
	 * @param jobEntity the task to execute
	 * @return the outcome of the execution
	 */
	ExecuteResult execute(JobEntity jobEntity);
}
/**
 * Creates {@link JobExecutor} instances reflectively from a task's execute type.
 */
public class JobExecutorFactory {
	private static final Logger logger = LoggerFactory.getLogger(JobExecutorFactory.class);

	/**
	 * Instantiates the executor whose class name is
	 * {@code JOB_EXECUTOR_PACKAGE + executeType + JOB_EXECUTOR}.
	 *
	 * @param jobEntity task whose execute type selects the executor class
	 * @return a new executor, or {@code null} when the class cannot be loaded,
	 *         is not a {@link JobExecutor}, or cannot be instantiated (the failure is logged)
	 */
	public static JobExecutor createJobExecutor(JobEntity jobEntity) {
		String className = TaskConstant.JOB_EXECUTOR_PACKAGE + jobEntity.getExecuteType() + TaskConstant.JOB_EXECUTOR;
		try {
			// Check the type before instantiating, and use the no-arg constructor
			// explicitly: Class.newInstance() is deprecated.
			return Class.forName(className)
					.asSubclass(JobExecutor.class)
					.getDeclaredConstructor()
					.newInstance();
		} catch (Exception e) {
			logger.error("获取JobExecutor异常,param: " + jobEntity, e);
		}
		return null;
	}
}
/**
 * Outcome of a {@code JobExecutor} run: a success flag plus an optional message.
 */
public class ExecuteResult {
	/** Whether the execution succeeded; {@code false} until set. */
	private boolean success;
	/** Message describing the outcome; {@code null} until set. */
	private String msg;

	/** @return {@code true} when the execution succeeded */
	public boolean isSuccess() {
		return this.success;
	}

	/** @return the outcome message, possibly {@code null} */
	public String getMsg() {
		return this.msg;
	}

	/** @param success whether the execution succeeded */
	public void setSuccess(boolean success) {
		this.success = success;
	}

	/** @param msg message describing the outcome */
	public void setMsg(String msg) {
		this.msg = msg;
	}
}

/**
 * Example {@link JobExecutor} implementation; the business logic is elided ("...").
 */
public class AJobExecutor implements JobExecutor {

	/**
	 * Runs the (elided) business logic for the task.
	 *
	 * @param jobEntity the task to execute
	 * @return a result with success=true when the body completes normally, or
	 *         success=false when it throws an Exception
	 */
	@Override
	public ExecuteResult execute(JobEntity jobEntity) {
		ExecuteResult executeResult = new ExecuteResult();
		try {
			...
		} catch (Exception e) {
			// NOTE(review): the exception is neither logged nor copied into the result
			// message, so the failure reason is lost - consider recording it.
			executeResult.setSuccess(false);
			return executeResult;
		}
		executeResult.setSuccess(true);
		return executeResult;
	}
}

/**
 * Example {@link JobExecutor} implementation; the business logic is elided ("...").
 */
public class BJobExecutor implements JobExecutor {

	/**
	 * Runs the (elided) business logic for the task.
	 *
	 * @param jobEntity the task to execute
	 * @return a result with success=true when the body completes normally, or
	 *         success=false when it throws an Exception
	 */
	@Override
	public ExecuteResult execute(JobEntity jobEntity) {
		ExecuteResult executeResult = new ExecuteResult();
		try {
			...
		} catch (Exception e) {
			// NOTE(review): the exception is neither logged nor copied into the result
			// message, so the failure reason is lost - consider recording it.
			executeResult.setSuccess(false);
			return executeResult;
		}
		executeResult.setSuccess(true);
		return executeResult;
	}
}

后期采用Elastic-job更佳

⚠️ **GitHub.com Fallback** ⚠️