Commit 4ddfc88e authored by tuhe

Week 5 code

parent 0545a3cc
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 5."""
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex05.direct import direct_solver, get_opts, guess
from irlc.ex04.discrete_control_model import DiscreteControlModel
from irlc.ex04.control_environment import ControlEnvironment
from irlc import train, Agent
import numpy as np
import matplotlib.pyplot as plt
from irlc import savepdf
class DirectAgent(Agent):
def __init__(self, env: ControlEnvironment, options=None):
cmod = env.discrete_model.continuous_model # Get the continuous-time model for planning
if options is None:
options = [get_opts(N=10, ftol=1e-3, guess=guess(cmod), verbose=False),
get_opts(N=60, ftol=1e-6, verbose=False)
]
solutions = direct_solver(cmod, options)
# The next 3 lines are for plotting purposes. You can ignore them.
self.x_grid = np.stack([env.discrete_model.phi_x(x) for x in solutions[-1]['grid']['x']])
self.u_grid = np.stack([env.discrete_model.phi_u(u) for u in solutions[-1]['grid']['u']])
self.ts_grid = np.stack(solutions[-1]['grid']['ts'])
# set self.ufun equal to the solution (policy) function. You can get it by looking at `solutions` computed above
self.solutions = solutions
# TODO: 1 lines missing.
raise NotImplementedError("set self.ufun = solutions[....][somethingsomething] (insert a breakpoint, it should be self-explanatory).")
super().__init__(env)
def pi(self, x, k, info=None):
""" Return the action given x and t. As a hint, you will only use t, and self.ufun computed a few lines above"""
# TODO: 7 lines missing.
raise NotImplementedError("Implement function body")
return u
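# A minimal sketch of the two missing pieces above, kept as comments since the stubs are exercise
# placeholders. This is an assumption about the intended solution, not the official one: judging from
# solutions[-1]['fun']['tF'] used in train_direct_agent below and sol['fun']['u'](t) used in
# direct_plot.py, the entry solutions[-1]['fun']['u'] is the continuous-time control function u(t).
# In __init__ one would therefore set
#     self.ufun = solutions[-1]['fun']['u']
# and in pi(x, k, info) evaluate it at the current time, for instance
#     t = k * self.env.dt  # assumption: the environment exposes the discretization step as env.dt
#     u = np.asarray(self.ufun(t)).reshape((-1,))
#     u = np.clip(u, self.env.action_space.low, self.env.action_space.high)  # stay within the action bounds
#     return u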
def train_direct_agent(animate=True, plot=False):
from irlc.ex05.pendulum_model_variant import DirectPendulumModel
model = DirectPendulumModel()
"""
    Test out the implementation on a fairly small grid. Note that this will work fairly poorly.
"""
guess = {'t0': 0,
'tF': 3,
'x': [np.asarray([0, 0]), np.asarray([np.pi, 0])],
'u': [np.asarray([0]), np.asarray([0])]}
options = [get_opts(N=8, ftol=1e-3, guess=guess),
get_opts(N=40, ftol=1e-3),
get_opts(N=60, ftol=1e-6)
]
dmod = DiscreteControlModel(model=model, dt=0.1) # Discretize the pendulum model. Used for creating the environment.
denv = ControlEnvironment(discrete_model=dmod, Tmax=4, render_mode='human' if animate else None)
agent = DirectAgent(denv, options=options)
denv.Tmax = agent.solutions[-1]['fun']['tF'] # Specify max runtime of the environment. Must be based on the Agent's solution.
stats, traj = train(denv, agent=agent, num_episodes=1, return_trajectory=True)
if plot:
from irlc import plot_trajectory
plot_trajectory(traj[0], env=denv)
savepdf("direct_agent_pendulum")
plt.show()
return stats, traj, agent
if __name__ == "__main__":
stats, traj, agent = train_direct_agent(animate=True, plot=True)
print("Obtained cost", -stats[0]['Accumulated Reward'])
# Let's try to plot the state-vectors for the two models. They are not going to agree that well.
plt.plot(agent.ts_grid, agent.x_grid, 'r-', label="Direct solver prediction")
plt.plot(traj[0].time, traj[0].state, 'k-', label='Simulation')
plt.legend()
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Kel17] Matthew Kelly. An introduction to trajectory optimization: how to do your own direct collocation. SIAM Review, 59(4):849–904, 2017. (See kelly2017.pdf).
"""
from irlc.ex05.direct import guess
from irlc.ex05.model_cartpole import CartpoleModel
from irlc.ex03.control_cost import SymbolicQRCost
from irlc.ex05.direct import direct_solver, get_opts
import numpy as np
from gymnasium.spaces import Box
class KellyCartpoleModel(CartpoleModel):
"""Completes the Cartpole swingup task in exactly 2 seconds.
    The only changes to the original cartpole model are a new bound in ``tF_bound(self)``,
    limiting the end time to :math:`t_F = 2`, and an updated cost function so that :math:`Q=0` and :math:`R=I`.
"""
def get_cost(self) -> SymbolicQRCost:
# TODO: 2 lines missing.
raise NotImplementedError("Construct and return a new cost-function here.")
def tF_bound(self) -> Box:
# TODO: 2 lines missing.
raise NotImplementedError("Implement the bound on tF here")
def make_cartpole_kelly17():
r"""
Creates Cartpole problem. Details about the cost function can be found in (Kel17, Section 6)
and details about the physical parameters can be found in (Kel17, Appendix E, table 3).
"""
    # This will generate a different cartpole environment with an emphasis on applying little force u.
duration = 2.0
maxForce = 20
model = KellyCartpoleModel(max_force=maxForce, mp=0.3, l=0.5, mc=1.0, dist=1)
guess2 = guess(model)
guess2['tF'] = duration # Our guess should match the constraints.
return model, guess2
def compute_solutions():
model, guess = make_cartpole_kelly17()
options = [get_opts(N=10, ftol=1e-3, guess=guess),
get_opts(N=40, ftol=1e-6)]
solutions = direct_solver(model, options)
return model, solutions
def direct_cartpole():
model, solutions = compute_solutions()
from irlc.ex05.direct_plot import plot_solutions
print("Did we succeed?", solutions[-1]['solver']['success'])
plot_solutions(model, solutions, animate=True, pdf="direct_cartpole_force")
model.close()
if __name__ == "__main__":
direct_cartpole()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex05.model_cartpole import CartpoleModel
from irlc.ex05.direct import direct_solver, get_opts
from irlc.ex05.direct_plot import plot_solutions
from irlc.ex05.direct import guess
def compute_solutions():
"""
See: https://github.com/MatthewPeterKelly/OptimTraj/blob/master/demo/cartPole/MAIN_minTime.m
"""
model = CartpoleModel(max_force=50, mp=0.5, mc=2.0, l=0.5)
guess2 = guess(model)
guess2['tF'] = 2
guess2['u'] = [[0], [0]]
options = [get_opts(N=8, ftol=1e-3, guess=guess2), # important.
get_opts(N=16, ftol=1e-6), # This is a hard problem and we need gradual grid-refinement.
get_opts(N=32, ftol=1e-6),
get_opts(N=70, ftol=1e-6)
]
solutions = direct_solver(model, options)
return model, solutions
if __name__ == "__main__":
model, solutions = compute_solutions()
x_sim, u_sim, t_sim = plot_solutions(model, solutions[:], animate=True, pdf="direct_cartpole_mintime")
model.close()
print("Did we succeed?", solutions[-1]['solver']['success'])
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex05.direct import direct_solver, get_opts
from irlc.ex05.pendulum_model_variant import DirectPendulumModel
from irlc.ex05.direct_plot import plot_solutions
import numpy as np
def compute_pendulum_solutions():
model = DirectPendulumModel()
"""
    Test out the implementation on a fairly small grid. Note that this will work fairly poorly.
"""
guess = {'t0': 0,
'tF': 3,
'x': [np.asarray([0, 0]), np.asarray([np.pi, 0])],
'u': [np.asarray([0]), np.asarray([0.5])]}
options = [get_opts(N=8, ftol=1e-3, guess=guess),
get_opts(N=40, ftol=1e-3),
get_opts(N=60, ftol=1e-6),
]
solutions = direct_solver(model, options)
return model, solutions
if __name__ == "__main__":
model, solutions = compute_pendulum_solutions()
plot_solutions(model, solutions, animate=True, pdf="direct_pendulum_real")
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import matplotlib.pyplot as plt
import numpy as np
from irlc.ex03.control_model import plot_trajectory
from irlc import savepdf
"""
Helper functions for plotting.
"""
def plot_solutions(model, solutions, animate=True, pdf=None, plot_defects=True, Ix=None, animate_repeats=1, animate_all=False, plot=True):
for k, sol in enumerate(solutions):
grd = sol['grid']
x_res = sol['grid']['x']
u_res = sol['grid']['u']
ts = sol['grid']['ts']
u_fun = lambda x, t: sol['fun']['u'](t)
N = len(ts)
if pdf is not None:
pdf_out = f"{pdf}_sol{N}"
x_sim, u_sim, t_sim, cost = model.simulate(x0=grd['x'][0, :], u_fun=u_fun, t0=grd['ts'][0], tF=grd['ts'][-1], N_steps=1000)
if animate and (k == len(solutions)-1 or animate_all):
for _ in range(animate_repeats):
animate_rollout(model, x0=grd['x'][0, :], u_fun=u_fun, t0=grd['ts'][0], tF=grd['ts'][-1], N_steps=1000, fps=30)
eqC_val = sol['eqC_val']
labels = model.state_labels
if Ix is not None:
labels = [l for k, l in enumerate(labels) if k in Ix]
x_res = x_res[:,np.asarray(Ix)]
x_sim = x_sim[:,np.asarray(Ix)]
print("Initial State: " + ",".join(labels))
print(x_res[0])
print("Final State:")
print(x_res[-1])
if plot:
ax = plot_trajectory(x_res, ts, lt='ko-', labels=labels, legend="Direct state prediction $x(t)$")
plot_trajectory(x_sim, t_sim, lt='-', ax=ax, labels=labels, legend="RK4 exact simulation")
# plt.suptitle("State", fontsize=14, y=0.98)
# make_space_above(ax, topmargin=0.5)
if pdf is not None:
savepdf(pdf_out +"_x")
plt.show(block=False)
# print("plotting...")
plot_trajectory(u_res, ts, lt='ko-', labels=model.action_labels, legend="Direct action prediction $u(t)$")
# print("plotting... B")
# plt.suptitle("Action", fontsize=14, y=0.98)
# print("plotting... C")
# make_space_above(ax, topmargin=0.5)
# print("plotting... D")
if pdf is not None:
savepdf(pdf_out +"_u")
plt.show(block=False)
if plot_defects:
plot_trajectory(eqC_val, ts[:-1], lt='-', labels=labels)
plt.suptitle("Defects (equality constraint violations)")
if pdf is not None:
savepdf(pdf_out +"_defects")
plt.show(block=False)
return x_sim, u_sim, t_sim
def animate_rollout(model, x0, u_fun, t0, tF, N_steps = 1000, fps=10):
""" Helper function to animate a policy. """
import time
# if sys.gettrace() is not None:
# print("Not animating stuff in debugger as it crashes.")
# return
y, _, tt, cost = model.simulate(x0, u_fun, t0, tF, N_steps=N_steps)
secs = tF-t0
frames = int( np.ceil( secs * fps ) )
I = np.round( np.linspace(0, N_steps-1, frames)).astype(int)
y = y[I,:]
for i in range(frames):
model.render(x=y[i], render_mode="human")
time.sleep(1/fps)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex04.discrete_control_cost import DiscreteQRCost
import sympy as sym
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Box
from irlc.ex03.control_model import ControlModel
from irlc.ex03.control_cost import SymbolicQRCost
from irlc.ex04.discrete_control_model import DiscreteControlModel
from irlc.ex04.control_environment import ControlEnvironment
class CartpoleModel(ControlModel):
state_labels = ["$x$", r"$\frac{dx}{dt}$", r"$\theta$", r"$\frac{d \theta}{dt}$"]
action_labels = ["Cart force $u$"]
def __init__(self, mc=2,
mp=0.5,
l=0.5,
max_force=50, dist=1.0):
self.mc = mc
self.mp = mp
self.l = l
self.max_force = max_force
self.dist = dist
self.cp_render = {}
super().__init__()
def tF_bound(self) -> Box:
return Box(0.01, np.inf, shape=(1,), dtype=np.float64)
def x_bound(self) -> Box:
return Box(np.asarray([-2 * self.dist, -np.inf, -2 * np.pi, -np.inf]), np.asarray([2 * self.dist, np.inf, 2 * np.pi, np.inf]), dtype=np.float64)
def x0_bound(self) -> Box:
return Box(np.asarray([0, 0, np.pi, 0]), np.asarray([0, 0, np.pi, 0]), dtype=np.float64)
def xF_bound(self) -> Box:
return Box(np.asarray([self.dist, 0, 0, 0]), np.asarray([self.dist, 0, 0, 0]), dtype=np.float64)
def u_bound(self) -> Box:
return Box(np.asarray([-self.max_force]), np.asarray([self.max_force]), dtype=np.float64)
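    # Taken together, the bounds above encode the swing-up task: the cart starts at x = 0 with the pole
    # hanging straight down (theta = pi) and at rest, and must end at x = dist with the pole upright
    # (theta = 0) and at rest, while x stays within +/- 2*dist and the force within +/- max_force.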
def get_cost(self) -> SymbolicQRCost:
return SymbolicQRCost(R=np.eye(1) * 0, Q=np.eye(4) * 0, qc=1) # just minimum time
def sym_f(self, x, u, t=None):
mp = self.mp
l = self.l
mc = self.mc
g = 9.81 # Gravity on earth.
x_dot = x[1]
theta = x[2]
sin_theta = sym.sin(theta)
cos_theta = sym.cos(theta)
theta_dot = x[3]
F = u[0]
# Define dynamics model as per Razvan V. Florian's
# "Correct equations for the dynamics of the cart-pole system".
# Friction is neglected.
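        # Written out, the two equations implemented below read (with s = sin(theta), c = cos(theta)):
        #   theta_dd = (g*s - c*(F + mp*l*theta_dot**2*s)/(mc + mp)) / (l*(4/3 - mp*c**2/(mc + mp)))
        #   x_dd     = (F + mp*l*theta_dot**2*s)/(mc + mp) - mp*l*theta_dd*c/(mc + mp)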
# Eq. (23)
temp = (F + mp * l * theta_dot ** 2 * sin_theta) / (mc + mp)
numerator = g * sin_theta - cos_theta * temp
denominator = l * (4.0 / 3.0 - mp * cos_theta ** 2 / (mc + mp))
theta_dot_dot = numerator / denominator
# Eq. (24)
x_dot_dot = temp - mp * l * theta_dot_dot * cos_theta / (mc + mp)
xp = [x_dot,
x_dot_dot,
theta_dot,
theta_dot_dot]
return xp
def close(self):
for r in self.cp_render.values():
r.close()
def render(self, x, render_mode="human"):
if render_mode not in self.cp_render:
            self.cp_render[render_mode] = gym.make("CartPole-v1", render_mode=render_mode)  # Environment only used for rendering.
self.cp_render[render_mode].max_time_limit = 10000
self.cp_render[render_mode].reset()
self.cp_render[render_mode].unwrapped.state = np.asarray(x) # environment is wrapped
return self.cp_render[render_mode].render()
class SinCosCartpoleModel(CartpoleModel):
def phi_x(self, x):
x, dx, theta, theta_dot = x[0], x[1], x[2], x[3]
return [x, dx, sym.sin(theta), sym.cos(theta), theta_dot]
def phi_x_inv(self, x):
x, dx, sin_theta, cos_theta, theta_dot = x[0], x[1], x[2], x[3], x[4]
theta = sym.atan2(sin_theta, cos_theta) # Obtain angle theta from sin(theta),cos(theta)
return [x, dx, theta, theta_dot]
def phi_u(self, u):
return [sym.atanh(u[0] / self.max_force)]
def phi_u_inv(self, u):
return [sym.tanh(u[0]) * self.max_force]
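# The transformation above serves two purposes: phi_x replaces the angle theta by (sin(theta), cos(theta)),
# which removes the 2*pi wrap-around discontinuity from the state, and phi_u maps the bounded force
# u in [-max_force, max_force] to an unbounded variable via atanh(u/max_force), so that phi_u_inv
# (tanh(.) * max_force) keeps any real-valued action within the force limits.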
def _cartpole_discrete_cost(model):
pole_length = model.continuous_model.l
state_size = model.state_size
Q = np.eye(state_size)
Q[0, 0] = 1.0
Q[1, 1] = Q[4, 4] = 0.
Q[0, 2] = Q[2, 0] = pole_length
Q[2, 2] = Q[3, 3] = pole_length ** 2
print("Warning: I altered the cost-matrix to prevent underflow. This is not great.")
R = np.array([[0.1]])
Q_terminal = 1 * Q
q = np.asarray([0,0,0,-1,0])
# Instantaneous control cost.
c3 = DiscreteQRCost(Q=Q*0, R=R * 0.1, q=1 * q, qN=q * 1)
c3 += c3.goal_seeking_cost(Q=Q, x_target=model.x_upright)
c3 += c3.goal_seeking_terminal_cost(QN=Q_terminal, xN_target=model.x_upright)
cost = c3
return cost
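# As the method names suggest, the cost constructed above combines a small quadratic control penalty
# with goal-seeking terms that penalize, via Q and Q_terminal, the deviation from the upright target
# state model.x_upright, both along the trajectory and at the final step.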
class GymSinCosCartpoleModel(DiscreteControlModel):
state_labels = ['x', 'd_x', r'$\sin(\theta)$', r'$\cos(\theta)$', r'$d\theta/dt$']
action_labels = ['Torque $u$']
def __init__(self, dt=0.02, cost=None, transform_actions=True, **kwargs):
model = SinCosCartpoleModel(**kwargs)
self.transform_actions = transform_actions
super().__init__(model=model, dt=dt, cost=cost)
self.x_upright = np.asarray(self.phi_x(model.xF_bound().low ))
if cost is None:
cost = _cartpole_discrete_cost(self)
self.cost = cost
@property
def max_force(self):
        return self.continuous_model.max_force  # Note: the underlying CartpoleModel stores this attribute as max_force.
class GymSinCosCartpoleEnvironment(ControlEnvironment):
def __init__(self, Tmax=5, transform_actions=True, supersample_trajectory=False, render_mode='human', **kwargs):
discrete_model = GymSinCosCartpoleModel(transform_actions=transform_actions, **kwargs)
self.observation_space = Box(low=-np.inf, high=np.inf, shape=(5,), dtype=np.float64)
if transform_actions:
self.action_space = Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float64)
super().__init__(discrete_model, Tmax=Tmax,render_mode=render_mode, supersample_trajectory=supersample_trajectory)
class DiscreteCartpoleModel(DiscreteControlModel):
def __init__(self, dt=0.02, cost=None, **kwargs):
model = CartpoleModel(**kwargs)
super().__init__(model=model, dt=dt, cost=cost)
class CartpoleEnvironment(ControlEnvironment):
def __init__(self, Tmax=5, supersample_trajectory=False, render_mode='human', **kwargs):
discrete_model = DiscreteCartpoleModel(**kwargs)
super().__init__(discrete_model, Tmax=Tmax, supersample_trajectory=supersample_trajectory, render_mode=render_mode)
if __name__ == "__main__":
from irlc import Agent, train
env = GymSinCosCartpoleEnvironment(render_mode='human')
agent = Agent(env)
stats, traj = train(env, agent, num_episodes=1, max_steps=100)
env.close()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import sympy as sym
from irlc.ex03.control_model import ControlModel
from irlc.ex03.control_cost import SymbolicQRCost
from irlc.ex04.discrete_control_model import DiscreteControlModel
import gymnasium as gym
from gymnasium.spaces.box import Box
from irlc.ex04.control_environment import ControlEnvironment
import numpy as np
from irlc.ex04.model_pendulum import PendulumModel
class DirectPendulumModel(PendulumModel):
def __init__(self, l=1., m=.8, max_torque=4.0):
super().__init__(l=l, m=m, max_torque=max_torque)
def get_cost(self) -> SymbolicQRCost:
return SymbolicQRCost(R=np.ones((1, 1)), Q=0 * np.eye(2))
def tF_bound(self) -> Box:
return Box(0.5, 3, shape=(1,), dtype=float)
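# Compared to the base PendulumModel, this variant uses a pure control-effort cost (Q = 0, R = I) and
# leaves the final time free within [0.5, 3] seconds, so the direct solver optimizes the duration of
# the swing-up together with the trajectory itself.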