完善实时进度条

This commit is contained in:
myzf 2019-08-13 15:09:16 +08:00
parent 28bc40aa38
commit fc42a993af
9 changed files with 459 additions and 74 deletions

View File

@ -0,0 +1,37 @@
package cn.keepbx.plugin.netty;
/**
* @package cn.keepbx.plugin.netty
* @Date Created in 2019/8/13 10:45
* @Author myzf
*/
public class AuthMsg {
private int type;
private String userId;
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
@Override
public String toString() {
return "AuthMsg{" +
"type=" + type +
", userId='" + userId + '\'' +
'}';
}
}

View File

@ -0,0 +1,45 @@
package cn.keepbx.plugin.netty;
import cn.hutool.core.util.StrUtil;
import cn.jiangzeyin.common.spring.SpringUtil;
import cn.keepbx.jpom.model.data.UserModel;
import cn.keepbx.jpom.service.user.UserService;
import cn.keepbx.util.UserChannelRel;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
*
* @Description: 处理消息的handler
* TextWebSocketFrame 在netty中是用于为websocket专门处理文本的对象frame是消息的载体
*/
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于记录和管理所有客户端的channle
public static ChannelGroup users =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常之后关闭连接关闭channel随后从ChannelGroup中移除
ctx.channel().close();
users.remove(ctx.channel());
}
}

View File

