跳到主要内容

RPC 类型

gRPC 支持四种 RPC 类型,满足不同的通信场景需求。本章详细介绍每种类型的特点和使用方法。

概述

RPC 生命周期

理解 RPC 的生命周期对于正确使用 gRPC 非常重要。每种 RPC 类型都有其特定的生命周期流程。

一元 RPC 生命周期

一元 RPC 是最简单的调用模式,其完整生命周期如下:

  1. 客户端调用:客户端调用 stub 方法,gRPC 框架向服务端发送请求元数据、方法名和可选的截止时间(deadline)。

  2. 服务端接收通知:服务端收到 RPC 调用通知,可以立即发送初始元数据,或等待客户端请求消息。

  3. 请求处理:服务端收到客户端请求后,执行业务逻辑并生成响应。

  4. 响应返回:服务端返回响应(如果成功)、状态详情(状态码和可选消息)以及可选的尾部元数据。

  5. 客户端完成:如果状态码为 OK,客户端获得响应,调用完成。

// 一元 RPC 的完整生命周期示例
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 1. 可以发送初始元数据(在响应之前)
grpc.SendHeader(ctx, metadata.Pairs("x-response-time", time.Now().String()))

// 2. 检查上下文是否已取消或超时
if ctx.Err() == context.Canceled {
return nil, status.Error(codes.Canceled, "请求被取消")
}

// 3. 执行业务逻辑
user, err := s.userService.GetByID(req.UserId)
if err != nil {
// 4. 返回错误状态
return nil, status.Errorf(codes.NotFound, "用户不存在: %s", req.UserId)
}

// 5. 发送尾部元数据(可选)
grpc.SetTrailer(ctx, metadata.Pairs("x-request-id", req.RequestId))

// 6. 返回响应
return user, nil
}

流式 RPC 生命周期

流式 RPC 的生命周期更为复杂,因为客户端和服务端可以在不同时间点读写消息。

服务端流

  1. 客户端发送请求
  2. 服务端收到请求后开始发送消息流
  3. 服务端发送完所有消息后,发送状态和尾部元数据
  4. 客户端收到所有消息后完成

客户端流

  1. 客户端开始发送消息流
  2. 服务端接收消息(可以边接收边处理)
  3. 客户端发送完毕,等待响应
  4. 服务端处理完成后返回单个响应

双向流

  1. 客户端调用方法,服务端收到客户端元数据
  2. 服务端可以发送初始元数据或等待客户端开始流式发送
  3. 客户端和服务端可以按任意顺序读写消息
  4. 两个流独立运行,但每个流内的消息顺序保持不变

元数据(Metadata)

元数据是关于 RPC 调用的附加信息,以键值对形式存在。它在认证、追踪、调试等场景中非常有用。

元数据的结构

  • 键(Key):字符串,由 ASCII 字母、数字和特殊字符 -_. 组成,不能以 grpc- 开头(保留给 gRPC 内部使用)
  • 值(Value):通常是字符串,也可以是二进制数据(键名需以 -bin 结尾)

服务端接收元数据

func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 从上下文获取元数据
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.DataLoss, "缺少元数据")
}

// 获取单个值
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}
token := tokens[0]

// 获取二进制元数据
traceIds := md.Get("trace-id-bin")

// 获取所有值(一个键可能有多个值)
requestIds := md.Get("x-request-id")

// 使用元数据...
return &pb.User{}, nil
}

客户端发送元数据

func callWithMetadata(client pb.GreeterClient) {
// 方式一:使用 metadata.New
md := metadata.New(map[string]string{
"authorization": "Bearer my-token",
"x-request-id": "req-12345",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 方式二:追加到现有上下文
ctx = metadata.AppendToOutgoingContext(ctx,
"x-custom-header", "value1",
"x-custom-header", "value2", // 同一个键可以有多个值
)

// 调用服务
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"})
}

接收响应元数据

func receiveMetadata(client pb.GreeterClient) {
ctx := context.Background()

// 使用 WithCallOption 获取响应元数据
var header, trailer metadata.MD

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"},
grpc.Header(&header), // 获取响应头
grpc.Trailer(&trailer), // 获取尾部元数据
)

// 读取响应头
if contentType := header.Get("content-type"); len(contentType) > 0 {
fmt.Printf("Content-Type: %s\n", contentType[0])
}

// 读取尾部元数据
if requestId := trailer.Get("x-request-id"); len(requestId) > 0 {
fmt.Printf("Request ID: %s\n", requestId[0])
}
}

服务端发送元数据

func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 发送初始元数据(必须在响应之前)
err := grpc.SendHeader(ctx, metadata.Pairs(
"x-response-time", time.Now().Format(time.RFC3339),
"x-server-version", "1.0.0",
))
if err != nil {
return nil, err
}

// 处理请求...

// 设置尾部元数据(在响应之后发送)
grpc.SetTrailer(ctx, metadata.Pairs(
"x-request-id", req.RequestId,
"x-processed-at", time.Now().String(),
))

return &pb.User{}, nil
}

