跳到主要内容

ZooKeeper 分布式协调服务

Apache ZooKeeper 是一个集中式的分布式协调服务,用于维护配置信息、命名服务、分布式同步和组服务。它是 Hadoop、Kafka、HBase 等分布式系统的核心基础设施。

什么是 ZooKeeper?

ZooKeeper 最初是 Yahoo! 为 Hadoop 项目开发的子项目,后来成为 Apache 顶级项目。它的设计目标是简化分布式系统的开发,让开发者无需从头实现复杂的协调逻辑。

核心特性

ZooKeeper 提供以下核心能力:

  1. 配置管理:集中存储和管理分布式系统的配置信息
  2. 命名服务:提供统一的命名注册和发现机制
  3. 分布式同步:实现分布式锁、屏障等同步原语
  4. 组服务:管理集群成员关系和选举

设计目标

ZooKeeper 的设计遵循以下原则:

  • 简单性:提供类似文件系统的树形命名空间
  • 高性能:数据存储在内存中,支持高吞吐量
  • 有序性:所有更新操作都有全局顺序
  • 可靠性:通过复制实现高可用

数据模型

ZNode 节点

ZooKeeper 的数据模型是一个层次化的命名空间,称为"数据树"。每个节点称为 ZNode,可以存储数据并拥有子节点。

/
├── zookeeper(系统保留)
│ └── config
├── hbase
│ ├── meta-region-server
│ └── rs
├── kafka
│ ├── brokers
│ ├── controller
│ └── consumers
└── services
├── service1
└── service2

ZNode 的核心特性:

节点类型

类型说明用途
持久节点(Persistent)节点创建后一直存在,直到被删除存储配置、元数据
临时节点(Ephemeral)会话结束时节自动删除服务注册、临时状态
持久顺序节点持久节点 + 自动递增序号分布式队列、选举
临时顺序节点临时节点 + 自动递增序号分布式锁、选举
容器节点(Container)最后一个子节点被删除后,容器节点成为删除候选Leader选举、锁的实现
TTL节点在指定时间内未被修改且无子节点时自动删除临时数据存储

容器节点(Container Nodes)

容器节点是 ZooKeeper 3.5.3 引入的特殊节点类型,主要用于实现 Leader 选举、分布式锁等场景:

工作原理

  • 当容器节点的最后一个子节点被删除时,该容器节点会成为删除候选
  • 服务器会在未来的某个时间点自动删除该容器节点

使用场景

  • Leader 选举:作为选举节点的父节点,当所有参与者离开后自动清理
  • 分布式锁:作为锁节点的父节点,避免锁释放后留下空目录

注意事项

  • 在容器节点内创建子节点时,需要捕获 KeeperException.NoNodeException
  • 如果容器节点已被删除,需要重新创建
// 创建容器节点
zk.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);

