手写RPC通信框架

mac2024-04-13  35

RPC基本介绍

RPC框架就是一个远程调用,不需要客户端去了解底层通信协议而进行数据交换,现有RPC框架有 Dubbo、Thrift(跨语言)、webservice、hessian,这里就利用网络编程基础尝试着去编写一套 PRC框架

服务器端构建

API包下主体编写

首先初始采用Socket进行通信利用IO进行数据交互,对客户端要提供API调用接口,使得客户端可以利用API中提供的接口,获得服务器端的数据。服务器端模块目录如下:

先编写一个接口,该接口放置在api模块下,实现类放置在provider模块下,这样做目的是让客户端仅加载API包的条件下,可以调用服务器端的实现类完成具体业务逻辑。

public interface IHelloService { String sayHello(String content); String saveUser(User user); }

服务器端需要客户端传递,需要调用的类、方法以及参数,这样服务器端会知晓调用。故完善API包下的类如下:

public class RpcRequest implements Serializable { private String className; private String methodName; private Object[] parameters; public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParametes() { return parameters; } public void setParametes(Object[] parameters) { this.parameters = parameters; } } public class User implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "User{" + "name='" + name + '\'' + ", age=" + age + '}'; } }

API包中结构基本完成 接下来编写服务器的通信交互与处理逻辑

provider包下主体编写

首先需要晚上HelloService的业务逻辑

public class HelloServiceImpl implements IHelloService { @Override public String sayHello(String content) { System.out.println("request in sayHello:"+content); return "Say Hellp: "+content; } @Override public String saveUser(User user) { System.out.println("request in SaveUser:"+user.toString()); return "SUCCESS"; } }

基本的编写完成,接下来是对交互逻辑代码的编写,首先需要采用Socke进行传输,传输过程中会拿到调用类、方法与参数,故采用反射可以调用类。首先我们先编写传输通信逻辑 在这里IO阻塞部分利用线程池进行优化,使得一个线程处理一个Socket(后续可以利用NIO优化)。

public class RpcProxyServer { ExecutorService executorService= Executors.newCachedThreadPool(); public void publisher(Object service,int port){ ServerSocket serverSocket=null; try { serverSocket = new ServerSocket(port); while (true){ Socket socket= serverSocket.accept(); //每个Socker 交给一个ProcessorHandler来处理 executorService.execute(new ProcessorHandler(socket,service)); } } catch (IOException e) { e.printStackTrace(); }finally { if(serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }

接下来是反射调用,由于采用线程去优化故需要实现Runnable接口中的run方法,在run方法中完成Socket通信后的I/O流,调用invoke方法完成反射调用逻辑。

public class ProcessorHandler implements Runnable{ private Socket socket; private Object service; public ProcessorHandler(Socket socket, Object service) { this.socket = socket; this.service = service; } @Override public void run() { ObjectInputStream objectInputStream=null; ObjectOutputStream objectOutputStream=null; try { objectInputStream = new ObjectInputStream(socket.getInputStream()); //输入流的信息 //请求类,请求方法名、请求参数 RpcRequest rpcRequest=(RpcRequest)objectInputStream.readObject(); Object result=invoke(rpcRequest);//反射调用本地服务结果写出 objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(result); objectOutputStream.flush(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { e.printStackTrace(); }finally { if(objectInputStream!=null){ try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if(objectOutputStream!=null){ try { objectOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } private Object invoke(RpcRequest request) throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException { Object[] args=request.getParametes();//拿到客户端请求参数 Class<?>[] types=new Class[args.length]; for(int i=0;i<args.length;i++){ types[i]=args[i].getClass();//去获得参数类型 } Class clazz= Class.forName(request.getClassName());//根据请求去加载类 Method method=clazz.getMethod(request.getMethodName(),types); //去找到类中方法 Object result= method.invoke(service,args);//进行反射调用 return result; } }

服务器端编写完毕。接下来对API模块进行打包,在客户端中pom文件中引用。

客户端模块编写

首先在Pom文件引入编写的api包,编写Socket通信类。

public class RpcNetTransport { private String host; private int port; public RpcNetTransport(String host, int port) { this.host = host; this.port = port; } public Object send(RpcRequest request){ Socket socket; Object result=null; ObjectOutputStream objectOutputStream=null; ObjectInputStream objectInputStream=null; try { socket =new Socket(host,port); //连接服务器 objectOutputStream=new ObjectOutputStream(socket.getOutputStream()); //网络流 objectOutputStream.writeObject(request); //写入传输给服务端的数据 objectOutputStream.flush(); objectInputStream=new ObjectInputStream(socket.getInputStream()); result=objectInputStream.readObject();//接收服务端传回的数据 } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }finally { if(objectInputStream!=null){ try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if(objectOutputStream!=null){ try { objectOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return result; } }

接下来是利用JDK动态代理去调用方法 这里采用了代理模式,借用动态代理去实现调用。

public class RpcProxyClient { public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){ return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),new Class<?>[]{interfaceCls},new RemoteInvocationHandler(host, port)); } } public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest=new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParametes(args); RpcNetTransport rpcNetTransport=new RpcNetTransport(host,port); return rpcNetTransport.send(rpcRequest); } }

最后客户端调用代码

public class App { public static void main( String[] args ) { RpcProxyClient rpcProxyClient = new RpcProxyClient(); IHelloService iHelloService=rpcProxyClient .clientProxy(IHelloService.class,"localhost",8080); User user = new User(); user.setAge(18); user.setName("洋娃娃"); String result= iHelloService.saveUser(user); System.out.println(result); result= iHelloService.sayHello("杨某"); System.out.println(result); } }

这里的iHelloService实际上利用动态代理对象。 客户端输出结果 后续有精力可以利用NIO去改善IO处理,或者使用Netty框架。rpc通信框架其实与Tomcat的编程思想也大同小异,也可以借此去思路去实现Tomcat,http传输的也是就是一串字符串,有特定协议的字符串,通过浏览器(客户端)去解析。

最新回复(0)