merge from 1.3.3-release

This commit is contained in:
baoliang 2020-11-10 10:45:07 +08:00
parent db3d9fc27a
commit 0df0ab1bc5
2 changed files with 74 additions and 1 deletions

View File

@ -117,6 +117,8 @@ public class DolphinSchedulerManager {
upgradeDao.upgradeDolphinScheduler(schemaDir);
if ("1.3.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerWorkerGroup();
} else if ("1.3.2".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceList();
}
version = schemaVersion;
}

View File

@ -16,10 +16,10 @@
*/
package org.apache.dolphinscheduler.dao.upgrade;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
@ -34,7 +34,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class UpgradeDao extends AbstractBaseDao {
@ -268,6 +270,15 @@ public abstract class UpgradeDao extends AbstractBaseDao {
public void upgradeDolphinSchedulerWorkerGroup() {
updateProcessDefinitionJsonWorkerGroup();
}
/**
* upgrade DolphinScheduler resource list
* ds-1.3.2 modify the resource list for process definition json
*/
public void upgradeDolphinSchedulerResourceList() {
updateProcessDefinitionJsonResourceList();
}
/**
* updateProcessDefinitionJsonWorkerGroup
*/
@ -312,6 +323,66 @@ public abstract class UpgradeDao extends AbstractBaseDao {
}
/**
* updateProcessDefinitionJsonResourceList
*/
protected void updateProcessDefinitionJsonResourceList(){
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<String,Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0 ;i < tasks.size() ; i++){
ObjectNode task = (ObjectNode) tasks.get(i);
ObjectNode param = (ObjectNode)task.get("params");
if (param != null) {
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
ResourceInfo mainJar = JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
if (mainJar != null && mainJar.getId() == 0) {
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s",mainJar.getRes());
if (resourcesMap.containsKey(fullName)) {
mainJar.setId(resourcesMap.get(fullName));
param.put("mainJar",JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
}
}
if (CollectionUtils.isNotEmpty(resourceList)) {
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s",resInfo.getRes());
if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList",JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
}
}
task.put("params",param);
}
jsonObject.remove("tasks");
jsonObject.put("tasks",tasks);
replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
}
}catch (Exception e){
logger.error("update process definition json resource list error",e);
}
}
/**
* upgradeDolphinScheduler DML
* @param schemaDir schemaDir