NumPy 排序与搜索
排序和搜索是数据处理中最常见的操作。NumPy 提供了高效且功能丰富的排序和搜索函数,能够处理各种复杂场景。本章将详细介绍这些函数的使用方法和最佳实践。
基本排序
sort 函数
np.sort 返回排序后的数组副本,不修改原数组:
import numpy as np
# 一维数组排序
arr = np.array([3, 1, 4, 1, 5, 9, 2, 6])
sorted_arr = np.sort(arr)
print(f"原数组: {arr}")
print(f"排序后: {sorted_arr}")
# 注意:原数组未改变
# 降序排序(使用切片)
sorted_desc = np.sort(arr)[::-1]
print(f"降序: {sorted_desc}")
原地排序
ndarray.sort() 方法直接修改原数组,不返回新数组:
import numpy as np
arr = np.array([3, 1, 4, 1, 5, 9, 2, 6])
arr.sort() # 原地排序,无返回值
print(f"排序后(原数组已改变): {arr}")
选择建议:
- 如果需要保留原数组,使用
np.sort() - 如果不需要原数组,使用
arr.sort()更节省内存
多维数组排序
import numpy as np
arr = np.array([[3, 1, 4],
[1, 5, 9],
[2, 6, 5]])
# 默认沿最后一个轴排序(每行单独排序)
sorted_default = np.sort(arr)
print(f"默认排序(每行):\n{sorted_default}")
# 沿 axis=0 排序(每列单独排序)
sorted_axis0 = np.sort(arr, axis=0)
print(f"\naxis=0 排序(每列):\n{sorted_axis0}")
# 展平后排序
sorted_flat = np.sort(arr, axis=None)
print(f"\n展平排序: {sorted_flat}")
排序算法选择
NumPy 支持多种排序算法:
import numpy as np
import time
arr = np.random.rand(10000)
# 不同排序算法的比较
algorithms = ['quicksort', 'mergesort', 'heapsort', 'stable']
for algo in algorithms:
start = time.time()
sorted_arr = np.sort(arr, kind=algo)
elapsed = time.time() - start
print(f"{algo:12s}: {elapsed:.6f}s")
算法特点对比:
| 算法 | 时间复杂度 | 稳定性 | 特点 |
|---|---|---|---|
quicksort | 不稳定 | 默认,通常最快 | |
mergesort | 稳定 | 需要额外内存 | |
heapsort | 不稳定 | 内存效率高 | |
stable | 稳定 | 自动选择稳定算法 |
稳定性指的是相等元素的相对顺序是否保持不变。这在处理结构化数据时很重要:
import numpy as np
# 稳定排序示例
dt = np.dtype([('name', 'U10'), ('score', 'i4')])
arr = np.array([('Alice', 90), ('Bob', 90), ('Carol', 85)], dtype=dt)
# 不稳定排序:Bob 和 Alice 的顺序可能改变
sorted_unstable = np.sort(arr, order='score', kind='quicksort')
print(f"不稳定排序: {sorted_unstable}")
# 稳定排序:保持原始相对顺序
sorted_stable = np.sort(arr, order='score', kind='stable')
print(f"稳定排序: {sorted_stable}")
排序索引
argsort - 返回排序索引
argsort 返回排序后的索引数组,而不是排序后的值:
import numpy as np
arr = np.array([30, 10, 40, 20])
indices = np.argsort(arr)
print(f"原数组: {arr}")
print(f"排序索引: {indices}") # [1, 3, 0, 2]
print(f"排序后: {arr[indices]}") # [10, 20, 30, 40]
# 应用:获取前 k 小的元素及其索引
k = 2
smallest_k_indices = np.argsort(arr)[:k]
print(f"\n前 {k} 小的元素: {arr[smallest_k_indices]}")
print(f"它们的索引: {smallest_k_indices}")
argsort 的实际应用
import numpy as np
# 示例:对学生成绩排序
names = np.array(['Alice', 'Bob', 'Carol', 'David', 'Eve'])
scores = np.array([85, 92, 78, 92, 88])
# 按成绩排序
sorted_indices = np.argsort(scores)[::-1] # 降序
print("成绩排名:")
for i, idx in enumerate(sorted_indices, 1):
print(f"第{i}名: {names[idx]} - {scores[idx]}分")
# 找成绩最高和最低的学生
best_idx = np.argmax(scores)
worst_idx = np.argmin(scores)
print(f"\n最高分: {names[best_idx]} - {scores[best_idx]}")
print(f"最低分: {names[worst_idx]} - {scores[worst_idx]}")
多维数组的 argsort
import numpy as np
arr = np.array([[3, 1, 4],
[1, 5, 9]])
# 每行的排序索引
row_indices = np.argsort(arr, axis=1)
print(f"每行排序索引:\n{row_indices}")
# 应用:将每行按索引重新排列
sorted_arr = np.take_along_axis(arr, row_indices, axis=1)
print(f"\n排序后:\n{sorted_arr}")
# 每列的排序索引
col_indices = np.argsort(arr, axis=0)
print(f"\n每列排序索引:\n{col_indices}")
lexsort - 多键排序
lexsort 用于根据多个键进行排序,最后一个键是主排序键:
import numpy as np
# 学生数据:先按班级排序,同班级按成绩排序
names = np.array(['Alice', 'Bob', 'Carol', 'David', 'Eve'])
classes = np.array([2, 1, 2, 1, 2]) # 班级
scores = np.array([85, 92, 88, 78, 90]) # 成绩
# lexsort: 最后一个参数是主排序键
# 先按班级排序,同班级按成绩排序
indices = np.lexsort((scores, classes)) # 注意顺序是反的
print("按班级和成绩排序:")
for idx in indices:
print(f"班级{classes[idx]}: {names[idx]} - {scores[idx]}分")
结构化数组的排序
import numpy as np
# 创建结构化数组
dt = np.dtype([('name', 'U10'), ('class', 'i4'), ('score', 'i4')])
students = np.array([
('Alice', 2, 85),
('Bob', 1, 92),
('Carol', 2, 88),
('David', 1, 78),
('Eve', 2, 90)
], dtype=dt)
# 按单个字段排序
sorted_by_score = np.sort(students, order='score')
print("按成绩排序:")
print(sorted_by_score)
# 按多个字段排序
sorted_multi = np.sort(students, order=['class', 'score'])
print("\n按班级和成绩排序:")
print(sorted_multi)
搜索函数
argmax 和 argmin
返回最大值和最小值的索引:
import numpy as np
arr = np.array([3, 1, 4, 1, 5, 9, 2, 6])
# 最大值的索引
print(f"最大值索引: {np.argmax(arr)}") # 5
print(f"最大值: {arr[np.argmax(arr)]}") # 9
# 最小值的索引
print(f"最小值索引: {np.argmin(arr)}") # 1
print(f"最小值: {arr[np.argmin(arr)]}") # 1
# 多维数组
arr_2d = np.array([[3, 1, 4],
[1, 5, 9]])
print(f"\n展平后的最大值索引: {np.argmax(arr_2d)}")
# 沿轴求最大值索引
print(f"每行最大值索引: {np.argmax(arr_2d, axis=1)}") # [2, 2]
print(f"每列最大值索引: {np.argmax(arr_2d, axis=0)}") # [0, 1, 1]
注意:如果有多个最大值,argmax 只返回第一个:
import numpy as np
arr = np.array([1, 5, 3, 5, 2])
print(f"第一个最大值的索引: {np.argmax(arr)}") # 1
# 找所有最大值的索引
max_val = np.max(arr)
all_max_indices = np.where(arr == max_val)[0]
print(f"所有最大值的索引: {all_max_indices}") # [1, 3]
where 函数
np.where 根据条件返回元素索引或进行条件选择:
import numpy as np
arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
# 返回满足条件的索引
indices = np.where(arr > 5)
print(f"大于5的索引: {indices}")
print(f"大于5的元素: {arr[indices]}")
# 三元选择:where(condition, x, y)
# 满足条件取 x,否则取 y
result = np.where(arr > 5, arr * 2, arr * -1)
print(f"\n条件选择结果: {result}")
# 多维数组
arr_2d = np.array([[1, 2, 3],
[4, 5, 6]])
rows, cols = np.where(arr_2d > 3)
print(f"\n大于3的位置: 行{rows}, 列{cols}")
nonzero 函数
返回非零元素的索引:
import numpy as np
arr = np.array([0, 1, 0, 2, 0, 3, 0])
# 非零元素索引
indices = np.nonzero(arr)
print(f"非零索引: {indices}")
print(f"非零元素: {arr[indices]}")
# 布尔数组的非零索引(True 被视为非零)
bool_arr = np.array([True, False, True, False, True])
true_indices = np.nonzero(bool_arr)
print(f"\nTrue 的索引: {true_indices}")
# 二维数组
arr_2d = np.array([[0, 1, 0],
[2, 0, 3]])
rows, cols = np.nonzero(arr_2d)
print(f"\n非零位置: 行{rows}, 列{cols}")
flatnonzero 函数
返回展平数组的非零索引:
import numpy as np
arr = np.array([[0, 1, 0],
[2, 0, 3]])
# 展平后的非零索引
indices = np.flatnonzero(arr)
print(f"展平后的非零索引: {indices}")
# 等价于
indices_eq = np.nonzero(arr.flatten())[0]
print(f"等价方式: {indices_eq}")
extract 函数
从数组中提取满足条件的元素:
import numpy as np
arr = np.arange(12).reshape((3, 4))
# 提取偶数
condition = arr % 2 == 0
evens = np.extract(condition, arr)
print(f"偶数: {evens}")
# 与布尔索引等价
evens_v2 = arr[condition]
print(f"布尔索引方式: {evens_v2}")
二分搜索
searchsorted 函数
在已排序数组中查找插入位置,使用二分查找算法:
import numpy as np
arr = np.array([1, 3, 5, 7, 9]) # 必须是已排序的
# 查找插入位置(保持有序)
x = 4
pos = np.searchsorted(arr, x)
print(f"{x} 应插入到位置 {pos}") # 2
# 验证
arr_inserted = np.insert(arr, pos, x)
print(f"插入后: {arr_inserted}")
# 多个值
values = [2, 4, 6, 8]
positions = np.searchsorted(arr, values)
print(f"\n多值插入位置: {positions}")
side 参数
控制插入位置是在匹配元素的左侧还是右侧:
import numpy as np
arr = np.array([1, 2, 2, 2, 3])
x = 2
# 'left': 在匹配元素的左侧插入
pos_left = np.searchsorted(arr, x, side='left')
print(f"'left': {x} 插入位置 {pos_left}") # 1
# 'right': 在匹配元素的右侧插入
pos_right = np.searchsorted(arr, x, side='right')
print(f"'right': {x} 插入位置 {pos_right}") # 4
# 应用:统计每个值的出现次数
values = np.array([1, 2, 3, 4])
lefts = np.searchsorted(arr, values, side='left')
rights = np.searchsorted(arr, values, side='right')
counts = rights - lefts
print(f"\n值 {values} 的出现次数: {counts}")
searchsorted 实际应用
import numpy as np
# 应用:数据分桶(分位数分组)
data = np.random.rand(100) * 100
# 定义分位数边界
bins = np.array([0, 25, 50, 75, 100])
# 确定每个数据点属于哪个区间
bucket_indices = np.searchsorted(bins, data, side='right') - 1
bucket_indices = np.clip(bucket_indices, 0, len(bins) - 2)
print("分位数分布:")
for i in range(len(bins) - 1):
count = np.sum(bucket_indices == i)
print(f"[{bins[i]}, {bins[i+1]}): {count} 个数据点")
# 应用:时间序列数据的时间区间查找
timestamps = np.array([100, 200, 300, 400, 500]) # 时间戳
events = np.array([150, 250, 350, 450]) # 事件时间
# 找每个事件前的最后一个时间戳索引
prev_timestamp_idx = np.searchsorted(timestamps, events) - 1
print(f"\n每个事件前的时间戳索引: {prev_timestamp_idx}")
分区操作
partition 函数
partition 将数组分为两部分:前 k 小的元素在左边,其他在右边,但不完全排序:
import numpy as np
arr = np.array([7, 2, 9, 1, 5, 4, 8, 3, 6])
# 第 k 小的元素在位置 k,左边都小于等于它,右边都大于等于它
k = 4
partitioned = np.partition(arr, k)
print(f"原数组: {arr}")
print(f"partition 后: {partitioned}")
print(f"第 {k} 小的元素: {partitioned[k]}")
# 获取前 k 小的元素(不保证有序)
k_smallest = np.partition(arr, k)[:k]
print(f"\n前 {k} 小的元素(无序): {k_smallest}")
# 如果需要有序,再排序
k_smallest_sorted = np.sort(k_smallest)
print(f"前 {k} 小的元素(有序): {k_smallest_sorted}")
argpartition 函数
返回分区后的索引:
import numpy as np
arr = np.array([7, 2, 9, 1, 5, 4, 8, 3, 6])
k = 3
# 获取前 k 小的元素的索引
indices = np.argpartition(arr, k)[:k]
print(f"前 {k} 小的索引: {indices}")
print(f"前 {k} 小的值: {arr[indices]}")
# 获取前 k 大的元素
k_largest_indices = np.argpartition(arr, -k)[-k:]
print(f"\n前 {k} 大的索引: {k_largest_indices}")
print(f"前 {k} 大的值: {arr[k_largest_indices]}")
partition vs sort 性能对比
import numpy as np
import time
arr = np.random.rand(1000000)
k = 10
# sort 方法
start = time.time()
sorted_arr = np.sort(arr)
k_smallest_sort = sorted_arr[:k]
time_sort = time.time() - start
# partition 方法
start = time.time()
k_smallest_partition = np.partition(arr, k)[:k]
k_smallest_partition = np.sort(k_smallest_partition)
time_partition = time.time() - start
print(f"sort 耗时: {time_sort:.6f}s")
print(f"partition 耗时: {time_partition:.6f}s")
print(f"结果一致: {np.allclose(sorted(k_smallest_sort), sorted(k_smallest_partition))}")
性能差异的原因:sort 是 ,partition 是 。当只需要前 k 小的元素时,partition 更高效。
计数函数
count_nonzero
统计非零元素数量:
import numpy as np
arr = np.array([0, 1, 0, 2, 0, 3, 0])
count = np.count_nonzero(arr)
print(f"非零元素数量: {count}") # 3
# 统计满足条件的元素数量
count_positive = np.count_nonzero(arr > 0)
print(f"正数数量: {count_positive}")
# 多维数组沿轴计数
arr_2d = np.array([[0, 1, 0],
[2, 0, 3]])
print(f"\n每行非零数: {np.count_nonzero(arr_2d, axis=1)}") # [1, 2]
print(f"每列非零数: {np.count_nonzero(arr_2d, axis=0)}") # [1, 1, 1]
bincount
统计每个非负整数值的出现次数:
import numpy as np
arr = np.array([0, 1, 1, 2, 2, 2, 3, 3, 3, 3])
counts = np.bincount(arr)
print(f"值: {np.arange(len(counts))}")
print(f"计数: {counts}")
# 指定 minlength
counts = np.bincount(arr, minlength=5)
print(f"\n指定 minlength=5: {counts}")
# 带权重的计数
values = np.array([0, 1, 1, 2, 2])
weights = np.array([1, 2, 3, 4, 5])
weighted_counts = np.bincount(values, weights=weights)
print(f"\n加权计数: {weighted_counts}")
# 0: weight=1, 1: weight=2+3=5, 2: weight=4+5=9
histogram
计算直方图统计:
import numpy as np
data = np.random.randn(1000)
# 基本直方图
hist, bin_edges = np.histogram(data, bins=10)
print("直方图统计:")
for i in range(len(hist)):
print(f"[{bin_edges[i]:.2f}, {bin_edges[i+1]:.2f}): {hist[i]}")
# 指定分桶边界
bins = [-3, -2, -1, 0, 1, 2, 3]
hist, _ = np.histogram(data, bins=bins)
print(f"\n自定义分桶: {hist}")
# 二维直方图
x = np.random.randn(1000)
y = np.random.randn(1000)
hist_2d, x_edges, y_edges = np.histogram2d(x, y, bins=5)
print(f"\n2D 直方图形状: {hist_2d.shape}")
集合操作
unique 函数
找出数组中的唯一值:
import numpy as np
arr = np.array([1, 2, 2, 3, 3, 3, 4, 4, 4, 4])
# 唯一值
unique_vals = np.unique(arr)
print(f"唯一值: {unique_vals}")
# 返回索引
unique_vals, indices = np.unique(arr, return_index=True)
print(f"\n唯一值首次出现索引: {indices}")
# 返回逆索引
unique_vals, inverse = np.unique(arr, return_inverse=True)
print(f"\n逆索引: {inverse}")
print(f"还原: {unique_vals[inverse]}")
# 返回计数
unique_vals, counts = np.unique(arr, return_counts=True)
print(f"\n唯一值及计数:")
for v, c in zip(unique_vals, counts):
print(f" {v}: {c}次")
# 多维数组
arr_2d = np.array([[1, 2], [2, 3], [1, 2]])
unique_2d = np.unique(arr_2d, axis=0) # 按行去重
print(f"\n按行去重:\n{unique_2d}")
集合运算函数
import numpy as np
a = np.array([1, 2, 3, 4, 5])
b = np.array([3, 4, 5, 6, 7])
# 交集
intersection = np.intersect1d(a, b)
print(f"交集: {intersection}")
# 并集
union = np.union1d(a, b)
print(f"并集: {union}")
# 差集(在 a 不在 b)
difference = np.setdiff1d(a, b)
print(f"a - b: {difference}")
# 对称差集(并集减交集)
symmetric_diff = np.setxor1d(a, b)
print(f"对称差集: {symmetric_diff}")
# 检查元素是否在集合中
mask = np.isin(a, b)
print(f"\na 中哪些在 b 里: {mask}")
print(f"对应元素: {a[mask]}")
实战示例
示例1:查找 Top-K 元素
import numpy as np
def find_top_k(arr, k, largest=True):
"""查找数组中最大/最小的 k 个元素"""
if largest:
indices = np.argpartition(arr, -k)[-k:]
indices = indices[np.argsort(arr[indices])][::-1]
else:
indices = np.argpartition(arr, k)[:k]
indices = indices[np.argsort(arr[indices])]
return indices, arr[indices]
# 示例
data = np.random.randint(0, 100, 20)
print(f"数据: {data}")
# 最大的 5 个
idx, vals = find_top_k(data, 5, largest=True)
print(f"\n最大的 5 个: {vals}")
print(f"索引: {idx}")
# 最小的 5 个
idx, vals = find_top_k(data, 5, largest=False)
print(f"\n最小的 5 个: {vals}")
print(f"索引: {idx}")
示例2:数据离散化
import numpy as np
def discretize(data, bins, labels=None):
"""将连续数据离散化为分箱"""
# 使用 searchsorted 找分箱位置
indices = np.digitize(data, bins) - 1
indices = np.clip(indices, 0, len(bins) - 1)
if labels is not None:
return np.array(labels)[indices]
return indices
# 示例:将年龄分组
ages = np.array([5, 12, 18, 25, 35, 45, 55, 65, 75, 85])
bins = [0, 18, 35, 60, 100]
labels = ['儿童青少年', '青年', '中年', '老年']
groups = discretize(ages, bins, labels)
print("年龄分组:")
for age, group in zip(ages, groups):
print(f" {age}岁: {group}")
示例3:去重并保持原顺序
import numpy as np
def unique_preserve_order(arr):
"""去重但保持元素的首次出现顺序"""
_, indices = np.unique(arr, return_index=True)
return arr[np.sort(indices)]
# 示例
arr = np.array([3, 1, 2, 1, 3, 2, 4, 1])
result = unique_preserve_order(arr)
print(f"原数组: {arr}")
print(f"去重(保持顺序): {result}")
# 对比普通的 unique(会排序)
result_sorted = np.unique(arr)
print(f"unique(排序后): {result_sorted}")
示例4:快速查找重复元素
import numpy as np
def find_duplicates(arr):
"""找出数组中的重复元素"""
sorted_arr = np.sort(arr)
# 相邻元素相同的即为重复
is_duplicate = np.concatenate([
[False],
(sorted_arr[:-1] == sorted_arr[1:])
])
return np.unique(sorted_arr[is_duplicate])
# 示例
arr = np.array([1, 2, 3, 2, 4, 3, 5, 3, 6, 1])
duplicates = find_duplicates(arr)
print(f"数组: {arr}")
print(f"重复元素: {duplicates}")
小结
本章介绍了 NumPy 中排序和搜索的核心函数:
| 功能 | 函数 | 说明 |
|---|---|---|
| 排序 | sort, argsort | 排序和排序索引 |
| 多键排序 | lexsort | 按多个键排序 |
| 查找极值 | argmax, argmin | 最大最小值索引 |
| 条件查找 | where, nonzero | 条件筛选 |
| 二分查找 | searchsorted | 有序数组查找 |
| 分区 | partition, argpartition | 高效获取 Top-K |
| 计数 | bincount, histogram | 值计数和直方图 |
| 集合操作 | unique, intersect1d 等 | 集合运算 |
掌握这些函数,能够高效地处理各种数据筛选和分析任务。
练习
- 找出数组中第二大的元素
- 实现一个函数,返回数组中出现频率最高的 k 个元素
- 将一个数组按奇偶分成两部分,奇数在前,偶数在后
- 实现一个简单的推荐系统:找出与给定用户最相似的 k 个用户
- 统计一个文本中各字符的出现频率,并按频率排序