跳到主要内容

Python 客户端开发

Python 是数据科学和微服务开发中常用的语言,使用 Kazoo 库可以方便地与 ZooKeeper 进行交互。本章介绍如何使用 Python 开发 ZooKeeper 应用。

Kazoo 库简介

Kazoo 是 Python 中最成熟的 ZooKeeper 客户端库,提供了简洁的 API 和丰富的功能。

主要特性

  • 支持连接管理和自动重连
  • 提供同步和异步两种 API 风格
  • 内置分布式锁、选举等常用配方
  • 支持 Watcher 回调和事件监听
  • 完善的错误处理和重试机制

环境准备

安装 Kazoo

# Install the latest release with pip
pip install kazoo

# Install a pinned release
pip install kazoo==2.10.0

# Verify the installation by printing the package version
python -c "import kazoo; print(kazoo.__version__)"

安装 ZooKeeper 服务

确保 ZooKeeper 服务已启动:

# Start a throwaway ZooKeeper with Docker (development only).
# Since ZooKeeper 3.5 the four-letter-word commands are rejected unless they
# are whitelisted, so `ruok` is enabled explicitly for the health check below.
docker run -d --name zookeeper -p 2181:2181 \
  -e ZOO_4LW_COMMANDS_WHITELIST=ruok,srvr \
  zookeeper:3.8

# Health check
echo ruok | nc localhost 2181
# Expected reply: imok

连接管理

创建连接

from kazoo.client import KazooClient

# Create the client instance
zk = KazooClient(hosts='localhost:2181')

# Open the connection
zk.start()

# Inspect the connection state
print(f"连接状态: {zk.connected}") # True

# Close the connection
zk.stop()

连接参数配置

from kazoo.client import KazooClient
from kazoo.retry import KazooRetry

# Retry behaviour is configured with KazooRetry objects. Passing
# max_retries / retry_delay / retry_backoff / retry_jitter straight to
# KazooClient is deprecated in favour of connection_retry / command_retry.
connection_retry = KazooRetry(
    max_tries=-1,    # -1 retries forever
    delay=0.1,       # initial delay between attempts (seconds)
    backoff=2,       # multiplier applied to the delay after each failure
    max_jitter=0.1,  # random jitter applied to each delay
)

# Full connection configuration
zk = KazooClient(
    hosts='localhost:2181',             # ZooKeeper address; separate several hosts with commas
    timeout=10.0,                       # connection timeout (seconds)
    client_id=None,                     # session id, used to resume a previous session
    randomize_hosts=True,               # pick hosts in random order
    connection_retry=connection_retry,  # retry policy for (re)connecting
    command_retry=None,                 # retry policy for commands
    auth_data=None,                     # authentication data
    read_only=False,                    # allow read-only mode
    keyfile=None,                       # SSL key file
    keyfile_password=None,              # SSL key password
    certfile=None,                      # SSL certificate file
    ca=None,                            # CA certificate file
    use_ssl=False,                      # use SSL
    verify_certs=True,                  # verify certificates
    sasl_options=None,                  # SASL options
    logger=None,                        # logger
)

# Open the connection, waiting at most 15 seconds
zk.start(timeout=15)

连接状态监听

import time

from kazoo.client import KazooClient, KazooState

# Message printed for each connection state the listener reacts to.
_STATE_MESSAGES = {
    KazooState.CONNECTED: "已连接到 ZooKeeper",
    KazooState.SUSPENDED: "连接已暂停(可能网络问题)",
    KazooState.LOST: "连接已丢失(会话可能已过期)",
}


def connection_listener(state):
    """Print a message when the connection state changes.

    States without an entry in the message table are ignored.
    """
    message = _STATE_MESSAGES.get(state)
    if message is not None:
        print(message)


zk = KazooClient(hosts='localhost:2181')
zk.add_listener(connection_listener)
zk.start()

# Keep the connection open for a minute
time.sleep(60)

zk.stop()

使用上下文管理器