Deadline 和 Timeout

Deadline 和 Timeout 是 gRPC 中控制请求时间的关键机制。它们确保客户端不会无限期等待响应。

Deadline vs Timeout

  • Timeout:从调用开始计算的等待时间长度(如 5 秒)
  • Deadline:一个固定的时间点,表示请求必须在此时间之前完成
// 使用 Timeout(持续时间)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 使用 Deadline(时间点)
deadline := time.Now().Add(5 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

客户端设置 Deadline

func callWithDeadline(client pb.GreeterClient) {
// 设置 3 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
log.Println("请求超时")
} else {
log.Printf("其他错误: %v", err)
}
return
}

fmt.Printf("响应: %s\n", resp.Message)
}

服务端检查 Deadline

func (s *server) LongRunningTask(ctx context.Context, req *pb.Request) (*pb.Response, error) {
// 检查是否已经超时
if ctx.Err() == context.DeadlineExceeded {
return nil, status.Error(codes.DeadlineExceeded, "请求已超时")
}

// 获取剩余时间
deadline, ok := ctx.Deadline()
if ok {
remaining := time.Until(deadline)
log.Printf("剩余时间: %v", remaining)

// 如果剩余时间不足,提前返回
if remaining < 100*time.Millisecond {
return nil, status.Error(codes.DeadlineExceeded, "剩余时间不足")
}
}

// 执行耗时任务时定期检查
for i := 0; i < 10; i++ {
// 检查上下文是否已取消
if ctx.Err() != nil {
return nil, status.Error(codes.Canceled, "请求被取消")
}

// 执行一部分任务
time.Sleep(100 * time.Millisecond)
}

return &pb.Response{}, nil
}

Deadline 传播

在微服务架构中,应该将 deadline 传播到下游服务:

func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
// 直接传递上下文,deadline 会自动传播
inventoryResp, err := s.inventoryClient.CheckStock(ctx, &inventory.Request{...})
if err != nil {
return nil, err
}

// 同一个 ctx 会继续传播剩余时间
paymentResp, err := s.paymentClient.Process(ctx, &payment.Request{...})
if err != nil {
return nil, err
}

return &pb.OrderResponse{}, nil
}

RPC 终止和取消

在 gRPC 中,客户端和服务端都可以独立判断调用是否成功,他们的结论可能不同。这是一个重要的设计理念。

RPC 终止的特性

  1. 独立判断:客户端和服务端各自判断调用成功与否
  2. 可能不一致:服务端可能成功完成,但客户端因超时认为失败
  3. 提前终止:服务端可能在客户端发送完所有请求前就完成

客户端取消 RPC

func callWithCancel(client pb.GreeterClient) {
ctx, cancel := context.WithCancel(context.Background())

// 在另一个 goroutine 中取消
go func() {
time.Sleep(2 * time.Second)
cancel() // 手动取消
log.Println("请求已取消")
}()

resp, err := client.LongRunningTask(ctx, &pb.Request{})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.Canceled {
log.Println("请求被取消")
}
return
}

fmt.Printf("响应: %v\n", resp)
}

服务端检测取消

func (s *server) StreamData(req *pb.Request, stream pb.Service_StreamDataServer) error {
for i := 0; i < 100; i++ {
// 检查客户端是否已取消
if stream.Context().Err() == context.Canceled {
log.Println("客户端取消了请求")
return status.Error(codes.Canceled, "客户端取消")
}

// 检查流是否仍然活跃
if !stream.Context().Err() == nil {
data := generateData(i)
if err := stream.Send(data); err != nil {
return err
}
}

time.Sleep(100 * time.Millisecond)
}
return nil
}

取消的影响

重要提示:取消 RPC 会立即终止调用,但不会回滚已完成的操作。

