时间序列分析
时间序列数据在金融、气象、销售、物联网等领域广泛存在。Pandas 提供了强大的时间序列处理能力,包括日期时间处理、时间索引、重采样、窗口操作等功能。本章将详细介绍 Pandas 时间序列分析的核心技术。
时间序列概述
Pandas 处理四种时间相关概念:
| 概念 | 标量类 | 数组类 | 数据类型 | 说明 |
|---|---|---|---|---|
| 时间戳 | Timestamp | DatetimeIndex | datetime64[ns] | 特定时刻 |
| 时间差 | Timedelta | TimedeltaIndex | timedelta64[ns] | 时间长度 |
| 时间段 | Period | PeriodIndex | period[freq] | 时间跨度 |
| 日期偏移 | DateOffset | - | - | 日历相对时间 |
创建时间序列
创建时间戳
import pandas as pd
import numpy as np
# 从字符串创建
ts1 = pd.Timestamp('2024-01-15')
print(ts1) # 2024-01-15 00:00:00
# 指定时间
ts2 = pd.Timestamp('2024-01-15 10:30:45')
print(ts2) # 2024-01-15 10:30:45
# 使用 datetime 模块
from datetime import datetime
ts3 = pd.Timestamp(datetime(2024, 1, 15, 10, 30))
print(ts3) # 2024-01-15 10:30:00
# 使用时间戳属性
print(ts2.year) # 2024
print(ts2.month) # 1
print(ts2.day) # 15
print(ts2.hour) # 10
print(ts2.minute) # 30
print(ts2.day_name()) # Monday
创建时间序列索引
# date_range:生成日期范围
# 方式1:指定起止日期和频率
dates = pd.date_range('2024-01-01', '2024-01-10')
print(dates)
# DatetimeIndex(['2024-01-01', '2024-01-02', ..., '2024-01-10'],
# dtype='datetime64[ns]', freq='D')
# 方式2:指定起始日期和数量
dates = pd.date_range('2024-01-01', periods=5)
print(dates)
# DatetimeIndex(['2024-01-01', '2024-01-02', '2024-01-03',
# '2024-01-04', '2024-01-05'], dtype='datetime64[ns]', freq='D')
# 指定频率
hourly = pd.date_range('2024-01-01', periods=24, freq='h') # 每小时
monthly = pd.date_range('2024-01-01', periods=12, freq='ME') # 每月末
business = pd.date_range('2024-01-01', periods=10, freq='B') # 工作日
# 常用频率别名
# D - 天, h - 小时, min/T - 分钟, s - 秒
# W - 周, ME - 月末, MS - 月初, YE - 年末, YS - 年初
# B - 工作日, BM - 月末工作日
字符串转日期时间
# to_datetime:将字符串转换为日期时间
dates = ['2024-01-01', '2024-01-02', '2024-01-03']
dt_index = pd.to_datetime(dates)
print(dt_index)
# DatetimeIndex(['2024-01-01', '2024-01-02', '2024-01-03'],
# dtype='datetime64[ns]', freq=None)
# 处理不同格式的日期
dates_mixed = ['01/15/2024', '2024-01-16', 'Jan 17, 2024']
dt_mixed = pd.to_datetime(dates_mixed)
print(dt_mixed)
# 指定日期格式(提高解析速度)
dates_cn = ['2024年1月15日', '2024年1月16日']
dt_cn = pd.to_datetime(dates_cn, format='%Y年%m月%d日')
print(dt_cn)
# 处理无效日期
invalid_dates = ['2024-01-01', 'invalid', '2024-01-03']
# 默认会报错,使用 errors='coerce' 将无效值转为 NaT
dt_coerce = pd.to_datetime(invalid_dates, errors='coerce')
print(dt_coerce)
# DatetimeIndex(['2024-01-01', 'NaT', '2024-01-03'], dtype='datetime64[ns]', freq=None)
从多列组装日期
# 从年、月、日列创建日期
df = pd.DataFrame({
'year': [2024, 2024, 2024],
'month': [1, 2, 3],
'day': [15, 20, 25],
'hour': [10, 14, 16]
})
dt_col = pd.to_datetime(df[['year', 'month', 'day']])
print(dt_col)
# 0 2024-01-15
# 1 2024-02-20
# 2 2024-03-25
# dtype: datetime64[ns]
时间序列索引操作
创建时间序列数据
# 创建带时间索引的 Series
dates = pd.date_range('2024-01-01', periods=10, freq='D')
ts = pd.Series(np.random.randn(10), index=dates)
print(ts)
# 2024-01-01 0.469112
# 2024-01-02 -0.282863
# ...
# Freq: D, dtype: float64
# 创建带时间索引的 DataFrame
df = pd.DataFrame({
'price': np.random.randn(10).cumsum() + 100,
'volume': np.random.randint(100, 500, 10)
}, index=dates)
print(df.head())
按时间选择数据
# 创建示例数据
dates = pd.date_range('2024-01-01', periods=100, freq='D')
df = pd.DataFrame({
'value': np.random.randn(100).cumsum()
}, index=dates)
# 使用字符串选择
print(df['2024-01-05']) # 选择某一天
print(df['2024-01']) # 选择整个月
print(df['2024-01-15':'2024-01-20']) # 日期范围
# 使用 truncate 截取
result = df.truncate(before='2024-02-01', after='2024-02-15')
时间属性访问
# 通过 dt 访问器获取时间属性
df['year'] = df.index.year
df['month'] = df.index.month
df['day'] = df.index.day
df['weekday'] = df.index.day_name()
df['is_weekend'] = df.index.dayofweek >= 5
print(df.head())
# value year month day weekday is_weekend
# 2024-01-01 100.12 2024 1 1 Monday False
# 2024-01-02 99.87 2024 1 2 Tuesday False
常用的时间属性:
| 属性 | 说明 | 属性 | 说明 |
|---|---|---|---|
year | 年 | month | 月 |
day | 日 | hour | 小时 |
minute | 分钟 | second | 秒 |
dayofweek | 周几(0-6) | day_name() | 周几名称 |
dayofyear | 年中第几天 | weekofyear | 年中第几周 |
quarter | 季度 | is_month_end | 是否月末 |
重采样(Resampling)
重采样是时间序列分析的核心操作,用于改变时间频率。
降采样(高频到低频)
# 创建每小时的温度数据
hourly_data = pd.DataFrame({
'temperature': np.random.randn(24*7).cumsum() + 20
}, index=pd.date_range('2024-01-01', periods=24*7, freq='h'))
# 降采样为日数据
daily = hourly_data.resample('D').mean()
print(daily.head())
# temperature
# 2024-01-01 20.123456
# 2024-01-02 21.234567
# ...
# 使用不同的聚合方法
daily_max = hourly_data.resample('D').max() # 每日最高
daily_min = hourly_data.resample('D').min() # 每日最低
daily_ohlc = hourly_data.resample('D').ohlc() # 开高低收
# 自定义聚合
daily_stats = hourly_data.resample('D').agg({
'temperature': ['mean', 'max', 'min', 'std']
})
print(daily_stats.head())
升采样(低频到高频)
# 创建每日数据
daily_data = pd.DataFrame({
'price': [100, 101, 102, 103, 104]
}, index=pd.date_range('2024-01-01', periods=5, freq='D'))
# 升采样为每小时数据
hourly = daily_data.resample('h').asfreq() # 产生缺失值
print(hourly.head(10))
# 填充缺失值
hourly_ffill = daily_data.resample('h').ffill() # 前向填充
hourly_bfill = daily_data.resample('h').bfill() # 后向填充
hourly_interp = daily_data.resample('h').interpolate() # 插值
常用重采样频率
| 频率 | 说明 | 频率 | 说明 |
|---|---|---|---|
D | 日 | h | 小时 |
W | 周 | ME | 月末 |
MS | 月初 | QE | 季末 |
YE | 年末 | B | 工作日 |
BH | 工作小时 | CBM | 自定义月末 |
分组重采样
# 按月分组统计
monthly = df.resample('ME').agg({
'value': ['sum', 'mean', 'count']
})
# 按季度分组
quarterly = df.resample('QE').mean()
# 按周(指定起始日)
weekly_sun = df.resample('W-SUN').mean() # 周日为起始
时间偏移和窗口操作
时间偏移
# 创建时间戳
ts = pd.Timestamp('2024-01-15 10:30:00')
# 使用 Timedelta 偏移
print(ts + pd.Timedelta(days=3)) # 加3天
print(ts - pd.Timedelta(hours=2)) # 减2小时
print(ts + pd.Timedelta('5D 3h')) # 加5天3小时
# 使用 DateOffset 偏移(考虑日历)
from pandas.tseries.offsets import BDay, MonthEnd
# 工作日偏移
print(ts + BDay(1)) # 下一个工作日
print(ts + BDay(-1)) # 上一个工作日
# 月末偏移
print(ts + MonthEnd(1)) # 本月末
print(ts + MonthEnd(0)) # 移动到月末
shift 和 tshift
# 创建时间序列
ts = pd.Series([1, 2, 3, 4, 5],
index=pd.date_range('2024-01-01', periods=5))
# shift:移动数据值,索引不变
print(ts.shift(1)) # 数据向后移动1位,第一行为NaN
# 2024-01-01 NaN
# 2024-01-02 1.0
# 2024-01-03 2.0
# 2024-01-04 3.0
# 2024-01-05 4.0
# shift:移动索引
# 较新版本使用 freq 参数
print(ts.shift(1, freq='D')) # 索引向前移动1天
# 2024-01-02 1
# 2024-01-03 2
# ...
计算变化率
# 创建股票价格数据
prices = pd.Series([100, 102, 101, 105, 107, 106, 110],
index=pd.date_range('2024-01-01', periods=7))
# 计算价格变化
price_change = prices.diff() # 与前一天的差值
print(price_change)
# 计算百分比变化
pct_change = prices.pct_change() # 与前一天的变化率
print(pct_change)
# 2024-01-01 NaN
# 2024-01-02 0.020
# 2024-01-03 -0.010
# ...
滚动窗口计算
滚动窗口(Rolling Window)是时间序列分析的重要工具,用于计算移动平均、移动标准差等指标。
基本滚动计算
# 创建示例数据
ts = pd.Series(np.random.randn(100).cumsum(),
index=pd.date_range('2024-01-01', periods=100))
# 移动平均
ma_5 = ts.rolling(window=5).mean() # 5日移动平均
ma_10 = ts.rolling(window=10).mean() # 10日移动平均
# 移动标准差
rolling_std = ts.rolling(window=5).std()
# 移动最大/最小值
rolling_max = ts.rolling(window=5).max()
rolling_min = ts.rolling(window=5).min()
# 多种统计
rolling_stats = ts.rolling(window=5).agg(['mean', 'std', 'min', 'max'])
print(rolling_stats.head(10))
滚动窗口参数
# min_periods:最小观测数
# 前4个值为NaN,直到有5个值才计算
ma_5 = ts.rolling(window=5, min_periods=3).mean() # 至少3个值开始计算
# center:居中对齐
ma_center = ts.rolling(window=5, center=True).mean()
# 基于时间的窗口
ts_time = ts.rolling('5D').mean() # 5天窗口
扩展窗口
扩展窗口从开始到当前位置逐渐增大:
# 累计均值
expanding_mean = ts.expanding().mean()
# 累计最大值
expanding_max = ts.expanding().max()
# 累计标准差
expanding_std = ts.expanding().std()
指数加权移动平均
EWMA 给予近期数据更高权重:
# 指数加权移动平均
ewm_mean = ts.ewm(span=5).mean() # 跨度为5
ewm_mean2 = ts.ewm(alpha=0.3).mean() # 平滑因子
# 指数加权移动标准差
ewm_std = ts.ewm(span=5).std()
# EWMA 参数说明:
# span:跨度,对应 N 日移动平均
# alpha:平滑因子,范围 (0, 1)
# halflife:半衰期
时区处理
设置和转换时区
# 创建时间序列
dates = pd.date_range('2024-01-01 09:00', periods=5, freq='h')
ts = pd.Series(range(5), index=dates)
# 本地化为 UTC
ts_utc = ts.tz_localize('UTC')
print(ts_utc.index)
# DatetimeIndex(['2024-01-01 09:00:00+00:00', ...], tz='UTC')
# 转换时区
ts_shanghai = ts_utc.tz_convert('Asia/Shanghai')
print(ts_shanghai.index)
# DatetimeIndex(['2024-01-01 17:00:00+08:00', ...], tz='Asia/Shanghai')
# 直接创建带时区的时间序列
dates_tz = pd.date_range('2024-01-01', periods=5, freq='D', tz='Asia/Shanghai')
时区操作
# 查看可用时区
from pytz import all_timezones
print(len(all_timezones)) # 可用时区数量
# 常用时区
# 'UTC' - 协调世界时
# 'Asia/Shanghai' - 上海时间
# 'America/New_York' - 纽约时间
# 'Europe/London' - 伦敦时间
时间段(Period)
Period 表示时间跨度,如某个月、某个季度:
# 创建时间段
p = pd.Period('2024-01', freq='M')
print(p) # 2024-01
# 时间段属性
print(p.start_time) # 2024-01-01 00:00:00
print(p.end_time) # 2024-01-31 23:59:59.999999999
# 时间段运算
print(p + 1) # 2024-02
# 创建时间段序列
periods = pd.period_range('2024-01', periods=12, freq='M')
print(periods)
# PeriodIndex(['2024-01', '2024-02', ..., '2024-12'], dtype='period[M]')
# 时间戳与时间段转换
ts = pd.Series(range(12), index=pd.date_range('2024-01', periods=12, freq='M'))
# 转换为时间段索引
ps = ts.to_period()
print(ps.index)
# PeriodIndex(['2024-01', '2024-02', ..., '2024-12'], dtype='period[M]')
实战示例
示例:股票数据分析
import pandas as pd
import numpy as np
# 模拟股票数据
np.random.seed(42)
dates = pd.date_range('2023-01-01', '2023-12-31', freq='B') # 工作日
n = len(dates)
stock_data = pd.DataFrame({
'open': 100 + np.random.randn(n).cumsum(),
'high': 101 + np.random.randn(n).cumsum(),
'low': 99 + np.random.randn(n).cumsum(),
'close': 100 + np.random.randn(n).cumsum(),
'volume': np.random.randint(1000000, 5000000, n)
}, index=dates)
# 确保 high >= close >= low
stock_data['high'] = stock_data[['open', 'high', 'close']].max(axis=1)
stock_data['low'] = stock_data[['open', 'low', 'close']].min(axis=1)
# 计算技术指标
# 5日和20日移动平均
stock_data['MA5'] = stock_data['close'].rolling(window=5).mean()
stock_data['MA20'] = stock_data['close'].rolling(window=20).mean()
# 布林带
stock_data['BOLL_MID'] = stock_data['close'].rolling(window=20).mean()
stock_data['BOLL_UP'] = stock_data['BOLL_MID'] + 2 * stock_data['close'].rolling(window=20).std()
stock_data['BOLL_DOWN'] = stock_data['BOLL_MID'] - 2 * stock_data['close'].rolling(window=20).std()
# 日收益率
stock_data['returns'] = stock_data['close'].pct_change()
# 月度汇总
monthly = stock_data.resample('ME').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
})
monthly['monthly_return'] = monthly['close'].pct_change()
print("月度数据汇总:")
print(monthly.head())
# 年度统计
annual_stats = stock_data['returns'].describe()
print("\n年度收益统计:")
print(annual_stats)
示例:销售数据分析
# 模拟销售数据
np.random.seed(42)
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
n = len(dates)
sales = pd.DataFrame({
'date': dates,
'sales': np.random.randint(1000, 5000, n) +
np.sin(np.arange(n) * 2 * np.pi / 365) * 1000 + # 季节性
np.arange(n) * 2, # 趋势
'category': np.random.choice(['A', 'B', 'C'], n)
})
sales.set_index('date', inplace=True)
# 按周汇总
weekly_sales = sales.resample('W').sum()
# 按月汇总
monthly_sales = sales.resample('ME').agg({
'sales': 'sum',
'category': lambda x: x.mode()[0] if len(x) > 0 else None
})
# 计算环比增长
monthly_sales['mom_growth'] = monthly_sales['sales'].pct_change()
# 计算同比增长(需要多年数据)
# 这里用月度数据的年增长率近似
monthly_sales['yoy_growth'] = monthly_sales['sales'].pct_change(periods=12)
# 季节性分析
sales['month'] = sales.index.month
sales['weekday'] = sales.index.dayofweek
# 按月份分析
monthly_pattern = sales.groupby('month')['sales'].mean()
print("各月平均销售额:")
print(monthly_pattern)
# 按星期分析
weekday_pattern = sales.groupby('weekday')['sales'].mean()
print("\n各星期平均销售额:")
print(weekday_pattern)
示例:缺失时间填充
# 创建有缺失时间点的数据
incomplete_dates = pd.DatetimeIndex([
'2024-01-01', '2024-01-02', '2024-01-05',
'2024-01-08', '2024-01-10'
])
ts_incomplete = pd.Series([1, 2, 3, 4, 5], index=incomplete_dates)
# 填充缺失的日期
full_range = pd.date_range(ts_incomplete.index.min(),
ts_incomplete.index.max(),
freq='D')
ts_complete = ts_incomplete.reindex(full_range)
print("填充前:")
print(ts_incomplete)
print("\n填充缺失日期后:")
print(ts_complete.head(10))
# 填充缺失值
ts_ffill = ts_complete.ffill() # 前向填充
ts_interp = ts_complete.interpolate() # 线性插值
print("\n线性插值后:")
print(ts_interp.head(10))
小结
本章我们学习了:
- 时间序列创建:时间戳、时间范围、字符串转换
- 时间索引操作:按时间选择、时间属性访问
- 重采样:降采样、升采样、聚合方法
- 时间偏移:Timedelta、DateOffset、shift
- 滚动窗口:移动平均、移动标准差、EWMA
- 时区处理:时区设置和转换
- 时间段:Period 的创建和操作
- 实战应用:股票分析、销售分析、缺失值处理
练习
- 创建一个包含一整年每日数据的时间序列,计算每月的平均值和总和
- 对股票价格数据计算 5 日、10 日、20 日移动平均线
- 将按小时记录的数据重采样为每日数据,并计算每日的最高、最低、平均值