安全认证
本章介绍 gRPC 的安全机制,包括 TLS/SSL 加密、认证方式和授权控制。
安全概述
TLS 加密
生成证书
# 创建 CA 私钥
openssl genrsa -out ca.key 4096
# 创建 CA 证书
openssl req -new -x509 -days 365 -key ca.key -out ca.crt \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/CN=MyCA"
# 创建服务端私钥
openssl genrsa -out server.key 4096
# 创建服务端 CSR
openssl req -new -key server.key -out server.csr \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/CN=localhost"
# 使用 CA 签发服务端证书
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out server.crt
# 创建客户端私钥和证书(用于 mTLS)
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr \
-subj "/C=CN/ST=Beijing/L=Beijing/O=MyOrg/CN=client"
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out client.crt
服务端 TLS(单向)
服务端使用证书,客户端验证服务端身份:
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func main() {
// 加载服务端证书
creds, err := credentials.NewServerTLSFromFile(
"server.crt", // 证书文件
"server.key", // 私钥文件
)
if err != nil {
log.Fatalf("加载证书失败: %v", err)
}
// 创建带 TLS 的服务器
s := grpc.NewServer(grpc.Creds(creds))
// 注册服务...
pb.RegisterGreeterServer(s, &server{})
lis, _ := net.Listen("tcp", ":50051")
s.Serve(lis)
}
客户端:
func main() {
// 加载 CA 证书验证服务端
creds, err := credentials.NewClientTLSFromFile(
"ca.crt", // CA 证书
"localhost", // 服务器域名(需与证书 CN 匹配)
)
if err != nil {
log.Fatalf("加载证书失败: %v", err)
}
// 创建带 TLS 的连接
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(creds),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewGreeterClient(conn)
// ...
}
双向 TLS(mTLS)
双方互相验证:
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"google.golang.org/grpc/credentials"
)
// 服务端配置
func serverTLS() credentials.TransportCredentials {
// 加载 CA 证书(用于验证客户端)
caCert, _ := ioutil.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
// 配置 TLS
config := &tls.Config{
Certificates: []tls.Certificate{
loadCert("server.crt", "server.key"),
},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: caPool,
}
return credentials.NewTLS(config)
}
// 客户端配置
func clientTLS() credentials.TransportCredentials {
caCert, _ := ioutil.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
config := &tls.Config{
Certificates: []tls.Certificate{
loadCert("client.crt", "client.key"),
},
RootCAs: caPool,
}
return credentials.NewTLS(config)
}
func loadCert(certFile, keyFile string) tls.Certificate {
cert, _ := tls.LoadX509KeyPair(certFile, keyFile)
return cert
}
Token 认证
拦截器实现
// 认证拦截器
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过不需要认证的方法
if info.FullMethod == "/api.Auth/Login" || info.FullMethod == "/api.Auth/Register" {
return handler(ctx, req)
}
// 从元数据获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}
values := md.Get("authorization")
if len(values) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证 token")
}
token := values[0]
if !strings.HasPrefix(token, "Bearer ") {
return nil, status.Error(codes.Unauthenticated, "无效的 token 格式")
}
// 验证 token
userID, err := validateToken(strings.TrimPrefix(token, "Bearer "))
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 token")
}
// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", userID)
return handler(ctx, req)
}
// 验证 token(示例)
func validateToken(token string) (string, error) {
// 这里实现实际的 token 验证逻辑
// 可以使用 JWT 或查询数据库
return "user123", nil
}
func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(authInterceptor),
)
}
客户端发送 Token
func callWithToken(client pb.GreeterClient, token string) {
// 创建带 token 的元数据
md := metadata.New(map[string]string{
"authorization": "Bearer " + token,
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 调用服务
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "World"})
if err != nil {
log.Printf("调用失败: %v", err)
return
}
log.Printf("响应: %s", resp.Message)
}
Per-RPC 认证
// 使用 PerRPCCredentials 接口
type tokenAuth struct {
token string
}
func (t *tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + t.token,
}, nil
}
func (t *tokenAuth) RequireTransportSecurity() bool {
return true // 要求 TLS
}
func main() {
// 创建认证凭证
auth := &tokenAuth{token: "my-jwt-token"}
// 连接时使用
conn, _ := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(creds),
grpc.WithPerRPCCredentials(auth),
)
// 所有调用都会自动带上 token
client := pb.NewGreeterClient(conn)
client.SayHello(ctx, req) // 自动带上 token
}
JWT 认证实战
JWT(JSON Web Token)是现代微服务中常用的认证方式。下面展示如何在 gRPC 中集成 JWT。
JWT 服务端认证
import (
"github.com/golang-jwt/jwt/v5"
)
// JWT Claims
type Claims struct {
UserID string `json:"user_id"`
Role string `json:"role"`
jwt.RegisteredClaims
}
// JWT 认证拦截器
func jwtAuthInterceptor(secretKey string) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过不需要认证的方法
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}
// 从元数据获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}
authHeader := md.Get("authorization")
if len(authHeader) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}
// 解析 Bearer token
tokenString := strings.TrimPrefix(authHeader[0], "Bearer ")
if tokenString == authHeader[0] {
return nil, status.Error(codes.Unauthenticated, "无效的认证格式")
}
// 解析 JWT
claims, err := parseJWT(tokenString, secretKey)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 token")
}
// 检查 token 是否过期
if claims.ExpiresAt != nil && claims.ExpiresAt.Before(time.Now()) {
return nil, status.Error(codes.Unauthenticated, "token 已过期")
}
// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", claims.UserID)
ctx = context.WithValue(ctx, "role", claims.Role)
return handler(ctx, req)
}
}
// 解析 JWT
func parseJWT(tokenString, secretKey string) (*Claims, error) {
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
// 验证签名算法
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("无效的签名算法")
}
return []byte(secretKey), nil
})
if err != nil {
return nil, err
}
if claims, ok := token.Claims.(*Claims); ok && token.Valid {
return claims, nil
}
return nil, fmt.Errorf("无效的 token")
}
// 公开方法列表
func isPublicMethod(method string) bool {
publicMethods := map[string]bool{
"/api.Auth/Login": true,
"/api.Auth/Register": true,
"/api.Auth/RefreshToken": true,
"/grpc.health.v1.Health/Check": true,
}
return publicMethods[method]
}
JWT 登录服务
// 登录服务
type AuthService struct {
pb.UnimplementedAuthServer
secretKey string
tokenDuration time.Duration
userStore UserStore
}
func (s *AuthService) Login(ctx context.Context, req *pb.LoginRequest) (*pb.LoginResponse, error) {
// 验证用户凭据
user, err := s.userStore.ValidateUser(req.Username, req.Password)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "用户名或密码错误")
}
// 生成 JWT
token, err := s.generateToken(user)
if err != nil {
return nil, status.Error(codes.Internal, "生成 token 失败")
}
// 生成刷新 token
refreshToken, err := s.generateRefreshToken(user)
if err != nil {
return nil, status.Error(codes.Internal, "生成刷新 token 失败")
}
return &pb.LoginResponse{
AccessToken: token,
RefreshToken: refreshToken,
ExpiresIn: int64(s.tokenDuration.Seconds()),
User: user,
}, nil
}
func (s *AuthService) generateToken(user *User) (string, error) {
claims := &Claims{
UserID: user.ID,
Role: user.Role,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(s.tokenDuration)),
IssuedAt: jwt.NewNumericDate(time.Now()),
Issuer: "my-grpc-service",
},
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString([]byte(s.secretKey))
}
// 刷新 Token
func (s *AuthService) RefreshToken(ctx context.Context, req *pb.RefreshTokenRequest) (*pb.LoginResponse, error) {
// 验证刷新 token
claims, err := parseJWT(req.RefreshToken, s.secretKey)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的刷新 token")
}
// 检查刷新 token 是否有效(可以维护一个黑名单)
if s.isTokenRevoked(req.RefreshToken) {
return nil, status.Error(codes.Unauthenticated, "token 已失效")
}
// 获取用户信息
user, err := s.userStore.GetByID(claims.UserID)
if err != nil {
return nil, status.Error(codes.NotFound, "用户不存在")
}
// 生成新的 access token
newToken, err := s.generateToken(user)
if err != nil {
return nil, status.Error(codes.Internal, "生成 token 失败")
}
return &pb.LoginResponse{
AccessToken: newToken,
ExpiresIn: int64(s.tokenDuration.Seconds()),
}, nil
}
JWT 客户端
// JWT 客户端封装
type JWTClient struct {
conn *grpc.ClientConn
authService pb.AuthClient
accessToken string
refreshToken string
mutex sync.RWMutex
}
func NewJWTClient(address string, creds credentials.TransportCredentials) (*JWTClient, error) {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
if err != nil {
return nil, err
}
return &JWTClient{
conn: conn,
authService: pb.NewAuthClient(conn),
}, nil
}
// 登录获取 token
func (c *JWTClient) Login(username, password string) error {
resp, err := c.authService.Login(context.Background(), &pb.LoginRequest{
Username: username,
Password: password,
})
if err != nil {
return err
}
c.mutex.Lock()
c.accessToken = resp.AccessToken
c.refreshToken = resp.RefreshToken
c.mutex.Unlock()
return nil
}
// 自动刷新 token 的拦截器
func (c *JWTClient) refreshTokenInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 第一次尝试
err := invoker(c.withToken(ctx), method, req, reply, cc, opts...)
// 如果 token 过期,尝试刷新
if st, ok := status.FromError(err); ok && st.Code() == codes.Unauthenticated {
if refreshErr := c.refreshAccessToken(); refreshErr == nil {
// 刷新成功,重试请求
return invoker(c.withToken(ctx), method, req, reply, cc, opts...)
}
}
return err
}
}
func (c *JWTClient) refreshAccessToken() error {
c.mutex.RLock()
refreshToken := c.refreshToken
c.mutex.RUnlock()
if refreshToken == "" {
return fmt.Errorf("没有刷新 token")
}
resp, err := c.authService.RefreshToken(context.Background(), &pb.RefreshTokenRequest{
RefreshToken: refreshToken,
})
if err != nil {
return err
}
c.mutex.Lock()
c.accessToken = resp.AccessToken
c.mutex.Unlock()
return nil
}
func (c *JWTClient) withToken(ctx context.Context) context.Context {
c.mutex.RLock()
token := c.accessToken
c.mutex.RUnlock()
return metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
}
OAuth2 集成
在企业环境中,可能需要集成 OAuth2 进行认证。以下是集成 Google OAuth2 的示例。
服务端 OAuth2 验证
import (
"google.golang.org/api/oauth2/v2"
)
// OAuth2 认证拦截器
func oauth2Interceptor() grpc.UnaryServerInterceptor {
oauth2Service, err := oauth2.New(http.DefaultClient)
if err != nil {
panic(err)
}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}
token := strings.TrimPrefix(tokens[0], "Bearer ")
// 验证 OAuth2 token
tokenInfo, err := oauth2Service.Tokeninfo().AccessToken(token).Do()
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 OAuth2 token")
}
// 验证受众(Audience)
if tokenInfo.Audience != "your-client-id" {
return nil, status.Error(codes.Unauthenticated, "token 受众不匹配")
}
// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_id", tokenInfo.UserId)
ctx = context.WithValue(ctx, "email", tokenInfo.Email)
return handler(ctx, req)
}
}
多种认证方式组合
在实际应用中,可能需要支持多种认证方式:
// 认证提供者接口
type AuthProvider interface {
Authenticate(ctx context.Context, token string) (*UserInfo, error)
}
// JWT 认证提供者
type JWTAuthProvider struct {
secretKey string
}
func (p *JWTAuthProvider) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
claims, err := parseJWT(token, p.secretKey)
if err != nil {
return nil, err
}
return &UserInfo{ID: claims.UserID, Role: claims.Role}, nil
}
// OAuth2 认证提供者
type OAuth2Provider struct {
service *oauth2.Service
}
func (p *OAuth2Provider) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
tokenInfo, err := p.service.Tokeninfo().AccessToken(token).Do()
if err != nil {
return nil, err
}
return &UserInfo{ID: tokenInfo.UserId, Email: tokenInfo.Email}, nil
}
// API Key 认证提供者
type APIKeyProvider struct {
store APIKeyStore
}
func (p *APIKeyProvider) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
keyInfo, err := p.store.Validate(token)
if err != nil {
return nil, err
}
return &UserInfo{ID: keyInfo.UserID, Role: keyInfo.Role}, nil
}
// 组合认证拦截器
func multiAuthProviderInterceptor(providers map[string]AuthProvider) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if isPublicMethod(info.FullMethod) {
return handler(ctx, req)
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}
// 获取认证类型
authTypes := md.Get("x-auth-type")
authType := "jwt" // 默认
if len(authTypes) > 0 {
authType = authTypes[0]
}
// 获取 token
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少认证信息")
}
token := strings.TrimPrefix(tokens[0], "Bearer ")
// 选择对应的认证提供者
provider, ok := providers[authType]
if !ok {
return nil, status.Error(codes.Unauthenticated, "不支持的认证类型")
}
// 认证
userInfo, err := provider.Authenticate(ctx, token)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "认证失败")
}
// 将用户信息添加到上下文
ctx = context.WithValue(ctx, "user_info", userInfo)
return handler(ctx, req)
}
}
// 使用示例
func main() {
providers := map[string]AuthProvider{
"jwt": &JWTAuthProvider{secretKey: "my-secret"},
"oauth2": &OAuth2Provider{service: oauth2Service},
"api_key": &APIKeyProvider{store: apiKeyStore},
}
s := grpc.NewServer(
grpc.UnaryInterceptor(multiAuthProviderInterceptor(providers)),
)
}
授权控制
基于角色的访问控制
// 角色定义
const (
RoleAdmin = "admin"
RoleUser = "user"
)
// 权限映射
var permissions = map[string][]string{
"/api.UserService/DeleteUser": {RoleAdmin},
"/api.UserService/ListUsers": {RoleAdmin, RoleUser},
"/api.UserService/GetUser": {RoleAdmin, RoleUser},
}
// 授权拦截器
func authzInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 获取用户角色
userID := ctx.Value("user_id").(string)
userRole := getUserRole(userID)
// 检查权限
allowedRoles, ok := permissions[info.FullMethod]
if !ok {
// 方法未定义权限,允许访问
return handler(ctx, req)
}
for _, role := range allowedRoles {
if role == userRole {
return handler(ctx, req)
}
}
return nil, status.Error(codes.PermissionDenied, "权限不足")
}
func getUserRole(userID string) string {
// 从数据库获取用户角色
return RoleUser
}
// 组合拦截器
func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
authInterceptor, // 认证
authzInterceptor, // 授权
)),
)
}
安全最佳实践
1. 生产环境必须使用 TLS
// ❌ 不好:不安全连接
grpc.WithTransportCredentials(insecure.NewCredentials())
// ✅ 好:使用 TLS
grpc.WithTransportCredentials(creds)
2. 证书管理
// 定期轮换证书
func loadCertificate(certPath, keyPath string) (tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return tls.Certificate{}, err
}
return cert, nil
}
// 监控证书过期
func checkCertExpiry(cert *x509.Certificate) time.Duration {
return time.Until(cert.NotAfter)
}
3. 敏感信息处理
// ❌ 不好:日志中记录敏感信息
log.Printf("Token: %s", token)
// ✅ 好:隐藏敏感信息
log.Printf("User authenticated: %s", userID)
4. 限制请求大小
s := grpc.NewServer(
grpc.MaxRecvMsgSize(10*1024*1024), // 10MB
grpc.MaxSendMsgSize(10*1024*1024), // 10MB
)
5. 设置超时
// 服务端设置全局超时
func timeoutInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
return handler(ctx, req)
}
小结
本章我们学习了:
- TLS 加密:单向和双向 TLS 配置
- Token 认证:使用拦截器实现认证
- 授权控制:基于角色的访问控制
- 最佳实践:证书管理、敏感信息处理、请求限制
安全是生产环境 gRPC 服务的必备特性,务必正确配置。