跳到主要内容

多因子选股策略

多因子选股是机构投资者广泛使用的量化方法,通过综合多个因子来筛选具有超额收益潜力的股票。本章将详细介绍多因子选股策略的构建流程。

多因子模型框架

模型架构

一个完整的多因子选股模型包含以下模块:

  1. 因子挖掘:从各类数据中提取有效因子
  2. 因子处理:清洗、标准化、正交化
  3. 因子组合:将多个因子合成为综合得分
  4. 组合构建:根据因子得分构建投资组合
  5. 组合优化:考虑风险、约束条件优化权重
  6. 绩效归因:分析收益来源

因子分类

多因子模型中常用的因子类型:

  • 价值因子:PE、PB、PS、PCF、EV/EBITDA等
  • 成长因子:营收增长率、净利润增长率、ROE变化等
  • 质量因子:ROE、ROA、毛利率、资产负债率等
  • 动量因子:过去N月收益率、相对强度等
  • 情绪因子:分析师预期、机构持仓变化等
  • 技术因子:波动率、换手率、RSI等

因子数据处理

数据获取

import pandas as pd
import numpy as np
import tushare as ts

class FactorDataLoader:
    """Load the raw inputs for factor construction through the tushare pro API.

    The constructor registers the API token and stores a ``pro_api`` client on
    ``self.pro``; every other method is a thin wrapper around that client.
    """

    def __init__(self, token):
        """Register ``token`` with tushare and create the pro API client."""
        ts.set_token(token)
        self.pro = ts.pro_api()

    def get_stock_list(self, date):
        """Return the codes of listed stocks whose name does not contain "ST".

        Args:
            date: Accepted for interface compatibility; the query does not
                use it.

        Returns:
            list: ``ts_code`` values of the remaining stocks.
        """
        stocks = self.pro.stock_basic(exchange='', list_status='L')
        # na=False: a missing name would put NaN in the mask and break the
        # boolean negation below.
        is_st = stocks['name'].str.contains('ST', na=False)
        return stocks.loc[~is_st, 'ts_code'].tolist()

    def get_price_data(self, stocks, start_date, end_date):
        """Fetch daily bars for every code in ``stocks`` as one long frame.

        Args:
            stocks: Iterable of ``ts_code`` strings; one request is issued
                per code.
            start_date: Passed through to ``pro.daily``.
            end_date: Passed through to ``pro.daily``.

        Returns:
            pandas.DataFrame: All per-stock frames concatenated, with
            ``trade_date`` converted to datetime. For an empty ``stocks`` an
            empty frame holding the columns this file reads (``ts_code``,
            ``trade_date``, ``close``) is returned.
        """
        frames = [
            self.pro.daily(ts_code=stock, start_date=start_date, end_date=end_date)
            for stock in stocks
        ]
        if frames:
            price_df = pd.concat(frames, ignore_index=True)
        else:
            # pd.concat raises ValueError when given nothing to concatenate.
            price_df = pd.DataFrame(columns=['ts_code', 'trade_date', 'close'])

        price_df['trade_date'] = pd.to_datetime(price_df['trade_date'])
        return price_df

    def get_financial_data(self, stocks, start_date, end_date, max_stocks=100):
        """Fetch income, balance-sheet and daily-indicator data.

        Args:
            stocks: List of ``ts_code`` strings.
            start_date: Passed through to each query.
            end_date: Passed through to each query.
            max_stocks: Only the first ``max_stocks`` codes are queried
                (default 100, the previously hard-coded cap). ``None`` sends
                every code.

        Returns:
            tuple: ``(income, balance, daily_basic)`` exactly as returned by
            the client.
        """
        # The same comma-joined code list and date range go to all three
        # endpoints, so build them once.
        query = {
            'ts_code': ','.join(stocks[:max_stocks]),
            'start_date': start_date,
            'end_date': end_date,
        }

        income = self.pro.income(**query)              # income statement
        balance = self.pro.balancesheet(**query)       # balance sheet
        daily_basic = self.pro.daily_basic(**query)    # daily indicators

        return income, balance, daily_basic

