Java 客户端开发
本章介绍如何使用 Java 客户端开发 ZooKeeper 应用,包括原生 API 和 Curator 框架。
原生 Java API
添加依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.4</version>
</dependency>
创建连接
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
public class ZooKeeperConnection {
private static final String CONNECT_STRING = "localhost:2181";
private static final int SESSION_TIMEOUT = 3000;
private ZooKeeper zk;
public void connect() throws Exception {
Watcher watcher = event -> {
System.out.println("收到事件: " + event.getType() +
", 状态: " + event.getState());
};
zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, watcher);
System.out.println("连接状态: " + zk.getState());
System.out.println("会话 ID: " + zk.getSessionId());
System.out.println("会话超时: " + zk.getSessionTimeout());
}
public void close() throws Exception {
if (zk != null) {
zk.close();
}
}
public ZooKeeper getZooKeeper() {
return zk;
}
}
创建节点
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
 * Creates one znode of each create mode (persistent, ephemeral, persistent-sequential,
 * ephemeral-sequential) and prints the path the server reports for each.
 */
public class CreateNodeExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        connection.connect();
        ZooKeeper zk = connection.getZooKeeper();

        createAndReport(zk, "/persistent-node", "持久节点数据",
                CreateMode.PERSISTENT, "创建持久节点: ");
        createAndReport(zk, "/ephemeral-node", "临时节点数据",
                CreateMode.EPHEMERAL, "创建临时节点: ");
        createAndReport(zk, "/sequential-", "顺序节点数据",
                CreateMode.PERSISTENT_SEQUENTIAL, "创建顺序节点: ");
        createAndReport(zk, "/ephemeral-sequential-", "临时顺序节点数据",
                CreateMode.EPHEMERAL_SEQUENTIAL, "创建临时顺序节点: ");

        connection.close();
    }

    /**
     * Creates a world-open znode and prints {@code label} followed by the path that
     * {@code create} returned.
     */
    private static void createAndReport(
            ZooKeeper zk, String path, String text, CreateMode mode, String label)
            throws Exception {
        byte[] payload = text.getBytes();
        String createdPath = zk.create(path, payload, Ids.OPEN_ACL_UNSAFE, mode);
        System.out.println(label + createdPath);
    }
}
读取数据
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
/**
 * Reads the payload and metadata of {@code /persistent-node} and prints both.
 */
public class GetDataExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        connection.connect();
        ZooKeeper zk = connection.getZooKeeper();

        // getData fills this Stat in place with the node's metadata.
        Stat metadata = new Stat();
        byte[] payload = zk.getData("/persistent-node", false, metadata);

        String text = new String(payload);
        java.util.Date createdAt = new java.util.Date(metadata.getCtime());
        java.util.Date modifiedAt = new java.util.Date(metadata.getMtime());

        System.out.println("数据: " + text);
        System.out.println("版本: " + metadata.getVersion());
        System.out.println("数据长度: " + metadata.getDataLength());
        System.out.println("创建时间: " + createdAt);
        System.out.println("修改时间: " + modifiedAt);

        connection.close();
    }
}
更新数据
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
/**
 * Overwrites the data of {@code /persistent-node}, passing the version that was just read
 * as the expected version of the write.
 */
public class SetDataExample {
    public static void main(String[] args) throws Exception {
        final String target = "/persistent-node";

        ZooKeeperConnection connection = new ZooKeeperConnection();
        connection.connect();
        ZooKeeper zk = connection.getZooKeeper();

        // Read only to learn the current version; the returned payload is not needed.
        Stat before = new Stat();
        zk.getData(target, false, before);
        int expectedVersion = before.getVersion();

        byte[] replacement = "更新后的数据".getBytes();
        Stat after = zk.setData(target, replacement, expectedVersion);
        System.out.println("新版本: " + after.getVersion());

        connection.close();
    }
}
删除节点
import org.apache.zookeeper.ZooKeeper;
/**
 * Deletes {@code /persistent-node} without checking its version.
 */
public class DeleteNodeExample {
    public static void main(String[] args) throws Exception {
        final String target = "/persistent-node";
        // Per the ZooKeeper.delete contract, -1 matches any version of the node.
        final int anyVersion = -1;

        ZooKeeperConnection connection = new ZooKeeperConnection();
        connection.connect();

        ZooKeeper zk = connection.getZooKeeper();
        zk.delete(target, anyVersion);
        System.out.println("节点已删除");

        connection.close();
    }
}
列出子节点
import org.apache.zookeeper.ZooKeeper;
/**
 * Lists the direct children of the root znode and prints each name.
 */
