跳到主要内容

实战案例:电商订单服务

本章通过一个完整的电商订单服务案例,演示如何使用 gRPC 构建微服务。案例涵盖服务定义、服务端实现、客户端调用、错误处理、认证授权等完整流程。

项目概述

业务场景

构建一个电商订单管理系统,提供以下功能:

  • 创建订单
  • 查询订单
  • 列出用户订单
  • 更新订单状态
  • 取消订单
  • 订单流式追踪

项目结构

order-service/
├── proto/
│ ├── order.proto # 订单服务定义
│ └── product.proto # 产品服务定义
├── server/
│ ├── main.go # 服务入口
│ ├── service.go # 业务逻辑
│ ├── interceptor.go # 拦截器
│ └── repository.go # 数据访问
├── client/
│ └── main.go # 客户端示例
├── generated/
│ └── ecommerce/ # 由 go_package = "./ecommerce" 决定,对应导入路径 order-service/generated/ecommerce
│   ├── order.pb.go
│   ├── order_grpc.pb.go
│   ├── product.pb.go
│   └── product_grpc.pb.go
└── Makefile

Proto 定义

订单服务

// proto/order.proto
syntax = "proto3";

package ecommerce;

option go_package = "./ecommerce";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// OrderService is the order management API: four unary RPCs plus two
// server-streaming RPCs.
service OrderService {
// CreateOrder creates a new order and returns it.
rpc CreateOrder(CreateOrderRequest) returns (Order);

// GetOrder returns a single order by ID.
rpc GetOrder(GetOrderRequest) returns (Order);

// ListOrders streams the orders of one user (server streaming).
rpc ListOrders(ListOrdersRequest) returns (stream Order);

// UpdateOrderStatus changes the status of an order and returns the
// updated order.
rpc UpdateOrderStatus(UpdateOrderStatusRequest) returns (Order);

// CancelOrder cancels an order; the response carries no payload.
rpc CancelOrder(CancelOrderRequest) returns (google.protobuf.Empty);

// StreamOrderEvents pushes order status changes to the client as they
// happen (server streaming).
rpc StreamOrderEvents(StreamOrderEventsRequest) returns (stream OrderEvent);
}

// Order is a customer order together with its line items.
message Order {
// Order identifier.
string id = 1;
// ID of the user who owns the order.
string user_id = 2;
// Line items of the order.
repeated OrderItem items = 3;
// Total amount; the server shown in this document fills it with the sum
// of the item subtotals.
double total_amount = 4;
// Current status of the order.
OrderStatus status = 5;
string shipping_address = 6;
// Creation and last-modification timestamps.
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
}

// OrderItem is one priced line of an order.
message OrderItem {
string product_id = 1;
string product_name = 2;
int32 quantity = 3;
double unit_price = 4;
// The server shown in this document sets this to unit_price * quantity.
double subtotal = 5;
}

// OrderStatus enumerates the lifecycle states of an order.
enum OrderStatus {
// Zero value, used when no status has been set.
ORDER_STATUS_UNSPECIFIED = 0;
ORDER_STATUS_PENDING = 1; // awaiting payment
ORDER_STATUS_PAID = 2; // paid
ORDER_STATUS_SHIPPED = 3; // shipped
ORDER_STATUS_DELIVERED = 4; // delivered
ORDER_STATUS_CANCELLED = 5; // cancelled
}

// CreateOrderRequest is the input of OrderService.CreateOrder.
message CreateOrderRequest {
// User the order is created for; the server shown in this document
// rejects an empty value.
string user_id = 1;
// Requested items; the server shown in this document rejects an empty
// list.
repeated OrderItemInput items = 2;
string shipping_address = 3;
}

// OrderItemInput identifies a product and the quantity to order; name and
// price are filled in by the server.
message OrderItemInput {
string product_id = 1;
int32 quantity = 2;
}

// GetOrderRequest is the input of OrderService.GetOrder.
message GetOrderRequest {
// ID of the order to fetch.
string order_id = 1;
}

// ListOrdersRequest is the input of OrderService.ListOrders.
message ListOrdersRequest {
// User whose orders are listed.
string user_id = 1;
// Maximum number of orders to return; the repository shown in this
// document applies no limit when the value is not positive.
int32 page_size = 2;
// NOTE(review): the server implementation shown in this document never
// reads page_token — confirm whether pagination is planned.
string page_token = 3;
}

// UpdateOrderStatusRequest is the input of OrderService.UpdateOrderStatus.
message UpdateOrderStatusRequest {
// ID of the order to update.
string order_id = 1;
// Status the order should move to.
OrderStatus new_status = 2;
}

// CancelOrderRequest is the input of OrderService.CancelOrder.
message CancelOrderRequest {
// ID of the order to cancel.
string order_id = 1;
// Free-text reason; the server shown in this document appends it to the
// message of the emitted OrderEvent.
string reason = 2;
}

// OrderEvent describes one status change of an order.
message OrderEvent {
string order_id = 1;
// Status before the change; left unset by the server shown in this
// document when the order has just been created.
OrderStatus old_status = 2;
// Status after the change.
OrderStatus new_status = 3;
google.protobuf.Timestamp event_time = 4;
// Human-readable description of the event.
string message = 5;
}