因子计算

class FactorCalculator:
    """Stateless collection of factor formulas.

    Ratios are inverted and risk-like measures are negated so that, for every
    factor computed here, a larger value is the preferred one.
    """

    @staticmethod
    def calculate_value_factors(df):
        """Compute value factors from valuation ratios.

        Args:
            df: Frame with ``pe_ttm``, ``pb`` and ``ps`` columns.

        Returns:
            pandas.DataFrame: Indexed like ``df`` with columns ``EP``, ``BP``,
            ``SP`` and ``value_composite`` (row mean of ``EP`` and ``BP``).
        """
        factors = pd.DataFrame(index=df.index)

        # EP: inverse of the P/E ratio.
        factors['EP'] = 1 / df['pe_ttm']

        # BP: inverse of the P/B ratio.
        factors['BP'] = 1 / df['pb']

        # SP: inverse of the P/S ratio. The previous expression
        # (ps / total_mv * total_mv) cancelled out to ps itself.
        factors['SP'] = 1 / df['ps']

        # Composite value score.
        factors['value_composite'] = factors[['EP', 'BP']].mean(axis=1)

        return factors

    @staticmethod
    def calculate_momentum_factors(price_df, lookback=12, skip=1):
        """Compute one price-momentum value per stock.

        Momentum is the return over ``lookback`` months, ending ``skip``
        months before the last month present for that stock.

        Args:
            price_df: Long frame with ``ts_code``, ``trade_date`` and
                ``close`` columns.
            lookback: Length of the return window in months.
            skip: Number of most recent months left out of the window.

        Returns:
            pandas.DataFrame: Indexed by ``ts_code`` with the single column
            ``momentum_12m``; NaN where the history is too short.
        """
        momentum_by_stock = {}

        for stock, group in price_df.groupby('ts_code'):
            group = group.sort_values('trade_date')

            # Month-end closes. Months are bucketed as periods instead of
            # resample('M') so the code does not depend on the month-end
            # offset alias, which differs between pandas releases.
            months = pd.DatetimeIndex(pd.to_datetime(group['trade_date'])).to_period('M')
            closes = pd.Series(group['close'].to_numpy(), index=months)
            monthly_prices = closes.groupby(level=0).last()
            # Keep months without any trade as NaN rows so that shift()
            # still counts calendar months, as resampling did.
            calendar = pd.period_range(months.min(), months.max(), freq='M')
            monthly_prices = monthly_prices.reindex(calendar)

            momentum = (
                monthly_prices.shift(skip) / monthly_prices.shift(lookback + skip) - 1
            )
            momentum_by_stock[stock] = (
                momentum.iloc[-1] if len(momentum) > lookback else np.nan
            )

        return pd.DataFrame(
            {'momentum_12m': pd.Series(momentum_by_stock, dtype=float)}
        )

    @staticmethod
    def calculate_quality_factors(df):
        """Compute quality factors.

        Args:
            df: Frame with ``roe``, ``roa`` and ``debt_to_assets`` columns.

        Returns:
            pandas.DataFrame: Indexed like ``df`` with columns ``ROE``,
            ``ROA``, ``leverage`` and ``quality_composite`` (row mean of the
            other three).
        """
        factors = pd.DataFrame(index=df.index)

        factors['ROE'] = df['roe']
        factors['ROA'] = df['roa']

        # Negated so that lower debt scores higher.
        factors['leverage'] = -df['debt_to_assets']

        # Composite quality score.
        factors['quality_composite'] = factors[['ROE', 'ROA', 'leverage']].mean(axis=1)

        return factors

    @staticmethod
    def calculate_size_factors(df):
        """Compute the size factor as the negated log of ``total_mv``.

        Args:
            df: Frame with a ``total_mv`` column.

        Returns:
            pandas.DataFrame: Indexed like ``df`` with the column ``size``;
            smaller market value gives a higher score.
        """
        factors = pd.DataFrame(index=df.index)
        factors['size'] = -np.log(df['total_mv'])
        return factors

    @staticmethod
    def calculate_volatility_factors(price_df, window=20):
        """Compute the latest annualised rolling volatility per stock.

        Args:
            price_df: Long frame with ``ts_code``, ``trade_date`` and
                ``close`` columns.
            window: Rolling window length in rows.

        Returns:
            pandas.DataFrame: Indexed by ``ts_code`` with the column
            ``volatility``, negated so that lower volatility scores higher.
        """
        score_by_stock = {}

        for stock, group in price_df.groupby('ts_code'):
            closes = group.sort_values('trade_date')['close']

            returns = closes.pct_change()
            # sqrt(252) scales the per-row standard deviation to a year.
            volatility = returns.rolling(window).std() * np.sqrt(252)

            score_by_stock[stock] = -volatility.iloc[-1]

        return pd.DataFrame({'volatility': pd.Series(score_by_stock, dtype=float)})

