性能优化
处理大规模数据时,Pandas 的性能优化至关重要。本章介绍提升 Pandas 性能的各种技巧和最佳实践。
性能诊断
计时方法
import pandas as pd
import numpy as np
import time
# 使用 time 模块
start = time.time()
# 执行代码
result = df.groupby('column').mean()
end = time.time()
print(f"耗时: {end - start:.4f} 秒")
# 使用 Jupyter magic 命令
# %timeit df.groupby('column').mean()
# %%timeit
# 代码块
# 使用 timeit 模块
import timeit
timeit.timeit(lambda: df.groupby('column').mean(), number=100)
内存使用分析
# 查看内存使用
df = pd.DataFrame({'A': range(1000000), 'B': range(1000000)})
print(df.memory_usage())
# Index 128
# A 8000000
# B 8000000
print(df.memory_usage(deep=True)) # 深度分析(包括字符串)
# 查看总内存
print(f"总内存: {df.memory_usage().sum() / 1024**2:.2f} MB")
# 查看数据类型信息
print(df.info(memory_usage='deep'))
向量化操作
避免循环
# ❌ 慢:使用循环
df = pd.DataFrame({'A': range(1000000), 'B': range(1000000)})
result = []
for i in range(len(df)):
result.append(df['A'].iloc[i] + df['B'].iloc[i])
df['C'] = result
# ✅ 快:向量化操作
df['C'] = df['A'] + df['B'] # 快几十倍
使用 NumPy 函数
# ❌ 慢:使用 apply
df['log_A'] = df['A'].apply(np.log)
# ✅ 快:使用 NumPy 向量化
df['log_A'] = np.log(df['A'])
# 其他常用 NumPy 函数
df['sqrt'] = np.sqrt(df['A'])
df['exp'] = np.exp(df['A'])
df['abs'] = np.abs(df['A'])
df['where'] = np.where(df['A'] > 500000, 'high', 'low')
内置方法优于 apply
# ❌ 慢:使用 apply
df['sum'] = df[['A', 'B']].apply(sum, axis=1)
# ✅ 快:使用内置方法
df['sum'] = df[['A', 'B']].sum(axis=1)
# 字符串操作
# ❌ 慢
df['upper'] = df['name'].apply(lambda x: x.upper())
# ✅ 快:使用 str 访问器
df['upper'] = df['name'].str.upper()
数据类型优化
使用合适的数据类型
# 创建示例数据
df = pd.DataFrame({
'id': range(1000000),
'small_int': np.random.randint(0, 100, 1000000),
'small_float': np.random.random(1000000),
'category': np.random.choice(['A', 'B', 'C', 'D'], 1000000)
})
# 查看默认类型
print(df.dtypes)
# id int64
# small_int int64
# small_float float64
# category object
# 优化整数类型
df['small_int'] = df['small_int'].astype('int8') # 0-255 范围
# 优化浮点类型
df['small_float'] = df['small_float'].astype('float32')
# 转换为分类类型
df['category'] = df['category'].astype('category')
# 查看优化后的内存使用
print(f"优化前: 45.77 MB")
print(f"优化后: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
类型选择指南
| 数据范围 | 推荐类型 |
|---|---|
| 0 到 255 | uint8 |
| -128 到 127 | int8 |
| 0 到 65535 | uint16 |
| -32768 到 32767 | int16 |
| 0 到 4294967295 | uint32 |
| -2147483648 到 2147483647 | int32 |
| 小数(低精度) | float32 |
| 小数(高精度) | float64 |
| 有限值集合(字符串) | category |
使用分类类型
# 对于重复值多的字符串列
df['status'] = df['status'].astype('category')
# 指定分类顺序
df['level'] = pd.Categorical(
df['level'],
categories=['low', 'medium', 'high'],
ordered=True
)
# 分类类型的优势
# 1. 内存占用更小
# 2. 某些操作更快
# 3. 支持排序
# 适用场景
# - 基数比(唯一值/总行数)< 50%
# - 字符串列且重复值多
# - 需要定义顺序的类别
索引优化
排序索引
# 排序后的索引查询更快
df = df.sort_index()
# 检查是否已排序
print(df.index.is_monotonic_increasing)
唯一索引
# 唯一索引的查询性能更好
print(df.index.is_unique)
# 如果可能,使用唯一值作为索引
df = df.set_index('unique_id')
避免链式索引
# ❌ 慢且有警告
df[df['A'] > 100]['B'] = 10
# ✅ 快且正确
df.loc[df['A'] > 100, 'B'] = 10
# ❌ 慢
df['A'][df['A'] > 100] = 10
# ✅ 快
df.loc[df['A'] > 100, 'A'] = 10
数据读取优化
分块读取
# 对于大文件,分块读取
chunk_size = 100000
chunks = []
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
# 处理每个块
processed = chunk[chunk['value'] > 0]
chunks.append(processed)
df = pd.concat(chunks, ignore_index=True)
指定数据类型
# 读取时指定数据类型,避免推断开销
dtypes = {
'id': 'int32',
'name': 'str',
'category': 'category',
'value': 'float32'
}
df = pd.read_csv('data.csv', dtype=dtypes)
只读取需要的列
# 只读取需要的列
df = pd.read_csv('data.csv', usecols=['col1', 'col2', 'col3'])
# 只读取前几行预览
df = pd.read_csv('data.csv', nrows=1000)
# 跳过不需要的行
df = pd.read_csv('data.csv', skiprows=range(1, 100)) # 跳过前99行数据
使用更快的文件格式
# CSV vs Parquet vs Feather vs Pickle
# 写入
df.to_csv('data.csv', index=False)
df.to_parquet('data.parquet', index=False)
df.to_feather('data.feather')
df.to_pickle('data.pkl')
# 读取性能比较(大致排序)
# 1. Feather - 最快(但文件较大)
# 2. Pickle - 快
# 3. Parquet - 中等(压缩好,适合存储)
# 4. CSV - 最慢(但通用性好)
# 推荐
# - 临时数据:Feather
# - 长期存储:Parquet
# - 数据交换:CSV
查询优化
使用 query 方法
# 对于复杂条件,query 可能更快且代码更清晰
# ❌ 普通方式
result = df[(df['A'] > 100) & (df['B'] < 50) & (df['C'] == 'value')]
# ✅ query 方法
result = df.query('A > 100 and B < 50 and C == "value"')
# 使用外部变量
threshold = 100
result = df.query('A > @threshold')
使用 isin 代替多个 OR
# ❌ 慢:多个 OR
result = df[(df['category'] == 'A') |
(df['category'] == 'B') |
(df['category'] == 'C')]
# ✅ 快:使用 isin
result = df[df['category'].isin(['A', 'B', 'C'])]
避免重复计算
# ❌ 重复计算
df['new_col'] = df['A'] * df['B'] / (df['A'] * df['B'] + 1)
# ✅ 缓存中间结果
temp = df['A'] * df['B']
df['new_col'] = temp / (temp + 1)
分组聚合优化
使用内置聚合函数
# ❌ 慢:使用 apply
result = df.groupby('category')['value'].apply(np.mean)
# ✅ 快:使用内置方法
result = df.groupby('category')['value'].mean()
# 或使用 agg
result = df.groupby('category').agg({'value': 'mean'})
预过滤数据
# ❌ 慢:在分组后过滤
result = df.groupby('category').filter(lambda x: len(x) > 100)
# ✅ 快:先过滤再分组
counts = df['category'].value_counts()
valid_categories = counts[counts > 100].index
result = df[df['category'].isin(valid_categories)].groupby('category').mean()
使用 categorize 加速分组
# 对于大数据集,分类类型可以加速分组
df['category'] = df['category'].astype('category')
result = df.groupby('category')['value'].mean()
内存管理
释放内存
# 删除不需要的变量
del large_df
# 强制垃圾回收
import gc
gc.collect()
# 删除不需要的列
df = df.drop(['unused_col1', 'unused_col2'], axis=1)
使用 inplace
# 对于大数据,使用 inplace 避免复制
df.drop('column', axis=1, inplace=True)
df.sort_values('column', inplace=True)
# 注意:不是所有操作都支持 inplace
及时转换类型
# 读取后立即优化类型
df = pd.read_csv('data.csv')
df = df.astype({
'int_col': 'int32',
'float_col': 'float32',
'str_col': 'category'
})
并行处理
使用多核处理
# 安装:pip install swifter
import swifter
# 并行应用函数
df['new_col'] = df['col'].swifter.apply(lambda x: complex_function(x))
# 并行 apply
result = df.swifter.apply(lambda row: row['A'] + row['B'], axis=1)
使用 modin
# 安装:pip install modin
# Modin 使用 Ray 或 Dask 进行并行处理
# 替换 import pandas as pd
import modin.pandas as pd
# 代码基本不变,自动并行化
df = pd.read_csv('large_file.csv')
result = df.groupby('column').mean()
实战示例
示例 1:大数据处理优化
import pandas as pd
import numpy as np
# 创建大型测试数据
np.random.seed(42)
n = 10_000_000
# ❌ 慢:直接创建
# df = pd.DataFrame({
# 'id': range(n),
# 'category': np.random.choice(['A', 'B', 'C', 'D'], n),
# 'value': np.random.randn(n)
# })
# ✅ 快:优化类型后创建
df = pd.DataFrame({
'id': pd.array(range(n), dtype='int32'),
'category': pd.Categorical(np.random.choice(['A', 'B', 'C', 'D'], n)),
'value': pd.array(np.random.randn(n), dtype='float32')
})
print(f"内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 优化聚合操作
# ❌ 慢
# result = df.groupby('category')['value'].apply(lambda x: x.mean())
# ✅ 快
result = df.groupby('category')['value'].mean()
print(result)
示例 2:文件处理优化
# 模拟大文件处理
# 方法 1:分块处理
def process_large_file(filepath, output_path, chunksize=100000):
"""分块处理大文件"""
results = []
for i, chunk in enumerate(pd.read_csv(filepath, chunksize=chunksize)):
# 处理每个块
chunk = chunk[chunk['value'] > 0] # 过滤
chunk['processed'] = chunk['value'] * 2 # 计算
results.append(chunk)
# 可选:定期写入
if (i + 1) % 10 == 0:
pd.concat(results).to_csv(
output_path,
mode='a',
header=not pd.io.common.file_exists(output_path)
)
results = []
return pd.concat(results)
# 方法 2:使用 Parquet 格式
# 保存
# df.to_parquet('data.parquet')
# 读取特定列
# df = pd.read_parquet('data.parquet', columns=['col1', 'col2'])
# 过滤读取(Parquet 支持谓词下推)
# df = pd.read_parquet('data.parquet', filters=[('value', '>', 100)])
示例 3:内存优化对比
def compare_memory_usage():
"""比较不同类型的内存使用"""
n = 1_000_000
# 整数类型
int64_df = pd.DataFrame({'col': range(n)})
int32_df = pd.DataFrame({'col': pd.array(range(n), dtype='int32')})
int16_df = pd.DataFrame({'col': pd.array(range(n), dtype='int16')})
print("整数类型内存对比:")
print(f"int64: {int64_df.memory_usage().sum() / 1024**2:.2f} MB")
print(f"int32: {int32_df.memory_usage().sum() / 1024**2:.2f} MB")
print(f"int16: {int16_df.memory_usage().sum() / 1024**2:.2f} MB")
# 字符串类型
categories = ['Category_A', 'Category_B', 'Category_C', 'Category_D']
str_df = pd.DataFrame({'col': np.random.choice(categories, n)})
cat_df = pd.DataFrame({'col': pd.Categorical(np.random.choice(categories, n))})
print("\n字符串类型内存对比:")
print(f"object: {str_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"category: {cat_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
compare_memory_usage()
性能优化清单
数据读取
- 指定数据类型
dtype - 只读取需要的列
usecols - 使用更快的格式(Parquet/Feather)
- 大文件分块读取
数据类型
- 使用最小的整数类型
- 使用
float32代替float64 - 字符串列使用
category - 布尔列使用
bool
操作优化
- 使用向量化操作
- 避免循环和
apply - 使用内置方法
- 缓存中间结果
索引优化
- 排序索引
- 使用唯一索引
- 避免链式索引
- 使用
loc代替[]
内存管理
- 及时删除不需要的数据
- 使用
inplace参数 - 定期垃圾回收
- 监控内存使用
小结
核心原则:
- 向量化优先:避免循环,使用向量化操作
- 类型优化:使用最小的数据类型
- 减少复制:使用 inplace 和引用
- 合理存储:选择合适的文件格式
常用优化方法:
- 类型转换:
astype() - 分类类型:
category - 向量化:NumPy 函数
- 索引排序:
sort_index() - 内存监控:
memory_usage()
练习
- 创建一个大型 DataFrame,对比不同数据类型的内存使用
- 对比向量化操作和循环操作的性能差异
- 将 CSV 文件转换为 Parquet 格式,比较读取速度
- 使用
query()和普通布尔索引对比查询性能 - 实现一个分块处理大文件的函数
下一步
掌握了性能优化后,让我们学习 实战案例!