from contextlib import contextmanager

from kazoo.client import KazooClient


@contextmanager
def zk_client(hosts):
    """Yield a started KazooClient and always shut it down afterwards.

    NOTE(review): KazooClient is not documented as a context manager, so the
    start/stop pairing is made explicit here rather than relying on
    ``with KazooClient(...)``.
    """
    zk = KazooClient(hosts=hosts)
    zk.start()
    try:
        yield zk
    finally:
        zk.stop()   # end the session
        zk.close()  # free the resources still held by the stopped client


# Recommended: let a context manager own the connection lifecycle
with zk_client('localhost:2181') as zk:
    # Inside the block the connection is established
    print(f"连接状态: {zk.connected}")

    # Perform operations...

# The connection is closed once the block is left

CRUD 操作

创建节点

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from kazoo.security import make_digest_acl, OPEN_ACL_UNSAFE

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Make sure the parent node exists
zk.ensure_path('/myapp')

# Create a persistent node
zk.create('/myapp/config', value=b'production')

# Create-if-missing: create() raises NodeExistsError when the path is already
# present. makepath=True only creates missing parent nodes; it does not make
# the call ignore an existing node.
try:
    zk.create('/myapp/config', value=b'production', makepath=True)
except NodeExistsError:
    pass  # already present, nothing to do

# Create an ephemeral node (removed automatically when the session ends)
zk.create('/myapp/temp', value=b'temp-data', ephemeral=True)

# Create a sequential node
seq_path = zk.create('/myapp/seq-', value=b'sequential', sequence=True)
print(f"创建的顺序节点: {seq_path}") # e.g. /myapp/seq-0000000001

# Create an ephemeral sequential node
ephemeral_seq = zk.create(
    '/myapp/lock-',
    value=b'lock-holder',
    ephemeral=True,
    sequence=True,
)

# Restrict access with a digest ACL
acl = make_digest_acl('user', 'password', read=True, write=True)
zk.create('/myapp/secure', value=b'secret', acl=[acl])

zk.stop()

读取节点

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Check whether the node exists
if zk.exists('/myapp/config'):
    print("节点存在")

# Fetch the node data together with its stat
data, stat = zk.get('/myapp/config')
print(f"数据: {data.decode()}")
print(f"版本: {stat.version}")
print(f"创建时间: {stat.ctime}")
print(f"修改时间: {stat.mtime}")
print(f"数据长度: {stat.data_length}")
# ZnodeStat exposes the child count as children_count (raw field: numChildren)
print(f"子节点数: {stat.children_count}")

# List the children
children = zk.get_children('/myapp')
print(f"子节点: {children}")

# List the children with the parent's stat: get_children() only returns a
# (children, stat) tuple when include_data=True is passed.
children, stat = zk.get_children('/myapp', include_data=True)
print(f"子节点: {children}")
print(f"子节点版本: {stat.cversion}")

# Fetch data asynchronously
async_result = zk.get_async('/myapp/config')
data, stat = async_result.get() # blocks until the result is available
print(f"异步获取数据: {data.decode()}")

zk.stop()

更新节点

from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionError

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Read the node first so the current version is known
data, stat = zk.get('/myapp/config')

# Optimistic update: only succeeds if the version is still the one just read
try:
    zk.set('/myapp/config', value=b'new-production', version=stat.version)
except BadVersionError:
    print("版本冲突,数据已被其他客户端修改")
else:
    print("更新成功")

# Unconditional update (no version check)
zk.set('/myapp/config', value=b'unconditional-update')

# set() hands back the node's new stat
stat = zk.set('/myapp/config', value=b'updated-data')
print(f"新版本: {stat.version}")

zk.stop()

删除节点

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NotEmptyError, BadVersionError

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Delete a node that has no children
try:
    zk.delete('/myapp/config')
    print("删除成功")
except NoNodeError:
    print("节点不存在")
except NotEmptyError:
    print("节点有子节点,不能删除")

