From b7cbf41a8dca8a7a7bcca7615f05549b3fea1321 Mon Sep 17 00:00:00 2001 From: Tue Herlau <tuhe@dtu.dk> Date: Mon, 10 Feb 2025 18:13:41 +0100 Subject: [PATCH] Exercise 03 --- irlc/ex03/__init__.py | 2 + irlc/ex03/basic_pendulum.py | 39 +++ irlc/ex03/control_cost.py | 306 +++++++++++++++++++++ irlc/ex03/control_model.py | 443 ++++++++++++++++++++++++++++++ irlc/ex03/inventory_evaluation.py | 26 ++ irlc/ex03/kuramoto.py | 123 +++++++++ irlc/ex03/toy_2d_control.py | 23 ++ 7 files changed, 962 insertions(+) create mode 100644 irlc/ex03/__init__.py create mode 100644 irlc/ex03/basic_pendulum.py create mode 100644 irlc/ex03/control_cost.py create mode 100644 irlc/ex03/control_model.py create mode 100644 irlc/ex03/inventory_evaluation.py create mode 100644 irlc/ex03/kuramoto.py create mode 100644 irlc/ex03/toy_2d_control.py diff --git a/irlc/ex03/__init__.py b/irlc/ex03/__init__.py new file mode 100644 index 0000000..01980ca --- /dev/null +++ b/irlc/ex03/__init__.py @@ -0,0 +1,2 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +"""This directory contains the exercises for week 3.""" diff --git a/irlc/ex03/basic_pendulum.py b/irlc/ex03/basic_pendulum.py new file mode 100644 index 0000000..200f08b --- /dev/null +++ b/irlc/ex03/basic_pendulum.py @@ -0,0 +1,39 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +import sympy as sym +import numpy as np +from irlc.ex03.control_model import ControlModel +from irlc.ex03.control_cost import SymbolicQRCost +from gymnasium.spaces import Box + +class BasicPendulumModel(ControlModel): + def sym_f(self, x, u, t=None): + g = 9.82 + l = 1 + m = 2 + theta_dot = x[1] # Parameterization: x = [theta, theta'] + theta_dot_dot = g / l * sym.sin(x[0]) + 1 / (m * l ** 2) * u[0] + return [theta_dot, theta_dot_dot] + + def get_cost(self) -> SymbolicQRCost: + return SymbolicQRCost(Q=np.eye(2), R=np.eye(1)) + + def u_bound(self) -> Box: + return Box(np.asarray([-10]), np.asarray([10]), dtype=float) + + def x0_bound(self) -> Box: + return Box(np.asarray( [np.pi, 0] ), np.asarray( [np.pi, 0]), dtype=float) + +if __name__ == "__main__": + p = BasicPendulumModel() + print(p) + + from irlc.ex04.discrete_control_model import DiscreteControlModel + model = BasicPendulumModel() + discrete_pendulum = DiscreteControlModel(model, dt=0.5) # Using a discretization time step: 0.5 seconds. + x0 = model.x0_bound().low # Get the initial state: x0 = [np.pi, 0]. + u0 = [0] # No action. Note the action must be a list. + x1 = discrete_pendulum.f(x0, u0) + print(x1) + print("Now, lets compute the Euler step manually to confirm") + x1_manual = x0 + 0.5 * model.f(x0, u0, 0) + print(x1_manual) diff --git a/irlc/ex03/control_cost.py b/irlc/ex03/control_cost.py new file mode 100644 index 0000000..47b53b9 --- /dev/null +++ b/irlc/ex03/control_cost.py @@ -0,0 +1,306 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025. +""" +import sympy as sym +import numpy as np +def mat(x): # Helper function. + return sym.Matrix(x) if x is not None else x + + +class SymbolicQRCost: + r""" + This class represents the cost function for a continuous-time model. In the simulations, we are going to assume + that the cost function takes the form: + + .. math:: + \int_{t_0}^{t_F} c(x(t), u(t)) dt + c_F(x_F) + + And this class will specifically implement the two functions :math:`c` and :math:`c_F`. They will be assumed to have the quadratic form: + + .. math:: + c(x, u) & = \frac{1}{2} x^T Q x + \frac{1}{2} u^T R u + u^T H x + q^T x + r^T u + q_0, \\ + c_F(x_F) & = \frac{1}{2} x_F^T Q_F x_F + q_F^T x_F + q_{0,F}. + + So what all of this boils down to is that the class just need to store a bunch of matrices and vectors. + + You can add and scale cost-functions + ********************************************************** + + A slightly smart thing about the cost functions are that you can add and scale them. The following provides an + example: + + .. runblock:: pycon + + >>> from irlc.ex03.control_cost import SymbolicQRCost + >>> import numpy as np + >>> cost1 = SymbolicQRCost(np.eye(2), np.zeros(1) ) # Set Q = I, R = 0 + >>> cost2 = SymbolicQRCost(np.ones((2,2)), np.zeros(1) ) # Set Q = 2x2 matrices of 1's, R = 0 + >>> print(cost1.Q) # Will be the identity matrix. + >>> cost = cost1 * 3 + cost2 * 2 + >>> print(cost.Q) # Will be 3 x I + 2 + + """ + + def __init__(self, Q, R, q=None, qc=None, r=None, H=None, QN=None, qN=None, qcN=None): + """ + The constructor can be used to manually create a cost function. You will rarely want to call the constructor + directly but instead use the helper methods (see class documentation). + What the class basically does is that it stores the input parameters as fields. In other words, you can access the quadratic + term of the cost function, :math:`\\frac{1}{2}x^T Q x`, as ``cost.Q``. + + :param Q: The matrix :math:`Q` + :param R: The matrix :math:`R` + :param q: The vector :math:`q` + :param qc: The constant :math:`q_0` + :param r: The vector :math:`r` + :param H: The matrix :math:`H` + :param QN: The terminal cost matrix :math:`Q_N` + :param qN: The terminal cost vector :math:`q_N` + :param qcN: The terminal cost constant :math:`q_{0,N}` + """ + + n = Q.shape[0] + d = R.shape[0] + self.Q = Q + self.R = R + self.q = np.zeros( (n,)) if q is None else q + self.qc = 0 if qc == None else qc + self.r = np.zeros( (d,)) if r is None else r + self.H = np.zeros((d,n)) if H is None else H + self.QN = np.zeros((n,n)) if QN is None else QN + self.qN = np.zeros((n,)) if qN is None else qN + self.qcN = 0 if qcN == None else qcN + self.flds = ('Q', 'R', 'q', 'qc', 'r', 'H', 'QN', 'qN', 'qcN') + self.flds_term = ('QN', 'qN', 'qcN') + + # self.c_numpy = None + # self.cF_numpy = None + + + @classmethod + def zero(cls, state_size, action_size): + """ + Creates an all-zero cost function, i.e. all terms :math:`Q`, :math:`R` are set to zer0. + + .. runblock:: pycon + + >>> from irlc.ex03.control_cost import SymbolicQRCost + >>> cost = SymbolicQRCost.zero(2, 1) + >>> cost.Q # 2x2 zero matrix + >>> cost.R # 1x1 zero matrix. + + :param state_size: Dimension of the state vector :math:`n` + :param action_size: Dimension of the action vector :math:`d` + :return: A ``SymbolicQRCost`` with all zero terms. + """ + + return cls(Q=np.zeros( (state_size,state_size)), R=np.zeros((action_size,action_size)) ) + + + def sym_c(self, x, u, t=None): + """ + Evaluate the (instantaneous) part of the function :math:`c(x,u, t)`. An example: + + .. runblock:: pycon + + >>> from irlc.ex03.control_cost import SymbolicQRCost + >>> import numpy as np + >>> cost = SymbolicQRCost(np.eye(2), np.eye(1)) # Set Q = I, R = 0 + >>> cost.sym_c(x = np.asarray([1,2]), u=np.asarray([0])) # should return 0.5 * x^T Q x = 0.5 * (1 + 4) + + :param x: The state :math:`x(t)` + :param u: The action :math:`u(t)` + :param t: The current time step :math:`t` (this will be ignored) + :return: A ``sympy`` symbolic expression corresponding to the instantaneous cost. + """ + u = sym.Matrix(u) + x = sym.Matrix(x) + c = 1 / 2 * (x.transpose() @ self.Q @ x) + 1 / 2 * (u.transpose() @ self.R @ u) + u.transpose() @ self.H @ x + sym.Matrix(self.q).transpose() @ x + sym.Matrix(self.r).transpose() @ u + sym.Matrix([[self.qc]]) + assert c.shape == (1,1) + return c[0,0] + + + def sym_cf(self, t0, tF, x0, xF): + """ + Evaluate the terminal (constant) term in the cost function :math:`c_F(t_0, t_F, x_0, x_F)`. An example: + + .. runblock:: pycon + + >>> from irlc.ex03.control_cost import SymbolicQRCost + >>> import numpy as np + >>> cost = SymbolicQRCost(np.eye(2), np.zeros(1), QN=np.eye(2)) # Set Q = I, R = 0 + >>> cost.sym_cf(0, 0, 0, xF=2*np.ones((2,))) # should return 0.5 * xF^T * xF = 0.5 * 8 + + :param t0: Starting time :math:`t_0` (not used) + :param tF: Stopping time :math:`t_F` (not used) + :param x0: Initial state :math:`x_0` (not used) + :param xF: Termi lanstate :math:`x_F` (**this one is used**) + :return: A ``sympy`` symbolic expression corresponding to the terminal cost. + """ + xF = sym.Matrix(xF) + c = 0.5 * xF.transpose() @ self.QN @ xF + xF.transpose() @ sym.Matrix(self.qN) + sym.Matrix([[self.qcN]]) + assert c.shape == (1,1) + return c[0,0] + + def discretize(self, dt): + r""" + Discretize the cost function so it is suitable for a discrete control problem. See (Her25, Subsection 13.1.5) for more information. + + :param dt: The discretization time step :math:`\Delta` + :return: An :class:`~irlc.ex04.cost_discrete.DiscreteQRCost` instance corresponding to a discretized version of this cost function. + """ + from irlc.ex04.discrete_control_cost import DiscreteQRCost + return DiscreteQRCost(**{f: self.__getattribute__(f) * (1 if f in self.flds_term else dt) for f in self.flds} ) + + + def __add__(self, c): + return SymbolicQRCost(**{k: self.__dict__[k] + c.__dict__[k] for k in self.flds}) + + def __mul__(self, c): + return SymbolicQRCost(**{k: self.__dict__[k] * c for k in self.flds}) + + def __str__(self): + title = "Continuous-time cost function" + label1 = "Non-zero terms in c(x, u)" + label2 = "Non-zero terms in c_F(x)" + terms1 = [s for s in self.flds if s not in self.flds_term] + terms2 = self.flds_term + return _repr_cost(self, title, label1, label2, terms1, terms2) + + def goal_seeking_terminal_cost(self, xF_target, QF=None): + """ + Create a cost function which is minimal when the terminal state :math:`x_F` is equal to a goal state :math:`x_F^*`. + Concretely, it will return a cost function of the form + + .. math:: + c_F(x_F) = \\frac{1}{2} (x_F^* - x_F)^\\top Q_F (x_F^* - x_F) + + .. runblock:: pycon + + >>> from irlc.ex03.control_cost import SymbolicQRCost + >>> import numpy as np + >>> cost = SymbolicQRCost.zero(2, 1) + >>> cost += cost.goal_seeking_terminal_cost(xF_target=np.ones((2,))) + >>> print(cost.qN) + >>> print(cost) + + :param xF_target: Target state :math:`x_F^*` + :param QF: Cost matrix :math:`Q_F` + :return: A ``SymbolicQRCost`` object corresponding to the goal-seeking cost function + """ + if QF is None: + QF = np.eye(xF_target.size) + QF, qN, qcN = targ2matrices(xF_target, Q=QF) + return SymbolicQRCost(Q=self.Q*0, R=self.R*0, QN=QF, qN=qN, qcN=qcN) + + def goal_seeking_cost(self, x_target, Q=None): + """ + Create a cost function which is minimal when the state :math:`x` is equal to a goal state :math:`x^*`. + Concretely, it will return a cost function of the form + + .. math:: + c(x, u) = \\frac{1}{2} (x^* - x)^\\top Q (x^* - x) + + .. runblock:: pycon + + >>> from irlc.ex03.control_cost import SymbolicQRCost + >>> import numpy as np + >>> cost = SymbolicQRCost.zero(2, 1) + >>> cost += cost.goal_seeking_cost(x_target=np.ones((2,))) + >>> print(cost.q) + >>> print(cost) + + :param x_target: Target state :math:`x^*` + :param Q: Cost matrix :math:`Q` + :return: A ``SymbolicQRCost`` object corresponding to the goal-seeking cost function + """ + if Q is None: + Q = np.eye(x_target.size) + Q, q, qc = targ2matrices(x_target, Q=Q) + return SymbolicQRCost(Q=Q, R=self.R*0, q=q, qc=qc) + + def term(self, Q=None, R=None,r=None): + dd = {} + lc = locals() + for f in self.flds: + if f in lc and lc[f] is not None: + dd[f] = lc[f] + else: + dd[f] = self.__getattribute__(f)*0 + return SymbolicQRCost(**dd) + + @property + def state_size(self): + return self.Q.shape[0] + + @property + def action_size(self): + return self.R.shape[0] + + def _private_evaluate_numpy_c(self, x, u, t): + if not hasattr(self, '_np_c') or self._np_c is None: + sx = sym.symbols(f"x0:{self.state_size}") + su = sym.symbols(f"u0:{self.action_size}") + st = sym.symbols('t') + self._np_c = sym.lambdify( (sx,su,st), self.sym_c(sx, su, st) ) + return float( self._np_c(x, u, t) ) + + def _private_evaluate_numpy_cf(self, t0, tF, x0, xF): + if not hasattr(self, '_np_cF') or self._np_cF is None: + sx0 = sym.symbols(f"x0:{self.state_size}") + sxF = sym.symbols(f"x0:{self.state_size}") + st0, stF = sym.symbols('t0 tF') + self._np_cF = sym.lambdify( (st0, stF, sx0, sxF), self.sym_cf(st0, stF, sx0, sxF) ) + + return float( self._np_cF(t0, tF, x0, xF) ) + + + + +def _repr_cost(cost, title, label1, label2, terms1, terms2): + self = cost + def _get(flds, label): + d = {s: self.__dict__[s] for s in flds if np.sum(np.sum(self.__dict__[s] != 0)) != 0} + out = "" + if len(d) > 0: + # out = "" + out += f"> {label}:\n" + for s, m in d.items(): + mm = f"{m}" + if len(mm.splitlines()) > 1: + mm = "\n" + mm + out += f" * {s} = {mm}\n" + + return d, out + + nz_c, o1 = _get([s for s in terms1], label1) + out = "" + out += f"{title}:\n" + out += o1 + nz_term, o2 = _get(terms2, label2) + out += o2 + if len(nz_c) + len(nz_term) == 0: + print("All terms in the cost-function are zero.") + return out + + + +def targ2matrices(t, Q=None): # Helper function + """ + Given a target vector :math:`t` and a matrix :math:`Q` this function returns cost-matrices suitable for implementing: + + .. math:: + \\frac{1}{2} * (x - t)^Q (x - t) = \\frac{1}{2} * x^T Q x + 1/2 * t^T * t - x * t + + :param t: + :param Q: + :return: + """ + n = t.size + if Q is None: + Q = np.eye(n) + + return Q, -1/2 * (Q @ t + t @ Q.T), 1/2 * t @ Q @ t diff --git a/irlc/ex03/control_model.py b/irlc/ex03/control_model.py new file mode 100644 index 0000000..c9426c9 --- /dev/null +++ b/irlc/ex03/control_model.py @@ -0,0 +1,443 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025. +""" +from collections import defaultdict +import tabulate +import sympy as sym +import numpy as np +import matplotlib.pyplot as plt +from gymnasium.spaces import Box +from irlc.ex03.control_cost import SymbolicQRCost + + +class ControlModel: + r"""Represents the continious time model of a control environment. + + + See (Her25, Section 13.2) for a top-level description. + + The model represents the physical system we are simulating and can be considered a control-equivalent of the + :class:`irlc.ex02.dp_model.DPModel`. The class must keep track of the following: + + .. math:: + \frac{dx}{dt} = f(x, u, t) + + And the cost-function which is defined as an integral + + .. math:: + c_F(t_0, t_F, x(t_0), x(t_F)) + \int_{t_0}^{t_F} c(t, x, u) dt + + as well as constraints and boundary conditions on :math:`x`, :math:`u` and the initial conditions state :math:`x(t_0)`. + this course, the cost function will always be quadratic, and can be accessed as ``model.get_cost``. + + If you want to implement your own model, the best approach is to start with an existing model and modify it for + your needs. The overall idea is that you implement the dynamics,``sym_f``, and the cost function ``get_cost``, + and optionally define bounds as needed. + """ + state_labels = None # Labels (as lists) used for visualizations. + action_labels = None # Labels (as lists) used for visualizations. + + def __init__(self): + """ + The cost must be an instance of :class:`irlc.ex04.cost_continuous.SymbolicQRCost`. + Bounds is a dictionary but otherwise optional; the model should give it a default value. + + :param cost: A quadratic cost function + :param dict bounds: A dictionary of boundary constraints. + """ + if self.state_labels is None: + self.state_labels = [f'x{i}' for i in range(self.state_size)] + if self.action_labels is None: + self.action_labels = [f'u{i}' for i in range(self.action_size)] + + t = sym.symbols("t") + x = sym.symbols(f"x0:{self.state_size}") + u = sym.symbols(f"u0:{self.action_size}") + try: + f = self.sym_f(x, u, t) + except Exception as e: + print("control_model.py> There is a problem with the way you have specified the dynamics. The function sym_f must accept lists as inputs") + raise e + if len(f) != len(x): + print("control_model.py> Your function ControlModel.sym_f must output a list of symbolic expressions.") + assert len(f) == len(x) + + self._f_np = sym.lambdify((x, u, t), self.sym_f(x, u, t)) + + def x0_bound(self) -> Box: + r"""The bound on the initial state :math:`\mathbf{x}_0`. + + The default bound is ``Box(0, 0, shape=(self.state_size,))``, i.e. :math:`\mathbf{x}_0 = 0`. + + :return: An appropriate gymnasium Box instance. + """ + return Box(0, 0, shape=(self.state_size,)) + + def xF_bound(self) -> Box: + r"""The bound on the terminal state :math:`\mathbf{x}_F`. + + :return: An appropriate gymnasium Box instance. + """ + return Box(-np.inf, np.inf, shape=(self.state_size,)) + + def x_bound(self) -> Box: + r"""The bound on all other states :math:`\mathbf{x}(t)`. + + :return: An appropriate gymnasium Box instance. + """ + return Box(-np.inf, np.inf, shape=(self.state_size,)) + + def u_bound(self) -> Box: + r"""The bound on the terminal state :math:`\mathbf{u}(t)`. + + :return: An appropriate gymnasium Box instance. + """ + return Box(-np.inf, np.inf, shape=(self.action_size,)) + + def t0_bound(self) -> Box: + r"""The bound on the initial time :math:`\mathbf{t}_0`. + + I have included this bound for completeness: In practice, there is no reason why you should change it + from the default bound is ``Box(0, 0, shape=(1,))``, i.e. :math:`\mathbf{t}_0 = 0`. + + :return: An appropriate gymnasium Box instance. + """ + return Box(0, 0, shape=(1,)) + + def tF_bound(self) -> Box: + r"""The bound on the final time :math:`\mathbf{t}_F`, i.e. when the environment terminates. + + :return: An appropriate gymnasium Box instance. + """ + return Box(-np.inf, np.inf, shape=(1,)) + + def get_cost(self) -> SymbolicQRCost: + """Return the cost. + + This function should return a :class:`SymbolicQRCost` instance. The function must be implemented + since the cost object is used to guess the number of states and actions. Note that you can specify a trivial cost + instance such as :python:`return SymbolicQRCost.zero(n_states, n_actions)`. + + :return: A :class:`SymbolicQRCost` instance representing the models cost function. + """ + raise NotImplementedError("When you implement the model, you must implement the get_cost() function.\nfor instance, use return SymbolicQRCost(Q=np.eye(n), R=np.eye(d))") + + def sym_f(self, x, u, t=None): + """ + The symbolic (``sympy``) version of the dynamics :math:`f(x, u, t)`. This is the main place where you specify + the dynamics when you build a new model. you should look at concrete implementations of models for specifics. + + :param x: A list of symbolic expressions ``['x0', 'x1', ..]`` corresponding to :math:`x` + :param u: A list of symbolic expressions ``['u0', 'u1', ..]`` corresponding to :math:`u` + :param t: A single symbolic expression corresponding to the time :math:`t` (seconds) + :return: A list of symbolic expressions ``[f0, f1, ...]`` of the same length as ``x`` where each element is a coordinate of :math:`f` + """ + raise NotImplementedError("Implement a function which return the environment dynamics f(x,u,t) as a sympy exression") + + def f(self, x, u, t=0) -> np.ndarray: + r"""Evaluate the dynamics. + + This function will evaluate the dynamics. In other words, it will evaluate :math:`\mathbf{f}` in the following expression: + + .. math:: + + \dot{\mathbf{x}} = \mathbf{f}(\mathbf{x}, \mathbf{u}, t) + + :param x: A numpy ndarray corresponding to the state + :param u: A numpy ndarray corresponding to the control + :param t: A :python:`float` corresponding to the time. + :return: The time derivative of the state, :math:`\mathbf{x}(t)`. + """ + return np.asarray( self._f_np(x, u, t) ) + + + def simulate(self, x0, u_fun, t0, tF, N_steps=1000, method='rk4'): + r"""Simulate the effect of a policy on the model. + + By default, it uses Runge-Kutta 4 (RK4) with a fine discretization -- this is slow, but in nearly all cases exact. See (Her25, Algorithm 18) for more information. + + The input argument ``u_fun`` should be a function which returns a list or tuple with same dimension as + ``model.action_space``, :math:`d`. + + :param x0: The initial state of the simulation. Must be a list of floats of same dimension as ``env.observation_space``, :math:`n`. + :param u_fun: Can be either: + - Either a policy function that can be called as ``u_fun(x, t)`` and returns an action ``u`` in the ``action_space`` + - A single action (i.e. a list of floats of same length as the action space). The model will be simulated with a constant action in this case. + :param float t0: Starting time :math:`t_0` + :param float tF: Stopping time :math:`t_F`; the model will be simulated for :math:`t_F - t_0` seconds + :param int N_steps: Steps :math:`N` in the RK4 simulation + :param str method: Simulation method. Either ``'rk4'`` (default) or ``'euler'`` + + The last output argument is the total cost of the trajectory: + + .. math:: + + \int_{t_0}^{t_F} c(x(t), u(t), t) dt + c_f(t_0, t_F, x(t_0), x(t_F) ) + + the integral is approximated using the simulated trajectory. + + :return: + - xs - A numpy ``ndarray`` of dimension :math:`(N+1)\\times n` containing the observations :math:`x` + - us - A numpy ``ndarray`` of dimension :math:`(N+1)\\times d` containing the actions :math:`u` + - ts - A numpy ``ndarray`` of dimension :math:`(N+1)` containing the corresponding times :math:`t` (seconds) + - total_cost - A ``float`` of the total cost of the trajectory. + """ + assert N_steps >= 1, "Simulation should include at least 1 time step." + u_fun = ensure_policy(u_fun) + tt = np.linspace(t0, tF, N_steps+1) # Time grid t_k = tt[k] between t0 and tF. + xs = [ np.asarray(x0) ] + us = [ u_fun(x0, t0 )] + for k in range(N_steps): + Delta = tt[k+1] - tt[k] + tn = tt[k] + xn = xs[k] + un = us[k] # ensure the action u is a vector. + unp = u_fun(xn, tn + Delta) + if method == 'rk4': + r""" Implementation of RK4 here. See: (Her25, Algorithm 18) """ + k1 = np.asarray(self.f(xn, un, tn)) + k2 = np.asarray(self.f(xn + Delta * k1/2, u_fun(xn, tn+Delta/2), tn+Delta/2)) + k3 = np.asarray(self.f(xn + Delta * k2/2, u_fun(xn, tn+Delta/2), tn+Delta/2)) + k4 = np.asarray(self.f(xn + Delta * k3, u_fun(xn, tn + Delta), tn+Delta)) + xnp = xn + 1/6 * Delta * (k1 + 2*k2 + 2*k3 + k4) + elif method == 'euler': + xnp = xn + Delta * np.asarray(self.f(xn, un, tn)) + else: + raise Exception("Bad integration method", method) + xs.append(xnp) + us.append(unp) + xs = np.stack(xs, axis=0) + us = np.stack(us, axis=0) + + # Compute the cost. + cost = self.get_cost() + total_cost = float( sum( [ Delta * cost._private_evaluate_numpy_c(x, u, k) for k, (x, u) in enumerate(zip(xs[:-1], us[:-1]))] ) + cost._private_evaluate_numpy_cf(t0, tF, x0, xs[-1]) ) + return xs, us, tt, total_cost + + @property + def state_size(self): + """ + This field represents the dimensionality of the state-vector :math:`n`. Use it as ``model.state_size`` + :return: Dimensionality of the state vector :math:`x` + """ + return self.get_cost().state_size + + @property + def action_size(self): + """ + This field represents the dimensionality of the action-vector :math:`d`. Use it as ``model.action_size`` + :return: Dimensionality of the action vector :math:`u` + """ + return self.get_cost().action_size + + def render(self, x, render_mode="human"): + """ + Responsible for rendering the state. You don't have to worry about this function. + + :param x: State to render + :param str render_mode: Rendering mode. Select ``"human"`` for a visualization. + :return: Either none or a ``ndarray`` for plotting. + """ + raise NotImplementedError() + + def close(self): + pass + + def phi_x(self, x : list) -> list: + r"""Coordinate transformation of the state when the model is discretized. + + This function specifies the coordinate transformation :math:`x_k = \Phi_x(x(t_k))` which is applied to the environment when it is + discretized. It should accept a list of symbols, corresponding to :math:`x`, and return a new list + of symbols corresponding to the (discrete) coordinates. + + :param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}(t)` + :return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{x}_k`. + """ + return x + + def phi_x_inv(self, x: list) -> list: + r"""Inverse of coordinate transformation for the state. + + This function should specify the inverse of the coordinate transformation :math:`\Phi_x`, i.e. :math:`\Phi_x^{-1}`. + In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`x(t) = \Phi_x^{-1}(x_k)`. + + :param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}_k` + :return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{x}(t)`. + """ + return x + + def phi_u(self, u: list) -> list: + r"""Coordinate transformation of the action when the model is discretized. + + This function specifies the coordinate transformation :math:`x_k = \Phi_x(x(t_k))` which is applied to the environment when it is + discretized. It should accept a list of symbols, corresponding to :math:`x`, and return a new list + of symbols corresponding to the (discrete) coordinates. + + :param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}(t)` + :return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{x}_k`. + """ + return u + + def phi_u_inv(self, u: list) -> list: + r"""Inverse of coordinate transformation for the action. + + This function should specify the inverse of the coordinate transformation :math:`\Phi_u`, i.e. :math:`\Phi_u^{-1}`. + In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`u(t) = \Phi_u^{-1}(u_k)`. + + :param x: A list of symbols ``[u0, u1, ..., ud]`` corresponding to :math:`\mathbf{u}_k` + :return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{u}(t)`. + """ + return u + + def __str__(self): + """ + Return a string representation of the model. This is a potentially helpful way to summarize the content of the + model. You can use it as: + + .. runblock:: pycon + + >>> from irlc.ex04.model_pendulum import SinCosPendulumModel + >>> model = SinCosPendulumModel() + >>> print(model) + + :return: A string containing the details of the model. + """ + split = "-"*20 + s = [f"{self.__class__}"] + ['='*50] + s += ["Dynamics:", split] + t = sym.symbols("t") + x = sym.symbols(f"x0:{self.state_size}") + u = sym.symbols(f"u0:{self.action_size}") + + s += [typeset_eq(x, u, self.sym_f(x, u, t) )] + + s += ["Cost:", split, str(self.get_cost())] + + dd = defaultdict(list) + bounds = [ ('x', self.x_bound()), ('x0', self.x0_bound()), ('xF', self.xF_bound()), + ('u', self.u_bound()), + ('t0', self.t0_bound()), ('tF', self.tF_bound())] + + for v, box in bounds: + if (box.low == -np.inf).all() and (box.high == np.inf).all(): + continue + dd['low'].append(box.low_repr) + dd['variable'].append("<= " + v + " <=") + dd['high'].append(box.high_repr) + + if len(dd) > 0: + s += ["Bounds:", split] + s += [tabulate.tabulate(dd, headers='keys')] + else: + s += ['No bounds are applied to the x and u-variables.'] + return "\n".join(s) + + +def symv(s, n): + """ + Returns a vector of symbolic functions. For instance if s='x' and n=3 then it will return + [x0,x1,x2] + where x0,..,x2 are symbolic variables. + """ + return sym.symbols(" ".join(["%s%i," % (s, i) for i in range(n)])) + +def ensure_policy(u): + """ + Ensure u corresponds to a policy function with input arguments u(x, t) + """ + if callable(u): + return lambda x, t: np.asarray(u(x,t)).reshape((-1,)) + else: + return lambda x, t: np.asarray(u).reshape((-1,)) + +def plot_trajectory(x_res, tt, lt='k-', ax=None, labels=None, legend=None): + M = x_res.shape[1] + if labels is None: + labels = [f"x_{i}" for i in range(M)] + + if ax is None: + if M == 2: + a = 234 + if M == 3: + r = 1 + c = 3 + else: + r = 2 if M > 1 else 1 + c = (M + 1) // 2 + + H = 2*r if r > 1 else 3 + W = 6*c + # if M == 2: + # W = 12 + f, ax = plt.subplots(r,c, figsize=(W,H)) + if M == 1: + ax = np.asarray([ax]) + print(M,r,c) + + for i in range(M): + if len(ax) <= i: + print("issue!") + + a = ax.flat[i] + a.plot(tt, x_res[:, i], lt, label=legend) + + a.set_xlabel("Time/seconds") + a.set_ylabel(labels[i]) + # a.set_title(labels[i]) + a.grid(True) + if legend is not None and i == 0: + a.legend() + # if i == M: + plt.tight_layout() + return ax + +def make_space_above(axes, topmargin=1.0): + """ increase figure size to make topmargin (in inches) space for + titles, without changing the axes sizes""" + fig = axes.flatten()[0].figure + s = fig.subplotpars + w, h = fig.get_size_inches() + + figh = h - (1-s.top)*h + topmargin + fig.subplots_adjust(bottom=s.bottom*h/figh, top=1-topmargin/figh) + fig.set_figheight(figh) + +def typeset_eq(x, u, f): + def ascii_vector(ls): + ml = max(map(len, ls)) + ls = [" " * (ml - len(s)) + s for s in ls] + ls = ["[" + s + "]" for s in ls] + return "\n".join(ls) + + v = [str(z) for z in f] + + def cstack(ls: list): + # ls = [l.splitlines() for l in ls] + height = max([len(l) for l in ls]) + widths = [len(l[0]) for l in ls] + + for k in range(len(ls)): + missing2 = (height - len(ls[k])) // 2 + missing1 = (height - len(ls[k]) - missing2) + tpad = [" " * widths[k]] * missing1 + bpad = [" " * widths[k]] * missing2 + ls[k] = tpad + ls[k] + bpad + + r = [""] * len(ls[0]) + for w in range(len(ls)): + for h in range(len(ls[0])): + r[h] += ls[w][h] + + return r + + xx = [str(x) for x in x] + uu = [str(u) for u in u] + xx = ascii_vector(xx).splitlines() + uu = ascii_vector(uu).splitlines() + cm = cstack([xx, [", "], uu]) + eq = cstack([["f("], cm, [")"]]) + eq = cstack([[" "], eq, [" = "], ascii_vector(v).splitlines()]) + return "\n".join(eq) diff --git a/irlc/ex03/inventory_evaluation.py b/irlc/ex03/inventory_evaluation.py new file mode 100644 index 0000000..c5d7eda --- /dev/null +++ b/irlc/ex03/inventory_evaluation.py @@ -0,0 +1,26 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +from irlc.ex02.inventory import InventoryDPModel + +def a_expected_items_next_day(x : int, u : int) -> float: + model = InventoryDPModel() + expected_number_of_items = None + # TODO: Code has been removed from here. + raise NotImplementedError("Insert your solution and remove this error.") + return expected_number_of_items + + +def b_evaluate_policy(pi : list, x0 : int) -> float: + # TODO: Code has been removed from here. + raise NotImplementedError("Insert your solution and remove this error.") + return J_pi_x0 + +if __name__ == "__main__": + model = InventoryDPModel() + # Create a policy that always buy an item if the inventory is empty. + pi = [{s: 1 if s == 0 else 0 for s in model.S(k)} for k in range(model.N)] + x = 0 + u = 1 + x0 = 1 + a_expected_items_next_day(x=0, u=1) + print(f"Given inventory is {x=} and we buy {u=}, the expected items on day k=1 is {a_expected_items_next_day(x, u)} and should be 0.1") + print(f"Evaluation of policy is {b_evaluate_policy(pi, x0)} and should be 2.7") diff --git a/irlc/ex03/kuramoto.py b/irlc/ex03/kuramoto.py new file mode 100644 index 0000000..e38b195 --- /dev/null +++ b/irlc/ex03/kuramoto.py @@ -0,0 +1,123 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025. +""" +import sympy as sym +from irlc.ex03.control_model import ControlModel +from irlc.ex03.control_cost import SymbolicQRCost +import numpy as np +from irlc import savepdf +from gymnasium.spaces import Box + + +class KuramotoModel(ControlModel): + r""" + The Kuramoto model. It implements the following dynamics: + + .. math:: + + \dot{x}(t) = u(t) +\cos(x(t)) + + I.e. the state and control variables are both one-dimensional. The cost function is simply: + + .. math:: + + c(t) = \frac{1}{2}x(t)^2 + \frac{1}{2}u(t)^2 + + This is a QR cost with :math:`Q=R=1`. + """ + def u_bound(self) -> Box: + return Box(-2, 2, shape=(1,)) + + def x0_bound(self) -> Box: + return Box(0, 0, shape=(1,)) + + def get_cost(self) -> SymbolicQRCost: + """ + Create a cost-object. The code defines a quadratic cost (with the given matrices) and allows easy computation + of derivatives, etc. There are automatic ways to discretize the cost so you don't have to bother with that. + See the online documentation for further details. + """ + return SymbolicQRCost(Q=np.zeros((1, 1)), R=np.ones((1,1))) + + def sym_f(self, x: list, u: list, t=None): + r""" Return a symbolic expression representing the Kuramoto model. + The inputs x, u are themselves *lists* of symbolic variables (insert breakpoint and check their value). + you have to use them to create a symbolic object representing f, and return it as a list. That is, you are going to return + + .. codeblock:: python + + return [f_val] + + where ``f_val`` is the symbolic expression corresponding to the dynamics, i.e. :math:`u(t) + \cos( x(t))`. + Note you can use trigonometric functions like ``sym.cos``. + """ + # TODO: 1 lines missing. + raise NotImplementedError("Implement symbolic expression as a singleton list here") + # define the symbolic expression + return symbolic_f_list + + +def f(x, u): + """ Implement the kuramoto osscilator model's dynamics, i.e. f such that dx/dt = f(x,u). + The answer should be returned as a singleton list. """ + cmodel = KuramotoModel() + # TODO: 1 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + # Use the ContiniousKuramotoModel to compute f(x,u). If in doubt, insert a breakpoint and let pycharms autocomplete + # guide you. See my video to Exercise 2 for how to use the debugger. Don't forget to specify t (for instance t=0). + # Note that sympys error messages can be a bit unforgiving. + return f_value + +def rk4_simulate(x0, u, t0, tF, N=1000): + """ + Implement the RK4 algorithm (Her25, Algorithm 18). + In this function, x0 and u are constant numpy ndarrays. I.e. u is not a function, which simplify the RK4 + algorithm a bit. + + The function you want to integrate, f, is already defined above. You can likewise assume f is not a function of + time. t0 and tF play the same role as in the algorithm. + + The function should return a numpy ndarray xs of dimension (N,) (containing all the x-values) and a numpy ndarray + tt containing the corresponding time points. + + Hints: + * Call f as in f(x, u). You defined f earlier in this exercise. + """ + tt = np.linspace(t0, tF, N+1) # Time grid t_k = tt[k] between t0 and tF. + xs = [ x0 ] + f(x0, u) # This is how you can call f. + for k in range(N): + x_next = None # Obtain x_next = x_{k+1} using a single RK4 step. + # Remember to insert breakpoints and use the console to examine what the various variables are. + # TODO: 7 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + xs.append(x_next) + xs = np.stack(xs, axis=0) + return xs, tt + +if __name__ == "__main__": + # Create a symbolic model corresponding to the Kuramoto model: + # Evaluate the dynamics dx / dt = f(x, u). + + print("Value of f(x,u) in x=2, u=0.3", f([2], [0.3])) + print("Value of f(x,u) in x=0, u=1", f([0], [1])) + + cmodel = KuramotoModel() + print(cmodel) + x0 = cmodel.x0_bound().low # Get the starting state x0. We exploit that the bound on x0 is an equality constraint. + u = 1.3 + xs, ts = rk4_simulate(x0, [u], t0=0, tF=20, N=100) + xs_true, us_true, ts_true, cost = cmodel.simulate(x0, u_fun=u, t0=0, tF=20, N_steps=100) + """You should generally use cmodel.simulate(...) to simulate the environment. Note that u_fun in the simulate + function can be set to a constant. Use this compute numpy ndarrays corresponding to the time, x and u values. + """ + # Plot the exact simulation of the environment + import matplotlib.pyplot as plt + plt.plot(ts_true, xs_true, 'k.-', label='RK4 state sequence x(t) (using model.simulate)') + plt.plot(ts, xs, 'r-', label='RK4 state sequence x(t) (using your code)') + plt.legend() + #savepdf('kuramoto_rk4') + plt.show(block=False) diff --git a/irlc/ex03/toy_2d_control.py b/irlc/ex03/toy_2d_control.py new file mode 100644 index 0000000..187dd03 --- /dev/null +++ b/irlc/ex03/toy_2d_control.py @@ -0,0 +1,23 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +import sympy as sym +from irlc.ex03.control_model import ControlModel +from irlc.ex03.control_cost import SymbolicQRCost +import numpy as np + +class Toy2DControl(ControlModel): + def get_cost(self): + # You get the cost-function for free because it can be anything as far as this problem is concerned. + return SymbolicQRCost(Q=np.eye(2), R=np.eye(1)) + + # TODO: 2 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + +def toy_simulation(u0 : float, T : float) -> float: + # TODO: 4 lines missing. + raise NotImplementedError("Create a Toy2dControl instance and use model.simulate(..) to get the final state.") + return wT + +if __name__ == "__main__": + x0 = np.asarray([np.pi/2, 0]) + wT = toy_simulation(u0=0.4, T=5) + print(f"Starting in x0=[pi/2, 0], after T=5 seconds the system is an an angle {wT=} (should be 1.265)") -- GitLab