Commit 6ca50716 authored by tuhe

Semester start

Showing 1028 additions and 0 deletions
# Folder for the exam and midterms
Before the exam:
- Ensure that the `irlc` code generally works (you can run the exercises, the packages we use, such as `gymnasium` and `numpy`, are installed, etc.)
- Ensure that you have no problem running the various `unitgrade` test scripts and generating `.token` files
During the exam:
- Download a `.zip` file with the code from the digital exam
- For the midterm, you can find the file on DTU Learn
- The `.zip` file will contain the toolbox code, including solutions. It will also contain a directory:
```bash
irlc/exam/exam2024spring
```
- This directory contains the code you need to work on during the exam. Replace the corresponding directory on your local computer with this directory, and you should be all set (a sketch of this step is shown below)
- The `.zip` file will also contain solutions to nearly all exercises. Use these if it benefits you.
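
For reference, here is a minimal sketch of the replacement step. The paths (`~/Downloads/exam.zip`, `~/my-course-code`) and the layout of the zip are assumptions for illustration only; adjust them to your own setup:

```bash
# Unpack the exam zip to a temporary folder (all paths below are placeholders).
unzip ~/Downloads/exam.zip -d ~/exam_unpacked
# Replace the local exam directory with the one from the zip.
rm -rf ~/my-course-code/irlc/exam/exam2024spring
cp -r ~/exam_unpacked/irlc/exam/exam2024spring ~/my-course-code/irlc/exam/
```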
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from collections import defaultdict
import numpy as np
from irlc import TabularAgent # , PlayWrapper, VideoMonitor, train
from irlc.ex09.mdp_warmup import value_function2q_function
class ValueIterationAgent2(TabularAgent):
def __init__(self, env, gamma=.99, epsilon=0, theta=1e-5, only_current_state=False):
self.v = defaultdict(lambda: 0)
self.steps = 0
self.mdp = env.mdp
self.only_current_state = only_current_state
super().__init__(env, gamma, epsilon=epsilon)
def pi(self, s, k, info=None):
# TODO: 2 lines missing.
raise NotImplementedError("Implement function body")
return self.random_pi(s) if np.random.rand() < self.epsilon else a
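# A possible completion (a sketch, not necessarily the official solution): act greedily with
# respect to the current value function, e.g.
#   q = self.v2Q(s)
#   a = max(q, key=q.get)
# after which the epsilon-greedy return statement above handles exploration.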
@property
def label(self):
label = f"Value iteration after {self.steps} steps"
return label
def v2Q(self, s): # used for rendering right now
return value_function2q_function(self.mdp, s, self.gamma, self.v)
def train(self, s, a, r, sp, done=False, info_sp=None):
delta = 0
v2 = {}
for s in self.env.P.keys():
v, v2[s] = self.v[s], max(value_function2q_function(self.mdp, s, self.gamma, self.v).values()) if len(self.mdp.A(s)) > 0 else 0
delta = max(delta, np.abs(v - v2[s]))  # Compare the old value v with the new value v2[s].
self.v = v2
for s in self.mdp.nonterminal_states:
for a in self.mdp.A(s):
self.Q[s,a] = self.v2Q(s)[a]
self.delta = delta
self.steps += 1
def __str__(self):
return f"VIAgent_{self.gamma}"
class PolicyEvaluationAgent2(TabularAgent):
def __init__(self, env, mdp=None, gamma=0.99, steps_between_policy_improvement=10, only_update_current=False):
if mdp is None:
mdp = env.mdp
self.mdp = mdp
self.v = defaultdict(lambda: 0)
self.imp_steps = 0
self.steps_between_policy_improvement = steps_between_policy_improvement
self.steps = 0
self.policy = {}
self.only_update_current = only_update_current
for s in mdp.nonterminal_states:
self.policy[s] = {}
for a in mdp.A(s):
self.policy[s][a] = 1/len(mdp.A(s))
super().__init__(env, gamma)
def pi(self, s,k, info=None):
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
return np.random.choice(a, p=pa)
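# A possible completion (a sketch): the missing line unpacks the policy distribution in state s,
# mirroring ValueIterationAgent3.pi further down, e.g.
#   a, pa = zip(*self.policy[s].items())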
def v2Q(self, s): # used for rendering right now
return value_function2q_function(self.mdp, s, self.gamma, self.v)
@property
def label(self):
if self.steps_between_policy_improvement is None:
label = f"Policy evaluation after {self.steps} steps"
else:
dd = self.steps % self.steps_between_policy_improvement == 0
# print(dd)
label = f"PI after {self.steps} steps/{self.imp_steps-dd} policy improvements"
return label
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
if not self.only_update_current:
v2 = {}
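# One sweep of iterative policy evaluation over all nonterminal states:
# v2[s] = sum_a pi(a|s) * Q(s, a), with Q computed from the current value estimate self.v.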
for s in self.mdp.nonterminal_states:
q = value_function2q_function(self.mdp, s, self.gamma, self.v)
if len(q) == 0:
v2[s] = 0
else:
v2[s] = sum( [qv * self.policy[s][a] for a, qv in q.items()] )
for s in self.mdp.nonterminal_states:
for a,q in self.v2Q(s).items():
self.Q[s,a] = q
for k, v in v2.items():
self.v[k] = v2[k]
else:
# Only update Q-value in current state:
Q_ = 0
# print(a)
for (sp, r), p in self.mdp.Psr(s, a).items():
Q_ += p*(r + (0 if self.mdp.is_terminal(sp) else sum([self.Q[sp, ap]*pa for ap, pa in self.policy[sp].items()]) ))
self.Q[s, a] = Q_
v_ = 0
for a in self.mdp.A(s):
for (sp, r), p in self.mdp.Psr(s, a).items():
v_ += self.policy[s][a] * (self.v[sp] * self.gamma + r)*p
self.v[s] = v_
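# Policy improvement: every steps_between_policy_improvement training steps, make the policy
# greedy with respect to the current value estimate (ties are split uniformly below).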
if self.steps_between_policy_improvement is not None and (self.steps+1) % self.steps_between_policy_improvement == 0:
self.policy = {}
for s in self.mdp.nonterminal_states:
q = value_function2q_function(self.mdp, s, self.gamma, self.v)
if len(q) == 0:
continue
a_ = max(q, key=q.get) # optimal action
self.policy[s] = {}
for a in self.mdp.A(s):
self.policy[s][a] = 1 if q[a] == max(q.values()) else 0 #if a == a_ else 0
n = sum(self.policy[s].values())
for a in self.policy[s]:
self.policy[s][a] *= 1/n
self.imp_steps += 1
self.steps += 1
def __str__(self):
return f"PIAgent_{self.gamma}"
class ValueIterationAgent3(TabularAgent):
def __init__(self, env, mdp=None, epsilon=0, gamma=0.99, steps_between_policy_improvement=10, only_update_current=False):
if mdp is None:
mdp = env.mdp
self.mdp = mdp
self.v = defaultdict(lambda: 0)
self.imp_steps = 0
self.steps_between_policy_improvement = steps_between_policy_improvement
self.steps = 0
self.policy = {}
self.only_update_current = only_update_current
self.v = defaultdict(float)
for s in mdp.nonterminal_states:
self.policy[s] = {}
for a in mdp.A(s):
self.policy[s][a] = 1/len(mdp.A(s))
super().__init__(env, gamma, epsilon=epsilon)
def pi(self, s,k, info=None):
from irlc import Agent
if np.random.rand() <self.epsilon:
return Agent.pi(self, s, k=k, info=info)
a, pa = zip(*self.policy[s].items())
return np.random.choice(a, p=pa)
def v2Q(self, s): # used for rendering right now
if not self.only_update_current:
a,q = self.Q.get_Qs(s)
return {a_: q_ for a_, q_ in zip(a,q)}
else:
return value_function2q_function(self.mdp, s, self.gamma, self.v)
def vi_q(self, s, a):
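# Bellman-style backup for Q(s, a): sum over outcomes (sp, r) of p(sp, r | s, a) * (r + gamma * value of sp),
# where a non-terminal successor sp is valued by an epsilon-weighted mix of its greedy and average Q-values,
# matching the epsilon-greedy behaviour of this agent.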
Q_ = 0
for (sp, r), p in self.mdp.Psr(s, a).items():
if self.mdp.is_terminal(sp):
QT = 0
else:
qvals = [self.Q[sp, a_] for a_ in self.mdp.A(sp)]
QT = max(qvals) * (1-self.epsilon) + self.epsilon*np.mean(qvals)
Q_ += p * (r + self.gamma * QT)
return Q_
@property
def label(self):
label = f"Value Iteration after {self.steps} steps"
return label
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
s_ = s
if not self.only_update_current:
q_ = dict()
for s in self.mdp.nonterminal_states:
for a in self.mdp.A(s):
q_[s,a] = self.vi_q(s, a)
for (s,a), q in q_.items():
self.Q[s,a] = q
else:
# Only update Q-value in current state:
# s = s_
qq = value_function2q_function(self.mdp, s, self.gamma, self.v)
self.v[s] = max(qq.values())
self.Q[s, a] = self.vi_q(s,a)
for s in self.mdp.nonterminal_states:
# q = qs_(self.mdp, s, self.gamma, self.v)
# if len(q) == 0:
# continue
# a_ = max(q, key=q.get) # optimal action
self.policy[s] = {}
qs = [self.Q[s,a] for a in self.mdp.A(s)]
for a in self.mdp.A(s):
self.policy[s][a] = 1 if self.Q[s,a] >= max(qs)-1e-6 else 0 #if a == a_ else 0
S = sum(self.policy[s].values())
for a in self.mdp.A(s):
self.policy[s][a] = self.policy[s][a] / S
if not self.only_update_current:
self.v[s] = max([self.Q[s, a_] for a_ in self.mdp.A(s)])
self.steps += 1
def __str__(self):
return f"VIAgent3_{self.gamma}"
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import numpy as np
from collections import defaultdict
from gymnasium.envs.toy_text.frozen_lake import FrozenLakeEnv
from gymnasium.spaces.discrete import Discrete
from irlc.ex09.mdp import MDP2GymEnv
from irlc.gridworld.gridworld_mdp import GridworldMDP, FrozenGridMDP
from irlc import Timer
from gymnasium.spaces.multi_discrete import MultiDiscrete
import pygame
grid_cliff_grid = [[' ',' ',' ',' ',' ', ' ',' ',' ',' ',' ', ' '],
[' ',' ',' ',' ',' ', ' ',' ',' ',' ',' ', ' '],
['S',-100, -100, -100, -100,-100, -100, -100, -100, -100, 0]]
grid_cliff_grid2 = [[' ',' ',' ',' ',' '],
['S',' ',' ',' ',' '],
[-100,-100, -100, -100, 0]]
grid_discount_grid = [[' ',' ',' ',' ',' '],
[' ','#',' ',' ',' '],
[' ','#', 1,'#', 10],
['S',' ',' ',' ',' '],
[-10,-10, -10, -10, -10]]
grid_bridge_grid = [[ '#',-100, -100, -100, -100, -100, '#'],
[ 1, 'S', ' ', ' ', ' ', ' ', 10],
[ '#',-100, -100, -100, -100, -100, '#']]
grid_book_grid = [[' ',' ',' ',+1],
[' ','#',' ',-1],
['S',' ',' ',' ']]
grid_maze_grid = [[' ',' ',' ', +1],
['#','#',' ','#'],
[' ','#',' ',' '],
[' ','#','#',' '],
['S',' ',' ',' ']]
sutton_corner_maze = [[ 1, ' ', ' ', ' '],
[' ', ' ', ' ', ' '],
[' ', 'S', ' ', ' '],
[' ', ' ', ' ', 1]]
# A big open maze.
grid_open_grid = [[' ']*8 for _ in range(5)]
grid_open_grid[0][0] = 'S'
grid_open_grid[-1][-1] = 1
class GridworldEnvironment(MDP2GymEnv):
metadata = {
'render_modes': ['human', 'rgb_array'],
'render_fps': 1000,
}
def get_keys_to_action(self):
return {(pygame.K_LEFT,): GridworldMDP.WEST, (pygame.K_RIGHT,): GridworldMDP.EAST,
(pygame.K_UP,): GridworldMDP.NORTH, (pygame.K_DOWN,): GridworldMDP.SOUTH}
def _get_mdp(self, grid, uniform_initial_state=False):
return GridworldMDP(grid, living_reward=self.living_reward)
def __init__(self, grid=None, uniform_initial_state=True, living_reward=0,zoom=1, view_mode=0, render_mode=None, print_states=False,
frames_per_second=None,
**kwargs):
self.print_states = print_states
self.living_reward = living_reward
mdp = self._get_mdp(grid)
self.render_mode = render_mode
super().__init__(mdp, render_mode=render_mode)
self.action_space = Discrete(4)
# self.observation_space = MultiDiscrete([mdp.height, mdp.width]) # N.b. the state space does not contain the terminal state.
self.render_episodes = 0
self.render_steps = 0
self.timer = Timer()
self.view_mode = view_mode
self.agent = None # If this is set, the environment will try to render the internal state of the agent.
# It is a little hacky, it allows us to make the visualizations etc.
# Set up rendering if required.
self.display_pygame = None
self.zoom = zoom # Save zoom level.
self.total_reward = 0
self.frames_per_second = frames_per_second
def _step(*args, **kwargs):
s = self.state
o = type(self).step(self, *args, **kwargs)
done = o[2]
a = args[0]
self.total_reward += o[1]
self.render_steps += 1
self.render_episodes += done
if self.print_states:
if isinstance(self, FrozenLake):
pr = f" This occurred with probability: P(s', r | s, a) = {self.mdp.Psr(s, a)[(o[0], o[1])]:.2f}."
else:
pr = ""
if done:
pt = f" Total reward for this episode was {self.total_reward}."
else:
pt = ""
print(f"s={s}, a={a} --> s'={o[0]}, r={o[1]}. {pr}{pt}")
return o
self.step = _step
def reset(self, *args, **kwargs):
o = super().reset(*args, **kwargs)
self.total_reward = 0
if self.print_states:
print(f"Starting in state s={o[0]}")
return o
def keypress(self, key):
if key.unicode == 'm': # Change the view mode.
self.view_mode += 1
self.render()
return
if key == 116: # Fallback keycode check; this branch may well be unused.
self.view_mode += 1
self.render()
def render(self):
if self.display_pygame is None:
from irlc.gridworld.gridworld_graphics_display import GraphicsGridworldDisplay
self.display_pygame = GraphicsGridworldDisplay(self.mdp, size=int(150 * self.zoom), frames_per_second=self.frames_per_second) # last item is grid size
agent = self.agent
label = None
method_label = agent.method if hasattr(agent, 'method') else ''
if label is None and len(method_label) > 0:
label = f"{method_label} AFTER {self.render_steps} STEPS"
state = self.state
avail_modes = []
if agent != None:
label = (agent.label if hasattr(agent, 'label') else label if label is not None else '') #if label is None else label
v = agent.v if hasattr(agent, 'v') else None
Q = agent.Q if hasattr(agent, 'Q') else None
v2Q = agent.v2Q if hasattr(agent, 'v2Q') else None
avail_modes = []
if Q is not None:
avail_modes.append("Q")
avail_modes.append("v")
elif v is not None:
avail_modes.append("v")
if len(avail_modes) > 0:
self.view_mode = self.view_mode % len(avail_modes)
if avail_modes[self.view_mode] == 'v':
preferred_actions = None
if v == None:
preferred_actions = {}
v = {s: max(Q.get_Qs(s)[1]) for s in self.mdp.nonterminal_states}
for s in self.mdp.nonterminal_states:
acts, values = Q.get_Qs(s)
preferred_actions[s] = [a for (a,w) in zip(acts, values) if np.round(w, 2) == np.round(v[s], 2)]
if v2Q is not None:
preferred_actions = {}
for s in self.mdp.nonterminal_states:
q = v2Q(s)
mv = np.round( max( q.values() ), 2)
preferred_actions[s] = [k for k, v in q.items() if np.round(v, 2) == mv]
if agent != None and hasattr(agent, 'policy') and agent.policy is not None and state in agent.policy and isinstance(agent.policy[state], dict):
for s in self.mdp.nonterminal_states:
preferred_actions[s] = [a for a, v in agent.policy[s].items() if v == max(agent.policy[s].values()) ]
if hasattr(agent, 'returns_count_N'):
returns_count = agent.returns_count_N
else:
returns_count = None
if hasattr(agent, 'returns_sum_S'):
returns_sum = agent.returns_sum_S
else:
returns_sum = None
self.display_pygame.displayValues(mdp=self.mdp, v=v, preferred_actions=preferred_actions, currentState=state, message=label, returns_count=returns_count, returns_sum=returns_sum)
elif avail_modes[self.view_mode] == 'Q':
if hasattr(agent, 'e') and isinstance(agent.e, defaultdict):
eligibility_trace = defaultdict(float)
for k, v in agent.e.items():
eligibility_trace[k] = v
else:
eligibility_trace = None
if hasattr(agent, 'returns_count_N'):
returns_count = agent.returns_count_N
elif hasattr(agent, 'returns_count'):
returns_count = agent.returns_count
else:
returns_count = None
if hasattr(agent, 'returns_sum_S'):
returns_sum = agent.returns_sum_S
elif hasattr(agent, 'returns_sum'):
returns_sum = agent.returns_sum
else:
returns_sum = None
self.display_pygame.displayQValues(self.mdp, Q, currentState=state, message=label, eligibility_trace=eligibility_trace, returns_count=returns_count, returns_sum=returns_sum)
else:
raise Exception("No view mode selected")
else:
# self.pygame_display = Gridworl
self.display_pygame.displayNullValues(self.mdp, currentState=state, message=label)
# self.display.displayNullValues(self.mdp, currentState=state)
render_out2 = self.display_pygame.blit(render_mode=self.render_mode)
return render_out2
def close(self):
# print("Closing time...")
if self.display_pygame is not None:
self.display_pygame.close()
class BookGridEnvironment(GridworldEnvironment):
def __init__(self, *args, **kwargs):
super().__init__(grid_book_grid, *args, **kwargs)
class BridgeGridEnvironment(GridworldEnvironment):
def __init__(self, *args, **kwargs):
super().__init__(grid_bridge_grid, *args, **kwargs)
class CliffGridEnvironment(GridworldEnvironment):
def __init__(self, *args, **kwargs):
super().__init__(grid_cliff_grid, living_reward=-1, *args, **kwargs)
class CliffGridEnvironment2(GridworldEnvironment):
def __init__(self, *args, **kwargs):
super().__init__(grid_cliff_grid2, living_reward=-1, *args, **kwargs)
class OpenGridEnvironment(GridworldEnvironment):
def __init__(self, *args, **kwargs):
super().__init__(grid_open_grid, *args, **kwargs)
r"""
Implement Sutton's little corner-maze environment (see (SB18, Example 4.1)).
You can make an instance using:
> from irlc.gridworld.gridworld_environments import SuttonCornerGridEnvironment
> env = SuttonCornerGridEnvironment()
To get access to the MDP (as an MDP-class instance, for instance to see the states env.mdp.nonterminal_states) use
> env.mdp
"""
class SuttonCornerGridEnvironment(GridworldEnvironment):
def __init__(self, *args, living_reward=-1, **kwargs): # living_reward=-1 means the agent gets a reward of -1 per step.
super().__init__(sutton_corner_maze, *args, living_reward=living_reward, **kwargs)
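# Example usage (a sketch mirroring the docstring above; Agent and train are imported from irlc):
#   from irlc import Agent, train
#   env = SuttonCornerGridEnvironment()
#   stats, trajectories = train(env, Agent(env), num_episodes=1)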
class SuttonMazeEnvironment(GridworldEnvironment):
def __init__(self, *args, render_mode=None, living_reward=0, **kwargs):
sutton_maze_grid = [[' ', ' ', ' ', ' ', ' ', ' ', ' ', '#', +1],
[' ', ' ', '#', ' ', ' ', ' ', ' ', '#', ' '],
['S', ' ', '#', ' ', ' ', ' ', ' ', '#', ' '],
[' ', ' ', '#', ' ', ' ', ' ', ' ', ' ', ' '],
[' ', ' ', ' ', ' ', ' ', '#', ' ', ' ', ' '],
[' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ']]
super().__init__(sutton_maze_grid, *args, render_mode=render_mode, living_reward=living_reward, **kwargs)
grid_book_grid_ = [[' ',' ',' ',+1],
[' ','#',' ',-1],
['S',' ',' ',' ']]
frozen_lake_4 = [['S',' ',' ',' '],
[' ','#',' ',-1],
[ 0 , ' ', ' ', +1]]
class FrozenLake(GridworldEnvironment):
def _get_mdp(self, grid, uniform_initial_state=False):
return FrozenGridMDP(grid, is_slippery=self.is_slippery, living_reward=self.living_reward)
def __init__(self, is_slippery=True, living_reward=0, *args, **kwargs):
self.is_slippery = is_slippery
menv = FrozenLakeEnv(is_slippery=is_slippery) # Load frozen-lake game layout and convert to our format 'grid'
gym2grid = dict(F=' ', G=1, H=0)
grid = [[gym2grid.get(s.decode("ascii"), s.decode("ascii")) for s in l] for l in menv.desc.tolist()]
menv.close()
super().__init__(grid=grid, *args, living_reward=living_reward, **kwargs)
if __name__ == "__main__":
# import gymnasium as gym
# env = gym.make('CartPole-v1', render_mode="human")
# env.reset()
from irlc import interactive, Agent, train
from irlc.ex11.q_agent import QAgent
from irlc.ex11.sarsa_agent import SarsaAgent
# env = SuttonMazeEnvironment(render_mode="human", zoom=0.75)
# env = OpenGridEnvironment(render_mode='human', zoom=0.75)
# env = OpenGridEnvironment()
env = CliffGridEnvironment()
agent = QAgent(env)
# env, agent = interactive(env, QAgent(env))
# stats, trajectories = train(env, agent, num_episodes=100, experiment_name='q_learning')
stats, trajectories = train(env, SarsaAgent(env), num_episodes=100, experiment_name='sarsa')
from irlc import main_plot
main_plot(experiments=['q_learning', 'sarsa'])
from matplotlib import pyplot as plt
plt.show()
# from irlc import VideoMonitor, train, Agent, PlayWrapper
# agent = Agent(env)
env.reset()
env.close()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from collections import defaultdict
from irlc.ex09.mdp import MDP
class GridworldMDP(MDP):
TERMINAL = "Terminal state"
NORTH = 0 # These are the four available actions.
EAST = 1
SOUTH = 2
WEST = 3
actions2labels = {NORTH: 'North',
SOUTH: 'South',
EAST: 'East',
WEST: 'West'} # This dictionary is useful for labelling purposes but otherwise serves no purpose.
def __init__(self, grid, living_reward=0.0, noise=0.0):
self.grid = {}
self.height = len(grid)
self.width = len(grid[0])
initial_state = None
for dy, line in enumerate(grid):
y = self.height - dy - 1
for x, el in enumerate(line):
self.grid[x, y] = el
if el == 'S':
initial_state = (x, y)
self.noise = noise
self.living_reward = living_reward
super().__init__(initial_state=initial_state)
def A(self, state):
"""
Returns list of valid actions available in 'state'.
You can try to walk into walls (but you will stay in your location), and when you are on the
exit squares (i.e., the ones with numbers), you have a single action available,
'North', which will take you to the terminal square.
"""
return (self.NORTH,) if type(self.grid[state]) in [int, float] else (self.NORTH, self.EAST, self.SOUTH, self.WEST)
def is_terminal(self, state):
return state == self.TERMINAL
def Psr(self, state, action):
if type(self.grid[state]) in [float, int]:
return {(self.TERMINAL, self.grid[state]): 1.}
probabilities = defaultdict(float)
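# With probability 1 - noise the intended action is executed; with probability noise/2 each, the agent
# slips to one of the two perpendicular actions (action +/- 1 modulo 4 in the NORTH/EAST/SOUTH/WEST encoding).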
for a, pr in [(action, 1-self.noise), ((action - 1) % 4, self.noise/2), ((action + 1) % 4, self.noise/2)]:
sp = self.f(state, a)
r = self.grid[state] if type(self.grid[state]) in [int, float] else self.living_reward
probabilities[(sp, r)] += pr
return probabilities
def f(self, state, action):
x, y = state
nxt = {self.NORTH: (x, y+1),
self.WEST: (x-1, y),
self.EAST: (x+1, y),
self.SOUTH: (x, y-1)}
return nxt[action] if self._legal(nxt[action]) else state
def _legal(self, state):
return state in self.grid and self.grid[state] != "#"
class FrozenGridMDP(GridworldMDP):
def __init__(self, grid, is_slippery=True, living_reward=0):
self.is_slippery = is_slippery
super().__init__(grid, noise=2/3 if is_slippery else 0, living_reward=living_reward)
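# Note: with noise = 2/3, GridworldMDP.Psr gives probability 1/3 to the intended move and 1/3 to each
# perpendicular move, matching the slippery dynamics of gymnasium's FrozenLake.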
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
# In-class examples
This folder contains various examples used throughout the class. You should be able to run most of the examples
if you find that helpful (many of them simply run the exercise code); however, in some instances I have made
small changes to the exercises to provide additional visualizations, etc. Also note that the code is sometimes not
well organized -- in other words, the folder is provided "as is" for those who find it helpful, and you are free to ignore it.
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.pacman.pacman_environment import PacmanEnvironment
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.pacman.pacman_environment import PacmanEnvironment
from irlc.pacman.pacman_graphics_display import PacmanGraphics, FirstPersonPacmanGraphics
import asyncio
class AsyncPacmanGraphics(PacmanGraphics):
async def update(self, newState, animate=False, ghostbeliefs=None, path=None, visitedlist=None):
agentIndex = newState.data._agentMoved
agentState = newState.data.agentStates[agentIndex]
if self.agentImages[agentIndex][0].isPacman != agentState.isPacman: self.swapImages(agentIndex, agentState)
prevState, prevImage = self.agentImages[agentIndex]
if animate:
if agentState.isPacman:
await self.animatePacman(agentState, prevState, prevImage, state=newState, ghostbeliefs=ghostbeliefs, path=path, visitedlist=visitedlist)
else:
self.moveGhost(agentState, agentIndex, prevState, prevImage)
self.agentImages[agentIndex] = (agentState, prevImage)
if newState.data._foodEaten != None:
self.removeFood(newState.data._foodEaten, self.food)
if newState.data._capsuleEaten != None:
self.removeCapsule(newState.data._capsuleEaten, self.capsules)
if 'ghostDistances' in dir(newState):
self.infoPane.updateGhostDistances(newState.data.ghostDistances)
self.master_render(newState, ghostbeliefs=ghostbeliefs, path=path, visitedlist=visitedlist)
async def animatePacman(self, pacman, prevPacman, image, nframe=1, frames=4, state=None, ghostbeliefs=None, path=None, visitedlist=None):
if self.frameTime < 0:
print('Press any key to step forward, "q" to play')
if self.frameTime > 0.01 or self.frameTime < 0:
fx, fy = self.getPosition(prevPacman)
px, py = self.getPosition(pacman)
for nframe in range(1,int(frames) + 1):
pos = px*nframe/frames + fx*(frames-nframe)/frames, py*nframe/frames + fy*(frames-nframe)/frames
self.movePacman(pos, self.getDirection(pacman), image, pacman=pacman)
pacman.draw_extra['delta_xy'] = (pos[0]-px, pos[1]-py)
await asyncio.sleep(self.frameTime/frames)
self.master_render(state, ghostbeliefs=ghostbeliefs, path=path, visitedlist=visitedlist)
self.blit(render_mode='human')
else:
self.movePacman(self.getPosition(pacman), self.getDirection(pacman), image, pacman=pacman)
pass
class AsyncPacmanEnvironment(PacmanEnvironment):
def _private_make_graphics(self):
if self.first_person_graphics:
self.graphics_display = FirstPersonPacmanGraphics(self.game.state, self.options_zoom, showGhosts=True,
frameTime=self.options_frametime,
ghostbeliefs=self.ghostbeliefs)
else:
self.graphics_display = AsyncPacmanGraphics(self.game.state, self.options_zoom, frameTime=self.options_frametime,
method=self.method)
async def async_step(self, action):
r_ = self.game.state._unsafe_getScore()
done = False
if action not in self.state.A():
raise Exception(f"Agent tried {action=} available actions {self.state.A()}")
# Let player play `action`, then let the ghosts play their moves in sequence.
for agent_index in range(len(self.game.agents)):
a = self.game.agents[agent_index].getAction(self.game.state) if agent_index > 0 else action
self.game.state = self.game.state.f(a)
self.game.rules.process(self.game.state, self.game)
if self.graphics_display is not None and self.animate_movement and agent_index == 0:
await self.graphics_display.update(self.game.state, animate=self.animate_movement, ghostbeliefs=self.ghostbeliefs, path=self.path, visitedlist=self.visitedlist)
done = self.game.gameOver or self.game.state.is_won() or self.game.state.is_lost()
if done:
break
reward = self.game.state._unsafe_getScore() - r_
return self.state, reward, done, False, {'mask': self.action_space._make_mask(self.state.A())}
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
# feature_extractor.py
# --------------------
# Licensing Information: You are free to use or extend these projects for
# educational purposes provided that (1) you do not distribute or publish
# solutions, (2) you retain this notice, and (3) you provide clear
# attribution to UC Berkeley, including a link to http://ai.berkeley.edu.
#
# Attribution Information: The Pacman AI projects were developed at UC Berkeley.
# The core projects and autograders were primarily created by John DeNero
# (denero@cs.berkeley.edu) and Dan Klein (klein@cs.berkeley.edu).
# Student side autograding was added by Brad Miller, Nick Hay, and
# Pieter Abbeel (pabbeel@cs.berkeley.edu).
from irlc.pacman.pacman_utils import Actions
## Other classes
class FeatureExtractor:
def getFeatures(self, state, action):
"""
Returns a dict from features to counts
Usually, the count will just be 1.0 for
indicator functions.
"""
raise NotImplementedError()
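# For example, IdentityExtractor below returns a dict with a single entry {(state, action): 1.0},
# i.e. one indicator feature for the exact (state, action) pair.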
class IdentityExtractor(FeatureExtractor):
def getFeatures(self, state, action):
from collections import defaultdict
feats = defaultdict(lambda: 0)
# feats = util.Counter()
feats[(state,action)] = 1.0
return feats
class CoordinateExtractor(FeatureExtractor):
def getFeatures(self, state, action):
from collections import defaultdict
feats = defaultdict(lambda: 0)
# feats = util.Counter()
feats[state] = 1.0
feats['x=%d' % state[0]] = 1.0
feats['y=%d' % state[0]] = 1.0
feats['action=%s' % action] = 1.0
return feats
def closestFood(pos, food, walls):
"""
closestFood -- this is similar to the function that we have
worked on in the search project; here it's all in one place
"""
fringe = [(pos[0], pos[1], 0)]
expanded = set()
while fringe:
pos_x, pos_y, dist = fringe.pop(0)
if (pos_x, pos_y) in expanded:
continue
expanded.add((pos_x, pos_y))
# if we find a food at this location then exit
if food[pos_x][pos_y]:
return dist
# otherwise spread out from the location to its neighbours
nbrs = Actions.getLegalNeighbors((pos_x, pos_y), walls)
for nbr_x, nbr_y in nbrs:
fringe.append((nbr_x, nbr_y, dist+1))
# no food found
return None
class SimpleExtractor(FeatureExtractor):
"""
Returns simple features for a basic reflex Pacman:
- whether food will be eaten
- how far away the next food is
- whether a ghost collision is imminent
- whether a ghost is one step away
"""
def getFeatures(self, state, action):
# extract the grid of food and wall locations and get the ghost locations
food = state._unsafe_getFood()
walls = state._unsafe_getWalls()
ghosts = state._unsafe_getGhostPositions()
from collections import defaultdict
features = defaultdict(lambda: 0)
# features = util.Counter()
features["bias"] = 1.0
# compute the location of pacman after he takes the action
x, y = state._unsafe_getPacmanPosition()
dx, dy = Actions.directionToVector(action)
next_x, next_y = int(x + dx), int(y + dy)
# count the number of ghosts 1-step away
features["#-of-ghosts-1-step-away"] = sum((next_x, next_y) in Actions.getLegalNeighbors(g, walls) for g in ghosts)
# if there is no danger of ghosts then add the food feature
if not features["#-of-ghosts-1-step-away"] and food[next_x][next_y]:
features["eats-food"] = 1.0
dist = closestFood((next_x, next_y), food, walls)
if dist is not None:
# make the distance a number less than one otherwise the update
# will diverge wildly
features["closest-food"] = float(dist) / (walls.width * walls.height)
# features.divideAll(10.0)
features = {k: v/10.0 for k, v in features.items() }
return features
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
# layout.py
# ---------
# Licensing Information: You are free to use or extend these projects for
# educational purposes provided that (1) you do not distribute or publish
# solutions, (2) you retain this notice, and (3) you provide clear
# attribution to UC Berkeley, including a link to http://ai.berkeley.edu.
#
# Attribution Information: The Pacman AI projects were developed at UC Berkeley.
# The core projects and autograders were primarily created by John DeNero
# (denero@cs.berkeley.edu) and Dan Klein (klein@cs.berkeley.edu).
# Student side autograding was added by Brad Miller, Nick Hay, and
# Pieter Abbeel (pabbeel@cs.berkeley.edu).
# from irlc.berkley.util import manhattanDistance
from irlc.pacman.pacman_utils import Grid
import os
import random
VISIBILITY_MATRIX_CACHE = {}
def manhattanDistance( xy1, xy2 ):
"Returns the Manhattan distance between points xy1 and xy2"
return abs( xy1[0] - xy2[0] ) + abs( xy1[1] - xy2[1] )
class Layout:
"""
A Layout manages the static information about the game board.
"""
def __init__(self, layoutText):
self.width = len(layoutText[0])
self.height= len(layoutText)
self.walls = Grid(self.width, self.height, False)
self.food = Grid(self.width, self.height, False)
self.capsules = []
self.agentPositions = []
self.numGhosts = 0
self.processLayoutText(layoutText)
self.layoutText = layoutText
self.totalFood = len(self.food.asList())
# self.initializeVisibilityMatrix()
def getNumGhosts(self):
return self.numGhosts
# def initializeVisibilityMatrix(self):
# global VISIBILITY_MATRIX_CACHE
# if reduce(str.__add__, self.layoutText) not in VISIBILITY_MATRIX_CACHE:
# from game import Directions
# vecs = [(-0.5,0), (0.5,0),(0,-0.5),(0,0.5)]
# dirs = [Directions.NORTH, Directions.SOUTH, Directions.WEST, Directions.EAST]
# vis = Grid(self.width, self.height, {Directions.NORTH:set(), Directions.SOUTH:set(), Directions.EAST:set(), Directions.WEST:set(), Directions.STOP:set()})
# for x in range(self.width):
# for y in range(self.height):
# if self.walls[x][y] == False:
# for vec, direction in zip(vecs, dirs):
# dx, dy = vec
# nextx, nexty = x + dx, y + dy
# while (nextx + nexty) != int(nextx) + int(nexty) or not self.walls[int(nextx)][int(nexty)] :
# vis[x][y][direction].add((nextx, nexty))
# nextx, nexty = x + dx, y + dy
# self.visibility = vis
# VISIBILITY_MATRIX_CACHE[reduce(str.__add__, self.layoutText)] = vis
# else:
# self.visibility = VISIBILITY_MATRIX_CACHE[reduce(str.__add__, self.layoutText)]
def isWall(self, pos):
x, col = pos
return self.walls[x][col]
def getRandomLegalPosition(self):
x = random.choice(range(self.width))
y = random.choice(range(self.height))
while self.isWall( (x, y) ):
x = random.choice(range(self.width))
y = random.choice(range(self.height))
return (x,y)
def getRandomCorner(self):
poses = [(1,1), (1, self.height - 2), (self.width - 2, 1), (self.width - 2, self.height - 2)]
return random.choice(poses)
def getFurthestCorner(self, pacPos):
poses = [(1,1), (1, self.height - 2), (self.width - 2, 1), (self.width - 2, self.height - 2)]
dist, pos = max([(manhattanDistance(p, pacPos), p) for p in poses])
return pos
# def isVisibleFrom(self, ghostPos, pacPos, pacDirection):
# row, col = [int(x) for x in pacPos]
# return ghostPos in self.visibility[row][col][pacDirection]
def __str__(self):
return "\n".join(self.layoutText)
def deepCopy(self):
return Layout(self.layoutText[:])
def processLayoutText(self, layoutText):
"""
Coordinates are flipped from the input format to the (x,y) convention here
The shape of the maze. Each character
represents a different type of object.
% - Wall
. - Food
o - Capsule
G - Ghost
P - Pacman
Other characters are ignored.
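For example, the row "%P.%" describes a wall, Pacman's start position, a food pellet, and a wall.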
"""
maxY = self.height - 1
for y in range(self.height):
for x in range(self.width):
layoutChar = layoutText[maxY - y][x]
self.processLayoutChar(x, y, layoutChar)
self.agentPositions.sort()
self.agentPositions = [ ( i == 0, pos) for i, pos in self.agentPositions]
def processLayoutChar(self, x, y, layoutChar):
if layoutChar == '%':
self.walls[x][y] = True
elif layoutChar == '.':
self.food[x][y] = True
elif layoutChar == 'o':
self.capsules.append((x, y))
elif layoutChar == 'P':
self.agentPositions.append( (0, (x, y) ) )
elif layoutChar in ['G']:
self.agentPositions.append( (1, (x, y) ) )
self.numGhosts += 1
elif layoutChar in ['1', '2', '3', '4']:
self.agentPositions.append( (int(layoutChar), (x,y)))
self.numGhosts += 1
def getLayout(name, back = 2):
if name.endswith('.lay'):
layout = tryToLoad('layouts/' + name)
if layout == None: layout = tryToLoad(name)
else:
layout = tryToLoad('layouts/' + name + '.lay')
if layout == None: layout = tryToLoad(name + '.lay')
if layout == None and back >= 0:
curdir = os.path.abspath('.')
os.chdir('..')
layout = getLayout(name, back -1)
os.chdir(curdir)
return layout
def tryToLoad(fullname):
import pathlib
fullname = os.path.join(pathlib.Path(__file__).parent.absolute(), fullname)
if(not os.path.exists(fullname)): return None
# os.path.abspath(fullname)
f = open(fullname)
try: return Layout([line.strip() for line in f])
finally: f.close()
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%. % %.%
% %%%%% % %%% %%% %%%%%%% % %
% % % % % % % %
%%%%% %%%%% %%% % % % %%% %%%%% % %%%
% % % % % % % % % % % % %
% %%% % % % %%% %%%%% %%% % %%% %%% %
% % % % % % % % %
%%% %%%%%%%%% %%%%%%% %%% %%% % % % %
% % % % % % %
% % %%%%% % %%% % % %%% % %%% %%% % %
% % % % % % % % % % % % % %
% % % %%%%%%% % %%%%%%%%% %%% % %%% %
% % % % % % % % % %
%%% %%% % %%%%% %%%%% %%% %%% %%%%% %
% % % % % % % % %
% % % % % % %%% %%% %%% % % % % % %
% % % % % %% % % % % % % % % %
% % %%%%% % %%% %%% % %%% %%% %%%%%
% % % % % % % % % % %
% %%% % % % %%% %%% %%%%%%%%% % %%%
% % % % % % %
% %%% %%%%%%%%%%%%%%%%%%%%% % % %%% %
% % % %
% % % %%%%% %%% % % % % %%%%%%%%%%%%%
% % % % % % % % % % % %
% % %%% %%% % % % %%%%%%%%% %%% % % %
% % % % % % %P % % % % % %
% %%% %%% %%% % %%% % % %%%%% % %%%%%
% % % % % % % %
%%% % %%%%% %%%%% %%% %%% % %%% % %%%
% % % % % % % % % % % % % % %
% % %%% % % % % %%%%%%%%% % % % % % %
% % % %
% % % %%% %%% %%%%%%% %%% %%% %%% %
%.% % % % % .%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%