认证与授权速查表
本文档提供认证与授权的常用命令和代码片段快速参考。
Session-Cookie
Express.js (Node.js)
const express = require('express');
const session = require('express-session');
const RedisStore = require('connect-redis').default;
const redis = require('redis');
// Create the Express application and the Redis client that backs the session store.
const app = express();
const redisClient = redis.createClient({ url: 'redis://localhost:6379' });
// connect() returns a promise; report connection failures instead of leaving
// the rejection unhandled.
redisClient.connect().catch((err) => {
  console.error('Redis connection failed:', err);
});
// Session configuration
// Cookie attributes for the session ID cookie.
const sessionCookieOptions = {
  secure: true, // HTTPS only
  httpOnly: true, // not readable from client-side JS
  maxAge: 1000 * 60 * 30, // 30 minutes
  sameSite: 'strict', // CSRF mitigation
};

app.use(
  session({
    store: new RedisStore({ client: redisClient }),
    // Placeholder value: replace with a real secret before use.
    secret: 'your-secret-key-change-this',
    resave: false,
    saveUninitialized: false,
    cookie: sessionCookieOptions,
  }),
);
// Login
// Authenticates the posted credentials and, on success, stores the user's id
// and username in a freshly issued session.
app.post('/login', async (req, res, next) => {
  try {
    // req.body is undefined when no body parser has run; treat that as empty.
    const { username, password } = req.body ?? {};
    const user = await authenticateUser(username, password);
    if (!user) {
      return res.status(401).json({ error: 'Invalid credentials' });
    }

    // Issue a new session ID on privilege change so a session ID planted
    // before login cannot be reused afterwards (session fixation).
    req.session.regenerate((regenerateErr) => {
      if (regenerateErr) {
        return next(regenerateErr);
      }
      req.session.userId = user.id;
      req.session.username = user.username;
      // Persist before responding so the next request sees the login.
      req.session.save((saveErr) => {
        if (saveErr) {
          return next(saveErr);
        }
        res.json({ success: true });
      });
    });
  } catch (err) {
    // Forward rejections from authenticateUser to the Express error handler.
    next(err);
  }
});
// Logout
// Destroys the server-side session and clears the session cookie.
app.post('/logout', (req, res, next) => {
  req.session.destroy((err) => {
    if (err) {
      // The session could not be removed; do not report success.
      return next(err);
    }
    res.clearCookie('connect.sid');
    res.json({ success: true });
  });
});
// Protected route
// Responds with the logged-in user's id and username, or 401 when the
// session carries no userId.
app.get('/profile', (req, res) => {
  const { session: currentSession } = req;
  const isAuthenticated = Boolean(currentSession.userId);

  if (isAuthenticated) {
    res.json({
      userId: currentSession.userId,
      username: currentSession.username,
    });
    return;
  }

  return res.status(401).json({ error: 'Not authenticated' });
});
Spring Boot (Java)
// SecurityConfig.java
/**
 * Form-login security configuration: public login/static paths, everything
 * else authenticated, with session limits per user.
 */
@Configuration
@EnableWebSecurity
public class SecurityConfig {

    /**
     * Builds the security filter chain.
     *
     * @param http the HttpSecurity builder to configure
     * @return the built filter chain
     * @throws Exception if the chain cannot be built
     */
    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        // "/login" and "/public/**" are open; every other request needs authentication.
        http.authorizeHttpRequests(requests -> requests
                .requestMatchers("/login", "/public/**").permitAll()
                .anyRequest().authenticated());

        // Custom login page; successful logins land on /dashboard.
        http.formLogin(login -> login
                .loginPage("/login")
                .defaultSuccessUrl("/dashboard")
                .permitAll());

        // Logout invalidates the HTTP session and removes the JSESSIONID cookie.
        http.logout(signOut -> signOut
                .logoutSuccessUrl("/login?logout")
                .invalidateHttpSession(true)
                .deleteCookies("JSESSIONID"));

        // Sessions are created only when required, with a maximum of one per user.
        http.sessionManagement(sessions -> sessions
                .sessionCreationPolicy(SessionCreationPolicy.IF_REQUIRED)
                .maximumSessions(1)
                .expiredUrl("/login?expired"));

        return http.build();
    }
}
Python (Flask)
from flask import Flask, session, request, jsonify
from flask_session import Session
import redis
app = Flask(__name__)
# Placeholder value: replace with a real secret before use.
app.secret_key = 'your-secret-key-change-this'

