This commit is contained in:
break60 2021-01-12 14:48:14 +08:00
commit 4e2f27dec2
7 changed files with 86 additions and 10 deletions

View File

@ -33,6 +33,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
@JsonInclude(Include.NON_NULL) @JsonInclude(Include.NON_NULL)
public class ProcessAlertContent implements Serializable { public class ProcessAlertContent implements Serializable {
@JsonProperty("projectId")
private int projectId;
@JsonProperty("projectName")
private String projectName;
@JsonProperty("owner")
private String owner;
@JsonProperty("processId") @JsonProperty("processId")
private int processId; private int processId;
@JsonProperty("processName") @JsonProperty("processName")
@ -79,6 +85,9 @@ public class ProcessAlertContent implements Serializable {
private String logPath; private String logPath;
private ProcessAlertContent(Builder builder) { private ProcessAlertContent(Builder builder) {
this.projectId = builder.projectId;
this.projectName = builder.projectName;
this.owner = builder.owner;
this.processId = builder.processId; this.processId = builder.processId;
this.processName = builder.processName; this.processName = builder.processName;
this.processType = builder.processType; this.processType = builder.processType;
@ -107,7 +116,9 @@ public class ProcessAlertContent implements Serializable {
} }
public static class Builder { public static class Builder {
private int projectId;
private String projectName;
private String owner;
private int processId; private int processId;
private String processName; private String processName;
private CommandType processType; private CommandType processType;
@ -129,6 +140,21 @@ public class ProcessAlertContent implements Serializable {
private String taskHost; private String taskHost;
private String logPath; private String logPath;
public Builder projectId(int projectId) {
this.projectId = projectId;
return this;
}
public Builder projectName(String projectName) {
this.projectName = projectName;
return this;
}
public Builder owner(String owner) {
this.owner = owner;
return this;
}
public Builder processId(int processId) { public Builder processId(int processId) {
this.processId = processId; this.processId = processId;
return this; return this;

View File

@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/** /**
* project mapper interface * project mapper interface
*/ */
@ -81,4 +85,11 @@ public interface ProjectMapper extends BaseMapper<Project> {
*/ */
List<Project> queryProjectCreatedAndAuthorizedByUserId(@Param("userId") int userId); List<Project> queryProjectCreatedAndAuthorizedByUserId(@Param("userId") int userId);
/**
* query project name and user name by processInstanceId.
* @param processInstanceId processInstanceId
* @return projectName and userName
*/
ProjectUser queryProjectWithUserByProcessInstanceId(@Param("processInstanceId") int processInstanceId);
} }

View File

@ -96,4 +96,16 @@
(select project_id from t_ds_relation_project_user where user_id=#{userId} (select project_id from t_ds_relation_project_user where user_id=#{userId}
union select id as project_id from t_ds_project where user_id=#{userId}) union select id as project_id from t_ds_project where user_id=#{userId})
</select> </select>
<select id="queryProjectWithUserByProcessInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.ProjectUser">
select
dp.id projectId,
dp.name projectName,
u.user_name userName
from t_ds_process_instance di
join t_ds_process_definition dpd on di.process_definition_id = dpd.id
join t_ds_project dp on dpd.project_id = dp.id
join t_ds_user u on dp.user_id = u.id
where di.id = #{processInstanceId};
</select>
</mapper> </mapper>

View File

@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.VarPoolUtils; import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.dao.utils.DagHelper;
@ -371,7 +372,8 @@ public class MasterExecThread implements Runnable {
processService.createRecoveryWaitingThreadCommand(null, processInstance); processService.createRecoveryWaitingThreadCommand(null, processInstance);
} }
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId()); List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
alertManager.sendAlertProcessInstance(processInstance, taskInstances); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
alertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
} }
/** /**

View File

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent; import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.ArrayList; import java.util.ArrayList;
@ -94,12 +95,16 @@ public class AlertManager {
* @return process instance format content * @return process instance format content
*/ */
public String getContentProcessInstance(ProcessInstance processInstance, public String getContentProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances) { List<TaskInstance> taskInstances,
ProjectUser projectUser) {
String res = ""; String res = "";
if (processInstance.getState().typeIsSuccess()) { if (processInstance.getState().typeIsSuccess()) {
List<ProcessAlertContent> successTaskList = new ArrayList<>(1); List<ProcessAlertContent> successTaskList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId())
.projectName(projectUser.getProjectName())
.owner(projectUser.getUserName())
.processId(processInstance.getId()) .processId(processInstance.getId())
.processName(processInstance.getName()) .processName(processInstance.getName())
.processType(processInstance.getCommandType()) .processType(processInstance.getCommandType())
@ -120,6 +125,9 @@ public class AlertManager {
continue; continue;
} }
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId())
.projectName(projectUser.getProjectName())
.owner(projectUser.getUserName())
.processId(processInstance.getId()) .processId(processInstance.getId())
.processName(processInstance.getName()) .processName(processInstance.getName())
.taskId(task.getId()) .taskId(task.getId())
@ -196,9 +204,10 @@ public class AlertManager {
* @param taskInstances task instance list * @param taskInstances task instance list
*/ */
public void sendAlertProcessInstance(ProcessInstance processInstance, public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances) { List<TaskInstance> taskInstances,
ProjectUser projectUser) {
if(Flag.YES == processInstance.getIsSubProcess()){ if (Flag.YES == processInstance.getIsSubProcess()) {
return; return;
} }
boolean sendWarnning = false; boolean sendWarnning = false;
@ -231,7 +240,7 @@ public class AlertManager {
alert.setTitle(cmdName + " " + success); alert.setTitle(cmdName + " " + success);
ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE; ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE;
alert.setShowType(showType); alert.setShowType(showType);
String content = getContentProcessInstance(processInstance, taskInstances); String content = getContentProcessInstance(processInstance, taskInstances, projectUser);
alert.setContent(content); alert.setContent(content);
alert.setAlertType(AlertType.EMAIL); alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId());

View File

@ -19,9 +19,11 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.junit.Ignore; import org.junit.Ignore;
@ -51,6 +53,9 @@ public class AlertManagerTest {
@Autowired @Autowired
TaskInstanceMapper taskInstanceMapper; TaskInstanceMapper taskInstanceMapper;
@Autowired
ProjectMapper projectMapper;
AlertManager alertManager; AlertManager alertManager;
/** /**
@ -90,7 +95,6 @@ public class AlertManagerTest {
ProcessDefinition processDefinition = processDefinitionMapper.selectById(47); ProcessDefinition processDefinition = processDefinitionMapper.selectById(47);
processInstance.setProcessDefinition(processDefinition); processInstance.setProcessDefinition(processDefinition);
// fault task instance // fault task instance
TaskInstance toleranceTask1 = taskInstanceMapper.selectById(5038); TaskInstance toleranceTask1 = taskInstanceMapper.selectById(5038);
toleranceTask1.setState(ExecutionStatus.FAILURE); toleranceTask1.setState(ExecutionStatus.FAILURE);
@ -101,7 +105,9 @@ public class AlertManagerTest {
toleranceTaskList.add(toleranceTask1); toleranceTaskList.add(toleranceTask1);
toleranceTaskList.add(toleranceTask2); toleranceTaskList.add(toleranceTask2);
alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList); ProjectUser projectUser = projectMapper.queryProjectWithUserByProcessInstanceId(processInstance.getId());
alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList, projectUser);
} }
} }

View File

@ -56,6 +56,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -1859,6 +1860,15 @@ public class ProcessService {
return queue; return queue;
} }
/**
* query project name and user name by processInstanceId.
* @param processInstanceId processInstanceId
* @return projectName and userName
*/
public ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId) {
return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId);
}
/** /**
* get task worker group * get task worker group
* *