From 4ddfc88ec149332014fd01887d2621d6f6fd64e6 Mon Sep 17 00:00:00 2001 From: Tue Herlau <tuhe@dtu.dk> Date: Wed, 5 Mar 2025 13:35:20 +0100 Subject: [PATCH] Week 5 code --- irlc/ex05/__init__.py | 2 + irlc/ex05/direct.py | 372 ++++++++++++++++++++++++++++ irlc/ex05/direct_agent.py | 74 ++++++ irlc/ex05/direct_cartpole_kelly.py | 56 +++++ irlc/ex05/direct_cartpole_time.py | 28 +++ irlc/ex05/direct_pendulum.py | 28 +++ irlc/ex05/direct_plot.py | 82 ++++++ irlc/ex05/model_cartpole.py | 171 +++++++++++++ irlc/ex05/pendulum_model_variant.py | 21 ++ 9 files changed, 834 insertions(+) create mode 100644 irlc/ex05/__init__.py create mode 100644 irlc/ex05/direct.py create mode 100644 irlc/ex05/direct_agent.py create mode 100644 irlc/ex05/direct_cartpole_kelly.py create mode 100644 irlc/ex05/direct_cartpole_time.py create mode 100644 irlc/ex05/direct_pendulum.py create mode 100644 irlc/ex05/direct_plot.py create mode 100644 irlc/ex05/model_cartpole.py create mode 100644 irlc/ex05/pendulum_model_variant.py diff --git a/irlc/ex05/__init__.py b/irlc/ex05/__init__.py new file mode 100644 index 0000000..23f7751 --- /dev/null +++ b/irlc/ex05/__init__.py @@ -0,0 +1,2 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +"""This directory contains the exercises for week 5.""" diff --git a/irlc/ex05/direct.py b/irlc/ex05/direct.py new file mode 100644 index 0000000..3196e9b --- /dev/null +++ b/irlc/ex05/direct.py @@ -0,0 +1,372 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025. +""" +from irlc.ex03.control_model import ControlModel +import numpy as np +import sympy as sym +import sys +from scipy.optimize import Bounds, minimize +from scipy.interpolate import interp1d +from irlc.ex03.control_model import symv +from irlc.ex04.discrete_control_model import sympy_modules_ +from irlc import Timer +from tqdm import tqdm + +def bounds2fun(t0 : float, tF : float, bounds : Bounds) -> np.ndarray: + """ + Given start and end times [t0, tF] and a scipy Bounds object with upper/lower bounds on some variable x, i.e. so that: + + > bounds.lb <= x <= bounds.ub + + this function returns a new function f such that f(t0) equals bounds.lb and f(tF) = bounds.ub and + f(t) interpolates between the uppower/lower bounds linearly, i.e. + + > bounds.lb <= f(t) <= bounds.ub + + The function will return a numpy ``ndarray``. + """ + return interp1d(np.asarray([t0, tF]), np.stack([np.reshape(b, (-1,)) for b in bounds], axis=1)) + +def direct_solver(model : ControlModel, options : dict) -> list[dict]: + """ + Main direct solver method, see (Her25, Algorithm 21). Given a list of options of length S, the solver performers collocation + using the settings found in the dictionary options[i], and use the result of options[i] to initialize collocation on options[i+1]. + + This iterative refinement scheme is required to obtain good overall solutions. + + :param model: A ContinuousTimeModel instance + :param options: An options-structure. This is a list of dictionaries of options for each collocation iteration + :return: A list of solutions, one for each collocation step. 
The last will be the 'best' solution (highest N) + + """ + if isinstance(options, dict): + options = [options] + solutions = [] # re-use result of current solutions to initialize next with a higher value of N + for i, opt in enumerate(options): + optimizer_options = opt['optimizer_options'] # to be passed along to minimize() + if i == 0 or "guess" in opt: + # No solutions-function is given. Re-calculate by linearly interpreting bounds (see (Her25, Subsection 15.3.4)) + guess = opt['guess'] + guess['u'] = bounds2fun(guess['t0'],guess['tF'],guess['u']) if isinstance(guess['u'], list) else guess['u'] + guess['x'] = bounds2fun(guess['t0'],guess['tF'],guess['x']) if isinstance(guess['x'], list) else guess['x'] + else: + """ For an iterative solver ((Her25, Subsection 15.3.4)), initialize the guess at iteration i to be the solution at iteration i-1. + The guess consists of a guess for t0, tF (just numbers) as well as x, u (state/action trajectories), + the later two being functions. The format of the guess is just a dictionary (you have seen several examples) + i.e. + + > guess = {'t0': (number), 'tF': (number), 'x': (function), 'u': (function)} + + and you can get the solution by using solutions[i - 1]['fun']. (insert a breakpoint and check the fields) + """ + # TODO: 1 lines missing. + raise NotImplementedError("Define guess = {'t0': ..., ...} here.") + N = opt['N'] + print(f"{i}> Collocation starting with grid-size N={N}") + sol = collocate(model, N=N, optimizer_options=optimizer_options, guess=guess, verbose=opt.get('verbose', False)) + solutions.append(sol) + + print("Was collocation success full at each iteration?") + for i, s in enumerate(solutions): + print(f"{i}> Success? {s['solver']['success']}") + return solutions + +def collocate(model : ControlModel, N=25, optimizer_options=None, guess : dict = None, verbose=True) -> dict: + r""" + Performs collocation by discretizing the model using a grid-size of N and optimize to find the optimal solution. + The 'model' should be a ControlModel instance, optimizer_options contains options for the optimizer, and guess + is a dictionary used to initialize the optimizer containing keys:: + + guess = {'t0': Start time (float), + 'tF': Terminal time (float), + 'x': A *function* which takes time as input and return a guess for x(t), + 'u': A *function* which takes time as input and return a guess for u(t), + } + + So for instance + + .. code-block:: python + + guess['x'](0.5) + + will return the state :math:`\mathbf x(0.5)` as a numpy ndarray. + + The overall structure of the optimization procedure is as follows: + + #. Define the following variables. They will all be lists: + - ``z``: Variables to be optimized over. Each element ``z[k]`` is a symbolic variable. This will allow us to compute derivatives. + - ``z0``: A list of numbers representing the initial guess. Computed using 'guess' (above) + - ``z_lb``, ``z_ub``: Lists of numbers representting the upper/lower bounds on z. Use bound-methods in :class:`irlc.ex03.control_model.ControlModel` to get these. + #. Create a symbolic expression representing the cost-function J + This is defined using the symbolic variables similar to the toy-problem we saw last week. This allows us to compute derivatives of the cost + #. Create *symbolic* expressions representing all constraints + The lists ``Iineq`` and ``Ieq`` contains *lists* of constraints. 
The solver will ensure that for any i:: + + Ieq[i] == 0 + + and:: + + Iineq[i] <= 0 + + This allows us to just specify each element in 'eqC' and 'ineqC' as a single symbolic expression. Once more, we use symbolic expressions so + derivatives can be computed automatically. The most important constraints are in 'eqC', as these must include the collocation-constraints (see algorithm in notes) + #. Compile all symbolic expressions into a format useful for the optimizer + The optimizer accepts numpy functions, so we turn all symbolic expressions and derivatives into numpy (similar to the example last week). + It is then fed into the optimizer and, fingers crossed, the optimizer spits out a value 'z*', which represents the optimal values. + + #. Unpack z: + The value 'z*' then has to be unpacked and turned into function u*(t) and x*(t) (as in the notes). These functions can then be put into the + solution-dictionary and used to initialize the next guess (or assuming we terminate, these are simply our solution). + + :param model: A :class:`irlc.ex03.control_model.ControlModel` instance + :param N: The number of collocation knot points :math:`N` + :param optimizer_options: Options for the scipy optimizer. You can ignore this. + :param guess: A dictionary containing the initial guess. See the online documentation. + :param verbose: Whether to print out extra details during the run. Useful only for debugging. + :return: A dictionary containing the solution. It is compatible with the :python:`guess` datastructure . + """ + timer = Timer(start=True) + cost = model.get_cost() + t0, tF = sym.symbols("t0"), sym.symbols("tF") + ts = t0 + np.linspace(0, 1, N) * (tF-t0) # N points linearly spaced between [t0, tF] TODO: Convert this to a list. + xs, us = [], [] + for i in range(N): + xs.append(list(symv("x_%i_" % i, model.state_size))) + us.append(list(symv("u_%i_" % i, model.action_size))) + + ''' (1) Construct guess z0, all simple bounds [z_lb, z_ub] for the problem and collect all symbolic variables as z ''' + # sb = model.simple_bounds() # get simple inequality boundaries in problem (v_lb <= v <= v_ub) + z = [] # list of all *symbolic* variables in the problem + # These lists contain the guess z0 and lower/upper bounds (list-of-numbers): z_lb[k] <= z0[k] <= z_ub[k]. + # They should be lists of *numbers*. + z0, z_lb, z_ub = [], [], [] + ts_eval = sym.lambdify((t0, tF), ts.tolist(), modules='numpy') + for k in range(N): + x_low = list(model.x0_bound().low if k == 0 else (model.xF_bound().low if k == N - 1 else model.x_bound().low)) + x_high = list(model.x0_bound().high if k == 0 else (model.xF_bound().high if k == N - 1 else model.x_bound().high)) + u_low, u_high = list(model.u_bound().low), list(model.u_bound().high) + + tk = ts_eval(guess['t0'], guess['tF'])[k] + """ In these lines, update z, z0, z_lb, and z_ub with values corresponding to xs[k], us[k]. + The values are all lists; i.e. z[j] (symbolic) has guess z0[j] (float) and bounds z_lb[j], z_ub[j] (floats) """ + # TODO: 2 lines missing. + raise NotImplementedError("Updates for x_k, u_k") + + """ Update z, z0, z_lb, and z_ub with bounds/guesses corresponding to t0 and tF (same format as above). """ + # z, z0, z_lb, z_ub = z+[t0], z0+[guess['t0']], z_lb+[model.bounds['t0_low']], z_ub+[model.bounds['t0_high']] + # TODO: 2 lines missing. 
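+    # Hint: the commented line above uses an older bounds-API. With the Box-based bounds used elsewhere in this
+    # function, the two missing lines presumably take the form (assuming the model also exposes a t0_bound()
+    # method mirroring tF_bound()):
+    #   z, z0, z_lb, z_ub = z + [t0], z0 + [guess['t0']], z_lb + list(model.t0_bound().low), z_ub + list(model.t0_bound().high)
+    # and analogously for tF using guess['tF'] and model.tF_bound().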
+ raise NotImplementedError("Updates for t0, tF") + assert len(z) == len(z0) == len(z_lb) == len(z_ub) + if verbose: + print(f"z={z}\nz0={np.asarray(z0).round(1).tolist()}\nz_lb={np.asarray(z_lb).round(1).tolist()}\nz_ub={np.asarray(z_ub).round(1).tolist()}") + print(">>> Trapezoid collocation of problem") # problem in this section + fs, cs = [], [] # lists of symbolic variables corresponding to f_k and c_k, see (Her25, Algorithm 20). + for k in range(N): + """ Update both fs and cs; these are lists of symbolic expressions such that fs[k] corresponds to f_k and cs[k] to c_k in the slides. + Use the functions env.sym_f and env.sym_c """ + # fs.append( symbolic variable corresponding to f_k; see env.sym_f). similarly update cs.append(env.sym_c(...) ). + ## TODO: Half of each line of code in the following 2 lines have been replaced by garbage. Make it work and remove the error. + #---------------------------------------------------------------------------------------------------------------------------- + # fs.append(model.sym_f(x=????????????????????????? + # cs.append(cost.sym_c(x=x???????????????????????? + raise NotImplementedError("Compute f[k] and c[k] here (see slides) and add them to above lists") + + J = cost.sym_cf(x0=xs[0], t0=t0, xF=xs[-1], tF=tF) # terminal cost; you need to update this variable with all the cs[k]'s. + Ieq, Iineq = [], [] # all symbolic equality/inequality constraints are stored in these lists + for k in range(N - 1): + # Update cost function ((Her25, eq. (15.15))). Use the above defined symbolic expressions ts, hk and cs. + # TODO: 2 lines missing. + raise NotImplementedError("Update J here") + # Set up equality constraints. See (Her25, eq. (15.18)). + for j in range(model.state_size): + """Create all collocation equality-constraints here and add them to Ieq. I.e. + + xs[k+1] - xs[k] = 0.5 h_k (f_{k+1} + f_k) + + Note we have to create these coordinate-wise which is why we loop over j. + """ + ## TODO: Half of each line of code in the following 1 lines have been replaced by garbage. Make it work and remove the error. + #---------------------------------------------------------------------------------------------------------------------------- + # Ieq.append((xs[k+1][j] - xs[k][j])?????????????????????????????????? + raise NotImplementedError("Update collocation constraints here") + """ + To solve problems with dynamical path constriants like Brachiostone, update Iineq here to contain the + inequality constraint model.sym_h(...) <= 0. For the other problems this can simply be left blank """ + if hasattr(model, 'sym_h'): + # TODO: 1 lines missing. + raise NotImplementedError("Update symbolic path-dependent constraint h(x,u,t)<=0 here") + + print(">>> Creating objective and derivative...") + timer.tic("Building symbolic objective") + J_fun = sym.lambdify([z], J, modules='numpy') # create a python function from symbolic expression + # To compute the Jacobian, you can use sym.derive_by_array(J, z) to get the correct symbolic expression, then use sym.lamdify (as above) to get a numpy function. + ## TODO: Half of each line of code in the following 1 lines have been replaced by garbage. Make it work and remove the error. + #---------------------------------------------------------------------------------------------------------------------------- + # J_jac = sym.lambdify([z], sym.deri??????????????????????????????????? + raise NotImplementedError("Jacobian of J. 
See how this is computed for equality/inequality constratins for help.") + if verbose: + print(f"{Ieq=}\n{Iineq=}\n{J=}") + timer.toc() + print(">>> Differentiating equality constraints..."), timer.tic("Differentiating equality constraints") + constraints = [] + for eq in tqdm(Ieq, file=sys.stdout): # don't write to error output. + constraints.append(constraint2dict(eq, z, type='eq')) + timer.toc() + print(">>> Differentiating inequality constraints"), timer.tic("Differentiating inequality constraints") + constraints += [constraint2dict(ineq, z, type='ineq') for ineq in Iineq] + timer.toc() + + c_viol = sum(abs(np.minimum(z_ub - np.asarray(z0), 0))) + sum(abs(np.maximum(np.asarray(z_lb) - np.asarray(z0), 0))) + if c_viol > 0: # check if: z_lb <= z0 <= z_ub. Violations only serious if large + print(f">>> Warning! Constraint violations found of total magnitude: {c_viol:4} before optimization") + + print(">>> Running optimizer..."), timer.tic("Optimizing") + z_B = Bounds(z_lb, z_ub) + res = minimize(J_fun, x0=z0, method='SLSQP', jac=J_jac, constraints=constraints, options=optimizer_options, bounds=z_B) + # Compute value of equality constraints to check violations + timer.toc() + eqC_fun = sym.lambdify([z], Ieq) + eqC_val_ = eqC_fun(res.x) + eqC_val = np.zeros((N - 1, model.state_size)) + + x_res = np.zeros((N, model.state_size)) + u_res = np.zeros((N, model.action_size)) + t0_res = res.x[-2] + tF_res = res.x[-1] + + m = model.state_size + model.action_size + for k in range(N): + dx = res.x[k * m:(k + 1) * m] + if k < N - 1: + eqC_val[k, :] = eqC_val_[k * model.state_size:(k + 1) * model.state_size] + x_res[k, :] = dx[:model.state_size] + u_res[k, :] = dx[model.state_size:] + + # Generate solution structure + ts_numpy = ts_eval(t0_res, tF_res) + # make linear interpolation similar to (Her25, eq. (15.22)) + ufun = interp1d(ts_numpy, np.transpose(u_res), kind='linear') + # Evaluate function values fk points (useful for debugging but not much else): + f_eval = sym.lambdify((t0, tF, xs, us), fs) + fs_numpy = f_eval(t0_res, tF_res, x_res, u_res) + fs_numpy = np.asarray(fs_numpy) + + r""" Interpolate to get x(t) as described in (Her25, eq. (15.26)). The function should accept both lists and numbers for t.""" + x_fun = lambda t_new: np.stack([trapezoid_interpolant(ts_numpy, np.transpose(x_res), np.transpose(fs_numpy), t_new=t) for t in np.reshape(np.asarray(t_new), (-1,))], axis=1) + + if verbose: + newt = np.linspace(ts_numpy[0], ts_numpy[-1], len(ts_numpy)-1) + print( x_fun(newt) ) + + sol = { + 'grid': {'x': x_res, 'u': u_res, 'ts': ts_numpy, 'fs': fs_numpy}, + 'fun': {'x': x_fun, 'u': ufun, 'tF': tF_res, 't0': t0_res}, + 'solver': res, + 'eqC_val': eqC_val, + 'inputs': {'z': z, 'z0': z0, 'z_lb': z_lb, 'z_ub': z_ub}, + } + print(timer.display()) + return sol + +def trapezoid_interpolant(ts : list, xs : list, fs : list, t_new=None): + r""" + This function implements (Her25, eq. (15.26)) to evaluate :math:`\mathbf{x}(t)` at a point :math:`t =` ``t_new``. + + The other inputs represent the output of the direct optimization procedure. I.e., ``ts`` is a list of length + :math:`N+1` corresponding to :math:`t_k`, ``xs`` is a list of :math:`\mathbf x_k`, and ``fs`` is a list corresponding + to :math:`\mathbf f_k`. To implement the method, you should first determine which :math:`k` the new time point ``t_new`` + corresponds to, i.e. where :math:`t_k \leq t_\text{new} < t_{k+1}`. 
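+
+    Note that ``xs`` and ``fs`` are indexed with the knot point as the *second* axis (one column per knot
+    point), matching how they are passed from :func:`collocate`. A small illustrative call:
+
+    > ts = [0.0, 0.5, 1.0]                            # N = 3 knot times
+    > xs = np.zeros((2, 3)); fs = np.zeros((2, 3))    # arrays of shape [state_size, N]
+    > trapezoid_interpolant(ts, xs, fs, t_new=0.25)   # evaluates x(0.25)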
+ + + :param ts: List of time points ``[.., t_k, ..]`` + :param xs: List of numpy ndarrays ``[.., x_k, ...]`` + :param fs: List of numpy ndarrays ``[.., f_k, ...]`` + :param t_new: The time point we should evaluate the function in. + :return: The state evaluated at time ``t_new``, i.e. :math:`\mathbf x(t_\text{new})`. + """ + # TODO: 3 lines missing. + raise NotImplementedError("Determine the time index k here so that ts[k] <= t_new < ts[k+1].") + + ts = np.asarray(ts) + tau = t_new - ts[k] + hk = ts[k + 1] - ts[k] + r""" + Make interpolation here. Should be a numpy array of dimensions [xs.shape[0], len(I)] + What the code does is that for each t in ts, we work out which knot-point interval the code falls within. I.e. + insert a breakpoint and make sure you understand what e.g. the code tau = t_new - ts[I] does. + + Given this information, we can recover the relevant (evaluated) knot-points as for instance + fs[:,I] and those at the next time step as fs[:,I]. With this information, the problem is simply an + implementation of (Her25, eq. (15.26)), i.e. + + > x_interp = xs[:,I] + tau * fs[:,I] + (...) + + """ + ## TODO: Half of each line of code in the following 1 lines have been replaced by garbage. Make it work and remove the error. + #---------------------------------------------------------------------------------------------------------------------------- + # x_interp = xs[:, k] + tau * fs[:, k] + (tau ???????????????????????????????????????????? + raise NotImplementedError("Insert your solution and remove this error.") + return x_interp + + +def constraint2dict(symb, all_vars, type='eq'): + ''' Turn constraints into a dict with type, fun, and jacobian field. ''' + if type == "ineq": symb = -1 * symb # To agree with sign convention in optimizer + + f = sym.lambdify([all_vars], symb, modules=sympy_modules_) + # np.atan = np.arctan # Monkeypatch numpy to contain atan. Passing "numpy" does not seem to fix this. + jac = sym.lambdify([all_vars], sym.derive_by_array(symb, all_vars), modules=sympy_modules_) + eq_cons = {'type': type, + 'fun': f, + 'jac': jac} + return eq_cons + +def get_opts(N, ftol=1e-6, guess=None, verbose=False) -> dict: + # helper function to instantiate the optimizer objects. They will be returned as a dictionary. You can ignore the details.. + d = {'N': N, + 'optimizer_options': {'maxiter': 1000, + 'ftol': ftol, + 'iprint': 1, + 'disp': True, + 'eps': 1.5e-8}, # 'eps': 1.4901161193847656e-08, + 'verbose': verbose} + if guess: + d['guess'] = guess + return d + +def guess(model : ControlModel) -> dict: + def mfin(z): + return [z_ if np.isfinite(z_) else 0 for z_ in z] + xL = mfin(model.x0_bound().low) + xU = mfin(model.xF_bound().high) + tF = 10 if not np.isfinite(model.tF_bound().high[0]) else model.tF_bound().high[0] + gs = {'t0': 0, + 'tF': tF, + 'x': [xL, xU], + 'u': [mfin(model.u_bound().low), mfin(model.u_bound().high)]} + return gs + + +def run_direct_small_problem(): + # from irlc.ex04.model_pendulum import SinCosPendulumModel + from irlc.ex05.pendulum_model_variant import DirectPendulumModel + model = DirectPendulumModel(max_torque=10) + """ + Test out implementation on a very small grid. The overall solution will be fairly bad, + but we can print out the various symbolic expressions + + We use verbose=True to get debug-information. 
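+
+    The returned solutions can afterwards be inspected as, for instance:
+
+    > sol = solutions[-1]          # solution on the finest grid
+    > sol['solver']['success']     # did the optimizer report success?
+    > sol['fun']['x'](0.5)         # interpolated state x(t) at t = 0.5
+    > sol['fun']['u'](0.5)         # interpolated action u(t) at t = 0.5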
+ """ + print("Solving with a small grid, N=5") + options = [get_opts(N=5, ftol=1e-3, guess=guess(model), verbose=True)] + solutions = direct_solver(model, options) + return model, solutions + + +if __name__ == "__main__": + from irlc.ex05.direct_plot import plot_solutions + model, solutions = run_direct_small_problem() + plot_solutions(model, solutions, animate=False, pdf="direct_pendulum_small") diff --git a/irlc/ex05/direct_agent.py b/irlc/ex05/direct_agent.py new file mode 100644 index 0000000..64c40a6 --- /dev/null +++ b/irlc/ex05/direct_agent.py @@ -0,0 +1,74 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +from irlc.ex05.direct import direct_solver, get_opts, guess +from irlc.ex04.discrete_control_model import DiscreteControlModel +from irlc.ex04.control_environment import ControlEnvironment +from irlc import train, Agent +import numpy as np +import matplotlib.pyplot as plt +from irlc import savepdf + +class DirectAgent(Agent): + def __init__(self, env: ControlEnvironment, options=None): + cmod = env.discrete_model.continuous_model # Get the continuous-time model for planning + + if options is None: + options = [get_opts(N=10, ftol=1e-3, guess=guess(cmod), verbose=False), + get_opts(N=60, ftol=1e-6, verbose=False) + ] + solutions = direct_solver(cmod, options) + + # The next 3 lines are for plotting purposes. You can ignore them. + self.x_grid = np.stack([env.discrete_model.phi_x(x) for x in solutions[-1]['grid']['x']]) + self.u_grid = np.stack([env.discrete_model.phi_u(u) for u in solutions[-1]['grid']['u']]) + self.ts_grid = np.stack(solutions[-1]['grid']['ts']) + # set self.ufun equal to the solution (policy) function. You can get it by looking at `solutions` computed above + self.solutions = solutions + # TODO: 1 lines missing. + raise NotImplementedError("set self.ufun = solutions[....][somethingsomething] (insert a breakpoint, it should be self-explanatory).") + super().__init__(env) + + def pi(self, x, k, info=None): + """ Return the action given x and t. As a hint, you will only use t, and self.ufun computed a few lines above""" + # TODO: 7 lines missing. + raise NotImplementedError("Implement function body") + return u + +def train_direct_agent(animate=True, plot=False): + from irlc.ex05.pendulum_model_variant import DirectPendulumModel + model = DirectPendulumModel() + """ + Test out implementation on a fairly small grid. Note this will work fairly terribly. + """ + guess = {'t0': 0, + 'tF': 3, + 'x': [np.asarray([0, 0]), np.asarray([np.pi, 0])], + 'u': [np.asarray([0]), np.asarray([0])]} + + options = [get_opts(N=8, ftol=1e-3, guess=guess), + get_opts(N=40, ftol=1e-3), + get_opts(N=60, ftol=1e-6) + ] + + dmod = DiscreteControlModel(model=model, dt=0.1) # Discretize the pendulum model. Used for creating the environment. + denv = ControlEnvironment(discrete_model=dmod, Tmax=4, render_mode='human' if animate else None) + agent = DirectAgent(denv, options=options) + denv.Tmax = agent.solutions[-1]['fun']['tF'] # Specify max runtime of the environment. Must be based on the Agent's solution. 
+ stats, traj = train(denv, agent=agent, num_episodes=1, return_trajectory=True) + + if plot: + from irlc import plot_trajectory + plot_trajectory(traj[0], env=denv) + savepdf("direct_agent_pendulum") + plt.show() + + return stats, traj, agent + +if __name__ == "__main__": + stats, traj, agent = train_direct_agent(animate=True, plot=True) + print("Obtained cost", -stats[0]['Accumulated Reward']) + + # Let's try to plot the state-vectors for the two models. They are not going to agree that well. + plt.plot(agent.ts_grid, agent.x_grid, 'r-', label="Direct solver prediction") + plt.plot(traj[0].time, traj[0].state, 'k-', label='Simulation') + plt.legend() + plt.show() diff --git a/irlc/ex05/direct_cartpole_kelly.py b/irlc/ex05/direct_cartpole_kelly.py new file mode 100644 index 0000000..589cddf --- /dev/null +++ b/irlc/ex05/direct_cartpole_kelly.py @@ -0,0 +1,56 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [Kel17] Matthew Kelly. An introduction to trajectory optimization: how to do your own direct collocation. SIAM Review, 59(4):849–904, 2017. (See kelly2017.pdf). +""" +from irlc.ex05.direct import guess +from irlc.ex05.model_cartpole import CartpoleModel +from irlc.ex03.control_cost import SymbolicQRCost +from irlc.ex05.direct import direct_solver, get_opts +import numpy as np +from gymnasium.spaces import Box + +class KellyCartpoleModel(CartpoleModel): + """Completes the Cartpole swingup task in exactly 2 seconds. + + The only changes to the original cartpole model is the inclusion of a new bound on ``tf_bound(self)``, + to limit the end-time to :math:`t_F = 2`, and an updated cost function so that :math:`Q=0` and :math:`R=I`. + """ + def get_cost(self) -> SymbolicQRCost: + # TODO: 2 lines missing. + raise NotImplementedError("Construct and return a new cost-function here.") + + def tF_bound(self) -> Box: + # TODO: 2 lines missing. + raise NotImplementedError("Implement the bound on tF here") + +def make_cartpole_kelly17(): + r""" + Creates Cartpole problem. Details about the cost function can be found in (Kel17, Section 6) + and details about the physical parameters can be found in (Kel17, Appendix E, table 3). + """ + # this will generate a different carpole environment with an emphasis on applying little force u. + duration = 2.0 + maxForce = 20 + model = KellyCartpoleModel(max_force=maxForce, mp=0.3, l=0.5, mc=1.0, dist=1) + guess2 = guess(model) + guess2['tF'] = duration # Our guess should match the constraints. + return model, guess2 + +def compute_solutions(): + model, guess = make_cartpole_kelly17() + options = [get_opts(N=10, ftol=1e-3, guess=guess), + get_opts(N=40, ftol=1e-6)] + solutions = direct_solver(model, options) + return model, solutions + +def direct_cartpole(): + model, solutions = compute_solutions() + from irlc.ex05.direct_plot import plot_solutions + print("Did we succeed?", solutions[-1]['solver']['success']) + plot_solutions(model, solutions, animate=True, pdf="direct_cartpole_force") + model.close() + +if __name__ == "__main__": + direct_cartpole() diff --git a/irlc/ex05/direct_cartpole_time.py b/irlc/ex05/direct_cartpole_time.py new file mode 100644 index 0000000..ccf6336 --- /dev/null +++ b/irlc/ex05/direct_cartpole_time.py @@ -0,0 +1,28 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. 
If this file contains other copyright notices disregard this text. +from irlc.ex05.model_cartpole import CartpoleModel +from irlc.ex05.direct import direct_solver, get_opts +from irlc.ex05.direct_plot import plot_solutions +from irlc.ex05.direct import guess + +def compute_solutions(): + """ + See: https://github.com/MatthewPeterKelly/OptimTraj/blob/master/demo/cartPole/MAIN_minTime.m + """ + model = CartpoleModel(max_force=50, mp=0.5, mc=2.0, l=0.5) + guess2 = guess(model) + guess2['tF'] = 2 + guess2['u'] = [[0], [0]] + + options = [get_opts(N=8, ftol=1e-3, guess=guess2), # important. + get_opts(N=16, ftol=1e-6), # This is a hard problem and we need gradual grid-refinement. + get_opts(N=32, ftol=1e-6), + get_opts(N=70, ftol=1e-6) + ] + solutions = direct_solver(model, options) + return model, solutions + +if __name__ == "__main__": + model, solutions = compute_solutions() + x_sim, u_sim, t_sim = plot_solutions(model, solutions[:], animate=True, pdf="direct_cartpole_mintime") + model.close() + print("Did we succeed?", solutions[-1]['solver']['success']) diff --git a/irlc/ex05/direct_pendulum.py b/irlc/ex05/direct_pendulum.py new file mode 100644 index 0000000..2709612 --- /dev/null +++ b/irlc/ex05/direct_pendulum.py @@ -0,0 +1,28 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +from irlc.ex05.direct import direct_solver, get_opts +from irlc.ex05.pendulum_model_variant import DirectPendulumModel +from irlc.ex05.direct_plot import plot_solutions +import numpy as np + + +def compute_pendulum_solutions(): + model = DirectPendulumModel() + """ + Test out implementation on a fairly small grid. Note this will work fairly terribly. + """ + guess = {'t0': 0, + 'tF': 3, + 'x': [np.asarray([0, 0]), np.asarray([np.pi, 0])], + 'u': [np.asarray([0]), np.asarray([0.5])]} + + options = [get_opts(N=8, ftol=1e-3, guess=guess), + get_opts(N=40, ftol=1e-3), + get_opts(N=60, ftol=1e-6), + ] + + solutions = direct_solver(model, options) + return model, solutions + +if __name__ == "__main__": + model, solutions = compute_pendulum_solutions() + plot_solutions(model, solutions, animate=True, pdf="direct_pendulum_real") diff --git a/irlc/ex05/direct_plot.py b/irlc/ex05/direct_plot.py new file mode 100644 index 0000000..d4a84cc --- /dev/null +++ b/irlc/ex05/direct_plot.py @@ -0,0 +1,82 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +import matplotlib.pyplot as plt +import numpy as np +from irlc.ex03.control_model import plot_trajectory +from irlc import savepdf + +""" +Helper function for plotting. 
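+
+Typical usage (as in e.g. irlc.ex05.direct_pendulum):
+
+> from irlc.ex05.direct_pendulum import compute_pendulum_solutions
+> model, solutions = compute_pendulum_solutions()
+> plot_solutions(model, solutions, animate=True, pdf="direct_pendulum_real")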
+""" +def plot_solutions(model, solutions, animate=True, pdf=None, plot_defects=True, Ix=None, animate_repeats=1, animate_all=False, plot=True): + + for k, sol in enumerate(solutions): + grd = sol['grid'] + x_res = sol['grid']['x'] + u_res = sol['grid']['u'] + ts = sol['grid']['ts'] + u_fun = lambda x, t: sol['fun']['u'](t) + N = len(ts) + if pdf is not None: + pdf_out = f"{pdf}_sol{N}" + + x_sim, u_sim, t_sim, cost = model.simulate(x0=grd['x'][0, :], u_fun=u_fun, t0=grd['ts'][0], tF=grd['ts'][-1], N_steps=1000) + if animate and (k == len(solutions)-1 or animate_all): + for _ in range(animate_repeats): + animate_rollout(model, x0=grd['x'][0, :], u_fun=u_fun, t0=grd['ts'][0], tF=grd['ts'][-1], N_steps=1000, fps=30) + + eqC_val = sol['eqC_val'] + labels = model.state_labels + + if Ix is not None: + labels = [l for k, l in enumerate(labels) if k in Ix] + x_res = x_res[:,np.asarray(Ix)] + x_sim = x_sim[:,np.asarray(Ix)] + + print("Initial State: " + ",".join(labels)) + print(x_res[0]) + print("Final State:") + print(x_res[-1]) + if plot: + ax = plot_trajectory(x_res, ts, lt='ko-', labels=labels, legend="Direct state prediction $x(t)$") + plot_trajectory(x_sim, t_sim, lt='-', ax=ax, labels=labels, legend="RK4 exact simulation") + # plt.suptitle("State", fontsize=14, y=0.98) + # make_space_above(ax, topmargin=0.5) + + if pdf is not None: + savepdf(pdf_out +"_x") + plt.show(block=False) + # print("plotting...") + plot_trajectory(u_res, ts, lt='ko-', labels=model.action_labels, legend="Direct action prediction $u(t)$") + # print("plotting... B") + # plt.suptitle("Action", fontsize=14, y=0.98) + # print("plotting... C") + # make_space_above(ax, topmargin=0.5) + # print("plotting... D") + if pdf is not None: + savepdf(pdf_out +"_u") + plt.show(block=False) + if plot_defects: + plot_trajectory(eqC_val, ts[:-1], lt='-', labels=labels) + plt.suptitle("Defects (equality constraint violations)") + if pdf is not None: + savepdf(pdf_out +"_defects") + plt.show(block=False) + return x_sim, u_sim, t_sim + + +def animate_rollout(model, x0, u_fun, t0, tF, N_steps = 1000, fps=10): + """ Helper function to animate a policy. """ + + import time + # if sys.gettrace() is not None: + # print("Not animating stuff in debugger as it crashes.") + # return + y, _, tt, cost = model.simulate(x0, u_fun, t0, tF, N_steps=N_steps) + secs = tF-t0 + frames = int( np.ceil( secs * fps ) ) + I = np.round( np.linspace(0, N_steps-1, frames)).astype(int) + y = y[I,:] + + for i in range(frames): + model.render(x=y[i], render_mode="human") + time.sleep(1/fps) diff --git a/irlc/ex05/model_cartpole.py b/irlc/ex05/model_cartpole.py new file mode 100644 index 0000000..b5c4f43 --- /dev/null +++ b/irlc/ex05/model_cartpole.py @@ -0,0 +1,171 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. 
+from irlc.ex04.discrete_control_cost import DiscreteQRCost +import sympy as sym +import numpy as np +import gymnasium as gym +from gymnasium.spaces import Box +from irlc.ex03.control_model import ControlModel +from irlc.ex03.control_cost import SymbolicQRCost +from irlc.ex04.discrete_control_model import DiscreteControlModel +from irlc.ex04.control_environment import ControlEnvironment + +class CartpoleModel(ControlModel): + state_labels = ["$x$", r"$\frac{dx}{dt}$", r"$\theta$", r"$\frac{d \theta}{dt}$"] + action_labels = ["Cart force $u$"] + + def __init__(self, mc=2, + mp=0.5, + l=0.5, + max_force=50, dist=1.0): + self.mc = mc + self.mp = mp + self.l = l + self.max_force = max_force + self.dist = dist + self.cp_render = {} + super().__init__() + + + def tF_bound(self) -> Box: + return Box(0.01, np.inf, shape=(1,), dtype=np.float64) + + def x_bound(self) -> Box: + return Box(np.asarray([-2 * self.dist, -np.inf, -2 * np.pi, -np.inf]), np.asarray([2 * self.dist, np.inf, 2 * np.pi, np.inf]), dtype=np.float64) + + def x0_bound(self) -> Box: + return Box(np.asarray([0, 0, np.pi, 0]), np.asarray([0, 0, np.pi, 0]), dtype=np.float64) + + def xF_bound(self) -> Box: + return Box(np.asarray([self.dist, 0, 0, 0]), np.asarray([self.dist, 0, 0, 0]), dtype=np.float64) + + def u_bound(self) -> Box: + return Box(np.asarray([-self.max_force]), np.asarray([self.max_force]), dtype=np.float64) + + def get_cost(self) -> SymbolicQRCost: + return SymbolicQRCost(R=np.eye(1) * 0, Q=np.eye(4) * 0, qc=1) # just minimum time + + def sym_f(self, x, u, t=None): + mp = self.mp + l = self.l + mc = self.mc + g = 9.81 # Gravity on earth. + + x_dot = x[1] + theta = x[2] + sin_theta = sym.sin(theta) + cos_theta = sym.cos(theta) + theta_dot = x[3] + F = u[0] + # Define dynamics model as per Razvan V. Florian's + # "Correct equations for the dynamics of the cart-pole system". + # Friction is neglected. + + # Eq. (23) + temp = (F + mp * l * theta_dot ** 2 * sin_theta) / (mc + mp) + numerator = g * sin_theta - cos_theta * temp + denominator = l * (4.0 / 3.0 - mp * cos_theta ** 2 / (mc + mp)) + theta_dot_dot = numerator / denominator + + # Eq. (24) + x_dot_dot = temp - mp * l * theta_dot_dot * cos_theta / (mc + mp) + xp = [x_dot, + x_dot_dot, + theta_dot, + theta_dot_dot] + return xp + + def close(self): + for r in self.cp_render.values(): + r.close() + + def render(self, x, render_mode="human"): + if render_mode not in self.cp_render: + self.cp_render[render_mode] = gym.make("CartPole-v1", render_mode=render_mode) # environment only used for rendering. Change to v1 in gym 0.26. + self.cp_render[render_mode].max_time_limit = 10000 + self.cp_render[render_mode].reset() + self.cp_render[render_mode].unwrapped.state = np.asarray(x) # environment is wrapped + return self.cp_render[render_mode].render() + +class SinCosCartpoleModel(CartpoleModel): + def phi_x(self, x): + x, dx, theta, theta_dot = x[0], x[1], x[2], x[3] + return [x, dx, sym.sin(theta), sym.cos(theta), theta_dot] + + def phi_x_inv(self, x): + x, dx, sin_theta, cos_theta, theta_dot = x[0], x[1], x[2], x[3], x[4] + theta = sym.atan2(sin_theta, cos_theta) # Obtain angle theta from sin(theta),cos(theta) + return [x, dx, theta, theta_dot] + + def phi_u(self, u): + return [sym.atanh(u[0] / self.max_force)] + + def phi_u_inv(self, u): + return [sym.tanh(u[0]) * self.max_force] + +def _cartpole_discrete_cost(model): + pole_length = model.continuous_model.l + + state_size = model.state_size + Q = np.eye(state_size) + Q[0, 0] = 1.0 + Q[1, 1] = Q[4, 4] = 0. 
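+    # The cross-terms below presumably make the quadratic form penalize the pole-tip position rather than the
+    # individual coordinates: with the state [x, dx, sin(theta), cos(theta), dtheta], the entries Q[0, 0] = 1,
+    # Q[0, 2] = Q[2, 0] = l and Q[2, 2] = l^2 combine into the horizontal tip term (x + l*sin(theta))^2.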
+ Q[0, 2] = Q[2, 0] = pole_length + Q[2, 2] = Q[3, 3] = pole_length ** 2 + + print("Warning: I altered the cost-matrix to prevent underflow. This is not great.") + R = np.array([[0.1]]) + Q_terminal = 1 * Q + + q = np.asarray([0,0,0,-1,0]) + # Instantaneous control cost. + c3 = DiscreteQRCost(Q=Q*0, R=R * 0.1, q=1 * q, qN=q * 1) + c3 += c3.goal_seeking_cost(Q=Q, x_target=model.x_upright) + c3 += c3.goal_seeking_terminal_cost(QN=Q_terminal, xN_target=model.x_upright) + cost = c3 + return cost + +class GymSinCosCartpoleModel(DiscreteControlModel): + state_labels = ['x', 'd_x', r'$\sin(\theta)$', r'$\cos(\theta)$', r'$d\theta/dt$'] + action_labels = ['Torque $u$'] + + def __init__(self, dt=0.02, cost=None, transform_actions=True, **kwargs): + model = SinCosCartpoleModel(**kwargs) + self.transform_actions = transform_actions + super().__init__(model=model, dt=dt, cost=cost) + self.x_upright = np.asarray(self.phi_x(model.xF_bound().low )) + if cost is None: + cost = _cartpole_discrete_cost(self) + self.cost = cost + + @property + def max_force(self): + return self.continuous_model.maxForce + + +class GymSinCosCartpoleEnvironment(ControlEnvironment): + def __init__(self, Tmax=5, transform_actions=True, supersample_trajectory=False, render_mode='human', **kwargs): + discrete_model = GymSinCosCartpoleModel(transform_actions=transform_actions, **kwargs) + self.observation_space = Box(low=-np.inf, high=np.inf, shape=(5,), dtype=np.float64) + if transform_actions: + self.action_space = Box(low=-np.inf, high=np.inf, shape=(1,), dtype=np.float64) + super().__init__(discrete_model, Tmax=Tmax,render_mode=render_mode, supersample_trajectory=supersample_trajectory) + + +class DiscreteCartpoleModel(DiscreteControlModel): + def __init__(self, dt=0.02, cost=None, **kwargs): + model = CartpoleModel(**kwargs) + super().__init__(model=model, dt=dt, cost=cost) + + +class CartpoleEnvironment(ControlEnvironment): + def __init__(self, Tmax=5, supersample_trajectory=False, render_mode='human', **kwargs): + discrete_model = DiscreteCartpoleModel(**kwargs) + super().__init__(discrete_model, Tmax=Tmax, supersample_trajectory=supersample_trajectory, render_mode=render_mode) + + +if __name__ == "__main__": + from irlc import Agent, train + env = GymSinCosCartpoleEnvironment(render_mode='human') + agent = Agent(env) + stats, traj = train(env, agent, num_episodes=1, max_steps=100) + env.close() diff --git a/irlc/ex05/pendulum_model_variant.py b/irlc/ex05/pendulum_model_variant.py new file mode 100644 index 0000000..8b1d739 --- /dev/null +++ b/irlc/ex05/pendulum_model_variant.py @@ -0,0 +1,21 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +import sympy as sym +from irlc.ex03.control_model import ControlModel +from irlc.ex03.control_cost import SymbolicQRCost +from irlc.ex04.discrete_control_model import DiscreteControlModel +import gymnasium as gym +from gymnasium.spaces.box import Box +from irlc.ex04.control_environment import ControlEnvironment +import numpy as np +from irlc.ex04.model_pendulum import PendulumModel + + +class DirectPendulumModel(PendulumModel): + def __init__(self, l=1., m=.8, max_torque=4.0): + super().__init__(l=l, m=m, max_torque=max_torque) + + def get_cost(self) -> SymbolicQRCost: + return SymbolicQRCost(R=np.ones((1, 1)), Q=0 * np.eye(2)) + + def tF_bound(self) -> Box: + return Box(0.5, 3, shape=(1,), dtype=float) -- GitLab