跳到主要内容

会话与监听机制

会话和 Watcher 是 ZooKeeper 的两个核心机制,理解它们对于正确使用 ZooKeeper 至关重要。

会话机制

什么是会话?

会话是客户端与 ZooKeeper 服务器之间的 TCP 长连接。当客户端连接到 ZooKeeper 集群时,会建立一个会话,会话有唯一的 ID 和超时时间。

┌─────────────────────────────────────────────────────────────┐
│ 会话生命周期 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ CONNECTING│───▶│ CONNECTED │───▶│ CLOSED │ │
│ └──────────┘ └─────┬────┘ └──────────┘ │
│ ▲ │ │
│ │ ▼ │
│ │ ┌──────────┐ │
│ └─────────│DISCONNECTED│ │
│ └──────────┘ │
│ │
│ 状态说明: │
│ CONNECTING - 正在连接 │
│ CONNECTED - 已连接 │
│ DISCONNECTED - 连接断开(可重连) │
│ CLOSED - 会话关闭(不可恢复) │
│ │
└─────────────────────────────────────────────────────────────┘

会话状态

状态说明
CONNECTING正在尝试连接服务器
CONNECTED已成功连接到服务器
DISCONNECTED连接断开,正在尝试重连
EXPIRED会话已过期,无法恢复
AUTH_FAILED认证失败
CLOSED会话已关闭

会话创建

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.KeeperState;

public class SessionDemo {
public static void main(String[] args) throws Exception {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Event state: " + event.getState());

if (event.getState() == KeeperState.SyncConnected) {
System.out.println("会话建立成功");
} else if (event.getState() == KeeperState.Expired) {
System.out.println("会话已过期");
} else if (event.getState() == KeeperState.Disconnected) {
System.out.println("连接断开");
}
}
};

ZooKeeper zk = new ZooKeeper(
"localhost:2181",
3000,
watcher
);

System.out.println("Session ID: " + zk.getSessionId());
System.out.println("Session Timeout: " + zk.getSessionTimeout());

Thread.sleep(Long.MAX_VALUE);
}
}

会话参数

会话超时时间

ZooKeeper zk = new ZooKeeper(
"localhost:2181",
3000,
watcher
);

超时时间的限制:

最小超时时间 = 2 * tickTime(服务器配置)
最大超时时间 = 20 * tickTime(服务器配置)

如果客户端请求的超时时间超出范围,服务器会返回实际可用的超时时间

连接字符串

// 单服务器
"localhost:2181"

// 多服务器(集群)
"server1:2181,server2:2181,server3:2181"

// 带 chroot 后缀
"server1:2181,server2:2181/app/root"
// 所有路径都相对于 /app/root

会话保活

客户端会自动发送心跳来保持会话:

┌─────────────────────────────────────────────────────────────┐
│ 会话心跳机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 服务器 │
│ │ │ │
│ │──── PING ────────────────────▶│ │
│ │ │ │
│ │◀─── PONG ────────────────────│ │
│ │ │ │
│ │ (间隔:timeout/3) │ │
│ │ │ │
│ │──── PING ────────────────────▶│ │
│ │ │ │
│ │◀─── PONG ────────────────────│ │
│ │ │ │
│ │
│ 心跳间隔 = sessionTimeout / 3 │
│ 如果超过 sessionTimeout 未收到响应,会话过期 │
│ │
└─────────────────────────────────────────────────────────────┘

会话迁移

当客户端与当前服务器断开连接时,会自动尝试连接到其他服务器:

┌─────────────────────────────────────────────────────────────┐
│ 会话迁移 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 │
│ │ │
│ │ 连接 Server1 ────▶ 连接失败 │
│ │ │ │
│ │ ▼ │
│ │ 尝试 Server2 ────▶ 连接成功 │
│ │ │ │
│ │ ▼ │
│ │ 发送 sessionId + password │
│ │ │ │
│ │ ▼ │
│ │ 服务器验证后恢复会话 │
│ │ │
│ │
│ 注意: │
│ - 会话迁移期间,临时节点仍然存在 │
│ - Watcher 需要重新注册 │
│ - 迁移失败会导致会话过期 │
│ │
└─────────────────────────────────────────────────────────────┘

本地会话

ZooKeeper 3.5.0 引入了本地会话,用于减少全局会话的开销:

# 启用本地会话
localSessionsEnabled=true

# 允许本地会话升级为全局会话
localSessionsUpgradingEnabled=true

本地会话的特点:

  • 创建和关闭不需要仲裁确认
  • 不能创建临时节点(除非升级为全局会话)
  • 会话信息只保存在连接的服务器上