public class ListChildrenExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        connection.connect();
        try {
            ZooKeeper zk = connection.getZooKeeper();

            // Fully qualified because this snippet's import list does not include java.util.List.
            java.util.List<String> children = zk.getChildren("/", false);

            System.out.println("根节点下的子节点:");
            for (String child : children) {
                System.out.println(" " + child);
            }
        } finally {
            // Release the session even when the lookup fails.
            connection.close();
        }
    }
}
检查节点是否存在
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
/**
 * Checks whether {@code /test-node} exists and prints its version when it does.
 */
public class ExistsExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        connection.connect();
        ZooKeeper zk = connection.getZooKeeper();

        // exists() yields null rather than throwing when the node is absent.
        Stat found = zk.exists("/test-node", false);
        String report = (found == null)
                ? "节点不存在"
                : "节点存在,版本: " + found.getVersion();
        System.out.println(report);

        connection.close();
    }
}
使用 Watcher
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * Watches {@code /config} for data changes and prints the new content on every change.
 *
 * <p>The watcher passes itself to each follow-up {@code getData} call so that it keeps
 * receiving notifications after the first one.
 */
public class WatcherExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        connection.connect();
        final ZooKeeper zk = connection.getZooKeeper();

        Watcher dataWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                final String changedPath = event.getPath();
                System.out.println("事件类型: " + event.getType());
                System.out.println("事件路径: " + changedPath);

                // Only data changes trigger a re-read; every other event is just logged.
                if (event.getType() != Event.EventType.NodeDataChanged) {
                    return;
                }
                try {
                    // Re-read and re-arm the watch in one call.
                    byte[] refreshed = zk.getData(changedPath, this, null);
                    System.out.println("新数据: " + new String(refreshed));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        byte[] initial = zk.getData("/config", dataWatcher, null);
        System.out.println("当前数据: " + new String(initial));
        System.out.println("等待数据变更...");

        // Park the main thread so the process stays alive to receive notifications.
        Thread.sleep(Long.MAX_VALUE);
    }
}
异步操作
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class AsyncExample {
public static void main(String[] args) throws Exception {
ZooKeeperConnection connection = new ZooKeeperConnection();
connection.connect();
ZooKeeper zk = connection.getZooKeeper();
AsyncCallback.DataCallback dataCallback = (rc, path, ctx, data, stat) -> {
System.out.println("异步获取数据完成");
System.out.println("路径: " + path);
System.out.println("结果码: " + rc);
System.out.println("数据: " + new String(data));
System.out.println("版本: " + stat.getVersion());
};
zk.getData("/config", false, dataCallback, "context-data");
AsyncCallback.Create2Callback createCallback = (rc, path, ctx, name, stat) -> {
System.out.println("异步创建节点完成");
System.out.println("创建的节点: " + name);
};
zk.create("/async-node", "data".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
createCallback, "context");
Thread.sleep(5000);
connection.close();
}
}
Curator 框架
Curator 最初由 Netflix 开源,现已成为 Apache 软件基金会的顶级项目。它是一个 ZooKeeper 客户端框架,提供了更高级的 API 和常用配方(recipes)的实现。
添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.5.0</version>
</dependency>
创建客户端
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorClient {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("myapp") // 命名空间,所有操作都在 /myapp 下
.build();
client.start();
System.out.println("客户端已启动");
client.close();
}
}
CRUD 操作
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
 * Walks through create, read, update and delete on {@code /curator-test} with Curator's
 * fluent API.
 */
public class CuratorCrudExample {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        final String path = "/curator-test";

        // Start from a clean slate if an earlier run left the node behind.
        boolean alreadyPresent = client.checkExists().forPath(path) != null;
        if (alreadyPresent) {
            client.delete().forPath(path);
        }

        byte[] initialPayload = "hello curator".getBytes();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, initialPayload);
        System.out.println("节点创建成功");

        String stored = new String(client.getData().forPath(path));
        System.out.println("数据: " + stored);

        byte[] updatedPayload = "updated data".getBytes();
        client.setData().forPath(path, updatedPayload);
        System.out.println("数据更新成功");

        client.delete().forPath(path);
        System.out.println("节点删除成功");

        client.close();
    }

    /** Builds (but does not start) a client for the local server. */
    private static CuratorFramework newClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
    }
}
异步操作
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
/**
 * Creates {@code /async-node} in the background and prints the completion event delivered
 * to the callback.
 */
