diff --git a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java index 9b20c81a..ac477c42 100644 --- a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java +++ b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java @@ -1,4 +1,3 @@ - package com.alibaba.arthas.tunnel.client; import java.net.URI; @@ -12,7 +11,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; @@ -30,18 +28,13 @@ import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.util.concurrent.GenericFutureListener; /** - * * @author hengyunabc 2019-08-28 - * */ public class ForwardClientSocketClientHandler extends SimpleChannelInboundHandler { - private final static Logger logger = LoggerFactory.getLogger(ForwardClientSocketClientHandler.class); - private ChannelPromise handshakeFuture; + private static final Logger logger = LoggerFactory.getLogger(ForwardClientSocketClientHandler.class); - private Channel localChannel; - - private URI localServerURI; + private final URI localServerURI; public ForwardClientSocketClientHandler(URI localServerURI) { this.localServerURI = localServerURI; @@ -49,7 +42,6 @@ public class ForwardClientSocketClientHandler extends SimpleChannelInboundHandle @Override public void channelActive(ChannelHandlerContext ctx) { - } @Override @@ -58,26 +50,30 @@ public class ForwardClientSocketClientHandler extends SimpleChannelInboundHandle } @Override - public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { - + public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) { if (evt.equals(ClientHandshakeStateEvent.HANDSHAKE_COMPLETE)) { - - EventLoopGroup group = new NioEventLoopGroup(); - try { + connectLocalServer(ctx); + } catch (Throwable e) { + logger.error("ForwardClientSocketClientHandler connect local arthas server error", e); + } + } else { + ctx.fireUserEventTriggered(evt); + } + } - logger.info("ForwardClientSocketClientHandler star connect local arthas server"); + private void connectLocalServer(final ChannelHandlerContext ctx) throws InterruptedException { + EventLoopGroup group = new NioEventLoopGroup(); + logger.info("ForwardClientSocketClientHandler star connect local arthas server"); + WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(localServerURI, + WebSocketVersion.V13, null, true, new DefaultHttpHeaders()); + final WebSocketClientProtocolHandler websocketClientHandler = new WebSocketClientProtocolHandler( + newHandshaker); + final LocalFrameHandler localFrameHandler = new LocalFrameHandler(); - WebSocketClientHandshaker newHandshaker = WebSocketClientHandshakerFactory.newHandshaker(localServerURI, - WebSocketVersion.V13, null, true, new DefaultHttpHeaders()); - - final WebSocketClientProtocolHandler websocketClientHandler = new WebSocketClientProtocolHandler( - newHandshaker); - - final LocalFrameHandler localFrameHandler = new LocalFrameHandler(); - - Bootstrap b = new Bootstrap(); - b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { + Bootstrap b = new Bootstrap(); + b.group(group).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); @@ -86,45 +82,30 @@ public class ForwardClientSocketClientHandler extends SimpleChannelInboundHandle } }); - localChannel = b.connect(localServerURI.getHost(), localServerURI.getPort()).sync().channel(); - - localFrameHandler.handshakeFuture().addListener(new GenericFutureListener() { + Channel localChannel = b.connect(localServerURI.getHost(), localServerURI.getPort()).sync().channel(); + localFrameHandler.handshakeFuture() + .addListener(new GenericFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ChannelPipeline pipeline = future.channel().pipeline(); pipeline.remove(localFrameHandler); pipeline.addLast(new RelayHandler(ctx.channel())); - } }); - localFrameHandler.handshakeFuture().sync(); - - ctx.pipeline().remove(ForwardClientSocketClientHandler.this); - - ctx.pipeline().addLast(new RelayHandler(localChannel)); - - logger.info("ForwardClientSocketClientHandler connect local arthas server success"); - } catch (Throwable e) { - logger.error("ForwardClientSocketClientHandler connect local arthas server error", e); - } - - } else { - ctx.fireUserEventTriggered(evt); - } + localFrameHandler.handshakeFuture().sync(); + ctx.pipeline().remove(ForwardClientSocketClientHandler.this); + ctx.pipeline().addLast(new RelayHandler(localChannel)); + logger.info("ForwardClientSocketClientHandler connect local arthas server success"); } @Override - protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception { - + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); - if (!handshakeFuture.isDone()) { - handshakeFuture.setFailure(cause); - } ctx.close(); } }