"""
puzzle.py
"""
import numpy as np, itertools
from random import shuffle
from typing import List, Tuple, Union, Any
import copy
#import gym, gym_gridworlds # if using other environments
LOGGING = True  # overridden in agent.py, typically due to load order
import logging, sys
logging.basicConfig(stream=sys.stdout,level=logging.INFO)
log = logging.getLogger()
if not LOGGING:
# remove all logging functionality
for handler in log.handlers.copy():
try:
log.removeHandler(handler)
except ValueError: # in case another thread has already removed it
pass
log.addHandler(logging.NullHandler())
log.propagate = False
class Puzzle:
    '''A single puzzle defined by a transition table (tt), per-state features,
    and per-state rewards. setMaxRewards() marks which rewards count as solving
    the puzzle, and reset() must be called before first use.'''
    __slots__ = [
        'tt',
        'features',
        'rewards',
        'state',
        'initialState',
        'solved',
        'solvable',
        'maxrewards',
        'originalrewards']
    def __init__(self, tt: List[List[int]], features: List[List[int]], rewards: List[float], initialState: int = 0):
        self.tt = tt
        self.features = features
        self.rewards = rewards[:]
        self.originalrewards = rewards[:]  # private copy so reset() always restores the initial rewards
        self.state = initialState
        self.initialState = initialState
        self.solved = False
def __str__(self) -> str:
output = ""
output += "transition table:\n"
for row in self.tt:
output += f" {str(row)}\n"
output += f"solved: {self.solved}\n"
output += f"state: {self.state}\n"
output += f"features: {self.features}\n"
output += f"rewards: {self.rewards}\n"
return output
def reset(self):
'''must be called before first use'''
self.solved = False
self.state = self.initialState
self.rewards = self.originalrewards[:]
def setMaxRewards(self, maxRewards):
'''typically used by the ConvBelt class before reset()'''
self.maxrewards = set(self.rewards) & set(maxRewards)
self.solvable = bool(self.maxrewards)
def transition(self,action:int) -> Tuple[float, List[int], bool]:
self.state = self.tt[self.state][action]
finished = False
reward = self.rewards[self.state]
if self.rewards[self.state] in self.maxrewards:
self.rewards[self.state] = -1 # 'eat' the food and replace with empty reward
finished = True
self.solved = True
return (reward, self.features[self.state], finished)
def getFeatures(self) -> List[int]:
'''returns only the current observable features of the puzzle'''
return self.features[self.state]
def Action(index: Union[int, str]) -> Union[str, int]:
    '''action str <-> int mapping: Action('pass') -> 1, Action(1) -> 'pass' '''
    if isinstance(index, (int, np.integer)):
        return ('idle', 'pass', 'investigate', 'eat')[index]
    return {'idle': 0, 'pass': 1, 'investigate': 2, 'eat': 3}[index]
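# Minimal usage sketch (illustrative only, not part of the test suite), assuming a
# goal reward of 1 is listed in maxRewards: a two-state puzzle where 'eat' pays off once.
#
#   p = Puzzle(tt=[[0, 0, 0, 1], [1, 1, 1, 1]],
#              features=[[0, 0], [1, 1]],
#              rewards=[-1, 1])
#   p.setMaxRewards([1])           # normally done for you by ConvBelt.append()
#   p.reset()                      # must be called before first use
#   p.transition(Action('eat'))    # -> (1, [1, 1], True); a second 'eat' now returns reward -1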
class ConvBelt:
"""
__slots__ = [
'puzzles', # (list[Puzzle]) - list of puzzles, use append()
'pi', # (int) - currently selected puzzle / "puzzle index"
'puzzle', # (ref:Puzzle) - shortcut for self.puzzles[pi]
'randomize', # (bool) - shuffling of puzzles between trials
'maxrewards', # (list[float]) - the maximum achievable rewards
'action_space', # (tuple[int]) - number of actions available to agents, usually (4,)
'observation_space', # (tuple[int]) - features/dimensions given to agents (dim1 size, dim2 size...)
'puzzlesLeftToComplete', # (int) - faster tracking of how many are left, when 0 set self.solved
'solved', # (bool) - state flag for all puzzles solved (trial can be over)
'agentclass',
'killed_reward',
'max_training_trials',
'max_steps',
'alpha',
'gamma',
'epsilon',
'lmbda',
#'get_weights_len',
#'reset',
#'extend',
#'clear',
]
"""
    def __init__(self, actionSpace, observationSpace, maxRewards, agentclass=None,
                 killed_reward=-10.0, max_training_trials=50, max_steps=32,
                 alpha=0.01, gamma=0.95, epsilon=0.01, lmbda=0.42, randomize=False):
        '''please provide the entire actionSpace, observationSpace, and maxRewards
        for all puzzles, even those added to this environment later'''
        self.puzzles = []
        self.pi = 0
        self.puzzle = None
        self.randomize = randomize
        self.action_space = actionSpace
        self.observation_space = observationSpace
        self.maxrewards = maxRewards
        self.puzzlesLeftToComplete = 0
        self.solved = False
        self.agentclass = agentclass
        self.killed_reward = killed_reward
        self.max_training_trials = max_training_trials
        self.max_steps = max_steps
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.lmbda = lmbda
        log.info(f"agent weights length: {self.get_weights_len()}")
    def get_weights_len(self):
        """
        Return the number of weights needed for an agent: the product of all
        observation-space dimensions and the action-space dimension.
        """
        log.debug("in ConvBelt.get_weights_len")
        return int(np.prod(tuple(self.observation_space) + tuple(self.action_space)))
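    # Worked example (illustrative): with observation_space=(4, 2) and action_space=(4,),
    # get_weights_len() returns np.prod((4, 2, 4)) == 32, e.g. one weight per
    # (feature1, feature2, action) cell in a tabular agent.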
def reset(self):
'''returns an initial observation while also resetting the environment'''
log.info("resetting all puzzles")
self.puzzlesLeftToComplete = 0
for puzzle in self.puzzles:
puzzle.reset()
if puzzle.solvable:
self.puzzlesLeftToComplete += 1
self.solved = not bool(self.puzzlesLeftToComplete)
if self.randomize: shuffle(self.puzzles)
self.pi = 0
if len(self.puzzles) == 0:
raise Exception("Please add puzzles to the belt/env first using append() or extend()")
self.puzzle = self.puzzles[self.pi]
return self.puzzle.getFeatures()
def append(self, newPuzzle:Puzzle):
log.info("adding new puzzle")
newPuzzle.setMaxRewards(self.maxrewards)
newPuzzle.reset()
if newPuzzle.solvable:
self.puzzlesLeftToComplete += 1
self.solved = False
self.puzzles.append(newPuzzle)
if self.puzzle is None:
self.reset()
def extend(self, newPuzzles:List[Puzzle]):
log.info(f"adding {len(newPuzzles)} new puzzles")
oldLength = len(self.puzzles)
self.puzzles.extend(newPuzzles)
newLength = len(self.puzzles)
for puzzle_i in range(oldLength, newLength):
puzzle = self.puzzles[puzzle_i]
            puzzle.setMaxRewards(self.maxrewards)
puzzle.reset()
if puzzle.solvable:
self.puzzlesLeftToComplete += 1
self.solved = False
if self.puzzle is None:
self.reset()
    def _post_removal(self):
        if len(self.puzzles) == 0:
            self.puzzle = None
            self.pi = 0
            log.info("puzzles list now empty")
            return
        if self.pi >= len(self.puzzles):
            self.pi = 0
            log.info("resetting index to 0")
        self.puzzle = self.puzzles[self.pi]  # re-point in case the current puzzle was removed
def clear(self):
'''clears the belt of puzzles'''
self.puzzles.clear()
log.info("removed ALL puzzles")
self.puzzlesLeftToComplete = 0
self._post_removal()
    def remove(self, puzzle):
        '''removes puzzle from belt of puzzles'''
        self.puzzles.remove(puzzle)
        if puzzle.solvable and not puzzle.solved:
            self.puzzlesLeftToComplete -= 1
        log.info("removed puzzle")
        self._post_removal()
    def pop(self, index=None):
        '''removes and returns the puzzle at index (default: the last puzzle)'''
        if index is None:
            index = -1
        puzzle = self.puzzles.pop(index)
        if puzzle.solvable and not puzzle.solved:
            self.puzzlesLeftToComplete -= 1
        log.info(f"popped puzzle at index {index}")
        self._post_removal()
        return puzzle
def _completed_a_puzzle(self):
self.puzzlesLeftToComplete -= 1
log.info(f"completed a puzzle - {self.puzzlesLeftToComplete} solvable puzzles remain")
if self.puzzlesLeftToComplete == 0:
self.solved = True
log.info(f"all puzzles completed - trial complete")
def step(self, action:int) -> Tuple[List[int], float, bool, Any]: # returns (state,reward,goal,_) (gym format)
if action == 1: # pass (change to next puzzle, and change no puzzle's state)
self.pi = (self.pi + 1) % len(self.puzzles)
# reports states of old and new puzzles instead of a transition
log.info(f"(puzzle-step) action {action} ({Action(action)}) from old puzzle state {self.puzzle.state} to new puzzle state {self.puzzles[self.pi].state}")
self.puzzle = self.puzzles[self.pi]
return (self.puzzle.features[self.puzzle.state], # features
-1, # reward of a pass
#self.puzzle.rewards[self.puzzle.state], # reward
self.solved, # done-flag
None) # DebugVisInfo
else:
log.info(f"(puzzle-step) action {action} ({Action(action)}) from state {self.puzzle.state} to {self.puzzle.tt[self.puzzle.state][action]}")
reward, features, puzzle_just_finished = self.puzzle.transition(action)
if puzzle_just_finished:
self._completed_a_puzzle()
return (features, reward, self.solved, None)
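    # Example of step() semantics (illustrative): with two puzzles on the belt and the
    # current puzzle showing features [3, 1], step(Action('pass')) advances self.pi and
    # might return ([0, 1], -1, False, None) for the other puzzle's current state, whereas
    # any non-'pass' action is forwarded to the current puzzle's transition().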
def render(self, env, brain):
# renders a puzzlebox environment
        import matplotlib.pyplot as plt  # local import so matplotlib stays an optional dependency
actions = []
rewards = []
states = []
brain.reset() # Warning!!: NOT MABE-reset(), but soft-reset() (keep weights)
nextState = env.reset()
states.append(nextState)
actions.append(0) # path is recording actions in this visualization
rewards.append(-1)
time = 0
print(env.puzzlesLeftToComplete)
while True:
time += 1
brain.sensoryState = nextState # SET INPUTS
brain.plasticUpdate()
nextState, reward, goal_achieved, _ = env.step(brain.action) # GET OUTPUTS
actions.append(brain.action)
rewards.append(reward)
states.append(nextState)
if env.puzzlesLeftToComplete == 0 or time == 600: break
#if goal_achieved or time == 100: break
brain.reward = reward
print(actions)
print(states)
plt.figure()
plt.plot(actions)
plt.scatter(list(range(len(actions))),actions)
plt.figure()
plt.plot(rewards)
plt.scatter(list(range(len(rewards))),rewards)
    def evaluate(self, ind,
                 num_trials=200,
                 n_actions=4,
                 HARD_TIME_LIMIT=600):
        """
        Given an individual agent's weights, evaluate it and return its fitness
        as a 1-tuple. (num_trials, n_actions, and HARD_TIME_LIMIT are currently
        unused; self.max_training_trials and self.max_steps control the loop.)
        """
        # hyperparameters (set on the ConvBelt and passed to the agent):
        # alpha   - how much to weigh reward surprises that deviate from expectation
        # gamma   - how important expected future rewards will be
        # epsilon - fraction of exploration to exploitation (how often to choose a random action)
        # lmbda   - how slowly memory of preceding actions fades away (1=never, 0=immediately)
        #           (the optimal lmbda is domain dependent - could be evolved)
        # the standard per-step reward of -1.0 means the agent is potentially wasting time,
        # the goal reward of 1.0 means the agent achieved something good (both set inside the agent code);
        # KILLED_REWARD / self.killed_reward is not used here
        agent = self.agentclass(obsSpace=self.observation_space, actSpace=self.action_space, alpha=self.alpha,
                                gamma=self.gamma, epsilon=self.epsilon, lmbda=self.lmbda)
        # put the evolved weights into the agent
        agent.weights = list(ind)
time_to_solve_each_trial = []
rewards = []
for trialN in range(self.max_training_trials):
# some output to see it running
if (trialN % 10) == 0: print('.',end='')
# initialize the agent, environment, and time for this trial
agent.reset() # soft-reset() (keeps learned weights)
nextState = self.reset()
time = 0
while True:
time += 1
# set agent senses based on environment and allow agent to determine an action
agent.sensoryState = nextState
agent.plasticUpdate()
# determine effect on environment state & any reward (in standard openAI-gym API format)
nextState, reward, goal_achieved, _ = self.step(agent.action)
agent.reward = reward
if self.puzzlesLeftToComplete == 0 or time == self.max_steps:
agent.plasticUpdate()
break
# could have deadly rewards that stop the trial early
#elif reward <= -10:
# agent.sensoryState = nextState
# agent.reward = reward
# agent.plasticUpdate()
# agent.reset()
# nextState = self.reset()
rewards.append(reward)
time_to_solve_each_trial.append(time)
# Calculate fitness
# Rewards are in [-1 .. 1], have to rescale to [0 .. 1]
#scalerewards = (np.array(rewards) * 0.5) + 0.5
#w = np.mean(scalerewards)
w = sum(rewards)
return w,
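    # Usage sketch (illustrative; MyTDAgent, p1, and p2 are hypothetical stand-ins):
    #   belt = ConvBelt(actionSpace=getActionSpace(p1, p2),
    #                   observationSpace=getObservationSpace(p1, p2),
    #                   maxRewards=[1], agentclass=MyTDAgent)
    #   belt.extend([p1, p2])
    #   fitness, = belt.evaluate([0.0] * belt.get_weights_len())  # note the 1-tuple return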
def getObservationSpace(*items) -> Tuple[int]:
    '''Returns the total feature dimensions over all puzzles (features start from 0).
    Given one or more puzzles, finds the union of their observation spaces (features)
    and returns the size of that space.
    Ensures all puzzles have the same number of feature dimensions, erroring if not.
    Useful when setting up an RL state space for certain feature sizes.
    Features [3,1] would have dimensions (4,2), and [[0,2],[0,1]] would be (1,3).
>>> p1 = Puzzle(tt=[[]], rewards=[], features=[[0,1],[0,1],[3,1]])
>>> getObservationSpace(p1)
(4, 2)
>>> p2 = Puzzle(tt=[[]], rewards=[], features=[[1,1],[1,1],[2,4]])
>>> getObservationSpace(p2)
(3, 5)
>>> getObservationSpace(p1,p2)
(4, 5)
>>> puzzles = [p1,p2]
>>> getObservationSpace(puzzles)
(4, 5)
'''
if type(items) is tuple and isinstance(items[0], Puzzle):
# perform union (max) over feature space of all items
highest = copy.copy(items[0].features[0]) # features is [[int,int,...],...]
featurelen = len(highest)
for puzzle in items:
for featureset in puzzle.features:
if len(featureset) != featurelen:
raise Exception("not all features have the same length")
for feature_i in range(len(featureset)):
highest[feature_i] = max(highest[feature_i],featureset[feature_i])
return tuple((e+1 for e in highest)) # size is 1+highest due to 0-indexing of features
elif type(items) is tuple and type(items[0]) in (tuple,list):
return getObservationSpace(*items[0]) # unpack one layer
    else:
        raise Exception(f"Expected Puzzle(s), but got {type(items[0])}")
def getActionSpace(*items) -> Tuple[int]:
    '''Returns the total action dimensions over all puzzles (the number of columns in tt).
    Given one or more puzzles, ensures all puzzles have the same tt dimensions, erroring if not.
    Useful when setting up an RL action space for certain action sizes.
>>> p1 = Puzzle(tt=[[0,0],[4,2]], rewards=[], features=[[]])
>>> getActionSpace(p1)
(2,)
>>> p2 = Puzzle(tt=[[0,0,1],[1,1,2]], rewards=[], features=[[]])
>>> getActionSpace(p2)
(3,)
>>> getActionSpace(p1,p2)
Traceback (most recent call last):
...
Exception: not all puzzles (rows) have the same tt col size
'''
if type(items) is tuple and isinstance(items[0], Puzzle):
        # check that all puzzles share the same transition-table (tt) shape
nrows, ncols = len(items[0].tt), len(items[0].tt[0])
for puzzle in items:
prows = len(puzzle.tt)
if prows != nrows:
raise Exception("not all puzzles have the same tt row size")
samerows = [len(c) == ncols for c in puzzle.tt]
if not all(samerows):
raise Exception("not all puzzles (rows) have the same tt col size")
return (ncols,)
elif type(items) is tuple and type(items[0]) in (tuple,list):
return getActionSpace(*items[0]) # unpack one layer
    else:
        raise Exception(f"Expected Puzzle(s), but got {type(items[0])}")
def _test_world():
    '''full test of the conveyorbelt world
    >>> import copy
    >>> maxrewards = [1]
    >>> easy_features = [[0,1],[0,1],[3,1],[0,0]]
    >>> easy_rewards = [-1,-1,-1,1]
    >>> easy_tt = np.array([[0,0,2,3], [0,0,0,0], [2,0,2,3], [3,3,3,3]])
    >>> p1 = Puzzle(tt=easy_tt, features=easy_features, rewards=easy_rewards)
    >>> p2 = copy.deepcopy(p1)
    >>> puzzles = (p1,p2)
    >>> world = ConvBelt(actionSpace = getActionSpace(puzzles), observationSpace = getObservationSpace(puzzles), maxRewards = maxrewards, randomize = False)
    >>> world.append(p1)
    >>> world.append(p2)
    >>> # trial 1
    >>> world.reset() # reset before first use just to be sure
    [0, 1]
    >>> world.step(Action('investigate'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('pass'))
    ([0, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, False, None)
    >>> world.step(Action('pass'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, True, None)
    >>> world.step(Action('eat')) # try eating again, notice reward change
    ([0, 0], -1, True, None)
    >>> # trial 2
    >>> world.reset()
    [0, 1]
    >>> world.step(Action('investigate'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('pass'))
    ([0, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, False, None)
    >>> world.step(Action('pass'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, True, None)
    '''
if __name__ == '__main__':
    '''test important functions and workflows with doctesting;
    run this python file by itself to run these tests, and set
    LOGGING near the top of the file to control log output.'''
import doctest
from functools import partial
test = partial(doctest.run_docstring_examples, globs = globals())
test(getObservationSpace)
test(getActionSpace)
test(_test_world)