03-zookeeper基础

mac2022-06-30  29

文章目录

1. zookeeper客户端命令操作2. zookeeper内部原理2.1 持久化节点和临时节点2.2 Stat结构体2.3 监听原理2.4paxos算法2.5选举机制2.6写数据流程 3.API操作3.1zk客户端操作3.2动态上下线3.3同步线程锁

1. zookeeper客户端命令操作

1.启动zookeeper客户端 zkCli.sh 2.创建普通节点 create /iweb "jianhau" 3.获取节点的值 get /iweb 4.创建短暂节点 create -e /iweb 6.监听节点的变化 ls /iweb watch 5.退出 quit

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v9afaE3X-1570004773361)(11886AE15C5E4EE284904D287986A1BD)]

2. zookeeper内部原理

2.1 持久化节点和临时节点

持久化节点是当客户端和zookeeper断开连接后,该节点依旧存在临时节点是当客户端和zookeeper断开连接后,节点自动删除

2.2 Stat结构体

[zk: localhost:2181(CONNECTED) 5] get /sanguo jinlian cZxid = 0x100000003 ==创建节点的事务id== ctime = Wed Aug 29 00:03:23 CST 2018 ==被创建的好毫秒数== mZxid = 0x100000003 ==最后更新的事务== mtime = Wed Aug 29 00:03:23 CST 2018 ==最后更改的毫秒数== pZxid = 0x100000004 ==最后更新的子节点id== cversion = 1 ==子节点修改次数== dataVersion = 0 ==数据变化号== aclVersion = 0 ==访问被控制列表的变化号== ephemeralOwner = 0x0 ==如果是临时节点,这个是拥有者的session id 如果不是临时节点就是0== dataLength = 7 ==数据长度== numChildren = 1 ==子节点数量==

2.3 监听原理

首先创建一个main线程在main线程中创建一个客户端,这是就会创建两个线程connect,Listener,一负责通信,一个负责监听通过connect注册监听在zookeeper的注册监听器列表中添加注册的监听事件监听到有数据或者路径变化时通过,就会将消息发送给listener线程listener线程内部调用了process方法 常见的监听 1.监听数据 get path [watch] 2.监听子节点的增减变化 ls path [watch]

2.4paxos算法

当节点发生变化后,会汇报给zkserver,此时zkserver收到一个zxid如果zxid大于自己当前的zxid,先记录下来,然后同步给其他的zkserver如果超过半数的zkserver同意后即生效,更新后同步给其他的zkserver,修改自己的zxid

2.5选举机制

半数机制:集群中半数以上的集群存活,集群可用,所以安装奇数台服务器虽然在配置文件中没有指定的主从,但是会选举产生一个leader和其他的follower如果有五台机器 第一台上线后先投自己一票第二台上线后,先头自己一票,由于他的id比第一台的大,所以第一台改投第二台,此时第一台0票,第二台2票,但是少于3,仍然不能成为leader第三台上线后,此时服务器都会改选票给服务器3,此时他又三票,成为leader,其他状态为follower第四台启动,123不会交换选票信息,第四台只有一票少数服从多数

2.6写数据流程

client向server1发送写数据请求,如果server1不是leader,那么server1会把请求转发给leaderleader把写请求广播给followfollow返回信息并把请求放入待写队列中,并返回成功信息当leader收到半数以上的成功信息后,说明该写操作可以执行leader向各个server发送提交信息,各个server收到后落实写请求,操作成功返回给客户端操作成功

3.API操作

3.1zk客户端操作