# Redis session configuration
app.config.update(
    SESSION_TYPE='redis',
    SESSION_REDIS=redis.from_url('redis://localhost:6379'),
    PERMANENT_SESSION_LIFETIME=1800,  # 30 minutes
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Strict',
)
Session(app)
@app.route('/login', methods=['POST'])
def login():
    """Authenticate the JSON credentials and store the user in the session.

    Returns 400 when the body is not JSON or lacks a field, 401 when the
    credentials are rejected, and ``{'success': True}`` otherwise.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON body.
    data = request.get_json(silent=True) or {}
    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return jsonify({'error': 'username and password are required'}), 400

    user = authenticate_user(username, password)
    if user:
        session['user_id'] = user.id
        session['username'] = user.username
        return jsonify({'success': True})
    return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/logout', methods=['POST'])
def logout():
    """Remove all data from the current session and report success."""
    session.clear()
    return jsonify({'success': True})
@app.route('/profile')
def profile():
    """Return the logged-in user's id and username, or 401 if not logged in."""
    if 'user_id' not in session:
        return jsonify({'error': 'Not authenticated'}), 401
    return jsonify({'user_id': session['user_id'], 'username': session['username']})
Go
package main
import (
"encoding/gob"
"net/http"
"github.com/gorilla/sessions"
"github.com/gorilla/securecookie"
)
// store is the cookie-backed session store shared by the handlers below.
var store *sessions.CookieStore

// init creates the session store, sets its cookie options, and registers
// *User with gob so it can be stored in session values.
func init() {
	// A fresh random 32-byte key is generated every time the process starts.
	// NOTE(review): cookies issued before a restart presumably stop validating.
	signingKey := securecookie.GenerateRandomKey(32)
	store = sessions.NewCookieStore(signingKey)

	cookieOptions := sessions.Options{
		Path:     "/",
		MaxAge:   1800, // 30 minutes
		HttpOnly: true,
		Secure:   true,
		SameSite: http.SameSiteStrictMode,
	}
	store.Options = &cookieOptions

	gob.Register(&User{})
}
// loginHandler authenticates the posted form credentials and, on success,
// stores the user in the "session-name" session.
// It responds 401 for rejected credentials and 500 if the session cannot be saved.
func loginHandler(w http.ResponseWriter, r *http.Request) {
	username := r.FormValue("username")
	password := r.FormValue("password")

	user, err := authenticateUser(username, password)
	if err != nil {
		http.Error(w, "Invalid credentials", http.StatusUnauthorized)
		return
	}

	// The Get error is deliberately ignored: per the gorilla/sessions docs a
	// new session is still returned when an existing cookie cannot be decoded,
	// and a stale cookie must not block a fresh login.
	session, _ := store.Get(r, "session-name")
	session.Values["user"] = user

	// Save can fail (e.g. when the value cannot be encoded); do not report
	// success unless the session was actually written.
	if err := session.Save(r, w); err != nil {
		http.Error(w, "Failed to save session", http.StatusInternalServerError)
		return
	}
	w.Write([]byte("Login successful"))
}
// logoutHandler expires the "session-name" session by saving it with a
// negative MaxAge. It responds 500 if the session cannot be saved.
func logoutHandler(w http.ResponseWriter, r *http.Request) {
	// A decode error still yields a usable (new) session, which is expired below.
	session, _ := store.Get(r, "session-name")
	session.Options.MaxAge = -1

	// Only report success once the expiring cookie has actually been written.
	if err := session.Save(r, w); err != nil {
		http.Error(w, "Failed to clear session", http.StatusInternalServerError)
		return
	}
	w.Write([]byte("Logged out"))
}
// authMiddleware wraps next so that it only runs when the "session-name"
// session holds a non-nil "user" value; otherwise it responds 401.
func authMiddleware(next http.HandlerFunc) http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		sess, _ := store.Get(r, "session-name")
		if sess.Values["user"] != nil {
			next(w, r)
			return
		}
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
	})
}
JWT
Node.js (jsonwebtoken)
const jwt = require('jsonwebtoken');
// Signing key for all tokens. There is deliberately no fallback value: a
// missing JWT_SECRET must stop startup rather than silently sign tokens with
// a publicly known default.
const SECRET_KEY = process.env.JWT_SECRET;
if (!SECRET_KEY) {
  throw new Error('JWT_SECRET environment variable is required');
}
const ACCESS_TOKEN_EXPIRY = '15m';
const REFRESH_TOKEN_EXPIRY = '7d';
// Generate an access token
/**
 * Signs an access token carrying the user's id, username and role.
 * @param {{id: *, username: *, role: *}} user - user to issue the token for
 * @returns {*} whatever jwt.sign returns for the claims
 */
