diff --git a/irlc/ex02/__init__.py b/irlc/ex02/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..97bfecdc3daf77e00a1987adf3a7d3dba98c5aa4
--- /dev/null
+++ b/irlc/ex02/__init__.py
@@ -0,0 +1,2 @@
+# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
+"""This directory contains the exercises for week 2."""
diff --git a/irlc/ex02/deterministic_inventory.py b/irlc/ex02/deterministic_inventory.py
new file mode 100644
index 0000000000000000000000000000000000000000..9d44aad1836db1112842f8d5fc4abdfe603ebcbc
--- /dev/null
+++ b/irlc/ex02/deterministic_inventory.py
@@ -0,0 +1,40 @@
+# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
+from irlc.ex02.dp_model import DPModel
+
+class DeterministicInventoryDPModel(DPModel):
+    def __init__(self, N=3):
+        super().__init__(N=N)
+
+    def A(self, x, k):  # Action space A_k(x)
+        return {0, 1, 2}
+
+    def S(self, k):  # State space S_k
+        return {0, 1, 2}
+
+    def g(self, x, u, w, k):  # Cost function g_k(x,u,w)
+        return u + (x + u - w) ** 2
+
+    def f(self, x, u, w, k):  # Dynamics f_k(x,u,w)
+        return max(0, min(2, x + u - w))
+
+    def Pw(self, x, u, k):  # Distribution over random disturbances
+        """In this problem we assume that p(w=k+1) = 1.
+        Return this as a dictionary of the form: {w: p(w)}."""
+        # TODO: 1 line missing.
+        raise NotImplementedError("Implement function body")
+
+    def gN(self, x):
+        return 0
+
+
+def main():
+    model = DeterministicInventoryDPModel()
+    x = 1
+    u = 1
+    k = 1
+    p_w = model.Pw(x, u, k)
+    print("Probability that w=k+1 is p(w=k+1) =", p_w[k + 1], "(should be 1)")
+
+
+if __name__ == "__main__":
+    main()
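Note: the docstring of Pw above pins the distribution down completely, so the missing line has essentially one shape. A minimal sketch (one possible completion, not the distributed solution) is:

    def Pw(self, x, u, k):  # Distribution over random disturbances
        # p(w = k+1) = 1 per the docstring, so the dictionary has a single entry.
        return {k + 1: 1}

This is consistent with the check in main(), which expects p_w[k+1] to equal 1.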
diff --git a/irlc/ex02/dp.py b/irlc/ex02/dp.py
new file mode 100644
index 0000000000000000000000000000000000000000..e1cf1ea85f2b0f2d9c94732b17baab4b422293b1
--- /dev/null
+++ b/irlc/ex02/dp.py
@@ -0,0 +1,71 @@
+# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
+"""
+
+References:
+  [Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
+"""
+from irlc.ex02.deterministic_inventory import DeterministicInventoryDPModel
+from irlc.ex02.dp_model import DPModel
+
+def DP_stochastic(model: DPModel):
+    r"""
+    Implement the stochastic DP algorithm. The implementation follows (Her25, Algorithm 1).
+    Once you are done, you should be able to call the function as:
+
+    .. runblock:: pycon
+
+        >>> from irlc.ex02.graph_traversal import SmallGraphDP
+        >>> from irlc.ex02.dp import DP_stochastic
+        >>> model = SmallGraphDP(t=5)  # Instantiate the small graph with target node 5
+        >>> J, pi = DP_stochastic(model)
+        >>> print(pi[0][2])  # Action taken in state ``x=2`` at time step ``k=0``.
+
+    :param model: An instance of the :class:`irlc.ex02.dp_model.DPModel` class. This represents the problem we wish to solve.
+    :return:
+        - ``J`` - A list of cost functions so that ``J[k][x]`` represents :math:`J_k(x)`
+        - ``pi`` - A list of dictionaries so that ``pi[k][x]`` represents :math:`\mu_k(x)`
+    """
+
+    """
+    In case you run into problems, I recommend following the hints in (Her25, Subsection 6.2.1) and focusing on the
+    case without a noise term; once that works, you can add the w-terms. When you don't loop over noise terms, just pass
+    them as w = None to model.f and model.g.
+    """
+    N = model.N
+    J = [{} for _ in range(N + 1)]
+    pi = [{} for _ in range(N)]
+    J[N] = {x: model.gN(x) for x in model.S(model.N)}
+    for k in range(N - 1, -1, -1):
+        for x in model.S(k):
+            """
+            Update pi[k][x] and J[k][x] using the general DP algorithm given in (Her25, Algorithm 1).
+            If you implement it using the pseudo-code, I recommend you define Q (from the algorithm) as a dictionary like the J-function such that
+
+            > Q[u] = Q_u (for all u in model.A(x, k))
+
+            Then you find the u with the lowest value of Q_u, i.e.
+
+            > umin = arg_min_u Q[u]
+
+            (for help, google: 'python find key in dictionary with minimum value').
+            Then you can use this to update J[k][x] = Q[umin] and pi[k][x] = umin.
+            """
+            # TODO: 4 lines missing.
+            raise NotImplementedError("Insert your solution and remove this error.")
+            """
+            After the above update it should be the case that:
+
+            J[k][x] = J_k(x)
+            pi[k][x] = pi_k(x)
+            """
+    return J, pi
+
+
+if __name__ == "__main__":  # Test DP on the deterministic inventory problem
+    print("Testing the DP algorithm on the deterministic inventory environment")
+    model = DeterministicInventoryDPModel()  # Instantiate the deterministic inventory problem
+    J, pi = DP_stochastic(model)
+    # Print all optimal cost functions J_k(x_k)
+    for k in range(len(J)):
+        print(", ".join([f"J_{k}({i}) = {v:.1f}" for i, v in J[k].items()]))
+    print(f"Total cost when starting in state x_0 = 2: {J[0][2]=} (and should be 5)")
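Note: a sketch of the four missing lines, following the Q-dictionary hint above (assuming Q[u] holds the expected cost-to-go of action u; this is one possible completion, not the distributed solution):

    Q = {u: sum(pw * (model.g(x, u, w, k) + J[k + 1][model.f(x, u, w, k)])
                for w, pw in model.Pw(x, u, k).items())
         for u in model.A(x, k)}
    umin = min(Q, key=Q.get)  # key with the minimum value in the dictionary
    J[k][x], pi[k][x] = Q[umin], umin

This averages g_k(x, u, w) + J_{k+1}(f_k(x, u, w)) over the noise distribution and picks the minimizing action, which is the backward recursion of (Her25, Algorithm 1).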
+        pass
+
+
+def main():
+    from irlc.ex01.inventory_environment import InventoryEnvironment
+    from irlc.ex02.inventory import InventoryDPModel
+
+    env = InventoryEnvironment(N=3)
+    inventory_model = InventoryDPModel(N=3)
+    agent = DynamicalProgrammingAgent(env, model=inventory_model)
+    stats, _ = train(env, agent, num_episodes=5000)
+
+    s, _ = env.reset()  # Get initial state
+    Er = np.mean([stat['Accumulated Reward'] for stat in stats])
+    print("Estimated reward using trained policy and MC rollouts", Er)
+    print("Reward as computed using DP", -agent.J[0][s])
+
+if __name__ == "__main__":
+    main()
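Note: the garbled line in pi starts with "se", and self.pi_[k] is the only policy table in scope, so a plausible reconstruction (a sketch, assuming the standard tabular indexing) is:

    action = self.pi_[k][s]  # Look up the planned action for state s at time step k.

The sign flip in main() (-agent.J[0][s]) is intentional: DP minimizes cost while the environment reports reward, so the two printed numbers should roughly agree.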
diff --git a/irlc/ex02/dp_model.py b/irlc/ex02/dp_model.py
new file mode 100644
index 0000000000000000000000000000000000000000..1cb590585b8a6414aa405a1338787e753380e482
--- /dev/null
+++ b/irlc/ex02/dp_model.py
@@ -0,0 +1,185 @@
+# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
+import numpy as np
+
+class DPModel:
+    r""" The Dynamic Programming model class
+
+    The purpose of this class is to translate a dynamic programming problem, defined by the equations,
+
+    .. math::
+
+        x_{k+1} & = f_k(x_k, u_k, w_k) \\
+        \text{cost} & = g_k(x_k, u_k, w_k) \\
+        \text{terminal cost} & = g_N(x_N) \\
+        \text{Noise disturbances:} \quad w_k & \sim P_W(w_k | x_k, u_k) \\
+        \text{State/action spaces:} \quad & \mathcal A_k(x_k), \mathcal S_k
+
+    into a single Python object which we can then use for planning.
+
+    .. Note::
+
+        This is the first time many of you encounter a class. If so, you might wonder why you can't just implement
+        the functions as usual, i.e. ``def f(x, k, ...):``, ``def g(x, k, ...):``,
+        as regular Python functions and leave it at that?
+
+        The reason is that we want to pass all of these functions (which taken together represent a planning problem)
+        to planning methods such as the DP-algorithm (see the function :func:`~irlc.ex02.dp.DP_stochastic`)
+        all at once.
+        It is not very convenient to pass the functions one at a time -- instead we collect them into a class and simply call the planning method as
+
+        >>> from irlc.ex02.inventory import InventoryDPModel
+        >>> from irlc.ex02.dp import DP_stochastic
+        >>> model = InventoryDPModel()  # Initialize the model
+        >>> J, pi = DP_stochastic(model)  # All functions are passed to DP_stochastic
+
+
+    To actually use the model, you need to extend it and implement the methods. The basic recipe for this is something like::
+
+        class MyDPModel(DPModel):
+            def f(self, x, u, w, k):  # Note the `self`-variable. You can use it to access class variables such as `self.N`.
+                return x + u - w  # Just an example
+            def S(self, k):
+                return [0, 1, 2]  # State space S_k = {0, 1, 2}
+            # Implement the other functions A, g, gN and Pw here.
+
+
+    You should take a look at :class:`~irlc.ex02.inventory.InventoryDPModel` for a concrete example.
+    Once the functions have been implemented, you can call them as:
+
+    .. runblock:: pycon
+
+        >>> from irlc.ex02.inventory import InventoryDPModel
+        >>> model = InventoryDPModel(N=5)  # Plan on a horizon of 5
+        >>> print("State space S_2", model.S(2))
+        >>> model.f(x=1, u=2, w=1, k=0)  # Just an example. You don't have to use named arguments, although it helps readability.
+        >>> model.A(1, k=2)  # Action space A_2(1), i.e. the actions available in state x=1 at time step k=2.
+
+    """
+    def __init__(self, N):
+        """
+        Called when the DP model is initialized. By default, it simply stores the planning horizon ``N``.
+
+        :param N: The planning horizon in the DP problem :math:`N`
+        """
+        self.N = N  # Store the planning horizon.
+
+    def f(self, x, u, w, k: int):
+        """
+        Implements the transition function :math:`x_{k+1} = f_k(x, u, w)` and returns the next state :math:`x_{k+1}`
+
+        :param x: The state :math:`x_k`
+        :param u: The action taken :math:`u_k`
+        :param w: The random noise disturbance :math:`w_k`
+        :param k: The current time step :math:`k`
+        :return: The state the environment (deterministically) transitions to, i.e. :math:`x_{k+1}`
+        """
+        raise NotImplementedError("Return f_k(x,u,w)")
+
+    def g(self, x, u, w, k: int) -> float:
+        """
+        Implements the cost function :math:`c = g_k(x, u, w)` and returns the cost :math:`c`
+
+        :param x: The state :math:`x_k`
+        :param u: The action taken :math:`u_k`
+        :param w: The random noise disturbance :math:`w_k`
+        :param k: The current time step :math:`k`
+        :return: The cost (as a ``float``) incurred by the environment, i.e. :math:`g_k(x, u, w)`
+        """
+        raise NotImplementedError("Return g_k(x,u,w)")
+
+    def gN(self, x) -> float:
+        """
+        Implements the terminal cost function :math:`c = g_N(x)` and returns the terminal cost :math:`c`.
+
+        :param x: A state seen at the last time step :math:`x_N`
+        :return: The terminal cost (as a ``float``) incurred by the environment, i.e. :math:`g_N(x)`
+        """
+        raise NotImplementedError("Return g_N(x)")
+
+    def S(self, k: int):
+        r"""
+        Computes the state space :math:`\mathcal S_k` at time step :math:`k`.
+        In other words, this function returns a set of all states the system can possibly be in at time step :math:`k`.
+
+        .. Note::
+            I think the cleanest implementation is one where this function returns a Python ``set``. However, it won't matter
+            if the function returns a ``list`` or ``tuple`` instead.
+
+        :param k: The current time step :math:`k`
+        :return: The state space (as a ``list`` or ``set``) available at time step ``k``, i.e. :math:`\mathcal S_k`
+        """
+        raise NotImplementedError("Return state space as set S_k = {x_1, x_2, ...}")
+
+    def A(self, x, k: int):
+        r"""
+        Computes the action space :math:`\mathcal A_k(x)` at time step :math:`k` in state `x`.
+
+        In other words, this function returns a ``set`` of all actions the agent can take at time step :math:`k`.
+
+        .. Note::
+            An example where the actions depend on the state is chess (in this case, the state is the board position, and the actions are the legal moves)
+
+        :param x: The state we want to compute the actions in :math:`x_k`
+        :param k: The current time step :math:`k`
+        :return: The action space (as a ``list`` or ``set``) available at time step ``k``, i.e. :math:`\mathcal A_k(x_k)`
+        """
+        raise NotImplementedError("Return action space as set A(x_k) = {u_1, u_2, ...}")
+
+    def Pw(self, x, u, k: int):
+        """
+        Returns the random noise disturbances and their probabilities. In other words, this function implements the distribution:
+
+        .. math::
+
+            P_k(w_k | x_k, u_k)
+
+        To implement this distribution, we must keep track of both the possible values of the noise disturbances :math:`w_k`
+        and the (numerical) value of their probability :math:`p(w_k | ...)`.
+
+        To do this, the function returns a dictionary of the form ``P = {w1: p_w1, w2: p_w2, ...}`` where
+
+        - The keys ``w`` represent random noise disturbances
+        - The values ``P[w]`` represent their probability (i.e. a ``float``)
+
+        This is hopefully made clearer by the inventory environment:
+
+        .. runblock:: pycon
+
+            >>> from irlc.ex02.inventory import InventoryDPModel
+            >>> model = InventoryDPModel(N=5)  # Plan on a horizon of 5
+            >>> print("Random noise disturbances in state x=1 using action u=0 is:", model.Pw(x=1, u=0, k=0))
+            >>> for w, pw in model.Pw(x=1, u=0, k=0).items():  # Iterate and print:
+            ...     print(f"p_k({w}|x, u) =", pw)
+
+
+        :param x: The state :math:`x_k`
+        :param u: The action taken :math:`u_k`
+        :param k: The current time step :math:`k`
+        :return: A dictionary representing the distribution of random noise disturbances :math:`P_k(w | x_k, u_k)` of the form ``{..., w_i: pw_i, ...}`` such that ``pw_i = P_k(w_i | x, u)``
+        """
+        # Compute and return the random noise disturbances here.
+        # As an example:
+        return {'w_dummy': 1/3, 42: 2/3}  # P(w_k='w_dummy') = 1/3, P(w_k=42) = 2/3.
+
+    def w_rnd(self, x, u, k):
+        r"""
+        This helper function generates a random noise disturbance using the function
+        :func:`irlc.ex02.dp_model.DPModel.Pw`, i.e. it returns a sample:
+
+        .. math::
+            w \sim P_k(w | x_k, u_k)
+
+        This will be useful for simulating the model.
+
+        .. Note::
+            You don't have to implement or change this function.
+
+        :param x: The state :math:`x_k`
+        :param u: The action taken :math:`u_k`
+        :param k: The current time step :math:`k`
+        :return: A random noise disturbance :math:`w` distributed as :math:`P_k(w | x_k, u_k)`
+        """
+        pW = self.Pw(x, u, k)
+        w, pw = zip(*pW.items())  # separate w and p(w)
+        return np.random.choice(a=w, p=pw)
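Note: w_rnd is what makes the model simulatable. As a minimal sketch of a Monte-Carlo rollout (assuming a completed model and a tabular policy pi as returned by DP_stochastic; the helper name rollout is hypothetical):

    def rollout(model, pi, x0):
        # Simulate one trajectory, sampling w ~ P_k(w | x, u) at each step,
        # and return the realized total cost including the terminal cost g_N.
        x, cost = x0, 0.0
        for k in range(model.N):
            u = pi[k][x]                 # planned action in state x at time k
            w = model.w_rnd(x, u, k)     # sample the noise disturbance
            cost += model.g(x, u, w, k)  # accumulate the stage cost
            x = model.f(x, u, w, k)      # advance the dynamics
        return cost + model.gN(x)

Averaging rollout over many trajectories should approach J[0][x0], which is the comparison made in dp_agent.py.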
diff --git a/irlc/ex02/flower_store.py b/irlc/ex02/flower_store.py
new file mode 100644
index 0000000000000000000000000000000000000000..35a4712bdd7f06688b8eaebc539e37b804e331ea
--- /dev/null
+++ b/irlc/ex02/flower_store.py
@@ -0,0 +1,27 @@
+# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
+from irlc.ex02.inventory import InventoryDPModel
+from irlc.ex02.dp import DP_stochastic
+import numpy as np
+
+# TODO: Code has been removed from here.
+raise NotImplementedError("Insert your solution and remove this error.")
+
+def a_get_policy(N: int, c: float, x0: int) -> int:
+    # TODO: Code has been removed from here.
+    raise NotImplementedError("Insert your solution and remove this error.")
+    return u
+
+def b_prob_one(N: int, x0: int) -> float:
+    # TODO: Code has been removed from here.
+    raise NotImplementedError("Insert your solution and remove this error.")
+    return pr_empty
+
+
+if __name__ == "__main__":
+    model = InventoryDPModel()
+    pi = [{s: 0 for s in model.S(k)} for k in range(model.N)]
+    x0 = 0
+    c = 0.5
+    N = 3
+    print(f"a) The policy choice for {c=} is {a_get_policy(N, c, x0)} (should be 1)")
+    print(f"b) The probability of ending up with a single element in the inventory is {b_prob_one(N, x0)} (should be 0.492)")
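Note: the removed code is part of the exercise, but part b) asks for the probability of ending in a given state, which suggests propagating the state distribution forward under a fixed policy. A sketch of that generic helper (an assumption about the intended approach; the name state_distribution is hypothetical):

    def state_distribution(model, pi, x0):
        # Push p_k(x) forward from p_0(x0) = 1 under policy pi; returns p_N as {x: prob}.
        p = {x0: 1.0}
        for k in range(model.N):
            p_next = {}
            for x, px in p.items():
                u = pi[k][x]
                for w, pw in model.Pw(x, u, k).items():
                    xp = model.f(x, u, w, k)
                    p_next[xp] = p_next.get(xp, 0.0) + px * pw
            p = p_next
        return p

b_prob_one could then plausibly return state_distribution(model, pi, x0).get(1, 0.0) for a suitable policy pi.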
diff --git a/irlc/ex02/inventory.py b/irlc/ex02/inventory.py
new file mode 100644
index 0000000000000000000000000000000000000000..c4aaf58a6d7217f0575a38f3a141fa63dccc0cf5
--- /dev/null
+++ b/irlc/ex02/inventory.py
@@ -0,0 +1,46 @@
+# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
+"""
+
+References:
+  [Her25] Tue Herlau. Sequential decision making. (Freely available online), 2025.
+"""
+r"""
+Implements the inventory-control problem from (Her25, Subsection 5.1.2).
+"""
+from irlc.ex02.dp_model import DPModel
+from irlc.ex02.dp import DP_stochastic
+
+class InventoryDPModel(DPModel):
+    def __init__(self, N=3):
+        super().__init__(N=N)
+
+    def A(self, x, k):  # Action space A_k(x)
+        return {0, 1, 2}
+
+    def S(self, k):  # State space S_k
+        return {0, 1, 2}
+
+    def g(self, x, u, w, k):  # Cost function g_k(x,u,w)
+        return u + (x + u - w) ** 2
+
+    def f(self, x, u, w, k):  # Dynamics f_k(x,u,w)
+        return max(0, min(2, x + u - w))
+
+    def Pw(self, x, u, k):  # Distribution over random disturbances
+        # TODO: 1 line missing.
+        raise NotImplementedError("Implement function body")
+
+    def gN(self, x):
+        return 0
+
+def main():
+    inv = InventoryDPModel()
+    J, pi = DP_stochastic(inv)
+    print("Inventory control optimal policy/value functions")
+    for k in range(inv.N):
+        print(", ".join([f" J_{k}(x_{k}={i}) = {J[k][i]:.2f}" for i in inv.S(k)]))
+    for k in range(inv.N):
+        print(", ".join([f"pi_{k}(x_{k}={i}) = {pi[k][i]}" for i in inv.S(k)]))
+
+if __name__ == "__main__":
+    main()
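Note: the one missing line in InventoryDPModel.Pw must return a dictionary {w: p(w)} over the possible demands, as described in DPModel.Pw. As a shape-only sketch, with placeholder probabilities (the actual demand distribution is specified in (Her25, Subsection 5.1.2)):

    def Pw(self, x, u, k):  # Distribution over random disturbances
        # Placeholder probabilities for illustration; substitute the demand
        # distribution from (Her25, Subsection 5.1.2).
        return {0: 0.1, 1: 0.7, 2: 0.2}

Whatever the values, they must sum to 1 so that w_rnd can sample from them with np.random.choice.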