Training RL with PyTorch: the CartPole Balancing Experiment

edwin99
2024-02-09 15:46

Reinforcement learning trains a cart to balance a pole (move the cart left or right to keep the upright pole from falling), using OpenAI Gym's CartPole-v1 environment.

Run it locally or online: locally in VSCode, online in Colab.

 

Install Gym:

import sys

!{sys.executable} -m pip install gym
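Note (my addition, not from the original post): the code below uses the classic Gym API, where env.reset() returns just the observation and env.step() returns four values. Gym 0.26+ and gymnasium changed both signatures, so if the later code raises API errors, one option is to pin an older release:

!{sys.executable} -m pip install "gym<0.26"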

 

CartPole environment properties: Action Space (discrete: 0 = push left, 1 = push right), Observation Space (4-dimensional: 0 = cart position, 1 = cart velocity, 2 = pole angle, 3 = pole angular velocity)

Code:

import gym

 

env = gym.make("CartPole-v1")

 

print(f"Action space: {env.action_space}")

print(f"Observation space: {env.observation_space}")
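For CartPole-v1 this should print Discrete(2) for the action space and a 4-dimensional Box for the observation space (the exact Box repr varies across Gym versions).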

 

Simulation:

Loop the simulation until env.step returns done=True; pick actions with a random policy via env.action_space.sample().

Rendering opens in a new window; run it several times to observe the different behaviors.

 

env.reset()

 

done = False
total_reward = 0
while not done:
    env.render()
    obs, rew, done, info = env.step(env.action_space.sample())
    total_reward += rew
    print(f"{obs} -> {rew}")
print(f"Total reward: {total_reward}")

Observation/state values: cart position, cart velocity, pole angle, pole angular velocity.

Reward mechanism: +1 per step; the goal is to maximize total reward (i.e., keep the pole balanced as long as possible).
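As an illustration (my addition; these are the standard CartPole-v1 thresholds): an episode terminates once the cart position leaves ±2.4 or the pole tilts past about ±12°, which is half of the bounds reported by the observation space:

# Inspect the observation bounds; termination happens at half these limits
# (|position| > 2.4, |angle| > ~0.21 rad); the velocity bounds are float max,
# i.e. effectively unbounded.
print(env.observation_space.low)
print(env.observation_space.high)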

 

Two kinds of policies:

Deterministic policy: a = π(s) outputs an action directly

Stochastic policy: π(a|s) outputs a probability distribution over actions
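A minimal sketch of the difference (my addition; policy_net is a hypothetical network mapping a state to per-action scores):

import torch

def deterministic_action(policy_net, state):
    # a = pi(s): always pick the highest-scoring action
    return torch.argmax(policy_net(state)).item()

def stochastic_action(policy_net, state):
    # pi(a|s): sample an action from the predicted distribution
    probs = torch.softmax(policy_net(state), dim=-1)
    return torch.multinomial(probs, num_samples=1).item()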

 

Policy Gradient method: reinforcement learning with a neural network that directly predicts action probabilities.

The network outputs action-selection probabilities and is optimized with gradient ascent.

The training objective is to maximize long-term return (adjust the parameters so high-return actions get higher probability).
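In formula form (the standard REINFORCE estimator; my notation, not from the original post):

# Objective: maximize J(theta) = E[R(tau)], the expected episode return.
# Per-trajectory gradient estimate:
#     grad J(theta) ~ sum_t grad log pi_theta(a_t | s_t) * G_t
# where G_t is the discounted (and, here, normalized) return from step t on.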

 

Code:

import numpy as np
import matplotlib.pyplot as plt
import torch

 

num_inputs = 4
num_actions = 2

 

model = torch.nn.Sequential(
    torch.nn.Linear(num_inputs, 128, bias=False, dtype=torch.float32),
    torch.nn.ReLU(),
    torch.nn.Linear(128, num_actions, bias=False, dtype=torch.float32),
    torch.nn.Softmax(dim=1))
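A quick sanity check (my addition): with all-zero inputs and bias=False, every layer outputs zeros, so the softmax gives exactly a uniform distribution over the two actions:

dummy = torch.zeros((1, num_inputs), dtype=torch.float32)
print(model(dummy))  # tensor([[0.5000, 0.5000]], grad_fn=...)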

We run many episodes and update the parameters in between; define a function that records an episode's sequence of states, actions, action probabilities, and rewards.

Gradient ascent optimizes the policy network, using the trajectory data to compute the gradients.

def run_episode(max_steps_per_episode=10000, render=False):
    states, actions, probs, rewards = [], [], [], []
    state = env.reset()
    for _ in range(max_steps_per_episode):
        if render:
            env.render()
        # Forward pass: batch of one state -> action probabilities
        action_probs = model(torch.from_numpy(np.expand_dims(state, 0)))[0]
        # Sample an action according to the predicted distribution
        action = np.random.choice(num_actions, p=np.squeeze(action_probs.detach().numpy()))
        nstate, reward, done, info = env.step(action)
        if done:
            break
        states.append(state)
        actions.append(action)
        probs.append(action_probs.detach().numpy())
        rewards.append(reward)
        state = nstate
    return np.vstack(states), np.vstack(actions), np.vstack(probs), np.vstack(rewards)

Untrained network: running a single episode gives a low total reward (equal to the episode length):

s, a, p, r = run_episode()

print(f"Total reward: {np.sum(r)}")

 

Discounted rewards: for each step, compute the total future reward, with rewards further in the future decayed by gamma.

Then normalize: standardize the discounted-reward vector, which becomes the training weights.

 

 

eps = 0.0001  # small constant to avoid division by zero during normalization

 

def discounted_rewards(rewards, gamma=0.99, normalize=True):
    ret = []
    s = 0
    # Walk the rewards backwards, accumulating the discounted sum
    for r in rewards[::-1]:
        s = r + gamma * s
        ret.insert(0, s)
    if normalize:
        # Standardize to zero mean / unit variance for stable training
        ret = (ret - np.mean(ret)) / (np.std(ret) + eps)
    return ret
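A quick check with a hypothetical 3-step episode (my example, not from the original post):

print(discounted_rewards([1, 1, 1], gamma=0.99, normalize=False))
# [2.9701, 1.99, 1.0] - earlier steps accumulate more discounted future reward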

 

 

 

Training pipeline: run episodes - compute gradients - weight by rewards - blend target actions - update:

  1. Run 300 episodes, collecting trajectory data (states, actions, rewards) in each one

  2. Gradient computation: compare the actions actually taken against the network's predicted probabilities to get gradients

  3. Multiply the gradients by the normalized discounted rewards

  4. Blend the predicted probabilities with the gradient adjustment, with alpha controlling the mixing ratio

  5. Train the network on the states and the blended target actions, and iterate

 

Code:

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

 

def train_on_batch(x, y):
    x = torch.from_numpy(x)
    y = torch.from_numpy(y)
    optimizer.zero_grad()
    predictions = model(x)
    # Cross-entropy-style loss against the reward-blended targets
    loss = -torch.mean(torch.log(predictions) * y)
    loss.backward()
    optimizer.step()
    return loss
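Note that train_on_batch casts the policy update as supervised learning: the blended targets y act as soft labels, and -mean(log(predictions) * y) is a cross-entropy-style objective that pulls the predicted probabilities toward the reward-weighted targets.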

 

alpha = 1e-4  # mixing coefficient: how strongly the gradients adjust the targets

 

history = []
for epoch in range(300):
    states, actions, probs, rewards = run_episode()
    # One-hot encode the actions actually taken
    one_hot_actions = np.eye(2)[actions.T][0]
    # Difference between taken actions and predicted probabilities
    gradients = one_hot_actions - probs
    dr = discounted_rewards(rewards)
    # Weight the gradients by the normalized discounted rewards
    gradients *= dr
    # Blend: nudge the predicted probabilities toward high-reward actions
    target = alpha * np.vstack([gradients]) + probs
    train_on_batch(states, target)
    history.append(np.sum(rewards))
    if epoch % 100 == 0:
        print(f"{epoch} -> {np.sum(rewards)}")

 

plt.plot(history)

_ = run_episode(render=True)
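One caveat (my addition): env.render() opens a native window, which works locally but not in a headless Colab session; there you would need a virtual display (e.g. pyvirtualdisplay) or to skip rendering.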

 

Actor-Critic model: a two-headed structure (Actor + Critic) trained jointly; policy optimization plus value-function estimation improves stability and efficiency.

 

Actor: outputs the action probability distribution (the policy-gradient part) and drives action selection.

 

Critic: estimates the future total reward of the current state (the value function), evaluating how good actions are.

 

Joint training: the Actor uses the Critic's reward feedback to improve the policy; the Critic uses the TD error (actual return minus predicted value) to update its estimates.
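In code terms, the joint update boils down to three lines (these mirror the implementation below):

# advantage   = returns - values                          # TD-style error
# actor_loss  = -(log_probs * advantage.detach()).mean()  # reinforce better-than-expected actions
# critic_loss = advantage.pow(2).mean()                   # regress values toward actual returns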

 

Define the models:

from itertools import count

import torch.nn.functional as F

 

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make("CartPole-v1")

 

state_size = env.observation_space.shape[0]
action_size = env.action_space.n
lr = 0.0001

 

class Actor(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, self.action_size)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        output = self.linear3(output)
        # Wrap the softmax probabilities in a Categorical distribution
        distribution = torch.distributions.Categorical(F.softmax(output, dim=-1))
        return distribution
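Hypothetical usage of the Categorical head (my illustration; a throwaway instance, since the real actor is created further below):

_actor = Actor(state_size, action_size)
dist = _actor(torch.zeros(state_size))
a = dist.sample()              # tensor(0) or tensor(1)
print(a, dist.log_prob(a))     # the log-prob feeds the actor loss later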

 

 

class Critic(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, 1)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        # Single scalar: the estimated value of the state
        value = self.linear3(output)
        return value

 

 

discounted_rewards() and run_episode() can be modified to fit the Actor-Critic model.

 

Code:

def discounted_rewards(next_value, rewards, masks, gamma=0.99):
    # Bootstrap from next_value; masks[step] is 0 at a terminal transition,
    # so returns do not leak across episode boundaries
    R = next_value
    returns = []
    for step in reversed(range(len(rewards))):
        R = rewards[step] + gamma * R * masks[step]
        returns.insert(0, R)
    return returns

 

def run_episode(actor, critic, n_iters):
    optimizerA = torch.optim.Adam(actor.parameters())
    optimizerC = torch.optim.Adam(critic.parameters())
    for iter in range(n_iters):
        state = env.reset()
        log_probs = []
        values = []
        rewards = []
        masks = []
        entropy = 0

        for i in count():
            env.render()
            state = torch.FloatTensor(state).to(device)
            dist, value = actor(state), critic(state)

            action = dist.sample()
            next_state, reward, done, _ = env.step(action.cpu().numpy())

            log_prob = dist.log_prob(action).unsqueeze(0)
            entropy += dist.entropy().mean()

            log_probs.append(log_prob)
            values.append(value)
            rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
            masks.append(torch.tensor([1-done], dtype=torch.float, device=device))

            state = next_state

            if done:
                print('Iteration: {}, Score: {}'.format(iter, i))
                break

        # Bootstrap from the value of the last state reached
        next_state = torch.FloatTensor(next_state).to(device)
        next_value = critic(next_state)
        returns = discounted_rewards(next_value, rewards, masks)

        log_probs = torch.cat(log_probs)
        returns = torch.cat(returns).detach()
        values = torch.cat(values)

        # Advantage: actual return minus the Critic's prediction
        advantage = returns - values

        actor_loss = -(log_probs * advantage.detach()).mean()
        critic_loss = advantage.pow(2).mean()

        optimizerA.zero_grad()
        optimizerC.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        optimizerA.step()
        optimizerC.step()

Manual training loop: custom loss computation plus parameter updates.

actor = Actor(state_size, action_size).to(device)

critic = Critic(state_size, action_size).to(device)

run_episode(actor, critic, n_iters=100)

 

Close the environment:

env.close()

 

Further study:

  1. Train on the MountainCar escape problem

  2. Training RL with TensorFlow: CartPole balancing experiment

  3. The CartPole skating problem

  4. Simulate the car driving out of the valley
