文章目录
1. zookeeper客户端命令操作2. zookeeper内部原理2.1 持久化节点和临时节点2.2 Stat结构体2.3 监听原理2.4paxos算法2.5选举机制2.6写数据流程
3.API操作3.1zk客户端操作3.2动态上下线3.3同步线程锁
1. zookeeper客户端命令操作
1.启动zookeeper客户端
zkCli.sh
2.创建普通节点
create /iweb "jianhau"
3.获取节点的值
get /iweb
4.创建短暂节点
create -e /iweb
6.监听节点的变化
ls /iweb watch
5.退出
quit
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v9afaE3X-1570004773361)(11886AE15C5E4EE284904D287986A1BD)]
2. zookeeper内部原理
2.1 持久化节点和临时节点
持久化节点是当客户端和zookeeper断开连接后,该节点依旧存在临时节点是当客户端和zookeeper断开连接后,节点自动删除
2.2 Stat结构体
[zk: localhost:2181(CONNECTED) 5] get /sanguo
jinlian
cZxid = 0x100000003 ==创建节点的事务id==
ctime = Wed Aug 29 00:03:23 CST 2018 ==被创建的好毫秒数==
mZxid = 0x100000003 ==最后更新的事务==
mtime = Wed Aug 29 00:03:23 CST 2018 ==最后更改的毫秒数==
pZxid = 0x100000004 ==最后更新的子节点id==
cversion = 1 ==子节点修改次数==
dataVersion = 0 ==数据变化号==
aclVersion = 0 ==访问被控制列表的变化号==
ephemeralOwner = 0x0 ==如果是临时节点,这个是拥有者的session id 如果不是临时节点就是0==
dataLength = 7 ==数据长度==
numChildren = 1 ==子节点数量==
2.3 监听原理
首先创建一个main线程在main线程中创建一个客户端,这是就会创建两个线程connect,Listener,一负责通信,一个负责监听通过connect注册监听在zookeeper的注册监听器列表中添加注册的监听事件监听到有数据或者路径变化时通过,就会将消息发送给listener线程listener线程内部调用了process方法
常见的监听
1.监听数据
get path [watch]
2.监听子节点的增减变化
ls path [watch]
2.4paxos算法
当节点发生变化后,会汇报给zkserver,此时zkserver收到一个zxid如果zxid大于自己当前的zxid,先记录下来,然后同步给其他的zkserver如果超过半数的zkserver同意后即生效,更新后同步给其他的zkserver,修改自己的zxid
2.5选举机制
半数机制:集群中半数以上的集群存活,集群可用,所以安装奇数台服务器虽然在配置文件中没有指定的主从,但是会选举产生一个leader和其他的follower如果有五台机器
第一台上线后先投自己一票第二台上线后,先头自己一票,由于他的id比第一台的大,所以第一台改投第二台,此时第一台0票,第二台2票,但是少于3,仍然不能成为leader第三台上线后,此时服务器都会改选票给服务器3,此时他又三票,成为leader,其他状态为follower第四台启动,123不会交换选票信息,第四台只有一票少数服从多数
2.6写数据流程
client向server1发送写数据请求,如果server1不是leader,那么server1会把请求转发给leaderleader把写请求广播给followfollow返回信息并把请求放入待写队列中,并返回成功信息当leader收到半数以上的成功信息后,说明该写操作可以执行leader向各个server发送提交信息,各个server收到后落实写请求,操作成功返回给客户端操作成功
3.API操作
3.1zk客户端操作
public class Zkutils {
private static int Session_Time_Out
= 300000;
private static ZooKeeper zk
= null
;
public static void CreateNodes() throws KeeperException
, InterruptedException
{
String path
= "/test";
byte [] bytes
= "hello zk ".getBytes();
String result
= zk
.create(path
, bytes
, ZooDefs
.Ids
.OPEN_ACL_UNSAFE
, CreateMode
.PERSISTENT
);
System
.out
.println(result
);
}
public static void NodeExist()throws Exception
{
String path
="/test";
Stat stat
= zk
.exists(path
, false);
System
.out
.println(stat
);
}
public static void getData() throws KeeperException
, InterruptedException
{
String path
= "/test";
Stat stat
= new Stat();
byte[] data
= zk
.getData(path
, false, stat
);
System
.out
.println(new String(data
, Charset
.forName("UTF-8")));
}
public static void setData() throws KeeperException
, InterruptedException
{
String path
="/test";
Stat stat
= zk
.setData(path
, "hellozk".getBytes(), -1);
System
.out
.println(stat
);
}
public static void DeleteNode() throws KeeperException
, InterruptedException
{
String path
="/test";
zk
.delete(path
,-1);
}
public static void main(String
[] args
) {
try {
zk
= new ZooKeeper("bigdata1:2181,bigdata2:2181,bigdata3:2181", Session_Time_Out
, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent
) {
System
.out
.println(watchedEvent
.toString());
}
});
DeleteNode();
NodeExist();
} catch (IOException e
) {
e
.printStackTrace();
} catch (InterruptedException e
) {
e
.printStackTrace();
} catch (KeeperException e
) {
e
.printStackTrace();
} catch (Exception e
) {
e
.printStackTrace();
}
try {
zk
.close();
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
}
3.2动态上下线
public class DynamicUpDown {
private ZooKeeper zk
= null
;
private static int SESSION_TIME_OUT
= 300000;
private static String HOSTS
= "bigdata1:2181,bigdata2:2181,bigdata3:2181";
private static List
<ACL> ACL
= ZooDefs
.Ids
.OPEN_ACL_UNSAFE
;
private static String PRAENT_NODE
= "/hosts";
private List
<String> serverlist
= new ArrayList<>();
public void Init() throws Exception
{
zk
= new ZooKeeper(HOSTS
, SESSION_TIME_OUT
, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent
) {
String path
= watchedEvent
.getPath();
Event
.EventType type
= watchedEvent
.getType();
System
.out
.println(type
);
if (Event
.EventType
.NodeChildrenChanged
==type
&&path
.equals(PRAENT_NODE
)){
try {
UpdateServerList();
} catch (Exception e
) {
e
.printStackTrace();
}
}
}
});
Stat stat
= zk
.exists(PRAENT_NODE
, false);
if (stat
== null
){
zk
.create(PRAENT_NODE
,"父节点".getBytes(),ACL
, CreateMode
.PERSISTENT
);
}
UpdateServerList();
}
private void UpdateServerList() throws Exception
{
List
<String> newServerList
= new ArrayList<>();
List
<String> children
= zk
.getChildren(PRAENT_NODE
,true);
children
.forEach(child
->{
try {
byte[] data
= zk
.getData(PRAENT_NODE
+ "/" + child
, false, null
);
newServerList
.add(new String(data
));
serverlist
= newServerList
;
} catch (KeeperException e
) {
e
.printStackTrace();
} catch (InterruptedException e
) {
e
.printStackTrace();
}
});
serverlist
.forEach(list
->{
System
.out
.println(list
);
});
}
public void close() {
try {
zk
.close();
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
public static void main(String
[] args
) {
DynamicUpDown server
= new DynamicUpDown();
try {
server
.Init();
Thread
.sleep(Long
.MAX_VALUE
);
} catch (Exception e
) {
e
.printStackTrace();
}finally {
server
.close();
}
}
}
3.3同步线程锁
public interface CustomTask {
void doSomething();
}
public class Mytask implements CustomTask{
private String name
;
public Mytask(String name
){
this.name
=name
;
}
@Override
public void doSomething() {
for (int i
= 1;i
<=5;i
++){
System
.out
.println("做事情"+i
);
try {
Thread
.sleep(1000);
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
}
}
public class DistributeLock {
private ZooKeeper zk
= null
;
private CustomTask task
= null
;
private final List
<org
.apache
.zookeeper
.data
.ACL
> ACL
= ZooDefs
.Ids
.OPEN_ACL_UNSAFE
;
private final int sessionTimeOut
= 5000;
private final String parent_node
= "/locks";
private String connectstring
= null
;
private volatile String currentPath
= null
;
public DistributeLock(String connectstring
){
this.connectstring
= connectstring
;
}
public DistributeLock(){
this("bigdata1:2181,bigdata2:2181,bigdata3:2181");
}
public void setTask(CustomTask task
){
this.task
= task
;
}
public void getClient() throws Exception
{
zk
=new ZooKeeper(connectstring
,sessionTimeOut
,event
->{
if (event
.getType()==Watcher
.Event
.EventType
.NodeChildrenChanged
&&
event
.getPath().equals(parent_node
)){
try {
List
<String> child
= zk
.getChildren(parent_node
,true);
String currentNode
= currentPath
.substring(parent_node
.length() + 1);
Collections
.sort(child
);
if (child
.indexOf(currentNode
)==0){
task
.doSomething();
deleteLock();
registerLock();
}
} catch (KeeperException e
) {
e
.printStackTrace();
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
});
}
public void registerLock()throws Exception
{
currentPath
= zk
.create(parent_node
+"/lock",null
,ACL
,CreateMode
.EPHEMERAL_SEQUENTIAL
);
}
public void watchParent() throws Exception
{
List
<String> children
= zk
.getChildren(parent_node
, false);
if (children
!=null
&&children
.size()==1){
task
.doSomething();
deleteLock();
}else {
Thread
.sleep(Long
.MAX_VALUE
);
}
}
public void deleteLock() throws Exception
{
zk
.delete(currentPath
,-1);
}
}
public class Test {
public static void main(String
[] args
) throws Exception
{
DistributeLock distributeLock
= new DistributeLock();
CustomTask customTask
= new Mytask(UUID
.randomUUID().toString());
distributeLock
.setTask(customTask
);
distributeLock
.getClient();
distributeLock
.registerLock();
distributeLock
.watchParent();
}
}