public class Zkutils { private static int Session_Time_Out = 300000; private static ZooKeeper zk = null; //创建节点 public static void CreateNodes() throws KeeperException, InterruptedException { String path = "/test"; byte [] bytes = "hello zk ".getBytes(); String result = zk.create(path, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(result); } //判断节点是否存在 public static void NodeExist()throws Exception{ String path ="/test"; Stat stat = zk.exists(path, false); System.out.println(stat); } //获取数据 public static void getData() throws KeeperException, InterruptedException { String path = "/test"; Stat stat = new Stat(); byte[] data = zk.getData(path, false, stat); System.out.println(new String(data, Charset.forName("UTF-8"))); } //存储数据 public static void setData() throws KeeperException, InterruptedException { String path ="/test"; Stat stat = zk.setData(path, "hellozk".getBytes(), -1); System.out.println(stat); } //删除节点 public static void DeleteNode() throws KeeperException, InterruptedException { String path ="/test"; zk.delete(path,-1); } //测试 public static void main(String[] args) { try { zk = new ZooKeeper("bigdata1:2181,bigdata2:2181,bigdata3:2181", Session_Time_Out, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); } }); // CreateNodes(); // setData(); // getData(); DeleteNode(); NodeExist(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }

3.2动态上下线

public class DynamicUpDown { private ZooKeeper zk = null; private static int SESSION_TIME_OUT = 300000; private static String HOSTS = "bigdata1:2181,bigdata2:2181,bigdata3:2181"; private static List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; private static String PRAENT_NODE = "/hosts"; private List<String> serverlist = new ArrayList<>(); public void Init() throws Exception{ zk = new ZooKeeper(HOSTS, SESSION_TIME_OUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { String path = watchedEvent.getPath(); Event.EventType type = watchedEvent.getType(); //打印出当前监听事件类型 System.out.println(type); //判断子节点的变化 if (Event.EventType.NodeChildrenChanged==type&&path.equals(PRAENT_NODE)){ try { //更新列表 UpdateServerList(); } catch (Exception e) { e.printStackTrace(); } } } }); //创建父节点 Stat stat = zk.exists(PRAENT_NODE, false); if (stat == null){ zk.create(PRAENT_NODE,"父节点".getBytes(),ACL, CreateMode.PERSISTENT); } UpdateServerList(); } private void UpdateServerList() throws Exception{ List<String> newServerList = new ArrayList<>(); List<String> children = zk.getChildren(PRAENT_NODE,true); children.forEach(child->{ try { //获取当前子节点的目录 byte[] data = zk.getData(PRAENT_NODE + "/" + child, false, null); newServerList.add(new String(data)); serverlist = newServerList; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }); serverlist.forEach(list->{ System.out.println(list); }); } //关闭资源 public void close() { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { DynamicUpDown server = new DynamicUpDown(); try { server.Init(); Thread.sleep(Long.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); }finally { server.close(); } } }

3.3同步线程锁

//抽取任务接口 public interface CustomTask { void doSomething(); } //定义自己的任务 public class Mytask implements CustomTask{ private String name; public Mytask(String name){ this.name =name; } @Override public void doSomething() { for (int i = 1;i<=5;i++){ System.out.println("做事情"+i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } /* 指定个数的客户端访问服务器的资源 1.上线就向zookeeper客户端注册 2.判断是否只有一个客户端工作,若只有一个,便可以处理业务 3.获取父节点下注册的所有的锁,通过判断自己是否是号码最小的那把锁,如果是则可以处理业务 */ public class DistributeLock { private ZooKeeper zk= null; private CustomTask task = null; private final List<org.apache.zookeeper.data.ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; private final int sessionTimeOut = 5000; private final String parent_node = "/locks"; private String connectstring = null; private volatile String currentPath = null; public DistributeLock(String connectstring){ this.connectstring = connectstring; } public DistributeLock(){ this("bigdata1:2181,bigdata2:2181,bigdata3:2181"); } public void setTask(CustomTask task){ this.task = task; } //获取客户端 public void getClient() throws Exception{ zk =new ZooKeeper(connectstring,sessionTimeOut,event->{ //监听子节点的变化 if (event.getType()==Watcher.Event.EventType.NodeChildrenChanged&& event.getPath().equals(parent_node)){ try { //拿到所有的子节点 List<String> child = zk.getChildren(parent_node,true); //判断自己是否是最小的节点 String currentNode = currentPath.substring(parent_node.length() + 1); //排序 Collections.sort(child); if (child.indexOf(currentNode)==0){ task.doSomething(); //释放锁 deleteLock(); //注册新锁 registerLock(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } //注册锁 public void registerLock()throws Exception{ //使用CreateMode.EPHEMERAL_SEQUENTIAL临时顺序型 currentPath = zk.create(parent_node+"/lock",null,ACL,CreateMode.EPHEMERAL_SEQUENTIAL); } //判断是否只有一个节点在线,若只有自己一个节点,则调用业务处理的方法 public void watchParent() throws Exception{ List<String> children = zk.getChildren(parent_node, false); if (children!=null&&children.size()==1){ task.doSomething(); deleteLock(); }else { Thread.sleep(Long.MAX_VALUE); } } public void deleteLock() throws Exception{ zk.delete(currentPath,-1); } } //测试 public class Test { public static void main(String[] args) throws Exception{ //获取客户端连接 DistributeLock distributeLock = new DistributeLock(); CustomTask customTask = new Mytask(UUID.randomUUID().toString()); //设置任务 distributeLock.setTask(customTask); distributeLock.getClient(); //注册锁 distributeLock.registerLock(); //监听父节点 distributeLock.watchParent(); } }
最新回复(0)