Newer
Older
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
"""
from collections import defaultdict
import tabulate
import sympy as sym
import numpy as np
import matplotlib.pyplot as plt
from gymnasium.spaces import Box
from irlc.ex03.control_cost import SymbolicQRCost
class ControlModel:
r"""Represents the continious time model of a control environment.
See (Her25, Section 13.2) for a top-level description.
The model represents the physical system we are simulating and can be considered a control-equivalent of the
:class:`irlc.ex02.dp_model.DPModel`. The class must keep track of the following:
.. math::
\frac{dx}{dt} = f(x, u, t)
And the cost-function which is defined as an integral
.. math::
c_F(t_0, t_F, x(t_0), x(t_F)) + \int_{t_0}^{t_F} c(t, x, u) dt
as well as constraints and boundary conditions on :math:`x`, :math:`u` and the initial conditions state :math:`x(t_0)`.
this course, the cost function will always be quadratic and can be obtained using the function :func:`~irlc.ex03.control_model.ControlModel.get_cost`.
If you want to implement your own model, the best approach is to start with an existing model and modify it for
your needs. The overall idea is that you implement the dynamics, :func:`~irlc.ex03.control_model.ControlModel.sym_f`, and the cost function :func:`~irlc.ex03.control_model.ControlModel.get_cost`,
and (optionally) define whichever bounds are suitable for your problem.
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
state_labels = None # Labels (as lists) used for visualizations.
action_labels = None # Labels (as lists) used for visualizations.
def __init__(self):
if self.state_labels is None:
self.state_labels = [f'x{i}' for i in range(self.state_size)]
if self.action_labels is None:
self.action_labels = [f'u{i}' for i in range(self.action_size)]
t = sym.symbols("t")
x = sym.symbols(f"x0:{self.state_size}")
u = sym.symbols(f"u0:{self.action_size}")
try:
f = self.sym_f(x, u, t)
except Exception as e:
print("control_model.py> There is a problem with the way you have specified the dynamics. The function sym_f must accept lists as inputs")
raise e
if len(f) != len(x):
print("control_model.py> Your function ControlModel.sym_f must output a list of symbolic expressions.")
assert len(f) == len(x)
self._f_np = sym.lambdify((x, u, t), self.sym_f(x, u, t))
def x0_bound(self) -> Box:
r"""The bound on the initial state :math:`\mathbf{x}_0`.
The default bound is ``Box(0, 0, shape=(self.state_size,))``, i.e. :math:`\mathbf{x}_0 = 0`.
:return: An appropriate gymnasium Box instance.
"""
return Box(0, 0, shape=(self.state_size,))
def xF_bound(self) -> Box:
r"""The bound on the terminal state :math:`\mathbf{x}_F`.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(self.state_size,))
def x_bound(self) -> Box:
r"""The bound on all other states :math:`\mathbf{x}(t)`.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(self.state_size,))
def u_bound(self) -> Box:
r"""The bound on the terminal state :math:`\mathbf{u}(t)`.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(self.action_size,))
def t0_bound(self) -> Box:
r"""The bound on the initial time :math:`\mathbf{t}_0`.
I have included this bound for completeness: In practice, there is no reason why you should change it
from the default bound is ``Box(0, 0, shape=(1,))``, i.e. :math:`\mathbf{t}_0 = 0`.
:return: An appropriate gymnasium Box instance.
"""
return Box(0, 0, shape=(1,))
def tF_bound(self) -> Box:
r"""The bound on the final time :math:`\mathbf{t}_F`, i.e. when the environment terminates.
:return: An appropriate gymnasium Box instance.
"""
return Box(-np.inf, np.inf, shape=(1,))
def get_cost(self) -> SymbolicQRCost:
"""Return the cost.
This function should return a :func:`~irlc.ex03.control_cost.SymbolicQRCost` instance. The function must be implemented
since the cost object is used to guess the number of states and actions. Note that you can easily specify a trivial cost
instance such as :python:`return SymbolicQRCost.zero(n_states, n_actions)`.
:return: A :func:`~irlc.ex03.control_cost.SymbolicQRCost` instance representing the models cost function.
"""
raise NotImplementedError("When you implement the model, you must implement the get_cost() function.\nfor instance, use return SymbolicQRCost(Q=np.eye(n), R=np.eye(d))")
def sym_f(self, x, u, t=None):
"""
The symbolic (:mod:`sympy`) version of the dynamics :math:`f(x, u, t)`. This is the main place where you specify
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
the dynamics when you build a new model. you should look at concrete implementations of models for specifics.
:param x: A list of symbolic expressions ``['x0', 'x1', ..]`` corresponding to :math:`x`
:param u: A list of symbolic expressions ``['u0', 'u1', ..]`` corresponding to :math:`u`
:param t: A single symbolic expression corresponding to the time :math:`t` (seconds)
:return: A list of symbolic expressions ``[f0, f1, ...]`` of the same length as ``x`` where each element is a coordinate of :math:`f`
"""
raise NotImplementedError("Implement a function which return the environment dynamics f(x,u,t) as a sympy exression")
def f(self, x, u, t=0) -> np.ndarray:
r"""Evaluate the dynamics.
This function will evaluate the dynamics. In other words, it will evaluate :math:`\mathbf{f}` in the following expression:
.. math::
\dot{\mathbf{x}} = \mathbf{f}(\mathbf{x}, \mathbf{u}, t)
:param x: A numpy ndarray corresponding to the state
:param u: A numpy ndarray corresponding to the control
:param t: A :python:`float` corresponding to the time.
:return: The time derivative of the state, :math:`\mathbf{x}(t)`.
"""
return np.asarray( self._f_np(x, u, t) )
def simulate(self, x0, u_fun, t0, tF, N_steps=1000, method='rk4'):
r"""Simulate the effect of a policy on the model.
By default, it uses Runge-Kutta 4 (RK4) with a fine discretization -- this is slow, but in nearly all cases exact. See (Her25, Algorithm 18) for more information.
The input argument ``u_fun`` should be a function which returns a list or tuple with same dimension as
``model.action_space``, :math:`d`.
:param x0: The initial state of the simulation. Must be a list of floats of same dimension as ``env.observation_space``, :math:`n`.
:param u_fun: Can be either:
- Either a policy function that can be called as ``u_fun(x, t)`` and returns an action ``u`` in the ``action_space``
- A single action (i.e. a list of floats of same length as the action space). The model will be simulated with a constant action in this case.
:param float t0: Starting time :math:`t_0`
:param float tF: Stopping time :math:`t_F`; the model will be simulated for :math:`t_F - t_0` seconds
:param int N_steps: Steps :math:`N` in the RK4 simulation
:param str method: Simulation method. Either ``'rk4'`` (default) or ``'euler'``
The last output argument is the total cost of the trajectory:
.. math::
\int_{t_0}^{t_F} c(x(t), u(t), t) dt + c_f(t_0, t_F, x(t_0), x(t_F) )
the integral is approximated using the simulated trajectory.
:return:
- xs - A numpy ``ndarray`` of dimension :math:`(N+1)\\times n` containing the observations :math:`x`
- us - A numpy ``ndarray`` of dimension :math:`(N+1)\\times d` containing the actions :math:`u`
- ts - A numpy ``ndarray`` of dimension :math:`(N+1)` containing the corresponding times :math:`t` (seconds)
- total_cost - A ``float`` of the total cost of the trajectory.
"""
assert N_steps >= 1, "Simulation should include at least 1 time step."
u_fun = ensure_policy(u_fun)
tt = np.linspace(t0, tF, N_steps+1) # Time grid t_k = tt[k] between t0 and tF.
xs = [ np.asarray(x0) ]
us = [ u_fun(x0, t0 )]
for k in range(N_steps):
Delta = tt[k+1] - tt[k]
tn = tt[k]
xn = xs[k]
un = us[k] # ensure the action u is a vector.
unp = u_fun(xn, tn + Delta)
if method == 'rk4':
r""" Implementation of RK4 here. See: (Her25, Algorithm 18) """
k1 = np.asarray(self.f(xn, un, tn))
k2 = np.asarray(self.f(xn + Delta * k1/2, u_fun(xn, tn+Delta/2), tn+Delta/2))
k3 = np.asarray(self.f(xn + Delta * k2/2, u_fun(xn, tn+Delta/2), tn+Delta/2))
k4 = np.asarray(self.f(xn + Delta * k3, u_fun(xn, tn + Delta), tn+Delta))
xnp = xn + 1/6 * Delta * (k1 + 2*k2 + 2*k3 + k4)
elif method == 'euler':
xnp = xn + Delta * np.asarray(self.f(xn, un, tn))
else:
raise Exception("Bad integration method", method)
xs.append(xnp)
us.append(unp)
xs = np.stack(xs, axis=0)
us = np.stack(us, axis=0)
# Compute the cost.
cost = self.get_cost()
total_cost = float( sum( [ Delta * cost._private_evaluate_numpy_c(x, u, k) for k, (x, u) in enumerate(zip(xs[:-1], us[:-1]))] ) + cost._private_evaluate_numpy_cf(t0, tF, x0, xs[-1]) )
return xs, us, tt, total_cost
@property
def state_size(self):
"""
This field represents the dimensionality of the state-vector :math:`n`. Use it as ``model.state_size``
:return: Dimensionality of the state vector :math:`x`
"""
return self.get_cost().state_size
@property
def action_size(self):
"""
This field represents the dimensionality of the action-vector :math:`d`. Use it as ``model.action_size``
:return: Dimensionality of the action vector :math:`u`
"""
return self.get_cost().action_size
def render(self, x, render_mode="human"):
"""
Responsible for rendering the state. You don't have to worry about this function.
:param x: State to render
:param str render_mode: Rendering mode. Select ``"human"`` for a visualization.
:return: Either none or a ``ndarray`` for plotting.
"""
raise NotImplementedError()
def close(self):
pass
def phi_x(self, x : list) -> list:
r"""Coordinate transformation of the state when the model is discretized.
This function specifies the coordinate transformation :math:`x_k = \Phi_x(x(t_k))` which is applied to the environment when it is
discretized. It should accept a list of symbols, corresponding to :math:`x`, and return a new list
of symbols corresponding to the (discrete) coordinates.
:param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}(t)`
:return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{x}_k`.
"""
return x
def phi_x_inv(self, x: list) -> list:
r"""Inverse of coordinate transformation for the state.
This function should specify the inverse of the coordinate transformation :math:`\Phi_x`, i.e. :math:`\Phi_x^{-1}`.
In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`x(t) = \Phi_x^{-1}(x_k)`.
:param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}_k`
:return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{x}(t)`.
"""
return x
def phi_u(self, u: list) -> list:
r"""Coordinate transformation of the action when the model is discretized.
This function specifies the coordinate transformation :math:`x_k = \Phi_x(x(t_k))` which is applied to the environment when it is
discretized. It should accept a list of symbols, corresponding to :math:`x`, and return a new list
of symbols corresponding to the (discrete) coordinates.
:param x: A list of symbols ``[x0, x1, ..., xn]`` corresponding to :math:`\mathbf{x}(t)`
:return: A new list of symbols corresponding to the discrete coordinates :math:`\mathbf{x}_k`.
"""
return u
def phi_u_inv(self, u: list) -> list:
r"""Inverse of coordinate transformation for the action.
This function should specify the inverse of the coordinate transformation :math:`\Phi_u`, i.e. :math:`\Phi_u^{-1}`.
In other words, it has to map from the discrete coordinates to the continuous-time coordinates: :math:`u(t) = \Phi_u^{-1}(u_k)`.
:param x: A list of symbols ``[u0, u1, ..., ud]`` corresponding to :math:`\mathbf{u}_k`
:return: A new list of symbols corresponding to the continuous-time coordinates :math:`\mathbf{u}(t)`.
"""
return u
def __str__(self):
"""
Return a string representation of the model. This is a potentially helpful way to summarize the content of the
model. You can use it as:
.. runblock:: pycon
>>> from irlc.ex04.model_pendulum import SinCosPendulumModel
>>> model = SinCosPendulumModel()
>>> print(model)
:return: A string containing the details of the model.
"""
split = "-"*20
s = [f"{self.__class__}"] + ['='*50]
s += ["Dynamics:", split]
t = sym.symbols("t")
x = sym.symbols(f"x0:{self.state_size}")
u = sym.symbols(f"u0:{self.action_size}")
s += [typeset_eq(x, u, self.sym_f(x, u, t) )]
s += ["Cost:", split, str(self.get_cost())]
dd = defaultdict(list)
bounds = [ ('x', self.x_bound()), ('x0', self.x0_bound()), ('xF', self.xF_bound()),
('u', self.u_bound()),
('t0', self.t0_bound()), ('tF', self.tF_bound())]
for v, box in bounds:
if (box.low == -np.inf).all() and (box.high == np.inf).all():
continue
dd['low'].append(box.low_repr)
dd['variable'].append("<= " + v + " <=")
dd['high'].append(box.high_repr)
if len(dd) > 0:
s += ["Bounds:", split]
s += [tabulate.tabulate(dd, headers='keys')]
else:
s += ['No bounds are applied to the x and u-variables.']
return "\n".join(s)
def symv(s, n):
"""
Returns a vector of symbolic functions. For instance if s='x' and n=3 then it will return
[x0,x1,x2]
where x0,..,x2 are symbolic variables.
"""
return sym.symbols(" ".join(["%s%i," % (s, i) for i in range(n)]))
def ensure_policy(u):
"""
Ensure u corresponds to a policy function with input arguments u(x, t)
"""
if callable(u):
return lambda x, t: np.asarray(u(x,t)).reshape((-1,))
else:
return lambda x, t: np.asarray(u).reshape((-1,))
def plot_trajectory(x_res, tt, lt='k-', ax=None, labels=None, legend=None):
M = x_res.shape[1]
if labels is None:
labels = [f"x_{i}" for i in range(M)]
if ax is None:
if M == 2:
a = 234
if M == 3:
r = 1
c = 3
else:
r = 2 if M > 1 else 1
c = (M + 1) // 2
H = 2*r if r > 1 else 3
W = 6*c
# if M == 2:
# W = 12
f, ax = plt.subplots(r,c, figsize=(W,H))
if M == 1:
ax = np.asarray([ax])
print(M,r,c)
for i in range(M):
if len(ax) <= i:
print("issue!")
a = ax.flat[i]
a.plot(tt, x_res[:, i], lt, label=legend)
a.set_xlabel("Time/seconds")
a.set_ylabel(labels[i])
# a.set_title(labels[i])
a.grid(True)
if legend is not None and i == 0:
a.legend()
# if i == M:
plt.tight_layout()
return ax
def make_space_above(axes, topmargin=1.0):
""" increase figure size to make topmargin (in inches) space for
titles, without changing the axes sizes"""
fig = axes.flatten()[0].figure
s = fig.subplotpars
w, h = fig.get_size_inches()
figh = h - (1-s.top)*h + topmargin
fig.subplots_adjust(bottom=s.bottom*h/figh, top=1-topmargin/figh)
fig.set_figheight(figh)
def typeset_eq(x, u, f):
def ascii_vector(ls):
ml = max(map(len, ls))
ls = [" " * (ml - len(s)) + s for s in ls]
ls = ["[" + s + "]" for s in ls]
return "\n".join(ls)
v = [str(z) for z in f]
def cstack(ls: list):
# ls = [l.splitlines() for l in ls]
height = max([len(l) for l in ls])
widths = [len(l[0]) for l in ls]
for k in range(len(ls)):
missing2 = (height - len(ls[k])) // 2
missing1 = (height - len(ls[k]) - missing2)
tpad = [" " * widths[k]] * missing1
bpad = [" " * widths[k]] * missing2
ls[k] = tpad + ls[k] + bpad
r = [""] * len(ls[0])
for w in range(len(ls)):
for h in range(len(ls[0])):
r[h] += ls[w][h]
return r
xx = [str(x) for x in x]
uu = [str(u) for u in u]
xx = ascii_vector(xx).splitlines()
uu = ascii_vector(uu).splitlines()
cm = cstack([xx, [", "], uu])
eq = cstack([["f("], cm, [")"]])
eq = cstack([[" "], eq, [" = "], ascii_vector(v).splitlines()])
return "\n".join(eq)