[Feature][JsonSplit-api] merging from dev to json_split_two (#5712)

* [BUG-#5678][Registry]fix registry init node miss (#5686)

* [Improvement][UI] Update the update time after the user information is successfully modified (#5684)

* improve

edit the userinfo success, but the updatetime is not the latest.

* Improved shell task execution result log information, adding process.waitFor() and process.exitValue() information to the original log (#5691)

Co-authored-by: shenglm <shenglm840722@126.com>

* [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603)

* add globalParams new plan with varPool

* add unit test

* add python task varPoolParams


Co-authored-by: wangxj <wangxj31>

* Issue robot translation judgment changed to Chinese (#5694)



Co-authored-by: chenxingchun <438044805@qq.com>

* the update function should use post instead of get (#5703)

* enhance form verify (#5696)

* checkState only supports %s not {} (#5711)

* [Fix-5701]When deleting a user, the accessToken associated with the user should also be deleted (#5697)

* update

* fix the codestyle error

* fix the compile error

* support rollback

Co-authored-by: Kirs <acm_master@163.com>
Co-authored-by: kyoty <echohlne@gmail.com>
Co-authored-by: ji04xiaogang <ji04xiaogang@163.com>
Co-authored-by: shenglm <shenglm840722@126.com>
Co-authored-by: wangxj3 <857234426@qq.com>
Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com>
Co-authored-by: chenxingchun <438044805@qq.com>
Co-authored-by: JinyLeeChina <297062848@qq.com>
This commit is contained in:
JinyLeeChina 2021-06-29 22:55:01 +08:00 committed by GitHub
parent 5b6c9b7d43
commit 57414c4df7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
66 changed files with 732 additions and 454 deletions

@ -1 +0,0 @@
Subproject commit 959b66feb4231b08e8251422ac6d469cdc03d140

3
.gitmodules vendored
View File

@ -21,6 +21,3 @@
[submodule ".github/actions/lable-on-issue"] [submodule ".github/actions/lable-on-issue"]
path = .github/actions/lable-on-issue path = .github/actions/lable-on-issue
url = https://github.com/xingchun-chen/labeler url = https://github.com/xingchun-chen/labeler
[submodule ".github/actions/translate-on-issue"]
path = .github/actions/translate-on-issue
url = https://github.com/xingchun-chen/translation-helper.git

View File

@ -76,7 +76,7 @@ public class AlertPluginManager extends AbstractDolphinPluginManager {
requireNonNull(name, "name is null"); requireNonNull(name, "name is null");
AlertChannelFactory alertChannelFactory = alertChannelFactoryMap.get(name); AlertChannelFactory alertChannelFactory = alertChannelFactoryMap.get(name);
checkState(alertChannelFactory != null, "Alert Plugin {} is not registered", name); checkState(alertChannelFactory != null, "Alert Plugin %s is not registered", name);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(alertChannelFactory.getClass().getClassLoader())) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(alertChannelFactory.getClass().getClassLoader())) {
AlertChannel alertChannel = alertChannelFactory.create(); AlertChannel alertChannel = alertChannelFactory.create();

View File

@ -108,7 +108,7 @@ public class AlertPluginInstanceController extends BaseController {
@ApiImplicitParam(name = "instanceName", value = "ALERT_PLUGIN_INSTANCE_NAME", required = true, dataType = "String", example = "DING TALK"), @ApiImplicitParam(name = "instanceName", value = "ALERT_PLUGIN_INSTANCE_NAME", required = true, dataType = "String", example = "DING TALK"),
@ApiImplicitParam(name = "pluginInstanceParams", value = "ALERT_PLUGIN_INSTANCE_PARAMS", required = true, dataType = "String", example = "ALERT_PLUGIN_INSTANCE_PARAMS") @ApiImplicitParam(name = "pluginInstanceParams", value = "ALERT_PLUGIN_INSTANCE_PARAMS", required = true, dataType = "String", example = "ALERT_PLUGIN_INSTANCE_PARAMS")
}) })
@GetMapping(value = "/update") @PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_ALERT_PLUGIN_INSTANCE_ERROR) @ApiException(UPDATE_ALERT_PLUGIN_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")

View File

@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UDFUser; import org.apache.dolphinscheduler.dao.entity.UDFUser;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper; import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@ -83,6 +84,9 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
private static final Logger logger = LoggerFactory.getLogger(UsersServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(UsersServiceImpl.class);
@Autowired
private AccessTokenMapper accessTokenMapper;
@Autowired @Autowired
private UserMapper userMapper; private UserMapper userMapper;
@ -482,6 +486,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
* @throws Exception exception when operate hdfs * @throws Exception exception when operate hdfs
*/ */
@Override @Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> deleteUserById(User loginUser, int id) throws IOException { public Map<String, Object> deleteUserById(User loginUser, int id) throws IOException {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
@ -514,6 +519,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
} }
} }
accessTokenMapper.deleteAccessTokenByUserId(id);
userMapper.deleteById(id); userMapper.deleteById(id);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

View File

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper; import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@ -79,6 +80,9 @@ public class UsersServiceTest {
@Mock @Mock
private UserMapper userMapper; private UserMapper userMapper;
@Mock
private AccessTokenMapper accessTokenMapper;
@Mock @Mock
private TenantMapper tenantMapper; private TenantMapper tenantMapper;
@ -221,7 +225,6 @@ public class UsersServiceTest {
Assert.assertEquals(user.getId(), userExistId); Assert.assertEquals(user.getId(), userExistId);
} }
@Test @Test
public void testQueryUserList() { public void testQueryUserList() {
User user = new User(); User user = new User();
@ -265,13 +268,13 @@ public class UsersServiceTest {
String userPassword = "userTest0001"; String userPassword = "userTest0001";
try { try {
//user not exist //user not exist
Map<String, Object> result = usersService.updateUser(getLoginUser(), 0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1); Map<String, Object> result = usersService.updateUser(getLoginUser(), 0, userName, userPassword, "3443@qq.com", 1, "13457864543", "queue", 1);
Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS)); Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS));
logger.info(result.toString()); logger.info(result.toString());
//success //success
when(userMapper.selectById(1)).thenReturn(getUser()); when(userMapper.selectById(1)).thenReturn(getUser());
result = usersService.updateUser(getLoginUser(), 1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1); result = usersService.updateUser(getLoginUser(), 1, userName, userPassword, "32222s@qq.com", 1, "13457864543", "queue", 1);
logger.info(result.toString()); logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} catch (Exception e) { } catch (Exception e) {
@ -286,7 +289,7 @@ public class UsersServiceTest {
try { try {
when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getUser()); when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getUser()); when(userMapper.selectById(1)).thenReturn(getUser());
when(accessTokenMapper.deleteAccessTokenByUserId(1)).thenReturn(0);
//no operate //no operate
Map<String, Object> result = usersService.deleteUserById(loginUser, 3); Map<String, Object> result = usersService.deleteUserById(loginUser, 3);
logger.info(result.toString()); logger.info(result.toString());
@ -356,7 +359,6 @@ public class UsersServiceTest {
} }
@Test @Test
public void testGrantUDFFunction() { public void testGrantUDFFunction() {
String udfIds = "100000,120000"; String udfIds = "100000,120000";
@ -398,7 +400,7 @@ public class UsersServiceTest {
} }
private User getLoginUser(){ private User getLoginUser() {
User loginUser = new User(); User loginUser = new User();
loginUser.setId(1); loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER); loginUser.setUserType(UserType.ADMIN_USER);
@ -431,7 +433,6 @@ public class UsersServiceTest {
Assert.assertEquals("userTest0001", tempUser.getUserName()); Assert.assertEquals("userTest0001", tempUser.getUserName());
} }
@Test @Test
public void testQueryAllGeneralUsers() { public void testQueryAllGeneralUsers() {
User loginUser = new User(); User loginUser = new User();
@ -478,7 +479,6 @@ public class UsersServiceTest {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} }
@Test @Test
public void testAuthorizedUser() { public void testAuthorizedUser() {
User loginUser = new User(); User loginUser = new User();
@ -535,7 +535,6 @@ public class UsersServiceTest {
} }
} }
@Test @Test
public void testActivateUser() { public void testActivateUser() {
User user = new User(); User user = new User();
@ -618,7 +617,6 @@ public class UsersServiceTest {
return user; return user;
} }
/** /**
* get user * get user
*/ */

View File

@ -30,6 +30,7 @@ public enum DataType {
* 6 time, "HH:MM:SS" * 6 time, "HH:MM:SS"
* 7 time stamp * 7 time stamp
* 8 Boolean * 8 Boolean
* 9 list <String>
*/ */
VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN,LIST
} }

View File

@ -16,55 +16,163 @@
*/ */
package org.apache.dolphinscheduler.common.task; package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
/** /**
* job params related class * job params related class
*/ */
public abstract class AbstractParameters implements IParameters { public abstract class AbstractParameters implements IParameters {
@Override @Override
public abstract boolean checkParameters(); public abstract boolean checkParameters();
@Override @Override
public abstract List<ResourceInfo> getResourceFilesList(); public abstract List<ResourceInfo> getResourceFilesList();
/** /**
* local parameters * local parameters
*/ */
public List<Property> localParams; public List<Property> localParams;
/** /**
* get local parameters list * var pool
* @return Property list */
*/ public List<Property> varPool;
public List<Property> getLocalParams() {
return localParams;
}
public void setLocalParams(List<Property> localParams) { /**
this.localParams = localParams; * get local parameters list
} *
* @return Property list
*/
public List<Property> getLocalParams() {
return localParams;
}
/** public void setLocalParams(List<Property> localParams) {
* get local parameters map this.localParams = localParams;
* @return parameters map }
*/
public Map<String,Property> getLocalParametersMap() {
if (localParams != null) {
Map<String,Property> localParametersMaps = new LinkedHashMap<>();
for (Property property : localParams) { /**
localParametersMaps.put(property.getProp(),property); * get local parameters map
*
* @return parameters map
*/
public Map<String, Property> getLocalParametersMap() {
if (localParams != null) {
Map<String, Property> localParametersMaps = new LinkedHashMap<>();
for (Property property : localParams) {
localParametersMaps.put(property.getProp(), property);
}
return localParametersMaps;
} }
return localParametersMaps; return null;
} }
return null;
} /**
* get varPool map
*
* @return parameters map
*/
public Map<String, Property> getVarPoolMap() {
if (varPool != null) {
Map<String, Property> varPoolMap = new LinkedHashMap<>();
for (Property property : varPool) {
varPoolMap.put(property.getProp(), property);
}
return varPoolMap;
}
return null;
}
public List<Property> getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
if (StringUtils.isEmpty(varPool)) {
this.varPool = new ArrayList<>();
} else {
this.varPool = JSONUtils.toList(varPool, Property.class);
}
}
public void dealOutParam(String result) {
if (CollectionUtils.isEmpty(localParams)) {
return;
}
List<Property> outProperty = getOutProperty(localParams);
if (CollectionUtils.isEmpty(outProperty)) {
return;
}
if (StringUtils.isEmpty(result)) {
varPool.addAll(outProperty);
return;
}
Map<String, String> taskResult = getMapByString(result);
if (taskResult == null || taskResult.size() == 0) {
return;
}
for (Property info : outProperty) {
info.setValue(taskResult.get(info.getProp()));
varPool.add(info);
}
}
public List<Property> getOutProperty(List<Property> params) {
if (CollectionUtils.isEmpty(params)) {
return new ArrayList<>();
}
List<Property> result = new ArrayList<>();
for (Property info : params) {
if (info.getDirect() == Direct.OUT) {
result.add(info);
}
}
return result;
}
public List<Map<String, String>> getListMapByString(String json) {
List<Map<String, String>> allParams = new ArrayList<>();
ArrayNode paramsByJson = JSONUtils.parseArray(json);
Iterator<JsonNode> listIterator = paramsByJson.iterator();
while (listIterator.hasNext()) {
Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
allParams.add(param);
}
return allParams;
}
/**
* shell's result format is key=value$VarPool$key=value$VarPool$
* @param result
* @return
*/
public static Map<String, String> getMapByString(String result) {
String[] formatResult = result.split("\\$VarPool\\$");
Map<String, String> format = new HashMap<>();
for (String info : formatResult) {
if (StringUtils.isNotEmpty(info) && info.contains("=")) {
String[] keyValue = info.split("=");
format.put(keyValue[0], keyValue[1]);
}
}
return format;
}
} }

View File

@ -14,52 +14,52 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.task.shell;
package org.apache.dolphinscheduler.common.task.shell;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* shell parameters * shell parameters
*/ */
public class ShellParameters extends AbstractParameters { public class ShellParameters extends AbstractParameters {
/** /**
* shell script * shell script
*/ */
private String rawScript; private String rawScript;
/** /**
* resource list * resource list
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
public String getRawScript() { public String getRawScript() {
return rawScript; return rawScript;
} }
public void setRawScript(String rawScript) { public void setRawScript(String rawScript) {
this.rawScript = rawScript; this.rawScript = rawScript;
} }
public List<ResourceInfo> getResourceList() { public List<ResourceInfo> getResourceList() {
return resourceList; return resourceList;
} }
public void setResourceList(List<ResourceInfo> resourceList) { public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList; this.resourceList = resourceList;
} }
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return rawScript != null && !rawScript.isEmpty(); return rawScript != null && !rawScript.isEmpty();
} }
@Override
public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}
} }

View File

@ -17,12 +17,19 @@
package org.apache.dolphinscheduler.common.task.sql; package org.apache.dolphinscheduler.common.task.sql;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
@ -94,6 +101,16 @@ public class SqlParameters extends AbstractParameters {
*/ */
private String title; private String title;
private int limit;
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
}
public String getType() { public String getType() {
return type; return type;
} }
@ -208,6 +225,53 @@ public class SqlParameters extends AbstractParameters {
return new ArrayList<>(); return new ArrayList<>();
} }
@Override
public void dealOutParam(String result) {
if (CollectionUtils.isEmpty(localParams)) {
return;
}
List<Property> outProperty = getOutProperty(localParams);
if (CollectionUtils.isEmpty(outProperty)) {
return;
}
if (StringUtils.isEmpty(result)) {
varPool.addAll(outProperty);
return;
}
List<Map<String, String>> sqlResult = getListMapByString(result);
if (CollectionUtils.isEmpty(sqlResult)) {
return;
}
//if sql return more than one line
if (sqlResult.size() > 1) {
Map<String, List<String>> sqlResultFormat = new HashMap<>();
//init sqlResultFormat
Set<String> keySet = sqlResult.get(0).keySet();
for (String key : keySet) {
sqlResultFormat.put(key, new ArrayList<>());
}
for (Map<String, String> info : sqlResult) {
for (String key : info.keySet()) {
sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
}
}
for (Property info : outProperty) {
if (info.getType() == DataType.LIST) {
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
varPool.add(info);
}
}
} else {
//result only one line
Map<String, String> firstRow = sqlResult.get(0);
for (Property info : outProperty) {
info.setValue(String.valueOf(firstRow.get(info.getProp())));
varPool.add(info);
}
}
}
@Override @Override
public String toString() { public String toString() {
return "SqlParameters{" return "SqlParameters{"
@ -217,6 +281,7 @@ public class SqlParameters extends AbstractParameters {
+ ", sqlType=" + sqlType + ", sqlType=" + sqlType
+ ", sendEmail=" + sendEmail + ", sendEmail=" + sendEmail
+ ", displayRows=" + displayRows + ", displayRows=" + displayRows
+ ", limit=" + limit
+ ", udfs='" + udfs + '\'' + ", udfs='" + udfs + '\''
+ ", showType='" + showType + '\'' + ", showType='" + showType + '\''
+ ", connParams='" + connParams + '\'' + ", connParams='" + connParams + '\''

View File

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class VarPoolUtils { public class VarPoolUtils {
@ -71,7 +70,7 @@ public class VarPoolUtils {
if (kvs.length == 2) { if (kvs.length == 2) {
propToValue.put(kvs[0], kvs[1]); propToValue.put(kvs[0], kvs[1]);
} else { } else {
throw new ParseException(kv, 2); return;
} }
} }
} }

View File

@ -32,7 +32,7 @@ public class EntityTestUtils {
static { static {
OBJECT_MAP.put("java.lang.Long", 1L); OBJECT_MAP.put("java.lang.Long", 1L);
OBJECT_MAP.put("java.lang.String", "test"); OBJECT_MAP.put("java.lang.String", "[{\"direct\":\"OUT\",\"prop\":\"percentage5\",\"type\":\"VARCHAR\",\"value\":\"qwe\"}]");
OBJECT_MAP.put("java.lang.Integer", 1); OBJECT_MAP.put("java.lang.Integer", 1);
OBJECT_MAP.put("int", 1); OBJECT_MAP.put("int", 1);
OBJECT_MAP.put("long", 1L); OBJECT_MAP.put("long", 1L);

View File

@ -17,9 +17,17 @@
package org.apache.dolphinscheduler.common.task; package org.apache.dolphinscheduler.common.task;
import static org.junit.Assert.assertNotNull;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -38,6 +46,14 @@ public class SqlParametersTest {
@Test @Test
public void testSqlParameters() { public void testSqlParameters() {
List<Property> properties = new ArrayList<>();
Property property = new Property();
property.setProp("test1");
property.setDirect(Direct.OUT);
property.setType(DataType.VARCHAR);
property.setValue("test1");
properties.add(property);
SqlParameters sqlParameters = new SqlParameters(); SqlParameters sqlParameters = new SqlParameters();
Assert.assertTrue(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList())); Assert.assertTrue(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList()));
@ -63,6 +79,18 @@ public class SqlParametersTest {
Assert.assertEquals(title, sqlParameters.getTitle()); Assert.assertEquals(title, sqlParameters.getTitle());
Assert.assertEquals(groupId, sqlParameters.getGroupId()); Assert.assertEquals(groupId, sqlParameters.getGroupId());
Assert.assertTrue(sqlParameters.checkParameters()); String sqlResult = "[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]";
String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]";
sqlParameters.setLocalParams(properties);
sqlParameters.varPool = new ArrayList<>();
sqlParameters.dealOutParam(sqlResult1);
assertNotNull(sqlParameters.getVarPool().get(0));
property.setType(DataType.LIST);
properties.clear();
properties.add(property);
sqlParameters.setLocalParams(properties);
sqlParameters.dealOutParam(sqlResult);
assertNotNull(sqlParameters.getVarPool().get(0));
} }
} }

View File

@ -14,13 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.AccessToken; import org.apache.dolphinscheduler.dao.entity.AccessToken;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
/** /**
* accesstoken mapper interface * accesstoken mapper interface
@ -30,6 +33,7 @@ public interface AccessTokenMapper extends BaseMapper<AccessToken> {
/** /**
* access token page * access token page
*
* @param page page * @param page page
* @param userName userName * @param userName userName
* @param userId userId * @param userId userId
@ -39,4 +43,12 @@ public interface AccessTokenMapper extends BaseMapper<AccessToken> {
@Param("userName") String userName, @Param("userName") String userName,
@Param("userId") int userId @Param("userId") int userId
); );
/**
* delete by userId
*
* @param userId userId
* @return delete result
*/
int deleteAccessTokenByUserId(@Param("userId") int userId);
} }

View File

@ -31,4 +31,8 @@
</if> </if>
order by t.update_time desc order by t.update_time desc
</select> </select>
<delete id="deleteAccessTokenByUserId">
delete from t_ds_access_token
where user_id = #{userId}
</delete>
</mapper> </mapper>

View File

@ -16,12 +16,22 @@
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.AccessToken; import org.apache.dolphinscheduler.dao.entity.AccessToken;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -31,14 +41,8 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource; import com.baomidou.mybatisplus.core.metadata.IPage;
import java.text.SimpleDateFormat; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.*;
/** /**
* AccessToken mapper test * AccessToken mapper test
@ -57,8 +61,6 @@ public class AccessTokenMapperTest {
/** /**
* test insert * test insert
*
* @throws Exception
*/ */
@Test @Test
public void testInsert() throws Exception { public void testInsert() throws Exception {
@ -68,6 +70,27 @@ public class AccessTokenMapperTest {
assertThat(accessToken.getId(), greaterThan(0)); assertThat(accessToken.getId(), greaterThan(0));
} }
/**
* test delete AccessToken By UserId
*/
@Test
public void testDeleteAccessTokenByUserId() throws Exception {
Integer userId = 1;
int insertCount = 0;
for (int i = 0; i < 10; i++) {
try {
createAccessToken(userId);
insertCount++;
} catch (Exception e) {
e.printStackTrace();
}
}
int deleteCount = accessTokenMapper.deleteAccessTokenByUserId(userId);
Assert.assertEquals(insertCount, deleteCount);
}
/** /**
* test select by id * test select by id

View File

@ -68,10 +68,6 @@ public class TaskExecuteResponseCommand implements Serializable {
* varPool string * varPool string
*/ */
private String varPool; private String varPool;
/**
* task return result
*/
private String result;
public void setVarPool(String varPool) { public void setVarPool(String varPool) {
this.varPool = varPool; this.varPool = varPool;
@ -143,12 +139,4 @@ public class TaskExecuteResponseCommand implements Serializable {
+ ", appIds='" + appIds + '\'' + ", appIds='" + appIds + '\''
+ '}'; + '}';
} }
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
} }

View File

@ -21,8 +21,15 @@ import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UN
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.server.entity.*; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/** /**
* TaskExecutionContext builder * TaskExecutionContext builder
@ -41,7 +48,7 @@ public class TaskExecutionContextBuilder {
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return TaskExecutionContextBuilder * @return TaskExecutionContextBuilder
*/ */
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){ public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance) {
taskExecutionContext.setTaskInstanceId(taskInstance.getId()); taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName()); taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime()); taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime());
@ -52,6 +59,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setHost(taskInstance.getHost()); taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setResources(taskInstance.getResources()); taskExecutionContext.setResources(taskInstance.getResources());
taskExecutionContext.setDelayTime(taskInstance.getDelayTime()); taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
taskExecutionContext.setVarPool(taskInstance.getVarPool());
return this; return this;
} }

View File

@ -216,6 +216,11 @@ public class TaskExecutionContext implements Serializable {
*/ */
private SqoopTaskExecutionContext sqoopTaskExecutionContext; private SqoopTaskExecutionContext sqoopTaskExecutionContext;
/**
* taskInstance varPool
*/
private String varPool;
/** /**
* procedure TaskExecutionContext * procedure TaskExecutionContext
*/ */
@ -556,4 +561,12 @@ public class TaskExecutionContext implements Serializable {
+ ", procedureTaskExecutionContext=" + procedureTaskExecutionContext + ", procedureTaskExecutionContext=" + procedureTaskExecutionContext
+ '}'; + '}';
} }
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
} }

