Python 客户端开发
Python 是数据科学和微服务开发中常用的语言,使用 Kazoo 库可以方便地与 ZooKeeper 进行交互。本章介绍如何使用 Python 开发 ZooKeeper 应用。
Kazoo 库简介
Kazoo 是 Python 中最成熟的 ZooKeeper 客户端库,提供了简洁的 API 和丰富的功能。
主要特性:
- 支持连接管理和自动重连
- 提供同步和异步两种 API 风格
- 内置分布式锁、选举等常用配方
- 支持 Watcher 回调和事件监听
- 完善的错误处理和重试机制
环境准备
安装 Kazoo
# Install with pip
pip install kazoo
# Install a pinned version
pip install kazoo==2.10.0
# Verify the installation by reading the installed distribution's metadata
# (importlib.metadata is stdlib on Python 3.8+; kazoo's package namespace is
# not guaranteed to expose __version__)
python -c "from importlib.metadata import version; print(version('kazoo'))"
安装 ZooKeeper 服务
确保 ZooKeeper 服务已启动:
# Quick start with Docker (development only).
# ZooKeeper 3.5+ only answers four-letter-word commands listed in
# 4lw.commands.whitelist; the official image allows just "srvr" by default,
# so "ruok" has to be whitelisted explicitly for the check below to work.
docker run -d --name zookeeper -p 2181:2181 -e ZOO_4LW_COMMANDS_WHITELIST=ruok,srvr zookeeper:3.8
# Verify the service
echo ruok | nc localhost 2181
# Expected output: imok
连接管理
创建连接
from kazoo.client import KazooClient

# Client for a single local server; no network traffic happens until start().
zk = KazooClient(hosts='localhost:2181')

# Open the connection to the server.
zk.start()

# zk.connected reports whether the client currently has a live connection.
is_connected = zk.connected
print(f"连接状态: {zk.connected}")  # True

# Tear the connection down once finished.
zk.stop()
连接参数配置
from kazoo.client import KazooClient
from kazoo.retry import KazooRetry

# Retry behaviour is configured with KazooRetry objects. Passing
# max_retries / retry_delay / retry_backoff / retry_jitter straight to
# KazooClient is deprecated in kazoo 2.x (DeprecationWarning).
connection_retry = KazooRetry(
    max_tries=-1,   # keep retrying (re)connection forever; -1 means unlimited
    delay=0.1,      # initial delay between attempts (seconds)
    backoff=2,      # exponential backoff factor applied to the delay
    max_delay=60,   # cap on the delay between attempts (seconds)
)
command_retry = KazooRetry(
    max_tries=3,    # retry an individual command a few times, then give up
    delay=0.1,
    backoff=2,
    max_delay=10,
)

# Full connection configuration
zk = KazooClient(
    hosts='localhost:2181',              # comma-separated "host:port" list for an ensemble
    timeout=10.0,                        # timeout in seconds (also the requested session timeout)
    client_id=None,                      # (session_id, password) from zk.client_id to resume a session
    randomize_hosts=True,                # shuffle the host list before connecting
    connection_retry=connection_retry,   # policy for (re)establishing the connection
    command_retry=command_retry,         # policy used by zk.retry(...) for commands
    auth_data=None,                      # e.g. [('digest', 'user:password')]
    read_only=False,                     # whether read-only servers are acceptable
    keyfile=None,                        # SSL private key file
    keyfile_password=None,               # password for the SSL private key
    certfile=None,                       # SSL certificate file
    ca=None,                             # CA certificate file
    use_ssl=False,                       # enable SSL/TLS
    verify_certs=True,                   # verify server certificates when SSL is on
    sasl_options=None,                   # SASL options
    logger=None,                         # custom logger
)

# Start the connection, waiting at most 15 seconds
zk.start(timeout=15)
连接状态监听
import time

from kazoo.client import KazooClient, KazooState

# Message printed for each connection state; any other state prints nothing.
_STATE_MESSAGES = {
    KazooState.CONNECTED: "已连接到 ZooKeeper",
    KazooState.SUSPENDED: "连接已暂停(可能网络问题)",
    KazooState.LOST: "连接已丢失(会话可能已过期)",
}


