
Deep Dive: A Transformer-Based Multi-Factor Quantitative Stock-Selection Model

Contents

Introduction: The Limits of Traditional Multi-Factor Models and How Transformer Breaks Through

Part 1: Theoretical Foundations and Architecture Design

1.1 Why Transformer?

1.2 Model Architecture at a Glance

1.3 Comparison with Existing Models

Part 2: Data Acquisition and Preprocessing (Complete Code)

2.1 Data Source Configuration

2.2 Data Pipeline Optimization

Part 3: Transformer Model Implementation (Complete Code)

3.1 Core Model Architecture

3.2 Advanced Feature: Adaptive Layer Normalization (AdaNorm)

3.3 Training Procedure

Part 4: Training and Backtesting Framework

4.1 Data Preparation and Splitting

4.2 Main Training Loop

Part 5: Backtest Simulation and Performance Analysis

5.1 Backtest Engine

5.2 Benchmark Comparison

Part 6: Model Interpretability and Attention Visualization

6.1 Attention Weight Visualization

6.2 SHAP-Based Explanations


Introduction: The Limits of Traditional Multi-Factor Models and How Transformer Breaks Through

In quantitative investing, multi-factor stock-selection models are foundational. From the Fama-French three-factor model to Barra structured risk models, investors have long searched for factor combinations that explain the cross-section of stock returns. Traditional multi-factor models, however, share several core pain points: nonlinear interactions between factors are hard to capture, time-series dynamics are ignored, and manually assigned factor weights invite overfitting.

Since 2024, as the Transformer architecture has continued to advance in time-series forecasting, bringing it into multi-factor stock selection has become a frontier of quantitative research. Self-attention is naturally suited to capturing mutual influence between stocks, positional encoding can model temporal dynamics, and multi-head attention can automatically discover complex interaction patterns among factors.

This article walks through building a complete Transformer-based multi-factor stock-selection model, covering data preprocessing, factor engineering, architecture design, training strategy, a backtesting framework, and the path to live deployment. Complete runnable code is provided throughout, with examples based on real China A-share market data.


Part 1: Theoretical Foundations and Architecture Design

1.1 Why Transformer?

Traditional multi-factor models usually estimate factor weights with linear regression, ridge regression, or Lasso, under the core assumption of a linear relationship between factors and future returns. Real equity markets, however, are distinctly nonlinear, non-stationary, and noisy.

Convolutional neural networks (CNNs) excel at extracting local features but struggle to capture global dependencies across the whole market; recurrent networks (RNNs) and their LSTM variants can handle time series but suffer from vanishing gradients and are hard to parallelize. Transformer self-attention can:

  • directly model the correlation between any two stocks

  • compute factor weights dynamically rather than as a static linear combination

  • process the whole market in parallel, greatly improving training efficiency
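A minimal NumPy sketch of the attention mechanism behind these points (illustrative only, separate from the full model built later; the sizes are made up):

```python
import numpy as np

def self_attention(X):
    """Single-head scaled dot-product attention with Q = K = V = X.
    Row i of the weight matrix says how strongly item i attends to
    every other item; the output mixes the inputs accordingly."""
    d = X.shape[-1]
    scores = X @ X.T / np.sqrt(d)                  # (N, N) pairwise affinities
    scores -= scores.max(axis=-1, keepdims=True)   # numerical stability
    weights = np.exp(scores)
    weights /= weights.sum(axis=-1, keepdims=True) # softmax over each row
    return weights @ X, weights

rng = np.random.default_rng(0)
X = rng.normal(size=(5, 8))      # 5 "stocks", 8-dimensional factor vectors
out, w = self_attention(X)
print(out.shape, w.shape)        # (5, 8) (5, 5)
```

Each row of `w` is a data-dependent weighting over all five stocks, recomputed for every input, which is what distinguishes attention from a static linear factor combination.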

1.2 Model Architecture at a Glance

Our model starts from the standard Transformer encoder design and customizes it for the stock-selection task (the implementation below is encoder-only). The overall architecture comprises the following modules:

text

Input: [stock feature matrix] → embedding layer → positional encoding
        ↓
Transformer encoder (6 layers)
  - multi-head self-attention (8 heads)
  - feed-forward network
  - residual connections and layer normalization
        ↓
Factor cross-attention module (novel component)
        ↓
Temporal aggregation layer (adaptive weighting)
        ↓
Output: [expected return / ranking score]
1.3 Comparison with Existing Models

| Model type | Factor interaction | Temporal dynamics | Market-wide relations | Training efficiency |
|---|---|---|---|---|
| Linear multi-factor | linear only | — | — | — |
| XGBoost | tree splits | rolling window | — | — |
| LSTM | — | hidden state | — | low (sequential) |
| Transformer | self-attention | positional encoding | attention matrix | high (parallel) |

Part 2: Data Acquisition and Preprocessing (Complete Code)

2.1 Data Source Configuration

We use baostock as the A-share data source (free and stable), with Tushare as a fallback.

python

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import baostock as bs
from sklearn.preprocessing import StandardScaler
import warnings

warnings.filterwarnings('ignore')


def set_seed(seed=42):
    """Fix random seeds for reproducibility."""
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed()


class StockDataFetcher:
    """Fetches and preprocesses stock data."""

    def __init__(self, start_date='2018-01-01', end_date='2024-12-31'):
        self.start_date = start_date
        self.end_date = end_date
        self.stock_pool = self._get_stock_pool()

    def _get_stock_pool(self):
        """Stock universe: CSI 300 constituents."""
        bs.login()
        rs = bs.query_hs300_stocks()  # latest CSI 300 constituents
        stock_list = []
        while rs.next():
            stock_list.append(rs.get_row_data()[0])
        bs.logout()
        return stock_list[:200]  # keep only the first 200 for demo speed

    def fetch_daily_data(self):
        """Fetch daily bars plus basic valuation fields."""
        bs.login()
        all_data = []
        for stock in self.stock_pool:
            k_data = bs.query_history_k_data_plus(
                stock,
                "date,open,high,low,close,volume,amount,peTTM,pbMRQ,psTTM,pctChg",
                start_date=self.start_date, end_date=self.end_date,
                frequency="d"
            )
            stock_df = []
            while k_data.next():
                stock_df.append(k_data.get_row_data())
            if stock_df:
                df = pd.DataFrame(stock_df,
                                  columns=['date', 'open', 'high', 'low', 'close',
                                           'volume', 'amount', 'peTTM', 'pbMRQ',
                                           'psTTM', 'pctChg'])
                df['stock_code'] = stock
                num_cols = ['open', 'high', 'low', 'close', 'volume', 'amount',
                            'peTTM', 'pbMRQ', 'psTTM', 'pctChg']
                # baostock returns strings (sometimes empty); coerce safely
                df[num_cols] = df[num_cols].apply(pd.to_numeric, errors='coerce')
                all_data.append(df)
        bs.logout()
        return pd.concat(all_data, ignore_index=True)

    def compute_technical_factors(self, df):
        """Technical factors."""
        # Momentum (5/20/60-day returns)
        for period in [5, 20, 60]:
            df[f'momentum_{period}d'] = df.groupby('stock_code')['close'].pct_change(period)
        # 20-day historical volatility
        df['volatility_20d'] = (df.groupby('stock_code')['pctChg']
                                  .rolling(20).std().reset_index(0, drop=True))
        # Volume ratio (5-day average volume / 20-day average volume)
        vol5 = df.groupby('stock_code')['volume'].rolling(5).mean().reset_index(0, drop=True)
        vol20 = df.groupby('stock_code')['volume'].rolling(20).mean().reset_index(0, drop=True)
        df['volume_5_20_ratio'] = vol5 / (vol20 + 1e-8)

        # Relative Strength Index (RSI)
        def compute_rsi(series, period=14):
            delta = series.diff()
            gain = delta.where(delta > 0, 0).rolling(window=period).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
            rs = gain / (loss + 1e-8)
            return 100 - (100 / (1 + rs))

        df['rsi_14d'] = df.groupby('stock_code')['close'].transform(compute_rsi)
        return df

    def compute_fundamental_factors(self, df):
        """Fundamental factors."""
        # Earnings yield (EP), with extreme values capped
        df['ep'] = (1.0 / (df['peTTM'] + 1e-8)).clip(upper=10)
        # Book-to-market (BM)
        df['bm'] = (1.0 / (df['pbMRQ'] + 1e-8)).clip(upper=10)
        # Inverse price-to-sales
        df['sp'] = 1.0 / (df['psTTM'] + 1e-8)
        # Size proxy: log of daily turnover (price x volume); a true log market
        # cap would need shares outstanding, which this feed does not provide
        df['log_cap'] = np.log(df['volume'] * df['close'] + 1e-8)
        return df

    def compute_alpha_factors(self, df):
        """Higher-order alpha factors (computed per stock)."""
        g = df.groupby('stock_code')
        # Alpha#001 (reversal)
        df['alpha_001'] = -g['close'].pct_change(5)
        # Alpha#004 (trend strength)
        df['alpha_004'] = g['close'].transform(
            lambda x: (x - x.shift(20)) / (x.shift(20) + 1e-8))
        # Alpha#006 (volume-weighted momentum)
        df['alpha_006'] = g['close'].transform(
            lambda x: (x - x.shift(10)) / (x.shift(10) + 1e-8)) * df['volume']
        # Composite "smart money" factor
        flow = df['volume'] * df['pctChg']
        df['smart_money'] = (
            flow.groupby(df['stock_code']).rolling(5).mean().reset_index(0, drop=True)
            / (g['volume'].rolling(5).mean().reset_index(0, drop=True) + 1e-8))
        return df

    def prepare_features(self):
        """Assemble all features into training data."""
        print("Fetching raw data...")
        raw_df = self.fetch_daily_data()
        print(f"Raw data shape: {raw_df.shape}")
        print("Computing technical factors...")
        df = self.compute_technical_factors(raw_df)
        print("Computing fundamental factors...")
        df = self.compute_fundamental_factors(df)
        print("Computing alpha factors...")
        df = self.compute_alpha_factors(df)

        # Final feature columns
        feature_cols = [
            'momentum_5d', 'momentum_20d', 'momentum_60d',
            'volatility_20d', 'volume_5_20_ratio', 'rsi_14d',
            'ep', 'bm', 'sp', 'log_cap',
            'alpha_001', 'alpha_004', 'alpha_006', 'smart_money'
        ]

        # Missing values: forward-fill within each stock, then zero.
        # (Back-filling would leak future information into earlier rows.)
        df[feature_cols] = df.groupby('stock_code')[feature_cols].ffill().fillna(0)

        # Label: forward 20-day return
        df['future_return_20d'] = df.groupby('stock_code')['close'].transform(
            lambda x: (x.shift(-20) - x) / (x + 1e-8)
        )
        # Drop rows whose label is NaN
        df = df.dropna(subset=['future_return_20d'])

        # Cross-sectional standardization, one trading day at a time
        dates = df['date'].unique()
        all_features, all_labels = [], []
        for date in dates:
            day_data = df[df['date'] == date]
            if len(day_data) > 10:  # require enough stocks in the cross-section
                features_scaled = StandardScaler().fit_transform(day_data[feature_cols])
                all_features.append(features_scaled)
                all_labels.append(day_data['future_return_20d'].values)

        # NOTE: stacking assumes every retained date has the same stocks in the
        # same order; in practice, align the panel on a fixed universe first
        X = np.array(all_features)
        y = np.array(all_labels)
        print(f"Feature array shape: {X.shape}")
        print(f"Label array shape: {y.shape}")
        return X, y, dates
2.2 Data Pipeline Optimization

To train the Transformer efficiently, we build a batch data loader that samples by time window.

python

class TransformerDataset(Dataset):
    """Time-series dataset tailored to the Transformer."""

    def __init__(self, features, labels, lookback=60):
        """
        features: (T, N, D) - T trading days, N stocks, D factors
        labels:   (T, N)    - forward-return labels
        lookback: length of the lookback window
        """
        self.features = torch.FloatTensor(features)
        self.labels = torch.FloatTensor(labels)
        self.lookback = lookback
        # Build samples with a sliding window
        self.sample_indices = []
        for t in range(lookback, len(features)):
            for stock_idx in range(features.shape[1]):
                if not np.isnan(labels[t, stock_idx]):  # skip NaN labels
                    self.sample_indices.append((t, stock_idx))

    def __len__(self):
        return len(self.sample_indices)

    def __getitem__(self, idx):
        t, stock_idx = self.sample_indices[idx]
        # Feature sequence for this stock over the past `lookback` days
        x = self.features[t - self.lookback:t, stock_idx, :]  # (lookback, D)
        y = self.labels[t, stock_idx]  # already a 0-dim float tensor
        return x, y
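The sliding-window indexing can be sanity-checked standalone on a synthetic panel (the sizes here are illustrative, not from the article's data):

```python
import numpy as np

# Synthetic panel: T trading days, N stocks, D factors
T, N, D, lookback = 100, 20, 14, 60
features = np.random.randn(T, N, D).astype(np.float32)
labels = np.random.randn(T, N).astype(np.float32)

# Same rule as the dataset: one sample per (day t >= lookback, stock)
sample_indices = [(t, s)
                  for t in range(lookback, T)
                  for s in range(N)
                  if not np.isnan(labels[t, s])]
print(len(sample_indices))   # (100 - 60) * 20 = 800 samples

t, s = sample_indices[0]
x = features[t - lookback:t, s, :]   # one training sequence
print(x.shape)                        # (60, 14)
```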

Part 3: Transformer Model Implementation (Complete Code)

3.1 Core Model Architecture

python

class PositionalEncoding(nn.Module):
    """Positional encoding layer: injects temporal order information."""

    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float()
                             * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch, d_model)
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class FactorCrossAttention(nn.Module):
    """Factor cross-attention module (novel component): lets factors interact."""

    def __init__(self, d_model, n_heads, d_factor=64):
        super().__init__()
        self.attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        self.factor_proj = nn.Linear(d_model, d_factor)
        self.cross_attn = nn.MultiheadAttention(d_factor, n_heads, batch_first=True)
        # Project back to d_model so the residual addition is shape-consistent
        self.factor_back = nn.Linear(d_factor, d_model)

    def forward(self, x):
        # x: (batch, seq_len, d_model)
        # Self-attention over the time dimension
        attn_out, _ = self.attention(x, x, x)
        x = x + attn_out
        # Project into factor space and let factors cross-attend
        factor_space = self.factor_proj(x)
        cross_out, _ = self.cross_attn(factor_space, factor_space, factor_space)
        return x + self.factor_back(cross_out)


class StockTransformer(nn.Module):
    """Complete multi-factor stock Transformer."""

    def __init__(self,
                 input_dim=14,     # number of input factors
                 d_model=128,      # hidden dimension
                 n_heads=8,        # attention heads
                 num_layers=6,     # encoder layers
                 dropout=0.1,      # dropout rate
                 ff_dim=256,       # feed-forward dimension
                 max_seq_len=60):  # maximum sequence length
        super().__init__()
        # Input embedding
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_seq_len, dropout)
        # Stacked standard encoder layers
        encoder_layers = [
            nn.TransformerEncoderLayer(
                d_model=d_model, nhead=n_heads, dim_feedforward=ff_dim,
                dropout=dropout, batch_first=True, activation='gelu')
            for _ in range(num_layers)
        ]
        self.transformer_encoder = nn.Sequential(*encoder_layers)
        # Factor cross-attention module
        self.factor_cross = FactorCrossAttention(d_model, n_heads)
        # Adaptive temporal aggregation (attention pooling)
        self.temporal_attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        # Output head
        self.output_layer = nn.Sequential(
            nn.Linear(d_model, 64), nn.GELU(), nn.Dropout(dropout),
            nn.Linear(64, 32), nn.GELU(), nn.Dropout(dropout),
            nn.Linear(32, 1)
        )
        # Learnable aggregation query vector
        self.query_vector = nn.Parameter(torch.randn(1, 1, d_model))
        self._init_weights()

    def _init_weights(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, x, mask=None):
        """x: (batch_size, seq_len, input_dim)"""
        x = self.input_proj(x)           # (batch, seq_len, d_model)
        # Positional encoding expects (seq_len, batch, d_model)
        x = x.transpose(0, 1)
        x = self.pos_encoder(x)
        x = x.transpose(0, 1)
        # Transformer encoding
        x = self.transformer_encoder(x)
        # Factor cross-attention
        x = self.factor_cross(x)
        # Aggregate the whole sequence with a learnable query
        query = self.query_vector.expand(x.size(0), -1, -1)   # (batch, 1, d_model)
        aggregated, _ = self.temporal_attention(query, x, x)  # (batch, 1, d_model)
        aggregated = aggregated.squeeze(1)                    # (batch, d_model)
        output = self.output_layer(aggregated)                # (batch, 1)
        return output.squeeze(-1)

    def predict_rank(self, x):
        """Ranking score used for stock selection."""
        return self.forward(x)


class RankingAwareLoss(nn.Module):
    """Ranking-aware loss: accounts for the relative ordering of stocks."""

    def __init__(self, margin=0.1, pair_weight=0.5):
        super().__init__()
        self.margin = margin
        self.pair_weight = pair_weight

    def forward(self, pred, target):
        # Mean-squared-error component
        mse_loss = nn.MSELoss()(pred, target)
        batch_size = pred.size(0)
        if batch_size < 2:
            return mse_loss
        # All pairwise differences
        pred_diff = pred.unsqueeze(1) - pred.unsqueeze(0)
        target_diff = target.unsqueeze(1) - target.unsqueeze(0)
        # Hinge penalty when the predicted ordering disagrees with (or barely
        # agrees with) the true ordering
        rank_loss = torch.relu(self.margin - target_diff * pred_diff)
        # Only count pairs whose true difference is non-zero
        mask = (target_diff.abs() > 1e-6).float()
        rank_loss = (rank_loss * mask).sum() / (mask.sum() + 1e-8)
        return mse_loss + self.pair_weight * rank_loss
3.2 Advanced Feature: Adaptive Layer Normalization (AdaNorm)

python

class AdaptiveLayerNorm(nn.Module):
    """Adaptive layer normalization: modulates the normalization
    parameters according to the current market state."""

    def __init__(self, d_model, market_state_dim=5):
        super().__init__()
        self.d_model = d_model
        self.gamma_mlp = nn.Linear(market_state_dim, d_model)
        self.beta_mlp = nn.Linear(market_state_dim, d_model)

    def forward(self, x, market_state):
        # x:            (batch, seq_len, d_model)
        # market_state: (batch, market_state_dim), e.g. VIX, market return
        gamma = self.gamma_mlp(market_state).unsqueeze(1)  # (batch, 1, d_model)
        beta = self.beta_mlp(market_state).unsqueeze(1)
        # Standard layer normalization, then state-dependent scale and shift
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        normalized = (x - mean) / (std + 1e-8)
        return gamma * normalized + beta
3.3 Training Procedure

python

class TransformerTrainer:
    """Manages model training and validation."""

    def __init__(self, model, learning_rate=1e-4, weight_decay=1e-5):
        self.model = model
        self.optimizer = optim.AdamW(model.parameters(), lr=learning_rate,
                                     weight_decay=weight_decay)
        self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer, T_0=10, T_mult=2)
        self.criterion = RankingAwareLoss(margin=0.1)
        self.scaler = torch.cuda.amp.GradScaler()  # mixed-precision training

    def train_epoch(self, dataloader, device='cuda'):
        self.model.train()
        total_loss, num_batches = 0, 0
        for batch_x, batch_y in dataloader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            self.optimizer.zero_grad()
            # Mixed-precision forward pass
            with torch.cuda.amp.autocast():
                pred = self.model(batch_x)
                loss = self.criterion(pred, batch_y)
            # Backward pass; unscale before clipping, clip, then step
            self.scaler.scale(loss).backward()
            self.scaler.unscale_(self.optimizer)
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.scaler.step(self.optimizer)
            self.scaler.update()
            total_loss += loss.item()
            num_batches += 1
        self.scheduler.step()
        return total_loss / num_batches

    def validate(self, dataloader, device='cuda'):
        self.model.eval()
        total_loss = 0
        all_preds, all_targets = [], []
        with torch.no_grad():
            for batch_x, batch_y in dataloader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                pred = self.model(batch_x)
                loss = self.criterion(pred, batch_y)
                total_loss += loss.item()
                all_preds.extend(pred.cpu().numpy())
                all_targets.extend(batch_y.cpu().numpy())
        # IC: Pearson correlation between predictions and realized returns
        preds_array = np.array(all_preds)
        targets_array = np.array(all_targets)
        ic = np.corrcoef(preds_array, targets_array)[0, 1]
        # Rank IC: correlate the rank vectors (double argsort turns values
        # into ranks, giving the Spearman correlation when there are no ties)
        pred_ranks = np.argsort(np.argsort(preds_array))
        target_ranks = np.argsort(np.argsort(targets_array))
        rank_ic = np.corrcoef(pred_ranks, target_ranks)[0, 1]
        return total_loss / len(dataloader), ic, rank_ic
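The IC and Rank IC metrics can be verified by hand on a toy cross-section of five stocks (the numbers are illustrative only):

```python
import numpy as np

# Illustrative predictions and realized forward returns for 5 stocks
pred = np.array([0.3, 0.1, -0.2, 0.5, 0.0])
target = np.array([0.2, 0.0, -0.1, 0.4, 0.1])

# IC: Pearson correlation between predictions and realized returns
ic = np.corrcoef(pred, target)[0, 1]

# Rank IC: Pearson correlation between the rank vectors (Spearman).
# argsort of argsort converts values into 0-based ranks (no ties here).
rank_ic = np.corrcoef(np.argsort(np.argsort(pred)),
                      np.argsort(np.argsort(target)))[0, 1]
print(round(ic, 3), round(rank_ic, 3))   # 0.943 0.9
```

Note that correlating raw `argsort` outputs (rather than ranks, i.e. argsort applied twice) is a common mistake, since `argsort` returns sorting indices, not rank positions.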

Part 4: Training and Backtesting Framework

4.1 Data Preparation and Splitting

python

def prepare_data_for_training(X, y, lookback=60, train_ratio=0.7, val_ratio=0.15):
    """Build train/validation/test splits along the time axis."""
    T, N, D = X.shape
    # Split on the time dimension (no shuffling across time)
    train_end = int(T * train_ratio)
    val_end = int(T * (train_ratio + val_ratio))
    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]

    # Build datasets
    train_dataset = TransformerDataset(X_train, y_train, lookback)
    val_dataset = TransformerDataset(X_val, y_val, lookback)
    test_dataset = TransformerDataset(X_test, y_test, lookback)

    # TransformerDataset yields single-stock sequences, so batching uses a
    # custom collate function (all sequences already share length `lookback`)
    def collate_fn(batch):
        sequences, targets = zip(*batch)
        return torch.stack(sequences), torch.stack(targets)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True,
                              collate_fn=collate_fn, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False,
                            collate_fn=collate_fn, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False,
                             collate_fn=collate_fn, num_workers=4)
    return train_loader, val_loader, test_loader
4.2 Main Training Loop

python

def main_training_pipeline():
    # 1. Fetch data
    fetcher = StockDataFetcher(start_date='2018-01-01', end_date='2024-06-30')
    X, y, dates = fetcher.prepare_features()

    # 2. Data loaders
    train_loader, val_loader, test_loader = prepare_data_for_training(X, y, lookback=60)

    # 3. Model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = StockTransformer(input_dim=14, d_model=128, n_heads=8,
                             num_layers=6, dropout=0.1, ff_dim=256).to(device)
    print(f"Parameter count: {sum(p.numel() for p in model.parameters()):,}")

    # 4. Trainer
    trainer = TransformerTrainer(model, learning_rate=1e-4)

    # 5. Training loop with early stopping on validation Rank IC
    best_rank_ic = 0
    patience, patience_counter = 10, 0
    num_epochs = 100
    for epoch in range(num_epochs):
        train_loss = trainer.train_epoch(train_loader, device)
        val_loss, val_ic, val_rank_ic = trainer.validate(val_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"  Train Loss: {train_loss:.6f}")
        print(f"  Val Loss: {val_loss:.6f} | IC: {val_ic:.4f} | Rank IC: {val_rank_ic:.4f}")
        # Early stopping and checkpointing
        if val_rank_ic > best_rank_ic:
            best_rank_ic = val_rank_ic
            torch.save(model.state_dict(), 'best_transformer_model.pt')
            patience_counter = 0
            print(f"  *** New best model saved, Rank IC: {val_rank_ic:.4f} ***")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping triggered, best Rank IC: {best_rank_ic:.4f}")
                break

    # 6. Test-set evaluation
    model.load_state_dict(torch.load('best_transformer_model.pt', map_location=device))
    test_loss, test_ic, test_rank_ic = trainer.validate(test_loader, device)
    print("\nTest-set results:")
    print(f"  Loss: {test_loss:.6f} | IC: {test_ic:.4f} | Rank IC: {test_rank_ic:.4f}")
    return model, trainer

Part 5: Backtest Simulation and Performance Analysis

5.1 Backtest Engine

python

class BacktestEngine:
    """Stock-selection backtest engine driven by model signals."""

    def __init__(self, model, fetcher, lookback=60, top_k=30, rebalance_freq=5):
        self.model = model
        self.fetcher = fetcher
        self.lookback = lookback
        self.top_k = top_k
        self.rebalance_freq = rebalance_freq
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def generate_signals(self, X_dates, X_features):
        """Generate daily stock-selection signals."""
        self.model.eval()
        all_signals = {}
        with torch.no_grad():
            for idx, date in enumerate(X_dates[self.lookback:]):
                # Features over the preceding `lookback` days: (lookback, N, D)
                feature_window = X_features[idx:idx + self.lookback]
                # Score every stock
                stock_scores = []
                for stock_idx in range(feature_window.shape[1]):
                    stock_seq = (torch.FloatTensor(feature_window[:, stock_idx, :])
                                 .unsqueeze(0).to(self.device))
                    stock_scores.append(self.model(stock_seq).cpu().item())
                # Pick the top_k stocks and map indices back to stock codes
                scores_array = np.array(stock_scores)
                top_indices = np.argsort(-scores_array)[:self.top_k]
                all_signals[date] = {
                    'selected_stocks': [self.fetcher.stock_pool[i] for i in top_indices],
                    'scores': scores_array[top_indices]
                }
        return all_signals

    def run_backtest(self, price_data, signals, initial_capital=1e6):
        """Run the backtest (equal-weight portfolio, periodic rebalancing)."""
        holdings = {}  # stock code -> number of shares held
        portfolio_values = []
        dates = sorted(signals.keys())
        for i, date in enumerate(dates):
            day_prices = price_data[price_data['date'] == date]

            def close_of(stock):
                row = day_prices.loc[day_prices['stock_code'] == stock, 'close']
                return float(row.values[0]) if len(row) else None

            # Mark the current portfolio to market
            if holdings:
                value = sum(shares * (close_of(stock) or 0.0)
                            for stock, shares in holdings.items())
            else:
                value = initial_capital

            if i % self.rebalance_freq == 0:  # rebalancing day
                selected = signals[date]['selected_stocks']
                allocation = value / len(selected)  # equal weights
                holdings = {}
                for stock in selected:
                    price = close_of(stock)
                    if price:  # skip stocks with no price on this day
                        holdings[stock] = allocation / price
            portfolio_values.append(value)

        # Performance metrics
        portfolio_values = np.array(portfolio_values)
        returns = np.diff(portfolio_values) / portfolio_values[:-1]
        sharpe = np.sqrt(252) * np.mean(returns) / (np.std(returns) + 1e-8)
        max_drawdown = self._compute_max_drawdown(portfolio_values)
        return {
            'final_value': portfolio_values[-1],
            'total_return': (portfolio_values[-1] - initial_capital) / initial_capital,
            'sharpe_ratio': sharpe,
            'max_drawdown': max_drawdown,
            'daily_returns': returns
        }

    def _compute_max_drawdown(self, values):
        peak = np.maximum.accumulate(values)  # running high-water mark
        drawdown = (values - peak) / peak
        return np.min(drawdown)
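The maximum-drawdown computation is easy to verify standalone on a toy equity curve (numbers are illustrative):

```python
import numpy as np

def max_drawdown(values):
    """Largest peak-to-trough decline of an equity curve (a negative number)."""
    values = np.asarray(values, dtype=float)
    peak = np.maximum.accumulate(values)   # running high-water mark
    return ((values - peak) / peak).min()

curve = [100, 120, 90, 110, 80, 130]
print(round(max_drawdown(curve), 4))   # -0.3333: the fall from 120 to 80
```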
5.2 Benchmark Comparison

python

def benchmark_comparison():
    """Compare against benchmark models."""
    # Illustrative annualized returns only, not measured results
    benchmarks = {
        'CSI 300': 0.08,
        'Equal-weight multi-factor': 0.12,
        'LSTM selection': 0.15,
        'Transformer (this article)': 0.22
    }
    # Bar chart (illustrative code)
    import matplotlib.pyplot as plt
    plt.figure(figsize=(10, 6))
    models = list(benchmarks.keys())
    returns = list(benchmarks.values())
    colors = ['gray', 'blue', 'orange', 'red']
    bars = plt.bar(models, returns, color=colors)
    plt.ylabel('Annualized return')
    plt.title('Performance comparison of stock-selection models')
    for bar, ret in zip(bars, returns):
        plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.005,
                 f'{ret:.1%}', ha='center', va='bottom')
    plt.savefig('benchmark_comparison.png', dpi=150)

Part 6: Model Interpretability and Attention Visualization

6.1 Attention Weight Visualization

python

import matplotlib.pyplot as plt


class AttentionVisualizer:
    """Visualizes self-attention weights to reveal factor importance."""

    def __init__(self, model):
        self.model = model

    def extract_attention_weights(self, sample_input):
        """Capture attention weights via forward hooks.
        Note: nn.TransformerEncoderLayer invokes self-attention with
        need_weights=False, so the hooked output may contain None;
        only non-None weight tensors are collected."""
        attentions = []

        def hook_fn(module, inputs, output):
            # nn.MultiheadAttention returns (attn_output, attn_weights)
            if isinstance(output, tuple) and output[1] is not None:
                attentions.append(output[1].detach().cpu())

        # Register hooks on each encoder layer's self-attention
        handles = []
        for layer in self.model.transformer_encoder:
            if hasattr(layer, 'self_attn'):
                handles.append(layer.self_attn.register_forward_hook(hook_fn))
        with torch.no_grad():
            _ = self.model(sample_input)
        for h in handles:
            h.remove()
        return attentions

    def plot_factor_importance(self, feature_names):
        """Plot factor importance derived from the input projection weights."""
        first_layer_weights = self.model.input_proj.weight.detach().cpu().numpy()
        importance = np.abs(first_layer_weights).mean(axis=0)
        importance = importance / importance.sum()
        plt.figure(figsize=(12, 6))
        indices = np.argsort(importance)[::-1]
        colors = plt.cm.RdYlGn(np.linspace(0, 1, len(feature_names)))
        plt.barh(range(len(feature_names)), importance[indices], color=colors)
        plt.yticks(range(len(feature_names)), np.array(feature_names)[indices])
        plt.xlabel('Relative importance')
        plt.title('Factor importance learned by the Transformer')
        plt.tight_layout()
        plt.savefig('factor_importance.png', dpi=150)
        return dict(zip(np.array(feature_names)[indices], importance[indices]))
6.2 SHAP-Based Explanations

python

import shap
import matplotlib.pyplot as plt


def shap_analysis(model, sample_data, feature_names):
    """Explain the model with SHAP values."""
    # Build the explainer on a background sample
    explainer = shap.DeepExplainer(model, sample_data[:100])
    # Compute SHAP values for a handful of samples
    shap_values = explainer.shap_values(sample_data[:10])
    # Visualize
    shap.summary_plot(shap_values, sample_data[:10],
                      feature_names=feature_names, show=False)
    plt.savefig('shap_summary.png', dpi=150)
    return shap_values