View File

@ -80,8 +80,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getAppIds(), responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(), responseCommand.getTaskInstanceId(),
responseCommand.getVarPool(), responseCommand.getVarPool(),
channel, channel
responseCommand.getResult()
); );
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
} }

View File

@ -92,10 +92,6 @@ public class TaskResponseEvent {
* channel * channel
*/ */
private Channel channel; private Channel channel;
/**
* task return result
*/
private String result;
public static TaskResponseEvent newAck(ExecutionStatus state, public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime, Date startTime,
@ -122,8 +118,7 @@ public class TaskResponseEvent {
String appIds, String appIds,
int taskInstanceId, int taskInstanceId,
String varPool, String varPool,
Channel channel, Channel channel) {
String result) {
TaskResponseEvent event = new TaskResponseEvent(); TaskResponseEvent event = new TaskResponseEvent();
event.setState(state); event.setState(state);
event.setEndTime(endTime); event.setEndTime(endTime);
@ -133,7 +128,6 @@ public class TaskResponseEvent {
event.setEvent(Event.RESULT); event.setEvent(Event.RESULT);
event.setVarPool(varPool); event.setVarPool(varPool);
event.setChannel(channel); event.setChannel(channel);
event.setResult(result);
return event; return event;
} }
@ -233,11 +227,4 @@ public class TaskResponseEvent {
this.channel = channel; this.channel = channel;
} }
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
} }

