跳到主要内容

RPC 类型

gRPC 支持四种 RPC 类型,满足不同的通信场景需求。本章详细介绍每种类型的特点和使用方法。

概述

一元 RPC(Unary RPC)

一元 RPC 是最简单的 RPC 类型,客户端发送一个请求,服务端返回一个响应,类似于普通的函数调用。

Proto 定义

service UserService {
// 一元 RPC:获取用户信息
rpc GetUser (GetUserRequest) returns (User);
}

message GetUserRequest {
string user_id = 1;
}

message User {
string id = 1;
string name = 2;
string email = 3;
}

使用场景

  • 获取单个资源
  • 创建资源
  • 更新资源
  • 简单查询

代码示例

Go 服务端

func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 从数据库获取用户
user, err := s.db.GetUser(req.UserId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "用户不存在: %v", err)
}
return &pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
}, nil
}

Go 客户端

// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// 调用 RPC
user, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: "123"})
if err != nil {
log.Fatalf("获取用户失败: %v", err)
}
fmt.Printf("用户: %s\n", user.Name)

服务端流 RPC(Server Streaming RPC)

服务端流 RPC 允许服务端返回一个消息流,客户端从流中读取直到没有更多消息。

Proto 定义

service OrderService {
// 服务端流:获取订单列表
rpc ListOrders (ListOrdersRequest) returns (stream Order);
}

message ListOrdersRequest {
string user_id = 1;
int32 page_size = 2;
}

message Order {
string id = 1;
string product_name = 2;
double amount = 3;
string status = 4;
}

使用场景

  • 分页数据返回
  • 日志流传输
  • 实时数据推送
  • 大数据集传输

代码示例

Go 服务端

func (s *server) ListOrders(req *pb.ListOrdersRequest, stream pb.OrderService_ListOrdersServer) error {
// 从数据库获取订单
orders, err := s.db.ListOrders(req.UserId, req.PageSize)
if err != nil {
return err
}

// 逐个发送订单
for _, order := range orders {
if err := stream.Send(&pb.Order{
Id: order.ID,
ProductName: order.ProductName,
Amount: order.Amount,
Status: order.Status,
}); err != nil {
return err
}
}
return nil
}

Go 客户端

// 创建流
stream, err := client.ListOrders(ctx, &pb.ListOrdersRequest{
UserId: "user123",
PageSize: 10,
})
if err != nil {
log.Fatalf("创建流失败: %v", err)
}

// 接收流数据
for {
order, err := stream.Recv()
if err == io.EOF {
break // 流结束
}
if err != nil {
log.Fatalf("接收订单失败: %v", err)
}
fmt.Printf("订单: %s, 金额: %.2f\n", order.ProductName, order.Amount)
}

客户端流 RPC(Client Streaming RPC)

客户端流 RPC 允许客户端发送一个消息流,服务端接收所有消息后返回单个响应。

Proto 定义

service FileService {
// 客户端流:上传文件
rpc UploadFile (stream FileChunk) returns (UploadResult);
}

message FileChunk {
string filename = 1;
bytes content = 2;
int64 offset = 3;
}

message UploadResult {
bool success = 1;
string message = 2;
string file_id = 3;
}

使用场景

  • 文件上传
  • 批量数据提交
  • 数据聚合处理

代码示例

Go 服务端

func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var filename string
var totalSize int64

// 接收所有数据块
for {
chunk, err := stream.Recv()
if err == io.EOF {
// 所有数据接收完成
return stream.SendAndClose(&pb.UploadResult{
Success: true,
Message: "上传成功",
FileId: generateFileId(),
})
}
if err != nil {
return err
}

// 处理数据块
filename = chunk.Filename
totalSize += int64(len(chunk.Content))
// 写入文件...
}
}

Go 客户端

// 创建流
stream, err := client.UploadFile(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}

// 分块发送文件
buffer := make([]byte, 1024*64) // 64KB 块
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("读取文件失败: %v", err)
}

if err := stream.Send(&pb.FileChunk{
Filename: "example.txt",
Content: buffer[:n],
}); err != nil {
log.Fatalf("发送数据失败: %v", err)
}
}

// 关闭流并接收响应
result, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("上传失败: %v", err)
}
fmt.Printf("上传结果: %s\n", result.Message)

双向流 RPC(Bidirectional Streaming RPC)

双向流 RPC 允许客户端和服务端独立地读写消息流,两个流相互独立。

Proto 定义

service ChatService {
// 双向流:实时聊天
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
string user_id = 1;
string content = 2;
int64 timestamp = 3;
}

使用场景

  • 实时聊天应用
  • 在线协作
  • 实时游戏
  • 双向数据同步

代码示例

Go 服务端

func (s *server) Chat(stream pb.ChatService_ChatServer) error {
for {
// 接收消息
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}

// 广播消息给其他用户
broadcast := &pb.ChatMessage{
UserId: msg.UserId,
Content: msg.Content,
Timestamp: time.Now().Unix(),
}

if err := stream.Send(broadcast); err != nil {
return err
}
}
}

Go 客户端

// 创建双向流
stream, err := client.Chat(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}

// 发送消息的 goroutine
go func() {
for {
msg := &pb.ChatMessage{
UserId: "user123",
Content: "Hello!",
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
time.Sleep(time.Second)
}
}()

// 接收消息
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
fmt.Printf("[%s]: %s\n", msg.UserId, msg.Content)
}

类型选择指南

RPC 类型客户端服务端典型场景
一元 RPC请求×1响应×1CRUD操作、简单查询
服务端流请求×1响应流分页查询、实时推送
客户端流请求流响应×1文件上传、批量提交
双向流请求流响应流聊天、游戏、协作

流式传输注意事项

1. 消息顺序

gRPC 保证单个流内的消息顺序:

// 服务端按顺序发送
stream.Send(msg1) // 先发送
stream.Send(msg2) // 后发送
// 客户端接收顺序也是 msg1 -> msg2

2. 流的生命周期

// 客户端流
stream, _ := client.SomeStreamingRPC(ctx) // 创建流
stream.Send(msg) // 发送消息
stream.CloseSend() // 关闭发送端(客户端流)
result, _ := stream.Recv() // 接收响应

// 服务端流
stream.Send(msg) // 发送消息
stream.Send(msg2)
return nil // 返回时自动关闭流

3. 错误处理

for {
msg, err := stream.Recv()
if err == io.EOF {
// 正常结束
break
}
if err != nil {
// 处理错误
statusErr, ok := status.FromError(err)
if ok {
log.Printf("gRPC 错误: %v", statusErr.Message())
} else {
log.Printf("其他错误: %v", err)
}
break
}
// 处理消息
}

小结

本章我们学习了:

  1. 一元 RPC:简单的请求-响应模式
  2. 服务端流:服务端返回消息流
  3. 客户端流:客户端发送消息流
  4. 双向流:双方独立的消息流
  5. 类型选择:根据场景选择合适的 RPC 类型
  6. 注意事项:消息顺序、生命周期、错误处理

下一章我们将学习如何实现 gRPC 服务端。