Dubbo源码分析之CuratorZookeeperClient与CuratorZookeeperTransporter

mac2025-03-26  16

一、概述

Dubbo使用Zookeeper作为服务注册中心时,provider发布服务是在zookeeper服务器上创建/dubbo父节点,然后再在其下创建以url为路径的子节点;Consumer获取子节点路径进行解析,建立Netty连接,调用服务,获取结果。Dubbo默认使用开源的Curator作为zookeeper的客户端,并进行了基本的API封装,下面我们来分析CuratorZookeeperClient的源码。

二、ZookeeperClient

ZookeeperClient接口,暴露create,delete,getChildren等操作节点方法,并可以添加连接状态监听器、节点数据信息监听器。

/**
 * Abstraction over a ZooKeeper connection: node create/delete/read operations plus
 * registration of child, data and connection-state listeners.
 */
public interface ZookeeperClient {

    /**
     * Creates the node at {@code path}.
     *
     * @param path      node path
     * @param ephemeral {@code true} for an ephemeral node, {@code false} for a persistent one
     */
    void create(String path, boolean ephemeral);

    /**
     * Deletes the node at {@code path}.
     *
     * @param path node path
     */
    void delete(String path);

    /**
     * Lists the children of {@code path}.
     *
     * @param path parent node path
     * @return the child names; NOTE(review): the Curator implementation returns {@code null}
     *         when the node does not exist - confirm for other implementations
     */
    List<String> getChildren(String path);

    /**
     * Registers a listener for the children of {@code path}.
     *
     * @param path     parent node path
     * @param listener listener to register
     * @return the child list as returned by the underlying client at registration time
     */
    List<String> addChildListener(String path, ChildListener listener);

    /**
     * Registers a data listener.
     *
     * @param path     directory; all children of this path will be listened to
     * @param listener listener to register
     */
    void addDataListener(String path, DataListener listener);

    /**
     * Registers a data listener whose callbacks are dispatched through an executor.
     *
     * @param path     directory; all children of this path will be listened to
     * @param listener listener to register
     * @param executor executor (another thread) used to deliver the notifications
     */
    void addDataListener(String path, DataListener listener, Executor executor);

    /**
     * Unregisters a data listener previously added for {@code path}.
     *
     * @param path     path the listener was registered on
     * @param listener listener to remove
     */
    void removeDataListener(String path, DataListener listener);

    /**
     * Unregisters a child listener previously added for {@code path}.
     *
     * @param path     path the listener was registered on
     * @param listener listener to remove
     */
    void removeChildListener(String path, ChildListener listener);

    /**
     * Registers a connection-state listener.
     *
     * @param listener listener to register
     */
    void addStateListener(StateListener listener);

    /**
     * Unregisters a connection-state listener.
     *
     * @param listener listener to remove
     */
    void removeStateListener(StateListener listener);

    /**
     * @return whether the client is currently connected
     */
    boolean isConnected();

    /** Closes the client. */
    void close();

    /**
     * @return the URL this client was created from
     */
    URL getUrl();

    /**
     * Creates the node at {@code path} with the given content.
     *
     * @param path      node path
     * @param content   data to store in the node
     * @param ephemeral {@code true} for an ephemeral node, {@code false} for a persistent one
     */
    void create(String path, String content, boolean ephemeral);

    /**
     * Reads the content stored at {@code path}.
     *
     * @param path node path
     * @return the node content
     */
    String getContent(String path);
}

AbstractZookeeperClient是实现ZookeeperClient接口的抽象类,为子类提供模板方法。

