跳到主要内容

NumPy 掩码数组

在实际数据处理中,经常会遇到缺失值、无效值或需要忽略的数据。NumPy 的掩码数组(Masked Arrays)提供了一种优雅的方式来处理这类问题,它允许你"遮住"某些数据,使其在计算时被忽略。本章将详细介绍掩码数组的使用方法和应用场景。

什么是掩码数组?

掩码数组由两部分组成:

  • 数据数组:实际的数值数据
  • 掩码数组:布尔数组,True 表示对应位置的数据被"遮住"(无效)

当某个数据被掩码遮住后,在大多数计算中会被忽略,就像它不存在一样。

为什么需要掩码数组?

假设你在测量温度,但某些传感器的读数是无效的(比如传感器故障)。如果直接使用普通数组,你需要:

# 不使用掩码数组的处理方式
temperatures = [25.3, 9999, 24.1, 9999, 26.5] # 9999 表示无效值

# 计算平均值时需要手动排除无效值
valid_temps = [t for t in temperatures if t != 9999]
average = sum(valid_temps) / len(valid_temps)

使用掩码数组则更加优雅:

import numpy as np
import numpy.ma as ma

temperatures = ma.array([25.3, 9999, 24.1, 9999, 26.5],
mask=[False, True, False, True, False])
average = temperatures.mean() # 自动忽略被遮住的值

创建掩码数组

使用 ma.array

import numpy as np
import numpy.ma as ma

# 创建掩码数组,指定哪些值无效
data = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, True, False])

print(f"数据: {data}")
print(f"掩码: {data.mask}")
print(f"有效数据: {data.compressed()}") # 返回未被遮住的数据

使用 ma.masked_values

根据特定值创建掩码:

import numpy as np
import numpy.ma as ma

# 常用于处理用特殊值表示缺失数据的场景
data = np.array([1.0, -999.0, 3.0, -999.0, 5.0])
masked_data = ma.masked_values(data, -999.0)

print(f"原始数据: {data}")
print(f"掩码数组: {masked_data}")
print(f"均值(自动排除 -999): {masked_data.mean():.2f}")

使用 ma.masked_where

根据条件创建掩码:

import numpy as np
import numpy.ma as ma

data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# 遮住所有大于5的值
masked = ma.masked_where(data > 5, data)

print(f"原数据: {data}")
print(f"遮住 > 5 的值: {masked}")
print(f"有效数据: {masked.compressed()}")

# 遮住小于3或大于7的值
masked2 = ma.masked_where((data < 3) | (data > 7), data)
print(f"\n遮住 < 3 或 > 7 的值: {masked2}")

其他创建方法

import numpy as np
import numpy.ma as ma

# masked_equal: 遮住等于某值的元素
data1 = ma.masked_equal([1, 2, 1, 3, 1, 4], 1)
print(f"遮住等于1的值: {data1}")

# masked_not_equal: 遮住不等于某值的元素
data2 = ma.masked_not_equal([1, 2, 3, 4, 5], 3)
print(f"遮住不等于3的值: {data2}")

# masked_greater: 遮住大于某值的元素
data3 = ma.masked_greater([1, 5, 10, 15, 20], 10)
print(f"遮住 > 10 的值: {data3}")

# masked_less: 遮住小于某值的元素
data4 = ma.masked_less([1, 5, 10, 15, 20], 10)
print(f"遮住 < 10 的值: {data4}")

# masked_inside: 遮住某范围内的值
data5 = ma.masked_inside([1, 3, 5, 7, 9], 3, 7)
print(f"遮住 [3, 7] 范围内的值: {data5}")

# masked_outside: 遮住某范围外的值
data6 = ma.masked_outside([1, 3, 5, 7, 9], 3, 7)
print(f"遮住 [3, 7] 范围外的值: {data6}")

处理 NaN 和 Inf

import numpy as np
import numpy.ma as ma

# 处理 NaN
data_nan = np.array([1.0, np.nan, 3.0, np.nan, 5.0])
masked_nan = ma.masked_invalid(data_nan)

print(f"含 NaN 的数组: {data_nan}")
print(f"遮住无效值后: {masked_nan}")
print(f"均值: {masked_nan.mean():.2f}")

