手写一个RPC服务

mac2024-06-03  49

手写一个RPC服务

一、简介

RPC可以提高系统稳定性,比如说,我们的订单服务程序更新出BUG,导致内存溢出,是这台服务器宕机了,但是它只会影响的整个系统的订单业务部分,对于用户注册登录等业务没有影响,同样对于系统的日志记录也没有影响。 RPC远程过程调用(Remote Procedure Call),调用远程计算机上的服务,就像调用本地服务一样。 java本身也为我们提供一种RPC的实现:RMI,允许运行在一个java 虚拟机的对象调用运行在另一个java虚拟机上对象的方法。RMI***使用的是***JRMP(Java Remote Messageing Protocol)协议, ***JRMP***是专门为java定制的通信协议,所以是纯java的分布式解决方案。这里我就不作详细说明了,简要写一下实现的步骤:

1.创建一个远程接口,并继承java.rmi.Remote接口 2.实现远程接口,并且继承UnicastRemoteObject 3.创建服务器程序,同时使用createRegistry方法注册远程接口对象 4.创建客户端程序,通过Naming类的lookup方法来远程调用接口中的方法 但是RMI接口和实现类不灵活,RMI必须继承和实现Remote接口。

二、实现RPC服务

基于socket通信、jdk的动态代理以及java的序列化,我们可以说实现一个简单的RPC服务。

2.1.服务端

服务端定义服务接口及实现:

//服务接口 public interface TechInterface { String XJ(String name); } //服务的实现 public class TechImpl implements TechInterface { @Override public String XJ(String name) { return "你好 我是Eminem"; } }

服务注册中心以及服务处理rpc请求

public class RegisterCenter { //线程池 多线程加快处理请求 private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //一个map模拟注册中心 private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>(); private static boolean isRunning = false; //服务端口 private static int port; public RegisterCenter(int port) { this.port = port; } //注册中心启动 public void start() throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress("127.0.0.1",port)); try { while (true) { //serverSocket.accept() 在没有接受到客户端的请求会阻塞在这 //客户端的请求会被包装成任务放到线程池里面执行 executor.execute(new ServiceTask(serverSocket.accept())); System.out.println("11111"); } } finally { serverSocket.close(); } } //map模拟服务注册 public void register(Class serviceInterface, Class impl) { serviceRegistry.put("com.zjx.rpc.service.TechInterface", impl); } //处理客户端请求的任务类 public static class ServiceTask implements Runnable { //客户端client Socket client=null; public ServiceTask(Socket socket){ this.client=socket; } @Override public void run() { ObjectInputStream inputStream=null; ObjectOutputStream outputStream=null; try { inputStream=new ObjectInputStream(client.getInputStream()); //拿到接口名 String serviceName = inputStream.readUTF(); //拿到方法名 String methodName = inputStream.readUTF(); //拿到参数类型 Class<?>[] paramTypes = ( Class<?>[])inputStream.readObject(); //拿到参数值 Object[] arguments = (Object[])inputStream.readObject(); //到注册中心根据接口名获取实现类 Class serviceClass=serviceRegistry.get(serviceName); //反射进行调用 Method method=serviceClass.getMethod(methodName,paramTypes); //执行方法 Object result=method.invoke(serviceClass.newInstance(),arguments); //返回给客户端 outputStream=new ObjectOutputStream(client.getOutputStream()); outputStream.writeObject(result); // outputStream.close(); inputStream.close(); client.close(); }catch (Exception e){ }finally { } } } }

服务端启动

public class Server { public static void main(String[] args) throws Exception{ new Thread(new Runnable() { public void run() { try { //起一个服务中心 RegisterCenter serviceServer = new RegisterCenter(8888); //注册服务至注册中心 serviceServer.register(TechInterface.class, TechImpl.class); //运行我们的服务 serviceServer.start(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
2.2.客户端

定义一个相同的接口

public interface TechInterface { String XJ(String name); }

客户端利用动态代理实现远程服务的调用

public class RpcClientFrame { public static <T> T getRemoteProxyObj(final Class<?> serviceInerface) throws Exception{ //默认端口 InetSocketAddress socketAddress=new InetSocketAddress("127.0.0.1",8888); return (T)Proxy.newProxyInstance(serviceInerface.getClassLoader(),new Class<?>[]{serviceInerface},new DynProxy(serviceInerface,socketAddress)); } private static class DynProxy implements InvocationHandler{ //接口 private final Class<?> serviceInterface; //远程调用的地址 private final InetSocketAddress addr; public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) { this.serviceInterface = serviceInterface; this.addr = addr; } /** * 增强 实现了对远程服务的访问 * @param proxy * @param method * @param args * @return * @throws Throwable */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { /** * 网络增强部分 */ Socket socket=null; //因为传递的大部分是 方法、参数,所以我们使用Object流对象 ObjectInputStream objectInputStream=null; ObjectOutputStream objectOutputStream=null; try { socket=new Socket(); System.out.println("3333"); socket.connect(addr); System.out.println("34444"); //往远端发送数据 //拿到输出流 objectOutputStream=new ObjectOutputStream(socket.getOutputStream()); //发送调用方法的类名 使用utf防止乱码 objectOutputStream.writeUTF(serviceInterface.getName()); System.out.println(serviceInterface.getName()); //发送 方法名 objectOutputStream.writeUTF(method.getName()); System.out.println(method.getName()); //发送参数类型 objectOutputStream.writeObject(method.getParameterTypes()); //发送参数值 objectOutputStream.writeObject(args); //刷新缓冲区 使得数据立马发送 objectOutputStream.flush(); //立刻拿到远程执行的结果 objectInputStream=new ObjectInputStream(socket.getInputStream()); //打印调用细节 System.out.println("远程调用成功!"+serviceInterface.getName()); return objectInputStream.readObject(); }finally { socket.close(); objectOutputStream.close(); objectInputStream.close(); } } } }

客户端调用远程服务

public static void main(String[] args) { try { TechInterface techInterface= RpcClientFrame.getRemoteProxyObj(TechInterface.class); //进远程调用我们的对象 System.out.println(techInterface.XJ("king")); } catch (Exception e) { e.printStackTrace(); } }

调用结果: 注意这边我简单化了,如果传递的参数是实体类,需要序列化。

三、总结

这里只是简单的写了一下,还有许多可以拓展的地方,例如: 1.通讯效率 2.IO方式 3.序列化速度 4.服务的管理 后面会继续研究阿里的dubbo。

最新回复(0)