跳到主要内容

Java 客户端开发

本章介绍如何使用 Java 客户端开发 ZooKeeper 应用,包括原生 API 和 Curator 框架。

原生 Java API

添加依赖

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.4</version>
</dependency>

创建连接

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;

public class ZooKeeperConnection {
private static final String CONNECT_STRING = "localhost:2181";
private static final int SESSION_TIMEOUT = 3000;

private ZooKeeper zk;

public void connect() throws Exception {
Watcher watcher = event -> {
System.out.println("收到事件: " + event.getType() +
", 状态: " + event.getState());
};

zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, watcher);

System.out.println("连接状态: " + zk.getState());
System.out.println("会话 ID: " + zk.getSessionId());
System.out.println("会话超时: " + zk.getSessionTimeout());
}

public void close() throws Exception {
if (zk != null) {
zk.close();
}
}

public ZooKeeper getZooKeeper() {
return zk;
}
}

创建节点

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * Creates one node of each create mode: persistent, ephemeral,
 * persistent-sequential and ephemeral-sequential.
 */
public class CreateNodeExample {
    /** Explicit charset so the stored bytes do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        try {
            connection.connect();
            ZooKeeper zk = connection.getZooKeeper();

            String path1 = zk.create(
                    "/persistent-node",
                    "持久节点数据".getBytes(UTF_8),
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT
            );
            System.out.println("创建持久节点: " + path1);

            String path2 = zk.create(
                    "/ephemeral-node",
                    "临时节点数据".getBytes(UTF_8),
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL
            );
            System.out.println("创建临时节点: " + path2);

            // For sequential modes the returned path is the requested prefix
            // with a server-assigned counter appended.
            String path3 = zk.create(
                    "/sequential-",
                    "顺序节点数据".getBytes(UTF_8),
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL
            );
            System.out.println("创建顺序节点: " + path3);

            String path4 = zk.create(
                    "/ephemeral-sequential-",
                    "临时顺序节点数据".getBytes(UTF_8),
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL
            );
            System.out.println("创建临时顺序节点: " + path4);
        } finally {
            // Release the session even when a create call fails.
            connection.close();
        }
    }
}

读取数据

import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;

/** Reads the data and metadata (Stat) of {@code /persistent-node}. */
public class GetDataExample {
    /** Explicit charset so decoding does not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        try {
            connection.connect();
            ZooKeeper zk = connection.getZooKeeper();

            // getData fills the supplied Stat with the node's metadata.
            Stat stat = new Stat();
            byte[] data = zk.getData("/persistent-node", false, stat);

            System.out.println("数据: " + decode(data));
            System.out.println("版本: " + stat.getVersion());
            System.out.println("数据长度: " + stat.getDataLength());
            System.out.println("创建时间: " + new java.util.Date(stat.getCtime()));
            System.out.println("修改时间: " + new java.util.Date(stat.getMtime()));
        } finally {
            // Release the session even when the read fails.
            connection.close();
        }
    }

    /** Decodes node data as UTF-8; a node without data yields the text "null". */
    private static String decode(byte[] data) {
        return data == null ? "null" : new String(data, UTF_8);
    }
}

更新数据

import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;

/**
 * Updates {@code /persistent-node} with an optimistic version check: the
 * version read just before the write is passed to {@code setData}.
 */
public class SetDataExample {
    /** Explicit charset so the stored bytes do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        try {
            connection.connect();
            ZooKeeper zk = connection.getZooKeeper();

            // Only the Stat is needed here; the returned data is ignored.
            Stat currentStat = new Stat();
            zk.getData("/persistent-node", false, currentStat);

            Stat newStat = zk.setData(
                    "/persistent-node",
                    "更新后的数据".getBytes(UTF_8),
                    currentStat.getVersion()
            );
            System.out.println("新版本: " + newStat.getVersion());
        } finally {
            // Release the session even when the update fails.
            connection.close();
        }
    }
}

删除节点

import org.apache.zookeeper.ZooKeeper;

/** Deletes {@code /persistent-node} regardless of its current version. */
public class DeleteNodeExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        try {
            connection.connect();
            ZooKeeper zk = connection.getZooKeeper();

            // Version -1 skips the version check, so any version is deleted.
            zk.delete("/persistent-node", -1);
            System.out.println("节点已删除");
        } finally {
            // Release the session even when the delete fails.
            connection.close();
        }
    }
}

