Commit 13d8f54b authored by tuhe's avatar tuhe

Exercises for week 12+13

parent 112efef9
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 12."""
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.utils.common import log_time_series
from irlc.ex10 import envs
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from irlc import savepdf
from irlc.ex01.agent import train
from irlc.ex12.semi_grad_nstep_sarsa import LinearSemiGradSarsaN
import gymnasium as gym
from irlc import main_plot
from irlc.ex12.semi_grad_sarsa_lambda import LinearSemiGradSarsa
# Helper function for plotting the value functions.
def plot_surface_2(X, Y, Z, fig=None, ax=None, **kwargs):
    if fig is None and ax is None:
        fig = plt.figure(figsize=(20, 10))
    if ax is None:
        ax = fig.add_subplot(projection='3d')
    surf = ax.plot_surface(X, Y, Z, cmap=plt.cm.coolwarm, linewidth=1, edgecolors='k', **kwargs)
    ax.view_init(ax.elev, -120)
    if fig is not None:
        fig.colorbar(surf, shrink=0.5, aspect=5)
    return ax
def plot_mountaincar_value_function(env, value_function, ax):
    """
    Make a 3d surface plot of the value function over the 2d MountainCar state space (position, velocity).
    """
    grid_size = 40
    low = env.unwrapped.observation_space.low
    high = env.unwrapped.observation_space.high
    X, Y = np.meshgrid(np.linspace(low[0], high[0], grid_size), np.linspace(low[1], high[1], grid_size))
    Z = X * 0
    for i, (x, y) in enumerate(zip(X.flat, Y.flat)):
        Z.flat[i] = value_function((x, y))
    plot_surface_2(X, Y, Z, ax=ax)
    ax.set_xlabel('Position')
    ax.set_ylabel('Velocity')
    ax.set_zlabel('Cost to go')
def figure_10_1():
    episodes = 9000
    plot_episodes = [1, 99, episodes - 1]
    scale = 8
    fig = plt.figure(figsize=(4 * scale, scale))
    axes = [fig.add_subplot(1, len(plot_episodes), i + 1, projection='3d') for i in range(len(plot_episodes))]
    num_of_tilings = 8
    alpha = 0.3
    env = gym.make("MountainCar-v0")
    # Divide the step size by the number of tilings, since that many features are active at once.
    agent = LinearSemiGradSarsa(env, gamma=1, alpha=alpha / num_of_tilings, epsilon=0)
    for ep in tqdm(range(episodes)):
        train(env, agent, num_episodes=1, max_steps=np.inf, verbose=False)
        if ep in plot_episodes:
            # Cost-to-go estimate -max_a Q(s,a); get_Qs(s) is assumed to return (actions, Q-values).
            v = lambda s: -max(agent.Q.get_Qs(s)[1])
            ax = axes[plot_episodes.index(ep)]
            plot_mountaincar_value_function(env, v, ax=ax)
            ax.set_title(f'Episode {ep + 1}')
    savepdf("semigrad_sarsa_10-1")
    plt.show()
def figure_10_2():
    episodes = 500
    num_of_tilings = 8
    alphas = [0.1, 0.2, 0.5]
    env = gym.make("MountainCar500-v0")
    experiments = []
    for alpha in alphas:
        agent = LinearSemiGradSarsa(env, gamma=1, alpha=alpha / num_of_tilings, epsilon=0)
        experiment = f"experiments/mountaincar_10-2_{agent}_{episodes}"
        train(env, agent, experiment_name=experiment, num_episodes=episodes, max_runs=10)
        experiments.append(experiment)
    main_plot(experiments=experiments, y_key="Length")
    plt.xlabel('Episode')
    plt.ylabel('Steps per episode')
    plt.title(env.spec.name + " - Semigrad Sarsa - Figure 10.2")
    savepdf("mountaincar_10-2")
    plt.show()
def figure_10_3():
    from irlc.ex12.semi_grad_sarsa_lambda import LinearSemiGradSarsaLambda
    from irlc.ex11.semi_grad_q import LinearSemiGradQAgent
    max_runs = 10
    episodes = 500
    num_of_tilings = 8
    alphas = [0.5, 0.3]
    n_steps = [1, 8]
    env = gym.make("MountainCar500-v0")
    experiments = []
    """ Plot results of experiments here. """
    # TODO: 16 lines missing.
    raise NotImplementedError("Insert your solution and remove this error.")
    main_plot(experiments=experiments, y_key="Length")
    plt.xlabel('Episode')
    plt.ylabel('Steps per episode')
    plt.title(env.spec.name + " - Semigrad N-step Sarsa - Figure 10.3")
    savepdf("mountaincar_10-3")
    plt.show()
def figure_10_4():
    alphas = np.arange(0.25, 1.75, 0.25)
    n_steps = np.power(2, np.arange(0, 5))
    episodes = 50
    env = gym.make("MountainCar500-v0")
    experiments = []
    num_of_tilings = 8
    max_asteps = 500
    run = True
    for n_step_index, n_step in enumerate(n_steps):
        aexp = []
        did_run = False
        for alpha_index, alpha in enumerate(alphas):
            if not run:
                continue
            if (n_step == 8 and alpha > 1) or (n_step == 16 and alpha > 0.75):
                # In these cases the method does not converge, so skip them.
                asteps = max_asteps
            else:
                agent = LinearSemiGradSarsaN(env, gamma=1, alpha=alpha / num_of_tilings, epsilon=0, n=n_step)
                _, stats, _ = train(env, agent, num_episodes=episodes)
                asteps = np.mean([s['Length'] for s in stats])
                did_run = did_run or stats is not None
            aexp.append({'alpha': alpha, 'average_steps': asteps})
        experiment = f"experiments/mc_10-4_lsgn_{n_step}"
        experiments.append(experiment)
        if did_run:
            log_time_series(experiment, aexp)
    main_plot(experiments, x_key="alpha", y_key="average_steps", ci=None)
    plt.xlabel('alpha')
    plt.ylabel('Steps per episode')
    plt.title("Figure 10.4: Semigrad n-step Sarsa on mountain car")
    plt.ylim([150, 300])
    savepdf("mountaincar_10-4")
    plt.show()
if __name__ == '__main__':
    figure_10_1()
    figure_10_2()
    figure_10_3()
    figure_10_4()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from collections import defaultdict
import gymnasium as gym
from irlc.ex01.agent import train
from irlc import main_plot, savepdf
import matplotlib.pyplot as plt
from irlc.ex11.sarsa_agent import SarsaAgent
class SarsaLambdaAgent(SarsaAgent):
    def __init__(self, env, gamma=0.99, epsilon=0.1, alpha=0.5, lamb=0.9):
        """
        Implementation of Sarsa(Lambda) in the tabular version; see
        http://incompleteideas.net/book/first/ebook/node77.html
        for details. Remember to reset the eligibility trace E after each episode, i.e. set E(s,a) = 0.
        Note that 'lamb' is an abbreviation of lambda, because lambda is a reserved keyword in Python.

        The constructor initializes e, the eligibility trace. Since we want to be able to easily find the
        non-zero elements, it is convenient to use a dictionary, i.e.
        self.e[(s, a)] is the eligibility trace e(s,a) (or E(s,a) if you prefer).

        Note that Sarsa(Lambda) generalizes Sarsa. This means that we must again generate the next action A'
        from S' in the train method and store it for when we take actions in the policy method pi, i.e. we can
        re-use the Sarsa agent's code for the policy (self.pi). An illustrative sketch of the full update can
        be found after this class.
        """
        super().__init__(env, gamma=gamma, alpha=alpha, epsilon=epsilon)
        self.lamb = lamb
        # We use a dictionary to store the eligibility trace. It can be indexed as self.e[s, a].
        self.e = defaultdict(float)

    def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
        # TODO: 1 lines missing.
        raise NotImplementedError("a_prime = ... (get action for S'=sp using self.pi_eps; see Sarsa)")
        # TODO: 1 lines missing.
        raise NotImplementedError("delta = ... (The ordinary Sarsa learning signal)")
        # TODO: 1 lines missing.
        raise NotImplementedError("Update the eligibility trace e(s,a) += 1")
        for (s, a), ee in self.e.items():
            # TODO: 2 lines missing.
            raise NotImplementedError("Update Q values and eligibility trace")
        if done:  # Clear the eligibility trace after each episode and update variables as in Sarsa.
            self.e.clear()
        else:
            self.a = a_prime

    def __str__(self):
        return f"SarsaLambda_{self.gamma}_{self.epsilon}_{self.alpha}_{self.lamb}"
if __name__ == "__main__":
envn = 'CliffWalking-v0'
env = gym.make(envn)
alpha =0.05
sarsaLagent = SarsaLambdaAgent(env,gamma=0.99, epsilon=0.1, alpha=alpha, lamb=0.9)
sarsa = SarsaAgent(env,gamma=0.99,alpha=alpha,epsilon=0.1)
methods = [("SarsaL", sarsaLagent), ("Sarsa", sarsa)]
experiments = []
for k, (name,agent) in enumerate(methods):
expn = f"experiments/{envn}_{name}"
train(env, agent, expn, num_episodes=500, max_runs=10)
experiments.append(expn)
main_plot(experiments, smoothing_window=10, resample_ticks=200)
plt.ylim([-100, 0])
savepdf("cliff_sarsa_lambda")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex12.sarsa_lambda_agent import SarsaLambdaAgent
from irlc.gridworld.gridworld_environments import OpenGridEnvironment
from irlc import train, interactive
def keyboard_play(Agent, method_label='MC', num_episodes=1000, alpha=0.5, autoplay=False, **args):
    print("Evaluating", Agent, "on the open gridworld environment.")
    print("Press p to follow the agent's policy or use the keyboard to input actions")
    print("(Please be aware that Sarsa, N-step Sarsa, and Sarsa(Lambda) do not always make the right updates when you input actions with the keyboard)")
    env = OpenGridEnvironment(render_mode='human', frames_per_second=10)
    try:
        agent = Agent(env, gamma=0.99, epsilon=0.1, alpha=alpha, **args)
    except Exception:  # If it is a value agent whose constructor does not accept epsilon.
        agent = Agent(env, gamma=0.99, alpha=alpha, **args)
    env, agent = interactive(env, agent, autoplay=autoplay)
    train(env, agent, num_episodes=num_episodes)
    env.close()
if __name__ == "__main__":
"""
Example: Play a three episodes and save a snapshot of the Q-values as a .pdf
"""
env = OpenGridEnvironment(render_mode='human')
agent = SarsaLambdaAgent(env, gamma=0.99, epsilon=0.1, alpha=.5)
env, agent = interactive(env, agent, autoplay=True)
train(env, agent, num_episodes=3)
from irlc import savepdf
savepdf("sarsa_lambda_opengrid", env=env)
env.close()
""" Example: Keyboard play
You can input actions manually with the keyboard, but the Q-values are not necessarily updates correctly in this mode. Can you tell why?
You can let the agent play by pressing `p`, in which case the Q-values will be updated correctly. """
keyboard_play(SarsaLambdaAgent, method_label="Sarsa(Lambda)", lamb=0.8)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex01.agent import train
import gymnasium as gym
from irlc.ex11.semi_grad_sarsa import LinearSemiGradSarsa
from irlc.ex11.nstep_sarsa_agent import SarsaNAgent
class LinearSemiGradSarsaN(SarsaNAgent, LinearSemiGradSarsa):
    def __init__(self, env, gamma=0.99, alpha=0.5, epsilon=0.1, q_encoder=None, n=1):
        """
        Note you can access the super-classes as:

        >>> SarsaNAgent.pi(self, s)  # Call pi(s) as implemented in SarsaNAgent

        Alternatively, just inherit from Agent and set up the data structures as required.
        """
        SarsaNAgent.__init__(self, env, gamma, alpha=alpha, epsilon=epsilon, n=n)
        LinearSemiGradSarsa.__init__(self, env, gamma, alpha=alpha, epsilon=epsilon, q_encoder=q_encoder)

    def pi(self, s, k, info=None):
        return SarsaNAgent.pi(self, s, k, info)

    def _q(self, s, a):
        """
        Return Q(s,a) using the linear function approximator with weights self.w, i.e. use self.q.
        """
        # TODO: 1 lines missing.
        raise NotImplementedError("Implement function body")

    def _upd_q(self, s, a, delta):
        """
        Update the weight vector w using the appropriate rule (see the exercise description), i.e. the update
        should be of the form

            self.w += self.alpha * delta * (gradient of Q(s,a;w))

        where

            delta = G^n - Q(s,a;w)

        An illustrative sketch of both linear-approximation steps appears after this class.
        """
        # TODO: 1 lines missing.
        raise NotImplementedError("Implement function body")

    def __str__(self):
        return f"LinSemiGradSarsaN{self.gamma}_{self.epsilon}_{self.alpha}_{self.n}"
experiment_nsarsa = "experiments/mountaincar_SarsaN"
if __name__ == "__main__":
from irlc.ex12.semi_grad_sarsa_lambda import alpha, plot_including_week10, experiment_sarsaL, episodes
import irlc.ex10.envs
env = gym.make("MountainCar500-v0")
for _ in range(10):
agent = LinearSemiGradSarsaN(env, gamma=1, alpha=alpha, epsilon=0, n=4)
train(env, agent, experiment_nsarsa, num_episodes=episodes, max_runs=10)
# plot while including the results from last week for Sarsa and Q-learning
plot_including_week10([experiment_sarsaL, experiment_nsarsa],output="semigrad_sarsan")
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import gymnasium as gym
import numpy as np
from irlc.ex01.agent import train
from irlc import main_plot, savepdf
import matplotlib.pyplot as plt
from irlc.ex11.semi_grad_sarsa import LinearSemiGradSarsa
class LinearSemiGradSarsaLambda(LinearSemiGradSarsa):
    def __init__(self, env, gamma=0.99, epsilon=0.1, alpha=0.5, lamb=0.9, q_encoder=None):
        r"""
        Sarsa(Lambda) with linear feature approximators (see (SB18, Section 12.7)).
        """
        super().__init__(env, gamma, alpha=alpha, epsilon=epsilon, q_encoder=q_encoder)
        self.z = np.zeros(self.Q.d)  # Vector to store the eligibility trace (same dimension as self.w).
        self.lamb = lamb  # lambda in Sarsa(lambda). We cannot use the reserved keyword 'lambda'.

    def pi(self, s, k, info=None):
        if k == 0:  # If at the beginning of an episode.
            self.a = self.pi_eps(s, info)
            self.x = self.Q.x(s, self.a)
            self.Q_old = 0
        return self.a

    def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
        a_prime = self.pi_eps(sp, info_sp) if not done else -1
        x_prime = self.Q.x(sp, a_prime) if not done else None
        """
        Update the eligibility trace self.z and the weights self.w here.
        Note that Q-values are approximated as Q = w @ x.
        We use Q_prime = w @ x(s', a') to denote the new Q-value, which is stored for the next iteration
        as in the pseudo code. An illustrative sketch of the update can be found after this class.
        """
        # TODO: 5 lines missing.
        raise NotImplementedError("Update z, w")
        if done:  # Reset the eligibility trace and time step t as in Sarsa.
            self.z = self.z * 0
        else:
            self.Q_old, self.x, self.a = Q_prime, x_prime, a_prime

    def __str__(self):
        return f"LinearSarsaLambda_{self.gamma}_{self.epsilon}_{self.alpha}_{self.lamb}"
from irlc.ex11.semi_grad_q import experiment_q, x, episodes
from irlc.ex11.semi_grad_sarsa import experiment_sarsa
from irlc.ex10 import envs
experiment_sarsaL = "experiments/mountaincar_sarsaL"
num_of_tilings = 8
alpha = 1 / num_of_tilings / 2  # Learning rate, scaled down by the number of active tilings.

def plot_including_week10(experiments, output):
    exps = ["../ex11/" + e for e in [experiment_q, experiment_sarsa]] + experiments
    main_plot(exps, x_key=x, y_key='Length', smoothing_window=30, resample_ticks=100)
    savepdf(output)
    plt.show()
    # Turn off averaging across repeats and plot each run individually.
    main_plot(exps, x_key=x, y_key='Length', smoothing_window=30, units="Unit", estimator=None, resample_ticks=100)
    savepdf(output + "_individual")
    plt.show()
if __name__ == "__main__":
env = gym.make("MountainCar500-v0")
for _ in range(5): # run experiment 10 times
agent = LinearSemiGradSarsaLambda(env, gamma=1, alpha=alpha, epsilon=0)
train(env, agent, experiment_sarsaL, num_episodes=episodes, max_runs=10)
# Make plots (we use an external function so we can re-use it for the semi-gradient n-step controller)
plot_including_week10([experiment_sarsaL], output="semigrad_sarsaL")