// 在容器节点内创建子节点时处理可能的删除
try {
zk.create("/locks/lock-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException.NoNodeException e) {
// 容器节点已被删除,重新创建
zk.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
zk.create("/locks/lock-", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}

TTL节点

TTL(Time To Live)节点允许为持久节点设置生存时间,在 ZooKeeper 3.5.3 中引入:

特点

  • 只适用于持久节点(PERSISTENT)和持久顺序节点(PERSISTENT_SEQUENTIAL)
  • 节点在 TTL 时间内未被修改且没有子节点,将成为删除候选
  • TTL 以毫秒为单位

启用方式

TTL 节点默认禁用,需要在服务器配置中启用:

# 在 zoo.cfg 中添加
znode.container.checkIntervalMs=1000
znode.container.maxPerMinute=10000

使用示例

// 创建 TTL 节点(需要自定义 CreateMode)
// 注意:标准 ZooKeeper API 不直接支持 TTL,需要使用扩展方法
// 使用 Curator 客户端更方便
CuratorFramework client = ...;
client.create()
.withMode(CreateMode.PERSISTENT_WITH_TTL)
.withTtl(3600000) // 1小时 = 3600000毫秒
.forPath("/ttl-node", "data".getBytes());

适用场景

  • 临时配置存储:配置在一定时间后自动清理
  • 缓存数据:缓存数据在过期后自动删除
  • 会话相关数据:无需使用临时节点但需要自动清理的数据

Stat 结构

每个 ZNode 都包含一个 Stat 结构,记录节点的元数据:

字段说明
czxid创建节点的事务ID
mzxid最后修改的事务ID
pzxid子节点最后修改的事务ID
ctime创建时间
mtime最后修改时间
version数据版本号
cversion子节点版本号
aversionACL版本号
ephemeralOwner临时节点的会话ID
dataLength数据长度
numChildren子节点数量

数据访问特点

ZooKeeper 的数据访问有几个重要特点:

原子性:读写操作都是原子的。读取会获取节点的所有数据,写入会替换所有数据。

数据大小限制:每个 ZNode 的数据限制在 1MB 以内。ZooKeeper 设计用于存储协调数据(配置、状态、元数据),而非大数据存储。

版本控制:更新和删除操作可以指定版本号,实现乐观并发控制。如果版本不匹配,操作会失败。

会话机制

会话状态

客户端与 ZooKeeper 服务器建立连接后创建会话,会话有以下状态:

状态说明
CONNECTING正在连接服务器
CONNECTED已连接,可以正常操作
RECONNECTING连接断开后正在重连
EXPIRED会话过期,需要重新创建

会话状态转换遵循以下规则:

┌─────────────┐     连接成功      ┌─────────────┐
│ CONNECTING │ ───────────────→ │ CONNECTED │
└─────────────┘ └─────────────┘
↑ │
│ │
│ 连接断开 │
└────────────────────────────────┤

│ 会话过期

┌─────────────┐
│ EXPIRED │
└─────────────┘

重要说明

  • 会话过期由 ZooKeeper 集群管理,而非客户端
  • 当集群在会话超时时间内未收到客户端心跳时,会话过期
  • 客户端断开连接后,会话可能仍然有效(如果在超时时间内重连)
  • 会话过期时,该会话创建的所有临时节点会被删除

会话超时

会话超时是 ZooKeeper 容错机制的关键:

  1. 客户端在创建连接时指定超时时间
  2. 服务器返回实际可用的超时时间
  3. 客户端定期发送心跳保持会话活跃
  4. 超时未收到心跳,服务器会清理该会话的所有临时节点

超时时间的选择需要权衡:

  • 太短:网络抖动可能导致会话频繁过期
  • 太长:故障检测延迟增加

通常建议设置为几秒到几十秒。

本地会话(Local Session)

本地会话是 ZooKeeper 3.5.0 引入的特性,用于解决大规模客户端连接的性能问题。

为什么需要本地会话?

在传统会话模型中:

  • 会话的创建和关闭需要仲裁确认
  • 会话状态需要在 Leader 和所有 Follower 之间同步
  • 当连接数达到数千时,会话管理成为性能瓶颈

本地会话特点

本地会话将会话信息只保存在客户端连接的服务器上:

特性传统会话(全局会话)本地会话
会话信息存储所有服务器仅连接的服务器
创建/关闭开销需要仲裁确认本地处理
临时节点创建支持默认不支持
会话恢复可在任意服务器恢复只能在原服务器恢复
适用场景需要临时节点大量短连接、轻量级监听

配置启用

# 启用本地会话
localSessionsEnabled=true

# 是否允许本地会话升级为全局会话
localSessionsUpgradingEnabled=true

本地会话升级

localSessionsUpgradingEnabled=true 时,本地会话可以自动升级为全局会话:

  1. 初始创建时是本地会话
  2. 当需要创建临时节点时,自动升级为全局会话
  3. 升级后保持相同的会话 ID

升级触发条件

  • 创建临时节点(Ephemeral Node)

使用建议

// 场景1:只需要读取数据,不需要创建临时节点
// 使用本地会话可以大幅提升性能

// 场景2:需要创建临时节点
// 会自动升级为全局会话

// 场景3:使用 Observer 连接
// Observer 非常适合本地会话,避免对 Leader 的压力

注意事项

  • 本地会话丢失后无法在其他服务器恢复
  • 断开 TCP 连接不一定会丢失会话(如果在超时前重连到同一服务器)
  • 如果需要高可用性,建议使用传统全局会话

连接字符串与 chroot

客户端连接 ZooKeeper 时可以指定 chroot 后缀,所有操作都会相对于这个根路径:

// 普通 connection string
"127.0.0.1:2181"

// 带 chroot 的 connection string
"127.0.0.1:2181/app/a"

// 多服务器 + chroot
"server1:2181,server2:2181,server3:2181/app/a"

chroot 的作用

  • 所有路径操作自动添加 /app/a 前缀
  • 客户端代码中路径写作 /foo/bar,实际操作 /app/a/foo/bar
  • 实现多租户环境的路径隔离
  • 便于应用迁移和部署

示例

// 连接字符串带 chroot
ZooKeeper zk = new ZooKeeper("localhost:2181/myapp", 30000, watcher);

// 客户端操作
zk.create("/config", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 实际创建的路径是 /myapp/config

zk.getData("/config", false, null);
// 实际读取的是 /myapp/config

Watch 监听机制

Watch 是 ZooKeeper 实现事件通知的核心机制。客户端可以在读取操作时注册 Watch,当数据发生变化时收到通知。

Watch 特性

一次性触发:Watch 触发后立即失效,需要重新注册才能继续监听。

异步通知:Watch 事件异步发送给客户端,不能保证实时性。

顺序保证:客户端先看到 Watch 事件,再看到数据变化。

Watch 类型

不同操作注册的 Watch 类型不同:

操作Watch 类型触发条件
exists数据 Watch节点创建、删除、数据变更
getData数据 Watch节点删除、数据变更
getChildren子节点 Watch子节点创建、删除

使用注意事项

使用 Watch 时需要注意以下几点:

  1. Watch 是一次性的:收到通知后需要重新注册
  2. 可能错过事件:在收到通知和重新注册之间可能发生多次变化
  3. 断开连接时不触发:客户端断开时不会收到 Watch 事件
  4. 会话事件:连接状态变化时会收到会话事件

持久递归监听(ZooKeeper 3.6.0+)

传统的 Watch 是一次性的,触发后需要重新注册。ZooKeeper 3.6.0 引入了持久递归监听,解决了这个问题:

持久监听

持久监听不会在触发后自动移除,可以持续监听节点变化:

import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.Watcher;

// 添加持久监听
zk.addWatch("/mynode", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
// 不需要重新注册,监听会持续有效
}
}, AddWatchMode.PERSISTENT);

// 移除持久监听
zk.removeWatches("/mynode", watcher, Watcher.WatcherType.Any, false);

支持的触发事件

  • NodeCreated:节点创建
  • NodeDeleted:节点删除
  • NodeDataChanged:节点数据变更

注意:持久监听不会触发 NodeChildrenChanged 事件,因为这可以通过递归监听实现。

递归监听

递归监听可以监听节点及其所有子节点的变化:

// 添加递归监听(包含持久特性)
zk.addWatch("/parent", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Event: " + event.getType() + ", Path: " + event.getPath());
// 会收到 /parent 及其所有子节点的变化通知
}
}, AddWatchMode.PERSISTENT_RECURSIVE);

