客户端开发
本章介绍如何使用 Go 语言实现 gRPC 客户端,包括连接管理、调用方式、错误处理等内容。
客户端架构
基本客户端实现
创建连接
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "myproject/proto/hello"
)
func main() {
// 创建连接(不安全连接,仅用于开发环境)
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
// 创建客户端
client := pb.NewGreeterClient(conn)
// 调用服务
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "世界"})
if err != nil {
log.Fatalf("调用失败: %v", err)
}
log.Printf("响应: %s", resp.Message)
}
连接选项
// 完整的连接配置
conn, err := grpc.Dial(
"localhost:50051",
// 传输凭证
grpc.WithTransportCredentials(insecure.NewCredentials()),
// 阻塞等待连接建立
grpc.WithBlock(),
// 超时设置
grpc.WithTimeout(time.Second*5),
// 最大消息大小
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(1024*1024*10), // 10MB
grpc.MaxCallSendMsgSize(1024*1024*10), // 10MB
),
// 连接保活
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
}),
// 重试策略
grpc.WithDefaultServiceConfig(`{
"methodConfig": [{
"name": [{"service": ""}],
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`),
)
一元 RPC 调用
// 简单调用
func simpleCall(client pb.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "张三"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}
log.Printf("响应: %s", resp.Message)
}
// 带元数据的调用
func callWithMetadata(client pb.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// 添加元数据
md := metadata.New(map[string]string{
"authorization": "Bearer token123",
"request-id": "req-001",
})
ctx = metadata.NewOutgoingContext(ctx, md)
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "张三"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}
// 读取响应头
header, err := grpc.Header(ctx)
if err == nil {
log.Printf("响应头: %v", header)
}
log.Printf("响应: %s", resp.Message)
}
流式 RPC 调用
服务端流
func serverStream(client pb.OrderServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// 创建流
stream, err := client.ListOrders(ctx, &pb.ListOrdersRequest{
UserId: "user123",
PageSize: 10,
})
if err != nil {
log.Printf("创建流失败: %v", err)
return
}
// 接收流数据
for {
order, err := stream.Recv()
if err == io.EOF {
log.Println("流结束")
break
}
if err != nil {
log.Printf("接收失败: %v", err)
break
}
log.Printf("订单: %s, 金额: %.2f", order.Id, order.Amount)
}
// 读取响应尾部的元数据
trailer := stream.Trailer()
log.Printf("Trailer: %v", trailer)
}
客户端流
func clientStream(client pb.FileServiceClient, file *os.File) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
// 创建流
stream, err := client.UploadFile(ctx)
if err != nil {
log.Printf("创建流失败: %v", err)
return
}
// 分块发送文件
buffer := make([]byte, 64*1024) // 64KB
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Printf("读取文件失败: %v", err)
return
}
if err := stream.Send(&pb.FileChunk{
Filename: "upload.txt",
Content: buffer[:n],
}); err != nil {
log.Printf("发送失败: %v", err)
return
}
}
// 关闭发送端并接收响应
result, err := stream.CloseAndRecv()
if err != nil {
log.Printf("上传失败: %v", err)
return
}
log.Printf("上传结果: %s", result.Message)
}
双向流
func bidirectionalStream(client pb.ChatServiceClient) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 创建流
stream, err := client.Chat(ctx)
if err != nil {
log.Printf("创建流失败: %v", err)
return
}
// 发送消息的 goroutine
go func() {
for i := 0; i < 5; i++ {
msg := &pb.ChatMessage{
UserId: "user123",
Content: fmt.Sprintf("消息 %d", i+1),
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
time.Sleep(time.Second)
}
stream.CloseSend()
}()
// 接收消息
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Println("流结束")
break
}
if err != nil {
log.Printf("接收失败: %v", err)
break
}
log.Printf("收到: [%s] %s", msg.UserId, msg.Content)
}
}
错误处理
解析错误
func handleError(err error) {
if err == nil {
return
}
// 转换为 gRPC 状态
st, ok := status.FromError(err)
if ok {
log.Printf("gRPC 错误: code=%s, message=%s", st.Code(), st.Message())
// 处理详细错误信息
for _, detail := range st.Details() {
switch t := detail.(type) {
case *errdetails.BadRequest:
for _, violation := range t.GetFieldViolations() {
log.Printf("字段错误: %s - %s", violation.Field, violation.Description)
}
case *errdetails.RetryInfo:
log.Printf("建议重试延迟: %v", t.GetRetryDelay())
}
}
// 根据错误码处理
switch st.Code() {
case codes.NotFound:
log.Println("资源不存在")
case codes.PermissionDenied:
log.Println("权限不足")
case codes.Unauthenticated:
log.Println("未认证")
case codes.DeadlineExceeded:
log.Println("请求超时")
case codes.Unavailable:
log.Println("服务不可用,请重试")
}
} else {
log.Printf("其他错误: %v", err)
}
}
超时和取消
func callWithTimeout(client pb.GreeterClient) {
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
log.Println("请求超时")
} else {
log.Printf("其他错误: %v", err)
}
return
}
log.Printf("响应: %s", resp.Message)
}
// 手动取消
func callWithCancel(client pb.GreeterClient) {
ctx, cancel := context.WithCancel(context.Background())
// 在另一个 goroutine 中取消
go func() {
time.Sleep(time.Second)
cancel()
}()
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.Canceled {
log.Println("请求被取消")
}
return
}
log.Printf("响应: %s", resp.Message)
}
客户端拦截器
// 一元拦截器
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
log.Printf("[客户端请求] 方法: %s", method)
// 调用服务
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("[客户端响应] 方法: %s, 耗时: %v, 错误: %v", method, time.Since(start), err)
return err
}
// 流拦截器
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Printf("[客户端流] 方法: %s", method)
stream, err := streamer(ctx, desc, cc, method, opts...)
return stream, err
}
// 使用拦截器
func main() {
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
)
}
连接池和负载均衡
// 使用多个服务端地址实现负载均衡
func loadBalancedClient() {
// 多个服务端地址
conn, err := grpc.Dial(
"dns:///my-service.example.com", // DNS 服务发现
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{
"loadBalancingConfig": [{"round_robin": {}}]
}`),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewGreeterClient(conn)
// 调用会被自动负载均衡
for i := 0; i < 10; i++ {
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("请求%d", i)})
if err != nil {
log.Printf("调用失败: %v", err)
continue
}
log.Printf("响应: %s", resp.Message)
}
}
最佳实践
1. 连接复用
// ✅ 好:复用连接
var globalConn *grpc.ClientConn
func getClient() (*grpc.ClientConn, error) {
if globalConn != nil {
return globalConn, nil
}
var err error
globalConn, err = grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
return globalConn, err
}
// ❌ 不好:每次调用都创建新连接
func badPractice() {
conn, _ := grpc.Dial("localhost:50051", ...)
defer conn.Close() // 立即关闭,效率低
client := pb.NewGreeterClient(conn)
client.SayHello(...)
}
2. 设置合理的超时
// ✅ 好:根据操作类型设置不同超时
func callWithAppropriateTimeout(client pb.GreeterClient) {
// 快速操作
quickCtx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
client.QuickOperation(quickCtx, ...)
// 慢操作
slowCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
client.SlowOperation(slowCtx, ...)
}
3. 处理连接状态
func monitorConnection(conn *grpc.ClientConn) {
state := conn.GetState()
log.Printf("连接状态: %s", state)
for {
// 等待状态变化
changed := conn.WaitForStateChange(context.Background(), state)
if !changed {
continue
}
newState := conn.GetState()
log.Printf("状态变化: %s -> %s", state, newState)
// 处理状态变化
switch newState {
case connectivity.Idle:
log.Println("连接空闲")
case connectivity.Connecting:
log.Println("正在连接")
case connectivity.Ready:
log.Println("连接就绪")
case connectivity.TransientFailure:
log.Println("临时故障")
case connectivity.Shutdown:
log.Println("连接关闭")
return
}
state = newState
}
}
小结
本章我们学习了:
- 基本客户端:创建连接和调用服务
- 连接配置:各种连接选项和参数
- 流式调用:三种流式 RPC 的客户端实现
- 错误处理:解析错误和超时处理
- 拦截器:客户端请求拦截
- 连接管理:连接复用和负载均衡
- 最佳实践:连接复用、超时设置、状态监控
下一章我们将学习 gRPC 的错误处理机制。