跳到主要内容

客户端开发

本章介绍如何使用 Go 语言实现 gRPC 客户端,包括连接管理、调用方式、错误处理等内容。

客户端架构

基本客户端实现

创建连接

package main

import (
"context"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "myproject/proto/hello"
)

func main() {
// 创建连接(不安全连接,仅用于开发环境)
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()

// 创建客户端
client := pb.NewGreeterClient(conn)

// 调用服务
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "世界"})
if err != nil {
log.Fatalf("调用失败: %v", err)
}

log.Printf("响应: %s", resp.Message)
}

连接选项

// 完整的连接配置
conn, err := grpc.Dial(
"localhost:50051",
// 传输凭证
grpc.WithTransportCredentials(insecure.NewCredentials()),

// 阻塞等待连接建立
grpc.WithBlock(),

// 超时设置
grpc.WithTimeout(time.Second*5),

// 最大消息大小
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(1024*1024*10), // 10MB
grpc.MaxCallSendMsgSize(1024*1024*10), // 10MB
),

// 连接保活
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
}),

// 重试策略
grpc.WithDefaultServiceConfig(`{
"methodConfig": [{
"name": [{"service": ""}],
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`),
)

一元 RPC 调用

// 简单调用
func simpleCall(client pb.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "张三"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}
log.Printf("响应: %s", resp.Message)
}

// 带元数据的调用
func callWithMetadata(client pb.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// 添加元数据
md := metadata.New(map[string]string{
"authorization": "Bearer token123",
"request-id": "req-001",
})
ctx = metadata.NewOutgoingContext(ctx, md)

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "张三"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}

// 读取响应头
header, err := grpc.Header(ctx)
if err == nil {
log.Printf("响应头: %v", header)
}

log.Printf("响应: %s", resp.Message)
}

流式 RPC 调用

服务端流

func serverStream(client pb.OrderServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

// 创建流
stream, err := client.ListOrders(ctx, &pb.ListOrdersRequest{
UserId: "user123",
PageSize: 10,
})
if err != nil {
log.Printf("创建流失败: %v", err)
return
}

// 接收流数据
for {
order, err := stream.Recv()
if err == io.EOF {
log.Println("流结束")
break
}
if err != nil {
log.Printf("接收失败: %v", err)
break
}
log.Printf("订单: %s, 金额: %.2f", order.Id, order.Amount)
}

// 读取响应尾部的元数据
trailer := stream.Trailer()
log.Printf("Trailer: %v", trailer)
}

客户端流

func clientStream(client pb.FileServiceClient, file *os.File) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

// 创建流
stream, err := client.UploadFile(ctx)
if err != nil {
log.Printf("创建流失败: %v", err)
return
}

// 分块发送文件
buffer := make([]byte, 64*1024) // 64KB
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Printf("读取文件失败: %v", err)
return
}

if err := stream.Send(&pb.FileChunk{
Filename: "upload.txt",
Content: buffer[:n],
}); err != nil {
log.Printf("发送失败: %v", err)
return
}
}

// 关闭发送端并接收响应
result, err := stream.CloseAndRecv()
if err != nil {
log.Printf("上传失败: %v", err)
return
}
log.Printf("上传结果: %s", result.Message)
}

双向流

func bidirectionalStream(client pb.ChatServiceClient) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 创建流
stream, err := client.Chat(ctx)
if err != nil {
log.Printf("创建流失败: %v", err)
return
}

// 发送消息的 goroutine
go func() {
for i := 0; i < 5; i++ {
msg := &pb.ChatMessage{
UserId: "user123",
Content: fmt.Sprintf("消息 %d", i+1),
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
time.Sleep(time.Second)
}
stream.CloseSend()
}()

// 接收消息
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Println("流结束")
break
}
if err != nil {
log.Printf("接收失败: %v", err)
break
}
log.Printf("收到: [%s] %s", msg.UserId, msg.Content)
}
}

错误处理

解析错误