// StreamOrderEventsRequest is the input of OrderService.StreamOrderEvents.
message StreamOrderEventsRequest {
// User whose order events the client subscribes to.
string user_id = 1;
}

产品服务

// proto/product.proto
syntax = "proto3";

package ecommerce;

option go_package = "./ecommerce";

// ProductService is a simplified product catalog API.
service ProductService {
// GetProduct returns a single product.
rpc GetProduct(GetProductRequest) returns (Product);
// ListProducts streams products (server streaming).
rpc ListProducts(ListProductsRequest) returns (stream Product);
}

// Product is a catalog entry returned by ProductService.
message Product {
string id = 1;
string name = 2;
string description = 3;
// NOTE(review): presumably the unit price; the order service shown in this
// document does not read it yet and uses a fixed price instead.
double price = 4;
// NOTE(review): presumably the number of units in stock — confirm.
int32 stock = 5;
}

// GetProductRequest is the input of ProductService.GetProduct.
message GetProductRequest {
// ID of the product to fetch.
string product_id = 1;
}

// ListProductsRequest is the input of ProductService.ListProducts.
message ListProductsRequest {
// NOTE(review): presumably the maximum number of products to stream; no
// implementation is shown in this document — confirm.
int32 page_size = 1;
}

服务端实现

主入口

// server/main.go
package main

import (
"context"
"log"
"net"
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

pb "order-service/generated/ecommerce"
)

// main starts the order gRPC server on :50051 and blocks until SIGINT or
// SIGTERM is received, then shuts the server down gracefully.
func main() {
	// Open the listening socket first so that a port conflict fails fast.
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("监听失败: %v", err)
	}

	// Create the server with the unary and stream interceptor chains.
	s := grpc.NewServer(
		grpc.UnaryInterceptor(chainUnaryInterceptors(
			loggingInterceptor,
			authInterceptor,
			recoveryInterceptor,
		)),
		grpc.StreamInterceptor(chainStreamInterceptors(
			loggingStreamInterceptor,
			authStreamInterceptor,
		)),
	)

	// Register the order service backed by the in-memory repository.
	orderService := NewOrderService(NewMemoryRepository())
	pb.RegisterOrderServiceServer(s, orderService)

	// Register the standard health service and mark the order service as serving.
	healthServer := health.NewServer()
	grpc_health_v1.RegisterHealthServer(s, healthServer)
	healthServer.SetServingStatus("ecommerce.OrderService", grpc_health_v1.HealthCheckResponse_SERVING)

	// Enable server reflection (intended for development environments).
	reflection.Register(s)

	// ctx is cancelled by the first SIGINT/SIGTERM. This also gives the
	// file's "context" import a use; it was previously unused, which Go
	// rejects at compile time. os.Interrupt is the portable name of SIGINT.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Serve in the background so that main can wait for the shutdown signal.
	go func() {
		log.Printf("订单服务启动,监听端口 :50051")
		if err := s.Serve(lis); err != nil {
			log.Fatalf("服务失败: %v", err)
		}
	}()

	<-ctx.Done()
	// Restore default signal handling so that a second signal during the
	// graceful stop terminates the process immediately.
	stop()

	log.Println("正在关闭服务器...")
	healthServer.SetServingStatus("ecommerce.OrderService", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
	s.GracefulStop()
	log.Println("服务器已关闭")
}

服务实现

// server/service.go
package main

import (
"context"
"errors"
"sync"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

pb "order-service/generated/ecommerce"
)

// Sentinel errors of the order domain. In the code shown in this document
// only ErrOrderNotFound is actually returned (by the repository) and matched
// with errors.Is; the others are declared but not referenced.
var (
ErrOrderNotFound = errors.New("订单不存在")
ErrInvalidStatus = errors.New("无效的订单状态")
ErrOrderAlreadyPaid = errors.New("订单已支付")
ErrOrderCancelled = errors.New("订单已取消")
ErrPermissionDenied = errors.New("无权操作此订单")
)

// OrderService implements the generated pb.OrderServiceServer interface.
type OrderService struct {
// Embedded so that RPCs without an explicit implementation still satisfy
// the generated server interface.
pb.UnimplementedOrderServiceServer
// repo is the order storage backend.
repo Repository
// eventSubs holds the channels of clients subscribed to order events.
eventSubs *EventSubscribers
}

// NewOrderService returns an OrderService that stores orders in repo and
// starts with an empty event subscriber registry.
func NewOrderService(repo Repository) *OrderService {
	svc := new(OrderService)
	svc.repo = repo
	svc.eventSubs = NewEventSubscribers()
	return svc
}

