跳到主要内容

安全认证

安全认证是 Web 应用的核心功能之一。FastAPI 提供了多种安全工具,支持 OAuth2、JWT、API Key 等多种认证方式,并且这些功能都基于标准规范。

安全概念

OAuth2

OAuth2 是一个授权框架,定义了多种授权方式(称为"流")。FastAPI 支持:

  • 密码流(Password Flow):适用于同一组织的用户认证
  • 授权码流(Authorization Code Flow):第三方应用授权
  • 客户端凭据流(Client Credentials Flow):服务间认证
  • 隐式流(Implicit Flow):已不推荐使用

JWT

JWT(JSON Web Token)是一种开放标准,用于在各方之间安全传输信息。JWT 由三部分组成:

header.payload.signature
  • Header:令牌类型和签名算法
  • Payload:用户数据(声明)
  • Signature:签名,确保数据未被篡改

OpenAPI 安全方案

OpenAPI 定义了几种安全方案:

方案说明
apiKeyAPI 密钥,可在查询参数、Header 或 Cookie 中
httpHTTP 认证(Basic、Bearer 等)
oauth2OAuth2 认证
openIdConnectOpenID Connect 发现

OAuth2 密码流

这是最常用的认证方式,适合用户使用用户名密码登录的应用。

基本实现

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

# OAuth2 密码流
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模拟用户数据库
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": "fakehashedsecret",
"disabled": False,
}
}

def fake_hash_password(password: str):
return "fakehashed" + password

class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None

class UserInDB(User):
hashed_password: str

def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)

def fake_decode_token(token):
# 实际应用中应该验证 JWT
user = get_user(fake_users_db, token)
return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
return user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""登录获取 token"""
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="用户名或密码错误")

user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="用户名或密码错误")

return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
"""获取当前用户信息"""
return current_user

OAuth2PasswordRequestForm

OAuth2PasswordRequestForm 是一个表单类,包含:

字段说明
username用户名
password密码
scope权限范围(空格分隔)
client_id客户端 ID(可选)
client_secret客户端密钥(可选)

OAuth2PasswordBearer

OAuth2PasswordBearer 会:

  • 从请求头 Authorization: Bearer <token> 获取 token
  • 如果没有提供 token,返回 401 错误
  • 自动在 OpenAPI 文档中添加认证功能

JWT 实现

实际应用中应该使用 JWT 来生成和验证 token。

安装依赖

pip install python-jose[cryptography] passlib[bcrypt]

JWT 工具函数

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext

# 配置
SECRET_KEY = "your-secret-key-keep-it-secret" # 生产环境应使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 密码哈希
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""创建 JWT token"""
to_encode = data.copy()

if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)

to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

def decode_token(token: str) -> Optional[dict]:
"""解码 JWT token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None

完整认证系统

from datetime import datetime, timedelta
from typing import Annotated

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# 配置
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

app = FastAPI()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模型
class Token(BaseModel):
access_token: str
token_type: str

class TokenData(BaseModel):
username: str | None = None

class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool = False

class UserInDB(User):
hashed_password: str

# 数据库操作
def get_user(username: str):
# 实际应用中从数据库获取
pass

def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user

# 依赖
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)

payload = decode_token(token)
if payload is None:
raise credentials_exception

username: str = payload.get("sub")
if username is None:
raise credentials_exception

token_data = TokenData(username=username)

user = get_user(username=token_data.username)
if user is None:
raise credentials_exception

return user

async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户已禁用")
return current_user

# 路由
@app.post("/token", response_model=Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
"""登录获取 token"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)

access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)

return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""获取当前用户"""
return current_user

@app.get("/users/me/items")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""获取当前用户的资源"""
return [{"item_id": "Foo", "owner": current_user.username}]

OAuth2 作用域

OAuth2 作用域用于限制 token 的权限范围。

定义作用域

from fastapi import FastAPI, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={
"read": "读取权限",
"write": "写入权限",
"admin": "管理员权限",
}
)

在登录时设置作用域

@app.post("/token")
async def login_for_token(form_data: OAuth2PasswordRequestForm = Depends()):
# 验证用户
user = authenticate_user(form_data.username, form_data.password)

# 检查请求的作用域是否被允许
if form_data.scopes:
# 验证用户是否有这些作用域权限
pass

# 创建带有作用域的 token
access_token = create_access_token(
data={
"sub": user.username,
"scopes": form_data.scopes
}
)

return {"access_token": access_token, "token_type": "bearer"}

验证作用域

from fastapi import Security, HTTPException
from fastapi.security import SecurityScopes

async def get_current_user(
security_scopes: SecurityScopes,
token: Annotated[str, Depends(oauth2_scheme)]
):
payload = decode_token(token)

# 获取 token 中的作用域
token_scopes = payload.get("scopes", [])

# 验证是否具有所需作用域
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"权限不足,需要 {scope} 作用域"
)

return get_user(payload.get("sub"))

# 使用 Security 声明所需作用域
@app.get("/users/me")
async def read_users_me(
current_user: User = Security(get_current_user, scopes=["read"])
):
return current_user

