RPC 类型
gRPC 支持四种 RPC 类型,满足不同的通信场景需求。本章详细介绍每种类型的特点和使用方法。
概述
RPC 生命周期
理解 RPC 的生命周期对于正确使用 gRPC 非常重要。每种 RPC 类型都有其特定的生命周期流程。
一元 RPC 生命周期
一元 RPC 是最简单的调用模式,其完整生命周期如下:
-
客户端调用:客户端调用 stub 方法,gRPC 框架向服务端发送请求元数据、方法名和可选的截止时间(deadline)。
-
服务端接收通知:服务端收到 RPC 调用通知,可以立即发送初始元数据,或等待客户端请求消息。
-
请求处理:服务端收到客户端请求后,执行业务逻辑并生成响应。
-
响应返回:服务端返回响应(如果成功)、状态详情(状态码和可选消息)以及可选的尾部元数据。
-
客户端完成:如果状态码为 OK,客户端获得响应,调用完成。
// 一元 RPC 的完整生命周期示例
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 1. 可以发送初始元数据(在响应之前)
grpc.SendHeader(ctx, metadata.Pairs("x-response-time", time.Now().String()))
// 2. 检查上下文是否已取消或超时
if ctx.Err() == context.Canceled {
return nil, status.Error(codes.Canceled, "请求被取消")
}
// 3. 执行业务逻辑
user, err := s.userService.GetByID(req.UserId)
if err != nil {
// 4. 返回错误状态
return nil, status.Errorf(codes.NotFound, "用户不存在: %s", req.UserId)
}
// 5. 发送尾部元数据(可选)
grpc.SetTrailer(ctx, metadata.Pairs("x-request-id", req.RequestId))
// 6. 返回响应
return user, nil
}
流式 RPC 生命周期
流式 RPC 的生命周期更为复杂,因为客户端和服务端可以在不同时间点读写消息。
服务端流:
- 客户端发送请求
- 服务端收到请求后开始发送消息流
- 服务端发送完所有消息后,发送状态和尾部元数据
- 客户端收到所有消息后完成
客户端流:
- 客户端开始发送消息流
- 服务端接收消息(可以边接收边处理)
- 客户端发送完毕,等待响应
- 服务端处理完成后返回单个响应
双向流:
- 客户端调用方法,服务端收到客户端元数据
- 服务端可以发送初始元数据或等待客户端开始流式发送
- 客户端和服务端可以按任意顺序读写消息
- 两个流独立运行,但每个流内的消息顺序保持不变
元数据(Metadata)
元数据是关于 RPC 调用的附加信息,以键值对形式存在。它在认证、追踪、调试等场景中非常有用。
元数据的结构
- 键(Key):字符串,由 ASCII 字母、数字和特殊字符
-、_、.组成,不能以grpc-开头(保留给 gRPC 内部使用) - 值(Value):通常是字符串,也可以是二进制数据(键名需以
-bin结尾)
服务端接收元数据
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 从上下文获取元数据
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.DataLoss, "缺少元数据")
}
// 获取单个值
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}
token := tokens[0]
// 获取二进制元数据
traceIds := md.Get("trace-id-bin")
// 获取所有值(一个键可能有多个值)
requestIds := md.Get("x-request-id")
// 使用元数据...
return &pb.User{}, nil
}
客户端发送元数据
func callWithMetadata(client pb.GreeterClient) {
// 方式一:使用 metadata.New
md := metadata.New(map[string]string{
"authorization": "Bearer my-token",
"x-request-id": "req-12345",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 方式二:追加到现有上下文
ctx = metadata.AppendToOutgoingContext(ctx,
"x-custom-header", "value1",
"x-custom-header", "value2", // 同一个键可以有多个值
)
// 调用服务
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"})
}
接收响应元数据
func receiveMetadata(client pb.GreeterClient) {
ctx := context.Background()
// 使用 WithCallOption 获取响应元数据
var header, trailer metadata.MD
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"},
grpc.Header(&header), // 获取响应头
grpc.Trailer(&trailer), // 获取尾部元数据
)
// 读取响应头
if contentType := header.Get("content-type"); len(contentType) > 0 {
fmt.Printf("Content-Type: %s\n", contentType[0])
}
// 读取尾部元数据
if requestId := trailer.Get("x-request-id"); len(requestId) > 0 {
fmt.Printf("Request ID: %s\n", requestId[0])
}
}
服务端发送元数据
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 发送初始元数据(必须在响应之前)
err := grpc.SendHeader(ctx, metadata.Pairs(
"x-response-time", time.Now().Format(time.RFC3339),
"x-server-version", "1.0.0",
))
if err != nil {
return nil, err
}
// 处理请求...
// 设置尾部元数据(在响应之后发送)
grpc.SetTrailer(ctx, metadata.Pairs(
"x-request-id", req.RequestId,
"x-processed-at", time.Now().String(),
))
return &pb.User{}, nil
}
Deadline 和 Timeout
Deadline 和 Timeout 是 gRPC 中控制请求时间的关键机制。它们确保客户端不会无限期等待响应。
Deadline vs Timeout
- Timeout:从调用开始计算的等待时间长度(如 5 秒)
- Deadline:一个固定的时间点,表示请求必须在此时间之前完成
// 使用 Timeout(持续时间)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 使用 Deadline(时间点)
deadline := time.Now().Add(5 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
客户端设置 Deadline
func callWithDeadline(client pb.GreeterClient) {
// 设置 3 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
log.Println("请求超时")
} else {
log.Printf("其他错误: %v", err)
}
return
}
fmt.Printf("响应: %s\n", resp.Message)
}
服务端检查 Deadline
func (s *server) LongRunningTask(ctx context.Context, req *pb.Request) (*pb.Response, error) {
// 检查是否已经超时
if ctx.Err() == context.DeadlineExceeded {
return nil, status.Error(codes.DeadlineExceeded, "请求已超时")
}
// 获取剩余时间
deadline, ok := ctx.Deadline()
if ok {
remaining := time.Until(deadline)
log.Printf("剩余时间: %v", remaining)
// 如果剩余时间不足,提前返回
if remaining < 100*time.Millisecond {
return nil, status.Error(codes.DeadlineExceeded, "剩余时间不足")
}
}
// 执行耗时任务时定期检查
for i := 0; i < 10; i++ {
// 检查上下文是否已取消
if ctx.Err() != nil {
return nil, status.Error(codes.Canceled, "请求被取消")
}
// 执行一部分任务
time.Sleep(100 * time.Millisecond)
}
return &pb.Response{}, nil
}
Deadline 传播
在微服务架构中,应该将 deadline 传播到下游服务:
func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
// 直接传递上下文,deadline 会自动传播
inventoryResp, err := s.inventoryClient.CheckStock(ctx, &inventory.Request{...})
if err != nil {
return nil, err
}
// 同一个 ctx 会继续传播剩余时间
paymentResp, err := s.paymentClient.Process(ctx, &payment.Request{...})
if err != nil {
return nil, err
}
return &pb.OrderResponse{}, nil
}
RPC 终止和取消
在 gRPC 中,客户端和服务端都可以独立判断调用是否成功,他们的结论可能不同。这是一个重要的设计理念。
RPC 终止的特性
- 独立判断:客户端和服务端各自判断调用成功与否
- 可能不一致:服务端可能成功完成,但客户端因超时认为失败
- 提前终止:服务端可能在客户端发送完所有请求前就完成
客户端取消 RPC
func callWithCancel(client pb.GreeterClient) {
ctx, cancel := context.WithCancel(context.Background())
// 在另一个 goroutine 中取消
go func() {
time.Sleep(2 * time.Second)
cancel() // 手动取消
log.Println("请求已取消")
}()
resp, err := client.LongRunningTask(ctx, &pb.Request{})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.Canceled {
log.Println("请求被取消")
}
return
}
fmt.Printf("响应: %v\n", resp)
}
服务端检测取消
func (s *server) StreamData(req *pb.Request, stream pb.Service_StreamDataServer) error {
for i := 0; i < 100; i++ {
// 检查客户端是否已取消
if stream.Context().Err() == context.Canceled {
log.Println("客户端取消了请求")
return status.Error(codes.Canceled, "客户端取消")
}
// 检查流是否仍然活跃
if !stream.Context().Err() == nil {
data := generateData(i)
if err := stream.Send(data); err != nil {
return err
}
}
time.Sleep(100 * time.Millisecond)
}
return nil
}
取消的影响
重要提示:取消 RPC 会立即终止调用,但不会回滚已完成的操作。
func (s *server) CreateOrder(ctx context.Context, req *pb.OrderRequest) (*pb.Order, error) {
// 步骤1:创建订单记录
order, err := s.db.CreateOrder(req)
if err != nil {
return nil, err
}
// 如果在此处被取消,订单已创建但不会回滚
// 步骤2:扣减库存
err = s.inventory.Deduct(req.Items)
if err != nil {
// 需要手动回滚订单
s.db.CancelOrder(order.Id)
return nil, err
}
// 如果在此处被取消,订单已创建,库存已扣减
// 步骤3:发起支付
payment, err := s.payment.Create(order)
if err != nil {
// 需要手动回滚
s.inventory.Restore(req.Items)
s.db.CancelOrder(order.Id)
return nil, err
}
return order, nil
}
一元 RPC(Unary RPC)
一元 RPC 是最简单的 RPC 类型,客户端发送一个请求,服务端返回一个响应,类似于普通的函数调用。
Proto 定义
service UserService {
// 一元 RPC:获取用户信息
rpc GetUser (GetUserRequest) returns (User);
}
message GetUserRequest {
string user_id = 1;
}
message User {
string id = 1;
string name = 2;
string email = 3;
}
使用场景
- 获取单个资源
- 创建资源
- 更新资源
- 简单查询
代码示例
Go 服务端:
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 从数据库获取用户
user, err := s.db.GetUser(req.UserId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "用户不存在: %v", err)
}
return &pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
}, nil
}
Go 客户端:
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// 调用 RPC
user, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: "123"})
if err != nil {
log.Fatalf("获取用户失败: %v", err)
}
fmt.Printf("用户: %s\n", user.Name)
服务端流 RPC(Server Streaming RPC)
服务端流 RPC 允许服务端返回一个消息流,客户端从流中读取直到没有更多消息。
Proto 定义
service OrderService {
// 服务端流:获取订单列表
rpc ListOrders (ListOrdersRequest) returns (stream Order);
}
message ListOrdersRequest {
string user_id = 1;
int32 page_size = 2;
}
message Order {
string id = 1;
string product_name = 2;
double amount = 3;
string status = 4;
}
使用场景
- 分页数据返回
- 日志流传输
- 实时数据推送
- 大数据集传输
代码示例
Go 服务端:
func (s *server) ListOrders(req *pb.ListOrdersRequest, stream pb.OrderService_ListOrdersServer) error {
// 从数据库获取订单
orders, err := s.db.ListOrders(req.UserId, req.PageSize)
if err != nil {
return err
}
// 逐个发送订单
for _, order := range orders {
if err := stream.Send(&pb.Order{
Id: order.ID,
ProductName: order.ProductName,
Amount: order.Amount,
Status: order.Status,
}); err != nil {
return err
}
}
return nil
}
Go 客户端:
// 创建流
stream, err := client.ListOrders(ctx, &pb.ListOrdersRequest{
UserId: "user123",
PageSize: 10,
})
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 接收流数据
for {
order, err := stream.Recv()
if err == io.EOF {
break // 流结束
}
if err != nil {
log.Fatalf("接收订单失败: %v", err)
}
fmt.Printf("订单: %s, 金额: %.2f\n", order.ProductName, order.Amount)
}
客户端流 RPC(Client Streaming RPC)
客户端流 RPC 允许客户端发送一个消息流,服务端接收所有消息后返回单个响应。
Proto 定义
service FileService {
// 客户端流:上传文件
rpc UploadFile (stream FileChunk) returns (UploadResult);
}
message FileChunk {
string filename = 1;
bytes content = 2;
int64 offset = 3;
}
message UploadResult {
bool success = 1;
string message = 2;
string file_id = 3;
}
使用场景
- 文件上传
- 批量数据提交
- 数据聚合处理
代码示例
Go 服务端:
func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var filename string
var totalSize int64
// 接收所有数据块
for {
chunk, err := stream.Recv()
if err == io.EOF {
// 所有数据接收完成
return stream.SendAndClose(&pb.UploadResult{
Success: true,
Message: "上传成功",
FileId: generateFileId(),
})
}
if err != nil {
return err
}
// 处理数据块
filename = chunk.Filename
totalSize += int64(len(chunk.Content))
// 写入文件...
}
}
Go 客户端:
// 创建流
stream, err := client.UploadFile(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 分块发送文件
buffer := make([]byte, 1024*64) // 64KB 块
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("读取文件失败: %v", err)
}
if err := stream.Send(&pb.FileChunk{
Filename: "example.txt",
Content: buffer[:n],
}); err != nil {
log.Fatalf("发送数据失败: %v", err)
}
}
// 关闭流并接收响应
result, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("上传失败: %v", err)
}
fmt.Printf("上传结果: %s\n", result.Message)
双向流 RPC(Bidirectional Streaming RPC)
双向流 RPC 允许客户端和服务端独立地读写消息流,两个流相互独立。
Proto 定义
service ChatService {
// 双向流:实时聊天
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string user_id = 1;
string content = 2;
int64 timestamp = 3;
}
使用场景
- 实时聊天应用
- 在线协作
- 实时游戏
- 双向数据同步
代码示例
Go 服务端:
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
for {
// 接收消息
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// 广播消息给其他用户
broadcast := &pb.ChatMessage{
UserId: msg.UserId,
Content: msg.Content,
Timestamp: time.Now().Unix(),
}
if err := stream.Send(broadcast); err != nil {
return err
}
}
}
Go 客户端:
// 创建双向流
stream, err := client.Chat(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 发送消息的 goroutine
go func() {
for {
msg := &pb.ChatMessage{
UserId: "user123",
Content: "Hello!",
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
time.Sleep(time.Second)
}
}()
// 接收消息
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
fmt.Printf("[%s]: %s\n", msg.UserId, msg.Content)
}
类型选择指南
| RPC 类型 | 客户端 | 服务端 | 典型场景 |
|---|---|---|---|
| 一元 RPC | 请求×1 | 响应×1 | CRUD操作、简单查询 |
| 服务端流 | 请求×1 | 响应流 | 分页查询、实时推送 |
| 客户端流 | 请求流 | 响应×1 | 文件上传、批量提交 |
| 双向流 | 请求流 | 响应流 | 聊天、游戏、协作 |
流式传输注意事项
1. 消息顺序
gRPC 保证单个流内的消息顺序:
// 服务端按顺序发送
stream.Send(msg1) // 先发送
stream.Send(msg2) // 后发送
// 客户端接收顺序也是 msg1 -> msg2
2. 流的生命周期
// 客户端流
stream, _ := client.SomeStreamingRPC(ctx) // 创建流
stream.Send(msg) // 发送消息
stream.CloseSend() // 关闭发送端(客户端流)
result, _ := stream.Recv() // 接收响应
// 服务端流
stream.Send(msg) // 发送消息
stream.Send(msg2)
return nil // 返回时自动关闭流
3. 错误处理
for {
msg, err := stream.Recv()
if err == io.EOF {
// 正常结束
break
}
if err != nil {
// 处理错误
statusErr, ok := status.FromError(err)
if ok {
log.Printf("gRPC 错误: %v", statusErr.Message())
} else {
log.Printf("其他错误: %v", err)
}
break
}
// 处理消息
}
流控制(Flow Control)
流控制是 gRPC 中一个关键但常被忽视的机制。它确保消息接收方不会被快速发送方压垮,防止数据丢失、提高性能并增强可靠性。流控制仅适用于流式 RPC,对一元 RPC 不相关。
为什么需要流控制?
想象这样一个场景:服务端生成数据的速度远快于客户端处理数据的速度。如果没有流控制,数据会在客户端的缓冲区中不断堆积,最终导致:
- 内存溢出:缓冲区耗尽,程序崩溃
- 数据丢失:旧数据被覆盖或丢弃
- 网络拥塞:不必要的重传和超时
- 性能下降:系统资源被无效占用
gRPC 基于 HTTP/2 的流控制机制,自动处理这些问题,但理解其工作原理对于编写高性能流式应用至关重要。
gRPC 流控制的工作原理
核心机制:
- 发送方写入消息:应用调用
stream.Send()时,消息被传递给 gRPC 框架 - 框架判断是否可发送:如果接收方缓冲区已满,框架会阻塞发送调用
- 消息传输:当有可用空间时,消息通过网络发送
- 接收确认:接收方读取消息后,发送确认告知发送方有更多容量
- 继续发送:发送方收到确认后,可以继续发送等待中的消息
关键理解:Send 不等于已发送
很多开发者误以为调用 Send() 成功就意味着消息已经通过网络发出。实际上:
// ❌ 误解:Send 返回成功 = 消息已发送到网络
err := stream.Send(msg)
if err == nil {
log.Println("消息已发送") // 这只是说消息已被框架接收!
}
// ✅ 正确理解:Send 成功 = 消息已进入 gRPC 框架的缓冲区
err := stream.Send(msg)
if err == nil {
log.Println("消息已进入框架缓冲区,等待发送")
}
Send() 调用成功仅表示消息已被框架接收并缓存,框架会负责后续的发送细节。这意味着:
- 消息可能还在内存缓冲区中
- 流控制可能会延迟实际的网络发送
- 如果连接断开,缓冲区中的消息可能丢失
服务端检测客户端状态
虽然 gRPC 框架自动处理流控制,但应用层需要检测客户端是否仍然活跃:
// ✅ 正确:定期检查上下文状态
func (s *server) StreamLargeData(req *pb.Request, stream pb.Service_StreamLargeDataServer) error {
for i := 0; i < 1000000; i++ {
// 检查客户端是否已断开或取消
if stream.Context().Err() != nil {
log.Println("客户端已断开或取消请求")
return stream.Context().Err()
}
data := generateData(i)
if err := stream.Send(data); err != nil {
// Send 失败可能是由于客户端处理不过来
return err
}
}
return nil
}
Java 中检测取消
@Override
public void streamData(Request req, StreamObserver<Response> responseObserver) {
for (int i = 0; i < 1000000; i++) {
// 检查客户端是否已取消
if (responseObserver.isCancelled()) {
return;
}
Response data = generateData(i);
responseObserver.onNext(data);
}
responseObserver.onCompleted();
}
避免死锁
流控制可能导致死锁,特别是在双向流场景中:
死锁场景:双方都在同步读取,但都试图大量写入。
// ❌ 危险:可能导致死锁(如果对方也在同步读取)
func (s *server) BidirectionalStream(stream pb.Service_BidirectionalStreamServer) error {
for {
// 同步读取
req, err := stream.Recv()
if err != nil {
return err
}
// 同步写入
if err := stream.Send(process(req)); err != nil {
return err
}
}
}
避免死锁的方法:使用独立的 goroutine 处理读写
func (s *server) BidirectionalStream(stream pb.Service_BidirectionalStreamServer) error {
// 使用通道解耦读写
recvChan := make(chan *pb.Request, 10)
errChan := make(chan error, 1)
// 接收 goroutine
go func() {
for {
req, err := stream.Recv()
if err != nil {
errChan <- err
return
}
recvChan <- req
}
}()
// 处理逻辑
for {
select {
case req := <-recvChan:
if err := stream.Send(process(req)); err != nil {
return err
}
case err := <-errChan:
return err
}
}
}
流控制配置
gRPC 允许通过配置调整流控制行为:
Go 服务端配置:
s := grpc.NewServer(
grpc.MaxRecvMsgSize(16*1024*1024), // 最大接收消息大小
grpc.MaxSendMsgSize(16*1024*1024), // 最大发送消息大小
grpc.MaxConcurrentStreams(1000), // 最大并发流数
grpc.ReadBufferSize(32*1024), // 读缓冲区大小
grpc.WriteBufferSize(32*1024), // 写缓冲区大小
)
Go 客户端配置:
conn, err := grpc.Dial(
target,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(16*1024*1024), // 最大接收消息大小
),
grpc.WithWriteBufferSize(32*1024), // 写缓冲区大小
grpc.WithReadBufferSize(32*1024), // 读缓冲区大小
)
流控制与 Keep-alive 的关系
流控制和 Keep-alive 是两个独立但相关的机制:
| 机制 | 作用 | 适用范围 |
|---|---|---|
| 流控制 | 防止发送方压垮接收方 | 仅流式 RPC |
| Keep-alive | 保持连接活跃,检测死连接 | 所有 RPC 类型 |
对于长时间运行的流式 RPC,应该同时配置这两种机制以确保可靠性。
流式处理最佳实践
何时使用流式 RPC
流式 RPC 并不总是最佳选择。需要根据实际场景权衡:
适合使用流的场景:
- 大数据传输:文件上传/下载、数据库导出等
- 实时数据推送:股票行情、日志流、事件通知
- 双向通信:聊天应用、实时协作、游戏
- 长时间操作:进度报告、中间结果返回
不适合使用流的场景:
- 简单的 CRUD 操作
- 数据量小且固定的请求
- 需要精确负载均衡的场景(流一旦建立无法负载均衡)
- 需要重试的场景(流的重试更复杂)
流的超时和取消
流式 RPC 同样需要处理超时和取消:
func bidirectionalStreamWithTimeout(client pb.ChatServiceClient) {
// 设置总超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stream, err := client.Chat(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 发送 goroutine
sendCtx, sendCancel := context.WithCancel(context.Background())
go func() {
defer sendCancel()
for {
select {
case <-sendCtx.Done():
stream.CloseSend()
return
case msg := <-msgChan:
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
}
}
}()
// 接收循环
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
log.Println("流超时")
} else if status.Code(err) == codes.Canceled {
log.Println("流被取消")
} else {
log.Printf("接收错误: %v", err)
}
break
}
// 处理消息
}
}
分块传输大数据
对于大数据,应该分块传输而不是一次性发送:
syntax = "proto3";
message FileChunk {
string file_id = 1; // 文件标识
int32 chunk_index = 2; // 块索引
int32 total_chunks = 3; // 总块数
bytes data = 4; // 数据块(建议 64KB - 1MB)
string checksum = 5; // 校验和(可选)
}
message UploadStatus {
bool success = 1;
int32 received_chunks = 2; // 已接收块数
string message = 3;
}
// 服务端实现
func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var fileID string
var totalChunks int
receivedChunks := make(map[int][]byte)
for {
chunk, err := stream.Recv()
if err == io.EOF {
// 所有块接收完成
// 组装文件...
return stream.SendAndClose(&pb.UploadStatus{
Success: true,
ReceivedChunks: int32(len(receivedChunks)),
Message: "上传成功",
})
}
if err != nil {
return err
}
fileID = chunk.FileId
totalChunks = int(chunk.TotalChunks)
receivedChunks[int(chunk.ChunkIndex)] = chunk.Data
// 可选:定期发送进度更新
if len(receivedChunks)%10 == 0 {
// 可以通过另一个流发送进度
}
}
}
// 客户端实现
func uploadFile(client pb.FileServiceClient, filePath string, chunkSize int) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
stat, _ := file.Stat()
totalChunks := int(stat.Size())/chunkSize + 1
stream, err := client.UploadFile(context.Background())
if err != nil {
return err
}
buffer := make([]byte, chunkSize)
fileID := uuid.New().String()
for i := 0; ; i++ {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return err
}
chunk := &pb.FileChunk{
FileId: fileID,
ChunkIndex: int32(i),
TotalChunks: int32(totalChunks),
Data: buffer[:n],
}
if err := stream.Send(chunk); err != nil {
return err
}
}
status, err := stream.CloseAndRecv()
if err != nil {
return err
}
fmt.Printf("上传完成: %s\n", status.Message)
return nil
}
流的错误恢复
流中断后如何恢复是一个重要问题:
// 带恢复机制的流式客户端
type ResilientStreamClient struct {
client pb.ServiceClient
maxRetry int
retryWait time.Duration
}
func (r *ResilientStreamClient) StreamWithRecovery(ctx context.Context, req *pb.Request) error {
var lastOffset int64 = 0
for attempt := 0; attempt < r.maxRetry; attempt++ {
if attempt > 0 {
time.Sleep(r.retryWait)
log.Printf("重试第 %d 次,从偏移 %d 继续", attempt+1, lastOffset)
}
// 带偏移量的请求
stream, err := r.client.StreamData(ctx, &pb.Request{
StartOffset: lastOffset,
})
if err != nil {
continue
}
for {
data, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
// 检查是否可重试
if isRetryableError(err) {
break // 跳出内层循环,重试
}
return err
}
// 处理数据
if err := processData(data); err != nil {
return err
}
// 更新偏移量
lastOffset = data.Offset
}
}
return fmt.Errorf("重试次数耗尽")
}
func isRetryableError(err error) bool {
st, ok := status.FromError(err)
if !ok {
return false
}
switch st.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.Aborted:
return true
default:
return false
}
}
流的监控和调试
// 带监控的流包装器
type MonitoredStream struct {
stream grpc.ClientStream
method string
sent int64
received int64
start time.Time
}
func (m *MonitoredStream) Send(msg interface{}) error {
err := m.stream.SendMsg(msg)
if err == nil {
m.sent++
}
return err
}
func (m *MonitoredStream) Recv(msg interface{}) error {
err := m.stream.RecvMsg(msg)
if err == nil {
m.received++
}
return err
}
func (m *MonitoredStream) Close() {
duration := time.Since(m.start)
log.Printf("[流统计] 方法: %s, 发送: %d, 接收: %d, 耗时: %v",
m.method, m.sent, m.received, duration)
}
小结
本章我们学习了:
- 一元 RPC:简单的请求-响应模式
- 服务端流:服务端返回消息流
- 客户端流:客户端发送消息流
- 双向流:双方独立的消息流
- 类型选择:根据场景选择合适的 RPC 类型
- 注意事项:消息顺序、生命周期、错误处理
下一章我们将学习如何实现 gRPC 服务端。