跳到主要内容

后台任务

后台任务(Background Tasks)允许你在返回响应之后执行代码。这对于不需要客户端等待的操作非常有用,如发送邮件通知、处理文件、记录日志等。

为什么需要后台任务

在 Web 应用中,某些操作耗时较长但不需要立即返回结果:

  • 发送邮件:连接邮件服务器可能需要几秒钟
  • 文件处理:上传文件后进行格式转换或分析
  • 数据处理:批量处理、报表生成
  • 日志记录:写入日志文件或发送到日志服务

如果让客户端等待这些操作完成,会严重影响用户体验。后台任务让 API 立即返回响应,然后在后台异步执行这些操作。

基本使用

导入和声明

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
# 添加后台任务
background_tasks.add_task(send_email, email, "欢迎注册")

# 立即返回响应
return {"message": "通知已发送"}

创建任务函数

任务函数就是普通的 Python 函数:

def send_email(email: str, message: str):
"""发送邮件的后台任务"""
# 模拟发送邮件操作
import time
time.sleep(2) # 模拟耗时操作

with open("emails.log", "a") as f:
f.write(f"发送邮件到 {email}: {message}\n")

任务函数可以是:

  • 普通函数 def
  • 异步函数 async def

FastAPI 会自动正确处理这两种类型。

添加任务

使用 add_task() 方法添加后台任务:

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
# 添加任务
background_tasks.add_task(
send_email, # 任务函数
email, # 位置参数
message="欢迎注册" # 关键字参数
)

return {"message": "通知将在后台发送"}

add_task() 接收:

  • 任务函数(必需)
  • 位置参数(可选)
  • 关键字参数(可选)

完整示例

邮件通知

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel, EmailStr

app = FastAPI()

class EmailRequest(BaseModel):
email: EmailStr
subject: str
body: str

def send_email_task(email: str, subject: str, body: str):
"""后台发送邮件任务"""
# 模拟邮件发送
import time
time.sleep(3) # 模拟网络延迟

# 实际应用中,这里会连接 SMTP 服务器
with open("email.log", "a") as f:
f.write(f"收件人: {email}\n")
f.write(f"主题: {subject}\n")
f.write(f"内容: {body}\n")
f.write("-" * 50 + "\n")

@app.post("/send-email")
async def send_email(
request: EmailRequest,
background_tasks: BackgroundTasks
):
"""发送邮件接口"""
# 添加后台任务
background_tasks.add_task(
send_email_task,
email=request.email,
subject=request.subject,
body=request.body
)

return {
"message": "邮件正在发送中",
"email": request.email
}

文件处理

from fastapi import FastAPI, BackgroundTasks, UploadFile, File
import shutil
from pathlib import Path

app = FastAPI()

def process_file_task(file_path: Path, original_name: str):
"""后台处理文件任务"""
# 模拟文件处理
import time
time.sleep(5)

# 实际应用中可能包括:
# - 图片压缩
# - 格式转换
# - 内容分析
# - 数据导入

processed_path = file_path.with_suffix(".processed")

with open(file_path, "r") as src, open(processed_path, "w") as dst:
content = src.read()
# 简单的处理:添加标记
dst.write(f"处理后的文件: {original_name}\n")
dst.write(content)

# 清理原始文件
file_path.unlink()

@app.post("/upload")
async def upload_file(
background_tasks: BackgroundTasks,
file: UploadFile = File(...)
):
"""上传文件并后台处理"""
# 保存上传的文件
file_path = Path("uploads") / file.filename
file_path.parent.mkdir(exist_ok=True)

with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)

# 添加后台处理任务
background_tasks.add_task(
process_file_task,
file_path=file_path,
original_name=file.filename
)

return {
"message": "文件已上传,正在后台处理",
"filename": file.filename
}

与依赖注入结合

BackgroundTasks 可以在依赖注入的多个层级中使用,FastAPI 会自动合并所有任务:

from typing import Annotated
from fastapi import FastAPI, BackgroundTasks, Depends

app = FastAPI()

def write_log(message: str):
"""写入日志"""
with open("app.log", "a") as f:
f.write(f"{message}\n")

def get_query_logger(
background_tasks: BackgroundTasks,
q: str | None = None
):
"""依赖:记录查询参数"""
if q:
background_tasks.add_task(
write_log,
f"查询参数: {q}"
)
return q

@app.post("/process/{item_id}")
async def process_item(
item_id: int,
background_tasks: BackgroundTasks,
q: Annotated[str | None, Depends(get_query_logger)] = None
):
"""处理项目"""
# 添加路径操作函数中的后台任务
background_tasks.add_task(
write_log,
f"处理项目 ID: {item_id}"
)

# 响应返回后,两个任务都会执行:
# 1. 来自依赖的日志记录(如果有查询参数)
# 2. 来自路径操作函数的日志记录

return {
"item_id": item_id,
"query": q,
"message": "处理中"
}

任务执行顺序:

  1. 路径操作函数返回响应
  2. 执行依赖中的后台任务
  3. 执行路径操作函数中的后台任务

多个后台任务

可以添加多个后台任务:

@app.post("/register")
async def register_user(
user: UserCreate,
background_tasks: BackgroundTasks
):
# 任务1:发送欢迎邮件
background_tasks.add_task(
send_welcome_email,
email=user.email,
name=user.name
)

# 任务2:创建用户目录
background_tasks.add_task(
create_user_directory,
user_id=user.id
)

# 任务3:发送管理员通知
background_tasks.add_task(
notify_admin,
message=f"新用户注册: {user.email}"
)

return {"message": "注册成功"}

所有任务按添加顺序依次执行。

异步任务函数

任务函数可以是异步函数:

import aiofiles

async def async_write_log(message: str):
"""异步写入日志"""
async with aiofiles.open("app.log", "a") as f:
await f.write(f"{message}\n")

@app.post("/log")
async def log_message(
message: str,
background_tasks: BackgroundTasks
):
# 异步任务函数也能正常工作
background_tasks.add_task(async_write_log, message)
return {"message": "日志已记录"}

实际应用场景

发送通知邮件

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

app = FastAPI()

class Notification(BaseModel):
email: EmailStr
title: str
content: str

def send_notification_email(
to_email: str,
title: str,
content: str,
smtp_config: dict
):
"""后台发送通知邮件"""
try:
msg = MIMEMultipart()
msg['From'] = smtp_config['from_email']
msg['To'] = to_email
msg['Subject'] = title
msg.attach(MIMEText(content, 'html'))

with smtplib.SMTP(
smtp_config['host'],
smtp_config['port']
) as server:
server.starttls()
server.login(
smtp_config['username'],
smtp_config['password']
)
server.send_message(msg)

except Exception as e:
# 记录错误但不影响主流程
with open("email_errors.log", "a") as f:
f.write(f"发送失败: {to_email}, 错误: {e}\n")

@app.post("/notify")
async def send_notification(
notification: Notification,
background_tasks: BackgroundTasks
):
"""发送通知接口"""
smtp_config = {
'host': 'smtp.example.com',
'port': 587,
'username': 'your_username',
'password': 'your_password',
'from_email': '[email protected]'
}

background_tasks.add_task(
send_notification_email,
to_email=notification.email,
title=notification.title,
content=notification.content,
smtp_config=smtp_config
)

return {
"message": "通知已发送",
"email": notification.email
}

数据导出

from fastapi import FastAPI, BackgroundTasks
import csv
from io import StringIO

app = FastAPI()

def export_data_task(query_params: dict, output_path: str):
"""后台导出数据"""
# 模拟数据库查询
data = [
{"id": 1, "name": "商品A", "price": 99.9},
{"id": 2, "name": "商品B", "price": 199.9},
{"id": 3, "name": "商品C", "price": 299.9},
]

# 写入 CSV 文件
with open(output_path, "w", newline="", encoding="utf-8") as f:
if data:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)

@app.post("/export")
async def export_data(
background_tasks: BackgroundTasks,
format: str = "csv"
):
"""触发数据导出"""
import uuid
export_id = str(uuid.uuid4())
output_path = f"exports/data_{export_id}.csv"

