增加支持分片任务

This commit is contained in:
justlive1 2018-06-07 21:55:42 +08:00
parent 8314f36618
commit 5fb2b3d90a
15 changed files with 137 additions and 12 deletions

View File

@ -1,4 +1,5 @@
[features]
- 支持钉钉预警通知
- 调度记录增加执行时间
- 任务超时报警机制
- 任务超时报警机制
- 支持分片任务

View File

@ -47,4 +47,9 @@ public class JobExecuteParam {
*/
private String parentLoggerId;
/**
* 分片参数
*/
private JobSharding sharding;
}

View File

@ -117,6 +117,6 @@ public class JobInfo {
/**
* 分片
*/
private JobSharding sharding;
private Integer sharding;
}

View File

@ -1,6 +1,8 @@
package vip.justlive.frost.api.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 分片
@ -9,6 +11,8 @@ import lombok.Data;
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class JobSharding {
/**

View File

@ -254,6 +254,8 @@ public class DemoScriptJob implements IJob {
failStrategy: $scope.modalDatas.failStrategy,
notifyMails: mails,
timeout : $scope.modalDatas.timeout,
useSharding : $scope.modalDatas.useSharding,
sharding : $scope.modalDatas.sharding,
childJobIds: childJobIds
};
if($scope.modalDatas.type == 'SCRIPT'){
@ -403,6 +405,8 @@ public class DemoScriptJob implements IJob {
$scope.modalDatas.notifyMails = mails;
$scope.modalDatas.childJobIds = data.data.childJobIds;
$scope.modalDatas.timeout = data.data.timeout;
$scope.modalDatas.useSharding = data.data.useSharding;
$scope.modalDatas.sharding = data.data.sharding;
if (data.data.group) {
$scope.modalDatas.groupKey = data.data.group.groupKey;
$scope.modalDatas.jobs = $scope.modalDatas.executorMap[$scope.modalDatas.groupKey];
@ -469,6 +473,8 @@ public class DemoScriptJob implements IJob {
failStrategy: $scope.modalDatas.failStrategy,
notifyMails: mails,
timeout: $scope.modalDatas.timeout,
useSharding : $scope.modalDatas.useSharding,
sharding : $scope.modalDatas.sharding,
childJobIds: childJobIds
};
if (job.type == 'SCRIPT') {

View File

@ -117,6 +117,18 @@
</div>
</div>
<div class="control-group inline-block">
<label class="control-label width100"> 分片运行 </label>
<div class="controls">
<div class="input-prepend">
<input type="checkbox" ng-model="modalDatas.useSharding" ng-click="modalDatas.sharding = '';"> <input
type="text" ng-model="modalDatas.sharding"
placeholder="默认分片为执行器个数" class="input-large"
ng-disabled="!modalDatas.useSharding" style="width: 222px;">
</div>
</div>
</div>
<div class="control-group inline-block" ng-if="modalDatas.opt == 1">
<label class="control-label width100"> 自动运行 </label>
<div class="controls">

View File

@ -13,15 +13,22 @@ public interface Dispatcher {
/**
* 分发Job失败会抛出运行时异常
*
* @param param
* @param param 运行参数
*/
void dispatch(JobExecuteParam param);
/**
* 校验是否抛出异常失败会抛出运行时异常
*
* @param param
* @return
* @param param 运行参数
*/
void checkDispatch(JobExecuteParam param);
/**
* 获取执行器数量
*
* @param param 运行参数
* @return 执行器数
*/
int count(JobExecuteParam param);
}

View File