function generateAccessToken(user) {
  const claims = {
    userId: user.id,
    username: user.username,
    role: user.role,
  };
  const signOptions = { expiresIn: ACCESS_TOKEN_EXPIRY };
  return jwt.sign(claims, SECRET_KEY, signOptions);
}
// Generate a refresh token
/**
 * Signs a refresh token that carries only the user's id.
 * @param {{id: *}} user - user to issue the token for
 * @returns {*} whatever jwt.sign returns for the claims
 */
function generateRefreshToken(user) {
  const claims = { userId: user.id };
  const signOptions = { expiresIn: REFRESH_TOKEN_EXPIRY };
  return jwt.sign(claims, SECRET_KEY, signOptions);
}
// Verify a token
/**
 * Verifies a token's signature and expiry.
 * @param {string} token - the encoded JWT
 * @returns {object|null} the decoded payload, or null when verification fails
 */
function verifyToken(token) {
  try {
    // Pin the accepted algorithm to the one used for signing (jsonwebtoken's
    // default for a shared secret) instead of accepting any HMAC variant.
    return jwt.verify(token, SECRET_KEY, { algorithms: ['HS256'] });
  } catch {
    // Any verification failure (bad signature, expired, malformed) maps to null.
    return null;
  }
}
// Authentication middleware
/**
 * Express middleware: requires an "Authorization: Bearer <token>" header,
 * verifies the token and exposes the decoded payload as req.user.
 * Responds 401 when the header is missing/malformed or the token is rejected.
 */
function authMiddleware(req, res, next) {
  const header = req.headers.authorization;
  const hasBearerScheme = Boolean(header) && header.startsWith('Bearer ');
  if (!hasBearerScheme) {
    return res.status(401).json({ error: 'No token provided' });
  }

  const [, bearerToken] = header.split(' ');
  const payload = verifyToken(bearerToken);
  if (!payload) {
    return res.status(401).json({ error: 'Invalid or expired token' });
  }

  req.user = payload;
  next();
}
Python (PyJWT)
import jwt
import datetime
from functools import wraps
from flask import request, jsonify
# Placeholder signing key: replace with a real secret before use.
SECRET_KEY = 'your-secret-key'
# Token lifetimes: 15 minutes for access tokens, 7 days for refresh tokens.
ACCESS_TOKEN_EXPIRY = datetime.timedelta(seconds=15 * 60)
REFRESH_TOKEN_EXPIRY = datetime.timedelta(weeks=1)
def generate_access_token(user):
    """Encode an HS256 access token with the user's id, username and role.

    ``iat`` is the current UTC time and ``exp`` is ``iat`` plus
    ACCESS_TOKEN_EXPIRY.
    """
    # Timezone-aware "now", read once so exp - iat is exactly the configured
    # lifetime (datetime.utcnow() is deprecated since Python 3.12).
    now = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        'user_id': user.id,
        'username': user.username,
        'role': user.role,
        'exp': now + ACCESS_TOKEN_EXPIRY,
        'iat': now,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
def generate_refresh_token(user):
    """Encode an HS256 refresh token that carries only the user's id.

    ``iat`` is the current UTC time and ``exp`` is ``iat`` plus
    REFRESH_TOKEN_EXPIRY.
    """
    # Timezone-aware "now", read once (datetime.utcnow() is deprecated since
    # Python 3.12).
    now = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        'user_id': user.id,
        'exp': now + REFRESH_TOKEN_EXPIRY,
        'iat': now,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
def verify_token(token):
    """Decode and verify an HS256 token.

    Returns the decoded payload, or None when the token is expired or
    otherwise invalid.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # ExpiredSignatureError is a subclass of InvalidTokenError in PyJWT,
        # so one clause covers both cases the original handled.
        return None
def auth_required(f):
    """Decorator that requires a valid ``Authorization: Bearer <token>`` header.

    On success the decoded payload is attached as ``request.user`` and the
    wrapped view is called; otherwise a 401 JSON response is returned.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({'error': 'No token provided'}), 401
        token = auth_header.split(' ')[1]
        payload = verify_token(token)
        if not payload:
            return jsonify({'error': 'Invalid or expired token'}), 401
        request.user = payload
        return f(*args, **kwargs)
    return decorated