def connection_listener(state):
    """Print a description of the new connection *state* reported by kazoo."""
    message = _STATE_MESSAGES.get(state)
    if message is not None:
        print(message)


zk = KazooClient(hosts='localhost:2181')
zk.add_listener(connection_listener)
zk.start()

# Stay alive for a minute so state transitions can be observed.
time.sleep(60)
zk.stop()
使用上下文管理器
from contextlib import contextmanager

from kazoo.client import KazooClient


@contextmanager
def zk_session(hosts='localhost:2181', **kwargs):
    """Yield a started KazooClient and always stop and close it afterwards.

    kazoo does not document KazooClient as a context manager, so
    ``with KazooClient(...) as zk`` is not relied upon here; this wrapper
    provides the same convenience explicitly.

    Extra keyword arguments are passed through to KazooClient.
    """
    zk = KazooClient(hosts=hosts, **kwargs)
    zk.start()
    try:
        yield zk
    finally:
        zk.stop()   # end the session (ephemeral nodes are removed)
        zk.close()  # release resources held by the stopped client


# Recommended: let the context manager own the connection lifecycle
with zk_session('localhost:2181') as zk:
    # Inside the block the connection is established
    print(f"连接状态: {zk.connected}")
    # perform operations...
# Leaving the block stops and closes the client, even if an exception occurred
CRUD 操作
创建节点
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from kazoo.security import make_digest_acl, OPEN_ACL_UNSAFE

zk = KazooClient(hosts='localhost:2181')
zk.start()


def create_if_absent(path, value, **kwargs):
    """Create *path* with *value* unless it already exists.

    Extra keyword arguments are passed to ``zk.create``. Returns the created
    path, or None when the node was already present.
    """
    try:
        return zk.create(path, value=value, **kwargs)
    except NodeExistsError:
        return None


# Make sure the parent node exists
zk.ensure_path('/myapp')

# Persistent node. create() raises NodeExistsError for an existing node, so
# the helper tolerates exactly that error (keeps the script re-runnable).
create_if_absent('/myapp/config', b'production')

# makepath=True creates any missing parent nodes; it does NOT ignore an
# already-existing target node.
create_if_absent('/myapp/env/prod/config', b'production', makepath=True)

# Ephemeral node: deleted automatically when this session ends
zk.create('/myapp/temp', value=b'temp-data', ephemeral=True)

# Sequential node: the server appends a zero-padded counter to the name
seq_path = zk.create('/myapp/seq-', value=b'sequential', sequence=True)
print(f"创建的顺序节点: {seq_path}")  # e.g. /myapp/seq-0000000001

# Ephemeral sequential node (the building block for locks and elections)
ephemeral_seq = zk.create(
    '/myapp/lock-',
    value=b'lock-holder',
    ephemeral=True,
    sequence=True,
)

# ACL: only the digest user "user" may read/write this node. Clients that
# later read it, or delete /myapp recursively, must first call
# zk.add_auth('digest', 'user:password') or they get NoAuthError.
acl = make_digest_acl('user', 'password', read=True, write=True)
create_if_absent('/myapp/secure', b'secret', acl=[acl])

zk.stop()
读取节点
from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()

# exists() returns the node's stat, or None when the node is absent
if zk.exists('/myapp/config'):
    print("节点存在")

# get() returns (data bytes, ZnodeStat)
data, stat = zk.get('/myapp/config')
print(f"数据: {data.decode()}")
print(f"版本: {stat.version}")
print(f"创建时间: {stat.ctime}")          # milliseconds since the epoch
print(f"修改时间: {stat.mtime}")          # milliseconds since the epoch
print(f"数据长度: {stat.data_length}")
# ZnodeStat exposes the child count as children_count (raw field: numChildren)
print(f"子节点数: {stat.children_count}")

# get_children() returns only the list of child names
children = zk.get_children('/myapp')
print(f"子节点: {children}")

# include_data=True makes get_children() return (children, ZnodeStat)
children, stat = zk.get_children('/myapp', include_data=True)
print(f"子节点: {children}")
print(f"子节点版本: {stat.cversion}")

