RPC 类型
gRPC 支持四种 RPC 类型,满足不同的通信场景需求。本章详细介绍每种类型的特点和使用方法。
概述
一元 RPC(Unary RPC)
一元 RPC 是最简单的 RPC 类型,客户端发送一个请求,服务端返回一个响应,类似于普通的函数调用。
Proto 定义
service UserService {
// 一元 RPC:获取用户信息
rpc GetUser (GetUserRequest) returns (User);
}
message GetUserRequest {
string user_id = 1;
}
message User {
string id = 1;
string name = 2;
string email = 3;
}
使用场景
- 获取单个资源
- 创建资源
- 更新资源
- 简单查询
代码示例
Go 服务端:
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 从数据库获取用户
user, err := s.db.GetUser(req.UserId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "用户不存在: %v", err)
}
return &pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
}, nil
}
Go 客户端:
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// 调用 RPC
user, err := client.GetUser(ctx, &pb.GetUserRequest{UserId: "123"})
if err != nil {
log.Fatalf("获取用户失败: %v", err)
}
fmt.Printf("用户: %s\n", user.Name)
服务端流 RPC(Server Streaming RPC)
服务端流 RPC 允许服务端返回一个消息流,客户端从流中读取直到没有更多消息。
Proto 定义
service OrderService {
// 服务端流:获取订单列表
rpc ListOrders (ListOrdersRequest) returns (stream Order);
}
message ListOrdersRequest {
string user_id = 1;
int32 page_size = 2;
}
message Order {
string id = 1;
string product_name = 2;
double amount = 3;
string status = 4;
}
使用场景
- 分页数据返回
- 日志流传输
- 实时数据推送
- 大数据集传输
代码示例
Go 服务端:
func (s *server) ListOrders(req *pb.ListOrdersRequest, stream pb.OrderService_ListOrdersServer) error {
// 从数据库获取订单
orders, err := s.db.ListOrders(req.UserId, req.PageSize)
if err != nil {
return err
}
// 逐个发送订单
for _, order := range orders {
if err := stream.Send(&pb.Order{
Id: order.ID,
ProductName: order.ProductName,
Amount: order.Amount,
Status: order.Status,
}); err != nil {
return err
}
}
return nil
}
Go 客户端:
// 创建流
stream, err := client.ListOrders(ctx, &pb.ListOrdersRequest{
UserId: "user123",
PageSize: 10,
})
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 接收流数据
for {
order, err := stream.Recv()
if err == io.EOF {
break // 流结束
}
if err != nil {
log.Fatalf("接收订单失败: %v", err)
}
fmt.Printf("订单: %s, 金额: %.2f\n", order.ProductName, order.Amount)
}
客户端流 RPC(Client Streaming RPC)
客户端流 RPC 允许客户端发送一个消息流,服务端接收所有消息后返回单个响应。
Proto 定义
service FileService {
// 客户端流:上传文件
rpc UploadFile (stream FileChunk) returns (UploadResult);
}
message FileChunk {
string filename = 1;
bytes content = 2;
int64 offset = 3;
}
message UploadResult {
bool success = 1;
string message = 2;
string file_id = 3;
}
使用场景
- 文件上传
- 批量数据提交
- 数据聚合处理
代码示例
Go 服务端:
func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var filename string
var totalSize int64
// 接收所有数据块
for {
chunk, err := stream.Recv()
if err == io.EOF {
// 所有数据接收完成
return stream.SendAndClose(&pb.UploadResult{
Success: true,
Message: "上传成功",
FileId: generateFileId(),
})
}
if err != nil {
return err
}
// 处理数据块
filename = chunk.Filename
totalSize += int64(len(chunk.Content))
// 写入文件...
}
}
Go 客户端:
// 创建流
stream, err := client.UploadFile(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 分块发送文件
buffer := make([]byte, 1024*64) // 64KB 块
for {
n, err := file.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("读取文件失败: %v", err)
}
if err := stream.Send(&pb.FileChunk{
Filename: "example.txt",
Content: buffer[:n],
}); err != nil {
log.Fatalf("发送数据失败: %v", err)
}
}
// 关闭流并接收响应
result, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("上传失败: %v", err)
}
fmt.Printf("上传结果: %s\n", result.Message)
双向流 RPC(Bidirectional Streaming RPC)
双向流 RPC 允许客户端和服务端独立地读写消息流,两个流相互独立。
Proto 定义
service ChatService {
// 双向流:实时聊天
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string user_id = 1;
string content = 2;
int64 timestamp = 3;
}
使用场景
- 实时聊天应用
- 在线协作
- 实时游戏
- 双向数据同步
代码示例
Go 服务端:
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
for {
// 接收消息
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// 广播消息给其他用户
broadcast := &pb.ChatMessage{
UserId: msg.UserId,
Content: msg.Content,
Timestamp: time.Now().Unix(),
}
if err := stream.Send(broadcast); err != nil {
return err
}
}
}
Go 客户端:
// 创建双向流
stream, err := client.Chat(ctx)
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 发送消息的 goroutine
go func() {
for {
msg := &pb.ChatMessage{
UserId: "user123",
Content: "Hello!",
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Printf("发送失败: %v", err)
return
}
time.Sleep(time.Second)
}
}()
// 接收消息
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
fmt.Printf("[%s]: %s\n", msg.UserId, msg.Content)
}
类型选择指南
| RPC 类型 | 客户端 | 服务端 | 典型场景 |
|---|---|---|---|
| 一元 RPC | 请求×1 | 响应×1 | CRUD操作、简单查询 |
| 服务端流 | 请求×1 | 响应流 | 分页查询、实时推送 |
| 客户端流 | 请求流 | 响应×1 | 文件上传、批量提交 |
| 双向流 | 请求流 | 响应流 | 聊天、游戏、协作 |
流式传输注意事项
1. 消息顺序
gRPC 保证单个流内的消息顺序:
// 服务端按顺序发送
stream.Send(msg1) // 先发送
stream.Send(msg2) // 后发送
// 客户端接收顺序也是 msg1 -> msg2
2. 流的生命周期
// 客户端流
stream, _ := client.SomeStreamingRPC(ctx) // 创建流
stream.Send(msg) // 发送消息
stream.CloseSend() // 关闭发送端(客户端流)
result, _ := stream.Recv() // 接收响应
// 服务端流
stream.Send(msg) // 发送消息
stream.Send(msg2)
return nil // 返回时自动关闭流
3. 错误处理
for {
msg, err := stream.Recv()
if err == io.EOF {
// 正常结束
break
}
if err != nil {
// 处理错误
statusErr, ok := status.FromError(err)
if ok {
log.Printf("gRPC 错误: %v", statusErr.Message())
} else {
log.Printf("其他错误: %v", err)
}
break
}
// 处理消息
}
小结
本章我们学习了:
- 一元 RPC:简单的请求-响应模式
- 服务端流:服务端返回消息流
- 客户端流:客户端发送消息流
- 双向流:双方独立的消息流
- 类型选择:根据场景选择合适的 RPC 类型
- 注意事项:消息顺序、生命周期、错误处理
下一章我们将学习如何实现 gRPC 服务端。