View File

@ -165,8 +165,7 @@ public class TaskResponseService {
taskResponseEvent.getProcessId(), taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(), taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool(), taskResponseEvent.getVarPool()
taskResponseEvent.getResult()
); );
} }
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.registry; package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
@ -134,18 +135,6 @@ public class MasterRegistryClient {
unRegistry(); unRegistry();
} }
/**
* init system node
*/
private void initMasterSystemNode() {
try {
registryClient.persist(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, "");
logger.info("initialize master server nodes success.");
} catch (Exception e) {
logger.error("init system node failed", e);
}
}
/** /**
* remove zookeeper node path * remove zookeeper node path
* *
@ -346,7 +335,6 @@ public class MasterRegistryClient {
* registry * registry
*/ */
public void registry() { public void registry() {
initMasterSystemNode();
String address = NetUtils.getAddr(masterConfig.getListenPort()); String address = NetUtils.getAddr(masterConfig.getListenPort());
localNodePath = getMasterPath(); localNodePath = getMasterPath();
registryClient.persistEphemeral(localNodePath, ""); registryClient.persistEphemeral(localNodePath, "");
@ -395,7 +383,7 @@ public class MasterRegistryClient {
*/ */
public String getMasterPath() { public String getMasterPath() {
String address = getLocalAddress(); String address = getLocalAddress();
return registryClient.getMasterPath() + "/" + address; return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
} }
/** /**

View File

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.registry; package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -131,11 +134,11 @@ public class ServerNodeManager implements InitializingBean {
/** /**
* init MasterNodeListener listener * init MasterNodeListener listener
*/ */
registryClient.subscribe(registryClient.getMasterPath(), new MasterDataListener()); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
/** /**
* init WorkerNodeListener listener * init WorkerNodeListener listener
*/ */
registryClient.subscribe(registryClient.getWorkerPath(), new MasterDataListener()); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new MasterDataListener());
} }
/** /**

View File

@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -47,7 +46,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
@ -60,7 +58,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -470,8 +467,6 @@ public class MasterExecThread implements Runnable {
* @return TaskInstance * @return TaskInstance
*/ */
private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
//update processInstance for update the globalParams
this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId());
TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion()); TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion());
if (taskInstance == null) { if (taskInstance == null) {
taskInstance = new TaskInstance(); taskInstance = new TaskInstance();
@ -503,6 +498,9 @@ public class MasterExecThread implements Runnable {
// retry task instance interval // retry task instance interval
taskInstance.setRetryInterval(taskNode.getRetryInterval()); taskInstance.setRetryInterval(taskNode.getRetryInterval());
//set task param
taskInstance.setTaskParams(taskNode.getTaskParams());
// task instance priority // task instance priority
if (taskNode.getTaskInstancePriority() == null) { if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM); taskInstance.setTaskInstancePriority(Priority.MEDIUM);
@ -518,54 +516,74 @@ public class MasterExecThread implements Runnable {
} else { } else {
taskInstance.setWorkerGroup(taskWorkerGroup); taskInstance.setWorkerGroup(taskWorkerGroup);
} }
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getTaskParams()));
// delay execution time // delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime()); taskInstance.setDelayTime(taskNode.getDelayTime());
} }
//get pre task ,get all the task varPool to this task
Set<String> preTask = dag.getPreviousNodes(taskInstance.getName());
getPreVarPool(taskInstance, preTask);
return taskInstance; return taskInstance;
} }
private String globalParamToTaskParams(String params) { public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask) {
String globalParams = this.processInstance.getGlobalParams(); Map<String,Property> allProperty = new HashMap<>();
if (StringUtils.isBlank(globalParams)) { Map<String,TaskInstance> allTaskInstance = new HashMap<>();
return params; if (CollectionUtils.isNotEmpty(preTask)) {
} for (String preTaskName : preTask) {
Map<String, String> globalMap = processService.getGlobalParamMap(globalParams); TaskInstance preTaskInstance = completeTaskList.get(preTaskName);
if (globalMap == null || globalMap.size() == 0) { if (preTaskInstance == null) {
return params; continue;
}
// the process global param save in localParams
Map<String, Object> result = JSONUtils.toMap(params, String.class, Object.class);
Object localParams = result.get(LOCAL_PARAMS);
if (localParams != null) {
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
String paramName = info.getProp();
if (StringUtils.isNotEmpty(paramName) && propToValue.containsKey(paramName)) {
info.setValue((String) propToValue.get(paramName));
} }
if (info.getDirect().equals(Direct.IN)) { String preVarPool = preTaskInstance.getVarPool();
String value = globalMap.get(paramName); if (StringUtils.isNotEmpty(preVarPool)) {
if (StringUtils.isNotEmpty(value)) { List<Property> properties = JSONUtils.toList(preVarPool, Property.class);
info.setValue(value); for (Property info : properties) {
setVarPoolValue(allProperty, allTaskInstance, preTaskInstance, info);
} }
} }
} }
result.put(LOCAL_PARAMS, allParam); if (allProperty.size() > 0) {
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
}
}
}
private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
//for this taskInstance all the param in this part is IN.
thisProperty.setDirect(Direct.IN);
//get the pre taskInstance Property's name
String proName = thisProperty.getProp();
//if the Previous nodes have the Property of same name
if (allProperty.containsKey(proName)) {
//comparison the value of two Property
Property otherPro = allProperty.get(proName);
//if this property'value of loop is empty,use the other,whether the other's value is empty or not
if (StringUtils.isEmpty(thisProperty.getValue())) {
allProperty.put(proName, otherPro);
//if property'value of loop is not empty,and the other's value is not empty too, use the earlier value
} else if (StringUtils.isNotEmpty(otherPro.getValue())) {
TaskInstance otherTask = allTaskInstance.get(proName);
if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) {
allProperty.put(proName, thisProperty);
allTaskInstance.put(proName,preTaskInstance);
} else {
allProperty.put(proName, otherPro);
}
} else {
allProperty.put(proName, thisProperty);
allTaskInstance.put(proName,preTaskInstance);
}
} else {
allProperty.put(proName, thisProperty);
allTaskInstance.put(proName,preTaskInstance);
} }
return JSONUtils.toJsonString(result);
} }
private void submitPostNode(String parentNodeName) { private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList); Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>(); List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) { for (String taskNode : submitTaskNodeList) {
try {
VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool());
} catch (ParseException e) {
logger.error("parse {} exception", processInstance.getVarPool(), e);
throw new RuntimeException();
}
TaskNode taskNodeObject = dag.getNode(taskNode); TaskNode taskNodeObject = dag.getNode(taskNode);
taskInstances.add(createTaskInstance(processInstance, taskNodeObject)); taskInstances.add(createTaskInstance(processInstance, taskNodeObject));
} }

