跳到主要内容

认证与授权速查表

本文档提供认证与授权的常用命令和代码片段快速参考。

Express.js (Node.js)

const express = require('express');
const session = require('express-session');
const RedisStore = require('connect-redis').default;
const redis = require('redis');

const app = express();
const redisClient = redis.createClient({ url: 'redis://localhost:6379' });
redisClient.connect();

// 配置 Session
app.use(session({
store: new RedisStore({ client: redisClient }),
secret: 'your-secret-key-change-this',
resave: false,
saveUninitialized: false,
cookie: {
secure: true, // 仅 HTTPS
httpOnly: true, // 禁止 JS 访问
maxAge: 1000 * 60 * 30, // 30 分钟
sameSite: 'strict' // 防 CSRF
}
}));

// 登录
app.post('/login', async (req, res) => {
const { username, password } = req.body;
const user = await authenticateUser(username, password);
if (user) {
req.session.userId = user.id;
req.session.username = user.username;
res.json({ success: true });
} else {
res.status(401).json({ error: 'Invalid credentials' });
}
});

// 登出
app.post('/logout', (req, res) => {
req.session.destroy((err) => {
res.clearCookie('connect.sid');
res.json({ success: true });
});
});

// 受保护路由
app.get('/profile', (req, res) => {
if (!req.session.userId) {
return res.status(401).json({ error: 'Not authenticated' });
}
res.json({ userId: req.session.userId, username: req.session.username });
});

Spring Boot (Java)

// SecurityConfig.java
@Configuration
@EnableWebSecurity
public class SecurityConfig {

@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(auth -> auth
.requestMatchers("/login", "/public/**").permitAll()
.anyRequest().authenticated()
)
.formLogin(form -> form
.loginPage("/login")
.defaultSuccessUrl("/dashboard")
.permitAll()
)
.logout(logout -> logout
.logoutSuccessUrl("/login?logout")
.invalidateHttpSession(true)
.deleteCookies("JSESSIONID")
)
.sessionManagement(session -> session
.sessionCreationPolicy(SessionCreationPolicy.IF_REQUIRED)
.maximumSessions(1)
.expiredUrl("/login?expired")
);
return http.build();
}
}

Python (Flask)

from flask import Flask, session, request, jsonify
from flask_session import Session
import redis

app = Flask(__name__)
app.secret_key = 'your-secret-key-change-this'

# Redis Session 配置
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = redis.from_url('redis://localhost:6379')
app.config['PERMANENT_SESSION_LIFETIME'] = 1800 # 30 分钟
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Strict'

Session(app)

@app.route('/login', methods=['POST'])
def login():
data = request.get_json()
user = authenticate_user(data['username'], data['password'])
if user:
session['user_id'] = user.id
session['username'] = user.username
return jsonify({'success': True})
return jsonify({'error': 'Invalid credentials'}), 401

@app.route('/logout', methods=['POST'])
def logout():
session.clear()
return jsonify({'success': True})

@app.route('/profile')
def profile():
if 'user_id' not in session:
return jsonify({'error': 'Not authenticated'}), 401
return jsonify({'user_id': session['user_id'], 'username': session['username']})

Go

package main

import (
"encoding/gob"
"net/http"
"github.com/gorilla/sessions"
"github.com/gorilla/securecookie"
)

var store *sessions.CookieStore

func init() {
store = sessions.NewCookieStore(securecookie.GenerateRandomKey(32))
store.Options = &sessions.Options{
Path: "/",
MaxAge: 1800, // 30 分钟
HttpOnly: true,
Secure: true,
SameSite: http.SameSiteStrictMode,
}
gob.Register(&User{})
}

func loginHandler(w http.ResponseWriter, r *http.Request) {
username := r.FormValue("username")
password := r.FormValue("password")

user, err := authenticateUser(username, password)
if err != nil {
http.Error(w, "Invalid credentials", http.StatusUnauthorized)
return
}

session, _ := store.Get(r, "session-name")
session.Values["user"] = user
session.Save(r, w)

w.Write([]byte("Login successful"))
}

