mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-30 11:17:54 +08:00
[Feature-5062][*] Create/Edit/Delete Worker Group in Web UI (#5113)
* [Improvement][Server] Rename MASTER_PREFIX to MASTER_TYPE and WORKER_PREFIX to WORKER_TYPE * [Feature][Api&Dao] Support api and dao of worker group * [Feature][UI] Support to create, edit and delete worker group * [Feature][API&UI] Support work groups from database and zookeeper to display together in worker group management * [Feature][API&Dao] Support long ip list and check request parameters when saving worker group * [Feature][*] Refactor ipList to addrList * [Feature][Server] Support host manager to select worker host by worker group in database * [Improvement][Server] Improve the config of master and worker * [Improvement][*] Rename zkRegistered to systemDefault * [Improvement][Test] Fix unit test and improve ui * [Improvement][Server] Improve getServerMaps Co-authored-by: dailidong <dailidong66@gmail.com>
This commit is contained in:
parent
f109a758f8
commit
0d065c0712
@ -15,7 +15,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# master execute thread num
|
||||
# master execute thread number
|
||||
master.exec.threads=${MASTER_EXEC_THREADS}
|
||||
|
||||
# master execute task number in parallel
|
||||
|
@ -15,12 +15,15 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# worker execute thread num
|
||||
# worker execute thread number
|
||||
worker.exec.threads=${WORKER_EXEC_THREADS}
|
||||
|
||||
# worker heartbeat interval
|
||||
worker.heartbeat.interval=${WORKER_HEARTBEAT_INTERVAL}
|
||||
|
||||
# worker host weight to dispatch tasks, default value 100
|
||||
worker.host.weight=${WORKER_HOST_WEIGHT}
|
||||
|
||||
# worker tenant auto create
|
||||
worker.tenant.auto.create=true
|
||||
|
||||
@ -33,11 +36,8 @@ worker.reserved.memory=${WORKER_RESERVED_MEMORY}
|
||||
# worker listener port
|
||||
worker.listen.port=${WORKER_LISTEN_PORT}
|
||||
|
||||
# default worker groups
|
||||
# default worker groups, if this worker belongs different groups, you can config the following like that 'worker.groups=default,test'
|
||||
worker.groups=${WORKER_GROUPS}
|
||||
|
||||
# default worker host weight
|
||||
worker.host.weight=${WORKER_HOST_WEIGHT}
|
||||
|
||||
# alert server listener host
|
||||
alert.listen.host=${ALERT_LISTEN_HOST}
|
||||
|
@ -134,7 +134,7 @@ The Configuration file is `values.yaml`, and the following tables lists the conf
|
||||
| `master.tolerations` | If specified, the pod's tolerations | `{}` |
|
||||
| `master.resources` | The `resource` limit and request config for master server | `{}` |
|
||||
| `master.configmap.DOLPHINSCHEDULER_OPTS` | The java options for master server | `""` |
|
||||
| `master.configmap.MASTER_EXEC_THREADS` | Master execute thread num | `100` |
|
||||
| `master.configmap.MASTER_EXEC_THREADS` | Master execute thread number | `100` |
|
||||
| `master.configmap.MASTER_EXEC_TASK_NUM` | Master execute task number in parallel | `20` |
|
||||
| `master.configmap.MASTER_HEARTBEAT_INTERVAL` | Master heartbeat interval | `10` |
|
||||
| `master.configmap.MASTER_TASK_COMMIT_RETRYTIMES` | Master commit task retry times | `5` |
|
||||
@ -167,13 +167,13 @@ The Configuration file is `values.yaml`, and the following tables lists the conf
|
||||
| `worker.tolerations` | If specified, the pod's tolerations | `{}` |
|
||||
| `worker.resources` | The `resource` limit and request config for worker server | `{}` |
|
||||
| `worker.configmap.DOLPHINSCHEDULER_OPTS` | The java options for worker server | `""` |
|
||||
| `worker.configmap.WORKER_EXEC_THREADS` | Worker execute thread num | `100` |
|
||||
| `worker.configmap.WORKER_EXEC_THREADS` | Worker execute thread number | `100` |
|
||||
| `worker.configmap.WORKER_HEARTBEAT_INTERVAL` | Worker heartbeat interval | `10` |
|
||||
| `worker.configmap.WORKER_MAX_CPULOAD_AVG` | Only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2 | `100` |
|
||||
| `worker.configmap.WORKER_RESERVED_MEMORY` | Only larger than reserved memory, worker server can work. default value : physical memory * 1/10, unit is G | `0.1` |
|
||||
| `worker.configmap.WORKER_LISTEN_PORT` | Worker listen port | `1234` |
|
||||
| `worker.configmap.WORKER_GROUPS` | Worker groups | `default` |
|
||||
| `worker.configmap.WORKER_HOST_WEIGHT` | Worker host weight | `100` |
|
||||
| `worker.configmap.WORKER_HOST_WEIGHT` | Worker host weight | `100` |
|
||||
| `worker.livenessProbe.enabled` | Turn on and off liveness probe | `true` |
|
||||
| `worker.livenessProbe.initialDelaySeconds` | Delay before liveness probe is initiated | `30` |
|
||||
| `worker.livenessProbe.periodSeconds` | How often to perform the probe | `30` |
|
||||
|
@ -17,10 +17,13 @@
|
||||
|
||||
package org.apache.dolphinscheduler.api.controller;
|
||||
|
||||
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL;
|
||||
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL;
|
||||
import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR;
|
||||
|
||||
import org.apache.dolphinscheduler.api.exceptions.ApiException;
|
||||
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
|
||||
import org.apache.dolphinscheduler.api.utils.RegexUtils;
|
||||
import org.apache.dolphinscheduler.api.utils.Result;
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
|
||||
@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestAttribute;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
@ -58,6 +62,35 @@ public class WorkerGroupController extends BaseController {
|
||||
@Autowired
|
||||
WorkerGroupService workerGroupService;
|
||||
|
||||
/**
|
||||
* create or update a worker group
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param id worker group id
|
||||
* @param name worker group name
|
||||
* @param addrList addr list
|
||||
* @return create or update result code
|
||||
*/
|
||||
@ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES")
|
||||
@ApiImplicitParams({
|
||||
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"),
|
||||
@ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"),
|
||||
@ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String")
|
||||
})
|
||||
@PostMapping(value = "/save")
|
||||
@ResponseStatus(HttpStatus.OK)
|
||||
@ApiException(SAVE_ERROR)
|
||||
public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
|
||||
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
|
||||
@RequestParam(value = "name") String name,
|
||||
@RequestParam(value = "addrList") String addrList
|
||||
) {
|
||||
logger.info("save worker group: login user {}, id:{}, name: {}, addrList: {} ",
|
||||
RegexUtils.escapeNRT(loginUser.getUserName()), id, RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(addrList));
|
||||
Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList);
|
||||
return returnDataList(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* query worker groups paging
|
||||
*
|
||||
@ -69,21 +102,20 @@ public class WorkerGroupController extends BaseController {
|
||||
*/
|
||||
@ApiOperation(value = "queryAllWorkerGroupsPaging", notes = "QUERY_WORKER_GROUP_PAGING_NOTES")
|
||||
@ApiImplicitParams({
|
||||
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"),
|
||||
@ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"),
|
||||
@ApiImplicitParam(name = "ipList", value = "WORKER_IP_LIST", required = true, dataType = "String")
|
||||
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
|
||||
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20"),
|
||||
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String")
|
||||
})
|
||||
@GetMapping(value = "/list-paging")
|
||||
@ResponseStatus(HttpStatus.OK)
|
||||
@ApiException(QUERY_WORKER_GROUP_FAIL)
|
||||
public Result queryAllWorkerGroupsPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
|
||||
@RequestParam("pageNo") Integer pageNo,
|
||||
@RequestParam(value = "searchVal", required = false) String searchVal,
|
||||
@RequestParam("pageSize") Integer pageSize
|
||||
@RequestParam("pageSize") Integer pageSize,
|
||||
@RequestParam(value = "searchVal", required = false) String searchVal
|
||||
) {
|
||||
logger.info("query all worker group paging: login user {}, pageNo:{}, pageSize:{}, searchVal:{}",
|
||||
loginUser.getUserName(), pageNo, pageSize, searchVal);
|
||||
|
||||
RegexUtils.escapeNRT(loginUser.getUserName()), pageNo, pageSize, searchVal);
|
||||
searchVal = ParameterUtils.handleEscapes(searchVal);
|
||||
Map<String, Object> result = workerGroupService.queryAllGroupPaging(loginUser, pageNo, pageSize, searchVal);
|
||||
return returnDataListPaging(result);
|
||||
@ -99,14 +131,32 @@ public class WorkerGroupController extends BaseController {
|
||||
@GetMapping(value = "/all-groups")
|
||||
@ResponseStatus(HttpStatus.OK)
|
||||
@ApiException(QUERY_WORKER_GROUP_FAIL)
|
||||
public Result queryAllWorkerGroups(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser
|
||||
) {
|
||||
logger.info("query all worker group: login user {}",
|
||||
loginUser.getUserName());
|
||||
|
||||
public Result queryAllWorkerGroups(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
|
||||
logger.info("query all worker group: login user {}", RegexUtils.escapeNRT(loginUser.getUserName()));
|
||||
Map<String, Object> result = workerGroupService.queryAllGroup();
|
||||
return returnDataList(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* delete worker group by id
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param id group id
|
||||
* @return delete result code
|
||||
*/
|
||||
@ApiOperation(value = "deleteById", notes = "DELETE_WORKER_GROUP_BY_ID_NOTES")
|
||||
@ApiImplicitParams({
|
||||
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataType = "Int", example = "10"),
|
||||
})
|
||||
@PostMapping(value = "/delete-by-id")
|
||||
@ResponseStatus(HttpStatus.OK)
|
||||
@ApiException(DELETE_WORKER_GROUP_FAIL)
|
||||
public Result deleteById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
|
||||
@RequestParam("id") Integer id
|
||||
) {
|
||||
logger.info("delete worker group: login user {}, id:{} ", RegexUtils.escapeNRT(loginUser.getUserName()), id);
|
||||
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(loginUser, id);
|
||||
return returnDataList(result);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -205,6 +205,10 @@ public enum Status {
|
||||
DATASOURCE_OTHER_PARAMS_ILLEGAL(10171, "datasource other params illegal", "数据源其他参数不合法"),
|
||||
DATASOURCE_NAME_ILLEGAL(10172, "datasource name illegal", "数据源名称不合法"),
|
||||
DATASOURCE_HOST_ILLEGAL(10173, "datasource host illegal", "数据源HOST不合法"),
|
||||
DELETE_WORKER_GROUP_NOT_EXIST(10174, "delete worker group not exist ", "删除worker分组不存在"),
|
||||
CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10175, "create worker group forbidden in docker ", "创建worker分组在docker中禁止"),
|
||||
DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10176, "delete worker group forbidden in docker ", "删除worker分组在docker中禁止"),
|
||||
WORKER_ADDRESS_INVALID(10177, "worker address {0} invalid", "worker地址[{0}]无效"),
|
||||
|
||||
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
|
||||
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
|
||||
|
@ -26,6 +26,17 @@ import java.util.Map;
|
||||
*/
|
||||
public interface WorkerGroupService {
|
||||
|
||||
/**
|
||||
* create or update a worker group
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param id worker group id
|
||||
* @param name worker group name
|
||||
* @param addrList addr list
|
||||
* @return create or update result code
|
||||
*/
|
||||
Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList);
|
||||
|
||||
/**
|
||||
* query worker group paging
|
||||
*
|
||||
@ -44,4 +55,11 @@ public interface WorkerGroupService {
|
||||
*/
|
||||
Map<String, Object> queryAllGroup();
|
||||
|
||||
/**
|
||||
* delete worker group by id
|
||||
* @param id worker group id
|
||||
* @return delete result code
|
||||
*/
|
||||
Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id);
|
||||
|
||||
}
|
||||
|
@ -159,7 +159,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
|
||||
|
||||
checkNotNull(zookeeperMonitor);
|
||||
ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
|
||||
return zookeeperMonitor.getServersList(zkNodeType);
|
||||
return zookeeperMonitor.getServerList(zkNodeType);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,34 +17,34 @@
|
||||
|
||||
package org.apache.dolphinscheduler.api.service.impl;
|
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
|
||||
import static org.apache.dolphinscheduler.common.Constants.SLASH;
|
||||
|
||||
import org.apache.dolphinscheduler.api.enums.Status;
|
||||
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
|
||||
import org.apache.dolphinscheduler.api.utils.PageInfo;
|
||||
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
|
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.DateUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.User;
|
||||
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
|
||||
* worker group service impl
|
||||
@ -54,12 +54,115 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
WorkerGroupMapper workerGroupMapper;
|
||||
|
||||
@Autowired
|
||||
protected ZookeeperCachedOperator zookeeperCachedOperator;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperMonitor zookeeperMonitor;
|
||||
|
||||
@Autowired
|
||||
ProcessInstanceMapper processInstanceMapper;
|
||||
|
||||
/**
|
||||
* create or update a worker group
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param id worker group id
|
||||
* @param name worker group name
|
||||
* @param addrList addr list
|
||||
* @return create or update result code
|
||||
*/
|
||||
@Override
|
||||
public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
if (isNotAdmin(loginUser, result)) {
|
||||
return result;
|
||||
}
|
||||
if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) {
|
||||
putMsg(result, Status.CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER);
|
||||
return result;
|
||||
}
|
||||
if (StringUtils.isEmpty(name)) {
|
||||
putMsg(result, Status.NAME_NULL);
|
||||
return result;
|
||||
}
|
||||
Date now = new Date();
|
||||
WorkerGroup workerGroup;
|
||||
if (id != 0) {
|
||||
workerGroup = workerGroupMapper.selectById(id);
|
||||
// check exist
|
||||
if (workerGroup == null) {
|
||||
workerGroup = new WorkerGroup();
|
||||
workerGroup.setCreateTime(now);
|
||||
}
|
||||
} else {
|
||||
workerGroup = new WorkerGroup();
|
||||
workerGroup.setCreateTime(now);
|
||||
}
|
||||
workerGroup.setName(name);
|
||||
workerGroup.setAddrList(addrList);
|
||||
workerGroup.setUpdateTime(now);
|
||||
|
||||
if (checkWorkerGroupNameExists(workerGroup)) {
|
||||
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
|
||||
return result;
|
||||
}
|
||||
String invalidAddr = checkWorkerGroupAddrList(workerGroup);
|
||||
if (invalidAddr != null) {
|
||||
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
|
||||
return result;
|
||||
}
|
||||
if (workerGroup.getId() != 0) {
|
||||
workerGroupMapper.updateById(workerGroup);
|
||||
} else {
|
||||
workerGroupMapper.insert(workerGroup);
|
||||
}
|
||||
putMsg(result, Status.SUCCESS);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* check worker group name exists
|
||||
* @param workerGroup worker group
|
||||
* @return boolean
|
||||
*/
|
||||
private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
|
||||
List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
|
||||
if (CollectionUtils.isNotEmpty(workerGroupList)) {
|
||||
// new group has same name
|
||||
if (workerGroup.getId() == 0) {
|
||||
return true;
|
||||
}
|
||||
// check group id
|
||||
for (WorkerGroup group : workerGroupList) {
|
||||
if (group.getId() != workerGroup.getId()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// check zookeeper
|
||||
String workerGroupPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS + Constants.SLASH + workerGroup.getName();
|
||||
return zookeeperCachedOperator.isExisted(workerGroupPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* check worker group addr list
|
||||
* @param workerGroup worker group
|
||||
* @return boolean
|
||||
*/
|
||||
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
|
||||
Map<String, String> serverMaps = zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true);
|
||||
for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
|
||||
if (!serverMaps.containsKey(addr)) {
|
||||
return addr;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* query worker group paging
|
||||
*
|
||||
@ -82,7 +185,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
|
||||
}
|
||||
|
||||
List<WorkerGroup> workerGroups = getWorkerGroups(true);
|
||||
|
||||
List<WorkerGroup> resultDataList = new ArrayList<>();
|
||||
|
||||
if (CollectionUtils.isNotEmpty(workerGroups)) {
|
||||
@ -98,10 +200,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
|
||||
searchValDataList = workerGroups;
|
||||
}
|
||||
|
||||
if (searchValDataList.size() < pageSize) {
|
||||
toIndex = (pageNo - 1) * pageSize + searchValDataList.size();
|
||||
if (fromIndex < searchValDataList.size()) {
|
||||
if (toIndex > searchValDataList.size()) {
|
||||
toIndex = searchValDataList.size();
|
||||
}
|
||||
resultDataList = searchValDataList.subList(fromIndex, toIndex);
|
||||
}
|
||||
resultDataList = searchValDataList.subList(fromIndex, toIndex);
|
||||
}
|
||||
|
||||
PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
|
||||
@ -121,13 +225,16 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
|
||||
@Override
|
||||
public Map<String, Object> queryAllGroup() {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
|
||||
List<WorkerGroup> workerGroups = getWorkerGroups(false);
|
||||
|
||||
Set<String> availableWorkerGroupSet = workerGroups.stream()
|
||||
List<String> availableWorkerGroupList = workerGroups.stream()
|
||||
.map(WorkerGroup::getName)
|
||||
.collect(Collectors.toSet());
|
||||
result.put(Constants.DATA_LIST, availableWorkerGroupSet);
|
||||
.collect(Collectors.toList());
|
||||
int index = availableWorkerGroupList.indexOf(Constants.DEFAULT_WORKER_GROUP);
|
||||
if (index > -1) {
|
||||
availableWorkerGroupList.remove(index);
|
||||
availableWorkerGroupList.add(0, Constants.DEFAULT_WORKER_GROUP);
|
||||
}
|
||||
result.put(Constants.DATA_LIST, availableWorkerGroupList);
|
||||
putMsg(result, Status.SUCCESS);
|
||||
return result;
|
||||
}
|
||||
@ -139,8 +246,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
|
||||
* @return WorkerGroup list
|
||||
*/
|
||||
private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
|
||||
// worker groups from database
|
||||
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
|
||||
// worker groups from zookeeper
|
||||
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
|
||||
List<WorkerGroup> workerGroups = new ArrayList<>();
|
||||
List<String> workerGroupList = null;
|
||||
try {
|
||||
workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
|
||||
@ -148,32 +257,70 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
|
||||
logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging);
|
||||
}
|
||||
|
||||
if (workerGroupList == null || workerGroupList.isEmpty()) {
|
||||
if (!isPaging) {
|
||||
if (CollectionUtils.isEmpty(workerGroupList)) {
|
||||
if (CollectionUtils.isEmpty(workerGroups) && !isPaging) {
|
||||
WorkerGroup wg = new WorkerGroup();
|
||||
wg.setName(DEFAULT_WORKER_GROUP);
|
||||
wg.setName(Constants.DEFAULT_WORKER_GROUP);
|
||||
workerGroups.add(wg);
|
||||
}
|
||||
return workerGroups;
|
||||
}
|
||||
|
||||
for (String workerGroup : workerGroupList) {
|
||||
String workerGroupPath = workerPath + SLASH + workerGroup;
|
||||
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
|
||||
if (CollectionUtils.isEmpty(childrenNodes)) {
|
||||
String workerGroupPath = workerPath + Constants.SLASH + workerGroup;
|
||||
List<String> childrenNodes = null;
|
||||
try {
|
||||
childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
|
||||
} catch (Exception e) {
|
||||
logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
|
||||
}
|
||||
if (childrenNodes == null || childrenNodes.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
WorkerGroup wg = new WorkerGroup();
|
||||
wg.setName(workerGroup);
|
||||
if (isPaging) {
|
||||
wg.setIpList(childrenNodes.stream().map(node -> Host.of(node).getIp()).collect(Collectors.toList()));
|
||||
String registeredValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + childrenNodes.get(0));
|
||||
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(",")[6]));
|
||||
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(",")[7]));
|
||||
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
|
||||
String registeredValue = zookeeperCachedOperator.get(workerGroupPath + Constants.SLASH + childrenNodes.get(0));
|
||||
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6]));
|
||||
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7]));
|
||||
wg.setSystemDefault(true);
|
||||
}
|
||||
workerGroups.add(wg);
|
||||
}
|
||||
return workerGroups;
|
||||
}
|
||||
|
||||
/**
|
||||
* delete worker group by id
|
||||
* @param id worker group id
|
||||
* @return delete result code
|
||||
*/
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
if (isNotAdmin(loginUser, result)) {
|
||||
return result;
|
||||
}
|
||||
if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) {
|
||||
putMsg(result, Status.DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER);
|
||||
return result;
|
||||
}
|
||||
WorkerGroup workerGroup = workerGroupMapper.selectById(id);
|
||||
if (workerGroup == null) {
|
||||
putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST);
|
||||
return result;
|
||||
}
|
||||
List<ProcessInstance> processInstances = processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES);
|
||||
if (CollectionUtils.isNotEmpty(processInstances)) {
|
||||
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
|
||||
return result;
|
||||
}
|
||||
workerGroupMapper.deleteById(id);
|
||||
processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), "");
|
||||
putMsg(result, Status.SUCCESS);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,92 +14,93 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.api.utils;
|
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
|
||||
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
||||
import org.apache.dolphinscheduler.common.model.Server;
|
||||
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
|
||||
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* monitor zookeeper info
|
||||
* monitor zookeeper info
|
||||
*/
|
||||
@Component
|
||||
public class ZookeeperMonitor extends AbstractZKClient {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class);
|
||||
|
||||
/**
|
||||
*
|
||||
* @return zookeeper info list
|
||||
*/
|
||||
public List<ZookeeperRecord> zookeeperInfoList(){
|
||||
String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", "");
|
||||
try{
|
||||
return zookeeperInfoList(zookeeperServers);
|
||||
}catch(Exception e){
|
||||
LOG.error(e.getMessage(),e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
/**
|
||||
*
|
||||
* @return zookeeper info list
|
||||
*/
|
||||
public List<ZookeeperRecord> zookeeperInfoList() {
|
||||
String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", "");
|
||||
try {
|
||||
return zookeeperInfoList(zookeeperServers);
|
||||
} catch (Exception e) {
|
||||
LOG.error(e.getMessage(),e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* get master servers
|
||||
* @return master server information
|
||||
*/
|
||||
public List<Server> getMasterServers(){
|
||||
return getServersList(ZKNodeType.MASTER);
|
||||
}
|
||||
/**
|
||||
* get master servers
|
||||
* @return master server information
|
||||
*/
|
||||
public List<Server> getMasterServers() {
|
||||
return getServerList(ZKNodeType.MASTER);
|
||||
}
|
||||
|
||||
/**
|
||||
* master construct is the same with worker, use the master instead
|
||||
* @return worker server informations
|
||||
*/
|
||||
public List<Server> getWorkerServers(){
|
||||
return getServersList(ZKNodeType.WORKER);
|
||||
}
|
||||
/**
|
||||
* master construct is the same with worker, use the master instead
|
||||
* @return worker server informations
|
||||
*/
|
||||
public List<Server> getWorkerServers() {
|
||||
return getServerList(ZKNodeType.WORKER);
|
||||
}
|
||||
|
||||
private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {
|
||||
private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {
|
||||
|
||||
List<ZookeeperRecord> list = new ArrayList<>(5);
|
||||
List<ZookeeperRecord> list = new ArrayList<>(5);
|
||||
|
||||
if(StringUtils.isNotBlank(zookeeperServers)){
|
||||
String[] zookeeperServersArray = zookeeperServers.split(",");
|
||||
|
||||
for (String zookeeperServer : zookeeperServersArray) {
|
||||
ZooKeeperState state = new ZooKeeperState(zookeeperServer);
|
||||
boolean ok = state.ruok();
|
||||
if(ok){
|
||||
state.getZookeeperInfo();
|
||||
}
|
||||
if (StringUtils.isNotBlank(zookeeperServers)) {
|
||||
String[] zookeeperServersArray = zookeeperServers.split(",");
|
||||
|
||||
int connections = state.getConnections();
|
||||
int watches = state.getWatches();
|
||||
long sent = state.getSent();
|
||||
long received = state.getReceived();
|
||||
String mode = state.getMode();
|
||||
float minLatency = state.getMinLatency();
|
||||
float avgLatency = state.getAvgLatency();
|
||||
float maxLatency = state.getMaxLatency();
|
||||
int nodeCount = state.getNodeCount();
|
||||
int status = ok ? 1 : 0;
|
||||
Date date = new Date();
|
||||
for (String zookeeperServer : zookeeperServersArray) {
|
||||
ZooKeeperState state = new ZooKeeperState(zookeeperServer);
|
||||
boolean ok = state.ruok();
|
||||
if (ok) {
|
||||
state.getZookeeperInfo();
|
||||
}
|
||||
|
||||
ZookeeperRecord zookeeperRecord = new ZookeeperRecord(zookeeperServer,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date);
|
||||
list.add(zookeeperRecord);
|
||||
int connections = state.getConnections();
|
||||
int watches = state.getWatches();
|
||||
long sent = state.getSent();
|
||||
long received = state.getReceived();
|
||||
String mode = state.getMode();
|
||||
float minLatency = state.getMinLatency();
|
||||
float avgLatency = state.getAvgLatency();
|
||||
float maxLatency = state.getMaxLatency();
|
||||
int nodeCount = state.getNodeCount();
|
||||
int status = ok ? 1 : 0;
|
||||
Date date = new Date();
|
||||
|
||||
}
|
||||
}
|
||||
ZookeeperRecord zookeeperRecord = new ZookeeperRecord(zookeeperServer,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date);
|
||||
list.add(zookeeperRecord);
|
||||
|
||||
return list;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
@ -17,21 +17,25 @@
|
||||
|
||||
package org.apache.dolphinscheduler.api.service;
|
||||
|
||||
import org.apache.dolphinscheduler.api.enums.Status;
|
||||
import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
|
||||
import org.apache.dolphinscheduler.api.utils.PageInfo;
|
||||
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.UserType;
|
||||
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.User;
|
||||
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -51,12 +55,20 @@ public class WorkerGroupServiceTest {
|
||||
@InjectMocks
|
||||
private WorkerGroupServiceImpl workerGroupService;
|
||||
|
||||
@Mock
|
||||
private WorkerGroupMapper workerGroupMapper;
|
||||
|
||||
@Mock
|
||||
private ProcessInstanceMapper processInstanceMapper;
|
||||
|
||||
@Mock
|
||||
private ZookeeperCachedOperator zookeeperCachedOperator;
|
||||
|
||||
@Mock
|
||||
private ZookeeperMonitor zookeeperMonitor;
|
||||
|
||||
private String groupName = "groupName000001";
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
|
||||
@ -79,6 +91,33 @@ public class WorkerGroupServiceTest {
|
||||
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
|
||||
}
|
||||
|
||||
/**
|
||||
* create or update a worker group
|
||||
*/
|
||||
@Test
|
||||
public void testSaveWorkerGroup() {
|
||||
// worker server maps
|
||||
Map<String, String> serverMaps = new HashMap<>();
|
||||
serverMaps.put("127.0.0.1:1234", "0.3,0.07,4.4,7.42,16.0,0.3,2021-03-19 20:17:58,2021-03-19 20:25:29,0,79214");
|
||||
Mockito.when(zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true)).thenReturn(serverMaps);
|
||||
|
||||
User user = new User();
|
||||
// general user add
|
||||
user.setUserType(UserType.GENERAL_USER);
|
||||
Map<String, Object> result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
|
||||
Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), result.get(Constants.MSG));
|
||||
|
||||
// success
|
||||
user.setUserType(UserType.ADMIN_USER);
|
||||
result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
|
||||
Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
|
||||
// group name exist
|
||||
Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2));
|
||||
Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList());
|
||||
result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1:1234");
|
||||
Assert.assertEquals(Status.NAME_EXIST, result.get(Constants.STATUS));
|
||||
}
|
||||
|
||||
/**
|
||||
* query worker group paging
|
||||
*/
|
||||
@ -93,17 +132,38 @@ public class WorkerGroupServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueryAllGroup() throws Exception {
|
||||
public void testQueryAllGroup() {
|
||||
Map<String, Object> result = workerGroupService.queryAllGroup();
|
||||
Set<String> workerGroups = (Set<String>) result.get(Constants.DATA_LIST);
|
||||
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
|
||||
Assert.assertEquals(workerGroups.size(), 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* delete group by id
|
||||
*/
|
||||
@Test
|
||||
public void testDeleteWorkerGroupById() {
|
||||
User user = new User();
|
||||
user.setUserType(UserType.ADMIN_USER);
|
||||
WorkerGroup wg2 = getWorkerGroup(2);
|
||||
Mockito.when(workerGroupMapper.selectById(2)).thenReturn(wg2);
|
||||
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg2.getName(), Constants.NOT_TERMINATED_STATES)).thenReturn(getProcessInstanceList());
|
||||
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(user, 1);
|
||||
Assert.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
|
||||
result = workerGroupService.deleteWorkerGroupById(user, 2);
|
||||
Assert.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
|
||||
// correct
|
||||
WorkerGroup wg3 = getWorkerGroup(3);
|
||||
Mockito.when(workerGroupMapper.selectById(3)).thenReturn(wg3);
|
||||
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg3.getName(), Constants.NOT_TERMINATED_STATES)).thenReturn(new ArrayList<>());
|
||||
result = workerGroupService.deleteWorkerGroupById(user, 3);
|
||||
Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
|
||||
}
|
||||
|
||||
/**
|
||||
* get processInstances
|
||||
*/
|
||||
private List<ProcessInstance> getProcessInstanceList() {
|
||||
|
||||
List<ProcessInstance> processInstances = new ArrayList<>();
|
||||
processInstances.add(new ProcessInstance());
|
||||
return processInstances;
|
||||
@ -112,9 +172,30 @@ public class WorkerGroupServiceTest {
|
||||
@Test
|
||||
public void testQueryAllGroupWithDefault() {
|
||||
Map<String, Object> result = workerGroupService.queryAllGroup();
|
||||
Set<String> workerGroups = (Set<String>) result.get(Constants.DATA_LIST);
|
||||
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
|
||||
Assert.assertEquals(1, workerGroups.size());
|
||||
Assert.assertEquals("default", workerGroups.toArray()[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* get Group
|
||||
* @return
|
||||
*/
|
||||
private WorkerGroup getWorkerGroup(int id) {
|
||||
WorkerGroup workerGroup = new WorkerGroup();
|
||||
workerGroup.setName(groupName);
|
||||
workerGroup.setId(id);
|
||||
return workerGroup;
|
||||
}
|
||||
|
||||
private WorkerGroup getWorkerGroup() {
|
||||
return getWorkerGroup(1);
|
||||
}
|
||||
|
||||
private List<WorkerGroup> getList() {
|
||||
List<WorkerGroup> list = new ArrayList<>();
|
||||
list.add(getWorkerGroup());
|
||||
return list;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -785,8 +785,8 @@ public final class Constants {
|
||||
/**
|
||||
* master/worker server use for zk
|
||||
*/
|
||||
public static final String MASTER_PREFIX = "master";
|
||||
public static final String WORKER_PREFIX = "worker";
|
||||
public static final String MASTER_TYPE = "master";
|
||||
public static final String WORKER_TYPE = "worker";
|
||||
public static final String DELETE_ZK_OP = "delete";
|
||||
public static final String ADD_ZK_OP = "add";
|
||||
public static final String ALIAS = "alias";
|
||||
|
@ -25,7 +25,6 @@ public enum ZKNodeType {
|
||||
* 0 master node;
|
||||
* 1 worker node;
|
||||
* 2 dead_server node;
|
||||
* 3 task_queue node;
|
||||
*/
|
||||
MASTER, WORKER, DEAD_SERVER, TASK_QUEUE;
|
||||
MASTER, WORKER, DEAD_SERVER;
|
||||
}
|
||||
|
@ -14,30 +14,62 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.dao.entity;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* worker group
|
||||
*/
|
||||
@TableName("t_ds_worker_group")
|
||||
public class WorkerGroup {
|
||||
|
||||
@TableId(value = "id", type = IdType.AUTO)
|
||||
private int id;
|
||||
|
||||
private String name;
|
||||
|
||||
private List<String> ipList;
|
||||
private String addrList;
|
||||
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
|
||||
private Date createTime;
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
|
||||
|
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
|
||||
private Date updateTime;
|
||||
|
||||
@TableField(exist = false)
|
||||
private boolean systemDefault;
|
||||
|
||||
public int getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getAddrList() {
|
||||
return addrList;
|
||||
}
|
||||
|
||||
public void setAddrList(String addrList) {
|
||||
this.addrList = addrList;
|
||||
}
|
||||
|
||||
public Date getCreateTime() {
|
||||
return createTime;
|
||||
@ -55,21 +87,24 @@ public class WorkerGroup {
|
||||
this.updateTime = updateTime;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
public boolean getSystemDefault() {
|
||||
return systemDefault;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
public void setSystemDefault(boolean systemDefault) {
|
||||
this.systemDefault = systemDefault;
|
||||
}
|
||||
|
||||
public List<String> getIpList() {
|
||||
return ipList;
|
||||
@Override
|
||||
public String toString() {
|
||||
return "WorkerGroup{"
|
||||
+ "id= " + id
|
||||
+ ", name= " + name
|
||||
+ ", addrList= " + addrList
|
||||
+ ", createTime= " + createTime
|
||||
+ ", updateTime= " + updateTime
|
||||
+ ", systemDefault= " + systemDefault
|
||||
+ "}";
|
||||
}
|
||||
|
||||
public void setIpList(List<String> ipList) {
|
||||
this.ipList = ipList;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -62,12 +62,12 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
|
||||
|
||||
/**
|
||||
* query process instance by worker group and stateArray
|
||||
* @param workerGroupId workerGroupId
|
||||
* @param workerGroupName workerGroupName
|
||||
* @param states states array
|
||||
* @return process instance list
|
||||
*/
|
||||
List<ProcessInstance> queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId,
|
||||
@Param("states") int[] states);
|
||||
List<ProcessInstance> queryByWorkerGroupNameAndStatus(@Param("workerGroupName") String workerGroupName,
|
||||
@Param("states") int[] states);
|
||||
|
||||
/**
|
||||
* process instance page
|
||||
@ -134,12 +134,13 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
|
||||
@Param("destTenantId") int destTenantId);
|
||||
|
||||
/**
|
||||
* update process instance by worker groupId
|
||||
* @param originWorkerGroupId originWorkerGroupId
|
||||
* @param destWorkerGroupId destWorkerGroupId
|
||||
* update process instance by worker group name
|
||||
* @param originWorkerGroupName originWorkerGroupName
|
||||
* @param destWorkerGroupName destWorkerGroupName
|
||||
* @return update result
|
||||
*/
|
||||
int updateProcessInstanceByWorkerGroupId(@Param("originWorkerGroupId") int originWorkerGroupId, @Param("destWorkerGroupId") int destWorkerGroupId);
|
||||
int updateProcessInstanceByWorkerGroupName(@Param("originWorkerGroupName") String originWorkerGroupName,
|
||||
@Param("destWorkerGroupName") String destWorkerGroupName);
|
||||
|
||||
/**
|
||||
* count process instance state by user
|
||||
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.dao.mapper;
|
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
|
||||
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
|
||||
/**
|
||||
* worker group mapper interface
|
||||
*/
|
||||
public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
|
||||
|
||||
/**
|
||||
* query all worker group
|
||||
* @return worker group list
|
||||
*/
|
||||
List<WorkerGroup> queryAllWorkerGroup();
|
||||
|
||||
/**
|
||||
* query worer grouop by name
|
||||
* @param name name
|
||||
* @return worker group list
|
||||
*/
|
||||
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
|
||||
|
||||
}
|
@ -72,13 +72,13 @@
|
||||
order by id asc
|
||||
</select>
|
||||
|
||||
<select id="queryByWorkerGroupIdAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
|
||||
<select id="queryByWorkerGroupNameAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
|
||||
select
|
||||
<include refid="baseSql"/>
|
||||
from t_ds_process_instance
|
||||
where 1=1
|
||||
<if test="workerGroupId != -1">
|
||||
and worker_group_id =#{workerGroupId}
|
||||
<if test="workerGroupName != ''">
|
||||
and worker_group =#{workerGroupName}
|
||||
</if>
|
||||
and state in
|
||||
<foreach collection="states" item="i" open="(" close=")" separator=",">
|
||||
@ -138,10 +138,10 @@
|
||||
where tenant_id = #{originTenantId}
|
||||
</update>
|
||||
|
||||
<update id="updateProcessInstanceByWorkerGroupId">
|
||||
<update id="updateProcessInstanceByWorkerGroupName">
|
||||
update t_ds_process_instance
|
||||
set worker_group_id = #{destWorkerGroupId}
|
||||
where worker_group_id = #{originWorkerGroupId}
|
||||
set worker_group = #{destWorkerGroupName}
|
||||
where worker_group = #{originWorkerGroupName}
|
||||
</update>
|
||||
|
||||
<select id="countInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
|
||||
|
@ -0,0 +1,31 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper">
|
||||
<select id="queryAllWorkerGroup" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
|
||||
select *
|
||||
from t_ds_worker_group
|
||||
order by update_time desc
|
||||
</select>
|
||||
<select id="queryWorkerGroupByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
|
||||
select *
|
||||
from t_ds_worker_group
|
||||
where name = #{name}
|
||||
</select>
|
||||
</mapper>
|
@ -18,18 +18,24 @@
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.host;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
|
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.ResInfo;
|
||||
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
|
||||
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
|
||||
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@ -45,11 +51,23 @@ public abstract class CommonHostManager implements HostManager {
|
||||
protected ZookeeperRegistryCenter registryCenter;
|
||||
|
||||
/**
|
||||
* zookeeperNodeManager
|
||||
* zookeeper node manager
|
||||
*/
|
||||
@Autowired
|
||||
protected ZookeeperNodeManager zookeeperNodeManager;
|
||||
|
||||
/**
|
||||
* zk master client
|
||||
*/
|
||||
@Autowired
|
||||
protected ZKMasterClient zkMasterClient;
|
||||
|
||||
/**
|
||||
* worker group mapper
|
||||
*/
|
||||
@Autowired
|
||||
protected WorkerGroupMapper workerGroupMapper;
|
||||
|
||||
/**
|
||||
* select host
|
||||
* @param context context
|
||||
@ -57,42 +75,73 @@ public abstract class CommonHostManager implements HostManager {
|
||||
*/
|
||||
@Override
|
||||
public Host select(ExecutionContext context) {
|
||||
Host host = new Host();
|
||||
Collection<String> nodes = null;
|
||||
List<HostWorker> candidates = null;
|
||||
String workerGroup = context.getWorkerGroup();
|
||||
// executor type
|
||||
ExecutorType executorType = context.getExecutorType();
|
||||
switch (executorType) {
|
||||
case WORKER:
|
||||
nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup);
|
||||
candidates = getHostWorkersFromDatabase(workerGroup);
|
||||
if (candidates.isEmpty()) {
|
||||
candidates = getHostWorkersFromZookeeper(workerGroup);
|
||||
}
|
||||
break;
|
||||
case CLIENT:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid executorType : " + executorType);
|
||||
}
|
||||
|
||||
if (CollectionUtils.isEmpty(candidates)) {
|
||||
return new Host();
|
||||
}
|
||||
if (nodes == null || nodes.isEmpty()) {
|
||||
return host;
|
||||
}
|
||||
List<HostWorker> candidateHosts = new ArrayList<>();
|
||||
nodes.forEach(node -> {
|
||||
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
|
||||
String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
|
||||
int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
|
||||
if (StringUtils.isNotEmpty(heartbeat)) {
|
||||
String[] parts = heartbeat.split(Constants.COMMA);
|
||||
if (ResInfo.isNewHeartbeatWithWeight(parts)) {
|
||||
hostWeight = Integer.parseInt(parts[10]);
|
||||
}
|
||||
}
|
||||
candidateHosts.add(HostWorker.of(node, hostWeight, workerGroup));
|
||||
});
|
||||
return select(candidateHosts);
|
||||
return select(candidates);
|
||||
}
|
||||
|
||||
protected abstract HostWorker select(Collection<HostWorker> nodes);
|
||||
|
||||
protected List<HostWorker> getHostWorkersFromDatabase(String workerGroup) {
|
||||
List<HostWorker> hostWorkers = new ArrayList<>();
|
||||
List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(workerGroup);
|
||||
if (CollectionUtils.isNotEmpty(workerGroups)) {
|
||||
Map<String, String> serverMaps = zkMasterClient.getServerMaps(ZKNodeType.WORKER, true);
|
||||
for (WorkerGroup wg : workerGroups) {
|
||||
for (String addr : wg.getAddrList().split(Constants.COMMA)) {
|
||||
if (serverMaps.containsKey(addr)) {
|
||||
String heartbeat = serverMaps.get(addr);
|
||||
int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat);
|
||||
hostWorkers.add(HostWorker.of(addr, hostWeight, workerGroup));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return hostWorkers;
|
||||
}
|
||||
|
||||
protected List<HostWorker> getHostWorkersFromZookeeper(String workerGroup) {
|
||||
List<HostWorker> hostWorkers = new ArrayList<>();
|
||||
Collection<String> nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup);
|
||||
if (CollectionUtils.isNotEmpty(nodes)) {
|
||||
for (String node : nodes) {
|
||||
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
|
||||
String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
|
||||
int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat);
|
||||
hostWorkers.add(HostWorker.of(node, hostWeight, workerGroup));
|
||||
}
|
||||
}
|
||||
return hostWorkers;
|
||||
}
|
||||
|
||||
protected int getWorkerHostWeightFromHeartbeat(String heartbeat) {
|
||||
int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
|
||||
if (StringUtils.isNotEmpty(heartbeat)) {
|
||||
String[] parts = heartbeat.split(Constants.COMMA);
|
||||
if (ResInfo.isNewHeartbeatWithWeight(parts)) {
|
||||
hostWeight = Integer.parseInt(parts[10]);
|
||||
}
|
||||
}
|
||||
return hostWeight;
|
||||
}
|
||||
|
||||
public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) {
|
||||
this.zookeeperNodeManager = zookeeperNodeManager;
|
||||
}
|
||||
@ -100,4 +149,5 @@ public abstract class CommonHostManager implements HostManager {
|
||||
public ZookeeperNodeManager getZookeeperNodeManager() {
|
||||
return zookeeperNodeManager;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ public class HostManagerConfig {
|
||||
String hostSelector = masterConfig.getHostSelector();
|
||||
HostSelector selector = HostSelector.of(hostSelector);
|
||||
HostManager hostManager;
|
||||
switch (selector){
|
||||
switch (selector) {
|
||||
case RANDOM:
|
||||
hostManager = new RandomHostManager();
|
||||
break;
|
||||
|
@ -18,9 +18,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -28,9 +30,11 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -90,7 +94,7 @@ public class LowerWeightHostManager extends CommonHostManager {
}

@PreDestroy
public void close(){
public void close() {
this.executorService.shutdownNow();
}

@ -137,9 +141,30 @@ public class LowerWeightHostManager extends CommonHostManager {
@Override
public void run() {
try {
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
// from database
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroups)) {
Map<String, String> serverMaps = zkMasterClient.getServerMaps(ZKNodeType.WORKER, true);
for (WorkerGroup wg : workerGroups) {
String workerGroup = wg.getName();
List<String> addrs = Arrays.asList(wg.getAddrList().split(Constants.COMMA));
Set<HostWeight> hostWeights = new HashSet<>(addrs.size());
for (String addr : addrs) {
if (serverMaps.containsKey(addr)) {
String heartbeat = serverMaps.get(addr);
HostWeight hostWeight = getHostWeight(addr, workerGroup, heartbeat);
if (hostWeight != null) {
hostWeights.add(hostWeight);
}
}
}
workerHostWeights.put(workerGroup, hostWeights);
}
}
// from zookeeper
Map<String, Set<String>> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes();
Set<Map.Entry<String, Set<String>>> entries = workerGroupNodes.entrySet();
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : entries) {
String workerGroup = entry.getKey();
Set<String> nodes = entry.getValue();
@ -147,20 +172,8 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for (String node : nodes) {
String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) {
String[] parts = heartbeat.split(Constants.COMMA);
int status = Integer.parseInt(parts[8]);
if (status == Constants.ABNORMAL_NODE_STATUS) {
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}",
Double.parseDouble(parts[3]) , Double.parseDouble(parts[2]));
continue;
}
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
long startTime = DateUtils.stringToDate(parts[6]).getTime();
int weight = ResInfo.isNewHeartbeatWithWeight(parts) ? Integer.parseInt(parts[10]) : Constants.DEFAULT_WORKER_HOST_WEIGHT;
HostWeight hostWeight = new HostWeight(HostWorker.of(node, weight, workerGroup), cpu, memory, loadAverage, startTime);
HostWeight hostWeight = getHostWeight(node, workerGroup, heartbeat);
if (hostWeight != null) {
hostWeights.add(hostWeight);
}
}
@ -171,6 +184,25 @@ public class LowerWeightHostManager extends CommonHostManager {
logger.error("RefreshResourceTask error", ex);
}
}

public HostWeight getHostWeight(String addr, String workerGroup, String heartbeat) {
if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) {
String[] parts = heartbeat.split(Constants.COMMA);
int status = Integer.parseInt(parts[8]);
if (status == Constants.ABNORMAL_NODE_STATUS) {
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}",
Double.parseDouble(parts[3]), Double.parseDouble(parts[2]));
return null;
}
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
long startTime = DateUtils.stringToDate(parts[6]).getTime();
int weight = getWorkerHostWeightFromHeartbeat(heartbeat);
return new HostWeight(HostWorker.of(addr, weight, workerGroup), cpu, memory, loadAverage, startTime);
}
return null;
}
}

}
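The RefreshResourceTask above now builds the group-to-weights map from two sources: worker groups defined in the t_ds_worker_group table, whose addresses are kept only if a live heartbeat exists for them, and the groups workers registered directly in ZooKeeper. A hedged, self-contained sketch of that merge, using plain address strings in place of the project's HostWeight type:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch of the merge done by RefreshResourceTask; heartbeat handling is
    // reduced to "does a live heartbeat exist for this address".
    public class WorkerGroupMergeSketch {

        static Map<String, Set<String>> merge(Map<String, String> dbGroups,        // group name -> "addr1,addr2"
                                              Map<String, String> liveHeartbeats,  // "host:port" -> heartbeat string
                                              Map<String, Set<String>> zkGroups) { // group name -> registered nodes
            Map<String, Set<String>> result = new HashMap<>();
            // from database: keep only addresses with a live heartbeat
            dbGroups.forEach((group, addrList) -> {
                Set<String> alive = new HashSet<>();
                for (String addr : addrList.split(",")) {
                    if (liveHeartbeats.containsKey(addr)) {
                        alive.add(addr);
                    }
                }
                result.put(group, alive);
            });
            // from zookeeper: registered nodes are taken as-is
            zkGroups.forEach((group, nodes) ->
                    result.merge(group, new HashSet<>(nodes), (a, b) -> { a.addAll(b); return a; }));
            return result;
        }

        public static void main(String[] args) {
            Map<String, String> db = Map.of("etl", "10.0.0.1:1234,10.0.0.2:1234");
            Map<String, String> beats = Map.of("10.0.0.1:1234", "0.1,0.2,...");
            Map<String, Set<String>> zk = Map.of("default", new HashSet<>(Arrays.asList("10.0.0.3:1234")));
            // e.g. {etl=[10.0.0.1:1234], default=[10.0.0.3:1234]} (ordering may vary)
            System.out.println(merge(db, beats, zk));
        }
    }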
@ -81,7 +81,7 @@ public class MasterRegistry {
* registry
*/
public void registry() {
String address = NetUtils.getHost();
String address = NetUtils.getAddr(masterConfig.getListenPort());
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
@ -101,7 +101,7 @@ public class MasterRegistry {
masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_PREFIX,
Constants.MASTER_TYPE,
zookeeperRegistryCenter);

this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);

@ -113,7 +113,7 @@ public class HeartBeatTask implements Runnable {
// save process id
builder.append(OSUtils.getProcessID());
// worker host weight
if (Constants.WORKER_PREFIX.equals(serverType)) {
if (Constants.WORKER_TYPE.equals(serverType)) {
builder.append(Constants.COMMA).append(hostWeight);
}
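The HeartBeatTask change above appends the host weight only when the reporting server is a worker, so master heartbeats keep their previous, shorter shape. A hedged sketch of that conditional append (the resource and time fields are placeholders; only the trailing weight behaviour mirrors the diff):

    // Hypothetical sketch of the heartbeat-building step shown above.
    public class HeartbeatBuilderSketch {

        static String buildHeartbeat(String serverType, int processId, int hostWeight) {
            StringBuilder builder = new StringBuilder();
            builder.append("0.35,0.61,0.5,7.8,16.0,8.0")                // resource fields (placeholders)
                   .append(",2021-03-01 10:00:00,2021-03-01 10:00:05")  // start / report time (placeholders)
                   .append(",0")                                        // status
                   .append(",").append(processId);                      // process id
            if ("worker".equals(serverType)) {                          // WORKER_TYPE in the real code
                builder.append(",").append(hostWeight);                 // worker host weight
            }
            return builder.toString();
        }

        public static void main(String[] args) {
            System.out.println(buildHeartbeat("worker", 12345, 100));
            System.out.println(buildHeartbeat("master", 12345, 100)); // no trailing weight field
        }
    }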
@ -17,10 +17,8 @@

package org.apache.dolphinscheduler.server.registry;

import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
@ -230,9 +228,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
// ip_sequence_no
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];

String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;

return !registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath);
}
@ -35,6 +35,9 @@ public class WorkerConfig {
@Value("${worker.heartbeat.interval:10}")
private int workerHeartbeatInterval;

@Value("${worker.host.weight:100}")
private int hostWeight;

@Value("${worker.tenant.auto.create:false}")
private boolean workerTenantAutoCreate;

@ -44,14 +47,11 @@ public class WorkerConfig {
@Value("${worker.reserved.memory:0.3}")
private double workerReservedMemory;

@Value("#{'${worker.groups:default}'.split(',')}")
private Set<String> workerGroups;

@Value("${worker.listen.port:1234}")
private int listenPort;

@Value("${worker.host.weight:100}")
private int hostWeight;
@Value("#{'${worker.groups:default}'.split(',')}")
private Set<String> workerGroups;

@Value("${alert.listen.host:localhost}")
private String alertListenHost;
@ -99,7 +99,7 @@ public class WorkerRegistry {
* registry
*/
public void registry() {
String address = NetUtils.getHost();
String address = NetUtils.getAddr(workerConfig.getListenPort());
Set<String> workerZkPaths = getWorkerZkPaths();
int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();

@ -125,7 +125,7 @@ public class WorkerRegistry {
workerConfig.getWorkerReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_PREFIX,
Constants.WORKER_TYPE,
zookeeperRegistryCenter);

this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.zk;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -73,7 +73,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired
private MasterRegistry masterRegistry;

public void start(MasterServer masterServer) {
public void start(IStoppable stoppable) {
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
@ -83,7 +83,7 @@ public class ZKMasterClient extends AbstractZKClient {

// master registry
masterRegistry.registry();
masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer);
masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable);
String registryPath = this.masterRegistry.getMasterPath();
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);

@ -282,7 +282,7 @@ public class ZKMasterClient extends AbstractZKClient {
return false;
}
Date workerServerStartDate = null;
List<Server> workerServers = getServersList(ZKNodeType.WORKER);
List<Server> workerServers = getServerList(ZKNodeType.WORKER);
for (Server workerServer : workerServers) {
if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) {
workerServerStartDate = workerServer.getCreateTime();
@ -15,7 +15,7 @@
# limitations under the License.
#

# master execute thread num
# master execute thread number
#master.exec.threads=100

# master execute task number in parallel
@ -15,12 +15,15 @@
# limitations under the License.
#

# worker execute thread num
# worker execute thread number
#worker.exec.threads=100

# worker heartbeat interval
#worker.heartbeat.interval=10

# worker host weight to dispatch tasks, default value 100
#worker.host.weight=100

# worker tenant auto create
#worker.tenant.auto.create=false

@ -33,11 +36,8 @@
# worker listener port
#worker.listen.port=1234

# default worker groups
# default worker groups, if this worker belongs different groups, you can config the following like that 'worker.groups=default,test'
#worker.groups=default

# default worker host weight
#worker.host.weight=100

# alert server listener host
alert.listen.host=localhost

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -36,10 +37,12 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
@ -66,12 +69,11 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class,
CuratorZookeeperClient.class})
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, ZKMasterClient.class, TaskPriorityQueueConsumer.class,
ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class, MasterRegistry.class,
CuratorZookeeperClient.class, SpringConnectionFactory.class})
public class TaskPriorityQueueConsumerTest {

@Autowired
private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
@ -76,38 +76,38 @@ public abstract class AbstractZKClient extends RegisterOperator {
* @param zkNodeType zookeeper node type
* @return server list
*/
public List<Server> getServersList(ZKNodeType zkNodeType) {
Map<String, String> masterMap = getServerMaps(zkNodeType);
public List<Server> getServerList(ZKNodeType zkNodeType) {
Map<String, String> serverMaps = getServerMaps(zkNodeType);
String parentPath = getZNodeParentPath(zkNodeType);

List<Server> masterServers = new ArrayList<>();
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
if (masterServer == null) {
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
Server server = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
if (server == null) {
continue;
}
String key = entry.getKey();
masterServer.setZkDirectory(parentPath + "/" + key);
//set host and port
server.setZkDirectory(parentPath + "/" + key);
// set host and port
String[] hostAndPort = key.split(COLON);
String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
masterServer.setHost(hosts[hosts.length - 1]);
masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServers.add(masterServer);
server.setHost(hosts[hosts.length - 1]);
server.setPort(Integer.parseInt(hostAndPort[1]));
serverList.add(server);
}
return masterServers;
return serverList;
}

/**
* get master server list map.
* get server list map.
*
* @param zkNodeType zookeeper node type
* @param hostOnly host only
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {

Map<String, String> masterMap = new HashMap<>();
public Map<String, String> getServerMaps(ZKNodeType zkNodeType, boolean hostOnly) {
Map<String, String> serverMap = new HashMap<>();
try {
String path = getZNodeParentPath(zkNodeType);
List<String> serverList = super.getChildrenKeys(path);
@ -122,13 +122,27 @@ public abstract class AbstractZKClient extends RegisterOperator {
serverList = workerList;
}
for (String server : serverList) {
masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
String host = server;
if (zkNodeType == ZKNodeType.WORKER && hostOnly) {
host = server.split(Constants.SLASH)[1];
}
serverMap.putIfAbsent(host, super.get(path + Constants.SLASH + server));
}
} catch (Exception e) {
logger.error("get server list failed", e);
}

return masterMap;
return serverMap;
}

/**
* get server list map.
*
* @param zkNodeType zookeeper node type
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
return getServerMaps(zkNodeType, false);
}

/**
@ -145,7 +159,7 @@ public abstract class AbstractZKClient extends RegisterOperator {
host, zkNodeType);
return false;
}
Map<String, String> serverMaps = getServerMaps(zkNodeType);
Map<String, String> serverMaps = getServerMaps(zkNodeType, true);
for (String hostKey : serverMaps.keySet()) {
if (hostKey.contains(host)) {
return true;
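The hostOnly flag added to getServerMaps above exists because worker znodes are registered under a group segment, so a child key looks like "<group>/<host:port>"; stripping the group yields a plain host:port key that the database-defined address lists can be matched against. A hedged sketch of that key derivation:

    // Hypothetical sketch of the key handling added to getServerMaps above.
    public class ServerKeySketch {

        static String toKey(String childNode, boolean hostOnly) {
            if (hostOnly && childNode.contains("/")) {
                return childNode.split("/")[1];
            }
            return childNode;
        }

        public static void main(String[] args) {
            System.out.println(toKey("default/192.168.40.10:1234", true));  // 192.168.40.10:1234
            System.out.println(toKey("default/192.168.40.10:1234", false)); // default/192.168.40.10:1234
        }
    }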
@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.service.zk;

import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@ -99,7 +99,7 @@ public class RegisterOperator extends ZookeeperCachedOperator {
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;

//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_ZK_OP)) {
@ -130,7 +130,7 @@ public class RegisterOperator extends ZookeeperCachedOperator {
*/
public void handleDeadServer(Set<String> zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception {

String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;
for (String zNode : zNodeSet) {
String host = getHostByEventDataPath(zNode);
//check server restart, if restart , dead server path in zk should be delete
@ -97,7 +97,7 @@ public class RegisterOperatorTest {
testAfterPropertiesSet();
registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
String path = registerOperator.getDeadZNodeParentPath();
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE)));

}

@ -107,10 +107,10 @@ public class RegisterOperatorTest {
String path = registerOperator.getDeadZNodeParentPath();

registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE)));

registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX);
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_TYPE);
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE)));
}

@Test
@ -37,9 +37,9 @@
<el-switch size="small" v-model="sendEmail"></el-switch>
</div>
<div style="display: inline-block;" v-if="sqlType === '0'">
<span class="text-b">{{$t('Display query result')}}</span>
<span class="text-b">{{$t('Log display')}}</span>
<m-select-input v-model="displayRows" :list="[1,10,25,50,100]" style="width: 70px;"></m-select-input>
<span>({{$t('Rows')}})</span>
<span>{{$t('rows of result')}}</span>
</div>
</div>
</m-list-box>
@ -117,11 +117,9 @@
this._debounceGET('false')
this.createQueueDialog = false
},

close () {
this.createQueueDialog = false
},

_getList (flag) {
if (sessionStorage.getItem('isLeft') === 0) {
this.isLeft = false
@ -119,11 +119,9 @@
this._debounceGET('false')
this.createTenementDialog = false
},

close () {
this.createTenementDialog = false
},

_getList (flag) {
if (sessionStorage.getItem('isLeft') === 0) {
this.isLeft = false
@ -48,7 +48,7 @@
<span>{{scope.row.updateTime | formatDate}}</span>
</template>
</el-table-column>
<el-table-column :label="$t('Operation')" width="120" fixed="right">
<el-table-column :label="$t('Operation')" width="100" fixed="right">
<template slot-scope="scope">
<el-tooltip :content="$t('Authorize')" placement="top">
<el-dropdown trigger="click">
@ -113,16 +113,13 @@
this.item = item
this.createUserDialog = true
},

onUpdate () {
this._debounceGET('false')
this.createUserDialog = false
},

close () {
this.createUserDialog = false
},

_getList (flag) {
if (sessionStorage.getItem('isLeft') === 0) {
this.isLeft = false
@ -35,7 +35,7 @@
<span>{{scope.row.updateTime | formatDate}}</span>
</template>
</el-table-column>
<el-table-column :label="$t('Operation')" width="130">
<el-table-column :label="$t('Operation')" width="100">
<template slot-scope="scope">
<el-tooltip :content="$t('Edit')" placement="top">
<span><el-button type="primary" size="mini" icon="el-icon-edit-outline" @click="_edit(scope.row)" circle></el-button></span>
@ -121,16 +121,13 @@
this.item = item
this.createWarningDialog = true
},

onUpdate () {
this._debounceGET('false')
this.createWarningDialog = false
},

close () {
this.createWarningDialog = false
},

_getList (flag) {
if (sessionStorage.getItem('isLeft') === 0) {
this.isLeft = false
@ -121,16 +121,13 @@
this.item = item
this.createWarningDialog = true
},

onUpdate () {
this._debounceGET('false')
this.createWarningDialog = false
},

close () {
this.createWarningDialog = false
},

_getList (flag) {
if (sessionStorage.getItem('isLeft') === 0) {
this.isLeft = false
@ -15,11 +15,11 @@
* limitations under the License.
*/
<template>
<m-popup
ref="popup"
<m-popover
ref="popover"
:ok-text="item ? $t('Edit') : $t('Submit')"
:nameText="item ? $t('Edit worker group') : $t('Create worker group')"
@ok="_ok">
@ok="_ok"
@close="close">
<template slot="content">
<div class="create-worker-model">
<m-list-box-f>
@ -35,28 +35,28 @@
</template>
</m-list-box-f>
<m-list-box-f>
<template slot="name"><strong>*</strong>IP</template>
<template slot="name"><strong>*</strong>{{$t('Worker Addresses')}}</template>
<template slot="content">
<el-input
:autosize="{ minRows: 4, maxRows: 6 }"
type="textarea"
size="mini"
v-model="ipList"
:placeholder="$t('Please enter the IP address separated by commas')">
v-model.trim="addrList"
:placeholder="$t('Please enter the worker addresses separated by commas')">
</el-input>
<div class="ipt-tip">
<span>{{$t('Note: Multiple IP addresses have been comma separated')}}</span>
<div class="cwm-tip">
<span>{{$t('Note: Multiple worker addresses have been comma separated')}}</span>
</div>
</template>
</m-list-box-f>
</div>
</template>
</m-popup>
</m-popover>
</template>
<script>
import i18n from '@/module/i18n'
import store from '@/conf/home/store'
import mPopup from '@/module/components/popup/popup'
import mPopover from '@/module/components/popup/popover'
import mListBoxF from '@/module/components/listBoxF/listBoxF'

export default {
@ -66,7 +66,7 @@
store,
id: 0,
name: '',
ipList: ''
addrList: ''
}
},
props: {
@ -79,15 +79,13 @@
this._submit()
}
},
checkIsIps (ips) {
let reg = /^(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])$/
let valdata = ips.split(',')
for (let i = 0; i < valdata.length; i++) {
if (reg.test(valdata[i]) === false) {
return false
}
}
return true
checkIpAndPorts (addrs) {
let reg = /^(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5]):\d{1,5}$/
return addrs.split(',').every(item => reg.test(item))
},
checkFqdnAndPorts (addrs) {
let reg = /^([\w-]+\.)*[\w-]+:\d{1,5}$/i
return addrs.split(',').every(item => reg.test(item))
},
_verification () {
// group name
@ -95,12 +93,12 @@
this.$message.warning(`${i18n.$t('Please enter group name')}`)
return false
}
if (!this.ipList) {
this.$message.warning(`${i18n.$t('IP address cannot be empty')}`)
if (!this.addrList) {
this.$message.warning(`${i18n.$t('Worker addresses cannot be empty')}`)
return false
}
if (!this.checkIsIps(this.ipList)) {
this.$message.warning(`${i18n.$t('Please enter the correct IP')}`)
if (!this.checkIpAndPorts(this.addrList) && !this.checkFqdnAndPorts(this.addrList)) {
this.$message.warning(`${i18n.$t('Please enter the correct worker addresses')}`)
return false
}
return true
@ -109,22 +107,23 @@
let param = {
id: this.id,
name: this.name,
ipList: this.ipList
addrList: this.addrList
}
if (this.item) {
param.id = this.item.id
}
this.$refs.popup.spinnerLoading = true
this.$refs.popover.spinnerLoading = true
this.store.dispatch('security/saveWorkerGroups', param).then(res => {
this.$refs.popover.spinnerLoading = false
this.$emit('onUpdate')
this.$message.success(res.msg)
setTimeout(() => {
this.$refs.popup.spinnerLoading = false
}, 800)
}).catch(e => {
this.$message.error(e.msg || '')
this.$refs.popup.spinnerLoading = false
this.$refs.popover.spinnerLoading = false
})
},
close () {
this.$emit('close')
}
},
watch: {},
@ -132,17 +131,17 @@
if (this.item) {
this.id = this.item.id
this.name = this.item.name
this.ipList = this.item.ipList
this.addrList = this.item.addrList
}
},
mounted () {
},
components: { mPopup, mListBoxF }
components: { mPopover, mListBoxF }
}
</script>
<style lang="scss" rel="stylesheet/scss">
.create-worker-model {
.ipt-tip {
.cwm-tip {
color: #999;
padding-top: 4px;
display: block;
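The form above accepts worker addresses as either ip:port or hostname:port, comma separated, and this change also has the API check the address list when a worker group is saved. A hedged Java sketch of an equivalent check; the two patterns mirror the JavaScript regexes above, and the class itself is illustrative, not the project's actual server-side validator:

    import java.util.regex.Pattern;

    // Hypothetical counterpart of checkIpAndPorts / checkFqdnAndPorts for server-side use.
    public class WorkerAddressValidatorSketch {

        private static final Pattern IP_AND_PORT = Pattern.compile(
                "^(\\d{1,2}|1\\d\\d|2[0-4]\\d|25[0-5])(\\.(\\d{1,2}|1\\d\\d|2[0-4]\\d|25[0-5])){3}:\\d{1,5}$");
        private static final Pattern FQDN_AND_PORT = Pattern.compile(
                "^([\\w-]+\\.)*[\\w-]+:\\d{1,5}$", Pattern.CASE_INSENSITIVE);

        static boolean isValidAddrList(String addrList) {
            if (addrList == null || addrList.isEmpty()) {
                return false;
            }
            for (String addr : addrList.split(",")) {
                if (!IP_AND_PORT.matcher(addr).matches() && !FQDN_AND_PORT.matcher(addr).matches()) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            System.out.println(isValidAddrList("192.168.40.10:1234,worker-2.internal:1234")); // true
            System.out.println(isValidAddrList("192.168.40.10"));                             // false, no port
        }
    }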
@ -20,9 +20,9 @@
<el-table :data="list" size="mini" style="width: 100%">
<el-table-column type="index" :label="$t('#')" width="50"></el-table-column>
<el-table-column prop="name" :label="$t('Group')"></el-table-column>
<el-table-column label="IPList" min-width="300">
<el-table-column :label="$t('Addresses')" min-width="300">
<template slot-scope="scope">
<span>{{scope.row.ipList.join(',')}}</span>
<span style="display: inline-block; margin-right: 10px">{{scope.row.addrList}}</span>
</template>
</el-table-column>
<el-table-column :label="$t('Create Time')" min-width="120">
@ -35,6 +35,25 @@
<span>{{scope.row.updateTime | formatDate}}</span>
</template>
</el-table-column>
<el-table-column :label="$t('Operation')" width="100">
<template slot-scope="scope">
<el-tooltip :content="$t('Edit')" placement="top" v-if="!scope.row.systemDefault">
<el-button type="primary" size="mini" icon="el-icon-edit-outline" @click="_edit(scope.row)" circle></el-button>
</el-tooltip>
<el-tooltip :content="$t('Delete')" placement="top" v-if="!scope.row.systemDefault">
<el-popconfirm
:confirmButtonText="$t('Confirm')"
:cancelButtonText="$t('Cancel')"
icon="el-icon-info"
iconColor="red"
:title="$t('Delete?')"
@onConfirm="_delete(scope.row,scope.row.id)"
>
<el-button type="danger" size="mini" icon="el-icon-delete" circle slot="reference"></el-button>
</el-popconfirm>
</el-tooltip>
</template>
</el-table-column>
</el-table>
</div>
</div>
@ -17,7 +17,18 @@
<template>
<m-list-construction :title="$t('Worker group manage')">
<template slot="conditions">
<m-conditions @on-conditions="_onConditions"></m-conditions>
<m-conditions @on-conditions="_onConditions">
<template slot="button-group" v-if="isADMIN">
<el-button size="mini" @click="_create('')">{{$t('Create worker group')}}</el-button>
<el-dialog
:title="item ? $t('Edit worker group') : $t('Create worker group')"
v-if="createWorkerGroupDialog"
:visible.sync="createWorkerGroupDialog"
width="auto">
<m-create-worker :item="item" @onUpdate="onUpdate" @close="close"></m-create-worker>
</el-dialog>
</template>
</m-conditions>
</template>
<template slot="content">
<template v-if="workerGroupList.length || total>0">
@ -57,6 +68,7 @@
import listUrlParamHandle from '@/module/mixin/listUrlParamHandle'
import mConditions from '@/module/components/conditions/conditions'
import mListConstruction from '@/module/components/listConstruction/listConstruction'
import mCreateWorker from './_source/createWorker'

export default {
name: 'worker-groups-index',
@ -70,7 +82,9 @@
pageNo: 1,
searchVal: ''
},
isADMIN: store.state.user.userInfo.userType === 'ADMIN_USER'
isADMIN: store.state.user.userInfo.userType === 'ADMIN_USER',
createWorkerGroupDialog: false,
item: {}
}
},
mixins: [listUrlParamHandle],
@ -96,6 +110,17 @@
_onEdit (item) {
this._create(item)
},
_create (item) {
this.createWorkerGroupDialog = true
this.item = item
},
onUpdate () {
this._debounceGET('false')
this.createWorkerGroupDialog = false
},
close () {
this.createWorkerGroupDialog = false
},
_getList (flag) {
this.isLoading = !flag
this.getWorkerGroups(this.searchParams).then(res => {
@ -122,6 +147,6 @@
created () {},
mounted () {
},
components: { mList, mListConstruction, mConditions, mSpin, mNoData }
components: { mList, mListConstruction, mConditions, mSpin, mNoData, mCreateWorker }
}
</script>

@ -624,7 +624,7 @@ export default {
},
deleteWorkerGroups ({ state }, payload) {
return new Promise((resolve, reject) => {
io.get('worker-group/delete-by-id', payload, res => {
io.post('worker-group/delete-by-id', payload, res => {
resolve(res)
}).catch(e => {
reject(e)
@ -131,8 +131,8 @@ export default {
'Please enter app name(optional)': 'Please enter app name(optional)',
'SQL Type': 'SQL Type',
'Send Email': 'Send Email',
'Display query result': 'Display query result',
Rows: 'Rows',
'Log display': 'Log display',
'rows of result': 'rows of result',
Title: 'Title',
'Please enter the title of email': 'Please enter the title of email',
Table: 'Table',
@ -371,7 +371,7 @@ export default {
'Drag the file into the current upload window': 'Drag the file into the current upload window',
'Drag area upload': 'Drag area upload',
Upload: 'Upload',
'ReUpload File': 'Re-upload file',
'ReUpload File': 'ReUpload File',
'Please enter file name': 'Please enter file name',
'Please select the file to upload': 'Please select the file to upload',
'Resources manage': 'Resources',
@ -502,8 +502,10 @@ export default {
'Token manage': 'Token manage',
'Create token': 'Create token',
'Edit token': 'Edit token',
'Please enter the IP address separated by commas': 'Please enter the IP address separated by commas',
'Note: Multiple IP addresses have been comma separated': 'Note: Multiple IP addresses have been comma separated',
Addresses: 'Addresses',
'Worker Addresses': 'Worker Addresses',
'Please enter the worker addresses separated by commas': 'Please enter the worker addresses separated by commas',
'Note: Multiple worker addresses have been comma separated': 'Note: Multiple worker addresses have been comma separated',
'Failure time': 'Failure time',
'Expiration time': 'Expiration time',
User: 'User',
@ -568,8 +570,8 @@ export default {
'Please Enter Http Url': 'Please Enter Http Url(required)',
'Please Enter Http Condition': 'Please Enter Http Condition',
'There is no data for this period of time': 'There is no data for this period of time',
'IP address cannot be empty': 'IP address cannot be empty',
'Please enter the correct IP': 'Please enter the correct IP',
'Worker addresses cannot be empty': 'Worker addresses cannot be empty',
'Please enter the correct worker addresses': 'Please enter the correct worker addresses',
'Please generate token': 'Please generate token',
'Spark Version': 'Spark Version',
TargetDataBase: 'target database',
@ -131,8 +131,8 @@ export default {
'Please enter app name(optional)': '请输入任务名称(选填)',
'SQL Type': 'sql类型',
'Send Email': '发送邮件',
'Display query result': '展示查询结果',
Rows: '行',
'Log display': '日志显示',
'rows of result': '行查询结果',
Title: '主题',
'Please enter the title of email': '请输入邮件主题',
Table: '表名',
@ -502,8 +502,10 @@ export default {
'Token manage': '令牌管理',
'Create token': '创建令牌',
'Edit token': '编辑令牌',
'Please enter the IP address separated by commas': '请输入IP地址多个用英文逗号隔开',
'Note: Multiple IP addresses have been comma separated': '注意:多个IP地址以英文逗号分割',
Addresses: '地址',
'Worker Addresses': 'Worker地址',
'Please enter the worker addresses separated by commas': '请输入Worker地址,多个用英文逗号隔开',
'Note: Multiple worker addresses have been comma separated': '注意:多个Worker地址以英文逗号分割',
'Failure time': '失效时间',
'Expiration time': '失效时间',
User: '用户',
@ -568,8 +570,8 @@ export default {
'Please Enter Http Url': '请填写请求地址(必填)',
'Please Enter Http Condition': '请填写校验内容',
'There is no data for this period of time': '该时间段无数据',
'IP address cannot be empty': 'IP地址不能为空',
'Please enter the correct IP': '请输入正确的IP',
'Worker addresses cannot be empty': 'Worker地址不能为空',
'Please enter the correct worker addresses': '请输入正确的Worker地址',
'Please generate token': '请生成Token',
'Spark Version': 'Spark版本',
TargetDataBase: '目标库',
@ -789,6 +789,22 @@ CREATE TABLE `t_ds_user` (
-- Records of t_ds_user
-- ----------------------------

-- ----------------------------
-- Table structure for t_ds_worker_group
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_worker_group`;
CREATE TABLE `t_ds_worker_group` (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(256) NULL DEFAULT NULL COMMENT 'worker group name',
`addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]',
`create_time` datetime NULL DEFAULT NULL COMMENT 'create time',
`update_time` datetime NULL DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t_ds_worker_group
-- ----------------------------

-- ----------------------------
-- Table structure for t_ds_version
@ -659,7 +659,7 @@ DROP TABLE IF EXISTS t_ds_worker_group;
CREATE TABLE t_ds_worker_group (
id bigint NOT NULL ,
name varchar(256) DEFAULT NULL ,
ip_list varchar(256) DEFAULT NULL ,
addr_list text DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
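Both DDL variants above give t_ds_worker_group the same columns: id, name, addr_list, create_time and update_time. A hedged sketch of a matching plain Java entity; the field names follow the columns, and this is illustrative rather than the project's actual WorkerGroup class, of which only getName() and getAddrList() are visible in this diff:

    import java.util.Date;

    // Hypothetical POJO mirroring the t_ds_worker_group columns.
    public class WorkerGroupSketch {

        private long id;
        private String name;        // worker group name
        private String addrList;    // comma-separated "host:port" entries, as in addr_list
        private Date createTime;
        private Date updateTime;

        public long getId() { return id; }
        public void setId(long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getAddrList() { return addrList; }
        public void setAddrList(String addrList) { this.addrList = addrList; }
        public Date getCreateTime() { return createTime; }
        public void setCreateTime(Date createTime) { this.createTime = createTime; }
        public Date getUpdateTime() { return updateTime; }
        public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }
    }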
sql/upgrade/1.3.6_schema/mysql/dolphinscheduler_ddl.sql (new file, 38 lines)
@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));

-- uc_dolphin_T_t_ds_worker_group_R_ip_list
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_worker_group_R_ip_list;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_worker_group_R_ip_list()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_worker_group'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='ip_list')
THEN
ALTER TABLE t_ds_worker_group CHANGE COLUMN `ip_list` `addr_list` text;
END IF;
END;

d//

delimiter ;
CALL uc_dolphin_T_t_ds_worker_group_R_ip_list;
DROP PROCEDURE uc_dolphin_T_t_ds_worker_group_R_ip_list;

sql/upgrade/1.3.6_schema/mysql/dolphinscheduler_dml.sql (new file, 16 lines)
@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@ -14,5 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- uc_dolphin_T_t_ds_worker_group_A_ip_list
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_worker_group_A_ip_list() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_worker_group'
AND COLUMN_NAME ='ip_list')
THEN
ALTER TABLE t_ds_worker_group rename ip_list TO addr_list;
ALTER TABLE t_ds_worker_group ALTER column addr_list type text;
END IF;
END;
$$ LANGUAGE plpgsql;
d//

delimiter ;
SELECT uc_dolphin_T_t_ds_worker_group_A_ip_list();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_worker_group_A_ip_list();

-- Add foreign key constraints for t_ds_task_instance --
ALTER TABLE t_ds_task_instance ADD CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id) REFERENCES t_ds_process_instance(id) ON DELETE CASCADE;
ALTER TABLE t_ds_task_instance ADD CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id) REFERENCES t_ds_process_instance(id) ON DELETE CASCADE;