因子标准化

class FactorProcessor:
    """Cleaning steps applied to a raw factor before it is combined."""

    @staticmethod
    def winsorize(series, limits=(0.01, 0.01)):
        """Clip extreme values to the given lower/upper quantiles.

        Args:
            series: Factor values.
            limits: ``(lower, upper)`` tail fractions to clip.

        Returns:
            The clipped values.
        """
        lower_tail, upper_tail = limits
        lower = series.quantile(lower_tail)
        upper = series.quantile(1 - upper_tail)
        return series.clip(lower, upper)

    @staticmethod
    def standardize(series):
        """Z-score the values (mean 0, sample standard deviation 1).

        When the standard deviation is zero or undefined (constant series,
        single observation) every non-missing value becomes 0 rather than
        NaN/inf, i.e. it is treated as neutral.

        Args:
            series: Factor values; a DataFrame is standardized column by
                column.

        Returns:
            The standardized values, same shape and index as the input.
        """
        if isinstance(series, pd.DataFrame):
            return series.apply(FactorProcessor.standardize)

        centered = series - series.mean()
        spread = series.std()
        # Relative tolerance: floating-point noise in the mean can leave a
        # tiny non-zero std for a series that is really constant.
        if not spread > 1e-12 * series.abs().max():
            return centered * 0.0
        return centered / spread

    @staticmethod
    def rank_standardize(series):
        """Replace each value by its percentile rank."""
        return series.rank(pct=True)

    @staticmethod
    def orthogonalize(target, by):
        """Return the residual of an OLS regression (with intercept) of
        ``target`` on ``by``.

        Rows are matched by position, not by index label. Rows where either
        side is missing or infinite are left out of the fit and come back as
        NaN.

        Args:
            target: Series to be neutralised.
            by: Series (one regressor) or 2-D frame/array (one regressor per
                column) with as many rows as ``target``.

        Returns:
            pandas.Series: Residuals indexed like ``target``.
        """
        y = np.asarray(target, dtype=float)
        X = np.asarray(by, dtype=float)
        if X.ndim == 1:
            X = X.reshape(-1, 1)

        usable = np.isfinite(y) & np.isfinite(X).all(axis=1)
        residual = np.full(y.shape, np.nan)
        if usable.any():
            # A leading column of ones provides the intercept term.
            design = np.column_stack([np.ones(int(usable.sum())), X[usable]])
            coef, *_ = np.linalg.lstsq(design, y[usable], rcond=None)
            residual[usable] = y[usable] - design @ coef

        return pd.Series(residual, index=target.index)

    def process_factor(self, factor, industry_dummies=None, orthogonal_to=None):
        """Run the full pipeline: winsorize, standardize, then the optional
        steps.

        Args:
            factor: Raw factor Series.
            industry_dummies: Optional frame of 0/1 industry indicators, one
                column per industry. When given, the factor is re-standardized
                inside each industry.
            orthogonal_to: Optional regressor(s); when given, the factor is
                replaced by its residual against them.

        Returns:
            pandas.Series: The processed factor.
        """
        factor = self.standardize(self.winsorize(factor))

        # Industry neutralisation (optional): z-score within each industry.
        if industry_dummies is not None:
            for industry in industry_dummies.columns:
                members = industry_dummies[industry] == 1
                factor[members] = self.standardize(factor[members])

        # Orthogonalisation (optional).
        if orthogonal_to is not None:
            factor = self.orthogonalize(factor, orthogonal_to)

        return factor