# 处理 Inf
data_inf = np.array([1.0, np.inf, 3.0, -np.inf, 5.0])
masked_inf = ma.masked_invalid(data_inf)

print(f"\n含 Inf 的数组: {data_inf}")
print(f"遮住无效值后: {masked_inf}")

掩码数组的基本操作

访问数据

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, True, False])

# 访问数据(使用 data 属性获取原始数据)
print(f"原始数据: {data.data}")
print(f"掩码: {data.mask}")

# 访问单个元素
print(f"第一个元素: {data[0]}")
print(f"第二个元素(被遮住): {data[1]}") # 显示为 --

# 检查是否被遮住
print(f"\n第一个元素是否被遮住: {ma.is_masked(data[0])}")
print(f"第二个元素是否被遮住: {ma.is_masked(data[1])}")

修改掩码

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5])
print(f"初始状态: {data}")

# 手动遮住某些值
data[1] = ma.masked
data[3] = ma.masked
print(f"遮住索引 1 和 3: {data}")

# 取消遮住(恢复数据)
data[1] = 10 # 赋值会自动取消遮住
print(f"恢复索引 1: {data}")

# 使用掩码数组批量修改
data.mask = [False, False, True, False, False]
print(f"重新设置掩码: {data}")

获取有效数据

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, True, False])

# compressed() 返回未被遮住的数据(一维数组)
valid_data = data.compressed()
print(f"有效数据: {valid_data}")

# count() 统计有效元素数量
print(f"有效元素数量: {data.count()}")

# 获取有效数据的索引
valid_indices = np.where(~data.mask)[0]
print(f"有效数据索引: {valid_indices}")

掩码数组的计算

掩码数组最强大的特性是:被遮住的数据在计算中会被自动忽略

基本统计运算

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
mask=[False, True, False, True, False, True, False, True, False, True])

print(f"数据: {data}")
print(f"有效数据: {data.compressed()}")

print(f"\n统计运算(自动排除被遮住的数据):")
print(f"求和: {data.sum()}")
print(f"均值: {data.mean():.2f}")
print(f"标准差: {data.std():.2f}")
print(f"最小值: {data.min()}")
print(f"最大值: {data.max()}")
print(f"中位数: {ma.median(data):.2f}")

沿轴计算

import numpy as np
import numpy.ma as ma

# 二维掩码数组
data_2d = ma.array(
[[1, 2, 3],
[4, 5, 6],
[7, 8, 9]],
mask=[[False, True, False],
[True, False, True],
[False, False, True]]
)

print(f"二维数组:\n{data_2d}")

print(f"\n沿轴 0(列)求和: {data_2d.sum(axis=0)}")
print(f"沿轴 1(行)求和: {data_2d.sum(axis=1)}")

print(f"\n沿轴 0 均值: {data_2d.mean(axis=0)}")
print(f"沿轴 1 均值: {data_2d.mean(axis=1)}")

# count 沿轴统计
print(f"\n每列有效数据数: {data_2d.count(axis=0)}")
print(f"每行有效数据数: {data_2d.count(axis=1)}")

算术运算

import numpy as np
import numpy.ma as ma

a = ma.array([1, 2, 3, 4], mask=[False, True, False, True])
b = ma.array([10, 20, 30, 40], mask=[True, False, False, True])

# 算术运算会传播掩码
print(f"a = {a}")
print(f"b = {b}")
print(f"a + b = {a + b}") # 任一操作数被遮住,结果就被遮住
print(f"a * 2 = {a * 2}") # 与标量运算
print(f"a * b = {a * b}")

与普通数组的运算

import numpy as np
import numpy.ma as ma

masked = ma.array([1, 2, 3, 4], mask=[False, True, False, True])
normal = np.array([10, 20, 30, 40])

# 掩码数组与普通数组运算
result = masked + normal
print(f"掩码数组 + 普通数组: {result}")
print(f"结果类型: {type(result)}")

# 结果仍然是掩码数组,掩码被保留

填充值

被遮住的数据可以用填充值替换,这在输出或保存数据时很有用。

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, True, False])

# 默认填充值
print(f"默认填充值: {data.fill_value}")

# 使用 filled() 方法填充
filled = data.filled(fill_value=-999)
print(f"填充后: {filled}")
print(f"类型: {type(filled)}") # 变为普通 ndarray