Watcher 机制

什么是 Watcher?

Watcher 是 ZooKeeper 提供的事件通知机制。客户端可以在 znode 上注册 Watcher,当 znode 发生变化时,ZooKeeper 会向客户端发送通知。

┌─────────────────────────────────────────────────────────────┐
│ Watcher 工作流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 客户端 服务器 │
│ │ │ │
│ │──── getData(path, watch=true)▶│ │
│ │ │ │
│ │◀─── 数据 + 注册成功 ──────────│ │
│ │ │ │
│ │ (等待事件) │ │
│ │ │ │
│ │ │◀── setData(path) │
│ │ │ │
│ │◀─── Watcher 通知 ─────────────│ │
│ │ │ │
│ │──── getData(path, watch=true)▶│ // 重新注册 │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────┘

Watcher 的特性

一次性触发

标准 Watcher 是一次性的,触发后需要重新注册:

// Watcher 触发后自动移除
// 需要再次调用 getData/setData 等方法重新注册

异步通知

Watcher 通知是异步发送的,可能存在延迟:

事件顺序:
1. 客户端 A 修改数据
2. 服务器处理修改
3. 服务器向客户端 B 发送通知
4. 客户端 B 收到通知

注意:客户端 B 可能在收到通知前看到旧数据

轻量级

Watcher 只通知事件类型,不包含变化后的数据:

// Watcher 事件只包含:
// - 事件类型(创建、删除、修改、子节点变化)
// - 节点路径
// 需要再次获取数据

Watcher 事件类型

事件类型触发条件注册方法
NodeCreated节点被创建exists()
NodeDeleted节点被删除exists(), getData(), getChildren()
NodeDataChanged节点数据变更exists(), getData()
NodeChildrenChanged子节点变更getChildren()

使用 Watcher

方式一:实现 Watcher 接口

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;

public class MyWatcher implements Watcher {
private ZooKeeper zk;

@Override
public void process(WatchedEvent event) {
System.out.println("Event type: " + event.getType());
System.out.println("Event path: " + event.getPath());
System.out.println("Event state: " + event.getState());

// 处理不同类型的事件
switch (event.getType()) {
case NodeCreated:
System.out.println("节点创建: " + event.getPath());
break;
case NodeDeleted:
System.out.println("节点删除: " + event.getPath());
break;
case NodeDataChanged:
System.out.println("数据变更: " + event.getPath());
// 重新注册 Watcher 并获取新数据
try {
byte[] data = zk.getData(event.getPath(), this, null);
System.out.println("新数据: " + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
break;
case NodeChildrenChanged:
System.out.println("子节点变更: " + event.getPath());
break;
case None:
// 连接状态变化
handleStateChange(event.getState());
break;
}
}

private void handleStateChange(KeeperState state) {
switch (state) {
case SyncConnected:
System.out.println("已连接");
break;
case Disconnected:
System.out.println("连接断开");
break;
case Expired:
System.out.println("会话过期");
break;
case AuthFailed:
System.out.println("认证失败");
break;
}
}
}

方式二:使用 Lambda 表达式

Watcher watcher = event -> {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
};

zk.exists("/config", watcher);

方式三:使用布尔值

// 使用默认 Watcher
zk.getData("/config", true, null); // true 表示使用构造 ZooKeeper 时传入的 Watcher

注册 Watcher 的方法

exists() - 监听节点创建和删除

public class ExistsWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

Stat stat = zk.exists("/new-node", event -> {
switch (event.getType()) {
case NodeCreated:
System.out.println("节点创建");
break;
case NodeDeleted:
System.out.println("节点删除");
break;
case NodeDataChanged:
System.out.println("数据变更");
break;
}
});

if (stat == null) {
System.out.println("节点不存在");
}

Thread.sleep(Long.MAX_VALUE);
}
}

getData() - 监听数据变更和删除

public class DataWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

byte[] data = zk.getData("/config", event -> {
if (event.getType() == Event.EventType.NodeDataChanged) {
System.out.println("数据已变更,重新获取");
try {
byte[] newData = zk.getData(event.getPath(), true, null);
System.out.println("新数据: " + new String(newData));
} catch (Exception e) {
e.printStackTrace();
}
} else if (event.getType() == Event.EventType.NodeDeleted) {
System.out.println("节点已删除");
}
}, null);

System.out.println("当前数据: " + new String(data));
Thread.sleep(Long.MAX_VALUE);
}
}

getChildren() - 监听子节点变更

public class ChildrenWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

