【第5章】Zookeeper重要组成部分之通知机制(非常重要)

mac2024-11-08  14

笔记大纲

什么是通知机制?

什么是watch事件?

测试代码(单通知)

测试代码(多通知)


一、什么是通知机制?

  客户端注册监听它关心的目录节点,当节点发生变化时(数据增删改、子目录节点增删改)时,zookeeper就会通知客户端。

  通知机制是观察者(watch)的模式,异步回调的触发机制。

  zookeeper支持watch的概念,客户端可在每个znode节点上设置一个观察。如被观察服务端的znode节点有变更,那么watch就会被触发,这watch所属的客户端就会收到一个通知包,被告知节点已经发生变化,把对应的事件通知给设置过的Watcher的Client端。

  zookeeper所有读操作:getData(),getChildren()和exists()都有设置watch的选项,watch只会通知一次!

二、什么是watch事件?

(1)一次触发

  当数据发生变化时,zkServer向zkClient发送一个watch,这是一次性的动作(只监控一次),即触发一次就不再有效,好比一次性水杯。如果想继续watch,需要client重新设置watcher!

(2)发送客户端

  wathes(通知)是异步发送给客户端的,zookeeper提供一个顺序保证:在监控到watch之间绝不会发生变化,这样不同的客户端看的是一致性的顺序。

(3)为数据设置watch

  节点有不同的改动方式,zookeeper维护两个观察列表:数据观察和子节点观察。读操作设置watch,写操作触发watch。

(4)时序性和一致性

  watches是Client连接到zookeeper服务器端的本地维护,watches是轻量级的,可维护和派发的!   ①watches和其他事件、wathes和异步回复都是有序的,zookeeper客户端保证了每件事件有序派发;

  ②客户端在看到新数据之前先到watch事件; ​   ③对应更新顺序的watches事件顺序由zookeeper服务所见。

三、测试代码(单通知)

/** * @author lindaxia * @date 2019/10/31 16:45 * watch只通知一次 */ public class ZK_WatchDemo { public static final String CONNECTSTRING = "192.168.43.111:2181"; public static final String PATH = "/zk-watch-one"; public static final int SESSION_TIMEOUT = 20 * 1000; //Session失效时间 //实例变量 private ZooKeeper zk; public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } /** * 通过Java程序,新建连接Zookeeper * * @return * @throws IOException */ public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } /** * 创建节点 create -s path data //创建持久化序号节点 * * @param path * @param data * @throws Exception */ public void createZookeeper(String path, String data) throws Exception { //路径+数据+权限 zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * 获取节点 get path [watch] * * @param path * @return * @throws KeeperException * @throws InterruptedException */ public String getZNode(String path) throws KeeperException, InterruptedException { String result = ""; byte[] data = zk.getData(path, new Watcher() { @Override //查询节点需要派一个watch public void process(WatchedEvent watchedEvent) { try { triggerValue(path); //回调函数 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }, new Stat()); result = new String(data); return result; } public void triggerValue(String path) throws KeeperException, InterruptedException { String result = ""; byte[] data = zk.getData(path, false, new Stat()); result = new String(data); System.out.println("=========triggerValue======="+result); } public static void main(String[] args) throws Exception{ ZK_WatchDemo zk_watchDemo = new ZK_WatchDemo(); zk_watchDemo.setZk(zk_watchDemo.startZK()); if(zk_watchDemo.getZk().exists(PATH,false)==null){ zk_watchDemo.createZookeeper(PATH, "AAA"); String zNode = zk_watchDemo.getZNode(PATH); System.out.println("====测试获取zNode节点信息========"+zNode); }else{ System.out.println("zNode节点已经存在"); } Thread.sleep(Long.MAX_VALUE);//一直睡 } }
Cilent端控制台1-1

Server端(CentOS7.0)1-2

修改Server端(CentOS7.0)2-1

查看Cilent端控制台2-2

服务端节点进行了修改的动作,通知就会发送客户端!

小结

再次对节点进行修改的操作,客户端就不能收到通知,因为watch只会通知一次,这里我们需要注意!那么,如何让服务端通知多次了,请看下面的操作…

四、测试代码(多通知)

  获得值之后设置一个观察者watcher,如果该路径下节点的值发生了变化,要求通知Cilent端继续观察,又再次获得新的值的同时在重新设置一个观察者,继续观察并获得值…

/** * @author lindaxia * @date 2019/10/31 16:45 * watch一直通知 */ public class ZK_WatchDemo { public static final String CONNECTSTRING = "192.168.43.111:2181"; public static final String PATH = "/zk-watch-more"; public static final int SESSION_TIMEOUT = 20 * 1000; //Session失效时间 //实例变量 private ZooKeeper zk=null; private String oldValue = null; private String newValue = null; public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } public static String getCONNECTSTRING() { return CONNECTSTRING; } public String getOldValue() { return oldValue; } public void setOldValue(String oldValue) { this.oldValue = oldValue; } public String getNewValue() { return newValue; } public void setNewValue(String newValue) { this.newValue = newValue; } /** * 通过Java程序,新建连接Zookeeper * * @return ZooKeeper 返回类型 * @throws IOException */ public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } /** * 创建节点 create -s path data //创建持久化序号节点 * * @param path * @param data * @throws Exception */ public void createZookeeper(String path, String data) throws Exception { //路径+数据+权限 zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * 获取节点 get path [watch] * * @param path * @return * @throws KeeperException * @throws InterruptedException */ public String getZNode(String path) throws KeeperException, InterruptedException { String result = ""; byte[] data = zk.getData(path, new Watcher() { @Override //查询节点需要派一个watch public void process(WatchedEvent watchedEvent) { try { triggerValue(path); //回调函数 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }, new Stat()); result = new String(data); oldValue=result; return result; } public boolean triggerValue(String path) throws KeeperException, InterruptedException { String result = ""; byte[] byteArray = zk.getData(path, new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); } catch (KeeperException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }, new Stat()); result = new String(byteArray); newValue = result; if (oldValue.equals(newValue)) { System.out.println("================节点没有变化========="); return false; } else { System.out.println("=newValue=" + newValue + "\t" + "=oldValue=" + oldValue); oldValue = newValue; return true; } } public static void main(String[] args) throws Exception { ZK_WatchDemo zk_watchDemo = new ZK_WatchDemo(); zk_watchDemo.setZk(zk_watchDemo.startZK()); if (zk_watchDemo.getZk().exists(PATH, false) == null) { zk_watchDemo.createZookeeper(PATH, "AAA"); String zNode = zk_watchDemo.getZNode(PATH); System.out.println("====测试获取zNode节点信息========" + zNode); } else { System.out.println("zNode节点已经存在"); } Thread.sleep(Long.MAX_VALUE);//一直睡 } }

 ☝上述分享来源个人总结,如果分享对您有帮忙,希望您积极转载;如果您有不同的见解,希望您积极留言,让我们一起探讨,您的鼓励将是我前进道路上一份助力,非常感谢!我会不定时更新相关技术动态,同时我也会不断完善自己,提升技术,希望与君同成长同进步!

☞本人博客:https://coding0110lin.blog.csdn.net/  欢迎转载,一起技术交流吧!

最新回复(0)