public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient { protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class); private final URL url; // 连接状态监听器队列,是一个COW的ArraySet private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>(); // 子节点监听器,key为父节点path private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>(); // 节点数据监听器,key为父节点path private final ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>> listeners = new ConcurrentHashMap<String, ConcurrentMap<DataListener, TargetDataListener>>(); //多线程下可见的属性 closed private volatile boolean closed = false; public AbstractZookeeperClient(URL url) { this.url = url; } @Override public URL getUrl() { return url; } // 创建节点,第二个参数为是否为临时节点 @Override public void create(String path, boolean ephemeral) { if (!ephemeral) { // 持久节点检查path是否存在,存在方法返回 if (checkExists(path)) { return; } } // 以‘/’ 为分隔符,递归调用创建父节点,zookeeper的父节点一定是持久化节点,第二个参数false int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } // 调用子类不同的方法创建节点 if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } } // 添加状态监听器 @Override public void addStateListener(StateListener listener) { stateListeners.add(listener); } // 删除状态监听器 @Override public void removeStateListener(StateListener listener) { stateListeners.remove(listener); } // 获取连接session的监听器 public Set<StateListener> getSessionListeners() { return stateListeners; } // 添加子节点监听器,并根据子节点监听器,创建zookeeper curator客户端需要的TargetChildListener @Override public List<String> addChildListener(String path, final ChildListener listener) { ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path); if (listeners == null) { childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, 
TargetChildListener>()); listeners = childListeners.get(path); } TargetChildListener targetListener = listeners.get(listener); if (targetListener == null) { listeners.putIfAbsent(listener, createTargetChildListener(path, listener)); targetListener = listeners.get(listener); } return addTargetChildListener(path, targetListener); } // 添加节点数据监听器 @Override public void addDataListener(String path, DataListener listener) { this.addDataListener(path, listener, null); } // 添加节点数据监听器,并根据节点数据监听器,创建zookeeper curator客户端需要的TargetDataListener @Override public void addDataListener(String path, DataListener listener, Executor executor) { ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path); if (dataListenerMap == null) { listeners.putIfAbsent(path, new ConcurrentHashMap<DataListener, TargetDataListener>()); dataListenerMap = listeners.get(path); } TargetDataListener targetListener = dataListenerMap.get(listener); if (targetListener == null) { dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener)); targetListener = dataListenerMap.get(listener); } addTargetDataListener(path, targetListener, executor); } // 移除节点数据监听器 @Override public void removeDataListener(String path, DataListener listener ){ ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = listeners.get(path); if (dataListenerMap != null) { TargetDataListener targetListener = dataListenerMap.remove(listener); if(targetListener != null){ removeTargetDataListener(path, targetListener); } } } // 移除子节点监听器 @Override public void removeChildListener(String path, ChildListener listener) { ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path); if (listeners != null) { TargetChildListener targetListener = listeners.remove(listener); if (targetListener != null) { removeTargetChildListener(path, targetListener); } } } // 时间监听调用 protected void stateChanged(int state) { for (StateListener sessionListener : getSessionListeners()) { 
sessionListener.stateChanged(state); } } // 关闭客户端,模板方法,实际调用doClose方法,并进行异常捕获 @Override public void close() { if (closed) { return; } closed = true; try { doClose(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } // 创建节点和数据 @Override public void create(String path, String content, boolean ephemeral) { if (checkExists(path)) { delete(path); } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } if (ephemeral) { createEphemeral(path, content); } else { createPersistent(path, content); } } // 获取节点内容 @Override public String getContent(String path) { if (!checkExists(path)) { return null; } return doGetContent(path); } protected abstract void doClose(); protected abstract void createPersistent(String path); protected abstract void createEphemeral(String path); protected abstract void createPersistent(String path, String data); protected abstract void createEphemeral(String path, String data); protected abstract boolean checkExists(String path); protected abstract TargetChildListener createTargetChildListener(String path, ChildListener listener); protected abstract List<String> addTargetChildListener(String path, TargetChildListener listener); protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener); protected abstract void addTargetDataListener(String path, TargetDataListener listener); protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor); protected abstract void removeTargetDataListener(String path, TargetDataListener listener); protected abstract void removeTargetChildListener(String path, TargetChildListener listener); protected abstract String doGetContent(String path); }

CuratorZookeeperClient继承AbstractZookeeperClient类,其泛型参数TargetDataListener、TargetChildListener均为同时实现了Curator的CuratorWatcher、TreeCacheListener接口的CuratorWatcherImpl类。构造方法CuratorZookeeperClient即创建CuratorFramework客户端,获取zookeeper连接。不了解Curator客户端的童鞋,可以参考我的另一篇文章学习。

