跳到主要内容

Go 客户端开发

Go 语言在分布式系统和微服务领域广泛应用,使用 go-zookeeper 库可以高效地与 ZooKeeper 进行交互。本章介绍如何使用 Go 语言开发 ZooKeeper 应用。

go-zookeeper 库简介

go-zookeeper(github.com/go-zookeeper/zk)是 Go 语言原生实现的 ZooKeeper 客户端库,提供了完整的 ZooKeeper 功能支持。

主要特性

  • 纯 Go 实现,无 CGO 依赖
  • 同步(阻塞式)API,Watcher 事件通过 channel 异步送达
  • 内置分布式锁实现
  • 支持连接状态监听和自动重连
  • 完善的错误处理

环境准备

安装依赖

# Initialize a Go module (skip if the project already has one)
go mod init myproject

# Install go-zookeeper
go get github.com/go-zookeeper/zk

# Verify the installation: prints the module path and the resolved version
go list -m github.com/go-zookeeper/zk

启动 ZooKeeper 服务

# Quick start with Docker (development only).
# ZooKeeper 3.5+ only whitelists the `srvr` four-letter command by default,
# so `ruok` has to be enabled explicitly for the health check below to work.
docker run -d --name zookeeper -p 2181:2181 \
  -e ZOO_4LW_COMMANDS_WHITELIST=ruok,srvr \
  zookeeper:3.8

# Verify the service
echo ruok | nc localhost 2181
# Expected reply: imok

连接管理

创建连接

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main connects to a local ZooKeeper server, prints the connection state
// reported right after zk.Connect returns, and closes the connection on exit.
func main() {
	// Server address list and session timeout handed to zk.Connect.
	servers := []string{"localhost:2181"}
	sessionTimeout := 5 * time.Second

	conn, _, err := zk.Connect(servers, sessionTimeout)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	state := conn.State()
	fmt.Printf("连接状态: %v\n", state)

	// Further operations on conn go here.
}

连接状态监听

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main connects to ZooKeeper and prints every event delivered on the
// connection's event channel for one minute.
func main() {
	conn, events, err := zk.Connect([]string{"localhost:2181"}, 5*time.Second)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// report prints the raw event followed by a short message for the
	// connection states this example distinguishes.
	report := func(ev zk.Event) {
		fmt.Printf("收到事件: Type=%v, State=%v, Path=%v\n",
			ev.Type, ev.State, ev.Path)

		switch ev.State {
		case zk.StateConnected:
			fmt.Println("已连接到 ZooKeeper")
		case zk.StateDisconnected:
			fmt.Println("连接断开")
		case zk.StateExpired:
			fmt.Println("会话已过期")
		case zk.StateConnecting:
			fmt.Println("正在重连...")
		case zk.StateSyncConnected:
			fmt.Println("同步连接成功")
		}
	}

	// Consume events in the background until the channel is closed.
	go func() {
		for ev := range events {
			report(ev)
		}
	}()

	// Keep the process alive so events can be observed.
	time.Sleep(time.Minute)
}

连接选项

package main

import (
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main connects to ZooKeeper with two optional connection settings:
// informational logging and a 1 MiB maximum buffer size.
func main() {
	const maxBufferSize = 1024 * 1024

	servers := []string{"localhost:2181"}

	conn, _, err := zk.Connect(
		servers,
		5*time.Second,
		zk.WithLogInfo(true),                // enable info-level logging
		zk.WithMaxBufferSize(maxBufferSize), // cap the buffer size
	)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// A custom logger can be installed afterwards:
	// conn.SetLogger(customLogger)
}

CRUD 操作

创建节点

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main demonstrates the node flavors that can be created: persistent,
// ephemeral, sequential, ephemeral-sequential and container nodes.
//
// Every example node lives under /myapp. ZooKeeper does not create missing
// parent nodes, so /myapp is created first; without it each Create call
// below fails with "node does not exist" on a fresh server.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Open ACL: everyone gets every permission.
	acl := zk.WorldACL(zk.PermAll)

	// Make sure the parent exists; an already existing node is fine.
	if _, err := conn.Create("/myapp", nil, 0, acl); err != nil && err != zk.ErrNodeExists {
		log.Printf("创建父节点失败: %v", err)
		return
	}

	// Persistent node.
	// flags: 0 = persistent, zk.FlagEphemeral = ephemeral, zk.FlagSequence = sequential.
	path, err := conn.Create(
		"/myapp/config",      // path
		[]byte("production"), // data
		0,                    // flags
		acl,                  // ACL
	)
	if err != nil {
		log.Printf("创建节点失败: %v", err)
	} else {
		fmt.Printf("创建节点成功: %s\n", path)
	}

	// Ephemeral node: removed by the server when the session ends.
	ephemeralPath, err := conn.Create(
		"/myapp/temp",
		[]byte("temporary"),
		zk.FlagEphemeral,
		acl,
	)
	if err != nil {
		log.Printf("创建临时节点失败: %v", err)
	} else {
		fmt.Printf("创建临时节点成功: %s\n", ephemeralPath)
	}

	// Sequential node: the server appends a monotonically increasing suffix.
	seqPath, err := conn.Create(
		"/myapp/seq-",
		[]byte("sequential"),
		zk.FlagSequence,
		acl,
	)
	if err != nil {
		log.Printf("创建顺序节点失败: %v", err)
	} else {
		fmt.Printf("创建顺序节点成功: %s\n", seqPath) // e.g. /myapp/seq-0000000001
	}

	// Ephemeral sequential node.
	ephemeralSeqPath, err := conn.Create(
		"/myapp/lock-",
		[]byte("lock-holder"),
		zk.FlagEphemeral|zk.FlagSequence,
		acl,
	)
	if err != nil {
		log.Printf("创建临时顺序节点失败: %v", err)
	} else {
		fmt.Printf("创建临时顺序节点成功: %s\n", ephemeralSeqPath)
	}

	// Container node (ZooKeeper 3.5+).
	// NOTE(review): CreateContainer validates its flags argument and, depending
	// on the library version, may reject 0 with zk.ErrInvalidFlags — confirm
	// the required flag constant against the version pinned in go.mod.
	containerPath, err := conn.CreateContainer(
		"/myapp/container",
		[]byte("container-data"),
		0,
		acl,
	)
	if err != nil {
		log.Printf("创建容器节点失败: %v", err)
	} else {
		fmt.Printf("创建容器节点成功: %s\n", containerPath)
	}
}

