tonglin0325的个人主页

Zk学习笔记——ZkClient

ZkClient是开源的zk客户端,对Zookeeper原生的java api进行了封装,实现了诸如session超时重连,watcher反复注册等功能。

依赖的话有

1
2
3
4
5
6
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>

1
2
3
4
5
6
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>

前者使用较为广泛,使用其的项目包括kafka,dubbo等

 

 

1.创建,获取和删除节点

依赖

1
2
3
4
5
6
7
8
9
10
11
12
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5-cdh5.16.2</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>

 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.bigdata.zookeeper;

import org.I0Itec.zkclient.ZkClient;

public class ZkClientExample {

public static void main(String[] args) throws Exception {

ZkClient zkClient = new ZkClient("master:2181", 5000);
System.out.println("Zookeeper session established");
String path = "/app4/app4-1";
// true表示可以递归创建节点,重复节点不会报错
zkClient.createPersistent(path, true);
// 获取
System.out.println(zkClient.getChildren("/app4"));

// 删除节点
// zkClient.delete(path);
// 递归删除
zkClient.deleteRecursive("/app4");
}

}

2.zkClient引入了listener,客户端可以通过注册相关的事件监听来实现对ZooKeeper服务端事件的订阅

listener是异步的,可能触发会有延迟

代码,chiild listener,监听子节点是否发生变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.bigdata.zookeeper;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;

public class ZkClientExample {

public static void main(String[] args) throws Exception {

ZkClient zkClient = new ZkClient("master:2181", 5000);
System.out.println("Zookeeper session established");
String path = "/app4/app4-1";

// listener
zkClient.subscribeChildChanges("/app4", new Listerner());

// true表示可以递归创建节点,重复节点不会报错
zkClient.createPersistent(path, true);
// 如果注释的话,listener的执行可能会在删除节点执行之后,所以输出可能是空的子节点
Thread.sleep(10000);
// 获取
System.out.println(zkClient.getChildren("/app4"));


// 删除节点
zkClient.delete(path);

Thread.sleep(10000);
// 递归删除
// zkClient.deleteRecursive("/app4");


}

}

class Listerner implements IZkChildListener {

@Override
public void handleChildChange(String s, List<String> list) throws Exception {
System.out.println("parent path: " + s + " changed, current children: " + list);
}
}

输出

 

3.代码,data listener,监听节点的data是否发生变化

ZkClient的默认序列化类是org.I0Itec.zkclient.serialize.SerializableSerializer

在zkui上查看会有乱码,比如

所以重写了序列化类,并且使用writeData和readData修改和读取节点数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.bigdata.zookeeper;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.io.Charsets;

import java.util.List;

public class ZkClientExample {

public static void main(String[] args) throws Exception {

ZkClient zkClient = new ZkClient("master:2181", 5000);
zkClient.setZkSerializer(new MyZkSerializer());
System.out.println("Zookeeper session established");
String path = "/app4/app4-1";

// child listener
zkClient.subscribeChildChanges("/app4", new ChildListerner());
// data listener
zkClient.subscribeDataChanges(path, new DataListerner());

// true表示可以递归创建节点,重复节点不会报错
zkClient.createPersistent(path, true);
// 设置data
zkClient.writeData(path, "12345");
System.out.println(zkClient.readData(path).toString());
// 如果注释的话,listener的执行可能会在删除节点执行之后,所以输出可能是空的子节点
Thread.sleep(10000);
// 获取
System.out.println(zkClient.getChildren("/app4"));


// 删除节点
zkClient.delete(path);

Thread.sleep(10000);
// 递归删除
// zkClient.deleteRecursive("/app4");


}

}

class MyZkSerializer implements ZkSerializer {
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return new String(bytes, Charsets.UTF_8);
}

public byte[] serialize(Object obj) throws ZkMarshallingError {
return String.valueOf(obj).getBytes(Charsets.UTF_8);
}
}


class ChildListerner implements IZkChildListener {

@Override
public void handleChildChange(String s, List<String> list) throws Exception {
System.out.println("parent path: " + s + " changed, current children: " + list);
}
}

class DataListerner implements IZkDataListener {


@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("data path: " + s + " changed, current data: " + o);
}

@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("data path: " + s + " deleted");
}
}

输出