feat 服务端删除 Java-WebSocket 依赖(采用统一模块管理)

This commit is contained in:
bwcx_jzy 2022-12-26 17:22:35 +08:00
parent 931e0fe2a8
commit d0a63fd304
No known key found for this signature in database
GPG Key ID: 5E48E9372088B9E5
22 changed files with 613 additions and 363 deletions

View File

@ -18,6 +18,7 @@
7. 【server】优化 websocket 控制台操作日志记录
8. 【server】修复 超级管理的 websocket 操作日志记录工作空间不正确
9. 【agent】优化 插件端删除 spring-boot-starter-websocket 依赖
10. 【server】优化 服务端删除 Java-WebSocket 依赖(采用统一模块管理)
### ❌ 不兼容功能

View File

@ -0,0 +1,67 @@
package top.jpom.transport;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
/**
* @author bwcx_jzy
* @since 2022/12/26
*/
public interface IProxyWebSocket {
/**
* 关闭连接
*
* @throws IOException 关闭异常
*/
void close() throws IOException;
/**
* 打开连接
*
* @return 打开状态
*/
boolean open();
/**
* 重新打开连接
*
* @return 打开状态
* @throws IOException 关闭异常
*/
default boolean reopen() throws IOException {
this.close();
return this.open();
}
/**
* 发送消息
*
* @param msg 消息
* @throws IOException 发送异常
*/
void send(String msg) throws IOException;
/**
* 发送消息
*
* @param bytes 消息
* @throws IOException 发送异常
*/
void send(ByteBuffer bytes) throws IOException;
/**
* 收到消息
*
* @param consumer 回调
*/
void onMessage(Consumer<String> consumer);
/**
* 是否连接上
*
* @return true
*/
boolean isConnected();
}

View File

@ -37,6 +37,7 @@ public interface IUrlItem {
/**
* 请求超时时间
* 单位秒
*
* @return 超时时间
*/

View File

@ -91,4 +91,14 @@ public interface TransportServer {
* @param consumer 回调
*/
void download(INodeInfo nodeInfo, IUrlItem urlItem, Object data, Consumer<DownloadCallback> consumer);
/**
* 创建 websocket 连接
*
* @param nodeInfo 节点信息
* @param urlItem 请求 item
* @param parameters 参数
* @return websocket
*/
IProxyWebSocket websocket(INodeInfo nodeInfo, IUrlItem urlItem, Object... parameters);
}

View File

@ -22,6 +22,7 @@
*/
package top.jpom.transport;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.*;
@ -126,4 +127,31 @@ public class HttpTransportServer implements TransportServer {
throw Lombok.sneakyThrow(TransformServerFactory.get().transformException(e, nodeInfo));
}
}
@Override
public IProxyWebSocket websocket(INodeInfo nodeInfo, IUrlItem urlItem, Object... parameters) {
String ws = "https".equalsIgnoreCase(nodeInfo.scheme()) ? "wss" : "ws";
String url = StrUtil.format("{}://{}/", nodeInfo.scheme(), nodeInfo.url());
UrlBuilder urlBuilder = UrlBuilder.of(url).addPath(urlItem.path());
//
urlBuilder.addQuery(TransportServer.JPOM_AGENT_AUTHORIZE, nodeInfo.authorize());
//
urlBuilder.addQuery(TransportServer.WORKSPACE_ID_REQ_HEADER, urlItem.workspaceId());
for (int i = 0; i < parameters.length; i += 2) {
Object parameter = parameters[i + 1];
String value = Convert.toStr(parameter, StrUtil.EMPTY);
urlBuilder.addQuery(parameters[i].toString(), value);
}
urlBuilder.setWithEndTag(false);
String uriTemplate = urlBuilder.build();
uriTemplate = StrUtil.removePrefixIgnoreCase(uriTemplate, "http");
uriTemplate = StrUtil.removePrefixIgnoreCase(uriTemplate, "https");
uriTemplate = StrUtil.format("{}{}", ws, uriTemplate);
//
if (log.isDebugEnabled()) {
log.debug("{}[{}] -> {}", nodeInfo.name(), uriTemplate, urlItem.workspaceId());
}
Integer timeout = urlItem.timeout();
return new ServletWebSocketClientHandler(uriTemplate, timeout);
}
}

View File

