RegistryProtocol.export服务导出流程:导出服务ExporterChangeableWrapper->注册服务到注册中心->订阅注册中心overrideSubscribeUrl数据;篇幅有限,本篇主要分析导出服务ExporterChangeableWrapper的源码实现
RegistryProtocol.export(final Invoker<T> originInvoker)
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker //导出服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下: // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider URL registryUrl = getRegistryUrl(originInvoker); //registry provider // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 获取已注册的服务提供者 URL,比如: // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); //to judge to delay publish whether or not //获取register参数;register表示是否注册到注册中心 boolean register = registeredProviderUrl.getParameter("register", true); //缓存到ProviderConsumerRegTable的表中 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //注册服务到zookeeper if (register) { register(registryUrl, registeredProviderUrl); //找到该originInvoker对应的ProviderInvokerWrapper设置reg属性为true ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. 
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); //创建监听器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); //放入overrideSubscribeUrl对应的OverrideListener overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 向注册中心进行订阅 override 数据 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export //创建并返回DestroyableExporter return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }该方法核心代码可以简化为
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //1.导出invoker; final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); .... boolean register = registeredProviderUrl.getParameter("register", true); //2.注册服务到zookeeper if (register) { register(registryUrl, registeredProviderUrl); } .... //3.向注册中心进行订阅overrideSubscribeUrl final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }doLocalExport(final Invoker<T> originInvoker)
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { //获取originInvoker对应的缓存key String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { //创建invoker委托类对象 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //调用protocol的export方法导出服务 //这个时候返回的url肯定是类似dubbo://....这样子的所以protocol //由于dubbo的spi机制,此时肯定是通过DubboProtocol的export方法 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }通过originInvoker获取缓存key,然后就是dubbo惯用的缓存伎俩,从缓存中获取,如果没有成功获取到,调用成员变量protocol的export方法导出invoker,这里的protocol还是Protocol$Adaptive,所以又会经过https://blog.csdn.net/qq_23536449/article/details/102639888文中描述的
QosProtocolWrapper,ProtocolFilterWrapper,ProtocolListenerWrapper,通过debug代码可以清楚看到执行流程:
QosProtocolWrapper -> ProtocolFilterWrapper -> ProtocolListenerWrapper,最后会进入到DubboProtocol的export方法中。下面先分析getCacheKey(originInvoker)
private String getCacheKey(final Invoker<?> originInvoker) { URL providerUrl = getProviderUrl(originInvoker); String key = providerUrl.removeParameters("dynamic", "enabled").toFullString(); return key; } /** * Get the address of the providerUrl through the url of the invoker * * 通过调用者的地址获取providerUrl的地址 * @param origininvoker * @return */ private URL getProviderUrl(final Invoker<?> origininvoker) { String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); if (export == null || export.length() == 0) { throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl()); } URL providerUrl = URL.valueOf(export); return providerUrl; } key类似:dubbo://ip:port/com.alibaba.dubbo.study.day01.xml.service.EchoService?addListener.1.callback=true&addListener.retries=2&anyhost=true&application=echo-provider&bean.name=com.alibaba.dubbo.study.day01.xml.service.EchoService&bind.ip=169.254.22.149&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.study.day01.xml.service.EchoService&methods=echo,addListener&pid=3600&side=provider&timestamp=1572674213180 DubboProtocol.export(Invoker<T> invoker)
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如: // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 String key = serviceKey(url); // 创建dubbo的exporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 放入缓存中 exporterMap.put(key, exporter); //export an stub service for dispatching event //导出存根服务以调度事件 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //启动服务器 openServer(url); //优化序列化 optimizeSerialization(url); return exporter; }构造DubboExporter,放入缓存;存根服务处理(这里可以略过);启动服务器;优化序列化(非重点关心)
openServer(url);
private void openServer(URL url) { // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例 String key = url.getAddress(); //client can export a service which's only for server to invoke //客户端可导出仅用作服务调用的服务 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { //获取address对应的server, ExchangeServer server = serverMap.get(key); if (server == null) { //创建服务器实例并放入缓存中 serverMap.put(key, createServer(url)); } else { // server supports reset, use together with override // 服务器已创建,则根据 url 中的配置重置服务器 server.reset(url); } } }如上,在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置,让我们先来分析下创建服务器实例方法。
createServer(URL url)
private ExchangeServer createServer(URL url) { // send readonly event when server closes, it's enabled by default url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // enable heartbeat by default // 添加心跳检测配置到url中 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 获取server参数 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 添加编码解码器参数 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { //创建ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } /** * 获取client代表的Transporter是否存在,如果不存在抛出异常好了 */ str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina] Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中, // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }添加默认参数,然后检测指定的server类型(netty、mina、grizzly)对应的Transporter是否存在,使用Exchangers的bind方法创建server实例,最后检测是否支持 client 参数所表示的 Transporter 拓展,不存在同样抛出异常。
Exchangers.bind(url, requestHandler)
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); } public static Exchanger getExchanger(URL url) { //获取url中的exchanger属性,没有使用默认的header String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger(String type) { //dubbo的spi机制 return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }通过Exchanger的命名可以理解它的设计初衷:交换层,输入url和一个handler,输出server(比如类型为HeaderExchangeServer);dubbo本身支持spi扩展,所以Exchanger也支持扩展(dubbo默认实现了一个扩展名称为header的Exchanger,即下文的HeaderExchanger)。
HeaderExchanger.bind(URL url, ExchangeHandler handler)
@Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }调用Transporters.bind方法创建Server(包装handler为DecodeHandler),类型可以为Netty、Mina、Grizzly,使用HeaderExchangeServer装饰创建出来的Server,HeaderExchangeServer为Server提供了向空闲channel发送心跳检测、channel重连的功能
Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); } public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }通过spi获取不同类型的Transporter,默认是NettyTransporter,至于为什么请看Transporter$Adaptive的bind方法
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Transporter$Adaptive implements com.alibaba.dubbo.remoting.Transporter { public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.connect(arg0, arg1); } public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.bind(arg0, arg1); } }NettyTransporter.bind(URL url, ChannelHandler listener)
@Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }创建一个Netty类型的服务实例!!!NettyServer的类继承关系图如下
ChannelHandler:主要是处理channel的,比如channel关闭、连接,发送消息,接收消息等操作。 Resetable:该接口实现类可以实现例如Server相关属性的重置。 AbstractPeer:主要是保存了服务提供者协议的url,并通过委托成员变量ChannelHandler实现了ChannelHandler接口的方法和Endpoint接口的方法 /** * channel事件处理器 */ private final ChannelHandler handler; /** * 第一个服务提供者协议的url地址 */ private volatile URL url; AbstractEndpoint:实现了Resetable接口reset方法,使得我们可以重置编码器、超时时间、连接超时时间 /** * 编码解码器 */ private Codec2 codec; /** * 超时时间 */ private int timeout; /** * 连接超时时间 */ private int connectTimeout; AbstractServer:该类实现了Server接口,作为Mina、Netty、Grizzly类型的服务端统一实现,并且该类重写了AbstractEndpoint的reset方法 ExecutorService executor; /** * url host:port地址。 */ private InetSocketAddress localAddress; /** * 如果是多网卡,并且指定了 bind.ip、bind.port,如果为空,与localAddress相同。 */ private InetSocketAddress bindAddress; /** * AbstractServer#accepts未使用到。 */ private int accepts; /** * 空闲时间 */ private int idleTimeout = 600; //600 seconds NettyServer:服务端实现类 /** * < ip:port, channel> 所有通道。 */ private Map<String, Channel> channels; // <ip:port, channel> /** * netty 服务端启动器。 */ private ServerBootstrap bootstrap; /** * 服务端监听通道。 */ private io.netty.channel.Channel channel; /** * Netty boss线程组(负责连接事件) */ private EventLoopGroup bossGroup; /** * work线程组(负责IO事件) */ private EventLoopGroup workerGroup; 回顾上文DubboProtocol.openServer(URL url)中的代码片段server.reset(url),可知其真正的实现在AbstractServer中。 AbstractServer.reset(URL url)
@Override public void reset(URL url) { if (url == null) { return; } try { //url中是否包含accepts属性,存在并且大于0则重置Server的accepts if (url.hasParameter(Constants.ACCEPTS_KEY)) { int a = url.getParameter(Constants.ACCEPTS_KEY, 0); if (a > 0) { this.accepts = a; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { //url中是否包含idle.timeout属性,存在并且大于0则重置Server的idle.timeout if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) { int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0); if (t > 0) { this.idleTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { //url是否包含threads属性,并且executor是ThreadPoolExecutor的实例并且线程池未被关闭 if (url.hasParameter(Constants.THREADS_KEY) && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; //url中的threads数量 int threads = url.getParameter(Constants.THREADS_KEY, 0); //线程池最大容量 int max = threadPoolExecutor.getMaximumPoolSize(); //线程池核心线程数 int core = threadPoolExecutor.getCorePoolSize(); if (threads > 0 && (threads != max || threads != core)) { if (threads < core) { threadPoolExecutor.setCorePoolSize(threads); if (core == max) { threadPoolExecutor.setMaximumPoolSize(threads); } } else { threadPoolExecutor.setMaximumPoolSize(threads); if (core == max) { threadPoolExecutor.setCorePoolSize(threads); } } } } } catch (Throwable t) { logger.error(t.getMessage(), t); } //合并url中的参数并设置给AbstractPeer的url属性 super.setUrl(getUrl().addParameters(url.getParameters())); } /** * Add parameters to a new url. 
* * @param parameters parameters in key-value pairs * @return A new URL */ public URL addParameters(Map<String, String> parameters) { if (parameters == null || parameters.size() == 0) { return this; } //判断当前URL实例的parameter是否与参数中的parameters //是否相同,包括键和值,不全相同返回false boolean hasAndEqual = true; for (Map.Entry<String, String> entry : parameters.entrySet()) { String value = getParameters().get(entry.getKey()); if (value == null) { if (entry.getValue() != null) { hasAndEqual = false; break; } } else { if (!value.equals(entry.getValue())) { hasAndEqual = false; break; } } } // return immediately if there's no change //没有变化直接返回当前url if (hasAndEqual) return this; //如果有变化合并并且返回新的map Map<String, String> map = new HashMap<String, String>(getParameters()); map.putAll(parameters); return new URL(protocol, username, password, host, port, path, map); }覆盖NettyServer的accepts、idleTimeout、threadPoolExecutor的配置;通过调用AbstractPeer的setUrl方法设置新的Url成员变量的值。
HeaderExchangeServer.reset(URL url)
@Override public void reset(URL url) { server.reset(url); try { //url中是否包含heartbeat或者heartbeat.timeout属性 if (url.hasParameter(Constants.HEARTBEAT_KEY) || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) { int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat); int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3); if (t < h * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } if (h != heartbeat || t != heartbeatTimeout) { heartbeat = h; heartbeatTimeout = t; startHeartbeatTimer(); } } } catch (Throwable t) { logger.error(t.getMessage(), t); } } //发送心跳 private void startHeartbeatTimer() { stopHeartbeatTimer(); if (heartbeat > 0) { heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } } //停止心跳定时任务 private void stopHeartbeatTimer() { try { ScheduledFuture<?> timer = heartbeatTimer; if (timer != null && !timer.isCancelled()) { timer.cancel(true); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } finally { heartbeatTimer = null; } }可以看到HeaderExchangeServer其实是在NettyServer的基础上增加了下面两个功能(见HeartBeatTask.java)
a. 对channel进行 空闲时间检测,超过则关闭连接,节省资源。b. 如果server关闭,则发送消息给client端,不再发送请求到该server。HeartBeatTask.run
@Override public void run() { try { long now = System.currentTimeMillis(); for (Channel channel : channelProvider.getChannels()) { if (channel.isClosed()) { continue; } try { //channel上次读数据的时间戳 Long lastRead = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP); //上次写数据的时间戳 Long lastWrite = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); //读的时间戳不为null并且空闲读时间大于心跳 if ((lastRead != null && now - lastRead > heartbeat) //写的时间戳不为null并且空闲写时间大于心跳 || (lastWrite != null && now - lastWrite > heartbeat)) { /** * 发送心跳数据 */ Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } //空闲读时间戳不为null并且空闲读大于心跳超时时间 if (lastRead != null && now - lastRead > heartbeatTimeout) { logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); //channel是Client则重新连接 if (channel instanceof Client) { try { ((Client) channel).reconnect(); } catch (Exception e) { //do nothing } //否则关闭channel } else { channel.close(); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } } } catch (Throwable t) { logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); } }NettyServer.java
构造函数代码如下:通过super关键字调用父类方法,解析url中的参数给AbstractServer、AbstractPeer、AbstractEndpoint属性赋值,值得注意的是AbstractServer中的doOpen()方法,该方法是个模板方法,子类实现doOpen方法启动服务器 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); //设置可接受最大连接数 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); //连接空闲时间 this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))
public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }Dispatcher通过dubbo的spi机制最终为AllDispatcher,我们来看下Dispatcher$Adaptive的代码就能知晓原因
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher { public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all"))); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])"); com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName); return extension.dispatch(arg0, arg1); } }AllDispatcher.dispatch(ChannelHandler handler,URL url)
/** * default thread pool configure */ public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }handler的包装流程如下
DubboProtocol.createServer #此时requestHandler为DubboProtocol中的匿名内部类实现ExchangeHandlerAdapter ->Exchangers.bind(url,requestHandler) #该方法对handler做了两次包装ExchangeHandlerAdapter->HeaderExchangeHandler->DecodeHandler ->HeaderExchanger.bind(url,handler) #到这里handler已经为DecodeHandler了 ->NettyServer(url,handler) #该方法首先通过dubbo的spi机制将handler包装为AllChannelHandler->HeartbeatHandler->MultiMessageHandler ->ChannelHandlers.wrapInternal(handler,url)MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> DubboProtocol.requestHandler
MultiMessageHandler:检查消息是否为MultiMessage,如果是,则拆分成单条消息依次调用后续handler。 HeartbeatHandler:1. 在每次channel动作时,给channel标记时间戳属性;2. 检查是否为心跳请求,是则直接返回心跳响应,不再继续后续处理。 AllChannelHandler:1. 将后续handler包装成ChannelEventRunnable,捕获后续执行的异常并记录日志;2. 将包装的runnable放到独立线程池运行,达到全流程异步化效果。 DecodeHandler:判断message的种类然后解码。 NettyServer.doOpen()
@Override protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }Netty服务端启动的标准模板:主要关注的点就是添加的编码器、解码器、NettyServerHandler。
目前的handler包装链如下:
NettyServerHandler->NettyServer-> MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> DubboProtocol.requestHandler
NettyCodecAdapter.java
成员变量如下 //内部编码器 private final ChannelHandler encoder = new InternalEncoder(); //内部解码器 private final ChannelHandler decoder = new InternalDecoder(); //AbstractEndpoint中的编解码属性codec怎么获取的参考AbstractEndpoint.getChannelCodec(URL url) private final Codec2 codec;
InternalEncoder.java
private class InternalEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out); Channel ch = ctx.channel(); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { codec.encode(channel, buffer, msg); } finally { NettyChannel.removeChannelIfDisconnected(ch); } } }委托给编解码器codec处理
InternalDecoder.java
private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { ChannelBuffer message = new NettyBackedChannelBuffer(input); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { saveReaderIndex = message.readerIndex(); try { msg = codec.decode(channel, message); } catch (IOException e) { throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { //is it possible to go here ? if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data."); } if (msg != null) { out.add(msg); } } } while (message.readable()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } }解码同样委托给编解码器codec处理
codec是怎么获取到的?
在doOpen中有以下代码getCodec()
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);getCodec();该方法通过父类AbstractEndpoint继承而来,我们看下codec在AbstractEndpoint的初始化逻辑
protected static Codec2 getChannelCodec(URL url) { //获取url中的codec属性默认值为telnet String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); //如果Codec2的SPI实现类包含名称为codecName编解码器的扩展直接加载返回 if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { //加载Spi扩展Codec的名称为codecName的编解码器 return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } }未完待续....
