跳到主要内容

Go gRPC 开发

Go 语言是 gRPC 的原生支持语言之一,具有优秀的性能和简洁的语法。本章详细介绍如何使用 Go 语言开发 gRPC 服务端和客户端。

环境准备

安装 Go

确保已安装 Go 1.19 或更高版本:

# 检查 Go 版本
go version

# 设置 Go 模块代理(国内用户)
go env -w GOPROXY=https://goproxy.cn,direct

安装 gRPC 和 Protobuf

# 安装 gRPC 核心库
go get google.golang.org/grpc

# 安装 Protobuf 编译器插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 确保 $GOPATH/bin 在 PATH 中
export PATH="$PATH:$(go env GOPATH)/bin"

# 验证安装
protoc-gen-go --version
protoc-gen-go-grpc --version

项目结构

grpc-go-project/
├── proto/ # Protocol Buffers 定义
│ └── greeter.proto
├── generated/ # 生成的代码
│ ├── greeter.pb.go
│ └── greeter_grpc.pb.go
├── server/ # 服务端代码
│ └── main.go
├── client/ # 客户端代码
│ └── main.go
├── go.mod
└── Makefile

初始化项目

# 创建项目目录
mkdir grpc-go-project && cd grpc-go-project

# 初始化 Go 模块
go mod init github.com/yourname/grpc-go-project

# 创建目录结构
mkdir -p proto generated server client

Proto 定义与代码生成

Proto 文件

// proto/greeter.proto
syntax = "proto3";

package greeter;

// Go 包路径
option go_package = "github.com/yourname/grpc-go-project/generated/greeter";

// 问候服务
service Greeter {
// 一元 RPC
rpc SayHello (HelloRequest) returns (HelloReply);

// 服务端流
rpc SayHelloStream (HelloRequest) returns (stream HelloReply);

// 客户端流
rpc SendGreetings (stream HelloRequest) returns (HelloReply);

// 双向流
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}

message HelloRequest {
string name = 1;
int32 count = 2;
}

message HelloReply {
string message = 1;
int64 timestamp = 2;
}

生成代码

# 基本生成命令
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/greeter.proto

# 或使用 Makefile

Makefile 示例

.PHONY: proto