// CreateOrder validates the request, prices every item, stores the new order
// in PENDING status and publishes a creation event.
//
// Errors:
//   - InvalidArgument: empty user ID, empty item list, or an item whose
//     quantity is not positive.
//   - Unauthenticated: the context carries no caller identity.
//   - PermissionDenied: the caller is not the user named in the request.
//   - Internal: the repository failed to store the order.
func (s *OrderService) CreateOrder(ctx context.Context, req *pb.CreateOrderRequest) (*pb.Order, error) {
	// Validate the request.
	if req.UserId == "" {
		return nil, status.Error(codes.InvalidArgument, "用户ID不能为空")
	}
	if len(req.Items) == 0 {
		return nil, status.Error(codes.InvalidArgument, "订单项不能为空")
	}

	// The caller identity is injected by the auth interceptor. Use a checked
	// assertion so that a missing value becomes an error instead of a panic.
	userID, ok := ctx.Value("user_id").(string)
	if !ok {
		return nil, status.Error(codes.Unauthenticated, "未认证")
	}
	if userID != req.UserId {
		return nil, status.Error(codes.PermissionDenied, "无权为其他用户创建订单")
	}

	// Build the order items. A real project would call the product service
	// for names and prices; this simplified example assumes a price of 100.
	const unitPrice = 100.0
	items := make([]*pb.OrderItem, 0, len(req.Items))
	var totalAmount float64

	for _, input := range req.Items {
		// A zero or negative quantity would produce an empty line or a
		// negative total, so reject it up front.
		if input == nil || input.Quantity <= 0 {
			return nil, status.Error(codes.InvalidArgument, "订单项数量必须大于0")
		}

		subtotal := unitPrice * float64(input.Quantity)
		items = append(items, &pb.OrderItem{
			ProductId:   input.ProductId,
			ProductName: "产品-" + input.ProductId,
			Quantity:    input.Quantity,
			UnitPrice:   unitPrice,
			Subtotal:    subtotal,
		})
		totalAmount += subtotal
	}

	// One timestamp for both fields so that a freshly created order has
	// identical created_at and updated_at values.
	now := timestamppb.Now()
	order := &pb.Order{
		Id:              generateOrderID(),
		UserId:          req.UserId,
		Items:           items,
		TotalAmount:     totalAmount,
		Status:          pb.OrderStatus_ORDER_STATUS_PENDING,
		ShippingAddress: req.ShippingAddress,
		CreatedAt:       now,
		UpdatedAt:       now,
	}

	// Persist the order.
	if err := s.repo.CreateOrder(ctx, order); err != nil {
		return nil, status.Errorf(codes.Internal, "创建订单失败: %v", err)
	}

	// Publish the creation event; OldStatus is intentionally left unset.
	s.publishEvent(&pb.OrderEvent{
		OrderId:   order.Id,
		NewStatus: order.Status,
		EventTime: timestamppb.Now(),
		Message:   "订单创建成功",
	})

	return order, nil
}

// GetOrder looks up one order by ID and returns it when the caller is allowed
// to see it. It reports InvalidArgument for an empty ID, NotFound for an
// unknown order, Internal for any other repository failure, and whatever
// checkPermission returns when access is refused.
func (s *OrderService) GetOrder(ctx context.Context, req *pb.GetOrderRequest) (*pb.Order, error) {
	orderID := req.OrderId
	if orderID == "" {
		return nil, status.Error(codes.InvalidArgument, "订单ID不能为空")
	}

	found, err := s.repo.GetOrder(ctx, orderID)
	switch {
	case err == nil:
		// Order loaded; continue with the permission check.
	case errors.Is(err, ErrOrderNotFound):
		return nil, status.Errorf(codes.NotFound, "订单不存在: %s", orderID)
	default:
		return nil, status.Errorf(codes.Internal, "查询订单失败: %v", err)
	}

	// Only the owner of the order (or an admin) may read it.
	if permErr := s.checkPermission(ctx, found.UserId); permErr != nil {
		return nil, permErr
	}
	return found, nil
}

// ListOrders streams the orders of req.UserId to the client, one message per
// order. It reports InvalidArgument for an empty user ID, the checkPermission
// error when access is refused, Internal when the repository fails, and the
// Send error when the stream breaks.
func (s *OrderService) ListOrders(req *pb.ListOrdersRequest, stream pb.OrderService_ListOrdersServer) error {
	ctx := stream.Context()

	userID := req.UserId
	if userID == "" {
		return status.Error(codes.InvalidArgument, "用户ID不能为空")
	}

	// Callers may only list their own orders unless they are admins.
	if permErr := s.checkPermission(ctx, userID); permErr != nil {
		return permErr
	}

	found, err := s.repo.ListOrders(ctx, userID, int(req.PageSize))
	if err != nil {
		return status.Errorf(codes.Internal, "查询订单失败: %v", err)
	}

	// Push the orders to the client in repository order.
	for i := range found {
		if sendErr := stream.Send(found[i]); sendErr != nil {
			return sendErr
		}
	}
	return nil
}