@ -0,0 +1,103 @@
package top.jpom.transport;
import cn.hutool.core.thread.ThreadUtil;
import org.springframework.util.Assert;
import org.springframework.util.unit.DataSize;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.WebSocketConnectionManager;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* @author bwcx_jzy
* @since 2022/12/26
*/
public class ServletWebSocketClientHandler extends AbstractWebSocketHandler implements IProxyWebSocket {
private static final StandardWebSocketClient CLIENT = new StandardWebSocketClient();
private WebSocketSession session;
// private final Integer timeout;
private final String uriTemplate;
private Consumer<String> consumerText;
private WebSocketConnectionManager manager;
public ServletWebSocketClientHandler(String uriTemplate, Integer timeout) {
this.uriTemplate = uriTemplate;
// this.timeout = timeout;
}
@Override
public void onMessage(Consumer<String> consumer) {
this.consumerText = consumer;
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
Optional.ofNullable(this.consumerText).ifPresent(consumer -> consumer.accept(message.getPayload()));
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 发送消息时间限制 60
// 缓存 5m 消息
this.session = new ConcurrentWebSocketSessionDecorator(session, 60 * 1000, (int) DataSize.ofMegabytes(5).toBytes());
}
@Override
public void close() throws IOException {
if (this.manager == null) {
return;
}
this.manager.stop();
this.manager = null;
this.session = null;
}
@Override
public boolean open() {
this.manager = new WebSocketConnectionManager(CLIENT, this, this.uriTemplate);
this.manager.start();
int maxTimeout = 50 * 60;// Optional.ofNullable(this.timeout).orElse(5 * 60);
int waitTime = 0;
do {
if (this.isConnected()) {
return true;
}
waitTime++;
ThreadUtil.sleep(500, TimeUnit.MILLISECONDS);
} while (waitTime * 2 <= maxTimeout);
//
return false;
}
@Override
public void send(String msg) throws IOException {
Assert.notNull(this.session, "还没有连接上");
session.sendMessage(new TextMessage(msg));
}
@Override
public void send(ByteBuffer bytes) throws IOException {
Assert.notNull(this.session, "还没有连接上");
session.sendMessage(new BinaryMessage(bytes));
}
@Override
public boolean isConnected() {
if (this.manager == null) {
return false;
}
return this.manager.isConnected();
}
}

View File

@ -53,13 +53,6 @@
<version>${project.version}</version>
</dependency>
<!--websocket作为客户端-->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
<!-- 数据库-->
<dependency>
<groupId>io.jpom.plugins</groupId>

View File

