This commit is contained in:
CalvinKirs 2021-01-14 14:57:02 +08:00
parent 74a4de43b9
commit ed164fe325
6 changed files with 45 additions and 10 deletions

View File

@ -6,5 +6,5 @@ package org.apache.dolphinscheduler.remote.rpc;
*/
public interface IUserService {
String say();
String say(String sb);
}

View File

@ -3,6 +3,8 @@ package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.remote.rpc.client.RpcClient;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer;
@ -17,16 +19,17 @@ public class MainTest {
public static void main(String[] args) throws Exception {
NettyServer nettyServer=new NettyServer(new NettyServerConfig());
NettyClient nettyClient=new NettyClient(new NettyClientConfig());
// NettyClient nettyClient=new NettyClient(new NettyClientConfig());
Host host=new Host("127.0.0.1",12366);
RpcRequest rpcRequest=new RpcRequest();
rpcRequest.setRequestId("988");
rpcRequest.setClassName("kris");
rpcRequest.setMethodName("ll");
IRpcClient rpcClient=new RpcClient();
IUserService userService= rpcClient.create(IUserService.class);
userService.say("calvin");
nettyClient.sendMsg(host,rpcRequest);
// nettyClient.sendMsg(host,rpcRequest);
}
}

View File

@ -6,7 +6,7 @@ package org.apache.dolphinscheduler.remote.rpc;
*/
public class UserService implements IUserService{
@Override
public String say() {
return null;
public String say(String s) {
return "krris"+s;
}
}

View File

@ -4,9 +4,12 @@ import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.lang.reflect.Method;
import java.util.UUID;
@ -32,6 +35,9 @@ public class ConsumerInterceptor {
RpcRequest request = buildReq(args, method);
//todo
System.out.println(invoker.invoke(request));
NettyClient nettyClient = new NettyClient(new NettyClientConfig());
Host host = new Host("127.0.0.1", 12366);
nettyClient.sendMsg(host, request);
return null;
}

View File

@ -4,6 +4,9 @@ import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -15,6 +18,8 @@ public class RpcClient implements IRpcClient{
private ConcurrentHashMap<String,Object> classMap=new ConcurrentHashMap<>();
@Override
public <T> T create(Class<T> clazz) throws Exception {
if(!classMap.containsKey(clazz.getName())){

View File

@ -5,6 +5,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,11 +40,28 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
logger.info("server read msg");
System.out.println("收到消息");
RpcRequest req= (RpcRequest) msg;
System.out.println(req.getRequestId());
RpcResponse response=new RpcResponse();
response.setMsg("llll");
response.setRequestId(req.getRequestId());
Class<?> handlerClass = req.getClass();
System.out.println(req.getMethodName());
System.out.println(req.getClassName());
String methodName = req.getMethodName();
Class<?>[] parameterTypes = req.getParameterTypes();
Object[] parameters = req.getParameters();
// JDK reflect
Method method = handlerClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(req.getClassName(), parameters);
response.setResult(result);
ctx.writeAndFlush(response);
}
@Override