1. 优化 JobHandlerInvoker 的实现,增加重试机制

2. 去除作业异常,自动暂时作业的机制。
This commit is contained in:
YunaiV
2021-02-18 21:26:59 +08:00
parent a4a13efa4a
commit 3959aadae2
30 changed files with 1019 additions and 1017 deletions

View File

@ -65,6 +65,20 @@ public class QueryWrapperX<T> extends QueryWrapper<T> {
return this;
}
public QueryWrapperX<T> ltIfPresent(String column, Object val) {
if (val != null) {
return (QueryWrapperX<T>) super.lt(column, val);
}
return this;
}
public QueryWrapperX<T> leIfPresent(String column, Object val) {
if (val != null) {
return (QueryWrapperX<T>) super.le(column, val);
}
return this;
}
public QueryWrapperX<T> betweenIfPresent(String column, Object val1, Object val2) {
if (val1 != null && val2 != null) {
return (QueryWrapperX<T>) super.between(column, val1, val2);

View File

@ -7,6 +7,8 @@ public enum JobDataKeyEnum {
JOB_ID,
JOB_HANDLER_NAME,
JOB_HANDLER_PARAM
JOB_HANDLER_PARAM,
JOB_RETRY_COUNT, // 最大重试次数
JOB_RETRY_INTERVAL, // 每次重试间隔
}

View File

@ -1,7 +1,7 @@
package cn.iocoder.dashboard.framework.quartz.core.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.iocoder.dashboard.framework.quartz.core.enums.JobDataKeyEnum;
import cn.iocoder.dashboard.framework.quartz.core.service.JobLogFrameworkService;
import lombok.extern.slf4j.Slf4j;
@ -38,8 +38,11 @@ public class JobHandlerInvoker extends QuartzJobBean {
protected void executeInternal(JobExecutionContext executionContext) throws JobExecutionException {
// 第一步,获得 Job 数据
Long jobId = executionContext.getMergedJobDataMap().getLong(JobDataKeyEnum.JOB_ID.name());
String jobHandlerName = getJobHandlerName(executionContext);
String jobHandlerName = executionContext.getMergedJobDataMap().getString(JobDataKeyEnum.JOB_HANDLER_NAME.name());
String jobHandlerParam = executionContext.getMergedJobDataMap().getString(JobDataKeyEnum.JOB_HANDLER_PARAM.name());
int refireCount = executionContext.getRefireCount();
int retryCount = (Integer) executionContext.getMergedJobDataMap().getOrDefault(JobDataKeyEnum.JOB_RETRY_COUNT.name(), 0);
int retryInterval = (Integer) executionContext.getMergedJobDataMap().getOrDefault(JobDataKeyEnum.JOB_RETRY_INTERVAL.name(), 0);
// 第二步,执行任务
Long jobLogId = null;
@ -48,7 +51,7 @@ public class JobHandlerInvoker extends QuartzJobBean {
Throwable exception = null;
try {
// 记录 Job 日志(初始)
jobLogId = jobLogFrameworkService.createJobLog(jobId, startTime, jobHandlerName, jobHandlerParam);
jobLogId = jobLogFrameworkService.createJobLog(jobId, startTime, jobHandlerName, jobHandlerParam, refireCount + 1);
// 执行任务
data = this.executeInternal(jobHandlerName, jobHandlerParam);
} catch (Throwable ex) {
@ -58,21 +61,8 @@ public class JobHandlerInvoker extends QuartzJobBean {
// 第三步,记录执行日志
this.updateJobLogResultAsync(jobLogId, startTime, data, exception, executionContext);
// 最终还是抛出异常,用于停止任务
if (exception != null) {
throw new JobExecutionException(exception);
}
}
private static String getJobHandlerName(JobExecutionContext executionContext) {
String jobHandlerName = executionContext.getMergedJobDataMap().getString(JobDataKeyEnum.JOB_HANDLER_NAME.name());
if (StrUtil.isEmpty(jobHandlerName)) {
log.error("[executeInternal][Job({}) 获取不到正确的 jobHandlerName({})]",
executionContext.getJobDetail().getKey(), jobHandlerName);
throw new IllegalStateException(StrUtil.format("Job({}) 获取不到正确的 jobHandlerName({})",
executionContext.getJobDetail().getKey(), jobHandlerName));
}
return jobHandlerName;
// 第四步,处理有异常的情况
handleException(exception, refireCount, retryCount, retryInterval);
}
private String executeInternal(String jobHandlerName, String jobHandlerParam) throws Exception {
@ -86,18 +76,38 @@ public class JobHandlerInvoker extends QuartzJobBean {
private void updateJobLogResultAsync(Long jobLogId, Date startTime, String data, Throwable exception,
JobExecutionContext executionContext) {
Date endTime = new Date();
// 处理是否成功
boolean success = exception == null;
if (!success) {
data = getRootCauseMessage(exception);
}
// 更新日志
try {
if (data != null) { // 成功
jobLogFrameworkService.updateJobLogSuccessAsync(jobLogId, endTime, (int) diff(endTime, startTime), data);
} else { // 失败
jobLogFrameworkService.updateJobLogErrorAsync(jobLogId, endTime, (int) diff(endTime, startTime),
getRootCauseMessage(exception));
}
jobLogFrameworkService.updateJobLogResultAsync(jobLogId, endTime, (int) diff(endTime, startTime), success, data);
} catch (Exception ex) {
log.error("[executeInternal][Job({}) logId({}) 记录执行日志失败({})]",
executionContext.getJobDetail().getKey(), jobLogId,
data != null ? data : getRootCauseMessage(exception));
log.error("[executeInternal][Job({}) logId({}) 记录执行日志失败({}/{})]",
executionContext.getJobDetail().getKey(), jobLogId, success, data);
}
}
private void handleException(Throwable exception,
int refireCount, int retryCount, int retryInterval) throws JobExecutionException {
// 如果有异常,则进行重试
if (exception == null) {
return;
}
// 情况一:如果到达重试上限,则直接抛出异常即可
if (refireCount >= retryCount) {
throw new JobExecutionException(exception);
}
// 情况二:如果未到达重试上限,则 sleep 一定间隔时间,然后重试
// 这里使用 sleep 来实现,主要还是希望实现比较简单。因为,同一时间,不会存在大量失败的 Job。
if (retryInterval > 0) {
ThreadUtil.sleep(retryInterval);
}
// 第二个参数refireImmediately = true表示立即重试
throw new JobExecutionException(exception, true);
}
}

View File

@ -30,9 +30,12 @@ public class SchedulerManager {
* @param jobHandlerName 任务处理器的名字
* @param jobHandlerParam 任务处理器的参数
* @param cronExpression CRON 表达式
* @param retryCount 重试次数
* @param retryInterval 重试间隔
* @throws SchedulerException 添加异常
*/
public void addJob(Long jobId, String jobHandlerName, String jobHandlerParam, String cronExpression)
public void addJob(Long jobId, String jobHandlerName, String jobHandlerParam, String cronExpression,
Integer retryCount, Integer retryInterval)
throws SchedulerException {
// 创建 JobDetail 对象
JobDetail jobDetail = JobBuilder.newJob(JobHandlerInvoker.class)
@ -40,7 +43,7 @@ public class SchedulerManager {
.usingJobData(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName)
.withIdentity(jobHandlerName).build();
// 创建 Trigger 对象
Trigger trigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression);
Trigger trigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression, retryCount, retryInterval);
// 新增调度
scheduler.scheduleJob(jobDetail, trigger);
}
@ -51,12 +54,15 @@ public class SchedulerManager {
* @param jobHandlerName 任务处理器的名字
* @param jobHandlerParam 任务处理器的参数
* @param cronExpression CRON 表达式
* @param retryCount 重试次数
* @param retryInterval 重试间隔
* @throws SchedulerException 更新异常
*/
public void updateJob(String jobHandlerName, String jobHandlerParam, String cronExpression)
public void updateJob(String jobHandlerName, String jobHandlerParam, String cronExpression,
Integer retryCount, Integer retryInterval)
throws SchedulerException {
// 创建新 Trigger 对象
Trigger newTrigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression);
Trigger newTrigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression, retryCount, retryInterval);
// 修改调度
scheduler.rescheduleJob(new TriggerKey(jobHandlerName), newTrigger);
}
@ -102,7 +108,7 @@ public class SchedulerManager {
*/
public void triggerJob(Long jobId, String jobHandlerName, String jobHandlerParam)
throws SchedulerException {
JobDataMap data = new JobDataMap();
JobDataMap data = new JobDataMap(); // 无需重试,所以不设置 retryCount 和 retryInterval
data.put(JobDataKeyEnum.JOB_ID.name(), jobId);
data.put(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName);
data.put(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam);
@ -110,11 +116,14 @@ public class SchedulerManager {
scheduler.triggerJob(new JobKey(jobHandlerName), data);
}
private Trigger buildTrigger(String jobHandlerName, String jobHandlerParam, String cronExpression) {
private Trigger buildTrigger(String jobHandlerName, String jobHandlerParam, String cronExpression,
Integer retryCount, Integer retryInterval) {
return TriggerBuilder.newTrigger()
.withIdentity(jobHandlerName)
.usingJobData(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.usingJobData(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam)
.usingJobData(JobDataKeyEnum.JOB_RETRY_COUNT.name(), retryCount)
.usingJobData(JobDataKeyEnum.JOB_RETRY_INTERVAL.name(), retryInterval)
.build();
}

View File

@ -14,41 +14,31 @@ public interface JobLogFrameworkService {
/**
* 创建 Job 日志
*
* @param jobId 任务编号
* @param beginTime 开始时间
* @param jobHandlerName Job 处理器的名字
* @param jobId 任务编号
* @param beginTime 开始时间
* @param jobHandlerName Job 处理器的名字
* @param jobHandlerParam Job 处理器的参数
* @param executeIndex 第几次执行
* @return Job 日志的编号
*/
Long createJobLog(@NotNull(message = "任务编号不能为空") Long jobId,
@NotNull(message = "开始时间") Date beginTime,
@NotEmpty(message = "Job 处理器的名字不能为空") String jobHandlerName,
String jobHandlerParam);
String jobHandlerParam,
@NotNull(message = "第几次执行不能为空") Integer executeIndex);
/**
* 更新 Job 日志成功
* 更新 Job 日志的执行结果
*
* @param logId 日志编号
* @param endTime 结束时间。因为是异步,避免记录时间不准去
* @param logId 日志编号
* @param endTime 结束时间。因为是异步,避免记录时间不准去
* @param duration 运行时长,单位:毫秒
* @param result 成功数据
* @param success 是否成功
* @param result 成功数据
*/
void updateJobLogSuccessAsync(@NotNull(message = "日志编号不能为空") Long logId,
@NotNull(message = "结束时间不能为空") Date endTime,
@NotNull(message = "运行时长不能为空") Integer duration,
String result);
/**
* 更新 Job 日志失败
*
* @param logId 日志编号
* @param endTime 结束时间。因为是异步,避免记录时间不准去
* @param duration 运行时长,单位:毫秒
* @param result 异常提示
*/
void updateJobLogErrorAsync(@NotNull(message = "日志编号不能为空") Long logId,
@NotNull(message = "结束时间不能为空") Date endTime,
@NotNull(message = "运行时长不能为空") Integer duration,
String result);
void updateJobLogResultAsync(@NotNull(message = "日志编号不能为空") Long logId,
@NotNull(message = "结束时间不能为空") Date endTime,
@NotNull(message = "运行时长不能为空") Integer duration,
boolean success, String result);
}