@ -77,7 +77,7 @@ public class NodeForward {
* @param dataContentType 传输的数据类型
* @return item
*/
private static IUrlItem createUrlItem(NodeModel nodeModel, NodeUrl nodeUrl, DataContentType dataContentType) {
public static IUrlItem createUrlItem(NodeModel nodeModel, NodeUrl nodeUrl, DataContentType dataContentType) {
// 修正节点密码
if (StrUtil.isEmpty(nodeModel.getLoginPwd())) {
NodeService nodeService = SpringUtil.getBean(NodeService.class);

View File

@ -27,9 +27,14 @@ import com.alibaba.fastjson2.JSONObject;
import io.jpom.common.forward.NodeForward;
import io.jpom.common.forward.NodeUrl;
import io.jpom.model.data.NodeModel;
import io.jpom.model.user.UserModel;
import io.jpom.util.SocketSessionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import top.jpom.transport.DataContentType;
import top.jpom.transport.IProxyWebSocket;
import top.jpom.transport.IUrlItem;
import top.jpom.transport.TransportServerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
@ -41,6 +46,7 @@ import java.util.Map;
* @author jiangzeyin
* @since 2019/4/25
*/
@Slf4j
public abstract class BaseProxyHandler extends BaseHandler {
private final NodeUrl nodeUrl;
@ -78,13 +84,16 @@ public abstract class BaseProxyHandler extends BaseHandler {
return;
}
NodeModel nodeModel = (NodeModel) attributes.get("nodeInfo");
UserModel userInfo = (UserModel) attributes.get("userInfo");
// UserModel userInfo = (UserModel) attributes.get("userInfo");
if (nodeModel != null) {
Object[] parameters = this.getParameters(attributes);
String url = NodeForward.getSocketUrl(nodeModel, nodeUrl, userInfo, parameters);
// 连接节点
ProxySession proxySession = new ProxySession(url, nodeModel.getTimeOut(), session);
IUrlItem urlItem = NodeForward.createUrlItem(nodeModel, this.nodeUrl, DataContentType.FORM_URLENCODED);
IProxyWebSocket proxySession = TransportServerFactory.get().websocket(nodeModel, urlItem, parameters);
proxySession.onMessage(s -> sendMsg(session, s));
proxySession.open();
session.getAttributes().put("proxySession", proxySession);
}
@ -96,7 +105,7 @@ public abstract class BaseProxyHandler extends BaseHandler {
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String msg = message.getPayload();
Map<String, Object> attributes = session.getAttributes();
ProxySession proxySession = (ProxySession) attributes.get("proxySession");
IProxyWebSocket proxySession = (IProxyWebSocket) attributes.get("proxySession");
JSONObject json = JSONObject.parseObject(msg);
String op = json.getString("op");
ConsoleCommandOp consoleCommandOp = StrUtil.isNotEmpty(op) ? ConsoleCommandOp.valueOf(op) : null;
@ -135,9 +144,9 @@ public abstract class BaseProxyHandler extends BaseHandler {
* @param consoleCommandOp 操作类型
*/
protected String handleTextMessage(Map<String, Object> attributes,
ProxySession proxySession,
IProxyWebSocket proxySession,
JSONObject json,
ConsoleCommandOp consoleCommandOp) {
ConsoleCommandOp consoleCommandOp) throws IOException {
return null;
}
@ -151,9 +160,14 @@ public abstract class BaseProxyHandler extends BaseHandler {
} catch (IOException ignored) {
}
Map<String, Object> attributes = session.getAttributes();
ProxySession proxySession = (ProxySession) attributes.get("proxySession");
IProxyWebSocket proxySession = (IProxyWebSocket) attributes.get("proxySession");
if (proxySession != null) {
proxySession.close();
try {
proxySession.close();
} catch (Exception e) {
log.error("关闭异常", e);
}
}
SocketSessionUtil.close(session);
}
}

View File

@ -20,109 +20,131 @@
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package io.jpom.socket;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson2.JSONObject;
import io.jpom.system.init.OperateLogController;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 代理socket 会话
*
* @author jiangzeyin
* @since 2019/4/16
*/
@Slf4j
public class ProxySession extends WebSocketClient {
private final WebSocketSession session;
private final OperateLogController logController;
/**
* 等待连接成功
*/
private void loopOpen() {
int count = 0;
while (!this.isOpen() && count < 20) {
ThreadUtil.sleep(500);
count++;
}
}
public ProxySession(String uri, Integer timeOut, WebSocketSession session) throws URISyntaxException, InterruptedException {
super(new URI(uri));
//this(new URI(uri), session);
Objects.requireNonNull(session);
this.session = session;
if (timeOut == null) {
this.connect();
} else {
this.connectBlocking(Math.max(timeOut, 1), TimeUnit.SECONDS);
}
this.loopOpen();
logController = SpringUtil.getBean(OperateLogController.class);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
}
@Override
public void onMessage(String message) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("发送消息失败", e);
}
try {
JSONObject jsonObject = JSONObject.parseObject(message);
String reqId = jsonObject.getString("reqId");
if (StrUtil.isNotEmpty(reqId)) {
logController.updateLog(reqId, message);
}
} catch (Exception ignored) {
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
try {
session.close();
} catch (IOException e) {
log.error("关闭错误", e);
}
}
@Override
public void onError(Exception ex) {
try {
session.sendMessage(new TextMessage("agent服务端发生异常" + ExceptionUtil.stacktraceToString(ex)));
// SocketSessionUtil.send(session, );
} catch (IOException ignored) {
}
log.error("发生错误", ex);
}
@Override
public void send(String text) {
try {
super.send(text);
} catch (Exception e) {
log.error("转发消息失败", e);
}
}
}
///*
// * The MIT License (MIT)
// *
// * Copyright (c) 2019 Code Technology Studio
// *
// * Permission is hereby granted, free of charge, to any person obtaining a copy of
// * this software and associated documentation files (the "Software"), to deal in
// * the Software without restriction, including without limitation the rights to
// * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// * the Software, and to permit persons to whom the Software is furnished to do so,
// * subject to the following conditions:
// *
// * The above copyright notice and this permission notice shall be included in all
// * copies or substantial portions of the Software.
// *
// * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// */
//package io.jpom.socket;
//
//import cn.hutool.core.exceptions.ExceptionUtil;
//import cn.hutool.core.thread.ThreadUtil;
//import cn.hutool.core.util.StrUtil;
//import cn.hutool.extra.spring.SpringUtil;
//import com.alibaba.fastjson2.JSONObject;
//import io.jpom.system.init.OperateLogController;
//import lombok.extern.slf4j.Slf4j;
//import org.java_websocket.client.WebSocketClient;
//import org.java_websocket.handshake.ServerHandshake;
//import org.springframework.web.socket.TextMessage;
//import org.springframework.web.socket.WebSocketSession;
//
//import java.io.IOException;
//import java.net.URI;
//import java.net.URISyntaxException;
//import java.util.Objects;
//import java.util.concurrent.TimeUnit;
//
///**
// * 代理socket 会话
// *
// * @author jiangzeyin
// * @since 2019/4/16
// */
//@Slf4j
//public class ProxySession extends WebSocketClient {
// private final WebSocketSession session;
// private final OperateLogController logController;
//
// /**
// * 等待连接成功
// */
// private void loopOpen() {
// int count = 0;
// while (!this.isOpen() && count < 20) {
// ThreadUtil.sleep(500);
// count++;
// }
// }
//
// public ProxySession(String uri, Integer timeOut, WebSocketSession session) throws URISyntaxException, InterruptedException {
// super(new URI(uri));
// //this(new URI(uri), session);
// Objects.requireNonNull(session);
// this.session = session;
// if (timeOut == null) {
// this.connect();
// } else {
// this.connectBlocking(Math.max(timeOut, 1), TimeUnit.SECONDS);
// }
// this.loopOpen();
// logController = SpringUtil.getBean(OperateLogController.class);
// }
//
// @Override
// public void onOpen(ServerHandshake serverHandshake) {
//
// }
//
// @Override
// public void onMessage(String message) {
// try {
// session.sendMessage(new TextMessage(message));
// } catch (IOException e) {
// log.error("发送消息失败", e);
// }
// try {
// JSONObject jsonObject = JSONObject.parseObject(message);
// String reqId = jsonObject.getString("reqId");
// if (StrUtil.isNotEmpty(reqId)) {
// logController.updateLog(reqId, message);
// }
// } catch (Exception ignored) {
// }
// }
//
// @Override
// public void onClose(int code, String reason, boolean remote) {
// try {
// session.close();
// } catch (IOException e) {
// log.error("关闭错误", e);
// }
// }
//
// @Override
// public void onError(Exception ex) {
// try {
// session.sendMessage(new TextMessage("agent服务端发生异常" + ExceptionUtil.stacktraceToString(ex)));
//// SocketSessionUtil.send(session, );
// } catch (IOException ignored) {
// }
// log.error("发生错误", ex);
// }
//
// @Override
// public void send(String text) {
// try {
// super.send(text);
// } catch (Exception e) {
// log.error("转发消息失败", e);
// }
// }
//}

View File

@ -22,6 +22,8 @@
*/
package io.jpom.socket;
import io.jpom.service.node.NodeService;
import io.jpom.service.system.SystemParametersServer;
import io.jpom.socket.handler.*;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
@ -37,9 +39,15 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
@EnableWebSocket
public class ServerWebSocketConfig implements WebSocketConfigurer {
private final ServerWebSocketInterceptor serverWebSocketInterceptor;
private final SystemParametersServer systemParametersServer;
private final NodeService nodeService;
public ServerWebSocketConfig(ServerWebSocketInterceptor serverWebSocketInterceptor) {
public ServerWebSocketConfig(ServerWebSocketInterceptor serverWebSocketInterceptor,
SystemParametersServer systemParametersServer,
NodeService nodeService) {
this.serverWebSocketInterceptor = serverWebSocketInterceptor;
this.systemParametersServer = systemParametersServer;
this.nodeService = nodeService;
}
@Override
@ -57,7 +65,7 @@ public class ServerWebSocketConfig implements WebSocketConfigurer {
registry.addHandler(new SshHandler(), "/socket/ssh")
.addInterceptors(serverWebSocketInterceptor).setAllowedOrigins("*");
// 节点升级
registry.addHandler(new NodeUpdateHandler(), "/socket/node_update")
registry.addHandler(new NodeUpdateHandler(nodeService, systemParametersServer), "/socket/node_update")
.addInterceptors(serverWebSocketInterceptor).setAllowedOrigins("*");
// 脚本模板
registry.addHandler(new ServerScriptHandler(), "/socket/script_run")

View File

@ -171,15 +171,18 @@ public class ServerWebSocketInterceptor implements HandshakeInterceptor {
String userId = httpServletRequest.getParameter("userId");
UserModel userModel = userService.checkUser(userId);
if (userModel == null) {
return false;
attributes.put("permissionMsg", "用户不存在");
return true;
}
boolean checkNode = this.checkNode(httpServletRequest, attributes, userModel);
HandlerType handlerType = this.fromType(httpServletRequest);
if (!checkNode || handlerType == null) {
return false;
attributes.put("permissionMsg", "未匹配到合适的处理类型");
return true;
}
if (!this.checkHandlerType(handlerType, userModel, httpServletRequest, attributes)) {
return false;
attributes.put("permissionMsg", "未找到匹配的数据");
return true;
}
// 判断权限
String permissionMsg = this.checkPermission(userModel, attributes, handlerType);

View File

@ -1,148 +1,148 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2019 Code Technology Studio
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package io.jpom.socket.client;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson2.JSONObject;
import io.jpom.model.WebSocketMessageModel;
import io.jpom.model.data.NodeModel;
import io.jpom.system.init.OperateLogController;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
/**
* 节点Client
*
* @author lf
*/
@Slf4j
public class NodeClient extends WebSocketClient {
private final WebSocketSession session;
private final OperateLogController logController;
private final NodeModel nodeModel;
public NodeClient(String uri, NodeModel nodeModel, WebSocketSession session) throws URISyntaxException, InterruptedException {
super(new URI(uri));
// 添加 http proxy
Proxy proxy = nodeModel.proxy();
if (proxy != null) {
setProxy(proxy);
}
this.session = session;
this.nodeModel = nodeModel;
//
Integer timeOut = nodeModel.getTimeOut();
if (timeOut == null) {
this.connect();
} else {
this.connectBlocking(timeOut, TimeUnit.SECONDS);
}
this.loopOpen();
logController = SpringUtil.getBean(OperateLogController.class);
}
/**
* 等待连接成功
*/
private void loopOpen() {
int count = 0;
while (!this.isOpen() && count < 20) {
ThreadUtil.sleep(500);
count++;
}
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
// 连接成功后获取版本信息
getVersion();
}
public void getVersion() {
WebSocketMessageModel command = new WebSocketMessageModel("getVersion", this.nodeModel.getId());
send(command.toString());
}
@Override
public void onMessage(String message) {
try {
// 不能并发向同一个客户端发送消息 @author jzy 2021-08-03
synchronized (session.getId()) {
session.sendMessage(new TextMessage(message));
}
} catch (IOException e) {
log.error("发送消息失败", e);
}
try {
JSONObject jsonObject = JSONObject.parseObject(message);
String reqId = jsonObject.getString("reqId");
if (StrUtil.isNotEmpty(reqId)) {
logController.updateLog(reqId, message);
}
} catch (Exception ignored) {
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
}
@Override
public void send(String text) {
super.send(text);
}
@Override
public void close() {
try {
super.close();
} catch (Exception e) {
log.error("关闭异常", e);
}
}
@Override
public void send(ByteBuffer bytes) {
super.send(bytes);
}
@Override
public void onError(Exception e) {
log.error("发生异常", e);
}
}
///*
// * The MIT License (MIT)
// *
// * Copyright (c) 2019 Code Technology Studio
// *
// * Permission is hereby granted, free of charge, to any person obtaining a copy of
// * this software and associated documentation files (the "Software"), to deal in
// * the Software without restriction, including without limitation the rights to
// * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// * the Software, and to permit persons to whom the Software is furnished to do so,
// * subject to the following conditions:
// *
// * The above copyright notice and this permission notice shall be included in all
// * copies or substantial portions of the Software.
// *
// * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// */
//package io.jpom.socket.client;
//
//import cn.hutool.core.thread.ThreadUtil;
//import cn.hutool.core.util.StrUtil;
//import cn.hutool.extra.spring.SpringUtil;
//import com.alibaba.fastjson2.JSONObject;
//import io.jpom.model.WebSocketMessageModel;
//import io.jpom.model.data.NodeModel;
//import io.jpom.system.init.OperateLogController;
//import lombok.extern.slf4j.Slf4j;
//import org.java_websocket.client.WebSocketClient;
//import org.java_websocket.handshake.ServerHandshake;
//import org.springframework.web.socket.TextMessage;
//import org.springframework.web.socket.WebSocketSession;
//
//import java.io.IOException;
//import java.net.Proxy;
//import java.net.URI;
//import java.net.URISyntaxException;
//import java.nio.ByteBuffer;
//import java.util.concurrent.TimeUnit;
//
///**
// * 节点Client
// *
// * @author lf
// */
//@Slf4j
//public class NodeClient extends WebSocketClient {
// private final WebSocketSession session;
// private final OperateLogController logController;
// private final NodeModel nodeModel;
//
//
// public NodeClient(String uri, NodeModel nodeModel, WebSocketSession session) throws URISyntaxException, InterruptedException {
// super(new URI(uri));
// // 添加 http proxy
// Proxy proxy = nodeModel.proxy();
// if (proxy != null) {
// setProxy(proxy);
// }
// this.session = session;
// this.nodeModel = nodeModel;
// //
// Integer timeOut = nodeModel.getTimeOut();
// if (timeOut == null) {
// this.connect();
// } else {
// this.connectBlocking(timeOut, TimeUnit.SECONDS);
// }
// this.loopOpen();
// logController = SpringUtil.getBean(OperateLogController.class);
// }
//
// /**
// * 等待连接成功
// */
// private void loopOpen() {
// int count = 0;
// while (!this.isOpen() && count < 20) {
// ThreadUtil.sleep(500);
// count++;
// }
// }
//
// @Override
// public void onOpen(ServerHandshake serverHandshake) {
// // 连接成功后获取版本信息
// getVersion();
// }
//
// public void getVersion() {
// WebSocketMessageModel command = new WebSocketMessageModel("getVersion", this.nodeModel.getId());
// send(command.toString());
// }
//
// @Override
// public void onMessage(String message) {
// try {
// // 不能并发向同一个客户端发送消息 @author jzy 2021-08-03
// synchronized (session.getId()) {
// session.sendMessage(new TextMessage(message));
// }
// } catch (IOException e) {
// log.error("发送消息失败", e);
// }
// try {
// JSONObject jsonObject = JSONObject.parseObject(message);
// String reqId = jsonObject.getString("reqId");
// if (StrUtil.isNotEmpty(reqId)) {
// logController.updateLog(reqId, message);
// }
// } catch (Exception ignored) {
// }
// }
//
// @Override
// public void onClose(int code, String reason, boolean remote) {
//
// }
//
// @Override
// public void send(String text) {
// super.send(text);
// }
//
//
// @Override
// public void close() {
// try {
// super.close();
// } catch (Exception e) {
// log.error("关闭异常", e);
// }
// }
//
// @Override
// public void send(ByteBuffer bytes) {
// super.send(bytes);
// }
//
// @Override
// public void onError(Exception e) {
// log.error("发生异常", e);
// }
//}

View File

@ -22,8 +22,8 @@
*/
package io.jpom.socket.handler;
import cn.hutool.core.util.StrUtil;
import io.jpom.socket.BaseHandler;
import io.jpom.util.SocketSessionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.WebSocketSession;
@ -37,22 +37,15 @@ import java.io.IOException;
@Slf4j
public abstract class BaseTerminalHandler extends BaseHandler {
protected void sendBinary(WebSocketSession session, String msg) {
if (StrUtil.isEmpty(msg)) {
return;
}
if (!session.isOpen()) {
// 会话关闭不能发送消息 @author jzy 21-08-04
log.warn("回话已经关闭啦,不能发送消息:{}", msg);
return;
}
synchronized (session.getId()) {
BinaryMessage byteBuffer = new BinaryMessage(msg.getBytes());
try {
session.sendMessage(byteBuffer);
} catch (IOException e) {
log.error("发送消息失败:" + msg, e);
}
}
}
protected void sendBinary(WebSocketSession session, String msg) {
if (msg == null) {
return;
}
BinaryMessage byteBuffer = new BinaryMessage(msg.getBytes());
try {
SocketSessionUtil.send(session, byteBuffer);
} catch (IOException e) {
log.error("发送消息失败:" + msg, e);
}
}
}

View File

@ -30,8 +30,9 @@ import io.jpom.permission.Feature;
import io.jpom.permission.MethodFeature;
import io.jpom.socket.BaseProxyHandler;
import io.jpom.socket.ConsoleCommandOp;
import io.jpom.socket.ProxySession;
import top.jpom.transport.IProxyWebSocket;
import java.io.IOException;
import java.util.Map;
/**
@ -54,9 +55,9 @@ public class ConsoleHandler extends BaseProxyHandler {
@Override
protected String handleTextMessage(Map<String, Object> attributes,
ProxySession proxySession,
IProxyWebSocket proxySession,
JSONObject json,
ConsoleCommandOp consoleCommandOp) {
ConsoleCommandOp consoleCommandOp) throws IOException {
//ProjectInfoCacheModel dataItem = (ProjectInfoCacheModel) attributes.get("dataItem");
// UserModel userModel = (UserModel) attributes.get("userInfo");
// if (RunMode.Dsl.name().equals(dataItem.getRunMode()) && userModel.isDemoUser()) {

View File

@ -35,6 +35,7 @@ import io.jpom.permission.MethodFeature;
import io.jpom.plugin.IPlugin;
import io.jpom.plugin.PluginFactory;
import io.jpom.service.docker.DockerInfoService;
import io.jpom.util.SocketSessionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
@ -170,5 +171,6 @@ public class DockerCliHandler extends BaseTerminalHandler {
}
IoUtil.close(session);
HANDLER_ITEM_CONCURRENT_HASH_MAP.remove(session.getId());
SocketSessionUtil.close(session);
}
}

View File

@ -113,5 +113,6 @@ public class DockerLogHandler extends BaseProxyHandler {
//
super.destroy(session);
// ScriptProcessBuilder.stopWatcher(session);
SocketSessionUtil.close(session);
}
}

View File

@ -37,8 +37,9 @@ import io.jpom.service.node.script.NodeScriptExecuteLogServer;
import io.jpom.service.node.script.NodeScriptServer;
import io.jpom.socket.BaseProxyHandler;
import io.jpom.socket.ConsoleCommandOp;
import io.jpom.socket.ProxySession;
import top.jpom.transport.IProxyWebSocket;
import java.io.IOException;
import java.util.Map;
/**
@ -60,7 +61,7 @@ public class NodeScriptHandler extends BaseProxyHandler {
}
@Override
protected String handleTextMessage(Map<String, Object> attributes, ProxySession proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) {
protected String handleTextMessage(Map<String, Object> attributes, IProxyWebSocket proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) throws IOException {
if (consoleCommandOp != ConsoleCommandOp.heart) {
super.logOpt(this.getClass(), attributes, json);
}

View File

@ -28,7 +28,6 @@ import cn.hutool.core.io.resource.FileResource;
import cn.hutool.core.lang.Tuple;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.http.HttpStatus;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
@ -41,7 +40,6 @@ import io.jpom.common.forward.NodeUrl;
import io.jpom.model.AgentFileModel;
import io.jpom.model.WebSocketMessageModel;
import io.jpom.model.data.NodeModel;
import io.jpom.model.user.UserModel;
import io.jpom.permission.ClassFeature;
import io.jpom.permission.Feature;
import io.jpom.permission.MethodFeature;
@ -50,9 +48,12 @@ import io.jpom.service.node.NodeService;
import io.jpom.service.system.SystemParametersServer;
import io.jpom.socket.BaseProxyHandler;
import io.jpom.socket.ConsoleCommandOp;
import io.jpom.socket.client.NodeClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import top.jpom.transport.DataContentType;
import top.jpom.transport.IProxyWebSocket;
import top.jpom.transport.IUrlItem;
import top.jpom.transport.TransportServerFactory;
import java.io.File;
import java.io.FileInputStream;
@ -73,22 +74,25 @@ import java.util.concurrent.ConcurrentMap;
@Slf4j
public class NodeUpdateHandler extends BaseProxyHandler {
private final ConcurrentMap<String, NodeClient> clientMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, IProxyWebSocket> clientMap = new ConcurrentHashMap<>();
private static final int CHECK_COUNT = 60;
private SystemParametersServer systemParametersServer;
private NodeService nodeService;
private final SystemParametersServer systemParametersServer;
private final NodeService nodeService;
public NodeUpdateHandler() {
public NodeUpdateHandler(NodeService nodeService, SystemParametersServer systemParametersServer) {
super(null);
this.nodeService = nodeService;
this.systemParametersServer = systemParametersServer;
//systemParametersServer = SpringUtil.getBean(SystemParametersServer.class);
// nodeService = SpringUtil.getBean(NodeService.class);
}
@Override
protected void init(WebSocketSession session, Map<String, Object> attributes) throws Exception {
super.init(session, attributes);
systemParametersServer = SpringUtil.getBean(SystemParametersServer.class);
nodeService = SpringUtil.getBean(NodeService.class);
}
@Override
@ -109,18 +113,29 @@ public class NodeUpdateHandler extends BaseProxyHandler {
return;
}
for (NodeModel model : nodeModelList) {
NodeClient nodeClient = clientMap.get(model.getId());
IProxyWebSocket nodeClient = clientMap.get(model.getId());
if (nodeClient != null) {
//
nodeClient.close();
try {
nodeClient.close();
} catch (Exception e) {
log.error("关闭连接异常", e);
}
}
Map<String, Object> attributes = session.getAttributes();
String url = NodeForward.getSocketUrl(model, NodeUrl.NodeUpdate, (UserModel) attributes.get("userInfo"));
// Map<String, Object> attributes = session.getAttributes();
// String url = NodeForward.getSocketUrl(model,, (UserModel) attributes.get("userInfo"));
// 连接节点
ThreadUtil.execute(() -> {
try {
NodeClient client = new NodeClient(url, model, session);
clientMap.put(model.getId(), client);
IUrlItem urlItem = NodeForward.createUrlItem(model, NodeUrl.NodeUpdate, DataContentType.FORM_URLENCODED);
IProxyWebSocket proxySession = TransportServerFactory.get().websocket(model, urlItem);
proxySession.onMessage(s -> sendMsg(session, s));
proxySession.open();
WebSocketMessageModel command = new WebSocketMessageModel("getVersion", model.getId());
proxySession.send(command.toString());
// NodeClient client = new NodeClient(url, model, session);
clientMap.put(model.getId(), proxySession);
} catch (Exception e) {
log.error("创建插件端连接失败", e);
}
@ -131,9 +146,13 @@ public class NodeUpdateHandler extends BaseProxyHandler {
@Override
public void destroy(WebSocketSession session) {
for (String key : clientMap.keySet()) {
NodeClient client = clientMap.get(key);
if (client.isOpen()) {
client.close();
IProxyWebSocket client = clientMap.get(key);
if (client.isConnected()) {
try {
client.close();
} catch (Exception e) {
log.error("关闭连接异常", e);
}
}
}
clientMap.clear();
@ -244,7 +263,7 @@ public class NodeUpdateHandler extends BaseProxyHandler {
}
}
private void updateNodeItemWebSocket(NodeClient client, String id, WebSocketSession session, AgentFileModel agentFileModel) throws IOException {
private void updateNodeItemWebSocket(IProxyWebSocket client, String id, WebSocketSession session, AgentFileModel agentFileModel) throws IOException {
// 发送文件信息
WebSocketMessageModel webSocketMessageModel = new WebSocketMessageModel("upload", id);
webSocketMessageModel.setNodeId(id);
@ -271,7 +290,7 @@ public class NodeUpdateHandler extends BaseProxyHandler {
++retryCount;
try {
ThreadUtil.sleep(1000L);
if (client.reconnectBlocking()) {
if (client.reopen()) {
this.sendMsg(restartMessage.setData("重启完成"), session);
return;
}
@ -292,12 +311,12 @@ public class NodeUpdateHandler extends BaseProxyHandler {
this.onError(session, "没有对应的节点:" + id);
return;
}
NodeClient client = clientMap.get(node.getId());
IProxyWebSocket client = clientMap.get(node.getId());
if (client == null) {
this.onError(session, "对应的插件端还没有被初始化:" + id);
return;
}
if (client.isOpen()) {
if (client.isConnected()) {
if (http) {
this.updateNodeItemHttp(node, session, agentFileModel);
} else {

View File

@ -43,6 +43,7 @@ import io.jpom.permission.MethodFeature;
import io.jpom.service.dblog.SshTerminalExecuteLogService;
import io.jpom.service.node.ssh.SshService;
import io.jpom.service.user.UserBindWorkspaceService;
import io.jpom.util.SocketSessionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.web.socket.TextMessage;
@ -291,5 +292,6 @@ public class SshHandler extends BaseTerminalHandler {
}
IoUtil.close(session);
HANDLER_ITEM_CONCURRENT_HASH_MAP.remove(session.getId());
SocketSessionUtil.close(session);
}
}

View File

@ -31,12 +31,12 @@ import io.jpom.permission.Feature;
import io.jpom.permission.MethodFeature;
import io.jpom.socket.BaseProxyHandler;
import io.jpom.socket.ConsoleCommandOp;
import io.jpom.socket.ProxySession;
import io.jpom.socket.ServiceFileTailWatcher;
import io.jpom.system.LogbackConfig;
import io.jpom.util.SocketSessionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
import top.jpom.transport.IProxyWebSocket;
import java.io.File;
import java.io.IOException;
@ -95,7 +95,7 @@ public class TomcatHandler extends BaseProxyHandler {
}
@Override
protected String handleTextMessage(Map<String, Object> attributes, ProxySession proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) {
protected String handleTextMessage(Map<String, Object> attributes, IProxyWebSocket proxySession, JSONObject json, ConsoleCommandOp consoleCommandOp) throws IOException {
proxySession.send(json.toString());
return null;
}

View File

@ -22,13 +22,16 @@
*/
package io.jpom.util;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.unit.DataSize;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* socket 会话对象
@ -38,46 +41,24 @@ import java.io.IOException;
*/
@Slf4j
public class SocketSessionUtil {
/**
*
*/
private static final KeyLock<String> LOCK = new KeyLock<>();
/**
* 错误尝试次数
*/
private static final int ERROR_TRY_COUNT = 10;
private static final Map<String, WebSocketSession> SOCKET_MAP = new ConcurrentHashMap<>();
public static void send(WebSocketSession session, String msg) throws IOException {
if (StrUtil.isEmpty(msg)) {
send(session, new TextMessage(msg));
}
public static void send(WebSocketSession session, WebSocketMessage<?> message) throws IOException {
if (!session.isOpen()) {
// 会话关闭不能发送消息 @author jzy 21-08-04
log.warn("回话已经关闭啦,不能发送消息:{}", message.getPayload());
return;
}
if (!session.isOpen()) {
throw new RuntimeException("session close ");
}
try {
LOCK.lock(session.getId());
IOException exception = null;
int tryCount = 0;
do {
tryCount++;
if (exception != null) {
// 上一次有异常休眠 500
ThreadUtil.sleep(500);
}
try {
session.sendMessage(new TextMessage(msg));
exception = null;
break;
} catch (IOException e) {
log.error("发送消息失败:" + tryCount, e);
exception = e;
}
} while (tryCount <= ERROR_TRY_COUNT);
if (exception != null) {
throw exception;
}
} finally {
LOCK.unlock(session.getId());
}
WebSocketSession webSocketSession = SOCKET_MAP.computeIfAbsent(session.getId(), s -> new ConcurrentWebSocketSessionDecorator(session, 60 * 1000, (int) DataSize.ofMegabytes(5).toBytes()));
webSocketSession.sendMessage(message);
}
public static void close(WebSocketSession session) {
SOCKET_MAP.remove(session.getId());
}
}