func (s *server) CreateOrder(ctx context.Context, req *pb.OrderRequest) (*pb.Order, error) {
// 步骤1:创建订单记录
order, err := s.db.CreateOrder(req)
if err != nil {
return nil, err
}

// 如果在此处被取消,订单已创建但不会回滚

// 步骤2:扣减库存
err = s.inventory.Deduct(req.Items)
if err != nil {
// 需要手动回滚订单
s.db.CancelOrder(order.Id)
return nil, err
}

// 如果在此处被取消,订单已创建,库存已扣减

// 步骤3:发起支付
payment, err := s.payment.Create(order)
if err != nil {
// 需要手动回滚
s.inventory.Restore(req.Items)
s.db.CancelOrder(order.Id)
return nil, err
}

return order, nil
}

一元 RPC(Unary RPC)

一元 RPC 是最简单的 RPC 类型,客户端发送一个请求,服务端返回一个响应,类似于普通的函数调用。

Proto 定义

service UserService {
// 一元 RPC:获取用户信息
rpc GetUser (GetUserRequest) returns (User);
}

message GetUserRequest {
string user_id = 1;
}

message User {
string id = 1;
string name = 2;
string email = 3;
}

使用场景

  • 获取单个资源
  • 创建资源
  • 更新资源
  • 简单查询

代码示例

Go 服务端

func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 从数据库获取用户
user, err := s.db.GetUser(req.UserId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "用户不存在: %v", err)
}
return &pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
}, nil
}

Go 客户端

// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// 调用 RPC
user, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: "123"})
if err != nil {
log.Fatalf("获取用户失败: %v", err)
}
fmt.Printf("用户: %s\n", user.Name)

服务端流 RPC(Server Streaming RPC)

服务端流 RPC 允许服务端返回一个消息流,客户端从流中读取直到没有更多消息。

Proto 定义

service OrderService {
// 服务端流:获取订单列表
rpc ListOrders (ListOrdersRequest) returns (stream Order);
}

message ListOrdersRequest {
string user_id = 1;
int32 page_size = 2;
}

message Order {
string id = 1;
string product_name = 2;
double amount = 3;
string status = 4;
}

使用场景

  • 分页数据返回
  • 日志流传输
  • 实时数据推送
  • 大数据集传输

代码示例

Go 服务端

func (s *server) ListOrders(req *pb.ListOrdersRequest, stream pb.OrderService_ListOrdersServer) error {
// 从数据库获取订单
orders, err := s.db.ListOrders(req.UserId, req.PageSize)
if err != nil {
return err
}

// 逐个发送订单
for _, order := range orders {
if err := stream.Send(&pb.Order{
Id: order.ID,
ProductName: order.ProductName,
Amount: order.Amount,
Status: order.Status,
}); err != nil {
return err
}
}
return nil
}

Go 客户端

// 创建流
stream, err := client.ListOrders(ctx, &pb.ListOrdersRequest{
UserId: "user123",
PageSize: 10,
})
if err != nil {
log.Fatalf("创建流失败: %v", err)
}

// 接收流数据
for {
order, err := stream.Recv()
if err == io.EOF {
break // 流结束
}
if err != nil {
log.Fatalf("接收订单失败: %v", err)
}
fmt.Printf("订单: %s, 金额: %.2f\n", order.ProductName, order.Amount)
}

客户端流 RPC(Client Streaming RPC)

客户端流 RPC 允许客户端发送一个消息流,服务端接收所有消息后返回单个响应。

Proto 定义

service FileService {
// 客户端流:上传文件
rpc UploadFile (stream FileChunk) returns (UploadResult);
}

message FileChunk {
string filename = 1;
bytes content = 2;
int64 offset = 3;
}

message UploadResult {
bool success = 1;
string message = 2;
string file_id = 3;
}

使用场景

  • 文件上传
  • 批量数据提交
  • 数据聚合处理

代码示例

Go 服务端

func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var filename string
var totalSize int64

// 接收所有数据块
for {
chunk, err := stream.Recv()
if err == io.EOF {
// 所有数据接收完成
return stream.SendAndClose(&pb.UploadResult{
Success: true,
Message: "上传成功",
FileId: generateFileId(),
})
}
if err != nil {
return err
}

// 处理数据块
filename = chunk.Filename
totalSize += int64(len(chunk.Content))
// 写入文件...
}
}

Go 客户端

// 创建流
stream, err := client.UploadFile(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}

// 分块发送文件
buffer := make([]byte, 1024*64) // 64KB 块
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("读取文件失败: %v", err)
}

if err := stream.Send(&pb.FileChunk{
Filename: "example.txt",
Content: buffer[:n],
}); err != nil {
log.Fatalf("发送数据失败: %v", err)
}
}

