dubbo源码系列5-consumer服务引用

mac2024-03-18  28

一、前沿

在 dubbo源码系列4-provider启动服务导出 中我们了解了 provider 导出服务的整个过程,服务导出之后就是要应用这些服务了,即服务引入。dubbo中引用远程服务有两种方式:服务直连(不经过注册中心)、基于注册中心引用服务,在实际线上环境中我们基本上使用的都是基于注册中心引用服务的方式,下面我们就围绕该方式讲解分析

二、服务引用原理

Dubbo 服务引用的时机有以下两个:

1、Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务(饿汉式)

2、ReferenceBean 对应的服务被注入到其他类中时引用(懒汉式)

第一种是饿汉式的,第二种是懒汉式的,dubbo 中默认使用的是懒汉式,如果懒汉式下需要使用饿汉式,则通过配置 <dubbo:reference init="method"> 的 init 属性开启

我们按照 Dubbo 默认配置方式进行分析,整个分析过程从 ReferenceBean 的 getObject 方法开始。当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。

在服务引用过程中需要进行配置检查和信息收集工作,根据收集到的信息决定服务引用的具体方式,服务引用有以下三种方式:

1、引用本地 (JVM) 服务

2、直连方式引用远程服务(不经过注册中心)

3、通过注册中心引用远程服务

上述无论哪种方式引用服务,都需要通过 Invoker 实例载体进行。如果有多个注册中心和多个服务提供者,这时候会得到一组 Invoker 实例(Invoker 列表),此时通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但此时还不能暴露给用户使用,不然会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。这样避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用

三、服务引用过程

我们先从宏观上了解一下服务引用过程,如下图:

服务引用时序图:

四、服务引用源码

服务引用入口是 ReferenceBean 的 getObject 方法,该方法定义在 Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法,代码如下:

// 1、ReferenceBean 的 getObject 方法,实现了 FactoryBean 的 getObject 接口 @Override public Object getObject() { return get(); } // 2、ReferenceConfig 的 get 方法 public synchronized T get() { checkAndUpdateSubConfigs(); if (destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!"); } if (ref == null) { // 接口代理不存在时,初始化创建代理类 init(); } return ref; }

4.1 配置检查及处理

Dubbo 提供了很多的配置,用于调整和优化框架行为,性能等。Dubbo 在引用或导出服务时,首先会对这些配置进行检查和处理,以保证配置的正确性。配置解析检查逻辑封装在 ReferenceConfig 的 checkAndUpdateSubConfigs 方法中,代码如下:

/** * This method should be called right after the creation of this class's instance, before any property in other config modules is used. * Check each config modules are created properly and override their properties if necessary. */ // 1、ReferenceConfig 的 checkAndUpdateSubConfigs 方法 public void checkAndUpdateSubConfigs() { // 检测接口名合法性 if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!"); } // 完成application、module、registries、monitor配置信息填充 completeCompoundConfigs(); // configCenter 配置信息设置 startConfigCenter(); // get consumer's global configuration // 检测 consumer 变量是否为空,为空则创建默认的 consumer checkDefault(); this.refresh(); if (getGeneric() == null && getConsumer() != null) { // 设置 generic setGeneric(getConsumer().getGeneric()); } if (ProtocolUtils.isGeneric(getGeneric())) { // 检测是否为泛化接口 interfaceClass = GenericService.class; } else { try { // 加载接口类 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 检查配置的方法名列表是否都在接口类中存在 checkInterfaceAndMethods(interfaceClass, methods); } // 解析文件,获取接口名对应的配置信息赋值给 url 属性 resolveFile(); // 检查application配置信息 checkApplication(); // 检查 metadataReport配置信息 checkMetadataReport(); } // 2、ReferenceConfig 的 resolveFile 方法 private void resolveFile() { // 从系统变量中获取与接口名对应的属性值 String resolve = System.getProperty(interfaceName); String resolveFile = null; if (StringUtils.isEmpty(resolve)) { // 从系统属性中获取解析文件路径 resolveFile = System.getProperty("dubbo.resolve.file"); if (StringUtils.isEmpty(resolveFile)) { // 获取指定目录下的 dubbo-resolve.properties 配置文件 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); try (FileInputStream fis = new FileInputStream(new File(resolveFile))) { // 加载文件中的配置信息 properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Failed to load " + resolveFile + ", cause: " + e.getMessage(), e); } // 获取与接口名对应的配置 resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { // 将接口名对应的配置信息 resolve 赋值给 url url = resolve; if (logger.isWarnEnabled()) { if (resolveFile != null) { logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); } else { logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); } } } }