# 设置自定义填充值
data.fill_value = 0
print(f"\n自定义填充值: {data.filled()}")

# 创建时指定填充值
data2 = ma.array([1, 2, 3], mask=[False, True, False], fill_value=-1)
print(f"创建时指定填充值: {data2.filled()}")

高级操作

掩码的组合

import numpy as np
import numpy.ma as ma

data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# 创建多个掩码
mask1 = data > 5
mask2 = data < 3

# 组合掩码
combined_mask = mask1 | mask2 # 或
masked = ma.array(data, mask=combined_mask)
print(f"遮住 < 3 或 > 5: {masked}")

# 使用逻辑运算
masked2 = ma.masked_where((data < 3) | (data > 5), data)
print(f"等价写法: {masked2}")

设置和获取掩码

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5])

# 获取掩码
print(f"初始掩码: {data.mask}") # False 表示没有掩码

# 设置掩码(遮住部分数据)
data[[1, 3]] = ma.masked
print(f"修改后掩码: {data.mask}")

# 完全取消掩码
data.mask = ma.nomask
print(f"取消所有掩码: {data}")

掩码硬化和软化

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, False, False])

# 默认是软掩码:可以通过赋值取消遮住
data[1] = 10
print(f"软掩码,赋值后: {data}") # 索引1不再被遮住

# 硬化掩码
data = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, False, False])
data.harden_mask()
data[1] = 10
print(f"硬掩码,赋值后: {data}") # 索引1仍然被遮住

# 软化掩码
data.soften_mask()
data[1] = 20
print(f"软掩码,赋值后: {data}") # 索引1不再被遮住

获取掩码数据的索引

import numpy as np
import numpy.ma as ma

data = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, True, False])

# nonzero: 返回未被遮住的非零元素索引
print(f"非零未遮住索引: {data.nonzero()}")

# where: 类似 np.where
indices = ma.where(data > 2)
print(f"大于 2 的索引: {indices}")

# 找到被遮住的位置
masked_indices = np.where(data.mask)[0]
print(f"被遮住的索引: {masked_indices}")

实际应用场景

场景1:气象数据处理

import numpy as np
import numpy.ma as ma

# 模拟一周的温度数据,某些天传感器故障
temperatures = np.array([25.3, -999, 24.1, 23.8, -999, 26.5, 27.1])
dates = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']

# 创建掩码数组,遮住无效值
temp_masked = ma.masked_values(temperatures, -999)

print("一周温度数据:")
for date, temp in zip(dates, temp_masked):
if ma.is_masked(temp):
print(f" {date}: 无数据")
else:
print(f" {date}: {temp:.1f}°C")

print(f"\n有效数据平均温度: {temp_masked.mean():.1f}°C")
print(f"最高温度: {temp_masked.max():.1f}°C")
print(f"最低温度: {temp_masked.min():.1f}°C")

场景2:科学实验数据处理

import numpy as np
import numpy.ma as ma

# 实验数据:可能有测量异常
measurements = np.array([10.2, 10.5, 50.0, 10.3, 10.4, -5.0, 10.1])

# 使用统计方法识别异常值:遮住偏离均值超过2倍标准差的值
mean = measurements.mean()
std = measurements.std()

# 先遮住明显异常的值,计算更准确的统计量
initial_mask = ma.masked_outside(measurements, mean - 3*std, mean + 3*std)
clean_mean = initial_mask.mean()
clean_std = initial_mask.std()

# 再用更严格的条件遮住异常值
final_masked = ma.masked_outside(measurements,
clean_mean - 2*clean_std,
clean_mean + 2*clean_std)

print(f"原始数据: {measurements}")
print(f"处理后数据: {final_masked}")
print(f"有效数据均值: {final_masked.mean():.2f}")
print(f"有效数据标准差: {final_masked.std():.2f}")

场景3:二维数据(图像)处理

import numpy as np
import numpy.ma as ma

# 模拟一个有坏点的传感器阵列
sensor_data = np.array([
[100, 105, 102, 108],
[103, -1, 107, -1], # -1 表示坏点
[101, 106, -1, 104],
[109, 102, 105, 107]
])

# 创建掩码数组
masked_sensor = ma.masked_values(sensor_data, -1)

print("传感器数据(带坏点):")
print(masked_sensor)