Go (golang-jwt)
package auth
import (
	"errors"
	"strconv"
	"time"

	"github.com/golang-jwt/jwt/v5"
)
// secretKey is the HMAC key used to sign and verify tokens in this package.
// NOTE(review): hard-coded placeholder value — presumably meant to be replaced
// with a key loaded from configuration.
var secretKey = []byte("your-secret-key")
// Claims is the access-token payload: application-specific fields plus the
// embedded registered JWT claims.
type Claims struct {
	UserID   uint   `json:"user_id"`  // serialized as "user_id"
	Username string `json:"username"` // serialized as "username"
	Role     string `json:"role"`     // serialized as "role"
	jwt.RegisteredClaims
}
func GenerateAccessToken(userID uint, username, role string) (string, error) {
claims := Claims{
UserID: userID,
Username: username,
Role: role,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(15 * time.Minute)),
IssuedAt: jwt.NewNumericDate(time.Now()),
},
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(secretKey)
}
// GenerateRefreshToken returns an HS256-signed token whose subject is the
// decimal user ID. The token expires 7 days after it is issued.
func GenerateRefreshToken(userID uint) (string, error) {
	claims := jwt.RegisteredClaims{
		// string(rune(userID)) would produce the Unicode character with that
		// code point (65 -> "A"), not the number; format it as decimal text.
		Subject:   strconv.FormatUint(uint64(userID), 10),
		ExpiresAt: jwt.NewNumericDate(time.Now().Add(7 * 24 * time.Hour)),
		IssuedAt:  jwt.NewNumericDate(time.Now()),
	}
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return token.SignedString(secretKey)
}
// VerifyToken parses tokenString into Claims and verifies it with secretKey.
// It returns the claims for a valid token and an error otherwise.
func VerifyToken(tokenString string) (*Claims, error) {
	token, err := jwt.ParseWithClaims(
		tokenString,
		&Claims{},
		func(token *jwt.Token) (interface{}, error) {
			return secretKey, nil
		},
		// Only accept the algorithm the Generate* functions sign with, rather
		// than whatever the token header declares.
		jwt.WithValidMethods([]string{jwt.SigningMethodHS256.Alg()}),
	)
	if err != nil {
		return nil, err
	}
	if claims, ok := token.Claims.(*Claims); ok && token.Valid {
		return claims, nil
	}
	return nil, errors.New("invalid token")
}
Java (jjwt)
import io.jsonwebtoken.*;
import io.jsonwebtoken.security.Keys;
import java.security.Key;
import java.util.Date;
/**
 * Static helpers for issuing and verifying HS256-signed JWTs.
 */
public class JwtUtil {
    // Key generated when the class is loaded.
    // NOTE(review): tokens presumably stop verifying after a restart, since a new key is created each time.
    private static final Key SECRET_KEY = Keys.secretKeyFor(SignatureAlgorithm.HS256);
    private static final long ACCESS_TOKEN_EXPIRY = 15 * 60 * 1000; // 15 minutes
    private static final long REFRESH_TOKEN_EXPIRY = 7 * 24 * 60 * 60 * 1000; // 7 days

    /**
     * Issues an access token with the user ID as subject plus username and role claims.
     *
     * @param userId   user ID, stored as the token subject
     * @param username value of the "username" claim
     * @param role     value of the "role" claim
     * @return the compact serialized token
     */
    public static String generateAccessToken(Long userId, String username, String role) {
        String subject = String.valueOf(userId);
        Date issuedAt = new Date();
        Date expiresAt = new Date(System.currentTimeMillis() + ACCESS_TOKEN_EXPIRY);

        JwtBuilder builder = Jwts.builder()
                .setSubject(subject)
                .claim("username", username)
                .claim("role", role)
                .setIssuedAt(issuedAt)
                .setExpiration(expiresAt);
        return builder.signWith(SECRET_KEY).compact();
    }

    /**
     * Issues a refresh token whose only identity claim is the subject (user ID).
     *
     * @param userId user ID, stored as the token subject
     * @return the compact serialized token
     */
    public static String generateRefreshToken(Long userId) {
        String subject = String.valueOf(userId);
        Date issuedAt = new Date();
        Date expiresAt = new Date(System.currentTimeMillis() + REFRESH_TOKEN_EXPIRY);

        JwtBuilder builder = Jwts.builder()
                .setSubject(subject)
                .setIssuedAt(issuedAt)
                .setExpiration(expiresAt);
        return builder.signWith(SECRET_KEY).compact();
    }