配置检查主要实现了以下内容的检查:

1)、对 consumer、application、monitor 等配置信息校验,如果 consumerConfig 实例不存在,则通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段

2)、接口和方法合法性,方法都必须是在接口中定义的

3)、解析文件,获取接口名对应的配置信息赋值给 url 属性,url 属性用于点对点调用

配置检查完之后,就需要对配置做处理,处理逻辑封装在 ReferenceConfig 的 init 方法中,代码如下:

private void init() { if (initialized) { // 已初始化直接返回 return; } // 检查本地存根配置合法性,主要检查本地存根类是否实现了接口类,以及检查带有参数who类型的本地存根类构造函数是否是接口类类型 checkStubAndLocal(interfaceClass); // mock检查 checkMock(interfaceClass); Map<String, String> map = new HashMap<String, String>(); // 添加side、dubbo version、pid等参数到map中 map.put(SIDE_KEY, CONSUMER_SIDE); appendRuntimeParameters(map); if (!isGeneric()) { // 非泛化处理 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { // revision 参数添加到map map.put(REVISION_KEY, revision); } // 获取方法列表,并将方法名以“,”拼接,methods 参数添加到map String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR)); } } // interface 参数添加到map map.put(INTERFACE_KEY, interfaceName); // 将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中 appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); // remove 'default.' prefix for configs from ConsumerConfig // appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, consumer); appendParameters(map, this); Map<String, Object> attributes = null; if (CollectionUtils.isNotEmpty(methods)) { attributes = new HashMap<String, Object>(); for (MethodConfig methodConfig : methods) { // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名 appendParameters(map, methodConfig, methodConfig.getName()); String retryKey = methodConfig.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { // 添加重试次数配置 methodName.retries 到map,不允许重试时,值为0 map.put(methodConfig.getName() + ".retries", "0"); } } // 添加 MethodConfig 中的“属性”字段到 attributes, 例如 onreturn、onthrow、oninvoke 等 attributes.put(methodConfig.getName(), convertMethodConfig2AsyncInfo(methodConfig)); } } // 获取服务消费者ip地址 String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY); if (StringUtils.isEmpty(hostToRegistry)) { hostToRegistry = NetUtils.getLocalHost(); } else if (isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry); } // 添加服务消费者ip参数到map map.put(REGISTER_IP_KEY, hostToRegistry); // 创建接口代理类 ref = createProxy(map); // 获取消费者服务名,例如:org.apache.dubbo.demo.DemoService String serviceKey = URL.buildKey(interfaceName, group, version); // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,并将 ConsumerModel 存入到 ApplicationModel 的消费者集合map中 ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes)); initialized = true; }

配置处理主要完成了以下工作:

1)、收集各种配置信息,并将配置存储到 map 中

2)、主要用于解析服务消费者 ip

3)、创建接口代理类

4)、根据服务名构建 ConsumerModel,并将 ConsumerModel 存入到 ApplicationModel 的消费者集合map中

4.2 引用服务

配置信息检查和处理完之后,就是要引用服务了,引用服务入口是 createProxy 方法,该方法字面上看是创建代理对象,但事实上不是这样的,下面我们跟着源码一探究竟,代码如下:

private T createProxy(Map<String, String> map) { if (shouldJvmRefer(map)) { // 本地服务引用 // 生成本地引用 URL,协议为 injvm,ip 为 127.0.0.1,port 为 0 URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); // 先调用 AbstractProtocol 的 refer 方法,然后调用 InjvmProtocol 的 protocolBindingRefer 方法构建 InjvmInvoker 实例 invoker = REF_PROTOCOL.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { // 远程服务引用 urls.clear(); // reference retry init will add url to urls, lead to OOM if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. // 配置了url,可能想使用点对点调用 String[] us = SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { // 设置接口全限定名为 url 路径,例如:org.apache.dubbo.demo.DemoService url = url.setPath(interfaceName); } // 检测 url 协议是否为 registry,若是,表明要使用指定的注册中心 if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中 urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } else { // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性), // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等 // 最后将合并后的配置设置为 url 查询字符串中。 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // if protocols not injvm checkRegistry // protocols 非 injvm时,即 if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){ // 检查 RegistryConfig 配置是否存在,并且将 registries 和 registryIds 转换为 RegistryConfig 对象 checkRegistry(); // 加载注册中心链接 List<URL> us = loadRegistries(false); if (CollectionUtils.isNotEmpty(us)) { for (URL u : us) { // 加载 monitor 配置 URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { // monitor 参数添加到map map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } } // 单个注册中心或服务提供者->服务直连 if (urls.size() == 1) { // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { // 存在多个注册中心或者服务提供者 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; // 根据url获取所有的invoker for (URL url : urls) { // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时 // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法 invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { // use last registry url // 对于 url 协议是 registry 的,registryURL 使用最后的 url registryURL = url; } } if (registryURL != null) { // registry url is available // use RegistryAwareCluster only when register's CLUSTER is available // 如果注册中心链接不为空,则将使用 RegistryAwareCluster URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME); // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并 invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. invoker = CLUSTER.join(new StaticDirectory(invokers)); } } } if (shouldCheck() && !invoker.isAvailable()) { // 检查invoker是否可用,即提供者服务是否可用 throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } /** * @since 2.7.0 * ServiceData Store * 保存消费者的Metadata数据 */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map); metadataReportService.publishConsumer(consumerURL); } // create service proxy // 创建服务代理类 return (T) PROXY_FACTORY.getProxy(invoker); }

createProxy 方法实现了以下功能:

1)、若是本地服务调用,则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例

2)、配置了url参数,则将所有 url 做设置 path 或者 设置 refer 或者合并处理,最后添加到 urls 列表中

3)、没有配置url参数时,从 registries 参数中加载注册中心链接 url,url 做设置 refer 处理后添加到 urls 列表中

4)、urls 元素为1个时,直接调用 RegistryProtocol 的 refer 构建 Invoker 实例

5)、urls 元素大于1时,根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法构建 Invoker 实例,最后通过 Cluster 合并多个 Invoker

6)、保存消费者的Metadata数据

7)、 调用 ProxyFactory 的 getproxy 方法创建服务代理类

4.2.1 invoker 创建

Invoker 是 Dubbo 的核心模型,代表一个可执行体。在前面文章中我们已经提过了,在provider(服务提供方),Invoker 用于调用服务提供类。在consumer(服务消费方),Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,我们就以最常用的两个为例分析,分别是 RegistryProtocol 和 DubboProtocol。

createProxy 方法中执行了一系列调用逻辑之后,如下图所示:

我们debug调试看调用栈可以看出整个调用过程,createProxy(ReferenceConfig) -> ref(ProtocolListenerWrapper)-> ref(ProtocolFilterWrapper)-> ref(RegistryProtocol),最终调用到了 RegistryProtocol 的 ref 方法