View File

@ -47,6 +47,7 @@ public class ParamUtils {
public static Map<String,Property> convert(Map<String,Property> globalParams, public static Map<String,Property> convert(Map<String,Property> globalParams,
Map<String,String> globalParamsMap, Map<String,String> globalParamsMap,
Map<String,Property> localParams, Map<String,Property> localParams,
Map<String,Property> varParams,
CommandType commandType, CommandType commandType,
Date scheduleTime) { Date scheduleTime) {
if (globalParams == null && localParams == null) { if (globalParams == null && localParams == null) {
@ -64,10 +65,15 @@ public class ParamUtils {
} }
if (globalParams != null && localParams != null) { if (globalParams != null && localParams != null) {
globalParams.putAll(localParams); localParams.putAll(globalParams);
globalParams = localParams;
} else if (globalParams == null && localParams != null) { } else if (globalParams == null && localParams != null) {
globalParams = localParams; globalParams = localParams;
} }
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator(); Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next(); Map.Entry<String, Property> en = iter.next();

View File

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.registry; package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SLASH; import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -130,7 +131,7 @@ public class WorkerRegistryClient {
public Set<String> getWorkerZkPaths() { public Set<String> getWorkerZkPaths() {
Set<String> workerPaths = Sets.newHashSet(); Set<String> workerPaths = Sets.newHashSet();
String address = getLocalAddress(); String address = getLocalAddress();
String workerZkPathPrefix = registryClient.getWorkerPath(); String workerZkPathPrefix = REGISTRY_DOLPHINSCHEDULER_WORKERS;
for (String workGroup : this.workerGroups) { for (String workGroup : this.workerGroups) {
StringJoiner workerPathJoiner = new StringJoiner(SLASH); StringJoiner workerPathJoiner = new StringJoiner(SLASH);

View File

@ -151,10 +151,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.getTaskInstanceId())); taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService); task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
// task init // task init
task.init(); task.init();
//init varPool
task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle // task handle
task.handle(); task.handle();
@ -165,8 +165,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
responseCommand.setProcessId(task.getProcessId()); responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds()); responseCommand.setAppIds(task.getAppIds());
responseCommand.setVarPool(task.getVarPool()); responseCommand.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
responseCommand.setResult(task.getResultString());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
} catch (Exception e) { } catch (Exception e) {
logger.error("task scheduler failure", e); logger.error("task scheduler failure", e);

View File

@ -88,11 +88,6 @@ public abstract class AbstractCommandExecutor {
protected boolean logOutputIsScuccess = false; protected boolean logOutputIsScuccess = false;
/**
* SHELL result string
*/
protected String taskResultString;
/** /**
* taskExecutionContext * taskExecutionContext
*/ */
@ -207,8 +202,8 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish // waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}", logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode()); taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode(), status, process.exitValue());
// if SHELL task exit // if SHELL task exit
if (status) { if (status) {
@ -224,7 +219,8 @@ public abstract class AbstractCommandExecutor {
result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE); result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
} }
} else { } else {
logger.error("process has failure , exitStatusCode : {} , ready to kill ...", result.getExitStatusCode()); logger.error("process has failure , exitStatusCode:{}, processExitValue:{}, ready to kill ...",
result.getExitStatusCode(), process.exitValue());
ProcessUtils.kill(taskExecutionContext); ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_FAILURE); result.setExitStatusCode(EXIT_CODE_FAILURE);
} }
@ -364,7 +360,6 @@ public abstract class AbstractCommandExecutor {
varPool.append("$VarPool$"); varPool.append("$VarPool$");
} else { } else {
logBuffer.add(line); logBuffer.add(line);
taskResultString = line;
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -592,11 +587,4 @@ public abstract class AbstractCommandExecutor {
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
public String getTaskResultString() {
return taskResultString;
}
public void setTaskResultString(String taskResultString) {
this.taskResultString = taskResultString;
}
} }

View File

@ -35,11 +35,6 @@ import org.slf4j.Logger;
*/ */
public abstract class AbstractTask { public abstract class AbstractTask {
/**
* varPool string
*/
protected String varPool;
/** /**
* taskExecutionContext * taskExecutionContext
**/ **/
@ -56,11 +51,6 @@ public abstract class AbstractTask {
*/ */
protected int processId; protected int processId;
/**
* SHELL result string
*/
protected String resultString;
/** /**
* other resource manager appId , for example : YARN etc * other resource manager appId , for example : YARN etc
*/ */
@ -81,7 +71,7 @@ public abstract class AbstractTask {
* constructor * constructor
* *
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
* @param logger logger * @param logger logger
*/ */
protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger logger) { protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger logger) {
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
@ -139,14 +129,6 @@ public abstract class AbstractTask {
} }
} }
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
/** /**
* get exit status code * get exit status code
* *
@ -176,14 +158,6 @@ public abstract class AbstractTask {
this.processId = processId; this.processId = processId;
} }
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
/** /**
* get task parameters * get task parameters
* *

View File

@ -158,6 +158,7 @@ public class DataxTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
dataXParameters.getLocalParametersMap(), dataXParameters.getLocalParametersMap(),
dataXParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());

View File

@ -84,6 +84,7 @@ public class FlinkTask extends AbstractYarnTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
flinkParameters.getLocalParametersMap(), flinkParameters.getLocalParametersMap(),
flinkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());

View File

@ -135,6 +135,7 @@ public class HttpTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
httpParameters.getLocalParametersMap(), httpParameters.getLocalParametersMap(),
httpParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());
List<HttpProperty> httpPropertyList = new ArrayList<>(); List<HttpProperty> httpPropertyList = new ArrayList<>();

View File

@ -88,6 +88,7 @@ public class MapReduceTask extends AbstractYarnTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
mapreduceParameters.getLocalParametersMap(), mapreduceParameters.getLocalParametersMap(),
mapreduceParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());

View File

@ -122,6 +122,7 @@ public class ProcedureTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
procedureParameters.getLocalParametersMap(), procedureParameters.getLocalParametersMap(),
procedureParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());

View File

@ -92,7 +92,7 @@ public class PythonTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
setVarPool(pythonCommandExecutor.getVarPool()); pythonParameters.dealOutParam(pythonCommandExecutor.getVarPool());
} }
catch (Exception e) { catch (Exception e) {
logger.error("python task failure", e); logger.error("python task failure", e);
@ -119,6 +119,7 @@ public class PythonTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
pythonParameters.getLocalParametersMap(), pythonParameters.getLocalParametersMap(),
pythonParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());

View File

@ -105,8 +105,7 @@ public class ShellTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
setResult(shellCommandExecutor.getTaskResultString()); shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
setVarPool(shellCommandExecutor.getVarPool());
} catch (Exception e) { } catch (Exception e) {
logger.error("shell task error", e); logger.error("shell task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE); setExitStatusCode(Constants.EXIT_CODE_FAILURE);
@ -169,6 +168,7 @@ public class ShellTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
shellParameters.getLocalParametersMap(), shellParameters.getLocalParametersMap(),
shellParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
@ -188,17 +188,4 @@ public class ShellTask extends AbstractTask {
} }
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
} }
public void setResult(String result) {
Map<String, Property> localParams = shellParameters.getLocalParametersMap();
List<Map<String, String>> outProperties = new ArrayList<>();
Map<String, String> p = new HashMap<>();
localParams.forEach((k,v) -> {
if (v.getDirect() == Direct.OUT) {
p.put(k, result);
}
});
outProperties.add(p);
resultString = JSONUtils.toJsonString(outProperties);
}
} }

View File

@ -113,6 +113,7 @@ public class SparkTask extends AbstractYarnTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(), sparkParameters.getLocalParametersMap(),
sparkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());

View File

@ -86,12 +86,6 @@ public class SqlTask extends AbstractTask {
*/ */
private TaskExecutionContext taskExecutionContext; private TaskExecutionContext taskExecutionContext;
/**
* default query sql limit
*/
private static final int LIMIT = 10000;
private AlertClientService alertClientService; private AlertClientService alertClientService;
public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) { public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) {
@ -117,14 +111,16 @@ public class SqlTask extends AbstractTask {
Thread.currentThread().setName(threadLoggerInfoName); Thread.currentThread().setName(threadLoggerInfoName);
logger.info("Full sql parameters: {}", sqlParameters); logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
sqlParameters.getType(), sqlParameters.getType(),
sqlParameters.getDatasource(), sqlParameters.getDatasource(),
sqlParameters.getSql(), sqlParameters.getSql(),
sqlParameters.getLocalParams(), sqlParameters.getLocalParams(),
sqlParameters.getUdfs(), sqlParameters.getUdfs(),
sqlParameters.getShowType(), sqlParameters.getShowType(),
sqlParameters.getConnParams()); sqlParameters.getConnParams(),
sqlParameters.getVarPool(),
sqlParameters.getLimit());
try { try {
SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext(); SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
@ -175,6 +171,7 @@ public class SqlTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), taskExecutionContext.getDefinedParams(),
sqlParameters.getLocalParametersMap(), sqlParameters.getLocalParametersMap(),
sqlParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());
@ -268,10 +265,9 @@ public class SqlTask extends AbstractTask {
String updateResult = String.valueOf(stmt.executeUpdate()); String updateResult = String.valueOf(stmt.executeUpdate());
result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams()); result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
} }
//deal out params
sqlParameters.dealOutParam(result);
postSql(connection, postStatementsBinds); postSql(connection, postStatementsBinds);
this.setResultString(result);
} catch (Exception e) { } catch (Exception e) {
logger.error("execute sql error: {}", e.getMessage()); logger.error("execute sql error: {}", e.getMessage());
throw e; throw e;
@ -280,6 +276,7 @@ public class SqlTask extends AbstractTask {
} }
} }
public String setNonQuerySqlReturn(String updateResult, List<Property> properties) { public String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
String result = null; String result = null;
for (Property info :properties) { for (Property info :properties) {
@ -309,7 +306,7 @@ public class SqlTask extends AbstractTask {
int rowCount = 0; int rowCount = 0;
while (rowCount < LIMIT && resultSet.next()) { while (rowCount < sqlParameters.getLimit() && resultSet.next()) {
ObjectNode mapOfColValues = JSONUtils.createObjectNode(); ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) { for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i))); mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
@ -326,12 +323,11 @@ public class SqlTask extends AbstractTask {
logger.info("row {} : {}", i + 1, row); logger.info("row {} : {}", i + 1, row);
} }
} }
String result = JSONUtils.toJsonString(resultJSONArray); String result = JSONUtils.toJsonString(resultJSONArray);
if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) { if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) {
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle())
? sqlParameters.getTitle() ? sqlParameters.getTitle()
: taskExecutionContext.getTaskName() + " query result sets", result); : taskExecutionContext.getTaskName() + " query result sets", result);
} }
logger.debug("execute sql result : {}", result); logger.debug("execute sql result : {}", result);
return result; return result;
@ -478,8 +474,16 @@ public class SqlTask extends AbstractTask {
String paramName = m.group(1); String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName); Property prop = paramsPropsMap.get(paramName);
sqlParamsMap.put(index, prop); if (prop == null) {
index++; logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
}
else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
}
} }
} }
@ -495,8 +499,13 @@ public class SqlTask extends AbstractTask {
//parameter print style //parameter print style
logger.info("after replace sql , preparing : {}", formatSql); logger.info("after replace sql , preparing : {}", formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql , parameters:"); StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
for (int i = 1; i <= sqlParamsMap.size(); i++) { if (sqlParamsMap == null) {
logPrint.append(sqlParamsMap.get(i).getValue() + "(" + sqlParamsMap.get(i).getType() + ")"); logger.info("printReplacedSql: sqlParamsMap is null.");
}
else {
for (int i = 1; i <= sqlParamsMap.size(); i++) {
logPrint.append(sqlParamsMap.get(i).getValue() + "(" + sqlParamsMap.get(i).getType() + ")");
}
} }
logger.info("Sql Params are {}", logPrint); logger.info("Sql Params are {}", logPrint);
} }