List<String> children = zk.getChildren("/services", event -> {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
System.out.println("子节点已变更");
try {
List<String> newChildren = zk.getChildren(event.getPath(), true);
System.out.println("当前子节点: " + newChildren);
} catch (Exception e) {
e.printStackTrace();
}
}
});

System.out.println("当前子节点: " + children);
Thread.sleep(Long.MAX_VALUE);
}
}

永久监听器

ZooKeeper 3.6.0 引入了永久监听器,触发后不会被移除:

import org.apache.zookeeper.AddWatchMode;

public class PersistentWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

// 添加永久监听器
zk.addWatch("/config", event -> {
System.out.println("Event: " + event.getType());
// 不需要重新注册
}, AddWatchMode.PERSISTENT);

// 添加递归永久监听器
zk.addWatch("/services", event -> {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
}, AddWatchMode.PERSISTENT_RECURSIVE);

Thread.sleep(Long.MAX_VALUE);
}
}

移除 Watcher

import org.apache.zookeeper.Watcher.WatcherType;

public class RemoveWatcherDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

Watcher watcher = event -> {
System.out.println("Event: " + event.getType());
};

// 注册 Watcher
zk.getData("/config", watcher, null);

// 移除指定 Watcher
zk.removeWatches("/config", watcher, WatcherType.Data, false);

// 移除所有数据 Watcher
zk.removeAllWatches("/config", WatcherType.Data, false);
}
}

Watcher 使用注意事项

1. 一次性 Watcher 需要重新注册

// 错误示例:忘记重新注册
zk.getData("/config", event -> {
System.out.println("数据变更");
// 忘记重新注册,后续变更不会收到通知
}, null);

// 正确示例:重新注册
zk.getData("/config", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
byte[] data = zk.getData(event.getPath(), this, null);
System.out.println("新数据: " + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, null);

2. 处理连接断开

public class ConnectionAwareWatcher implements Watcher {
private ZooKeeper zk;
private String path;

@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Expired) {
// 会话过期,需要重新创建 ZooKeeper 实例
System.out.println("会话过期,需要重连");
reconnect();
} else if (event.getState() == KeeperState.Disconnected) {
// 连接断开,等待重连
System.out.println("连接断开,等待重连");
} else if (event.getState() == KeeperState.SyncConnected) {
// 连接成功,重新注册 Watcher
System.out.println("连接成功,重新注册 Watcher");
registerWatcher();
}
}

private void reconnect() {
// 重新创建 ZooKeeper 实例
}

private void registerWatcher() {
try {
zk.getData(path, this, null);
} catch (Exception e) {
e.printStackTrace();
}
}
}

3. 可能丢失事件

在 Watcher 触发和重新注册之间,可能发生多次变更:

// 问题:可能丢失中间状态
zk.getData("/config", event -> {
// 这里获取的是最新数据,中间的变更可能丢失
try {
byte[] data = zk.getData(event.getPath(), true, null);
} catch (Exception e) {}
}, null);

// 解决方案:根据业务需求处理
// 1. 如果只关心最终状态,可以忽略
// 2. 如果需要所有变更,考虑使用版本号或其他机制

4. Watcher 数量限制

// 大量 Watcher 可能影响性能
// 建议:
// 1. 避免不必要的 Watcher
// 2. 使用永久递归 Watcher 替代多个单独的 Watcher
// 3. 合理设置 Watcher 回调逻辑

异步 API

ZooKeeper 提供异步 API,避免阻塞主线程:

import org.apache.zookeeper.AsyncCallback;

public class AsyncApiDemo {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

// 异步获取数据
zk.getData("/config", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx,
byte[] data, Stat stat) {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("数据: " + new String(data));
System.out.println("版本: " + stat.getVersion());
} else {
System.out.println("错误码: " + rc);
}
}
}, "context");

// 异步创建节点
zk.create("/async-node", "data".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.Create2Callback() {
@Override
public void processResult(int rc, String path, Object ctx,
String name, Stat stat) {
System.out.println("创建结果: " + name);
}
}, "context");

Thread.sleep(5000);
}
}

小结

本章深入学习了 ZooKeeper 的会话和 Watcher 机制:

  1. 会话机制:TCP 长连接,有超时时间,支持迁移
  2. 会话状态:CONNECTING、CONNECTED、DISCONNECTED、EXPIRED 等
  3. Watcher 机制:事件通知,一次性触发,异步通知
  4. 事件类型:NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChanged
  5. 永久监听器:触发后不移除,支持递归监听
  6. 异步 API:非阻塞操作,使用回调处理结果

下一章我们将学习 ZooKeeper 的 ACL 权限控制。