因子分析

IC分析

class FactorAnalyzer:
    """Evaluate how well a factor predicts forward returns."""

    def __init__(self, factor_data, return_data):
        """Store the inputs.

        Args:
            factor_data: Factor values, one row per date.
                ``factor_data.loc[date, factor_col]`` must yield a Series
                indexed by stock.
            return_data: Per-period returns, dates as rows and stocks as
                columns.
        """
        self.factor_data = factor_data
        self.return_data = return_data

    def _cross_section(self, date, factor_col, forward_period, min_stocks):
        """Return ``(factor, future_return)`` for ``date`` restricted to the
        stocks present in both, or ``None`` if fewer than ``min_stocks``
        stocks remain."""
        factor = self.factor_data.loc[date, factor_col]
        # Sum the returns of the next ``forward_period`` rows after ``date``.
        window = self.return_data.loc[date:].iloc[1:forward_period + 1]
        # min_count=1 keeps a stock with no return observations at NaN
        # instead of 0, so the dropna() below really excludes it.
        future_return = window.sum(min_count=1)

        common_stocks = factor.dropna().index.intersection(
            future_return.dropna().index
        )
        if len(common_stocks) < min_stocks:
            return None
        return factor[common_stocks], future_return[common_stocks]

    def calculate_ic(self, factor_col, forward_period=20):
        """Compute the IC series: per-date correlation between the factor and
        the summed forward return.

        Dates with fewer than 10 usable stocks are skipped.

        Args:
            factor_col: Key selecting the factor in ``factor_data``.
            forward_period: Number of forward rows whose returns are summed.

        Returns:
            pandas.DataFrame: Column ``IC`` indexed by ``date``; empty (with
            the same column) when no date qualifies.
        """
        dates, ic_values = [], []

        for date in self.factor_data.index[:-forward_period]:
            aligned = self._cross_section(date, factor_col, forward_period, 10)
            if aligned is None:
                continue
            factor, future_return = aligned
            dates.append(date)
            ic_values.append(factor.corr(future_return))

        return pd.DataFrame(
            {'IC': ic_values}, index=pd.Index(dates, name='date'), dtype=float
        )

    def ic_summary(self, ic_df):
        """Summarise an IC series.

        Args:
            ic_df: Frame with an ``IC`` column, as produced by
                ``calculate_ic``.

        Returns:
            dict: Mean, standard deviation, ICIR (mean / std), share of
            positive ICs, mean absolute IC and the t-statistic of the mean.
        """
        ic = ic_df['IC']
        ic_mean = ic.mean()
        ic_std = ic.std()
        return {
            'IC均值': ic_mean,
            'IC标准差': ic_std,
            'ICIR': ic_mean / ic_std,
            'IC>0比例': (ic > 0).mean(),
            'IC绝对值均值': ic.abs().mean(),
            't统计量': ic_mean / (ic_std / np.sqrt(len(ic_df)))
        }

    def group_test(self, factor_col, n_groups=5, forward_period=20):
        """Quantile test: mean forward return of each factor bucket per date.

        Stocks are split into ``n_groups`` quantile buckets ``G1`` (lowest
        factor values) to ``G{n_groups}`` (highest). Dates with fewer than
        ``n_groups * 10`` usable stocks are skipped.

        Args:
            factor_col: Key selecting the factor in ``factor_data``.
            n_groups: Number of buckets.
            forward_period: Number of forward rows whose returns are summed.

        Returns:
            pandas.DataFrame: One column per bucket, indexed by ``date``;
            empty (with the same columns) when no date qualifies.
        """
        labels = [f'G{i+1}' for i in range(n_groups)]
        dates, rows = [], []

        for date in self.factor_data.index[:-forward_period]:
            aligned = self._cross_section(
                date, factor_col, forward_period, n_groups * 10
            )
            if aligned is None:
                continue
            factor, future_return = aligned

            try:
                groups = pd.qcut(factor, n_groups, labels=labels)
            except ValueError:
                # Heavily tied values give duplicate bin edges; bucketing
                # the ranks always yields distinct edges.
                groups = pd.qcut(factor.rank(method='first'), n_groups, labels=labels)

            dates.append(date)
            rows.append(
                {label: future_return[groups == label].mean() for label in labels}
            )

        return pd.DataFrame(
            rows, index=pd.Index(dates, name='date'), columns=labels, dtype=float
        )

