Skip to content
Snippets Groups Projects
Commit 3b2436a0 authored by tuhe's avatar tuhe
Browse files

Exercise 2

parent 922ab3b9
No related branches found
No related tags found
No related merge requests found
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 2."""
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex02.dp_model import DPModel
class DeterministicInventoryDPModel(DPModel):
def __init__(self, N=3):
super().__init__(N=N)
def A(self, x, k): # Action space A_k(x)
return {0, 1, 2}
def S(self, k): # State space S_k
return {0, 1, 2}
def g(self, x, u, w, k): # Cost function g_k(x,u,w)
return u + (x + u - w) ** 2
def f(self, x, u, w, k): # Dynamics f_k(x,u,w)
return max(0, min(2, x + u - w ))
def Pw(self, x, u, k): # Distribution over random disturbances
"""In this problem we assume that p(w=k+1) = 1.
Return this as a dictionary of the form: {w : p(w)}."""
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def gN(self, x):
return 0
def main():
model = DeterministicInventoryDPModel()
x = 1
u = 1
k = 1
p_w = model.Pw(x, u, k)
print("Probability that w=k+1 is p(w=k+1) =", p_w[k+1], "(should be 1)")
if __name__ == "__main__":
main()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
"""
from irlc.ex02.deterministic_inventory import DeterministicInventoryDPModel
from irlc.ex02.dp_model import DPModel
def DP_stochastic(model: DPModel):
r"""
Implement the stochastic DP algorithm. The implementation follows (Her25, Algorithm 1).
Once you are done, you should be able to call the function as:
.. runblock:: pycon
>>> from irlc.ex02.graph_traversal import SmallGraphDP
>>> from irlc.ex02.dp import DP_stochastic
>>> model = SmallGraphDP(t=5) # Instantiate the small graph with target node 5
>>> J, pi = DP_stochastic(model)
>>> print(pi[0][2]) # Action taken in state ``x=2`` at time step ``k=0``.
:param model: An instance of :class:`irlc.ex02.dp_model.DPModel` class. This represents the problem we wish to solve.
:return:
- ``J`` - A list of of cost function so that ``J[k][x]`` represents :math:`J_k(x)`
- ``pi`` - A list of dictionaries so that ``pi[k][x]`` represents :math:`\mu_k(x)`
"""
"""
In case you run into problems, I recommend following the hints in (Her25, Subsection 6.2.1) and focus on the
case without a noise term; once it works, you can add the w-terms. When you don't loop over noise terms, just specify
them as w = None in env.f and env.g.
"""
N = model.N
J = [{} for _ in range(N + 1)]
pi = [{} for _ in range(N)]
J[N] = {x: model.gN(x) for x in model.S(model.N)}
for k in range(N-1, -1, -1):
for x in model.S(k):
"""
Update pi[k][x] and Jstar[k][x] using the general DP algorithm given in (Her25, Algorithm 1).
If you implement it using the pseudo-code, I recommend you define Q (from the algorithm) as a dictionary like the J-function such that
> Q[u] = Q_u (for all u in model.A(x,k))
Then you find the u with the lowest value of Q_u, i.e.
> umin = arg_min_u Q[u]
(for help, google: `python find key in dictionary with minimum value').
Then you can use this to update J[k][x] = Q_umin and pi[k][x] = umin.
"""
# TODO: 4 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
"""
After the above update it should be the case that:
J[k][x] = J_k(x)
pi[k][x] = pi_k(x)
"""
return J, pi
if __name__ == "__main__": # Test dp on small graph given in (Her25, Subsection 6.2.1)
print("Testing the deterministic DP algorithm on the small graph environment")
model = DeterministicInventoryDPModel() # Instantiate the small graph with target node 5
J, pi = DP_stochastic(model)
# Print all optimal cost functions J_k(x_k)
for k in range(len(J)):
print(", ".join([f"J_{k}({i}) = {v:.1f}" for i, v in J[k].items()]))
print(f"Total cost when starting in state x_0 = 2: {J[0][2]=} (and should be 5)")
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex01.agent import Agent
from irlc.ex02.dp import DP_stochastic
from irlc import train
import numpy as np
class DynamicalProgrammingAgent(Agent):
"""
This is an agent which plan using dynamical programming.
"""
def __init__(self, env, model=None):
super().__init__(env)
self.J, self.pi_ = DP_stochastic(model)
def pi(self, s, k, info=None):
if k >= len(self.pi_):
raise Exception("k >= N; I have not planned this far!")
## TODO: Half of each line of code in the following 1 lines have been replaced by garbage. Make it work and remove the error.
#----------------------------------------------------------------------------------------------------------------------------
# action = se????????????
raise NotImplementedError("Get the action according to the DP policy.")
return action
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None): # Do nothing; this is DP so no learning takes place.
pass
def main():
from irlc.ex01.inventory_environment import InventoryEnvironment
from irlc.ex02.inventory import InventoryDPModel
env = InventoryEnvironment(N=3)
inventory_model = InventoryDPModel(N=3)
agent = DynamicalProgrammingAgent(env, model=inventory_model)
stats, _ = train(env, agent, num_episodes=5000)
s, _ = env.reset() # Get initial state
Er = np.mean([stat['Accumulated Reward'] for stat in stats])
print("Estimated reward using trained policy and MC rollouts", Er)
print("Reward as computed using DP", -agent.J[0][s])
if __name__ == "__main__":
main()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import numpy as np
class DPModel:
r""" The Dynamical Programming model class
The purpose of this class is to translate a dynamical programming problem, defined by the equations,
.. math::
x_{k+1} & = f_k(x_k, u_k, w_k) \\
\text{cost} & = g_k(x_k, u_k, w_k) \\
\text{terminal cost} & = g_N(x_N) \\
\text{Noise disturbances:} \quad w_k & \sim P_W(w_k | x_k, u_k) \\
\text{State/action spaces:} \quad & \mathcal A_k(x_k), \mathcal S_k
into a single python object which we can then use for planning.
.. Note::
This is the first time many of you encounter a class. If so, you might wonder why you can't just implement
the functions as usual, i.e. ``def f(x, k, ...):``, ``def g(x, k, ...):``,
as regular python function and just let that be it?
The reason is that we want to pass all these function (which taken together represents a planning problem)
to planning methods such as the DP-algorithm (see the function :func:`~irlc.ex02.dp.DP_stochastic`)
all at once.
It is not very convenient to pass the functions one at a time -- instead we collect them into a class and simply call the function as
>>> from irlc.ex02.inventory import InventoryDPModel
>>> from irlc.ex02.dp import DP_stochastic
>>> model = InventoryDPModel() # Intialize the model
>>> J, pi = DP_stochastic(model) # All functions are passed to DP_stochastic
To actually use the model, you need to extend it and implement the methods. The basic recipe for this is something like::
class MyDPModel(DPModel):
def f(self, x, u, w, k): # Note the `self`-variable. You can use it to access class variables such as`self.N`.
return x + u - w # Just an example
def S(self, k):
return [0, 1, 2] # State space S_k = {0, 1, 2}
# Implement the other functions A, g, gN and Pw here.
You should take a look at :func:`~irlc.ex02.inventory.InventoryDPModel` for a concrete example.
Once the functions have been implemented, you can call them as:
.. runblock:: pycon
>>> from irlc.ex02.inventory import InventoryDPModel
>>> model = InventoryDPModel(N=5) # Plan on a horizon of 5
>>> print("State space S_2", model.S(2))
>>> model.f(x=1, u=2, w=1, k=0) # Just an example. You don't have to use named arguments, although it helps on readability.
>>> model.A(1, k=2) # Action space A_1(2), i.e. the actions available at time step k=1 in state 2.
"""
def __init__(self, N):
"""
Called when the DP Model is initialized. By default, it simply stores the planning horizon ``N``
:param N: The planning horizon in the DP problem :math:`N`
"""
self.N = N # Store the planning horizon.
def f(self, x, u, w, k: int):
"""
Implements the transition function :math:`x_{k+1} = f_k(x, u, w)` and returns the next state :math:`x_{k+1}`
:param x: The state :math:`x_k`
:param u: The action taken :math:`u_k`
:param w: The random noise disturbance :math:`w_k`
:param k: The current time step :math:`k`
:return: The state the environment (deterministically) transitions to, i.e. :math:`x_{k+1}`
"""
raise NotImplementedError("Return f_k(x,u,w)")
def g(self, x, u, w, k: int) -> float:
"""
Implements the cost function :math:`c = g_k(x, u, w)` and returns the cost :math:`c`
:param x: The state :math:`x_k`
:param u: The action taken :math:`u_k`
:param w: The random noise disturbance :math:`w_k`
:param k: The current time step :math:`k`
:return: The cost (as a ``float``) incurred by the environment, i.e. :math:`g_k(x, u, w)`
"""
raise NotImplementedError("Return g_k(x,u,w)")
def gN(self, x) -> float:
"""
Implements the terminal cost function :math:`c = g_N(x)` and returns the terminal cost :math:`c`.
:param x: A state seen at the last time step :math:`x_N`
:return: The terminal cost (as a ``float``) incurred by the environment, i.e. :math:`g_N(x)`
"""
raise NotImplementedError("Return g_N(x)")
def S(self, k: int):
r"""
Computes the state space :math:`\mathcal S_k` at time step :math:`k`.
In other words, this function returns a set of all states the system can possibly be in at time step :math:`k`.
.. Note::
I think the cleanest implementation is one where this function returns a python ``set``. However, it won't matter
if the function returns a ``list`` or ``tuple`` instead.
:param k: The current time step :math:`k`
:return: The state space (as a ``list`` or ``set``) available at time step ``k``, i.e. :math:`\mathcal S_k`
"""
raise NotImplementedError("Return state space as set S_k = {x_1, x_2, ...}")
def A(self, x, k: int):
r"""
Computes the action space :math:`\mathcal A_k(x)` at time step :math:`k` in state `x`.
In other words, this function returns a ``set`` of all actions the agent can take in time step :math:`k`.
.. Note::
An example where the actions depend on the state is chess (in this case, the state is board position, and the actions are the legal moves)
:param k: The current time step :math:`k`
:param x: The state we want to compute the actions in :math:`x_k`
:return: The action space (as a ``list`` or ``set``) available at time step ``k``, i.e. :math:`\mathcal A_k(x_k)`
"""
raise NotImplementedError("Return action space as set A(x_k) = {u_1, u_2, ...}")
def Pw(self, x, u, k: int):
"""
Returns the random noise disturbances and their probability. In other words, this function implements the distribution:
.. math::
P_k(w_k | x_k, u_k)
To implement this distribution, we must keep track of both the possible values of the noise disturbances :math:`w_k`
as well as the (numerical) value of their probability :math:`p(w_k| ...)`.
To do this, the function returns a dictionary of the form ``P = {w1: p_w1, w2: p_w2, ...}`` where
- The keys ``w`` represents random noise disturbances
- the values ``P[w]`` represents their probability (i.e. a ``float``)
This can hopefully be made more clear with the Inventory environment:
.. runblock:: pycon
>>> from irlc.ex02.inventory import InventoryDPModel
>>> model = InventoryDPModel(N=5) # Plan on a horizon of 5
>>> print("Random noise disturbances in state x=1 using action u=0 is:", model.Pw(x=1, u=0, k=0))
>>> for w, pw in model.Pw(x=1, u=0, k=0).items(): # Iterate and print:
... print(f"p_k({w}|x, u) =", pw)
:param x: The state :math:`x_k`
:param u: The action taken :math:`u_k`
:param k: The current time step :math:`k`
:return: A dictionary representing the distribution of random noise disturbances :math:`P_k(w |x_k, u_k)` of the form ``{..., w_i: pw_i, ...}`` such that ``pw_i = P_k(w_i | x, u)``
"""
# Compute and return the random noise disturbances here.
# As an example:
return {'w_dummy': 1/3, 42: 2/3} # P(w_k="w_dummy") = 1/3, P(w_k =42)=2/3.
def w_rnd(self, x, u, k):
r"""
This helper function computes generates a random noise disturbance using the function
:func:`irlc.ex02.dp_model.DPModel.Pw`, i.e. it returns a sample:
.. math::
w \sim P_k(x_k, u_k)
This will be useful for simulating the model.
.. Note::
You don't have to implement or change this function.
:param x: The state :math:`x_k`
:param u: The action taken :math:`u_k`
:param k: The current time step :math:`k`
:return: A random noise disturbance :math:`w` distributed as :math:`P_k(x_k, u_k)`
"""
pW = self.Pw(x, u, k)
w, pw = zip(*pW.items()) # seperate w and p(w)
return np.random.choice(a=w, p=pw)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex02.inventory import InventoryDPModel
from irlc.ex02.dp import DP_stochastic
import numpy as np
# TODO: Code has been removed from here.
raise NotImplementedError("Insert your solution and remove this error.")
def a_get_policy(N: int, c: float, x0 : int) -> int:
# TODO: Code has been removed from here.
raise NotImplementedError("Insert your solution and remove this error.")
return u
def b_prob_one(N : int, x0 : int) -> float:
# TODO: Code has been removed from here.
raise NotImplementedError("Insert your solution and remove this error.")
return pr_empty
if __name__ == "__main__":
model = InventoryDPModel()
pi = [{s: 0 for s in model.S(k)} for k in range(model.N)]
x0 = 0
c = 0.5
N = 3
print(f"a) The policy choice for {c=} is {a_get_policy(N, c,x0)} should be 1")
print(f"b) The probability of ending up with a single element in the inventory is {b_prob_one(N, x0)} and should be 0.492")
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
"""
r"""
Implements the inventory-control problem from (Her25, Subsection 5.1.2).
"""
from irlc.ex02.dp_model import DPModel
from irlc.ex02.dp import DP_stochastic
class InventoryDPModel(DPModel):
def __init__(self, N=3):
super().__init__(N=N)
def A(self, x, k): # Action space A_k(x)
return {0, 1, 2}
def S(self, k): # State space S_k
return {0, 1, 2}
def g(self, x, u, w, k): # Cost function g_k(x,u,w)
return u + (x + u - w) ** 2
def f(self, x, u, w, k): # Dynamics f_k(x,u,w)
return max(0, min(2, x + u - w ))
def Pw(self, x, u, k): # Distribution over random disturbances
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def gN(self, x):
return 0
def main():
inv = InventoryDPModel()
J,pi = DP_stochastic(inv)
print(f"Inventory control optimal policy/value functions")
for k in range(inv.N):
print(", ".join([f" J_{k}(x_{k}={i}) = {J[k][i]:.2f}" for i in inv.S(k)] ) )
for k in range(inv.N):
print(", ".join([f"pi_{k}(x_{k}={i}) = {pi[k][i]}" for i in inv.S(k)] ) )
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment