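Flask - Web Development Framework

Flask is a lightweight Python web framework. It is concise and flexible, and it is called a "microframework" because it keeps the core simple yet extensible. Flask is well suited to small applications, API services, and prototypes.

What is Flask?

Flask is a lightweight web application framework written in Python. Its main characteristics are:

  • Lightweight: a simple core that contains only the essential features
  • Flexible: extensions can be chosen and integrated as needed
  • Easy to learn: a gentle learning curve that suits beginners
  • Well documented: detailed official documentation and an active community
  • RESTful: routes can be bound to specific HTTP methods, which makes RESTful APIs straightforward to build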

Installing Flask

# Install with pip
pip install flask

# Install commonly used extensions
pip install flask-sqlalchemy flask-login flask-wtf flask-migrate

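Quick start

Minimal Flask application

from flask import Flask

# Create the application instance
app = Flask(__name__)

# Define a route
@app.route('/')
def hello():
    return 'Hello, Flask!'

# Run the development server (debug=True is for local development only)
if __name__ == '__main__':
    app.run(debug=True)

Save the script as app.py and run it:

python app.py

Then open http://127.0.0.1:5000/ to see "Hello, Flask!".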
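
The route can also be exercised without starting a server or opening a browser. The sketch below repeats the minimal application and calls it through Flask's built-in test client. The variable names client, status, and body are illustrative choices, not part of the original example.

```python
from flask import Flask

app = Flask(__name__)

@app.route('/')
def hello():
    return 'Hello, Flask!'

# The test client sends requests directly to the app, with no network involved.
client = app.test_client()
response = client.get('/')

status = response.status_code
body = response.get_data(as_text=True)
print(status, body)
```

The same client object is handy later for checking POST routes and JSON endpoints.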

Application structure

myapp/
├── app.py              # Application entry point
├── config.py           # Configuration
├── requirements.txt    # Dependency list
├── static/             # Static files (CSS, JS, images)
│   ├── css/
│   ├── js/
│   └── images/
├── templates/          # HTML templates
│   ├── base.html
│   ├── index.html
│   └── about.html
└── models.py           # Data models

Routes and view functions

Basic routes

from flask import Flask

app = Flask(__name__)

# Basic route
@app.route('/')
def index():
    return 'Home'

# Route with a path
@app.route('/about')
def about():
    return 'About us'

# Dynamic route: <username> is passed to the view as a string
@app.route('/user/<username>')
def show_user(username):
    return f'User: {username}'

# Typed parameter: only integers match, and post_id arrives as an int
@app.route('/post/<int:post_id>')
def show_post(post_id):
    return f'Post ID: {post_id}'

# Multiple parameters
@app.route('/user/<username>/post/<int:post_id>')
def show_user_post(username, post_id):
    return f'Post {post_id} by user {username}'

Route methods

from flask import Flask, request

app = Flask(__name__)

# GET request (the default)
@app.route('/get')
def get_example():
    return 'GET request'

# POST request
@app.route('/post', methods=['POST'])
def post_example():
    return 'POST request'

# Several methods on one route
@app.route('/item', methods=['GET', 'POST', 'PUT', 'DELETE'])
def item():
    if request.method == 'GET':
        return 'Get item'
    elif request.method == 'POST':
        return 'Create item'
    elif request.method == 'PUT':
        return 'Update item'
    elif request.method == 'DELETE':
        return 'Delete item'

Building URLs

from flask import Flask, url_for

app = Flask(__name__)

@app.route('/')
def index():
    return 'Home'

@app.route('/user/<username>')
def profile(username):
    return f'Profile of {username}'

@app.route('/login')
def login():
    return 'Login page'

# Build URLs with url_for; it needs a request context outside a view
with app.test_request_context():
    print(url_for('index'))                      # /
    print(url_for('login'))                      # /login
    print(url_for('login', next='/'))            # /login?next=/
    # Non-ASCII values are percent-encoded
    print(url_for('profile', username='张三'))    # /user/%E5%BC%A0%E4%B8%89
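
The percent-encoded path in the last line can be reproduced with the standard library alone. This is a small sketch of the encoding step only, not of how url_for is implemented internally.

```python
from urllib.parse import quote, unquote

username = '张三'

# Non-ASCII characters are encoded as UTF-8, then each byte is written as %XX.
encoded = quote(username)
print(encoded)  # %E5%BC%A0%E4%B8%89

# Decoding restores the original text.
decoded = unquote(encoded)
print(decoded)  # 张三
```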

Requests and responses

The request object

from flask import Flask, request

app = Flask(__name__)

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        # Form data
        username = request.form.get('username')
        password = request.form.get('password')

        # JSON data; silent=True returns None instead of raising an error
        # when the body is not JSON (as with this HTML form)
        data = request.get_json(silent=True)

        # Query-string parameters
        next_page = request.args.get('next', '/')

        # Request headers
        user_agent = request.headers.get('User-Agent')

        # Uploaded files
        file = request.files.get('file')

        return f'Logged-in user: {username}'

    return '''
        <form method="post">
            <input type="text" name="username" placeholder="Username">
            <input type="password" name="password" placeholder="Password">
            <button type="submit">Log in</button>
        </form>
    '''
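
To see how form data, JSON bodies, and query parameters arrive separately, the following sketch posts to a hypothetical /echo route through the test client. The route, the echo view, and the field values are assumptions made for illustration.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/echo', methods=['POST'])
def echo():
    # silent=True yields None when the body is not JSON.
    payload = request.get_json(silent=True)
    return jsonify(
        form=request.form.to_dict(),
        json=payload,
        next=request.args.get('next', '/'),
    )

client = app.test_client()

# A form-encoded body fills request.form and leaves the JSON payload empty.
form_reply = client.post('/echo?next=/home', data={'username': 'alice'}).get_json()

# A JSON body fills the payload and leaves request.form empty.
json_reply = client.post('/echo', json={'username': 'bob'}).get_json()

print(form_reply)
print(json_reply)
```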

The response object

from flask import Flask, make_response, jsonify, redirect, abort

app = Flask(__name__)

# Return a string
@app.route('/string')
def return_string():
    return 'Hello, World!'

# Return JSON
@app.route('/json')
def return_json():
    return jsonify({
        'name': 'Zhang San',
        'age': 25,
        'items': [1, 2, 3]
    })

# Custom response with headers and a cookie
@app.route('/custom')
def custom_response():
    response = make_response('Custom response', 200)
    response.headers['X-Custom-Header'] = 'Value'
    response.set_cookie('session', 'abc123')
    return response

# Redirect
@app.route('/redirect')
def redirect_example():
    return redirect('/login')

# Return an error
@app.route('/not-found')
def not_found():
    abort(404)

Rendering templates

from flask import Flask, render_template

app = Flask(__name__)

@app.route('/hello/<name>')
def hello(name):
    return render_template('hello.html', name=name, items=[1, 2, 3])

Template file templates/hello.html

<!DOCTYPE html>
<html>
<head>
    <title>Hello</title>
</head>
<body>
    <h1>Hello, {{ name }}!</h1>

    <ul>
    {% for item in items %}
        <li>{{ item }}</li>
    {% endfor %}
    </ul>

    {% if name == 'Admin' %}
        <p>Welcome, administrator!</p>
    {% else %}
        <p>Welcome, regular user!</p>
    {% endif %}
</body>
</html>

Template engine (Jinja2)

Basic syntax

<!-- Variables -->
<p>{{ username }}</p>
<p>{{ user.name }}</p>
<p>{{ items[0] }}</p>

<!-- Filters (use |safe only on HTML you trust, since it disables escaping) -->
<p>{{ name|upper }}</p>
<p>{{ price|round(2) }}</p>
<p>{{ html_content|safe }}</p>
<p>{{ description|truncate(100) }}</p>

<!-- Control structures -->
{% if user.is_logged_in %}
    <p>Welcome, {{ user.name }}</p>
{% else %}
    <p>Please log in</p>
{% endif %}

{% for item in items %}
    <li>{{ loop.index }}. {{ item.name }}</li>
{% else %}
    <li>No items</li>
{% endfor %}

<!-- Macros -->
{% macro input(name, value='', type='text') %}
    <input type="{{ type }}" name="{{ name }}" value="{{ value }}">
{% endmacro %}

{{ input('username') }}
{{ input('password', type='password') }}
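
A few of these constructs can be tried outside a Flask app by rendering strings with Jinja2 directly. The sketch below assumes the jinja2 package that Flask installs as a dependency. The sample values are made up for illustration.

```python
from jinja2 import Template

# Filters transform a value before it is printed.
upper_result = Template("{{ name|upper }}").render(name="flask")
round_result = Template("{{ price|round(2) }}").render(price=3.14159)

# The else branch of a for loop runs only when the sequence is empty.
listing = Template(
    "{% for item in items %}{{ loop.index }}. {{ item }}; "
    "{% else %}no items{% endfor %}"
)
filled = listing.render(items=["pen", "ink"])
empty = listing.render(items=[])

print(upper_result)
print(round_result)
print(filled)
print(empty)
```

Note that a bare Template does not escape HTML automatically, whereas Flask turns autoescaping on for .html templates rendered with render_template.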

Template inheritance

Base template templates/base.html

<!DOCTYPE html>
<html>
<head>
    <title>{% block title %}Default title{% endblock %}</title>
    <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
    <nav>
        <a href="{{ url_for('index') }}">Home</a>
        <a href="{{ url_for('about') }}">About</a>
    </nav>

    <main>
        {% block content %}{% endblock %}
    </main>

    <footer>
        {% block footer %}
        <p>&copy; 2024 My Website</p>
        {% endblock %}
    </footer>
</body>
</html>

Child template templates/index.html

{% extends "base.html" %}

{% block title %}Home - My Website{% endblock %}

{% block content %}
    <h1>Welcome to the home page</h1>
    <p>This is the home page content.</p>
{% endblock %}

Static files

from flask import Flask, url_for

app = Flask(__name__)

# Static files live in the static directory by default
# and are served at URLs such as /static/css/style.css

Referencing static files in a template:

<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
<script src="{{ url_for('static', filename='js/app.js') }}"></script>
<img src="{{ url_for('static', filename='images/logo.png') }}" alt="Logo">

Database integration (SQLAlchemy)

Configuring the database

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# Define the models
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # One-to-many: user.posts lists the posts, post.author points back
    posts = db.relationship('Post', backref='author', lazy=True)

    def __repr__(self):
        return f'<User {self.username}>'

class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)

    def __repr__(self):
        return f'<Post {self.title}>'

# Create the tables (requires an application context)
with app.app_context():
    db.create_all()

Database operations

from flask import jsonify, request
# Assumes the configuration listing above is saved as models.py, so that
# app and db are already bound together; a fresh Flask() instance here
# would not be registered with db.
from models import app, db, User, Post

# Create a user
@app.route('/users', methods=['POST'])
def create_user():
    data = request.get_json()
    user = User(username=data['username'], email=data['email'])
    db.session.add(user)
    db.session.commit()
    return jsonify({'id': user.id, 'username': user.username}), 201

# List all users
@app.route('/users', methods=['GET'])
def get_users():
    users = User.query.all()
    return jsonify([{'id': u.id, 'username': u.username, 'email': u.email} for u in users])

# Get a single user (404 if the ID does not exist)
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    user = User.query.get_or_404(user_id)
    return jsonify({'id': user.id, 'username': user.username, 'email': user.email})

# Update a user; fields missing from the body keep their current value
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    user = User.query.get_or_404(user_id)
    data = request.get_json()
    user.username = data.get('username', user.username)
    user.email = data.get('email', user.email)
    db.session.commit()
    return jsonify({'id': user.id, 'username': user.username, 'email': user.email})

# Delete a user
@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    user = User.query.get_or_404(user_id)
    db.session.delete(user)
    db.session.commit()
    return '', 204

Form handling (Flask-WTF)

Installation and configuration

# The Email validator additionally requires the email-validator package
pip install flask-wtf email-validator

from flask import Flask, render_template, redirect, url_for, flash
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, Email, Length

app = Flask(__name__)
# Flask-WTF uses the secret key to sign CSRF tokens
app.config['SECRET_KEY'] = 'your-secret-key'

# Define the forms
class LoginForm(FlaskForm):
    username = StringField('Username', validators=[DataRequired(), Length(min=3, max=20)])
    password = PasswordField('Password', validators=[DataRequired(), Length(min=6)])
    submit = SubmitField('Log in')

class RegisterForm(FlaskForm):
    username = StringField('Username', validators=[DataRequired(), Length(min=3, max=20)])
    email = StringField('Email', validators=[DataRequired(), Email()])
    password = PasswordField('Password', validators=[DataRequired(), Length(min=6)])
    submit = SubmitField('Register')

# Handle the form
@app.route('/login', methods=['GET', 'POST'])
def login():
    form = LoginForm()
    # True only for a POST whose fields pass every validator
    if form.validate_on_submit():
        # Verify the user's credentials here
        flash('Logged in successfully!', 'success')
        return redirect(url_for('index'))
    return render_template('login.html', form=form)

Template templates/login.html

{% extends "base.html" %}

{% block content %}
<h1>Log in</h1>

<form method="POST">
    {{ form.hidden_tag() }}

    <div>
        {{ form.username.label }}
        {{ form.username() }}
        {% if form.username.errors %}
            {% for error in form.username.errors %}
                <span class="error">{{ error }}</span>
            {% endfor %}
        {% endif %}
    </div>

    <div>
        {{ form.password.label }}
        {{ form.password() }}
        {% if form.password.errors %}
            {% for error in form.password.errors %}
                <span class="error">{{ error }}</span>
            {% endfor %}
        {% endif %}
    </div>

    {{ form.submit() }}
</form>
{% endblock %}

User authentication (Flask-Login)

from flask import Flask, render_template, redirect, url_for, flash, request
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

login_manager = LoginManager()
login_manager.init_app(app)
# Endpoint that anonymous users are redirected to
login_manager.login_view = 'login'

# User model
class User(UserMixin):
    def __init__(self, id, username, password):
        self.id = id
        self.username = username
        self.password = password

# Simulated user database (plain-text passwords are for demonstration only)
users = {
    '1': User('1', 'admin', 'password'),
    '2': User('2', 'user', 'password')
}

# Flask-Login calls this to reload the user from the ID stored in the session
@login_manager.user_loader
def load_user(user_id):
    return users.get(user_id)

# Log in
@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        # Verify the user (a real application should query the database)
        user = next((u for u in users.values() if u.username == username), None)
        if user and user.password == password:
            login_user(user)
            # A real application should validate next before redirecting to it
            next_page = request.args.get('next')
            return redirect(next_page or url_for('index'))
        flash('Incorrect username or password', 'danger')

    return render_template('login.html')

# Log out
@app.route('/logout')
@login_required
def logout():
    logout_user()
    flash('You have been logged out', 'info')
    return redirect(url_for('index'))

# Protected route
@app.route('/dashboard')
@login_required
def dashboard():
    return f'Welcome, {current_user.username}!'
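
The example above compares plain-text passwords to keep the login flow short. The following sketch shows one way to store and check a salted hash instead, using only the standard library. The helper names hash_password and verify_password and the iteration count are assumptions for illustration, not part of Flask-Login.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a recommendation

def hash_password(password, salt=None):
    """Return (salt, digest) for a password using PBKDF2-HMAC-SHA256."""
    if salt is None:
        salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, ITERATIONS)
    return salt, digest

def verify_password(password, salt, expected_digest):
    """Recompute the digest and compare it in constant time."""
    _, digest = hash_password(password, salt)
    return hmac.compare_digest(digest, expected_digest)

# Store the salt and digest rather than the password itself.
salt, stored = hash_password('password')

ok = verify_password('password', salt, stored)
bad = verify_password('wrong', salt, stored)
print(ok, bad)
```

Werkzeug, which is installed with Flask, also provides generate_password_hash and check_password_hash in werkzeug.security for the same purpose.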

RESTful API development

Using Flask-RESTful

pip install flask-restful

from flask import Flask
from flask_restful import Api, Resource, reqparse

app = Flask(__name__)
api = Api(app)

# Simulated data store
items = []

# Request parser: validates and converts the incoming fields
parser = reqparse.RequestParser()
parser.add_argument('name', required=True, help='Name is required')
parser.add_argument('price', type=float, required=True, help='Price is required')

# Resource classes: each method handles the HTTP verb of the same name
class Item(Resource):
    def get(self, item_id):
        """Get a single item."""
        item = next((item for item in items if item['id'] == item_id), None)
        if item:
            return item, 200
        return {'message': 'Item not found'}, 404

    def put(self, item_id):
        """Update an item."""
        args = parser.parse_args()
        item = next((item for item in items if item['id'] == item_id), None)
        if item:
            item['name'] = args['name']
            item['price'] = args['price']
            return item, 200
        return {'message': 'Item not found'}, 404

    def delete(self, item_id):
        """Delete an item."""
        global items
        items = [item for item in items if item['id'] != item_id]
        return {'message': 'Item deleted'}, 200

class ItemList(Resource):
    def get(self):
        """List all items."""
        return items, 200

    def post(self):
        """Create a new item."""
        args = parser.parse_args()
        item = {
            'id': len(items) + 1,
            'name': args['name'],
            'price': args['price']
        }
        items.append(item)
        return item, 201

# Register the resources
api.add_resource(ItemList, '/items')
api.add_resource(Item, '/items/<int:item_id>')

RESTful API with plain Flask

from flask import Flask, jsonify, request
from functools import wraps

app = Flask(__name__)

# Simulated data (the example.com addresses are placeholders)
users = [
    {'id': 1, 'name': 'Zhang San', 'email': 'zhangsan@example.com'},
    {'id': 2, 'name': 'Li Si', 'email': 'lisi@example.com'}
]

# Authentication decorator
def require_api_key(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key')
        if not api_key or api_key != 'your-api-key':
            return jsonify({'error': 'Invalid API key'}), 401
        return f(*args, **kwargs)
    return decorated_function

# API routes
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
    """List all users."""
    return jsonify({'users': users})

@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Get a single user."""
    user = next((u for u in users if u['id'] == user_id), None)
    if user:
        return jsonify({'user': user})
    return jsonify({'error': 'User not found'}), 404

@app.route('/api/users', methods=['POST'])
def create_user():
    """Create a user."""
    data = request.get_json()

    if not data or 'name' not in data or 'email' not in data:
        return jsonify({'error': 'Missing required fields'}), 400

    new_user = {
        'id': len(users) + 1,
        'name': data['name'],
        'email': data['email']
    }
    users.append(new_user)

    return jsonify({'user': new_user}), 201

@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """Update a user."""
    user = next((u for u in users if u['id'] == user_id), None)
    if not user:
        return jsonify({'error': 'User not found'}), 404

    data = request.get_json()
    user['name'] = data.get('name', user['name'])
    user['email'] = data.get('email', user['email'])

    return jsonify({'user': user})

@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete a user."""
    global users
    users = [u for u in users if u['id'] != user_id]
    return '', 204

# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404

@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500
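
The @wraps(f) line in require_api_key matters because Flask uses a view function's __name__ as its default endpoint name. The sketch below uses plain Python, with two hypothetical decorators named without_wraps and with_wraps, to show what each one leaves behind.

```python
from functools import wraps

def without_wraps(f):
    def decorated_function(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated_function

def with_wraps(f):
    @wraps(f)  # copies __name__, __doc__ and other metadata from f
    def decorated_function(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated_function

@without_wraps
def get_users():
    """List all users."""
    return 'users'

@with_wraps
def get_user():
    """Get a single user."""
    return 'user'

# The undecorated name is lost in the first case and kept in the second.
print(get_users.__name__)  # decorated_function
print(get_user.__name__)   # get_user
```

Without wraps, every decorated view would be named decorated_function, and registering a second such view on the same app would fail because the endpoint name is already taken.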

Application configuration

Managing configuration

from flask import Flask

app = Flask(__name__)

# Option 1: set values directly
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'your-secret-key'

# Option 2: load from an object (only uppercase attributes are read)
class Config:
    DEBUG = False
    SECRET_KEY = 'production-secret-key'
    SQLALCHEMY_DATABASE_URI = 'sqlite:///app.db'

class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'

class ProductionConfig(Config):
    DEBUG = False
    SQLALCHEMY_DATABASE_URI = 'postgresql://user:pass@localhost/db'

# Pick the configuration class based on the environment
import os
env = os.environ.get('FLASK_ENV', 'development')
if env == 'production':
    app.config.from_object(ProductionConfig)
else:
    app.config.from_object(DevelopmentConfig)

# Option 3: load from a Python file
app.config.from_pyfile('config.py')

# Option 4: load from the file whose path is stored in an environment variable
app.config.from_envvar('FLASK_CONFIG')
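
Loading from an object copies only the attributes whose names are entirely uppercase, and subclasses inherit whatever they do not override. The sketch below imitates that selection rule in plain Python. The uppercase_settings helper and the helper_note attribute are hypothetical and are not Flask APIs.

```python
class Config:
    DEBUG = False
    SECRET_KEY = 'production-secret-key'
    helper_note = 'lowercase names are skipped'

class DevelopmentConfig(Config):
    DEBUG = True  # overrides the inherited value

def uppercase_settings(obj):
    """Collect the attributes that a from_object-style loader would copy."""
    return {key: getattr(obj, key) for key in dir(obj) if key.isupper()}

settings = uppercase_settings(DevelopmentConfig)
print(settings)
```

SECRET_KEY comes from the base class, DEBUG from the subclass, and helper_note is ignored.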

Blueprints

Blueprints group related routes and views so that a large application can be split across modules.

# auth.py
from flask import Blueprint, render_template, redirect, url_for

# Every route in this blueprint is served under /auth
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')

@auth_bp.route('/login')
def login():
    return render_template('auth/login.html')

@auth_bp.route('/register')
def register():
    return render_template('auth/register.html')

# blog.py
from flask import Blueprint

blog_bp = Blueprint('blog', __name__)

@blog_bp.route('/')
def index():
    return 'Blog home'

@blog_bp.route('/<int:post_id>')
def show_post(post_id):
    return f'Post {post_id}'

# app.py
from flask import Flask
from auth import auth_bp
from blog import blog_bp

app = Flask(__name__)

# Register the blueprints
app.register_blueprint(auth_bp)
app.register_blueprint(blog_bp)

Deployment

Using Gunicorn

# Install Gunicorn
pip install gunicorn

# Run the application
gunicorn -w 4 -b 0.0.0.0:8000 app:app

# -w 4: four worker processes
# -b: address and port to bind
# app:app: module name:application instance name

Using Docker

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 5000

CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:5000", "app:app"]

Practical examples

Example 1: To-do application

from flask import Flask, render_template, request, redirect, url_for, jsonify
from datetime import datetime

app = Flask(__name__)

# Simulated database
todos = [
    {'id': 1, 'title': 'Learn Flask', 'completed': False, 'created_at': datetime.now()},
    {'id': 2, 'title': 'Finish the project', 'completed': False, 'created_at': datetime.now()}
]

@app.route('/')
def index():
    return render_template('todos.html', todos=todos)

@app.route('/api/todos', methods=['GET'])
def get_todos():
    return jsonify(todos)

@app.route('/api/todos', methods=['POST'])
def create_todo():
    data = request.get_json()
    new_todo = {
        'id': len(todos) + 1,
        'title': data['title'],
        'completed': False,
        'created_at': datetime.now()
    }
    todos.append(new_todo)
    return jsonify(new_todo), 201

@app.route('/api/todos/<int:todo_id>', methods=['PUT'])
def update_todo(todo_id):
    todo = next((t for t in todos if t['id'] == todo_id), None)
    if not todo:
        return jsonify({'error': 'To-do item not found'}), 404

    data = request.get_json()
    todo['title'] = data.get('title', todo['title'])
    todo['completed'] = data.get('completed', todo['completed'])

    return jsonify(todo)

@app.route('/api/todos/<int:todo_id>', methods=['DELETE'])
def delete_todo(todo_id):
    global todos
    todos = [t for t in todos if t['id'] != todo_id]
    return '', 204

if __name__ == '__main__':
    app.run(debug=True)
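
One detail to watch in this example is the ID scheme: len(todos) + 1 can reuse an ID once an item has been deleted. The sketch below reproduces the collision with plain lists and shows one possible alternative. The next_id helper is hypothetical and not part of the original application.

```python
todos = [{'id': 1}, {'id': 2}, {'id': 3}]

# Delete the item with ID 2, as the DELETE route would.
todos = [t for t in todos if t['id'] != 2]

# The length-based scheme now proposes an ID that is already in use.
length_based = len(todos) + 1
collides = any(t['id'] == length_based for t in todos)

def next_id(items):
    """Return one more than the largest ID currently present."""
    return max((item['id'] for item in items), default=0) + 1

safe_id = next_id(todos)
print(length_based, collides, safe_id)
```

A real database would normally hand out IDs itself, for example through an auto-incrementing primary key.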

Example 2: File upload service

from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory
import os
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB

# Create the upload directory at import time so it also exists under Gunicorn
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}

def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/')
def index():
    files = os.listdir(app.config['UPLOAD_FOLDER'])
    return render_template('upload.html', files=files)

@app.route('/upload', methods=['POST'])
def upload_file():
    # /upload accepts POST only, so every redirect goes back to the index page
    if 'file' not in request.files:
        flash('No file selected', 'danger')
        return redirect(url_for('index'))

    file = request.files['file']
    if file.filename == '':
        flash('No file selected', 'danger')
        return redirect(url_for('index'))

    if file and allowed_file(file.filename):
        # secure_filename strips path separators and other unsafe characters
        filename = secure_filename(file.filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        flash('File uploaded successfully', 'success')
    else:
        flash('Unsupported file type', 'danger')
    return redirect(url_for('index'))

@app.route('/uploads/<filename>')
def download_file(filename):
    return send_from_directory(app.config['UPLOAD_FOLDER'], filename)

if __name__ == '__main__':
    app.run(debug=True)
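
The extension check in allowed_file looks only at the text after the last dot. The sketch below runs the same function on a few made-up filenames to show which ones pass.

```python
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}

def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

# Hypothetical filenames chosen to cover the interesting cases.
samples = ['report.PDF', 'photo.jpeg', 'archive.tar.gz', 'script.py', 'README', 'shell.php.png']
checks = {name: allowed_file(name) for name in samples}

for name, result in checks.items():
    print(f'{name}: {result}')
```

The match is case-insensitive, and a name such as shell.php.png passes because only the final extension counts. The check therefore restricts file names, not file contents.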

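Summary

Flask is a lightweight yet capable web framework, suitable for web applications ranging from simple to complex.

Core concepts

  1. Application instance: an instance of the Flask class
  2. Routes: the mapping from URLs to view functions
  3. View functions: handle a request and return a response
  4. Templates: rendered by the Jinja2 template engine
  5. Request/response: objects for working with HTTP requests and responses

Common extensions

  • Flask-SQLAlchemy: database ORM
  • Flask-WTF: form handling
  • Flask-Login: user authentication
  • Flask-Migrate: database migrations
  • Flask-RESTful: RESTful APIs

Best practices

  • Use blueprints to organize large applications
  • Use the application factory pattern
  • Keep configuration separate for development and production
  • Use a virtual environment
  • Deploy to production with Gunicorn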
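
The application factory pattern listed above means building the app inside a function instead of at module level, so that each caller (tests, Gunicorn, scripts) can get its own configured instance. The following is a minimal sketch that assumes a single blueprint. The create_app signature and the /blog prefix are illustrative choices, not a fixed convention.

```python
from flask import Blueprint, Flask

blog_bp = Blueprint('blog', __name__)

@blog_bp.route('/')
def index():
    return 'Blog home'

def create_app(config=None):
    """Build and return a new application instance."""
    app = Flask(__name__)
    app.config.update(config or {})
    # Blueprints are attached inside the factory, once per app instance.
    app.register_blueprint(blog_bp, url_prefix='/blog')
    return app

# Each call returns an independent app, which is convenient for tests.
app = create_app({'TESTING': True})
response = app.test_client().get('/blog/')
status = response.status_code
body = response.get_data(as_text=True)
print(status, body)
```

Gunicorn can call a factory directly, for example with gunicorn "app:create_app()", assuming the code lives in app.py.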

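Exercises

  1. Build a simple blog application with a post list, a detail page, and a way to create posts
  2. Implement user registration and login
  3. Build a RESTful API that supports CRUD operations
  4. Implement file upload and download
  5. Refactor the application structure using blueprints

Reference resources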