递归监听特点

  • 监听指定节点及其所有子节点(递归)
  • 子节点的创建、删除、数据变更都会触发事件
  • 新创建的子节点自动被监听
  • 不会触发 NodeChildrenChanged 事件(因为递归监听已经覆盖)

适用场景

  • 监控配置目录的所有变化
  • 监控服务注册目录的变化
  • 需要持续监听多个子节点的场景

持久监听与传统监听对比

特性传统 Watch持久监听
触发后行为自动移除保持监听
需要重新注册
事件类型所有类型不含 NodeChildrenChanged
版本要求所有版本3.6.0+

Curator 中的缓存监听

Curator 提供了更便捷的缓存监听机制:

import org.apache.curator.framework.recipes.cache.*;

// NodeCache:监听单个节点的变化
NodeCache nodeCache = new NodeCache(client, "/mynode");
nodeCache.getListenable().addListener(() -> {
ChildData data = nodeCache.getCurrentData();
if (data != null) {
System.out.println("Data: " + new String(data.getData()));
}
});
nodeCache.start();

// PathChildrenCache:监听子节点的变化
PathChildrenCache childrenCache = new PathChildrenCache(client, "/parent", true);
childrenCache.getListenable().addListener((client1, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("Child added: " + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("Child removed: " + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("Child updated: " + event.getData().getPath());
break;
}
});
childrenCache.start();

// TreeCache:监听节点及其所有子节点(递归)
TreeCache treeCache = new TreeCache(client, "/parent");
treeCache.getListenable().addListener((client1, event) -> {
System.out.println("Event: " + event.getType() + ", Path: " + event.getData().getPath());
});
treeCache.start();

ACL 权限控制

ZooKeeper 使用 ACL(访问控制列表)控制对 ZNode 的访问权限。

权限类型

权限缩写说明
READr读取节点数据和子节点列表
WRITEw设置节点数据
CREATEc创建子节点
DELETEd删除子节点
ADMINa设置权限

内置认证方案

方案说明示例
world默认方案,只有一个 ID:anyoneworld:anyone
auth已认证用户auth:user:password
digest用户名密码认证digest:user:base64(SHA1(password))
ipIP 地址认证ip:192.168.1.1
super超级管理员,拥有所有权限

常用 ACL

# 完全开放
setAcl /path world:anyone:rwcda

# 只读权限
setAcl /path world:anyone:r

# 认证用户拥有所有权限,其他用户只读
setAcl /path auth::rwcda,world:anyone:r

ZooKeeper 架构

服务器角色

ZooKeeper 集群中的服务器有以下角色:

Leader(领导者)

Leader 是集群的核心:

  • 处理所有写请求
  • 协调集群内部状态同步
  • 负责投票和提案

Follower(跟随者)

Follower 是普通的服务器节点:

  • 处理读请求
  • 转发写请求给 Leader
  • 参与投票

Observer(观察者)

Observer 是特殊的节点:

  • 不参与投票
  • 处理读请求,减轻 Follower 压力
  • 适合跨数据中心部署

Observer 的优势

  1. 提高扩展性:可以增加任意数量的 Observer 而不影响写性能,因为 Observer 不参与投票
  2. 提高可用性:Observer 故障不会影响集群可用性
  3. 跨数据中心部署:Observer 可以部署在不同数据中心,客户端读取本地 Observer 的数据,写入时只需最小化的网络传输

Observer 配置

# 在 Observer 节点的配置文件中添加
peerType=observer

# 在所有节点的配置文件中标注 Observer
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888:observer

Observer Master

Observer Master 是 ZooKeeper 3.6.0 引入的特性,允许 Observer 连接到 Follower 而不是 Leader:

为什么需要 Observer Master?

默认情况下,Observer 连接到 Leader 获取数据更新。当 Observer 数量很多时,Leader 承受的压力会很大。Observer Master 让 Observer 可以连接到 Follower,从而:

  1. 减轻 Leader 压力:Leader 可以专注于协调写请求
  2. 支持更多 Observer:允许 Observer 扩展到数百个
  3. 加快 Observer 同步:减少 Observer 完成同步的时间

配置 Observer Master

# 在所有节点的配置文件中添加
# 指定 Follower 监听 Observer 连接的端口
observerMasterPort=2191

工作原理

Leader (端口 2888)

├── Follower 1 (Observer Master 端口 2191)
│ │
│ ├── Observer 1 (连接到 Follower 1)
│ └── Observer 2 (连接到 Follower 1)

└── Follower 2 (Observer Master 端口 2191)

└── Observer 3 (连接到 Follower 2)

适用场景

  • Observer 数量很多(数十个以上)
  • Leader 负载过高
  • 需要快速扩展 Observer

原子广播协议(ZAB)

ZooKeeper 使用 ZAB 协议保证数据一致性:

崩溃恢复:Leader 崩溃时,通过投票选举新 Leader

消息广播:Leader 将写请求广播给所有 Follower

ZAB 协议保证:

  1. 全序性:所有事务有全局顺序
  2. 一致性:所有服务器看到相同的事务序列
  3. 可靠性:已提交的事务不会丢失

读写流程

写流程

  1. 客户端向任意服务器发送写请求
  2. Follower 将请求转发给 Leader
  3. Leader 将请求转换为提案,分配事务ID
  4. Leader 向所有 Follower 发送提案
  5. Follower 将提案写入日志并返回确认
  6. 收到多数确认后,Leader 提交事务
  7. Leader 通知所有 Follower 提交
  8. 响应客户端

读流程

  1. 客户端向任意服务器发送读请求
  2. 服务器读取本地数据并返回

由于读请求直接从本地读取,可能读到旧数据。如需强一致性,可以在读之前调用 sync() 方法。

客户端使用

命令行操作

# 连接服务器
zkCli.sh -server localhost:2181

# 查看节点
ls /
ls -R /path

# 创建节点
create /path data
create -e /path data # 临时节点
create -s /path data # 顺序节点
create -e -s /path data # 临时顺序节点

# 获取节点数据
get /path

# 设置节点数据
set /path newdata

# 删除节点
delete /path # 无子节点时才能删除
deleteall /path # 递归删除

# 查看节点状态
stat /path

# 设置 ACL
setAcl /path acl

# 获取 ACL
getAcl /path

Java API 使用

创建连接

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

// 创建连接
ZooKeeper zk = new ZooKeeper(
"localhost:2181", // 服务器地址
30000, // 会话超时时间(毫秒)
event -> { // Watcher 回调
System.out.println("Event: " + event.getType());
}
);

// 等待连接建立
zk.getState().isConnected();

节点操作

// 创建持久节点
String path = zk.create(
"/mynode", // 节点路径
"hello".getBytes(), // 节点数据
ZooDefs.Ids.OPEN_ACL_UNSAFE, // ACL
CreateMode.PERSISTENT // 节点类型
);

// 创建临时节点
zk.create("/ephemeral", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

// 创建顺序节点
String seqPath = zk.create("/sequential", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

// 获取节点数据
Stat stat = new Stat();
byte[] data = zk.getData("/mynode", false, stat);
System.out.println("Data: " + new String(data));
System.out.println("Version: " + stat.getVersion());

// 获取数据并注册 Watch
byte[] data = zk.getData("/mynode", event -> {
System.out.println("Node changed: " + event.getPath());
}, stat);

// 设置节点数据
zk.setData("/mynode", "newdata".getBytes(), -1); // -1 表示不检查版本

// 条件更新(乐观锁)
try {
zk.setData("/mynode", "newdata".getBytes(), stat.getVersion());
} catch (KeeperException.BadVersionException e) {
// 版本冲突,更新失败
}

// 检查节点是否存在
Stat exists = zk.exists("/mynode", false);
if (exists != null) {
System.out.println("Node exists");
}

// 获取子节点列表
List<String> children = zk.getChildren("/", false);

// 删除节点
zk.delete("/mynode", -1);

// 获取当前会话创建的所有临时节点(3.6.0+)
// 方法1:获取所有临时节点
List<String> allEphemerals = zk.getEphemerals();
System.out.println("All ephemeral nodes: " + allEphemerals);

// 方法2:获取指定路径前缀的临时节点
List<String> serviceEphemerals = zk.getEphemerals("/services");
System.out.println("Service ephemeral nodes: " + serviceEphemerals);

// 使用场景:服务注册时检查是否已创建过临时节点
List<String> existingNodes = zk.getEphemerals("/services/payment");
if (existingNodes.stream().anyMatch(p -> p.contains("host1:8080"))) {
System.out.println("Service already registered");
} else {
zk.create("/services/payment/host1:8080", metadata,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}

// 关闭连接
zk.close();

异步 API

ZooKeeper 同时提供异步 API,适合高并发场景:

// 异步创建节点
zk.create("/async", "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("Created: " + name);
}
},
null); // ctx 上下文对象

// 异步获取数据
zk.getData("/async", false,
(rc, path, ctx, data, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("Data: " + new String(data));
}
},
null);

Curator 客户端

Curator 是 Netflix 开源的 ZooKeeper 客户端,提供了更高级的 API:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(10000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();

client.start();

// 创建节点
client.create()
.creatingParentsIfNeeded() // 自动创建父节点
.withMode(CreateMode.PERSISTENT)
.forPath("/path/to/node", "data".getBytes());

// 获取数据
byte[] data = client.getData().forPath("/path/to/node");

// 设置数据
client.setData().forPath("/path/to/node", "newdata".getBytes());

// 删除节点
client.delete()
.deletingChildrenIfNeeded() // 递归删除子节点
.forPath("/path/to/node");

// 检查节点是否存在
Stat stat = client.checkExists().forPath("/path/to/node");

// 获取子节点
List<String> children = client.getChildren().forPath("/path");

// 关闭客户端
client.close();

分布式锁

import org.apache.curator.framework.recipes.locks.InterProcessMutex;

// 创建可重入锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/mylock");

try {
// 获取锁
lock.acquire();

// 执行业务逻辑
System.out.println("Got lock, doing work...");

} finally {
// 释放锁
lock.release();
}

Leader 选举

import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/election");

leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println("I am the leader!");
}

@Override
public void notLeader() {
System.out.println("I am not the leader anymore");
}
});