下面先从 RegistryProtocol  的 ref 方法开始讲起,代码如下:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 从 url 中获取 registry 参数值,不存在时默认为 dubbo,并设置为 url 协议头 // url变化前:registry://192.168.1.235:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper&..... // url变化后:zookeeper://192.168.1.235:2181/org.apache.dubbo.registry.RegistryService?..... url = URLBuilder.from(url) .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) .removeParameter(REGISTRY_KEY) .build(); // 由于测试使用的是 Zookeeper 注册中心,这里获取就是 zk 客户端 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" // 将 url 中的 refer 参数值对应的查询字符串转为 Map Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { // 多个group时 或者 group 为 “*” 时,cluster 使用 MergeableCluster if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 通过 SPI 方法获取 MergeableCluster 实例,调用 doRefer 方法执行服务引用 return doRefer(getMergeableCluster(), registry, type, url); } } // 调用 doRefer 方法执行服务引用 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 设置注册中心和协议 directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 构建服务消费者链接 // 例如:consumer:///org.apache.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.2& // interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=18888&qos.port=33333&side=consumer& // sticky=false&timestamp=1572576516380 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); // 注册服务消费者,在 consumers 目录下创建节点,节点数据: // consumer:///org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers& // check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello& // pid=18888&qos.port=33333&side=consumer&sticky=false&timestamp=1572576516380 registry.register(directory.getRegisteredConsumerUrl()); } // 构建路由器链条,为服务调用路由使用 directory.buildRouterChain(subscribeUrl); // 订阅 Zookeeper /dubbo/org.apache.dubbo.demo.DemoService 目录下的 providers、configurators、routers 节点信息 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); // 一个注册中心可能有多个服务提供者 provider,因此这里需要将多个服务提供者合并为一个 Invoker invoker = cluster.join(directory); // 向服务提供者与消费者注册表中注册服务消费者,即consumer ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }

上述代码主要完成了一些工作:

1)、构建消费者链接url,并将url注册到Zookeeper中,即创建url节点

2)、RegistryDirectory 订阅 Zookeeper 当前接口目录下的 providers、configurators、routers 节点信息

3)、集群 cluster 将多个服务提供者处理合并为一个 provider,即生成一个 invoker

4)、向服务提供者与消费者注册表中注册服务消费者,即consumer

consumer 消费者 url 注册到 Zookeeper中内容如下图:

org.apache.dubbo.demo.DemoService 接口对应的生产者和消费者 url 注册到 Zookeeper 如下图所示:

4.2.2 创建代理

invoker 创建完成之后,接下来要做的事情就是为服务接口生成代理对象,即可进行远程调用。代理对象生成的入口是 AbstractProxyFactory 的 getProxy 方法,代码如下:

// 1、AbstractProxyFactory 的 getProxy 方法 @Override public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null; // 获取url中interfaces配置的接口列表 String config = invoker.getUrl().getParameter(INTERFACES); if (config != null && config.length() > 0) { String[] types = COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; // 设置服务接口类和 EchoService.class 到 interfaces 中 interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { // TODO can we load successfully for a different classloader?. // 反射加载接口类 interfaces[i + 2] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } // 为 http 和 hessian 协议提供泛化调用支持 if (!GenericService.class.isAssignableFrom(invoker.getInterface()) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; interfaces = new Class<?>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); interfaces[len] = com.alibaba.dubbo.rpc.service.GenericService.class; } // 调用重载方法,具体实现代理类的getProxy方法, // dubbo中默认使用的是 Javassist 代理,故这里调用的是 JavassistProxyFactory 的 getProxy 方法 return getProxy(invoker, interfaces); } // 2、JavassistProxyFactory 的 getProxy 方法 @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { // 首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。 // InvokerInvocationHandler 实现自 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } // 3、Proxy 的 getProxy 方法 /** * Get proxy. * * @param ics interface class array. * @return Proxy instance. */ public static Proxy getProxy(Class<?>... ics) { // 调用重载方法 return getProxy(ClassUtils.getClassLoader(Proxy.class), ics); } /** * Get proxy. * * @param cl class loader. * @param ics interface class array. * @return Proxy instance. */ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { // 代理接口上线不能超过 65535 个 if (ics.length > MAX_PROXY_COUNT) { throw new IllegalArgumentException("interface limit exceeded"); } StringBuilder sb = new StringBuilder(); // 遍历接口列表 for (int i = 0; i < ics.length; i++) { String itf = ics[i].getName(); if (!ics[i].isInterface()) { throw new RuntimeException(itf + " is not a interface."); } Class<?> tmp = null; try { // 重新加载接口类 tmp = Class.forName(itf, false, cl); } catch (ClassNotFoundException e) { } if (tmp != ics[i]) { throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); } // 以";"拼接所有的接口全限定名(带上包路径的接口) sb.append(itf).append(';'); } // use interface class name list as key. String key = sb.toString(); // get cache by class loader. final Map<String, Object> cache; synchronized (PROXY_CACHE_MAP) { cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>()); } Proxy proxy = null; synchronized (cache) { do { // 缓存中获取 Reference<Proxy> 实例 Object value = cache.get(key); if (value instanceof Reference<?>) { proxy = (Proxy) ((Reference<?>) value).get(); if (proxy != null) { return proxy; } } // 并发锁控制,只有一个线程可以操作 if (value == PENDING_GENERATION_MARKER) { try { // 其他线程等待 cache.wait(); } catch (InterruptedException e) { } } else { // 放置标志位到缓存中,并跳出 while 循环进行后续操作 cache.put(key, PENDING_GENERATION_MARKER); break; } } while (true); } // 代理类计数器自增 long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; ClassGenerator ccp = null, ccm = null; try { // 创建 ClassGenerator 对象 ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<>(); List<Method> methods = new ArrayList<>(); for (int i = 0; i < ics.length; i++) { if (!Modifier.isPublic(ics[i].getModifiers())) { // 接口访问级别非public // 获取接口包名 String npkg = ics[i].getPackage().getName(); if (pkg == null) { pkg = npkg; } else { if (!pkg.equals(npkg)) { // 非public级别的接口必须在同一包路径下 throw new IllegalArgumentException("non-public interfaces from different packages"); } } } ccp.addInterface(ics[i]); // 遍历接口方法 for (Method method : ics[i].getMethods()) { // 获取方法描述,可理解为方法签名 String desc = ReflectUtils.getDesc(method); // 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况, // A 接口和 B 接口中包含一个完全相同的方法 if (worked.contains(desc)) { continue; } if (ics[i].isInterface() && Modifier.isStatic(method.getModifiers())) { // static 访问级别的方法忽略 continue; } worked.add(desc); int ix = methods.size(); // 获取方法返回值类型 Class<?> rt = method.getReturnType(); // 获取方法参数列表 Class<?>[] pts = method.getParameterTypes(); // 生成 Object[] args = new Object[1...N] StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); for (int j = 0; j < pts.length; j++) { // 生成 args[0...N] = ($w)$1...N; code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); } // 生成 InvokerHandler 接口的 invoker 方法调用语句,如下: // Object ret = handler.invoke(this, methods[1...N], args); code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);"); // 返回值不为 void if (!Void.TYPE.equals(rt)) { // 生成返回语句,形如 return (java.lang.String) ret; code.append(" return ").append(asArgument(rt, "ret")).append(";"); } methods.add(method); // 添加方法名、访问控制符、方法参数列表、方法返回值代码等信息到 ClassGenerator 中 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null) { // pkg 默认是 org.apache.dubbo.common.bytecode pkg = PACKAGE_NAME; } // create ProxyInstance class. // 创建接口代理类名称:pkg + ".proxy" + id,例如: org.apache.dubbo.common.bytecode.proxy0 String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;"); // 生成 private java.lang.reflect.InvocationHandler handler; 语句 ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); // 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如: // porxy0(java.lang.reflect.InvocationHandler arg0) { // handler=$1; // } ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); // 为接口代理类添加默认构造方法 ccp.addDefaultConstructor(); // 创建接口代理类,为服务接口(org.apache.dubbo.demo.DemoService)生成代理类 Class<?> clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0])); // 在 E 盘ccp目录下输出接口代理类 ccp.getClassPool().get(pcn).debugWriteFile("E:\\ccp\\"); // create Proxy class. // 创建 Proxy 子类名称,比如 Proxy1,Proxy2 等,例如:org.apache.dubbo.common.bytecode.Proxy0 String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); ccm.setSuperClass(Proxy.class); // 为 Proxy 的抽象方法 newInstance 生成实现代码,形如: // public Object newInstance(java.lang.reflect.InvocationHandler h) { // return org.apache.dubbo.demo.proxy0($1); // } ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); // 生成 Proxy 实现类, 为org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象 Class<?> pc = ccm.toClass(); // 在 E 盘ccm目录下输出 Proxy 实现类 ccm.getClassPool().get(fcn).debugWriteFile("E:\\ccm\\"); // 通过反射创建 Proxy 实例 proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { // release ClassGenerator // 释放资源 if (ccp != null) { ccp.release(); } if (ccm != null) { ccm.release(); } synchronized (cache) { if (proxy == null) { cache.remove(key); } else { // 放入缓存 cache.put(key, new WeakReference<Proxy>(proxy)); } // 唤醒其他等待线程 cache.notifyAll(); } } return proxy; }

