跳到主要内容

Python 性能优化

性能优化是高级 Python 开发的重要技能。本章将介绍性能分析工具和常见的优化策略,帮助你写出更高效的代码。

性能优化原则

先测量,后优化

"过早优化是万恶之源" —— Donald Knuth

优化之前必须先测量,找到真正的瓶颈:

  1. 确定性能问题:代码是否真的慢?
  2. 定位瓶颈:哪部分代码消耗了大部分时间?
  3. 针对性优化:只优化热点代码
  4. 验证效果:优化后是否真的变快了?

性能优化流程

性能分析工具

timeit 模块

timeit 模块用于精确测量小段代码的执行时间。

命令行使用

python -m timeit "sum(range(1000))"
# 100000 loops, best of 5: 2.45 usec per loop

python -m timeit -n 1000 -r 5 "[x**2 for x in range(1000)]"
# -n: 每轮循环次数
# -r: 重复轮数

代码中使用

import timeit

time_taken = timeit.timeit("sum(range(1000))", number=10000)
print(f"执行 10000 次耗时: {time_taken:.4f} 秒")

time_taken = timeit.timeit(
"[x**2 for x in range(1000)]",
number=1000
)
print(f"列表推导式耗时: {time_taken:.4f} 秒")

time_taken = timeit.timeit(
"list(map(lambda x: x**2, range(1000)))",
number=1000
)
print(f"map 函数耗时: {time_taken:.4f} 秒")

比较不同实现

import timeit

setup = "data = list(range(10000))"

test_for = """
result = []
for x in data:
result.append(x * 2)
"""

test_list_comp = "result = [x * 2 for x in data]"
test_map = "result = list(map(lambda x: x * 2, data))"

time_for = timeit.timeit(test_for, setup=setup, number=1000)
time_comp = timeit.timeit(test_list_comp, setup=setup, number=1000)
time_map = timeit.timeit(test_map, setup=setup, number=1000)

print(f"for 循环: {time_for:.4f} 秒")
print(f"列表推导式: {time_comp:.4f} 秒")
print(f"map 函数: {time_map:.4f} 秒")

解释timeit 会多次运行代码并取最佳结果,减少系统干扰。setup 参数用于设置测试环境,不计入计时。

cProfile 模块

cProfile 是 Python 内置的性能分析器,可以分析整个程序或函数的执行情况。

分析整个脚本

python -m cProfile -s cumtime myscript.py

输出示例:

         214 function calls (207 primitive calls) in 0.002 seconds

Ordered by: cumulative time

ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.002 0.002 {built-in method builtins.exec}
1 0.000 0.000 0.002 0.002 myscript.py:1(<module>)
10 0.001 0.000 0.001 0.000 myscript.py:5(process_item)

解释

  • ncalls:调用次数
  • tottime:函数本身消耗的时间(不含子函数)
  • cumtime:累计时间(含子函数)
  • percall:每次调用平均时间

在代码中使用

import cProfile
import pstats
from io import StringIO

def slow_function():
total = 0
for i in range(1000000):
total += i
return total

def fast_function():
return sum(range(1000000))

def main():
slow_function()
fast_function()

pr = cProfile.Profile()
pr.enable()

main()

pr.disable()

s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10)
print(s.getvalue())

使用上下文管理器(Python 3.8+)

import cProfile

with cProfile.Profile() as pr:
result = sum(range(1000000))

pr.print_stats(sort='cumulative')

保存分析结果

import cProfile

cProfile.run('main()', 'profile_stats')

import pstats
p = pstats.Stats('profile_stats')
p.strip_dirs().sort_stats('cumulative').print_stats(20)

line_profiler(逐行分析)

line_profiler 可以分析每一行代码的执行时间。

安装

pip install line_profiler

使用装饰器

from line_profiler import profile

@profile
def process_data(data):
result = []
for item in data:
processed = item * 2
result.append(processed)
return result

if __name__ == '__main__':
data = list(range(100000))
process_data(data)

运行:

kernprof -l -v myscript.py

输出示例:

Timer unit: 1e-06 s

Total time: 0.025 s
File: myscript.py
Function: process_data at line 3

Line # Hits Time Per Hit % Time Line Contents
==============================================================
3 @profile
4 def process_data(data):
5 1 2 2.0 0.0 result = []
6 100001 15000 0.1 60.0 for item in data:
7 100000 10000 0.1 40.0 processed = item * 2
8 100000 5000 0.0 0.0 result.append(processed)
9 1 0 0.0 0.0 return result