https://blog.csdn.net/qq_33513250/article/details/102677481

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> { protected static final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class); static final Charset CHARSET = Charset.forName("UTF-8"); private final CuratorFramework client; private Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>(); public CuratorZookeeperClient(URL url) { super(url); try { int timeout = url.getParameter(TIMEOUT_KEY, 5000); // 创建CuratorFramework客户端 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(timeout); String authority = url.getAuthority(); // 添加授权信息 if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } client = builder.build(); // 添加连接状态监听 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { // 调用父类方法,发布连接丢失事件 CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { // 发布连接事件 CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { // 发布重连事件 CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); // 客户端启动 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override // 客户端创建持久几点 public void createPersistent(String path) { try { client.create().forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override // 客户端创建临时节点 public void createEphemeral(String path) { try { client.create().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (NodeExistsException e) { } catch 
(Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override // 客户端创建持久节点,如果节点存在,则设置数据 protected void createPersistent(String path, String data) { byte[] dataBytes = data.getBytes(CHARSET); try { client.create().forPath(path, dataBytes); } catch (NodeExistsException e) { try { client.setData().forPath(path, dataBytes); } catch (Exception e1) { throw new IllegalStateException(e.getMessage(), e1); } } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override // 客户端创建临时节点,节点存在则设置数据 protected void createEphemeral(String path, String data) { byte[] dataBytes = data.getBytes(CHARSET); try { client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes); } catch (NodeExistsException e) { try { client.setData().forPath(path, dataBytes); } catch (Exception e1) { throw new IllegalStateException(e.getMessage(), e1); } } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override // 删除节点 public void delete(String path) { try { client.delete().forPath(path); } catch (NoNodeException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override // 获取子节点 public List<String> getChildren(String path) { try { return client.getChildren().forPath(path); } catch (NoNodeException e) { return null; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override // 检测节点是否存在 public boolean checkExists(String path) { try { if (client.checkExists().forPath(path) != null) { return true; } } catch (Exception e) { } return false; } @Override public boolean isConnected() { return client.getZookeeperClient().isConnected(); } @Override // 获取节点内容 public String doGetContent(String path) { try { byte[] dataBytes = client.getData().forPath(path); return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET); } catch (NoNodeException e) { // ignore NoNode Exception. 
} catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } return null; } @Override public void doClose() { client.close(); } @Override public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) { return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener); } @Override // 子节点添加监听 public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) { try { return client.getChildren().usingWatcher(listener).forPath(path); } catch (NoNodeException e) { return null; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) { return new CuratorWatcherImpl(client, listener); } @Override // 节点数据添加监听 protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) { this.addTargetDataListener(path, treeCacheListener, null); } @Override // 使用treeCache为节点数据添加监听 protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) { try { TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build(); treeCacheMap.putIfAbsent(path, treeCache); treeCache.start(); if (executor == null) { treeCache.getListenable().addListener(treeCacheListener); } else { treeCache.getListenable().addListener(treeCacheListener, executor); } } catch (Exception e) { throw new IllegalStateException("Add treeCache listener for path:" + path, e); } } @Override protected void removeTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) { TreeCache treeCache = treeCacheMap.get(path); if (treeCache != null) { treeCache.getListenable().removeListener(treeCacheListener); } treeCacheListener.dataListener = null; } }

CuratorWatcherImpl是CuratorZookeeperClient的内部类,实现了CuratorWatcher, TreeCacheListener接口,是ChildListener和DataListener的Curator监听适配器,分别对子节点和当前节点数据进行监听。

static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener { private CuratorFramework client; private volatile ChildListener childListener; private volatile DataListener dataListener; public CuratorWatcherImpl(CuratorFramework client, ChildListener listener) { this.client = client; this.childListener = listener; } public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) { this.dataListener = dataListener; } protected CuratorWatcherImpl() { } public void unwatch() { this.childListener = null; } @Override // 适配childListener监听WatchedEvent子节点信息 public void process(WatchedEvent event) throws Exception { if (childListener != null) { String path = event.getPath() == null ? "" : event.getPath(); childListener.childChanged(path, // if path is null, curator using watcher will throw NullPointerException. // if client connect or disconnect to server, zookeeper will queue // watched event(Watcher.Event.EventType.None, .., path = null). StringUtils.isNotEmpty(path) ? client.getChildren().usingWatcher(this).forPath(path) : Collections.<String>emptyList()); } } @Override // 适配dataListener监听TreeCacheEvent节点创建,更新,删除等事件 public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { if (dataListener != null) { if (logger.isDebugEnabled()) { logger.debug("listen the zookeeper changed. The changed data:" + event.getData()); } TreeCacheEvent.Type type = event.getType(); EventType eventType = null; String content = null; String path = null; switch (type) { case NODE_ADDED: eventType = EventType.NodeCreated; path = event.getData().getPath(); content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET); break; case NODE_UPDATED: eventType = EventType.NodeDataChanged; path = event.getData().getPath(); content = event.getData().getData() == null ? 
"" : new String(event.getData().getData(), CHARSET); break; case NODE_REMOVED: path = event.getData().getPath(); eventType = EventType.NodeDeleted; break; case INITIALIZED: eventType = EventType.INITIALIZED; break; case CONNECTION_LOST: eventType = EventType.CONNECTION_LOST; break; case CONNECTION_RECONNECTED: eventType = EventType.CONNECTION_RECONNECTED; break; case CONNECTION_SUSPENDED: eventType = EventType.CONNECTION_SUSPENDED; break; } dataListener.dataChanged(path, content, eventType); } } }

三、ZookeeperTransporter

Dubbo的网络连接分为连接层和传输层,ZookeeperClient是操作zookeeper的具体连接,ZookeeperTransporter则是负责客户端管理的连接池。ZookeeperTransporter接口中的唯一方法即是根据Url建立连接并获取ZookeeperClient。

/**
 * Extension point that hands out {@link ZookeeperClient} instances for a URL.
 * NOTE(review): the {@code @SPI("curator")} value presumably names the default extension - confirm
 * against the SPI annotation's documentation.
 */