@ -64,7 +64,12 @@ public abstract class AbstractJobExecuteWrapper extends AbstractWrapper {
long end = ZonedDateTime.now().toInstant().toEpochMilli();
JobRepository jobRepository = SpringBeansHolder.getBean(JobRepository.class);
jobRecordStatus.setStatus(JobExecuteRecord.STATUS.SUCCESS.name());
jobRecordStatus.setMsg(String.format("执行成功 [%s]", address()));
String msg = String.format("执行成功 [%s]", address());
if (jobExecuteParam.getSharding() != null) {
msg += String.format("[%s of %s]", jobExecuteParam.getSharding().getIndex(),
jobExecuteParam.getSharding().getTotal());
}
jobRecordStatus.setMsg(msg);
jobRecordStatus.setDuration(end - jobRecordStatus.getTime().getTime());
jobRepository.addJobRecordStatus(jobRecordStatus);
// 触发任务执行成功事件
@ -83,7 +88,14 @@ public abstract class AbstractJobExecuteWrapper extends AbstractWrapper {
} else {
cause = e.getMessage();
}
jobRecordStatus.setMsg(String.format("执行失败 [%s] [%s]", address(), cause));
if (jobExecuteParam.getSharding() != null) {
jobRecordStatus.setMsg(String.format("执行失败 [%s] [%s of %s] [%s]", address(),
jobExecuteParam.getSharding().getIndex(), jobExecuteParam.getSharding().getTotal(),
cause));
} else {
jobRecordStatus.setMsg(String.format("执行失败 [%s] [%s]", address(), cause));
}
jobRecordStatus.setDuration(
ZonedDateTime.now().toInstant().toEpochMilli() - jobRecordStatus.getTime().getTime());
JobRepository jobRepository = SpringBeansHolder.getBean(JobRepository.class);

View File

@ -2,7 +2,9 @@ package vip.justlive.frost.core.job;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import vip.justlive.frost.api.model.JobExecuteParam;
import vip.justlive.frost.api.model.JobInfo;
import vip.justlive.frost.api.model.JobSharding;
/**
* 默认Job上下文
@ -16,6 +18,8 @@ public class DefaultJobContext implements JobContext {
private JobInfo jobInfo;
private JobExecuteParam jobParam;
@Override
public JobInfo getInfo() {
return jobInfo;
@ -25,4 +29,9 @@ public class DefaultJobContext implements JobContext {
public String getParam() {
return jobInfo.getParam();
}
@Override
public JobSharding getSharding() {
return jobParam.getSharding();
}
}

View File

@ -21,7 +21,7 @@ public class JobBeanExecuteWrapper extends AbstractJobExecuteWrapper {
public void doRun() {
this.before();
IJob job = getIJob();
job.execute(new DefaultJobContext(jobInfo));
job.execute(new DefaultJobContext(jobInfo, jobExecuteParam));
}
@Override

View File

@ -1,6 +1,7 @@
package vip.justlive.frost.core.job;
import vip.justlive.frost.api.model.JobInfo;
import vip.justlive.frost.api.model.JobSharding;
/**
* job上下文
@ -13,15 +14,22 @@ public interface JobContext {
/**
* 获取job信息
*
* @return
* @return job信息
*/
JobInfo getInfo();
/**
* 获取参数
*
* @return
* @return 参数
*/
String getParam();
/**
* 获取分片信息
*
* @return 分片信息
*/
JobSharding getSharding();
}

View File

@ -14,6 +14,7 @@ import vip.justlive.frost.api.model.JobExecuteRecord;
import vip.justlive.frost.api.model.JobGroup;
import vip.justlive.frost.api.model.JobInfo;
import vip.justlive.frost.api.model.JobRecordStatus;
import vip.justlive.frost.api.model.JobSharding;
import vip.justlive.frost.core.config.JobProperties;
import vip.justlive.frost.core.dispacher.Dispatcher;
import vip.justlive.frost.core.notify.Event;
@ -102,8 +103,24 @@ public class JobDispatchWrapper extends AbstractWrapper {
param.setFailRetry(failRetry);
Dispatcher dispatcher = SpringBeansHolder.getBean(Dispatcher.class);
dispatcher.dispatch(param);
if (jobInfo.isUseSharding()) {
handleSharding(dispatcher);
} else {
dispatcher.dispatch(param);
}
}
void handleSharding(Dispatcher dispatcher) {
Integer total = jobInfo.getSharding();
if (total == null) {
total = dispatcher.count(param);
}
for (int i = 0; i < total; i++) {
JobSharding sharding = new JobSharding(i, total);
param.setSharding(sharding);
dispatcher.dispatch(param);
}
}
@Override

View File

@ -27,7 +27,7 @@ public class JobScriptExecuteWrapper extends AbstractJobExecuteWrapper {
throw Exceptions.fail("30002", "执行job类型不匹配");
}
IJob job = getIJob();
job.execute(new DefaultJobContext(jobInfo));
job.execute(new DefaultJobContext(jobInfo, jobExecuteParam));
}
@Override

View File

@ -0,0 +1,39 @@
package vip.justlive.frost.executor.example;
import java.util.List;
import org.springframework.stereotype.Component;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import vip.justlive.frost.api.model.JobSharding;
import vip.justlive.frost.core.job.IJob;
import vip.justlive.frost.core.job.Job;
import vip.justlive.frost.core.job.JobContext;
@Slf4j
@Job(value = "shardingJob", desc = "分片job例子")
@Component
public class ShardingJob implements IJob {
@Override
public void execute(JobContext ctx) {
int max = 10;
JobSharding sharding = ctx.getSharding();
String msg = "未选择分片任务";
if (sharding != null) {
msg = String.format("[%s of %s]", sharding.getIndex(), sharding.getTotal());
}
List<Integer> list = Lists.newArrayList();
for (int i = 0; i < max; i++) {
if (sharding != null) {
if (i % sharding.getTotal() == sharding.getIndex()) {
list.add(i);
}
} else {
list.add(i);
}
}
log.info("{},执行结果:{}", msg, list);
}
}

View File

@ -54,7 +54,11 @@ public class RedisDispatcher implements Dispatcher {
@Override
public void checkDispatch(JobExecuteParam param) {
count(param);
}
@Override
public int count(JobExecuteParam param) {
// redisson 当没有worker时候调用countActiveWorkers会阻塞
// 由于计算count是基于订阅模式下的publish触发增加各自worker到workersCounterAtomicLong事件
// 再去获取semaphore最后返回workersCounterAtomicLong的数值
@ -68,6 +72,7 @@ public class RedisDispatcher implements Dispatcher {
if (workers == 0) {
throw Exceptions.fail("30000", "没有可调度的执行器");
}
return workers;
}
/**