View File

@ -76,6 +76,7 @@ public class SqoopTask extends AbstractYarnTask {
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
sqoopTaskExecutionContext.getDefinedParams(), sqoopTaskExecutionContext.getDefinedParams(),
sqoopParameters.getLocalParametersMap(), sqoopParameters.getLocalParametersMap(),
sqoopParameters.getVarPoolMap(),
CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()), CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
sqoopTaskExecutionContext.getScheduleTime()); sqoopTaskExecutionContext.getScheduleTime());

View File

@ -44,10 +44,14 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.text.ParseException; import java.text.ParseException;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -207,6 +211,50 @@ public class MasterExecThreadTest {
} }
} }
@Test
public void testGetPreVarPool() {
try {
Set<String> preTaskName = new HashSet<>();
preTaskName.add("test1");
preTaskName.add("test2");
Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
TaskInstance taskInstance = new TaskInstance();
TaskInstance taskInstance1 = new TaskInstance();
taskInstance1.setId(1);
taskInstance1.setName("test1");
taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]");
taskInstance1.setEndTime(new Date());
TaskInstance taskInstance2 = new TaskInstance();
taskInstance2.setId(2);
taskInstance2.setName("test2");
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
taskInstance2.setEndTime(new Date());
completeTaskList.put("test1", taskInstance1);
completeTaskList.put("test2", taskInstance2);
Class<MasterExecThread> masterExecThreadClass = MasterExecThread.class;
Field field = masterExecThreadClass.getDeclaredField("completeTaskList");
field.setAccessible(true);
field.set(masterExecThread, completeTaskList);
masterExecThread.getPreVarPool(taskInstance, preTaskName);
Assert.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
completeTaskList.put("test2", taskInstance2);
field.setAccessible(true);
field.set(masterExecThread, completeTaskList);
masterExecThread.getPreVarPool(taskInstance, preTaskName);
Assert.assertNotNull(taskInstance.getVarPool());
} catch (Exception e) {
Assert.fail();
}
}
private List<Schedule> zeroSchedulerList() { private List<Schedule> zeroSchedulerList() {
return Collections.EMPTY_LIST; return Collections.EMPTY_LIST;
} }

