From d0a63fd304f81d7c819445b70324f1ba4b49ae66 Mon Sep 17 00:00:00 2001 From: bwcx_jzy Date: Mon, 26 Dec 2022 17:22:35 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=E6=9C=8D=E5=8A=A1=E7=AB=AF=E5=88=A0?= =?UTF-8?q?=E9=99=A4=20Java-WebSocket=20=E4=BE=9D=E8=B5=96=EF=BC=88?= =?UTF-8?q?=E9=87=87=E7=94=A8=E7=BB=9F=E4=B8=80=E6=A8=A1=E5=9D=97=E7=AE=A1?= =?UTF-8?q?=E7=90=86=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 1 + .../top/jpom/transport/IProxyWebSocket.java | 67 ++++ .../java/top/jpom/transport/IUrlItem.java | 1 + .../top/jpom/transport/TransportServer.java | 10 + .../jpom/transport/HttpTransportServer.java | 28 ++ .../ServletWebSocketClientHandler.java | 103 ++++++ modules/server/pom.xml | 7 - .../io/jpom/common/forward/NodeForward.java | 2 +- .../java/io/jpom/socket/BaseProxyHandler.java | 32 +- .../java/io/jpom/socket/ProxySession.java | 234 +++++++------- .../io/jpom/socket/ServerWebSocketConfig.java | 12 +- .../socket/ServerWebSocketInterceptor.java | 9 +- .../io/jpom/socket/client/NodeClient.java | 296 +++++++++--------- .../socket/handler/BaseTerminalHandler.java | 31 +- .../jpom/socket/handler/ConsoleHandler.java | 7 +- .../jpom/socket/handler/DockerCliHandler.java | 2 + .../jpom/socket/handler/DockerLogHandler.java | 1 + .../socket/handler/NodeScriptHandler.java | 5 +- .../socket/handler/NodeUpdateHandler.java | 63 ++-- .../io/jpom/socket/handler/SshHandler.java | 2 + .../io/jpom/socket/handler/TomcatHandler.java | 4 +- .../java/io/jpom/util/SocketSessionUtil.java | 59 ++-- 22 files changed, 613 insertions(+), 363 deletions(-) create mode 100644 modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IProxyWebSocket.java create mode 100644 modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/ServletWebSocketClientHandler.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fbc6bc1e..63e244533 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ 7. 【server】优化 websocket 控制台操作日志记录 8. 【server】修复 超级管理的 websocket 操作日志记录工作空间不正确 9. 【agent】优化 插件端删除 spring-boot-starter-websocket 依赖 +10. 【server】优化 服务端删除 Java-WebSocket 依赖(采用统一模块管理) ### ❌ 不兼容功能 diff --git a/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IProxyWebSocket.java b/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IProxyWebSocket.java new file mode 100644 index 000000000..103eed7e4 --- /dev/null +++ b/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IProxyWebSocket.java @@ -0,0 +1,67 @@ +package top.jpom.transport; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * @author bwcx_jzy + * @since 2022/12/26 + */ +public interface IProxyWebSocket { + + /** + * 关闭连接 + * + * @throws IOException 关闭异常 + */ + void close() throws IOException; + + /** + * 打开连接 + * + * @return 打开状态 + */ + boolean open(); + + /** + * 重新打开连接 + * + * @return 打开状态 + * @throws IOException 关闭异常 + */ + default boolean reopen() throws IOException { + this.close(); + return this.open(); + } + + /** + * 发送消息 + * + * @param msg 消息 + * @throws IOException 发送异常 + */ + void send(String msg) throws IOException; + + /** + * 发送消息 + * + * @param bytes 消息 + * @throws IOException 发送异常 + */ + void send(ByteBuffer bytes) throws IOException; + + /** + * 收到消息 + * + * @param consumer 回调 + */ + void onMessage(Consumer consumer); + + /** + * 是否连接上 + * + * @return true + */ + boolean isConnected(); +} diff --git a/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IUrlItem.java b/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IUrlItem.java index 3bf6ad4ef..4cc3b1e81 100644 --- a/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IUrlItem.java +++ b/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/IUrlItem.java @@ -37,6 +37,7 @@ public interface IUrlItem { /** * 请求超时时间 + * 单位秒 * * @return 超时时间 */ diff --git a/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/TransportServer.java b/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/TransportServer.java index c09ddbc6a..6ad40648e 100644 --- a/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/TransportServer.java +++ b/modules/agent-transport/agent-transport-common/src/main/java/top/jpom/transport/TransportServer.java @@ -91,4 +91,14 @@ public interface TransportServer { * @param consumer 回调 */ void download(INodeInfo nodeInfo, IUrlItem urlItem, Object data, Consumer consumer); + + /** + * 创建 websocket 连接 + * + * @param nodeInfo 节点信息 + * @param urlItem 请求 item + * @param parameters 参数 + * @return websocket + */ + IProxyWebSocket websocket(INodeInfo nodeInfo, IUrlItem urlItem, Object... parameters); } diff --git a/modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/HttpTransportServer.java b/modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/HttpTransportServer.java index 2806d42ed..7364a9a72 100644 --- a/modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/HttpTransportServer.java +++ b/modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/HttpTransportServer.java @@ -22,6 +22,7 @@ */ package top.jpom.transport; +import cn.hutool.core.convert.Convert; import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.util.StrUtil; import cn.hutool.http.*; @@ -126,4 +127,31 @@ public class HttpTransportServer implements TransportServer { throw Lombok.sneakyThrow(TransformServerFactory.get().transformException(e, nodeInfo)); } } + + @Override + public IProxyWebSocket websocket(INodeInfo nodeInfo, IUrlItem urlItem, Object... parameters) { + String ws = "https".equalsIgnoreCase(nodeInfo.scheme()) ? "wss" : "ws"; + String url = StrUtil.format("{}://{}/", nodeInfo.scheme(), nodeInfo.url()); + UrlBuilder urlBuilder = UrlBuilder.of(url).addPath(urlItem.path()); + // + urlBuilder.addQuery(TransportServer.JPOM_AGENT_AUTHORIZE, nodeInfo.authorize()); + // + urlBuilder.addQuery(TransportServer.WORKSPACE_ID_REQ_HEADER, urlItem.workspaceId()); + for (int i = 0; i < parameters.length; i += 2) { + Object parameter = parameters[i + 1]; + String value = Convert.toStr(parameter, StrUtil.EMPTY); + urlBuilder.addQuery(parameters[i].toString(), value); + } + urlBuilder.setWithEndTag(false); + String uriTemplate = urlBuilder.build(); + uriTemplate = StrUtil.removePrefixIgnoreCase(uriTemplate, "http"); + uriTemplate = StrUtil.removePrefixIgnoreCase(uriTemplate, "https"); + uriTemplate = StrUtil.format("{}{}", ws, uriTemplate); + // + if (log.isDebugEnabled()) { + log.debug("{}[{}] -> {}", nodeInfo.name(), uriTemplate, urlItem.workspaceId()); + } + Integer timeout = urlItem.timeout(); + return new ServletWebSocketClientHandler(uriTemplate, timeout); + } } diff --git a/modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/ServletWebSocketClientHandler.java b/modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/ServletWebSocketClientHandler.java new file mode 100644 index 000000000..a5db1bd10 --- /dev/null +++ b/modules/agent-transport/agent-transport-http/src/main/java/top/jpom/transport/ServletWebSocketClientHandler.java @@ -0,0 +1,103 @@ +package top.jpom.transport; + +import cn.hutool.core.thread.ThreadUtil; +import org.springframework.util.Assert; +import org.springframework.util.unit.DataSize; +import org.springframework.web.socket.BinaryMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.client.WebSocketConnectionManager; +import org.springframework.web.socket.client.standard.StandardWebSocketClient; +import org.springframework.web.socket.handler.AbstractWebSocketHandler; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * @author bwcx_jzy + * @since 2022/12/26 + */ +public class ServletWebSocketClientHandler extends AbstractWebSocketHandler implements IProxyWebSocket { + + private static final StandardWebSocketClient CLIENT = new StandardWebSocketClient(); + + private WebSocketSession session; + // private final Integer timeout; + private final String uriTemplate; + private Consumer consumerText; + private WebSocketConnectionManager manager; + + public ServletWebSocketClientHandler(String uriTemplate, Integer timeout) { + this.uriTemplate = uriTemplate; +// this.timeout = timeout; + } + + @Override + public void onMessage(Consumer consumer) { + this.consumerText = consumer; + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + Optional.ofNullable(this.consumerText).ifPresent(consumer -> consumer.accept(message.getPayload())); + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + // 发送消息时间限制 60 秒 + // 缓存 5m 消息 + this.session = new ConcurrentWebSocketSessionDecorator(session, 60 * 1000, (int) DataSize.ofMegabytes(5).toBytes()); + } + + + @Override + public void close() throws IOException { + if (this.manager == null) { + return; + } + this.manager.stop(); + this.manager = null; + this.session = null; + } + + @Override + public boolean open() { + this.manager = new WebSocketConnectionManager(CLIENT, this, this.uriTemplate); + this.manager.start(); + int maxTimeout = 50 * 60;// Optional.ofNullable(this.timeout).orElse(5 * 60); + int waitTime = 0; + do { + if (this.isConnected()) { + return true; + } + waitTime++; + ThreadUtil.sleep(500, TimeUnit.MILLISECONDS); + } while (waitTime * 2 <= maxTimeout); + // + return false; + } + + @Override + public void send(String msg) throws IOException { + Assert.notNull(this.session, "还没有连接上"); + session.sendMessage(new TextMessage(msg)); + } + + @Override + public void send(ByteBuffer bytes) throws IOException { + Assert.notNull(this.session, "还没有连接上"); + session.sendMessage(new BinaryMessage(bytes)); + } + + @Override + public boolean isConnected() { + if (this.manager == null) { + return false; + } + return this.manager.isConnected(); + } +} diff --git a/modules/server/pom.xml b/modules/server/pom.xml index 5c091ac1e..3956a3d26 100644 --- a/modules/server/pom.xml +++ b/modules/server/pom.xml @@ -53,13 +53,6 @@ ${project.version} - - - org.java-websocket - Java-WebSocket - 1.5.3 - - io.jpom.plugins diff --git a/modules/server/src/main/java/io/jpom/common/forward/NodeForward.java b/modules/server/src/main/java/io/jpom/common/forward/NodeForward.java index a36a63453..8d4f0b9e4 100644 --- a/modules/server/src/main/java/io/jpom/common/forward/NodeForward.java +++ b/modules/server/src/main/java/io/jpom/common/forward/NodeForward.java @@ -77,7 +77,7 @@ public class NodeForward { * @param dataContentType 传输的数据类型 * @return item */ - private static IUrlItem createUrlItem(NodeModel nodeModel, NodeUrl nodeUrl, DataContentType dataContentType) { + public static IUrlItem createUrlItem(NodeModel nodeModel, NodeUrl nodeUrl, DataContentType dataContentType) { // 修正节点密码 if (StrUtil.isEmpty(nodeModel.getLoginPwd())) { NodeService nodeService = SpringUtil.getBean(NodeService.class); diff --git a/modules/server/src/main/java/io/jpom/socket/BaseProxyHandler.java b/modules/server/src/main/java/io/jpom/socket/BaseProxyHandler.java index efbac8b46..4f218098e 100644 --- a/modules/server/src/main/java/io/jpom/socket/BaseProxyHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/BaseProxyHandler.java @@ -27,9 +27,14 @@ import com.alibaba.fastjson2.JSONObject; import io.jpom.common.forward.NodeForward; import io.jpom.common.forward.NodeUrl; import io.jpom.model.data.NodeModel; -import io.jpom.model.user.UserModel; +import io.jpom.util.SocketSessionUtil; +import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; +import top.jpom.transport.DataContentType; +import top.jpom.transport.IProxyWebSocket; +import top.jpom.transport.IUrlItem; +import top.jpom.transport.TransportServerFactory; import java.io.IOException; import java.net.URISyntaxException; @@ -41,6 +46,7 @@ import java.util.Map; * @author jiangzeyin * @since 2019/4/25 */ +@Slf4j public abstract class BaseProxyHandler extends BaseHandler { private final NodeUrl nodeUrl; @@ -78,13 +84,16 @@ public abstract class BaseProxyHandler extends BaseHandler { return; } NodeModel nodeModel = (NodeModel) attributes.get("nodeInfo"); - UserModel userInfo = (UserModel) attributes.get("userInfo"); +// UserModel userInfo = (UserModel) attributes.get("userInfo"); if (nodeModel != null) { Object[] parameters = this.getParameters(attributes); - String url = NodeForward.getSocketUrl(nodeModel, nodeUrl, userInfo, parameters); // 连接节点 - ProxySession proxySession = new ProxySession(url, nodeModel.getTimeOut(), session); + IUrlItem urlItem = NodeForward.createUrlItem(nodeModel, this.nodeUrl, DataContentType.FORM_URLENCODED); + + IProxyWebSocket proxySession = TransportServerFactory.get().websocket(nodeModel, urlItem, parameters); + proxySession.onMessage(s -> sendMsg(session, s)); + proxySession.open(); session.getAttributes().put("proxySession", proxySession); } @@ -96,7 +105,7 @@ public abstract class BaseProxyHandler extends BaseHandler { protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { String msg = message.getPayload(); Map attributes = session.getAttributes(); - ProxySession proxySession = (ProxySession) attributes.get("proxySession"); + IProxyWebSocket proxySession = (IProxyWebSocket) attributes.get("proxySession"); JSONObject json = JSONObject.parseObject(msg); String op = json.getString("op"); ConsoleCommandOp consoleCommandOp = StrUtil.isNotEmpty(op) ? ConsoleCommandOp.valueOf(op) : null; @@ -135,9 +144,9 @@ public abstract class BaseProxyHandler extends BaseHandler { * @param consoleCommandOp 操作类型 */ protected String handleTextMessage(Map attributes, - ProxySession proxySession, + IProxyWebSocket proxySession, JSONObject json, - ConsoleCommandOp consoleCommandOp) { + ConsoleCommandOp consoleCommandOp) throws IOException { return null; } @@ -151,9 +160,14 @@ public abstract class BaseProxyHandler extends BaseHandler { } catch (IOException ignored) { } Map attributes = session.getAttributes(); - ProxySession proxySession = (ProxySession) attributes.get("proxySession"); + IProxyWebSocket proxySession = (IProxyWebSocket) attributes.get("proxySession"); if (proxySession != null) { - proxySession.close(); + try { + proxySession.close(); + } catch (Exception e) { + log.error("关闭异常", e); + } } + SocketSessionUtil.close(session); } } diff --git a/modules/server/src/main/java/io/jpom/socket/ProxySession.java b/modules/server/src/main/java/io/jpom/socket/ProxySession.java index 8cea6e48e..786869a7d 100644 --- a/modules/server/src/main/java/io/jpom/socket/ProxySession.java +++ b/modules/server/src/main/java/io/jpom/socket/ProxySession.java @@ -20,109 +20,131 @@ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -package io.jpom.socket; - -import cn.hutool.core.exceptions.ExceptionUtil; -import cn.hutool.core.thread.ThreadUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import com.alibaba.fastjson2.JSONObject; -import io.jpom.system.init.OperateLogController; -import lombok.extern.slf4j.Slf4j; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -/** - * 代理socket 会话 - * - * @author jiangzeyin - * @since 2019/4/16 - */ -@Slf4j -public class ProxySession extends WebSocketClient { - private final WebSocketSession session; - private final OperateLogController logController; - - /** - * 等待连接成功 - */ - private void loopOpen() { - int count = 0; - while (!this.isOpen() && count < 20) { - ThreadUtil.sleep(500); - count++; - } - } - - public ProxySession(String uri, Integer timeOut, WebSocketSession session) throws URISyntaxException, InterruptedException { - super(new URI(uri)); - //this(new URI(uri), session); - Objects.requireNonNull(session); - this.session = session; - if (timeOut == null) { - this.connect(); - } else { - this.connectBlocking(Math.max(timeOut, 1), TimeUnit.SECONDS); - } - this.loopOpen(); - logController = SpringUtil.getBean(OperateLogController.class); - } - - @Override - public void onOpen(ServerHandshake serverHandshake) { - - } - - @Override - public void onMessage(String message) { - try { - session.sendMessage(new TextMessage(message)); - } catch (IOException e) { - log.error("发送消息失败", e); - } - try { - JSONObject jsonObject = JSONObject.parseObject(message); - String reqId = jsonObject.getString("reqId"); - if (StrUtil.isNotEmpty(reqId)) { - logController.updateLog(reqId, message); - } - } catch (Exception ignored) { - } - } - - @Override - public void onClose(int code, String reason, boolean remote) { - try { - session.close(); - } catch (IOException e) { - log.error("关闭错误", e); - } - } - - @Override - public void onError(Exception ex) { - try { - session.sendMessage(new TextMessage("agent服务端发生异常" + ExceptionUtil.stacktraceToString(ex))); -// SocketSessionUtil.send(session, ); - } catch (IOException ignored) { - } - log.error("发生错误", ex); - } - - @Override - public void send(String text) { - try { - super.send(text); - } catch (Exception e) { - log.error("转发消息失败", e); - } - } -} +///* +// * The MIT License (MIT) +// * +// * Copyright (c) 2019 Code Technology Studio +// * +// * Permission is hereby granted, free of charge, to any person obtaining a copy of +// * this software and associated documentation files (the "Software"), to deal in +// * the Software without restriction, including without limitation the rights to +// * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// * the Software, and to permit persons to whom the Software is furnished to do so, +// * subject to the following conditions: +// * +// * The above copyright notice and this permission notice shall be included in all +// * copies or substantial portions of the Software. +// * +// * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// */ +//package io.jpom.socket; +// +//import cn.hutool.core.exceptions.ExceptionUtil; +//import cn.hutool.core.thread.ThreadUtil; +//import cn.hutool.core.util.StrUtil; +//import cn.hutool.extra.spring.SpringUtil; +//import com.alibaba.fastjson2.JSONObject; +//import io.jpom.system.init.OperateLogController; +//import lombok.extern.slf4j.Slf4j; +//import org.java_websocket.client.WebSocketClient; +//import org.java_websocket.handshake.ServerHandshake; +//import org.springframework.web.socket.TextMessage; +//import org.springframework.web.socket.WebSocketSession; +// +//import java.io.IOException; +//import java.net.URI; +//import java.net.URISyntaxException; +//import java.util.Objects; +//import java.util.concurrent.TimeUnit; +// +///** +// * 代理socket 会话 +// * +// * @author jiangzeyin +// * @since 2019/4/16 +// */ +//@Slf4j +//public class ProxySession extends WebSocketClient { +// private final WebSocketSession session; +// private final OperateLogController logController; +// +// /** +// * 等待连接成功 +// */ +// private void loopOpen() { +// int count = 0; +// while (!this.isOpen() && count < 20) { +// ThreadUtil.sleep(500); +// count++; +// } +// } +// +// public ProxySession(String uri, Integer timeOut, WebSocketSession session) throws URISyntaxException, InterruptedException { +// super(new URI(uri)); +// //this(new URI(uri), session); +// Objects.requireNonNull(session); +// this.session = session; +// if (timeOut == null) { +// this.connect(); +// } else { +// this.connectBlocking(Math.max(timeOut, 1), TimeUnit.SECONDS); +// } +// this.loopOpen(); +// logController = SpringUtil.getBean(OperateLogController.class); +// } +// +// @Override +// public void onOpen(ServerHandshake serverHandshake) { +// +// } +// +// @Override +// public void onMessage(String message) { +// try { +// session.sendMessage(new TextMessage(message)); +// } catch (IOException e) { +// log.error("发送消息失败", e); +// } +// try { +// JSONObject jsonObject = JSONObject.parseObject(message); +// String reqId = jsonObject.getString("reqId"); +// if (StrUtil.isNotEmpty(reqId)) { +// logController.updateLog(reqId, message); +// } +// } catch (Exception ignored) { +// } +// } +// +// @Override +// public void onClose(int code, String reason, boolean remote) { +// try { +// session.close(); +// } catch (IOException e) { +// log.error("关闭错误", e); +// } +// } +// +// @Override +// public void onError(Exception ex) { +// try { +// session.sendMessage(new TextMessage("agent服务端发生异常" + ExceptionUtil.stacktraceToString(ex))); +//// SocketSessionUtil.send(session, ); +// } catch (IOException ignored) { +// } +// log.error("发生错误", ex); +// } +// +// @Override +// public void send(String text) { +// try { +// super.send(text); +// } catch (Exception e) { +// log.error("转发消息失败", e); +// } +// } +//} diff --git a/modules/server/src/main/java/io/jpom/socket/ServerWebSocketConfig.java b/modules/server/src/main/java/io/jpom/socket/ServerWebSocketConfig.java index 043504bc4..35c2df47b 100644 --- a/modules/server/src/main/java/io/jpom/socket/ServerWebSocketConfig.java +++ b/modules/server/src/main/java/io/jpom/socket/ServerWebSocketConfig.java @@ -22,6 +22,8 @@ */ package io.jpom.socket; +import io.jpom.service.node.NodeService; +import io.jpom.service.system.SystemParametersServer; import io.jpom.socket.handler.*; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; @@ -37,9 +39,15 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry @EnableWebSocket public class ServerWebSocketConfig implements WebSocketConfigurer { private final ServerWebSocketInterceptor serverWebSocketInterceptor; + private final SystemParametersServer systemParametersServer; + private final NodeService nodeService; - public ServerWebSocketConfig(ServerWebSocketInterceptor serverWebSocketInterceptor) { + public ServerWebSocketConfig(ServerWebSocketInterceptor serverWebSocketInterceptor, + SystemParametersServer systemParametersServer, + NodeService nodeService) { this.serverWebSocketInterceptor = serverWebSocketInterceptor; + this.systemParametersServer = systemParametersServer; + this.nodeService = nodeService; } @Override @@ -57,7 +65,7 @@ public class ServerWebSocketConfig implements WebSocketConfigurer { registry.addHandler(new SshHandler(), "/socket/ssh") .addInterceptors(serverWebSocketInterceptor).setAllowedOrigins("*"); // 节点升级 - registry.addHandler(new NodeUpdateHandler(), "/socket/node_update") + registry.addHandler(new NodeUpdateHandler(nodeService, systemParametersServer), "/socket/node_update") .addInterceptors(serverWebSocketInterceptor).setAllowedOrigins("*"); // 脚本模板 registry.addHandler(new ServerScriptHandler(), "/socket/script_run") diff --git a/modules/server/src/main/java/io/jpom/socket/ServerWebSocketInterceptor.java b/modules/server/src/main/java/io/jpom/socket/ServerWebSocketInterceptor.java index 6a25390fa..a67f8b894 100644 --- a/modules/server/src/main/java/io/jpom/socket/ServerWebSocketInterceptor.java +++ b/modules/server/src/main/java/io/jpom/socket/ServerWebSocketInterceptor.java @@ -171,15 +171,18 @@ public class ServerWebSocketInterceptor implements HandshakeInterceptor { String userId = httpServletRequest.getParameter("userId"); UserModel userModel = userService.checkUser(userId); if (userModel == null) { - return false; + attributes.put("permissionMsg", "用户不存在"); + return true; } boolean checkNode = this.checkNode(httpServletRequest, attributes, userModel); HandlerType handlerType = this.fromType(httpServletRequest); if (!checkNode || handlerType == null) { - return false; + attributes.put("permissionMsg", "未匹配到合适的处理类型"); + return true; } if (!this.checkHandlerType(handlerType, userModel, httpServletRequest, attributes)) { - return false; + attributes.put("permissionMsg", "未找到匹配的数据"); + return true; } // 判断权限 String permissionMsg = this.checkPermission(userModel, attributes, handlerType); diff --git a/modules/server/src/main/java/io/jpom/socket/client/NodeClient.java b/modules/server/src/main/java/io/jpom/socket/client/NodeClient.java index ce9fcab16..b50ca81be 100644 --- a/modules/server/src/main/java/io/jpom/socket/client/NodeClient.java +++ b/modules/server/src/main/java/io/jpom/socket/client/NodeClient.java @@ -1,148 +1,148 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2019 Code Technology Studio - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ -package io.jpom.socket.client; - -import cn.hutool.core.thread.ThreadUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import com.alibaba.fastjson2.JSONObject; -import io.jpom.model.WebSocketMessageModel; -import io.jpom.model.data.NodeModel; -import io.jpom.system.init.OperateLogController; -import lombok.extern.slf4j.Slf4j; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; - -import java.io.IOException; -import java.net.Proxy; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; - -/** - * 节点Client - * - * @author lf - */ -@Slf4j -public class NodeClient extends WebSocketClient { - private final WebSocketSession session; - private final OperateLogController logController; - private final NodeModel nodeModel; - - - public NodeClient(String uri, NodeModel nodeModel, WebSocketSession session) throws URISyntaxException, InterruptedException { - super(new URI(uri)); - // 添加 http proxy - Proxy proxy = nodeModel.proxy(); - if (proxy != null) { - setProxy(proxy); - } - this.session = session; - this.nodeModel = nodeModel; - // - Integer timeOut = nodeModel.getTimeOut(); - if (timeOut == null) { - this.connect(); - } else { - this.connectBlocking(timeOut, TimeUnit.SECONDS); - } - this.loopOpen(); - logController = SpringUtil.getBean(OperateLogController.class); - } - - /** - * 等待连接成功 - */ - private void loopOpen() { - int count = 0; - while (!this.isOpen() && count < 20) { - ThreadUtil.sleep(500); - count++; - } - } - - @Override - public void onOpen(ServerHandshake serverHandshake) { - // 连接成功后获取版本信息 - getVersion(); - } - - public void getVersion() { - WebSocketMessageModel command = new WebSocketMessageModel("getVersion", this.nodeModel.getId()); - send(command.toString()); - } - - @Override - public void onMessage(String message) { - try { - // 不能并发向同一个客户端发送消息 @author jzy 2021-08-03 - synchronized (session.getId()) { - session.sendMessage(new TextMessage(message)); - } - } catch (IOException e) { - log.error("发送消息失败", e); - } - try { - JSONObject jsonObject = JSONObject.parseObject(message); - String reqId = jsonObject.getString("reqId"); - if (StrUtil.isNotEmpty(reqId)) { - logController.updateLog(reqId, message); - } - } catch (Exception ignored) { - } - } - - @Override - public void onClose(int code, String reason, boolean remote) { - - } - - @Override - public void send(String text) { - super.send(text); - } - - - @Override - public void close() { - try { - super.close(); - } catch (Exception e) { - log.error("关闭异常", e); - } - } - - @Override - public void send(ByteBuffer bytes) { - super.send(bytes); - } - - @Override - public void onError(Exception e) { - log.error("发生异常", e); - } -} +///* +// * The MIT License (MIT) +// * +// * Copyright (c) 2019 Code Technology Studio +// * +// * Permission is hereby granted, free of charge, to any person obtaining a copy of +// * this software and associated documentation files (the "Software"), to deal in +// * the Software without restriction, including without limitation the rights to +// * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// * the Software, and to permit persons to whom the Software is furnished to do so, +// * subject to the following conditions: +// * +// * The above copyright notice and this permission notice shall be included in all +// * copies or substantial portions of the Software. +// * +// * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// */ +//package io.jpom.socket.client; +// +//import cn.hutool.core.thread.ThreadUtil; +//import cn.hutool.core.util.StrUtil; +//import cn.hutool.extra.spring.SpringUtil; +//import com.alibaba.fastjson2.JSONObject; +//import io.jpom.model.WebSocketMessageModel; +//import io.jpom.model.data.NodeModel; +//import io.jpom.system.init.OperateLogController; +//import lombok.extern.slf4j.Slf4j; +//import org.java_websocket.client.WebSocketClient; +//import org.java_websocket.handshake.ServerHandshake; +//import org.springframework.web.socket.TextMessage; +//import org.springframework.web.socket.WebSocketSession; +// +//import java.io.IOException; +//import java.net.Proxy; +//import java.net.URI; +//import java.net.URISyntaxException; +//import java.nio.ByteBuffer; +//import java.util.concurrent.TimeUnit; +// +///** +// * 节点Client +// * +// * @author lf +// */ +//@Slf4j +//public class NodeClient extends WebSocketClient { +// private final WebSocketSession session; +// private final OperateLogController logController; +// private final NodeModel nodeModel; +// +// +// public NodeClient(String uri, NodeModel nodeModel, WebSocketSession session) throws URISyntaxException, InterruptedException { +// super(new URI(uri)); +// // 添加 http proxy +// Proxy proxy = nodeModel.proxy(); +// if (proxy != null) { +// setProxy(proxy); +// } +// this.session = session; +// this.nodeModel = nodeModel; +// // +// Integer timeOut = nodeModel.getTimeOut(); +// if (timeOut == null) { +// this.connect(); +// } else { +// this.connectBlocking(timeOut, TimeUnit.SECONDS); +// } +// this.loopOpen(); +// logController = SpringUtil.getBean(OperateLogController.class); +// } +// +// /** +// * 等待连接成功 +// */ +// private void loopOpen() { +// int count = 0; +// while (!this.isOpen() && count < 20) { +// ThreadUtil.sleep(500); +// count++; +// } +// } +// +// @Override +// public void onOpen(ServerHandshake serverHandshake) { +// // 连接成功后获取版本信息 +// getVersion(); +// } +// +// public void getVersion() { +// WebSocketMessageModel command = new WebSocketMessageModel("getVersion", this.nodeModel.getId()); +// send(command.toString()); +// } +// +// @Override +// public void onMessage(String message) { +// try { +// // 不能并发向同一个客户端发送消息 @author jzy 2021-08-03 +// synchronized (session.getId()) { +// session.sendMessage(new TextMessage(message)); +// } +// } catch (IOException e) { +// log.error("发送消息失败", e); +// } +// try { +// JSONObject jsonObject = JSONObject.parseObject(message); +// String reqId = jsonObject.getString("reqId"); +// if (StrUtil.isNotEmpty(reqId)) { +// logController.updateLog(reqId, message); +// } +// } catch (Exception ignored) { +// } +// } +// +// @Override +// public void onClose(int code, String reason, boolean remote) { +// +// } +// +// @Override +// public void send(String text) { +// super.send(text); +// } +// +// +// @Override +// public void close() { +// try { +// super.close(); +// } catch (Exception e) { +// log.error("关闭异常", e); +// } +// } +// +// @Override +// public void send(ByteBuffer bytes) { +// super.send(bytes); +// } +// +// @Override +// public void onError(Exception e) { +// log.error("发生异常", e); +// } +//} diff --git a/modules/server/src/main/java/io/jpom/socket/handler/BaseTerminalHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/BaseTerminalHandler.java index f47795a4d..3c9064c51 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/BaseTerminalHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/BaseTerminalHandler.java @@ -22,8 +22,8 @@ */ package io.jpom.socket.handler; -import cn.hutool.core.util.StrUtil; import io.jpom.socket.BaseHandler; +import io.jpom.util.SocketSessionUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.WebSocketSession; @@ -37,22 +37,15 @@ import java.io.IOException; @Slf4j public abstract class BaseTerminalHandler extends BaseHandler { - protected void sendBinary(WebSocketSession session, String msg) { - if (StrUtil.isEmpty(msg)) { - return; - } - if (!session.isOpen()) { - // 会话关闭不能发送消息 @author jzy 21-08-04 - log.warn("回话已经关闭啦,不能发送消息:{}", msg); - return; - } - synchronized (session.getId()) { - BinaryMessage byteBuffer = new BinaryMessage(msg.getBytes()); - try { - session.sendMessage(byteBuffer); - } catch (IOException e) { - log.error("发送消息失败:" + msg, e); - } - } - } + protected void sendBinary(WebSocketSession session, String msg) { + if (msg == null) { + return; + } + BinaryMessage byteBuffer = new BinaryMessage(msg.getBytes()); + try { + SocketSessionUtil.send(session, byteBuffer); + } catch (IOException e) { + log.error("发送消息失败:" + msg, e); + } + } } diff --git a/modules/server/src/main/java/io/jpom/socket/handler/ConsoleHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/ConsoleHandler.java index b3fcc3c22..df2abf965 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/ConsoleHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/ConsoleHandler.java @@ -30,8 +30,9 @@ import io.jpom.permission.Feature; import io.jpom.permission.MethodFeature; import io.jpom.socket.BaseProxyHandler; import io.jpom.socket.ConsoleCommandOp; -import io.jpom.socket.ProxySession; +import top.jpom.transport.IProxyWebSocket; +import java.io.IOException; import java.util.Map; /** @@ -54,9 +55,9 @@ public class ConsoleHandler extends BaseProxyHandler { @Override protected String handleTextMessage(Map attributes, - ProxySession proxySession, + IProxyWebSocket proxySession, JSONObject json, - ConsoleCommandOp consoleCommandOp) { + ConsoleCommandOp consoleCommandOp) throws IOException { //ProjectInfoCacheModel dataItem = (ProjectInfoCacheModel) attributes.get("dataItem"); // UserModel userModel = (UserModel) attributes.get("userInfo"); // if (RunMode.Dsl.name().equals(dataItem.getRunMode()) && userModel.isDemoUser()) { diff --git a/modules/server/src/main/java/io/jpom/socket/handler/DockerCliHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/DockerCliHandler.java index 29ddd2ef9..7fa1c47f1 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/DockerCliHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/DockerCliHandler.java @@ -35,6 +35,7 @@ import io.jpom.permission.MethodFeature; import io.jpom.plugin.IPlugin; import io.jpom.plugin.PluginFactory; import io.jpom.service.docker.DockerInfoService; +import io.jpom.util.SocketSessionUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; @@ -170,5 +171,6 @@ public class DockerCliHandler extends BaseTerminalHandler { } IoUtil.close(session); HANDLER_ITEM_CONCURRENT_HASH_MAP.remove(session.getId()); + SocketSessionUtil.close(session); } } diff --git a/modules/server/src/main/java/io/jpom/socket/handler/DockerLogHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/DockerLogHandler.java index e4afb8927..c63b74e5b 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/DockerLogHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/DockerLogHandler.java @@ -113,5 +113,6 @@ public class DockerLogHandler extends BaseProxyHandler { // super.destroy(session); // ScriptProcessBuilder.stopWatcher(session); + SocketSessionUtil.close(session); } } diff --git a/modules/server/src/main/java/io/jpom/socket/handler/NodeScriptHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/NodeScriptHandler.java index 70b1a19fb..b547c46b8 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/NodeScriptHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/NodeScriptHandler.java @@ -37,8 +37,9 @@ import io.jpom.service.node.script.NodeScriptExecuteLogServer; import io.jpom.service.node.script.NodeScriptServer; import io.jpom.socket.BaseProxyHandler; import io.jpom.socket.ConsoleCommandOp; -import io.jpom.socket.ProxySession; +import top.jpom.transport.IProxyWebSocket; +import java.io.IOException; import java.util.Map; /** @@ -60,7 +61,7 @@ public class NodeScriptHandler extends BaseProxyHandler { } @Override - protected String handleTextMessage(Map attributes, ProxySession proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) { + protected String handleTextMessage(Map attributes, IProxyWebSocket proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) throws IOException { if (consoleCommandOp != ConsoleCommandOp.heart) { super.logOpt(this.getClass(), attributes, json); } diff --git a/modules/server/src/main/java/io/jpom/socket/handler/NodeUpdateHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/NodeUpdateHandler.java index 2fa399134..e33c702bc 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/NodeUpdateHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/NodeUpdateHandler.java @@ -28,7 +28,6 @@ import cn.hutool.core.io.resource.FileResource; import cn.hutool.core.lang.Tuple; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; import cn.hutool.http.HttpStatus; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; @@ -41,7 +40,6 @@ import io.jpom.common.forward.NodeUrl; import io.jpom.model.AgentFileModel; import io.jpom.model.WebSocketMessageModel; import io.jpom.model.data.NodeModel; -import io.jpom.model.user.UserModel; import io.jpom.permission.ClassFeature; import io.jpom.permission.Feature; import io.jpom.permission.MethodFeature; @@ -50,9 +48,12 @@ import io.jpom.service.node.NodeService; import io.jpom.service.system.SystemParametersServer; import io.jpom.socket.BaseProxyHandler; import io.jpom.socket.ConsoleCommandOp; -import io.jpom.socket.client.NodeClient; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; +import top.jpom.transport.DataContentType; +import top.jpom.transport.IProxyWebSocket; +import top.jpom.transport.IUrlItem; +import top.jpom.transport.TransportServerFactory; import java.io.File; import java.io.FileInputStream; @@ -73,22 +74,25 @@ import java.util.concurrent.ConcurrentMap; @Slf4j public class NodeUpdateHandler extends BaseProxyHandler { - private final ConcurrentMap clientMap = new ConcurrentHashMap<>(); + private final ConcurrentMap clientMap = new ConcurrentHashMap<>(); private static final int CHECK_COUNT = 60; - private SystemParametersServer systemParametersServer; - private NodeService nodeService; + private final SystemParametersServer systemParametersServer; + private final NodeService nodeService; - public NodeUpdateHandler() { + public NodeUpdateHandler(NodeService nodeService, SystemParametersServer systemParametersServer) { super(null); + this.nodeService = nodeService; + this.systemParametersServer = systemParametersServer; + //systemParametersServer = SpringUtil.getBean(SystemParametersServer.class); +// nodeService = SpringUtil.getBean(NodeService.class); } @Override protected void init(WebSocketSession session, Map attributes) throws Exception { super.init(session, attributes); - systemParametersServer = SpringUtil.getBean(SystemParametersServer.class); - nodeService = SpringUtil.getBean(NodeService.class); + } @Override @@ -109,18 +113,29 @@ public class NodeUpdateHandler extends BaseProxyHandler { return; } for (NodeModel model : nodeModelList) { - NodeClient nodeClient = clientMap.get(model.getId()); + IProxyWebSocket nodeClient = clientMap.get(model.getId()); if (nodeClient != null) { // - nodeClient.close(); + try { + nodeClient.close(); + } catch (Exception e) { + log.error("关闭连接异常", e); + } } - Map attributes = session.getAttributes(); - String url = NodeForward.getSocketUrl(model, NodeUrl.NodeUpdate, (UserModel) attributes.get("userInfo")); +// Map attributes = session.getAttributes(); +// String url = NodeForward.getSocketUrl(model,, (UserModel) attributes.get("userInfo")); // 连接节点 ThreadUtil.execute(() -> { try { - NodeClient client = new NodeClient(url, model, session); - clientMap.put(model.getId(), client); + IUrlItem urlItem = NodeForward.createUrlItem(model, NodeUrl.NodeUpdate, DataContentType.FORM_URLENCODED); + + IProxyWebSocket proxySession = TransportServerFactory.get().websocket(model, urlItem); + proxySession.onMessage(s -> sendMsg(session, s)); + proxySession.open(); + WebSocketMessageModel command = new WebSocketMessageModel("getVersion", model.getId()); + proxySession.send(command.toString()); +// NodeClient client = new NodeClient(url, model, session); + clientMap.put(model.getId(), proxySession); } catch (Exception e) { log.error("创建插件端连接失败", e); } @@ -131,9 +146,13 @@ public class NodeUpdateHandler extends BaseProxyHandler { @Override public void destroy(WebSocketSession session) { for (String key : clientMap.keySet()) { - NodeClient client = clientMap.get(key); - if (client.isOpen()) { - client.close(); + IProxyWebSocket client = clientMap.get(key); + if (client.isConnected()) { + try { + client.close(); + } catch (Exception e) { + log.error("关闭连接异常", e); + } } } clientMap.clear(); @@ -244,7 +263,7 @@ public class NodeUpdateHandler extends BaseProxyHandler { } } - private void updateNodeItemWebSocket(NodeClient client, String id, WebSocketSession session, AgentFileModel agentFileModel) throws IOException { + private void updateNodeItemWebSocket(IProxyWebSocket client, String id, WebSocketSession session, AgentFileModel agentFileModel) throws IOException { // 发送文件信息 WebSocketMessageModel webSocketMessageModel = new WebSocketMessageModel("upload", id); webSocketMessageModel.setNodeId(id); @@ -271,7 +290,7 @@ public class NodeUpdateHandler extends BaseProxyHandler { ++retryCount; try { ThreadUtil.sleep(1000L); - if (client.reconnectBlocking()) { + if (client.reopen()) { this.sendMsg(restartMessage.setData("重启完成"), session); return; } @@ -292,12 +311,12 @@ public class NodeUpdateHandler extends BaseProxyHandler { this.onError(session, "没有对应的节点:" + id); return; } - NodeClient client = clientMap.get(node.getId()); + IProxyWebSocket client = clientMap.get(node.getId()); if (client == null) { this.onError(session, "对应的插件端还没有被初始化:" + id); return; } - if (client.isOpen()) { + if (client.isConnected()) { if (http) { this.updateNodeItemHttp(node, session, agentFileModel); } else { diff --git a/modules/server/src/main/java/io/jpom/socket/handler/SshHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/SshHandler.java index 1c4ee5073..9c5084823 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/SshHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/SshHandler.java @@ -43,6 +43,7 @@ import io.jpom.permission.MethodFeature; import io.jpom.service.dblog.SshTerminalExecuteLogService; import io.jpom.service.node.ssh.SshService; import io.jpom.service.user.UserBindWorkspaceService; +import io.jpom.util.SocketSessionUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpHeaders; import org.springframework.web.socket.TextMessage; @@ -291,5 +292,6 @@ public class SshHandler extends BaseTerminalHandler { } IoUtil.close(session); HANDLER_ITEM_CONCURRENT_HASH_MAP.remove(session.getId()); + SocketSessionUtil.close(session); } } diff --git a/modules/server/src/main/java/io/jpom/socket/handler/TomcatHandler.java b/modules/server/src/main/java/io/jpom/socket/handler/TomcatHandler.java index b86edcec7..522376d73 100644 --- a/modules/server/src/main/java/io/jpom/socket/handler/TomcatHandler.java +++ b/modules/server/src/main/java/io/jpom/socket/handler/TomcatHandler.java @@ -31,12 +31,12 @@ import io.jpom.permission.Feature; import io.jpom.permission.MethodFeature; import io.jpom.socket.BaseProxyHandler; import io.jpom.socket.ConsoleCommandOp; -import io.jpom.socket.ProxySession; import io.jpom.socket.ServiceFileTailWatcher; import io.jpom.system.LogbackConfig; import io.jpom.util.SocketSessionUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; +import top.jpom.transport.IProxyWebSocket; import java.io.File; import java.io.IOException; @@ -95,7 +95,7 @@ public class TomcatHandler extends BaseProxyHandler { } @Override - protected String handleTextMessage(Map attributes, ProxySession proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) { + protected String handleTextMessage(Map attributes, IProxyWebSocket proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) throws IOException { proxySession.send(json.toString()); return null; } diff --git a/modules/server/src/main/java/io/jpom/util/SocketSessionUtil.java b/modules/server/src/main/java/io/jpom/util/SocketSessionUtil.java index 1abc7a770..fab919852 100644 --- a/modules/server/src/main/java/io/jpom/util/SocketSessionUtil.java +++ b/modules/server/src/main/java/io/jpom/util/SocketSessionUtil.java @@ -22,13 +22,16 @@ */ package io.jpom.util; -import cn.hutool.core.thread.ThreadUtil; -import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.unit.DataSize; import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * socket 会话对象 @@ -38,46 +41,24 @@ import java.io.IOException; */ @Slf4j public class SocketSessionUtil { - /** - * 锁 - */ - private static final KeyLock LOCK = new KeyLock<>(); - /** - * 错误尝试次数 - */ - private static final int ERROR_TRY_COUNT = 10; + + private static final Map SOCKET_MAP = new ConcurrentHashMap<>(); public static void send(WebSocketSession session, String msg) throws IOException { - if (StrUtil.isEmpty(msg)) { + send(session, new TextMessage(msg)); + } + + public static void send(WebSocketSession session, WebSocketMessage message) throws IOException { + if (!session.isOpen()) { + // 会话关闭不能发送消息 @author jzy 21-08-04 + log.warn("回话已经关闭啦,不能发送消息:{}", message.getPayload()); return; } - if (!session.isOpen()) { - throw new RuntimeException("session close "); - } - try { - LOCK.lock(session.getId()); - IOException exception = null; - int tryCount = 0; - do { - tryCount++; - if (exception != null) { - // 上一次有异常、休眠 500 - ThreadUtil.sleep(500); - } - try { - session.sendMessage(new TextMessage(msg)); - exception = null; - break; - } catch (IOException e) { - log.error("发送消息失败:" + tryCount, e); - exception = e; - } - } while (tryCount <= ERROR_TRY_COUNT); - if (exception != null) { - throw exception; - } - } finally { - LOCK.unlock(session.getId()); - } + WebSocketSession webSocketSession = SOCKET_MAP.computeIfAbsent(session.getId(), s -> new ConcurrentWebSocketSessionDecorator(session, 60 * 1000, (int) DataSize.ofMegabytes(5).toBytes())); + webSocketSession.sendMessage(message); + } + + public static void close(WebSocketSession session) { + SOCKET_MAP.remove(session.getId()); } }