后台任务
后台任务(Background Tasks)允许你在返回响应之后执行代码。这对于不需要客户端等待的操作非常有用,如发送邮件通知、处理文件、记录日志等。
为什么需要后台任务
在 Web 应用中,某些操作耗时较长但不需要立即返回结果:
- 发送邮件:连接邮件服务器可能需要几秒钟
- 文件处理:上传文件后进行格式转换或分析
- 数据处理:批量处理、报表生成
- 日志记录:写入日志文件或发送到日志服务
如果让客户端等待这些操作完成,会严重影响用户体验。后台任务让 API 立即返回响应,然后在后台异步执行这些操作。
基本使用
导入和声明
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
# 添加后台任务
background_tasks.add_task(send_email, email, "欢迎注册")
# 立即返回响应
return {"message": "通知已发送"}
创建任务函数
任务函数就是普通的 Python 函数:
def send_email(email: str, message: str):
"""发送邮件的后台任务"""
# 模拟发送邮件操作
import time
time.sleep(2) # 模拟耗时操作
with open("emails.log", "a") as f:
f.write(f"发送邮件到 {email}: {message}\n")
任务函数可以是:
- 普通函数
def - 异步函数
async def
FastAPI 会自动正确处理这两种类型。
添加任务
使用 add_task() 方法添加后台任务:
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
# 添加任务
background_tasks.add_task(
send_email, # 任务函数
email, # 位置参数
message="欢迎注册" # 关键字参数
)
return {"message": "通知将在后台发送"}
add_task() 接收:
- 任务函数(必需)
- 位置参数(可选)
- 关键字参数(可选)
完整示例
邮件通知
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel, EmailStr
app = FastAPI()
class EmailRequest(BaseModel):
email: EmailStr
subject: str
body: str
def send_email_task(email: str, subject: str, body: str):
"""后台发送邮件任务"""
# 模拟邮件发送
import time
time.sleep(3) # 模拟网络延迟
# 实际应用中,这里会连接 SMTP 服务器
with open("email.log", "a") as f:
f.write(f"收件人: {email}\n")
f.write(f"主题: {subject}\n")
f.write(f"内容: {body}\n")
f.write("-" * 50 + "\n")
@app.post("/send-email")
async def send_email(
request: EmailRequest,
background_tasks: BackgroundTasks
):
"""发送邮件接口"""
# 添加后台任务
background_tasks.add_task(
send_email_task,
email=request.email,
subject=request.subject,
body=request.body
)
return {
"message": "邮件正在发送中",
"email": request.email
}
文件处理
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
import shutil
from pathlib import Path
app = FastAPI()
def process_file_task(file_path: Path, original_name: str):
"""后台处理文件任务"""
# 模拟文件处理
import time
time.sleep(5)
# 实际应用中可能包括:
# - 图片压缩
# - 格式转换
# - 内容分析
# - 数据导入
processed_path = file_path.with_suffix(".processed")
with open(file_path, "r") as src, open(processed_path, "w") as dst:
content = src.read()
# 简单的处理:添加标记
dst.write(f"处理后的文件: {original_name}\n")
dst.write(content)
# 清理原始文件
file_path.unlink()
@app.post("/upload")
async def upload_file(
background_tasks: BackgroundTasks,
file: UploadFile = File(...)
):
"""上传文件并后台处理"""
# 保存上传的文件
file_path = Path("uploads") / file.filename
file_path.parent.mkdir(exist_ok=True)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 添加后台处理任务
background_tasks.add_task(
process_file_task,
file_path=file_path,
original_name=file.filename
)
return {
"message": "文件已上传,正在后台处理",
"filename": file.filename
}
与依赖注入结合
BackgroundTasks 可以在依赖注入的多个层级中使用,FastAPI 会自动合并所有任务:
from typing import Annotated
from fastapi import FastAPI, BackgroundTasks, Depends
app = FastAPI()
def write_log(message: str):
"""写入日志"""
with open("app.log", "a") as f:
f.write(f"{message}\n")
def get_query_logger(
background_tasks: BackgroundTasks,
q: str | None = None
):
"""依赖:记录查询参数"""
if q:
background_tasks.add_task(
write_log,
f"查询参数: {q}"
)
return q
@app.post("/process/{item_id}")
async def process_item(
item_id: int,
background_tasks: BackgroundTasks,
q: Annotated[str | None, Depends(get_query_logger)] = None
):
"""处理项目"""
# 添加路径操作函数中的后台任务
background_tasks.add_task(
write_log,
f"处理项目 ID: {item_id}"
)
# 响应返回后,两个任务都会执行:
# 1. 来自依赖的日志记录(如果有查询参数)
# 2. 来自路径操作函数的日志记录
return {
"item_id": item_id,
"query": q,
"message": "处理中"
}
任务执行顺序:
- 路径操作函数返回响应
- 执行依赖中的后台任务
- 执行路径操作函数中的后台任务
多个后台任务
可以添加多个后台任务:
@app.post("/register")
async def register_user(
user: UserCreate,
background_tasks: BackgroundTasks
):
# 任务1:发送欢迎邮件
background_tasks.add_task(
send_welcome_email,
email=user.email,
name=user.name
)
# 任务2:创建用户目录
background_tasks.add_task(
create_user_directory,
user_id=user.id
)
# 任务3:发送管理员通知
background_tasks.add_task(
notify_admin,
message=f"新用户注册: {user.email}"
)
return {"message": "注册成功"}
所有任务按添加顺序依次执行。
异步任务函数
任务函数可以是异步函数:
import aiofiles
async def async_write_log(message: str):
"""异步写入日志"""
async with aiofiles.open("app.log", "a") as f:
await f.write(f"{message}\n")
@app.post("/log")
async def log_message(
message: str,
background_tasks: BackgroundTasks
):
# 异步任务函数也能正常工作
background_tasks.add_task(async_write_log, message)
return {"message": "日志已记录"}
实际应用场景
发送通知邮件
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
app = FastAPI()
class Notification(BaseModel):
email: EmailStr
title: str
content: str
def send_notification_email(
to_email: str,
title: str,
content: str,
smtp_config: dict
):
"""后台发送通知邮件"""
try:
msg = MIMEMultipart()
msg['From'] = smtp_config['from_email']
msg['To'] = to_email
msg['Subject'] = title
msg.attach(MIMEText(content, 'html'))
with smtplib.SMTP(
smtp_config['host'],
smtp_config['port']
) as server:
server.starttls()
server.login(
smtp_config['username'],
smtp_config['password']
)
server.send_message(msg)
except Exception as e:
# 记录错误但不影响主流程
with open("email_errors.log", "a") as f:
f.write(f"发送失败: {to_email}, 错误: {e}\n")
@app.post("/notify")
async def send_notification(
notification: Notification,
background_tasks: BackgroundTasks
):
"""发送通知接口"""
smtp_config = {
'host': 'smtp.example.com',
'port': 587,
'username': 'your_username',
'password': 'your_password',
'from_email': '[email protected]'
}
background_tasks.add_task(
send_notification_email,
to_email=notification.email,
title=notification.title,
content=notification.content,
smtp_config=smtp_config
)
return {
"message": "通知已发送",
"email": notification.email
}
数据导出
from fastapi import FastAPI, BackgroundTasks
import csv
from io import StringIO
app = FastAPI()
def export_data_task(query_params: dict, output_path: str):
"""后台导出数据"""
# 模拟数据库查询
data = [
{"id": 1, "name": "商品A", "price": 99.9},
{"id": 2, "name": "商品B", "price": 199.9},
{"id": 3, "name": "商品C", "price": 299.9},
]
# 写入 CSV 文件
with open(output_path, "w", newline="", encoding="utf-8") as f:
if data:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
@app.post("/export")
async def export_data(
background_tasks: BackgroundTasks,
format: str = "csv"
):
"""触发数据导出"""
import uuid
export_id = str(uuid.uuid4())
output_path = f"exports/data_{export_id}.csv"
background_tasks.add_task(
export_data_task,
query_params={},
output_path=output_path
)
return {
"message": "导出任务已启动",
"export_id": export_id
}
清理任务
from fastapi import FastAPI, BackgroundTasks
from datetime import datetime
import os
app = FastAPI()
def cleanup_old_files(directory: str, days: int):
"""清理旧文件"""
now = datetime.now()
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath):
file_time = datetime.fromtimestamp(
os.path.getmtime(filepath)
)
age = (now - file_time).days
if age > days:
os.remove(filepath)
print(f"已删除: {filepath}")
@app.post("/cleanup")
async def trigger_cleanup(
background_tasks: BackgroundTasks,
directory: str = "temp",
days: int = 7
):
"""触发清理任务"""
background_tasks.add_task(
cleanup_old_files,
directory=directory,
days=days
)
return {"message": f"清理任务已启动,将删除 {days} 天前的文件"}
后台任务 vs Celery
何时使用后台任务
FastAPI 的 BackgroundTasks 适合:
- 轻量级异步操作
- 需要访问应用上下文
- 简单的邮件发送、日志记录
- 单进程应用
何时使用 Celery
对于更复杂的需求,考虑使用 Celery:
- 需要多个工作进程处理任务
- 需要任务队列管理
- 需要任务重试和失败处理
- 需要定时任务
- 需要分布式处理
- 任务执行时间很长
Celery 示例:
# tasks.py
from celery import Celery
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
@celery_app.task
def heavy_computation(data):
"""重量级计算任务"""
import time
time.sleep(60) # 模拟长时间处理
return {"result": "computed"}
# main.py
from fastapi import FastAPI
from tasks import heavy_computation
app = FastAPI()
@app.post("/compute")
async def compute(data: dict):
# 提交任务到 Celery 队列
task = heavy_computation.delay(data)
return {"task_id": task.id}
注意事项
任务失败不影响响应
后台任务的错误不会影响已返回的响应:
def failing_task():
raise Exception("任务失败")
@app.post("/test")
async def test(background_tasks: BackgroundTasks):
background_tasks.add_task(failing_task)
# 即使任务失败,这个响应已经返回
return {"message": "成功"} # 客户端会收到这个响应
不要在任务中修改响应
后台任务在响应返回后执行,无法修改响应内容:
# 错误示范
def task_modify_response(response):
response.body = "修改后的内容" # 太晚了,响应已发送
@app.post("/test")
async def test(response: Response, background_tasks: BackgroundTasks):
background_tasks.add_task(task_modify_response, response)
return {"message": "原始内容"}
数据库事务
确保数据库操作在任务中正确处理:
# 错误:请求的数据库会话在响应后关闭
@app.post("/items")
async def create_item(
item: Item,
session: Session = Depends(get_session), # 这个会话会关闭
background_tasks: BackgroundTasks = BackgroundTasks
):
background_tasks.add_task(process_item, session, item) # 危险!
return item
# 正确:在任务中创建新的数据库会话
def process_item(item_data: dict):
# 在任务中创建新的会话
with Session(engine) as session:
item = Item(**item_data)
session.add(item)
session.commit()
小结
本章我们学习了:
- 基本使用:使用
BackgroundTasks添加后台任务 - 任务函数:定义同步或异步任务函数
- 依赖注入:在多层级中使用后台任务
- 实际应用:邮件通知、文件处理、数据导出
- 与 Celery 对比:选择合适的异步方案
后台任务适用场景:
- 发送邮件通知
- 记录日志
- 文件处理
- 轻量级数据处理
Celery 适用场景:
- 分布式任务处理
- 定时任务
- 重任务队列管理
- 需要重试机制
练习
- 实现用户注册后发送欢迎邮件的后台任务
- 创建文件上传接口,后台处理文件并保存处理结果
- 实现后台日志记录功能,记录所有 API 请求
- 比较后台任务和 Celery 的性能差异(处理 100 个并发任务)