跳到主要内容

性能优化

处理大规模数据时,Pandas 的性能优化至关重要。本章介绍提升 Pandas 性能的各种技巧和最佳实践。

性能诊断

计时方法

import pandas as pd
import numpy as np
import time

# 使用 time 模块
start = time.time()
# 执行代码
result = df.groupby('column').mean()
end = time.time()
print(f"耗时: {end - start:.4f} 秒")

# 使用 Jupyter magic 命令
# %timeit df.groupby('column').mean()
# %%timeit
# 代码块

# 使用 timeit 模块
import timeit
timeit.timeit(lambda: df.groupby('column').mean(), number=100)

内存使用分析

# 查看内存使用
df = pd.DataFrame({'A': range(1000000), 'B': range(1000000)})

print(df.memory_usage())
# Index 128
# A 8000000
# B 8000000

print(df.memory_usage(deep=True)) # 深度分析(包括字符串)

# 查看总内存
print(f"总内存: {df.memory_usage().sum() / 1024**2:.2f} MB")

# 查看数据类型信息
print(df.info(memory_usage='deep'))

向量化操作

避免循环

# ❌ 慢:使用循环
df = pd.DataFrame({'A': range(1000000), 'B': range(1000000)})

result = []
for i in range(len(df)):
result.append(df['A'].iloc[i] + df['B'].iloc[i])
df['C'] = result

# ✅ 快:向量化操作
df['C'] = df['A'] + df['B'] # 快几十倍

使用 NumPy 函数

# ❌ 慢:使用 apply
df['log_A'] = df['A'].apply(np.log)

# ✅ 快:使用 NumPy 向量化
df['log_A'] = np.log(df['A'])

# 其他常用 NumPy 函数
df['sqrt'] = np.sqrt(df['A'])
df['exp'] = np.exp(df['A'])
df['abs'] = np.abs(df['A'])
df['where'] = np.where(df['A'] > 500000, 'high', 'low')

内置方法优于 apply

# ❌ 慢:使用 apply
df['sum'] = df[['A', 'B']].apply(sum, axis=1)

# ✅ 快:使用内置方法
df['sum'] = df[['A', 'B']].sum(axis=1)

# 字符串操作
# ❌ 慢
df['upper'] = df['name'].apply(lambda x: x.upper())

# ✅ 快:使用 str 访问器
df['upper'] = df['name'].str.upper()

数据类型优化

使用合适的数据类型

# 创建示例数据
df = pd.DataFrame({
'id': range(1000000),
'small_int': np.random.randint(0, 100, 1000000),
'small_float': np.random.random(1000000),
'category': np.random.choice(['A', 'B', 'C', 'D'], 1000000)
})

# 查看默认类型
print(df.dtypes)
# id int64
# small_int int64
# small_float float64
# category object

# 优化整数类型
df['small_int'] = df['small_int'].astype('int8') # 0-255 范围

# 优化浮点类型
df['small_float'] = df['small_float'].astype('float32')

# 转换为分类类型
df['category'] = df['category'].astype('category')