background_tasks.add_task(
export_data_task,
query_params={},
output_path=output_path
)

return {
"message": "导出任务已启动",
"export_id": export_id
}

清理任务

from fastapi import FastAPI, BackgroundTasks
from datetime import datetime
import os

app = FastAPI()

def cleanup_old_files(directory: str, days: int):
"""清理旧文件"""
now = datetime.now()

for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)

if os.path.isfile(filepath):
file_time = datetime.fromtimestamp(
os.path.getmtime(filepath)
)
age = (now - file_time).days

if age > days:
os.remove(filepath)
print(f"已删除: {filepath}")

@app.post("/cleanup")
async def trigger_cleanup(
background_tasks: BackgroundTasks,
directory: str = "temp",
days: int = 7
):
"""触发清理任务"""
background_tasks.add_task(
cleanup_old_files,
directory=directory,
days=days
)

return {"message": f"清理任务已启动,将删除 {days} 天前的文件"}

后台任务 vs Celery

何时使用后台任务

FastAPI 的 BackgroundTasks 适合:

  • 轻量级异步操作
  • 需要访问应用上下文
  • 简单的邮件发送、日志记录
  • 单进程应用

何时使用 Celery

对于更复杂的需求,考虑使用 Celery:

  • 需要多个工作进程处理任务
  • 需要任务队列管理
  • 需要任务重试失败处理
  • 需要定时任务
  • 需要分布式处理
  • 任务执行时间很长

Celery 示例:

# tasks.py
from celery import Celery

celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)

@celery_app.task
def heavy_computation(data):
"""重量级计算任务"""
import time
time.sleep(60) # 模拟长时间处理
return {"result": "computed"}

# main.py
from fastapi import FastAPI
from tasks import heavy_computation

app = FastAPI()

@app.post("/compute")
async def compute(data: dict):
# 提交任务到 Celery 队列
task = heavy_computation.delay(data)
return {"task_id": task.id}

注意事项

任务失败不影响响应

后台任务的错误不会影响已返回的响应:

def failing_task():
raise Exception("任务失败")

@app.post("/test")
async def test(background_tasks: BackgroundTasks):
background_tasks.add_task(failing_task)
# 即使任务失败,这个响应已经返回
return {"message": "成功"} # 客户端会收到这个响应

不要在任务中修改响应

后台任务在响应返回后执行,无法修改响应内容:

# 错误示范
def task_modify_response(response):
response.body = "修改后的内容" # 太晚了,响应已发送

@app.post("/test")
async def test(response: Response, background_tasks: BackgroundTasks):
background_tasks.add_task(task_modify_response, response)
return {"message": "原始内容"}

数据库事务

确保数据库操作在任务中正确处理:

# 错误:请求的数据库会话在响应后关闭
@app.post("/items")
async def create_item(
item: Item,
session: Session = Depends(get_session), # 这个会话会关闭
background_tasks: BackgroundTasks = BackgroundTasks
):
background_tasks.add_task(process_item, session, item) # 危险!
return item

# 正确:在任务中创建新的数据库会话
def process_item(item_data: dict):
# 在任务中创建新的会话
with Session(engine) as session:
item = Item(**item_data)
session.add(item)
session.commit()

小结

本章我们学习了:

  1. 基本使用:使用 BackgroundTasks 添加后台任务
  2. 任务函数:定义同步或异步任务函数
  3. 依赖注入:在多层级中使用后台任务
  4. 实际应用:邮件通知、文件处理、数据导出
  5. 与 Celery 对比:选择合适的异步方案

后台任务适用场景:

  • 发送邮件通知
  • 记录日志
  • 文件处理
  • 轻量级数据处理

Celery 适用场景:

  • 分布式任务处理
  • 定时任务
  • 重任务队列管理
  • 需要重试机制

练习

  1. 实现用户注册后发送欢迎邮件的后台任务
  2. 创建文件上传接口,后台处理文件并保存处理结果
  3. 实现后台日志记录功能,记录所有 API 请求
  4. 比较后台任务和 Celery 的性能差异(处理 100 个并发任务)