""" puzzle.py """ import numpy as np, itertools from random import shuffle from typing import List, Tuple, Union, Any import copy #import gym, gym_gridworlds # if using other environments # overridden in agent.py, typically due to load order LOGGING = True import logging, sys logging.basicConfig(stream=sys.stdout,level=logging.INFO) log = logging.getLogger() if not LOGGING: # remove all logging functionality for handler in log.handlers.copy(): try: log.removeHandler(handler) except ValueError: # in case another thread has already removed it pass log.addHandler(logging.NullHandler()) log.propagate = False class Puzzle: __slots__ = [ 'tt', 'features', 'rewards', 'state', 'initialState', 'solved', 'solvable', 'maxrewards', 'originalrewards'] def __init__(self, tt:List[List[int]], features:List[int], rewards:List[float], initialState:int = 0): self.tt = tt self.features = features self.rewards = rewards[:] self.originalrewards = rewards self.state = 0 self.initialState = initialState self.solved = False def __str__(self) -> str: output = "" output += "transition table:\n" for row in self.tt: output += f" {str(row)}\n" output += f"solved: {self.solved}\n" output += f"state: {self.state}\n" output += f"features: {self.features}\n" output += f"rewards: {self.rewards}\n" return output def reset(self): '''must be called before first use''' self.solved = False self.state = self.initialState self.rewards = self.originalrewards[:] def setMaxRewards(self, maxRewards): '''typically used by the ConvBelt class before reset()''' self.maxrewards = set(self.rewards) & set(maxRewards) self.solvable = bool(self.maxrewards) def transition(self,action:int) -> Tuple[float, List[int], bool]: self.state = self.tt[self.state][action] finished = False reward = self.rewards[self.state] if self.rewards[self.state] in self.maxrewards: self.rewards[self.state] = -1 # 'eat' the food and replace with empty reward finished = True self.solved = True return (reward, self.features[self.state], finished) def getFeatures(self) -> List[int]: '''returns only the current observable features of the puzzle''' return self.features[self.state] def Action(index:Union[int,str]) -> Union[str,int]: ''' action str <-> int Action('pass')->1 Action(1)->'pass' ''' if isinstance(index, (int,np.int64)): return ('idle','pass','investigate','eat')[index] return {'idle':0,'pass':1,'investigate':2,'eat':3}[index] class ConvBelt: """ __slots__ = [ 'puzzles', # (list[Puzzle]) - list of puzzles, use append() 'pi', # (int) - currently selected puzzle / "puzzle index" 'puzzle', # (ref:Puzzle) - shortcut for self.puzzles[pi] 'randomize', # (bool) - shuffling of puzzles between trials 'maxrewards', # (list[float]) - the maximum achievable rewards 'action_space', # (tuple[int]) - number of actions available to agents, usually (4,) 'observation_space', # (tuple[int]) - features/dimensions given to agents (dim1 size, dim2 size...) 'puzzlesLeftToComplete', # (int) - faster tracking of how many are left, when 0 set self.solved 'solved', # (bool) - state flag for all puzzles solved (trial can be over) 'agentclass', 'killed_reward', 'max_training_trials', 'max_steps', 'alpha', 'gamma', 'epsilon', 'lmbda', #'get_weights_len', #'reset', #'extend', #'clear', ] """ def __init__(self,actionSpace,observationSpace,maxRewards, agentclass, killed_reward=-10.0, max_training_trials=50, max_steps=32, alpha=0.01, gamma=0.95, epsilon=0.01, lmbda=0.42, randomize=False): '''please provide entire actionSpace, observationSpace, maxRewards for all puzzles even those later added this environment''' self.puzzles = [] self.pi = 0 self.puzzle = None self.randomize = randomize self.action_space = actionSpace self.observation_space = observationSpace self.maxrewards = maxRewards self.puzzlesLeftToComplete = 0 self.solved = False self.agentclass = agentclass self.killed_reward = killed_reward self.max_training_trials = max_training_trials self.max_steps = max_steps self.alpha = alpha self.gamma = gamma self.epsilon = epsilon self.lmbda = lmbda print(self.get_weights_len()) def get_weights_len(self): """ Return the length of weights needed for an agent. """ print("in ConvBelt.get_weights_len") mywl = np.prod(tuple(self.observation_space) + tuple(self.action_space)) return mywl def reset(self): '''returns an initial observation while also resetting the environment''' log.info("resetting all puzzles") self.puzzlesLeftToComplete = 0 for puzzle in self.puzzles: puzzle.reset() if puzzle.solvable: self.puzzlesLeftToComplete += 1 self.solved = not bool(self.puzzlesLeftToComplete) if self.randomize: shuffle(self.puzzles) self.pi = 0 if len(self.puzzles) == 0: raise Exception("Please add puzzles to the belt/env first using append() or extend()") self.puzzle = self.puzzles[self.pi] return self.puzzle.getFeatures() def append(self, newPuzzle:Puzzle): log.info("adding new puzzle") newPuzzle.setMaxRewards(self.maxrewards) newPuzzle.reset() if newPuzzle.solvable: self.puzzlesLeftToComplete += 1 self.solved = False self.puzzles.append(newPuzzle) if self.puzzle is None: self.reset() def extend(self, newPuzzles:List[Puzzle]): log.info(f"adding {len(newPuzzles)} new puzzles") oldLength = len(self.puzzles) self.puzzles.extend(newPuzzles) newLength = len(self.puzzles) for puzzle_i in range(oldLength, newLength): puzzle = self.puzzles[puzzle_i] puzzle.setMaxRewards(self.maxRewards) puzzle.reset() if puzzle.solvable: self.puzzlesLeftToComplete += 1 self.solved = False if self.puzzle is None: self.reset() def _post_removal(self): if len(self.puzzles) == 0: self.puzzle = None log.info("puzzles list now empty") if self.pi >= len(self.puzzles)-1: self.pi = 0 log.info("resetting index to 0") def clear(self): '''clears the belt of puzzles''' self.puzzles.clear() log.info("removed ALL puzzles") self.puzzlesLeftToComplete = 0 self._post_removal() def remove(self, puzzle): '''removes puzzle from belt of puzzles''' if puzzle.solvable: self.puzzlesLeftToComplete -= 1 self.puzzles.remove(puzzle) log.info("removed puzzle") self._post_removal() def pop(self, index=None): '''removes puzzle at index or from end''' if index is None: index = -1 puzzle = self.puzzles.pop(index) if puzzle.solvable: self.puzzlesLeftToComplete -= 1 log.info(f"popped puzzle at index {index}") self._post_removal() def _completed_a_puzzle(self): self.puzzlesLeftToComplete -= 1 log.info(f"completed a puzzle - {self.puzzlesLeftToComplete} solvable puzzles remain") if self.puzzlesLeftToComplete == 0: self.solved = True log.info(f"all puzzles completed - trial complete") def step(self, action:int) -> Tuple[List[int], float, bool, Any]: # returns (state,reward,goal,_) (gym format) if action == 1: # pass (change to next puzzle, and change no puzzle's state) self.pi = (self.pi + 1) % len(self.puzzles) # reports states of old and new puzzles instead of a transition log.info(f"(puzzle-step) action {action} ({Action(action)}) from old puzzle state {self.puzzle.state} to new puzzle state {self.puzzles[self.pi].state}") self.puzzle = self.puzzles[self.pi] return (self.puzzle.features[self.puzzle.state], # features -1, # reward of a pass #self.puzzle.rewards[self.puzzle.state], # reward self.solved, # done-flag None) # DebugVisInfo else: log.info(f"(puzzle-step) action {action} ({Action(action)}) from state {self.puzzle.state} to {self.puzzle.tt[self.puzzle.state][action]}") reward, features, puzzle_just_finished = self.puzzle.transition(action) if puzzle_just_finished: self._completed_a_puzzle() return (features, reward, self.solved, None) def render(self, env, brain): # renders a puzzlebox environment import numpy as np import matplotlib.pyplot as plt actions = [] rewards = [] states = [] brain.reset() # Warning!!: NOT MABE-reset(), but soft-reset() (keep weights) nextState = env.reset() states.append(nextState) actions.append(0) # path is recording actions in this visualization rewards.append(-1) time = 0 print(env.puzzlesLeftToComplete) while True: time += 1 brain.sensoryState = nextState # SET INPUTS brain.plasticUpdate() nextState, reward, goal_achieved, _ = env.step(brain.action) # GET OUTPUTS actions.append(brain.action) rewards.append(reward) states.append(nextState) if env.puzzlesLeftToComplete == 0 or time == 600: break #if goal_achieved or time == 100: break brain.reward = reward print(actions) print(states) plt.figure() plt.plot(actions) plt.scatter(list(range(len(actions))),actions) plt.figure() plt.plot(rewards) plt.scatter(list(range(len(rewards))),rewards) def evaluate(self, ind, num_trials=200, n_actions=4, HARD_TIME_LIMIT=600): """ Given an individual agent's weights, evaluate it and return its fitness. """ w = 0.0 # Need to refactor the following code taken from the # Jupyter notebook. # domain-specific settings #num_trials=200 #n_actions = 4 #(optimal lmbda in the agent is domain dependent - could be evolved) #HARD_TIME_LIMIT = 600 #KILLED_REWARD = -10 # not used here #(standard reward) = -1.0 (means agent is potentially wasting time - set internal to agent code) #(goal reward) = 1.0 (means the agent achieved something good - set internal to agent code) # alpha # how much to weigh reward surprises that deviate from expectation # gamma # how important exepcted rewards will be # epsilon # fraction of exploration to exploitation (how often to choose a random action) # lmbda # how slowly memory of preceeding actions fades away (1=never, 0= agent = self.agentclass(obsSpace=self.observation_space, actSpace=self.action_space, alpha=self.alpha, gamma=self.gamma, epsilon=self.epsilon, lmbda=self.lmbda) # Put weights in the Agent agent.weights = [x for x in ind] time_to_solve_each_trial = [] rewards = [] for trialN in range(self.max_training_trials): # some output to see it running if (trialN % 10) == 0: print('.',end='') # initialize the agent, environment, and time for this trial agent.reset() # soft-reset() (keeps learned weights) nextState = self.reset() time = 0 while True: time += 1 # set agent senses based on environment and allow agent to determine an action agent.sensoryState = nextState agent.plasticUpdate() # determine effect on environment state & any reward (in standard openAI-gym API format) nextState, reward, goal_achieved, _ = self.step(agent.action) agent.reward = reward if self.puzzlesLeftToComplete == 0 or time == self.max_steps: agent.plasticUpdate() break # could have deadly rewards that stop the trial early #elif reward <= -10: # agent.sensoryState = nextState # agent.reward = reward # agent.plasticUpdate() # agent.reset() # nextState = self.reset() rewards.append(reward) time_to_solve_each_trial.append(time) # Calculate fitness # Rewards are in [-1 .. 1], have to rescale to [0 .. 1] #scalerewards = (np.array(rewards) * 0.5) + 0.5 #w = np.mean(scalerewards) w = sum(rewards) return w, def getObservationSpace(*items) -> Tuple[int]: '''Returns total features dimensions over all puzzles, starting from 0. Given 1 or more puzzles, finds union of observation space (features). then returns the size of that space. Ensures all puzzles have same feature dimensions, errors if not. Useful when setting up a RL state space for certain feature sizes. [3,1] would have dimensions [4,2], and [[0,2],[0,1]] would be [1,3] >>> p1 = Puzzle(tt=[[]], rewards=[], features=[[0,1],[0,1],[3,1]]) >>> getObservationSpace(p1) (4, 2) >>> p2 = Puzzle(tt=[[]], rewards=[], features=[[1,1],[1,1],[2,4]]) >>> getObservationSpace(p2) (3, 5) >>> getObservationSpace(p1,p2) (4, 5) >>> puzzles = [p1,p2] >>> getObservationSpace(puzzles) (4, 5) ''' if type(items) is tuple and isinstance(items[0], Puzzle): # perform union (max) over feature space of all items highest = copy.copy(items[0].features[0]) # features is [[int,int,...],...] featurelen = len(highest) for puzzle in items: for featureset in puzzle.features: if len(featureset) != featurelen: raise Exception("not all features have the same length") for feature_i in range(len(featureset)): highest[feature_i] = max(highest[feature_i],featureset[feature_i]) return tuple((e+1 for e in highest)) # size is 1+highest due to 0-indexing of features elif type(items) is tuple and type(items[0]) in (tuple,list): return getObservationSpace(*items[0]) # unpack one layer else: raise Exception(f"Expected type of Puzzle(s), but got {type(items)}") def getActionSpace(*items) -> Tuple[int]: '''Returns total action dimensions over all puzzles, (num columns in tt). Given 1 or more puzzles. Ensures all puzzles have same dimensions, errors if not. Useful when setting up a RL state space for certain action sizes. >>> p1 = Puzzle(tt=[[0,0],[4,2]], rewards=[], features=[[]]) >>> getActionSpace(p1) (2,) >>> p2 = Puzzle(tt=[[0,0,1],[1,1,2]], rewards=[], features=[[]]) >>> getActionSpace(p2) (3,) >>> getActionSpace(p1,p2) Traceback (most recent call last): ... Exception: not all puzzles (rows) have the same tt col size ''' if type(items) is tuple and isinstance(items[0], Puzzle): # perform union (max) over feature space of all items nrows, ncols = len(items[0].tt), len(items[0].tt[0]) for puzzle in items: prows = len(puzzle.tt) if prows != nrows: raise Exception("not all puzzles have the same tt row size") samerows = [len(c) == ncols for c in puzzle.tt] if not all(samerows): raise Exception("not all puzzles (rows) have the same tt col size") return (ncols,) elif type(items) is tuple and type(items[0]) in (tuple,list): return getActionSpace(*items[0]) # unpack one layer else: raise Exception(f"Expected type of Puzzle(s), but got {type(items)}") def _test_world(): '''full test of the conveyorbelt world >>> import copy >>> maxrewards = [1] >>> easy_features = [[0,1],[0,1],[3,1],[0,0]] >>> easy_rewards = [-1,-1,-1,1] >>> easy_tt = np.array([[0,0,2,3], [0,0,0,0], [2,0,2,3], [3,3,3,3]]) >>> p1 = Puzzle(tt=easy_tt, features=easy_features, rewards=easy_rewards) >>> p2 = copy.deepcopy(p1) >>> puzzles = (p1,p2) >>> world = ConvBelt(actionSpace = getActionSpace(puzzles), observationSpace = getObservationSpace(puzzles), maxRewards = maxrewards, randomize = False) >>> world.append(p1) >>> world.append(p2) >>> # trial 1 >>> world.reset() # reset before first use just to be sure >>> world.step(Action('investigate')) (-1, [3, 1], False) >>> world.step(Action('pass')) (-1, [0, 1], False) >>> world.step(Action('eat')) (1, [0, 0], False) >>> world.step(Action('pass')) (-1, [3, 1], False) >>> world.step(Action('eat')) (1, [0, 0], True) >>> world.step(Action('eat')) # try eating again, notice reward change (-1, [0, 0], True) >>> # trial 2 >>> world.reset() >>> world.step(Action('investigate')) (-1, [3, 1], False) >>> world.step(Action('pass')) (-1, [0, 1], False) >>> world.step(Action('eat')) (1, [0, 0], False) >>> world.step(Action('pass')) (-1, [3, 1], False) >>> world.step(Action('eat')) (1, [0, 0], True) ''' if __name__ == '__main__': '''test important functions and workflows with doctesting run this python file by itself to run these tests, and set LOGGING=True near top of file.''' import doctest from functools import partial test = partial(doctest.run_docstring_examples, globs = globals()) test(getObservationSpace) test(getActionSpace) test(_test_world)