跳到主要内容

安全认证

本章介绍 gRPC 的安全机制,包括 TLS/SSL 加密、认证方式和授权控制。

安全概述

TLS 加密

生成证书

# 创建 CA 私钥
openssl genrsa -out ca.key 4096

# 创建 CA 证书
openssl req -new -x509 -days 365 -key ca.key -out ca.crt \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/CN=MyCA"

# 创建服务端私钥
openssl genrsa -out server.key 4096

# 创建服务端 CSR
openssl req -new -key server.key -out server.csr \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/CN=localhost"

# 使用 CA 签发服务端证书
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out server.crt

# 创建客户端私钥和证书(用于 mTLS)
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/CN=client"
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out client.crt

服务端 TLS(单向)

服务端使用证书,客户端验证服务端身份:

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func main() {
// 加载服务端证书
creds, err := credentials.NewServerTLSFromFile(
"server.crt", // 证书文件
"server.key", // 私钥文件
)
if err != nil {
log.Fatalf("加载证书失败: %v", err)
}

// 创建带 TLS 的服务器
s := grpc.NewServer(grpc.Creds(creds))

// 注册服务...
pb.RegisterGreeterServer(s, &server{})

lis, _ := net.Listen("tcp", ":50051")
s.Serve(lis)
}

客户端

func main() {
// 加载 CA 证书验证服务端
creds, err := credentials.NewClientTLSFromFile(
"ca.crt", // CA 证书
"localhost", // 服务器域名(需与证书 CN 匹配)
)
if err != nil {
log.Fatalf("加载证书失败: %v", err)
}

// 创建带 TLS 的连接
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(creds),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()

client := pb.NewGreeterClient(conn)
// ...
}

双向 TLS(mTLS)

双方互相验证:

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"google.golang.org/grpc/credentials"
)

// 服务端配置
func serverTLS() credentials.TransportCredentials {
// 加载 CA 证书(用于验证客户端)
caCert, _ := ioutil.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)

// 配置 TLS
config := &tls.Config{
Certificates: []tls.Certificate{
loadCert("server.crt", "server.key"),
},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: caPool,
}

return credentials.NewTLS(config)
}

// 客户端配置
func clientTLS() credentials.TransportCredentials {
caCert, _ := ioutil.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)

config := &tls.Config{
Certificates: []tls.Certificate{
loadCert("client.crt", "client.key"),
},
RootCAs: caPool,
}

return credentials.NewTLS(config)
}

func loadCert(certFile, keyFile string) tls.Certificate {
cert, _ := tls.LoadX509KeyPair(certFile, keyFile)
return cert
}

Token 认证

拦截器实现

// 认证拦截器
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过不需要认证的方法
if info.FullMethod == "/api.Auth/Login" || info.FullMethod == "/api.Auth/Register" {
return handler(ctx, req)
}

// 从元数据获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}

values := md.Get("authorization")
if len(values) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证 token")
}

token := values[0]
if !strings.HasPrefix(token, "Bearer ") {
return nil, status.Error(codes.Unauthenticated, "无效的 token 格式")
}

// 验证 token
userID, err := validateToken(strings.TrimPrefix(token, "Bearer "))
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 token")
}

// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", userID)

return handler(ctx, req)
}

// 验证 token(示例)
func validateToken(token string) (string, error) {
// 这里实现实际的 token 验证逻辑
// 可以使用 JWT 或查询数据库
return "user123", nil
}

func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(authInterceptor),
)
}

客户端发送 Token

func callWithToken(client pb.GreeterClient, token string) {
// 创建带 token 的元数据
md := metadata.New(map[string]string{
"authorization": "Bearer " + token,
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 调用服务
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}
log.Printf("响应: %s", resp.Message)
}

Per-RPC 认证

// 使用 PerRPCCredentials 接口
type tokenAuth struct {
token string
}

func (t *tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + t.token,
}, nil
}

func (t *tokenAuth) RequireTransportSecurity() bool {
return true // 要求 TLS
}

