Go gRPC 开发
Go 语言是 gRPC 的原生支持语言之一,具有优秀的性能和简洁的语法。本章详细介绍如何使用 Go 语言开发 gRPC 服务端和客户端。
环境准备
安装 Go
确保已安装 Go 1.19 或更高版本:
# Check the installed Go version
go version
# Configure the Go module proxy (for users in mainland China)
go env -w GOPROXY=https://goproxy.cn,direct
安装 gRPC 和 Protobuf
# Install the gRPC core library
# NOTE: `go get` must be run inside a Go module (a directory that has a go.mod)
go get google.golang.org/grpc
# Install the Protobuf compiler plugins
# NOTE: these commands install only the plugins; the `protoc` compiler itself must be installed separately
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Make sure $GOPATH/bin is on PATH so protoc can find the plugins
export PATH="$PATH:$(go env GOPATH)/bin"
# Verify the installation
protoc-gen-go --version
protoc-gen-go-grpc --version
项目结构
grpc-go-project/
├── proto/ # Protocol Buffers 定义
│ └── greeter.proto
├── generated/ # 生成的代码
│ ├── greeter.pb.go
│ └── greeter_grpc.pb.go
├── server/ # 服务端代码
│ └── main.go
├── client/ # 客户端代码
│ └── main.go
├── go.mod
└── Makefile
初始化项目
# Create the project directory
mkdir grpc-go-project && cd grpc-go-project
# Initialize the Go module
go mod init github.com/yourname/grpc-go-project
# Create the directory layout
mkdir -p proto generated server client
Proto 定义与代码生成
Proto 文件
// proto/greeter.proto
syntax = "proto3";
package greeter;
// Go import path of the generated package
option go_package = "github.com/yourname/grpc-go-project/generated/greeter";
// Greeter is the greeting service; it exposes one method per gRPC call type.
service Greeter {
// Unary RPC: one request, one reply
rpc SayHello (HelloRequest) returns (HelloReply);
// Server streaming: one request, a stream of replies
rpc SayHelloStream (HelloRequest) returns (stream HelloReply);
// Client streaming: a stream of requests, one reply
rpc SendGreetings (stream HelloRequest) returns (HelloReply);
// Bidirectional streaming: both sides stream
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}
// HelloRequest is the request message shared by all Greeter methods.
message HelloRequest {
string name = 1;
int32 count = 2;
}
// HelloReply is the reply message shared by all Greeter methods.
message HelloReply {
string message = 1;
int64 timestamp = 2;
}
生成代码
# Basic generation command.
# -I proto makes output paths relative to proto/, so with paths=source_relative
# the files are written to generated/greeter.pb.go and generated/greeter_grpc.pb.go,
# matching the project layout and the Go import path used by server/ and client/.
protoc -I proto \
  --go_out=generated --go_opt=paths=source_relative \
  --go-grpc_out=generated --go-grpc_opt=paths=source_relative \
  proto/greeter.proto
# Or use a Makefile
Makefile 示例:
.PHONY: proto clean

# Generate the Go sources straight into generated/.
# With paths=source_relative the output path mirrors the input path relative to
# the -I root, so no post-generation `mv` is needed.
proto:
	mkdir -p generated
	protoc -I proto \
		--go_out=generated --go_opt=paths=source_relative \
		--go-grpc_out=generated --go-grpc_opt=paths=source_relative \
		proto/greeter.proto

