diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 078506898a..2994208ea2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.PluginType; import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.PluginDefine; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; import org.apache.dolphinscheduler.spi.params.base.PluginParams; @@ -68,10 +67,7 @@ public class ApiApplicationServer { String paramsJson = PluginParamsTransfer.transferParamsToJson(params); PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson); - int count = pluginDao.addOrUpdatePluginDefine(pluginDefine); - if (count <= 0) { - throw new TaskPluginException("Failed to update task plugin: " + taskPluginName); - } + pluginDao.addOrUpdatePluginDefine(pluginDefine); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java index 57ff712bc8..5ca4812a72 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java @@ -21,12 +21,20 @@ import static java.util.Objects.requireNonNull; import org.apache.dolphinscheduler.dao.entity.PluginDefine; import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper; +import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; + +import java.util.Objects; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +@Slf4j @Component public class PluginDao { + @Autowired private PluginDefineMapper pluginDefineMapper; @@ -43,20 +51,36 @@ public class PluginDao { * add or update plugin define * * @param pluginDefine new pluginDefine + * @return plugin id */ - public int addOrUpdatePluginDefine(PluginDefine pluginDefine) { - requireNonNull(pluginDefine, "pluginDefine is null"); + public int addOrUpdatePluginDefine(@NonNull PluginDefine pluginDefine) { requireNonNull(pluginDefine.getPluginName(), "pluginName is null"); requireNonNull(pluginDefine.getPluginType(), "pluginType is null"); - PluginDefine currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType()); + PluginDefine currPluginDefine = + pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType()); if (currPluginDefine == null) { - if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) { - return pluginDefine.getId(); + try { + if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) { + return pluginDefine.getId(); + } + throw new TaskPluginException( + String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s", + pluginDefine.getPluginName(), pluginDefine.getPluginType())); + } catch (TaskPluginException ex) { + throw ex; + } catch (Exception ex) { + log.error("Insert plugin definition error, there may already exist a plugin", ex); + currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), + pluginDefine.getPluginType()); + if (currPluginDefine == null) { + throw new TaskPluginException( + String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s", + pluginDefine.getPluginName(), pluginDefine.getPluginType())); + } } - throw new IllegalStateException("Failed to insert plugin definition"); } - if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) { + if (!Objects.equals(currPluginDefine.getPluginParams(), pluginDefine.getPluginParams())) { currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime()); currPluginDefine.setPluginParams(pluginDefine.getPluginParams()); pluginDefineMapper.updateById(currPluginDefine);