读取节点

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main shows the three read operations: existence check, data read
// (with node metadata) and child listing.
func main() {
	const (
		parentPath = "/myapp"
		configPath = "/myapp/config"
	)

	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Existence check.
	found, _, err := conn.Exists(configPath)
	if err != nil {
		log.Printf("检查节点失败: %v", err)
		return
	}
	fmt.Printf("节点存在: %v\n", found)

	// Read the payload and metadata only when the node is there.
	if found {
		payload, nodeStat, getErr := conn.Get(configPath)
		if getErr != nil {
			log.Printf("获取数据失败: %v", getErr)
			return
		}
		fmt.Printf("数据: %s\n", string(payload))
		fmt.Printf("版本: %d\n", nodeStat.Version)
		fmt.Printf("数据长度: %d\n", nodeStat.DataLength)
		fmt.Printf("子节点数: %d\n", nodeStat.NumChildren)
		fmt.Printf("创建时间: %d\n", nodeStat.Ctime)
		fmt.Printf("修改时间: %d\n", nodeStat.Mtime)
	}

	// List the children of the parent node.
	kids, parentStat, err := conn.Children(parentPath)
	if err != nil {
		log.Printf("获取子节点失败: %v", err)
		return
	}
	fmt.Printf("子节点: %v\n", kids)
	fmt.Printf("子节点版本: %d\n", parentStat.Cversion)
}

更新节点

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main updates a node twice: first guarded by the version that was just
// read (optimistic locking), then unconditionally with version -1.
func main() {
	const configPath = "/myapp/config"

	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Read the current value together with its version.
	current, stat, err := conn.Get(configPath)
	if err != nil {
		log.Printf("获取数据失败: %v", err)
		return
	}
	fmt.Printf("当前数据: %s, 版本: %d\n", string(current), stat.Version)

	// Versioned write: fails with zk.ErrBadVersion if someone else wrote first.
	updated, err := conn.Set(configPath, []byte("new-production"), stat.Version)
	switch {
	case err == zk.ErrBadVersion:
		log.Println("版本冲突,数据已被其他客户端修改")
		return
	case err != nil:
		log.Printf("更新失败: %v", err)
		return
	}
	fmt.Printf("更新成功,新版本: %d\n", updated.Version)

	// Unconditional write: version -1 skips the version check.
	updated, err = conn.Set(configPath, []byte("unconditional-update"), -1)
	if err != nil {
		log.Printf("更新失败: %v", err)
		return
	}
	fmt.Printf("无条件更新成功,版本: %d\n", updated.Version)
}

删除节点

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main deletes one node unconditionally and a second node conditionally,
// guarded by the version observed just before the delete.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Unconditional delete of a leaf node; -1 skips the version check.
	err = conn.Delete("/myapp/config", -1)
	switch err {
	case nil:
		fmt.Println("删除成功")
	case zk.ErrNoNode:
		log.Println("节点不存在")
		return
	case zk.ErrNotEmpty:
		log.Println("节点有子节点,不能删除")
		return
	default:
		log.Printf("删除失败: %v", err)
		return
	}

	// Conditional delete using the version that was just observed.
	exists, stat, err := conn.Exists("/myapp/other")
	if err != nil {
		log.Printf("检查节点失败: %v", err)
		return
	}
	// Decide on the boolean result. The returned stat is not a reliable
	// presence signal: it may be non-nil even when the node is missing, which
	// would turn "nothing to delete" into a spurious delete failure.
	if exists {
		if err := conn.Delete("/myapp/other", stat.Version); err != nil {
			log.Printf("条件删除失败: %v", err)
		}
	}
}

Watcher 机制