    /**
     * Parses and verifies a token with SECRET_KEY.
     *
     * @param token the compact serialized token
     * @return the token's claims, or null when parsing throws a JwtException
     */
    public static Claims verifyToken(String token) {
        try {
            JwtParser parser = Jwts.parserBuilder()
                    .setSigningKey(SECRET_KEY)
                    .build();
            Jws<Claims> parsed = parser.parseClaimsJws(token);
            return parsed.getBody();
        } catch (JwtException e) {
            return null;
        }
    }
}
OAuth 2.0
Express.js + Passport
const passport = require('passport');
const GoogleStrategy = require('passport-google-oauth20').Strategy;
const GitHubStrategy = require('passport-github2').Strategy;
// Google OAuth
const googleOptions = {
  clientID: process.env.GOOGLE_CLIENT_ID,
  clientSecret: process.env.GOOGLE_CLIENT_SECRET,
  callbackURL: '/auth/google/callback',
};

/**
 * Passport verify callback: find or create the local user for a Google profile.
 */
function verifyGoogleProfile(accessToken, refreshToken, profile, done) {
  // Find or create the user
  User.findOrCreate({ googleId: profile.id }, (err, user) => done(err, user));
}

passport.use(new GoogleStrategy(googleOptions, verifyGoogleProfile));
// GitHub OAuth
const githubOptions = {
  clientID: process.env.GITHUB_CLIENT_ID,
  clientSecret: process.env.GITHUB_CLIENT_SECRET,
  callbackURL: '/auth/github/callback',
};

/**
 * Passport verify callback: find or create the local user for a GitHub profile.
 */
function verifyGithubProfile(accessToken, refreshToken, profile, done) {
  User.findOrCreate({ githubId: profile.id }, (err, user) => done(err, user));
}

passport.use(new GitHubStrategy(githubOptions, verifyGithubProfile));
// Routes
// Shared success handler: both providers send the user to /dashboard.
const redirectToDashboard = (req, res) => res.redirect('/dashboard');

// Google: start the flow, then handle the provider callback.
app.get(
  '/auth/google',
  passport.authenticate('google', { scope: ['profile', 'email'] }),
);
app.get(
  '/auth/google/callback',
  passport.authenticate('google', { failureRedirect: '/login' }),
  redirectToDashboard,
);

// GitHub: start the flow, then handle the provider callback.
app.get(
  '/auth/github',
  passport.authenticate('github', { scope: ['user:email'] }),
);
app.get(
  '/auth/github/callback',
  passport.authenticate('github', { failureRedirect: '/login' }),
  redirectToDashboard,
);
Spring Boot OAuth2 Client
# application.yml
# OAuth2 client registrations; credentials are injected from environment variables.
spring:
  security:
    oauth2:
      client:
        registration:
          google:
            client-id: ${GOOGLE_CLIENT_ID}
            client-secret: ${GOOGLE_CLIENT_SECRET}
            scope:
              - email
              - profile
          github:
            client-id: ${GITHUB_CLIENT_ID}
            client-secret: ${GITHUB_CLIENT_SECRET}
            scope:
              - user:email
/**
 * OAuth2-login security configuration: a few public paths, everything else
 * authenticated through the OAuth2 login flow.
 */
@Configuration
@EnableWebSecurity
public class SecurityConfig {

    /**
     * Builds the security filter chain.
     *
     * @param http the HttpSecurity builder to configure
     * @return the built filter chain
     * @throws Exception if the chain cannot be built
     */
    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        // "/", "/login" and "/error" are open; every other request needs authentication.
        http.authorizeHttpRequests(requests -> requests
                .requestMatchers("/", "/login", "/error").permitAll()
                .anyRequest().authenticated());

        // OAuth2 login uses the custom /login page and lands on /dashboard.
        http.oauth2Login(login -> login
                .loginPage("/login")
                .defaultSuccessUrl("/dashboard"));

        // After logout the user is sent back to the root page.
        http.logout(signOut -> signOut
                .logoutSuccessUrl("/"));

        return http.build();
    }
}
Python (Authlib)
from authlib.integrations.flask_client import OAuth
oauth = OAuth(app)

# Register Google (endpoints come from the OpenID Connect metadata URL)
oauth.register(
    'google',
    client_id=os.getenv('GOOGLE_CLIENT_ID'),
    client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid email profile'},
)

# Register GitHub (endpoints are listed explicitly)
oauth.register(
    'github',
    client_id=os.getenv('GITHUB_CLIENT_ID'),
    client_secret=os.getenv('GITHUB_CLIENT_SECRET'),
    access_token_url='https://github.com/login/oauth/access_token',
    authorize_url='https://github.com/login/oauth/authorize',
    api_base_url='https://api.github.com/',
    client_kwargs={'scope': 'user:email'},
)
@app.route('/login/google')
def google_login():
    """Redirect the browser to Google's authorization endpoint."""
    redirect_uri = url_for('google_callback', _external=True)
    return oauth.google.authorize_redirect(redirect_uri)
