增加支持SimpleTrigger任务

This commit is contained in:
justlive1 2018-06-21 19:35:14 +08:00
parent 714117e925
commit 3c06d81432
8 changed files with 285 additions and 74 deletions

View File

@ -3,3 +3,4 @@
- 调度记录增加执行时间
- 任务超时报警机制
- 支持分片任务
- 支持SimpleTrigger任务

View File

@ -44,6 +44,21 @@ public class JobInfo {
RETRY;
}
public enum MODE {
/**
* 简单任务
*/
SIMPLE,
/**
* 延时任务
*/
DELAY,
/**
* cron表达式任务
*/
CRON;
}
/**
* 编号
*/
@ -59,11 +74,6 @@ public class JobInfo {
*/
private JobGroup group;
/**
* 定时表达式
*/
private String cron;
/**
* 任务状态
*/
@ -119,4 +129,26 @@ public class JobInfo {
*/
private Integer sharding;
/**
* 任务模式
*/
private String mode;
/**
* 简单任务的时间戳
*/
private Long timestamp;
/**
* 延时任务延时
*/
private Long initDelay;
private Long delay;
/**
* 定时表达式
*/
private String cron;
}

View File

@ -189,6 +189,7 @@ public class DemoScriptJob implements IJob {
$scope.modalDatas = { opt: 1};
$scope.modalDatas.type = 'BEAN';
$scope.modalDatas.failStrategy = 'NOTIFY';
$scope.modalDatas.mode = 'CRON';
$scope.modalDatas.childrenJobs = [];
$scope.modalDatas.extraSettings = $scope.extraSettings;
$scope.modalDatas.translationTexts = $scope.translationTexts;
@ -245,28 +246,34 @@ public class DemoScriptJob implements IJob {
if ($scope.modalDatas.childrenJobs.length > 0) {
childJobIds = $scope.modalDatas.childrenJobs.map(r => r.id);
}
var job = {
name: $scope.modalDatas.name,
cron: $scope.modalDatas.cron,
type: $scope.modalDatas.type,
param: $scope.modalDatas.param,
auto: $scope.modalDatas.auto,
failStrategy: $scope.modalDatas.failStrategy,
notifyMails: mails,
timeout : $scope.modalDatas.timeout,
useSharding : $scope.modalDatas.useSharding,
sharding : $scope.modalDatas.sharding,
childJobIds: childJobIds
};
if($scope.modalDatas.type == 'SCRIPT'){
job.script = $scope.defaultScript;
var job = {};
angular.copy($scope.modalDatas, job);
job.notifyMails = mails;
job.childJobIds = childJobIds;
if ($scope.modalDatas.mode == 'SIMPLE') {
job.timestamp = new Date($scope.modalDatas.execDate).getTime();
delete job.cron;
delete job.initDelay;
delete job.delay;
} else if ($scope.modalDatas.mode == 'CRON') {
delete job.initDelay;
delete job.delay;
} else if ($scope.modalDatas.mode == 'DELAY') {
var delayArr = $scope.modalDatas.delayStr.split(',');
job.initDelay = delayArr[0];
job.delay = delayArr[1];
delete job.cron;
}
if ($scope.modalDatas.type == 'SCRIPT'){
job.script = $scope.defaultScript;
if ($scope.modalDatas.useExecutor) {
job.group = {
groupKey: $scope.modalDatas.groupKey
};
}
} else {
job.group = {
jobKey: $scope.modalDatas.jobKey,
@ -396,17 +403,17 @@ public class DemoScriptJob implements IJob {
if (data.data.notifyMails) {
mails = data.data.notifyMails.join();
}
$scope.modalDatas.type = data.data.type;
angular.extend($scope.modalDatas, data.data);
$scope.modalDatas.preType = data.data.type;
$scope.modalDatas.cron = data.data.cron;
$scope.modalDatas.name = data.data.name;
$scope.modalDatas.param = data.data.param;
$scope.modalDatas.failStrategy = data.data.failStrategy;
$scope.modalDatas.notifyMails = mails;
$scope.modalDatas.childJobIds = data.data.childJobIds;
$scope.modalDatas.timeout = data.data.timeout;
$scope.modalDatas.useSharding = data.data.useSharding;
$scope.modalDatas.sharding = data.data.sharding;
if (data.data.mode == 'SIMPLE') {
$scope.modalDatas.execDate = new Date(data.data.timestamp);
} else if (data.data.mode == 'DELAY') {
$scope.modalDatas.delayStr = data.data.initDelay + "," + data.data.delay;
}
if (data.data.group) {
$scope.modalDatas.groupKey = data.data.group.groupKey;
$scope.modalDatas.jobs = $scope.modalDatas.executorMap[$scope.modalDatas.groupKey];
@ -464,19 +471,27 @@ public class DemoScriptJob implements IJob {
if ($scope.modalDatas.childrenJobs.length > 0) {
childJobIds = $scope.modalDatas.childrenJobs.map(r => r.id);
}
var job = {
id: id,
name: $scope.modalDatas.name,
cron: $scope.modalDatas.cron,
type: $scope.modalDatas.type,
param: $scope.modalDatas.param,
failStrategy: $scope.modalDatas.failStrategy,
notifyMails: mails,
timeout: $scope.modalDatas.timeout,
useSharding : $scope.modalDatas.useSharding,
sharding : $scope.modalDatas.sharding,
childJobIds: childJobIds
};
var job = {};
angular.copy($scope.modalDatas, job);
job.id = id;
job.notifyMails = mails;
job.childJobIds = childJobIds;
if ($scope.modalDatas.mode == 'SIMPLE') {
job.timestamp = new Date($scope.modalDatas.execDate).getTime();
delete job.cron;
delete job.initDelay;
delete job.delay;
} else if ($scope.modalDatas.mode == 'CRON') {
delete job.initDelay;
delete job.delay;
} else if ($scope.modalDatas.mode == 'DELAY') {
var delayArr = $scope.modalDatas.delayStr.split(',');
job.initDelay = delayArr[0];
job.delay = delayArr[1];
delete job.cron;
}
if (job.type == 'SCRIPT') {
if ($scope.modalDatas.useExecutor) {
job.group = {

View File

@ -58,6 +58,19 @@
</div>
</div>
<div class="control-group inline-block">
<label class="control-label width100"> 任务模式 </label>
<div class="controls">
<div class="input-prepend">
<select class="input-large width248" ng-model="modalDatas.mode">
<option value="CRON">cron任务</option>
<option value="SIMPLE">简单任务</option>
<option value="DELAY">延时任务</option>
</select>
</div>
</div>
</div>
<div class="control-group inline-block"
ng-if="modalDatas.mode == 'CRON'">
<label class="control-label width100"> Cron </label>
<div class="controls">
<div class="input-prepend">
@ -66,6 +79,27 @@
</div>
</div>
</div>
<div class="control-group inline-block"
ng-if="modalDatas.mode == 'SIMPLE'">
<label class="control-label width100"> 执行时间 </label>
<div class="controls">
<div class="input-prepend">
<input placeholder="请选择执行时间" class="input-large width235"
name="execDate" type="datetime-local"
ng-model="modalDatas.execDate" required>
</div>
</div>
</div>
<div class="control-group inline-block"
ng-if="modalDatas.mode == 'DELAY'">
<label class="control-label width100"> 延时 </label>
<div class="controls">
<div class="input-prepend">
<input type="text" ng-model="modalDatas.delayStr" name="delay"
placeholder="格式:初始延时,延时间隔 单位:秒" class="input-large width235" required>
</div>
</div>
</div>
<div class="control-group inline-block">
<label class="control-label width100"> 子任务 </label>
<div class="controls">
@ -121,10 +155,11 @@
<label class="control-label width100"> 分片运行 </label>
<div class="controls">
<div class="input-prepend">
<input type="checkbox" ng-model="modalDatas.useSharding" ng-click="modalDatas.sharding = '';"> <input
type="text" ng-model="modalDatas.sharding"
placeholder="默认分片为执行器个数" class="input-large"
ng-disabled="!modalDatas.useSharding" style="width: 222px;">
<input type="checkbox" ng-model="modalDatas.useSharding"
ng-click="modalDatas.sharding = '';"> <input type="text"
ng-model="modalDatas.sharding" placeholder="默认分片为执行器个数"
class="input-large" ng-disabled="!modalDatas.useSharding"
style="width: 222px;">
</div>
</div>
</div>

View File

@ -14,7 +14,7 @@
<th class="name-column"><span>任务编号</span></th>
<th class="time-column"><span>名称</span></th>
<th class="info-column">分组信息</th>
<th class="cron-column">Cron</th>
<th class="time-column">定时信息</th>
<th class="cron-column">状态</th>
<th class="operation-column">操作</th>
</tr>
@ -24,7 +24,7 @@
<td class="name-column" ng-bind="job.id"></td>
<td class="time-column" ng-bind="job.name"></td>
<td class="info-column">[{{job.group.groupKey?job.group.groupKey:'…'}}]-[{{job.group.jobKey?job.group.jobKey:'…'}}]</td>
<td class="cron-column" ng-bind="job.cron"></td>
<td class="cron-column">{{job.mode == 'SIMPLE'? (job.timestamp | date:'yyyy-MM-dd HH:mm:ss'):(job.cron || (job.initDelay + "," + job.delay))}}</td>
<td class="cron-column"><span class="status-{{job.status}}">{{job.status}}</span></td>
<td class="operation-column">
<button type="button" class="btn" tooltip-placement="bottom"

View File

@ -9,13 +9,40 @@ package vip.justlive.frost.core.job;
public interface JobSchedule {
/**
* 增加job
* 新增任务
*
* @param jobId
* @return
*/
String addJob(String jobId);
/**
* 增加简单任务
*
* @param jobId
* @param timestamp 执行时间点的时间戳
* @return
*/
String addSimpleJob(String jobId, long timestamp);
/**
* 增加延迟任务
*
* @param jobId
* @param initDelay
* @param delay
* @return
*/
String addDelayJob(String jobId, long initDelay, long delay);
/**
* 增加表达式job
*
* @param jobId
* @param cron
* @return
*/
String addJob(String jobId, String cron);
String addCronJob(String jobId, String cron);
/**
* 刷新job
@ -26,6 +53,25 @@ public interface JobSchedule {
*/
String refreshJob(String jobId, String cron);
/**
* 刷新job
*
* @param jobId
* @param timestamp
* @return
*/
String refreshJob(String jobId, Long timestamp);
/**
* 刷新job
*
* @param jobId
* @param initDelay
* @param delay
* @return
*/
String refreshJob(String jobId, Long initDelay, Long delay);
/**
* 暂停job
*

View File

@ -1,6 +1,8 @@
package vip.justlive.frost.executor.redis.job;
import java.time.ZonedDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.redisson.api.CronSchedule;
import org.redisson.api.RList;
@ -64,7 +66,56 @@ public class RedisJobScheduleImpl implements JobSchedule {
}
@Override
public String addJob(String jobId, String cron) {
public String addJob(String jobId) {
JobInfo jobInfo = this.getJobInfo(jobId);
switch (JobInfo.MODE.valueOf(jobInfo.getMode())) {
case SIMPLE:
return this.addSimpleJob(jobId, jobInfo.getTimestamp());
case DELAY:
return this.addDelayJob(jobId, jobInfo.getInitDelay(), jobInfo.getDelay());
case CRON:
return this.addCronJob(jobId, jobInfo.getCron());
default:
return null;
}
}
@Override
public String addSimpleJob(String jobId, long timestamp) {
RScheduledExecutorService service =
redissonClient.getExecutorService(JobProperties.CENTER_PREFIX);
long delay = timestamp - ZonedDateTime.now().toInstant().toEpochMilli();
RScheduledFuture<?> future =
service.scheduleAsync(new JobDispatchWrapper(jobId), delay, TimeUnit.MILLISECONDS);
String taskId = future.getTaskId();
RListMultimap<String, String> listmap = redissonClient.getListMultimap(String
.join(JobProperties.SEPERATOR, JobProperties.EXECUTOR_PREFIX, JobSchedule.class.getName()));
listmap.put(jobId, taskId);
return taskId;
}
@Override
public String addDelayJob(String jobId, long initDelay, long delay) {
RScheduledExecutorService service =
redissonClient.getExecutorService(JobProperties.CENTER_PREFIX);
RScheduledFuture<?> future = service.scheduleAtFixedRateAsync(new JobDispatchWrapper(jobId),
initDelay, delay, TimeUnit.SECONDS);
String taskId = future.getTaskId();
RListMultimap<String, String> listmap = redissonClient.getListMultimap(String
.join(JobProperties.SEPERATOR, JobProperties.EXECUTOR_PREFIX, JobSchedule.class.getName()));
listmap.put(jobId, taskId);
return taskId;
}
@Override
public String addCronJob(String jobId, String cron) {
RScheduledExecutorService service =
redissonClient.getExecutorService(JobProperties.CENTER_PREFIX);
@ -79,12 +130,25 @@ public class RedisJobScheduleImpl implements JobSchedule {
return taskId;
}
@Override
public String refreshJob(String jobId, String cron) {
this.pauseJob(jobId);
return this.addJob(jobId, cron);
return this.addCronJob(jobId, cron);
}
@Override
public String refreshJob(String jobId, Long timestamp) {
this.pauseJob(jobId);
return this.addSimpleJob(jobId, timestamp);
}
@Override
public String refreshJob(String jobId, Long initDelay, Long delay) {
this.pauseJob(jobId);
return this.addDelayJob(jobId, initDelay, delay);
}
@Override
@ -103,10 +167,8 @@ public class RedisJobScheduleImpl implements JobSchedule {
@Override
public String resumeJob(String jobId) {
JobInfo jobInfo = this.getJobInfo(jobId);
pauseJob(jobId);
return this.addJob(jobId, jobInfo.getCron());
return addJob(jobId);
}
@Override

View File

@ -5,6 +5,7 @@ import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.redisson.api.RedissonClient;
import org.redisson.executor.CronExpression;
import org.springframework.beans.factory.annotation.Autowired;
@ -57,7 +58,8 @@ public class RedisJobServiceImpl implements JobService {
@Override
public String addJob(JobInfo jobInfo) {
if (!CronExpression.isValidExpression(jobInfo.getCron())) {
if (Objects.equals(jobInfo.getMode(), JobInfo.MODE.CRON.name())
&& !CronExpression.isValidExpression(jobInfo.getCron())) {
throw Exceptions.fail("300001", "定时表达式格式有误");
}
if (jobInfo.isAuto()) {
@ -67,7 +69,7 @@ public class RedisJobServiceImpl implements JobService {
}
jobRepository.addJob(jobInfo);
if (jobInfo.isAuto()) {
jobSchedule.addJob(jobInfo.getId(), jobInfo.getCron());
jobSchedule.addJob(jobInfo.getId());
}
return jobInfo.getId();
@ -76,7 +78,8 @@ public class RedisJobServiceImpl implements JobService {
@Override
public void updateJob(JobInfo jobInfo) {
if (!CronExpression.isValidExpression(jobInfo.getCron())) {
if (Objects.equals(jobInfo.getMode(), JobInfo.MODE.CRON.name())
&& !CronExpression.isValidExpression(jobInfo.getCron())) {
throw Exceptions.fail("300001", "定时表达式格式有误");
}
@ -90,23 +93,22 @@ public class RedisJobServiceImpl implements JobService {
throw Exceptions.fail("300003", "子任务不能包含本任务");
}
localJobInfo.setCron(jobInfo.getCron());
localJobInfo.setName(jobInfo.getName());
localJobInfo.setGroup(jobInfo.getGroup());
localJobInfo.setParam(jobInfo.getParam());
localJobInfo.setType(jobInfo.getType());
localJobInfo.setScript(jobInfo.getScript());
localJobInfo.setFailStrategy(jobInfo.getFailStrategy());
localJobInfo.setNotifyMails(jobInfo.getNotifyMails());
localJobInfo.setChildJobIds(jobInfo.getChildJobIds());
localJobInfo.setTimeout(jobInfo.getTimeout());
localJobInfo.setUseSharding(jobInfo.isUseSharding());
localJobInfo.setSharding(jobInfo.getSharding());
this.mergeData(localJobInfo, jobInfo);
jobRepository.updateJob(localJobInfo);
if (JobInfo.STATUS.NORMAL.name().equals(localJobInfo.getStatus())) {
jobSchedule.refreshJob(jobInfo.getId(), jobInfo.getCron());
switch (JobInfo.MODE.valueOf(localJobInfo.getMode())) {
case SIMPLE:
jobSchedule.refreshJob(jobInfo.getId(), jobInfo.getTimestamp());
break;
case DELAY:
jobSchedule.refreshJob(jobInfo.getId(), jobInfo.getInitDelay(), jobInfo.getDelay());
break;
case CRON:
jobSchedule.refreshJob(jobInfo.getId(), jobInfo.getCron());
break;
}
}
}
@ -285,4 +287,22 @@ public class RedisJobServiceImpl implements JobService {
return statictisDays;
}
private void mergeData(JobInfo localJobInfo, JobInfo jobInfo) {
localJobInfo.setMode(jobInfo.getMode());
localJobInfo.setTimestamp(jobInfo.getTimestamp());
localJobInfo.setInitDelay(jobInfo.getInitDelay());
localJobInfo.setDelay(jobInfo.getDelay());
localJobInfo.setCron(jobInfo.getCron());
localJobInfo.setName(jobInfo.getName());
localJobInfo.setGroup(jobInfo.getGroup());
localJobInfo.setParam(jobInfo.getParam());
localJobInfo.setType(jobInfo.getType());
localJobInfo.setScript(jobInfo.getScript());
localJobInfo.setFailStrategy(jobInfo.getFailStrategy());
localJobInfo.setNotifyMails(jobInfo.getNotifyMails());
localJobInfo.setChildJobIds(jobInfo.getChildJobIds());
localJobInfo.setTimeout(jobInfo.getTimeout());
localJobInfo.setUseSharding(jobInfo.isUseSharding());
localJobInfo.setSharding(jobInfo.getSharding());
}
}