func main() {
// 创建认证凭证
auth := &tokenAuth{token: "my-jwt-token"}

// 连接时使用
conn, _ := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(creds),
grpc.WithPerRPCCredentials(auth),
)

// 所有调用都会自动带上 token
client := pb.NewGreeterClient(conn)
client.SayHello(ctx, req) // 自动带上 token
}

JWT 认证实战

JWT(JSON Web Token)是现代微服务中常用的认证方式。下面展示如何在 gRPC 中集成 JWT。

JWT 服务端认证

import (
"github.com/golang-jwt/jwt/v5"
)

// JWT Claims
type Claims struct {
UserID string `json:"user_id"`
Role string `json:"role"`
jwt.RegisteredClaims
}

// JWT 认证拦截器
func jwtAuthInterceptor(secretKey string) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过不需要认证的方法
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}

// 从元数据获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}

authHeader := md.Get("authorization")
if len(authHeader) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}

// 解析 Bearer token
tokenString := strings.TrimPrefix(authHeader[0], "Bearer ")
if tokenString == authHeader[0] {
return nil, status.Error(codes.Unauthenticated, "无效的认证格式")
}

// 解析 JWT
claims, err := parseJWT(tokenString, secretKey)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 token")
}

// 检查 token 是否过期
if claims.ExpiresAt != nil && claims.ExpiresAt.Before(time.Now()) {
return nil, status.Error(codes.Unauthenticated, "token 已过期")
}

// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", claims.UserID)
ctx = context.WithValue(ctx, "role", claims.Role)

return handler(ctx, req)
}
}

// 解析 JWT
func parseJWT(tokenString, secretKey string) (*Claims, error) {
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
// 验证签名算法
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("无效的签名算法")
}
return []byte(secretKey), nil
})

if err != nil {
return nil, err
}

if claims, ok := token.Claims.(*Claims); ok && token.Valid {
return claims, nil
}

return nil, fmt.Errorf("无效的 token")
}

// 公开方法列表
func isPublicMethod(method string) bool {
publicMethods := map[string]bool{
"/api.Auth/Login": true,
"/api.Auth/Register": true,
"/api.Auth/RefreshToken": true,
"/grpc.health.v1.Health/Check": true,
}
return publicMethods[method]
}

JWT 登录服务

// 登录服务
type AuthService struct {
pb.UnimplementedAuthServer
secretKey string
tokenDuration time.Duration
userStore UserStore
}

func (s *AuthService) Login(ctx context.Context, req *pb.LoginRequest) (*pb.LoginResponse, error) {
// 验证用户凭据
user, err := s.userStore.ValidateUser(req.Username, req.Password)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "用户名或密码错误")
}

// 生成 JWT
token, err := s.generateToken(user)
if err != nil {
return nil, status.Error(codes.Internal, "生成 token 失败")
}

// 生成刷新 token
refreshToken, err := s.generateRefreshToken(user)
if err != nil {
return nil, status.Error(codes.Internal, "生成刷新 token 失败")
}

return &pb.LoginResponse{
AccessToken: token,
RefreshToken: refreshToken,
ExpiresIn: int64(s.tokenDuration.Seconds()),
User: user,
}, nil
}

func (s *AuthService) generateToken(user *User) (string, error) {
claims := &Claims{
UserID: user.ID,
Role: user.Role,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(s.tokenDuration)),
IssuedAt: jwt.NewNumericDate(time.Now()),
Issuer: "my-grpc-service",
},
}

token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString([]byte(s.secretKey))
}

// 刷新 Token
func (s *AuthService) RefreshToken(ctx context.Context, req *pb.RefreshTokenRequest) (*pb.LoginResponse, error) {
// 验证刷新 token
claims, err := parseJWT(req.RefreshToken, s.secretKey)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的刷新 token")
}

// 检查刷新 token 是否有效(可以维护一个黑名单)
if s.isTokenRevoked(req.RefreshToken) {
return nil, status.Error(codes.Unauthenticated, "token 已失效")
}

// 获取用户信息
user, err := s.userStore.GetByID(claims.UserID)
if err != nil {
return nil, status.Error(codes.NotFound, "用户不存在")
}