View File

@ -20,20 +20,20 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* user define param * user define param
@ -73,20 +73,19 @@ public class ParamsTest {
} }
@Test @Test
public void convertTest()throws Exception{ public void convertTest() throws Exception {
Map<String,Property> globalParams = new HashMap<>(); Map<String, Property> globalParams = new HashMap<>();
Property property = new Property(); Property property = new Property();
property.setProp("global_param"); property.setProp("global_param");
property.setDirect(Direct.IN); property.setDirect(Direct.IN);
property.setType(DataType.VARCHAR); property.setType(DataType.VARCHAR);
property.setValue("${system.biz.date}"); property.setValue("${system.biz.date}");
globalParams.put("global_param",property); globalParams.put("global_param", property);
Map<String,String> globalParamsMap = new HashMap<>(); Map<String, String> globalParamsMap = new HashMap<>();
globalParamsMap.put("global_param","${system.biz.date}"); globalParamsMap.put("global_param", "${system.biz.date}");
Map<String, Property> localParams = new HashMap<>();
Map<String,Property> localParams = new HashMap<>();
Property localProperty = new Property(); Property localProperty = new Property();
localProperty.setProp("local_param"); localProperty.setProp("local_param");
localProperty.setDirect(Direct.IN); localProperty.setDirect(Direct.IN);
@ -94,8 +93,16 @@ public class ParamsTest {
localProperty.setValue("${global_param}"); localProperty.setValue("${global_param}");
localParams.put("local_param", localProperty); localParams.put("local_param", localProperty);
Map<String, Property> varPoolParams = new HashMap<>();
Property varProperty = new Property();
varProperty.setProp("local_param");
varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap, Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap,
localParams, CommandType.START_PROCESS, new Date()); localParams,varPoolParams, CommandType.START_PROCESS, new Date());
logger.info(JSONUtils.toJsonString(paramsMap)); logger.info(JSONUtils.toJsonString(paramsMap));

View File

@ -70,8 +70,7 @@ public class TaskResponseServiceTest {
"ids", "ids",
22, 22,
"varPol", "varPol",
channel, channel);
"[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
taskInstance = new TaskInstance(); taskInstance = new TaskInstance();
taskInstance.setId(22); taskInstance.setId(22);

View File

@ -17,23 +17,24 @@
package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals; import org.junit.Before;
import static org.junit.Assert.assertNull; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Test ParamUtils * Test ParamUtils
@ -49,8 +50,11 @@ public class ParamUtilsTest {
public Map<String, Property> localParams = new HashMap<>(); public Map<String, Property> localParams = new HashMap<>();
public Map<String, Property> varPoolParams = new HashMap<>();
/** /**
* Init params * Init params
*
* @throws Exception * @throws Exception
*/ */
@Before @Before
@ -71,6 +75,14 @@ public class ParamUtilsTest {
localProperty.setType(DataType.VARCHAR); localProperty.setType(DataType.VARCHAR);
localProperty.setValue("${global_param}"); localProperty.setValue("${global_param}");
localParams.put("local_param", localProperty); localParams.put("local_param", localProperty);
Property varProperty = new Property();
varProperty.setProp("local_param");
varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
} }
/** /**
@ -80,16 +92,20 @@ public class ParamUtilsTest {
public void testConvert() { public void testConvert() {
//The expected value //The expected value
String expected = "{\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//The expected value when globalParams is null but localParams is not null //The expected value when globalParams is null but localParams is not null
String expected1 = "{\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; String expected1 = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//Define expected date , the month is 0-base //Define expected date , the month is 0-base
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
calendar.set(2019,11,30); calendar.set(2019, 11, 30);
Date date = calendar.getTime(); Date date = calendar.getTime();
//Invoke convert //Invoke convert
Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, CommandType.START_PROCESS, date); Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date);
String result = JSONUtils.toJsonString(paramsMap); String result = JSONUtils.toJsonString(paramsMap);
assertEquals(expected, result); assertEquals(expected, result);
@ -101,12 +117,12 @@ public class ParamUtilsTest {
} }
//Invoke convert with null globalParams //Invoke convert with null globalParams
Map<String, Property> paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams, CommandType.START_PROCESS, date); Map<String, Property> paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date);
String result1 = JSONUtils.toJsonString(paramsMap1); String result1 = JSONUtils.toJsonString(paramsMap1);
assertEquals(expected1, result1); assertEquals(expected1, result1);
//Null check, invoke convert with null globalParams and null localParams //Null check, invoke convert with null globalParams and null localParams
Map<String, Property> paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, CommandType.START_PROCESS, date); Map<String, Property> paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date);
assertNull(paramsMap2); assertNull(paramsMap2);
} }

View File

@ -67,8 +67,6 @@ public class TaskCallbackServiceTest {
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
String result = responseCommand.getResult();
responseCommand.setResult("return string");
taskCallbackService.sendResult(1, responseCommand.convert2Command()); taskCallbackService.sendResult(1, responseCommand.convert2Command());
Stopper.stop(); Stopper.stop();

View File

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
@ -53,6 +54,8 @@ public class TaskKillProcessorTest {
private TaskKillProcessor taskKillProcessor; private TaskKillProcessor taskKillProcessor;
private WorkerManagerThread workerManager;
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
private Channel channel; private Channel channel;
@ -85,6 +88,8 @@ public class TaskKillProcessorTest {
PowerMockito.mockStatic(LoggerUtils.class); PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService); PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig); PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
WorkerManagerThread workerManager = PowerMockito.mock(WorkerManagerThread.class);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager); PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager);
PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(), any()); PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(), any());
PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null); PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null);
@ -102,7 +107,6 @@ public class TaskKillProcessorTest {
@Test @Test
public void testProcess() { public void testProcess() {
PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext); PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext);
taskKillProcessor.process(channel, command); taskKillProcessor.process(channel, command);

View File

@ -71,8 +71,6 @@ public class WorkerRegistryClientTest {
@Before @Before
public void before() { public void before() {
given(registryClient.getWorkerPath()).willReturn("/nodes/worker");
given(workerConfig.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1")); given(workerConfig.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1")); //given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//scheduleAtFixedRate //scheduleAtFixedRate

View File

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -165,7 +166,7 @@ public class TaskExecuteThreadTest {
@Override @Override
public AbstractParameters getParameters() { public AbstractParameters getParameters() {
return null; return new SqlParameters();
} }
@Override @Override

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class AbstractCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class);
private ShellCommandExecutor shellCommandExecutor;
@Before
public void before() throws Exception {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = new ShellCommandExecutor(null);
}
@Test
public void testSetTaskResultString() {
shellCommandExecutor.setTaskResultString("shellReturn");
}
@Test
public void testGetTaskResultString() {
logger.info(shellCommandExecutor.getTaskResultString());
}
}

View File

@ -110,17 +110,6 @@ public class ShellTaskReturnTest {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
shellTask.setResult("shell return string");
logger.info("shell return string:{}", shellTask.getResultString());
} }
@Test
public void testSetTaskResultString() {
shellCommandExecutor.setTaskResultString("shellReturn");
}
@Test
public void testGetTaskResultString() {
logger.info(shellCommandExecutor.getTaskResultString());
}
} }

View File

@ -164,11 +164,6 @@ public class TaskManagerTest {
definedParams.put("time_gb", "2020-12-16 00:00:00"); definedParams.put("time_gb", "2020-12-16 00:00:00");
taskExecutionContext.setDefinedParams(definedParams); taskExecutionContext.setDefinedParams(definedParams);
ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService); ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
shellTask.setResultString("shell return");
String shellReturn = shellTask.getResultString();
shellTask.init();
shellTask.setResult(shellReturn);
Assert.assertSame(shellReturn, "shell return");
} }
@Test @Test

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import static org.junit.Assert.assertNotNull;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* shell task return test.
*/
@RunWith(PowerMockRunner.class)
public class TaskParamsTest {
private static final Logger logger = LoggerFactory.getLogger(TaskParamsTest.class);
@Test
public void testDealOutParam() {
List<Property> properties = new ArrayList<>();
Property property = new Property();
property.setProp("test1");
property.setDirect(Direct.OUT);
property.setType(DataType.VARCHAR);
property.setValue("test1");
properties.add(property);
ShellParameters shellParameters = new ShellParameters();
String resultShell = "key1=value1$VarPoolkey2=value2";
shellParameters.varPool = new ArrayList<>();
shellParameters.setLocalParams(properties);
shellParameters.dealOutParam(resultShell);
assertNotNull(shellParameters.getVarPool().get(0));
String sqlResult = "[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]";
SqlParameters sqlParameters = new SqlParameters();
String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]";
sqlParameters.setLocalParams(properties);
sqlParameters.varPool = new ArrayList<>();
sqlParameters.dealOutParam(sqlResult1);
assertNotNull(sqlParameters.getVarPool().get(0));
property.setType(DataType.LIST);
properties.clear();
properties.add(property);
sqlParameters.setLocalParams(properties);
sqlParameters.dealOutParam(sqlResult);
assertNotNull(sqlParameters.getVarPool().get(0));
}
}