数据 Watcher

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main reads a node with a data watch attached and keeps following changes
// to that node for up to a minute.
//
// ZooKeeper watches fire once. After each change the data is read again
// with GetW and the loop switches to the channel returned by that call;
// reusing the already consumed channel would never deliver another change.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Read the data and register the first watch.
	data, _, eventChan, err := conn.GetW("/myapp/config")
	if err != nil {
		log.Printf("获取数据失败: %v", err)
		return
	}
	fmt.Printf("当前数据: %s\n", string(data))

	// Follow events in the background.
	go func() {
		for {
			select {
			case event, ok := <-eventChan:
				if !ok {
					// Channel closed without an event: no watch is left.
					return
				}
				fmt.Printf("收到事件: Type=%v, Path=%v\n", event.Type, event.Path)

				switch event.Type {
				case zk.EventNodeDataChanged:
					// Data changed: read it again and re-arm the watch.
					newData, _, nextChan, err := conn.GetW(event.Path)
					if err != nil {
						log.Printf("重新获取数据失败: %v", err)
						return
					}
					fmt.Printf("新数据: %s\n", string(newData))
					eventChan = nextChan
				case zk.EventNodeDeleted:
					fmt.Println("节点已删除")
					return
				default:
					// The watch was consumed by some other event and was not
					// re-registered, so there is nothing left to wait for.
					return
				}
			case <-time.After(time.Minute):
				return
			}
		}
	}()

	// Keep the process alive while the watcher runs.
	time.Sleep(time.Minute)
}

子节点 Watcher

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main lists the children of /myapp with a child watch attached and keeps
// following membership changes for up to a minute.
//
// ZooKeeper watches fire once. After each change the children are listed
// again with ChildrenW and the loop switches to the channel returned by
// that call instead of reading the consumed one.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// List the children and register the first watch.
	children, _, eventChan, err := conn.ChildrenW("/myapp")
	if err != nil {
		log.Printf("获取子节点失败: %v", err)
		return
	}
	fmt.Printf("当前子节点: %v\n", children)

	// Follow child changes in the background.
	go func() {
		for {
			select {
			case event, ok := <-eventChan:
				if !ok {
					// Channel closed without an event: no watch is left.
					return
				}
				fmt.Printf("收到事件: Type=%v, Path=%v\n", event.Type, event.Path)

				if event.Type != zk.EventNodeChildrenChanged {
					// The watch was consumed by another event (for example
					// the parent was deleted) and was not re-registered.
					return
				}

				// Children changed: list again and re-arm the watch.
				latest, _, nextChan, err := conn.ChildrenW(event.Path)
				if err != nil {
					log.Printf("重新获取子节点失败: %v", err)
					return
				}
				fmt.Printf("新子节点列表: %v\n", latest)
				eventChan = nextChan
			case <-time.After(time.Minute):
				return
			}
		}
	}()

	time.Sleep(time.Minute)
}

存在性 Watcher

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main registers an existence watch on a node and reports the single
// event that the watch delivers.
func main() {
	const watchedPath = "/myapp/new-node"

	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Existence check plus watch registration in one call.
	present, stat, fired, err := conn.ExistsW(watchedPath)
	if err != nil {
		log.Printf("检查节点失败: %v", err)
		return
	}

	if !present {
		fmt.Println("节点不存在,等待创建...")
	} else {
		fmt.Printf("节点已存在,版本: %d\n", stat.Version)
	}

	// waitOnce blocks for the watch event and prints what happened.
	waitOnce := func() {
		ev := <-fired
		fmt.Printf("收到事件: Type=%v, Path=%v\n", ev.Type, ev.Path)

		switch ev.Type {
		case zk.EventNodeCreated:
			fmt.Println("节点已创建")
		case zk.EventNodeDeleted:
			fmt.Println("节点已删除")
		case zk.EventNodeDataChanged:
			fmt.Println("节点数据已变更")
		}
	}
	go waitOnce()

	time.Sleep(time.Minute)
}

分布式锁

go-zookeeper 库内置了分布式锁实现。

基本锁

package main

import (
"fmt"
"log"
"sync"
"time"

"github.com/go-zookeeper/zk"
)

// main starts three workers that compete for the same distributed lock.
// Each worker builds its own zk.Lock on the shared path, so the workers
// queue up behind each other instead of sharing one lock object.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	var wg sync.WaitGroup

	// Start several goroutines that contend for the lock.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// One lock instance per goroutine.
			myLock := zk.NewLock(conn, "/locks/my-resource", zk.WorldACL(zk.PermAll))

			fmt.Printf("Worker %d 尝试获取锁\n", id)

			if err := myLock.Lock(); err != nil {
				log.Printf("Worker %d 获取锁失败: %v", id, err)
				return
			}
			defer myLock.Unlock()

			fmt.Printf("Worker %d 获得锁,开始工作\n", id)
			time.Sleep(2 * time.Second)
			fmt.Printf("Worker %d 完成工作,释放锁\n", id)
		}(i)
	}

	wg.Wait()
}