@app.route('/auth/google/callback')
def google_callback():
    """Complete the Google flow and redirect to the dashboard."""
    token = oauth.google.authorize_access_token()
    user_info = token.get('userinfo')
    # Handle the user information here (placeholder)
    return redirect('/dashboard')
@app.route('/login/github')
def github_login():
    """Redirect the browser to GitHub's authorization endpoint."""
    redirect_uri = url_for('github_callback', _external=True)
    return oauth.github.authorize_redirect(redirect_uri)
@app.route('/auth/github/callback')
def github_callback():
    """Complete the GitHub flow, fetch the user profile, and redirect to the dashboard."""
    token = oauth.github.authorize_access_token()
    # Pass the token explicitly so the API call does not depend on implicit
    # per-request token state.
    resp = oauth.github.get('user', token=token)
    # Fail loudly on an HTTP error instead of treating an error body as a profile.
    resp.raise_for_status()
    user_info = resp.json()
    # Handle the user information here (placeholder)
    return redirect('/dashboard')
OpenID Connect
Node.js (openid-client)
const { Issuer, generators } = require('openid-client');
// Redirect URI registered with the provider; used for both client setup and the callback.
const OIDC_REDIRECT_URI = 'http://localhost:3000/auth/callback';

/**
 * Discovers Google's OIDC metadata and builds a client for the
 * authorization-code flow.
 * @returns {Promise<object>} the configured openid-client Client
 */
async function setupOIDC() {
  // Discovery
  const googleIssuer = await Issuer.discover('https://accounts.google.com');
  return new googleIssuer.Client({
    client_id: process.env.GOOGLE_CLIENT_ID,
    client_secret: process.env.GOOGLE_CLIENT_SECRET,
    redirect_uris: [OIDC_REDIRECT_URI],
    response_types: ['code'],
  });
}

// The route handlers need the client that setupOIDC() returns. Cache the
// promise so discovery runs once, and reset it on failure so a later request
// can retry.
let oidcClientPromise;
function getOIDCClient() {
  if (!oidcClientPromise) {
    oidcClientPromise = setupOIDC().catch((err) => {
      oidcClientPromise = undefined;
      throw err;
    });
  }
  return oidcClientPromise;
}

// Login
// Generates PKCE/nonce/state values, keeps them in the session, and redirects
// to the provider's authorization URL.
app.get('/login', async (req, res, next) => {
  try {
    const client = await getOIDCClient();
    const codeVerifier = generators.codeVerifier();
    const codeChallenge = generators.codeChallenge(codeVerifier);
    const nonce = generators.nonce();
    const state = generators.state();

    req.session.codeVerifier = codeVerifier;
    req.session.nonce = nonce;
    req.session.state = state;

    const authUrl = client.authorizationUrl({
      scope: 'openid email profile',
      code_challenge: codeChallenge,
      code_challenge_method: 'S256',
      nonce,
      state,
    });
    res.redirect(authUrl);
  } catch (err) {
    next(err);
  }
});

// Callback
// Exchanges the authorization code, loads the user info, and stores it in the session.
app.get('/auth/callback', async (req, res, next) => {
  try {
    const client = await getOIDCClient();

    // These values are single-use: read them, then remove them from the
    // session so a replayed callback cannot reuse them.
    const { codeVerifier, nonce, state } = req.session;
    delete req.session.codeVerifier;
    delete req.session.nonce;
    delete req.session.state;

    const params = client.callbackParams(req);
    // NOTE(review): client.callback() is expected to validate the ID Token
    // against the supplied nonce/state — confirm against the openid-client docs.
    const tokenSet = await client.callback(OIDC_REDIRECT_URI, params, {
      code_verifier: codeVerifier,
      nonce,
      state,
    });
    const userinfo = await client.userinfo(tokenSet.access_token);
    // tokenSet.claims() exposes the ID Token claims if they are needed.
    req.session.user = userinfo;
    res.redirect('/dashboard');
  } catch (err) {
    // Forward token-exchange/validation failures to the Express error handler.
    next(err);
  }
});
Python (python-social-auth)
# settings.py
# Authentication backends: Google and GitHub OAuth2 via social_core.
# NOTE(review): no password-based backend is listed — confirm whether
# django.contrib.auth.backends.ModelBackend should also be enabled.
AUTHENTICATION_BACKENDS = (
    'social_core.backends.google.GoogleOAuth2',
    'social_core.backends.github.GithubOAuth2',
)
# OAuth client credentials are read from environment variables.
SOCIAL_AUTH_GOOGLE_OAUTH2_KEY = os.getenv('GOOGLE_CLIENT_ID')
SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET = os.getenv('GOOGLE_CLIENT_SECRET')
SOCIAL_AUTH_GITHUB_KEY = os.getenv('GITHUB_CLIENT_ID')
SOCIAL_AUTH_GITHUB_SECRET = os.getenv('GITHUB_CLIENT_SECRET')
# urls.py
# Mount the social_django URLs under auth/ with the "social" namespace.
urlpatterns = [
    path('auth/', include('social_django.urls', namespace='social')),
]
# Login links (template snippets)
# <a href="{% url 'social:begin' 'google-oauth2' %}">Login with Google</a>
# <a href="{% url 'social:begin' 'github' %}">Login with GitHub</a>
密码哈希
bcrypt (Node.js)
const bcrypt = require('bcrypt');
const SALT_ROUNDS = 12;
// Hash a password
/**
 * Hashes a password with bcrypt using SALT_ROUNDS.
 * @param {string} password - plaintext password
 * @returns {Promise<string>} the bcrypt hash
 */