// 关闭流并接收响应
result, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("上传失败: %v", err)
}
fmt.Printf("上传结果: %s\n", result.Message)

双向流 RPC(Bidirectional Streaming RPC)

双向流 RPC 允许客户端和服务端独立地读写消息流,两个流相互独立。

Proto 定义

service ChatService {
// 双向流:实时聊天
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
string user_id = 1;
string content = 2;
int64 timestamp = 3;
}

使用场景

  • 实时聊天应用
  • 在线协作
  • 实时游戏
  • 双向数据同步

代码示例

Go 服务端

func (s *server) Chat(stream pb.ChatService_ChatServer) error {
for {
// 接收消息
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}

// 广播消息给其他用户
broadcast := &pb.ChatMessage{
UserId: msg.UserId,
Content: msg.Content,
Timestamp: time.Now().Unix(),
}

if err := stream.Send(broadcast); err != nil {
return err
}
}
}

Go 客户端

// 创建双向流
stream, err := client.Chat(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}

// 发送消息的 goroutine
go func() {
for {
msg := &pb.ChatMessage{
UserId: "user123",
Content: "Hello!",
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
time.Sleep(time.Second)
}
}()

// 接收消息
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
fmt.Printf("[%s]: %s\n", msg.UserId, msg.Content)
}

类型选择指南

RPC 类型客户端服务端典型场景
一元 RPC请求×1响应×1CRUD操作、简单查询
服务端流请求×1响应流分页查询、实时推送
客户端流请求流响应×1文件上传、批量提交
双向流请求流响应流聊天、游戏、协作

流式传输注意事项

1. 消息顺序

gRPC 保证单个流内的消息顺序:

// 服务端按顺序发送
stream.Send(msg1) // 先发送
stream.Send(msg2) // 后发送
// 客户端接收顺序也是 msg1 -> msg2

2. 流的生命周期

// 客户端流
stream, _ := client.SomeStreamingRPC(ctx) // 创建流
stream.Send(msg) // 发送消息
stream.CloseSend() // 关闭发送端(客户端流)
result, _ := stream.Recv() // 接收响应

// 服务端流
stream.Send(msg) // 发送消息
stream.Send(msg2)
return nil // 返回时自动关闭流

3. 错误处理

for {
msg, err := stream.Recv()
if err == io.EOF {
// 正常结束
break
}
if err != nil {
// 处理错误
statusErr, ok := status.FromError(err)
if ok {
log.Printf("gRPC 错误: %v", statusErr.Message())
} else {
log.Printf("其他错误: %v", err)
}
break
}
// 处理消息
}

流控制(Flow Control)

流控制是 gRPC 中一个关键但常被忽视的机制。它确保消息接收方不会被快速发送方压垮,防止数据丢失、提高性能并增强可靠性。流控制仅适用于流式 RPC,对一元 RPC 不相关。

为什么需要流控制?

想象这样一个场景:服务端生成数据的速度远快于客户端处理数据的速度。如果没有流控制,数据会在客户端的缓冲区中不断堆积,最终导致:

  • 内存溢出:缓冲区耗尽,程序崩溃
  • 数据丢失:旧数据被覆盖或丢弃
  • 网络拥塞:不必要的重传和超时
  • 性能下降:系统资源被无效占用

gRPC 基于 HTTP/2 的流控制机制,自动处理这些问题,但理解其工作原理对于编写高性能流式应用至关重要。

gRPC 流控制的工作原理

核心机制

  1. 发送方写入消息:应用调用 stream.Send() 时,消息被传递给 gRPC 框架
  2. 框架判断是否可发送:如果接收方缓冲区已满,框架会阻塞发送调用
  3. 消息传输:当有可用空间时,消息通过网络发送
  4. 接收确认:接收方读取消息后,发送确认告知发送方有更多容量
  5. 继续发送:发送方收到确认后,可以继续发送等待中的消息

关键理解:Send 不等于已发送

很多开发者误以为调用 Send() 成功就意味着消息已经通过网络发出。实际上:

// ❌ 误解:Send 返回成功 = 消息已发送到网络
err := stream.Send(msg)
if err == nil {
log.Println("消息已发送") // 这只是说消息已被框架接收!
}

// ✅ 正确理解:Send 成功 = 消息已进入 gRPC 框架的缓冲区
err := stream.Send(msg)
if err == nil {
log.Println("消息已进入框架缓冲区,等待发送")
}

Send() 调用成功仅表示消息已被框架接收并缓存,框架会负责后续的发送细节。这意味着:

  • 消息可能还在内存缓冲区中
  • 流控制可能会延迟实际的网络发送
  • 如果连接断开,缓冲区中的消息可能丢失

服务端检测客户端状态

虽然 gRPC 框架自动处理流控制,但应用层需要检测客户端是否仍然活跃:

// ✅ 正确:定期检查上下文状态
func (s *server) StreamLargeData(req *pb.Request, stream pb.Service_StreamLargeDataServer) error {
for i := 0; i < 1000000; i++ {
// 检查客户端是否已断开或取消
if stream.Context().Err() != nil {
log.Println("客户端已断开或取消请求")
return stream.Context().Err()
}

data := generateData(i)
if err := stream.Send(data); err != nil {
// Send 失败可能是由于客户端处理不过来
return err
}
}
return nil
}

Java 中检测取消

@Override
public void streamData(Request req, StreamObserver<Response> responseObserver) {
for (int i = 0; i < 1000000; i++) {
// 检查客户端是否已取消
if (responseObserver.isCancelled()) {
return;
}

Response data = generateData(i);
responseObserver.onNext(data);
}
responseObserver.onCompleted();
}

避免死锁

流控制可能导致死锁,特别是在双向流场景中:

死锁场景:双方都在同步读取,但都试图大量写入。

// ❌ 危险:可能导致死锁(如果对方也在同步读取)
func (s *server) BidirectionalStream(stream pb.Service_BidirectionalStreamServer) error {
for {
// 同步读取
req, err := stream.Recv()
if err != nil {
return err
}

// 同步写入
if err := stream.Send(process(req)); err != nil {
return err
}
}
}

避免死锁的方法:使用独立的 goroutine 处理读写

func (s *server) BidirectionalStream(stream pb.Service_BidirectionalStreamServer) error {
// 使用通道解耦读写
recvChan := make(chan *pb.Request, 10)
errChan := make(chan error, 1)

// 接收 goroutine
go func() {
for {
req, err := stream.Recv()
if err != nil {
errChan <- err
return
}
recvChan <- req
}
}()

// 处理逻辑
for {
select {
case req := <-recvChan:
if err := stream.Send(process(req)); err != nil {
return err
}
case err := <-errChan:
return err
}
}
}

流控制配置

gRPC 允许通过配置调整流控制行为:

Go 服务端配置

s := grpc.NewServer(
grpc.MaxRecvMsgSize(16*1024*1024), // 最大接收消息大小
grpc.MaxSendMsgSize(16*1024*1024), // 最大发送消息大小
grpc.MaxConcurrentStreams(1000), // 最大并发流数
grpc.ReadBufferSize(32*1024), // 读缓冲区大小
grpc.WriteBufferSize(32*1024), // 写缓冲区大小
)

Go 客户端配置

conn, err := grpc.Dial(
target,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(16*1024*1024), // 最大接收消息大小
),
grpc.WithWriteBufferSize(32*1024), // 写缓冲区大小
grpc.WithReadBufferSize(32*1024), // 读缓冲区大小
)

流控制与 Keep-alive 的关系

流控制和 Keep-alive 是两个独立但相关的机制:

机制作用适用范围
流控制防止发送方压垮接收方仅流式 RPC
Keep-alive保持连接活跃,检测死连接所有 RPC 类型

对于长时间运行的流式 RPC,应该同时配置这两种机制以确保可靠性。

流式处理最佳实践

何时使用流式 RPC

流式 RPC 并不总是最佳选择。需要根据实际场景权衡:

适合使用流的场景

  • 大数据传输:文件上传/下载、数据库导出等
  • 实时数据推送:股票行情、日志流、事件通知
  • 双向通信:聊天应用、实时协作、游戏
  • 长时间操作:进度报告、中间结果返回

不适合使用流的场景

  • 简单的 CRUD 操作
  • 数据量小且固定的请求
  • 需要精确负载均衡的场景(流一旦建立无法负载均衡)
  • 需要重试的场景(流的重试更复杂)

流的超时和取消

流式 RPC 同样需要处理超时和取消:

func bidirectionalStreamWithTimeout(client pb.ChatServiceClient) {
// 设置总超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

stream, err := client.Chat(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}

// 发送 goroutine
sendCtx, sendCancel := context.WithCancel(context.Background())
go func() {
defer sendCancel()
for {
select {
case <-sendCtx.Done():
stream.CloseSend()
return
case msg := <-msgChan:
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
}
}
}()

// 接收循环
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
log.Println("流超时")
} else if status.Code(err) == codes.Canceled {
log.Println("流被取消")
} else {
log.Printf("接收错误: %v", err)
}
break
}
// 处理消息
}
}