public class CuratorAsyncExample {
    public static void main(String[] args) throws Exception {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(retryPolicy)
                .build();
        client.start();

        BackgroundCallback callback = (ignoredClient, event) -> report(event);

        byte[] payload = "data".getBytes();
        client.create()
                .creatingParentsIfNeeded()
                .inBackground(callback)
                .forPath("/async-node", payload);

        // Give the background operation time to complete before shutting down.
        Thread.sleep(3000);
        client.close();
    }

    /** Prints the type, path and result code of a background completion event. */
    private static void report(CuratorEvent event) {
        System.out.println("事件类型: " + event.getType());
        System.out.println("路径: " + event.getPath());
        System.out.println("结果码: " + event.getResultCode());
    }
}
事务操作
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
/**
 * Runs two creates and one update as a single atomic transaction and prints the
 * per-operation results.
 *
 * <p>Uses {@code transaction().forOperations(...)} with operations built by
 * {@code transactionOp()}, the replacement for the deprecated {@code inTransaction()} chain.
 */
public class CuratorTransactionExample {
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client even when the transaction is rejected.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            client.start();

            // A transaction fails as a whole if any create hits an existing node, so
            // remove what a previous run of this example left behind.
            for (String leftover : new String[] {"/tx-node1", "/tx-node2"}) {
                if (client.checkExists().forPath(leftover) != null) {
                    client.delete().forPath(leftover);
                }
            }

            // Fully qualified because this snippet's import list has no java.util import.
            java.util.List<CuratorTransactionResult> results = client.transaction().forOperations(
                    client.transactionOp().create().forPath("/tx-node1", "data1".getBytes()),
                    client.transactionOp().create().forPath("/tx-node2", "data2".getBytes()),
                    client.transactionOp().setData().forPath("/tx-node1", "updated1".getBytes()));

            for (CuratorTransactionResult result : results) {
                System.out.println("操作: " + result.getType()
                        + ", 路径: " + result.getForPath()
                        + ", 结果: " + result.getResultPath());
            }
        }
    }
}
缓存机制
Curator 提供了多种缓存实现(注意:从 Curator 5.0 起,下面三个类已被标记为过时,官方推荐在 ZooKeeper 3.6+ 上改用统一的 CuratorCache;此处仍以它们为例说明缓存的基本用法):
NodeCache - 节点数据缓存:
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
public class NodeCacheExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
String path = "/config";
client.create().orSetData().forPath(path, "initial".getBytes());
NodeCache nodeCache = new NodeCache(client, path);
nodeCache.start();
nodeCache.getListenable().addListener(() -> {
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("数据变更: " + new String(data));
});
client.setData().forPath(path, "update1".getBytes());
Thread.sleep(1000);
client.setData().forPath(path, "update2".getBytes());
Thread.sleep(1000);
nodeCache.close();
client.close();
}
}
PathChildrenCache - 子节点缓存:
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
/**
 * Watches the direct children of {@code /services} with a {@code PathChildrenCache} and
 * prints each add/remove event.
 *
 * <p>The example can be run repeatedly: the parent is created only when missing, and the
 * child nodes it creates are removed first if a previous run left them behind.
 */
public class PathChildrenCacheExample {
    public static void main(String[] args) throws Exception {
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            client.start();

            String parentPath = "/services";
            String firstChild = parentPath + "/service1";
            String secondChild = parentPath + "/service2";

            // An unconditional create throws NodeExists on every run after the first.
            if (client.checkExists().forPath(parentPath) == null) {
                client.create().creatingParentsIfNeeded().forPath(parentPath);
            }
            // Only touch the nodes this example owns; never wipe the whole parent.
            for (String leftover : new String[] {firstChild, secondChild}) {
                if (client.checkExists().forPath(leftover) != null) {
                    client.delete().forPath(leftover);
                }
            }

            // The third argument (true) asks the cache to keep node data as well as paths.
            try (PathChildrenCache cache = new PathChildrenCache(client, parentPath, true)) {
                PathChildrenCacheListener listener = (client1, event) -> {
                    System.out.println("事件类型: " + event.getType());
                    // The listener may receive events that carry no node data.
                    if (event.getData() != null) {
                        System.out.println("路径: " + event.getData().getPath());
                        System.out.println("数据: " + new String(event.getData().getData()));
                    }
                };
                // Register before start() so that no event is missed.
                cache.getListenable().addListener(listener);
                cache.start();

                client.create().forPath(firstChild, "data1".getBytes());
                Thread.sleep(500);
                client.create().forPath(secondChild, "data2".getBytes());
                Thread.sleep(500);
                client.delete().forPath(firstChild);
                Thread.sleep(500);
            }
        }
    }
}
TreeCache - 树形缓存:
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
/**
 * Watches the whole subtree under {@code /tree} with a {@code TreeCache} and prints every
 * event together with the affected path.
 *
 * <p>The example can be run repeatedly: the root is created only when missing, and the
 * {@code child1} subtree it creates is removed first if a previous run left it behind.
 */