proto:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/greeter.proto
mv github.com/yourname/grpc-go-project/generated/greeter/*.go generated/

clean:
rm -rf generated/*.go

生成的代码说明

执行后会生成两个文件:

  • greeter.pb.go:消息类型定义(HelloRequestHelloReply
  • greeter_grpc.pb.go:服务端接口和客户端存根

关键类型

// 服务端需要实现的接口
type GreeterServer interface {
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
SayHelloStream(*HelloRequest, Greeter_SayHelloStreamServer) error
SendGreetings(Greeter_SendGreetingsServer) error
Chat(Greeter_ChatServer) error

// 必须嵌入以保持向前兼容
mustEmbedUnimplementedGreeterServer()
}

// 客户端存根
type GreeterClient interface {
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
SayHelloStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (Greeter_SayHelloStreamClient, error)
SendGreetings(ctx context.Context, opts ...grpc.CallOption) (Greeter_SendGreetingsClient, error)
Chat(ctx context.Context, opts ...grpc.CallOption) (Greeter_ChatClient, error)
}

服务端开发

基本服务实现

// server/main.go
package main

import (
"context"
"fmt"
"log"
"net"
"time"

"google.golang.org/grpc"
pb "github.com/yourname/grpc-go-project/generated"
)

// GreeterServer 服务实现
type GreeterServer struct {
pb.UnimplementedGreeterServer // 必须嵌入,确保向前兼容
}

// SayHello 一元 RPC
func (s *GreeterServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("收到请求: name=%s", req.Name)

return &pb.HelloReply{
Message: fmt.Sprintf("你好, %s!", req.Name),
Timestamp: time.Now().Unix(),
}, nil
}

// SayHelloStream 服务端流
func (s *GreeterServer) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
log.Printf("收到流请求: name=%s, count=%d", req.Name, req.Count)

count := req.Count
if count <= 0 {
count = 5
}

for i := int32(0); i < count; i++ {
// 检查客户端是否已取消
if err := stream.Context().Err(); err != nil {
log.Printf("客户端已取消: %v", err)
return err
}

err := stream.Send(&pb.HelloReply{
Message: fmt.Sprintf("问候 %d: 你好, %s!", i+1, req.Name),
Timestamp: time.Now().Unix(),
})
if err != nil {
return err
}

time.Sleep(500 * time.Millisecond)
}

return nil
}

// SendGreetings 客户端流
func (s *GreeterServer) SendGreetings(stream pb.Greeter_SendGreetingsServer) error {
var names []string

// 接收所有请求
for {
req, err := stream.Recv()
if err == io.EOF {
// 客户端发送完毕
return stream.SendAndClose(&pb.HelloReply{
Message: fmt.Sprintf("收到了 %d 个问候,来自: %s", len(names), strings.Join(names, ", ")),
Timestamp: time.Now().Unix(),
})
}
if err != nil {
return err
}

log.Printf("收到: name=%s", req.Name)
names = append(names, req.Name)
}
}

// Chat 双向流
func (s *GreeterServer) Chat(stream pb.Greeter_ChatServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}

log.Printf("聊天消息: %s", req.Name)

err = stream.Send(&pb.HelloReply{
Message: fmt.Sprintf("[服务器] 收到你的消息: %s", req.Name),
Timestamp: time.Now().Unix(),
})
if err != nil {
return err
}
}
}

func main() {
// 创建监听器
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}

// 创建 gRPC 服务器
s := grpc.NewServer()

// 注册服务
pb.RegisterGreeterServer(s, &GreeterServer{})

log.Println("服务器启动,监听端口 :50051")

// 启动服务
if err := s.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}

服务器配置选项

import (
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

func createServer() *grpc.Server {
// Keep-alive 配置
kaParams := keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Minute,
MaxConnectionAge: 30 * time.Minute,
MaxConnectionAgeGrace: 5 * time.Minute,
Time: 10 * time.Second,
Timeout: 1 * time.Second,
}

// Keep-alive 策略
kaPolicy := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second,
PermitWithoutStream: true,
}

// 服务器选项
opts := []grpc.ServerOption{
// 消息大小限制
grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
grpc.MaxSendMsgSize(10 * 1024 * 1024), // 10MB

// Keep-alive
grpc.KeepaliveParams(kaParams),
grpc.KeepaliveEnforcementPolicy(kaPolicy),

// 拦截器
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),

// 连接缓冲区
grpc.ReadBufferSize(32 * 1024),
grpc.WriteBufferSize(32 * 1024),
}

return grpc.NewServer(opts...)
}

拦截器

Go gRPC 支持一元拦截器和流拦截器。

一元拦截器

import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// 日志拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()

log.Printf("[请求] 方法: %s", info.FullMethod)

// 调用处理方法
resp, err := handler(ctx, req)

// 记录响应
duration := time.Since(start)
code := status.Code(err)
log.Printf("[响应] 方法: %s, 状态: %s, 耗时: %v", info.FullMethod, code, duration)

return resp, err
}

// 认证拦截器
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过不需要认证的方法
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}

// 获取元数据
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}

// 获取 token
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}

// 验证 token
userID, err := validateToken(tokens[0])
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的认证信息")
}

// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", userID)

return handler(ctx, req)
}

// 恢复拦截器(处理 panic)
func recoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("[PANIC] 方法: %s, 错误: %v\n%s", info.FullMethod, r, debug.Stack())
err = status.Error(codes.Internal, "内部服务错误")
}
}()
return handler(ctx, req)
}

// 拦截器链
func chainUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
buildChain := func(current grpc.UnaryServerInterceptor, next grpc.UnaryHandler) grpc.UnaryHandler {
return func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
return current(currentCtx, currentReq, info, next)
}
}

chain := handler
for i := len(interceptors) - 1; i >= 0; i-- {
chain = buildChain(interceptors[i], chain)
}

return chain(ctx, req)
}
}

流拦截器

// 日志流拦截器
func loggingStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
start := time.Now()
log.Printf("[流请求] 方法: %s", info.FullMethod)

err := handler(srv, ss)

log.Printf("[流响应] 方法: %s, 耗时: %v, 错误: %v", info.FullMethod, time.Since(start), err)
return err
}

使用拦截器

import (
"github.com/grpc-ecosystem/go-grpc-middleware"
)

func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
recoveryInterceptor,
loggingInterceptor,
authInterceptor,
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
loggingStreamInterceptor,
authStreamInterceptor,
)),
)
}

优雅关闭

import (
"os"
"os/signal"
"syscall"
)

func main() {
lis, _ := net.Listen("tcp", ":50051")
s := grpc.NewServer()

pb.RegisterGreeterServer(s, &GreeterServer{})

// 启动服务器(在 goroutine 中)
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}()

// 监听系统信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

log.Println("正在关闭服务器...")

// 优雅关闭
s.GracefulStop()

log.Println("服务器已关闭")
}

健康检查

import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
s := grpc.NewServer()

// 注册业务服务
pb.RegisterGreeterServer(s, &GreeterServer{})

// 注册健康检查服务
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(s, healthServer)

// 设置服务状态
healthServer.SetServingStatus("greeter.Greeter", grpc_health_v1.HealthCheckResponse_SERVING)

s.Serve(lis)
}

客户端开发

基本客户端

// client/main.go
package main

import (
"context"
"io"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "github.com/yourname/grpc-go-project/generated"
)

func main() {
// 创建连接
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()

// 创建客户端
client := pb.NewGreeterClient(conn)

// 调用各种 RPC
callSayHello(client)
callSayHelloStream(client)
callSendGreetings(client)
callChat(client)
}

// 一元 RPC
func callSayHello(client pb.GreeterClient) {
log.Println("\n=== 一元 RPC ===")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "张三"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}

log.Printf("响应: %s", resp.Message)
}

// 服务端流
func callSayHelloStream(client pb.GreeterClient) {
log.Println("\n=== 服务端流 ===")

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

stream, err := client.SayHelloStream(ctx, &pb.HelloRequest{Name: "李四", Count: 3})
if err != nil {
log.Printf("创建流失败: %v", err)
return
}

for {
resp, err := stream.Recv()
if err == io.EOF {
log.Println("流结束")
break
}
if err != nil {
log.Printf("接收失败: %v", err)
break
}
log.Printf("收到: %s", resp.Message)
}
}

// 客户端流
func callSendGreetings(client pb.GreeterClient) {
log.Println("\n=== 客户端流 ===")

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

stream, err := client.SendGreetings(ctx)
if err != nil {
log.Printf("创建流失败: %v", err)
return
}

// 发送多个请求
names := []string{"王五", "赵六", "钱七"}
for _, name := range names {
log.Printf("发送: %s", name)
if err := stream.Send(&pb.HelloRequest{Name: name}); err != nil {
log.Printf("发送失败: %v", err)
return
}
}

// 关闭发送端并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
log.Printf("接收响应失败: %v", err)
return
}
log.Printf("响应: %s", resp.Message)
}

// 双向流
func callChat(client pb.GreeterClient) {
log.Println("\n=== 双向流 ===")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

stream, err := client.Chat(ctx)
if err != nil {
log.Printf("创建流失败: %v", err)
return
}

// 发送消息的 goroutine
go func() {
messages := []string{"你好", "今天天气不错", "再见"}
for _, msg := range messages {
log.Printf("发送: %s", msg)
if err := stream.Send(&pb.HelloRequest{Name: msg}); err != nil {
log.Printf("发送失败: %v", err)
return
}
time.Sleep(500 * time.Millisecond)
}
stream.CloseSend()
}()

// 接收消息
for {
resp, err := stream.Recv()
if err == io.EOF {
log.Println("聊天结束")
break
}
if err != nil {
log.Printf("接收失败: %v", err)
break
}
log.Printf("收到: %s", resp.Message)
}
}

客户端配置选项

import (
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

func createConnection() *grpc.ClientConn {
// Keep-alive 配置
kaParams := keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
}

// 连接参数
connectParams := grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
MinConnectTimeout: 5 * time.Second,
}

// 服务配置(重试策略)
serviceConfig := `{
"methodConfig": [{
"name": [{"service": ""}],
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`

conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(kaParams),
grpc.WithConnectParams(connectParams),
grpc.WithDefaultServiceConfig(serviceConfig),
grpc.WithBlock(), // 阻塞等待连接建立
grpc.WithTimeout(5*time.Second), // 连接超时
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}

return conn
}

元数据传递

import "google.golang.org/grpc/metadata"

// 发送元数据
func callWithMetadata(client pb.GreeterClient) {
// 创建元数据
md := metadata.New(map[string]string{
"authorization": "Bearer my-token",
"x-request-id": "req-12345",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}
log.Printf("响应: %s", resp.Message)
}

// 接收响应元数据
func receiveMetadata(client pb.GreeterClient) {
ctx := context.Background()

var header, trailer metadata.MD

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"},
grpc.Header(&header), // 获取响应头
grpc.Trailer(&trailer), // 获取尾部元数据
)
if err != nil {
log.Printf("调用失败: %v", err)
return
}

// 读取响应头
if responseType := header.Get("content-type"); len(responseType) > 0 {
log.Printf("Content-Type: %s", responseType[0])
}

// 读取尾部元数据
if requestID := trailer.Get("x-request-id"); len(requestID) > 0 {
log.Printf("Request ID: %s", requestID[0])
}

log.Printf("响应: %s", resp.Message)
}

超时和取消

// 超时
func callWithTimeout(client pb.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
log.Println("请求超时")
} else {
log.Printf("其他错误: %v", err)
}
return
}
log.Printf("响应: %s", resp.Message)
}

// 取消
func callWithCancel(client pb.GreeterClient) {
ctx, cancel := context.WithCancel(context.Background())

// 在另一个 goroutine 中取消
go func() {
time.Sleep(time.Second)
cancel()
log.Println("请求已取消")
}()

resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.Canceled {
log.Println("请求被取消")
}
return
}
log.Printf("响应: %s", resp.Message)
}

客户端拦截器

// 日志拦截器
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()

log.Printf("[客户端请求] 方法: %s", method)

err := invoker(ctx, method, req, reply, cc, opts...)

log.Printf("[客户端响应] 方法: %s, 耗时: %v, 错误: %v", method, time.Since(start), err)

return err
}

// 使用拦截器
func main() {
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
)
}

错误处理

服务端返回错误

import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/genproto/googleapis/rpc/errdetails"
)

func (s *GreeterServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
// 参数验证
if req.Name == "" {
return nil, status.Error(codes.InvalidArgument, "name 不能为空")
}

// 业务逻辑错误
if req.Name == "error" {
return nil, status.Errorf(codes.Internal, "内部处理错误")
}

// 带详细信息的错误
if req.Name == "detailed_error" {
st, _ := status.New(codes.InvalidArgument, "参数验证失败").
WithDetails(&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequest_FieldViolation{
{
Field: "name",
Description: "name 不能是 'detailed_error'",
},
},
})
return nil, st.Err()
}

return &pb.HelloReply{Message: "你好, " + req.Name}, nil
}

客户端处理错误

import (
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/genproto/googleapis/rpc/errdetails"
)

func handleError(err error) {
if err == nil {
return
}

st, ok := status.FromError(err)
if !ok {
log.Printf("非 gRPC 错误: %v", err)
return
}

log.Printf("错误码: %s", st.Code())
log.Printf("错误消息: %s", st.Message())

// 处理详细错误信息
for _, detail := range st.Details() {
switch d := detail.(type) {
case *errdetails.BadRequest:
for _, v := range d.FieldViolations {
log.Printf("字段错误: %s - %s", v.Field, v.Description)
}
case *errdetails.RetryInfo:
log.Printf("建议重试延迟: %v", d.RetryDelay.AsDuration())
}
}

// 根据错误码处理
switch st.Code() {
case codes.InvalidArgument:
log.Println("参数错误")
case codes.NotFound:
log.Println("资源不存在")
case codes.Unauthenticated:
log.Println("未认证")
case codes.DeadlineExceeded:
log.Println("请求超时")
case codes.Unavailable:
log.Println("服务不可用")
}
}

TLS 安全连接

服务端 TLS

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func main() {
// 加载证书
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
log.Fatalf("加载证书失败: %v", err)
}

// 创建服务器
s := grpc.NewServer(grpc.Creds(creds))

pb.RegisterGreeterServer(s, &GreeterServer{})

lis, _ := net.Listen("tcp", ":50051")
s.Serve(lis)
}

客户端 TLS

func main() {
// 加载 CA 证书验证服务端
creds, err := credentials.NewClientTLSFromFile("ca.crt", "localhost")
if err != nil {
log.Fatalf("加载证书失败: %v", err)
}

// 创建连接
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(creds),
)
}

双向 TLS(mTLS)

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"google.golang.org/grpc/credentials"
)

// 服务端 mTLS 配置
func serverMTLS() credentials.TransportCredentials {
// 加载 CA 证书
caCert, _ := ioutil.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)

// 配置 TLS
config := &tls.Config{
Certificates: []tls.Certificate{
loadCert("server.crt", "server.key"),
},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: caPool,
}

return credentials.NewTLS(config)
}

// 客户端 mTLS 配置
func clientMTLS() credentials.TransportCredentials {
caCert, _ := ioutil.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)

config := &tls.Config{
Certificates: []tls.Certificate{
loadCert("client.crt", "client.key"),
},
RootCAs: caPool,
}

return credentials.NewTLS(config)
}

func loadCert(certFile, keyFile string) tls.Certificate {
cert, _ := tls.LoadX509KeyPair(certFile, keyFile)
return cert
}

负载均衡

客户端负载均衡

// 使用 DNS 服务发现
func createClientWithLB() *grpc.ClientConn {
conn, err := grpc.Dial(
"dns:///my-service.example.com:50051", // DNS 解析
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{
"loadBalancingConfig": [{"round_robin": {}}]
}`),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
return conn
}

// 静态地址列表
func createClientWithStaticAddresses() *grpc.ClientConn {
conn, err := grpc.Dial(
"localhost:50051,localhost:50052,localhost:50053",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{
"loadBalancingConfig": [{"round_robin": {}}]
}`),
)
return conn
}

性能优化

连接复用

// 推荐:使用单例模式复用连接
type GRPCClient struct {
conn *grpc.ClientConn
stub pb.GreeterClient
once sync.Once
}

var clientInstance *GRPCClient

func GetGRPCClient() (*GRPCClient, error) {
var err error
clientInstance.once.Do(func() {
var conn *grpc.ClientConn
conn, err = grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return
}
clientInstance = &GRPCClient{
conn: conn,
stub: pb.NewGreeterClient(conn),
}
})
return clientInstance, err
}

func (c *GRPCClient) Close() {
if c.conn != nil {
c.conn.Close()
}
}

连接池

// 连接池实现
type ConnPool struct {
conns []*grpc.ClientConn
mu sync.Mutex
index int
}

func NewConnPool(size int, target string) (*ConnPool, error) {
pool := &ConnPool{
conns: make([]*grpc.ClientConn, size),
}

for i := 0; i < size; i++ {
conn, err := grpc.Dial(target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, err
}
pool.conns[i] = conn
}

return pool, nil
}

func (p *ConnPool) Get() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()

conn := p.conns[p.index]
p.index = (p.index + 1) % len(p.conns)
return conn
}

func (p *ConnPool) Close() {
for _, conn := range p.conns {
conn.Close()
}
}

最佳实践

1. 连接管理

// ✅ 好:复用连接
var globalConn *grpc.ClientConn

func init() {
var err error
globalConn, err = grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
}

// ❌ 不好:每次调用都创建新连接
func badPractice() {
conn, _ := grpc.Dial("localhost:50051", ...)
defer conn.Close() // 立即关闭,效率低
client := pb.NewGreeterClient(conn)
client.SayHello(...)
}

2. 资源释放

// 使用 defer 确保资源释放
func main() {
conn, err := grpc.Dial(...)
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// 应用逻辑...
}

3. Context 传递

// 始终使用 context 控制超时和取消
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.SayHello(ctx, req)

4. 错误处理

// 使用 status.FromError 处理 gRPC 错误
resp, err := client.SayHello(ctx, req)
if err != nil {
if st, ok := status.FromError(err); ok {
// gRPC 错误
switch st.Code() {
case codes.NotFound:
// 处理不存在
case codes.DeadlineExceeded:
// 处理超时
}
} else {
// 其他错误
}
return
}

小结

本章介绍了 Go gRPC 开发的核心内容:

  1. 环境准备:安装 gRPC 和 Protobuf 工具
  2. 代码生成:使用 protoc 生成 Go 代码
  3. 服务端开发:服务实现、配置选项、拦截器、优雅关闭
  4. 客户端开发:连接创建、各种 RPC 调用方式、元数据传递
  5. 错误处理:标准错误码和错误详情
  6. 安全连接:TLS 和 mTLS 配置
  7. 负载均衡:客户端负载均衡配置
  8. 性能优化:连接复用、连接池
  9. 最佳实践:连接管理、资源释放、错误处理

Go 语言是 gRPC 开发的优秀选择,其简洁的语法和出色的并发支持使得构建高性能 gRPC 服务变得简单。

[!TIP] Go gRPC 的性能非常出色,适合用于微服务架构中的服务间通信。结合 Go 的并发特性,可以轻松处理大量并发请求。