会话与监听机制
会话和 Watcher 是 ZooKeeper 的两个核心机制,理解它们对于正确使用 ZooKeeper 至关重要。
会话机制
什么是会话?
会话是客户端与 ZooKeeper 服务器之间的 TCP 长连接。当客户端连接到 ZooKeeper 集群时,会建立一个会话,会话有唯一的 ID 和超时时间。
┌─────────────────────────────────────────────────────────────┐
│ 会话生命周期 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ CONNECTING│───▶│ CONNECTED │───▶│ CLOSED │ │
│ └──────────┘ └─────┬────┘ └──────────┘ │
│ ▲ │ │
│ │ ▼ │
│ │ ┌──────────┐ │
│ └─────────│DISCONNECTED│ │
│ └──────────┘ │
│ │
│ 状态说明: │
│ CONNECTING - 正在连接 │
│ CONNECTED - 已连接 │
│ DISCONNECTED - 连接断开(可重连) │
│ CLOSED - 会话关闭(不可恢复) │
│ │
└─────────────────────────────────────────────────────────────┘
会话状态
| 状态 | 说明 |
|---|---|
CONNECTING | 正在尝试连接服务器 |
CONNECTED | 已成功连接到服务器 |
DISCONNECTED | 连接断开,正在尝试重连 |
EXPIRED | 会话已过期,无法恢复 |
AUTH_FAILED | 认证失败 |
CLOSED | 会话已关闭 |
会话创建
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class SessionDemo {
public static void main(String[] args) throws Exception {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Event state: " + event.getState());
if (event.getState() == KeeperState.SyncConnected) {
System.out.println("会话建立成功");
} else if (event.getState() == KeeperState.Expired) {
System.out.println("会话已过期");
} else if (event.getState() == KeeperState.Disconnected) {
System.out.println("连接断开");
}
}
};
ZooKeeper zk = new ZooKeeper(
"localhost:2181",
3000,
watcher
);
System.out.println("Session ID: " + zk.getSessionId());
System.out.println("Session Timeout: " + zk.getSessionTimeout());
Thread.sleep(Long.MAX_VALUE);
}
}
会话参数
会话超时时间:
ZooKeeper zk = new ZooKeeper(
"localhost:2181",
3000,
watcher
);
超时时间的限制:
最小超时时间 = 2 * tickTime(服务器配置)
最大超时时间 = 20 * tickTime(服务器配置)
如果客户端请求的超时时间超出范围,服务器会返回实际可用的超时时间
连接字符串:
// 单服务器
"localhost:2181"
// 多服务器(集群)
"server1:2181,server2:2181,server3:2181"
// 带 chroot 后缀
"server1:2181,server2:2181/app/root"
// 所有路径都相对于 /app/root
会话保活
客户端会自动发送心跳来保持会话:
┌─────────────────────────────────────────────────────────────┐
│ 会话心跳机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 服务器 │
│ │ │ │
│ │──── PING ────────────────────▶│ │
│ │ │ │
│ │◀─── PONG ────────────────────│ │
│ │ │ │
│ │ (间隔:timeout/3) │ │
│ │ │ │
│ │──── PING ────────────────────▶│ │
│ │ │ │
│ │◀─── PONG ────────────────────│ │
│ │ │ │
│ │
│ 心跳间隔 = sessionTimeout / 3 │
│ 如果超过 sessionTimeout 未收到响应,会话过期 │
│ │
└─────────────────────────────────────────────────────────────┘
会话迁移
当客户端与当前服务器断开连接时,会自动尝试连接到其他服务器:
┌─────────────────────────────────────────────────────────────┐
│ 会话迁移 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 │
│ │ │
│ │ 连接 Server1 ────▶ 连接失败 │
│ │ │ │
│ │ ▼ │
│ │ 尝试 Server2 ────▶ 连接成功 │
│ │ │ │
│ │ ▼ │
│ │ 发送 sessionId + password │
│ │ │ │
│ │ ▼ │
│ │ 服务器验证后恢复会话 │
│ │ │
│ │
│ 注意: │
│ - 会话迁移期间,临时节点仍然存在 │
│ - Watcher 需要重新注册 │
│ - 迁移失败会导致会话过期 │
│ │
└─────────────────────────────────────────────────────────────┘
本地会话
ZooKeeper 3.5.0 引入了本地会话,用于减少全局会话的开销:
# 启用本地会话
localSessionsEnabled=true
# 允许本地会话升级为全局会话
localSessionsUpgradingEnabled=true
本地会话的特点:
- 创建和关闭不需要仲裁确认
- 不能创建临时节点(除非升级为全局会话)
- 会话信息只保存在连接的服务器上
Watcher 机制
什么是 Watcher?
Watcher 是 ZooKeeper 提供的事件通知机制。客户端可以在 znode 上注册 Watcher,当 znode 发生变化时,ZooKeeper 会向客户端发送通知。
┌─────────────────────────────────────────────────────────────┐
│ Watcher 工作流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 服务器 │
│ │ │ │
│ │──── getData(path, watch=true)▶│ │
│ │ │ │
│ │◀─── 数据 + 注册成功 ──────────│ │
│ │ │ │
│ │ (等待事件) │ │
│ │ │ │
│ │ │◀── setData(path) │
│ │ │ │
│ │◀─── Watcher 通知 ─────────────│ │
│ │ │ │
│ │──── getData(path, watch=true)▶│ // 重新注册 │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────┘
Watcher 的特性
一次性触发:
标准 Watcher 是一次性的,触发后需要重新注册:
// Watcher 触发后自动移除
// 需要再次调用 getData/setData 等方法重新注册
异步通知:
Watcher 通知是异步发送的,可能存在延迟:
事件顺序:
1. 客户端 A 修改数据
2. 服务器处理修改
3. 服务器向客户端 B 发送通知
4. 客户端 B 收到通知
注意:客户端 B 可能在收到通知前看到旧数据
轻量级:
Watcher 只通知事件类型,不包含变化后的数据:
// Watcher 事件只包含:
// - 事件类型(创建、删除、修改、子节点变化)
// - 节点路径
// 需要再次获取数据
Watcher 事件类型
| 事件类型 | 触发条件 | 注册方法 |
|---|---|---|
NodeCreated | 节点被创建 | exists() |
NodeDeleted | 节点被删除 | exists(), getData(), getChildren() |
NodeDataChanged | 节点数据变更 | exists(), getData() |
NodeChildrenChanged | 子节点变更 | getChildren() |
使用 Watcher
方式一:实现 Watcher 接口
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
public class MyWatcher implements Watcher {
private ZooKeeper zk;
@Override
public void process(WatchedEvent event) {
System.out.println("Event type: " + event.getType());
System.out.println("Event path: " + event.getPath());
System.out.println("Event state: " + event.getState());
// 处理不同类型的事件
switch (event.getType()) {
case NodeCreated:
System.out.println("节点创建: " + event.getPath());
break;
case NodeDeleted:
System.out.println("节点删除: " + event.getPath());
break;
case NodeDataChanged:
System.out.println("数据变更: " + event.getPath());
// 重新注册 Watcher 并获取新数据
try {
byte[] data = zk.getData(event.getPath(), this, null);
System.out.println("新数据: " + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
break;
case NodeChildrenChanged:
System.out.println("子节点变更: " + event.getPath());
break;
case None:
// 连接状态变化
handleStateChange(event.getState());
break;
}
}
private void handleStateChange(KeeperState state) {
switch (state) {
case SyncConnected:
System.out.println("已连接");
break;
case Disconnected:
System.out.println("连接断开");
break;
case Expired:
System.out.println("会话过期");
break;
case AuthFailed:
System.out.println("认证失败");
break;
}
}
}
方式二:使用 Lambda 表达式
Watcher watcher = event -> {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
};
zk.exists("/config", watcher);
方式三:使用布尔值
// 使用默认 Watcher
zk.getData("/config", true, null); // true 表示使用构造 ZooKeeper 时传入的 Watcher
注册 Watcher 的方法
exists() - 监听节点创建和删除
public class ExistsWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
Stat stat = zk.exists("/new-node", event -> {
switch (event.getType()) {
case NodeCreated:
System.out.println("节点创建");
break;
case NodeDeleted:
System.out.println("节点删除");
break;
case NodeDataChanged:
System.out.println("数据变更");
break;
}
});
if (stat == null) {
System.out.println("节点不存在");
}
Thread.sleep(Long.MAX_VALUE);
}
}
getData() - 监听数据变更和删除
public class DataWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
byte[] data = zk.getData("/config", event -> {
if (event.getType() == Event.EventType.NodeDataChanged) {
System.out.println("数据已变更,重新获取");
try {
byte[] newData = zk.getData(event.getPath(), true, null);
System.out.println("新数据: " + new String(newData));
} catch (Exception e) {
e.printStackTrace();
}
} else if (event.getType() == Event.EventType.NodeDeleted) {
System.out.println("节点已删除");
}
}, null);
System.out.println("当前数据: " + new String(data));
Thread.sleep(Long.MAX_VALUE);
}
}
getChildren() - 监听子节点变更
public class ChildrenWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
List<String> children = zk.getChildren("/services", event -> {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
System.out.println("子节点已变更");
try {
List<String> newChildren = zk.getChildren(event.getPath(), true);
System.out.println("当前子节点: " + newChildren);
} catch (Exception e) {
e.printStackTrace();
}
}
});
System.out.println("当前子节点: " + children);
Thread.sleep(Long.MAX_VALUE);
}
}
永久监听器
ZooKeeper 3.6.0 引入了永久监听器,触发后不会被移除:
import org.apache.zookeeper.AddWatchMode;
public class PersistentWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
// 添加永久监听器
zk.addWatch("/config", event -> {
System.out.println("Event: " + event.getType());
// 不需要重新注册
}, AddWatchMode.PERSISTENT);
// 添加递归永久监听器
zk.addWatch("/services", event -> {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
}, AddWatchMode.PERSISTENT_RECURSIVE);
Thread.sleep(Long.MAX_VALUE);
}
}
移除 Watcher
import org.apache.zookeeper.Watcher.WatcherType;
public class RemoveWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
Watcher watcher = event -> {
System.out.println("Event: " + event.getType());
};
// 注册 Watcher
zk.getData("/config", watcher, null);
// 移除指定 Watcher
zk.removeWatches("/config", watcher, WatcherType.Data, false);
// 移除所有数据 Watcher
zk.removeAllWatches("/config", WatcherType.Data, false);
}
}
Watcher 使用注意事项
1. 一次性 Watcher 需要重新注册
// 错误示例:忘记重新注册
zk.getData("/config", event -> {
System.out.println("数据变更");
// 忘记重新注册,后续变更不会收到通知
}, null);
// 正确示例:重新注册
zk.getData("/config", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
byte[] data = zk.getData(event.getPath(), this, null);
System.out.println("新数据: " + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, null);
2. 处理连接断开
public class ConnectionAwareWatcher implements Watcher {
private ZooKeeper zk;
private String path;
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Expired) {
// 会话过期,需要重新创建 ZooKeeper 实例
System.out.println("会话过期,需要重连");
reconnect();
} else if (event.getState() == KeeperState.Disconnected) {
// 连接断开,等待重连
System.out.println("连接断开,等待重连");
} else if (event.getState() == KeeperState.SyncConnected) {
// 连接成功,重新注册 Watcher
System.out.println("连接成功,重新注册 Watcher");
registerWatcher();
}
}
private void reconnect() {
// 重新创建 ZooKeeper 实例
}
private void registerWatcher() {
try {
zk.getData(path, this, null);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 可能丢失事件
在 Watcher 触发和重新注册之间,可能发生多次变更:
// 问题:可能丢失中间状态
zk.getData("/config", event -> {
// 这里获取的是最新数据,中间的变更可能丢失
try {
byte[] data = zk.getData(event.getPath(), true, null);
} catch (Exception e) {}
}, null);
// 解决方案:根据业务需求处理
// 1. 如果只关心最终状态,可以忽略
// 2. 如果需要所有变更,考虑使用版本号或其他机制
4. Watcher 数量限制
// 大量 Watcher 可能影响性能
// 建议:
// 1. 避免不必要的 Watcher
// 2. 使用永久递归 Watcher 替代多个单独的 Watcher
// 3. 合理设置 Watcher 回调逻辑
异步 API
ZooKeeper 提供异步 API,避免阻塞主线程:
import org.apache.zookeeper.AsyncCallback;
public class AsyncApiDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
// 异步获取数据
zk.getData("/config", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx,
byte[] data, Stat stat) {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("数据: " + new String(data));
System.out.println("版本: " + stat.getVersion());
} else {
System.out.println("错误码: " + rc);
}
}
}, "context");
// 异步创建节点
zk.create("/async-node", "data".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.Create2Callback() {
@Override
public void processResult(int rc, String path, Object ctx,
String name, Stat stat) {
System.out.println("创建结果: " + name);
}
}, "context");
Thread.sleep(5000);
}
}
小结
本章深入学习了 ZooKeeper 的会话和 Watcher 机制:
- 会话机制:TCP 长连接,有超时时间,支持迁移
- 会话状态:CONNECTING、CONNECTED、DISCONNECTED、EXPIRED 等
- Watcher 机制:事件通知,一次性触发,异步通知
- 事件类型:NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChanged
- 永久监听器:触发后不移除,支持递归监听
- 异步 API:非阻塞操作,使用回调处理结果
下一章我们将学习 ZooKeeper 的 ACL 权限控制。