列出子节点

import org.apache.zookeeper.ZooKeeper;

/** Prints the names of the direct children of the root node. */
public class ListChildrenExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        try {
            connection.connect();
            ZooKeeper zk = connection.getZooKeeper();

            // Fully qualified because this snippet does not import java.util.List.
            java.util.List<String> children = zk.getChildren("/", false);
            System.out.println("根节点下的子节点:");
            for (String child : children) {
                System.out.println(" " + child);
            }
        } finally {
            // Release the session even when the listing fails.
            connection.close();
        }
    }
}

检查节点是否存在

import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;

/** Checks whether {@code /test-node} exists and prints its version if it does. */
public class ExistsExample {
    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        try {
            connection.connect();
            ZooKeeper zk = connection.getZooKeeper();

            // exists() reports a missing node as a null Stat, not as an exception.
            Stat stat = zk.exists("/test-node", false);

            if (stat != null) {
                System.out.println("节点存在,版本: " + stat.getVersion());
            } else {
                System.out.println("节点不存在");
            }
        } finally {
            // Release the session even when the check fails.
            connection.close();
        }
    }
}

使用 Watcher

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * Watches {@code /config} for data changes and prints each new value.
 *
 * <p>The watcher passes itself to the {@code getData} call it makes when
 * handling a change, so it stays registered for the next one.
 */
public class WatcherExample {
    /** Explicit charset so decoding does not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        ZooKeeperConnection connection = new ZooKeeperConnection();
        try {
            connection.connect();
            final ZooKeeper zk = connection.getZooKeeper();

            Watcher dataWatcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("事件类型: " + event.getType());
                    System.out.println("事件路径: " + event.getPath());

                    if (event.getType() == Event.EventType.NodeDataChanged) {
                        try {
                            // Passing "this" re-registers the watcher for the next change.
                            byte[] newData = zk.getData(event.getPath(), this, null);
                            System.out.println("新数据: " + decode(newData));
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the calling thread.
                            Thread.currentThread().interrupt();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            };

            byte[] data = zk.getData("/config", dataWatcher, null);
            System.out.println("当前数据: " + decode(data));

            System.out.println("等待数据变更...");
            Thread.sleep(Long.MAX_VALUE);
        } finally {
            // Reached on interruption or failure; closes the session cleanly.
            connection.close();
        }
    }

    /** Decodes node data as UTF-8; a node without data yields the text "null". */
    private static String decode(byte[] data) {
        return data == null ? "null" : new String(data, UTF_8);
    }
}

异步操作

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AsyncExample {
public static void main(String[] args) throws Exception {
ZooKeeperConnection connection = new ZooKeeperConnection();
connection.connect();
ZooKeeper zk = connection.getZooKeeper();

AsyncCallback.DataCallback dataCallback = (rc, path, ctx, data, stat) -> {
System.out.println("异步获取数据完成");
System.out.println("路径: " + path);
System.out.println("结果码: " + rc);
System.out.println("数据: " + new String(data));
System.out.println("版本: " + stat.getVersion());
};

zk.getData("/config", false, dataCallback, "context-data");

AsyncCallback.Create2Callback createCallback = (rc, path, ctx, name, stat) -> {
System.out.println("异步创建节点完成");
System.out.println("创建的节点: " + name);
};

zk.create("/async-node", "data".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
createCallback, "context");

Thread.sleep(5000);
connection.close();
}
}

Curator 框架

Curator 最初由 Netflix 开源,现已成为 Apache 软件基金会的顶级项目。它是一个 ZooKeeper 客户端框架,提供了更高级的 API 和常用配方(recipes)的实现。

添加依赖

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.5.0</version>
</dependency>

创建客户端

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Builds, starts and closes a Curator client with explicit timeouts and a namespace. */
public class CuratorClient {
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client even if start() throws.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("myapp") // Namespace: every operation is rooted under /myapp
                .build()) {

            client.start();

            System.out.println("客户端已启动");
        }
    }
}

