From 374f80678c0678e07b2cf94acd2052c836115fb1 Mon Sep 17 00:00:00 2001 From: Tue Herlau <tuhe@dtu.dk> Date: Thu, 3 Apr 2025 16:32:57 +0200 Subject: [PATCH] Exercise 9 --- irlc/ex09/__init__.py | 2 + irlc/ex09/gambler.py | 81 ++++++++ irlc/ex09/mdp.py | 301 +++++++++++++++++++++++++++++ irlc/ex09/mdp_warmup.py | 86 +++++++++ irlc/ex09/policy_evaluation.py | 68 +++++++ irlc/ex09/policy_iteration.py | 63 ++++++ irlc/ex09/rl_agent.py | 212 ++++++++++++++++++++ irlc/ex09/small_gridworld.py | 39 ++++ irlc/ex09/value_iteration.py | 73 +++++++ irlc/ex09/value_iteration_agent.py | 42 ++++ 10 files changed, 967 insertions(+) create mode 100644 irlc/ex09/__init__.py create mode 100644 irlc/ex09/gambler.py create mode 100644 irlc/ex09/mdp.py create mode 100644 irlc/ex09/mdp_warmup.py create mode 100644 irlc/ex09/policy_evaluation.py create mode 100644 irlc/ex09/policy_iteration.py create mode 100644 irlc/ex09/rl_agent.py create mode 100644 irlc/ex09/small_gridworld.py create mode 100644 irlc/ex09/value_iteration.py create mode 100644 irlc/ex09/value_iteration_agent.py diff --git a/irlc/ex09/__init__.py b/irlc/ex09/__init__.py new file mode 100644 index 0000000..e753a4f --- /dev/null +++ b/irlc/ex09/__init__.py @@ -0,0 +1,2 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +"""This directory contains the exercises for week 9.""" diff --git a/irlc/ex09/gambler.py b/irlc/ex09/gambler.py new file mode 100644 index 0000000..d7c26a4 --- /dev/null +++ b/irlc/ex09/gambler.py @@ -0,0 +1,81 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online). +""" +from irlc import savepdf +import matplotlib.pyplot as plt +from irlc.ex09.value_iteration import value_iteration +from irlc.ex09.mdp import MDP + +class GamblerMDP(MDP): + r""" + The gamler problem (see description given in (SB18, Example 4.3)) + + See the MDP class for more information about the methods. In summary: + - the state is the amount of money you have. if state = goal or state = 0 the game ends (use this for is_terminal) + - A are the available actions (a list). Note that these depends on the state; see below or example for details. + - Psr are the transitions (see MDP class for documentation) + """ + def __init__(self, goal=100, p_heads=0.4): + super().__init__(initial_state=goal//2) + self.goal = goal + self.p_heads = p_heads + + def is_terminal(self, state): + """ Implement if the state is terminal (0 or self.goal) """ + # TODO: 1 lines missing. + raise NotImplementedError("Return true only if state is terminal.") + + def A(self, s): + r""" Action is the amount you choose to gamle. + You can gamble from 0 and up to the amount of money you have (state), + but not so much you will exceed the goal amount (see (SB18) for details). + In other words, return this as a list, and the number of elements should depend on the state s. """ + # TODO: 1 lines missing. + raise NotImplementedError("Implement function body") + + def Psr(self, s, a): + """ Implement transition probabilities here. + the reward is 1 if you win (obtain goal amount) and otherwise 0. 
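+        As a concrete sketch (the numbers assume goal=100 and p_heads=0.4; do not hard-code them): from state
+        s=50 with stake a=10, heads takes you to s'=60 and tails to s'=40, both with reward 0, so Psr(50, 10)
+        would return something like
+        > { (60, 0): 0.4, (40, 0): 0.6 }
+        where the reward in an entry is 1 only if that entry's next state equals the goal.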
Remember the format should + return a dictionary with entries: + > { (sp, r) : probability } + + You can see the small-gridworld example (see exercise description) for an example of how to use this function, + but now you should keep in mind that since you can win (or not) the dictionary you return should have two entries: + one with a probability of self.p_heads (winning) and one with a probability of 1-self.p_heads (loosing). + """ + # TODO: 4 lines missing. + raise NotImplementedError("Implement function body") + return outcome_dict + +def gambler(): + r""" + Gambler's problem from (SB18, Example 4.3) + """ + mdp = GamblerMDP(p_heads=0.4) + pi, V = value_iteration(mdp, gamma=1., theta=1e-11) + + V = [V[s] for s in mdp.states] + plt.bar(mdp.states, V) + plt.xlabel('Capital') + plt.ylabel('Value Estimates') + plt.title('Final value function (expected return) vs State (Capital)') + plt.grid() + savepdf("gambler_valuefunction") + plt.show() + + y = [pi[s] for s in mdp.nonterminal_states] + plt.bar(mdp.nonterminal_states, y, align='center', alpha=0.5) + plt.xlabel('Capital') + plt.ylabel('Final policy (stake)') + plt.title('Capital vs Final Policy') + plt.grid() + savepdf("gambler_policy") + plt.show() + + +if __name__ == "__main__": + + gambler() diff --git a/irlc/ex09/mdp.py b/irlc/ex09/mdp.py new file mode 100644 index 0000000..f991573 --- /dev/null +++ b/irlc/ex09/mdp.py @@ -0,0 +1,301 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online). +""" +import numpy as np +import gymnasium as gym +from gymnasium import Env +from collections import defaultdict +from tqdm import tqdm +import sys + +class MDP: + r""" + This class represents a Markov Decision Process. It defines three main components: + - The actions available in a given state :math:`A(s)` + - The transition probabilities :math:`p(s', r | s, a)` + - A terminal check to determine if a state :math:`s` is terminal + - A way to specify the initial state: + + - As a single state the MDP always begins in (most common) + - As a general distribution :math:`p(s_0)`. + + In addition to this it allows you to access either + - The set of all states (including terminal states) as ``mdp.states`` + - The set of all non-terminal states as ``mdp.non_terminal_states`` + + .. note:: + The ``states`` and ``non_termianl_states`` are computed lazily. This means that if you don't access them, they won't use memory. + This allows you to specify MDPs with an infinite number of states without running out of memory. + """ + def __init__(self, initial_state=None, verbose=False): + """ + Initialize the MDP. In the case where ``initial_state`` is set to a value :math:`s_0`, the initial state distribution will be + + .. math:: + p(s_0) = 1 + + :param initial_state: An optional initial state. + :param verbose: If ``True``, the class will print out debug information (useful for very large MDPs) + """ + self.verbose=verbose + self.initial_state = initial_state # Starting state s_0 of the MDP. + # The following variables that begin with _ are used to cache computations. The reason why we don't compute them + # up-front is because their computation may be time-consuming and they might not be needed. 
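+        # As an aside, a minimal hypothetical subclass illustrating the three methods described above could look like:
+        #
+        #   class CoinFlipMDP(MDP):
+        #       """Flip one biased coin: heads pays reward 1, tails pays 0, and the game then ends."""
+        #       def __init__(self):
+        #           super().__init__(initial_state=0)        # always start in state 0
+        #       def is_terminal(self, state):
+        #           return state != 0                        # states 1 and 2 are terminal
+        #       def A(self, state):
+        #           return [0]                               # a single (dummy) action
+        #       def Psr(self, state, action):
+        #           return {(1, 1.0): 0.6, (2, 0.0): 0.4}    # {(next state, reward): probability}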
+ self._states = None + self._nonterminal_states = None + self._terminal_states = None + + def is_terminal(self, state) -> bool: + r""" + Determines if a state is terminal (i.e., the environment/model is complete). In (SB18), the terminal + state is written as :math:`s_T`. + + .. runblock:: pycon + + >>> from irlc.gridworld.gridworld_environments import FrozenLake + >>> mdp = FrozenLake().mdp + >>> mdp.is_terminal(mdp.initial_state) # False, obviously. + + + :param state: The state :math:`s` to check + :return: ``True`` if the state is terminal and otherwise ``False``. + """ + return False # Return true if the given state is terminal. + + def Psr(self, state, action) -> dict: + r""" + Represents the transition probabilities: + + .. math:: + P(s', r | s, a) + + When called with state ``state`` and action ``action``, the function returns a dictionary of the form + ``{(s1, r1): p1, (s2, r2): p2, ...}``, so that ``p2`` is the probability of transitioning to ``s2`` (and obtaining + reward ``r2``) given we are in state ``state`` and take action ``action``: + + .. math:: + P(s_2, r_2 | s,a) = p_2 + + An example: + + .. runblock:: pycon + + >>> from irlc.gridworld.gridworld_environments import FrozenLake + >>> mdp = FrozenLake().mdp + >>> transitions = mdp.Psr(mdp.initial_state, 0) # P( ... | s0, a=0) + >>> for (sp, r), p in transitions.items(): + ... print(f"P(s'={sp}, r={r} | s={mdp.initial_state}, a=0) = {p}") + + :param state: The state to compute the transition probabilities in + :param action: The action to compute the transition probabilities in + :return: A dictionary where the keys are state, reward pairs we will transition to, :math:`p(s', r | ...)`, and the values are their probability. + """ + raise NotImplementedError("Return state distribution as a dictionary (see class documentation)") + + def A(self, state) -> list: + r""" + Returns a list of actions available in the given state: + + .. math:: + A(s) + + An example to get the actions in the initial state: + + .. runblock:: pycon + + >>> from irlc.gridworld.gridworld_environments import FrozenLake + >>> mdp = FrozenLake().mdp + >>> mdp.A(mdp.initial_state) + + :param state: State to compute the actions in :math:`s` + :return: The list of available actions :math:`\mathcal A(s) = \{0, 1, ..., n-1\}` + """ + raise NotImplementedError("Return set/list of actions in given state A(s) = {a1, a2, ...}") + + def initial_state_distribution(self): + """ + (**Optional**) specify the initial state distribution. Should return a dictionary of the form: + ``{s0: p0, s1: p1, ..., sn: pn}``, in which case :math:`p(S_0 = s_k) = p_k`. + + You will typically not overwrite this function but just set the initial state. In that case the initial state distribution + is deterministic: + + + .. runblock:: pycon + + >>> from irlc.gridworld.gridworld_environments import FrozenLake + >>> mdp = FrozenLake().mdp + >>> mdp.initial_state_distribution() + + + + :return: An initial state distribution as a dictionary, where the keys are states, and the valuse are their probability. + """ + if self.initial_state is not None: + return {self.initial_state: 1} + else: + raise Exception("Either specify the initial state, or implement this method.") + + @property + def nonterminal_states(self): + r""" + The list of non-terminal states, i.e. :math:`\mathcal{S}` in (SB18) + + + .. 
runblock:: pycon + + >>> from irlc.gridworld.gridworld_environments import FrozenLake + >>> mdp = FrozenLake().mdp + >>> mdp.nonterminal_states + + :return: The list of non-terminal states :math:`\mathcal{S}` + """ + if self._nonterminal_states is None: + self._nonterminal_states = [s for s in self.states if not self.is_terminal(s)] + return self._nonterminal_states + + @property + def states(self): + r""" + The list of all states including terminal ones, i.e. :math:`\mathcal{S}^+` in (SB18). + The terminal states are those where ``is_terminal(state)`` is true. + + .. runblock:: pycon + + >>> from irlc.gridworld.gridworld_environments import FrozenLake + >>> mdp = FrozenLake().mdp + >>> mdp.states + + :return: The list all states :math:`\mathcal{S}^+` + """ + if self._states is None: + next_chunk = set(self.initial_state_distribution().keys()) + all_states = list(next_chunk) + while True: + new_states = set() + for s in tqdm(next_chunk, file=sys.stdout) if self.verbose else next_chunk: + if self.is_terminal(s): + continue + for a in self.A(s): + new_states = new_states | {sp for sp, r in self.Psr(s, a)} + + new_states = [s for s in new_states if s not in all_states] + if len(new_states) == 0: + break + all_states += new_states + next_chunk = new_states + self._states = list(set(all_states)) + + return self._states + + +def rng_from_dict(d): + """ Helper function. If d is a dictionary {x1: p1, x2: p2, ...} then this will sample an x_i with probability p_i """ + w, pw = zip(*d.items()) # seperate w and p(w) + i = np.random.choice(len(w), p=pw) # Required because numpy cast w to array (and w may contain tuples) + return w[i] + +class MDP2GymEnv(Env): + def A(self, state): + raise Exception("Don't use this function; it is here for legacy reasons") + + def __init__(self, mdp, render_mode=None): + # We ignore this variable in this class, however, the Gridworld environment will check if + # render_mode == "human" and use it to render the environment. See: + # https://younis.dev/blog/render-api/ + self.render_mode = render_mode + self.mdp = mdp + self.state = None + # actions = set + all_actions = set.union(*[set(self.mdp.A(s)) for s in self.mdp.nonterminal_states ]) + n = max(all_actions) - min(all_actions) + 1 + assert isinstance(n, int) + self.action_space = gym.spaces.Discrete(n=n, start=min(all_actions)) + # Make observation space: + states = self.mdp.nonterminal_states + if not hasattr(self, 'observation_space'): + if isinstance(states[0], tuple): + self.observation_space = gym.spaces.Tuple([gym.spaces.Discrete(n+1) for n in np.asarray(states).max(axis=0)]) + else: + print("Could not guess observation space. 
Set it manually.") + + + def reset(self, seed=None, options=None): + info = {} + if seed is not None: + np.random.seed(seed) + self.action_space.seed(seed) + self.observation_space.seed(seed) + info['seed'] = seed + + ps = self.mdp.initial_state_distribution() + self.state = rng_from_dict(ps) + if self.render_mode == "human": + self.render() + info['mask'] = self._mk_mask(self.state) + return self.state, info + + def step(self, action): + ps = self.mdp.Psr(self.state, action) + self.state, reward = rng_from_dict(ps) + terminated = self.mdp.is_terminal(self.state) + if self.render_mode == "human": + self.render() + info = {'mask': self._mk_mask(self.state)} if not terminated else None + return self.state, reward, terminated, False, info + + def _mk_mask(self, state): + # self.A(state) + mask = np.zeros((self.action_space.n,), dtype=np.int8) + for a in self.mdp.A(state): + mask[a - self.action_space.start] = 1 + return mask + + +class GymEnv2MDP(MDP): + def __init__(self, env): + super().__init__() + self._states = list(range(env.observation_space.n)) + if hasattr(env, 'env'): + env = env.env + self._terminal_states = [] + for s in env.unwrapped.P: + for a in env.unwrapped.P[s]: + for (pr, sp, reward, done) in env.unwrapped.P[s][a]: + if done: + self._terminal_states.append(sp) + + self._terminal_states = set(self._terminal_states) + self.env = env + + def is_terminal(self, state): + return state in self._terminal_states + + def A(self, state): + return list(self.env.unwrapped.P[state].keys()) + + def Psr(self, state, action): + d = defaultdict(float) + for (pr, sp, reward, done) in self.env.unwrapped.P[state][action]: + d[ (sp, reward)] += pr + return d + +if __name__ == '__main__': + """A handful of examples of using the MDP-class in conjunction with a gym environment:""" + env = gym.make("FrozenLake-v1") + mdp = GymEnv2MDP(env) + from irlc.ex09.value_iteration import value_iteration + value_iteration(mdp) + mdp = GymEnv2MDP(gym.make("FrozenLake-v1")) + print("N = ", mdp.nonterminal_states) + print("S = ", mdp.states) + print("Is state 3 terminal?", mdp.is_terminal(3), "is state 11 terminal?", mdp.is_terminal(11)) + state = 0 + print("A(S=0) =", mdp.A(state)) + action = 2 + mdp.Psr(state, action) # Get transition probabilities + for (next_state, reward), Pr in mdp.Psr(state, action).items(): + print(f"P(S'={next_state},R={reward} | S={state}, A={action} ) = {Pr:.2f}") diff --git a/irlc/ex09/mdp_warmup.py b/irlc/ex09/mdp_warmup.py new file mode 100644 index 0000000..aab1ac6 --- /dev/null +++ b/irlc/ex09/mdp_warmup.py @@ -0,0 +1,86 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online). +""" +from irlc.ex09.mdp import MDP + + +def value_function2q_function(mdp : MDP, s, gamma, v : dict) -> dict: + r"""This helper function converts a value function to an action-value function. + + Given a value-function ``v`` and a state ``s``, this function implements the update: + + .. math:: + + Q(s,a) = \mathbb{E}[r + \gamma * v(s') | s, a] = \sum_{r, s'} (r + \gamma v(s') ) p(s', r| s,a) + + as described in (SB18, ). It should return a dictionary of the form:: + + {a1: Q(s,a1), a2: Q(s,a2), ..., an: Q(s,an)} + + where the actions are keys. You can compute these using ``mdp.A(s)``. 
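+    For instance, if ``mdp.A(s) == [0, 1]``, the returned dictionary has exactly the keys 0 and 1 and could look like
+    ``{0: 1.25, 1: -0.5}`` (made-up numbers); each value is the probability-weighted sum of :math:`r + \gamma v(s')`
+    over the outcomes in ``mdp.Psr(s, a)``.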
When done the following should work:: + + Qs = value_function2q_function(mdp, s, gamma, v) + Qs[a] # This is the Q-value Q(s,a) + + Hints: + + * Remember that ``v[s'] = 0`` if ``s'`` is a terminal state (this is explained in (SB18)). + + :param mdp: An MDP instance. Use this to compute :math:`p(s', r| s,a)` + :param s: A state + :param gamma: The discount factor :math:`\gamma` + :param v: The value function represented as a dictionary. + :return: A dictionary representing :math:`Q` of the form ``{a1: Q(s,a1), a2: Q(s,a2), ..., an: Q(s,an)}`` + """ + # TODO: 1 lines missing. + # TODO: 1 lines missing. + raise NotImplementedError("Implement function body") + return q_dict + +def expected_reward(mdp : MDP, s, a) -> float: + # TODO: 1 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + return expected_reward + +def q_function2value_function(policy : dict, Q : dict, s) -> float: + # TODO: 1 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + return V_s + +if __name__ == "__main__": + from irlc.gridworld.gridworld_environments import FrozenLake + mdp = FrozenLake(living_reward=0.2).mdp # Get the MDP of this environment. + + ## Part 1: Expected reward + s0 = mdp.initial_state + s0 = (0, 3) # initial state + a = 3 # Go east. + print("Expected reward E[r | s0, a] =", expected_reward(mdp, s=s0, a=0), "should be 0.2") + print("Expected reward E[r | s0, a] =", expected_reward(mdp, s=(1, 2), a=0), "should be 0") + + + ## Part 2 + # First let's create a non-trivial value function + V = {} + for s in mdp.nonterminal_states: + V[s] = s[0] + 2*s[1] + print("Value function is", V) + # Compute the corresponding Q(s,a)-values in state s0: + q_ = value_function2q_function(mdp, s=s0, gamma=0.9, v=V) + print(f"Q-values in {s0=} is", q_) + + ## Part 3 + # Create a non-trivial Q-function for this problem. + Q = {} + for s in mdp.nonterminal_states: + for a in mdp.A(s): + Q[s,a] = s[0] + 2*s[1] - 10*a # The particular values are not important in this example + # Create a policy. In this case pi(a=3) = 0.4. + pi = {0: 0.2, + 1: 0.2, + 2: 0.2, + 3: 0.4} + print(f"Value-function in {s0=} is", q_function2value_function(pi, Q, s=s0)) diff --git a/irlc/ex09/policy_evaluation.py b/irlc/ex09/policy_evaluation.py new file mode 100644 index 0000000..0af5527 --- /dev/null +++ b/irlc/ex09/policy_evaluation.py @@ -0,0 +1,68 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online). +""" +from collections import defaultdict +import numpy as np +import matplotlib.pyplot as plt +from irlc.ex09.mdp_warmup import value_function2q_function +from irlc.ex09.small_gridworld import SmallGridworldMDP, plot_value_function +from irlc import savepdf + + +def policy_evaluation(pi, mdp, gamma=.99, theta=0.00001): + r""" Implements the iterative policy-evaluation algorithm ((SB18, Section 4.1)). + The algorithm is given a policy pi which is represented as a dictionary so that + + > pi[s][a] = p + + is the probability p of taking action a in state s. The 'mdp' is a MDP-instance and the other terms have the same meaning as in the algorithm. + It should return a dictionary v so that + > v[s] + is the value-function evaluated in state s. 
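+    A usage sketch (``pi0`` and ``mdp`` are the objects constructed in the ``__main__``-block at the bottom of this file)::
+
+        v = policy_evaluation(pi0, mdp, gamma=1)
+        print(v[mdp.initial_state])   # estimated value of the initial state under the random policy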
I recommend using the qs_-function defined above. + """ + v = defaultdict(float) + Delta = theta #Initialize the 'Delta'-variable to a large value to make sure the first iteration of the method runs. + while Delta >= theta: # Outer loop in (SB18) + Delta = 0 # Remember to update Delta (same meaning as in (SB18)) + # Remember that 'S' in (SB18) is actually just the set of non-terminal states (NOT including terminal states!) + for s in mdp.nonterminal_states: # See the MDP class if you are curious about how this variable is defined. + """ Implement the main body of the policy evaluation algorithm here. You can do this directly, + or implement (and use) the value_function2q_function-function (consider what it does and compare to the algorithm). + If you do so, note that value_function2q_function(mdp, s, gamma, v) computes the equivalent of Q(s,a) (as a dictionary), + and in the algorithm, you then need to compute the expectation over pi: + > sum_a pi(a|s) Q(s,a) + In code it would be more akin to + q = value_function2q_function(...) + sum_a pi[s][a] * q[a] + + Don't be afraid to use a few more lines than I do. + """ + # TODO: 2 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + r""" stop condition. v_ is the current value of the value function (see algorithm listing in (SB18)) which you need to update. """ + Delta = max(Delta, np.abs(v_ - v[s])) + return v + + +if __name__ == "__main__": + mdp = SmallGridworldMDP() + """ + Create the random policy pi0 below. The policy is defined as a nested dict, i.e. + + > pi0[s][a] = (probability to take action a in state s) + + """ + pi0 = {s: {a: 1/len(mdp.A(s)) for a in mdp.A(s) } for s in mdp.nonterminal_states } + V = policy_evaluation(pi0, mdp, gamma=1) + plot_value_function(mdp, V) + plt.title("Value function using random policy") + savepdf("policy_eval") + plt.show() + + expected_v = np.array([0, -14, -20, -22, + -14, -18, -20, -20, + -20, -20, -18, -14, + -22, -20, -14, 0]) diff --git a/irlc/ex09/policy_iteration.py b/irlc/ex09/policy_iteration.py new file mode 100644 index 0000000..380b247 --- /dev/null +++ b/irlc/ex09/policy_iteration.py @@ -0,0 +1,63 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online). +""" +import numpy as np +from irlc.ex09.small_gridworld import SmallGridworldMDP +import matplotlib.pyplot as plt +from irlc.ex09.policy_evaluation import policy_evaluation +from irlc.ex09.mdp_warmup import value_function2q_function + +def policy_iteration(mdp, gamma=1.0): + r""" + Implement policy iteration (see (SB18, Section 4.3)). + + Note that policy iteration only considers deterministic policies. we will therefore use the shortcut by representing the policy pi + as a dictionary (similar to the DP-problem in week 2!) so that + > a = pi[s] + is the action in state s. + + """ + pi = {s: np.random.choice(mdp.A(s)) for s in mdp.nonterminal_states} + policy_stable = False + V = None # Sutton has an initialization-step, but it can actually be skipped if we intialize the policy randomly. + while not policy_stable: + # Evaluate the current policy using your code from the previous exercise. + # The main complication is that we need to transform our deterministic policy, pi[s], into a stochastic one pi[s][a]. 
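+        # (For instance, if pi[s] == 2 and mdp.A(s) == [0, 1, 2, 3], then pi_prob[s] below should come out as
+        #  {0: 0, 1: 0, 2: 1, 3: 0} -- a deterministic policy is just a special case of a stochastic one.)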
+ # It will be defined as: + # >>> pi_prob[s][a] = 1 if a = pi[s] and otherwise 0. + pi_prob = {s: {a: 1 if pi[s] == a else 0 for a in mdp.A(s)} for s in mdp.nonterminal_states} + V = policy_evaluation(pi_prob, mdp, gamma) + V = policy_evaluation( {s: {pi[s]: 1} for s in mdp.nonterminal_states}, mdp, gamma) + r""" Implement the method. This is step (3) in (SB18). """ + policy_stable = True # Will be set to False if the policy pi changes + r""" Implement the steps for policy improvement here. Start by writing a for-loop over all non-terminal states + you can see the policy_evaluation function for how to do this, but + I recommend looking at the property mdp.nonterminal_states (see MDP class for more information). + Hints: + * In the algorithm in (SB18), you need to perform an argmax_a over what is actually Q-values. The function + value_function2q_function(mdp, s, gamma, V) can compute these. + * The argmax itself, assuming you follow the above procedure, involves a dictionary. It can be computed + using methods similar to those we saw in week2 of the DP problem. + It is not a coincidence these algorithms are very similar -- if you think about it, the maximization step closely resembles the DP algorithm! + """ + # TODO: 6 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + return pi, V + +if __name__ == "__main__": + mdp = SmallGridworldMDP() + pi, v = policy_iteration(mdp, gamma=0.99) + expected_v = np.array([ 0, -1, -2, -3, + -1, -2, -3, -2, + -2, -3, -2, -1, + -3, -2, -1, 0]) + + from irlc.ex09.small_gridworld import plot_value_function + plot_value_function(mdp, v) + plt.title("Value function using policy iteration to find optimal policy") + from irlc import savepdf + savepdf("policy_iteration") + plt.show() diff --git a/irlc/ex09/rl_agent.py b/irlc/ex09/rl_agent.py new file mode 100644 index 0000000..b0399a1 --- /dev/null +++ b/irlc/ex09/rl_agent.py @@ -0,0 +1,212 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +import numpy as np +from irlc.utils.common import defaultdict2 +from irlc import Agent + +class TabularAgent(Agent): + """ + This helper class will simplify the implementation of most basic reinforcement learning. Specifically it provides: + + - A :math:`Q(s,a)`-table data structure + - An epsilon-greedy exploration method + + The code for the class is very simple, and I think it is a good idea to at least skim it. + + The Q-data structure can be used a follows: + + .. runblock:: pycon + + >>> from irlc.ex09.rl_agent import TabularAgent + >>> from irlc.gridworld.gridworld_environments import BookGridEnvironment + >>> env = BookGridEnvironment() + >>> agent = TabularAgent(env) + >>> state, info = env.reset() # Get the info-dictionary corresponding to s + >>> agent.Q[state, 1] = 2.5 # Update a Q-value; action a=1 is now optimal. + >>> agent.Q[state, 1] # Check it has indeed been updated. + >>> agent.Q[state, 0] # Q-values are 0 by default. + >>> agent.Q.get_optimal_action(state, info) # Note we pass along the info-dictionary corresopnding to this state + + .. note:: + The ``get_optimal_action``-function requires an ``info`` dictionary. This is required since the info dictionary + contains information about which actions are available. To read more about the Q-values, see :class:`~irlc.ex09.rl_agent.TabularQ`. + """ + def __init__(self, env, gamma=0.99, epsilon=0): + r""" + Initialize a tabular environment. 
For convenience, it stores the discount factor :math:`\gamma` and + exploration parameter :math:`\varepsilon` for epsilon-greedy exploration. Access them as e.g. ``self.gamma`` + + When you implement an agent and overwrite the ``__init__``-method, you should include a call such as + ``super().__init__(gamma, epsilon)``. + + :param env: The gym environment + :param gamma: The discount factor :math:`\gamma` + :param epsilon: Exploration parameter :math:`\varepsilon` for epsilon-greedy exploration + """ + super().__init__(env) + self.gamma, self.epsilon = gamma, epsilon + self.Q = TabularQ(env) + + def pi_eps(self, s, info): + """ + Performs :math:`\\varepsilon`-greedy exploration with :math:`\\varepsilon =` ``self.epsilon`` and returns the + action. Recall this means that with probability :math:`\\varepsilon` it returns a random action, and otherwise + it returns an action associated with a maximal Q-value (:math:`\\arg\\max_a Q(s,a)`). An example: + + .. runblock:: pycon + + >>> from irlc.ex09.rl_agent import TabularAgent + >>> from irlc.gridworld.gridworld_environments import BookGridEnvironment + >>> env = BookGridEnvironment() + >>> agent = TabularAgent(env) + >>> state, info = env.reset() + >>> agent.pi_eps(state, info) # Note we pass along the info-dictionary corresopnding to this state + + .. note:: + The ``info`` dictionary is used to mask (exclude) actions that are not possible in the state. + It is similar to the info dictionary in ``agent.pi(s,info)``. + + :param s: A state :math:`s_t` + :param info: The corresponding ``info``-dictionary returned by the gym environment + :return: An action computed using :math:`\\varepsilon`-greedy action selection based the Q-values stored in the ``self.Q`` class. + """ + if info is not None and 'seed' in info: # In case info contains a seed, reset the random number generator. + np.random.seed(info['seed']) + return Agent.pi(self, s, k=0, info=info) if np.random.rand() < self.epsilon else self.Q.get_optimal_action(s, info) + + +class ValueAgent(TabularAgent): + """ + This is a simple wrapper class around the Agent class above. It fixes the policy and is therefore useful for doing + value estimation. + """ + def __init__(self, env, gamma=0.95, policy=None, v_init_fun=None): + self.env = env + self.policy = policy # policy to evaluate + """ self.v holds the value estimates. + Initially v[s] = 0 unless v_init_fun is given in which case v[s] = v_init_fun(s). """ + self.v = defaultdict2(float if v_init_fun is None else v_init_fun) + super().__init__(env, gamma=gamma) + self.Q = None # Blank out the Q-values which will not be used. + + def pi(self, s, k, info=None): + return TabularAgent.pi(self, s, k, info) if self.policy is None else self.policy(s) + + def value(self, s): + return self.v[s] + +def _masked_actions(action_space, mask): + """Helper function which applies a mask to the action space.""" + from irlc.utils.common import DiscreteTextActionSpace + if isinstance(action_space, DiscreteTextActionSpace): + return [a for a in range(action_space.n) if mask[a] == 1] + else: + return [a for a in range(action_space.n) if mask[a - action_space.start] == 1] + + +class TabularQ: + """ + This is a helper class for storing Q-values. It is used by the :class:`~ircl.ex09.rl_agent.TabularAgent` to store + Q-values where it can be be accessed as ``self.Q[s,a]``. + """ + def __init__(self, env): + """ + Initialize the table. It requires a gym environment to know how many actions there are for each state. + :param env: A gym environment. 
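+        A usage sketch (any environment with a small Discrete action space, such as the gridworlds used above, will do)::
+
+            Q = TabularQ(env)
+            s, info = env.reset()
+            Q[s, 0] = 1.5          # set the Q-value of action 0 in state s
+            print(Q[s, 0])         # read it back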
+ """ + self._known_masks = {} # Cache the known action masks. + + def q_default(s): + if s in self._known_masks: + return {a: 0 for a in range(self.env.action_space.n) if self._known_masks[s][a- self.env.action_space.start] == 1} + else: + return {a: 0 for a in range(self.env.action_space.n)} + + # qfun = lambda s: OrderedDict({a: 0 for a in (env.P[s] if hasattr(env, 'P') else range(env.action_space.n))}) + self.q_ = defaultdict2(lambda s: q_default(s)) + self.env = env + + def get_Qs(self, state, info_s=None): + """ + Get a list of all known Q-values for this particular state. That is, in a given state, it will return the two + lists: + + .. math:: + \\begin{bmatrix} a_1 \\\\ a_2 \\\\ \\vdots \\\\ a_k \\end{bmatrix}, \\quad + \\begin{bmatrix} Q(s,a_1) \\\\ Q(s,a_1) \\\\ \\vdots \\\\ Q(s,a_k) \\end{bmatrix} \\\\ + + the ``info_s`` parameter will ensure actions are correctly masked. An example of how to use this function from + a policy: + + .. runblock:: pycon + + >>> from irlc.ex09.rl_agent import TabularAgent + >>> class MyAgent(TabularAgent): + ... def pi(self, s, k, info=None): + ... actions, q_values = self.Q.get_Qs(s, info) + + :param state: The state to query + :param info_s: The info-dictionary returned by the environment for this state. Used for action-masking. + :return: + - actions - A tuple containing all actions available in this state ``(a_1, a_2, ..., a_k)`` + - Qs - A tuple containing all Q-values available in this state ``(Q[s,a1], Q[s, a2], ..., Q[s,ak])`` + """ + if info_s is not None and 'mask' in info_s: + if state not in self._known_masks: + self._known_masks[state] = info_s['mask'] + # Probably a good idea to check the Q-values are okay... + avail_actions = _masked_actions(self.env.action_space, info_s['mask']) + self.q_[state] = {a: self.q_[state][a] for a in avail_actions} + + (actions, Qa) = zip(*self.q_[state].items()) + return tuple(actions), tuple(Qa) + + def get_optimal_action(self, state, info_s): + """ + For a given state ``state``, this function returns the optimal action for that state. + + .. math:: + a^* = \\arg\\max_a Q(s,a) + + An example: + .. runblock:: pycon + + >>> from irlc.ex09.rl_agent import TabularAgent + >>> class MyAgent(TabularAgent): + ... def pi(self, s, k, info=None): + ... a_star = self.Q.get_optimal_action(s, info) + + + :param state: State to find the optimal action in :math:`s` + :param info_s: The ``info``-dictionary corresponding to this state + :return: The optimal action according to the Q-table :math:`a^*` + """ + actions, Qa = self.get_Qs(state, info_s) + a_ = np.argmax(np.asarray(Qa) + np.random.rand(len(Qa)) * 1e-8) + return actions[a_] + + def _chk_mask(self, s, a): + if s in self._known_masks: + mask = self._known_masks[s] + if mask[a - self.env.action_space.start] == 0: + raise Exception(f" Invalid action. You tried to access Q[{s}, {a}], however the action {a} has been previously masked and therefore cannot exist in this state. The mask for {s} is mask={mask}.") + + def __getitem__(self, state_comma_action): + s, a = state_comma_action + self._chk_mask(s, a) + return self.q_[s][a] + + def __setitem__(self, state_comma_action, q_value): + s, a = state_comma_action + self._chk_mask(s, a) + self.q_[s][a] = q_value + + def to_dict(self): + """ + This helper function converts the known Q-values to a dictionary. This function is only used for + visualization purposes in some of the examples. 
+ + :return: A dictionary ``q`` of all known Q-values of the form ``q[s][a]`` + """ + # Convert to a regular dictionary + d = {s: {a: Q for a, Q in Qs.items() } for s,Qs in self.q_.items()} + return d diff --git a/irlc/ex09/small_gridworld.py b/irlc/ex09/small_gridworld.py new file mode 100644 index 0000000..3271171 --- /dev/null +++ b/irlc/ex09/small_gridworld.py @@ -0,0 +1,39 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +import numpy as np +from irlc.ex09.mdp import MDP +import seaborn as sns + +# action space available to the agent +UP,RIGHT, DOWN, LEFT = 0, 1, 2, 3 +class SmallGridworldMDP(MDP): + def __init__(self, rows=4, cols=4): + self.rows, self.cols = rows, cols # Number of rows, columns. + super().__init__(initial_state=(rows//2, cols//2) ) # Initial state is in the middle of the board. + + def A(self, state): + return [UP, DOWN, RIGHT, LEFT] # All four directions available. + + def Psr(self, state, action): + row, col = state # state is in the format state = (row, col) + if action == UP: row -= 1 + if action == DOWN: row += 1 + if action == LEFT: col += 1 + if action == RIGHT: col -= 1 + + col = min(self.cols-1, max(col, 0)) # Check boundary conditions. + row = min(self.rows-1, max(row, 0)) + reward = -1 # Always get a reward of -1 + next_state = (row, col) + # Note that P(next_state, reward | state, action) = 1 because environment is deterministic + return {(next_state, reward): 1} + + def is_terminal(self, state): + row, col = state + return (row == 0 and col == 0) or (row == self.rows-1 and col == self.cols-1) + + +def plot_value_function(env, v): + A = np.zeros((env.rows, env.cols)) + for (row, col) in env.nonterminal_states: + A[row, col] = v[(row,col)] + sns.heatmap(A, cmap="YlGnBu", annot=True, cbar=False, square=True, fmt='g') diff --git a/irlc/ex09/value_iteration.py b/irlc/ex09/value_iteration.py new file mode 100644 index 0000000..f324203 --- /dev/null +++ b/irlc/ex09/value_iteration.py @@ -0,0 +1,73 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +""" + +References: + [SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online). +""" +import matplotlib.pyplot as plt +from collections import defaultdict +import numpy as np +from irlc.ex09.mdp_warmup import value_function2q_function +from irlc import savepdf + +def value_iteration(mdp, gamma=.99, theta=0.0001, max_iters=10 ** 6, verbose=False): + r"""Implement the value-iteration algorithm defined in (SB18, Section 4.4). + The inputs should be self-explanatory given the pseudo-code. + + I have also included a max_iters variable which represents an upper bound on the total number of iterations. This is useful + if you want to check what the algorithm does after a certain (e.g. 1 or 2) steps. + + The verbose-variable makes the algorithm print out the biggest change in the value-function in a single step. + This is useful if you run it on a large problem and want to know how much time remains, or simply get an idea of + how quickly it converges. + """ + V = defaultdict(lambda: 0) # value function + for i in range(max_iters): + Delta = 0 + for s in mdp.nonterminal_states: + """ Perform the update the value-function V[s] here for the given state. 
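+            (Recall from (SB18, Section 4.4) that the update is V[s] <- max_a sum_{s', r} p(s', r | s, a) * (r + gamma * V[s']),
+             where V[s'] is taken to be 0 whenever s' is terminal, and that Delta should track the largest change to V[s].)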
+ Note that this has a lot of similarity to the policy-evaluation algorithm, and you can re-use + a lot of that solution, including value_function2q_function(...) (assuming you used that function). """ + # TODO: 2 lines missing. + raise NotImplementedError("Complete the algorithm here.") + if verbose: + print(i, Delta) + if Delta < theta: + break + # Turn the value-function into a policy. It implements the last line of the algorithm. + pi = values2policy(mdp, V, gamma) + return pi, V + +def values2policy(mdp, V, gamma): + r""" Turn the value-function V into a policy. The value function V is implemented as a dictionary so that + > value = V[s] + is the value-function in state s. + The procedure you implement is the very last line of the value-iteration algorithm (SB18, Section 4.4), and it should return + a policy pi as a dictionary so that + > a = pi[s] + is the action in state s. + + Note once again you can re-use the qs_-function. and the argmax -- in fact, the solution is very similar to your solution to the + policy-iteration problem in policy_iteration.py. + As you have properly noticed, even though we implement different algorithms, they are all build using the same + building-block. + """ + pi = {} + for s in mdp.nonterminal_states: + # Create the policy here. pi[s] = a is the action to be taken in state s. + # You can use the qs_ helper function to simplify things and perhaps + # re-use ideas from the dp.py problem from week 2. + # TODO: 2 lines missing. + raise NotImplementedError("Insert your solution and remove this error.") + return pi + +if __name__ == "__main__": + import seaborn as sns + from irlc.ex09.small_gridworld import SmallGridworldMDP, plot_value_function + env = SmallGridworldMDP() + policy, v = value_iteration(env, gamma=0.99, theta=1e-6) + plot_value_function(env, v) + + plt.title("Value function obtained using value iteration to find optimal policy") + savepdf("value_iteration") + plt.show() diff --git a/irlc/ex09/value_iteration_agent.py b/irlc/ex09/value_iteration_agent.py new file mode 100644 index 0000000..782e65e --- /dev/null +++ b/irlc/ex09/value_iteration_agent.py @@ -0,0 +1,42 @@ +# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text. +from irlc.ex09.value_iteration import value_iteration +from irlc import TabularAgent +import numpy as np + + +class ValueIterationAgent(TabularAgent): + def __init__(self, env, mdp=None, gamma=1, epsilon=0, **kwargs): + super().__init__(env) + self.epsilon = epsilon + # TODO: 1 lines missing. + raise NotImplementedError("Call the value_iteration function and store the policy for later.") + + def pi(self, s, k, info=None): + """ With probability (1-epsilon), the take optimal action as computed using value iteration + With probability epsilon, take a random action. You can do this using return self.random_pi(s) + """ + if np.random.rand() < self.epsilon: + return super().pi(s, k, info) # Recall that by default the policy takes random actions. + else: + """ Return the optimal action here. This should be computed using value-iteration. + To speed things up, I recommend calling value-iteration from the __init__-method and store the policy. """ + # TODO: 1 lines missing. 
+            raise NotImplementedError("Compute and return optimal action according to value-iteration.")
+            return action
+
+    def __str__(self):
+        return f"ValueIteration(epsilon={self.epsilon})"
+
+
+if __name__ == "__main__":
+    from irlc.gridworld.gridworld_environments import SuttonCornerGridEnvironment
+    env = SuttonCornerGridEnvironment(living_reward=-1, render_mode='human')
+    from irlc import train, interactive
+    # Note you can access the MDP for a gridworld using env.mdp. The mdp will be an instance of the MDP class we have used for planning so far.
+    agent = ValueIterationAgent(env, mdp=env.mdp)  # Make a ValueIteration-based agent
+    # Visualize & interactivity. Press P or space to follow the policy.
+    agent.Q = None  # This ensures that the value function is visualized.
+    env, agent = interactive(env, agent)
+    train(env, agent, num_episodes=20)  # Train for 20 episodes
+    env.savepdf("smallgrid.pdf")  # Take a snapshot of the final configuration
+    env.close()  # Whenever you use a VideoMonitor, call this to avoid an OpenGL-related error message on exit
--
GitLab