async function hashPassword(password) {
  const digest = await bcrypt.hash(password, SALT_ROUNDS);
  return digest;
}
// Verify a password
/**
 * Checks a plaintext password against a bcrypt hash.
 * @param {string} password - plaintext password
 * @param {string} hash - stored bcrypt hash
 * @returns {Promise<boolean>} result of bcrypt.compare
 */
async function verifyPassword(password, hash) {
  const isMatch = await bcrypt.compare(password, hash);
  return isMatch;
}
bcrypt (Python)
import bcrypt
def hash_password(password: str) -> str:
    """Hash a password with bcrypt (12 rounds) and return the hash as text."""
    salt = bcrypt.gensalt(rounds=12)
    return bcrypt.hashpw(password.encode(), salt).decode()
def verify_password(password: str, hashed: str) -> bool:
    """Return whether the password matches the stored bcrypt hash."""
    return bcrypt.checkpw(password.encode(), hashed.encode())
Argon2 (推荐)
// Node.js
const argon2 = require('argon2');
/**
 * Hashes a password with argon2 using the library's default options.
 * @param {string} password - plaintext password
 * @returns {Promise<string>} the encoded argon2 hash
 */
async function hashPassword(password) {
  const encodedHash = await argon2.hash(password);
  return encodedHash;
}
/**
 * Checks a plaintext password against an argon2 hash.
 * Note the argument order of argon2.verify: hash first, then password.
 * @param {string} password - plaintext password
 * @param {string} hash - stored argon2 hash
 * @returns {Promise<boolean>} result of argon2.verify
 */
async function verifyPassword(password, hash) {
  const isMatch = await argon2.verify(hash, password);
  return isMatch;
}
# Python
from argon2 import PasswordHasher
ph = PasswordHasher()
def hash_password(password: str) -> str:
    """Hash a password with the module-level PasswordHasher ``ph``."""
    return ph.hash(password)
def verify_password(password: str, hashed: str) -> bool:
    """Return True when the password matches the stored Argon2 hash.

    A mismatch or an unparsable hash yields False; unrelated errors
    propagate instead of being silently reported as a failed login.
    """
    # Imported locally so this function is self-contained.
    from argon2.exceptions import InvalidHash, VerificationError

    try:
        return ph.verify(hashed, password)
    except (VerificationError, InvalidHash):
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit
        # and programming errors; catch only verification failures.
        return False
安全头配置
Express.js (helmet)
const helmet = require('helmet');
// Default helmet header set.
app.use(helmet());

// Custom configuration
// NOTE(review): helmet is registered twice here (defaults above, custom below);
// presumably only one of the two is meant to be used in a real app.
const cspDirectives = {
  defaultSrc: ["'self'"],
  scriptSrc: ["'self'", "'unsafe-inline'"],
  styleSrc: ["'self'", "'unsafe-inline'"],
  imgSrc: ["'self'", "data:", "https:"],
};
const hstsOptions = {
  maxAge: 31536000,
  includeSubDomains: true,
  preload: true,
};
app.use(
  helmet({
    contentSecurityPolicy: { directives: cspDirectives },
    hsts: hstsOptions,
  }),
);
Nginx
server {
    # Security headers
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;
    # The legacy XSS auditor is deprecated and "1; mode=block" can itself
    # introduce vulnerabilities; disable it and rely on the CSP below.
    add_header X-XSS-Protection "0" always;
    add_header Referrer-Policy "strict-origin-when-cross-origin" always;
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
    add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'" always;
    # Do not advertise the nginx version
    server_tokens off;
}
Rate Limiting
Express.js
const rateLimit = require('express-rate-limit');
// Limit login attempts: at most 5 per 15-minute window.
const LOGIN_WINDOW_MS = 15 * 60 * 1000; // 15 minutes
const MAX_LOGIN_ATTEMPTS = 5; // at most 5 attempts
const loginLimiterOptions = {
  windowMs: LOGIN_WINDOW_MS,
  max: MAX_LOGIN_ATTEMPTS,
  message: { error: 'Too many login attempts, please try again later' },
  standardHeaders: true,
  legacyHeaders: false,
};
const loginLimiter = rateLimit(loginLimiterOptions);