CRUD 操作

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/** Walks through create, read, update and delete on {@code /curator-test}. */
public class CuratorCrudExample {
    /** Explicit charset so encoding and decoding do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client even if an operation fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {

            client.start();

            String path = "/curator-test";

            // Remove a node left behind by an earlier run so create() succeeds.
            if (client.checkExists().forPath(path) != null) {
                client.delete().forPath(path);
            }

            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, "hello curator".getBytes(UTF_8));
            System.out.println("节点创建成功");

            byte[] data = client.getData().forPath(path);
            System.out.println("数据: " + new String(data, UTF_8));

            client.setData().forPath(path, "updated data".getBytes(UTF_8));
            System.out.println("数据更新成功");

            client.delete().forPath(path);
            System.out.println("节点删除成功");
        }
    }
}

异步操作

import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;

/**
 * Creates {@code /async-node} in the background and waits (at most three
 * seconds) for the completion callback before closing the client.
 */
public class CuratorAsyncExample {
    /** Explicit charset so the stored bytes do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client even if an operation fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {

            client.start();

            final java.util.concurrent.CountDownLatch finished =
                    new java.util.concurrent.CountDownLatch(1);

            BackgroundCallback callback = (CuratorFramework client1, CuratorEvent event) -> {
                try {
                    System.out.println("事件类型: " + event.getType());
                    System.out.println("路径: " + event.getPath());
                    System.out.println("结果码: " + event.getResultCode());
                } finally {
                    finished.countDown();
                }
            };

            client.create()
                    .creatingParentsIfNeeded()
                    .inBackground(callback)
                    .forPath("/async-node", "data".getBytes(UTF_8));

            // Same three-second upper bound as before, but returns as soon as
            // the callback has run.
            finished.await(3, java.util.concurrent.TimeUnit.SECONDS);
        }
    }
}

事务操作

import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

/**
 * Runs two creates and one update as a single atomic transaction and prints
 * the per-operation results.
 */
public class CuratorTransactionExample {
    /** Explicit charset so the stored bytes do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client even if the transaction fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {

            client.start();

            // transaction()/transactionOp() replace the deprecated inTransaction()
            // chain. Collection is fully qualified because this snippet does not
            // import it.
            java.util.Collection<CuratorTransactionResult> results = client.transaction().forOperations(
                    client.transactionOp().create().forPath("/tx-node1", "data1".getBytes(UTF_8)),
                    client.transactionOp().create().forPath("/tx-node2", "data2".getBytes(UTF_8)),
                    client.transactionOp().setData().forPath("/tx-node1", "updated1".getBytes(UTF_8)));

            for (CuratorTransactionResult result : results) {
                System.out.println("操作: " + result.getType() +
                        ", 路径: " + result.getForPath() +
                        ", 结果: " + result.getResultPath());
            }
        }
    }
}

缓存机制

Curator 提供了多种缓存实现(注意:在 Curator 5.x 中,下面三种缓存均已标记为废弃,官方推荐在新代码中改用 CuratorCache):

NodeCache - 节点数据缓存

import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;

public class NodeCacheExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();

client.start();

String path = "/config";
client.create().orSetData().forPath(path, "initial".getBytes());

NodeCache nodeCache = new NodeCache(client, path);
nodeCache.start();

nodeCache.getListenable().addListener(() -> {
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("数据变更: " + new String(data));
});

client.setData().forPath(path, "update1".getBytes());
Thread.sleep(1000);
client.setData().forPath(path, "update2".getBytes());
Thread.sleep(1000);

nodeCache.close();
client.close();
}
}

PathChildrenCache - 子节点缓存

import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

/**
 * Caches the direct children of {@code /services} and prints an event for
 * every child that is added, updated or removed.
 */
public class PathChildrenCacheExample {
    /** Explicit charset so encoding and decoding do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client even if an operation fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {

            client.start();

            String parentPath = "/services";
            // The parent is persistent and survives earlier runs; create it only once.
            if (client.checkExists().forPath(parentPath) == null) {
                client.create().creatingParentsIfNeeded().forPath(parentPath);
            }

            // Third argument true: the cache also stores each child's data.
            // The cache is closed before the client it depends on.
            try (PathChildrenCache cache = new PathChildrenCache(client, parentPath, true)) {
                cache.start();

                PathChildrenCacheListener listener = (client1, event) -> {
                    System.out.println("事件类型: " + event.getType());
                    if (event.getData() != null) {
                        System.out.println("路径: " + event.getData().getPath());
                        System.out.println("数据: " + decode(event.getData().getData()));
                    }
                };

                cache.getListenable().addListener(listener);

                // orSetData keeps the example rerunnable: service2 is never
                // deleted, so a plain create() would fail on the second run.
                client.create().orSetData().forPath(parentPath + "/service1", "data1".getBytes(UTF_8));
                Thread.sleep(500);
                client.create().orSetData().forPath(parentPath + "/service2", "data2".getBytes(UTF_8));
                Thread.sleep(500);
                client.delete().forPath(parentPath + "/service1");
                Thread.sleep(500);
            }
        }
    }

    /** Decodes node data as UTF-8; a node without data yields the text "null". */
    private static String decode(byte[] data) {
        return data == null ? "null" : new String(data, UTF_8);
    }
}

TreeCache - 树形缓存

import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

/** Caches the whole subtree under {@code /tree} and prints every change event. */
public class TreeCacheExample {
    /** Explicit charset so the stored bytes do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client even if an operation fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {

            client.start();

            String path = "/tree";
            // The root is persistent and survives earlier runs; create it only once.
            if (client.checkExists().forPath(path) == null) {
                client.create().creatingParentsIfNeeded().forPath(path);
            }

            // The cache is closed before the client it depends on.
            try (TreeCache treeCache = new TreeCache(client, path)) {
                treeCache.start();

                TreeCacheListener listener = (client1, event) -> {
                    // Some events carry no node data, so guard before reading the path.
                    System.out.println("事件: " + event.getType() +
                            ", 路径: " + (event.getData() != null ? event.getData().getPath() : "null"));
                };

                treeCache.getListenable().addListener(listener);

                // orSetData keeps the example rerunnable: these nodes are never
                // deleted, so a plain create() would fail on the second run.
                client.create().orSetData().forPath(path + "/child1", "data1".getBytes(UTF_8));
                Thread.sleep(500);
                client.create().orSetData().forPath(path + "/child1/sub1", "subdata1".getBytes(UTF_8));
                Thread.sleep(500);
                client.setData().forPath(path + "/child1", "updated1".getBytes(UTF_8));
                Thread.sleep(500);
            }
        }
    }
}

连接状态处理

import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

/**
 * Registers a connection-state listener before starting the client and then
 * blocks, printing a line for every state transition.
 */
public class ConnectionStateExample {
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the client when the wait below is interrupted.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {

            ConnectionStateListener listener = (client1, newState) -> {
                switch (newState) {
                    case CONNECTED:
                        System.out.println("已连接");
                        break;
                    case RECONNECTED:
                        System.out.println("重新连接成功");
                        break;
                    case SUSPENDED:
                        System.out.println("连接暂停");
                        break;
                    case LOST:
                        System.out.println("连接丢失");
                        break;
                    case READ_ONLY:
                        System.out.println("只读模式");
                        break;
                }
            };

            // Added before start() so the first CONNECTED event is not missed.
            client.getConnectionStateListenable().addListener(listener);
            client.start();

            Thread.sleep(Long.MAX_VALUE);
        }
    }
}

重试策略

Curator 提供了多种重试策略:

import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;

/**
 * Instantiates each retry policy shown in this chapter. The objects are only
 * constructed, to show the constructor arguments; nothing uses them.
 */
public class RetryPolicyExample {
    public static void main(String[] args) {
        final int sleepMs = 1000;
        final int maxRetries = 3;

        // Exponential back-off: initial sleep, maximum retries, maximum sleep.
        final int maxSleepMs = 5000;
        ExponentialBackoffRetry backoff =
                new ExponentialBackoffRetry(sleepMs, maxRetries, maxSleepMs);

        // Arguments: 3, then 1000.
        RetryNTimes bounded = new RetryNTimes(maxRetries, sleepMs);

        // Single argument: 1000.
        RetryForever unbounded = new RetryForever(sleepMs);

        // Single argument: 1000.
        RetryOneTime single = new RetryOneTime(sleepMs);
    }
}

小结

本章学习了 ZooKeeper 的 Java 客户端开发:

  1. 原生 API:基本的 CRUD 操作、Watcher、异步操作
  2. Curator 框架:更高级的 API、流式编程风格
  3. 缓存机制:NodeCache、PathChildrenCache、TreeCache
  4. 连接状态处理:监听连接状态变化
  5. 重试策略:多种重试策略选择

下一章我们将学习 ZooKeeper 的典型应用场景实现。