@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Security(get_current_user, scopes=["admin"])
):
return {"deleted": user_id}

API Key 认证

适用于服务间调用或公开 API。

Header API Key

from fastapi import FastAPI, Security, HTTPException
from fastapi.security import APIKeyHeader

app = FastAPI()

api_key_header = APIKeyHeader(name="X-API-Key")

async def get_api_key(api_key: str = Security(api_key_header)):
if api_key != "your-secret-api-key":
raise HTTPException(status_code=403, detail="无效的 API Key")
return api_key

@app.get("/protected")
async def protected_route(api_key: str = Depends(get_api_key)):
return {"message": "访问成功"}

Query API Key

from fastapi.security import APIKeyQuery

api_key_query = APIKeyQuery(name="api_key")

async def get_api_key(api_key: str = Security(api_key_query)):
if api_key != "your-secret-api-key":
raise HTTPException(status_code=403, detail="无效的 API Key")
return api_key
from fastapi.security import APIKeyCookie

api_key_cookie = APIKeyCookie(name="session_id")

async def get_api_key(api_key: str = Security(api_key_cookie)):
if api_key != "your-secret-api-key":
raise HTTPException(status_code=403, detail="无效的 API Key")
return api_key

HTTP Basic 认证

最简单的认证方式:

from fastapi import FastAPI, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets

app = FastAPI()
security = HTTPBasic()

def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
correct_username = secrets.compare_digest(credentials.username, "admin")
correct_password = secrets.compare_digest(credentials.password, "secret")

if not (correct_username and correct_password):
raise HTTPException(
status_code=401,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Basic"},
)

return credentials.username

@app.get("/users/me")
def read_current_user(username: str = Depends(get_current_user)):
return {"username": username}

安全最佳实践

密码存储

from passlib.context import CryptContext

pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12 # 增加计算成本
)

# 存储密码时
hashed = pwd_context.hash(password)

# 验证密码时
pwd_context.verify(password, hashed)

JWT 安全

import os
from datetime import timedelta

# 使用强密钥
SECRET_KEY = os.getenv("SECRET_KEY", os.urandom(32).hex())

# 设置合理的过期时间
ACCESS_TOKEN_EXPIRE = timedelta(minutes=15)
REFRESH_TOKEN_EXPIRE = timedelta(days=7)

# 使用安全的算法
ALGORITHM = "HS256" # 或 RS256(非对称)

HTTPS

生产环境必须使用 HTTPS:

from fastapi import FastAPI
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

app = FastAPI()
app.add_middleware(HTTPSRedirectMiddleware)

CORS 配置

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"], # 限制来源
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)

敏感信息保护

# 用户模型:输出时排除敏感字段
class User(BaseModel):
username: str
email: EmailStr

class UserCreate(BaseModel):
username: str
email: EmailStr
password: str # 只在输入时使用

@app.post("/users", response_model=User)
async def create_user(user: UserCreate):
# 密码不会出现在响应中
return user

完整示例

from datetime import datetime, timedelta
from typing import Annotated, Optional

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr

# 配置
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

app = FastAPI(
title="安全认证示例",
description="FastAPI OAuth2 + JWT 认证示例"
)

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模型
class User(BaseModel):
username: str
email: EmailStr
full_name: Optional[str] = None
disabled: bool = False

class UserCreate(User):
password: str

class UserInDB(User):
hashed_password: str

class Token(BaseModel):
access_token: str
token_type: str

class TokenData(BaseModel):
username: Optional[str] = None

# 工具函数
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)

def hash_password(password: str) -> str:
return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

# 数据库模拟
fake_db: dict[str, UserInDB] = {}

# 依赖
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception

user = fake_db.get(username)
if user is None:
raise credentials_exception
return user

async def get_active_user(current_user: Annotated[User, Depends(get_current_user)]):
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户已禁用")
return current_user

# 路由
@app.post("/register", response_model=User)
async def register(user: UserCreate):
if user.username in fake_db:
raise HTTPException(status_code=400, detail="用户名已存在")

hashed = hash_password(user.password)
user_db = UserInDB(**user.model_dump(exclude={"password"}), hashed_password=hashed)
fake_db[user.username] = user_db
return user_db

@app.post("/token", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = fake_db.get(form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)

access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: Annotated[User, Depends(get_active_user)]):
return current_user

小结

本章我们学习了:

  1. OAuth2 密码流:用户名密码登录获取 token
  2. JWT:生成和验证 JWT token
  3. 作用域:限制 token 的权限范围
  4. API Key:服务间认证
  5. HTTP Basic:最简单的认证方式
  6. 安全最佳实践:密码存储、HTTPS、CORS

FastAPI 安全功能的优势:

  • 基于标准(OAuth2、OpenAPI)
  • 自动生成交互式文档
  • 支持多种认证方式
  • 与依赖注入无缝集成

练习

  1. 实现用户注册和登录接口,使用 bcrypt 存储密码
  2. 创建 JWT 认证系统,token 有效期为 30 分钟
  3. 实现基于角色的权限控制(管理员、普通用户)
  4. 添加 API Key 认证,支持多个有效的 API Key