func logoutHandler(w http.ResponseWriter, r *http.Request) {
session, _ := store.Get(r, "session-name")
session.Options.MaxAge = -1
session.Save(r, w)
w.Write([]byte("Logged out"))
}

func authMiddleware(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
session, _ := store.Get(r, "session-name")
if session.Values["user"] == nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
next(w, r)
}
}

JWT

Node.js (jsonwebtoken)

const jwt = require('jsonwebtoken');

const SECRET_KEY = process.env.JWT_SECRET || 'your-secret-key';
const ACCESS_TOKEN_EXPIRY = '15m';
const REFRESH_TOKEN_EXPIRY = '7d';

// 生成 Access Token
function generateAccessToken(user) {
return jwt.sign(
{ userId: user.id, username: user.username, role: user.role },
SECRET_KEY,
{ expiresIn: ACCESS_TOKEN_EXPIRY }
);
}

// 生成 Refresh Token
function generateRefreshToken(user) {
return jwt.sign(
{ userId: user.id },
SECRET_KEY,
{ expiresIn: REFRESH_TOKEN_EXPIRY }
);
}

// 验证 Token
function verifyToken(token) {
try {
return jwt.verify(token, SECRET_KEY);
} catch (err) {
return null;
}
}

// 认证中间件
function authMiddleware(req, res, next) {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'No token provided' });
}

const token = authHeader.split(' ')[1];
const decoded = verifyToken(token);
if (!decoded) {
return res.status(401).json({ error: 'Invalid or expired token' });
}

req.user = decoded;
next();
}

Python (PyJWT)

import jwt
import datetime
from functools import wraps
from flask import request, jsonify

SECRET_KEY = 'your-secret-key'
ACCESS_TOKEN_EXPIRY = datetime.timedelta(minutes=15)
REFRESH_TOKEN_EXPIRY = datetime.timedelta(days=7)

def generate_access_token(user):
payload = {
'user_id': user.id,
'username': user.username,
'role': user.role,
'exp': datetime.datetime.utcnow() + ACCESS_TOKEN_EXPIRY,
'iat': datetime.datetime.utcnow()
}
return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

def generate_refresh_token(user):
payload = {
'user_id': user.id,
'exp': datetime.datetime.utcnow() + REFRESH_TOKEN_EXPIRY,
'iat': datetime.datetime.utcnow()
}
return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

def verify_token(token):
try:
return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None

def auth_required(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'No token provided'}), 401

token = auth_header.split(' ')[1]
payload = verify_token(token)
if not payload:
return jsonify({'error': 'Invalid or expired token'}), 401

request.user = payload
return f(*args, **kwargs)
return decorated

Go (golang-jwt)

package auth

import (
"errors"
"time"
"github.com/golang-jwt/jwt/v5"
)

var secretKey = []byte("your-secret-key")

type Claims struct {
UserID uint `json:"user_id"`
Username string `json:"username"`
Role string `json:"role"`
jwt.RegisteredClaims
}

func GenerateAccessToken(userID uint, username, role string) (string, error) {
claims := Claims{
UserID: userID,
Username: username,
Role: role,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(15 * time.Minute)),
IssuedAt: jwt.NewNumericDate(time.Now()),
},
}

token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(secretKey)
}

func GenerateRefreshToken(userID uint) (string, error) {
claims := jwt.RegisteredClaims{
Subject: string(rune(userID)),
ExpiresAt: jwt.NewNumericDate(time.Now().Add(7 * 24 * time.Hour)),
IssuedAt: jwt.NewNumericDate(time.Now()),
}

token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(secretKey)
}

func VerifyToken(tokenString string) (*Claims, error) {
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
return secretKey, nil
})

if err != nil {
return nil, err
}

if claims, ok := token.Claims.(*Claims); ok && token.Valid {
return claims, nil
}