// UpdateOrderStatus moves an order to req.NewStatus when the caller is
// allowed to touch the order and the transition is legal, then publishes a
// status-change event.
//
// Errors:
//   - InvalidArgument: empty order ID.
//   - NotFound / Internal: repository lookup or update failed.
//   - Unauthenticated / PermissionDenied: as reported by checkPermission.
//   - FailedPrecondition: the status transition is not allowed.
func (s *OrderService) UpdateOrderStatus(ctx context.Context, req *pb.UpdateOrderStatusRequest) (*pb.Order, error) {
	if req.OrderId == "" {
		return nil, status.Error(codes.InvalidArgument, "订单ID不能为空")
	}

	order, err := s.repo.GetOrder(ctx, req.OrderId)
	if err != nil {
		if errors.Is(err, ErrOrderNotFound) {
			return nil, status.Errorf(codes.NotFound, "订单不存在: %s", req.OrderId)
		}
		return nil, status.Errorf(codes.Internal, "查询订单失败: %v", err)
	}

	// Permission check, consistent with GetOrder and CancelOrder: without
	// it any authenticated user could change the status of any order.
	if err := s.checkPermission(ctx, order.UserId); err != nil {
		return nil, err
	}

	// Reject transitions that the order state machine does not allow.
	if !isValidStatusTransition(order.Status, req.NewStatus) {
		return nil, status.Errorf(codes.FailedPrecondition,
			"不能从 %s 状态转换到 %s", order.Status, req.NewStatus)
	}

	// NOTE(review): the in-memory repository shown in this document returns
	// the stored pointer from GetOrder, so these assignments already modify
	// the stored order before UpdateOrder runs.
	oldStatus := order.Status
	order.Status = req.NewStatus
	order.UpdatedAt = timestamppb.Now()

	if err := s.repo.UpdateOrder(ctx, order); err != nil {
		return nil, status.Errorf(codes.Internal, "更新订单失败: %v", err)
	}

	// Publish the status-change event.
	s.publishEvent(&pb.OrderEvent{
		OrderId:   order.Id,
		OldStatus: oldStatus,
		NewStatus: order.Status,
		EventTime: timestamppb.Now(),
		Message:   "订单状态更新",
	})

	return order, nil
}

// CancelOrder cancels an order on behalf of its owner (or an admin) and
// publishes a cancellation event that carries req.Reason.
//
// Errors:
//   - InvalidArgument: empty order ID.
//   - NotFound / Internal: repository lookup or update failed.
//   - Unauthenticated / PermissionDenied: as reported by checkPermission.
//   - FailedPrecondition: the order is already cancelled, already delivered,
//     or in a status from which the state machine does not allow cancelling.
//
// NOTE(review): order.proto declares the response as google.protobuf.Empty,
// for which protoc-gen-go normally emits *emptypb.Empty rather than a type in
// the pb package — confirm that pb.Empty exists in the generated code.
func (s *OrderService) CancelOrder(ctx context.Context, req *pb.CancelOrderRequest) (*pb.Empty, error) {
	if req.OrderId == "" {
		return nil, status.Error(codes.InvalidArgument, "订单ID不能为空")
	}

	order, err := s.repo.GetOrder(ctx, req.OrderId)
	if err != nil {
		if errors.Is(err, ErrOrderNotFound) {
			return nil, status.Errorf(codes.NotFound, "订单不存在: %s", req.OrderId)
		}
		return nil, status.Errorf(codes.Internal, "查询订单失败: %v", err)
	}

	// Permission check.
	if err := s.checkPermission(ctx, order.UserId); err != nil {
		return nil, err
	}

	// Check whether the order can still be cancelled. The first two cases
	// keep their dedicated messages; everything else is decided by the same
	// state machine that UpdateOrderStatus uses, so both RPCs agree (for
	// example, a shipped order cannot be cancelled through either of them).
	switch {
	case order.Status == pb.OrderStatus_ORDER_STATUS_CANCELLED:
		return nil, status.Error(codes.FailedPrecondition, "订单已取消")
	case order.Status == pb.OrderStatus_ORDER_STATUS_DELIVERED:
		return nil, status.Error(codes.FailedPrecondition, "已送达订单无法取消")
	case !isValidStatusTransition(order.Status, pb.OrderStatus_ORDER_STATUS_CANCELLED):
		return nil, status.Errorf(codes.FailedPrecondition,
			"不能从 %s 状态转换到 %s", order.Status, pb.OrderStatus_ORDER_STATUS_CANCELLED)
	}

	oldStatus := order.Status
	order.Status = pb.OrderStatus_ORDER_STATUS_CANCELLED
	order.UpdatedAt = timestamppb.Now()

	if err := s.repo.UpdateOrder(ctx, order); err != nil {
		return nil, status.Errorf(codes.Internal, "取消订单失败: %v", err)
	}

	// Publish the cancellation event.
	s.publishEvent(&pb.OrderEvent{
		OrderId:   order.Id,
		OldStatus: oldStatus,
		NewStatus: order.Status,
		EventTime: timestamppb.Now(),
		Message:   "订单已取消: " + req.Reason,
	})

	return &pb.Empty{}, nil
}

// StreamOrderEvents subscribes the caller to the events registered under
// req.UserId and forwards each one to the client. It returns the
// checkPermission error when access is refused, the context error once the
// stream context is done, and the Send error when the stream breaks.
func (s *OrderService) StreamOrderEvents(req *pb.StreamOrderEventsRequest, stream pb.OrderService_StreamOrderEventsServer) error {
	ctx := stream.Context()
	userID := req.UserId

	// Callers may only follow their own events unless they are admins.
	if permErr := s.checkPermission(ctx, userID); permErr != nil {
		return permErr
	}

	// Register a subscription and make sure it is removed on every exit path.
	events := s.eventSubs.Subscribe(userID)
	defer s.eventSubs.Unsubscribe(userID, events)

	done := ctx.Done()
	for {
		select {
		case <-done:
			return ctx.Err()
		case ev := <-events:
			if sendErr := stream.Send(ev); sendErr != nil {
				return sendErr
			}
		}
	}
}

