PPO训练小车
PPO 训练小车(以经典 CartPole 为例),核心是Actor-Critic 架构 + 裁剪目标 + GAE 优势估计,通过多轮数据复用稳定更新策略,让小车学会平衡杆或完成导航。下面从原理、环境、代码、训练到调优,给出完整可运行方案。
一、PPO 训练小车核心原理
PPO(Proximal Policy Optimization)是Actor-Critic架构的策略梯度算法,核心是限制策略更新幅度,避免训练震荡。
- Actor(策略网络):输入状态,输出动作概率分布(离散 / 连续),指导小车动作。
- Critic(价值网络):输入状态,输出状态价值 V (s),评估当前状态好坏。
- 裁剪目标(PPO-Clip):
- 重要性采样比:rt(θ)=πθold(at∣st)πθ(at∣st)
- 裁剪损失:LCLIP(θ)=E[min(rtAt,clip(rt,1−ϵ,1+ϵ)At)]
- ϵ通常取 0.2,防止策略更新过大。
- 优势函数(GAE):At=∑k=0∞(γλ)kδt+k,δt=rt+γV(st+1)−V(st),平衡偏差与方差。
二、环境选择与搭建
1. 经典小车环境(CartPole-v1)
- 状态空间(4 维):小车位置、速度、杆角度、杆角速度。
- 动作空间(离散 2 维):左移、右移。
- 奖励:每步 + 1,杆倒 / 车出界则回合结束,目标累计奖励≥475。
- 安装依赖:
bash
运行
pip install gymnasium torch numpy matplotlib2. 自定义 / ROS 小车环境(可选)
- 用 Gazebo+ROS 搭建 TurtleBot3,定义观测(激光 / 图像)、动作(线速度 / 角速度)、奖励函数(避障 + 进度)。
- 或用 MetaDrive 做自动驾驶仿真,动作空间为连续(转向 + 油门)。
三、完整 PPO 训练小车代码(PyTorch)
1. 网络定义(Actor+Critic)
python
运行
import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F import gymnasium as gym import numpy as np from collections import deque import matplotlib.pyplot as plt # 设备配置 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Actor网络:输出动作概率 class Actor(nn.Module): def __init__(self, state_dim, action_dim, hidden_dim=64): super(Actor, self).__init__() self.fc1 = nn.Linear(state_dim, hidden_dim) self.fc2 = nn.Linear(hidden_dim, hidden_dim) self.fc3 = nn.Linear(hidden_dim, action_dim) def forward(self, x): x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) return F.softmax(self.fc3(x), dim=-1) # Critic网络:输出状态价值 class Critic(nn.Module): def __init__(self, state_dim, hidden_dim=64): super(Critic, self).__init__() self.fc1 = nn.Linear(state_dim, hidden_dim) self.fc2 = nn.Linear(hidden_dim, hidden_dim) self.fc3 = nn.Linear(hidden_dim, 1) def forward(self, x): x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) return self.fc3(x)2. PPO Agent 实现
python
运行
class PPO: def __init__(self, state_dim, action_dim, lr_actor=3e-4, lr_critic=1e-3, gamma=0.99, lmbda=0.95, eps_clip=0.2, epochs=10): # 网络初始化 self.actor = Actor(state_dim, action_dim).to(device) self.critic = Critic(state_dim).to(device) self.optimizer_actor = optim.Adam(self.actor.parameters(), lr=lr_actor) self.optimizer_critic = optim.Adam(self.critic.parameters(), lr=lr_critic) # PPO超参数 self.gamma = gamma # 折扣因子 self.lmbda = lmbda # GAE参数 self.eps_clip = eps_clip # 裁剪系数 self.epochs = epochs # 每批数据训练轮数 # 经验池 self.memory = [] # 存储经验 def store(self, state, action, reward, log_prob, done): self.memory.append((state, action, reward, log_prob, done)) # 选择动作(训练/测试) def select_action(self, state, training=True): state = torch.FloatTensor(state).unsqueeze(0).to(device) probs = self.actor(state) dist = torch.distributions.Categorical(probs) action = dist.sample() log_prob = dist.log_prob(action) if training: return action.item(), log_prob.item() else: return torch.argmax(probs).item() # 测试取最优动作 # 计算GAE优势 def compute_gae(self, rewards, dones, values): advantages = [] advantage = 0 next_value = 0 for t in reversed(range(len(rewards))): delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t] advantage = delta + self.gamma * self.lmbda * (1 - dones[t]) * advantage advantages.insert(0, advantage) next_value = values[t] # 优势归一化 advantages = torch.FloatTensor(advantages).to(device) advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8) return advantages # PPO更新核心 def update(self): # 提取经验 states = torch.FloatTensor([s for s, a, r, lp, d in self.memory]).to(device) actions = torch.LongTensor([a for s, a, r, lp, d in self.memory]).to(device) rewards = torch.FloatTensor([r for s, a, r, lp, d in self.memory]).to(device) old_log_probs = torch.FloatTensor([lp for s, a, r, lp, d in self.memory]).to(device) dones = torch.FloatTensor([d for s, a, r, lp, d in self.memory]).to(device) # 计算价值与优势 values = self.critic(states).squeeze() advantages = self.compute_gae(rewards, dones, values.detach().cpu().numpy()) returns = advantages + values.detach() # TD目标 # 多轮更新 for _ in range(self.epochs): # 新策略概率 new_probs = self.actor(states) new_dist = torch.distributions.Categorical(new_probs) new_log_probs = new_dist.log_prob(actions) # 重要性采样比 ratio = torch.exp(new_log_probs - old_log_probs) # 裁剪损失 surr1 = ratio * advantages surr2 = torch.clamp(ratio, 1-self.eps_clip, 1+self.eps_clip) * advantages loss_actor = -torch.min(surr1, surr2).mean() # Critic损失 loss_critic = F.mse_loss(self.critic(states).squeeze(), returns) # 反向传播 self.optimizer_actor.zero_grad() self.optimizer_critic.zero_grad() loss_actor.backward() loss_critic.backward() self.optimizer_actor.step() self.optimizer_critic.step() # 清空经验池 self.memory = []3. 训练主循环
python
运行
def train_ppo(): # 环境初始化 env = gym.make("CartPole-v1") state_dim = env.observation_space.shape[0] action_dim = env.action_space.n ppo = PPO(state_dim, action_dim) max_episodes = 1000 max_steps = 500 reward_history = [] avg_reward = deque(maxlen=100) for episode in range(max_episodes): state, _ = env.reset() total_reward = 0 done = False for step in range(max_steps): # 选择动作 action, log_prob = ppo.select_action(state) next_state, reward, terminated, truncated, _ = env.step(action) done = terminated or truncated # 存储经验 ppo.store(state, action, reward, log_prob, done) total_reward += reward state = next_state if done: break # 更新策略 ppo.update() # 记录奖励 avg_reward.append(total_reward) reward_history.append(total_reward) print(f"Episode {episode+1}, Total Reward: {total_reward}, Avg Reward: {np.mean(avg_reward):.2f}") # 收敛条件:平均奖励≥475 if np.mean(avg_reward) >= 475: print(f"训练完成!Episode {episode+1} 达到收敛条件") torch.save(ppo.actor.state_dict(), "cartpole_ppo_actor.pth") torch.save(ppo.critic.state_dict(), "cartpole_ppo_critic.pth") break # 绘制奖励曲线 plt.plot(reward_history) plt.xlabel("Episode") plt.ylabel("Total Reward") plt.title("PPO Training on CartPole-v1") plt.show() if __name__ == "__main__": train_ppo()四、训练流程与关键步骤
- 环境交互:每回合用旧策略采样轨迹,存储
(s,a,r,log_prob,done)。 - GAE 计算:基于 Critic 价值,计算每步优势At,并归一化。
- 多轮更新:同一批数据训练
epochs次,用裁剪损失限制策略更新。 - 收敛判断:连续 100 回合平均奖励≥475(CartPole 满分 500)。
五、超参数调优(关键)
表格
| 参数 | 含义 | 推荐值 | 调优方向 |
|---|---|---|---|
lr_actor | Actor 学习率 | 3e-4 | 收敛慢调大,震荡调小 |
lr_critic | Critic 学习率 | 1e-3 | 通常比 Actor 大 |
gamma | 折扣因子 | 0.99 | 长期依赖调大 |
lmbda | GAE 参数 | 0.95 | 平衡偏差 / 方差 |
eps_clip | 裁剪系数 | 0.2 | 震荡调小,收敛慢调大 |
epochs | 每批训练轮数 | 10 | 数据复用次数 |
六、测试与部署
python
运行
def test_ppo(): env = gym.make("CartPole-v1", render_mode="human") state_dim = env.observation_space.shape[0] action_dim = env.action_space.n ppo = PPO(state_dim, action_dim) # 加载模型 ppo.actor.load_state_dict(torch.load("cartpole_ppo_actor.pth")) ppo.critic.load_state_dict(torch.load("cartpole_ppo_critic.pth")) for episode in range(10): state, _ = env.reset() total_reward = 0 done = False while not done: action = ppo.select_action(state, training=False) next_state, reward, terminated, truncated, _ = env.step(action) done = terminated or truncated total_reward += reward state = next_state print(f"Test Episode {episode+1}, Reward: {total_reward}") env.close() if __name__ == "__main__": test_ppo()七、扩展到真实小车 / ROS
- 状态空间:替换为激光雷达、相机图像、里程计(如 2D/3D 坐标、速度)。
- 动作空间:连续动作(线速度
v、角速度w),Actor 输出高斯分布均值 / 方差。 - 奖励函数:
- 正向:到达目标点 + 100、每步前进 + 1、避障 + 5
- 负向:碰撞 - 200、超时 - 50、偏离路径 - 10
- 环境对接:用
openai_ros或自定义 Gym 环境,实现 ROS 与 PPO Agent 通信。
八、常见问题与解决
- 训练震荡:减小
lr_actor、增大eps_clip、增加epochs。 - 收敛慢:增大学习率、调整
gamma/lmbda、增加经验池大小。 - 策略退化:确保优势归一化、裁剪损失正确、Critic 价值估计准确。