return nil, errors.New("invalid token")
}

Java (jjwt)

import io.jsonwebtoken.*;
import io.jsonwebtoken.security.Keys;
import java.security.Key;
import java.util.Date;

public class JwtUtil {
private static final Key SECRET_KEY = Keys.secretKeyFor(SignatureAlgorithm.HS256);
private static final long ACCESS_TOKEN_EXPIRY = 15 * 60 * 1000; // 15 分钟
private static final long REFRESH_TOKEN_EXPIRY = 7 * 24 * 60 * 60 * 1000; // 7 天

public static String generateAccessToken(Long userId, String username, String role) {
return Jwts.builder()
.setSubject(String.valueOf(userId))
.claim("username", username)
.claim("role", role)
.setIssuedAt(new Date())
.setExpiration(new Date(System.currentTimeMillis() + ACCESS_TOKEN_EXPIRY))
.signWith(SECRET_KEY)
.compact();
}

public static String generateRefreshToken(Long userId) {
return Jwts.builder()
.setSubject(String.valueOf(userId))
.setIssuedAt(new Date())
.setExpiration(new Date(System.currentTimeMillis() + REFRESH_TOKEN_EXPIRY))
.signWith(SECRET_KEY)
.compact();
}

public static Claims verifyToken(String token) {
try {
return Jwts.parserBuilder()
.setSigningKey(SECRET_KEY)
.build()
.parseClaimsJws(token)
.getBody();
} catch (JwtException e) {
return null;
}
}
}

OAuth 2.0

Express.js + Passport

const passport = require('passport');
const GoogleStrategy = require('passport-google-oauth20').Strategy;
const GitHubStrategy = require('passport-github2').Strategy;

// Google OAuth
passport.use(new GoogleStrategy({
clientID: process.env.GOOGLE_CLIENT_ID,
clientSecret: process.env.GOOGLE_CLIENT_SECRET,
callbackURL: '/auth/google/callback'
},
(accessToken, refreshToken, profile, done) => {
// 查找或创建用户
User.findOrCreate({ googleId: profile.id }, (err, user) => {
return done(err, user);
});
}
));

// GitHub OAuth
passport.use(new GitHubStrategy({
clientID: process.env.GITHUB_CLIENT_ID,
clientSecret: process.env.GITHUB_CLIENT_SECRET,
callbackURL: '/auth/github/callback'
},
(accessToken, refreshToken, profile, done) => {
User.findOrCreate({ githubId: profile.id }, (err, user) => {
return done(err, user);
});
}
));

// 路由
app.get('/auth/google', passport.authenticate('google', { scope: ['profile', 'email'] }));
app.get('/auth/google/callback',
passport.authenticate('google', { failureRedirect: '/login' }),
(req, res) => res.redirect('/dashboard')
);

app.get('/auth/github', passport.authenticate('github', { scope: ['user:email'] }));
app.get('/auth/github/callback',
passport.authenticate('github', { failureRedirect: '/login' }),
(req, res) => res.redirect('/dashboard')
);

Spring Boot OAuth2 Client

# application.yml
spring:
security:
oauth2:
client:
registration:
google:
client-id: ${GOOGLE_CLIENT_ID}
client-secret: ${GOOGLE_CLIENT_SECRET}
scope:
- email
- profile
github:
client-id: ${GITHUB_CLIENT_ID}
client-secret: ${GITHUB_CLIENT_SECRET}
scope:
- user:email
@Configuration
@EnableWebSecurity
public class SecurityConfig {

@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(auth -> auth
.requestMatchers("/", "/login", "/error").permitAll()
.anyRequest().authenticated()
)
.oauth2Login(oauth2 -> oauth2
.loginPage("/login")
.defaultSuccessUrl("/dashboard")
)
.logout(logout -> logout
.logoutSuccessUrl("/")
);
return http.build();
}
}

Python (Authlib)

from authlib.integrations.flask_client import OAuth