// checkPermission reports whether the caller described by ctx may act on
// resources owned by orderUserID.
//
// It returns Unauthenticated when ctx carries no string "user_id" value, nil
// when the caller's "role" is "admin" or the caller is the owner, and
// PermissionDenied otherwise.
func (s *OrderService) checkPermission(ctx context.Context, orderUserID string) error {
	// Checked assertion: a missing or non-string identity must produce an
	// error, not a panic. The stream interceptor chain has no recovery
	// interceptor, so a panic here would otherwise go unrecovered.
	userID, ok := ctx.Value("user_id").(string)
	if !ok {
		return status.Error(codes.Unauthenticated, "未认证")
	}

	// Admins may operate on every order.
	if role, _ := ctx.Value("role").(string); role == "admin" {
		return nil
	}

	if userID != orderUserID {
		return status.Error(codes.PermissionDenied, "无权操作此订单")
	}

	return nil
}

// publishEvent delivers event to the subscribers of the user who owns the
// order named in event.OrderId.
//
// Subscriptions are registered per user ID (StreamOrderEvents subscribes
// with req.UserId), so publishing under the order ID never reached any
// subscriber. The owner is therefore resolved through the repository first.
// Delivery is best effort: if the order cannot be loaded the event is dropped.
func (s *OrderService) publishEvent(event *pb.OrderEvent) {
	// context.Background is used because the signature carries no context
	// and the notification should not be tied to the lifetime of the RPC.
	order, err := s.repo.GetOrder(context.Background(), event.OrderId)
	if err != nil {
		return
	}
	s.eventSubs.Publish(order.UserId, event)
}

// isValidStatusTransition reports whether an order may move from status from
// to status to. Allowed transitions:
//
//	PENDING -> PAID | CANCELLED
//	PAID    -> SHIPPED | CANCELLED
//	SHIPPED -> DELIVERED
//
// Every other combination is rejected.
func isValidStatusTransition(from, to pb.OrderStatus) bool {
	switch from {
	case pb.OrderStatus_ORDER_STATUS_PENDING:
		return to == pb.OrderStatus_ORDER_STATUS_PAID ||
			to == pb.OrderStatus_ORDER_STATUS_CANCELLED
	case pb.OrderStatus_ORDER_STATUS_PAID:
		return to == pb.OrderStatus_ORDER_STATUS_SHIPPED ||
			to == pb.OrderStatus_ORDER_STATUS_CANCELLED
	case pb.OrderStatus_ORDER_STATUS_SHIPPED:
		return to == pb.OrderStatus_ORDER_STATUS_DELIVERED
	default:
		// No outgoing transitions are defined for any other status.
		return false
	}
}

// orderIDMu guards lastOrderIDTime, the timestamp embedded in the most
// recently generated order ID.
var (
	orderIDMu       sync.Mutex
	lastOrderIDTime time.Time
)

// generateOrderID returns an order ID of the form
// "ORD-YYYYMMDDhhmmss.nnnnnnnnn" (UTC) that is unique within this process.
//
// A timestamp with second resolution produced identical IDs for orders
// created within the same second, and the in-memory repository then silently
// overwrote the earlier order. The ID now carries nanoseconds, and the
// embedded timestamp is forced to be strictly increasing so that two calls
// can never yield the same value. UTC is used so that local clock changes
// such as daylight saving cannot repeat a timestamp.
func generateOrderID() string {
	orderIDMu.Lock()
	defer orderIDMu.Unlock()

	now := time.Now().UTC()
	if !now.After(lastOrderIDTime) {
		// Clock did not advance (or went backwards): step past the last ID.
		now = lastOrderIDTime.Add(time.Nanosecond)
	}
	lastOrderIDTime = now

	return "ORD-" + now.Format("20060102150405.000000000")
}

// EventSubscribers keeps track of the event channels of subscribed clients.
type EventSubscribers struct {
// mu guards subs.
mu sync.RWMutex
// subs maps a user ID (the key used by Subscribe and Unsubscribe) to the
// channels subscribed under it.
subs map[string][]chan *pb.OrderEvent
}

// NewEventSubscribers returns an empty, ready-to-use subscriber registry.
func NewEventSubscribers() *EventSubscribers {
	registry := new(EventSubscribers)
	registry.subs = map[string][]chan *pb.OrderEvent{}
	return registry
}

// Subscribe registers a new event channel under userID and returns it. The
// channel buffers up to 10 events.
func (e *EventSubscribers) Subscribe(userID string) chan *pb.OrderEvent {
	const bufferSize = 10
	events := make(chan *pb.OrderEvent, bufferSize)

	e.mu.Lock()
	e.subs[userID] = append(e.subs[userID], events)
	e.mu.Unlock()

	return events
}

// Unsubscribe removes ch from the channels registered under userID and closes
// it. It does nothing when ch is not registered under userID.
//
// When the last channel of a user is removed the map entry is deleted as
// well; otherwise every user that ever subscribed would leave an empty slice
// behind for the lifetime of the process.
func (e *EventSubscribers) Unsubscribe(userID string, ch chan *pb.OrderEvent) {
	e.mu.Lock()
	defer e.mu.Unlock()

	subs := e.subs[userID]
	for i, sub := range subs {
		if sub != ch {
			continue
		}

		// Shift the tail left and clear the vacated slot so that the
		// backing array no longer references the removed channel.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		subs = subs[:last]

		if len(subs) == 0 {
			delete(e.subs, userID)
		} else {
			e.subs[userID] = subs
		}

		// Closing is safe here: Publish sends only while holding the read
		// lock, which cannot be held while this write lock is held.
		close(ch)
		return
	}
}