带超时的锁

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main tries to take a distributed lock but gives up after five seconds.
// Lock() itself blocks, so it runs in a goroutine and the result is raced
// against a timer.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	resourceLock := zk.NewLock(conn, "/locks/my-resource", zk.WorldACL(zk.PermAll))

	fmt.Println("尝试获取锁(最多等待 5 秒)...")

	// Buffered so the goroutine can deliver its result even after a timeout.
	acquired := make(chan error, 1)
	go func() {
		acquired <- resourceLock.Lock()
	}()

	select {
	case lockErr := <-acquired:
		if lockErr != nil {
			log.Printf("获取锁失败: %v", lockErr)
			return
		}
		defer resourceLock.Unlock()
		fmt.Println("获得锁,执行业务逻辑")
		time.Sleep(2 * time.Second)
		fmt.Println("释放锁")

	case <-time.After(5 * time.Second):
		fmt.Println("获取锁超时")
		// The goroutine is still waiting for the lock at this point; close
		// the connection or add a cancellation mechanism to stop it.
	}
}

读写锁实现

package main

import (
"fmt"
"log"
"sync"
"time"

"github.com/go-zookeeper/zk"
)

// ReadWriteLock pairs two zk.Lock instances that live under one base path:
// one at basePath+"/read" and one at basePath+"/write".
//
// NOTE(review): the two locks guard different znodes, so holding the read
// lock does not block writers and vice versa. Readers also all share the
// single readLock instance, so presumably they exclude each other instead
// of running concurrently. Confirm whether real reader/writer semantics are
// required before relying on this type.
type ReadWriteLock struct {
	conn      *zk.Conn  // connection both locks were created on
	basePath  string    // parent path of the two lock nodes
	acl       []zk.ACL  // ACL passed to both locks
	readLock  *zk.Lock  // lock at basePath+"/read"
	writeLock *zk.Lock  // lock at basePath+"/write"
}

// NewReadWriteLock builds a ReadWriteLock whose read and write locks are
// created under path with the given ACL.
func NewReadWriteLock(conn *zk.Conn, path string, acl []zk.ACL) *ReadWriteLock {
	return &ReadWriteLock{
		conn:      conn,
		basePath:  path,
		acl:       acl,
		readLock:  zk.NewLock(conn, path+"/read", acl),
		writeLock: zk.NewLock(conn, path+"/write", acl),
	}
}

// RLock acquires the read lock and returns the error reported by it.
func (rw *ReadWriteLock) RLock() error {
	return rw.readLock.Lock()
}

// RUnlock releases the read lock. The method has no return value, so a
// failed release is logged instead of being dropped silently.
func (rw *ReadWriteLock) RUnlock() {
	if err := rw.readLock.Unlock(); err != nil {
		log.Printf("释放读锁失败: %v", err)
	}
}

// WLock acquires the write lock and returns the error reported by it.
func (rw *ReadWriteLock) WLock() error {
	return rw.writeLock.Lock()
}

// WUnlock releases the write lock. The method has no return value, so a
// failed release is logged instead of being dropped silently.
func (rw *ReadWriteLock) WUnlock() {
	if err := rw.writeLock.Unlock(); err != nil {
		log.Printf("释放写锁失败: %v", err)
	}
}

// main runs three readers and one writer against a ReadWriteLock and waits
// for all of them to finish.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	rwLock := NewReadWriteLock(conn, "/locks/rw-resource", zk.WorldACL(zk.PermAll))
	var wg sync.WaitGroup

	// reader takes the read lock, holds it for a second and releases it.
	reader := func(id int) {
		defer wg.Done()

		lockErr := rwLock.RLock()
		if lockErr != nil {
			log.Printf("Reader %d 获取读锁失败: %v", id, lockErr)
			return
		}
		defer rwLock.RUnlock()

		fmt.Printf("Reader %d 获得读锁\n", id)
		time.Sleep(time.Second)
		fmt.Printf("Reader %d 释放读锁\n", id)
	}

	// writer waits briefly so the readers go first, then takes the write lock.
	writer := func() {
		defer wg.Done()

		time.Sleep(100 * time.Millisecond)
		lockErr := rwLock.WLock()
		if lockErr != nil {
			log.Printf("Writer 获取写锁失败: %v", lockErr)
			return
		}
		defer rwLock.WUnlock()

		fmt.Println("Writer 获得写锁")
		time.Sleep(2 * time.Second)
		fmt.Println("Writer 释放写锁")
	}

	for id := 0; id < 3; id++ {
		wg.Add(1)
		go reader(id)
	}

	wg.Add(1)
	go writer()

	wg.Wait()
}

Leader 选举

package main

import (
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"

"github.com/go-zookeeper/zk"
)

// LeaderElection holds the state of one participant in a leader election
// carried out under basePath.
//
// NOTE(review): isLeader is written by electLoop, which Start launches in
// its own goroutine, and read by IsLeader with no synchronization visible
// in this file — confirm whether a mutex or atomic is needed.
type LeaderElection struct {
// conn is the ZooKeeper connection used for every operation.
conn *zk.Conn
// basePath is the parent node under which candidates create their nodes.
basePath string
// nodeName is stored as the data of this candidate's node and used in log output.
nodeName string
// currentNode is the full path of the node created by Start.
currentNode string
// isLeader caches the result of the most recent leadership check.
isLeader bool
// stopChan is closed by Stop to end the election loop.
stopChan chan struct{}
}