# Asynchronous read: returns immediately with an async result object
async_result = zk.get_async('/myapp/config')
data, stat = async_result.get()  # block until the result is available
print(f"异步获取数据: {data.decode()}")

zk.stop()
更新节点
from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionError

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Optimistic concurrency: remember the version we read...
_, current_stat = zk.get('/myapp/config')

# ...and write only if nobody has changed the node since.
try:
    zk.set('/myapp/config', value=b'new-production', version=current_stat.version)
except BadVersionError:
    print("版本冲突,数据已被其他客户端修改")
else:
    print("更新成功")

# Without a version argument the write is unconditional
zk.set('/myapp/config', value=b'unconditional-update')

# set() hands back the node's stat after the write
new_stat = zk.set('/myapp/config', value=b'updated-data')
print(f"新版本: {new_stat.version}")

zk.stop()
删除节点
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NotEmptyError, BadVersionError

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Delete a node that has no children
try:
    zk.delete('/myapp/config')
    print("删除成功")
except NoNodeError:
    print("节点不存在")
except NotEmptyError:
    print("节点有子节点,不能删除")

# Conditional delete: succeeds only if the version is unchanged.
# The node may have just been deleted above, so make sure it exists first.
zk.ensure_path('/myapp/config')
data, stat = zk.get('/myapp/config')
try:
    zk.delete('/myapp/config', version=stat.version)
except BadVersionError:
    print("版本冲突,数据已被其他客户端修改")
except NotEmptyError:
    print("节点有子节点,不能删除")

# Delete-if-exists: delete() has no such option, so ignore NoNodeError explicitly
try:
    zk.delete('/myapp/config')
except NoNodeError:
    pass  # already gone, which is the desired end state

# Recursive delete: removes the node and all of its descendants.
# If any descendant has a restrictive ACL (e.g. a digest-protected node),
# call zk.add_auth(...) first or this fails with NoAuthError.
zk.delete('/myapp', recursive=True)

zk.stop()
Watcher 机制
数据 Watcher
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import EventType

zk = KazooClient(hosts='localhost:2181')
zk.start()


def data_watcher(event):
    """Data watch callback that keeps itself registered.

    ZooKeeper watches are one-shot, so every invocation registers a new watch
    before returning -- including after a deletion, so that a re-created node
    is still noticed.
    """
    print(f"事件类型: {event.type}")
    print(f"事件路径: {event.path}")
    if event.type == EventType.DELETED:
        print(f"节点 {event.path} 已删除")
        # exists() can watch a path that does not exist yet; the callback
        # fires again (CREATED) if the node is re-created.
        zk.exists(event.path, watch=data_watcher)
        return
    # CHANGED (or CREATED after a deletion): re-register while reading new data
    try:
        data, stat = zk.get(event.path, watch=data_watcher)
    except NoNodeError:
        # Removed between the notification and this read: wait for re-creation
        zk.exists(event.path, watch=data_watcher)
        return
    print(f"新数据: {data.decode()}")


# Register the initial data watch
data, stat = zk.get('/myapp/config', watch=data_watcher)
print(f"当前数据: {data.decode()}")

# Keep running so events can be delivered
time.sleep(60)
zk.stop()
子节点 Watcher
import time

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()


def children_watcher(event):
    """Report a child-list change and arm the (one-shot) watch again."""
    print(f"事件类型: {event.type}")
    print(f"事件路径: {event.path}")
    # Listing the children with watch= is what re-registers the watcher.
    children = zk.get_children(event.path, watch=children_watcher)
    print(f"当前子节点: {children}")


# First listing also installs the initial watch
children = zk.get_children('/myapp', watch=children_watcher)
print(f"当前子节点: {children}")

time.sleep(60)
zk.stop()
使用装饰器简化
import time

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()


@zk.DataWatch('/myapp/config')
def watch_config(data, stat):
    """Invoked by kazoo with the node's data and stat whenever it changes."""
    if data is None:
        # data arrives as None once the node no longer exists
        print("节点已删除")
        return
    print(f"配置数据: {data.decode()}")
    print(f"版本: {stat.version}")