// Publish sends event to every channel registered under userID without
// blocking: a subscriber whose buffer is full misses the event.
//
// The parameter was previously named orderID, but the registry is keyed by
// the user ID passed to Subscribe and Unsubscribe; the name now matches the
// key that is actually looked up.
func (e *EventSubscribers) Publish(userID string, event *pb.OrderEvent) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	for _, ch := range e.subs[userID] {
		select {
		case ch <- event:
		default:
			// Buffer full: skip this subscriber rather than stall the publisher.
		}
	}
}

拦截器

// server/interceptor.go
package main

import (
"context"
"log"
"runtime/debug"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// loggingInterceptor logs the method name before each unary call and the
// method name, status code and duration after it. The handler's response and
// error are passed through unchanged.
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	began := time.Now()
	method := info.FullMethod

	log.Printf("[请求] 方法: %s", method)

	resp, err := handler(ctx, req)

	elapsed := time.Since(began)
	// status.Code reports codes.OK for a nil error.
	log.Printf("[响应] 方法: %s, 状态: %s, 耗时: %v", method, status.Code(err), elapsed)

	return resp, err
}

// authInterceptor authenticates unary calls. System methods (see
// isSystemMethod) pass through untouched; every other call must carry an
// "authorization" metadata entry accepted by validateToken, otherwise the
// call fails with Unauthenticated. On success the handler runs with the
// "user_id" and "role" values stored in its context.
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	if isSystemMethod(info.FullMethod) {
		return handler(ctx, req)
	}

	md, found := metadata.FromIncomingContext(ctx)
	if !found {
		return nil, status.Error(codes.Unauthenticated, "缺少元数据")
	}

	authHeader := md.Get("authorization")
	if len(authHeader) == 0 {
		return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
	}

	// Only the first authorization value is considered (simplified example).
	uid, role, tokenErr := validateToken(authHeader[0])
	if tokenErr != nil {
		return nil, status.Error(codes.Unauthenticated, "无效的认证信息")
	}

	// Hand the caller identity to the handler through the context.
	authed := context.WithValue(context.WithValue(ctx, "user_id", uid), "role", role)
	return handler(authed, req)
}

// recoveryInterceptor turns a panic raised by the handler into an Internal
// status error. The panic value and stack trace are logged; the client only
// sees a generic message.
func recoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		panicValue := recover()
		if panicValue == nil {
			return
		}
		log.Printf("[PANIC] 方法: %s, 错误: %v\n%s", info.FullMethod, panicValue, debug.Stack())
		// The named result is overwritten so that the caller gets an error.
		err = status.Error(codes.Internal, "内部服务错误")
	}()

	return handler(ctx, req)
}

// chainUnaryInterceptors combines interceptors into a single unary
// interceptor. The first interceptor is the outermost one; the last one calls
// the real handler.
func chainUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// Wrap from the inside out: start with the real handler and put
		// each interceptor, last to first, around what has been built so far.
		next := handler
		for idx := len(interceptors); idx > 0; idx-- {
			interceptor, inner := interceptors[idx-1], next
			next = func(c context.Context, r interface{}) (interface{}, error) {
				return interceptor(c, r, info, inner)
			}
		}
		return next(ctx, req)
	}
}

// chainStreamInterceptors is the streaming counterpart of
// chainUnaryInterceptors: it combines interceptors into one stream
// interceptor in which the first interceptor is the outermost and the last
// one calls the real handler.
func chainStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// Wrap from the inside out, last interceptor first.
		next := handler
		for idx := len(interceptors); idx > 0; idx-- {
			interceptor, inner := interceptors[idx-1], next
			next = func(s interface{}, stream grpc.ServerStream) error {
				return interceptor(s, stream, info, inner)
			}
		}
		return next(srv, ss)
	}
}

// loggingStreamInterceptor logs the method name when a streaming call starts
// and the method name, duration and final error when it ends. The handler's
// error is returned unchanged.
func loggingStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	began := time.Now()
	method := info.FullMethod
	log.Printf("[流请求] 方法: %s", method)

	streamErr := handler(srv, ss)

	log.Printf("[流响应] 方法: %s, 耗时: %v, 错误: %v", method, time.Since(began), streamErr)
	return streamErr
}

// authStreamInterceptor authenticates streaming calls the same way
// authInterceptor authenticates unary ones. System methods pass through;
// other calls need a valid "authorization" metadata entry or fail with
// Unauthenticated. On success the handler receives a stream whose context
// carries the "user_id" and "role" values.
func authStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	if isSystemMethod(info.FullMethod) {
		return handler(srv, ss)
	}

	streamCtx := ss.Context()

	md, found := metadata.FromIncomingContext(streamCtx)
	if !found {
		return status.Error(codes.Unauthenticated, "缺少元数据")
	}

	authHeader := md.Get("authorization")
	if len(authHeader) == 0 {
		return status.Error(codes.Unauthenticated, "缺少认证信息")
	}

	uid, role, tokenErr := validateToken(authHeader[0])
	if tokenErr != nil {
		return status.Error(codes.Unauthenticated, "无效的认证信息")
	}

	// A ServerStream's context cannot be replaced directly, so the stream is
	// wrapped in a type that returns the enriched context instead.
	authedCtx := context.WithValue(streamCtx, "user_id", uid)
	authedCtx = context.WithValue(authedCtx, "role", role)

	return handler(srv, &wrappedStream{ServerStream: ss, ctx: authedCtx})
}