// NewLeaderElection returns a LeaderElection for the given connection,
// election path and participant name. No ZooKeeper call is made here.
func NewLeaderElection(conn *zk.Conn, basePath, nodeName string) *LeaderElection {
return &LeaderElection{
conn: conn,
basePath: basePath,
nodeName: nodeName,
stopChan: make(chan struct{}),
}
}

// Start joins the election: it makes sure the election path exists, creates
// this participant's ephemeral sequential node beneath it, and launches the
// election loop in a background goroutine.
//
// It returns the first error reported while creating the path or the node.
func (le *LeaderElection) Start() error {
	acl := zk.WorldACL(zk.PermAll)

	// ZooKeeper does not create missing parents, so create every component
	// of basePath in turn. A node that already exists is not an error; this
	// also covers the race where another participant creates it first.
	current := ""
	for _, segment := range strings.Split(strings.Trim(le.basePath, "/"), "/") {
		if segment == "" {
			continue
		}
		current += "/" + segment
		if _, err := le.conn.Create(current, nil, 0, acl); err != nil && err != zk.ErrNodeExists {
			return err
		}
	}

	// Ephemeral sequential node: the server appends the sequence number and
	// removes the node when the session ends.
	node, err := le.conn.Create(
		le.basePath+"/node-",
		[]byte(le.nodeName),
		zk.FlagEphemeral|zk.FlagSequence,
		acl,
	)
	if err != nil {
		return err
	}
	le.currentNode = node

	// Run the election loop until Stop is called.
	go le.electLoop()

	return nil
}

// electLoop repeatedly re-evaluates leadership until stopChan is closed.
// A leader watches its own node; a follower watches its predecessor.
func (le *LeaderElection) electLoop() {
	for {
		// Non-blocking stop check before every round.
		select {
		case <-le.stopChan:
			return
		default:
		}

		if !le.checkLeader() {
			le.isLeader = false
			// Follower: wait on the node just ahead of ours.
			le.watchPrevious()
			continue
		}

		// Announce the transition only once.
		if !le.isLeader {
			le.isLeader = true
			fmt.Printf("[%s] 成为 Leader\n", le.nodeName)
		}
		// Leader: keep an eye on our own node.
		le.watchSelf()
	}
}

// checkLeader reports whether this participant's node is the smallest
// child of basePath. It returns false when the children cannot be listed
// or when the list is empty (for example after this node has vanished),
// instead of indexing into an empty slice.
func (le *LeaderElection) checkLeader() bool {
	children, _, err := le.conn.Children(le.basePath)
	if err != nil || len(children) == 0 {
		return false
	}

	// Order candidates by name; the sequence suffix decides the order.
	sort.Strings(children)

	// Child names carry no parent path, so compare against our node's base name.
	mySeq := le.currentNode[strings.LastIndex(le.currentNode, "/")+1:]

	return children[0] == mySeq
}

// watchPrevious blocks until the node directly ahead of this participant
// changes, stopChan is closed, or one minute passes.
//
// When the predecessor cannot be determined (listing fails, our own node
// is missing, the watch cannot be set) it waits briefly before returning,
// so the calling election loop does not spin at full speed on a broken
// connection.
func (le *LeaderElection) watchPrevious() {
	// backoff pauses for a second unless the election is being stopped.
	backoff := func() {
		select {
		case <-le.stopChan:
		case <-time.After(time.Second):
		}
	}

	children, _, err := le.conn.Children(le.basePath)
	if err != nil {
		backoff()
		return
	}

	sort.Strings(children)

	mySeq := le.currentNode[strings.LastIndex(le.currentNode, "/")+1:]
	myIndex := -1

	for i, child := range children {
		if child == mySeq {
			myIndex = i
			break
		}
	}

	if myIndex < 0 {
		// Our own node is not among the children; nothing to watch.
		backoff()
		return
	}
	if myIndex == 0 {
		// We are first in line: let the caller re-check leadership at once.
		return
	}

	prevNode := le.basePath + "/" + children[myIndex-1]

	// Watch the predecessor.
	exists, _, eventChan, err := le.conn.ExistsW(prevNode)
	if err != nil {
		backoff()
		return
	}

	if !exists {
		// The predecessor disappeared in the meantime; re-check immediately.
		return
	}

	fmt.Printf("[%s] 监控前一个节点: %s\n", le.nodeName, prevNode)

	// Wait for the watch, a stop request, or the periodic re-check.
	select {
	case event := <-eventChan:
		if event.Type == zk.EventNodeDeleted {
			fmt.Printf("[%s] 前一个节点已删除\n", le.nodeName)
		}
	case <-le.stopChan:
		return
	case <-time.After(time.Minute):
		// Timed out: fall through so the caller re-evaluates.
	}
}