# 查看优化后的内存使用
print(f"优化前: 45.77 MB")
print(f"优化后: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

类型选择指南

数据范围推荐类型
0 到 255uint8
-128 到 127int8
0 到 65535uint16
-32768 到 32767int16
0 到 4294967295uint32
-2147483648 到 2147483647int32
小数(低精度)float32
小数(高精度)float64
有限值集合(字符串)category

使用分类类型

# 对于重复值多的字符串列
df['status'] = df['status'].astype('category')

# 指定分类顺序
df['level'] = pd.Categorical(
df['level'],
categories=['low', 'medium', 'high'],
ordered=True
)

# 分类类型的优势
# 1. 内存占用更小
# 2. 某些操作更快
# 3. 支持排序

# 适用场景
# - 基数比(唯一值/总行数)< 50%
# - 字符串列且重复值多
# - 需要定义顺序的类别

索引优化

排序索引

# 排序后的索引查询更快
df = df.sort_index()

# 检查是否已排序
print(df.index.is_monotonic_increasing)

唯一索引

# 唯一索引的查询性能更好
print(df.index.is_unique)

# 如果可能,使用唯一值作为索引
df = df.set_index('unique_id')

避免链式索引

# ❌ 慢且有警告
df[df['A'] > 100]['B'] = 10

# ✅ 快且正确
df.loc[df['A'] > 100, 'B'] = 10

# ❌ 慢
df['A'][df['A'] > 100] = 10

# ✅ 快
df.loc[df['A'] > 100, 'A'] = 10

数据读取优化

分块读取

# 对于大文件,分块读取
chunk_size = 100000
chunks = []

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
# 处理每个块
processed = chunk[chunk['value'] > 0]
chunks.append(processed)

df = pd.concat(chunks, ignore_index=True)

指定数据类型

# 读取时指定数据类型,避免推断开销
dtypes = {
'id': 'int32',
'name': 'str',
'category': 'category',
'value': 'float32'
}

df = pd.read_csv('data.csv', dtype=dtypes)

只读取需要的列

# 只读取需要的列
df = pd.read_csv('data.csv', usecols=['col1', 'col2', 'col3'])

# 只读取前几行预览
df = pd.read_csv('data.csv', nrows=1000)

# 跳过不需要的行
df = pd.read_csv('data.csv', skiprows=range(1, 100)) # 跳过前99行数据

使用更快的文件格式

# CSV vs Parquet vs Feather vs Pickle

# 写入
df.to_csv('data.csv', index=False)
df.to_parquet('data.parquet', index=False)
df.to_feather('data.feather')
df.to_pickle('data.pkl')

# 读取性能比较(大致排序)
# 1. Feather - 最快(但文件较大)
# 2. Pickle - 快
# 3. Parquet - 中等(压缩好,适合存储)
# 4. CSV - 最慢(但通用性好)

# 推荐
# - 临时数据:Feather
# - 长期存储:Parquet
# - 数据交换:CSV

查询优化

使用 query 方法

# 对于复杂条件,query 可能更快且代码更清晰
# ❌ 普通方式
result = df[(df['A'] > 100) & (df['B'] < 50) & (df['C'] == 'value')]

# ✅ query 方法
result = df.query('A > 100 and B < 50 and C == "value"')

# 使用外部变量
threshold = 100
result = df.query('A > @threshold')

使用 isin 代替多个 OR

# ❌ 慢:多个 OR
result = df[(df['category'] == 'A') |
(df['category'] == 'B') |
(df['category'] == 'C')]

# ✅ 快:使用 isin
result = df[df['category'].isin(['A', 'B', 'C'])]

避免重复计算

# ❌ 重复计算
df['new_col'] = df['A'] * df['B'] / (df['A'] * df['B'] + 1)

# ✅ 缓存中间结果
temp = df['A'] * df['B']
df['new_col'] = temp / (temp + 1)

分组聚合优化

使用内置聚合函数

# ❌ 慢:使用 apply
result = df.groupby('category')['value'].apply(np.mean)

# ✅ 快:使用内置方法
result = df.groupby('category')['value'].mean()

# 或使用 agg
result = df.groupby('category').agg({'value': 'mean'})

预过滤数据

# ❌ 慢:在分组后过滤
result = df.groupby('category').filter(lambda x: len(x) > 100)

# ✅ 快:先过滤再分组
counts = df['category'].value_counts()
valid_categories = counts[counts > 100].index
result = df[df['category'].isin(valid_categories)].groupby('category').mean()

使用 categorize 加速分组

# 对于大数据集,分类类型可以加速分组
df['category'] = df['category'].astype('category')
result = df.groupby('category')['value'].mean()

内存管理

释放内存

# 删除不需要的变量
del large_df

# 强制垃圾回收
import gc
gc.collect()

# 删除不需要的列
df = df.drop(['unused_col1', 'unused_col2'], axis=1)

使用 inplace

# 对于大数据,使用 inplace 避免复制
df.drop('column', axis=1, inplace=True)
df.sort_values('column', inplace=True)

# 注意:不是所有操作都支持 inplace

及时转换类型

# 读取后立即优化类型
df = pd.read_csv('data.csv')
df = df.astype({
'int_col': 'int32',
'float_col': 'float32',
'str_col': 'category'
})

并行处理

使用多核处理

# 安装:pip install swifter
import swifter

# 并行应用函数
df['new_col'] = df['col'].swifter.apply(lambda x: complex_function(x))

# 并行 apply
result = df.swifter.apply(lambda row: row['A'] + row['B'], axis=1)

使用 modin

# 安装:pip install modin
# Modin 使用 Ray 或 Dask 进行并行处理

# 替换 import pandas as pd
import modin.pandas as pd

# 代码基本不变,自动并行化
df = pd.read_csv('large_file.csv')
result = df.groupby('column').mean()

实战示例

示例 1:大数据处理优化

import pandas as pd
import numpy as np

# 创建大型测试数据
np.random.seed(42)
n = 10_000_000

# ❌ 慢:直接创建
# df = pd.DataFrame({
# 'id': range(n),
# 'category': np.random.choice(['A', 'B', 'C', 'D'], n),
# 'value': np.random.randn(n)
# })

# ✅ 快:优化类型后创建
df = pd.DataFrame({
'id': pd.array(range(n), dtype='int32'),
'category': pd.Categorical(np.random.choice(['A', 'B', 'C', 'D'], n)),
'value': pd.array(np.random.randn(n), dtype='float32')
})

print(f"内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# 优化聚合操作
# ❌ 慢
# result = df.groupby('category')['value'].apply(lambda x: x.mean())

# ✅ 快
result = df.groupby('category')['value'].mean()
print(result)

示例 2:文件处理优化

# 模拟大文件处理

# 方法 1:分块处理
def process_large_file(filepath, output_path, chunksize=100000):
"""分块处理大文件"""
results = []

for i, chunk in enumerate(pd.read_csv(filepath, chunksize=chunksize)):
# 处理每个块
chunk = chunk[chunk['value'] > 0] # 过滤
chunk['processed'] = chunk['value'] * 2 # 计算
results.append(chunk)

# 可选:定期写入
if (i + 1) % 10 == 0:
pd.concat(results).to_csv(
output_path,
mode='a',
header=not pd.io.common.file_exists(output_path)
)
results = []

return pd.concat(results)

# 方法 2:使用 Parquet 格式
# 保存
# df.to_parquet('data.parquet')

# 读取特定列
# df = pd.read_parquet('data.parquet', columns=['col1', 'col2'])

# 过滤读取(Parquet 支持谓词下推)
# df = pd.read_parquet('data.parquet', filters=[('value', '>', 100)])

示例 3:内存优化对比

def compare_memory_usage():
"""比较不同类型的内存使用"""
n = 1_000_000

# 整数类型
int64_df = pd.DataFrame({'col': range(n)})
int32_df = pd.DataFrame({'col': pd.array(range(n), dtype='int32')})
int16_df = pd.DataFrame({'col': pd.array(range(n), dtype='int16')})

print("整数类型内存对比:")
print(f"int64: {int64_df.memory_usage().sum() / 1024**2:.2f} MB")
print(f"int32: {int32_df.memory_usage().sum() / 1024**2:.2f} MB")
print(f"int16: {int16_df.memory_usage().sum() / 1024**2:.2f} MB")

# 字符串类型
categories = ['Category_A', 'Category_B', 'Category_C', 'Category_D']
str_df = pd.DataFrame({'col': np.random.choice(categories, n)})
cat_df = pd.DataFrame({'col': pd.Categorical(np.random.choice(categories, n))})

print("\n字符串类型内存对比:")
print(f"object: {str_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"category: {cat_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

compare_memory_usage()

性能优化清单

数据读取

  • 指定数据类型 dtype
  • 只读取需要的列 usecols
  • 使用更快的格式(Parquet/Feather)
  • 大文件分块读取

数据类型

  • 使用最小的整数类型
  • 使用 float32 代替 float64
  • 字符串列使用 category
  • 布尔列使用 bool

操作优化

  • 使用向量化操作
  • 避免循环和 apply
  • 使用内置方法
  • 缓存中间结果

索引优化

  • 排序索引
  • 使用唯一索引
  • 避免链式索引
  • 使用 loc 代替 []

内存管理

  • 及时删除不需要的数据
  • 使用 inplace 参数
  • 定期垃圾回收
  • 监控内存使用

小结

核心原则

  1. 向量化优先:避免循环,使用向量化操作
  2. 类型优化:使用最小的数据类型
  3. 减少复制:使用 inplace 和引用
  4. 合理存储:选择合适的文件格式

常用优化方法

  • 类型转换:astype()
  • 分类类型:category
  • 向量化:NumPy 函数
  • 索引排序:sort_index()
  • 内存监控:memory_usage()

练习

  1. 创建一个大型 DataFrame,对比不同数据类型的内存使用
  2. 对比向量化操作和循环操作的性能差异
  3. 将 CSV 文件转换为 Parquet 格式,比较读取速度
  4. 使用 query() 和普通布尔索引对比查询性能
  5. 实现一个分块处理大文件的函数

下一步

掌握了性能优化后,让我们学习 实战案例