oauth = OAuth(app)

# 注册 Google
oauth.register(
name='google',
client_id=os.getenv('GOOGLE_CLIENT_ID'),
client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'}
)

# 注册 GitHub
oauth.register(
name='github',
client_id=os.getenv('GITHUB_CLIENT_ID'),
client_secret=os.getenv('GITHUB_CLIENT_SECRET'),
access_token_url='https://github.com/login/oauth/access_token',
authorize_url='https://github.com/login/oauth/authorize',
api_base_url='https://api.github.com/',
client_kwargs={'scope': 'user:email'}
)

@app.route('/login/google')
def google_login():
redirect_uri = url_for('google_callback', _external=True)
return oauth.google.authorize_redirect(redirect_uri)

@app.route('/auth/google/callback')
def google_callback():
token = oauth.google.authorize_access_token()
user_info = token.get('userinfo')
# 处理用户信息
return redirect('/dashboard')

@app.route('/login/github')
def github_login():
redirect_uri = url_for('github_callback', _external=True)
return oauth.github.authorize_redirect(redirect_uri)

@app.route('/auth/github/callback')
def github_callback():
token = oauth.github.authorize_access_token()
resp = oauth.github.get('user')
user_info = resp.json()
# 处理用户信息
return redirect('/dashboard')

OpenID Connect

Node.js (openid-client)

const { Issuer, generators } = require('openid-client');

async function setupOIDC() {
// Discovery
const googleIssuer = await Issuer.discover('https://accounts.google.com');

const client = new googleIssuer.Client({
client_id: process.env.GOOGLE_CLIENT_ID,
client_secret: process.env.GOOGLE_CLIENT_SECRET,
redirect_uris: ['http://localhost:3000/auth/callback'],
response_types: ['code']
});

return client;
}

// 登录
app.get('/login', (req, res) => {
const codeVerifier = generators.codeVerifier();
const codeChallenge = generators.codeChallenge(codeVerifier);
const nonce = generators.nonce();
const state = generators.state();

req.session.codeVerifier = codeVerifier;
req.session.nonce = nonce;
req.session.state = state;

const authUrl = client.authorizationUrl({
scope: 'openid email profile',
code_challenge: codeChallenge,
code_challenge_method: 'S256',
nonce: nonce,
state: state
});

res.redirect(authUrl);
});

// 回调
app.get('/auth/callback', async (req, res) => {
const params = client.callbackParams(req);

const tokenSet = await client.callback(
'http://localhost:3000/auth/callback',
params,
{
code_verifier: req.session.codeVerifier,
nonce: req.session.nonce,
state: req.session.state
}
);

const userinfo = await client.userinfo(tokenSet.access_token);

// 验证 ID Token
// tokenSet.claims() 包含 ID Token 的声明

req.session.user = userinfo;
res.redirect('/dashboard');
});

Python (python-social-auth)

# settings.py
AUTHENTICATION_BACKENDS = (
'social_core.backends.google.GoogleOAuth2',
'social_core.backends.github.GithubOAuth2',
)

SOCIAL_AUTH_GOOGLE_OAUTH2_KEY = os.getenv('GOOGLE_CLIENT_ID')
SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET = os.getenv('GOOGLE_CLIENT_SECRET')

SOCIAL_AUTH_GITHUB_KEY = os.getenv('GITHUB_CLIENT_ID')
SOCIAL_AUTH_GITHUB_SECRET = os.getenv('GITHUB_CLIENT_SECRET')

# urls.py
urlpatterns = [
path('auth/', include('social_django.urls', namespace='social')),
]

# 登录链接
# <a href="{% url 'social:begin' 'google-oauth2' %}">Login with Google</a>
# <a href="{% url 'social:begin' 'github' %}">Login with GitHub</a>

密码哈希

bcrypt (Node.js)

const bcrypt = require('bcrypt');

const SALT_ROUNDS = 12;