// 生成新的 access token
newToken, err := s.generateToken(user)
if err != nil {
return nil, status.Error(codes.Internal, "生成 token 失败")
}

return &pb.LoginResponse{
AccessToken: newToken,
ExpiresIn: int64(s.tokenDuration.Seconds()),
}, nil
}

JWT 客户端

// JWT 客户端封装
type JWTClient struct {
conn *grpc.ClientConn
authService pb.AuthClient
accessToken string
refreshToken string
mutex sync.RWMutex
}

func NewJWTClient(address string, creds credentials.TransportCredentials) (*JWTClient, error) {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
if err != nil {
return nil, err
}

return &JWTClient{
conn: conn,
authService: pb.NewAuthClient(conn),
}, nil
}

// 登录获取 token
func (c *JWTClient) Login(username, password string) error {
resp, err := c.authService.Login(context.Background(), &pb.LoginRequest{
Username: username,
Password: password,
})
if err != nil {
return err
}

c.mutex.Lock()
c.accessToken = resp.AccessToken
c.refreshToken = resp.RefreshToken
c.mutex.Unlock()

return nil
}

// 自动刷新 token 的拦截器
func (c *JWTClient) refreshTokenInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 第一次尝试
err := invoker(c.withToken(ctx), method, req, reply, cc, opts...)

// 如果 token 过期,尝试刷新
if st, ok := status.FromError(err); ok && st.Code() == codes.Unauthenticated {
if refreshErr := c.refreshAccessToken(); refreshErr == nil {
// 刷新成功,重试请求
return invoker(c.withToken(ctx), method, req, reply, cc, opts...)
}
}

return err
}
}

func (c *JWTClient) refreshAccessToken() error {
c.mutex.RLock()
refreshToken := c.refreshToken
c.mutex.RUnlock()

if refreshToken == "" {
return fmt.Errorf("没有刷新 token")
}

resp, err := c.authService.RefreshToken(context.Background(), &pb.RefreshTokenRequest{
RefreshToken: refreshToken,
})
if err != nil {
return err
}

c.mutex.Lock()
c.accessToken = resp.AccessToken
c.mutex.Unlock()

return nil
}

func (c *JWTClient) withToken(ctx context.Context) context.Context {
c.mutex.RLock()
token := c.accessToken
c.mutex.RUnlock()

return metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
}

OAuth2 集成

在企业环境中,可能需要集成 OAuth2 进行认证。以下是集成 Google OAuth2 的示例。

服务端 OAuth2 验证

import (
"google.golang.org/api/oauth2/v2"
)

// OAuth2 认证拦截器
func oauth2Interceptor() grpc.UnaryServerInterceptor {
oauth2Service, err := oauth2.New(http.DefaultClient)
if err != nil {
panic(err)
}

return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}

tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}

token := strings.TrimPrefix(tokens[0], "Bearer ")

// 验证 OAuth2 token
tokenInfo, err := oauth2Service.Tokeninfo().AccessToken(token).Do()
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 OAuth2 token")
}

// 验证受众(Audience)
if tokenInfo.Audience != "your-client-id" {
return nil, status.Error(codes.Unauthenticated, "token 受众不匹配")
}

// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", tokenInfo.UserId)
ctx = context.WithValue(ctx, "email", tokenInfo.Email)

return handler(ctx, req)
}
}

多种认证方式组合

在实际应用中,可能需要支持多种认证方式:

// 认证提供者接口
type AuthProvider interface {
Authenticate(ctx context.Context, token string) (*UserInfo, error)
}

// JWT 认证提供者
type JWTAuthProvider struct {
secretKey string
}

func (p *JWTAuthProvider) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
claims, err := parseJWT(token, p.secretKey)
if err != nil {
return nil, err
}
return &UserInfo{ID: claims.UserID, Role: claims.Role}, nil
}

// OAuth2 认证提供者
type OAuth2Provider struct {
service *oauth2.Service
}