因子组合

等权组合

def equal_weight_combination(factor_dict):
    """Combine factors by taking their plain average.

    Args:
        factor_dict: Mapping of factor name to a Series of factor values.

    Returns:
        pandas.Series: Row-wise mean across the factors.
    """
    # One column per factor, rows aligned on the Series indexes.
    factor_table = pd.DataFrame(factor_dict)
    composite_score = factor_table.mean(axis=1)
    return composite_score

IC加权组合

def ic_weight_combination(factor_dict, ic_dict):
    """Combine factors with weights proportional to their mean IC.

    Each weight is ``IC / sum(|IC|)``: magnitudes are normalised by the
    total absolute IC while the sign is kept, so a factor with negative IC
    enters the composite with a negative weight. When every IC is positive
    this equals normalising the absolute values.

    Args:
        factor_dict: Mapping of factor name to a Series of factor values.
        ic_dict: Mapping of factor name to its mean IC.

    Returns:
        pandas.Series: Weighted composite score.

    Raises:
        ValueError: If the absolute ICs sum to zero, so no weights can be
            derived.
    """
    ic_values = pd.Series(ic_dict, dtype=float)
    gross_ic = ic_values.abs().sum()
    if not gross_ic > 0:
        raise ValueError('cannot derive IC weights: absolute IC values sum to zero')
    weights = ic_values / gross_ic

    # Start from zeros on the first factor's index and accumulate.
    combined = pd.Series(0.0, index=list(factor_dict.values())[0].index)
    for name, factor in factor_dict.items():
        combined = combined + weights[name] * factor

    return combined

最大化ICIR组合

from scipy.optimize import minimize

def maximize_icir_weights(factor_returns, factor_ic):
    """Find long-only factor weights that maximise the ICIR of the
    combined IC.

    The combined IC is ``factor_ic @ weights``; the objective is its mean
    divided by its standard deviation. Weights are bounded to [0, 1] and
    sum to 1.

    Args:
        factor_returns: Historical factor returns. Kept for interface
            compatibility; the optimisation does not read it.
        factor_ic: DataFrame of IC history, one column per factor.

    Returns:
        numpy.ndarray: The optimiser's weights, one per column of
        ``factor_ic``.
    """
    # Sized from the matrix the weights are actually multiplied with.
    n_factors = len(factor_ic.columns)

    def neg_icir(weights):
        """Negative ICIR of the weighted IC (minimised by the optimiser)."""
        portfolio_ic = factor_ic @ weights
        spread = portfolio_ic.std()
        if not spread > 0:
            # ICIR is undefined for a zero/NaN spread; return a neutral
            # value so the optimiser never sees NaN or inf.
            return 0.0
        return -portfolio_ic.mean() / spread

    # Weights must sum to 1.
    constraints = [
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
    ]

    # Long-only, each weight at most 1.
    bounds = tuple((0, 1) for _ in range(n_factors))

    # Start from equal weights.
    initial_weights = np.array([1/n_factors] * n_factors)

    result = minimize(
        neg_icir,
        initial_weights,
        method='SLSQP',
        bounds=bounds,
        constraints=constraints
    )

    return result.x

组合构建

简单选股