@ -10,30 +10,44 @@ import cn.hutool.extra.ssh.JschUtil;
import cn.jiangzeyin.common.DefaultSystemLog;
import cn.jiangzeyin.common.spring.SpringUtil;
import cn.keepbx.jpom.model.data.SshModel;
import cn.keepbx.jpom.model.data.UserModel;
import cn.keepbx.jpom.service.node.ssh.SshService;
import cn.keepbx.jpom.service.user.UserService;
import cn.keepbx.util.UserChannelRel;
import com.alibaba.fastjson.JSON;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.http.MediaType;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
@ -42,85 +56,143 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
* @author myzf
* @date 2019/8/11 19:42
*/
public class FileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> implements ChannelProgressiveFutureListener {
public class FileServerHandler extends SimpleChannelInboundHandler<Object> implements ChannelProgressiveFutureListener {
private Session session = null;
private ChannelSftp channel = null;
private long fileSize;
public static ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
//检测解码情况
if (!request.decoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}
//获取请求参数 共下面页面单个下载用
Map<String, String> parse = parse(request);
String id = parse.get("id");
String path = parse.get("path");
String name = parse.get("name");
SshService sshService = SpringUtil.getBean(SshService.class);
SshModel sshModel = sshService.getItem(id);
if (sshModel == null) {
sendError(ctx, NOT_FOUND);
return;
}
List<String> fileDirs = sshModel.getFileDirs();
//
if (StrUtil.isEmpty(path) || !fileDirs.contains(path)) {
sendError(ctx, NOT_FOUND);
return;
}
if (StrUtil.isEmpty(name)) {
sendError(ctx, NOT_FOUND);
return;
}
try {
session = JschUtil.openSession(sshModel.getHost(), sshModel.getPort(), sshModel.getUser(), sshModel.getPassword());
channel = (ChannelSftp) JschUtil.openChannel(session, ChannelType.SFTP);
String normalize = FileUtil.normalize(path + "/" + name);
SftpATTRS attr = channel.stat(normalize);
fileSize = attr.getSize();
ChannelSftp finalChannel = channel;
PipedInputStream pipedInputStream = new PipedInputStream();
PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
ThreadUtil.execute(() -> {
try {
finalChannel.get(normalize, pipedOutputStream);
} catch (SftpException e) {
DefaultSystemLog.ERROR().error("下载异常", e);
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest){
FullHttpRequest request = (FullHttpRequest)msg;
//检测解码情况
if (!request.decoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}
IoUtil.close(pipedOutputStream);
});
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpUtil.setContentLength(response, fileSize);
Map<String, String> parse = parse(request);
//获取请求参数 共下面页面单个下载用
String id = parse.get("id");
String path = parse.get("path");
String name = parse.get("name");
SshService sshService = SpringUtil.getBean(SshService.class);
SshModel sshModel = sshService.getItem(id);
if (sshModel == null) {
sendError(ctx, NOT_FOUND);
return;
}
List<String> fileDirs = sshModel.getFileDirs();
//
if (StrUtil.isEmpty(path) || !fileDirs.contains(path)) {
sendError(ctx, NOT_FOUND);
return;
}
if (StrUtil.isEmpty(name)) {
sendError(ctx, NOT_FOUND);
return;
}
try {
session = JschUtil.openSession(sshModel.getHost(), sshModel.getPort(), sshModel.getUser(), sshModel.getPassword());
channel = (ChannelSftp) JschUtil.openChannel(session, ChannelType.SFTP);
String normalize = FileUtil.normalize(path + "/" + name);
SftpATTRS attr = channel.stat(normalize);
fileSize = attr.getSize();
ChannelSftp finalChannel = channel;
PipedInputStream pipedInputStream = new PipedInputStream();
PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
ThreadUtil.execute(() -> {
try {
finalChannel.get(normalize, pipedOutputStream);
} catch (SftpException e) {
DefaultSystemLog.ERROR().error("下载异常", e);
}
IoUtil.close(pipedOutputStream);
});
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpUtil.setContentLength(response, fileSize);
response.headers().set(CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE);
// 设定默认文件输出名
String fileName = URLUtil.encode(FileUtil.getName(name));
response.headers().add("Content-disposition", "attachment; filename=" + fileName);
response.headers().set(CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE);
// 设定默认文件输出名
String fileName = URLUtil.encode(FileUtil.getName(name));
response.headers().add("Content-disposition", "attachment; filename=" + fileName);
ctx.write(response);
ChannelFuture sendFileFuture = ctx.write(new HttpChunkedInput(new ChunkedStream(pipedInputStream)), ctx.newProgressivePromise());
sendFileFuture.addListener(this);
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
ctx.write(response);
ChannelFuture sendFileFuture = ctx.write(new HttpChunkedInput(new ChunkedStream(pipedInputStream)), ctx.newProgressivePromise());
sendFileFuture.addListener(this);
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!HttpUtil.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
} catch (Exception e) {
DefaultSystemLog.ERROR().error("下载失败", e);
sendError(ctx, INTERNAL_SERVER_ERROR);
if (!HttpUtil.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
} catch (Exception e) {
DefaultSystemLog.ERROR().error("下载失败", e);
sendError(ctx, INTERNAL_SERVER_ERROR);
}else if (msg instanceof WebSocketFrame){
TextWebSocketFrame tmsg = (TextWebSocketFrame)msg;
// 获取客户端传输过来的消息
String content = tmsg.text();
if(StrUtil.hasEmpty(content)){
ctx.channel().close();
}
// 1. 获取客户端发来的消息
AuthMsg authMsg = JSON.parseObject(content, AuthMsg.class);
if(UserChannelRel.get(authMsg.getUserId()) == null){
if(1 == authMsg.getType()){//鉴权类型的
UserService userService = SpringUtil.getBean(UserService.class);
UserModel userModel = userService.checkUser(authMsg.getUserId());
if(userModel == null){
//鉴权失败
ctx.channel().writeAndFlush(new TextWebSocketFrame("鉴权失败!"));
ctx.channel().close();
}
}
}
}
}
/**
* 当客户端连接服务端之后打开连接
* 获取客户端的channle并且放到ChannelGroup中去进行管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asShortText();
System.out.println("客户端被移除channelId为" + channelId);
channels.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
ctx.channel().close();
channels.remove(ctx.channel());
}
@ -166,20 +238,26 @@ public class FileServerHandler extends SimpleChannelInboundHandler<FullHttpReque
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
System.out.println(String.format("%.2f", (progress / (double) fileSize) * 100));
if (total < 0) {
// total unknown
// System.err.println(future.channel() + " Transfer progress: " + progress);
} else {
// System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
if (progress == total) {
if(total < 0 ){
String s = String.format("%.2f", (progress / (double) fileSize) * 100)+"%";
// websocket连接在线
for(Channel channel : channels){
channel.writeAndFlush(
new TextWebSocketFrame(
s));
}
System.err.println(s);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
for(Channel channel : channels){
channel.writeAndFlush(
new TextWebSocketFrame(
"complete"));
}
this.close();
}

View File

@ -0,0 +1,40 @@
package cn.keepbx.plugin.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* @Description: 用于检测channel的心跳handler
* 继承ChannelInboundHandlerAdapter从而不需要实现channelRead0方法
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判断evt是否是IdleStateEvent用于触发用户事件包含 读空闲/写空闲/读写空闲
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)evt; // 强制类型转换
if (event.state() == IdleState.READER_IDLE) {
System.out.println("进入读空闲...");
} else if (event.state() == IdleState.WRITER_IDLE) {
System.out.println("进入写空闲...");
} else if (event.state() == IdleState.ALL_IDLE) {
System.out.println("channel关闭前users的数量为" + ChatHandler.users.size());
Channel channel = ctx.channel();
// 关闭无用的channel以防资源浪费
channel.close();
System.out.println("channel关闭后users的数量为" + ChatHandler.users.size());
}
}
}
}

View File

@ -11,9 +11,14 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
/**
* @author myzf
@ -42,10 +47,14 @@ public class NettyThread implements Runnable, AutoCloseable {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("logging",new LoggingHandler("DEBUG"));//
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new IdleStateHandler(30, 30, 60));
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
pipeline.addLast(new FileServerHandler());
}
});
@ -53,11 +62,6 @@ public class NettyThread implements Runnable, AutoCloseable {
channel.closeFuture().sync();
} catch (InterruptedException e) {
DefaultSystemLog.ERROR().error("netty 错误", e);
} finally {
try {
this.close();
} catch (Exception ignored) {
}
}
}

View File

@ -0,0 +1,116 @@
package cn.keepbx.plugin.netty;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import com.jcraft.jsch.SftpProgressMonitor;
import java.io.PipedOutputStream;
/**
* sftp进度条显示
*
* @author myzf
* @date 2019/8/11 20:10
*/
public class ProgressMonitor implements SftpProgressMonitor {
/**
* 记录传输是否结束
*/
private boolean isEnd = false;
/**
* 记录已传输的数据总大小
*/
private long transfered;
/**
* 记录文件总大小
*/
private long fileSize;
private long before;
private long speed;
private PipedOutputStream pipedOutputStream;
public ProgressMonitor(PipedOutputStream pipedOutputStream) {
this.pipedOutputStream = pipedOutputStream;
}
private void printProgress(double rate, long speed) {
int progressWidth = 20;
int downWidth = (int) (progressWidth * rate);
StringBuilder sb = new StringBuilder();
sb.append("\r[");
for (int i = 0; i < progressWidth; i++) {
if (i < downWidth) {
sb.append("");
} else {
sb.append("");
}
}
sb.append("] ").append(String.format("%.2f", rate * 100)).append("% ").append(String.format("%8s", FileUtil.readableFileSize(speed))).append("/S");
System.out.println(sb.toString());
}
/**
* 实现了SftpProgressMonitor接口的count方法
*/
@Override
public boolean count(long count) {
if (isEnd()) {
return false;
}
add(count);
//
// 判断传输是否已结束
long transfered = getTransfered();
if (transfered != fileSize) {
// 判断当前已传输数据大小是否等于文件总大小
speed = transfered - before;
before = transfered;
double rate = transfered / (double) fileSize;
printProgress(rate, speed);
} else {
// 如果当前已传输数据大小等于文件总大小说明已完成设置end
setEnd(true);
}
return true;
}
/**
* 实现了SftpProgressMonitor接口的end方法
*/
@Override
public void end() {
setEnd(true);
printProgress(1, speed);
IoUtil.close(pipedOutputStream);
}
private synchronized void add(long count) {
transfered = transfered + count;
}
private synchronized long getTransfered() {
return transfered;
}
public synchronized void setTransfered(long transfered) {
this.transfered = transfered;
}
private synchronized void setEnd(boolean isEnd) {
this.isEnd = isEnd;
}
private synchronized boolean isEnd() {
return isEnd;
}
@Override
public void init(int op, String src, String dest, long max) {
this.fileSize = max;
}
}

View File

@ -0,0 +1,28 @@
package cn.keepbx.util;
import io.netty.channel.Channel;
import java.util.HashMap;
/**
* @Description: 用户id和channel的关联关系处理
*/
public class UserChannelRel {
private static HashMap<String, Channel> manager = new HashMap<>();
public static void put(String senderId, Channel channel) {
manager.put(senderId, channel);
}
public static Channel get(String senderId) {
return manager.get(senderId);
}
public static void output() {
for (HashMap.Entry<String, Channel> entry : manager.entrySet()) {
System.out.println("UserId: " + entry.getKey()
+ ", ChannelId: " + entry.getValue().id().asLongText());
}
}
}

View File

@ -1,4 +1,6 @@
spring:
thymeleaf:
#开发时关闭缓存,不然没法看到实时页面
cache: false
cache: false
netty:
port: 8888

View File

@ -35,7 +35,7 @@
<label class="layui-form-label">文件夹</label>
<div class="layui-input-block">
<div class="layui-progress layui-progress-big" lay-filter="demo" lay-showPercent="true">
<div class="layui-progress-bar" lay-percent="0%">下载</div>
<div class="layui-progress-bar layui-bg-blue" lay-percent="0%">下载</div>
</div>
</div>
</div>
@ -60,6 +60,40 @@
var allData;
var clickData;
var openLayuiIndex;
var ee;
layui.use('element', function(){
ee = layui.element;
});
if ("WebSocket" in window) {
var ws = new WebSocket("ws://localhost:8888");
ws.onopen = function () {
//发送授权
var authContent = new AuthContent(1,"[[${session.user.getUserMd5Key()}]]")
ws.send(JSON.stringify(authContent));
}
ws.onmessage = function (evt) {
var received_msg = evt.data;
if(received_msg.indexOf("%")>0){
ee.progress('demo', received_msg);
}else if("complete"== received_msg){
ee.progress('demo', '100%');
alert("下载成功!")
}else {
alert(received_msg);
}
};
}
function AuthContent(type, userId){
this.type = type;
this.userId = userId;
}
function getData(path, children, preData) {
loadingAjax({
@ -152,6 +186,7 @@
});
$("#download").click(function () {
ee.init()
if (!clickData) {
return;
}