Skip to content
Snippets Groups Projects
control_model.py 17.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • tuhe's avatar
    tuhe committed
    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
    # This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
    """
    
    References:
      [Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
    """
    from collections import defaultdict
    import tabulate
    import sympy as sym
    import numpy as np
    import matplotlib.pyplot as plt
    from gymnasium.spaces import Box
    from irlc.ex03.control_cost import SymbolicQRCost
    
    
    class ControlModel: 
        r"""Represents the continious time model of a control environment.
    
    
        See (Her25, Section 13.2) for a top-level description.
    
        The model represents the physical system we are simulating and can be considered a control-equivalent of the
        :class:`irlc.ex02.dp_model.DPModel`. The class must keep track of the following:
    
        .. math::
            \frac{dx}{dt} = f(x, u, t)
    
        And the cost-function which is defined as an integral
    
        .. math::
            c_F(t_0, t_F, x(t_0), x(t_F)) + \int_{t_0}^{t_F} c(t, x, u) dt
    
        as well as constraints and boundary conditions on :math:`x`, :math:`u` and the initial conditions state :math:`x(t_0)`.
        this course, the cost function will always be quadratic, and can be accessed as ``model.get_cost``.
    
        If you want to implement your own model, the best approach is to start with an existing model and modify it for
        your needs. The overall idea is that you implement the dynamics,``sym_f``, and the cost function ``get_cost``,
        and optionally define bounds as needed.
        """
        state_labels = None     # Labels (as lists) used for visualizations.
        action_labels = None    # Labels (as lists) used for visualizations.
    
        def __init__(self): 
            """
            The cost must be an instance of :class:`irlc.ex04.cost_continuous.SymbolicQRCost`.
            Bounds is a dictionary but otherwise optional; the model should give it a default value.
    
            :param cost: A quadratic cost function
            :param dict bounds: A dictionary of boundary constraints.
            """
            if self.state_labels is None:
                self.state_labels = [f'x{i}' for i in range(self.state_size)]
            if self.action_labels is None:
                self.action_labels = [f'u{i}' for i in range(self.action_size)]
    
            t = sym.symbols("t") 
            x = sym.symbols(f"x0:{self.state_size}")
            u = sym.symbols(f"u0:{self.action_size}")
            try:
                f = self.sym_f(x, u, t)
            except Exception as e:
                print("control_model.py> There is a problem with the way you have specified the dynamics. The function sym_f must accept lists as inputs")
                raise e
            if len(f) != len(x):
                print("control_model.py> Your function ControlModel.sym_f must output a list of symbolic expressions.")
                assert len(f) == len(x)
    
            self._f_np = sym.lambdify((x, u, t), self.sym_f(x, u, t))  
    
        def x0_bound(self) -> Box: 
            r"""The bound on the initial state :math:`\mathbf{x}_0`.
    
            The default bound is ``Box(0, 0, shape=(self.state_size,))``, i.e. :math:`\mathbf{x}_0 = 0`.
    
            :return: An appropriate gymnasium Box instance.
            """
            return Box(0, 0, shape=(self.state_size,)) 
    
        def xF_bound(self) -> Box: 
            r"""The bound on the terminal state :math:`\mathbf{x}_F`.
    
            :return: An appropriate gymnasium Box instance.
            """
            return Box(-np.inf, np.inf, shape=(self.state_size,)) 
    
        def x_bound(self) -> Box: 
            r"""The bound on all other states :math:`\mathbf{x}(t)`.
    
            :return: An appropriate gymnasium Box instance.
            """
            return Box(-np.inf, np.inf, shape=(self.state_size,)) 
    
        def u_bound(self) -> Box: 
            r"""The bound on the terminal state :math:`\mathbf{u}(t)`.
    
            :return: An appropriate gymnasium Box instance.
            """
            return Box(-np.inf, np.inf, shape=(self.action_size,)) 
    
        def t0_bound(self) -> Box: 
            r"""The bound on the initial time :math:`\mathbf{t}_0`.
    
            I have included this bound for completeness: In practice, there is no reason why you should change it
            from the default bound is ``Box(0, 0, shape=(1,))``, i.e. :math:`\mathbf{t}_0 = 0`.
    
            :return: An appropriate gymnasium Box instance.
            """
            return Box(0, 0, shape=(1,)) 
    
        def tF_bound(self) -> Box:  
            r"""The bound on the final time :math:`\mathbf{t}_F`, i.e. when the environment terminates.
    
            :return: An appropriate gymnasium Box instance.
            """
            return Box(-np.inf, np.inf, shape=(1,)) 
    
        def get_cost(self) -> SymbolicQRCost:
            """Return the cost.
    
            This function should return a :class:`SymbolicQRCost` instance. The function must be implemented
            since the cost object is used to guess the number of states and actions. Note that you can specify a trivial cost
            instance such as :python:`return SymbolicQRCost.zero(n_states, n_actions)`.
    
            :return: A :class:`SymbolicQRCost` instance representing the models cost function.
            """
            raise NotImplementedError("When you implement the model, you must implement the get_cost() function.\nfor instance, use return SymbolicQRCost(Q=np.eye(n), R=np.eye(d))")
    
        def sym_f(self, x, u, t=None): 
            """
            The symbolic (``sympy``) version of the dynamics :math:`f(x, u, t)`. This is the main place where you specify
            the dynamics when you build a new model. you should look at concrete implementations of models for specifics.
    
            :param x: A list of symbolic expressions ``['x0', 'x1', ..]`` corresponding to :math:`x`
            :param u: A list of symbolic expressions ``['u0', 'u1', ..]`` corresponding to :math:`u`
            :param t: A single symbolic expression corresponding to the time :math:`t` (seconds)
            :return: A list of symbolic expressions ``[f0, f1, ...]`` of the same length as ``x`` where each element is a coordinate of :math:`f`
            """
            raise NotImplementedError("Implement a function which return the environment dynamics f(x,u,t) as a sympy exression") 
    
        def f(self, x, u, t=0) -> np.ndarray:
            r"""Evaluate the dynamics.
    
            This function will evaluate the dynamics. In other words, it will evaluate :math:`\mathbf{f}` in the following expression:
    
            .. math::
    
                \dot{\mathbf{x}} = \mathbf{f}(\mathbf{x}, \mathbf{u}, t)
    
            :param x: A numpy ndarray corresponding to the state
            :param u: A numpy ndarray corresponding to the control
            :param t: A :python:`float` corresponding to the time.
            :return: The time derivative of the state, :math:`\mathbf{x}(t)`.
            """
            return np.asarray( self._f_np(x, u, t) )
    
    
        def simulate(self, x0, u_fun, t0, tF, N_steps=1000, method='rk4'):  
            r"""Simulate the effect of a policy on the model.
    
            By default, it uses Runge-Kutta 4 (RK4) with a fine discretization -- this is slow, but in nearly all cases exact. See (Her25, Algorithm 18) for more information.
    
            The input argument ``u_fun`` should be a function which returns a list or tuple with same dimension as
            ``model.action_space``, :math:`d`.
    
            :param x0: The initial state of the simulation. Must be a list of floats of same dimension as ``env.observation_space``, :math:`n`.
            :param u_fun: Can be either:
                - Either a policy function that can be called as ``u_fun(x, t)`` and returns an action ``u`` in the ``action_space``
                - A single action (i.e. a list of floats of same length as the action space). The model will be simulated with a constant action in this case.
            :param float t0: Starting time :math:`t_0`
            :param float tF: Stopping time :math:`t_F`; the model will be simulated for :math:`t_F - t_0` seconds
            :param int N_steps: Steps :math:`N` in the RK4 simulation
            :param str method: Simulation method. Either ``'rk4'`` (default) or ``'euler'``
    
            The last output argument is the total cost of the trajectory:
    
            .. math::
    
                \int_{t_0}^{t_F} c(x(t), u(t), t) dt + c_f(t_0, t_F, x(t_0), x(t_F) )
    
            the integral is approximated using the simulated trajectory.
    
            :return:
                - xs - A numpy ``ndarray`` of dimension :math:`(N+1)\\times n` containing the observations :math:`x`
                - us - A numpy ``ndarray`` of dimension :math:`(N+1)\\times d` containing the actions :math:`u`
                - ts - A numpy ``ndarray`` of dimension :math:`(N+1)` containing the corresponding times :math:`t` (seconds)
                - total_cost - A ``float`` of the total cost of the trajectory.
            """
            assert N_steps >= 1, "Simulation should include at least 1 time step."
            u_fun = ensure_policy(u_fun)
            tt = np.linspace(t0, tF, N_steps+1)   # Time grid t_k = tt[k] between t0 and tF.
            xs = [ np.asarray(x0) ]
            us = [ u_fun(x0, t0 )]
            for k in range(N_steps):
                Delta = tt[k+1] - tt[k]
                tn = tt[k]
                xn = xs[k]
                un = us[k]   # ensure the action u is a vector.
                unp = u_fun(xn, tn + Delta)
                if method == 'rk4':
                    r""" Implementation of RK4 here. See: (Her25, Algorithm 18) """
                    k1 = np.asarray(self.f(xn, un, tn))
                    k2 = np.asarray(self.f(xn + Delta * k1/2, u_fun(xn, tn+Delta/2), tn+Delta/2))
                    k3 = np.asarray(self.f(xn + Delta * k2/2, u_fun(xn, tn+Delta/2), tn+Delta/2))
                    k4 = np.asarray(self.f(xn + Delta * k3,   u_fun(xn, tn + Delta), tn+Delta))
                    xnp = xn + 1/6 * Delta * (k1 + 2*k2 + 2*k3 + k4)
                elif method == 'euler':
                    xnp = xn + Delta * np.asarray(self.f(xn, un, tn))
                else:
                    raise Exception("Bad integration method", method)
                xs.append(xnp)
                us.append(unp)
            xs = np.stack(xs, axis=0)
            us = np.stack(us, axis=0)
    
            # Compute the cost.
            cost = self.get_cost()
            total_cost = float( sum( [ Delta * cost._private_evaluate_numpy_c(x, u, k) for k, (x, u) in enumerate(zip(xs[:-1], us[:-1]))] ) + cost._private_evaluate_numpy_cf(t0, tF, x0, xs[-1]) )
            return xs, us, tt, total_cost 
    
        @property
        def state_size(self):
            """
            This field represents the dimensionality of the state-vector :math:`n`. Use it as ``model.state_size``
            :return: Dimensionality of the state vector :math:`x`
            """
            return self.get_cost().state_size
    
        @property
        def action_size(self):
            """
            This field represents the dimensionality of the action-vector :math:`d`. Use it as ``model.action_size``
            :return: Dimensionality of the action vector :math:`u`
            """
            return self.get_cost().action_size
    
        def render(self, x, render_mode="human"):
            """
            Responsible for rendering the state. You don't have to worry about this function.
    
            :param x: State to render
            :param str render_mode: Rendering mode. Select ``"human"`` for a visualization.
            :return:  Either none or a ``ndarray`` for plotting.
            """
            raise NotImplementedError()
    
        def close(self):
            pass
    
        def phi_x(self, x : list) -> list:
            r"""Coordinate transformation of the state when the model is discretized.
    
            This function specifies the coordinate transformation :math:`x_k = \Phi_x(x(t_k))` which is applied to the environment when it is
            discretized. It should accept a list of symbols, corresponding to :math:`x`, and return a new list
            of symbols corresponding to the (discrete) coordinates.
    
            :param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}(t)`
            :return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{x}_k`.
            """
            return x
    
        def phi_x_inv(self, x: list) -> list:
            r"""Inverse of coordinate transformation for the state.
    
            This function should specify the inverse of the coordinate transformation :math:`\Phi_x`, i.e. :math:`\Phi_x^{-1}`.
            In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`x(t) = \Phi_x^{-1}(x_k)`.
    
            :param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}_k`
            :return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{x}(t)`.
            """
            return x
    
        def phi_u(self, u: list)  -> list:
            r"""Coordinate transformation of the action when the model is discretized.
    
            This function specifies the coordinate transformation :math:`x_k = \Phi_x(x(t_k))` which is applied to the environment when it is
            discretized. It should accept a list of symbols, corresponding to :math:`x`, and return a new list
            of symbols corresponding to the (discrete) coordinates.
    
            :param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}(t)`
            :return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{x}_k`.
            """
            return u
    
        def phi_u_inv(self, u: list)  -> list:
            r"""Inverse of coordinate transformation for the action.
    
            This function should specify the inverse of the coordinate transformation :math:`\Phi_u`, i.e. :math:`\Phi_u^{-1}`.
            In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`u(t) = \Phi_u^{-1}(u_k)`.
    
            :param x: A list of symbols ``[u0, u1, ..., ud]`` corresponding to :math:`\mathbf{u}_k`
            :return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{u}(t)`.
            """
            return u
    
        def __str__(self):
            """
            Return a string representation of the model. This is a potentially helpful way to summarize the content of the
            model. You can use it as:
    
            .. runblock:: pycon
    
                >>> from irlc.ex04.model_pendulum import SinCosPendulumModel
                >>> model = SinCosPendulumModel()
                >>> print(model)
    
            :return: A string containing the details of the model.
            """
            split = "-"*20
            s = [f"{self.__class__}"] + ['='*50]
            s += ["Dynamics:", split]
            t = sym.symbols("t")
            x = sym.symbols(f"x0:{self.state_size}")
            u = sym.symbols(f"u0:{self.action_size}")
    
            s += [typeset_eq(x, u, self.sym_f(x, u, t) )]
    
            s += ["Cost:", split, str(self.get_cost())]
    
            dd = defaultdict(list)
            bounds = [ ('x', self.x_bound()), ('x0', self.x0_bound()), ('xF', self.xF_bound()),
                       ('u', self.u_bound()),
                       ('t0', self.t0_bound()), ('tF', self.tF_bound())]
    
            for v, box in bounds:
                if (box.low == -np.inf).all() and (box.high == np.inf).all():
                    continue
                dd['low'].append(box.low_repr)
                dd['variable'].append("<= " + v + " <=")
                dd['high'].append(box.high_repr)
    
            if len(dd) > 0:
                s += ["Bounds:", split]
                s += [tabulate.tabulate(dd, headers='keys')]
            else:
                s += ['No bounds are applied to the x and u-variables.']
            return "\n".join(s)
    
    
    def symv(s, n):
        """
        Returns a vector of symbolic functions. For instance if s='x' and n=3 then it will return
        [x0,x1,x2]
        where x0,..,x2 are symbolic variables.
        """
        return sym.symbols(" ".join(["%s%i," % (s, i) for i in range(n)]))
    
    def ensure_policy(u):
        """
        Ensure u corresponds to a policy function with input arguments u(x, t)
        """
        if callable(u):
            return lambda x, t: np.asarray(u(x,t)).reshape((-1,))
        else:
            return lambda x, t: np.asarray(u).reshape((-1,))
    
    def plot_trajectory(x_res, tt, lt='k-', ax=None, labels=None, legend=None):
        M = x_res.shape[1]
        if labels is None:
            labels = [f"x_{i}" for i in range(M)]
    
        if ax is None:
            if M == 2:
                a = 234
            if M == 3:
                r = 1
                c = 3
            else:
                r = 2 if M > 1 else 1
                c = (M + 1) // 2
    
            H = 2*r if r > 1 else 3
            W = 6*c
            # if M == 2:
            #     W = 12
            f, ax = plt.subplots(r,c, figsize=(W,H))
            if M == 1:
                ax = np.asarray([ax])
            print(M,r,c)
    
        for i in range(M):
            if len(ax) <= i:
                print("issue!")
    
            a = ax.flat[i]
            a.plot(tt, x_res[:, i], lt, label=legend)
    
            a.set_xlabel("Time/seconds")
            a.set_ylabel(labels[i])
            # a.set_title(labels[i])
            a.grid(True)
            if legend is not None and i == 0:
                a.legend()
            # if i == M:
        plt.tight_layout()
        return ax
    
    def make_space_above(axes, topmargin=1.0):
        """ increase figure size to make topmargin (in inches) space for
            titles, without changing the axes sizes"""
        fig = axes.flatten()[0].figure
        s = fig.subplotpars
        w, h = fig.get_size_inches()
    
        figh = h - (1-s.top)*h  + topmargin
        fig.subplots_adjust(bottom=s.bottom*h/figh, top=1-topmargin/figh)
        fig.set_figheight(figh)
    
    def typeset_eq(x, u, f):
        def ascii_vector(ls):
            ml = max(map(len, ls))
            ls = [" " * (ml - len(s)) + s for s in ls]
            ls = ["[" + s + "]" for s in ls]
            return "\n".join(ls)
    
        v = [str(z) for z in f]
    
        def cstack(ls: list):
            # ls = [l.splitlines() for l in ls]
            height = max([len(l) for l in ls])
            widths = [len(l[0]) for l in ls]
    
            for k in range(len(ls)):
                missing2 = (height - len(ls[k])) // 2
                missing1 = (height - len(ls[k]) - missing2)
                tpad = [" " * widths[k]] * missing1
                bpad = [" " * widths[k]] * missing2
                ls[k] = tpad + ls[k] + bpad
    
            r = [""] * len(ls[0])
            for w in range(len(ls)):
                for h in range(len(ls[0])):
                    r[h] += ls[w][h]
    
            return r
    
        xx = [str(x) for x in x]
        uu = [str(u) for u in u]
        xx = ascii_vector(xx).splitlines()
        uu = ascii_vector(uu).splitlines()
        cm = cstack([xx, [", "], uu])
        eq = cstack([["f("], cm, [")"]])
        eq = cstack([["  "], eq, [" = "], ascii_vector(v).splitlines()])
        return "\n".join(eq)