用PyTorch手把手实现DDPG算法,搞定OpenAI Gym连续控制任务(附完整代码)
用PyTorch手把手实现DDPG算法,搞定OpenAI Gym连续控制任务
深度确定性策略梯度(DDPG)作为强化学习领域的重要算法,在机器人控制、自动驾驶等连续动作空间场景中展现出独特优势。本文将带您从零开始构建完整的DDPG实现,通过PyTorch框架解决OpenAI Gym中的经典控制问题Pendulum-v0。不同于理论讲解,我们聚焦工程实践中的关键细节,提供可直接运行的代码方案。
1. 环境配置与核心架构
在开始编码前,需要配置基础环境并理解DDPG的双网络架构。Pendulum-v0环境模拟倒立摆控制任务,其状态空间包含摆角的正余弦值和角速度,动作空间为连续扭矩值。
import gym import torch import numpy as np env = gym.make('Pendulum-v0') state_dim = env.observation_space.shape[0] # 状态维度:3 action_dim = env.action_space.shape[0] # 动作维度:1 action_bound = env.action_space.high[0] # 动作范围:[-2.0, 2.0]DDPG采用Actor-Critic架构,包含四个神经网络:
- 在线Actor:策略网络,输入状态输出确定性动作
- 目标Actor:稳定训练的策略网络副本
- 在线Critic:价值网络,评估状态-动作对的Q值
- 目标Critic:稳定训练的价值网络副本
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") class Actor(nn.Module): def __init__(self, state_dim, action_dim, hidden_dim=64): super().__init__() self.fc1 = nn.Linear(state_dim, hidden_dim) self.fc2 = nn.Linear(hidden_dim, hidden_dim) self.fc3 = nn.Linear(hidden_dim, action_dim) def forward(self, x): x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) return torch.tanh(self.fc3(x)) * action_bound2. 经验回放与噪声探索
DDPG通过经验回放机制打破数据相关性,使用OU噪声实现有效探索。我们实现一个高效的回放缓冲区:
class ReplayBuffer: def __init__(self, capacity): self.buffer = collections.deque(maxlen=capacity) def add(self, state, action, reward, next_state, done): self.buffer.append((state, action, reward, next_state, done)) def sample(self, batch_size): transitions = random.sample(self.buffer, batch_size) return zip(*transitions)对于连续动作空间的探索,采用Ornstein-Uhlenbeck过程噪声:
class OUNoise: def __init__(self, action_dim, mu=0, theta=0.15, sigma=0.2): self.action_dim = action_dim self.mu = mu self.theta = theta self.sigma = sigma self.reset() def reset(self): self.state = np.ones(self.action_dim) * self.mu def sample(self): dx = self.theta * (self.mu - self.state) dx += self.sigma * np.random.randn(self.action_dim) self.state += dx return self.state3. 网络训练与软更新机制
DDPG的核心训练流程包含Critic的TD误差最小化和Actor的策略梯度上升:
def update(self, batch): states, actions, rewards, next_states, dones = batch # Critic损失计算 next_actions = self.target_actor(next_states) target_q = self.target_critic(next_states, next_actions) target_q = rewards + (1 - dones) * self.gamma * target_q current_q = self.critic(states, actions) critic_loss = F.mse_loss(current_q, target_q.detach()) # Actor策略优化 actor_loss = -self.critic(states, self.actor(states)).mean() # 网络参数更新 self.critic_optimizer.zero_grad() critic_loss.backward() self.critic_optimizer.step() self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() # 目标网络软更新 self.soft_update(self.actor, self.target_actor) self.soft_update(self.critic, self.target_critic)软更新通过参数混合实现稳定训练:
def soft_update(self, local_model, target_model): for target_param, local_param in zip(target_model.parameters(), local_model.parameters()): target_param.data.copy_(self.tau*local_param.data + (1.0-self.tau)*target_param.data)4. 完整训练流程与性能优化
将各模块整合为完整训练流程,关键参数设置如下:
| 参数 | 推荐值 | 作用 |
|---|---|---|
| buffer_size | 100000 | 经验回放容量 |
| batch_size | 64 | 训练批大小 |
| gamma | 0.99 | 折扣因子 |
| tau | 0.005 | 软更新系数 |
| actor_lr | 1e-4 | Actor学习率 |
| critic_lr | 1e-3 | Critic学习率 |
训练循环实现:
def train_agent(env, agent, episodes=1000): returns = [] for episode in range(episodes): state = env.reset() episode_return = 0 noise.reset() while True: action = agent.select_action(state) next_state, reward, done, _ = env.step(action) agent.replay_buffer.add(state, action, reward, next_state, done) if len(agent.replay_buffer) > batch_size: agent.update() state = next_state episode_return += reward if done: break returns.append(episode_return) print(f"Episode {episode}: Return {episode_return:.1f}") return returns实际训练中常见问题与解决方案:
训练不稳定
- 增大回放缓冲区容量
- 降低学习率
- 增加目标网络更新频率
探索不足
- 调整OU噪声参数
- 初期采用更大噪声幅度
- 逐步衰减噪声强度
收敛速度慢
- 优化网络结构(增加层宽/深度)
- 尝试不同的激活函数
- 调整批归一化策略
5. 实战效果分析与调优建议
在Pendulum-v0环境中,典型的训练曲线呈现三个阶段:
- 探索期(0-200回合):回报波动大,智能体随机尝试不同动作
- 学习期(200-600回合):回报快速上升,策略明显改善
- 稳定期(600+回合):回报趋于稳定,策略接近最优
通过修改网络结构和训练参数可进一步提升性能:
# 更深的网络结构 class DeepCritic(nn.Module): def __init__(self, state_dim, action_dim, hidden_dim=256): super().__init__() self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim) self.fc2 = nn.Linear(hidden_dim, hidden_dim) self.fc3 = nn.Linear(hidden_dim, hidden_dim) self.fc4 = nn.Linear(hidden_dim, 1) def forward(self, x, a): x = torch.cat([x, a], dim=1) x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) x = F.relu(self.fc3(x)) return self.fc4(x)最终实现的DDPG算法能够在100-200个训练回合内稳定倒立摆,平均回报达到-200以下(原始环境定义倒立垂直向上为0,向下为-1600)。相比离散动作空间的DQN,DDPG在连续控制任务中展现出三大优势:
- 动作精度高:可输出连续扭矩值
- 训练效率高:不需要离散化动作空间
- 策略更平滑:确定性策略避免动作抖动
实际部署时,建议保存训练好的模型参数:
torch.save({ 'actor': actor.state_dict(), 'critic': critic.state_dict(), }, 'ddpg_model.pth')