// wrappedStream is a grpc.ServerStream that carries its own context.
type wrappedStream struct {
// The embedded stream supplies every method that wrappedStream does not
// define itself.
grpc.ServerStream
// ctx is the context handed out by this stream.
ctx context.Context
}

// Context returns the context stored in the wrapper, shadowing the Context
// method of the embedded ServerStream.
func (w *wrappedStream) Context() context.Context {
	stored := w.ctx
	return stored
}

// isSystemMethod reports whether method is an infrastructure RPC that must be
// reachable without authentication: the health service and server reflection.
//
// Both reflection service versions are listed. Only v1alpha used to be
// exempted, so tools that speak the v1 reflection protocol were rejected by
// the auth interceptors.
func isSystemMethod(method string) bool {
	switch method {
	case "/grpc.health.v1.Health/Check",
		"/grpc.health.v1.Health/Watch",
		"/grpc.reflection.v1.ServerReflection/ServerReflectionInfo",
		"/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo":
		return true
	default:
		return false
	}
}

// validateToken maps an authorization value to a user ID and role. This is a
// simplified example with two hard-coded tokens; a real service should use
// JWT or another proper authentication scheme. Any other value yields an
// Unauthenticated error.
func validateToken(token string) (userID, role string, err error) {
	switch token {
	case "Bearer test-token":
		return "user-123", "user", nil
	case "Bearer admin-token":
		return "admin-001", "admin", nil
	default:
		return "", "", status.Error(codes.Unauthenticated, "无效的 token")
	}
}

数据存储

// server/repository.go
package main

import (
"context"
"sync"

pb "order-service/generated/ecommerce"
)

// Repository is the data access interface the order service depends on.
type Repository interface {
// CreateOrder stores a new order.
CreateOrder(ctx context.Context, order *pb.Order) error
// GetOrder returns the order with the given ID; the in-memory
// implementation in this file returns ErrOrderNotFound for unknown IDs.
GetOrder(ctx context.Context, orderID string) (*pb.Order, error)
// ListOrders returns orders of userID, limited by pageSize.
ListOrders(ctx context.Context, userID string, pageSize int) ([]*pb.Order, error)
// UpdateOrder replaces a stored order.
UpdateOrder(ctx context.Context, order *pb.Order) error
}

// MemoryRepository is an in-memory Repository intended for examples.
type MemoryRepository struct {
// mu guards orders and byUser.
mu sync.RWMutex
// orders maps an order ID to the stored order.
orders map[string]*pb.Order
byUser map[string][]string // userID -> orderIDs, in insertion order
}

// NewMemoryRepository returns an empty in-memory repository.
func NewMemoryRepository() *MemoryRepository {
	repo := new(MemoryRepository)
	repo.orders = map[string]*pb.Order{}
	repo.byUser = map[string][]string{}
	return repo
}

// CreateOrder stores order under its ID and appends the ID to the owner's
// order list. It always returns nil; an existing order with the same ID is
// replaced.
func (r *MemoryRepository) CreateOrder(ctx context.Context, order *pb.Order) error {
	id, owner := order.Id, order.UserId

	r.mu.Lock()
	defer r.mu.Unlock()

	r.orders[id] = order
	r.byUser[owner] = append(r.byUser[owner], id)
	return nil
}

// GetOrder returns the stored order with the given ID, or ErrOrderNotFound
// when no such order exists. The returned pointer is the stored value itself,
// not a copy.
func (r *MemoryRepository) GetOrder(ctx context.Context, orderID string) (*pb.Order, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if stored, ok := r.orders[orderID]; ok {
		return stored, nil
	}
	return nil, ErrOrderNotFound
}

// ListOrders returns the orders of userID in insertion order. A positive
// pageSize caps the number of results; any other value returns all of them.
// An unknown user yields an empty, non-nil slice.
func (r *MemoryRepository) ListOrders(ctx context.Context, userID string, pageSize int) ([]*pb.Order, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	ids, known := r.byUser[userID]
	if !known {
		return []*pb.Order{}, nil
	}

	result := make([]*pb.Order, 0, len(ids))
	for i := 0; i < len(ids); i++ {
		stored, found := r.orders[ids[i]]
		if !found {
			continue
		}
		result = append(result, stored)
		// Stop as soon as the requested page is full.
		if pageSize > 0 && len(result) >= pageSize {
			break
		}
	}
	return result, nil
}

// UpdateOrder replaces the stored order that has the same ID as order. It
// returns ErrOrderNotFound when no order with that ID is stored.
func (r *MemoryRepository) UpdateOrder(ctx context.Context, order *pb.Order) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	_, known := r.orders[order.Id]
	if known {
		r.orders[order.Id] = order
		return nil
	}
	return ErrOrderNotFound
}

客户端调用

// client/main.go
package main

import (
"context"
"io"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

pb "order-service/generated/ecommerce"
)

