Commit 207b1458 authored by tuhe

Week 13

parent a4f5118f
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 13."""
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import numpy as np
import random
from collections import deque
from irlc import cache_read, cache_write
class BasicBuffer:
"""
The buffer class is used to keep track of past experience and sample it for learning.
"""
def __init__(self, max_size=2000):
"""
Creates a new (empty) buffer.
:param max_size: Maximum number of elements in the buffer. This should be a large number like 100'000.
"""
self.buffer = deque(maxlen=max_size)
def push(self, state, action, reward, next_state, done):
"""
Add information from a single step, :math:`(s_t, a_t, r_{t+1}, s_{t+1}, \\text{done})` to the buffer.
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex13.buffer import BasicBuffer
>>> env = gym.make("CartPole-v1")
>>> b = BasicBuffer()
>>> s, info = env.reset()
>>> a = env.action_space.sample()
>>> sp, r, done, _, info = env.step(a)
>>> b.push(s, a, r, sp, done)
>>> len(b) # Get number of elements in buffer
:param state: A state :math:`s_t`
:param action: Action taken :math:`a_t`
:param reward: Reward obtained :math:`r_{t+1}`
:param next_state: Next state transitioned to :math:`s_{t+1}`
:param done: ``True`` if the environment terminated else ``False``
:return: ``None``
"""
experience = (state, action, np.array([reward]), next_state, done)
self.buffer.append(experience)
def sample(self, batch_size):
"""
Sample ``batch_size`` elements from the buffer for use in training a deep Q-learning method.
The elements returned are all numpy ``ndarray`` objects where the first dimension is the batch dimension, i.e. of size
``batch_size``.
.. runblock:: pycon
>>> import gymnasium as gym
>>> from irlc.ex13.buffer import BasicBuffer
>>> env = gym.make("CartPole-v1")
>>> b = BasicBuffer()
>>> s, info = env.reset()
>>> a = env.action_space.sample()
>>> sp, r, done, _, _ = env.step(a)
>>> b.push(s, a, r, sp, done)
>>> S, A, R, SP, DONE = b.sample(batch_size=32)
>>> S.shape # Dimension batch_size x n
>>> R.shape # Dimension batch_size x 1
:param batch_size: Number of elements to sample
:return:
- S - Matrix of size ``batch_size x n`` of sampled states
- A - Array of length ``batch_size`` of sampled actions
- R - Matrix of size ``batch_size x 1`` of sampled rewards
- SP - Matrix of size ``batch_size x n`` of sampled next states (the states transitioned to)
- DONE - Array of length ``batch_size`` of bools indicating whether the environment terminated
"""
state_batch = []
action_batch = []
reward_batch = []
next_state_batch = []
done_batch = []
assert len(self.buffer) > 0, "The replay buffer must be non-empty in order to sample a batch: Use push()"
batch = random.choices(self.buffer, k=batch_size)
for state, action, reward, next_state, done in batch:
state_batch.append(state)
action_batch.append(action)
reward_batch.append(reward)
next_state_batch.append(next_state)
done_batch.append(done)
return tuple(np.asarray(x) for x in (state_batch, action_batch, reward_batch, next_state_batch, done_batch))
def __len__(self):
return len(self.buffer)
def save(self, path):
"""
Use this to save the content of the buffer to a file
:param path: Path where to save (use same argument with ``load``)
:return: ``None``
"""
cache_write(self.buffer, path)
def load(self, path):
"""
Use this to load buffer content from a file
:param path: Path to load from (use same argument with ``save``)
:return: ``None``
"""
self.buffer = cache_read(path)
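# A minimal usage sketch (an editorial example; the file name below is hypothetical and used only here):
# fill the buffer from a short CartPole rollout, sample a batch, and round-trip it through save()/load().
if __name__ == "__main__":
    import gymnasium as gym
    env = gym.make("CartPole-v1")
    b = BasicBuffer(max_size=1000)
    s, info = env.reset()
    for _ in range(100):                      # collect 100 transitions
        a = env.action_space.sample()
        sp, r, terminated, truncated, info = env.step(a)
        b.push(s, a, r, sp, terminated)
        if terminated or truncated:
            s, info = env.reset()
        else:
            s = sp
    S, A, R, SP, DONE = b.sample(batch_size=32)
    print(S.shape, R.shape)                   # (32, 4) and (32, 1)
    b.save("buffer_demo.pkl")                 # hypothetical path, only for this sketch
    b2 = BasicBuffer()
    b2.load("buffer_demo.pkl")
    print(len(b2))                            # same number of transitions as in b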
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
USE_KERAS = False # Toggle to use Keras/Pytorch
import gymnasium as gym
import numpy as np
import os
from matplotlib import pyplot as plt
from irlc.ex01.agent import train
from irlc.ex13.buffer import BasicBuffer
from irlc import cache_write, cache_read, cache_exists
from irlc.ex09.rl_agent import TabularAgent
from irlc.ex13.torch_networks import TorchNetwork as QNetwork # Torch network architecture
class DeepQAgent(TabularAgent):
def __init__(self, env, network=None, buffer=None, gamma=0.99, epsilon=None, alpha=0.001, batch_size=32,
replay_buffer_size=2000, replay_buffer_minreplay=500):
# Ensure 'epsilon' is a function to allow a gradually decreasing exploration rate
epsilon = epsilon if callable(epsilon) else (lambda steps, episodes, _eps=epsilon: _eps)  # bind the constant now; a plain closure would capture the rebound name
super().__init__(env, gamma=gamma, epsilon=epsilon)
self.memory = BasicBuffer(replay_buffer_size) if buffer is None else buffer
"""
All the 'deep' stuff is handled by a separate class. For instance
self.Q(s)
will return a [batch_size x actions] matrix of Q-values
"""
self.Q = network(env, trainable=True) if network else QNetwork(env, trainable=True, learning_rate=alpha)
self.batch_size = batch_size
self.replay_buffer_minreplay = replay_buffer_minreplay
self.steps, self.episodes = 0, 0
def pi(self, s, k, info_s=None):
eps_ = self.epsilon(self.steps, self.episodes) # get the current exploration rate
# return action by regular epsilon-greedy exploration
return self.env.action_space.sample() if np.random.rand() < eps_ else np.argmax(self.Q(s[np.newaxis,...]))
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
self.memory.push(s, a, r, sp, done) # save current observation
if len(self.memory) > self.replay_buffer_minreplay:
self.experience_replay() # do the actual training step
self.steps, self.episodes = self.steps + 1, self.episodes + done
def experience_replay(self):
"""
Perform the actual deep-Q learning step.
The actual learning is handled by calling self.Q.fit(s,target)
where s is defined as below (i.e. all states from the replay buffer)
and target is the desired value of self.Q(s).
Note that target must therefore be of size Batch x Actions. In other words fit minimize
|Q(s) - target|^2
which must implement the proper cost. This can be done by setting most entries of target equal to self.Q(s)
and the other equal to y, which is Q-learning target for Q(s,a). """
""" First we sample from replay buffer. Returns numpy Arrays of dimension
> [self.batch_size] x [...]]
for instance 'a' will be of dimension [self.batch_size x 1].
"""
s,a,r,sp,done = self.memory.sample(self.batch_size)
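        # One way the target can be assembled (a sketch consistent with the docstring above; shapes
        # follow BasicBuffer.sample, i.e. r is batch_size x 1 and done is a vector of bools):
        #   y = r[:, 0] + self.gamma * np.max(self.Q(sp), axis=1) * (1 - done)
        #   target = self.Q(s)
        #   target[range(len(a)), a] = y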
# TODO: 3 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
self.Q.fit(s, target)
def save(self, path): # allows us to save/load model
if not os.path.isdir(path):
os.makedirs(path)
self.Q.save(os.path.join(path, "Q"))
cache_write(dict(steps=self.steps, episodes=self.episodes), os.path.join(path, "agent.pkl"))
mpath = os.path.join(path, "memory.pkl")
import shutil
if os.path.isfile(mpath):
shutil.move(mpath, mpath + ".backup") # keep the previous memory file as a backup
self.memory.save(mpath)
def load(self, path): # allows us to save/load model
if not cache_exists(os.path.join(path, "agent.pkl")):
return False
for k, v in cache_read(os.path.join(path, "agent.pkl")).items():
self.__dict__[k] = v
self.Q.load(os.path.join(path, "Q"))
self.memory.load(os.path.join(path, "memory.pkl"))
return True
def __str__(self):
return f"basic_DQN{self.gamma}"
def linear_interp(maxval, minval, delay, miniter):
"""
Will return a function f(i) with the following signature:
f(i) = maxval for i < delay
f(i) = linear interpolate between max/minval until delay+miniter
f(i) = miniter for i > delay+miniter
"""
return lambda steps, episodes: min(max([maxval- ((steps-delay)/miniter)*(maxval-minval), minval]), maxval)
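# For example, the schedule used for cartpole below starts at eps=1, holds it for the first 300 steps,
# then decays linearly to 0.01 over the next 5000 steps (values worked out by hand from the formula above):
#   eps = linear_interp(maxval=1, minval=0.01, delay=300, miniter=5000)
#   eps(0, 0)     # -> 1.0    (still in the delay period)
#   eps(2800, 0)  # -> 0.505  (halfway through the decay window)
#   eps(5300, 0)  # -> 0.01   (fully decayed; stays at minval afterwards)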
cartpole_dqn_options = dict(gamma=0.95, epsilon=linear_interp(maxval=1,minval=0.01,delay=300,miniter=5000),
replay_buffer_minreplay=300, replay_buffer_size=500000)
def mk_cartpole():
env = gym.make("CartPole-v1", max_episode_steps=200)
agent = DeepQAgent(env, **cartpole_dqn_options)
return env, agent
if __name__ == "__main__":
env_id = "CartPole-v1"
ex = f"experiments/cartpole_dqn"
num_episodes = 200 # We train for 200 episodes
# for j in range(10): # You can uncomment this to run the experiment 10 times.
env, agent = mk_cartpole()
train(env, agent, experiment_name=ex, num_episodes=num_episodes, max_runs=10)
from irlc import main_plot, savepdf
main_plot([ex], units="Unit", estimator=None, smoothing_window=None)
savepdf("cartpole_dqn")
plt.show()
""" Part 2: The following code showcase how to use the save/load method to store intermediate results
and resume training. Note you have to manually remove 'bad' runs otherwise it will resume where
it left off """
ex = f"experiments/cartpole_dqn_cache"
num_episodes = 20 # we train just 20 episodes at a time
for j in range(10): # train for a total of 200 episodes
env, agent = mk_cartpole()
"""
saveload_model=True means it will store and load intermediate results
i.e. we can resume training later. It will not be very useful for cartpole, but necesary for e.g.
the atari environment which can run for days
"""
agent.load(ex)
train(env, agent, experiment_name=ex, num_episodes=num_episodes, resume_stats=True) # Resume stat collection from last checkpoint.
agent.save(ex)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import gymnasium as gym
import numpy as np
import os
from irlc.ex13.deepq_agent import DeepQAgent
from matplotlib import pyplot as plt
from irlc.ex13.torch_networks import TorchNetwork as QNetwork # Torch network architecture
class DoubleQAgent(DeepQAgent):
def __init__(self, env, network=None, buffer=None, gamma=0.99, epsilon=0.2, alpha=0.001, tau=0.1, batch_size=32,
replay_buffer_size=2000, replay_buffer_minreplay=500):
super().__init__(env, network=network, buffer=buffer, gamma=gamma,epsilon=epsilon, alpha=alpha, batch_size=batch_size,
replay_buffer_size=replay_buffer_size, replay_buffer_minreplay=replay_buffer_minreplay)
# The target network plays the role of q_{phi'} in the slides.
self.target = QNetwork(env, learning_rate=alpha, trainable=False) if network is None else network(env, learning_rate=alpha, trainable=False)
self.tau = tau # Rate at which the weights in the target network are updated (see slides)
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
self.memory.push(s, a, r, sp, done)
if len(self.memory) > self.replay_buffer_minreplay:
self.experience_replay()
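            # A plausible form for the missing update (a sketch; DuelQAgent.__init__ uses the same
            # method to synchronize the two networks):
            #   self.target.update_Phi(self.Q, tau=self.tau)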
# TODO: 1 lines missing.
raise NotImplementedError("update Phi here in the self.target network")
self.steps, self.episodes = self.steps + 1, self.episodes + done
def experience_replay(self):
r""" Update the double-Q method, i.e. make sure to select actions a' using self.Q
but evaluate the Q-values using the target network (see slides).
In other words,
> self.target(s)
is a Q-function network which evaluates
> q-hat_{\phi'}(s,:).
Aside from this, the code will be nearly identical to the basic DQN agent. """
s,a,r,sp,done = self.memory.sample(self.batch_size)
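        # A sketch of the double-Q target described in the docstring (the standard double-DQN form,
        # written here as an assumption about the intended solution):
        #   a_max = np.argmax(self.Q(sp), axis=1)                # select actions with self.Q
        #   q_eval = self.target(sp)[range(len(a_max)), a_max]   # evaluate them with the target network
        #   y = r[:, 0] + self.gamma * q_eval * (1 - done)
        #   target = self.Q(s)
        #   target[range(len(a)), a] = y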
# TODO: 5 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
self.Q.fit(s, target=target)
def save(self, path):
super().save(path)
self.target.save(os.path.join(path, "Q_target")) # also save target network
def load(self, path):
loaded = super().load(path)
if loaded:
self.target.load(os.path.join(path, "Q_target")) # also load target network
return loaded
def __str__(self):
return f"doubleDQN_{self.gamma}"
from irlc.ex13.deepq_agent import cartpole_dqn_options
cartpole_doubleq_options = {**cartpole_dqn_options, 'tau': 0.08}
def mk_cartpole():
env = gym.make("CartPole-v1", max_episode_steps=200)
agent = DoubleQAgent(env, **cartpole_doubleq_options)
return env, agent
if __name__ == "__main__":
from irlc import main_plot, savepdf
env_id = "CartPole-v1"
MAX_EPISODES = 200
for j in range(20):
env, agent = mk_cartpole()
from irlc.ex01.agent import train
ex = f"experiments/cartpole_double_dqn"
train(env, agent, experiment_name=ex, num_episodes=MAX_EPISODES, max_runs=10)
main_plot([f"experiments/cartpole_dqn", ex], smoothing_window=None)
savepdf("cartpole_double_dqn")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
class DQNNetwork:
"""
A class representing a deep Q network.
Note that the network is evaluated on batches, i.e. ``s`` is assumed to be a numpy array of dimension ``batch_size x n``.
The following example shows how you can evaluate the Q-values in a given state:
.. runblock:: pycon
>>> from irlc.ex13.torch_networks import TorchNetwork
>>> import gymnasium as gym
>>> import numpy as np
>>> env = gym.make("CartPole-v1")
>>> Q = TorchNetwork(env, trainable=True, learning_rate=0.001) # DQN network requires an env to set network dimensions
>>> batch_size = 32 # As an example
>>> states = np.random.rand(batch_size, env.observation_space.shape[0]) # Creates some dummy input
>>> states.shape # batch_size x n
>>> qvals = Q(states) # Evaluate Q(s,a)
>>> qvals.shape # This is a tensor of dimension batch_size x actions
>>> print(qvals[0,1]) # Get Q(s_0, 1)
>>> Y = np.random.rand(batch_size, env.action_space.n) # Generate target Q-values (training data)
>>> Q.fit(states, Y) # Train the Q-network for 1 gradient descent step
"""
def update_Phi(self, source, tau=0.01):
r"""
Update (adapts) the weights in this network towards those in source by a small amount.
For each weight :math:`w_i` in (this) network, and each corresponding weight :math:`w'_i` in the ``source`` network,
the following Polyak update is performed:
.. math::
w_i \leftarrow w_i + \tau (w'_i - w_i)
:param source: Target network to update towards
:param tau: Update rate (the rate of change :math:`\tau`)
:return: ``None``
"""
raise NotImplementedError
def __call__(self, s):
"""
Evaluate the Q-values in the given (batched) state.
:param s: A matrix of size ``batch_size x n`` where :math:`n` is the state dimension.
:return: The Q-values as a ``batch_size x d`` dimensional matrix where :math:`d` is the number of actions.
"""
raise NotImplementedError
def fit(self, s, target):
r"""
Fit the network weights by minimizing
.. math::
\frac{1}{B}\sum_{i=1}^B \sum_{a=1}^K \| q_\phi(s_i)_a - y_{i,a} \|^2
where ``target`` corresponds to :math:`y` and is a ``[batch_size x actions]`` matrix of target Q-values.
:param s:
:param target:
:return:
"""
raise NotImplementedError
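# A small end-to-end illustration of how this interface is typically used in DQN training (a sketch;
# it relies on the concrete TorchNetwork subclass from irlc.ex13.torch_networks and dummy CartPole data):
if __name__ == "__main__":
    import gymnasium as gym
    import numpy as np
    from irlc.ex13.torch_networks import TorchNetwork
    env = gym.make("CartPole-v1")
    Q = TorchNetwork(env, trainable=True, learning_rate=0.001)   # online network
    target = TorchNetwork(env, trainable=False)                  # target network
    target.update_Phi(Q, tau=1.0)                                # hard copy: target <- Q
    s = np.random.rand(8, env.observation_space.shape[0])        # a dummy batch of 8 states
    y = Q(s)                                                     # start the targets at the current Q-values
    Q.fit(s, y)                                                  # one gradient step towards y
    target.update_Phi(Q, tau=0.01)                               # slow Polyak update towards Q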
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import gymnasium as gym
import matplotlib.pyplot as plt
from irlc import main_plot, savepdf
from irlc.ex01.agent import train
from irlc.ex13.double_deepq_agent import DoubleQAgent
from irlc.ex13.torch_networks import TorchDuelNetwork as DuelNetwork
from irlc.ex13.buffer import BasicBuffer
from irlc.ex13.double_deepq_agent import cartpole_doubleq_options
class DuelQAgent(DoubleQAgent):
def __init__(self, env, network=None, buffer=None, gamma=0.99, epsilon=None, alpha=0.001, tau=0.1, batch_size=32,
replay_buffer_size=2000, replay_buffer_minreplay=500):
network = DuelNetwork if network is None else network # Only relevant change
buffer = buffer if buffer is not None else BasicBuffer(max_size=500000)
super().__init__(env, network=network, buffer=buffer, gamma=gamma,epsilon=epsilon, alpha=alpha, tau=tau,batch_size=batch_size,
replay_buffer_size=replay_buffer_size, replay_buffer_minreplay=replay_buffer_minreplay)
self.target.update_Phi(self.Q)
def __str__(self):
return f"DuelQ_{self.gamma}"
def mk_cartpole():
env = gym.make("CartPole-v1", max_episode_steps=200)
agent = DuelQAgent(env, **cartpole_doubleq_options)
return env, agent
if __name__ == "__main__":
for _ in range(10): # Train 10 times.
env,agent = mk_cartpole()
ex = f"experiments/cartpole_duel_dqn"
train(env, agent, experiment_name=ex, num_episodes=200)
plt.close()
main_plot([f"experiments/cartpole_dqn", f"experiments/cartpole_double_dqn", ex], smoothing_window=None)
savepdf("cartpole_duel_dqn")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import numpy as np
from irlc.ex01.agent import train
import gymnasium as gym
from irlc import main_plot
import matplotlib.pyplot as plt
from irlc import savepdf
from irlc.ex11.sarsa_agent import SarsaAgent
from irlc.ex11.q_agent import QAgent
from irlc.ex12.sarsa_lambda_agent import SarsaLambdaAgent
from irlc.ex13.maze_dyna_environment import MazeEnvironment
class DynaQ(QAgent):
r"""
Implement the tabular dyna-Q agent (SB18, Section 8.2).
"""
def __init__(self, env, gamma=1.0, alpha=0.5, epsilon=0.1, n=5):
super().__init__(env, gamma, alpha=alpha, epsilon=epsilon)
"""
Model is a list of experience, i.e. of the form
Model = [ (s_t, a_t, r_{t+1}, s_{t+1}, done_t), ...]
"""
self.Model = []
self.n = n # number of planning steps
def q_update(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
"""
Update the Q-function self.Q[s,a] as in regular Q-learning
"""
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
self.q_update(s,a,r,sp,done, info_s, info_sp)
self.Model.append( (s,a, r,sp, done))
for _ in range(self.n):
""" Obtain a random transition from the replay buffer. You can use np.random.randint
then call self.q_update on the random sample. """
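            # Following the hint above, one planning step could look like this (a sketch):
            #   s_, a_, r_, sp_, done_ = self.Model[np.random.randint(len(self.Model))]
            #   self.q_update(s_, a_, r_, sp_, done_)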
# TODO: 2 lines missing.
raise NotImplementedError("Implement function body")
def __str__(self):
return f"DynaQ_{self.gamma}_{self.epsilon}_{self.alpha}_{self.n}"
def dyna_experiment(env, env_name='maze',num_episodes=50,epsilon=0.1, alpha=0.1, gamma=.95, runs=2):
for _ in range(runs): # Increase runs for nicer error bars
agents = [QAgent(env, epsilon=epsilon, alpha=alpha,gamma=gamma),
SarsaAgent(env, epsilon=epsilon, alpha=alpha, gamma=gamma),
SarsaLambdaAgent(env, epsilon=epsilon, alpha=alpha, gamma=gamma,lamb=0.9),
DynaQ(env, epsilon=epsilon, alpha=alpha,gamma=gamma,n=5),
DynaQ(env, epsilon=epsilon, alpha=alpha,gamma=gamma, n=50),
]
experiments = []
for agent in agents:
expn = f"experiments/b{env_name}_{str(agent)}"
train(env, agent, expn, num_episodes=num_episodes, max_runs=100)
experiments.append(expn)
return experiments
if __name__ == "__main__":
from irlc.ex09.mdp import MDP2GymEnv
""" The maze-environment is created as an MDP, and we then convert it to a Gym environment.
Alternatively, use irlc.gridworld.gridworld_environments.py to specify the layout as in the other gridworld examples. """
env = MDP2GymEnv(MazeEnvironment())
experiments = dyna_experiment(env, env_name='maze',num_episodes=50,epsilon=0.1, alpha=0.1, gamma=.95, runs=4)
main_plot(experiments, smoothing_window=None, y_key="Length")
plt.ylim([0, 500])
plt.title("Dyna Q on simple Maze (Figure 8.2)")
savepdf("dynaq_maze_8_2")
plt.show()
# Part 2: Cliffwalking as reference.
env = gym.make('CliffWalking-v0')
gamma, alpha, epsilon = 1, 0.5, 0.1
# Call the dyna_experiment(...) function here similar to the previous call but using new parameters.
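    # A plausible call (a sketch; the exact parameters are up to the exercise):
    #   experiments = dyna_experiment(env, env_name='cliff', num_episodes=200, epsilon=epsilon, alpha=alpha, gamma=gamma, runs=4)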
# TODO: 1 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
main_plot(experiments, smoothing_window=5)
plt.ylim([-150, 0])
plt.title("Dyna-Q learning on " + env.spec.name)
savepdf("dyna_cliff")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import numpy as np
from irlc.ex01.agent import train
from irlc import main_plot
import matplotlib.pyplot as plt
from irlc.ex09.mdp import MDP, MDP2GymEnv
from irlc import savepdf
from irlc.ex11.sarsa_agent import SarsaAgent
from irlc.ex11.q_agent import QAgent
from irlc.ex13.tabular_double_q import TabularDoubleQ
class MaximizationBiasEnvironment(MDP):
"""
The Maximization Bias example from (SB18, Example 6.7).
For easy implementation, we fix the number of transitions from state B to the terminal state to
normal_transitions. The code ensures they still have average reward -0.1, i.e. no action from B is preferred over the others.
There are B_actions possible actions from state B in this example (the number is not given in the book).
"""
def __init__(self, B_actions=10, normal_transitions=100, **kwargs):
self.state_A = 0
self.state_B = 1
self.LEFT = 0
self.RIGHT = 1
self.B_actions = B_actions
self.n_transitions = normal_transitions
super().__init__(initial_state=self.state_A, **kwargs)
def is_terminal(self, state):
return state == 2
def A(self, s):
# define the action space
if s == self.state_A:
return [self.LEFT, self.RIGHT]
elif s == self.state_B: # in state B
return [n for n in range(self.B_actions)]
else:
return [0] # terminal; return a dummy action 0 which does nothing (some code is sensitive to empty action spaces)
def Psr(self, s, a):
t = 2 # terminal state
if s == self.state_A:
if a == self.RIGHT:
# TODO: 1 lines missing.
raise NotImplementedError("Implement what the environment does in state A with a RIGHT action")
else:
# TODO: 1 lines missing.
raise NotImplementedError("Implement what the environment does in state A with a LEFT action")
else: # s is in state B
p = 1/self.n_transitions # transition probability
rewards = [np.random.randn() for _ in range(self.n_transitions)]
rewards = [r - np.mean(rewards)-0.1 for r in rewards]
return { (t, r): p for r in rewards}
if __name__ == "__main__":
"""
The Maximization Bias from (SB18, Example 6.7).
I have fixed the number of "junk" actions in state B to 10, but it can easily be changed
in the environment.
I don't have an easy way to get the number of 'left'-actions, so instead I plot
the trajectory length: it is 1 for a right action and 2 for a left action.
"""
env = MDP2GymEnv(MaximizationBiasEnvironment())
for _ in range(100):
epsilon = 0.1
alpha = 0.1
gamma = 1
agents = [QAgent(env, epsilon=epsilon, alpha=alpha),
SarsaAgent(env, epsilon=epsilon, alpha=alpha),
TabularDoubleQ(env, epsilon=epsilon, alpha=alpha)]
experiments = []
for agent in agents:
expn = f"experiments/bias_{str(agent)}"
train(env, agent, expn, num_episodes=300, max_runs=100)
experiments.append(expn)
main_plot(experiments, smoothing_window=10, y_key="Length")
plt.ylim([1, 2])
plt.title("Double-Q learning on Maximization-Bias ex. (Figure 6.5)")
savepdf("maximization_bias_6_5")
plt.show()
main_plot(experiments, smoothing_window=10)
savepdf("maximization_bias_6_5_reward")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
The DynaQ Maze environment.
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
from irlc.ex09.mdp import MDP
class MazeEnvironment(MDP):
r"""
The Maze environment from (SB18, Example 8.1)
"""
def __init__(self, **kwargs):
self.maze_ = HiddenMaze()
super().__init__(initial_state=tuple(self.maze_.START_STATE), **kwargs)
def is_terminal(self, state):
return state == tuple(self.maze_.GOAL_STATES[0])
def A(self, s):
return self.maze_.actions
def Psr(self, s, a):
xy, r = self.maze_.step(list(s), a)
return { (tuple(xy), r): 1 }
# A wrapper class for a maze, containing all the information about the maze.
# Basically it's initialized to DynaMaze by default, however it can be easily adapted
# to other mazes.
class HiddenMaze:
def __init__(self):
# maze width
self.WORLD_WIDTH = 9
# maze height
self.WORLD_HEIGHT = 6
# all possible actions
self.ACTION_UP = 0
self.ACTION_DOWN = 1
self.ACTION_LEFT = 2
self.ACTION_RIGHT = 3
self.actions = [self.ACTION_UP, self.ACTION_DOWN, self.ACTION_LEFT, self.ACTION_RIGHT]
# start state
self.START_STATE = [2, 0]
# goal state
self.GOAL_STATES = [[0, 8]]
# all obstacles
self.obstacles = [[1, 2], [2, 2], [3, 2], [0, 7], [1, 7], [2, 7], [4, 5]]
self.old_obstacles = None
self.new_obstacles = None
# time to change obstacles
self.obstacle_switch_time = None
# initial state action pair values
# self.stateActionValues = np.zeros((self.WORLD_HEIGHT, self.WORLD_WIDTH, len(self.actions)))
# the size of q value
self.q_size = (self.WORLD_HEIGHT, self.WORLD_WIDTH, len(self.actions))
# max steps
self.max_steps = float('inf')
# track the resolution for this maze
self.resolution = 1
# extend a state to a higher resolution maze
# @state: state in lower resolution maze
# @factor: extension factor, one state will become factor^2 states after extension
def extend_state(self, state, factor):
new_state = [state[0] * factor, state[1] * factor]
new_states = []
for i in range(0, factor):
for j in range(0, factor):
new_states.append([new_state[0] + i, new_state[1] + j])
return new_states
# extend a state into higher resolution
# one state in original maze will become @factor^2 states in @return new maze
def extend_maze(self, factor):
new_maze = HiddenMaze()
new_maze.WORLD_WIDTH = self.WORLD_WIDTH * factor
new_maze.WORLD_HEIGHT = self.WORLD_HEIGHT * factor
new_maze.START_STATE = [self.START_STATE[0] * factor, self.START_STATE[1] * factor]
new_maze.GOAL_STATES = self.extend_state(self.GOAL_STATES[0], factor)
new_maze.obstacles = []
for state in self.obstacles:
new_maze.obstacles.extend(self.extend_state(state, factor))
new_maze.q_size = (new_maze.WORLD_HEIGHT, new_maze.WORLD_WIDTH, len(new_maze.actions))
# new_maze.stateActionValues = np.zeros((new_maze.WORLD_HEIGHT, new_maze.WORLD_WIDTH, len(new_maze.actions)))
new_maze.resolution = factor
return new_maze
# take @action in @state
# @return: [new state, reward]
def step(self, state, action):
x, y = state
if action == self.ACTION_UP:
x = max(x - 1, 0)
elif action == self.ACTION_DOWN:
x = min(x + 1, self.WORLD_HEIGHT - 1)
elif action == self.ACTION_LEFT:
y = max(y - 1, 0)
elif action == self.ACTION_RIGHT:
y = min(y + 1, self.WORLD_WIDTH - 1)
if [x, y] in self.obstacles:
x, y = state
if [x, y] in self.GOAL_STATES:
reward = 1.0
else:
reward = 0.0
return [x, y], reward
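# A quick sanity check of the dynamics above (a sketch, not part of the exercise):
if __name__ == "__main__":
    maze = HiddenMaze()
    state, reward = maze.step(maze.START_STATE, maze.ACTION_RIGHT)  # move right from [2, 0]
    print(state, reward)  # -> [2, 1] 0.0 (no obstacle there, and it is not the goal)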
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import numpy as np
from irlc.ex01.agent import train
import gymnasium as gym
from irlc import main_plot
import matplotlib.pyplot as plt
from irlc import savepdf
from irlc.ex11.sarsa_agent import SarsaAgent
from irlc.ex11.q_agent import QAgent
from irlc import Agent
class TabularDoubleQ(QAgent):
r"""
Implement the tabular version of the double-Q learning agent from
(SB18, Section 6.7).
Note we will copy the Q data structure from the Agent class.
"""
def __init__(self, env, gamma=1.0, alpha=0.5, epsilon=0.1):
super().__init__(env, gamma, epsilon=epsilon)  # pass epsilon by keyword (QAgent also takes alpha)
self.alpha = alpha
# The two Q-value functions. These are of the same type as the regular self.Q function
from irlc.ex09.rl_agent import TabularQ
self.Q1 = TabularQ(env)
self.Q2 = TabularQ(env)
self.Q = None # remove self.Q (we will not use it in double Q)
def pi(self, s, k, info=None):
"""
Implement the epsilon-greedy action. The implementation is nearly identical to pi_eps in the Agent class,
which can be used for inspiration; however, we should use Q1+Q2 as the Q-value.
"""
a1, Q1 = self.Q1.get_Qs(s, info)
a2, Q2 = self.Q2.get_Qs(s, info)
Q = np.asarray(Q1) + np.asarray(Q2)
# TODO: 1 lines missing.
raise NotImplementedError("Return epsilon-greedy action using Q")
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
"""
Implement the double-Q learning rule, i.e. with probability 0.5 (e.g. when np.random.rand() < 0.5) switch
the roles of the two Q-functions Q1 and Q2. Use the code for the regular Q-agent as inspiration.
"""
# TODO: 4 lines missing.
raise NotImplementedError("Implement function body")
def __str__(self):
return f"TabularDoubleQ_{self.gamma}_{self.epsilon}_{self.alpha}"
if __name__ == "__main__":
""" Part 1: Cliffwalking """
env = gym.make('CliffWalking-v0')
epsilon = 0.1
alpha = 0.25
gamma = 1.0
for _ in range(20):
agents = [QAgent(env, gamma=1, epsilon=epsilon, alpha=alpha),
SarsaAgent(env, gamma=1, epsilon=epsilon, alpha=alpha),
TabularDoubleQ(env, gamma=1, epsilon=epsilon, alpha=alpha)]
experiments = []
for agent in agents:
expn = f"experiments/doubleq_cliffwalk_{str(agent)}"
train(env, agent, expn, num_episodes=500, max_runs=20)
experiments.append(expn)
main_plot(experiments, smoothing_window=10)
plt.ylim([-100, 0])
plt.title("Double-Q learning on " + env.spec.name)
savepdf("double_Q_learning_cliff")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import numpy as np
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
# Use GPU; if the drivers give you grief you can turn the GPU off without too big a hit on performance in the cartpole task
USE_CUDA = torch.cuda.is_available()
USE_CUDA = False # No, we use CPU.
Variable = lambda *args, **kwargs: autograd.Variable(*args, **kwargs).cuda() if USE_CUDA else autograd.Variable(*args, **kwargs)
from irlc.ex13.dqn_network import DQNNetwork
class TorchNetwork(nn.Module,DQNNetwork):
def __init__(self, env, trainable=True, learning_rate=0.001, hidden=30):
nn.Module.__init__(self)
DQNNetwork.__init__(self)
self.env = env
self.hidden = hidden
self.actions = env.action_space.n
self.build_model_()
if trainable:
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
if USE_CUDA:
self.cuda()
def build_feature_network(self):
num_observations = np.prod(self.env.observation_space.shape)
return (nn.Linear(num_observations, self.hidden),
nn.ReLU(),
nn.Linear(self.hidden, self.hidden),
nn.ReLU())
def build_model_(self):
num_actions = self.env.action_space.n
self.model = nn.Sequential(*self.build_feature_network(), nn.Linear(self.hidden,num_actions))
def forward(self, s):
s = Variable(torch.FloatTensor(s))
s = self.model(s)
return s
def __call__(self, s):
return self.forward(s).detach().numpy()
def fit(self, s, target):
q_value = self.forward(s)
loss = (q_value - torch.FloatTensor(target).detach()).pow(2).sum(axis=1).mean()
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def update_Phi(self, source, tau=1):
"""
Polyak adapt weights of this class given source:
I.e. tau=1 means adopt weights in one step,
tau = 0.001 means adopt very slowly, tau=1 means instant overwriting
"""
state = self.state_dict()
for k, wa in state.items():
wb = source.state_dict()[k]
state[k] = wa*(1 - tau) + wb * tau
self.load_state_dict(state)
def save(self, path):
if not os.path.exists(os.path.dirname(path)):
os.mkdir(os.path.dirname(path))
torch.save(self.state_dict(), path+".torchsave")
def load(self, path):
self.load_state_dict(torch.load(path+".torchsave"))
self.eval() # set batch norm layers, dropout, other stuff we don't use
class TorchDuelNetwork(TorchNetwork):
def build_model_(self):
self.feature = nn.Sequential(*self.build_feature_network())
self.advantage = nn.Sequential(nn.Linear(self.hidden, self.hidden),
nn.ReLU(),
nn.Linear(self.hidden, self.actions))
self.value = nn.Sequential(nn.Linear(self.hidden, self.hidden),
nn.ReLU(),
nn.Linear(self.hidden, 1))
def forward(self, s):
"""
Return tensor corresponding to Q-values when using dueling Q-networks (see exercise description)
"""
# TODO: 4 lines missing.
raise NotImplementedError("Implement function body")
return value + advantage - advantage.mean()
class TorchDuelNetworkAtari(TorchNetwork):
def build_feature_network(self):
hidden_size = 256
in_channels = self.env.observation_space.shape[-1]
num_actions = self.env.action_space.n
return (nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
nn.BatchNorm2d(32),
nn.Conv2d(32, 64, kernel_size=4, stride=2),
nn.BatchNorm2d(64),
nn.Conv2d(64, 64, kernel_size=3, stride=1),
nn.BatchNorm2d(64),
nn.Linear(7 * 7 * 64, hidden_size), # has to be adjusted for other resolutions
nn.Linear(hidden_size, num_actions) )
if __name__ == "__main__":
    import gymnasium as gym
    env = gym.make("CartPole-v0")
    Q = TorchNetwork(env, trainable=True, learning_rate=0.001)  # initialize a concrete network (DQNNetwork is only the abstract interface)
    """ Assuming s has dimension [batch_size x n], Q(s) returns a float numpy array
    of Q-values of dimension [batch_size x actions], such that qvals[i, a] = Q(s_i, a). """
    batch_size = 32  # As an example
    # Create some dummy input
    states = np.stack([env.reset()[0] for _ in range(batch_size)])
    states.shape  # batch_size x n
    qvals = Q(states)
    qvals.shape  # This is an array of dimension batch_size x actions
    print(qvals[0, 1])  # Get Q(s_0, 1)
    Y = np.random.rand(batch_size, env.action_space.n)  # Generate target Q-values (training data)
    Q.fit(states, Y)  # Train the Q-network for one gradient descent step.
for i, alpha in enumerate(alphas):
n = n_steps[i]
agent = LinearSemiGradSarsaN(env, gamma=1, alpha=alpha / num_of_tilings, epsilon=0, n=n)
experiment = f"experiments/mountaincar_10-2_{agent}_{episodes}"
train(env, agent, experiment_name=experiment, num_episodes=episodes, max_runs=max_runs)
experiments.append(experiment)
agent = LinearSemiGradSarsaLambda(env, gamma=1, alpha=alphas[1]/num_of_tilings, epsilon=0, lamb=0.9)
experiment = f"experiments/mountaincar_10-2_{agent}_{episodes}"
train(env, agent, experiment_name=experiment, num_episodes=episodes, max_runs=max_runs)
experiments.append(experiment)
agent = LinearSemiGradQAgent(env, gamma=1, alpha=alphas[1] / num_of_tilings, epsilon=0)
experiment = f"experiments/mountaincar_10-2_{agent}_{episodes}"
train(env, agent, experiment_name=experiment, num_episodes=episodes, max_runs=max_runs)
experiments.append(experiment)
a_prime = self.pi_eps(sp, info_sp) if not done else -1
delta = r + self.gamma * (self.Q[sp,a_prime] if not done else 0) - self.Q[s,a]
self.e[(s,a)] += 1
self.Q[s,a] += self.alpha * delta * ee
self.e[(s,a)] = self.gamma * self.lamb * ee
return self.Q(s, a)
self.Q.w += self.alpha * delta * self.Q.x(s,a) # Update q(s,a)/weights given change in q-values: delta = [G-\hat{q}(..)]
Q = self.Q.w @ self.x
Q_prime = self.Q.w @ x_prime if not done else None
delta = r + (self.gamma * Q_prime if not done else 0) - Q
self.z = self.gamma * self.lamb * self.z + (1-self.alpha * self.gamma * self.lamb *self.z @ self.x) * self.x
self.Q.w += self.alpha * (delta + Q - self.Q_old) * self.z - self.alpha * (Q-self.Q_old) * self.x
y = r[:,0] + self.gamma * np.max(self.Q(sp), axis=1) * (1-done)
target = self.Q(s)
target[range(len(a)), a] = y