# Remove the generated Go sources.
clean:
	rm -rf generated/*.go
生成的代码说明
执行后会生成两个文件:
- greeter.pb.go:消息类型定义(HelloRequest、HelloReply)
- greeter_grpc.pb.go:服务端接口和客户端存根
关键类型:
// GreeterServer is the interface the server side has to implement.
type GreeterServer interface {
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
SayHelloStream(*HelloRequest, Greeter_SayHelloStreamServer) error
SendGreetings(Greeter_SendGreetingsServer) error
Chat(Greeter_ChatServer) error
// Unexported marker method: implementations satisfy it by embedding
// UnimplementedGreeterServer, which keeps them forward compatible.
mustEmbedUnimplementedGreeterServer()
}
// GreeterClient is the client stub interface.
type GreeterClient interface {
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
SayHelloStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (Greeter_SayHelloStreamClient, error)
SendGreetings(ctx context.Context, opts ...grpc.CallOption) (Greeter_SendGreetingsClient, error)
Chat(ctx context.Context, opts ...grpc.CallOption) (Greeter_ChatClient, error)
}
服务端开发
基本服务实现
// server/main.go
package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"strings"
	"time"

	"google.golang.org/grpc"

	pb "github.com/yourname/grpc-go-project/generated"
)
// GreeterServer is the Greeter service implementation.
type GreeterServer struct {
pb.UnimplementedGreeterServer // must be embedded to stay forward compatible
}
// SayHello handles the unary RPC: it logs the requested name and answers with
// a greeting stamped with the current Unix time.
func (s *GreeterServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("收到请求: name=%s", req.Name)

	reply := new(pb.HelloReply)
	reply.Message = fmt.Sprintf("你好, %s!", req.Name)
	reply.Timestamp = time.Now().Unix()
	return reply, nil
}
// SayHelloStream handles the server-streaming RPC. It sends req.Count
// greetings (5 when Count is not positive), one every 500 ms, and stops early
// with the context error as soon as the client cancels.
func (s *GreeterServer) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
	log.Printf("收到流请求: name=%s, count=%d", req.Name, req.Count)

	count := req.Count
	if count <= 0 {
		count = 5
	}

	ctx := stream.Context()
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for i := int32(0); i < count; i++ {
		// Stop before doing any work if the client has already gone away.
		if err := ctx.Err(); err != nil {
			log.Printf("客户端已取消: %v", err)
			return err
		}

		err := stream.Send(&pb.HelloReply{
			Message:   fmt.Sprintf("问候 %d: 你好, %s!", i+1, req.Name),
			Timestamp: time.Now().Unix(),
		})
		if err != nil {
			return err
		}

		// Nothing follows the last message, so do not delay the end of the stream.
		if i == count-1 {
			break
		}

		// Pace the stream, but wake up immediately on cancellation instead of
		// sleeping through it.
		select {
		case <-ctx.Done():
			log.Printf("客户端已取消: %v", ctx.Err())
			return ctx.Err()
		case <-ticker.C:
		}
	}
	return nil
}
// SendGreetings handles the client-streaming RPC: it collects the name of
// every incoming request and, once the client half-closes the stream, replies
// with a single summary message.
func (s *GreeterServer) SendGreetings(stream pb.Greeter_SendGreetingsServer) error {
	var names []string

	for {
		req, err := stream.Recv()
		switch {
		case err == io.EOF:
			// The client has finished sending: answer once and close.
			summary := fmt.Sprintf("收到了 %d 个问候,来自: %s", len(names), strings.Join(names, ", "))
			return stream.SendAndClose(&pb.HelloReply{
				Message:   summary,
				Timestamp: time.Now().Unix(),
			})
		case err != nil:
			return err
		}

		log.Printf("收到: name=%s", req.Name)
		names = append(names, req.Name)
	}
}
// Chat handles the bidirectional-streaming RPC: every received message is
// acknowledged with one reply until the client closes its side of the stream.
func (s *GreeterServer) Chat(stream pb.Greeter_ChatServer) error {
	for {
		req, recvErr := stream.Recv()
		if recvErr != nil {
			if recvErr == io.EOF {
				// Client finished sending; end the RPC successfully.
				return nil
			}
			return recvErr
		}

		log.Printf("聊天消息: %s", req.Name)

		reply := &pb.HelloReply{
			Message:   fmt.Sprintf("[服务器] 收到你的消息: %s", req.Name),
			Timestamp: time.Now().Unix(),
		}
		if sendErr := stream.Send(reply); sendErr != nil {
			return sendErr
		}
	}
}
// main listens on TCP port 50051, registers the Greeter service and serves
// until Serve returns an error.
func main() {
	listener, listenErr := net.Listen("tcp", ":50051")
	if listenErr != nil {
		log.Fatalf("监听失败: %v", listenErr)
	}

	server := grpc.NewServer()
	pb.RegisterGreeterServer(server, new(GreeterServer))

	log.Println("服务器启动,监听端口 :50051")

	serveErr := server.Serve(listener)
	if serveErr != nil {
		log.Fatalf("服务失败: %v", serveErr)
	}
}
服务器配置选项
import (
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// createServer builds a gRPC server with keep-alive settings, message size
// limits, logging interceptors and explicit I/O buffer sizes.
func createServer() *grpc.Server {
	// Keep-alive parameters applied by the server to its connections.
	kaParams := keepalive.ServerParameters{
		MaxConnectionIdle:     15 * time.Minute, // close connections idle for this long
		MaxConnectionAge:      30 * time.Minute, // start closing connections older than this
		MaxConnectionAgeGrace: 5 * time.Minute,  // extra time granted before a forced close
		Time:                  10 * time.Second, // ping the client after this much inactivity
		Timeout:               1 * time.Second,  // wait this long for the ping ack
	}

	// Keep-alive enforcement policy for pings sent by clients.
	kaPolicy := keepalive.EnforcementPolicy{
		MinTime:             5 * time.Second, // minimum interval allowed between client pings
		PermitWithoutStream: true,            // accept pings even with no active RPC
	}

	opts := []grpc.ServerOption{
		// Message size limits.
		grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
		grpc.MaxSendMsgSize(10 * 1024 * 1024), // 10MB

		// Keep-alive.
		grpc.KeepaliveParams(kaParams),
		grpc.KeepaliveEnforcementPolicy(kaPolicy),

		// Interceptors. These must be server-side interceptors; the
		// unaryInterceptor defined in the client section has the client
		// signature and cannot be used here.
		grpc.UnaryInterceptor(loggingInterceptor),
		grpc.StreamInterceptor(loggingStreamInterceptor),

		// Connection buffers.
		grpc.ReadBufferSize(32 * 1024),
		grpc.WriteBufferSize(32 * 1024),
	}

	return grpc.NewServer(opts...)
}
拦截器
Go gRPC 支持一元拦截器和流拦截器。
一元拦截器
import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// loggingInterceptor is a unary server interceptor that logs the method name
// before the handler runs and the status code and elapsed time afterwards.
// The handler's response and error are returned untouched.
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	begin := time.Now()
	log.Printf("[请求] 方法: %s", info.FullMethod)

	// Run the actual handler.
	result, handlerErr := handler(ctx, req)

	// Log the outcome.
	elapsed := time.Since(begin)
	log.Printf("[响应] 方法: %s, 状态: %s, 耗时: %v", info.FullMethod, status.Code(handlerErr), elapsed)

	return result, handlerErr
}
// userIDContextKey is the unexported type of the context key under which
// authInterceptor stores the authenticated user ID. A dedicated key type
// cannot collide with keys set by other packages, unlike a plain string.
type userIDContextKey struct{}

// authInterceptor is a unary server interceptor that authenticates requests.
//
// Public methods (per isPublicMethod) pass straight through. For all other
// methods the first "authorization" metadata value is validated with
// validateToken; on success the returned user ID is stored in the context
// under userIDContextKey{} and the handler is invoked. Any failure is reported
// as codes.Unauthenticated.
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// Skip methods that do not require authentication.
	if isPublicMethod(info.FullMethod) {
		return handler(ctx, req)
	}

	// Read the incoming metadata.
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Error(codes.Unauthenticated, "缺少元数据")
	}

	// Extract the token.
	tokens := md.Get("authorization")
	if len(tokens) == 0 {
		return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
	}

	// Validate the token.
	userID, err := validateToken(tokens[0])
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, "无效的认证信息")
	}

	// Expose the user to the handler. Read it back with
	// ctx.Value(userIDContextKey{}).
	ctx = context.WithValue(ctx, userIDContextKey{}, userID)
	return handler(ctx, req)
}
// recoveryInterceptor is a unary server interceptor that converts a panic in
// the handler into a codes.Internal error after logging the panic value and
// stack trace. The named results let the deferred function overwrite err.
func recoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		panicValue := recover()
		if panicValue == nil {
			return
		}
		log.Printf("[PANIC] 方法: %s, 错误: %v\n%s", info.FullMethod, panicValue, debug.Stack())
		err = status.Error(codes.Internal, "内部服务错误")
	}()

	resp, err = handler(ctx, req)
	return resp, err
}
// chainUnaryInterceptors combines several unary server interceptors into one.
// The first interceptor in the list is the outermost: it runs first and wraps
// all the others, with the real handler at the centre.
func chainUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// Wrap from the last interceptor inwards so the first ends up outermost.
		wrapped := handler
		for idx := len(interceptors) - 1; idx >= 0; idx-- {
			interceptor, next := interceptors[idx], wrapped
			wrapped = func(innerCtx context.Context, innerReq interface{}) (interface{}, error) {
				return interceptor(innerCtx, innerReq, info, next)
			}
		}
		return wrapped(ctx, req)
	}
}
流拦截器
// loggingStreamInterceptor is a stream server interceptor that logs the method
// name when the stream starts and the elapsed time and error when it ends.
func loggingStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	begin := time.Now()
	log.Printf("[流请求] 方法: %s", info.FullMethod)

	streamErr := handler(srv, ss)

	log.Printf("[流响应] 方法: %s, 耗时: %v, 错误: %v", info.FullMethod, time.Since(begin), streamErr)
	return streamErr
}
使用拦截器
import (
"github.com/grpc-ecosystem/go-grpc-middleware"
)
// main builds a server whose unary and stream interceptors are chained with
// go-grpc-middleware, then registers the Greeter service and serves it.
// Interceptors run in the order listed, so recovery is outermost.
func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("监听失败: %v", err)
	}

	s := grpc.NewServer(
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			recoveryInterceptor,
			loggingInterceptor,
			authInterceptor,
		)),
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
			loggingStreamInterceptor,
			authStreamInterceptor,
		)),
	)

	// Without registering and serving, s would be an unused variable and the
	// example would not compile.
	pb.RegisterGreeterServer(s, &GreeterServer{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("服务失败: %v", err)
	}
}
优雅关闭
import (
"os"
"os/signal"
"syscall"
)
// main serves the Greeter service and shuts down gracefully on SIGINT or
// SIGTERM. In-flight RPCs get up to 10 seconds to finish before the server is
// stopped forcibly.
func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("监听失败: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &GreeterServer{})

	// Serve in a goroutine so the main goroutine can wait for signals.
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("服务失败: %v", err)
		}
	}()

	// Wait for a termination signal.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("正在关闭服务器...")

	// GracefulStop blocks until every pending RPC has finished, which can be
	// forever with long-lived streams, so bound it with a timeout.
	stopped := make(chan struct{})
	go func() {
		s.GracefulStop()
		close(stopped)
	}()

	select {
	case <-stopped:
	case <-time.After(10 * time.Second):
		log.Println("优雅关闭超时,强制停止")
		s.Stop()
	}
	log.Println("服务器已关闭")
}
健康检查
import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
// main serves the Greeter service together with the standard gRPC health
// checking service, reporting "greeter.Greeter" as SERVING.
func main() {
	// The listener must be created here; the example previously used an
	// undefined lis variable.
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("监听失败: %v", err)
	}

	s := grpc.NewServer()

	// Register the business service.
	pb.RegisterGreeterServer(s, &GreeterServer{})

	// Register the health checking service.
	healthServer := health.NewServer()
	grpc_health_v1.RegisterHealthServer(s, healthServer)

	// Publish the service status.
	healthServer.SetServingStatus("greeter.Greeter", grpc_health_v1.HealthCheckResponse_SERVING)

	if err := s.Serve(lis); err != nil {
		log.Fatalf("服务失败: %v", err)
	}
}
客户端开发
基本客户端
// client/main.go
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "github.com/yourname/grpc-go-project/generated"
)
// main dials the local Greeter server without TLS and runs one example of
// each RPC type in turn.
func main() {
	// Open the connection.
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, dialErr := grpc.Dial("localhost:50051", creds)
	if dialErr != nil {
		log.Fatalf("连接失败: %v", dialErr)
	}
	defer conn.Close()

	// Build the client stub.
	client := pb.NewGreeterClient(conn)

	// Exercise every RPC type, in order.
	examples := []func(pb.GreeterClient){
		callSayHello,
		callSayHelloStream,
		callSendGreetings,
		callChat,
	}
	for _, run := range examples {
		run(client)
	}
}
// callSayHello performs the unary RPC with a 5-second deadline and logs the
// reply or the failure.
func callSayHello(client pb.GreeterClient) {
	log.Println("\n=== 一元 RPC ===")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	request := &pb.HelloRequest{Name: "张三"}
	reply, callErr := client.SayHello(ctx, request)
	if callErr != nil {
		log.Printf("调用失败: %v", callErr)
		return
	}

	log.Printf("响应: %s", reply.Message)
}
// callSayHelloStream performs the server-streaming RPC with a 30-second
// deadline and logs every reply until the server ends the stream or an error
// occurs.
func callSayHelloStream(client pb.GreeterClient) {
	log.Println("\n=== 服务端流 ===")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	request := &pb.HelloRequest{Name: "李四", Count: 3}
	stream, openErr := client.SayHelloStream(ctx, request)
	if openErr != nil {
		log.Printf("创建流失败: %v", openErr)
		return
	}

	for {
		reply, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			log.Println("流结束")
			return
		case recvErr != nil:
			log.Printf("接收失败: %v", recvErr)
			return
		}
		log.Printf("收到: %s", reply.Message)
	}
}
// callSendGreetings performs the client-streaming RPC: it sends three names,
// half-closes the stream and logs the single summary reply.
func callSendGreetings(client pb.GreeterClient) {
	log.Println("\n=== 客户端流 ===")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	stream, openErr := client.SendGreetings(ctx)
	if openErr != nil {
		log.Printf("创建流失败: %v", openErr)
		return
	}

	// Send the requests one by one.
	for _, name := range []string{"王五", "赵六", "钱七"} {
		log.Printf("发送: %s", name)
		request := &pb.HelloRequest{Name: name}
		if sendErr := stream.Send(request); sendErr != nil {
			log.Printf("发送失败: %v", sendErr)
			return
		}
	}

	// Close the sending side and wait for the reply.
	reply, closeErr := stream.CloseAndRecv()
	if closeErr != nil {
		log.Printf("接收响应失败: %v", closeErr)
		return
	}
	log.Printf("响应: %s", reply.Message)
}
// callChat performs the bidirectional-streaming RPC. A background goroutine
// sends three messages 500 ms apart and then half-closes the stream, while the
// calling goroutine logs replies until the stream ends or fails.
func callChat(client pb.GreeterClient) {
	log.Println("\n=== 双向流 ===")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream, openErr := client.Chat(ctx)
	if openErr != nil {
		log.Printf("创建流失败: %v", openErr)
		return
	}

	// Sender: runs concurrently with the receive loop below.
	sendAll := func() {
		for _, text := range []string{"你好", "今天天气不错", "再见"} {
			log.Printf("发送: %s", text)
			sendErr := stream.Send(&pb.HelloRequest{Name: text})
			if sendErr != nil {
				log.Printf("发送失败: %v", sendErr)
				return
			}
			time.Sleep(500 * time.Millisecond)
		}
		stream.CloseSend()
	}
	go sendAll()

	// Receiver.
	for {
		reply, recvErr := stream.Recv()
		if recvErr != nil {
			if recvErr == io.EOF {
				log.Println("聊天结束")
			} else {
				log.Printf("接收失败: %v", recvErr)
			}
			return
		}
		log.Printf("收到: %s", reply.Message)
	}
}
客户端配置选项
import (
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)
// createConnection dials the local server with keep-alive, connection backoff
// and a default retry policy, blocking until the connection is established or
// 5 seconds have elapsed. It terminates the program if dialing fails.
func createConnection() *grpc.ClientConn {
	// Keep-alive settings.
	kaParams := keepalive.ClientParameters{
		Time:                10 * time.Second, // ping after this much inactivity
		Timeout:             time.Second,      // wait this long for the ping ack
		PermitWithoutStream: true,             // ping even with no active RPC
	}

	// Connection parameters.
	connectParams := grpc.ConnectParams{
		Backoff:           backoff.DefaultConfig,
		MinConnectTimeout: 5 * time.Second,
	}

	// Service config (retry policy).
	serviceConfig := `{
"methodConfig": [{
"name": [{"service": ""}],
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`

	// grpc.WithTimeout is deprecated; the documented replacement is
	// DialContext with a context that carries the timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, "localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(kaParams),
		grpc.WithConnectParams(connectParams),
		grpc.WithDefaultServiceConfig(serviceConfig),
		grpc.WithBlock(), // block until the connection is up (bounded by ctx)
	)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	return conn
}
元数据传递
import "google.golang.org/grpc/metadata"
// callWithMetadata attaches an authorization token and a request ID to the
// outgoing context as metadata, then performs the unary RPC.
func callWithMetadata(client pb.GreeterClient) {
	// Build the metadata.
	md := metadata.Pairs(
		"authorization", "Bearer my-token",
		"x-request-id", "req-12345",
	)
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	reply, callErr := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
	if callErr != nil {
		log.Printf("调用失败: %v", callErr)
		return
	}
	log.Printf("响应: %s", reply.Message)
}
// receiveMetadata performs the unary RPC while capturing the response header
// and trailer metadata, then logs selected entries from each.
func receiveMetadata(client pb.GreeterClient) {
	var (
		header  metadata.MD
		trailer metadata.MD
	)
	callOpts := []grpc.CallOption{
		grpc.Header(&header),   // capture the response header
		grpc.Trailer(&trailer), // capture the trailer metadata
	}

	reply, callErr := client.SayHello(context.Background(), &pb.HelloRequest{Name: "测试"}, callOpts...)
	if callErr != nil {
		log.Printf("调用失败: %v", callErr)
		return
	}

	// Read the response header.
	contentTypes := header.Get("content-type")
	if len(contentTypes) > 0 {
		log.Printf("Content-Type: %s", contentTypes[0])
	}

	// Read the trailer.
	requestIDs := trailer.Get("x-request-id")
	if len(requestIDs) > 0 {
		log.Printf("Request ID: %s", requestIDs[0])
	}

	log.Printf("响应: %s", reply.Message)
}
超时和取消
// callWithTimeout performs the unary RPC with a 3-second deadline and
// distinguishes a deadline expiry from any other failure in the log.
func callWithTimeout(client pb.GreeterClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	reply, callErr := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
	if callErr == nil {
		log.Printf("响应: %s", reply.Message)
		return
	}

	st, _ := status.FromError(callErr)
	switch st.Code() {
	case codes.DeadlineExceeded:
		log.Println("请求超时")
	default:
		log.Printf("其他错误: %v", callErr)
	}
}
// callWithCancel performs the unary RPC and cancels it after one second if it
// has not finished by then. A cancelled call and any other failure are logged
// separately.
func callWithCancel(client pb.GreeterClient) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // release the context on every return path

	// Cancel from another goroutine after one second. Stopping the timer on
	// return prevents a misleading "cancelled" log after the call completed.
	timer := time.AfterFunc(time.Second, func() {
		cancel()
		log.Println("请求已取消")
	})
	defer timer.Stop()

	resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "测试"})
	if err != nil {
		st, _ := status.FromError(err)
		if st.Code() == codes.Canceled {
			log.Println("请求被取消")
		} else {
			// Do not drop unrelated failures silently.
			log.Printf("其他错误: %v", err)
		}
		return
	}
	log.Printf("响应: %s", resp.Message)
}
客户端拦截器
// unaryInterceptor is a unary client interceptor that logs the method before
// the call and the elapsed time and error after it. The invoker's error is
// returned unchanged.
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	begin := time.Now()
	log.Printf("[客户端请求] 方法: %s", method)

	callErr := invoker(ctx, method, req, reply, cc, opts...)

	log.Printf("[客户端响应] 方法: %s, 耗时: %v, 错误: %v", method, time.Since(begin), callErr)
	return callErr
}
// main shows how to install client interceptors when dialing.
func main() {
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(unaryInterceptor),
		grpc.WithStreamInterceptor(streamInterceptor),
	)
	// Check the error and close the connection; leaving conn unused is a
	// compile error.
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()
}
错误处理
服务端返回错误
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/genproto/googleapis/rpc/errdetails"
)
// SayHello demonstrates how a handler returns gRPC errors:
//   - an empty name yields codes.InvalidArgument;
//   - the name "error" yields codes.Internal;
//   - the name "detailed_error" yields codes.InvalidArgument carrying a
//     BadRequest detail that names the offending field.
//
// Any other name produces a normal greeting.
func (s *GreeterServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
	// Argument validation.
	if req.Name == "" {
		return nil, status.Error(codes.InvalidArgument, "name 不能为空")
	}

	// Business logic failure.
	if req.Name == "error" {
		return nil, status.Error(codes.Internal, "内部处理错误")
	}

	// Error carrying structured details.
	if req.Name == "detailed_error" {
		base := status.New(codes.InvalidArgument, "参数验证失败")
		detailed, err := base.WithDetails(&errdetails.BadRequest{
			FieldViolations: []*errdetails.BadRequest_FieldViolation{
				{
					Field:       "name",
					Description: "name 不能是 'detailed_error'",
				},
			},
		})
		if err != nil {
			// Attaching details failed. Fall back to the plain status: calling
			// Err on the nil result would return nil and report success.
			return nil, base.Err()
		}
		return nil, detailed.Err()
	}

	return &pb.HelloReply{Message: "你好, " + req.Name}, nil
}
客户端处理错误
import (
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/genproto/googleapis/rpc/errdetails"
)
// handleError logs a gRPC error in three steps: its code and message, any
// BadRequest or RetryInfo details attached to it, and a short description for
// a handful of well-known codes. A nil error is ignored; an error that is not
// a gRPC status is logged as such.
func handleError(err error) {
	if err == nil {
		return
	}

	st, isStatus := status.FromError(err)
	if !isStatus {
		log.Printf("非 gRPC 错误: %v", err)
		return
	}

	log.Printf("错误码: %s", st.Code())
	log.Printf("错误消息: %s", st.Message())

	// Structured error details.
	for _, item := range st.Details() {
		switch typed := item.(type) {
		case *errdetails.BadRequest:
			for _, violation := range typed.FieldViolations {
				log.Printf("字段错误: %s - %s", violation.Field, violation.Description)
			}
		case *errdetails.RetryInfo:
			log.Printf("建议重试延迟: %v", typed.RetryDelay.AsDuration())
		}
	}

	// Code-specific description; codes not listed here log nothing.
	descriptions := map[codes.Code]string{
		codes.InvalidArgument:  "参数错误",
		codes.NotFound:         "资源不存在",
		codes.Unauthenticated:  "未认证",
		codes.DeadlineExceeded: "请求超时",
		codes.Unavailable:      "服务不可用",
	}
	if text, known := descriptions[st.Code()]; known {
		log.Println(text)
	}
}
TLS 安全连接
服务端 TLS
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// main serves the Greeter service over TLS using the certificate and key in
// server.crt and server.key.
func main() {
	// Load the server certificate.
	creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
	if err != nil {
		log.Fatalf("加载证书失败: %v", err)
	}

	// Create the server.
	s := grpc.NewServer(grpc.Creds(creds))
	pb.RegisterGreeterServer(s, &GreeterServer{})

	// A discarded Listen error would hand a nil listener to Serve.
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("监听失败: %v", err)
	}
	if err := s.Serve(lis); err != nil {
		log.Fatalf("服务失败: %v", err)
	}
}
客户端 TLS
// main dials the server over TLS, verifying the server certificate against
// ca.crt with "localhost" as the expected server name.
func main() {
	// Load the CA certificate used to verify the server.
	creds, err := credentials.NewClientTLSFromFile("ca.crt", "localhost")
	if err != nil {
		log.Fatalf("加载证书失败: %v", err)
	}

	// Create the connection.
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(creds),
	)
	// Check the error and close the connection; leaving conn unused is a
	// compile error.
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()
}
双向 TLS(mTLS)
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"google.golang.org/grpc/credentials"
)
// serverMTLS builds server-side mutual-TLS credentials: the server presents
// server.crt/server.key and requires every client to present a certificate
// signed by the CA in ca.crt. It terminates the program if the CA file cannot
// be read or contains no usable certificate.
func serverMTLS() credentials.TransportCredentials {
	// Load the CA certificate.
	caCert, err := ioutil.ReadFile("ca.crt")
	if err != nil {
		log.Fatalf("读取 CA 证书失败: %v", err)
	}
	caPool := x509.NewCertPool()
	// An empty pool would reject every client, so fail loudly instead.
	if !caPool.AppendCertsFromPEM(caCert) {
		log.Fatalf("解析 CA 证书失败: ca.crt 中没有有效的 PEM 证书")
	}

	// TLS configuration.
	config := &tls.Config{
		Certificates: []tls.Certificate{
			loadCert("server.crt", "server.key"),
		},
		ClientAuth: tls.RequireAndVerifyClientCert,
		ClientCAs:  caPool,
	}
	return credentials.NewTLS(config)
}
// clientMTLS builds client-side mutual-TLS credentials: the client presents
// client.crt/client.key and verifies the server against the CA in ca.crt. It
// terminates the program if the CA file cannot be read or contains no usable
// certificate.
func clientMTLS() credentials.TransportCredentials {
	caCert, err := ioutil.ReadFile("ca.crt")
	if err != nil {
		log.Fatalf("读取 CA 证书失败: %v", err)
	}
	caPool := x509.NewCertPool()
	// An empty pool would make every server verification fail later, with a
	// far less obvious error.
	if !caPool.AppendCertsFromPEM(caCert) {
		log.Fatalf("解析 CA 证书失败: ca.crt 中没有有效的 PEM 证书")
	}

	config := &tls.Config{
		Certificates: []tls.Certificate{
			loadCert("client.crt", "client.key"),
		},
		RootCAs: caPool,
	}
	return credentials.NewTLS(config)
}
// loadCert loads a PEM-encoded certificate/key pair from certFile and keyFile.
// It terminates the program on failure rather than returning an empty
// certificate, which would only surface later as an opaque handshake error.
func loadCert(certFile, keyFile string) tls.Certificate {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		log.Fatalf("加载证书失败: %v", err)
	}
	return cert
}
负载均衡
客户端负载均衡
// createClientWithLB dials a DNS-resolved target and balances RPCs across the
// resolved addresses with the round_robin policy. It terminates the program if
// dialing fails.
func createClientWithLB() *grpc.ClientConn {
	const (
		target        = "dns:///my-service.example.com:50051" // resolved through DNS
		serviceConfig = `{
"loadBalancingConfig": [{"round_robin": {}}]
}`
	)

	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(serviceConfig),
	}

	conn, dialErr := grpc.Dial(target, dialOpts...)
	if dialErr != nil {
		log.Fatalf("连接失败: %v", dialErr)
	}
	return conn
}
// createClientWithStaticAddresses dials a fixed list of backends with the
// round_robin policy. It terminates the program if dialing fails.
//
// NOTE(review): a comma-separated "host:port,host:port" target does not appear
// to be understood by grpc-go's built-in resolvers — confirm. A static address
// list normally needs a custom resolver, for example the manual resolver in
// google.golang.org/grpc/resolver/manual.
func createClientWithStaticAddresses() *grpc.ClientConn {
	conn, err := grpc.Dial(
		"localhost:50051,localhost:50052,localhost:50053",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{
"loadBalancingConfig": [{"round_robin": {}}]
}`),
	)
	// An unused err is a compile error, and ignoring it would return a nil
	// connection.
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	return conn
}
性能优化
连接复用
// Recommended: reuse one connection through a lazily created singleton.

// GRPCClient bundles the shared connection with the Greeter stub built on it.
type GRPCClient struct {
	conn *grpc.ClientConn
	stub pb.GreeterClient
}

// The Once and the error live at package level. Storing the Once inside the
// instance would mean dereferencing the still-nil clientInstance on first use.
var (
	clientInstance *GRPCClient
	clientOnce     sync.Once
	clientErr      error
)

// GetGRPCClient returns the process-wide client, dialing the server on the
// first call only. If that first dial fails, every call returns a nil client
// together with the same error.
func GetGRPCClient() (*GRPCClient, error) {
	clientOnce.Do(func() {
		conn, err := grpc.Dial("localhost:50051",
			grpc.WithTransportCredentials(insecure.NewCredentials()),
		)
		if err != nil {
			clientErr = err
			return
		}
		clientInstance = &GRPCClient{
			conn: conn,
			stub: pb.NewGreeterClient(conn),
		}
	})
	return clientInstance, clientErr
}

// Close closes the underlying connection. It is safe to call on a nil client
// or on a client without a connection.
func (c *GRPCClient) Close() {
	if c == nil || c.conn == nil {
		return
	}
	if err := c.conn.Close(); err != nil {
		log.Printf("关闭连接失败: %v", err)
	}
}
连接池
// ConnPool is a fixed-size pool of client connections handed out in
// round-robin order.
type ConnPool struct {
	conns []*grpc.ClientConn
	mu    sync.Mutex // guards index
	index int        // position of the next connection to hand out
}

// NewConnPool dials size connections to target. It returns an error if size is
// not positive or if any dial fails; connections opened before the failure are
// closed so they do not leak.
func NewConnPool(size int, target string) (*ConnPool, error) {
	// An empty pool would make Get index an empty slice and divide by zero.
	if size <= 0 {
		return nil, fmt.Errorf("connection pool size must be positive, got %d", size)
	}

	pool := &ConnPool{
		conns: make([]*grpc.ClientConn, 0, size),
	}
	for i := 0; i < size; i++ {
		conn, err := grpc.Dial(target,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
		)
		if err != nil {
			pool.Close()
			return nil, err
		}
		pool.conns = append(pool.conns, conn)
	}
	return pool, nil
}

// Get returns the next connection in round-robin order. The connection is
// shared with other callers and must not be closed by them.
func (p *ConnPool) Get() *grpc.ClientConn {
	p.mu.Lock()
	defer p.mu.Unlock()

	conn := p.conns[p.index]
	p.index = (p.index + 1) % len(p.conns)
	return conn
}

// Close closes every connection in the pool.
func (p *ConnPool) Close() {
	for _, conn := range p.conns {
		if conn != nil {
			conn.Close()
		}
	}
}
最佳实践
1. 连接管理
// ✅ Good: dial once and reuse the connection.
var globalConn *grpc.ClientConn

// init dials the server at program start and panics if dialing fails.
func init() {
	conn, dialErr := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if dialErr != nil {
		panic(dialErr)
	}
	globalConn = conn
}
// ❌ Bad: a new connection is created for every call.
func badPractice() {
conn, _ := grpc.Dial("localhost:50051", ...)
defer conn.Close() // closed right after a single call, which is inefficient
client := pb.NewGreeterClient(conn)
client.SayHello(...)
}
2. 资源释放
// Use defer to make sure the connection is released.
func main() {
conn, err := grpc.Dial(...)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Application logic...
}
3. Context 传递
// Always use a context to control timeouts and cancellation.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.SayHello(ctx, req)
4. 错误处理
// Use status.FromError to inspect gRPC errors.
resp, err := client.SayHello(ctx, req)
if err != nil {
if st, ok := status.FromError(err); ok {
// gRPC error
switch st.Code() {
case codes.NotFound:
// handle "not found"
case codes.DeadlineExceeded:
// handle the timeout
}
} else {
// any other error
}
return
}
小结
本章介绍了 Go gRPC 开发的核心内容:
- 环境准备:安装 gRPC 和 Protobuf 工具
- 代码生成:使用 protoc 生成 Go 代码
- 服务端开发:服务实现、配置选项、拦截器、优雅关闭
- 客户端开发:连接创建、各种 RPC 调用方式、元数据传递
- 错误处理:标准错误码和错误详情
- 安全连接:TLS 和 mTLS 配置
- 负载均衡:客户端负载均衡配置
- 性能优化:连接复用、连接池
- 最佳实践:连接管理、资源释放、错误处理
Go 语言是 gRPC 开发的优秀选择,其简洁的语法和出色的并发支持使得构建高性能 gRPC 服务变得简单。
> [!TIP]
> Go gRPC 的性能非常出色,适合用于微服务架构中的服务间通信。结合 Go 的并发特性,可以轻松处理大量并发请求。