View File

@ -62,7 +62,6 @@ public class ShellTaskTest {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
shellCommandExecutor.setTaskResultString("shellReturn");
taskExecutionContext = new TaskExecutionContext(); taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test"); taskExecutionContext.setTaskName("kris test");
@ -85,6 +84,7 @@ public class ShellTaskTest {
taskExecutionContext.setTenantCode("roo"); taskExecutionContext.setTenantCode("roo");
taskExecutionContext.setScheduleTime(new Date()); taskExecutionContext.setScheduleTime(new Date());
taskExecutionContext.setQueue("default"); taskExecutionContext.setQueue("default");
taskExecutionContext.setVarPool("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
taskExecutionContext.setTaskParams( taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
+ +
@ -105,6 +105,7 @@ public class ShellTaskTest {
public void testComplementData() throws Exception { public void testComplementData() throws Exception {
shellTask = new ShellTask(taskExecutionContext, logger); shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init(); shellTask.init();
shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>()); shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>());
shellCommandExecutor.isSuccessOfYarnState(null); shellCommandExecutor.isSuccessOfYarnState(null);
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
@ -116,16 +117,9 @@ public class ShellTaskTest {
taskExecutionContext.setCmdTypeIfComplement(0); taskExecutionContext.setCmdTypeIfComplement(0);
shellTask = new ShellTask(taskExecutionContext, logger); shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init(); shellTask.init();
shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
shellTask.handle(); shellTask.handle();
} }
@Test
public void testSetResult() {
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
String r = "return";
shellTask.setResult(r);
}
} }

View File

@ -89,6 +89,7 @@ public class SqlTaskTest {
PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date()); PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
PowerMockito.when(taskExecutionContext.getVarPool()).thenReturn("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
@ -98,6 +99,7 @@ public class SqlTaskTest {
PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao()); PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao());
alertClientService = PowerMockito.mock(AlertClientService.class); alertClientService = PowerMockito.mock(AlertClientService.class);
sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService); sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
sqlTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
sqlTask.init(); sqlTask.init();
} }

View File

@ -134,7 +134,6 @@ import org.springframework.transaction.annotation.Transactional;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
/** /**
@ -1585,71 +1584,51 @@ public class ProcessService {
int processId, int processId,
String appIds, String appIds,
int taskInstId, int taskInstId,
String varPool, String varPool) {
String result) {
taskInstance.setPid(processId); taskInstance.setPid(processId);
taskInstance.setAppLink(appIds); taskInstance.setAppLink(appIds);
taskInstance.setState(state); taskInstance.setState(state);
taskInstance.setEndTime(endTime); taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool); taskInstance.setVarPool(varPool);
changeOutParam(result, taskInstance); changeOutParam(taskInstance);
saveTaskInstance(taskInstance); saveTaskInstance(taskInstance);
} }
public void changeOutParam(String result, TaskInstance taskInstance) { /**
if (StringUtils.isEmpty(result)) { * for show in page of taskInstance
* @param taskInstance
*/
public void changeOutParam(TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getVarPool())) {
return; return;
} }
List<Map<String, String>> workerResultParam = getListMapByString(result); List<Property> properties = JSONUtils.toList(taskInstance.getVarPool(), Property.class);
if (CollectionUtils.isEmpty(workerResultParam)) { if (CollectionUtils.isEmpty(properties)) {
return; return;
} }
//if the result more than one line,just get the first . //if the result more than one line,just get the first .
Map<String, String> row = workerResultParam.get(0);
if (row == null || row.size() == 0) {
return;
}
Map<String, Object> taskParams = JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class); Map<String, Object> taskParams = JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class);
Object localParams = taskParams.get(LOCAL_PARAMS); Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) { if (localParams == null) {
return; return;
} }
ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
List<Property> params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
Map<String, Property> allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> outProperty = new HashMap<>();
for (Property info : properties) {
if (info.getDirect() == Direct.OUT) {
outProperty.put(info.getProp(), info.getValue());
}
}
for (Property info : allParam) { for (Property info : allParam) {
if (info.getDirect() == Direct.OUT) { if (info.getDirect() == Direct.OUT) {
String paramName = info.getProp(); String paramName = info.getProp();
Property property = allParamMap.get(paramName); info.setValue(outProperty.get(paramName));
if (property == null) {
continue;
}
String value = String.valueOf(row.get(paramName));
if (StringUtils.isNotEmpty(value)) {
property.setValue(value);
info.setValue(value);
}
} }
} }
taskParams.put(LOCAL_PARAMS, allParam); taskParams.put(LOCAL_PARAMS, allParam);
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams)); taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
String params4ProcessString = JSONUtils.toJsonString(params4Property);
int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId());
logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
} }
public List<Map<String, String>> getListMapByString(String json) {
List<Map<String, String>> allParams = new ArrayList<>();
ArrayNode paramsByJson = JSONUtils.parseArray(json);
Iterator<JsonNode> listIterator = paramsByJson.iterator();
while (listIterator.hasNext()) {
Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
allParams.add(param);
}
return allParams;
}
/** /**
* convert integer list to string list * convert integer list to string list

View File

@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.service.registry; package org.apache.dolphinscheduler.service.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@ -57,16 +59,7 @@ public class RegistryCenter {
*/ */
protected static String NODES; protected static String NODES;
/**
* master path
*/
protected static String MASTER_PATH = "/nodes/master";
private RegistryPluginManager registryPluginManager; private RegistryPluginManager registryPluginManager;
/**
* worker path
*/
protected static String WORKER_PATH = "/nodes/worker";
protected static final String EMPTY = ""; protected static final String EMPTY = "";
@ -113,8 +106,9 @@ public class RegistryCenter {
* init nodes * init nodes
*/ */
private void initNodes() { private void initNodes() {
persist(MASTER_PATH, EMPTY); persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY);
persist(WORKER_PATH, EMPTY); persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY);
persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY);
} }
/** /**
@ -205,15 +199,6 @@ public class RegistryCenter {
return stoppable; return stoppable;
} }
/**
* get master path
*
* @return master path
*/
public String getMasterPath() {
return MASTER_PATH;
}
/** /**
* whether master path * whether master path
* *
@ -221,16 +206,7 @@ public class RegistryCenter {
* @return result * @return result
*/ */
public boolean isMasterPath(String path) { public boolean isMasterPath(String path) {
return path != null && path.contains(MASTER_PATH); return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
/**
* get worker path
*
* @return worker path
*/
public String getWorkerPath() {
return WORKER_PATH;
} }
/** /**
@ -240,7 +216,7 @@ public class RegistryCenter {
* @return worker group path * @return worker group path
*/ */
public String getWorkerGroupPath(String workerGroup) { public String getWorkerGroupPath(String workerGroup) {
return WORKER_PATH + "/" + workerGroup; return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup;
} }
/** /**
@ -250,7 +226,7 @@ public class RegistryCenter {
* @return result * @return result
*/ */
public boolean isWorkerPath(String path) { public boolean isWorkerPath(String path) {
return path != null && path.contains(WORKER_PATH); return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS);
} }
/** /**

View File

@ -22,6 +22,8 @@ import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP; import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE; import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
@ -344,7 +346,7 @@ public class RegistryClient extends RegistryCenter {
* @return master nodes * @return master nodes
*/ */
public Set<String> getMasterNodesDirectly() { public Set<String> getMasterNodesDirectly() {
List<String> masters = getChildrenKeys(MASTER_PATH); List<String> masters = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
return new HashSet<>(masters); return new HashSet<>(masters);
} }
@ -354,7 +356,7 @@ public class RegistryClient extends RegistryCenter {
* @return master nodes * @return master nodes
*/ */
public Set<String> getWorkerNodesDirectly() { public Set<String> getWorkerNodesDirectly() {
List<String> workers = getChildrenKeys(WORKER_PATH); List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers); return new HashSet<>(workers);
} }
@ -364,7 +366,7 @@ public class RegistryClient extends RegistryCenter {
* @return worker group nodes * @return worker group nodes
*/ */
public Set<String> getWorkerGroupDirectly() { public Set<String> getWorkerGroupDirectly() {
List<String> workers = getChildrenKeys(getWorkerPath()); List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers); return new HashSet<>(workers);
} }