def select_stocks(factor_scores, n_stocks=50, method='top'):
    """Pick stocks by factor score.

    Args:
        factor_scores: Series of scores indexed by stock.
        n_stocks: Number of stocks to select.
        method: ``'top'`` for the highest scores, ``'bottom'`` for the
            lowest; any other value (documented as ``'both'``) takes both
            ends.

    Returns:
        list: Selected index labels. For ``'both'`` the highest-scoring half
        comes first, then the lowest-scoring half; when ``n_stocks`` is odd
        the extra slot goes to the top half, and a stock is never listed
        twice.
    """
    if method == 'top':
        return factor_scores.nlargest(n_stocks).index.tolist()
    if method == 'bottom':
        return factor_scores.nsmallest(n_stocks).index.tolist()

    # both: split so the two halves add up to n_stocks for odd values too.
    n_bottom = n_stocks // 2
    n_top = n_stocks - n_bottom
    top = factor_scores.nlargest(n_top).index.tolist()
    # With a small universe the two ends overlap; keep each stock once.
    already_picked = set(top)
    bottom = [
        stock
        for stock in factor_scores.nsmallest(n_bottom).index.tolist()
        if stock not in already_picked
    ]
    return top + bottom

行业中性选股

def industry_neutral_selection(factor_scores, industry_map, n_stocks=50):
    """Select stocks so each industry is represented in proportion to its
    size.

    Slots are allocated with the largest-remainder method: each industry
    first gets the integer part of its proportional share, then the slots
    lost to rounding go to the industries with the largest fractional parts.
    The allocations therefore add up to ``n_stocks``; the result can still be
    shorter when an industry has fewer scored stocks than its allocation.

    Args:
        factor_scores: Series of scores indexed by stock.
        industry_map: Series mapping stock (index) to industry (value).
        n_stocks: Total number of stocks to select.

    Returns:
        list: Selected stocks, grouped by industry, best score first within
        each industry.
    """
    industry_counts = industry_map.value_counts()
    total = industry_counts.sum()
    if total == 0 or n_stocks <= 0:
        return []

    exact_share = industry_counts / total * n_stocks
    allocation = np.floor(exact_share).astype(int)

    # Hand the slots lost to rounding down back, largest fraction first
    # (mergesort is stable, so ties keep the value_counts order).
    shortfall = int(n_stocks - allocation.sum())
    if shortfall > 0:
        fractions = (exact_share - allocation).sort_values(
            ascending=False, kind='mergesort'
        )
        for industry in fractions.index[:shortfall]:
            allocation.loc[industry] += 1

    selected = []
    for industry, n in allocation.items():
        if n <= 0:
            continue
        members = industry_map[industry_map == industry].index
        # Only stocks that actually have a score can be ranked.
        scored = factor_scores.index.intersection(members)
        selected.extend(factor_scores.loc[scored].nlargest(n).index.tolist())

    return selected

组合优化

def optimize_portfolio(expected_returns, cov_matrix,
                       max_weight=0.05, min_weight=0,
                       target_volatility=None):
    """Maximise expected portfolio return under weight and risk constraints.

    Weights sum to 1 and each lies in ``[min_weight, max_weight]``. When
    ``target_volatility`` is given, the portfolio volatility
    ``sqrt(w' * cov * w)`` is additionally constrained to equal it.

    Args:
        expected_returns: Expected return per asset (array-like, length n).
        cov_matrix: n x n covariance matrix (array-like), in the same asset
            order as ``expected_returns``.
        max_weight: Upper bound for each weight.
        min_weight: Lower bound for each weight.
        target_volatility: Required portfolio volatility; ``None`` or 0
            disables the constraint.

    Returns:
        numpy.ndarray: The optimiser's weights, in input order.

    Raises:
        ValueError: If there are no assets, or the bounds make a fully
            invested portfolio impossible (``n * max_weight < 1`` or
            ``n * min_weight > 1``).
    """
    # Plain arrays so the matrix products are positional regardless of
    # whether Series/DataFrames were passed in.
    mu = np.asarray(expected_returns, dtype=float)
    sigma = np.asarray(cov_matrix, dtype=float)
    n = len(mu)

    if n == 0:
        raise ValueError('expected_returns is empty')
    if n * max_weight < 1 - 1e-12 or n * min_weight > 1 + 1e-12:
        # Otherwise the solver would return weights that break the
        # constraints without any signal to the caller.
        raise ValueError(
            f'weights cannot sum to 1 with {n} assets bounded to '
            f'[{min_weight}, {max_weight}]'
        )

    def objective(weights):
        """Negative expected return (the optimiser minimises)."""
        return -(weights @ mu)

    # Fully invested.
    constraints = [
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
    ]

    if target_volatility:
        constraints.append({
            'type': 'eq',
            'fun': lambda w: np.sqrt(w @ sigma @ w) - target_volatility
        })

    bounds = tuple((min_weight, max_weight) for _ in range(n))

    # Equal weights satisfy the bounds whenever the problem is feasible.
    initial_weights = np.full(n, 1 / n)

    result = minimize(
        objective,
        initial_weights,
        method='SLSQP',
        bounds=bounds,
        constraints=constraints
    )

    return result.x

