mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-02 20:28:03 +08:00
[Upgrade][Install] fix upgrade 2.0 bug (#6734)
* add convert dependent/conditions * fix upgrade 2.0 bug * fix upgrade 2.0 bug
This commit is contained in:
parent
24bd8a16e1
commit
bdea8d6ae4
@ -53,7 +53,7 @@ public class JsonSplitDao {
|
||||
processUpdate.setInt(2, processDefinitionLog.getTimeout());
|
||||
processUpdate.setInt(3, processDefinitionLog.getTenantId());
|
||||
processUpdate.setString(4, processDefinitionLog.getLocations());
|
||||
processUpdate.setDate(5, (Date) processDefinitionLog.getUpdateTime());
|
||||
processUpdate.setDate(5, new Date(processDefinitionLog.getUpdateTime().getTime()));
|
||||
processUpdate.setInt(6, processDefinitionLog.getId());
|
||||
processUpdate.addBatch();
|
||||
|
||||
@ -70,9 +70,9 @@ public class JsonSplitDao {
|
||||
insertLog.setInt(11, processDefinitionLog.getTimeout());
|
||||
insertLog.setInt(12, processDefinitionLog.getTenantId());
|
||||
insertLog.setInt(13, processDefinitionLog.getOperator());
|
||||
insertLog.setDate(14, (Date) processDefinitionLog.getOperateTime());
|
||||
insertLog.setDate(15, (Date) processDefinitionLog.getCreateTime());
|
||||
insertLog.setDate(16, (Date) processDefinitionLog.getUpdateTime());
|
||||
insertLog.setDate(14, new Date(processDefinitionLog.getOperateTime().getTime()));
|
||||
insertLog.setDate(15, new Date(processDefinitionLog.getCreateTime().getTime()));
|
||||
insertLog.setDate(16, new Date(processDefinitionLog.getUpdateTime().getTime()));
|
||||
insertLog.addBatch();
|
||||
|
||||
i++;
|
||||
@ -121,8 +121,8 @@ public class JsonSplitDao {
|
||||
insert.setInt(7, processTaskRelationLog.getPostTaskVersion());
|
||||
insert.setInt(8, processTaskRelationLog.getConditionType().getCode());
|
||||
insert.setString(9, processTaskRelationLog.getConditionParams());
|
||||
insert.setDate(10, (Date) processTaskRelationLog.getCreateTime());
|
||||
insert.setDate(11, (Date) processTaskRelationLog.getUpdateTime());
|
||||
insert.setDate(10, new Date(processTaskRelationLog.getCreateTime().getTime()));
|
||||
insert.setDate(11, new Date(processTaskRelationLog.getUpdateTime().getTime()));
|
||||
insert.addBatch();
|
||||
|
||||
insertLog.setLong(1, processTaskRelationLog.getProjectCode());
|
||||
@ -135,9 +135,9 @@ public class JsonSplitDao {
|
||||
insertLog.setInt(8, processTaskRelationLog.getConditionType().getCode());
|
||||
insertLog.setString(9, processTaskRelationLog.getConditionParams());
|
||||
insertLog.setInt(10, processTaskRelationLog.getOperator());
|
||||
insertLog.setDate(11, (Date) processTaskRelationLog.getOperateTime());
|
||||
insertLog.setDate(12, (Date) processTaskRelationLog.getCreateTime());
|
||||
insertLog.setDate(13, (Date) processTaskRelationLog.getUpdateTime());
|
||||
insertLog.setDate(11, new Date(processTaskRelationLog.getOperateTime().getTime()));
|
||||
insertLog.setDate(12, new Date(processTaskRelationLog.getCreateTime().getTime()));
|
||||
insertLog.setDate(13, new Date(processTaskRelationLog.getUpdateTime().getTime()));
|
||||
insertLog.addBatch();
|
||||
|
||||
i++;
|
||||
@ -169,10 +169,10 @@ public class JsonSplitDao {
|
||||
public void executeJsonSplitTaskDefinition(Connection conn, List<TaskDefinitionLog> taskDefinitionLogs) {
|
||||
String insertSql = "insert into t_ds_task_definition (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
|
||||
+ "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,"
|
||||
+ "create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
|
||||
+ "create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
|
||||
String insertLogSql = "insert into t_ds_task_definition_log (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
|
||||
+ "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator,"
|
||||
+ "operate_time,create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
|
||||
+ "operate_time,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
|
||||
try {
|
||||
PreparedStatement insert = conn.prepareStatement(insertSql);
|
||||
PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
|
||||
@ -193,12 +193,12 @@ public class JsonSplitDao {
|
||||
insert.setInt(13, taskDefinitionLog.getFailRetryTimes());
|
||||
insert.setInt(14, taskDefinitionLog.getFailRetryInterval());
|
||||
insert.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
|
||||
insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
|
||||
insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
|
||||
insert.setInt(17, taskDefinitionLog.getTimeout());
|
||||
insert.setInt(18, taskDefinitionLog.getDelayTime());
|
||||
insert.setString(19, taskDefinitionLog.getResourceIds());
|
||||
insert.setDate(20, (Date) taskDefinitionLog.getCreateTime());
|
||||
insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime());
|
||||
insert.setDate(20, new Date(taskDefinitionLog.getCreateTime().getTime()));
|
||||
insert.setDate(21, new Date(taskDefinitionLog.getUpdateTime().getTime()));
|
||||
insert.addBatch();
|
||||
|
||||
insertLog.setLong(1, taskDefinitionLog.getCode());
|
||||
@ -216,14 +216,14 @@ public class JsonSplitDao {
|
||||
insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes());
|
||||
insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval());
|
||||
insertLog.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
|
||||
insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
|
||||
insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
|
||||
insertLog.setInt(17, taskDefinitionLog.getTimeout());
|
||||
insertLog.setInt(18, taskDefinitionLog.getDelayTime());
|
||||
insertLog.setString(19, taskDefinitionLog.getResourceIds());
|
||||
insertLog.setInt(20, taskDefinitionLog.getOperator());
|
||||
insertLog.setDate(21, (Date) taskDefinitionLog.getOperateTime());
|
||||
insertLog.setDate(22, (Date) taskDefinitionLog.getCreateTime());
|
||||
insertLog.setDate(23, (Date) taskDefinitionLog.getUpdateTime());
|
||||
insertLog.setDate(21, new Date(taskDefinitionLog.getOperateTime().getTime()));
|
||||
insertLog.setDate(22, new Date(taskDefinitionLog.getCreateTime().getTime()));
|
||||
insertLog.setDate(23, new Date(taskDefinitionLog.getUpdateTime().getTime()));
|
||||
insertLog.addBatch();
|
||||
|
||||
i++;
|
||||
|
@ -148,7 +148,7 @@ public class ProcessDefinitionDao {
|
||||
pstmt.setLong(1, processDefinition.getCode());
|
||||
long projectCode = processDefinition.getProjectCode();
|
||||
if (String.valueOf(projectCode).length() <= 10) {
|
||||
Integer projectId = Integer.getInteger(String.valueOf(projectCode));
|
||||
Integer projectId = Integer.parseInt(String.valueOf(projectCode));
|
||||
if (projectIdCodeMap.containsKey(projectId)) {
|
||||
projectCode = projectIdCodeMap.get(projectId);
|
||||
processDefinition.setProjectCode(projectCode);
|
||||
|
@ -77,7 +77,7 @@ public class ScheduleDao {
|
||||
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
|
||||
long projectDefinitionCode = entry.getValue();
|
||||
if (String.valueOf(projectDefinitionCode).length() <= 10) {
|
||||
Integer projectDefinitionId = Integer.getInteger(String.valueOf(projectDefinitionCode));
|
||||
Integer projectDefinitionId = Integer.parseInt(String.valueOf(projectDefinitionCode));
|
||||
if (processIdCodeMap.containsKey(projectDefinitionId)) {
|
||||
projectDefinitionCode = processIdCodeMap.get(projectDefinitionId);
|
||||
}
|
||||
|
@ -63,10 +63,10 @@ import javax.sql.DataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public abstract class UpgradeDao extends AbstractBaseDao {
|
||||
@ -649,14 +649,17 @@ public abstract class UpgradeDao extends AbstractBaseDao {
|
||||
ObjectNode param = (ObjectNode) task.get("params");
|
||||
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
|
||||
if (param != null) {
|
||||
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
|
||||
if (!resourceList.isEmpty()) {
|
||||
JsonNode resourceJsonNode = param.get("resourceList");
|
||||
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
|
||||
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
|
||||
List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
|
||||
taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ","));
|
||||
taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA));
|
||||
} else {
|
||||
taskDefinitionLog.setResourceIds(StringUtils.EMPTY);
|
||||
}
|
||||
param.put("conditionResult", task.get("conditionResult"));
|
||||
param.put("dependence", task.get("dependence"));
|
||||
taskDefinitionLog.setTaskParams(param.asText());
|
||||
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
|
||||
}
|
||||
TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
|
||||
if (timeout != null) {
|
||||
@ -674,6 +677,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
|
||||
taskDefinitionLog.setName(name);
|
||||
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
|
||||
long taskCode = SnowFlakeUtils.getInstance().nextId();
|
||||
// System.out.println(taskCode);
|
||||
taskDefinitionLog.setCode(taskCode);
|
||||
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
|
||||
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
|
||||
@ -686,7 +690,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
|
||||
taskDefinitionLog.setUpdateTime(now);
|
||||
taskDefinitionLogList.add(taskDefinitionLog);
|
||||
taskIdCodeMap.put(task.get("id").asText(), taskCode);
|
||||
List<String> preTasks = JSONUtils.toList(task.get("preTasks").asText(), String.class);
|
||||
List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
|
||||
taskNamePreMap.put(name, preTasks);
|
||||
taskNameCodeMap.put(name, taskCode);
|
||||
}
|
||||
@ -745,13 +749,16 @@ public abstract class UpgradeDao extends AbstractBaseDao {
|
||||
if (StringUtils.isBlank(locations)) {
|
||||
return locations;
|
||||
}
|
||||
Map<String, String> locationsMap = JSONUtils.toMap(locations);
|
||||
JsonNodeFactory factory = new JsonNodeFactory(false);
|
||||
ArrayNode jsonNodes = factory.arrayNode();
|
||||
for (Map.Entry<String, String> entry : locationsMap.entrySet()) {
|
||||
ObjectNode nodes = factory.objectNode();
|
||||
Map<String, ObjectNode> locationsMap = JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
|
||||
});
|
||||
if (locationsMap == null) {
|
||||
return locations;
|
||||
}
|
||||
ArrayNode jsonNodes = JSONUtils.createArrayNode();
|
||||
for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
|
||||
ObjectNode nodes = JSONUtils.createObjectNode();
|
||||
nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
|
||||
ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue());
|
||||
ObjectNode oldNodes = entry.getValue();
|
||||
nodes.put("x", oldNodes.get("x").asInt());
|
||||
nodes.put("y", oldNodes.get("y").asInt());
|
||||
jsonNodes.add(nodes);
|
||||
|
@ -15,8 +15,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
alter table t_ds_process_definition drop primary key;
|
||||
ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`);
|
||||
alter table t_ds_process_definition drop primary key, ADD PRIMARY KEY (`id`,`code`);
|
||||
ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`;
|
||||
ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`;
|
||||
alter table t_ds_process_definition drop process_definition_json;
|
||||
|
Loading…
Reference in New Issue
Block a user