@zk.ChildrenWatch('/myapp')
def watch_children(children):
    """Invoked by kazoo with the current child names whenever they change."""
    print(f"子节点列表: {children}")


# Both decorator-based watchers re-register themselves automatically.
time.sleep(60)
zk.stop()
分布式锁
Kazoo 提供了开箱即用的分布式锁实现。
基本锁
import threading
import time

from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock

zk = KazooClient(hosts='localhost:2181')
zk.start()


def worker(worker_id):
    """Hold the shared distributed lock for two seconds, logging progress."""
    lock = Lock(zk, '/locks/my-resource')
    print(f"Worker {worker_id} 尝试获取锁")
    # Entering the with-block acquires the lock; leaving it releases it.
    with lock:
        print(f"Worker {worker_id} 获得锁,开始工作")
        time.sleep(2)
        print(f"Worker {worker_id} 完成工作,释放锁")


# Three workers contend for the same lock; they run one at a time.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

for t in threads:
    t.join()

zk.stop()
带超时的锁
from contextlib import contextmanager

from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout
from kazoo.recipe.lock import Lock

zk = KazooClient(hosts='localhost:2181')
zk.start()

lock = Lock(zk, '/locks/my-resource')

# acquire(timeout=...) raises LockTimeout when the lock is not obtained in
# time. Release only after a successful acquire: a finally that also wraps the
# acquire would try to release a lock that was never held.
try:
    lock.acquire(timeout=5)
except LockTimeout:
    print("获取锁超时")
else:
    try:
        print("获得锁")
        # business logic
        # ...
    finally:
        lock.release()
        print("释放锁")


@contextmanager
def acquired(lock, timeout):
    """Hold *lock* for the duration of a with-block, waiting at most *timeout* s.

    ``with lock:`` takes no timeout, and ``lock.acquire()`` returns a bool,
    not a context manager, so ``with lock.acquire(timeout=5):`` cannot work.

    Raises:
        LockTimeout: if the lock is not acquired within *timeout* seconds.
    """
    lock.acquire(timeout=timeout)
    try:
        yield lock
    finally:
        lock.release()


# Context-manager form with a timeout
try:
    with acquired(lock, timeout=5):
        print("获得锁")
        # business logic
except LockTimeout:
    print("获取锁超时")

zk.stop()
读写锁
import threading
import time

from kazoo.client import KazooClient
from kazoo.recipe.lock import ReadLock, WriteLock

zk = KazooClient(hosts='localhost:2181')
zk.start()


def reader(reader_id):
    """Reader thread: holds the shared read lock for one second."""
    lock = ReadLock(zk, '/locks/rw-resource')
    with lock:
        print(f"Reader {reader_id} 获得读锁")
        time.sleep(1)
        print(f"Reader {reader_id} 释放读锁")


def writer(writer_id):
    """Writer thread: holds the exclusive write lock for two seconds."""
    lock = WriteLock(zk, '/locks/rw-resource')
    with lock:
        print(f"Writer {writer_id} 获得写锁")
        time.sleep(2)
        print(f"Writer {writer_id} 释放写锁")


threads = []

# Readers may hold the lock concurrently
for i in range(3):
    t = threading.Thread(target=reader, args=(i,))
    threads.append(t)
    t.start()

# The writer needs exclusive access and waits for the readers
time.sleep(0.5)
t = threading.Thread(target=writer, args=(1,))
threads.append(t)
t.start()

# Wait for every thread before stopping: stopping the client earlier would end
# the session while locks are still held or being acquired.
for t in threads:
    t.join()

zk.stop()
信号量
import threading
import time

from kazoo.client import KazooClient
from kazoo.recipe.lock import Semaphore

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Semaphore allowing at most 3 concurrent holders
sem = Semaphore(zk, '/semaphores/my-resource', max_leases=3)


def limited_worker(worker_id):
    """Worker that needs one of the semaphore's leases to run."""
    with sem:
        print(f"Worker {worker_id} 获取信号量")
        time.sleep(2)
        print(f"Worker {worker_id} 释放信号量")


