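Goodbye to "Alchemy": A Hands-On PyTorch Implementation of PPO for Training CartPole (with Full Code and Tuning Notes)
Implementing PPO from Scratch: CartPole Training and Hyperparameter Tuning in PyTorch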
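1. Reinforcement Learning and PPO Basics
Before the hands-on part, we need a few core concepts. Reinforcement learning (RL) is a branch of machine learning concerned with how an agent learns an optimal policy by interacting with an environment, so as to maximize its cumulative reward.
PPO (Proximal Policy Optimization) is a policy-gradient reinforcement learning algorithm that keeps training stable by limiting how far each update can move the policy. Compared with vanilla policy-gradient methods, PPO has two main advantages:
- Importance sampling: data collected by the old policy can be reused for several update steps
- Clipping: prevents overly large policy updates that can collapse performance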
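To make these two points concrete, the sketch below evaluates PPO's clipped surrogate objective for a single sample in plain Python. The function name `clipped_surrogate` and the example numbers are illustrative assumptions rather than part of the training code; the default of 0.2 simply matches the `clip_ratio` used later in this article.

```python
def clipped_surrogate(ratio, advantage, clip_ratio=0.2):
    """Per-sample PPO objective: min(r * A, clip(r, 1 - eps, 1 + eps) * A).

    `ratio` is the importance weight pi_new(a|s) / pi_old(a|s). The name and
    signature are illustrative, not taken from any library.
    """
    clipped_ratio = max(1.0 - clip_ratio, min(ratio, 1.0 + clip_ratio))
    return min(ratio * advantage, clipped_ratio * advantage)


if __name__ == "__main__":
    # Positive advantage: the objective stops growing once the ratio exceeds 1 + eps
    for r in (0.5, 1.0, 1.5):
        print(f"ratio={r:.1f}, A=+1 -> {clipped_surrogate(r, 1.0):.2f}")
    # Negative advantage: the objective goes flat once the ratio drops below 1 - eps
    for r in (0.5, 1.0, 1.5):
        print(f"ratio={r:.1f}, A=-1 -> {clipped_surrogate(r, -1.0):.2f}")
```

In both cases the flat region is what removes the incentive to push the new policy further away from the old one.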
```python
import gym  # classic Gym API (gym < 0.26): env.seed() and a 4-tuple from env.step()
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

# Fix the random seeds for reproducibility
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
```

2. Environment and Network Architecture
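2.1 The CartPole Environment
CartPole is a classic reinforcement learning benchmark. The goal is to keep a pole upright by pushing the cart it sits on to the left or right. The environment provides the following state information:
- Cart position
- Cart velocity
- Pole angle
- Pole angular velocity
The action space is discrete: 0 (push the cart left) and 1 (push the cart right). Every step the pole stays balanced earns a reward of +1. The episode ends when the pole tilts more than 12 degrees from vertical, when the cart moves more than 2.4 units from the center, or when the episode reaches the 500-step limit of CartPole-v1.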
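The termination rule can be sketched as a small standalone check. The thresholds are the ones documented for CartPole-v1; the helper `episode_over` and the constant names are hypothetical and only meant to illustrate the rule, since the real environment performs this check internally.

```python
import math

# Limits documented for CartPole-v1; the helper below is a hypothetical sketch
X_THRESHOLD = 2.4                      # cart position limit
THETA_THRESHOLD = 12 * math.pi / 180   # pole angle limit (12 degrees) in radians
MAX_EPISODE_STEPS = 500                # time limit for CartPole-v1


def episode_over(state, step_count):
    """Return True if a CartPole-v1 episode would end in this state."""
    cart_pos, _cart_vel, pole_angle, _pole_vel = state
    out_of_bounds = abs(cart_pos) > X_THRESHOLD
    pole_fallen = abs(pole_angle) > THETA_THRESHOLD
    timed_out = step_count >= MAX_EPISODE_STEPS
    return out_of_bounds or pole_fallen or timed_out


if __name__ == "__main__":
    print(episode_over((0.0, 0.0, 0.05, 0.0), step_count=10))   # still balanced
    print(episode_over((0.0, 0.0, 0.25, 0.0), step_count=10))   # pole past 12 degrees
```

Note that the pole angle in the observation is in radians, so 12 degrees corresponds to roughly 0.21.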
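```python
env = gym.make('CartPole-v1')
env.seed(SEED)  # gym < 0.26; newer versions seed through env.reset(seed=SEED)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
print(f"State dimension: {state_dim}")
print(f"Number of actions: {action_dim}")
```

2.2 The Actor-Critic Network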
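PPO uses an Actor-Critic architecture, in which:
- The Actor network outputs a probability distribution over actions
- The Critic network estimates the value of the current state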
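To see what "a probability distribution over actions" means in practice, here is a minimal pure-Python sketch of the two steps the Actor performs implicitly: a softmax that turns raw scores into probabilities, and sampling an action from them. The logits and the helper names `softmax` and `sample_action` are made up for illustration; the PyTorch model uses `nn.Softmax` and `torch.multinomial` instead.

```python
import math
import random


def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]  # subtract the max for numerical stability
    total = sum(exps)
    return [e / total for e in exps]


def sample_action(probs, rng):
    """Draw an action index according to `probs` (inverse-CDF sampling)."""
    threshold = rng.random()
    cumulative = 0.0
    for action, p in enumerate(probs):
        cumulative += p
        if threshold < cumulative:
            return action
    return len(probs) - 1  # guard against floating-point round-off


if __name__ == "__main__":
    probs = softmax([0.2, -0.3])  # hypothetical Actor scores for [left, right]
    rng = random.Random(42)
    counts = [0, 0]
    for _ in range(1000):
        counts[sample_action(probs, rng)] += 1
    print(probs, counts)
```

Sampling during training, rather than always taking the most likely action, is what gives the agent its exploration.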
```python
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        # Shared feature extractor
        self.shared_layers = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.Tanh(),
            nn.Linear(64, 64),
            nn.Tanh()
        )
        # Actor head: action probabilities
        self.actor = nn.Sequential(
            nn.Linear(64, action_dim),
            nn.Softmax(dim=-1)
        )
        # Critic head: scalar state value
        self.critic = nn.Linear(64, 1)

    def forward(self, state):
        features = self.shared_layers(state)
        action_probs = self.actor(features)
        state_value = self.critic(features)
        return action_probs, state_value
```

3. Core PPO Implementation
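3.1 Collecting and Storing Experience
PPO trains on experience gathered while the agent interacts with the environment (states, actions, rewards, and so on). We use a simple buffer to store this data.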
```python
class PPOBuffer:
    def __init__(self, buffer_size, state_dim):
        self.states = np.zeros((buffer_size, state_dim), dtype=np.float32)
        self.actions = np.zeros(buffer_size, dtype=np.int64)
        self.rewards = np.zeros(buffer_size, dtype=np.float32)
        self.values = np.zeros(buffer_size, dtype=np.float32)
        self.log_probs = np.zeros(buffer_size, dtype=np.float32)
        self.dones = np.zeros(buffer_size, dtype=np.bool_)
        self.ptr = 0
        self.max_size = buffer_size

    def store(self, state, action, reward, value, log_prob, done):
        # Wrap around so each new epoch overwrites the previous batch
        idx = self.ptr % self.max_size
        self.states[idx] = state
        self.actions[idx] = action
        self.rewards[idx] = reward
        self.values[idx] = value
        self.log_probs[idx] = log_prob
        self.dones[idx] = done
        self.ptr += 1

    def get(self):
        return (
            self.states, self.actions, self.rewards,
            self.values, self.log_probs, self.dones
        )
```

3.2 Advantage Estimation
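The advantage function A(s,a) = Q(s,a) - V(s) measures how much better taking action a in state s is than the average outcome from that state. We compute it with Generalized Advantage Estimation (GAE).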
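Written out, the estimator follows the recursion below. The symbol d_t (1 if the episode ends at step t, otherwise 0) is notation introduced here for the done flag, and the hats mark estimates. Treat this as a reference sketch of standard GAE, with γ the discount factor and λ the GAE parameter, rather than a new derivation.

```latex
\begin{aligned}
\delta_t  &= r_t + \gamma\,(1 - d_t)\,V(s_{t+1}) - V(s_t) \\
\hat{A}_t &= \delta_t + \gamma\lambda\,(1 - d_t)\,\hat{A}_{t+1} \\
\hat{R}_t &= \hat{A}_t + V(s_t)
\end{aligned}
```

The first line is the one-step TD error, the second accumulates it backwards through time, and the third gives the return target used to train the Critic.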
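```python
def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    """Compute GAE advantages and value targets for one batch of steps."""
    batch_size = len(rewards)
    advantages = np.zeros(batch_size, dtype=np.float32)
    last_advantage = 0.0
    for t in reversed(range(batch_size)):
        # 0 if the episode ended at step t, so nothing leaks across episodes
        next_non_terminal = 1.0 - float(dones[t])
        if t == batch_size - 1:
            # V(s_{t+1}) is not stored for the final step; values[t] serves as
            # a rough stand-in when the batch cuts an episode short
            next_value = values[t]
        else:
            next_value = values[t + 1]
        # One-step TD error
        delta = rewards[t] + gamma * next_value * next_non_terminal - values[t]
        # Recursive GAE accumulation
        advantages[t] = delta + gamma * lam * next_non_terminal * last_advantage
        last_advantage = advantages[t]
    returns = advantages + values
    return advantages, returns
```

3.3 The PPO Loss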
The heart of PPO is its loss function, which combines a policy loss, a value-function loss, and an entropy bonus.
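Two of the ingredients are easy to check by hand. The sketch below, in plain Python, computes the importance ratio from log-probabilities (the form in which the buffer stores them) and the entropy of a discrete action distribution. The helper names and the numbers are illustrative assumptions, not part of the training code.

```python
import math


def importance_ratio(new_log_prob, old_log_prob):
    """pi_new(a|s) / pi_old(a|s), computed from log-probabilities."""
    return math.exp(new_log_prob - old_log_prob)


def categorical_entropy(probs):
    """Shannon entropy (in nats) of a discrete action distribution."""
    return -sum(p * math.log(p) for p in probs if p > 0.0)


if __name__ == "__main__":
    old_lp = math.log(0.5)  # the action had probability 0.5 when it was sampled
    new_lp = math.log(0.6)  # the updated policy now assigns it 0.6
    print(f"ratio = {importance_ratio(new_lp, old_lp):.2f}")
    print(f"entropy of [0.5, 0.5] = {categorical_entropy([0.5, 0.5]):.3f}")
    print(f"entropy of [0.9, 0.1] = {categorical_entropy([0.9, 0.1]):.3f}")
```

A uniform two-action policy has the maximum entropy of ln 2, and the value shrinks as the policy becomes more deterministic, which is why subtracting an entropy term from the loss encourages exploration.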
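```python
def ppo_loss(old_log_probs, states, actions, advantages, returns,
             clip_ratio=0.2, entropy_coef=0.01):
    # Note: relies on the module-level `model` created before training starts
    states = torch.FloatTensor(states)
    actions = torch.LongTensor(actions)
    advantages = torch.FloatTensor(advantages)
    returns = torch.FloatTensor(returns)
    old_log_probs = torch.FloatTensor(old_log_probs)

    # Action probabilities and state values under the current policy
    new_probs, new_values = model(states)
    new_log_probs = torch.log(
        new_probs.gather(1, actions.unsqueeze(1)).squeeze(1) + 1e-10)

    # Importance ratio: the buffer stores log-probabilities, so exponentiate
    # the difference instead of dividing the raw values
    ratio = torch.exp(new_log_probs - old_log_probs)

    # Clipped surrogate policy loss
    surr1 = ratio * advantages
    surr2 = torch.clamp(ratio, 1.0 - clip_ratio, 1.0 + clip_ratio) * advantages
    policy_loss = -torch.min(surr1, surr2).mean()

    # Value-function loss
    value_loss = nn.MSELoss()(new_values.squeeze(-1), returns)

    # Entropy bonus over the full action distribution (encourages exploration)
    entropy = -(new_probs * torch.log(new_probs + 1e-10)).sum(dim=-1).mean()
    total_loss = policy_loss + 0.5 * value_loss - entropy_coef * entropy
    return total_loss, policy_loss.item(), value_loss.item(), entropy.item()
```

4. Training Loop and Tuning Tips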
4.1 The Full Training Loop
```python
def train_ppo(env, model, optimizer, epochs=100, steps_per_epoch=4000,
              max_ep_len=1000, clip_ratio=0.2, train_iters=80,
              gamma=0.99, lam=0.95, lr=3e-4):
    # `lr` is accepted so the hyperparameter dict can be unpacked directly;
    # the learning rate itself is set on the optimizer that is passed in
    buffer = PPOBuffer(steps_per_epoch, env.observation_space.shape[0])
    episode_rewards = []
    episode_lengths = []

    for epoch in range(epochs):
        state = env.reset()
        ep_reward = 0
        ep_len = 0

        # Collect one batch of on-policy experience
        for t in range(steps_per_epoch):
            with torch.no_grad():
                action_probs, value = model(torch.FloatTensor(state))
                action = torch.multinomial(action_probs, 1).item()
                log_prob = torch.log(action_probs[action])

            next_state, reward, done, _ = env.step(action)
            ep_reward += reward
            ep_len += 1

            buffer.store(state, action, reward, value.item(), log_prob.item(), done)
            state = next_state

            if done or (ep_len == max_ep_len):
                episode_rewards.append(ep_reward)
                episode_lengths.append(ep_len)
                state = env.reset()
                ep_reward = 0
                ep_len = 0

        # Compute advantages and returns
        states, actions, rewards, values, log_probs, dones = buffer.get()
        advantages, returns = compute_gae(rewards, values, dones, gamma, lam)

        # Normalize the advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Update the model
        for _ in range(train_iters):
            loss, policy_loss, value_loss, entropy = ppo_loss(
                log_probs, states, actions, advantages, returns, clip_ratio
            )
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Log training progress
        if (epoch + 1) % 10 == 0:
            avg_reward = np.mean(episode_rewards[-10:])
            avg_len = np.mean(episode_lengths[-10:])
            print(f"Epoch: {epoch+1}, Avg Reward: {avg_reward:.1f}, Avg Length: {avg_len:.1f}")
            print(f"Loss: {loss.item():.3f}, Policy Loss: {policy_loss:.3f}, "
                  f"Value Loss: {value_loss:.3f}, Entropy: {entropy:.3f}")

    return episode_rewards
```

4.2 Key Tuning Lessons
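In PPO, the following hyperparameters have the largest effect on training:
Discount factor gamma: controls how much future rewards matter
- Higher values (0.99): more weight on long-term return
- Lower values (0.9): more weight on immediate reward
GAE parameter lambda: trades off bias against variance
- Close to 1: low bias but high variance
- Close to 0: high bias but low variance
Clipping range clip_ratio: controls how far each update can move the policy
- Typical values: 0.1-0.3
- Too large: the PPO constraint loses its effect
- Too small: learning slows down
Learning rate lr: controls the speed of parameter updates
- 3e-4 is a reasonable starting point
- Can be combined with learning-rate decay
Batch size: the number of samples used per update (the training loop above uses the whole steps_per_epoch batch for every update)
- CartPole: 128-2048
- More complex environments: larger batches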
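To get a feel for what gamma does, a common rule of thumb is that the agent effectively looks about 1 / (1 - gamma) steps ahead. The plain-Python sketch below compares the two values mentioned above on a full-length CartPole-v1 episode; the helper names are illustrative, and the horizon formula is a heuristic rather than an exact property.

```python
def discounted_return(rewards, gamma):
    """Sum of gamma**t * r_t, accumulated backwards over one episode."""
    total = 0.0
    for reward in reversed(rewards):
        total = reward + gamma * total
    return total


def effective_horizon(gamma):
    """Rule-of-thumb horizon 1 / (1 - gamma), in time steps."""
    return 1.0 / (1.0 - gamma)


if __name__ == "__main__":
    rewards = [1.0] * 500  # a full-length CartPole-v1 episode, +1 per step
    for gamma in (0.9, 0.99):
        print(f"gamma={gamma}: horizon ~ {effective_horizon(gamma):.0f} steps, "
              f"return = {discounted_return(rewards, gamma):.1f}")
```

With gamma = 0.9, rewards more than a few dozen steps away contribute almost nothing, which is one reason 0.99 is the more common choice for a task where the goal is to survive hundreds of steps.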
```python
# Hyperparameters
hyperparams = {
    'epochs': 200,
    'steps_per_epoch': 4000,
    'max_ep_len': 1000,
    'clip_ratio': 0.2,
    'train_iters': 80,
    'gamma': 0.99,
    'lam': 0.95,
    'lr': 3e-4
}

# Create the model and optimizer
model = ActorCritic(state_dim, action_dim)
optimizer = optim.Adam(model.parameters(), lr=hyperparams['lr'])

# Start training
rewards = train_ppo(env, model, optimizer, **hyperparams)
```

5. Analyzing and Visualizing Training Results
Once training finishes, we can plot the reward curve to inspect the learning process:
```python
def plot_rewards(rewards, window_size=100):
    plt.figure(figsize=(12, 6))

    # Raw reward curve
    plt.subplot(1, 2, 1)
    plt.plot(rewards)
    plt.title('Raw Training Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Reward')

    # Moving-average reward curve
    plt.subplot(1, 2, 2)
    moving_avg = np.convolve(rewards, np.ones(window_size) / window_size, mode='valid')
    plt.plot(moving_avg)
    plt.title(f'Moving Average (window={window_size})')
    plt.xlabel('Episode')
    plt.ylabel('Reward')

    plt.tight_layout()
    plt.show()

plot_rewards(rewards)
```

A typical training curve goes through the following stages:
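- Exploration phase: rewards fluctuate widely while the agent tries actions almost at random
- Learning phase: rewards start to climb steadily
- Plateau: rewards reach the environment maximum (500 for CartPole-v1)
If training goes wrong, the symptoms typically look like this:
- Rewards do not increase: the learning rate may be too high or too low, or the network architecture may be a poor fit
- Rewards drop suddenly: policy updates are too large, so reduce clip_ratio
- Rewards fluctuate heavily: increase the batch size or adjust the GAE parameter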
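A simple way to tell the plateau apart from a lucky streak is to track a trailing average of episode rewards. The sketch below assumes the commonly cited criterion for CartPole-v1 (an average reward of at least 475 over 100 consecutive episodes); the helpers `moving_average` and `is_solved` are hypothetical and operate on the list returned by `train_ppo`.

```python
def moving_average(values, window):
    """Trailing means over `window` consecutive values (empty if too short)."""
    if window <= 0 or len(values) < window:
        return []
    running = sum(values[:window])
    averages = [running / window]
    for i in range(window, len(values)):
        running += values[i] - values[i - window]
        averages.append(running / window)
    return averages


def is_solved(episode_rewards, threshold=475.0, window=100):
    """True once the mean reward of the last `window` episodes reaches `threshold`."""
    if len(episode_rewards) < window:
        return False
    recent = episode_rewards[-window:]
    return sum(recent) / window >= threshold


if __name__ == "__main__":
    history = [20.0] * 150 + [500.0] * 100  # made-up rewards for illustration
    print(moving_average(history, 100)[-1], is_solved(history))
```

Checking `is_solved` at the end of each epoch would also give a natural early-stopping condition.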
6. Testing and Deploying the Model
After training, we can test how the model performs in the environment:
```python
def test_model(env, model, episodes=10, render=False):
    total_rewards = []
    for ep in range(episodes):
        state = env.reset()
        done = False
        ep_reward = 0

        while not done:
            if render:
                env.render()
            with torch.no_grad():
                action_probs, _ = model(torch.FloatTensor(state))
                # Act greedily at test time instead of sampling
                action = torch.argmax(action_probs).item()
            state, reward, done, _ = env.step(action)
            ep_reward += reward

        total_rewards.append(ep_reward)
        print(f"Episode {ep+1}: Reward = {ep_reward}")

    avg_reward = np.mean(total_rewards)
    print(f"\nAverage reward over {episodes} episodes: {avg_reward:.1f}")
    return total_rewards

# Test the trained model
test_rewards = test_model(env, model, episodes=10, render=True)
env.close()
```

For actual deployment, we can save the model parameters for later use:
```python
# Save the model
torch.save(model.state_dict(), 'ppo_cartpole.pth')

# Load the model
loaded_model = ActorCritic(state_dim, action_dim)
loaded_model.load_state_dict(torch.load('ppo_cartpole.pth'))
loaded_model.eval()
```

7. Common Problems and Solutions
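When implementing PPO in practice, you may run into the following typical problems:
Unstable training
- Symptom: the reward curve swings violently
- Fix: lower the learning rate, reduce clip_ratio, increase the batch size
Slow convergence
- Symptom: rewards grow slowly
- Fix: raise the learning rate, increase clip_ratio, adjust the network architecture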
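Premature convergence to a suboptimal policy
- Symptom: rewards stall at a low level
- Fix: increase the entropy coefficient, reduce gamma
Exploding gradients
- Symptom: the loss becomes NaN
- Fix: clip gradients, lower the learning rate, check the network initialization
Low sample efficiency
- Symptom: a large amount of interaction data is needed
- Fix: increase the GAE lambda, run more environments in parallel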
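```python
# Gradient clipping example (the update loop inside train_ppo)
for _ in range(train_iters):
    loss, policy_loss, value_loss, entropy = ppo_loss(
        log_probs, states, actions, advantages, returns, clip_ratio
    )
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)  # cap the global gradient norm at 0.5
    optimizer.step()
```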
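For intuition about what norm-based clipping does, here is a plain-Python sketch of the same idea on a flat list of gradient values: if the overall L2 norm exceeds the limit, every gradient is scaled down by the same factor, so the direction of the update is preserved. The function `clip_by_global_norm` is a hypothetical stand-in written for this illustration, not PyTorch's implementation.

```python
import math


def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale a flat list of gradient values so their L2 norm is at most `max_norm`.

    Returns the (possibly scaled) gradients and the norm before clipping.
    """
    total_norm = math.sqrt(sum(g * g for g in grads))
    scale = min(1.0, max_norm / (total_norm + eps))
    return [g * scale for g in grads], total_norm


if __name__ == "__main__":
    clipped, norm_before = clip_by_global_norm([3.0, 4.0], max_norm=0.5)
    print(norm_before, clipped)  # the norm of 5.0 is scaled down to about 0.5
    unchanged, _ = clip_by_global_norm([0.03, 0.04], max_norm=0.5)
    print(unchanged)             # already within the limit, left as-is
```

Because small gradients pass through untouched, clipping only intervenes on the occasional oversized update that would otherwise destabilize training.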