ZooKeeper 分布式协调服务
Apache ZooKeeper 是一个集中式的分布式协调服务,用于维护配置信息、命名服务、分布式同步和组服务。它是 Hadoop、Kafka、HBase 等分布式系统的核心基础设施。
什么是 ZooKeeper?
ZooKeeper 最初是 Yahoo! 为 Hadoop 项目开发的子项目,后来成为 Apache 顶级项目。它的设计目标是简化分布式系统的开发,让开发者无需从头实现复杂的协调逻辑。
核心特性
ZooKeeper 提供以下核心能力:
- 配置管理:集中存储和管理分布式系统的配置信息
- 命名服务:提供统一的命名注册和发现机制
- 分布式同步:实现分布式锁、屏障等同步原语
- 组服务:管理集群成员关系和选举
设计目标
ZooKeeper 的设计遵循以下原则:
- 简单性:提供类似文件系统的树形命名空间
- 高性能:数据存储在内存中,支持高吞吐量
- 有序性:所有更新操作都有全局顺序
- 可靠性:通过复制实现高可用
数据模型
ZNode 节点
ZooKeeper 的数据模型是一个层次化的命名空间,称为"数据树"。每个节点称为 ZNode,可以存储数据并拥有子节点。
/
├── zookeeper(系统保留)
│ └── config
├── hbase
│ ├── meta-region-server
│ └── rs
├── kafka
│ ├── brokers
│ ├── controller
│ └── consumers
└── services
├── service1
└── service2
ZNode 的核心特性:
节点类型
| 类型 | 说明 | 用途 |
|---|---|---|
| 持久节点(Persistent) | 节点创建后一直存在,直到被删除 | 存储配置、元数据 |
| 临时节点(Ephemeral) | 会话结束时节自动删除 | 服务注册、临时状态 |
| 持久顺序节点 | 持久节点 + 自动递增序号 | 分布式队列、选举 |
| 临时顺序节点 | 临时节点 + 自动递增序号 | 分布式锁、选举 |
| 容器节点(Container) | 最后一个子节点被删除后,容器节点成为删除候选 | Leader选举、锁的实现 |
| TTL节点 | 在指定时间内未被修改且无子节点时自动删除 | 临时数据存储 |
容器节点(Container Nodes)
容器节点是 ZooKeeper 3.5.3 引入的特殊节点类型,主要用于实现 Leader 选举、分布式锁等场景:
工作原理:
- 当容器节点的最后一个子节点被删除时,该容器节点会成为删除候选
- 服务器会在未来的某个时间点自动删除该容器节点
使用场景:
- Leader 选举:作为选举节点的父节点,当所有参与者离开后自动清理
- 分布式锁:作为锁节点的父节点,避免锁释放后留下空目录
注意事项:
- 在容器节点内创建子节点时,需要捕获
KeeperException.NoNodeException - 如果容器节点已被删除,需要重新创建
// 创建容器节点
zk.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
// 在容器节点内创建子节点时处理可能的删除
try {
zk.create("/locks/lock-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException.NoNodeException e) {
// 容器节点已被删除,重新创建
zk.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
zk.create("/locks/lock-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
TTL节点
TTL(Time To Live)节点允许为持久节点设置生存时间,在 ZooKeeper 3.5.3 中引入:
特点:
- 只适用于持久节点(PERSISTENT)和持久顺序节点(PERSISTENT_SEQUENTIAL)
- 节点在 TTL 时间内未被修改且没有子节点,将成为删除候选
- TTL 以毫秒为单位
启用方式:
TTL 节点默认禁用,需要在服务器配置中启用:
# 在 zoo.cfg 中添加
znode.container.checkIntervalMs=1000
znode.container.maxPerMinute=10000
使用示例:
// 创建 TTL 节点(需要自定义 CreateMode)
// 注意:标准 ZooKeeper API 不直接支持 TTL,需要使用扩展方法
// 使用 Curator 客户端更方便
CuratorFramework client = ...;
client.create()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.withTtl(3600000) // 1小时 = 3600000毫秒
.forPath("/ttl-node", "data".getBytes());
适用场景:
- 临时配置存储:配置在一定时间后自动清理
- 缓存数据:缓存数据在过期后自动删除
- 会话相关数据:无需使用临时节点但需要自动清理的数据
Stat 结构
每个 ZNode 都包含一个 Stat 结构,记录节点的元数据:
| 字段 | 说明 |
|---|---|
| czxid | 创建节点的事务ID |
| mzxid | 最后修改的事务ID |
| pzxid | 子节点最后修改的事务ID |
| ctime | 创建时间 |
| mtime | 最后修改时间 |
| version | 数据版本号 |
| cversion | 子节点版本号 |
| aversion | ACL版本号 |
| ephemeralOwner | 临时节点的会话ID |
| dataLength | 数据长度 |
| numChildren | 子节点数量 |
数据访问特点
ZooKeeper 的数据访问有几个重要特点:
原子性:读写操作都是原子的。读取会获取节点的所有数据,写入会替换所有数据。
数据大小限制:每个 ZNode 的数据限制在 1MB 以内。ZooKeeper 设计用于存储协调数据(配置、状态、元数据),而非大数据存储。
版本控制:更新和删除操作可以指定版本号,实现乐观并发控制。如果版本不匹配,操作会失败。
会话机制
会话状态
客户端与 ZooKeeper 服务器建立连接后创建会话,会话有以下状态:
| 状态 | 说明 |
|---|---|
| CONNECTING | 正在连接服务器 |
| CONNECTED | 已连接,可以正常操作 |
| RECONNECTING | 连接断开后正在重连 |
| EXPIRED | 会话过期,需要重新创建 |
会话状态转换遵循以下规则:
┌─────────────┐ 连接成功 ┌─────────────┐
│ CONNECTING │ ───────────────→ │ CONNECTED │
└─────────────┘ └─────────────┘
↑ │
│ │
│ 连接断开 │
└────────────────────────────────┤
│
│ 会话过期
↓
┌─────────────┐
│ EXPIRED │
└─────────────┘
重要说明:
- 会话过期由 ZooKeeper 集群管理,而非客户端
- 当集群在会话超时时间内未收到客户端心跳时,会话过期
- 客户端断开连接后,会话可能仍然有效(如果在超时时间内重连)
- 会话过期时,该会话创建的所有临时节点会被删除
会话超时
会话超时是 ZooKeeper 容错机制的关键:
- 客户端在创建连接时指定超时时间
- 服务器返回实际可用的超时时间
- 客户端定期发送心跳保持会话活跃
- 超时未收到心跳,服务器会清理该会话的所有临时节点
超时时间的选择需要权衡:
- 太短:网络抖动可能导致会话频繁过期
- 太长:故障检测延迟增加
通常建议设置为几秒到几十秒。
本地会话(Local Session)
本地会话是 ZooKeeper 3.5.0 引入的特性,用于解决大规模客户端连接的性能问题。
为什么需要本地会话?
在传统会话模型中:
- 会话的创建和关闭需要仲裁确认
- 会话状态需要在 Leader 和所有 Follower 之间同步
- 当连接数达到数千时,会话管理成为性能瓶颈
本地会话特点
本地会话将会话信息只保存在客户端连接的服务器上:
| 特性 | 传统会话(全局会话) | 本地会话 |
|---|---|---|
| 会话信息存储 | 所有服务器 | 仅连接的服务器 |
| 创建/关闭开销 | 需要仲裁确认 | 本地处理 |
| 临时节点创建 | 支持 | 默认不支持 |
| 会话恢复 | 可在任意服务器恢复 | 只能在原服务器恢复 |
| 适用场景 | 需要临时节点 | 大量短连接、轻量级监听 |
配置启用
# 启用本地会话
localSessionsEnabled=true
# 是否允许本地会话升级为全局会话
localSessionsUpgradingEnabled=true
本地会话升级
当 localSessionsUpgradingEnabled=true 时,本地会话可以自动升级为全局会话:
- 初始创建时是本地会话
- 当需要创建临时节点时,自动升级为全局会话
- 升级后保持相同的会话 ID
升级触发条件:
- 创建临时节点(Ephemeral Node)
使用建议
// 场景1:只需要读取数据,不需要创建临时节点
// 使用本地会话可以大幅提升性能
// 场景2:需要创建临时节点
// 会自动升级为全局会话
// 场景3:使用 Observer 连接
// Observer 非常适合本地会话,避免对 Leader 的压力
注意事项:
- 本地会话丢失后无法在其他服务器恢复
- 断开 TCP 连接不一定会丢失会话(如果在超时前重连到同一服务器)
- 如果需要高可用性,建议使用传统全局会话
连接字符串与 chroot
客户端连接 ZooKeeper 时可以指定 chroot 后缀,所有操作都会相对于这个根路径:
// 普通 connection string
"127.0.0.1:2181"
// 带 chroot 的 connection string
"127.0.0.1:2181/app/a"
// 多服务器 + chroot
"server1:2181,server2:2181,server3:2181/app/a"
chroot 的作用:
- 所有路径操作自动添加
/app/a前缀 - 客户端代码中路径写作
/foo/bar,实际操作/app/a/foo/bar - 实现多租户环境的路径隔离
- 便于应用迁移和部署
示例:
// 连接字符串带 chroot
ZooKeeper zk = new ZooKeeper("localhost:2181/myapp", 30000, watcher);
// 客户端操作
zk.create("/config", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 实际创建的路径是 /myapp/config
zk.getData("/config", false, null);
// 实际读取的是 /myapp/config
Watch 监听机制
Watch 是 ZooKeeper 实现事件通知的核心机制。客户端可以在读取操作时注册 Watch,当数据发生变化时收到通知。
Watch 特性
一次性触发:Watch 触发后立即失效,需要重新注册才能继续监听。
异步通知:Watch 事件异步发送给客户端,不能保证实时性。
顺序保证:客户端先看到 Watch 事件,再看到数据变化。
Watch 类型
不同操作注册的 Watch 类型不同:
| 操作 | Watch 类型 | 触发条件 |
|---|---|---|
| exists | 数据 Watch | 节点创建、删除、数据变更 |
| getData | 数据 Watch | 节点删除、数据变更 |
| getChildren | 子节点 Watch | 子节点创建、删除 |
使用注意事项
使用 Watch 时需要注意以下几点:
- Watch 是一次性的:收到通知后需要重新注册
- 可能错过事件:在收到通知和重新注册之间可能发生多次变化
- 断开连接时不触发:客户端断开时不会收到 Watch 事件
- 会话事件:连接状态变化时会收到会话事件
持久递归监听(ZooKeeper 3.6.0+)
传统的 Watch 是一次性的,触发后需要重新注册。ZooKeeper 3.6.0 引入了持久递归监听,解决了这个问题:
持久监听
持久监听不会在触发后自动移除,可以持续监听节点变化:
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.Watcher;
// 添加持久监听
zk.addWatch("/mynode", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
// 不需要重新注册,监听会持续有效
}
}, AddWatchMode.PERSISTENT);
// 移除持久监听
zk.removeWatches("/mynode", watcher, Watcher.WatcherType.Any, false);
支持的触发事件:
- NodeCreated:节点创建
- NodeDeleted:节点删除
- NodeDataChanged:节点数据变更
注意:持久监听不会触发 NodeChildrenChanged 事件,因为这可以通过递归监听实现。
递归监听
递归监听可以监听节点及其所有子节点的变化:
// 添加递归监听(包含持久特性)
zk.addWatch("/parent", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
// 会收到 /parent 及其所有子节点的变化通知
}
}, AddWatchMode.PERSISTENT_RECURSIVE);
递归监听特点:
- 监听指定节点及其所有子节点(递归)
- 子节点的创建、删除、数据变更都会触发事件
- 新创建的子节点自动被监听
- 不会触发 NodeChildrenChanged 事件(因为递归监听已经覆盖)
适用场景:
- 监控配置目录的所有变化
- 监控服务注册目录的变化
- 需要持续监听多个子节点的场景
持久监听与传统监听对比
| 特性 | 传统 Watch | 持久监听 |
|---|---|---|
| 触发后行为 | 自动移除 | 保持监听 |
| 需要重新注册 | 是 | 否 |
| 事件类型 | 所有类型 | 不含 NodeChildrenChanged |
| 版本要求 | 所有版本 | 3.6.0+ |
Curator 中的缓存监听
Curator 提供了更便捷的缓存监听机制:
import org.apache.curator.framework.recipes.cache.*;
// NodeCache:监听单个节点的变化
NodeCache nodeCache = new NodeCache(client, "/mynode");
nodeCache.getListenable().addListener(() -> {
ChildData data = nodeCache.getCurrentData();
if (data != null) {
System.out.println("Data: " + new String(data.getData()));
}
});
nodeCache.start();
// PathChildrenCache:监听子节点的变化
PathChildrenCache childrenCache = new PathChildrenCache(client, "/parent", true);
childrenCache.getListenable().addListener((client1, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("Child added: " + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("Child removed: " + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("Child updated: " + event.getData().getPath());
break;
}
});
childrenCache.start();
// TreeCache:监听节点及其所有子节点(递归)
TreeCache treeCache = new TreeCache(client, "/parent");
treeCache.getListenable().addListener((client1, event) -> {
System.out.println("Event: " + event.getType() + ", Path: " + event.getData().getPath());
});
treeCache.start();
ACL 权限控制
ZooKeeper 使用 ACL(访问控制列表)控制对 ZNode 的访问权限。
权限类型
| 权限 | 缩写 | 说明 |
|---|---|---|
| READ | r | 读取节点数据和子节点列表 |
| WRITE | w | 设置节点数据 |
| CREATE | c | 创建子节点 |
| DELETE | d | 删除子节点 |
| ADMIN | a | 设置权限 |
内置认证方案
| 方案 | 说明 | 示例 |
|---|---|---|
| world | 默认方案,只有一个 ID:anyone | world:anyone |
| auth | 已认证用户 | auth:user:password |
| digest | 用户名密码认证 | digest:user:base64(SHA1(password)) |
| ip | IP 地址认证 | ip:192.168.1.1 |
| super | 超级管理员,拥有所有权限 |
常用 ACL
# 完全开放
setAcl /path world:anyone:rwcda
# 只读权限
setAcl /path world:anyone:r
# 认证用户拥有所有权限,其他用户只读
setAcl /path auth::rwcda,world:anyone:r
ZooKeeper 架构
服务器角色
ZooKeeper 集群中的服务器有以下角色:
Leader(领导者)
Leader 是集群的核心:
- 处理所有写请求
- 协调集群内部状态同步
- 负责投票和提案
Follower(跟随者)
Follower 是普通的服务器节点:
- 处理读请求
- 转发写请求给 Leader
- 参与投票
Observer(观察者)
Observer 是特殊的节点:
- 不参与投票
- 处理读请求,减轻 Follower 压力
- 适合跨数据中心部署
Observer 的优势:
- 提高扩展性:可以增加任意数量的 Observer 而不影响写性能,因为 Observer 不参与投票
- 提高可用性:Observer 故障不会影响集群可用性
- 跨数据中心部署:Observer 可以部署在不同数据中心,客户端读取本地 Observer 的数据,写入时只需最小化的网络传输
Observer 配置:
# 在 Observer 节点的配置文件中添加
peerType=observer
# 在所有节点的配置文件中标注 Observer
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888:observer
Observer Master
Observer Master 是 ZooKeeper 3.6.0 引入的特性,允许 Observer 连接到 Follower 而不是 Leader:
为什么需要 Observer Master?
默认情况下,Observer 连接到 Leader 获取数据更新。当 Observer 数量很多时,Leader 承受的压力会很大。Observer Master 让 Observer 可以连接到 Follower,从而:
- 减轻 Leader 压力:Leader 可以专注于协调写请求
- 支持更多 Observer:允许 Observer 扩展到数百个
- 加快 Observer 同步:减少 Observer 完成同步的时间
配置 Observer Master:
# 在所有节点的配置文件中添加
# 指定 Follower 监听 Observer 连接的端口
observerMasterPort=2191
工作原理:
Leader (端口 2888)
│
├── Follower 1 (Observer Master 端口 2191)
│ │
│ ├── Observer 1 (连接到 Follower 1)
│ └── Observer 2 (连接到 Follower 1)
│
└── Follower 2 (Observer Master 端口 2191)
│
└── Observer 3 (连接到 Follower 2)
适用场景:
- Observer 数量很多(数十个以上)
- Leader 负载过高
- 需要快速扩展 Observer
原子广播协议(ZAB)
ZooKeeper 使用 ZAB 协议保证数据一致性:
崩溃恢复:Leader 崩溃时,通过投票选举新 Leader
消息广播:Leader 将写请求广播给所有 Follower
ZAB 协议保证:
- 全序性:所有事务有全局顺序
- 一致性:所有服务器看到相同的事务序列
- 可靠性:已提交的事务不会丢失
读写流程
写流程
- 客户端向任意服务器发送写请求
- Follower 将请求转发给 Leader
- Leader 将请求转换为提案,分配事务ID
- Leader 向所有 Follower 发送提案
- Follower 将提案写入日志并返回确认
- 收到多数确认后,Leader 提交事务
- Leader 通知所有 Follower 提交
- 响应客户端
读流程
- 客户端向任意服务器发送读请求
- 服务器读取本地数据并返回
由于读请求直接从本地读取,可能读到旧数据。如需强一致性,可以在读之前调用 sync() 方法。
客户端使用
命令行操作
# 连接服务器
zkCli.sh -server localhost:2181
# 查看节点
ls /
ls -R /path
# 创建节点
create /path data
create -e /path data # 临时节点
create -s /path data # 顺序节点
create -e -s /path data # 临时顺序节点
# 获取节点数据
get /path
# 设置节点数据
set /path newdata
# 删除节点
delete /path # 无子节点时才能删除
deleteall /path # 递归删除
# 查看节点状态
stat /path
# 设置 ACL
setAcl /path acl
# 获取 ACL
getAcl /path
Java API 使用
创建连接
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
// 创建连接
ZooKeeper zk = new ZooKeeper(
"localhost:2181", // 服务器地址
30000, // 会话超时时间(毫秒)
event -> { // Watcher 回调
System.out.println("Event: " + event.getType());
}
);
// 等待连接建立
zk.getState().isConnected();
节点操作
// 创建持久节点
String path = zk.create(
"/mynode", // 节点路径
"hello".getBytes(), // 节点数据
ZooDefs.Ids.OPEN_ACL_UNSAFE, // ACL
CreateMode.PERSISTENT // 节点类型
);
// 创建临时节点
zk.create("/ephemeral", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 创建顺序节点
String seqPath = zk.create("/sequential", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
// 获取节点数据
Stat stat = new Stat();
byte[] data = zk.getData("/mynode", false, stat);
System.out.println("Data: " + new String(data));
System.out.println("Version: " + stat.getVersion());
// 获取数据并注册 Watch
byte[] data = zk.getData("/mynode", event -> {
System.out.println("Node changed: " + event.getPath());
}, stat);
// 设置节点数据
zk.setData("/mynode", "newdata".getBytes(), -1); // -1 表示不检查版本
// 条件更新(乐观锁)
try {
zk.setData("/mynode", "newdata".getBytes(), stat.getVersion());
} catch (KeeperException.BadVersionException e) {
// 版本冲突,更新失败
}
// 检查节点是否存在
Stat exists = zk.exists("/mynode", false);
if (exists != null) {
System.out.println("Node exists");
}
// 获取子节点列表
List<String> children = zk.getChildren("/", false);
// 删除节点
zk.delete("/mynode", -1);
// 获取当前会话创建的所有临时节点(3.6.0+)
// 方法1:获取所有临时节点
List<String> allEphemerals = zk.getEphemerals();
System.out.println("All ephemeral nodes: " + allEphemerals);
// 方法2:获取指定路径前缀的临时节点
List<String> serviceEphemerals = zk.getEphemerals("/services");
System.out.println("Service ephemeral nodes: " + serviceEphemerals);
// 使用场景:服务注册时检查是否已创建过临时节点
List<String> existingNodes = zk.getEphemerals("/services/payment");
if (existingNodes.stream().anyMatch(p -> p.contains("host1:8080"))) {
System.out.println("Service already registered");
} else {
zk.create("/services/payment/host1:8080", metadata,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
// 关闭连接
zk.close();
异步 API
ZooKeeper 同时提供异步 API,适合高并发场景:
// 异步创建节点
zk.create("/async", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("Created: " + name);
}
},
null); // ctx 上下文对象
// 异步获取数据
zk.getData("/async", false,
(rc, path, ctx, data, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("Data: " + new String(data));
}
},
null);
Curator 客户端
Curator 是 Netflix 开源的 ZooKeeper 客户端,提供了更高级的 API:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(10000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
// 创建节点
client.create()
.creatingParentsIfNeeded() // 自动创建父节点
.withMode(CreateMode.PERSISTENT)
.forPath("/path/to/node", "data".getBytes());
// 获取数据
byte[] data = client.getData().forPath("/path/to/node");
// 设置数据
client.setData().forPath("/path/to/node", "newdata".getBytes());
// 删除节点
client.delete()
.deletingChildrenIfNeeded() // 递归删除子节点
.forPath("/path/to/node");
// 检查节点是否存在
Stat stat = client.checkExists().forPath("/path/to/node");
// 获取子节点
List<String> children = client.getChildren().forPath("/path");
// 关闭客户端
client.close();
分布式锁
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
// 创建可重入锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/mylock");
try {
// 获取锁
lock.acquire();
// 执行业务逻辑
System.out.println("Got lock, doing work...");
} finally {
// 释放锁
lock.release();
}
Leader 选举
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/election");
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println("I am the leader!");
}
@Override
public void notLeader() {
System.out.println("I am not the leader anymore");
}
});
leaderLatch.start();
leaderLatch.await(); // 等待成为 Leader
leaderLatch.close();
典型应用场景
配置管理
集中管理分布式系统的配置:
// 应用启动时读取配置
byte[] configData = zk.getData("/config/app", event -> {
// 配置变化时重新加载
reloadConfig();
}, null);
// 更新配置
zk.setData("/config/app", newConfig.getBytes(), -1);
服务注册与发现
服务启动时注册到 ZooKeeper,停止时自动注销:
// 服务注册(创建临时节点)
String servicePath = zk.create(
"/services/payment/host1:8080",
"metadata".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL
);
// 服务发现
List<String> services = zk.getChildren("/services/payment", event -> {
// 服务列表变化时更新
updateServiceList();
});
分布式锁
使用临时顺序节点实现分布式锁:
public class DistributedLock {
private final ZooKeeper zk;
private final String lockPath;
private String currentPath;
public DistributedLock(ZooKeeper zk, String lockPath) {
this.zk = zk;
this.lockPath = lockPath;
}
public void lock() throws Exception {
// 创建临时顺序节点
currentPath = zk.create(lockPath + "/lock-",
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有竞争者
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);
// 检查自己是否是最小的
String currentNode = currentPath.substring(lockPath.length() + 1);
int currentIndex = children.indexOf(currentNode);
if (currentIndex == 0) {
// 获取锁成功
return;
}
// 等待前一个节点释放
String prevNode = children.get(currentIndex - 1);
final CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(lockPath + "/" + prevNode, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat != null) {
latch.await();
}
}
public void unlock() throws Exception {
zk.delete(currentPath, -1);
}
}
分布式队列
使用持久顺序节点实现分布式队列:
public class DistributedQueue {
private final ZooKeeper zk;
private final String queuePath;
public DistributedQueue(ZooKeeper zk, String queuePath) {
this.zk = zk;
this.queuePath = queuePath;
}
// 入队
public void enqueue(byte[] data) throws Exception {
zk.create(queuePath + "/item-", data,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}
// 出队
public byte[] dequeue() throws Exception {
while (true) {
List<String> children = zk.getChildren(queuePath, false);
if (children.isEmpty()) {
return null;
}
Collections.sort(children);
String first = children.get(0);
try {
byte[] data = zk.getData(queuePath + "/" + first, false, null);
zk.delete(queuePath + "/" + first, -1);
return data;
} catch (KeeperException.NoNodeException e) {
// 节点已被其他消费者处理,继续尝试下一个
}
}
}
}
集群成员管理
使用临时节点管理集群成员:
// 成员加入
public void joinCluster(String memberId) throws Exception {
zk.create("/cluster/members/" + memberId,
"info".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
// 监控成员变化
public void watchMembers() throws Exception {
List<String> members = zk.getChildren("/cluster/members", event -> {
// 成员变化时重新获取列表
watchMembers();
});
System.out.println("Current members: " + members);
}
部署与运维
集群部署
ZooKeeper 集群通常部署奇数个节点(3、5、7),因为:
- 需要多数节点存活才能服务
- 3节点集群容忍1个节点故障
- 5节点集群容忍2个节点故障
- 4节点集群也只能容忍1个节点故障,与3节点相同
配置文件
# 基本配置
tickTime=2000 # 基本时间单位(毫秒)
dataDir=/var/lib/zookeeper # 数据目录
clientPort=2181 # 客户端连接端口
# 集群配置
initLimit=10 # Follower 连接 Leader 的超时时间(tickTime 倍数)
syncLimit=5 # Follower 与 Leader 同步的超时时间
# 集群成员(新格式,3.5.0+)
# server.id=host:quorumPort:electionPort[:role];clientPort
server.1=zoo1:2888:3888:participant;2181
server.2=zoo2:2888:3888:participant;2181
server.3=zoo3:2888:3888:participant;2181
# 自动清理
autopurge.snapRetainCount=3 # 保留的快照数量
autopurge.purgeInterval=1 # 清理间隔(小时)
多地址支持(3.6.0+)
ZooKeeper 3.6.0 支持为每个服务器配置多个网络地址,提高网络容错能力:
# 使用管道符 (|) 分隔多个地址
server.1=zoo1-net1:2888:3888|zoo1-net2:2889:3889;2181
server.2=zoo2-net1:2888:3888|zoo2-net2:2889:3889;2181
server.3=zoo3-net1:2888:3888|zoo3-net2:2889:3889;2181
多地址的优势:
- 网络容错:当一个网络接口故障时,自动切换到另一个
- 提高可用性:减少因网络问题导致的服务中断
- 跨网络部署:支持多网卡服务器的灵活配置
适用场景:
- 服务器有多个网络接口
- 需要网络级别的容错
- 跨子网部署
myid 文件
每个服务器需要创建 myid 文件:
# 在 server.1 上
echo "1" > /var/lib/zookeeper/myid
# 在 server.2 上
echo "2" > /var/lib/zookeeper/myid
# 在 server.3 上
echo "3" > /var/lib/zookeeper/myid
启动与管理
# 启动服务器
zkServer.sh start
# 查看状态
zkServer.sh status
# 停止服务器
zkServer.sh stop
# 四字命令
echo ruok | nc localhost 2181 # 检查服务器是否正常
echo stat | nc localhost 2181 # 查看服务器状态
echo conf | nc localhost 2181 # 查看配置
echo cons | nc localhost 2181 # 查看连接信息
echo dump | nc localhost 2181 # 查看会话和临时节点
echo wchs | nc localhost 2181 # 查看 Watch 统计
echo srst | nc localhost 2181 # 重置统计信息
性能调优
内存配置:
# 设置 JVM 堆大小(建议不超过物理内存的 75%)
export JVMFLAGS="-Xms2g -Xmx2g"
磁盘配置:
# 事务日志与快照分离存储
dataDir=/var/lib/zookeeper/snapshots
dataLogDir=/var/lib/zookeeper/logs
网络配置:
# 最大客户端连接数
maxClientCnxns=60
# 最小/最大会话超时
minSessionTimeout=4000
maxSessionTimeout=40000
监控指标
关键监控指标:
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| latency_avg | 平均延迟 | > 10ms |
| packet_loss | 丢包率 | > 0 |
| outstanding_requests | 待处理请求数 | > 10 |
| watch_count | Watch 数量 | 过大需优化 |
| ephemerals_count | 临时节点数量 | 过大需优化 |
| approximate_data_size | 数据大小 | 接近内存限制 |
常见问题
脑裂问题
网络分区可能导致多个节点同时认为自己是 Leader。解决方案:
- 确保集群有足够的节点(至少3个)
- 合理设置超时时间
- 使用 Observer 降低跨数据中心影响
会话过期
会话过期会导致临时节点被删除。解决方案:
- 合理设置会话超时时间
- 监听会话状态变化
- 实现重连机制
数据不一致
读请求可能读到旧数据。解决方案:
- 使用 sync() 方法同步最新数据
- 对于关键数据,从 Leader 读取
动态重配置
动态重配置是 ZooKeeper 3.5.0 引入的重要特性,允许在不停止服务的情况下修改集群配置。
为什么需要动态重配置?
在 ZooKeeper 3.5.0 之前,修改集群成员(如添加/删除节点)需要进行"滚动重启":
- 停止一个节点
- 修改其配置文件
- 重启该节点
- 对其他节点重复以上步骤
这种方式存在以下问题:
- 操作繁琐:需要手动逐个重启节点
- 容易出错:配置错误可能导致数据丢失或不一致
- 服务中断:重启过程中可能出现短暂的服务不可用
动态重配置解决了这些问题,可以在运行时修改:
- 集群成员(添加/删除服务器)
- 服务器角色(Participant/Observer)
- 端口号
- 甚至仲裁系统
启用动态重配置
重要:从 ZooKeeper 3.5.3 开始,动态重配置默认禁用,需要显式启用:
# 启用动态重配置
reconfigEnabled=true
# 允许单节点模式(可选,推荐)
standaloneEnabled=false
动态配置文件
ZooKeeper 3.5.0+ 将配置分为静态配置和动态配置:
静态配置文件(zoo.cfg):
tickTime=2000
dataDir=/var/lib/zookeeper
initLimit=5
syncLimit=2
# 指向动态配置文件
dynamicConfigFile=/var/lib/zookeeper/zoo.cfg.dynamic
动态配置文件(zoo.cfg.dynamic):
server.1=125.23.63.23:2780:2783:participant;2791
server.2=125.23.63.24:2781:2784:participant;2792
server.3=125.23.63.25:2782:2785:participant;2793
查看当前配置
# 使用命令行
[zk: localhost:2181(CONNECTED)] config
server.1=localhost:2780:2783:participant;localhost:2791
server.2=localhost:2781:2784:participant;localhost:2792
server.3=localhost:2782:2785:participant;localhost:2793
version=400000003
# 只显示版本和连接字符串
[zk: localhost:2181(CONNECTED)] config -c
400000003 localhost:2791,localhost:2792,localhost:2793
增量重配置
增量模式允许添加和删除服务器:
# 添加服务器
reconfig -add server.4=125.23.63.26:2786:2787:participant;2794
# 删除服务器
reconfig -remove 3
# 同时添加和删除
reconfig -remove 3,4 -add server.5=localhost:2111:2112;2113
# 将 Participant 改为 Observer
reconfig -add server.3=localhost:2782:2785:observer;2793
批量重配置
批量模式指定完整的新配置:
# 从文件读取新配置
reconfig -file newconfig.cfg
# 直接指定新成员
reconfig -members server.1=...,server.2=...,server.3=...
Java API 使用
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.ArrayList;
public class ReconfigExample {
private ZooKeeper zk;
// 增量重配置
public void incrementalReconfig() throws Exception {
List<String> joiningServers = new ArrayList<>();
joiningServers.add("server.4=localhost:2786:2787;2794");
List<String> leavingServers = new ArrayList<>();
leavingServers.add("3");
// 参数:加入列表、离开列表、新配置列表、版本、Stat对象
byte[] newConfig = zk.reconfig(
joiningServers, // 要添加的服务器
leavingServers, // 要删除的服务器
null, // 完整新配置(批量模式用)
-1, // 版本号,-1表示不检查版本
new Stat()
);
System.out.println("新配置: " + new String(newConfig));
}
// 批量重配置
public void bulkReconfig() throws Exception {
List<String> newMembers = new ArrayList<>();
newMembers.add("server.1=localhost:2780:2783:participant;2791");
newMembers.add("server.2=localhost:2781:2784:participant;2792");
newMembers.add("server.3=localhost:2782:2785:observer;2793");
byte[] newConfig = zk.reconfig(null, null, newMembers, -1, new Stat());
System.out.println("新配置: " + new String(newConfig));
}
// 条件重配置(指定版本)
public void conditionalReconfig(long expectedVersion) throws Exception {
// 只有当前配置版本匹配时才执行重配置
List<String> joiningServers = new ArrayList<>();
joiningServers.add("server.4=localhost:2786:2787;2794");
try {
zk.reconfig(joiningServers, null, null, expectedVersion, new Stat());
} catch (BadVersionException e) {
System.out.println("配置已被其他操作修改,请重试");
}
}
// 获取当前配置
public void getCurrentConfig() throws Exception {
// 先同步确保获取最新配置
zk.sync(ZooDefs.CONFIG_NODE, null, null);
byte[] config = zk.getConfig(false, new Stat());
System.out.println("当前配置: " + new String(config));
}
}
添加新服务器的步骤
添加新服务器需要特殊处理,因为新服务器需要先连接到现有集群:
方法1:将加入者列为 Observer(推荐)
# 新服务器 D 的初始配置
# 注意:D 被列为 Observer,但不会真正成为 Observer
# 只是防止它与其他加入者组成仲裁
server.1=host1:2888:3888:participant;2181
server.2=host2:2888:3888:participant;2181
server.3=host3:2888:3888:participant;2181
server.4=host4:2888:3888:observer;2181 # 新服务器,列为 Observer
方法2:将加入者列为 Participant
# 新服务器 D 的初始配置
# 警告:不要在初始配置中列出多个加入者作为 Participant
server.1=host1:2888:3888:participant;2181
server.2=host2:2888:3888:participant;2181
server.3=host3:2888:3888:participant;2181
server.4=host4:2888:3888:participant;2181 # 只有 D 一个加入者
警告:永远不要在初始配置中将多个加入者列为 Participant,否则可能形成独立的仲裁(脑裂)。
删除服务器的注意事项
删除服务器时,被删除的服务器不会自动关闭,而是成为"非投票跟随者":
- 优点:连接到该服务器的客户端不会被立即断开
- 缺点:非投票跟随者对系统吞吐量有负面影响
- 建议:删除后应尽快关闭或重新配置该服务器
删除 Leader:
删除 Leader 会导致短暂的不可用(通常比 Leader 崩溃恢复更快),因为 Leader 会主动提名新 Leader。
安全配置
从 ZooKeeper 3.5.3 开始,动态重配置需要权限控制:
启用 ACL 检查:
# 启用动态重配置
reconfigEnabled=true
# 启用认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
跳过 ACL 检查(仅限安全环境):
# 跳过 ACL 检查(不推荐生产环境使用)
skipACL=yes
使用权限控制:
// 添加认证
zk.addAuthInfo("digest", "admin:password".getBytes());
// 执行重配置(需要有权限)
List<String> joiningServers = new ArrayList<>();
joiningServers.add("server.4=...");
zk.reconfig(joiningServers, null, null, -1, new Stat());
客户端连接迁移
当集群配置变化时,客户端需要更新连接字符串:
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class ReconfigAwareClient {
private ZooKeeper zk;
private List<String> servers;
public void connectWithReconfigSupport(List<String> serverList) {
servers = serverList;
try {
zk = new ZooKeeper(String.join(",", servers), 30000, event -> {
if (event.getType() == Watcher.Event.EventType.None) {
// 处理连接状态变化
handleConnectionChange(event);
}
});
// 设置配置变更监听
watchConfigChanges();
} catch (Exception e) {
e.printStackTrace();
}
}
private void watchConfigChanges() throws Exception {
// 获取当前配置并设置监听
zk.getConfig(true, (rc, path, ctx, data, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
// 配置变更,更新连接字符串
String config = new String(data);
updateConnectionString(config);
}
}, null);
}
private void updateConnectionString(String config) {
// 解析配置并更新服务器列表
// 重新连接时使用新的服务器列表
System.out.println("配置已变更: " + config);
}
private void handleConnectionChange(WatchedEvent event) {
switch (event.getState()) {
case SyncConnected:
System.out.println("已连接");
break;
case Disconnected:
System.out.println("连接断开,尝试重连");
break;
case Expired:
System.out.println("会话过期,需要重建会话");
break;
}
}
}
动态重配置最佳实践
- 启用 standaloneEnabled=false:允许单节点集群扩展
- 使用增量模式:更安全,适合大多数场景
- 监控配置版本:跟踪配置变更历史
- 保留旧配置:配置文件会自动备份,不要手动删除
- 避免高峰期操作:重配置可能影响性能
- 测试环境验证:在生产环境操作前先在测试环境验证
小结
本章介绍了 ZooKeeper 的核心概念和使用方法:
- 数据模型:树形命名空间、ZNode、节点类型(包括 Container 和 TTL)、版本控制
- 会话机制:会话状态、超时管理、本地会话、重连机制、chroot 支持
- Watch 机制:一次性触发、持久递归监听(3.6.0+)、异步通知、使用注意事项
- ACL 权限:权限类型、认证方案、权限设置
- 架构设计:Leader、Follower、Observer 角色,ZAB 协议
- 应用场景:配置管理、服务发现、分布式锁、队列
- 部署运维:集群部署、配置参数、监控指标
ZooKeeper 版本特性总结
| 版本 | 新增特性 |
|---|---|
| 3.5.0 | 动态重配置、本地会话(Local Session) |
| 3.5.3 | 容器节点(Container Nodes)、TTL 节点、reconfigEnabled 安全选项 |
| 3.6.0 | 持久递归监听(Persistent Recursive Watches)、多地址支持、Observer Master |
| 3.7.0 | 改进的快照格式、更好的内存管理 |
| 3.8.0 | 安全性增强、性能优化 |
| 3.9.0 | 证书热重载(cert.Reload)、安全审计日志增强 |
最佳实践
- 客户端选择:生产环境推荐使用 Curator,它提供了更高级的 API 和常用配方的实现
- 会话超时设置:根据网络环境和业务需求合理设置,一般 10-30 秒
- 节点类型选择:
- 配置数据:持久节点
- 服务注册:临时节点
- Leader 选举:临时顺序节点或容器节点
- 需要自动清理的数据:TTL 节点
- 监听策略:
- 简单监听:传统 Watch
- 需要持续监听:持久监听(3.6.0+)
- 监听目录树:TreeCache 或递归监听
- 集群规模:生产环境建议 3 或 5 个节点,奇数个
- 监控告警:重点关注延迟、Watch 数量、临时节点数量
ZooKeeper 是分布式系统的核心基础设施,掌握 ZooKeeper 对于构建可靠的分布式应用至关重要。在实际项目中,推荐使用 Curator 等高级客户端库简化开发。