ZooKeeper使用-java

mac2024-04-09  29

Zookeeper 介绍

Zookeeper 是由雅虎捐献给Apache的一个开源的一个分布式的分布式协调组件,是一个高可用的分布式协调组件,基于Google Chubby(不开源),主要解决了分布式一致性问题与分布式锁。在目前常见的分布式组件中都有使用,比如Dubbo,本篇文章就是简单介绍下在Java环境下如何使用Zookeeper.

使用

针对Zookeeper比较常见的Java客户端有zkclient、curator,由于Curator对Zookeeper的抽象层次比较高,简化了Zookeeper客户端的开发量,使得curator逐步被广泛使用,这里我们先引入curator包

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency>

建立连接

private static String ip="192.168.1.1:2181,192.168.1.2:2181"; public static void main( String[] args ) throws Exception { CuratorFramework curatorFramework= CuratorFrameworkFactory.builder() .connectString(ip) //连接地址 .sessionTimeoutMs(5000)//超时时间 .retryPolicy(new ExponentialBackoffRetry(1000,3))//重试策略 ExponentialBackoffRetry衰减策略 .build(); //RetryOneTime 仅仅只重试一次 //RetryUntilElapsed 一直重试知道规定时间 //RetryNTimes 指定最大重试次数 curatorFramework.start(); }

CRUD

private static void createData( CuratorFramework curatorFramework) throws Exception { curatorFramework.create().creatingParentsIfNeeded()//父节点不存在时候添加,实现递归添加 .withMode(CreateMode.PERSISTENT)//节点类型 临时节点不能存在子节点。 .forPath("/data/program","test".getBytes());//是否创建父节点; } private static void updateData( CuratorFramework curatorFramework) throws Exception { curatorFramework.setData() .forPath("/data/program","up".getBytes());//创建或修改的节点,value,传递类型 } private static void deleteData( CuratorFramework curatorFramework) throws Exception { curatorFramework.delete() .deletingChildrenIfNeeded()//递归删除 .forPath("/data/program");//创建或修改的节点,value,传递类型 Stat stat=new Stat(); String value=new String(curatorFramework.getData().storingStatIn(stat).forPath("/data/program")); curatorFramework.delete() .withVersion(stat.getVersion())//版本号 版本号不一致则失败 .forPath("/data/program"); }

节点权限设置

private static void demoOne() throws Exception { CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(ip) //连接地址 .sessionTimeoutMs(5000)//超时时间 .retryPolicy(new ExponentialBackoffRetry(1000, 3))//重试策略 ExponentialBackoffRetry衰减策略 .build(); curatorFramework.start(); List<ACL> list=new ArrayList<>();//用于存储节点权限 //新建ACL,包含了权限与对应操作对象。该代码内容就是 针对该节点,会话用户是admin,密码是password的用户享有只读权限。 digest 是设置该节点的权限粒度 //分别有IP/Digest(密钥)/world(开放)/Super(超级用户) //操作权限可以看下面的代码Perms 。 ACL acl=new ACL(ZooDefs.Perms.READ,new Id("digest", DigestAuthenticationProvider.generateDigest("admin:password"))); list.add(acl); curatorFramework.create().withMode(CreateMode.PERSISTENT).withACL(list).forPath("/auth"); } public interface Perms { int READ = 1 << 0; //读权限 int WRITE = 1 << 1;//写权限 int CREATE = 1 << 2;//创建权限 int DELETE = 1 << 3;//删除权限 int ADMIN = 1 << 4; //管理权限 int ALL = READ | WRITE | CREATE | DELETE | ADMIN; }

节点事件监听

watcher监听机制是Zookeeper中非常重要的特性,基于zookeeper上创建的节点,可以对这些节点绑定监听事件。监听的维度可以分为两种,针对当前节点与针对当前节点子节点。

zookeeper事件事件含义NodeCreated节点创建事件 当前节点NodeChiledrenChanged子节点发生变化,比如创建、删除。NodeDataChanged节点的值发生变化NodeDeleted删除事件

只会监听一次,下次必须重新发起监听。即客户端只会收到一次这样的通知,如果以后这个数据再发生变化,那么之前设置 watch 的客户端不会再次收到消息。如果要实现永久监听,有一个很粗暴的方法,就是循环。 接下来引入curator 事件监听的包。

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>

curator提供了三种Watcher来监听节点的变化

PathChildCache:监听一个路径下子节点的创建、删除、更新NodeCache:监视当前节点的创建、更新、删除,并将节点数据缓存到本地。TreeCache:相当于两者的综合效果,也就是全监听。 private static String ip="192.168.1.1:2181,192.168.1.2:2181"; public static void main(String[] args) throws Exception { CuratorFramework curatorFramework= CuratorFrameworkFactory.builder() .connectString(ip) //连接地址 .sessionTimeoutMs(5000)//超时时间 .retryPolicy(new ExponentialBackoffRetry(1000,3))//重试策略 ExponentialBackoffRetry衰减策略 .build(); curatorFramework.start(); addListenerWithNode(curatorFramework); System.in.read(); } //配置中心 private static void addListenerWithNode(CuratorFramework curatorFramework) throws Exception { NodeCache nodeCache= new NodeCache(curatorFramework,"/watch",false);//指定某个节点去注册事件 NodeCacheListener nodeCacheListener=()->{ System.out.println("receive Node Changed"); System.out.println(nodeCache.getCurrentData().getPath()+"-----"+new String(nodeCache.getCurrentData().getData())); }; nodeCache.getListenable().addListener(nodeCacheListener); nodeCache.start(); } //实现服务注册中心的时候可以对服务做动态感知 private static void addListenerWithChild(CuratorFramework curatorFramework) throws Exception { PathChildrenCache childCache= new PathChildrenCache(curatorFramework,"/watch",true);//指定某个节点去注册事件 PathChildrenCacheListener childCacheListener=(curatorFramework1,pathChildrenCacheEvent)->{ System.out.println(pathChildrenCacheEvent.getType()+"->"+pathChildrenCacheEvent.getData().getData()); }; childCache.getListenable().addListener(childCacheListener); childCache.start(PathChildrenCache.StartMode.NORMAL); }
最新回复(0)