Experiment: Simulating a Cart Escaping the Valley

edwin99 · 2024-02-09 20:09

import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import gym

env = gym.make("CartPole-v1")
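
As an optional sanity check (not in the original post), you can print the spaces the agent will build its network from; for CartPole-v1 the observation is 4-dimensional and there are 2 discrete actions:

# optional sanity check of the environment's spaces
print(env.observation_space.shape)  # (4,)
print(env.action_space.n)           # 2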

 

class REINFORCE:
    def __init__(self, env, path=None):
        self.env = env                                   # the environment
        self.state_shape = env.observation_space.shape   # the state space
        self.action_shape = env.action_space.n           # the action space
        self.gamma = 0.99          # discount rate for past rewards
        self.alpha = 1e-4          # learning rate in the policy-gradient update
        self.learning_rate = 0.01  # learning rate of the neural network
        if not path:
            self.model = self._create_model()            # build a new model
        else:
            self.model = keras.models.load_model(path)   # load a saved model

        # buffers for observations collected during an episode
        self.states = []
        self.gradients = []
        self.rewards = []
        self.probs = []
        self.discounted_rewards = []
        self.total_rewards = []

    def hot_encode_action(self, action):
        '''Encode the action as a one-hot vector.'''
        action_encoded = np.zeros(self.action_shape, np.float32)
        action_encoded[action] = 1
        return action_encoded

    def remember(self, state, action, action_prob, reward):
        '''Store the observations of one step.'''
        encoded_action = self.hot_encode_action(action)
        self.gradients.append(encoded_action - action_prob)
        self.states.append(state)
        self.rewards.append(reward)
        self.probs.append(action_prob)

    def _create_model(self):
        '''Build the policy network with Keras.'''
        model = keras.Sequential()
        # input shape matches the observation space
        model.add(keras.layers.Dense(24, input_shape=self.state_shape, activation="relu"))
        # a second ReLU layer
        model.add(keras.layers.Dense(12, activation="relu"))
        # output size equals the number of actions;
        # softmax outputs a probability distribution over the actions
        model.add(keras.layers.Dense(self.action_shape, activation="softmax"))
        model.compile(loss="categorical_crossentropy",
                      optimizer=keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def get_action(self, state):
        '''Sample the next action from the policy's probability distribution over actions.'''
        # reshape the state into a batch of size 1
        state = state.reshape([1, state.shape[0]])
        # get the action probabilities
        action_probability_distribution = self.model.predict(state).flatten()
        # renormalize the action probability distribution
        action_probability_distribution /= np.sum(action_probability_distribution)
        # sample an action
        action = np.random.choice(self.action_shape, 1,
                                  p=action_probability_distribution)[0]
        return action, action_probability_distribution

    def get_discounted_rewards(self, rewards):
        '''Compute the discounted return for every step,
        G_t = sum_k gamma^k * r_{t+k}.'''
        discounted_rewards = []
        cumulative_total_return = 0
        # iterate the rewards backwards and accumulate the total return
        for reward in rewards[::-1]:
            cumulative_total_return = (cumulative_total_return * self.gamma) + reward
            discounted_rewards.insert(0, cumulative_total_return)

        # normalize the discounted rewards
        mean_rewards = np.mean(discounted_rewards)
        std_rewards = np.std(discounted_rewards)
        norm_discounted_rewards = (discounted_rewards -
                                   mean_rewards) / (std_rewards + 1e-7)  # avoid division by zero
        return norm_discounted_rewards

    def update_policy(self):
        '''Update the policy network after the Monte-Carlo sampling is done,
        following delta_theta = alpha * G_t * grad(log pi_theta(a|s)).'''
        # get X
        states = np.vstack(self.states)

        # get Y
        gradients = np.vstack(self.gradients)
        rewards = np.vstack(self.rewards)
        discounted_rewards = self.get_discounted_rewards(rewards)
        gradients *= discounted_rewards
        gradients = self.alpha * np.vstack([gradients]) + self.probs

        history = self.model.train_on_batch(states, gradients)
        self.states, self.probs, self.gradients, self.rewards = [], [], [], []
        return history

    def train(self, episodes, rollout_n=1, render_n=50):
        '''Train the model.
        episodes  - number of training episodes
        rollout_n - number of episodes between policy updates
        render_n  - number of episodes between environment renderings'''
        env = self.env
        total_rewards = np.zeros(episodes)

        for episode in range(episodes):
            # each episode is a new game
            state = env.reset()
            done = False
            episode_reward = 0  # record the episode reward

            while not done:
                # play an action and record the game state & reward per step
                action, prob = self.get_action(state)
                next_state, reward, done, _ = env.step(action)
                self.remember(state, action, prob, reward)
                state = next_state
                episode_reward += reward

                # if episode % render_n == 0:  # render env to visualize
                #     env.render()
                if done:
                    # update the policy
                    if episode % rollout_n == 0:
                        history = self.update_policy()

            total_rewards[episode] = episode_reward
            if episode % 10 == 0:
                print(f"{episode} -> {episode_reward}")

        self.total_rewards = total_rewards

 

r = REINFORCE(env)
r.train(200)
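
The constructor accepts an optional path argument, so a trained policy can be reused later. A minimal sketch of saving and reloading, using the standard Keras model.save / keras.models.load_model API; the file name is only an example:

# save the trained policy network (file name is hypothetical)
r.model.save("reinforce_cartpole.h5")

# later: rebuild the agent from the saved model instead of training from scratch
r2 = REINFORCE(env, path="reinforce_cartpole.h5")
action, prob = r2.get_action(env.reset())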

 

Results:

0 -> 39.0
10 -> 50.0
20 -> 18.0
30 -> 15.0
40 -> 16.0
50 -> 17.0
60 -> 20.0
70 -> 11.0
80 -> 20.0
90 -> 40.0
100 -> 36.0
110 -> 18.0
120 -> 79.0
130 -> 43.0
140 -> 41.0
150 -> 37.0
160 -> 95.0
170 -> 72.0
180 -> 138.0
190 -> 102.0

 

Plotting the total reward per episode:

plt.plot(r.total_rewards)
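
The raw per-episode rewards are noisy, so a moving average makes the upward trend easier to see. A small sketch, assuming a 10-episode window (the window size is an arbitrary choice):

# smooth the reward curve with a 10-episode moving average
window = 10
smoothed = np.convolve(r.total_rewards, np.ones(window) / window, mode="valid")
plt.plot(r.total_rewards, alpha=0.3, label="per episode")
plt.plot(smoothed, label=f"{window}-episode average")
plt.xlabel("episode")
plt.ylabel("total reward")
plt.legend()
plt.show()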

 

Further study:

  1. Train on the mountain-car escape problem (a minimal sketch follows this list)

  2. Train RL with TensorFlow to balance CartPole

  3. The CartPole skating problem

  4. Train RL with PyTorch to balance CartPole
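
For the first item, the same agent class can in principle be pointed at MountainCar-v0, since the network sizes are derived from the environment's spaces; the snippet below is only a sketch under that assumption. Note that MountainCar gives a reward of -1 per step, so plain REINFORCE converges slowly and usually needs reward shaping or many more episodes:

# hypothetical adaptation to the mountain-car escape problem
mc_env = gym.make("MountainCar-v0")   # 2-dimensional state, 3 discrete actions
mc_agent = REINFORCE(mc_env)          # network input/output shapes adapt automatically
mc_agent.train(1000)                  # expect to need far more episodes than CartPole
plt.plot(mc_agent.total_rewards)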