# Conditional delete (only if the version is unchanged). The node may already
# be gone - it was deleted just above - so both the read and the delete are
# guarded instead of assuming the node exists.
try:
    data, stat = zk.get('/myapp/config')
    zk.delete('/myapp/config', version=stat.version)
except NoNodeError:
    print("节点不存在")
except BadVersionError:
    print("版本冲突,节点已被其他客户端修改")

# Recursive delete (removes the children as well)
zk.delete('/myapp', recursive=True)

# Delete if present: delete() raises NoNodeError for a missing node, so the
# "missing" case has to be handled explicitly.
try:
    zk.delete('/myapp/config', recursive=False)
except NoNodeError:
    pass

zk.stop()

Watcher 机制

数据 Watcher

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()


def data_watcher(event):
    """Report a data event and re-arm the watch after a change."""
    print(f"事件类型: {event.type}")
    print(f"事件路径: {event.path}")

    if event.type == 'DELETED':
        print(f"节点 {event.path} 已删除")
    elif event.type == 'CHANGED':
        # Passing the watcher again re-registers it while reading the new value
        fresh_data, _ = zk.get(event.path, watch=data_watcher)
        print(f"新数据: {fresh_data.decode()}")


# Read the node and register the watcher in the same call
data, stat = zk.get('/myapp/config', watch=data_watcher)
print(f"当前数据: {data.decode()}")

# Keep the process alive so events can arrive
import time
time.sleep(60)

zk.stop()

子节点 Watcher

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()


def children_watcher(event):
    """Report a child event, then list the children again with the watch re-armed."""
    print(f"事件类型: {event.type}")
    print(f"事件路径: {event.path}")

    current = zk.get_children(event.path, watch=children_watcher)
    print(f"当前子节点: {current}")


# List the children and register the watcher in the same call
children = zk.get_children('/myapp', watch=children_watcher)
print(f"当前子节点: {children}")

import time
time.sleep(60)

zk.stop()

使用装饰器简化

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()


# Data watch registered through the @zk.DataWatch decorator
@zk.DataWatch('/myapp/config')
def watch_config(data, stat):
    """Print the node's data and version, or note that the node is gone."""
    if data is None:
        print("节点已删除")
        return
    print(f"配置数据: {data.decode()}")
    print(f"版本: {stat.version}")


# Children watch registered through the @zk.ChildrenWatch decorator
@zk.ChildrenWatch('/myapp')
def watch_children(children):
    """Print the current list of children."""
    print(f"子节点列表: {children}")


# Both decorators re-register their watcher automatically

import time
time.sleep(60)

zk.stop()

分布式锁

Kazoo 提供了开箱即用的分布式锁实现。

基本锁

from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock
import threading
import time

zk = KazooClient(hosts='localhost:2181')
zk.start()


def worker(worker_id):
    """Take the shared lock, hold it for two seconds, then release it."""
    resource_lock = Lock(zk, '/locks/my-resource')

    print(f"Worker {worker_id} 尝试获取锁")

    # The lock is acquired on entry and released when the block is left
    with resource_lock:
        print(f"Worker {worker_id} 获得锁,开始工作")
        time.sleep(2)
        print(f"Worker {worker_id} 完成工作,释放锁")


# Launch several workers competing for the same lock
threads = []
for worker_id in range(3):
    thread = threading.Thread(target=worker, args=(worker_id,))
    thread.start()
    threads.append(thread)

# Wait for every worker before shutting the client down
for thread in threads:
    thread.join()

zk.stop()

带超时的锁

from contextlib import contextmanager

from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock
from kazoo.exceptions import LockTimeout

zk = KazooClient(hosts='localhost:2181')
zk.start()

lock = Lock(zk, '/locks/my-resource')

# Try to take the lock, waiting at most 5 seconds. The release sits in a
# finally block that is only entered once the lock is actually held, so a
# timeout never triggers a release of a lock that was not acquired.
try:
    lock.acquire(timeout=5)
except LockTimeout:
    print("获取锁超时")