解释% Time 列显示每行代码占总时间的百分比,可以快速定位热点代码。

memory_profiler(内存分析)

分析 Python 程序的内存使用情况。

安装

pip install memory_profiler

使用

from memory_profiler import profile

@profile
def create_large_list():
data = [i ** 2 for i in range(100000)]
return data

if __name__ == '__main__':
create_large_list()

运行:

python -m memory_profiler myscript.py

输出示例:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
3 38.5 MiB 38.5 MiB 1 @profile
4 def create_large_list():
5 42.1 MiB 3.6 MiB 100003 data = [i ** 2 for i in range(100000)]
6 42.1 MiB 0.0 MiB 1 return data

常见优化策略

1. 使用内置函数

Python 内置函数用 C 实现,比 Python 循环快得多。

import timeit

data = list(range(10000))

def manual_sum():
total = 0
for x in data:
total += x
return total

print(timeit.timeit(manual_sum, number=1000))
print(timeit.timeit("sum(data)", globals=globals(), number=1000))

解释sum() 内置函数比手动循环快 5-10 倍。

2. 使用列表推导式和生成器

import timeit

def for_loop():
result = []
for i in range(10000):
if i % 2 == 0:
result.append(i ** 2)
return result

def list_comp():
return [i ** 2 for i in range(10000) if i % 2 == 0]

def gen_expr():
return list(i ** 2 for i in range(10000) if i % 2 == 0)

print(f"for 循环: {timeit.timeit(for_loop, number=100):.4f}")
print(f"列表推导式: {timeit.timeit(list_comp, number=100):.4f}")
print(f"生成器表达式: {timeit.timeit(gen_expr, number=100):.4f}")

解释:列表推导式比 for 循环快约 30%,因为它在 C 层面执行循环。

3. 使用集合和字典进行快速查找

import timeit

data = list(range(10000))
target = 9999

def list_search():
return target in data

def set_search():
s = set(data)
return target in s

print(f"列表查找: {timeit.timeit(list_search, number=100):.4f}")
print(f"集合查找: {timeit.timeit(set_search, number=100):.4f}")

解释:列表查找是 O(n),集合查找是 O(1)。对于大量数据的成员检查,使用集合或字典。

4. 避免重复计算

import timeit

def bad():
result = []
for i in range(1000):
result.append(len(result) + i)
return result

def good():
result = []
length = 0
for i in range(1000):
result.append(length + i)
length += 1
return result

print(f"重复计算: {timeit.timeit(bad, number=1000):.4f}")
print(f"缓存结果: {timeit.timeit(good, number=1000):.4f}")

5. 使用字符串 join 而非 + 拼接

import timeit

def concat_plus():
result = ""
for i in range(1000):
result += str(i)
return result

def concat_join():
return "".join(str(i) for i in range(1000))

print(f"+ 拼接: {timeit.timeit(concat_plus, number=100):.4f}")
print(f"join: {timeit.timeit(concat_join, number=100):.4f}")

解释:字符串是不可变的,每次 + 都会创建新字符串。join() 一次性分配内存,效率更高。

6. 使用局部变量

import timeit

data = list(range(10000))

def use_global():
total = 0
for i in data:
total += i
return total

def use_local():
local_data = data
total = 0
for i in local_data:
total += i
return total

print(f"全局变量: {timeit.timeit(use_global, number=1000):.4f}")
print(f"局部变量: {timeit.timeit(use_local, number=1000):.4f}")

解释:局部变量访问比全局变量快,因为 Python 查找局部变量只需索引数组,而全局变量需要字典查找。

7. 使用 functools.lru_cache 缓存

import timeit
from functools import lru_cache

@lru_cache(maxsize=None)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)

def fibonacci_no_cache(n):
if n <= 1:
return n
return fibonacci_no_cache(n - 1) + fibonacci_no_cache(n - 2)

print(f"带缓存: {timeit.timeit(lambda: fibonacci(30), number=1):.6f}")
print(f"无缓存: {timeit.timeit(lambda: fibonacci_no_cache(30), number=1):.6f}")

解释lru_cache 装饰器自动缓存函数结果,对于递归或重复计算非常有效。

8. 使用 NumPy 进行数值计算

import timeit
import numpy as np

def python_sum():
return sum(x * x for x in range(100000))

def numpy_sum():
arr = np.arange(100000)
return np.sum(arr * arr)

print(f"Python: {timeit.timeit(python_sum, number=10):.4f}")
print(f"NumPy: {timeit.timeit(numpy_sum, number=10):.4f}")