public class TreeCacheExample {
    public static void main(String[] args) throws Exception {
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            client.start();

            String path = "/tree";
            String child = path + "/child1";

            // An unconditional create throws NodeExists on every run after the first.
            if (client.checkExists().forPath(path) == null) {
                client.create().creatingParentsIfNeeded().forPath(path);
            }
            // Drop only the subtree this example owns (child1 and its sub1 node).
            if (client.checkExists().forPath(child) != null) {
                client.delete().deletingChildrenIfNeeded().forPath(child);
            }

            try (TreeCache treeCache = new TreeCache(client, path)) {
                TreeCacheListener listener = (client1, event) -> {
                    // Some events carry no node data, hence the null check.
                    System.out.println("事件: " + event.getType()
                            + ", 路径: "
                            + (event.getData() != null ? event.getData().getPath() : "null"));
                };
                // Register before start() so that no event is missed.
                treeCache.getListenable().addListener(listener);
                treeCache.start();

                client.create().forPath(child, "data1".getBytes());
                Thread.sleep(500);
                client.create().forPath(child + "/sub1", "subdata1".getBytes());
                Thread.sleep(500);
                client.setData().forPath(child, "updated1".getBytes());
                Thread.sleep(500);
            }
        }
    }
}
连接状态处理
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
/**
 * Registers a connection-state listener and prints a line for every state transition the
 * client reports.
 */
public class ConnectionStateExample {
    public static void main(String[] args) throws Exception {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(retryPolicy)
                .build();

        ConnectionStateListener listener = (ignoredClient, newState) -> {
            String message = describe(newState);
            if (message != null) {
                System.out.println(message);
            }
        };

        // Attach the listener before start() so the first transition is observed too.
        client.getConnectionStateListenable().addListener(listener);
        client.start();

        // Keep the process alive so later state changes can be observed.
        Thread.sleep(Long.MAX_VALUE);
    }

    /** Maps a connection state to its console message, or {@code null} if none applies. */
    private static String describe(ConnectionState state) {
        switch (state) {
            case CONNECTED:
                return "已连接";
            case RECONNECTED:
                return "重新连接成功";
            case SUSPENDED:
                return "连接暂停";
            case LOST:
                return "连接丢失";
            case READ_ONLY:
                return "只读模式";
            default:
                return null;
        }
    }
}
重试策略
Curator 提供了多种重试策略:
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
/**
 * Shows how each of Curator's built-in retry policies is constructed. The instances are
 * only created for illustration and are not used afterwards.
 */
public class RetryPolicyExample {
    public static void main(String[] args) {
        final int initialSleepMs = 1000; // initial sleep time
        final int maxRetries = 3;        // maximum number of retries
        final int maxSleepMs = 5000;     // maximum sleep time

        // Exponential back-off bounded by a retry count and a sleep ceiling.
        ExponentialBackoffRetry backoff =
                new ExponentialBackoffRetry(initialSleepMs, maxRetries, maxSleepMs);

        // Fixed number of attempts with a constant pause between them.
        RetryNTimes bounded = new RetryNTimes(3, 1000);

        // Retries without limit, pausing between attempts.
        RetryForever unbounded = new RetryForever(1000);

        // A single retry after a pause.
        RetryOneTime single = new RetryOneTime(1000);
    }
}
小结
本章学习了 ZooKeeper 的 Java 客户端开发:
- 原生 API:基本的 CRUD 操作、Watcher、异步操作
- Curator 框架:更高级的 API、流式编程风格
- 缓存机制:NodeCache、PathChildrenCache、TreeCache
- 连接状态处理:监听连接状态变化
- 重试策略:多种重试策略选择
下一章我们将学习 ZooKeeper 的典型应用场景实现。