服务端开发
本章介绍如何使用 Go 语言实现 gRPC 服务端,包括服务注册、拦截器、错误处理等核心内容。
服务端架构
基本服务实现
定义 Proto
// hello.proto
syntax = "proto3";
package hello;
option go_package = "./hello";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
生成代码
protoc --go_out=. --go-grpc_out=. hello.proto
实现服务
// server/main.go
package main
import (
"context"
"fmt"
"log"
"net"
"google.golang.org/grpc"
pb "myproject/proto/hello"
)
// 定义服务结构体
type server struct {
pb.UnimplementedGreeterServer // 必须嵌入
}
// 实现 SayHello 方法
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("收到请求: %s", req.Name)
return &pb.HelloReply{
Message: fmt.Sprintf("你好, %s!", req.Name),
}, nil
}
func main() {
// 创建监听器
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}
// 创建 gRPC 服务器
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
log.Println("服务启动在 :50051")
// 启动服务
if err := s.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}
重要说明:
UnimplementedGreeterServer是生成代码中的嵌入结构,确保新增方法时编译通过- 所有服务方法必须接收
context.Context作为第一个参数
流式服务实现
服务端流
// Proto 定义
// rpc ListUsers (ListUsersRequest) returns (stream User);
func (s *server) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
users := []User{
{ID: "1", Name: "Alice"},
{ID: "2", Name: "Bob"},
{ID: "3", Name: "Charlie"},
}
for _, user := range users {
// 检查上下文是否已取消
if err := stream.Context().Err(); err != nil {
return err
}
// 发送消息
if err := stream.Send(&pb.User{
Id: user.ID,
Name: user.Name,
}); err != nil {
return err
}
}
return nil
}
客户端流
// Proto 定义
// rpc UploadFile (stream FileChunk) returns (UploadResult);
func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var totalSize int64
var filename string
for {
// 接收消息
chunk, err := stream.Recv()
if err == io.EOF {
// 客户端发送完毕,返回结果
return stream.SendAndClose(&pb.UploadResult{
Success: true,
Message: fmt.Sprintf("上传成功,大小: %d 字节", totalSize),
Filename: filename,
})
}
if err != nil {
return err
}
filename = chunk.Filename
totalSize += int64(len(chunk.Content))
// 处理文件内容...
}
}
双向流
// Proto 定义
// rpc Chat (stream ChatMessage) returns (stream ChatMessage);
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
for {
// 接收消息
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// 处理消息
reply := &pb.ChatMessage{
UserId: "server",
Content: fmt.Sprintf("收到: %s", msg.Content),
Timestamp: time.Now().Unix(),
}
// 发送响应
if err := stream.Send(reply); err != nil {
return err
}
}
}
拦截器
拦截器类似于中间件,可以在请求处理前后执行自定义逻辑。
一元拦截器
// 日志拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
// 调用前
log.Printf("[请求] 方法: %s", info.FullMethod)
// 调用实际处理方法
resp, err := handler(ctx, req)
// 调用后
log.Printf("[响应] 方法: %s, 耗时: %v", info.FullMethod, time.Since(start))
return resp, err
}
// 认证拦截器
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过不需要认证的方法
if info.FullMethod == "/api.Auth/Login" {
return handler(ctx, req)
}
// 从元数据获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少 token")
}
// 验证 token
userID, err := validateToken(tokens[0])
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 token")
}
// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", userID)
return handler(ctx, req)
}
// 使用拦截器
func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
loggingInterceptor,
authInterceptor,
)),
)
}
流拦截器
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Printf("[流请求] 方法: %s", info.FullMethod)
err := handler(srv, ss)
if err != nil {
log.Printf("[流错误] 方法: %s, 错误: %v", info.FullMethod, err)
}
return err
}
// 使用流拦截器
s := grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
)
错误处理
gRPC 错误码
const (
OK Code = 0
Canceled Code = 1
Unknown Code = 2
InvalidArgument Code = 3
DeadlineExceeded Code = 4
NotFound Code = 5
AlreadyExists Code = 6
PermissionDenied Code = 7
ResourceExhausted Code = 8
FailedPrecondition Code = 9
Aborted Code = 10
OutOfRange Code = 11
Unimplemented Code = 12
Internal Code = 13
Unavailable Code = 14
DataLoss Code = 15
Unauthenticated Code = 16
)
返回错误
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 参数验证
if req.UserId == "" {
return nil, status.Error(codes.InvalidArgument, "用户 ID 不能为空")
}
// 查询用户
user, err := s.db.GetUser(req.UserId)
if err != nil {
if errors.Is(err, ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "用户不存在: %s", req.UserId)
}
return nil, status.Errorf(codes.Internal, "数据库错误: %v", err)
}
return user, nil
}
// 带详细信息的错误
func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
// 创建错误详情
details := []*errdetails.BadRequest_FieldViolation{
{
Field: "email",
Description: "邮箱格式不正确",
},
}
badReq := &errdetails.BadRequest{
FieldViolations: details,
}
// 创建带详情的状态
st, err := status.New(codes.InvalidArgument, "参数验证失败").
WithDetails(badReq)
if err != nil {
return nil, status.Error(codes.Internal, "创建错误详情失败")
}
return nil, st.Err()
}
服务器选项
func main() {
// 基本配置
lis, _ := net.Listen("tcp", ":50051")
// 服务器选项
opts := []grpc.ServerOption{
// 最大消息大小
grpc.MaxRecvMsgSize(1024 * 1024 * 10), // 10MB
grpc.MaxSendMsgSize(1024 * 1024 * 10), // 10MB
// 连接保活
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
Time: 10 * time.Second,
Timeout: 1 * time.Second,
}),
// 拦截器
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
}
s := grpc.NewServer(opts...)
pb.RegisterGreeterServer(s, &server{})
s.Serve(lis)
}
优雅关闭
func main() {
lis, _ := net.Listen("tcp", ":50051")
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
// 启动服务(在 goroutine 中)
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}()
// 监听系统信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("正在关闭服务器...")
// 优雅关闭(等待现有请求完成)
s.GracefulStop()
log.Println("服务器已关闭")
}
健康检查
import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
func main() {
s := grpc.NewServer()
// 注册健康检查服务
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(s, healthServer)
// 设置服务状态
healthServer.SetServingStatus("my-service", grpc_health_v1.HealthCheckResponse_SERVING)
// 注册业务服务
pb.RegisterGreeterServer(s, &server{})
s.Serve(lis)
}
小结
本章我们学习了:
- 基本服务实现:创建和注册 gRPC 服务
- 流式服务:实现三种流式 RPC
- 拦截器:请求前后的自定义处理
- 错误处理:gRPC 错误码和详细信息
- 服务器配置:选项和优化
- 优雅关闭:安全停止服务
- 健康检查:服务状态监控
下一章我们将学习如何实现 gRPC 客户端。