app.post('/login', loginLimiter, (req, res) => {
  // Login logic
});
Redis 滑动窗口
const Redis = require('ioredis');
const redis = new Redis();
/**
 * Sliding-window rate limiter backed by a Redis sorted set.
 * Drops entries older than the window, records the current request, and
 * counts what remains.
 * @param {string} key - sorted-set key identifying the caller
 * @param {number} limit - maximum number of requests per window
 * @param {number} windowSeconds - window length in seconds
 * @returns {Promise<boolean>} true when the request is within the limit
 * @throws the first error reported by any pipelined Redis command
 */
async function rateLimit(key, limit, windowSeconds) {
  const now = Date.now();
  const windowStart = now - windowSeconds * 1000;

  const pipeline = redis.pipeline();
  pipeline.zremrangebyscore(key, 0, windowStart);
  // The random suffix keeps members unique when several requests share a timestamp.
  pipeline.zadd(key, now, `${now}:${Math.random()}`);
  pipeline.zcard(key);
  pipeline.expire(key, windowSeconds);
  const results = await pipeline.exec();

  // exec() reports failures per command as [err, result] pairs. Without this
  // check a failed ZCARD yields an undefined count, and `undefined <= limit`
  // is false, so every request would be rejected silently.
  for (const [commandError] of results) {
    if (commandError) {
      throw commandError;
    }
  }

  const count = results[2][1];
  return count <= limit;
}
// Usage
// Applies the limiter per client IP: 100 requests per minute.
app.use(async (req, res, next) => {
  try {
    const key = `rate:${req.ip}`;
    const allowed = await rateLimit(key, 100, 60); // 100 requests per minute
    if (!allowed) {
      return res.status(429).json({ error: 'Rate limit exceeded' });
    }
    next();
  } catch (err) {
    // A Redis failure would otherwise be an unhandled rejection that leaves
    // the request hanging; hand it to the Express error handler.
    next(err);
  }
});
CSRF 防护
Express.js (csurf)
const csrf = require('csurf');
const cookieParser = require('cookie-parser');
// NOTE(review): the csurf package appears to be deprecated upstream — confirm
// before using this in new projects.
app.use(cookieParser());
const csrfProtection = csrf({ cookie: true });
app.use(csrfProtection);

// Expose the CSRF token to the form view
app.get('/form', (req, res) => {
  const csrfToken = req.csrfToken();
  res.render('form', { csrfToken });
});
// Include the token in the form:
// <input type="hidden" name="_csrf" value="{{csrfToken}}">
Django
# settings.py
# Enable Django's CSRF middleware (other middleware entries are elided).
MIDDLEWARE = [
    'django.middleware.csrf.CsrfViewMiddleware',
    # ...
]
# In templates, put this tag inside each form:
# {% csrf_token %}
常用命令
JWT 调试
# Decode a JWT payload (the signature is NOT verified).
# JWT segments are base64url without padding, so translate the alphabet and
# re-pad before handing the data to base64 instead of hiding its errors.
echo "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" | cut -d. -f2 | tr '_-' '/+' | awk '{ while (length($0) % 4) $0 = $0 "="; print }' | base64 -d | jq .
# Using jwt-cli
npm install -g @tsndr/jwt-cli
jwt decode <token>
jwt encode --secret=secret '{"user":"test"}'
OpenSSL
# Generate a random key (32 random bytes, base64-encoded)
openssl rand -base64 32
# Generate an RSA key pair (2048-bit private key, then extract the public key)
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
# Generate a self-signed certificate (valid 365 days, unencrypted private key)
openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes
Redis Session
# List all sessions.
# Uses incremental SCAN instead of KEYS, which blocks the server while it
# walks the whole keyspace.
redis-cli --scan --pattern "sess:*"
# Show one session
redis-cli GET "sess:abc123"
# Delete a session
redis-cli DEL "sess:abc123"
# Set a session's expiry (seconds)
redis-cli EXPIRE "sess:abc123" 1800