1.首先有一个main()线程 2.在main()线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络通信连接(connect),一个负责监听(listener). 3.通过connect线程将注册的监听事件发送给Zookeeper. 4.在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。 5.Zookeeper监听到有数据或路径变化,就会将这个消息发送给Listener进程。 6.listener线程内部调用process()方法。 7.常见的监听 1)监听节点数据的变化 get path[watch] 2)监听子节点的增减情况 ls path[watch]
1.流程图
1.创建Maven工程,项目名为zookeeper 2.导入相关jar包,修改pom.xml文件 注意:zookeeper的jar包版本最好与zookeeper服务端版本一致。 3.添加日志配置 需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入如下内容。
log4j.rootLogger=info, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.file=E:/Log/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n4.创建一个测试类 在·项目的src/main/java目录下,创建一个名字为com.zookeeper.test的包,在这个包下创建一个为TestZookeeper的类。在类中添加一个init()方法。 package com.zookeeper.test;
import org.apache.zookeeper.*; import org.junit.Before; import org.junit.Test;
import java.util.List;
public class TestZookeeper {
private String connectString="47.102.222.48:2181"; private int sessionTimeout=2000; private ZooKeeper zooKeeper; @Test public void init() throws Exception{ zooKeeper=new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); }} connectString用于指定Zookeeper服务的ip地址和端口号,sessionTimeout用于指定超时时间。 5.运行测试类 可以在控制台看到以下信息。 6.创建一个节点 在TestZookeeper类中添加一个createNode()方法用于创建一个节点,同时将init()方法的@Test注解改为@Before,运行createNode()方法。创建一个/world节点,值为more。
package com.zookeeper.test; import org.apache.zookeeper.*; import org.junit.Before; import org.junit.Test; import java.util.List; public class TestZookeeper { private String connectString="47.102.222.48:2181"; private int sessionTimeout=2000; private ZooKeeper zooKeeper; @Before public void init() throws Exception{ zooKeeper=new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { }); } @Test public void createNode() throws KeeperException,InterruptedException{ String path=zooKeeper.create("/world","more".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(path); } }控制台输出创建信息。 查看zookeeper节点情况 7.监听节点数据变化 添加getDataAndWatch()方法获取节点数据,实现process()方法。
package com.zookeeper.test; import org.apache.zookeeper.*; import org.junit.Before; import org.junit.Test; import java.util.List; public class TestZookeeper { private String connectString="47.102.222.48:2181"; private int sessionTimeout=2000; private ZooKeeper zooKeeper; @Before public void init() throws Exception{ zooKeeper=new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("-----------start-----------"); List<String> children; try { children = zooKeeper.getChildren("/", true); for (String child : children) { System.out.println(child); } System.out.println("-----------end-----------"); }catch (KeeperException e){ e.printStackTrace(); }catch (InterruptedException e){ e.printStackTrace(); } } }); } @Test public void createNode() throws KeeperException,InterruptedException{ String path=zooKeeper.create("/world","more".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(path); } @Test public void getDataAndWatch() throws KeeperException,InterruptedException{ List<String> children=zooKeeper.getChildren("/",true); for(String child:children){ System.out.println(child); } Thread.sleep(Long.MAX_VALUE); } }执行如下命令在zookeeper的中添加一个/hello节点。
create /hello "world"可以在控制台输出zookeeper节点的变化情况 8.判断节点是否存在 添加exist()方法用于判断节点是否存在。
@Test public void exist() throws Exception{ Stat stat=zooKeeper.exists("/world",false); System.out.println(stat==null ? "not exist":"exist"); }控制台输出判断信息
通过以上几步代码,可以使用zookeeper的API连接zookeeper服务端,设置监听,监听zookeeper节点数据变化,调用相关方法可以判断指定节点是否存在。
