什么是通知机制?
什么是watch事件?
测试代码(单通知)
测试代码(多通知)
客户端注册监听它关心的目录节点,当节点发生变化时(数据增删改、子目录节点增删改)时,zookeeper就会通知客户端。
通知机制是观察者(watch)的模式,异步回调的触发机制。
zookeeper支持watch的概念,客户端可在每个znode节点上设置一个观察。如被观察服务端的znode节点有变更,那么watch就会被触发,这watch所属的客户端就会收到一个通知包,被告知节点已经发生变化,把对应的事件通知给设置过的Watcher的Client端。
zookeeper所有读操作:getData(),getChildren()和exists()都有设置watch的选项,watch只会通知一次!
(1)一次触发
当数据发生变化时,zkServer向zkClient发送一个watch,这是一次性的动作(只监控一次),即触发一次就不再有效,好比一次性水杯。如果想继续watch,需要client重新设置watcher!
(2)发送客户端
wathes(通知)是异步发送给客户端的,zookeeper提供一个顺序保证:在监控到watch之间绝不会发生变化,这样不同的客户端看的是一致性的顺序。
(3)为数据设置watch
节点有不同的改动方式,zookeeper维护两个观察列表:数据观察和子节点观察。读操作设置watch,写操作触发watch。
(4)时序性和一致性
watches是Client连接到zookeeper服务器端的本地维护,watches是轻量级的,可维护和派发的! ①watches和其他事件、wathes和异步回复都是有序的,zookeeper客户端保证了每件事件有序派发;
②客户端在看到新数据之前先到watch事件; ③对应更新顺序的watches事件顺序由zookeeper服务所见。
服务端节点进行了修改的动作,通知就会发送客户端!
再次对节点进行修改的操作,客户端就不能收到通知,因为watch只会通知一次,这里我们需要注意!那么,如何让服务端通知多次了,请看下面的操作…
获得值之后设置一个观察者watcher,如果该路径下节点的值发生了变化,要求通知Cilent端继续观察,又再次获得新的值的同时在重新设置一个观察者,继续观察并获得值…
/** * @author lindaxia * @date 2019/10/31 16:45 * watch一直通知 */ public class ZK_WatchDemo { public static final String CONNECTSTRING = "192.168.43.111:2181"; public static final String PATH = "/zk-watch-more"; public static final int SESSION_TIMEOUT = 20 * 1000; //Session失效时间 //实例变量 private ZooKeeper zk=null; private String oldValue = null; private String newValue = null; public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } public static String getCONNECTSTRING() { return CONNECTSTRING; } public String getOldValue() { return oldValue; } public void setOldValue(String oldValue) { this.oldValue = oldValue; } public String getNewValue() { return newValue; } public void setNewValue(String newValue) { this.newValue = newValue; } /** * 通过Java程序,新建连接Zookeeper * * @return ZooKeeper 返回类型 * @throws IOException */ public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } /** * 创建节点 create -s path data //创建持久化序号节点 * * @param path * @param data * @throws Exception */ public void createZookeeper(String path, String data) throws Exception { //路径+数据+权限 zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * 获取节点 get path [watch] * * @param path * @return * @throws KeeperException * @throws InterruptedException */ public String getZNode(String path) throws KeeperException, InterruptedException { String result = ""; byte[] data = zk.getData(path, new Watcher() { @Override //查询节点需要派一个watch public void process(WatchedEvent watchedEvent) { try { triggerValue(path); //回调函数 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }, new Stat()); result = new String(data); oldValue=result; return result; } public boolean triggerValue(String path) throws KeeperException, InterruptedException { String result = ""; byte[] byteArray = zk.getData(path, new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); } catch (KeeperException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }, new Stat()); result = new String(byteArray); newValue = result; if (oldValue.equals(newValue)) { System.out.println("================节点没有变化========="); return false; } else { System.out.println("=newValue=" + newValue + "\t" + "=oldValue=" + oldValue); oldValue = newValue; return true; } } public static void main(String[] args) throws Exception { ZK_WatchDemo zk_watchDemo = new ZK_WatchDemo(); zk_watchDemo.setZk(zk_watchDemo.startZK()); if (zk_watchDemo.getZk().exists(PATH, false) == null) { zk_watchDemo.createZookeeper(PATH, "AAA"); String zNode = zk_watchDemo.getZNode(PATH); System.out.println("====测试获取zNode节点信息========" + zNode); } else { System.out.println("zNode节点已经存在"); } Thread.sleep(Long.MAX_VALUE);//一直睡 } }☝上述分享来源个人总结,如果分享对您有帮忙,希望您积极转载;如果您有不同的见解,希望您积极留言,让我们一起探讨,您的鼓励将是我前进道路上一份助力,非常感谢!我会不定时更新相关技术动态,同时我也会不断完善自己,提升技术,希望与君同成长同进步!
☞本人博客:https://coding0110lin.blog.csdn.net/ 欢迎转载,一起技术交流吧!