上面最主要的逻辑就是ccp 和 ccm 的作用是什么?分别生成了什么?如下:

1)、ccp 用于为服务接口生成代理类,比如我们有一个 DemoService 接口,这个接口代理类就是由 ccp 生成的

2)、ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法

我们通过在源码中分别加入 ccp.getClassPool().get(pcn).debugWriteFile("E:\\ccp\\");   ccm.getClassPool().get(fcn).debugWriteFile("E:\\ccm\\");  这两句话输出ccp 和 ccm 生成的代理类如下:

1)、ccp 生成的 proxy0.class 代理类,如下图:

proxy0.class 代码如下:

package org.apache.dubbo.common.bytecode; import com.alibaba.dubbo.rpc.service.EchoService; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import org.apache.dubbo.demo.DemoService; public class proxy0 implements ClassGenerator.DC, EchoService, DemoService { public static Method[] methods; private InvocationHandler handler; public String sayHello(String paramString) { Object[] arrayOfObject = new Object[1]; arrayOfObject[0] = paramString; Object localObject = this.handler.invoke(this, methods[0], arrayOfObject); return (String)localObject; } public Object $echo(Object paramObject) { Object[] arrayOfObject = new Object[1]; arrayOfObject[0] = paramObject; Object localObject = this.handler.invoke(this, methods[1], arrayOfObject); return (Object)localObject; } public proxy0() { } public proxy0(InvocationHandler paramInvocationHandler) { this.handler = paramInvocationHandler; } }

2)、ccm 生成的 Proxy0.class 代理类,如下图:

Proxy0.class 代码如下:

package org.apache.dubbo.common.bytecode; import java.lang.reflect.InvocationHandler; public class Proxy0 extends Proxy implements ClassGenerator.DC { public Object newInstance(InvocationHandler paramInvocationHandler) { return new proxy0(paramInvocationHandler); } }

 

到这里,创建代理也讲解完了,整个过程比较难以理解,耐心细致的看总会有所收获的。

consumer的服务引用到这里就全部解析完了,回过头来再去看服务引用过程的流程图和时序图,和我们的源码过程对比一下就会发现是一致的。

总结

本文详细分析了 consumer 启动过程中服务引用全过程,包括配置检测,加载注册中心链接、URL 构建并注册,Invoker 创建、代理创建、RegistryDirectory 订阅 Zookeeper节点信息(providers、configurators、routers)等等。文章内容很多,需要足够的耐心阅读,有不正确之处请大家指正。这里没有讲解有关 Directory、Cluster 的作用,后续单独讲解。

参考:

https://dubbo.apache.org/zh-cn/docs/source_code_guide/refer-service.html

最新回复(0)