从零开始手写 dubbo rpc 框架-06-通用客户端调用服务端

mac2025-09-17  33

通用调用

前面我们的例子是一个固定的出参和入参,固定的方法实现。

本节将实现通用的调用,让框架具有更高的实用性。

基本思路

所有的方法调用,基于反射进行相关处理实现。

测试代码

服务端

服务依赖

<dependency> <groupId>com.github.houbb</groupId> <artifactId>rpc-server</artifactId> <version>0.0.6</version> </dependency>

基础实现

和原来一样,计算接口实现及入参/出参,此处不再赘述。

测试

RpcServiceMain.java import com.github.houbb.rpc.example.server.service.CalculatorServiceImpl; import com.github.houbb.rpc.server.facade.constant.ServiceIdConst; import com.github.houbb.rpc.server.registry.impl.DefaultServiceRegistry; /** * @author binbin.hou * @since 1.0.0 */ public class RpcServiceMain { public static void main(String[] args) { // 启动服务 DefaultServiceRegistry.getInstance() .register(ServiceIdConst.CALC, new CalculatorServiceImpl()) .expose(); } } 日志 [DEBUG] [2019-11-01 15:45:25.406] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2019-11-01 15:45:25.411] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端 [INFO] [2019-11-01 15:45:26.580] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口

客户端

实现代码 import com.github.houbb.rpc.client.config.reference.ReferenceConfig; import com.github.houbb.rpc.client.config.reference.impl.DefaultReferenceConfig; import com.github.houbb.rpc.server.facade.constant.ServiceIdConst; import com.github.houbb.rpc.server.facade.model.CalculateRequest; import com.github.houbb.rpc.server.facade.model.CalculateResponse; import com.github.houbb.rpc.server.facade.service.CalculatorService; /** * @author binbin.hou * @since 1.0.0 */ public class RpcClientMain { public static void main(String[] args) { // 服务配置信息 ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>(); config.serviceId(ServiceIdConst.CALC); config.serviceInterface(CalculatorService.class); config.addresses("localhost:9527"); CalculatorService calculatorService = config.reference(); CalculateRequest request = new CalculateRequest(); request.setOne(10); request.setTwo(20); CalculateResponse response = calculatorService.sum(request); System.out.println(response); } } 日志 [DEBUG] [2019-11-01 15:46:42.437] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2019-11-01 15:46:42.467] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端 [INFO] [2019-11-01 15:46:43.687] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端完成,监听地址 localhost:9527 [INFO] [2019-11-01 15:46:43.780] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='6267e9869f094f688565726893eeb36d', createTime=1572594403773, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 15:46:43.780] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 6267e9869f094f688565726893eeb36d [INFO] [2019-11-01 15:46:43.781] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: d89c67fffe99d9d7-00002ed0-00000000-e080b63d353748a8-8dcc0ae6 [INFO] [2019-11-01 15:46:43.862] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: 6267e9869f094f688565726893eeb36d [INFO] [2019-11-01 15:46:43.862] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 6267e9869f094f688565726893eeb36d 对应结果为空,进入等待 [INFO] [2019-11-01 15:46:43.988] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seq: 6267e9869f094f688565726893eeb36d, rpcResponse: DefaultRpcResponse{seqId='6267e9869f094f688565726893eeb36d', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 15:46:43.989] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seq 信息已经放入,通知所有等待方 [INFO] [2019-11-01 15:46:43.990] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='6267e9869f094f688565726893eeb36d', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 15:46:43.990] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 6267e9869f094f688565726893eeb36d 对应结果已经获取: DefaultRpcResponse{seqId='6267e9869f094f688565726893eeb36d', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 15:46:43.992] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: 6267e9869f094f688565726893eeb36d CalculateResponse{success=true, sum=30}

客户端实现

节选说明

本节内容较多,只节选部分核心内容进行讲解。

DefaultReferenceConfig 引导类

/** * 获取对应的引用实现 * (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。 * (2)启动对应的长连接 * @return 引用代理类 * @since 0.0.6 */ @Override public T reference() { // 1. 启动 client 端到 server 端的连接信息 // 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。 // 1.2 初期为了简单,直接使用同步循环的方式。 // 创建 handler // 循环连接 for(RpcAddress rpcAddress : rpcAddresses) { final ChannelHandler channelHandler = new RpcClientHandler(invokeService); final DefaultRpcClientContext context = new DefaultRpcClientContext(); context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler); ChannelFuture channelFuture = new RpcClient(context).connect(); // 循环同步等待 // 如果出现异常,直接中断?捕获异常继续进行?? channelFutures.add(channelFuture); } // 2. 接口动态代理 ProxyContext<T> proxyContext = buildReferenceProxyContext(); return ReferenceProxy.newProxyInstance(proxyContext); }

这里会根据 address 创建 client-server 之间的连接信息。

代理创建

ReferenceProxy.newProxyInstance(proxyContext) 是服务端代理创建的核心实现。

这里是直接使用 java 动态代理实现的。

/** * 获取代理实例 * (1)接口只是为了代理。 * (2)实际调用中更加关心 的是 serviceId * @param proxyContext 代理上下文 * @param <T> 泛型 * @return 代理实例 * @since 0.0.6 */ @SuppressWarnings("unchecked") public static <T> T newProxyInstance(ProxyContext<T> proxyContext) { final Class<T> interfaceClass = proxyContext.serviceInterface(); ClassLoader classLoader = interfaceClass.getClassLoader(); Class<?>[] interfaces = new Class[]{interfaceClass}; ReferenceProxy proxy = new ReferenceProxy(proxyContext); return (T) Proxy.newProxyInstance(classLoader, interfaces, proxy); }

ReferenceProxy 核心代理类

核心流程如下:

(1)根据 proxyContext 构建 rpcRequest

(2)将 rpcRequest 写入到服务端

(3)同步等待服务端响应。

public class ReferenceProxy<T> implements InvocationHandler { private static final Log LOG = LogFactory.getLog(ReferenceProxy.class); /** * 服务标识 * @since 0.0.6 */ private final ProxyContext<T> proxyContext; /** * 暂时私有化该构造器 * @param proxyContext 代理上下文 * @since 0.0.6 */ private ReferenceProxy(ProxyContext<T> proxyContext) { this.proxyContext = proxyContext; } /** * 反射调用 * @param proxy 代理 * @param method 方法 * @param args 参数 * @return 结果 * @throws Throwable 异常 * @since 0.0.6 * @see Method#getGenericSignature() 通用标识,可以根据这个来优化代码。 */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 反射信息处理成为 rpcRequest final String seqId = Uuid.getInstance().id(); final long createTime = DefaultSystemTime.getInstance().time(); DefaultRpcRequest rpcRequest = new DefaultRpcRequest(); rpcRequest.serviceId(proxyContext.serviceId()); rpcRequest.seqId(seqId); rpcRequest.createTime(createTime); rpcRequest.paramValues(args); rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method)); rpcRequest.methodName(method.getName()); // 调用远程 LOG.info("[Client] start call remote with request: {}", rpcRequest); proxyContext.invokeService().addRequest(seqId); // 这里使用 load-balance 进行选择 channel 写入。 final Channel channel = getChannel(); LOG.info("[Client] start call channel id: {}", channel.id().asLongText()); // 对于信息的写入,实际上有着严格的要求。 // writeAndFlush 实际是一个异步的操作,直接使用 sync() 可以看到异常信息。 // 支持的必须是 ByteBuf channel.writeAndFlush(rpcRequest).sync(); // 循环获取结果 // 通过 Loop+match wait/notifyAll 来获取 // 分布式根据 redis+queue+loop LOG.info("[Client] start get resp for seqId: {}", seqId); RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId); LOG.info("[Client] start get resp for seqId: {}", seqId); Throwable error = rpcResponse.error(); if(ObjectUtil.isNotNull(error)) { throw error; } return rpcResponse.result(); } }

服务端实现

服务端方法信息注册

服务端会在启动的时候,将所有的方法信息进行处理,便于后期调用。

DefaultServiceFactory#registerServices /** * 服务注册一般在项目启动的时候,进行处理。 * 属于比较重的操作,而且一个服务按理说只应该初始化一次。 * 此处加锁为了保证线程安全。 * @param serviceConfigList 服务配置列表 * @return this */ @Override public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) { ArgUtil.notEmpty(serviceConfigList, "serviceConfigList"); // 集合初始化 serviceMap = new HashMap<>(serviceConfigList.size()); // 这里只是预估,一般为2个服务。 methodMap = new HashMap<>(serviceConfigList.size()*2); for(ServiceConfig serviceConfig : serviceConfigList) { serviceMap.put(serviceConfig.id(), serviceConfig.reference()); } // 存放方法名称 for(Map.Entry<String, Object> entry : serviceMap.entrySet()) { String serviceId = entry.getKey(); Object reference = entry.getValue(); //获取所有方法列表 Method[] methods = reference.getClass().getMethods(); for(Method method : methods) { String methodName = method.getName(); if(ReflectMethodUtil.isIgnoreMethod(methodName)) { continue; } List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method); String key = buildMethodKey(serviceId, methodName, paramTypeNames); methodMap.put(key, method); } } return this; }

对于客户端调用的处理

整体流程

(1)接收到请求信息后,整理出方法的相关信息

(2)根据方法信息,去初始化的方法集合中选取对应的方法

(3)反射调用,并且返回结果

/** * 处理请求信息 * @param rpcRequest 请求信息 * @return 结果信息 * @since 0.0.6 */ private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) { DefaultRpcResponse rpcResponse = new DefaultRpcResponse(); rpcResponse.seqId(rpcRequest.seqId()); try { // 获取对应的 service 实现类 // rpcRequest=>invocationRequest // 执行 invoke Object result = DefaultServiceFactory.getInstance() .invoke(rpcRequest.serviceId(), rpcRequest.methodName(), rpcRequest.paramTypeNames(), rpcRequest.paramValues()); rpcResponse.result(result); } catch (Exception e) { rpcResponse.error(e); log.error("[Server] execute meet ex for request", rpcRequest, e); } // 构建结果值 return rpcResponse; }

invoke 反射调用实现

方法节选如下

public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) { //参数校验 ArgUtil.notEmpty(serviceId, "serviceId"); ArgUtil.notEmpty(methodName, "methodName"); // 提供 cache,可以根据前三个值快速定位对应的 method // 根据 method 进行反射处理。 // 对于 paramTypes 进行 string 连接处理。 final Object reference = serviceMap.get(serviceId); final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames); final Method method = methodMap.get(methodKey); try { return method.invoke(reference, paramValues); } catch (IllegalAccessException | InvocationTargetException e) { throw new RpcRuntimeException(e); } }

总结

核心流程

看代码之前需要掌握整体的流程。

这样看起来顺着流程,会比较轻松。

核心技术

java 反射

netty 网络通讯

序列化

最新回复(0)