分块传输大数据

对于大数据,应该分块传输而不是一次性发送:

syntax = "proto3";

message FileChunk {
string file_id = 1; // 文件标识
int32 chunk_index = 2; // 块索引
int32 total_chunks = 3; // 总块数
bytes data = 4; // 数据块(建议 64KB - 1MB)
string checksum = 5; // 校验和(可选)
}

message UploadStatus {
bool success = 1;
int32 received_chunks = 2; // 已接收块数
string message = 3;
}
// 服务端实现
func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var fileID string
var totalChunks int
receivedChunks := make(map[int][]byte)

for {
chunk, err := stream.Recv()
if err == io.EOF {
// 所有块接收完成
// 组装文件...
return stream.SendAndClose(&pb.UploadStatus{
Success: true,
ReceivedChunks: int32(len(receivedChunks)),
Message: "上传成功",
})
}
if err != nil {
return err
}

fileID = chunk.FileId
totalChunks = int(chunk.TotalChunks)
receivedChunks[int(chunk.ChunkIndex)] = chunk.Data

// 可选:定期发送进度更新
if len(receivedChunks)%10 == 0 {
// 可以通过另一个流发送进度
}
}
}

// 客户端实现
func uploadFile(client pb.FileServiceClient, filePath string, chunkSize int) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()

stat, _ := file.Stat()
totalChunks := int(stat.Size())/chunkSize + 1

stream, err := client.UploadFile(context.Background())
if err != nil {
return err
}

buffer := make([]byte, chunkSize)
fileID := uuid.New().String()

for i := 0; ; i++ {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return err
}

chunk := &pb.FileChunk{
FileId: fileID,
ChunkIndex: int32(i),
TotalChunks: int32(totalChunks),
Data: buffer[:n],
}

if err := stream.Send(chunk); err != nil {
return err
}
}

status, err := stream.CloseAndRecv()
if err != nil {
return err
}

fmt.Printf("上传完成: %s\n", status.Message)
return nil
}

流的错误恢复

流中断后如何恢复是一个重要问题:

// 带恢复机制的流式客户端
type ResilientStreamClient struct {
client pb.ServiceClient
maxRetry int
retryWait time.Duration
}

func (r *ResilientStreamClient) StreamWithRecovery(ctx context.Context, req *pb.Request) error {
var lastOffset int64 = 0

for attempt := 0; attempt < r.maxRetry; attempt++ {
if attempt > 0 {
time.Sleep(r.retryWait)
log.Printf("重试第 %d 次,从偏移 %d 继续", attempt+1, lastOffset)
}

// 带偏移量的请求
stream, err := r.client.StreamData(ctx, &pb.Request{
StartOffset: lastOffset,
})
if err != nil {
continue
}

for {
data, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
// 检查是否可重试
if isRetryableError(err) {
break // 跳出内层循环,重试
}
return err
}

// 处理数据
if err := processData(data); err != nil {
return err
}

// 更新偏移量
lastOffset = data.Offset
}
}

return fmt.Errorf("重试次数耗尽")
}

func isRetryableError(err error) bool {
st, ok := status.FromError(err)
if !ok {
return false
}

switch st.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.Aborted:
return true
default:
return false
}
}

流的监控和调试

// 带监控的流包装器
type MonitoredStream struct {
stream grpc.ClientStream
method string
sent int64
received int64
start time.Time
}

func (m *MonitoredStream) Send(msg interface{}) error {
err := m.stream.SendMsg(msg)
if err == nil {
m.sent++
}
return err
}

func (m *MonitoredStream) Recv(msg interface{}) error {
err := m.stream.RecvMsg(msg)
if err == nil {
m.received++
}
return err
}

func (m *MonitoredStream) Close() {
duration := time.Since(m.start)
log.Printf("[流统计] 方法: %s, 发送: %d, 接收: %d, 耗时: %v",
m.method, m.sent, m.received, duration)
}

小结

本章我们学习了:

  1. 一元 RPC:简单的请求-响应模式
  2. 服务端流:服务端返回消息流
  3. 客户端流:客户端发送消息流
  4. 双向流:双方独立的消息流
  5. 类型选择:根据场景选择合适的 RPC 类型
  6. 注意事项:消息顺序、生命周期、错误处理

下一章我们将学习如何实现 gRPC 服务端。