# 计算每行、每列的平均值
print(f"\n每行平均: {masked_sensor.mean(axis=1)}")
print(f"每列平均: {masked_sensor.mean(axis=0)}")

# 用有效数据的平均值填充坏点
filled = masked_sensor.filled(masked_sensor.mean())
print(f"\n填充后的数据:")
print(filled)

场景4:时间序列缺失值处理

import numpy as np
import numpy.ma as ma

# 模拟股票价格数据(某些日期无交易)
prices = np.array([100.0, 101.5, np.nan, 103.2, 102.8, np.nan, 104.5])
days = np.arange(len(prices))

# 创建掩码数组
masked_prices = ma.masked_invalid(prices)

print("股票价格数据:")
for day, price in zip(days, masked_prices):
if ma.is_masked(price):
print(f" 第{day}天: 缺失")
else:
print(f" 第{day}天: {price:.2f}")

# 计算日收益率(忽略缺失值)
# 收益率 = (今天价格 - 昨天价格) / 昨天价格
returns = []
for i in range(1, len(masked_prices)):
if not ma.is_masked(masked_prices[i]) and not ma.is_masked(masked_prices[i-1]):
ret = (masked_prices[i] - masked_prices[i-1]) / masked_prices[i-1]
returns.append(ret)

print(f"\n有效日收益率: {[f'{r:.4f}' for r in returns]}")
print(f"平均日收益率: {np.mean(returns):.4f}")

掩码数组与 pandas 的比较

掩码数组和 pandas 的 NaN 处理有相似之处,但各有优势:

import numpy as np
import numpy.ma as ma

# NumPy 掩码数组
data_np = ma.array([1, 2, 3, 4, 5], mask=[False, True, False, True, False])

# 对比:普通 NumPy 数组使用 NaN
data_nan = np.array([1, np.nan, 3, np.nan, 5])

print("掩码数组均值:", data_np.mean())
print("NaN 数组均值:", np.nanmean(data_nan)) # 需要使用 nan 开头的函数

# 掩码数组的优势:
# 1. 可以遮住任意类型的值(包括整数),而 NaN 只能用于浮点数
# 2. 操作后自动传播掩码
# 3. 与普通 NumPy 函数兼容性更好

掩码数组的适用场景

  • 处理整数数组的缺失值
  • 需要区分"缺失"和"NaN"
  • 科学计算中需要精确控制哪些数据参与计算
  • 需要保持数据原始值同时排除其参与计算

pandas 的适用场景

  • 数据分析和数据清洗
  • 需要更丰富的缺失值处理函数
  • 与表格数据交互

常用函数参考

函数说明
ma.array(data, mask=...)创建掩码数组
ma.masked_values(data, value)遮住等于某值的元素
ma.masked_where(condition, data)根据条件遮住元素
ma.masked_invalid(data)遮住 NaN 和 Inf
ma.masked_equal(data, value)遮住等于某值的元素
ma.masked_not_equal(data, value)遮住不等于某值的元素
ma.masked_greater(data, value)遮住大于某值的元素
ma.masked_less(data, value)遮住小于某值的元素
ma.masked_inside(data, v1, v2)遮住范围内的元素
ma.masked_outside(data, v1, v2)遮住范围外的元素
ma.compress_rows(a)压缩含有遮住值的行
ma.compress_cols(a)压缩含有遮住值的列
ma.compressed(a)返回未被遮住的一维数组

小结

本章介绍了 NumPy 掩码数组的核心功能:

  1. 基本概念:理解数据数组和掩码数组的关系
  2. 创建方法:多种创建掩码数组的方式
  3. 计算特性:被遮住的数据在计算中自动忽略
  4. 填充值:使用填充值替换被遮住的数据
  5. 实际应用:气象数据、实验数据、图像处理等场景

掩码数组是处理缺失值和无效数据的强大工具,特别适合科学计算和数据分析中的数据清洗工作。

练习

  1. 创建一个包含缺失值的温度数据数组,计算有效数据的统计量
  2. 使用掩码数组实现异常值检测和剔除
  3. 处理一个二维传感器数据,填充坏点并用周围有效数据的均值
  4. 比较掩码数组和 NaN 处理方式在整数数组上的差异
  5. 实现一个简单的时间序列插值函数,利用掩码数组处理缺失值