// main connects to the order service on localhost:50051 without transport
// security and runs every demo call with a test bearer token attached.
func main() {
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, err := grpc.Dial("localhost:50051", creds)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	client := pb.NewOrderServiceClient(conn)

	// Every outgoing call carries the authorization metadata.
	ctx := metadata.AppendToOutgoingContext(context.Background(), "authorization", "Bearer test-token")

	// Run the demos in a fixed order.
	demos := []func(context.Context, pb.OrderServiceClient){
		testCreateOrder,
		testGetOrder,
		testListOrders,
		testUpdateOrderStatus,
		testStreamOrderEvents,
	}
	for _, demo := range demos {
		demo(ctx, client)
	}
}

// testCreateOrder creates a two-item order for user-123 and logs either the
// failure or the new order's ID and total amount.
func testCreateOrder(ctx context.Context, client pb.OrderServiceClient) {
	log.Println("\n=== 创建订单 ===")

	req := &pb.CreateOrderRequest{
		UserId:          "user-123",
		ShippingAddress: "北京市朝阳区xxx街道",
	}
	req.Items = append(req.Items,
		&pb.OrderItemInput{ProductId: "prod-001", Quantity: 2},
		&pb.OrderItemInput{ProductId: "prod-002", Quantity: 1},
	)

	created, err := client.CreateOrder(ctx, req)
	if err != nil {
		log.Printf("创建订单失败: %v", err)
		return
	}

	log.Printf("订单创建成功: ID=%s, 金额=%.2f", created.Id, created.TotalAmount)
}

// testGetOrder fetches the order with a fixed sample ID and logs either the
// failure or the order.
func testGetOrder(ctx context.Context, client pb.OrderServiceClient) {
	log.Println("\n=== 获取订单 ===")

	req := &pb.GetOrderRequest{OrderId: "ORD-20240101120000"}
	order, err := client.GetOrder(ctx, req)
	if err != nil {
		log.Printf("获取订单失败: %v", err)
		return
	}

	log.Printf("订单信息: %+v", order)
}

// testListOrders requests up to 10 orders of user-123 and logs each order
// received from the server stream until the stream ends or fails.
func testListOrders(ctx context.Context, client pb.OrderServiceClient) {
	log.Println("\n=== 列出订单 ===")

	req := &pb.ListOrdersRequest{UserId: "user-123", PageSize: 10}
	stream, err := client.ListOrders(ctx, req)
	if err != nil {
		log.Printf("调用失败: %v", err)
		return
	}

	for {
		order, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// The server finished the stream normally.
			return
		case recvErr != nil:
			log.Printf("接收失败: %v", recvErr)
			return
		}
		log.Printf("订单: ID=%s, 状态=%s", order.Id, order.Status)
	}
}

// testUpdateOrderStatus asks the server to mark the order with a fixed sample
// ID as paid and logs either the failure or the resulting status.
func testUpdateOrderStatus(ctx context.Context, client pb.OrderServiceClient) {
	log.Println("\n=== 更新订单状态 ===")

	req := &pb.UpdateOrderStatusRequest{
		OrderId:   "ORD-20240101120000",
		NewStatus: pb.OrderStatus_ORDER_STATUS_PAID,
	}

	updated, err := client.UpdateOrderStatus(ctx, req)
	if err != nil {
		log.Printf("更新失败: %v", err)
		return
	}

	log.Printf("更新成功: 状态=%s", updated.Status)
}

// testStreamOrderEvents subscribes to the order events of user-123 and logs
// every event received during at most 10 seconds.
//
// The timeout context is created before the stream is opened and is the
// context the stream runs on. Previously it was created afterwards and never
// attached to the stream, so the blocking Recv call could not be interrupted
// and the function hung while no events arrived.
func testStreamOrderEvents(ctx context.Context, client pb.OrderServiceClient) {
	log.Println("\n=== 订单事件流 ===")

	// Bound the lifetime of the whole stream.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	stream, err := client.StreamOrderEvents(ctx, &pb.StreamOrderEventsRequest{
		UserId: "user-123",
	})
	if err != nil {
		log.Printf("订阅失败: %v", err)
		return
	}

	for {
		event, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			// Recv fails once the deadline passes; report that as the
			// expected timeout rather than as a receive failure.
			if ctx.Err() != nil {
				log.Println("超时退出")
				return
			}
			log.Printf("接收事件失败: %v", err)
			return
		}
		log.Printf("事件: 订单=%s, 状态 %s -> %s",
			event.OrderId, event.OldStatus, event.NewStatus)
	}
}

小结

本章通过电商订单服务的完整案例,演示了:

  1. 服务设计:如何设计 Proto 文件,包括消息类型、服务定义、枚举等
  2. 服务端实现:完整的业务逻辑、错误处理、权限控制
  3. 拦截器:日志、认证、恢复拦截器的实现
  4. 流式处理:服务端流(ListOrders、StreamOrderEvents)的使用;本案例未涉及客户端流和双向流
  5. 客户端调用:各种 RPC 类型的客户端调用方式

这个案例涵盖了 gRPC 开发的主要场景,可以作为实际项目的参考模板。

[!TIP] 完整的生产级服务还需要考虑:数据库持久化、分布式事务、服务发现、熔断降级、链路追踪等。这些内容可以参考相关的微服务架构教程。