// 哈希密码
async function hashPassword(password) {
return await bcrypt.hash(password, SALT_ROUNDS);
}

// 验证密码
async function verifyPassword(password, hash) {
return await bcrypt.compare(password, hash);
}

bcrypt (Python)

import bcrypt

def hash_password(password: str) -> str:
salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(password.encode(), salt).decode()

def verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode(), hashed.encode())

Argon2 (推荐)

// Node.js
const argon2 = require('argon2');

async function hashPassword(password) {
return await argon2.hash(password);
}

async function verifyPassword(password, hash) {
return await argon2.verify(hash, password);
}
# Python
from argon2 import PasswordHasher

ph = PasswordHasher()

def hash_password(password: str) -> str:
return ph.hash(password)

def verify_password(password: str, hashed: str) -> bool:
try:
return ph.verify(hashed, password)
except:
return False

安全头配置

Express.js (helmet)

const helmet = require('helmet');

app.use(helmet());

// 自定义配置
app.use(helmet({
contentSecurityPolicy: {
directives: {
defaultSrc: ["'self'"],
scriptSrc: ["'self'", "'unsafe-inline'"],
styleSrc: ["'self'", "'unsafe-inline'"],
imgSrc: ["'self'", "data:", "https:"],
}
},
hsts: {
maxAge: 31536000,
includeSubDomains: true,
preload: true
}
}));

Nginx

server {
# 安全头
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'" always;

# 禁用敏感头
server_tokens off;
}

Rate Limiting

Express.js

const rateLimit = require('express-rate-limit');

const loginLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 分钟
max: 5, // 最多 5 次尝试
message: { error: 'Too many login attempts, please try again later' },
standardHeaders: true,
legacyHeaders: false,
});

app.post('/login', loginLimiter, (req, res) => {
// 登录逻辑
});

Redis 滑动窗口

const Redis = require('ioredis');
const redis = new Redis();

async function rateLimit(key, limit, windowSeconds) {
const now = Date.now();
const windowStart = now - windowSeconds * 1000;

const pipeline = redis.pipeline();
pipeline.zremrangebyscore(key, 0, windowStart);
pipeline.zadd(key, now, `${now}:${Math.random()}`);
pipeline.zcard(key);
pipeline.expire(key, windowSeconds);

const results = await pipeline.exec();
const count = results[2][1];

return count <= limit;
}

// 使用
app.use(async (req, res, next) => {
const key = `rate:${req.ip}`;
const allowed = await rateLimit(key, 100, 60); // 每分钟 100 次

if (!allowed) {
return res.status(429).json({ error: 'Rate limit exceeded' });
}
next();
});

CSRF 防护

Express.js (csurf)

const csrf = require('csurf');
const cookieParser = require('cookie-parser');

app.use(cookieParser());
app.use(csrf({ cookie: true }));

// 获取 CSRF Token
app.get('/form', (req, res) => {
res.render('form', { csrfToken: req.csrfToken() });
});

// 表单中包含 token
// <input type="hidden" name="_csrf" value="{{csrfToken}}">

Django

# settings.py
MIDDLEWARE = [
'django.middleware.csrf.CsrfViewMiddleware',
# ...
]

# 模板中
# {% csrf_token %}

常用命令

JWT 调试

# 解码 JWT (不验证签名)
echo "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" | cut -d. -f2 | base64 -d 2>/dev/null | jq .

# 使用 jwt-cli
npm install -g @tsndr/jwt-cli
jwt decode <token>
jwt encode --secret=secret '{"user":"test"}'

OpenSSL

# 生成密钥
openssl rand -base64 32

# 生成 RSA 密钥对
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem

# 生成自签名证书
openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes

Redis Session

# 查看所有 session
redis-cli KEYS "sess:*"

# 查看特定 session
redis-cli GET "sess:abc123"

# 删除 session
redis-cli DEL "sess:abc123"

# 设置 session 过期时间
redis-cli EXPIRE "sess:abc123" 1800