// watchSelf blocks until this participant's own node changes, stopChan is
// closed, or ten seconds pass.
//
// When the watch cannot be set or the node no longer exists it waits
// briefly before returning, so the calling election loop does not spin at
// full speed.
func (le *LeaderElection) watchSelf() {
	// backoff pauses for a second unless the election is being stopped.
	backoff := func() {
		select {
		case <-le.stopChan:
		case <-time.After(time.Second):
		}
	}

	exists, _, eventChan, err := le.conn.ExistsW(le.currentNode)
	if err != nil {
		backoff()
		return
	}

	if !exists {
		// Our own node is gone.
		fmt.Printf("[%s] 自己节点已不存在\n", le.nodeName)
		backoff()
		return
	}

	select {
	case event := <-eventChan:
		if event.Type == zk.EventNodeDeleted {
			fmt.Printf("[%s] 会话可能已断开\n", le.nodeName)
		}
	case <-le.stopChan:
		return
	case <-time.After(time.Second * 10):
		// Periodic re-check.
	}
}

// IsLeader returns the leadership flag last stored by the election loop.
//
// NOTE(review): the flag is read here without synchronization while
// electLoop writes it from another goroutine — confirm whether that is
// acceptable for callers.
func (le *LeaderElection) IsLeader() bool {
return le.isLeader
}

// Stop ends the election loop and removes this participant's node.
//
// A repeated sequential call is a no-op instead of panicking on a second
// close of stopChan. The guard is not safe against concurrent callers.
// A failure to delete the node is logged; a node that is already gone is
// not treated as a failure.
func (le *LeaderElection) Stop() {
	select {
	case <-le.stopChan:
		// Already stopped.
		return
	default:
		close(le.stopChan)
	}

	if le.currentNode == "" {
		return
	}
	if err := le.conn.Delete(le.currentNode, -1); err != nil && err != zk.ErrNoNode {
		log.Printf("[%s] 删除选举节点失败: %v", le.nodeName, err)
	}
}

// main joins the election under /election/leader with a random node name,
// prints its current role every three seconds, and exits on SIGINT/SIGTERM.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Pick a random participant name.
	rand.Seed(time.Now().UnixNano())
	nodeName := fmt.Sprintf("node-%d", rand.Intn(1000))

	election := NewLeaderElection(conn, "/election/leader", nodeName)

	startErr := election.Start()
	if startErr != nil {
		log.Fatalf("开始选举失败: %v", startErr)
	}
	defer election.Stop()

	// reportRole prints what this process is currently acting as.
	reportRole := func() {
		if !election.IsLeader() {
			fmt.Printf("[%s] 作为 Follower 运行\n", nodeName)
			return
		}
		fmt.Printf("[%s] 执行 Leader 任务...\n", nodeName)
	}

	// Work loop.
	go func() {
		for {
			reportRole()
			time.Sleep(3 * time.Second)
		}
	}()

	// Block until a termination signal arrives.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	<-signals

	fmt.Printf("[%s] 退出\n", nodeName)
}

服务注册发现

package main

import (
"encoding/json"
"fmt"
"log"
"math/rand"
"time"

"github.com/go-zookeeper/zk"
)

// ServiceInstance describes one registered service instance. It is stored
// in ZooKeeper as JSON using the field tags below.
type ServiceInstance struct {
// Name is the service name; Register uses it as the node name under the base path.
Name string `json:"name"`
// Host is the address the instance is reachable at.
Host string `json:"host"`
// Port is the port the instance listens on.
Port int `json:"port"`
// Version is the version string of the instance.
Version string `json:"version"`
}

// ServiceRegistry registers and discovers service instances under a base
// path in ZooKeeper.
type ServiceRegistry struct {
// conn is the ZooKeeper connection used for every operation.
conn *zk.Conn
// basePath is the parent path that holds one child node per service name.
basePath string
// instancePath is the node created by the most recent successful Register
// call; Deregister deletes this path.
instancePath string
}

// NewServiceRegistry returns a registry that stores services under
// basePath. No ZooKeeper call is made here.
func NewServiceRegistry(conn *zk.Conn, basePath string) *ServiceRegistry {
return &ServiceRegistry{
conn: conn,
basePath: basePath,
}
}

// Register publishes instance as an ephemeral sequential node beneath
// basePath/<instance.Name> and returns the path of the created node.
//
// Every missing ancestor of the service path is created first, because
// ZooKeeper does not create parents implicitly. The created path is also
// remembered in sr.instancePath, so a later Register call replaces the path
// that Deregister will remove.
func (sr *ServiceRegistry) Register(instance *ServiceInstance) (string, error) {
	acl := zk.WorldACL(zk.PermAll)
	servicePath := sr.basePath + "/" + instance.Name

	// Walk the path and create each prefix that ends at a '/' boundary or at
	// the end of the string. An existing node is fine; this also covers the
	// race where another instance creates it concurrently.
	for i := 1; i <= len(servicePath); i++ {
		if i < len(servicePath) && servicePath[i] != '/' {
			continue
		}
		if _, err := sr.conn.Create(servicePath[:i], nil, 0, acl); err != nil && err != zk.ErrNodeExists {
			return "", err
		}
	}

	// Serialize the instance description.
	data, err := json.Marshal(instance)
	if err != nil {
		return "", err
	}

	// Ephemeral sequential node: removed by the server when the session ends.
	path, err := sr.conn.Create(
		servicePath+"/instance_",
		data,
		zk.FlagEphemeral|zk.FlagSequence,
		acl,
	)
	if err != nil {
		return "", err
	}

	sr.instancePath = path
	fmt.Printf("服务注册成功: %s -> %s:%d\n", instance.Name, instance.Host, instance.Port)

	return path, nil
}