完整策略示例

class MultiFactorStrategy:
    """End-to-end example wiring the loader, calculators, processor,
    combination and selection steps together."""

    def __init__(self, data_loader, n_stocks=50, rebalance_freq=20):
        """Store the configuration and create the helper objects.

        Args:
            data_loader: Object providing ``get_stock_list`` and
                ``get_price_data``.
            n_stocks: Number of stocks to hold.
            rebalance_freq: Stored on the instance; not read by any method
                of this class.
        """
        self.data_loader = data_loader
        self.n_stocks = n_stocks
        self.rebalance_freq = rebalance_freq

        self.factor_calculator = FactorCalculator()
        self.factor_processor = FactorProcessor()

    def run(self, start_date, end_date):
        """Run the whole pipeline and return the backtest result.

        Steps: load data, compute factors, process them, combine them,
        select the portfolio, backtest.
        """
        # 1. Load data.
        universe = self.data_loader.get_stock_list(start_date)
        price_data = self.data_loader.get_price_data(universe, start_date, end_date)

        # 2-4. Compute, clean and combine the factors.
        raw_factors = self.calculate_all_factors(price_data)
        cleaned_factors = self.process_factors(raw_factors)
        composite = self.combine_factors(cleaned_factors)

        # 5. Select the portfolio.
        holdings = self.select_portfolio(composite)

        # 6. Backtest.
        return self.backtest(holdings, price_data)

    def calculate_all_factors(self, price_data):
        """Compute every factor family from ``price_data``.

        NOTE(review): the value, quality and size calculators read columns
        such as ``pe_ttm``, ``roe`` and ``total_mv``; confirm the frame
        passed in here really carries them.

        Returns:
            dict: Keys ``value``, ``momentum``, ``quality`` and ``size``.
        """
        calc = self.factor_calculator
        return {
            'value': calc.calculate_value_factors(price_data),
            'momentum': calc.calculate_momentum_factors(price_data),
            'quality': calc.calculate_quality_factors(price_data),
            'size': calc.calculate_size_factors(price_data),
        }

    def process_factors(self, factors):
        """Run every factor through the processor's full pipeline.

        Returns:
            dict: Same keys as ``factors``, processed values.
        """
        return {
            name: self.factor_processor.process_factor(factor)
            for name, factor in factors.items()
        }

    def combine_factors(self, factors):
        """Combine the processed factors with equal weights."""
        return equal_weight_combination(factors)

    def select_portfolio(self, factor_scores):
        """Select ``self.n_stocks`` stocks from the combined score."""
        return select_stocks(factor_scores, self.n_stocks)

    def backtest(self, portfolio, price_data):
        """Placeholder for the backtest; currently returns ``None``."""
        # Backtest logic still to be implemented.
        return None

小结

多因子选股策略是量化投资的核心方法,本章介绍了:

  1. 因子挖掘:从各类数据中提取有效因子
  2. 因子处理:去极值、标准化、正交化
  3. 因子分析:IC分析、分组测试
  4. 因子组合:等权、IC加权、最大化ICIR
  5. 组合构建:简单选股、行业中性、组合优化

多因子策略的成功关键在于:

  • 选择有经济学逻辑支撑的因子
  • 持续监控因子有效性
  • 合理处理因子之间的相关性
  • 控制交易成本和风险