解释:NumPy 使用 C 实现的向量化操作,比纯 Python 快数十倍。

算法优化

时间复杂度对比

import timeit

def linear_search(lst, target):
for i, item in enumerate(lst):
if item == target:
return i
return -1

def binary_search(lst, target):
left, right = 0, len(lst) - 1
while left <= right:
mid = (left + right) // 2
if lst[mid] == target:
return mid
elif lst[mid] < target:
left = mid + 1
else:
right = mid - 1
return -1

data = list(range(100000))
target = 99999

print(f"线性搜索 O(n): {timeit.timeit(lambda: linear_search(data, target), number=10):.4f}")
print(f"二分搜索 O(log n): {timeit.timeit(lambda: binary_search(data, target), number=10):.4f}")

选择正确的数据结构

操作列表集合字典
访问元素O(1)-O(1)
查找元素O(n)O(1)O(1)
插入末尾O(1)O(1)O(1)
插入任意位置O(n)--
删除元素O(n)O(1)O(1)
from collections import deque

def list_queue():
lst = []
for i in range(10000):
lst.insert(0, i)
for _ in range(10000):
lst.pop(0)

def deque_queue():
d = deque()
for i in range(10000):
d.appendleft(i)
for _ in range(10000):
d.popleft()

print(f"列表作为队列: {timeit.timeit(list_queue, number=10):.4f}")
print(f"deque 作为队列: {timeit.timeit(deque_queue, number=10):.4f}")

解释:列表在头部插入/删除是 O(n),deque 是 O(1)。根据操作选择合适的数据结构。

并发优化

多线程 vs 多进程

import time
import threading
import multiprocessing

def cpu_bound_task(n):
total = 0
for i in range(n):
total += i ** 2
return total

def run_sequential():
for _ in range(4):
cpu_bound_task(100000)

def run_threading():
threads = []
for _ in range(4):
t = threading.Thread(target=cpu_bound_task, args=(100000,))
threads.append(t)
t.start()
for t in threads:
t.join()

def run_multiprocessing():
processes = []
for _ in range(4):
p = multiprocessing.Process(target=cpu_bound_task, args=(100000,))
processes.append(p)
p.start()
for p in processes:
p.join()

if __name__ == '__main__':
print(f"顺序执行: {timeit.timeit(run_sequential, number=1):.4f}")
print(f"多线程: {timeit.timeit(run_threading, number=1):.4f}")
print(f"多进程: {timeit.timeit(run_multiprocessing, number=1):.4f}")

解释

  • CPU 密集型任务:多进程(绕过 GIL)
  • I/O 密集型任务:多线程或异步

使用 concurrent.futures

import time
import concurrent.futures

def process_item(item):
time.sleep(0.01)
return item ** 2

items = list(range(100))

def sequential():
return [process_item(i) for i in items]

def thread_pool():
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
return list(executor.map(process_item, items))

def process_pool():
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
return list(executor.map(process_item, items))

print(f"顺序执行: {timeit.timeit(sequential, number=1):.4f}")
print(f"线程池: {timeit.timeit(thread_pool, number=1):.4f}")
print(f"进程池: {timeit.timeit(process_pool, number=1):.4f}")

性能优化检查清单

  1. 算法层面

    • 是否选择了正确的时间复杂度?
    • 是否使用了合适的数据结构?
    • 是否有不必要的嵌套循环?
  2. 代码层面

    • 是否使用了内置函数?
    • 是否使用了列表推导式?
    • 是否避免了重复计算?
    • 是否使用了缓存?
  3. 内存层面

    • 是否使用了生成器处理大数据?
    • 是否有内存泄漏?
    • 是否避免了不必要的对象创建?
  4. 并发层面

    • CPU 密集型任务是否使用了多进程?
    • I/O 密集型任务是否使用了异步或多线程?

小结

本章我们学习了:

  1. 性能优化的原则和流程
  2. timeit 精确测量代码执行时间
  3. cProfile 分析程序性能瓶颈
  4. line_profiler 逐行分析
  5. memory_profiler 内存分析
  6. 常见的优化策略
  7. 算法和数据结构选择
  8. 并发优化技巧

练习

  1. 使用 cProfile 分析一个你写的程序,找出性能瓶颈
  2. 比较列表推导式和 for 循环的性能差异
  3. 使用 lru_cache 优化一个递归函数
  4. 使用 NumPy 优化一个数值计算任务

延伸阅读