典型应用场景
本章介绍 ZooKeeper 的典型应用场景,包括分布式锁、Leader 选举、配置中心、服务注册发现等。
分布式锁
分布式锁是 ZooKeeper 最常见的应用场景之一。利用临时顺序节点可以实现公平的分布式锁。
实现原理
┌─────────────────────────────────────────────────────────────┐
│ 分布式锁实现原理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ /locks/resource │
│ ├── lock-0000000001 (客户端A - 序号最小,获得锁) │
│ ├── lock-0000000002 (客户端B - 监听前一个节点) │
│ └── lock-0000000003 (客户端C - 监听前一个节点) │
│ │
│ 获取锁流程: │
│ 1. 在锁节点下创建临时顺序节点 │
│ 2. 获取所有子节点,判断自己是否序号最小 │
│ 3. 如果最小,获得锁 │
│ 4. 如果不是最小,监听前一个节点 │
│ 5. 前一个节点删除后,收到通知,获得锁 │
│ │
│ 释放锁流程: │
│ 1. 删除自己创建的节点 │
│ 2. 下一个监听的客户端收到通知,获得锁 │
│ │
└─────────────────────────────────────────────────────────────┘
原生实现
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
public class DistributedLock implements Watcher {
private ZooKeeper zk;
private String lockPath;
private String currentLock;
private String waitLock;
public DistributedLock(ZooKeeper zk, String lockPath) {
this.zk = zk;
this.lockPath = lockPath;
}
public void lock() throws Exception {
if (zk.exists(lockPath, false) == null) {
zk.create(lockPath, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
currentLock = zk.create(lockPath + "/lock-",
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);
String currentNode = currentLock.substring(lockPath.length() + 1);
int currentIndex = children.indexOf(currentNode);
if (currentIndex == 0) {
return;
}
waitLock = lockPath + "/" + children.get(currentIndex - 1);
Stat stat = zk.exists(waitLock, true);
if (stat != null) {
synchronized (this) {
wait();
}
}
}
}
public void unlock() throws Exception {
zk.delete(currentLock, -1);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
notify();
}
}
}
}
Curator 实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/locks/resource");
try {
System.out.println("尝试获取锁...");
lock.acquire();
System.out.println("获取锁成功,执行业务逻辑");
Thread.sleep(5000);
} finally {
lock.release();
System.out.println("释放锁");
}
client.close();
}
}
可重入锁
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
public class ReentrantLockExample {
private static InterProcessMutex lock;
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
lock = new InterProcessMutex(client, "/locks/reentrant");
methodA();
client.close();
}
public static void methodA() throws Exception {
lock.acquire();
try {
System.out.println("methodA 获取锁");
methodB();
} finally {
lock.release();
System.out.println("methodA 释放锁");
}
}
public static void methodB() throws Exception {
lock.acquire();
try {
System.out.println("methodB 获取锁(可重入)");
} finally {
lock.release();
System.out.println("methodB 释放锁");
}
}
}
读写锁
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
public class ReadWriteLockExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(
client, "/locks/rw-resource");
Thread readThread1 = new Thread(() -> {
try {
rwLock.readLock().acquire();
System.out.println("读线程1 获取读锁");
Thread.sleep(3000);
rwLock.readLock().release();
System.out.println("读线程1 释放读锁");
} catch (Exception e) {
e.printStackTrace();
}
});
Thread readThread2 = new Thread(() -> {
try {
rwLock.readLock().acquire();
System.out.println("读线程2 获取读锁");
Thread.sleep(3000);
rwLock.readLock().release();
System.out.println("读线程2 释放读锁");
} catch (Exception e) {
e.printStackTrace();
}
});
Thread writeThread = new Thread(() -> {
try {
rwLock.writeLock().acquire();
System.out.println("写线程 获取写锁");
Thread.sleep(2000);
rwLock.writeLock().release();
System.out.println("写线程 释放写锁");
} catch (Exception e) {
e.printStackTrace();
}
});
readThread1.start();
readThread2.start();
writeThread.start();
readThread1.join();
readThread2.join();
writeThread.join();
client.close();
}
}
Leader 选举
Leader 选举用于在分布式系统中选举一个主节点来协调工作。
实现原理
┌─────────────────────────────────────────────────────────────┐
│ Leader 选举原理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ /election │
│ ├── node-0000000001 (客户端A - 序号最小,成为Leader) │
│ ├── node-0000000002 (客户端B - 监听前一个节点) │
│ └── node-0000000003 (客户端C - 监听前一个节点) │
│ │
│ 选举流程: │
│ 1. 所有参与者创建临时顺序节点 │
│ 2. 序号最小的节点成为 Leader │
│ 3. 其他节点监听前一个节点 │
│ 4. Leader 故障后,下一个节点成为新 Leader │
│ │
└─────────────────────────────────────────────────────────────┘
Curator 实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class LeaderElectionExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
LeaderLatch leaderLatch = new LeaderLatch(client, "/election", "participant-1");
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println("成为 Leader,开始执行主节点任务");
}
@Override
public void notLeader() {
System.out.println("不再是 Leader,停止主节点任务");
}
});
leaderLatch.start();
leaderLatch.await();
System.out.println("当前是否为 Leader: " + leaderLatch.hasLeadership());
Thread.sleep(10000);
leaderLatch.close();
client.close();
}
}
LeaderSelector 实现
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
public class LeaderSelectorExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(
client,
"/election",
new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("成为 Leader");
try {
Thread.sleep(5000);
} finally {
System.out.println("释放 Leader 权限");
}
}
}
);
leaderSelector.autoRequeue();
leaderSelector.start();
Thread.sleep(30000);
leaderSelector.close();
client.close();
}
}
配置中心
使用 ZooKeeper 实现分布式配置中心,支持配置的集中管理和动态更新。
实现代码
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.Properties;
public class ConfigCenter {
private CuratorFramework client;
private NodeCache configCache;
private Properties config;
public ConfigCenter(String connectString) throws Exception {
client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
config = new Properties();
}
public void init(String configPath) throws Exception {
if (client.checkExists().forPath(configPath) == null) {
client.create().creatingParentsIfNeeded().forPath(configPath);
}
configCache = new NodeCache(client, configPath);
configCache.start();
loadConfig();
configCache.getListenable().addListener(() -> {
loadConfig();
System.out.println("配置已更新");
});
}
private void loadConfig() throws Exception {
byte[] data = configCache.getCurrentData().getData();
if (data != null && data.length > 0) {
String configStr = new String(data);
config.clear();
for (String line : configStr.split("\n")) {
String[] parts = line.split("=", 2);
if (parts.length == 2) {
config.setProperty(parts[0].trim(), parts[1].trim());
}
}
}
}
public String get(String key) {
return config.getProperty(key);
}
public void set(String key, String value) throws Exception {
config.setProperty(key, value);
saveConfig();
}
private void saveConfig() throws Exception {
StringBuilder sb = new StringBuilder();
for (String key : config.stringPropertyNames()) {
sb.append(key).append("=").append(config.getProperty(key)).append("\n");
}
client.setData().forPath(configCache.getCurrentData().getPath(), sb.toString().getBytes());
}
public void close() throws Exception {
if (configCache != null) {
configCache.close();
}
if (client != null) {
client.close();
}
}
}
使用示例
public class ConfigCenterExample {
public static void main(String[] args) throws Exception {
ConfigCenter configCenter = new ConfigCenter("localhost:2181");
configCenter.init("/config/app");
configCenter.set("db.url", "jdbc:mysql://localhost:3306/mydb");
configCenter.set("db.user", "root");
configCenter.set("db.timeout", "30000");
System.out.println("db.url: " + configCenter.get("db.url"));
System.out.println("db.user: " + configCenter.get("db.user"));
Thread.sleep(60000);
configCenter.close();
}
}
服务注册与发现
使用 ZooKeeper 实现服务注册与发现机制。
服务注册
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class ServiceRegistry {
private CuratorFramework client;
private String registryPath;
public ServiceRegistry(String connectString, String registryPath) {
this.registryPath = registryPath;
this.client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
}
public void start() throws Exception {
client.start();
if (client.checkExists().forPath(registryPath) == null) {
client.create().creatingParentsIfNeeded().forPath(registryPath);
}
}
public void register(String serviceName, String serviceAddress) throws Exception {
String servicePath = registryPath + "/" + serviceName;
if (client.checkExists().forPath(servicePath) == null) {
client.create().creatingParentsIfNeeded().forPath(servicePath);
}
String addressPath = servicePath + "/address-";
String addressNode = client.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(addressPath, serviceAddress.getBytes());
System.out.println("服务注册成功: " + addressNode);
}
public void close() {
client.close();
}
}
服务发现
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class ServiceDiscovery {
private CuratorFramework client;
private String registryPath;
private PathChildrenCache cache;
private List<String> serviceAddresses = new ArrayList<>();
public ServiceDiscovery(String connectString, String registryPath) {
this.registryPath = registryPath;
this.client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
}
public void start() throws Exception {
client.start();
}
public void subscribe(String serviceName) throws Exception {
String servicePath = registryPath + "/" + serviceName;
cache = new PathChildrenCache(client, servicePath, true);
cache.start();
cache.getListenable().addListener((client, event) -> {
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
String address = new String(event.getData().getData());
serviceAddresses.add(address);
System.out.println("服务上线: " + address);
} else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
String address = new String(event.getData().getData());
serviceAddresses.remove(address);
System.out.println("服务下线: " + address);
}
});
}
public String discover(String serviceName) {
if (serviceAddresses.isEmpty()) {
return null;
}
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
public void close() throws Exception {
if (cache != null) {
cache.close();
}
client.close();
}
}
使用示例
public class ServiceRegistryExample {
public static void main(String[] args) throws Exception {
ServiceRegistry registry = new ServiceRegistry("localhost:2181", "/registry");
registry.start();
registry.register("order-service", "192.168.1.100:8080");
registry.register("order-service", "192.168.1.101:8080");
registry.register("user-service", "192.168.1.102:8080");
Thread.sleep(60000);
registry.close();
}
}
public class ServiceDiscoveryExample {
public static void main(String[] args) throws Exception {
ServiceDiscovery discovery = new ServiceDiscovery("localhost:2181", "/registry");
discovery.start();
discovery.subscribe("order-service");
for (int i = 0; i < 5; i++) {
String address = discovery.discover("order-service");
System.out.println("发现服务地址: " + address);
Thread.sleep(2000);
}
discovery.close();
}
}
分布式屏障
屏障用于协调多个任务的执行,只有当所有参与者都到达屏障点时,才能继续执行。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class BarrierExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
DistributedBarrier barrier = new DistributedBarrier(client, "/barrier/test");
for (int i = 0; i < 3; i++) {
final int index = i;
new Thread(() -> {
try {
System.out.println("线程 " + index + " 到达屏障,等待其他线程");
barrier.waitOnBarrier();
System.out.println("线程 " + index + " 通过屏障");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(5000);
System.out.println("所有线程就绪,释放屏障");
barrier.removeBarrier();
Thread.sleep(3000);
client.close();
}
}
分布式计数器
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
public class CounterExample {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
SharedCount counter = new SharedCount(client, "/counters/mycounter", 0);
counter.start();
counter.addListener(new SharedCountListener() {
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("计数器变更: " + newCount);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
});
for (int i = 0; i < 10; i++) {
boolean success = counter.trySetCount(counter.getCount() + 1);
if (success) {
System.out.println("设置成功: " + counter.getCount());
} else {
System.out.println("设置失败,版本冲突");
}
Thread.sleep(500);
}
counter.close();
client.close();
}
}
小结
本章学习了 ZooKeeper 的典型应用场景:
- 分布式锁:公平锁、可重入锁、读写锁
- Leader 选举:LeaderLatch 和 LeaderSelector
- 配置中心:集中管理和动态更新配置
- 服务注册发现:服务自动注册和发现
- 分布式屏障:协调多任务执行
- 分布式计数器:原子计数操作
下一章我们将学习 ZooKeeper 集群的部署与管理。