else:
    try:
        print("获得锁")

        # Business logic
        # ...

    finally:
        lock.release()
        print("释放锁")


@contextmanager
def acquire_with_timeout(lock, timeout):
    """Hold ``lock`` for the duration of a ``with`` block.

    acquire() returns a bool, not a context manager, and ``with lock:`` cannot
    take a timeout, so this helper wraps the acquire/release pair.

    Raises:
        LockTimeout: if the lock is not acquired within ``timeout`` seconds.
    """
    lock.acquire(timeout=timeout)
    try:
        yield lock
    finally:
        lock.release()


# Or: context manager with a timeout
try:
    with acquire_with_timeout(lock, timeout=5):
        print("获得锁")
        # Business logic
except LockTimeout:
    print("获取锁超时")

zk.stop()

读写锁

from kazoo.client import KazooClient
from kazoo.recipe.lock import ReadLock, WriteLock
import threading
import time

zk = KazooClient(hosts='localhost:2181')
zk.start()


def reader(reader_id):
    """Hold the read lock for one second."""
    lock = ReadLock(zk, '/locks/rw-resource')

    with lock:
        print(f"Reader {reader_id} 获得读锁")
        time.sleep(1)
        print(f"Reader {reader_id} 释放读锁")


def writer(writer_id):
    """Hold the write lock for two seconds."""
    lock = WriteLock(zk, '/locks/rw-resource')

    with lock:
        print(f"Writer {writer_id} 获得写锁")
        time.sleep(2)
        print(f"Writer {writer_id} 释放写锁")


threads = []

# Readers may hold the lock concurrently
for i in range(3):
    t = threading.Thread(target=reader, args=(i,))
    threads.append(t)
    t.start()

# The writer needs exclusive access
time.sleep(0.5)
t = threading.Thread(target=writer, args=(1,))
threads.append(t)
t.start()

# Wait for all workers: stopping the client while they still hold or wait
# for a lock would end the session underneath them.
for t in threads:
    t.join()

zk.stop()

信号量

from kazoo.client import KazooClient
from kazoo.recipe.lock import Semaphore
import threading
import time

zk = KazooClient(hosts='localhost:2181')
zk.start()


def limited_worker(worker_id):
    """Do two seconds of work while holding one of the three leases."""
    # Each worker builds its own Semaphore on the shared path, allowing at most
    # 3 holders. NOTE(review): a Semaphore object appears to track a single
    # lease, so one instance is not shared between threads - confirm.
    sem = Semaphore(zk, '/semaphores/my-resource', max_leases=3)
    with sem:
        print(f"Worker {worker_id} 获取信号量")
        time.sleep(2)
        print(f"Worker {worker_id} 释放信号量")