func handleError(err error) {
if err == nil {
return
}

// 转换为 gRPC 状态
st, ok := status.FromError(err)
if ok {
log.Printf("gRPC 错误: code=%s, message=%s", st.Code(), st.Message())

// 处理详细错误信息
for _, detail := range st.Details() {
switch t := detail.(type) {
case *errdetails.BadRequest:
for _, violation := range t.GetFieldViolations() {
log.Printf("字段错误: %s - %s", violation.Field, violation.Description)
}
case *errdetails.RetryInfo:
log.Printf("建议重试延迟: %v", t.GetRetryDelay())
}
}

// 根据错误码处理
switch st.Code() {
case codes.NotFound:
log.Println("资源不存在")
case codes.PermissionDenied:
log.Println("权限不足")
case codes.Unauthenticated:
log.Println("未认证")
case codes.DeadlineExceeded:
log.Println("请求超时")
case codes.Unavailable:
log.Println("服务不可用,请重试")
}
} else {
log.Printf("其他错误: %v", err)
}
}

超时和取消

func callWithTimeout(client pb.GreeterClient) {
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
log.Println("请求超时")
} else {
log.Printf("其他错误: %v", err)
}
return
}
log.Printf("响应: %s", resp.Message)
}

// 手动取消
func callWithCancel(client pb.GreeterClient) {
ctx, cancel := context.WithCancel(context.Background())

// 在另一个 goroutine 中取消
go func() {
time.Sleep(time.Second)
cancel()
}()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.Canceled {
log.Println("请求被取消")
}
return
}
log.Printf("响应: %s", resp.Message)
}

客户端拦截器

// 一元拦截器
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()

log.Printf("[客户端请求] 方法: %s", method)

// 调用服务
err := invoker(ctx, method, req, reply, cc, opts...)

log.Printf("[客户端响应] 方法: %s, 耗时: %v, 错误: %v", method, time.Since(start), err)

return err
}

// 流拦截器
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Printf("[客户端流] 方法: %s", method)

stream, err := streamer(ctx, desc, cc, method, opts...)

return stream, err
}

// 使用拦截器
func main() {
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
)
}

连接池和负载均衡

// 使用多个服务端地址实现负载均衡
func loadBalancedClient() {
// 多个服务端地址
conn, err := grpc.Dial(
"dns:///my-service.example.com", // DNS 服务发现
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{
"loadBalancingConfig": [{"round_robin": {}}]
}`),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()

client := pb.NewGreeterClient(conn)

// 调用会被自动负载均衡
for i := 0; i < 10; i++ {
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("请求%d", i)})
if err != nil {
log.Printf("调用失败: %v", err)
continue
}
log.Printf("响应: %s", resp.Message)
}
}

最佳实践

1. 连接复用

// ✅ 好:复用连接
var globalConn *grpc.ClientConn

func getClient() (*grpc.ClientConn, error) {
if globalConn != nil {
return globalConn, nil
}

var err error
globalConn, err = grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
return globalConn, err
}

// ❌ 不好:每次调用都创建新连接
func badPractice() {
conn, _ := grpc.Dial("localhost:50051", ...)
defer conn.Close() // 立即关闭,效率低
client := pb.NewGreeterClient(conn)
client.SayHello(...)
}

2. 设置合理的超时

// ✅ 好:根据操作类型设置不同超时
func callWithAppropriateTimeout(client pb.GreeterClient) {
// 快速操作
quickCtx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
client.QuickOperation(quickCtx, ...)

// 慢操作
slowCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
client.SlowOperation(slowCtx, ...)
}

3. 处理连接状态

func monitorConnection(conn *grpc.ClientConn) {
state := conn.GetState()
log.Printf("连接状态: %s", state)

for {
// 等待状态变化
changed := conn.WaitForStateChange(context.Background(), state)
if !changed {
continue
}

newState := conn.GetState()
log.Printf("状态变化: %s -> %s", state, newState)

// 处理状态变化
switch newState {
case connectivity.Idle:
log.Println("连接空闲")
case connectivity.Connecting:
log.Println("正在连接")
case connectivity.Ready:
log.Println("连接就绪")
case connectivity.TransientFailure:
log.Println("临时故障")
case connectivity.Shutdown:
log.Println("连接关闭")
return
}

state = newState
}
}

小结

本章我们学习了:

  1. 基本客户端:创建连接和调用服务
  2. 连接配置:各种连接选项和参数
  3. 流式调用:三种流式 RPC 的客户端实现
  4. 错误处理:解析错误和超时处理
  5. 拦截器:客户端请求拦截
  6. 连接管理:连接复用和负载均衡
  7. 最佳实践:连接复用、超时设置、状态监控

下一章我们将学习 gRPC 的错误处理机制。