leaderLatch.start();
leaderLatch.await(); // 等待成为 Leader
leaderLatch.close();

典型应用场景

配置管理

集中管理分布式系统的配置:

// 应用启动时读取配置
byte[] configData = zk.getData("/config/app", event -> {
// 配置变化时重新加载
reloadConfig();
}, null);

// 更新配置
zk.setData("/config/app", newConfig.getBytes(), -1);

服务注册与发现

服务启动时注册到 ZooKeeper,停止时自动注销:

// 服务注册(创建临时节点)
String servicePath = zk.create(
"/services/payment/host1:8080",
"metadata".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL
);

// 服务发现
List<String> services = zk.getChildren("/services/payment", event -> {
// 服务列表变化时更新
updateServiceList();
});

分布式锁

使用临时顺序节点实现分布式锁:

public class DistributedLock {
private final ZooKeeper zk;
private final String lockPath;
private String currentPath;

public DistributedLock(ZooKeeper zk, String lockPath) {
this.zk = zk;
this.lockPath = lockPath;
}

public void lock() throws Exception {
// 创建临时顺序节点
currentPath = zk.create(lockPath + "/lock-",
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);

// 获取所有竞争者
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);

// 检查自己是否是最小的
String currentNode = currentPath.substring(lockPath.length() + 1);
int currentIndex = children.indexOf(currentNode);

if (currentIndex == 0) {
// 获取锁成功
return;
}

// 等待前一个节点释放
String prevNode = children.get(currentIndex - 1);
final CountDownLatch latch = new CountDownLatch(1);

Stat stat = zk.exists(lockPath + "/" + prevNode, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
});

