Commit b7cbf41a authored by tuhe

Exercise 03

parent 62809ce7
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 3."""
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import sympy as sym
import numpy as np
from irlc.ex03.control_model import ControlModel
from irlc.ex03.control_cost import SymbolicQRCost
from gymnasium.spaces import Box
class BasicPendulumModel(ControlModel):
def sym_f(self, x, u, t=None):
g = 9.82
l = 1
m = 2
theta_dot = x[1] # Parameterization: x = [theta, theta']
theta_dot_dot = g / l * sym.sin(x[0]) + 1 / (m * l ** 2) * u[0]
return [theta_dot, theta_dot_dot]
def get_cost(self) -> SymbolicQRCost:
return SymbolicQRCost(Q=np.eye(2), R=np.eye(1))
def u_bound(self) -> Box:
return Box(np.asarray([-10]), np.asarray([10]), dtype=float)
def x0_bound(self) -> Box:
return Box(np.asarray( [np.pi, 0] ), np.asarray( [np.pi, 0]), dtype=float)
if __name__ == "__main__":
p = BasicPendulumModel()
print(p)
from irlc.ex04.discrete_control_model import DiscreteControlModel
model = BasicPendulumModel()
discrete_pendulum = DiscreteControlModel(model, dt=0.5) # Using a discretization time step: 0.5 seconds.
x0 = model.x0_bound().low # Get the initial state: x0 = [np.pi, 0].
u0 = [0] # No action. Note the action must be a list.
x1 = discrete_pendulum.f(x0, u0)
print(x1)
print("Now, lets compute the Euler step manually to confirm")
x1_manual = x0 + 0.5 * model.f(x0, u0, 0)
print(x1_manual)
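    # Hedged extra check (not part of the original demo): simulate the continuous-time model with
    # RK4 over the same 0.5 second window; this should be close to, but more accurate than, the
    # single discrete step above.
    xs, us, ts, total_cost = model.simulate(x0, u_fun=u0, t0=0, tF=0.5, N_steps=100)
    print("State after 0.5s according to model.simulate (RK4):", xs[-1])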
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
"""
import sympy as sym
import numpy as np
def mat(x): # Helper function.
return sym.Matrix(x) if x is not None else x
class SymbolicQRCost:
r"""
This class represents the cost function for a continuous-time model. In the simulations, we are going to assume
that the cost function takes the form:
.. math::
\int_{t_0}^{t_F} c(x(t), u(t)) dt + c_F(x_F)
And this class will specifically implement the two functions :math:`c` and :math:`c_F`. They will be assumed to have the quadratic form:
.. math::
c(x, u) & = \frac{1}{2} x^T Q x + \frac{1}{2} u^T R u + u^T H x + q^T x + r^T u + q_0, \\
c_F(x_F) & = \frac{1}{2} x_F^T Q_F x_F + q_F^T x_F + q_{0,F}.
    In other words, the class just needs to store a collection of matrices and vectors.
You can add and scale cost-functions
**********************************************************
    A convenient feature of the cost functions is that you can add and scale them. The following provides an
example:
.. runblock:: pycon
>>> from irlc.ex03.control_cost import SymbolicQRCost
>>> import numpy as np
>>> cost1 = SymbolicQRCost(np.eye(2), np.zeros(1) ) # Set Q = I, R = 0
        >>> cost2 = SymbolicQRCost(np.ones((2,2)), np.zeros(1) ) # Set Q = a 2x2 matrix of ones, R = 0
>>> print(cost1.Q) # Will be the identity matrix.
>>> cost = cost1 * 3 + cost2 * 2
        >>> print(cost.Q) # Will be 3*I + 2*(the all-ones matrix)
"""
def __init__(self, Q, R, q=None, qc=None, r=None, H=None, QN=None, qN=None, qcN=None):
"""
The constructor can be used to manually create a cost function. You will rarely want to call the constructor
directly but instead use the helper methods (see class documentation).
        The class simply stores the input parameters as fields. For instance, you can access the quadratic
term of the cost function, :math:`\\frac{1}{2}x^T Q x`, as ``cost.Q``.
:param Q: The matrix :math:`Q`
:param R: The matrix :math:`R`
:param q: The vector :math:`q`
:param qc: The constant :math:`q_0`
:param r: The vector :math:`r`
:param H: The matrix :math:`H`
:param QN: The terminal cost matrix :math:`Q_N`
:param qN: The terminal cost vector :math:`q_N`
:param qcN: The terminal cost constant :math:`q_{0,N}`
"""
n = Q.shape[0]
d = R.shape[0]
self.Q = Q
self.R = R
self.q = np.zeros( (n,)) if q is None else q
        self.qc = 0 if qc is None else qc
self.r = np.zeros( (d,)) if r is None else r
self.H = np.zeros((d,n)) if H is None else H
self.QN = np.zeros((n,n)) if QN is None else QN
self.qN = np.zeros((n,)) if qN is None else qN
        self.qcN = 0 if qcN is None else qcN
self.flds = ('Q', 'R', 'q', 'qc', 'r', 'H', 'QN', 'qN', 'qcN')
self.flds_term = ('QN', 'qN', 'qcN')
# self.c_numpy = None
# self.cF_numpy = None
@classmethod
def zero(cls, state_size, action_size):
"""
        Creates an all-zero cost function, i.e. all terms :math:`Q`, :math:`R` are set to zero.
.. runblock:: pycon
>>> from irlc.ex03.control_cost import SymbolicQRCost
>>> cost = SymbolicQRCost.zero(2, 1)
>>> cost.Q # 2x2 zero matrix
>>> cost.R # 1x1 zero matrix.
:param state_size: Dimension of the state vector :math:`n`
:param action_size: Dimension of the action vector :math:`d`
:return: A ``SymbolicQRCost`` with all zero terms.
"""
return cls(Q=np.zeros( (state_size,state_size)), R=np.zeros((action_size,action_size)) )
def sym_c(self, x, u, t=None):
"""
        Evaluate the instantaneous part of the cost function, :math:`c(x, u, t)`. An example:
.. runblock:: pycon
>>> from irlc.ex03.control_cost import SymbolicQRCost
>>> import numpy as np
        >>> cost = SymbolicQRCost(np.eye(2), np.eye(1)) # Set Q = I, R = I
>>> cost.sym_c(x = np.asarray([1,2]), u=np.asarray([0])) # should return 0.5 * x^T Q x = 0.5 * (1 + 4)
:param x: The state :math:`x(t)`
:param u: The action :math:`u(t)`
:param t: The current time step :math:`t` (this will be ignored)
:return: A ``sympy`` symbolic expression corresponding to the instantaneous cost.
"""
u = sym.Matrix(u)
x = sym.Matrix(x)
c = 1 / 2 * (x.transpose() @ self.Q @ x) + 1 / 2 * (u.transpose() @ self.R @ u) + u.transpose() @ self.H @ x + sym.Matrix(self.q).transpose() @ x + sym.Matrix(self.r).transpose() @ u + sym.Matrix([[self.qc]])
assert c.shape == (1,1)
return c[0,0]
def sym_cf(self, t0, tF, x0, xF):
"""
        Evaluate the terminal term of the cost function, :math:`c_F(t_0, t_F, x_0, x_F)`. An example:
.. runblock:: pycon
>>> from irlc.ex03.control_cost import SymbolicQRCost
>>> import numpy as np
        >>> cost = SymbolicQRCost(np.eye(2), np.zeros(1), QN=np.eye(2)) # Set Q = I, R = 0, QN = I
>>> cost.sym_cf(0, 0, 0, xF=2*np.ones((2,))) # should return 0.5 * xF^T * xF = 0.5 * 8
:param t0: Starting time :math:`t_0` (not used)
:param tF: Stopping time :math:`t_F` (not used)
:param x0: Initial state :math:`x_0` (not used)
        :param xF: Terminal state :math:`x_F` (**this one is used**)
:return: A ``sympy`` symbolic expression corresponding to the terminal cost.
"""
xF = sym.Matrix(xF)
c = 0.5 * xF.transpose() @ self.QN @ xF + xF.transpose() @ sym.Matrix(self.qN) + sym.Matrix([[self.qcN]])
assert c.shape == (1,1)
return c[0,0]
def discretize(self, dt):
r"""
Discretize the cost function so it is suitable for a discrete control problem. See (Her25, Subsection 13.1.5) for more information.
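        Concretely, all running-cost terms are scaled by the time step :math:`\Delta` while the terminal terms are left unchanged:
        .. math::
            Q_k = \Delta Q, \quad R_k = \Delta R, \quad q_k = \Delta q, \quad r_k = \Delta r, \quad H_k = \Delta H, \quad q_{0,k} = \Delta q_0,
        and :math:`Q_N`, :math:`q_N`, :math:`q_{0,N}` are passed on as they are.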
:param dt: The discretization time step :math:`\Delta`
        :return: An :class:`~irlc.ex04.discrete_control_cost.DiscreteQRCost` instance corresponding to a discretized version of this cost function.
"""
from irlc.ex04.discrete_control_cost import DiscreteQRCost
return DiscreteQRCost(**{f: self.__getattribute__(f) * (1 if f in self.flds_term else dt) for f in self.flds} )
def __add__(self, c):
return SymbolicQRCost(**{k: self.__dict__[k] + c.__dict__[k] for k in self.flds})
def __mul__(self, c):
return SymbolicQRCost(**{k: self.__dict__[k] * c for k in self.flds})
def __str__(self):
title = "Continuous-time cost function"
label1 = "Non-zero terms in c(x, u)"
label2 = "Non-zero terms in c_F(x)"
terms1 = [s for s in self.flds if s not in self.flds_term]
terms2 = self.flds_term
return _repr_cost(self, title, label1, label2, terms1, terms2)
def goal_seeking_terminal_cost(self, xF_target, QF=None):
"""
Create a cost function which is minimal when the terminal state :math:`x_F` is equal to a goal state :math:`x_F^*`.
Concretely, it will return a cost function of the form
.. math::
c_F(x_F) = \\frac{1}{2} (x_F^* - x_F)^\\top Q_F (x_F^* - x_F)
.. runblock:: pycon
>>> from irlc.ex03.control_cost import SymbolicQRCost
>>> import numpy as np
>>> cost = SymbolicQRCost.zero(2, 1)
>>> cost += cost.goal_seeking_terminal_cost(xF_target=np.ones((2,)))
>>> print(cost.qN)
>>> print(cost)
:param xF_target: Target state :math:`x_F^*`
:param QF: Cost matrix :math:`Q_F`
:return: A ``SymbolicQRCost`` object corresponding to the goal-seeking cost function
"""
if QF is None:
QF = np.eye(xF_target.size)
QF, qN, qcN = targ2matrices(xF_target, Q=QF)
return SymbolicQRCost(Q=self.Q*0, R=self.R*0, QN=QF, qN=qN, qcN=qcN)
def goal_seeking_cost(self, x_target, Q=None):
"""
Create a cost function which is minimal when the state :math:`x` is equal to a goal state :math:`x^*`.
Concretely, it will return a cost function of the form
.. math::
c(x, u) = \\frac{1}{2} (x^* - x)^\\top Q (x^* - x)
.. runblock:: pycon
>>> from irlc.ex03.control_cost import SymbolicQRCost
>>> import numpy as np
>>> cost = SymbolicQRCost.zero(2, 1)
>>> cost += cost.goal_seeking_cost(x_target=np.ones((2,)))
>>> print(cost.q)
>>> print(cost)
:param x_target: Target state :math:`x^*`
:param Q: Cost matrix :math:`Q`
:return: A ``SymbolicQRCost`` object corresponding to the goal-seeking cost function
"""
if Q is None:
Q = np.eye(x_target.size)
Q, q, qc = targ2matrices(x_target, Q=Q)
return SymbolicQRCost(Q=Q, R=self.R*0, q=q, qc=qc)
def term(self, Q=None, R=None,r=None):
dd = {}
lc = locals()
for f in self.flds:
if f in lc and lc[f] is not None:
dd[f] = lc[f]
else:
dd[f] = self.__getattribute__(f)*0
return SymbolicQRCost(**dd)
@property
def state_size(self):
return self.Q.shape[0]
@property
def action_size(self):
return self.R.shape[0]
def _private_evaluate_numpy_c(self, x, u, t):
if not hasattr(self, '_np_c') or self._np_c is None:
sx = sym.symbols(f"x0:{self.state_size}")
su = sym.symbols(f"u0:{self.action_size}")
st = sym.symbols('t')
self._np_c = sym.lambdify( (sx,su,st), self.sym_c(sx, su, st) )
return float( self._np_c(x, u, t) )
def _private_evaluate_numpy_cf(self, t0, tF, x0, xF):
if not hasattr(self, '_np_cF') or self._np_cF is None:
sx0 = sym.symbols(f"x0:{self.state_size}")
            sxF = sym.symbols(f"xF0:{self.state_size}")  # Use distinct symbols for the terminal state.
st0, stF = sym.symbols('t0 tF')
self._np_cF = sym.lambdify( (st0, stF, sx0, sxF), self.sym_cf(st0, stF, sx0, sxF) )
return float( self._np_cF(t0, tF, x0, xF) )
def _repr_cost(cost, title, label1, label2, terms1, terms2):
self = cost
def _get(flds, label):
d = {s: self.__dict__[s] for s in flds if np.sum(np.sum(self.__dict__[s] != 0)) != 0}
out = ""
if len(d) > 0:
# out = ""
out += f"> {label}:\n"
for s, m in d.items():
mm = f"{m}"
if len(mm.splitlines()) > 1:
mm = "\n" + mm
out += f" * {s} = {mm}\n"
return d, out
nz_c, o1 = _get([s for s in terms1], label1)
out = ""
out += f"{title}:\n"
out += o1
nz_term, o2 = _get(terms2, label2)
out += o2
if len(nz_c) + len(nz_term) == 0:
print("All terms in the cost-function are zero.")
return out
def targ2matrices(t, Q=None): # Helper function
"""
Given a target vector :math:`t` and a matrix :math:`Q` this function returns cost-matrices suitable for implementing:
.. math::
        \\frac{1}{2} (x - t)^\\top Q (x - t) = \\frac{1}{2} x^\\top Q x - \\frac{1}{2}\\left(Q t + Q^\\top t\\right)^\\top x + \\frac{1}{2} t^\\top Q t
:param t:
:param Q:
:return:
"""
n = t.size
if Q is None:
Q = np.eye(n)
return Q, -1/2 * (Q @ t + t @ Q.T), 1/2 * t @ Q @ t
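if __name__ == "__main__":
    # A small, hedged usage sketch (not part of the exercise files): build a goal-seeking cost
    # around the target x* = [1, 1] and verify that the instantaneous cost vanishes at the target.
    cost = SymbolicQRCost.zero(2, 1)
    cost += cost.goal_seeking_cost(x_target=np.ones((2,)))
    print(cost)
    print("c(x*, u=0) =", cost.sym_c(x=np.ones((2,)), u=np.zeros((1,))))  # Evaluates to 0 at the target.
    dcost = cost.discretize(dt=0.1)  # Running-cost terms are scaled by dt; terminal terms are unchanged.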
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
"""
from collections import defaultdict
import tabulate
import sympy as sym
import numpy as np
import matplotlib.pyplot as plt
from gymnasium.spaces import Box
from irlc.ex03.control_cost import SymbolicQRCost
class ControlModel:
r"""Represents the continious time model of a control environment.
See (Her25, Section 13.2) for a top-level description.
The model represents the physical system we are simulating and can be considered a control-equivalent of the
:class:`irlc.ex02.dp_model.DPModel`. The class must keep track of the following:
.. math::
\frac{dx}{dt} = f(x, u, t)
And the cost-function which is defined as an integral
.. math::
c_F(t_0, t_F, x(t_0), x(t_F)) + \int_{t_0}^{t_F} c(t, x, u) dt
    as well as constraints and boundary conditions on :math:`x`, :math:`u` and the initial state :math:`x(t_0)`. In
    this course, the cost function will always be quadratic and can be accessed as ``model.get_cost``.
If you want to implement your own model, the best approach is to start with an existing model and modify it for
    your needs. The overall idea is that you implement the dynamics, ``sym_f``, and the cost function, ``get_cost``,
and optionally define bounds as needed.
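    For illustration, a minimal model could look like the sketch below (a hedged example, assuming a
    one-dimensional state and action; the class name is made up for this illustration):
    .. code-block:: python

        class MyIntegratorModel(ControlModel):
            def sym_f(self, x, u, t=None):
                return [u[0]]   # dx/dt = u(t)

            def get_cost(self):
                return SymbolicQRCost(Q=np.eye(1), R=np.eye(1))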
"""
state_labels = None # Labels (as lists) used for visualizations.
action_labels = None # Labels (as lists) used for visualizations.
def __init__(self):
"""
The cost must be an instance of :class:`irlc.ex04.cost_continuous.SymbolicQRCost`.
Bounds is a dictionary but otherwise optional; the model should give it a default value.
:param cost: A quadratic cost function
:param dict bounds: A dictionary of boundary constraints.
"""
if self.state_labels is None:
self.state_labels = [f'x{i}' for i in range(self.state_size)]
if self.action_labels is None:
self.action_labels = [f'u{i}' for i in range(self.action_size)]
t = sym.symbols("t")
x = sym.symbols(f"x0:{self.state_size}")
u = sym.symbols(f"u0:{self.action_size}")
try:
f = self.sym_f(x, u, t)
except Exception as e:
print("control_model.py> There is a problem with the way you have specified the dynamics. The function sym_f must accept lists as inputs")
raise e
if len(f) != len(x):
print("control_model.py> Your function ControlModel.sym_f must output a list of symbolic expressions.")
assert len(f) == len(x)
self._f_np = sym.lambdify((x, u, t), self.sym_f(x, u, t))
def x0_bound(self) -> Box:
r"""The bound on the initial state :math:`\mathbf{x}_0`.
The default bound is ``Box(0, 0, shape=(self.state_size,))``, i.e. :math:`\mathbf{x}_0 = 0`.
:return: An appropriate gymnasium Box instance.
"""
return Box(0, 0, shape=(self.state_size,))
def xF_bound(self) -> Box:
r"""The bound on the terminal state :math:`\mathbf{x}_F`.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(self.state_size,))
def x_bound(self) -> Box:
r"""The bound on all other states :math:`\mathbf{x}(t)`.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(self.state_size,))
def u_bound(self) -> Box:
r"""The bound on the terminal state :math:`\mathbf{u}(t)`.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(self.action_size,))
def t0_bound(self) -> Box:
r"""The bound on the initial time :math:`\mathbf{t}_0`.
        I have included this bound for completeness: in practice, there is no reason to change it
        from the default bound ``Box(0, 0, shape=(1,))``, i.e. :math:`\mathbf{t}_0 = 0`.
:return: An appropriate gymnasium Box instance.
"""
return Box(0, 0, shape=(1,))
def tF_bound(self) -> Box:
r"""The bound on the final time :math:`\mathbf{t}_F`, i.e. when the environment terminates.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(1,))
def get_cost(self) -> SymbolicQRCost:
"""Return the cost.
This function should return a :class:`SymbolicQRCost` instance. The function must be implemented
        since the cost object is used to determine the number of states and actions. Note that you can specify a trivial cost
        instance such as :python:`return SymbolicQRCost.zero(n_states, n_actions)`.
        :return: A :class:`SymbolicQRCost` instance representing the model's cost function.
"""
raise NotImplementedError("When you implement the model, you must implement the get_cost() function.\nfor instance, use return SymbolicQRCost(Q=np.eye(n), R=np.eye(d))")
def sym_f(self, x, u, t=None):
"""
The symbolic (``sympy``) version of the dynamics :math:`f(x, u, t)`. This is the main place where you specify
        the dynamics when you build a new model. You should look at concrete model implementations for specifics.
:param x: A list of symbolic expressions ``['x0', 'x1', ..]`` corresponding to :math:`x`
:param u: A list of symbolic expressions ``['u0', 'u1', ..]`` corresponding to :math:`u`
:param t: A single symbolic expression corresponding to the time :math:`t` (seconds)
:return: A list of symbolic expressions ``[f0, f1, ...]`` of the same length as ``x`` where each element is a coordinate of :math:`f`
"""
raise NotImplementedError("Implement a function which return the environment dynamics f(x,u,t) as a sympy exression")
def f(self, x, u, t=0) -> np.ndarray:
r"""Evaluate the dynamics.
This function will evaluate the dynamics. In other words, it will evaluate :math:`\mathbf{f}` in the following expression:
.. math::
\dot{\mathbf{x}} = \mathbf{f}(\mathbf{x}, \mathbf{u}, t)
:param x: A numpy ndarray corresponding to the state
:param u: A numpy ndarray corresponding to the control
:param t: A :python:`float` corresponding to the time.
        :return: The time derivative of the state, :math:`\dot{\mathbf{x}}(t)`.
"""
return np.asarray( self._f_np(x, u, t) )
def simulate(self, x0, u_fun, t0, tF, N_steps=1000, method='rk4'):
r"""Simulate the effect of a policy on the model.
        By default, it uses Runge-Kutta 4 (RK4) with a fine discretization -- this is slow, but in nearly all cases very accurate. See (Her25, Algorithm 18) for more information.
The input argument ``u_fun`` should be a function which returns a list or tuple with same dimension as
``model.action_space``, :math:`d`.
:param x0: The initial state of the simulation. Must be a list of floats of same dimension as ``env.observation_space``, :math:`n`.
:param u_fun: Can be either:
- Either a policy function that can be called as ``u_fun(x, t)`` and returns an action ``u`` in the ``action_space``
- A single action (i.e. a list of floats of same length as the action space). The model will be simulated with a constant action in this case.
:param float t0: Starting time :math:`t_0`
:param float tF: Stopping time :math:`t_F`; the model will be simulated for :math:`t_F - t_0` seconds
:param int N_steps: Steps :math:`N` in the RK4 simulation
:param str method: Simulation method. Either ``'rk4'`` (default) or ``'euler'``
The last output argument is the total cost of the trajectory:
.. math::
\int_{t_0}^{t_F} c(x(t), u(t), t) dt + c_f(t_0, t_F, x(t_0), x(t_F) )
the integral is approximated using the simulated trajectory.
:return:
            - xs - A numpy ``ndarray`` of dimension :math:`(N+1)\times n` containing the observations :math:`x`
            - us - A numpy ``ndarray`` of dimension :math:`(N+1)\times d` containing the actions :math:`u`
- ts - A numpy ``ndarray`` of dimension :math:`(N+1)` containing the corresponding times :math:`t` (seconds)
- total_cost - A ``float`` of the total cost of the trajectory.
"""
assert N_steps >= 1, "Simulation should include at least 1 time step."
u_fun = ensure_policy(u_fun)
tt = np.linspace(t0, tF, N_steps+1) # Time grid t_k = tt[k] between t0 and tF.
xs = [ np.asarray(x0) ]
us = [ u_fun(x0, t0 )]
for k in range(N_steps):
Delta = tt[k+1] - tt[k]
tn = tt[k]
xn = xs[k]
            un = us[k]  # The action at time t_k (already flattened to a vector by ensure_policy).
unp = u_fun(xn, tn + Delta)
if method == 'rk4':
r""" Implementation of RK4 here. See: (Her25, Algorithm 18) """
k1 = np.asarray(self.f(xn, un, tn))
k2 = np.asarray(self.f(xn + Delta * k1/2, u_fun(xn, tn+Delta/2), tn+Delta/2))
k3 = np.asarray(self.f(xn + Delta * k2/2, u_fun(xn, tn+Delta/2), tn+Delta/2))
k4 = np.asarray(self.f(xn + Delta * k3, u_fun(xn, tn + Delta), tn+Delta))
xnp = xn + 1/6 * Delta * (k1 + 2*k2 + 2*k3 + k4)
elif method == 'euler':
xnp = xn + Delta * np.asarray(self.f(xn, un, tn))
else:
raise Exception("Bad integration method", method)
xs.append(xnp)
us.append(unp)
xs = np.stack(xs, axis=0)
us = np.stack(us, axis=0)
# Compute the cost.
cost = self.get_cost()
total_cost = float( sum( [ Delta * cost._private_evaluate_numpy_c(x, u, k) for k, (x, u) in enumerate(zip(xs[:-1], us[:-1]))] ) + cost._private_evaluate_numpy_cf(t0, tF, x0, xs[-1]) )
return xs, us, tt, total_cost
@property
def state_size(self):
"""
This field represents the dimensionality of the state-vector :math:`n`. Use it as ``model.state_size``
:return: Dimensionality of the state vector :math:`x`
"""
return self.get_cost().state_size
@property
def action_size(self):
"""
This field represents the dimensionality of the action-vector :math:`d`. Use it as ``model.action_size``
:return: Dimensionality of the action vector :math:`u`
"""
return self.get_cost().action_size
def render(self, x, render_mode="human"):
"""
Responsible for rendering the state. You don't have to worry about this function.
:param x: State to render
:param str render_mode: Rendering mode. Select ``"human"`` for a visualization.
:return: Either none or a ``ndarray`` for plotting.
"""
raise NotImplementedError()
def close(self):
pass
def phi_x(self, x : list) -> list:
r"""Coordinate transformation of the state when the model is discretized.
This function specifies the coordinate transformation :math:`x_k = \Phi_x(x(t_k))` which is applied to the environment when it is
discretized. It should accept a list of symbols, corresponding to :math:`x`, and return a new list
of symbols corresponding to the (discrete) coordinates.
:param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}(t)`
:return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{x}_k`.
"""
return x
def phi_x_inv(self, x: list) -> list:
r"""Inverse of coordinate transformation for the state.
This function should specify the inverse of the coordinate transformation :math:`\Phi_x`, i.e. :math:`\Phi_x^{-1}`.
In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`x(t) = \Phi_x^{-1}(x_k)`.
:param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}_k`
:return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{x}(t)`.
"""
return x
def phi_u(self, u: list) -> list:
r"""Coordinate transformation of the action when the model is discretized.
        This function specifies the coordinate transformation :math:`u_k = \Phi_u(u(t_k))` which is applied to the action when the model is
        discretized. It should accept a list of symbols, corresponding to :math:`u`, and return a new list
        of symbols corresponding to the (discrete) action coordinates.
        :param u: A list of symbols ``[u0, u1, ..., ud]`` corresponding to :math:`\mathbf{u}(t)`
        :return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{u}_k`.
"""
return u
def phi_u_inv(self, u: list) -> list:
r"""Inverse of coordinate transformation for the action.
This function should specify the inverse of the coordinate transformation :math:`\Phi_u`, i.e. :math:`\Phi_u^{-1}`.
In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`u(t) = \Phi_u^{-1}(u_k)`.
        :param u: A list of symbols ``[u0, u1, ..., ud]`` corresponding to :math:`\mathbf{u}_k`
:return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{u}(t)`.
"""
return u
def __str__(self):
"""
Return a string representation of the model. This is a potentially helpful way to summarize the content of the
model. You can use it as:
.. runblock:: pycon
>>> from irlc.ex04.model_pendulum import SinCosPendulumModel
>>> model = SinCosPendulumModel()
>>> print(model)
:return: A string containing the details of the model.
"""
split = "-"*20
s = [f"{self.__class__}"] + ['='*50]
s += ["Dynamics:", split]
t = sym.symbols("t")
x = sym.symbols(f"x0:{self.state_size}")
u = sym.symbols(f"u0:{self.action_size}")
s += [typeset_eq(x, u, self.sym_f(x, u, t) )]
s += ["Cost:", split, str(self.get_cost())]
dd = defaultdict(list)
bounds = [ ('x', self.x_bound()), ('x0', self.x0_bound()), ('xF', self.xF_bound()),
('u', self.u_bound()),
('t0', self.t0_bound()), ('tF', self.tF_bound())]
for v, box in bounds:
if (box.low == -np.inf).all() and (box.high == np.inf).all():
continue
dd['low'].append(box.low_repr)
dd['variable'].append("<= " + v + " <=")
dd['high'].append(box.high_repr)
if len(dd) > 0:
s += ["Bounds:", split]
s += [tabulate.tabulate(dd, headers='keys')]
else:
s += ['No bounds are applied to the x and u-variables.']
return "\n".join(s)
def symv(s, n):
"""
Returns a vector of symbolic functions. For instance if s='x' and n=3 then it will return
[x0,x1,x2]
where x0,..,x2 are symbolic variables.
"""
return sym.symbols(" ".join(["%s%i," % (s, i) for i in range(n)]))
def ensure_policy(u):
"""
Ensure u corresponds to a policy function with input arguments u(x, t)
"""
if callable(u):
return lambda x, t: np.asarray(u(x,t)).reshape((-1,))
else:
return lambda x, t: np.asarray(u).reshape((-1,))
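# For example (hedged illustration): ensure_policy([0.5]) returns a function pol(x, t) which ignores
# its arguments and always returns np.asarray([0.5]); a callable policy is wrapped so that its output
# is likewise flattened to a 1-d numpy array.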
def plot_trajectory(x_res, tt, lt='k-', ax=None, labels=None, legend=None):
M = x_res.shape[1]
if labels is None:
labels = [f"x_{i}" for i in range(M)]
if ax is None:
        if M == 3:
            r, c = 1, 3
        else:
            r = 2 if M > 1 else 1
            c = (M + 1) // 2
        H = 2 * r if r > 1 else 3
        W = 6 * c
        f, ax = plt.subplots(r, c, figsize=(W, H))
        if M == 1:
            ax = np.asarray([ax])
    for i in range(M):
a = ax.flat[i]
a.plot(tt, x_res[:, i], lt, label=legend)
a.set_xlabel("Time/seconds")
a.set_ylabel(labels[i])
# a.set_title(labels[i])
a.grid(True)
if legend is not None and i == 0:
a.legend()
# if i == M:
plt.tight_layout()
return ax
def make_space_above(axes, topmargin=1.0):
""" increase figure size to make topmargin (in inches) space for
titles, without changing the axes sizes"""
fig = axes.flatten()[0].figure
s = fig.subplotpars
w, h = fig.get_size_inches()
figh = h - (1-s.top)*h + topmargin
fig.subplots_adjust(bottom=s.bottom*h/figh, top=1-topmargin/figh)
fig.set_figheight(figh)
def typeset_eq(x, u, f):
def ascii_vector(ls):
ml = max(map(len, ls))
ls = [" " * (ml - len(s)) + s for s in ls]
ls = ["[" + s + "]" for s in ls]
return "\n".join(ls)
v = [str(z) for z in f]
def cstack(ls: list):
# ls = [l.splitlines() for l in ls]
height = max([len(l) for l in ls])
widths = [len(l[0]) for l in ls]
for k in range(len(ls)):
missing2 = (height - len(ls[k])) // 2
missing1 = (height - len(ls[k]) - missing2)
tpad = [" " * widths[k]] * missing1
bpad = [" " * widths[k]] * missing2
ls[k] = tpad + ls[k] + bpad
r = [""] * len(ls[0])
for w in range(len(ls)):
for h in range(len(ls[0])):
r[h] += ls[w][h]
return r
xx = [str(x) for x in x]
uu = [str(u) for u in u]
xx = ascii_vector(xx).splitlines()
uu = ascii_vector(uu).splitlines()
cm = cstack([xx, [", "], uu])
eq = cstack([["f("], cm, [")"]])
eq = cstack([[" "], eq, [" = "], ascii_vector(v).splitlines()])
return "\n".join(eq)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex02.inventory import InventoryDPModel
def a_expected_items_next_day(x : int, u : int) -> float:
model = InventoryDPModel()
expected_number_of_items = None
# TODO: Code has been removed from here.
raise NotImplementedError("Insert your solution and remove this error.")
return expected_number_of_items
def b_evaluate_policy(pi : list, x0 : int) -> float:
# TODO: Code has been removed from here.
raise NotImplementedError("Insert your solution and remove this error.")
return J_pi_x0
if __name__ == "__main__":
model = InventoryDPModel()
    # Create a policy that always buys an item if the inventory is empty.
pi = [{s: 1 if s == 0 else 0 for s in model.S(k)} for k in range(model.N)]
x = 0
u = 1
x0 = 1
a_expected_items_next_day(x=0, u=1)
print(f"Given inventory is {x=} and we buy {u=}, the expected items on day k=1 is {a_expected_items_next_day(x, u)} and should be 0.1")
print(f"Evaluation of policy is {b_evaluate_policy(pi, x0)} and should be 2.7")
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
"""
import sympy as sym
from irlc.ex03.control_model import ControlModel
from irlc.ex03.control_cost import SymbolicQRCost
import numpy as np
from irlc import savepdf
from gymnasium.spaces import Box
class KuramotoModel(ControlModel):
r"""
The Kuramoto model. It implements the following dynamics:
.. math::
\dot{x}(t) = u(t) +\cos(x(t))
I.e. the state and control variables are both one-dimensional. The cost function is simply:
.. math::
        c(t) = \frac{1}{2}u(t)^2
    This is a QR cost with :math:`Q=0` and :math:`R=1`; see :func:`get_cost` below.
"""
def u_bound(self) -> Box:
return Box(-2, 2, shape=(1,))
def x0_bound(self) -> Box:
return Box(0, 0, shape=(1,))
def get_cost(self) -> SymbolicQRCost:
"""
Create a cost-object. The code defines a quadratic cost (with the given matrices) and allows easy computation
of derivatives, etc. There are automatic ways to discretize the cost so you don't have to bother with that.
See the online documentation for further details.
"""
return SymbolicQRCost(Q=np.zeros((1, 1)), R=np.ones((1,1)))
def sym_f(self, x: list, u: list, t=None):
r""" Return a symbolic expression representing the Kuramoto model.
        The inputs x, u are themselves *lists* of symbolic variables (insert a breakpoint and check their values).
        You have to use them to create a symbolic object representing f, and return it as a list. That is, you are going to return
        .. code-block:: python
return [f_val]
where ``f_val`` is the symbolic expression corresponding to the dynamics, i.e. :math:`u(t) + \cos( x(t))`.
Note you can use trigonometric functions like ``sym.cos``.
"""
# TODO: 1 lines missing.
raise NotImplementedError("Implement symbolic expression as a singleton list here")
# define the symbolic expression
return symbolic_f_list
def f(x, u):
""" Implement the kuramoto osscilator model's dynamics, i.e. f such that dx/dt = f(x,u).
The answer should be returned as a singleton list. """
cmodel = KuramotoModel()
# TODO: 1 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
    # Use the KuramotoModel instance (cmodel) to compute f(x,u). If in doubt, insert a breakpoint and let PyCharm's autocomplete
    # guide you. See my video for Exercise 2 for how to use the debugger. Don't forget to specify t (for instance t=0).
    # Note that sympy's error messages can be a bit unforgiving.
return f_value
def rk4_simulate(x0, u, t0, tF, N=1000):
"""
Implement the RK4 algorithm (Her25, Algorithm 18).
    In this function, x0 and u are constant numpy ndarrays. I.e. u is not a function, which simplifies the RK4
algorithm a bit.
The function you want to integrate, f, is already defined above. You can likewise assume f is not a function of
time. t0 and tF play the same role as in the algorithm.
    The function should return a numpy ndarray xs containing all the x-values (one entry per time point, N+1 in total) and a numpy ndarray
    tt containing the corresponding time points.
Hints:
* Call f as in f(x, u). You defined f earlier in this exercise.
"""
tt = np.linspace(t0, tF, N+1) # Time grid t_k = tt[k] between t0 and tF.
xs = [ x0 ]
f(x0, u) # This is how you can call f.
for k in range(N):
x_next = None # Obtain x_next = x_{k+1} using a single RK4 step.
# Remember to insert breakpoints and use the console to examine what the various variables are.
# TODO: 7 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
xs.append(x_next)
xs = np.stack(xs, axis=0)
return xs, tt
if __name__ == "__main__":
# Create a symbolic model corresponding to the Kuramoto model:
# Evaluate the dynamics dx / dt = f(x, u).
print("Value of f(x,u) in x=2, u=0.3", f([2], [0.3]))
print("Value of f(x,u) in x=0, u=1", f([0], [1]))
cmodel = KuramotoModel()
print(cmodel)
x0 = cmodel.x0_bound().low # Get the starting state x0. We exploit that the bound on x0 is an equality constraint.
u = 1.3
xs, ts = rk4_simulate(x0, [u], t0=0, tF=20, N=100)
xs_true, us_true, ts_true, cost = cmodel.simulate(x0, u_fun=u, t0=0, tF=20, N_steps=100)
"""You should generally use cmodel.simulate(...) to simulate the environment. Note that u_fun in the simulate
    function can be set to a constant. Use this to compute numpy ndarrays corresponding to the time, x and u values.
"""
# Plot the exact simulation of the environment
import matplotlib.pyplot as plt
plt.plot(ts_true, xs_true, 'k.-', label='RK4 state sequence x(t) (using model.simulate)')
plt.plot(ts, xs, 'r-', label='RK4 state sequence x(t) (using your code)')
plt.legend()
#savepdf('kuramoto_rk4')
plt.show(block=False)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import sympy as sym
from irlc.ex03.control_model import ControlModel
from irlc.ex03.control_cost import SymbolicQRCost
import numpy as np
class Toy2DControl(ControlModel):
def get_cost(self):
# You get the cost-function for free because it can be anything as far as this problem is concerned.
return SymbolicQRCost(Q=np.eye(2), R=np.eye(1))
# TODO: 2 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
def toy_simulation(u0 : float, T : float) -> float:
# TODO: 4 lines missing.
raise NotImplementedError("Create a Toy2dControl instance and use model.simulate(..) to get the final state.")
return wT
if __name__ == "__main__":
x0 = np.asarray([np.pi/2, 0])
wT = toy_simulation(u0=0.4, T=5)
print(f"Starting in x0=[pi/2, 0], after T=5 seconds the system is an an angle {wT=} (should be 1.265)")