@SPI("curator")
public interface ZookeeperTransporter {

    /**
     * Obtains a ZooKeeper client for the given URL.
     * NOTE(review): the {@code @Adaptive} keys presumably select the concrete transporter from
     * the URL's client/transporter parameters - confirm.
     *
     * @param url address and parameters of the ZooKeeper server(s)
     * @return a client for that URL
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);
}

AbstractZookeeperTransporter是实现ZookeeperTransporter的抽象类,主要用zookeeperClientMap作缓存,为 ZookeeperClient提供连接池的特性,并提供createZookeeperClient的模板方法供子类实现。

public abstract class AbstractZookeeperTransporter implements ZookeeperTransporter { private static final Logger logger = LoggerFactory.getLogger(ZookeeperTransporter.class); private final Map<String, ZookeeperClient> zookeeperClientMap = new ConcurrentHashMap<>(); /** * share connnect for registry, metadata, etc.. * <p> * Make sure the connection is connected. * * @param url * @return */ @Override public ZookeeperClient connect(URL url) { ZookeeperClient zookeeperClient; List<String> addressList = getURLBackupAddress(url); // The field define the zookeeper server , including protocol, host, port, username, password if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // avoid creating too many connections, so add lock synchronized (zookeeperClientMap) { if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } zookeeperClient = createZookeeperClient(toClientURL(url)); logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url); writeToClientMap(addressList, zookeeperClient); } return zookeeperClient; } /** * @param url the url that will create zookeeper connection . * The url in AbstractZookeeperTransporter#connect parameter is rewritten by this one. * such as: zookeeper://127.0.0.1:2181/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter * @return */ protected abstract ZookeeperClient createZookeeperClient(URL url); /** * get the ZookeeperClient from cache, the ZookeeperClient must be connected. * <p> * It is not private method for unit test. 
* * @param addressList * @return */ ZookeeperClient fetchAndUpdateZookeeperClientCache(List<String> addressList) { ZookeeperClient zookeeperClient = null; for (String address : addressList) { if ((zookeeperClient = zookeeperClientMap.get(address)) != null && zookeeperClient.isConnected()) { break; } } if (zookeeperClient != null && zookeeperClient.isConnected()) { writeToClientMap(addressList, zookeeperClient); } return zookeeperClient; } /** * get all zookeeper urls (such as :zookeeper://127.0.0.1:2181?127.0.0.1:8989,127.0.0.1:9999) * * @param url such as:zookeeper://127.0.0.1:2181?127.0.0.1:8989,127.0.0.1:9999 * @return such as 127.0.0.1:2181,127.0.0.1:8989,127.0.0.1:9999 */ List<String> getURLBackupAddress(URL url) { List<String> addressList = new ArrayList<String>(); addressList.add(url.getAddress()); addressList.addAll(url.getParameter(RemotingConstants.BACKUP_KEY, Collections.EMPTY_LIST)); return addressList; } /** * write address-ZookeeperClient relationship to Map * * @param addressList * @param zookeeperClient */ void writeToClientMap(List<String> addressList, ZookeeperClient zookeeperClient) { for (String address : addressList) { zookeeperClientMap.put(address, zookeeperClient); } } /** * redefine the url for zookeeper. just keep protocol, username, password, host, port, and individual parameter. 
* * @param url * @return */ URL toClientURL(URL url) { Map<String, String> parameterMap = new HashMap<>(); // for CuratorZookeeperClient if (url.getParameter(TIMEOUT_KEY) != null) { parameterMap.put(TIMEOUT_KEY, url.getParameter(TIMEOUT_KEY)); } if (url.getParameter(RemotingConstants.BACKUP_KEY) != null) { parameterMap.put(RemotingConstants.BACKUP_KEY, url.getParameter(RemotingConstants.BACKUP_KEY)); } return new URL(url.getProtocol(), url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), ZookeeperTransporter.class.getName(), parameterMap); } /** * for unit test * * @return */ Map<String, ZookeeperClient> getZookeeperClientMap() { return zookeeperClientMap; } }

 CuratorZookeeperTransporter继承AbstractZookeeperTransporter类,创建CuratorZookeeperClient客户端实例。

/**
 * {@link AbstractZookeeperTransporter} whose clients are {@link CuratorZookeeperClient} instances.
 */
public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {

    /**
     * Creates a Curator-backed client for the given URL.
     *
     * @param url URL handed over by the base class
     * @return a new {@link CuratorZookeeperClient}
     */
    @Override
    public ZookeeperClient createZookeeperClient(URL url) {
        final ZookeeperClient curatorClient = new CuratorZookeeperClient(url);
        return curatorClient;
    }
}

 

最新回复(0)