Python 性能优化
性能优化是高级 Python 开发的重要技能。本章将介绍性能分析工具和常见的优化策略,帮助你写出更高效的代码。
性能优化原则
先测量,后优化
"过早优化是万恶之源" —— Donald Knuth
优化之前必须先测量,找到真正的瓶颈:
- 确定性能问题:代码是否真的慢?
- 定位瓶颈:哪部分代码消耗了大部分时间?
- 针对性优化:只优化热点代码
- 验证效果:优化后是否真的变快了?
性能优化流程
性能分析工具
timeit 模块
timeit 模块用于精确测量小段代码的执行时间。
命令行使用
python -m timeit "sum(range(1000))"
# 100000 loops, best of 5: 2.45 usec per loop
python -m timeit -n 1000 -r 5 "[x**2 for x in range(1000)]"
# -n: 每轮循环次数
# -r: 重复轮数
代码中使用
import timeit
time_taken = timeit.timeit("sum(range(1000))", number=10000)
print(f"执行 10000 次耗时: {time_taken:.4f} 秒")
time_taken = timeit.timeit(
"[x**2 for x in range(1000)]",
number=1000
)
print(f"列表推导式耗时: {time_taken:.4f} 秒")
time_taken = timeit.timeit(
"list(map(lambda x: x**2, range(1000)))",
number=1000
)
print(f"map 函数耗时: {time_taken:.4f} 秒")
比较不同实现
import timeit
setup = "data = list(range(10000))"
test_for = """
result = []
for x in data:
result.append(x * 2)
"""
test_list_comp = "result = [x * 2 for x in data]"
test_map = "result = list(map(lambda x: x * 2, data))"
time_for = timeit.timeit(test_for, setup=setup, number=1000)
time_comp = timeit.timeit(test_list_comp, setup=setup, number=1000)
time_map = timeit.timeit(test_map, setup=setup, number=1000)
print(f"for 循环: {time_for:.4f} 秒")
print(f"列表推导式: {time_comp:.4f} 秒")
print(f"map 函数: {time_map:.4f} 秒")
解释:timeit 会多次运行代码并取最佳结果,减少系统干扰。setup 参数用于设置测试环境,不计入计时。
cProfile 模块
cProfile 是 Python 内置的性能分析器,可以分析整个程序或函数的执行情况。
分析整个脚本
python -m cProfile -s cumtime myscript.py
输出示例:
214 function calls (207 primitive calls) in 0.002 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.002 0.002 {built-in method builtins.exec}
1 0.000 0.000 0.002 0.002 myscript.py:1(<module>)
10 0.001 0.000 0.001 0.000 myscript.py:5(process_item)
解释:
ncalls:调用次数tottime:函数本身消耗的时间(不含子函数)cumtime:累计时间(含子函数)percall:每次调用平均时间
在代码中使用
import cProfile
import pstats
from io import StringIO
def slow_function():
total = 0
for i in range(1000000):
total += i
return total
def fast_function():
return sum(range(1000000))
def main():
slow_function()
fast_function()
pr = cProfile.Profile()
pr.enable()
main()
pr.disable()
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10)
print(s.getvalue())
使用上下文管理器(Python 3.8+)
import cProfile
with cProfile.Profile() as pr:
result = sum(range(1000000))
pr.print_stats(sort='cumulative')
保存分析结果
import cProfile
cProfile.run('main()', 'profile_stats')
import pstats
p = pstats.Stats('profile_stats')
p.strip_dirs().sort_stats('cumulative').print_stats(20)
line_profiler(逐行分析)
line_profiler 可以分析每一行代码的执行时间。
安装
pip install line_profiler
使用装饰器
from line_profiler import profile
@profile
def process_data(data):
result = []
for item in data:
processed = item * 2
result.append(processed)
return result
if __name__ == '__main__':
data = list(range(100000))
process_data(data)
运行:
kernprof -l -v myscript.py
输出示例:
Timer unit: 1e-06 s
Total time: 0.025 s
File: myscript.py
Function: process_data at line 3
Line # Hits Time Per Hit % Time Line Contents
==============================================================
3 @profile
4 def process_data(data):
5 1 2 2.0 0.0 result = []
6 100001 15000 0.1 60.0 for item in data:
7 100000 10000 0.1 40.0 processed = item * 2
8 100000 5000 0.0 0.0 result.append(processed)
9 1 0 0.0 0.0 return result
解释:% Time 列显示每行代码占总时间的百分比,可以快速定位热点代码。
memory_profiler(内存分析)
分析 Python 程序的内存使用情况。
安装
pip install memory_profiler
使用
from memory_profiler import profile
@profile
def create_large_list():
data = [i ** 2 for i in range(100000)]
return data
if __name__ == '__main__':
create_large_list()
运行:
python -m memory_profiler myscript.py
输出示例:
Line # Mem usage Increment Occurrences Line Contents
=============================================================
3 38.5 MiB 38.5 MiB 1 @profile
4 def create_large_list():
5 42.1 MiB 3.6 MiB 100003 data = [i ** 2 for i in range(100000)]
6 42.1 MiB 0.0 MiB 1 return data
常见优化策略
1. 使用内置函数
Python 内置函数用 C 实现,比 Python 循环快得多。
import timeit
data = list(range(10000))
def manual_sum():
total = 0
for x in data:
total += x
return total
print(timeit.timeit(manual_sum, number=1000))
print(timeit.timeit("sum(data)", globals=globals(), number=1000))
解释:sum() 内置函数比手动循环快 5-10 倍。
2. 使用列表推导式和生成器
import timeit
def for_loop():
result = []
for i in range(10000):
if i % 2 == 0:
result.append(i ** 2)
return result
def list_comp():
return [i ** 2 for i in range(10000) if i % 2 == 0]
def gen_expr():
return list(i ** 2 for i in range(10000) if i % 2 == 0)
print(f"for 循环: {timeit.timeit(for_loop, number=100):.4f}")
print(f"列表推导式: {timeit.timeit(list_comp, number=100):.4f}")
print(f"生成器表达式: {timeit.timeit(gen_expr, number=100):.4f}")
解释:列表推导式比 for 循环快约 30%,因为它在 C 层面执行循环。
3. 使用集合和字典进行快速查找
import timeit
data = list(range(10000))
target = 9999
def list_search():
return target in data
def set_search():
s = set(data)
return target in s
print(f"列表查找: {timeit.timeit(list_search, number=100):.4f}")
print(f"集合查找: {timeit.timeit(set_search, number=100):.4f}")
解释:列表查找是 O(n),集合查找是 O(1)。对于大量数据的成员检查,使用集合或字典。
4. 避免重复计算
import timeit
def bad():
result = []
for i in range(1000):
result.append(len(result) + i)
return result
def good():
result = []
length = 0
for i in range(1000):
result.append(length + i)
length += 1
return result
print(f"重复计算: {timeit.timeit(bad, number=1000):.4f}")
print(f"缓存结果: {timeit.timeit(good, number=1000):.4f}")
5. 使用字符串 join 而非 + 拼接
import timeit
def concat_plus():
result = ""
for i in range(1000):
result += str(i)
return result
def concat_join():
return "".join(str(i) for i in range(1000))
print(f"+ 拼接: {timeit.timeit(concat_plus, number=100):.4f}")
print(f"join: {timeit.timeit(concat_join, number=100):.4f}")
解释:字符串是不可变的,每次 + 都会创建新字符串。join() 一次性分配内存,效率更高。
6. 使用局部变量
import timeit
data = list(range(10000))
def use_global():
total = 0
for i in data:
total += i
return total
def use_local():
local_data = data
total = 0
for i in local_data:
total += i
return total
print(f"全局变量: {timeit.timeit(use_global, number=1000):.4f}")
print(f"局部变量: {timeit.timeit(use_local, number=1000):.4f}")
解释:局部变量访问比全局变量快,因为 Python 查找局部变量只需索引数组,而全局变量需要字典查找。
7. 使用 functools.lru_cache 缓存
import timeit
from functools import lru_cache
@lru_cache(maxsize=None)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
def fibonacci_no_cache(n):
if n <= 1:
return n
return fibonacci_no_cache(n - 1) + fibonacci_no_cache(n - 2)
print(f"带缓存: {timeit.timeit(lambda: fibonacci(30), number=1):.6f}")
print(f"无缓存: {timeit.timeit(lambda: fibonacci_no_cache(30), number=1):.6f}")
解释:lru_cache 装饰器自动缓存函数结果,对于递归或重复计算非常有效。
8. 使用 NumPy 进行数值计算
import timeit
import numpy as np
def python_sum():
return sum(x * x for x in range(100000))
def numpy_sum():
arr = np.arange(100000)
return np.sum(arr * arr)
print(f"Python: {timeit.timeit(python_sum, number=10):.4f}")
print(f"NumPy: {timeit.timeit(numpy_sum, number=10):.4f}")
解释:NumPy 使用 C 实现的向量化操作,比纯 Python 快数十倍。
算法优化
时间复杂度对比
import timeit
def linear_search(lst, target):
for i, item in enumerate(lst):
if item == target:
return i
return -1
def binary_search(lst, target):
left, right = 0, len(lst) - 1
while left <= right:
mid = (left + right) // 2
if lst[mid] == target:
return mid
elif lst[mid] < target:
left = mid + 1
else:
right = mid - 1
return -1
data = list(range(100000))
target = 99999
print(f"线性搜索 O(n): {timeit.timeit(lambda: linear_search(data, target), number=10):.4f}")
print(f"二分搜索 O(log n): {timeit.timeit(lambda: binary_search(data, target), number=10):.4f}")
选择正确的数据结构
| 操作 | 列表 | 集合 | 字典 |
|---|---|---|---|
| 访问元素 | O(1) | - | O(1) |
| 查找元素 | O(n) | O(1) | O(1) |
| 插入末尾 | O(1) | O(1) | O(1) |
| 插入任意位置 | O(n) | - | - |
| 删除元素 | O(n) | O(1) | O(1) |
from collections import deque
def list_queue():
lst = []
for i in range(10000):
lst.insert(0, i)
for _ in range(10000):
lst.pop(0)
def deque_queue():
d = deque()
for i in range(10000):
d.appendleft(i)
for _ in range(10000):
d.popleft()
print(f"列表作为队列: {timeit.timeit(list_queue, number=10):.4f}")
print(f"deque 作为队列: {timeit.timeit(deque_queue, number=10):.4f}")
解释:列表在头部插入/删除是 O(n),deque 是 O(1)。根据操作选择合适的数据结构。
并发优化
多线程 vs 多进程
import time
import threading
import multiprocessing
def cpu_bound_task(n):
total = 0
for i in range(n):
total += i ** 2
return total
def run_sequential():
for _ in range(4):
cpu_bound_task(100000)
def run_threading():
threads = []
for _ in range(4):
t = threading.Thread(target=cpu_bound_task, args=(100000,))
threads.append(t)
t.start()
for t in threads:
t.join()
def run_multiprocessing():
processes = []
for _ in range(4):
p = multiprocessing.Process(target=cpu_bound_task, args=(100000,))
processes.append(p)
p.start()
for p in processes:
p.join()
if __name__ == '__main__':
print(f"顺序执行: {timeit.timeit(run_sequential, number=1):.4f}")
print(f"多线程: {timeit.timeit(run_threading, number=1):.4f}")
print(f"多进程: {timeit.timeit(run_multiprocessing, number=1):.4f}")
解释:
- CPU 密集型任务:多进程(绕过 GIL)
- I/O 密集型任务:多线程或异步
使用 concurrent.futures
import time
import concurrent.futures
def process_item(item):
time.sleep(0.01)
return item ** 2
items = list(range(100))
def sequential():
return [process_item(i) for i in items]
def thread_pool():
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
return list(executor.map(process_item, items))
def process_pool():
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
return list(executor.map(process_item, items))
print(f"顺序执行: {timeit.timeit(sequential, number=1):.4f}")
print(f"线程池: {timeit.timeit(thread_pool, number=1):.4f}")
print(f"进程池: {timeit.timeit(process_pool, number=1):.4f}")
性能优化检查清单
-
算法层面
- 是否选择了正确的时间复杂度?
- 是否使用了合适的数据结构?
- 是否有不必要的嵌套循环?
-
代码层面
- 是否使用了内置函数?
- 是否使用了列表推导式?
- 是否避免了重复计算?
- 是否使用了缓存?
-
内存层面
- 是否使用了生成器处理大数据?
- 是否有内存泄漏?
- 是否避免了不必要的对象创建?
-
并发层面
- CPU 密集型任务是否使用了多进程?
- I/O 密集型任务是否使用了异步或多线程?
小结
本章我们学习了:
- 性能优化的原则和流程
- timeit 精确测量代码执行时间
- cProfile 分析程序性能瓶颈
- line_profiler 逐行分析
- memory_profiler 内存分析
- 常见的优化策略
- 算法和数据结构选择
- 并发优化技巧
练习
- 使用 cProfile 分析一个你写的程序,找出性能瓶颈
- 比较列表推导式和 for 循环的性能差异
- 使用 lru_cache 优化一个递归函数
- 使用 NumPy 优化一个数值计算任务