func (p *OAuth2Provider) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
tokenInfo, err := p.service.Tokeninfo().AccessToken(token).Do()
if err != nil {
return nil, err
}
return &UserInfo{ID: tokenInfo.UserId, Email: tokenInfo.Email}, nil
}

// API Key 认证提供者
type APIKeyProvider struct {
store APIKeyStore
}

func (p *APIKeyProvider) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
keyInfo, err := p.store.Validate(token)
if err != nil {
return nil, err
}
return &UserInfo{ID: keyInfo.UserID, Role: keyInfo.Role}, nil
}

// 组合认证拦截器
func multiAuthProviderInterceptor(providers map[string]AuthProvider) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}

// 获取认证类型
authTypes := md.Get("x-auth-type")
authType := "jwt" // 默认
if len(authTypes) > 0 {
authType = authTypes[0]
}

// 获取 token
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}

token := strings.TrimPrefix(tokens[0], "Bearer ")

// 选择对应的认证提供者
provider, ok := providers[authType]
if !ok {
return nil, status.Error(codes.Unauthenticated, "不支持的认证类型")
}

// 认证
userInfo, err := provider.Authenticate(ctx, token)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "认证失败")
}

// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_info", userInfo)

return handler(ctx, req)
}
}

// 使用示例
func main() {
providers := map[string]AuthProvider{
"jwt": &JWTAuthProvider{secretKey: "my-secret"},
"oauth2": &OAuth2Provider{service: oauth2Service},
"api_key": &APIKeyProvider{store: apiKeyStore},
}

s := grpc.NewServer(
grpc.UnaryInterceptor(multiAuthProviderInterceptor(providers)),
)
}

授权控制

基于角色的访问控制

// 角色定义
const (
RoleAdmin = "admin"
RoleUser = "user"
)

// 权限映射
var permissions = map[string][]string{
"/api.UserService/DeleteUser": {RoleAdmin},
"/api.UserService/ListUsers": {RoleAdmin, RoleUser},
"/api.UserService/GetUser": {RoleAdmin, RoleUser},
}

// 授权拦截器
func authzInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 获取用户角色
userID := ctx.Value("user_id").(string)
userRole := getUserRole(userID)

// 检查权限
allowedRoles, ok := permissions[info.FullMethod]
if !ok {
// 方法未定义权限,允许访问
return handler(ctx, req)
}

for _, role := range allowedRoles {
if role == userRole {
return handler(ctx, req)
}
}

return nil, status.Error(codes.PermissionDenied, "权限不足")
}

func getUserRole(userID string) string {
// 从数据库获取用户角色
return RoleUser
}

// 组合拦截器
func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
authInterceptor, // 认证
authzInterceptor, // 授权
)),
)
}

安全最佳实践

1. 生产环境必须使用 TLS

// ❌ 不好:不安全连接
grpc.WithTransportCredentials(insecure.NewCredentials())

// ✅ 好:使用 TLS
grpc.WithTransportCredentials(creds)

2. 证书管理

// 定期轮换证书
func loadCertificate(certPath, keyPath string) (tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return tls.Certificate{}, err
}
return cert, nil
}

// 监控证书过期
func checkCertExpiry(cert *x509.Certificate) time.Duration {
return time.Until(cert.NotAfter)
}

3. 敏感信息处理

// ❌ 不好:日志中记录敏感信息
log.Printf("Token: %s", token)

// ✅ 好:隐藏敏感信息
log.Printf("User authenticated: %s", userID)

4. 限制请求大小

s := grpc.NewServer(
grpc.MaxRecvMsgSize(10*1024*1024), // 10MB
grpc.MaxSendMsgSize(10*1024*1024), // 10MB
)

5. 设置超时

// 服务端设置全局超时
func timeoutInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
return handler(ctx, req)
}

小结

本章我们学习了:

  1. TLS 加密:单向和双向 TLS 配置
  2. Token 认证:使用拦截器实现认证
  3. 授权控制:基于角色的访问控制
  4. 最佳实践:证书管理、敏感信息处理、请求限制

安全是生产环境 gRPC 服务的必备特性,务必正确配置。