// Deregister removes the node recorded by the last successful Register
// call. It is idempotent: calling it with nothing registered, or after the
// node has already disappeared, returns nil. The recorded path is cleared
// once the node is known to be gone.
func (sr *ServiceRegistry) Deregister() error {
	if sr.instancePath == "" {
		return nil
	}

	err := sr.conn.Delete(sr.instancePath, -1)
	if err != nil && err != zk.ErrNoNode {
		return err
	}

	sr.instancePath = ""
	return nil
}

// Discover returns the instances currently registered for serviceName.
// A missing service node yields (nil, nil). Children that cannot be read
// or decoded are skipped.
func (sr *ServiceRegistry) Discover(serviceName string) ([]*ServiceInstance, error) {
	root := sr.basePath + "/" + serviceName

	names, _, err := sr.conn.Children(root)
	switch {
	case err == zk.ErrNoNode:
		return nil, nil
	case err != nil:
		return nil, err
	}

	found := make([]*ServiceInstance, 0, len(names))
	for _, name := range names {
		raw, _, getErr := sr.conn.Get(root + "/" + name)
		if getErr != nil {
			continue
		}

		inst := new(ServiceInstance)
		if json.Unmarshal(raw, inst) != nil {
			continue
		}
		found = append(found, inst)
	}

	return found, nil
}

// DiscoverOne returns one randomly chosen instance of serviceName, or
// (nil, nil) when no instance is registered.
func (sr *ServiceRegistry) DiscoverOne(serviceName string) (*ServiceInstance, error) {
	candidates, err := sr.Discover(serviceName)
	switch {
	case err != nil:
		return nil, err
	case len(candidates) == 0:
		return nil, nil
	}

	rand.Seed(time.Now().UnixNano())
	pick := rand.Intn(len(candidates))
	return candidates[pick], nil
}

// WatchService starts a background goroutine that calls callback with the
// current instance list of serviceName, then again after every event
// delivered by the child watch. Listing failures are retried after one
// second. The function itself always returns nil.
func (sr *ServiceRegistry) WatchService(serviceName string, callback func([]*ServiceInstance)) error {
	root := sr.basePath + "/" + serviceName

	// decode reads and unmarshals every child, skipping the ones that fail.
	decode := func(names []string) []*ServiceInstance {
		result := make([]*ServiceInstance, 0, len(names))
		for _, name := range names {
			raw, _, err := sr.conn.Get(root + "/" + name)
			if err != nil {
				continue
			}
			inst := new(ServiceInstance)
			if json.Unmarshal(raw, inst) != nil {
				continue
			}
			result = append(result, inst)
		}
		return result
	}

	watch := func() {
		for {
			names, _, changed, err := sr.conn.ChildrenW(root)
			if err != nil {
				time.Sleep(time.Second)
				continue
			}

			// Report the current list.
			callback(decode(names))

			// Block until the watch fires; the next round fetches the new list.
			<-changed
		}
	}
	go watch()

	return nil
}

// main registers two instances of order-service, lists them, picks one at
// random, and then follows changes to the service for one minute. Every
// registry call is checked so that a failure is reported instead of being
// silently ignored.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	registry := NewServiceRegistry(conn, "/services")

	// Register two instances of the same service.
	hosts := []string{"192.168.1.100", "192.168.1.101"}
	for _, host := range hosts {
		instance := &ServiceInstance{
			Name:    "order-service",
			Host:    host,
			Port:    8080,
			Version: "1.0.0",
		}
		if _, err := registry.Register(instance); err != nil {
			log.Printf("注册服务失败: %v", err)
			return
		}
	}

	// Discover all instances.
	instances, err := registry.Discover("order-service")
	if err != nil {
		log.Printf("发现服务失败: %v", err)
		return
	}
	fmt.Println("发现服务实例:")
	for _, inst := range instances {
		fmt.Printf(" - %s:%d\n", inst.Host, inst.Port)
	}

	// Pick one at random.
	selected, err := registry.DiscoverOne("order-service")
	if err != nil {
		log.Printf("发现服务失败: %v", err)
		return
	}
	if selected != nil {
		fmt.Printf("选中实例: %s:%d\n", selected.Host, selected.Port)
	}

	// Follow changes to the service.
	err = registry.WatchService("order-service", func(instances []*ServiceInstance) {
		fmt.Println("服务列表更新:")
		for _, inst := range instances {
			fmt.Printf(" - %s:%d\n", inst.Host, inst.Port)
		}
	})
	if err != nil {
		log.Printf("监听服务失败: %v", err)
		return
	}

	time.Sleep(time.Minute)
}