if (stat != null) {
latch.await();
}
}

public void unlock() throws Exception {
zk.delete(currentPath, -1);
}
}

分布式队列

使用持久顺序节点实现分布式队列:

public class DistributedQueue {
private final ZooKeeper zk;
private final String queuePath;

public DistributedQueue(ZooKeeper zk, String queuePath) {
this.zk = zk;
this.queuePath = queuePath;
}

// 入队
public void enqueue(byte[] data) throws Exception {
zk.create(queuePath + "/item-", data,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}

// 出队
public byte[] dequeue() throws Exception {
while (true) {
List<String> children = zk.getChildren(queuePath, false);
if (children.isEmpty()) {
return null;
}

Collections.sort(children);
String first = children.get(0);

try {
byte[] data = zk.getData(queuePath + "/" + first, false, null);
zk.delete(queuePath + "/" + first, -1);
return data;
} catch (KeeperException.NoNodeException e) {
// 节点已被其他消费者处理,继续尝试下一个
}
}
}
}

集群成员管理

使用临时节点管理集群成员:

// 成员加入
public void joinCluster(String memberId) throws Exception {
zk.create("/cluster/members/" + memberId,
"info".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}

// 监控成员变化
public void watchMembers() throws Exception {
List<String> members = zk.getChildren("/cluster/members", event -> {
// 成员变化时重新获取列表
watchMembers();
});
System.out.println("Current members: " + members);
}

部署与运维

集群部署

ZooKeeper 集群通常部署奇数个节点(3、5、7),因为:

  • 需要多数节点存活才能服务
  • 3节点集群容忍1个节点故障
  • 5节点集群容忍2个节点故障
  • 4节点集群也只能容忍1个节点故障,与3节点相同

配置文件

# 基本配置
tickTime=2000 # 基本时间单位(毫秒)
dataDir=/var/lib/zookeeper # 数据目录
clientPort=2181 # 客户端连接端口

# 集群配置
initLimit=10 # Follower 连接 Leader 的超时时间(tickTime 倍数)
syncLimit=5 # Follower 与 Leader 同步的超时时间

# 集群成员(新格式,3.5.0+)
# server.id=host:quorumPort:electionPort[:role];clientPort
server.1=zoo1:2888:3888:participant;2181
server.2=zoo2:2888:3888:participant;2181
server.3=zoo3:2888:3888:participant;2181

# 自动清理
autopurge.snapRetainCount=3 # 保留的快照数量
autopurge.purgeInterval=1 # 清理间隔(小时)

多地址支持(3.6.0+)

ZooKeeper 3.6.0 支持为每个服务器配置多个网络地址,提高网络容错能力:

# 使用管道符 (|) 分隔多个地址
server.1=zoo1-net1:2888:3888|zoo1-net2:2889:3889;2181
server.2=zoo2-net1:2888:3888|zoo2-net2:2889:3889;2181
server.3=zoo3-net1:2888:3888|zoo3-net2:2889:3889;2181

多地址的优势

  1. 网络容错:当一个网络接口故障时,自动切换到另一个
  2. 提高可用性:减少因网络问题导致的服务中断
  3. 跨网络部署:支持多网卡服务器的灵活配置

适用场景

  • 服务器有多个网络接口
  • 需要网络级别的容错
  • 跨子网部署

myid 文件

每个服务器需要创建 myid 文件:

# 在 server.1 上
echo "1" > /var/lib/zookeeper/myid

# 在 server.2 上
echo "2" > /var/lib/zookeeper/myid

# 在 server.3 上
echo "3" > /var/lib/zookeeper/myid

启动与管理

# 启动服务器
zkServer.sh start

# 查看状态
zkServer.sh status

# 停止服务器
zkServer.sh stop

# 四字命令
echo ruok | nc localhost 2181 # 检查服务器是否正常
echo stat | nc localhost 2181 # 查看服务器状态
echo conf | nc localhost 2181 # 查看配置
echo cons | nc localhost 2181 # 查看连接信息
echo dump | nc localhost 2181 # 查看会话和临时节点
echo wchs | nc localhost 2181 # 查看 Watch 统计
echo srst | nc localhost 2181 # 重置统计信息

性能调优

内存配置

# 设置 JVM 堆大小(建议不超过物理内存的 75%)
export JVMFLAGS="-Xms2g -Xmx2g"

磁盘配置

# 事务日志与快照分离存储
dataDir=/var/lib/zookeeper/snapshots
dataLogDir=/var/lib/zookeeper/logs

网络配置

# 最大客户端连接数
maxClientCnxns=60

# 最小/最大会话超时
minSessionTimeout=4000
maxSessionTimeout=40000

监控指标

关键监控指标:

指标说明告警阈值
latency_avg平均延迟> 10ms
packet_loss丢包率> 0
outstanding_requests待处理请求数> 10
watch_countWatch 数量过大需优化
ephemerals_count临时节点数量过大需优化
approximate_data_size数据大小接近内存限制

常见问题

脑裂问题

网络分区可能导致多个节点同时认为自己是 Leader。解决方案:

  • 确保集群有足够的节点(至少3个)
  • 合理设置超时时间
  • 使用 Observer 降低跨数据中心影响

会话过期

会话过期会导致临时节点被删除。解决方案:

  • 合理设置会话超时时间
  • 监听会话状态变化
  • 实现重连机制

数据不一致

读请求可能读到旧数据。解决方案:

  • 使用 sync() 方法同步最新数据
  • 对于关键数据,从 Leader 读取

动态重配置

动态重配置是 ZooKeeper 3.5.0 引入的重要特性,允许在不停止服务的情况下修改集群配置。

为什么需要动态重配置?

在 ZooKeeper 3.5.0 之前,修改集群成员(如添加/删除节点)需要进行"滚动重启":

  1. 停止一个节点
  2. 修改其配置文件
  3. 重启该节点
  4. 对其他节点重复以上步骤

这种方式存在以下问题:

  • 操作繁琐:需要手动逐个重启节点
  • 容易出错:配置错误可能导致数据丢失或不一致
  • 服务中断:重启过程中可能出现短暂的服务不可用

动态重配置解决了这些问题,可以在运行时修改:

  • 集群成员(添加/删除服务器)
  • 服务器角色(Participant/Observer)
  • 端口号
  • 甚至仲裁系统

启用动态重配置

重要:从 ZooKeeper 3.5.3 开始,动态重配置默认禁用,需要显式启用:

# 启用动态重配置
reconfigEnabled=true

# 允许单节点模式(可选,推荐)
standaloneEnabled=false

动态配置文件

ZooKeeper 3.5.0+ 将配置分为静态配置和动态配置:

静态配置文件(zoo.cfg)

tickTime=2000
dataDir=/var/lib/zookeeper
initLimit=5
syncLimit=2

# 指向动态配置文件
dynamicConfigFile=/var/lib/zookeeper/zoo.cfg.dynamic

动态配置文件(zoo.cfg.dynamic)

server.1=125.23.63.23:2780:2783:participant;2791
server.2=125.23.63.24:2781:2784:participant;2792
server.3=125.23.63.25:2782:2785:participant;2793

查看当前配置

# 使用命令行
[zk: localhost:2181(CONNECTED)] config
server.1=localhost:2780:2783:participant;localhost:2791
server.2=localhost:2781:2784:participant;localhost:2792
server.3=localhost:2782:2785:participant;localhost:2793
version=400000003

# 只显示版本和连接字符串
[zk: localhost:2181(CONNECTED)] config -c
400000003 localhost:2791,localhost:2792,localhost:2793

增量重配置

增量模式允许添加和删除服务器:

# 添加服务器
reconfig -add server.4=125.23.63.26:2786:2787:participant;2794

# 删除服务器
reconfig -remove 3

# 同时添加和删除
reconfig -remove 3,4 -add server.5=localhost:2111:2112;2113

# 将 Participant 改为 Observer
reconfig -add server.3=localhost:2782:2785:observer;2793

批量重配置

批量模式指定完整的新配置:

# 从文件读取新配置
reconfig -file newconfig.cfg

# 直接指定新成员
reconfig -members server.1=...,server.2=...,server.3=...

Java API 使用

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.ArrayList;

public class ReconfigExample {
private ZooKeeper zk;

// 增量重配置
public void incrementalReconfig() throws Exception {
List<String> joiningServers = new ArrayList<>();
joiningServers.add("server.4=localhost:2786:2787;2794");

List<String> leavingServers = new ArrayList<>();
leavingServers.add("3");

// 参数:加入列表、离开列表、新配置列表、版本、Stat对象
byte[] newConfig = zk.reconfig(
joiningServers, // 要添加的服务器
leavingServers, // 要删除的服务器
null, // 完整新配置(批量模式用)
-1, // 版本号,-1表示不检查版本
new Stat()
);

System.out.println("新配置: " + new String(newConfig));
}

// 批量重配置
public void bulkReconfig() throws Exception {
List<String> newMembers = new ArrayList<>();
newMembers.add("server.1=localhost:2780:2783:participant;2791");
newMembers.add("server.2=localhost:2781:2784:participant;2792");
newMembers.add("server.3=localhost:2782:2785:observer;2793");

byte[] newConfig = zk.reconfig(null, null, newMembers, -1, new Stat());
System.out.println("新配置: " + new String(newConfig));
}

// 条件重配置(指定版本)
public void conditionalReconfig(long expectedVersion) throws Exception {
// 只有当前配置版本匹配时才执行重配置
List<String> joiningServers = new ArrayList<>();
joiningServers.add("server.4=localhost:2786:2787;2794");

try {
zk.reconfig(joiningServers, null, null, expectedVersion, new Stat());
} catch (BadVersionException e) {
System.out.println("配置已被其他操作修改,请重试");
}
}

// 获取当前配置
public void getCurrentConfig() throws Exception {
// 先同步确保获取最新配置
zk.sync(ZooDefs.CONFIG_NODE, null, null);

byte[] config = zk.getConfig(false, new Stat());
System.out.println("当前配置: " + new String(config));
}
}

添加新服务器的步骤

添加新服务器需要特殊处理,因为新服务器需要先连接到现有集群:

方法1:将加入者列为 Observer(推荐)

# 新服务器 D 的初始配置
# 注意:D 被列为 Observer,但不会真正成为 Observer
# 只是防止它与其他加入者组成仲裁
server.1=host1:2888:3888:participant;2181
server.2=host2:2888:3888:participant;2181
server.3=host3:2888:3888:participant;2181
server.4=host4:2888:3888:observer;2181 # 新服务器,列为 Observer

方法2:将加入者列为 Participant

# 新服务器 D 的初始配置
# 警告:不要在初始配置中列出多个加入者作为 Participant
server.1=host1:2888:3888:participant;2181
server.2=host2:2888:3888:participant;2181
server.3=host3:2888:3888:participant;2181
server.4=host4:2888:3888:participant;2181 # 只有 D 一个加入者

警告:永远不要在初始配置中将多个加入者列为 Participant,否则可能形成独立的仲裁(脑裂)。

删除服务器的注意事项

删除服务器时,被删除的服务器不会自动关闭,而是成为"非投票跟随者":

  • 优点:连接到该服务器的客户端不会被立即断开
  • 缺点:非投票跟随者对系统吞吐量有负面影响
  • 建议:删除后应尽快关闭或重新配置该服务器

删除 Leader

删除 Leader 会导致短暂的不可用(通常比 Leader 崩溃恢复更快),因为 Leader 会主动提名新 Leader。

安全配置

从 ZooKeeper 3.5.3 开始,动态重配置需要权限控制:

启用 ACL 检查

# 启用动态重配置
reconfigEnabled=true

# 启用认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

跳过 ACL 检查(仅限安全环境)

# 跳过 ACL 检查(不推荐生产环境使用)
skipACL=yes

使用权限控制

// 添加认证
zk.addAuthInfo("digest", "admin:password".getBytes());

// 执行重配置(需要有权限)
List<String> joiningServers = new ArrayList<>();
joiningServers.add("server.4=...");
zk.reconfig(joiningServers, null, null, -1, new Stat());

客户端连接迁移

当集群配置变化时,客户端需要更新连接字符串:

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ReconfigAwareClient {
private ZooKeeper zk;
private List<String> servers;

public void connectWithReconfigSupport(List<String> serverList) {
servers = serverList;

try {
zk = new ZooKeeper(String.join(",", servers), 30000, event -> {
if (event.getType() == Watcher.Event.EventType.None) {
// 处理连接状态变化
handleConnectionChange(event);
}
});

// 设置配置变更监听
watchConfigChanges();

} catch (Exception e) {
e.printStackTrace();
}
}

private void watchConfigChanges() throws Exception {
// 获取当前配置并设置监听
zk.getConfig(true, (rc, path, ctx, data, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
// 配置变更,更新连接字符串
String config = new String(data);
updateConnectionString(config);
}
}, null);
}

private void updateConnectionString(String config) {
// 解析配置并更新服务器列表
// 重新连接时使用新的服务器列表
System.out.println("配置已变更: " + config);
}

private void handleConnectionChange(WatchedEvent event) {
switch (event.getState()) {
case SyncConnected:
System.out.println("已连接");
break;
case Disconnected:
System.out.println("连接断开,尝试重连");
break;
case Expired:
System.out.println("会话过期,需要重建会话");
break;
}
}
}

动态重配置最佳实践

