NumPy 文件操作
本章将介绍 NumPy 中读写数组数据的各种方法,包括二进制文件和文本文件。
二进制文件
NumPy 的二进制格式是存储和加载数组最高效的方式。
保存单个数组 - save
import numpy as np
import os
# 创建测试数组
arr = np.array([[1, 2, 3], [4, 5, 6]])
# 保存到 .npy 文件
np.save('data.npy', arr)
# 加载数据
loaded = np.load('data.npy')
print(f"加载的数据:\n{loaded}")
# 清理文件
os.remove('data.npy')
关键点:
save自动添加.npy扩展名- 文件是二进制格式,加载速度快
- 保存时保留数据类型和形状信息
保存多个数组 - savez
import numpy as np
# 创建多个数组
arr1 = np.array([1, 2, 3])
arr2 = np.array([[1, 2], [3, 4]])
arr3 = np.array(['hello', 'world'])
# 保存为 .npz 文件(多个数组打包)
np.savez('multi_data.npz', a=arr1, b=arr2, c=arr3)
# 加载 .npz 文件(返回类似字典的对象)
data = np.load('multi_data.npz')
print(f"keys: {list(data.keys())}")
print(f"a: {data['a']}")
print(f"b: {data['b']}")
print(f"c: {data['c']}")
data.close() # 关闭文件
# 或使用上下文管理器
with np.load('multi_data.npz') as data:
print(f"a: {data['a']}")
压缩保存 - savez_compressed
import numpy as np
# 创建较大的数组
large_arr = np.random.rand(1000, 1000)
# 普通保存
np.savez('large_normal.npz', data=large_arr)
# 压缩保存(文件更小,但保存/加载速度稍慢)
np.savez_compressed('large_compressed.npz', data=large_arr)
import os
size_normal = os.path.getsize('large_normal.npz')
size_compressed = os.path.getsize('large_compressed.npz')
print(f"普通: {size_normal / 1024 / 1024:.2f} MB")
print(f"压缩: {size_compressed / 1024 / 1024:.2f} MB")
# 清理
os.remove('large_normal.npz')
os.remove('large_compressed.npz')
使用建议:
- 对于数值型大数组,
savez_compressed可以节省大量磁盘空间 - 对于包含 Python 对象的数组,只能使用普通保存
- 注意:
allow_pickle=True默认允许 pickle,可能有安全风险
内存映射 - memmap
对于超大数组,不必一次性加载到内存,可以使用内存映射:
import numpy as np
import os
# 创建大数组
large_data = np.arange(10000000)
# 保存到文件
np.save('large_file.npy', large_data)
# 使用内存映射加载
mmap_arr = np.load('large_file.npy', mmap_mode='r')
print(f"形状: {mmap_arr.shape}")
# 内存映射支持切片操作(按需加载)
print(f"切片 [1000:1010]: {mmap_arr[1000:1010]}")
# 读取操作不会加载整个数组到内存
mmap_arr.close()
# 清理
os.remove('large_file.npy')
内存映射的优势:
- 可以处理超过内存容量的数组
- 只加载需要的部分到内存
- 适合只读场景或随机访问
文本文件
保存为文本 - savetxt
import numpy as np
# 创建示例数组
arr = np.array([[1.5, 2.3, 3.7],
[4.2, 5.1, 6.9]])
# 默认保存(空格分隔)
np.savetxt('data.txt', arr)
# 读取验证
loaded = np.loadtxt('data.txt')
print(f"默认格式:\n{loaded}")
# 指定分隔符(逗号)
np.savetxt('data.csv', arr, delimiter=',')
loaded_csv = np.loadtxt('data.csv', delimiter=',')
print(f"\nCSV格式:\n{loaded_csv}")
# 指定格式
np.savetxt('data_formatted.txt', arr, fmt='%.2f')
print("\n格式化保存完成")
# 添加表头和注释
np.savetxt('data_header.txt', arr,
header='Col1 Col2 Col3',
comments='# ',
fmt='%.2f')
print("带表头保存完成")
import os
for f in ['data.txt', 'data.csv', 'data_formatted.txt', 'data_header.txt']:
os.remove(f)
加载文本 - loadtxt
import numpy as np
import os
# 创建测试文件
content = """1.0 2.0 3.0
4.0 5.0 6.0
7.0 8.0 9.0"""
with open('test.txt', 'w') as f:
f.write(content)
# 基本加载
data = np.loadtxt('test.txt')
print(f"基本加载:\n{data}")
# 指定分隔符
csv_content = "1,2,3\n4,5,6\n7,8,9"
with open('test.csv', 'w') as f:
f.write(csv_content)
data_csv = np.loadtxt('test.csv', delimiter=',')
print(f"\nCSV加载:\n{data_csv}")
# 指定数据类型
data_float = np.loadtxt('test.txt', dtype=float)
print(f"\n数据类型: {data_float.dtype}")
# 跳过行和选择列
# 假设有标题行
header_content = """x y z
1 2 3
4 5 6"""
with open('test_header.txt', 'w') as f:
f.write(header_content)
data_skip = np.loadtxt('test_header.txt', skiprows=1)
print(f"\n跳过第一行:\n{data_skip}")
# 只加载特定列
data_cols = np.loadtxt('test_header.txt', skiprows=1, usecols=(0, 1))
print(f"\n只加载前两列:\n{data_cols}")
os.remove('test.txt')
os.remove('test.csv')
os.remove('test_header.txt')
处理缺失值 - genfromtxt
import numpy as np
import os
# 包含缺失值的数据
missing_content = """1.0 2.0 3.0
4.0 5.0
2.0 6.0
7.0 8.0"""
with open('test_missing.txt', 'w') as f:
f.write(missing_content)
# 默认会将缺失值标记为 nan
data = np.genfromtxt('test_missing.txt')
print(f"默认解析:\n{data}")
# 使用自定义缺失值标识
data_custom = np.genfromtxt('test_missing.txt',
missing_values=[' ', ''])
print(f"\n自定义缺失值:\n{data_custom}")
# 手动处理缺失值
data_filled = np.genfromtxt('test_missing.txt')
data_filled = np.nan_to_num(data_filled, nan=0) # 用0填充
print(f"\n填充缺失值后:\n{data_filled}")
os.remove('test_missing.txt')
genfromtxt vs loadtxt:
loadtxt:简单快速,要求数据完整genfromtxt:支持缺失值、复杂分隔符、自动类型检测,但速度较慢
其他格式
从字符串创建数组
import numpy as np
# 从字符串创建
str_data = "1 2 3 4 5"
arr = np.fromstring(str_data, sep=' ')
print(f"fromstring: {arr}")
# 解析格式化字符串
data = np.fromstring("1,2,3,4,5", sep=',')
print(f"逗号分隔: {data}")
# 二进制字符串
binary_str = b'\x01\x02\x03\x04'
arr_bin = np.frombuffer(binary_str, dtype=np.uint8)
print(f"二进制: {arr_bin}")
tofile 和 fromfile(不推荐)
import numpy as np
import os
arr = np.array([1, 2, 3, 4, 5])
# 保存(丢失了数据类型和形状信息)
arr.tofile('raw_data.bin')
# 加载(需要手动指定数据类型)
loaded = np.fromfile('raw_data.bin', dtype=np.int64)
print(f"fromfile: {loaded}")
# 注意:这种方式丢失了类型信息,不推荐
os.remove('raw_data.bin')
数组转列表
import numpy as np
arr = np.array([[1, 2, 3], [4, 5, 6]])
# 转换为 Python 列表
py_list = arr.tolist()
print(f"列表: {py_list}")
print(f"类型: {type(py_list)}")
# 嵌套层级保持
print(f"嵌套: {type(py_list[0])}")
# 转换为字符串表示
str_repr = repr(arr)
print(f"repr: {str_repr}")
# 格式化字符串
str_formatted = np.array2string(arr, precision=2, separator=',')
print(f"格式化: {str_formatted}")
实战示例:数据持久化工作流
import numpy as np
import os
class DataManager:
"""数据管理器示例"""
def __init__(self, base_dir='./data'):
self.base_dir = base_dir
os.makedirs(base_dir, exist_ok=True)
def save_training_data(self, X, y, epoch):
"""保存训练数据"""
filename = f'training_epoch_{epoch}.npz'
filepath = os.path.join(self.base_dir, filename)
np.savez(filepath, X=X, y=y, epoch=epoch)
print(f"已保存: {filepath}")
return filepath
def load_training_data(self, epoch):
"""加载训练数据"""
filename = f'training_epoch_{epoch}.npz'
filepath = os.path.join(self.base_dir, filename)
if os.path.exists(filepath):
return np.load(filepath)
return None
def save_results_csv(self, results_dict):
"""保存结果为 CSV"""
filepath = os.path.join(self.base_dir, 'results.csv')
# 转换为 2D 数组
data = np.array([[k, v] for k, v in results_dict.items()])
np.savetxt(filepath, data, delimiter=',',
header='metric,value', comments='')
print(f"结果已保存: {filepath}")
def cleanup(self):
"""清理数据文件"""
for f in os.listdir(self.base_dir):
os.remove(os.path.join(self.base_dir, f))
print("数据已清理")
# 使用示例
manager = DataManager('./temp_data')
# 模拟保存
X_train = np.random.rand(100, 10)
y_train = np.random.randint(0, 2, 100)
manager.save_training_data(X_train, y_train, epoch=1)
# 模拟加载
data = manager.load_training_data(1)
if data is not None:
print(f"加载: X shape={data['X'].shape}, epoch={data['epoch']}")
# 保存结果
results = {'accuracy': 0.95, 'loss': 0.05, 'f1': 0.93}
manager.save_results_csv(results)
# 清理
manager.cleanup()
os.rmdir('./temp_data')
文件格式选择指南
| 场景 | 推荐格式 | 原因 |
|---|---|---|
| NumPy 专用数据 | .npy / .npz | 高效、保留类型信息 |
| 大数据集 | .npz 压缩 | 节省磁盘空间 |
| 超大数组 | memmap | 不占用内存 |
| 与其他程序共享 | CSV / TXT | 人类可读、通用性强 |
| 有缺失值的数据 | genfromtxt | 自动处理缺失值 |
| 临时缓存 | tofile / fromfile | 简单快速 |
小结
本章介绍了 NumPy 的文件 I/O 操作:
-
二进制格式:
save/load:单个数组savez/load:多个数组savez_compressed:压缩格式memmap:内存映射,处理大文件
-
文本格式:
savetxt/loadtxt:简单高效genfromtxt:处理缺失值
-
其他工具:
fromstring/frombuffer:从字符串创建tolist:转换为 Python 列表
根据具体场景选择合适的格式,可以提高数据处理的效率和可维护性。
练习
- 将一个 1000x1000 的随机矩阵保存为压缩的 npz 文件
- 将同样的矩阵保存为 CSV,对比文件大小和加载时间
- 实现一个函数,自动检测文件格式并加载
- 使用内存映射处理一个超过可用内存的大数组
- 创建包含缺失值的 CSV 文件,使用 genfromtxt 加载并处理