# Start 5 threads; only 3 of them can hold a lease at the same time
threads = []
for i in range(5):
    t = threading.Thread(target=limited_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

zk.stop()
Leader 选举
Kazoo 提供了简单的 Leader 选举实现。
使用 Election
import time
import uuid

from kazoo.client import KazooClient
from kazoo.recipe.election import Election

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Unique identifier for this participant
identifier = str(uuid.uuid4())[:8]
election = Election(zk, '/election/leader', identifier)


def leader_work():
    """Leader task: runs only while this participant holds leadership."""
    print(f"[{identifier}] 我被选为 Leader!")
    try:
        while True:
            print(f"[{identifier}] Leader 工作中...")
            time.sleep(2)
    except KeyboardInterrupt:
        pass


try:
    # run() blocks until this participant wins, then calls leader_work().
    # When leader_work() returns, leadership is released and run() returns;
    # the other participants then contend for it. This process does not
    # rejoin automatically -- call run() again for that.
    election.run(leader_work)
finally:
    # End the session even if we were interrupted while still waiting.
    zk.stop()
手动实现 LeaderLatch(Kazoo 没有内置 LeaderLatch 配方,这里基于临时顺序节点自行实现)
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.recipe.party import Party
import time
import threading

zk = KazooClient(hosts='localhost:2181')
zk.start()


class LeaderLatch:
    """Hand-rolled leader latch built on ephemeral sequential znodes.

    Every participant creates ``<path>/lock-<sequence>``; the participant that
    owns the lowest sequence number is the leader. Every other participant
    watches only its immediate predecessor, so one departure wakes up one
    client rather than all of them.
    """

    # Prefix of the znodes this recipe creates; other children are ignored.
    NODE_PREFIX = 'lock-'

    def __init__(self, zk, path, identifier):
        self.zk = zk
        self.path = path
        self.identifier = identifier
        self.lock_path = None   # full path of our znode once start() has run
        self.is_leader = False

    def start(self):
        """Join the election and evaluate leadership immediately."""
        self.zk.ensure_path(self.path)
        # Ephemeral sequential node: our candidacy disappears with the session
        self.lock_path = self.zk.create(
            f'{self.path}/{self.NODE_PREFIX}',
            value=self.identifier.encode(),
            ephemeral=True,
            sequence=True,
        )
        self._check_leader()

    def _check_leader(self):
        """Become leader if our node is first; otherwise watch the predecessor."""
        while self.lock_path is not None:  # None after close(): nothing to do
            # Sequence suffixes are zero-padded, so a plain string sort orders them
            children = sorted(
                c for c in self.zk.get_children(self.path)
                if c.startswith(self.NODE_PREFIX)
            )
            my_node = self.lock_path.rsplit('/', 1)[-1]
            if my_node not in children:
                # Our znode is gone (e.g. the session expired): we cannot lead.
                self.is_leader = False
                return
            idx = children.index(my_node)
            if idx == 0:
                self.is_leader = True
                print(f"[{self.identifier}] 成为 Leader")
                return
            self.is_leader = False
            prev_node = f"{self.path}/{children[idx - 1]}"
            if self.zk.exists(prev_node, watch=self._on_prev_deleted):
                print(f"[{self.identifier}] 等待 {prev_node} 释放")
                return
            # The predecessor vanished between get_children() and exists(), so
            # no watch was armed: loop and re-evaluate instead of waiting forever.

    def _on_prev_deleted(self, event):
        """Watch callback for the predecessor node: re-evaluate leadership."""
        self._check_leader()

    def close(self):
        """Leave the election by deleting our znode, if it still exists."""
        if self.lock_path:
            try:
                self.zk.delete(self.lock_path)
            except NoNodeError:
                pass  # already removed (e.g. by session expiry)
            self.lock_path = None
        self.is_leader = False
# Usage example
latch = LeaderLatch(zk, '/election/leader', 'participant-1')
latch.start()
try:
    while True:
        if latch.is_leader:
            print("执行 Leader 任务...")
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    # Leave the election and end the session however the loop was exited.
    latch.close()
    zk.stop()
配置中心实现
使用 ZooKeeper 实现分布式配置中心:
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NodeExistsError
import json
import threading
import time


class ConfigCenter:
    """Distributed configuration center backed by ZooKeeper.

    Each configuration key is a child znode of ``config_path`` whose data is a
    JSON document. A children watch picks up added/removed keys and a per-key
    data watch picks up changed values; either one triggers a full reload
    followed by the registered callbacks.
    """

    def __init__(self, hosts, config_path='/config'):
        self.zk = KazooClient(hosts=hosts)
        self.config_path = config_path
        self.config = {}
        self.lock = threading.Lock()  # guards self.config
        self.callbacks = []

    def start(self):
        """Connect, load the current configuration and start watching it."""
        self.zk.start()
        self.zk.ensure_path(self.config_path)
        self._load_config()
        self._watch_config()

    def _load_config(self):
        """Reload every key from ZooKeeper and (re)arm a data watch on each.

        The children watch does not fire when an existing key's value is
        updated, so each key also gets a data watch. Registering the same
        bound method again on the same path does not add a second callback.
        """
        loaded = {}
        for child in self.zk.get_children(self.config_path):
            path = f"{self.config_path}/{child}"
            try:
                data, _ = self.zk.get(path, watch=self._on_data_change)
            except NoNodeError:
                continue  # key removed between get_children() and get()
            if data:
                loaded[child] = json.loads(data.decode())
        # Network I/O happens outside the lock; only the swap is guarded.
        with self.lock:
            self.config.clear()
            self.config.update(loaded)

    def _on_data_change(self, event):
        """Data-watch callback: reload and notify after a key changes."""
        self._load_config()
        self._notify_callbacks()

    def _watch_config(self):
        """Watch the config node's children (keys added or removed)."""
        @self.zk.ChildrenWatch(self.config_path)
        def watch_children(children):
            self._load_config()
            self._notify_callbacks()

    def _notify_callbacks(self):
        """Call every registered callback with its own copy of the config."""
        with self.lock:
            snapshot = self.config.copy()
        for callback in list(self.callbacks):
            try:
                callback(snapshot.copy())
            except Exception as e:
                # Best effort: one failing callback must not block the others.
                print(f"回调执行错误: {e}")

    def get(self, key, default=None):
        """Return the cached value for *key*, or *default*."""
        with self.lock:
            return self.config.get(key, default)

    def set(self, key, value):
        """Store *value* (JSON-serialisable) under *key*, creating it if needed."""
        path = f"{self.config_path}/{key}"
        data = json.dumps(value).encode()
        # Try create first: an exists()-then-create() sequence races with
        # other writers creating the same key.
        try:
            self.zk.create(path, data)
        except NodeExistsError:
            self.zk.set(path, data)

    def delete(self, key):
        """Delete *key*; raises NoNodeError if it does not exist."""
        path = f"{self.config_path}/{key}"
        self.zk.delete(path)

    def register_callback(self, callback):
        """Register *callback(config_dict)* to run after every reload."""
        self.callbacks.append(callback)

    def close(self):
        """Stop the ZooKeeper client."""
        self.zk.stop()
# Usage example
def on_config_change(config):
    """Print the complete configuration snapshot after each change."""
    print(f"配置已更新: {config}")


config_center = ConfigCenter('localhost:2181')
config_center.start()
config_center.register_callback(on_config_change)

# Write two configuration entries
database_settings = {'host': 'localhost', 'port': 3306, 'name': 'mydb'}
cache_settings = {'type': 'redis', 'host': 'localhost', 'port': 6379}
config_center.set('database', database_settings)
config_center.set('cache', cache_settings)

# Read from the local cache; the cache is filled by watch callbacks, so right
# after set() it may not reflect the new value yet.
db_config = config_center.get('database')
print(f"数据库配置: {db_config}")

# Give the watches time to deliver notifications before shutting down
time.sleep(5)
config_center.close()
服务注册发现实现
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
import socket
import time
import random


class ServiceRegistry:
    """Service registry: each instance is an ephemeral sequential znode.

    Instances live under ``<registry_path>/<service_name>/instance_<seq>`` and
    store their "host:port" address as node data.
    """

    def __init__(self, hosts, registry_path='/services'):
        self.zk = KazooClient(hosts=hosts)
        self.registry_path = registry_path
        self.services = {}

    def start(self):
        """Connect and make sure the registry root exists."""
        self.zk.start()
        self.zk.ensure_path(self.registry_path)

    def register(self, service_name, host, port):
        """Register an instance; it disappears when this session ends."""
        service_path = f"{self.registry_path}/{service_name}"
        self.zk.ensure_path(service_path)
        # Ephemeral node: removed automatically if this process dies
        address = f"{host}:{port}"
        node_path = self.zk.create(
            f"{service_path}/instance_",
            value=address.encode(),
            ephemeral=True,
            sequence=True,
        )
        print(f"服务 {service_name} 注册成功: {address}")
        return node_path

    def _read_addresses(self, service_path, children):
        """Return the address stored in each child, skipping vanished ones."""
        addresses = []
        for child in children:
            try:
                data, _ = self.zk.get(f"{service_path}/{child}")
            except NoNodeError:
                continue  # instance deregistered after the children were listed
            if data:
                addresses.append(data.decode())
        return addresses

    def discover(self, service_name):
        """Return all registered addresses, or [] if the service is unknown."""
        service_path = f"{self.registry_path}/{service_name}"
        try:
            children = self.zk.get_children(service_path)
        except NoNodeError:
            return []
        return self._read_addresses(service_path, children)

    def discover_one(self, service_name):
        """Return one randomly chosen address, or None if there is none."""
        addresses = self.discover(service_name)
        if addresses:
            return random.choice(addresses)
        return None

    def watch_service(self, service_name, callback):
        """Call *callback(addresses)* now and whenever the instance set changes."""
        service_path = f"{self.registry_path}/{service_name}"
        # kazoo's ChildrenWatch does not work on a missing node, so create the
        # service node first in case no instance has registered yet.
        self.zk.ensure_path(service_path)

        @self.zk.ChildrenWatch(service_path)
        def watch_children(children):
            callback(self._read_addresses(service_path, children))

    def close(self):
        """Stop the client (this also removes our registered instances)."""
        self.zk.stop()
# Usage example
def on_service_change(addresses):
    """Print the instance addresses whenever membership changes."""
    print(f"服务地址列表更新: {addresses}")


registry = ServiceRegistry('localhost:2181')
registry.start()

# Register two instances of the same service
for host in ('192.168.1.100', '192.168.1.101'):
    registry.register('order-service', host, 8080)

# Look up every instance
addresses = registry.discover('order-service')
print(f"服务地址: {addresses}")

# Subscribe to membership changes
registry.watch_service('order-service', on_service_change)

# Pick one instance at random
instance = registry.discover_one('order-service')
print(f"选中实例: {instance}")

time.sleep(30)
registry.close()
事务操作
Kazoo 支持原子事务(multi)操作:事务中的所有操作要么全部成功,要么全部回滚。注意 commit() 不会因为某个操作失败而抛出异常,而是在返回的结果列表中用异常对象标出失败的操作:
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException

zk = KazooClient(hosts='localhost:2181')
zk.start()

# The parent of nodes created inside the transaction must already exist
zk.ensure_path('/tx')

# Build the transaction
transaction = zk.transaction()
transaction.create('/tx/node1', b'data1')
transaction.create('/tx/node2', b'data2')
# Transactions use set_data() (not set()); node1 goes from version 0 to 1 here
transaction.set_data('/tx/node1', b'updated1')
# check() sees the effect of earlier operations in the same transaction, so
# the expected version is 1 -- checking version=0 would make every run fail.
transaction.check('/tx/node1', version=1)

# Commit (all operations are applied atomically, or none are)
try:
    results = transaction.commit()
except KazooException as e:
    # Request-level problems (e.g. connection loss) are raised
    print(f"事务执行失败: {e}")
else:
    # A failing operation is reported as an exception instance in the results
    # list (the other operations show up as rolled back), not raised.
    failures = [r for r in results if isinstance(r, Exception)]
    if failures:
        print(f"事务执行失败: {failures}")
    else:
        print("事务执行成功")
        print(f"结果: {results}")

zk.stop()
错误处理
from kazoo.client import KazooClient
from kazoo.exceptions import (
    NoNodeError,            # node does not exist
    NodeExistsError,        # node already exists
    BadVersionError,        # version conflict
    NoAuthError,            # not authorized
    SessionExpiredError,    # session expired
    ConnectionClosedError,  # connection closed
)

zk = KazooClient(hosts='localhost:2181')
zk.start()

try:
    data, stat = zk.get('/nonexistent')
except NoNodeError:
    print("节点不存在")
except NoAuthError:
    print("没有权限")
except ConnectionClosedError:
    # Must precede SessionExpiredError: kazoo defines ConnectionClosedError as
    # a subclass of SessionExpiredError, so a later clause is never reached.
    print("连接已关闭")
except SessionExpiredError:
    print("会话已过期,需要重新连接")

# Node already exists -> fall back to updating it
try:
    # makepath=True also creates /myapp if it is missing
    zk.create('/myapp/config', b'data', makepath=True)
except NodeExistsError:
    print("节点已存在,更新数据")
    zk.set('/myapp/config', b'data')

# Version conflict -> re-read the current version and retry once
try:
    zk.set('/myapp/config', b'new-data', version=0)
except BadVersionError:
    print("版本冲突,获取最新版本后重试")
    data, stat = zk.get('/myapp/config')
    zk.set('/myapp/config', b'new-data', version=stat.version)

zk.stop()
最佳实践
1. 使用上下文管理器
from contextlib import contextmanager

from kazoo.client import KazooClient


@contextmanager
def zk_session(hosts='localhost:2181', **kwargs):
    """Yield a started KazooClient; always stop and close it afterwards.

    kazoo does not document KazooClient as a context manager, so the
    lifecycle is wrapped explicitly instead of writing
    ``with KazooClient(...) as zk``.
    """
    zk = KazooClient(hosts=hosts, **kwargs)
    zk.start()
    try:
        yield zk
    finally:
        zk.stop()
        zk.close()


# Recommended: the connection is managed automatically
with zk_session('localhost:2181') as zk:
    data, stat = zk.get('/config')
# Leaving the block stops and closes the client, even on error
2. 设置合理的超时和重试策略
# Keyword arguments for kazoo's command retry policy.
retry_policy = dict(max_tries=3, delay=0.1, backoff=2, max_delay=10)

zk = KazooClient(
    hosts='localhost:2181',
    timeout=10.0,               # timeout in seconds
    command_retry=retry_policy,
)
3. 处理连接状态变化
from kazoo.client import KazooState


def state_listener(state):
    """React to a connection state transition reported by kazoo.

    NOTE(review): kazoo's docs advise keeping listeners short and
    non-blocking -- confirm before doing real work here.
    """
    if state == KazooState.CONNECTED:
        # (Re)connected successfully
        print("已连接")
    elif state == KazooState.SUSPENDED:
        # Connection suspended, possibly a network problem
        print("连接暂停")
    elif state == KazooState.LOST:
        # Session lost: anything tied to it must be re-initialised
        print("会话丢失")


zk.add_listener(state_listener)
4. 使用装饰器简化 Watcher
# Recommended: the decorator re-registers the watch automatically
@zk.DataWatch('/config')
def watch_config(data, stat):
    # data is None when the node does not exist, while b'' is a legitimate
    # empty value -- test for None rather than truthiness.
    if data is not None:
        process_config(data)


# ...instead of re-registering one-shot watches by hand:
def manual_watch(event):
    data, stat = zk.get(event.path, watch=manual_watch)
    process_config(data)
小结
本章介绍了使用 Python Kazoo 库开发 ZooKeeper 应用的核心内容:
- 连接管理:创建连接、配置参数、状态监听
- CRUD 操作:创建、读取、更新、删除节点
- Watcher 机制:数据监听、子节点监听、装饰器简化
- 分布式锁:基本锁、读写锁、信号量
- Leader 选举:内置的 Election 配方与手动实现的 LeaderLatch
- 配方实现:配置中心、服务注册发现
- 事务操作:原子性多操作执行
- 错误处理:常见异常和处理方式
Kazoo 是 Python 生态中最成熟的 ZooKeeper 客户端,提供了简洁易用的 API,适合快速开发分布式应用。