View File

@ -509,4 +509,19 @@ public class ProcessServiceTest {
Mockito.verify(commandMapper, Mockito.times(1)).insert(command); Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
} }
@Test
public void testChangeOutParam() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setProcessInstanceId(62);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(62);
taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select id from tb_test limit 1\","
+ "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
+ "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
+ "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
processService.changeOutParam(taskInstance);
}
} }

View File

@ -104,7 +104,7 @@ public class DolphinPluginLoader {
private void loadPlugin(URLClassLoader pluginClassLoader) { private void loadPlugin(URLClassLoader pluginClassLoader) {
ServiceLoader<DolphinSchedulerPlugin> serviceLoader = ServiceLoader.load(DolphinSchedulerPlugin.class, pluginClassLoader); ServiceLoader<DolphinSchedulerPlugin> serviceLoader = ServiceLoader.load(DolphinSchedulerPlugin.class, pluginClassLoader);
List<DolphinSchedulerPlugin> plugins = ImmutableList.copyOf(serviceLoader); List<DolphinSchedulerPlugin> plugins = ImmutableList.copyOf(serviceLoader);
Preconditions.checkState(!plugins.isEmpty(), "No service providers the plugin {}", DolphinSchedulerPlugin.class.getName()); Preconditions.checkState(!plugins.isEmpty(), "No service providers the plugin %s", DolphinSchedulerPlugin.class.getName());
for (DolphinSchedulerPlugin plugin : plugins) { for (DolphinSchedulerPlugin plugin : plugins) {
logger.info("Installing {}", plugin.getClass().getName()); logger.info("Installing {}", plugin.getClass().getName());
for (AbstractDolphinPluginManager dolphinPluginManager : dolphinPluginManagerList) { for (AbstractDolphinPluginManager dolphinPluginManager : dolphinPluginManagerList) {

View File

@ -109,6 +109,10 @@
this.$message.warning(`${i18n.$t('Please enter group name')}`) this.$message.warning(`${i18n.$t('Please enter group name')}`)
return false return false
} }
if (this.alertInstanceIds) {
this.$message.warning(`${i18n.$t('Select Alarm plugin instance')}`)
return false
}
return true return true
}, },
_submit () { _submit () {

View File

@ -111,6 +111,10 @@
this.$message.warning(`${i18n.$t('Please enter group name')}`) this.$message.warning(`${i18n.$t('Please enter group name')}`)
return false return false
} }
if (!this.pluginDefineId) {
this.$message.warning(`${i18n.$t('Select Alarm plugin')}`)
return false
}
return true return true
}, },
// Select plugin // Select plugin

View File

@ -80,7 +80,7 @@
</div> </div>
</template> </template>
<script> <script>
import { mapState, mapMutations } from 'vuex' import { mapActions, mapState, mapMutations } from 'vuex'
import mListBoxF from '@/module/components/listBoxF/listBoxF' import mListBoxF from '@/module/components/listBoxF/listBoxF'
import mCreateUser from '@/conf/home/pages/security/pages/users/_source/createUser' import mCreateUser from '@/conf/home/pages/security/pages/users/_source/createUser'
@ -95,6 +95,7 @@
props: {}, props: {},
methods: { methods: {
...mapMutations('user', ['setUserInfo']), ...mapMutations('user', ['setUserInfo']),
...mapActions('user', ['getUserInfo']),
/** /**
* edit * edit
*/ */
@ -109,7 +110,9 @@
email: param.email, email: param.email,
phone: param.phone phone: param.phone
}) })
this.createUserDialog = false this.getUserInfo().finally(() => {
this.createUserDialog = false
})
}, },
close () { close () {

View File

@ -446,7 +446,7 @@ export default {
*/ */
updateAlertPluginInstance ({ state }, payload) { updateAlertPluginInstance ({ state }, payload) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
io.get('alert-plugin-instance/update', payload, res => { io.post('alert-plugin-instance/update', payload, res => {
resolve(res) resolve(res)
}).catch(e => { }).catch(e => {
reject(e) reject(e)

View File

@ -228,10 +228,12 @@ export default {
'Alarm instance name': 'Alarm instance name', 'Alarm instance name': 'Alarm instance name',
'Alarm plugin name': 'Alarm plugin name', 'Alarm plugin name': 'Alarm plugin name',
'Select plugin': 'Select plugin', 'Select plugin': 'Select plugin',
'Select Alarm plugin': 'Please select an Alarm plugin',
'Please enter group name': 'Please enter group name', 'Please enter group name': 'Please enter group name',
'Instance parameter exception': 'Instance parameter exception', 'Instance parameter exception': 'Instance parameter exception',
'Group Type': 'Group Type', 'Group Type': 'Group Type',
'Alarm plugin instance': 'Alarm plugin instance', 'Alarm plugin instance': 'Alarm plugin instance',
'Select Alarm plugin instance': 'Please select an Alarm plugin instance',
Remarks: 'Remarks', Remarks: 'Remarks',
SMS: 'SMS', SMS: 'SMS',
'Managing Users': 'Managing Users', 'Managing Users': 'Managing Users',

View File

@ -228,10 +228,12 @@ export default {
'Alarm instance name': '告警实例名称', 'Alarm instance name': '告警实例名称',
'Alarm plugin name': '告警插件名称', 'Alarm plugin name': '告警插件名称',
'Select plugin': '选择插件', 'Select plugin': '选择插件',
'Select Alarm plugin': '请选择告警插件',
'Please enter group name': '请输入组名称', 'Please enter group name': '请输入组名称',
'Instance parameter exception': '实例参数异常', 'Instance parameter exception': '实例参数异常',
'Group Type': '组类型', 'Group Type': '组类型',
'Alarm plugin instance': '告警插件实例', 'Alarm plugin instance': '告警插件实例',
'Select Alarm plugin instance': '请选择告警插件实例',
Remarks: '备注', Remarks: '备注',
SMS: '短信', SMS: '短信',
'Managing Users': '管理用户', 'Managing Users': '管理用户',

View File

@ -1010,8 +1010,8 @@
<include>**/server/worker/task/processdure/ProcedureTaskTest.java</include> <include>**/server/worker/task/processdure/ProcedureTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include> <include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include> <include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
<include>**/server/worker/task/PythonCommandExecutorTest.java</include> <include>**/server/worker/task/PythonCommandExecutorTest.java</include>
<include>**/server/worker/task/TaskParamsTest.java</include>
<include>**/server/worker/task/ShellTaskReturnTest.java</include> <include>**/server/worker/task/ShellTaskReturnTest.java</include>
<include>**/server/worker/task/sql/SqlTaskTest.java</include> <include>**/server/worker/task/sql/SqlTaskTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include> <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>