  1. 启用 standaloneEnabled=false:允许单节点集群扩展
  2. 使用增量模式:更安全,适合大多数场景
  3. 监控配置版本:跟踪配置变更历史
  4. 保留旧配置:配置文件会自动备份,不要手动删除
  5. 避免高峰期操作:重配置可能影响性能
  6. 测试环境验证:在生产环境操作前先在测试环境验证

小结

本章介绍了 ZooKeeper 的核心概念和使用方法:

  1. 数据模型:树形命名空间、ZNode、节点类型(包括 Container 和 TTL)、版本控制
  2. 会话机制:会话状态、超时管理、本地会话、重连机制、chroot 支持
  3. Watch 机制:一次性触发、持久递归监听(3.6.0+)、异步通知、使用注意事项
  4. ACL 权限:权限类型、认证方案、权限设置
  5. 架构设计:Leader、Follower、Observer 角色,ZAB 协议
  6. 应用场景:配置管理、服务发现、分布式锁、队列
  7. 部署运维:集群部署、配置参数、监控指标

ZooKeeper 版本特性总结

版本新增特性
3.5.0动态重配置、本地会话(Local Session)
3.5.3容器节点(Container Nodes)、TTL 节点、reconfigEnabled 安全选项
3.6.0持久递归监听(Persistent Recursive Watches)、多地址支持、Observer Master
3.7.0改进的快照格式、更好的内存管理
3.8.0安全性增强、性能优化
3.9.0证书热重载(cert.Reload)、安全审计日志增强

最佳实践

  1. 客户端选择:生产环境推荐使用 Curator,它提供了更高级的 API 和常用配方的实现
  2. 会话超时设置:根据网络环境和业务需求合理设置,一般 10-30 秒
  3. 节点类型选择
    • 配置数据:持久节点
    • 服务注册:临时节点
    • Leader 选举:临时顺序节点或容器节点
    • 需要自动清理的数据:TTL 节点
  4. 监听策略
    • 简单监听:传统 Watch
    • 需要持续监听:持久监听(3.6.0+)
    • 监听目录树:TreeCache 或递归监听
  5. 集群规模:生产环境建议 3 或 5 个节点,奇数个
  6. 监控告警:重点关注延迟、Watch 数量、临时节点数量

ZooKeeper 是分布式系统的核心基础设施,掌握 ZooKeeper 对于构建可靠的分布式应用至关重要。在实际项目中,推荐使用 Curator 等高级客户端库简化开发。