# Start 5 threads; only 3 can hold the semaphore at the same time
threads = []
for i in range(5):
    t = threading.Thread(target=limited_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

zk.stop()

Leader 选举

Kazoo 提供了简单的 Leader 选举实现。

使用 Election

from kazoo.client import KazooClient
from kazoo.recipe.election import Election
import time
import uuid

zk = KazooClient(hosts='localhost:2181')
zk.start()

# Every participant gets its own short identifier (first 8 hex digits of a UUID)
identifier = uuid.uuid4().hex[:8]
election = Election(zk, '/election/leader', identifier)


def leader_work():
    """Run while this participant is the leader; Ctrl-C ends the loop."""
    print(f"[{identifier}] 我被选为 Leader!")

    try:
        while True:
            print(f"[{identifier}] Leader 工作中...")
            time.sleep(2)
    except KeyboardInterrupt:
        return


# Join the election; run() blocks until this participant becomes the leader
# and then calls leader_work.
election.run(leader_work)

zk.stop()

自定义 LeaderLatch(基于临时顺序节点实现)

from kazoo.client import KazooClient
from kazoo.recipe.party import Party
import time
import threading

# Client used by the LeaderLatch example; the connection is opened right away
zk = KazooClient(hosts='localhost:2181')
zk.start()

class LeaderLatch:
    """Leader election built on ephemeral sequential znodes.

    Each participant creates ``<path>/lock-<sequence>``. The participant whose
    node sorts first is the leader; every other participant watches the node
    directly ahead of its own and re-evaluates when that node changes.
    """

    def __init__(self, zk, path, identifier):
        self.zk = zk
        self.path = path
        self.identifier = identifier
        self.lock_path = None   # full path of this participant's node, once created
        self.is_leader = False

    def start(self):
        """Join the election by creating this participant's node."""
        self.zk.ensure_path(self.path)

        # Ephemeral + sequential: the node goes away with the session and its
        # name carries the ordering used to pick the leader.
        self.lock_path = self.zk.create(
            f'{self.path}/lock-',
            value=self.identifier.encode(),
            ephemeral=True,
            sequence=True
        )

        self._check_leader()

    def _check_leader(self):
        """Update ``is_leader`` and, when not leading, watch the predecessor."""
        while True:
            lock_path = self.lock_path
            if lock_path is None:
                # Not started, or already closed: nothing to evaluate.
                self.is_leader = False
                return

            children = sorted(self.zk.get_children(self.path))
            my_node = lock_path.rsplit('/', 1)[-1]

            if my_node not in children:
                # Our own node is gone, so we cannot be (or become) the leader.
                self.is_leader = False
                return

            position = children.index(my_node)
            if position == 0:
                self.is_leader = True
                print(f"[{self.identifier}] 成为 Leader")
                return

            self.is_leader = False
            prev_node = f"{self.path}/{children[position - 1]}"

            if self.zk.exists(prev_node, watch=self._on_prev_deleted):
                print(f"[{self.identifier}] 等待 {prev_node} 释放")
                return
            # The predecessor disappeared between the listing and exists(), so
            # no watch was set; loop and evaluate again instead of stalling.

    def _on_prev_deleted(self, event):
        """Watch callback for the predecessor node: re-run the leader check."""
        self._check_leader()

    def close(self):
        """Leave the election. Safe to call more than once."""
        lock_path, self.lock_path = self.lock_path, None
        if lock_path:
            self.zk.delete(lock_path)
        self.is_leader = False

# Usage example
latch = LeaderLatch(zk, '/election/leader', 'participant-1')
latch.start()

try:
    while True:
        if latch.is_leader:
            print("执行 Leader 任务...")
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    # Leave the election and stop the client on every exit path, not only on
    # Ctrl-C, so the election node is not left behind after an error.
    latch.close()
    zk.stop()

配置中心实现

使用 ZooKeeper 实现分布式配置中心:

import json
import threading
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError, NoNodeError

class ConfigCenter:
    """Distributed configuration centre backed by ZooKeeper.

    Every key is a child node of ``config_path`` holding a JSON document. The
    values are cached locally; a children watch tracks added and removed keys
    and a data watch per key tracks value changes.
    """

    def __init__(self, hosts, config_path='/config'):
        self.zk = KazooClient(hosts=hosts)
        self.config_path = config_path
        self.config = {}
        self.lock = threading.Lock()   # guards config and _watched_keys
        self.callbacks = []
        self._watched_keys = set()     # keys that already have a data watch

    def start(self):
        """Connect, make sure the root node exists, load and watch the config."""
        self.zk.start()
        self.zk.ensure_path(self.config_path)
        self._load_config()
        self._watch_config()

    def _key_path(self, key):
        """Return the znode path that stores ``key``."""
        return f"{self.config_path}/{key}"

    def _load_config(self):
        """Reload every key from ZooKeeper into the local cache."""
        children = self.zk.get_children(self.config_path)

        # Read outside the lock so get() callers are not blocked on network I/O.
        loaded = {}
        for child in children:
            try:
                data, _ = self.zk.get(self._key_path(child))
            except NoNodeError:
                continue  # removed between the listing and the read
            if data:
                loaded[child] = json.loads(data.decode())

        with self.lock:
            self.config.clear()
            self.config.update(loaded)

    def _watch_config(self):
        """Watch the key list; attach a data watch to every key seen."""
        @self.zk.ChildrenWatch(self.config_path)
        def watch_children(children):
            self._load_config()
            for key in children:
                self._watch_key(key)
            self._notify_callbacks()

    def _watch_key(self, key):
        """Attach a data watch to ``key`` unless it already has one.

        The children watch only fires when keys are added or removed; without
        this watch a new value written to an existing key is never noticed.
        """
        with self.lock:
            if key in self._watched_keys:
                return
            self._watched_keys.add(key)

        first_call = True

        @self.zk.DataWatch(self._key_path(key))
        def watch_data(data, stat):
            nonlocal first_call
            if stat is None:
                # Node deleted: the children watch refreshes the cache. Drop
                # this watch so a re-created key gets a fresh one.
                with self.lock:
                    self._watched_keys.discard(key)
                return False

            with self.lock:
                if data:
                    self.config[key] = json.loads(data.decode())
                else:
                    self.config.pop(key, None)

            if first_call:
                # The watch delivers the current value on registration; that
                # is not a change, so callbacks are not notified for it.
                first_call = False
                return None
            self._notify_callbacks()
            return None

    def _notify_callbacks(self):
        """Call every registered callback with its own copy of the config."""
        with self.lock:
            snapshot = self.config.copy()

        for callback in list(self.callbacks):
            try:
                callback(snapshot.copy())
            except Exception as e:
                # One failing callback must not prevent the others from running.
                print(f"回调执行错误: {e}")

    def get(self, key, default=None):
        """Return the cached value for ``key``, or ``default`` when absent."""
        with self.lock:
            return self.config.get(key, default)

    def set(self, key, value):
        """Store ``value`` (JSON-serialisable) under ``key``."""
        path = self._key_path(key)
        data = json.dumps(value).encode()

        # Try the write and react to the error: an exists() check followed by
        # set()/create() can lose a race against another client.
        try:
            self.zk.set(path, data)
        except NoNodeError:
            try:
                self.zk.create(path, data)
            except NodeExistsError:
                self.zk.set(path, data)

        # Write-through, so a get() right after set() sees the new value
        # without waiting for the watch to fire.
        with self.lock:
            self.config[key] = json.loads(data.decode())

    def delete(self, key):
        """Delete ``key`` from ZooKeeper and from the local cache."""
        self.zk.delete(self._key_path(key))
        with self.lock:
            self.config.pop(key, None)

    def register_callback(self, callback):
        """Register ``callback(config_dict)`` to be called on config changes."""
        self.callbacks.append(callback)

    def close(self):
        """Stop the client and release its resources."""
        self.zk.stop()
        self.zk.close()


# Usage example
def on_config_change(config):
    """Print the configuration snapshot handed to the callback."""
    print(f"配置已更新: {config}")


config_center = ConfigCenter('localhost:2181')
config_center.start()
config_center.register_callback(on_config_change)

# Store two configuration entries
database_settings = {
    'host': 'localhost',
    'port': 3306,
    'name': 'mydb',
}
config_center.set('database', database_settings)

cache_settings = {
    'type': 'redis',
    'host': 'localhost',
    'port': 6379,
}
config_center.set('cache', cache_settings)

# Read one entry back
db_config = config_center.get('database')
print(f"数据库配置: {db_config}")

time.sleep(5)

config_center.close()

服务注册发现实现

import random
import socket
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError


class ServiceRegistry:
    """Service registry: each instance is an ephemeral node holding ``host:port``."""

    def __init__(self, hosts, registry_path='/services'):
        self.zk = KazooClient(hosts=hosts)
        self.registry_path = registry_path
        self.services = {}

    def start(self):
        """Connect and make sure the registry root exists."""
        self.zk.start()
        self.zk.ensure_path(self.registry_path)

    def _read_addresses(self, service_path, children):
        """Return the decoded address stored in each child of ``service_path``.

        Instance nodes are ephemeral and can disappear between the listing and
        the read; such instances are skipped.
        """
        addresses = []
        for child in children:
            try:
                data, _ = self.zk.get(f"{service_path}/{child}")
            except NoNodeError:
                continue
            if data:
                addresses.append(data.decode())
        return addresses

    def register(self, service_name, host, port):
        """Register one instance and return the path of its node."""
        service_path = f"{self.registry_path}/{service_name}"
        self.zk.ensure_path(service_path)

        # Ephemeral sequential node: unique per instance, removed with the session
        address = f"{host}:{port}"
        node_path = self.zk.create(
            f"{service_path}/instance_",
            value=address.encode(),
            ephemeral=True,
            sequence=True
        )

        print(f"服务 {service_name} 注册成功: {address}")
        return node_path

    def discover(self, service_name):
        """Return the addresses of all instances (empty list if unknown)."""
        service_path = f"{self.registry_path}/{service_name}"

        # List directly and handle the missing node, instead of checking
        # exists() first and racing against a concurrent delete.
        try:
            children = self.zk.get_children(service_path)
        except NoNodeError:
            return []

        return self._read_addresses(service_path, children)

    def discover_one(self, service_name):
        """Return one randomly chosen instance address, or None if there is none."""
        addresses = self.discover(service_name)
        if addresses:
            return random.choice(addresses)
        return None

    def watch_service(self, service_name, callback):
        """Call ``callback(addresses)`` whenever the instance list changes."""
        service_path = f"{self.registry_path}/{service_name}"

        # NOTE(review): the children watch is expected to need an existing
        # node, so the service path is created before any instance registers.
        self.zk.ensure_path(service_path)

        @self.zk.ChildrenWatch(service_path)
        def watch_children(children):
            callback(self._read_addresses(service_path, children))

    def close(self):
        """Stop the client."""
        self.zk.stop()


# Usage example
def on_service_change(addresses):
    """Print the address list handed to the callback."""
    print(f"服务地址列表更新: {addresses}")


registry = ServiceRegistry('localhost:2181')
registry.start()

# Register two instances of the same service
for instance_host in ('192.168.1.100', '192.168.1.101'):
    registry.register('order-service', instance_host, 8080)

# Look up every instance
addresses = registry.discover('order-service')
print(f"服务地址: {addresses}")

# Get notified when the instance list changes
registry.watch_service('order-service', on_service_change)

# Pick one instance at random
instance = registry.discover_one('order-service')
print(f"选中实例: {instance}")

time.sleep(30)

registry.close()

事务操作

Kazoo 支持原子事务操作:

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()

# create() inside the transaction does not create missing parents
zk.ensure_path('/tx')

# Build the transaction
transaction = zk.transaction()

# Queue the operations; they are applied in order.
transaction.create('/tx/node1', b'data1')
transaction.create('/tx/node2', b'data2')
# The version check comes before the write: set_data bumps the node version,
# so a check for version 0 placed after it would fail the whole transaction.
transaction.check('/tx/node1', version=0)
transaction.set_data('/tx/node1', b'updated1') # note: the method is set_data here

# Commit (all operations are applied atomically)
try:
    results = transaction.commit()
except Exception as e:
    print(f"事务执行失败: {e}")
else:
    # commit() reports failed operations as exception instances in the result
    # list rather than raising, so the results must be inspected.
    failures = [result for result in results if isinstance(result, Exception)]
    if failures:
        print(f"事务执行失败: {failures}")
    else:
        print("事务执行成功")
        print(f"结果: {results}")

zk.stop()

错误处理

from kazoo.client import KazooClient
from kazoo.exceptions import (
    NoNodeError, # node does not exist
    NodeExistsError, # node already exists
    BadVersionError, # version conflict
    NoAuthError, # not authorised
    SessionExpiredError, # session expired
    ConnectionClosedError, # connection closed
)

zk = KazooClient(hosts='localhost:2181')
zk.start()

try:
    data, stat = zk.get('/nonexistent')
except NoNodeError:
    print("节点不存在")
except NoAuthError:
    print("没有权限")
except ConnectionClosedError:
    # Listed before SessionExpiredError: ConnectionClosedError derives from
    # it, so the other order would make this branch unreachable.
    print("连接已关闭")
except SessionExpiredError:
    print("会话已过期,需要重新连接")

# Handle "node already exists"
try:
    # makepath=True creates /myapp when it is missing, so the only expected
    # failure here is the node already existing.
    zk.create('/myapp/config', b'data', makepath=True)
except NodeExistsError:
    print("节点已存在,更新数据")
    zk.set('/myapp/config', b'data')

# Handle a version conflict
try:
    zk.set('/myapp/config', b'new-data', version=0)
except BadVersionError:
    print("版本冲突,获取最新版本后重试")
    data, stat = zk.get('/myapp/config')
    zk.set('/myapp/config', b'new-data', version=stat.version)

zk.stop()

最佳实践

1. 使用上下文管理器

from contextlib import contextmanager


@contextmanager
def zk_client(hosts):
    """Yield a started KazooClient and always stop and close it afterwards.

    NOTE(review): KazooClient is not documented as a context manager, so the
    lifecycle is wrapped explicitly instead of using ``with KazooClient(...)``.
    """
    zk = KazooClient(hosts=hosts)
    zk.start()
    try:
        yield zk
    finally:
        zk.stop()
        zk.close()


# Recommended: let a context manager own the connection
with zk_client('localhost:2181') as zk:
    data, stat = zk.get('/config')
# The connection is closed once the block is left

2. 设置合理的超时

zk = KazooClient(
    hosts='localhost:2181',
    timeout=10.0, # connection timeout
    # Retry settings for commands, given as a plain dict
    # (presumably forwarded to KazooRetry - confirm against the Kazoo docs)
    command_retry={
        'max_tries': 3,
        'delay': 0.1,
        'backoff': 2,
        'max_delay': 10,
    }
)

3. 处理连接状态变化

from kazoo.client import KazooState

# Message printed for each connection state:
#   LOST      - the session is gone and must be re-initialised
#   SUSPENDED - the connection is paused, possibly a network problem
#   CONNECTED - the connection is (re-)established
_LISTENER_MESSAGES = {
    KazooState.LOST: "会话丢失",
    KazooState.SUSPENDED: "连接暂停",
    KazooState.CONNECTED: "已连接",
}


def state_listener(state):
    """Print the message for ``state``; other states are ignored."""
    text = _LISTENER_MESSAGES.get(state)
    if text is not None:
        print(text)


zk.add_listener(state_listener)

4. 使用装饰器简化 Watcher

# Recommended: the decorator re-registers the watch automatically
@zk.DataWatch('/config')
def watch_config(data, stat):
    """Process the node data whenever it is non-empty."""
    if not data:
        return
    process_config(data)


# ...rather than re-arming the watch by hand on every event
def manual_watch(event):
    """Re-read the node with the watch re-registered, then process the data."""
    payload, _ = zk.get(event.path, watch=manual_watch)
    process_config(payload)

小结

本章介绍了使用 Python Kazoo 库开发 ZooKeeper 应用的核心内容:

  1. 连接管理:创建连接、配置参数、状态监听
  2. CRUD 操作:创建、读取、更新、删除节点
  3. Watcher 机制:数据监听、子节点监听、装饰器简化
  4. 分布式锁:基本锁、读写锁、信号量
  5. Leader 选举:Election 和 LeaderLatch 实现
  6. 配方实现:配置中心、服务注册发现
  7. 事务操作:原子性多操作执行
  8. 错误处理:常见异常和处理方式

Kazoo 是 Python 生态中最成熟的 ZooKeeper 客户端,提供了简洁易用的 API,适合快速开发分布式应用。

参考资料