Reinforcement learning: train a cart to balance a pole (move the cart left/right to keep the vertical pole upright), using OpenAI Gym's CartPole-v1 environment.
Run locally or online: locally with VSCode, online with Colab.
Install Gym:
import sys
!{sys.executable} -m pip install gym
CartPole environment attributes: Action Space (discrete; 0 = push left, 1 = push right), Observation Space (4 dimensions; 0 = cart position, 1 = cart velocity, 2 = pole angle, 3 = pole angular velocity).
Code:
import gym
env = gym.make("CartPole-v1")
print(f"Action space: {env.action_space}")
print(f"Observation space: {env.observation_space}")
Simulation:
Loop the simulation until env.step returns done=True; use a random action-selection policy via env.action_space.sample().
Rendering opens in a new window; run it several times and observe the different behaviors.
env.reset()
done = False
total_reward = 0
while not done:
    env.render()
    obs, rew, done, info = env.step(env.action_space.sample())
    total_reward += rew
    print(f"{obs} -> {rew}")
print(f"Total reward: {total_reward}")
Observation/state values: cart position, cart velocity, pole angle, pole rotation rate.
Reward mechanism: +1 per step; the goal is to maximize the total reward (i.e., keep the pole balanced as long as possible).
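A small side check (my addition, assuming the env object created above): the observation-space bounds can be printed directly:
print(env.observation_space.low)   # lower bounds: cart position, cart velocity, pole angle, pole angular velocity
print(env.observation_space.high)  # upper bounds for the same four values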
Two kinds of policies:
Deterministic policy: a = π(s) outputs an action directly
Stochastic policy: π(a|s) outputs a probability distribution over actions
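A minimal sketch of the difference, using a toy softmax network (the names toy_policy and state here are illustrative, not from the notes):
import torch
toy_policy = torch.nn.Sequential(torch.nn.Linear(4, 2), torch.nn.Softmax(dim=-1))
state = torch.rand(4)
probs = toy_policy(state)                        # π(a|s): a distribution over the 2 actions
det_action = torch.argmax(probs).item()          # deterministic: always pick the most probable action
sto_action = torch.multinomial(probs, 1).item()  # stochastic: sample an action according to its probability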
Policy Gradient method: reinforcement learning with a neural network that directly predicts action probabilities.
The network outputs action-selection probabilities and is optimized by gradient ascent.
The training objective is to maximize the long-term return (adjust the parameters so that high-return actions become more probable).
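For reference, the standard REINFORCE form of this objective (not written out in the original notes) is:
J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_t \gamma^t r_t\right], \qquad \nabla_\theta J(\theta) \approx \sum_t \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t
where G_t is the discounted return from step t; gradient ascent on J increases the log-probability of actions that were followed by high returns.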
Code:
import numpy as np
import matplotlib.pyplot as plt
import torch

num_inputs = 4
num_actions = 2
model = torch.nn.Sequential(
    torch.nn.Linear(num_inputs, 128, bias=False, dtype=torch.float32),
    torch.nn.ReLU(),
    torch.nn.Linear(128, num_actions, bias=False, dtype=torch.float32),
    torch.nn.Softmax(dim=1)
)
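A quick sanity check (my addition): the softmax head should output a valid probability distribution for any input state:
dummy_state = torch.rand(1, num_inputs)   # shape (1, 4), matching the network input
print(model(dummy_state))                 # two action probabilities that sum to 1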
Run many experiments and update the parameters as you go; define a function that records each experiment's sequence of states, actions, action probabilities, and rewards.
The policy network is optimized by gradient ascent, with gradients computed from this trajectory data.
def run_episode(max_steps_per_episode=10000, render=False):
    states, actions, probs, rewards = [], [], [], []
    state = env.reset()
    for _ in range(max_steps_per_episode):
        if render:
            env.render()
        # predict action probabilities for the current state and sample an action from them
        action_probs = model(torch.from_numpy(np.expand_dims(state, 0)))[0]
        action = np.random.choice(num_actions, p=np.squeeze(action_probs.detach().numpy()))
        nstate, reward, done, info = env.step(action)
        if done:
            break
        # record the trajectory: state, action, action probabilities, reward
        states.append(state)
        actions.append(action)
        probs.append(action_probs.detach().numpy())
        rewards.append(reward)
        state = nstate
    return np.vstack(states), np.vstack(actions), np.vstack(probs), np.vstack(rewards)
Untrained network: running one episode gives a low total reward (equal to the episode length, since each step earns +1).
s, a, p, r = run_episode()
print(f"Total reward: {np.sum(r)}")
Discounted reward: for each step, compute the sum of the rewards from that step onward, applying gamma so that rewards further in the future are weighted less.
Then normalize: standardize the vector of discounted rewards; these normalized values are used to weight the gradients during training.
eps = 0.0001
def discounted_rewards(rewards, gamma=0.99, normalize=True):
    ret = []
    s = 0
    for r in rewards[::-1]:
        s = r + gamma * s
        ret.insert(0, s)
    if normalize:
        ret = (ret - np.mean(ret)) / (np.std(ret) + eps)
    return ret
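A quick worked example (my addition), with normalization switched off so the raw discounted sums are visible:
print(discounted_rewards([1.0, 1.0, 1.0], normalize=False))
# -> [2.9701, 1.99, 1.0]: earlier steps accumulate more discounted future reward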
Training: run experiments - gradients - reward weighting - target-action blending - update
- Run 300 episodes; in each episode collect the trajectory data (states, actions, rewards)
- Gradient computation: compare the actions actually taken with the network's predicted probabilities to produce gradients
- Multiply the normalized, discounted rewards by the gradients
- Blend the predicted probabilities with the gradient adjustment, using a coefficient (alpha in the code) to control the mix
- Train the network on the states and the blended target actions, iterating to optimize
Code:
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
def train_on_batch(x, y):
    x = torch.from_numpy(x)
    y = torch.from_numpy(y)
    optimizer.zero_grad()
    predictions = model(x)
    loss = -torch.mean(torch.log(predictions) * y)
    loss.backward()
    optimizer.step()
    return loss
alpha = 1e-4
history = []
for epoch in range(300):
    states, actions, probs, rewards = run_episode()
    one_hot_actions = np.eye(2)[actions.T][0]        # one-hot encode the actions actually taken
    gradients = one_hot_actions - probs              # difference between taken actions and predicted probabilities
    dr = discounted_rewards(rewards)
    gradients *= dr                                  # weight the gradients by the normalized discounted rewards
    target = alpha * np.vstack([gradients]) + probs  # blend the adjustment into the predicted probabilities
    train_on_batch(states, target)
    history.append(np.sum(rewards))
    if epoch % 100 == 0:
        print(f"{epoch} -> {np.sum(rewards)}")
plt.plot(history)
_ = run_episode(render=True)
Actor-Critic model: a two-headed structure (Actor + Critic) trained jointly; combining policy optimization with value-function estimation improves stability and efficiency.
Actor: outputs the action probability distribution (the policy-gradient part) and drives action selection.
Critic: estimates the expected future total reward of the current state (the value function) and evaluates how good actions are.
Joint training: the Actor uses the Critic's feedback to optimize the policy; the Critic updates its value estimate using the TD error (actual return minus predicted value).
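Concretely (this matches the advantage and loss computation in the code further below), with R_t the discounted return and V(s_t) the Critic's estimate:
A_t = R_t - V(s_t), \qquad L_{\text{actor}} = -\frac{1}{T}\sum_t \log \pi(a_t \mid s_t)\, A_t, \qquad L_{\text{critic}} = \frac{1}{T}\sum_t A_t^2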
Define the models:
from itertools import count
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make("CartPole-v1")
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
lr = 0.0001
class Actor(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, self.action_size)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        output = self.linear3(output)
        distribution = torch.distributions.Categorical(F.softmax(output, dim=-1))
        return distribution
class Critic(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, 1)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        value = self.linear3(output)
        return value
Modify discounted_rewards() and run_episode() to fit the Actor-Critic model.
Code:
def discounted_rewards(next_value, rewards, masks, gamma=0.99):
    R = next_value
    returns = []
    for step in reversed(range(len(rewards))):
        R = rewards[step] + gamma * R * masks[step]
        returns.insert(0, R)
    return returns
def run_episode(actor, critic, n_iters):
    optimizerA = torch.optim.Adam(actor.parameters())
    optimizerC = torch.optim.Adam(critic.parameters())
    for iter in range(n_iters):
        state = env.reset()
        log_probs = []
        values = []
        rewards = []
        masks = []
        entropy = 0
        env.reset()
        # roll out one episode, storing log-probs, value estimates, rewards and done-masks
        for i in count():
            env.render()
            state = torch.FloatTensor(state).to(device)
            dist, value = actor(state), critic(state)
            action = dist.sample()
            next_state, reward, done, _ = env.step(action.cpu().numpy())
            log_prob = dist.log_prob(action).unsqueeze(0)
            entropy += dist.entropy().mean()
            log_probs.append(log_prob)
            values.append(value)
            rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
            masks.append(torch.tensor([1 - done], dtype=torch.float, device=device))
            state = next_state
            if done:
                print('Iteration: {}, Score: {}'.format(iter, i))
                break
        # bootstrap from the value of the final state and compute discounted returns
        next_state = torch.FloatTensor(next_state).to(device)
        next_value = critic(next_state)
        returns = discounted_rewards(next_value, rewards, masks)
        log_probs = torch.cat(log_probs)
        returns = torch.cat(returns).detach()
        values = torch.cat(values)
        # the advantage drives both losses: policy gradient for the Actor, squared error for the Critic
        advantage = returns - values
        actor_loss = -(log_probs * advantage.detach()).mean()
        critic_loss = advantage.pow(2).mean()
        optimizerA.zero_grad()
        optimizerC.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        optimizerA.step()
        optimizerC.step()
Manual training loop: compute the custom losses and update the parameters by hand.
actor = Actor(state_size, action_size).to(device)
critic = Critic(state_size, action_size).to(device)
run_episode(actor, critic, n_iters=100)
Close the environment:
env.close()
Further exercises:
- Train the mountain-car escape problem (see the sketch after this list)
- Implement the CartPole balancing experiment with RL in TensorFlow
- The CartPole skating problem
- Simulate the experiment of the car escaping from the valley (MountainCar)
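A minimal starting sketch for the mountain-car items, assuming the same old-style Gym reset/step API used above (random actions only, no training yet):
import gym
env = gym.make("MountainCar-v0")
state = env.reset()
done = False
total_reward = 0
while not done:
    # baseline: random actions; swap in a trained policy later
    state, reward, done, info = env.step(env.action_space.sample())
    total_reward += reward
print(f"Total reward: {total_reward}")  # MountainCar gives -1 per step until the car reaches the goal
env.close()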