事务操作

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main runs three operations as one transaction: create two nodes and
// update the first one. Either all of them are applied or none.
//
// The parent /tx is created beforehand, outside the transaction, because
// ZooKeeper does not create missing parents and the transaction would
// otherwise always fail on a fresh server.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	acl := zk.WorldACL(zk.PermAll)

	// Make sure the parent exists; an already existing node is fine.
	if _, err := conn.Create("/tx", nil, 0, acl); err != nil && err != zk.ErrNodeExists {
		log.Printf("创建父节点失败: %v", err)
		return
	}

	// Multi executes the requests atomically.
	results, err := conn.Multi(
		&zk.CreateRequest{
			Path:  "/tx/node1",
			Data:  []byte("data1"),
			Flags: 0,
			Acl:   acl,
		},
		&zk.CreateRequest{
			Path:  "/tx/node2",
			Data:  []byte("data2"),
			Flags: 0,
			Acl:   acl,
		},
		// Version 0 matches the node created by the first request above.
		&zk.SetDataRequest{
			Path:    "/tx/node1",
			Data:    []byte("updated1"),
			Version: 0,
		},
	)
	if err != nil {
		log.Printf("事务执行失败: %v", err)
		return
	}

	fmt.Println("事务执行成功")
	for i, result := range results {
		fmt.Printf(" 操作 %d: %v\n", i+1, result)
	}
}

错误处理

package main

import (
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// main walks through three error-handling patterns: classifying the error
// of a read, retrying a write after a version conflict, and falling back
// to an update when a node already exists.
//
// Only connection-level or unknown read errors end the program early, so a
// missing example node does not prevent the later patterns from running.
func main() {
	conn, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	// Pattern 1: classify common errors.
	data, stat, err := conn.Get("/nonexistent")
	switch err {
	case nil:
		fmt.Printf("当前数据: %s, 版本: %d\n", string(data), stat.Version)
	case zk.ErrNoNode:
		fmt.Println("节点不存在")
	case zk.ErrNoAuth:
		fmt.Println("没有权限")
	case zk.ErrConnectionClosed:
		fmt.Println("连接已关闭")
		return
	case zk.ErrSessionExpired:
		fmt.Println("会话已过期")
		return
	default:
		fmt.Printf("其他错误: %v\n", err)
		return
	}

	// Pattern 2: on a version conflict, read the latest version and retry.
	_, err = conn.Set("/myapp/config", []byte("new-data"), 0)
	if err == zk.ErrBadVersion {
		fmt.Println("版本冲突,获取最新版本后重试")
		// Check the read before touching its stat: on failure the stat must
		// not be used.
		_, latest, getErr := conn.Get("/myapp/config")
		if getErr != nil {
			log.Printf("获取数据失败: %v", getErr)
		} else if _, setErr := conn.Set("/myapp/config", []byte("new-data"), latest.Version); setErr != nil {
			log.Printf("更新失败: %v", setErr)
		}
	} else if err != nil {
		log.Printf("更新失败: %v", err)
	}

	// Pattern 3: create, or update when the node is already there.
	_, err = conn.Create("/myapp/config", []byte("data"), 0, zk.WorldACL(zk.PermAll))
	if err == zk.ErrNodeExists {
		fmt.Println("节点已存在,更新数据")
		if _, setErr := conn.Set("/myapp/config", []byte("data"), -1); setErr != nil {
			log.Printf("更新失败: %v", setErr)
		}
	} else if err != nil {
		log.Printf("创建节点失败: %v", err)
	}
}

最佳实践

1. 复用连接(单例)

// Recommended: reuse one connection instead of opening a new one for
// every operation.
var (
	conn *zk.Conn
	once sync.Once
)

// GetConnection returns the shared connection, creating it on the first
// call. It panics if that first connection attempt fails.
func GetConnection() *zk.Conn {
	connect := func() {
		c, _, err := zk.Connect([]string{"localhost:2181"}, time.Second*5)
		if err != nil {
			panic(err)
		}
		conn = c
	}
	once.Do(connect)
	return conn
}

2. 处理会话过期

// handleSession consumes connection events until eventChan is closed and
// logs a message whenever the session is reported as expired.
func handleSession(conn *zk.Conn, eventChan <-chan zk.Event) {
	for ev := range eventChan {
		if ev.State != zk.StateExpired {
			continue
		}
		// Session expired: the application has to re-initialize. Ephemeral
		// nodes are removed automatically and must be created again.
		log.Println("会话已过期,需要重新初始化")
	}
}

3. 设置合理的超时

// Fragment: meant to live inside a function body. The second argument is
// the session timeout requested from the server.
conn, _, err := zk.Connect(
[]string{"localhost:2181"},
time.Second*5, // session timeout
zk.WithMaxBufferSize(1024*1024),
)

小结

本章介绍了使用 Go 语言 go-zookeeper 库开发 ZooKeeper 应用的核心内容:

  1. 连接管理:创建连接、状态监听、连接选项
  2. CRUD 操作:创建、读取、更新、删除节点
  3. Watcher 机制:数据监听、子节点监听、存在性监听
  4. 分布式锁:基本锁、带超时的锁、读写锁实现
  5. Leader 选举:基于临时顺序节点的选举实现
  6. 服务注册发现:完整的服务注册中心实现
  7. 事务操作:原子性多操作执行
  8. 错误处理:常见异常和处理方式

go-zookeeper 库提供了简洁高效的 API,适合在 Go 微服务中使用 ZooKeeper 进行协调。

参考资料