前面我们的例子是一个固定的出参和入参,固定的方法实现。
本节将实现通用的调用,让框架具有更高的实用性。
所有的方法调用,基于反射进行相关处理实现。
和原来一样,计算接口实现及入参/出参,此处不再赘述。
本节内容较多,只节选部分核心内容进行讲解。
这里会根据 address 创建 client-server 之间的连接信息。
ReferenceProxy.newProxyInstance(proxyContext) 是服务端代理创建的核心实现。
这里是直接使用 java 动态代理实现的。
/** * 获取代理实例 * (1)接口只是为了代理。 * (2)实际调用中更加关心 的是 serviceId * @param proxyContext 代理上下文 * @param <T> 泛型 * @return 代理实例 * @since 0.0.6 */ @SuppressWarnings("unchecked") public static <T> T newProxyInstance(ProxyContext<T> proxyContext) { final Class<T> interfaceClass = proxyContext.serviceInterface(); ClassLoader classLoader = interfaceClass.getClassLoader(); Class<?>[] interfaces = new Class[]{interfaceClass}; ReferenceProxy proxy = new ReferenceProxy(proxyContext); return (T) Proxy.newProxyInstance(classLoader, interfaces, proxy); }核心流程如下:
(1)根据 proxyContext 构建 rpcRequest
(2)将 rpcRequest 写入到服务端
(3)同步等待服务端响应。
public class ReferenceProxy<T> implements InvocationHandler { private static final Log LOG = LogFactory.getLog(ReferenceProxy.class); /** * 服务标识 * @since 0.0.6 */ private final ProxyContext<T> proxyContext; /** * 暂时私有化该构造器 * @param proxyContext 代理上下文 * @since 0.0.6 */ private ReferenceProxy(ProxyContext<T> proxyContext) { this.proxyContext = proxyContext; } /** * 反射调用 * @param proxy 代理 * @param method 方法 * @param args 参数 * @return 结果 * @throws Throwable 异常 * @since 0.0.6 * @see Method#getGenericSignature() 通用标识,可以根据这个来优化代码。 */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 反射信息处理成为 rpcRequest final String seqId = Uuid.getInstance().id(); final long createTime = DefaultSystemTime.getInstance().time(); DefaultRpcRequest rpcRequest = new DefaultRpcRequest(); rpcRequest.serviceId(proxyContext.serviceId()); rpcRequest.seqId(seqId); rpcRequest.createTime(createTime); rpcRequest.paramValues(args); rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method)); rpcRequest.methodName(method.getName()); // 调用远程 LOG.info("[Client] start call remote with request: {}", rpcRequest); proxyContext.invokeService().addRequest(seqId); // 这里使用 load-balance 进行选择 channel 写入。 final Channel channel = getChannel(); LOG.info("[Client] start call channel id: {}", channel.id().asLongText()); // 对于信息的写入,实际上有着严格的要求。 // writeAndFlush 实际是一个异步的操作,直接使用 sync() 可以看到异常信息。 // 支持的必须是 ByteBuf channel.writeAndFlush(rpcRequest).sync(); // 循环获取结果 // 通过 Loop+match wait/notifyAll 来获取 // 分布式根据 redis+queue+loop LOG.info("[Client] start get resp for seqId: {}", seqId); RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId); LOG.info("[Client] start get resp for seqId: {}", seqId); Throwable error = rpcResponse.error(); if(ObjectUtil.isNotNull(error)) { throw error; } return rpcResponse.result(); } }服务端会在启动的时候,将所有的方法信息进行处理,便于后期调用。
DefaultServiceFactory#registerServices /** * 服务注册一般在项目启动的时候,进行处理。 * 属于比较重的操作,而且一个服务按理说只应该初始化一次。 * 此处加锁为了保证线程安全。 * @param serviceConfigList 服务配置列表 * @return this */ @Override public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) { ArgUtil.notEmpty(serviceConfigList, "serviceConfigList"); // 集合初始化 serviceMap = new HashMap<>(serviceConfigList.size()); // 这里只是预估,一般为2个服务。 methodMap = new HashMap<>(serviceConfigList.size()*2); for(ServiceConfig serviceConfig : serviceConfigList) { serviceMap.put(serviceConfig.id(), serviceConfig.reference()); } // 存放方法名称 for(Map.Entry<String, Object> entry : serviceMap.entrySet()) { String serviceId = entry.getKey(); Object reference = entry.getValue(); //获取所有方法列表 Method[] methods = reference.getClass().getMethods(); for(Method method : methods) { String methodName = method.getName(); if(ReflectMethodUtil.isIgnoreMethod(methodName)) { continue; } List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method); String key = buildMethodKey(serviceId, methodName, paramTypeNames); methodMap.put(key, method); } } return this; }整体流程
(1)接收到请求信息后,整理出方法的相关信息
(2)根据方法信息,去初始化的方法集合中选取对应的方法
(3)反射调用,并且返回结果
/** * 处理请求信息 * @param rpcRequest 请求信息 * @return 结果信息 * @since 0.0.6 */ private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) { DefaultRpcResponse rpcResponse = new DefaultRpcResponse(); rpcResponse.seqId(rpcRequest.seqId()); try { // 获取对应的 service 实现类 // rpcRequest=>invocationRequest // 执行 invoke Object result = DefaultServiceFactory.getInstance() .invoke(rpcRequest.serviceId(), rpcRequest.methodName(), rpcRequest.paramTypeNames(), rpcRequest.paramValues()); rpcResponse.result(result); } catch (Exception e) { rpcResponse.error(e); log.error("[Server] execute meet ex for request", rpcRequest, e); } // 构建结果值 return rpcResponse; }方法节选如下
public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) { //参数校验 ArgUtil.notEmpty(serviceId, "serviceId"); ArgUtil.notEmpty(methodName, "methodName"); // 提供 cache,可以根据前三个值快速定位对应的 method // 根据 method 进行反射处理。 // 对于 paramTypes 进行 string 连接处理。 final Object reference = serviceMap.get(serviceId); final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames); final Method method = methodMap.get(methodKey); try { return method.invoke(reference, paramValues); } catch (IllegalAccessException | InvocationTargetException e) { throw new RpcRuntimeException(e); } }看代码之前需要掌握整体的流程。
这样看起来顺着流程,会比较轻松。
java 反射
netty 网络通讯
序列化