"""
|
|
puzzle.py
|
|
|
|
|
|
"""
|
|
|
|
import numpy as np, itertools
|
|
from random import shuffle
|
|
from typing import List, Tuple, Union, Any
|
|
import copy
|
|
#import gym, gym_gridworlds # if using other environments
|
|
|
|
|
|
# overridden in agent.py, typically due to load order
|
|
LOGGING = True
|
|
|
|
import logging, sys
|
|
logging.basicConfig(stream=sys.stdout,level=logging.INFO)
|
|
log = logging.getLogger()
|
|
|
|
if not LOGGING:
|
|
# remove all logging functionality
|
|
for handler in log.handlers.copy():
|
|
try:
|
|
log.removeHandler(handler)
|
|
except ValueError: # in case another thread has already removed it
|
|
pass
|
|
log.addHandler(logging.NullHandler())
|
|
log.propagate = False
|
|
|
|
class Puzzle:

    __slots__ = [
        'tt',
        'features',
        'rewards',
        'state',
        'initialState',
        'solved',
        'solvable',
        'maxrewards',
        'originalrewards']

    def __init__(self, tt:List[List[int]], features:List[List[int]], rewards:List[float], initialState:int = 0):
        self.tt = tt
        self.features = features
        self.rewards = rewards[:]       # working copy; entries are overwritten as rewards are 'eaten'
        self.originalrewards = rewards  # untouched originals, restored by reset()
        self.state = 0                  # reset() moves this to initialState
        self.initialState = initialState
        self.solved = False

    def __str__(self) -> str:
        output = ""
        output += "transition table:\n"
        for row in self.tt:
            output += f" {str(row)}\n"
        output += f"solved: {self.solved}\n"
        output += f"state: {self.state}\n"
        output += f"features: {self.features}\n"
        output += f"rewards: {self.rewards}\n"
        return output

    def reset(self):
        '''must be called before first use'''
        self.solved = False
        self.state = self.initialState
        self.rewards = self.originalrewards[:]

    def setMaxRewards(self, maxRewards):
        '''typically used by the ConvBelt class before reset()'''
        self.maxrewards = set(self.rewards) & set(maxRewards)
        self.solvable = bool(self.maxrewards)

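    # Illustrative example (values taken from the _test_world doctest below, not executed here):
    # with rewards=[-1, -1, -1, 1] and maxRewards=[1], setMaxRewards() leaves
    # maxrewards == {1} and solvable == True; with maxRewards=[5] the intersection
    # is empty, so the puzzle is marked unsolvable.
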
    def transition(self, action:int) -> Tuple[float, List[int], bool]:
        self.state = self.tt[self.state][action]
        finished = False
        reward = self.rewards[self.state]
        if self.rewards[self.state] in self.maxrewards:
            self.rewards[self.state] = -1 # 'eat' the food and replace with empty reward
            finished = True
            self.solved = True
        return (reward, self.features[self.state], finished)

    def getFeatures(self) -> List[int]:
        '''returns only the current observable features of the puzzle'''
        return self.features[self.state]


def Action(index:Union[int,str]) -> Union[str,int]:
    ''' action str <-> int    Action('pass')->1    Action(1)->'pass' '''
    if isinstance(index, (int, np.integer)):
        return ('idle','pass','investigate','eat')[index]
    return {'idle':0,'pass':1,'investigate':2,'eat':3}[index]


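# Illustrative sketch (commented out; values come from the 'easy' puzzle used in the
# _test_world doctest below) of driving a single Puzzle by hand:
#
#   p = Puzzle(tt=[[0,0,2,3],[0,0,0,0],[2,0,2,3],[3,3,3,3]],
#              features=[[0,1],[0,1],[3,1],[0,0]],
#              rewards=[-1,-1,-1,1])
#   p.setMaxRewards([1])                 # which rewards count as 'food'
#   p.reset()                            # required before first use
#   p.transition(Action('investigate'))  # -> (-1, [3, 1], False), now in state 2
#   p.transition(Action('eat'))          # -> (1, [0, 0], True), puzzle solved

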
class ConvBelt:
    """
    __slots__ = [
        'puzzles',                # (list[Puzzle]) - list of puzzles, use append()
        'pi',                     # (int) - currently selected puzzle / "puzzle index"
        'puzzle',                 # (ref:Puzzle) - shortcut for self.puzzles[pi]
        'randomize',              # (bool) - shuffling of puzzles between trials
        'maxrewards',             # (list[float]) - the maximum achievable rewards
        'action_space',           # (tuple[int]) - number of actions available to agents, usually (4,)
        'observation_space',      # (tuple[int]) - features/dimensions given to agents (dim1 size, dim2 size...)
        'puzzlesLeftToComplete',  # (int) - faster tracking of how many are left, when 0 set self.solved
        'solved',                 # (bool) - state flag for all puzzles solved (trial can be over)
        'agentclass',
        'killed_reward',
        'max_training_trials',
        'max_steps',
        'alpha',
        'gamma',
        'epsilon',
        'lmbda',
        #'get_weights_len',
        #'reset',
        #'extend',
        #'clear',
        ]
    """

    def __init__(self, actionSpace, observationSpace, maxRewards, agentclass,
                 killed_reward=-10.0, max_training_trials=50, max_steps=32,
                 alpha=0.01, gamma=0.95, epsilon=0.01, lmbda=0.42, randomize=False):
        '''please provide the entire actionSpace, observationSpace, and maxRewards
        covering all puzzles, even those added to this environment later'''
        self.puzzles = []
        self.pi = 0
        self.puzzle = None
        self.randomize = randomize
        self.action_space = actionSpace
        self.observation_space = observationSpace
        self.maxrewards = maxRewards
        self.puzzlesLeftToComplete = 0
        self.solved = False

        self.agentclass = agentclass
        self.killed_reward = killed_reward
        self.max_training_trials = max_training_trials
        self.max_steps = max_steps
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.lmbda = lmbda

        log.info(f"agent weights length: {self.get_weights_len()}")

    def get_weights_len(self):
        """
        Return the length of weights needed for an agent.
        """
        log.debug("in ConvBelt.get_weights_len")
        mywl = np.prod(tuple(self.observation_space) + tuple(self.action_space))
        return mywl

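    # Worked example (illustrative): with the 'easy' puzzle from the _test_world doctest,
    # getObservationSpace() gives (4, 2) and getActionSpace() gives (4,), so
    # get_weights_len() returns 4 * 2 * 4 = 32 -- one weight per
    # (feature-1 value, feature-2 value, action) combination.
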
    def reset(self):
        '''returns an initial observation while also resetting the environment'''
        log.info("resetting all puzzles")
        self.puzzlesLeftToComplete = 0
        for puzzle in self.puzzles:
            puzzle.reset()
            if puzzle.solvable:
                self.puzzlesLeftToComplete += 1
        self.solved = not bool(self.puzzlesLeftToComplete)
        if self.randomize: shuffle(self.puzzles)
        self.pi = 0
        if len(self.puzzles) == 0:
            raise Exception("Please add puzzles to the belt/env first using append() or extend()")
        self.puzzle = self.puzzles[self.pi]
        return self.puzzle.getFeatures()

    def append(self, newPuzzle:Puzzle):
        log.info("adding new puzzle")
        newPuzzle.setMaxRewards(self.maxrewards)
        newPuzzle.reset()
        if newPuzzle.solvable:
            self.puzzlesLeftToComplete += 1
            self.solved = False
        self.puzzles.append(newPuzzle)
        if self.puzzle is None:
            self.reset()

    def extend(self, newPuzzles:List[Puzzle]):
        log.info(f"adding {len(newPuzzles)} new puzzles")
        oldLength = len(self.puzzles)
        self.puzzles.extend(newPuzzles)
        newLength = len(self.puzzles)
        for puzzle_i in range(oldLength, newLength):
            puzzle = self.puzzles[puzzle_i]
            puzzle.setMaxRewards(self.maxrewards)
            puzzle.reset()
            if puzzle.solvable:
                self.puzzlesLeftToComplete += 1
                self.solved = False
        if self.puzzle is None:
            self.reset()

    def _post_removal(self):
        '''resets the puzzle index (and the current puzzle, if the belt is now empty) after removals'''
        if len(self.puzzles) == 0:
            self.puzzle = None
            log.info("puzzles list now empty")
        if self.pi >= len(self.puzzles)-1:
            self.pi = 0
            log.info("resetting index to 0")

    def clear(self):
        '''clears the belt of puzzles'''
        self.puzzles.clear()
        log.info("removed ALL puzzles")
        self.puzzlesLeftToComplete = 0
        self._post_removal()

    def remove(self, puzzle):
        '''removes puzzle from belt of puzzles'''
        if puzzle.solvable:
            self.puzzlesLeftToComplete -= 1
        self.puzzles.remove(puzzle)
        log.info("removed puzzle")
        self._post_removal()

    def pop(self, index=None):
        '''removes puzzle at index or from end'''
        if index is None:
            index = -1
        puzzle = self.puzzles.pop(index)
        if puzzle.solvable:
            self.puzzlesLeftToComplete -= 1
        log.info(f"popped puzzle at index {index}")
        self._post_removal()

    def _completed_a_puzzle(self):
        self.puzzlesLeftToComplete -= 1
        log.info(f"completed a puzzle - {self.puzzlesLeftToComplete} solvable puzzles remain")
        if self.puzzlesLeftToComplete == 0:
            self.solved = True
            log.info("all puzzles completed - trial complete")

    def step(self, action:int) -> Tuple[List[int], float, bool, Any]:  # returns (state, reward, goal, _) (gym format)
        if action == 1:  # pass (change to next puzzle, and change no puzzle's state)
            self.pi = (self.pi + 1) % len(self.puzzles)
            # reports states of old and new puzzles instead of a transition
            log.info(f"(puzzle-step) action {action} ({Action(action)}) from old puzzle state {self.puzzle.state} to new puzzle state {self.puzzles[self.pi].state}")
            self.puzzle = self.puzzles[self.pi]
            return (self.puzzle.features[self.puzzle.state],     # features
                    -1,                                          # reward of a pass
                    #self.puzzle.rewards[self.puzzle.state],     # reward
                    self.solved,                                 # done-flag
                    None)                                        # DebugVisInfo
        else:
            log.info(f"(puzzle-step) action {action} ({Action(action)}) from state {self.puzzle.state} to {self.puzzle.tt[self.puzzle.state][action]}")
            reward, features, puzzle_just_finished = self.puzzle.transition(action)
            if puzzle_just_finished:
                self._completed_a_puzzle()
            return (features, reward, self.solved, None)

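    # Illustrative return values (from the 'easy' puzzle used in the _test_world
    # doctest below, right after reset()):
    #   step(Action('investigate'))  ->  ([3, 1], -1, False, None)
    #   step(Action('pass'))         ->  (<next puzzle's features>, -1, <solved flag>, None)
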
    def render(self, env, brain):
        '''renders a puzzlebox environment by plotting the action and reward traces
        of one episode; note it acts on the env/brain passed in, not on self'''
        import numpy as np
        import matplotlib.pyplot as plt
        actions = []
        rewards = []
        states = []
        brain.reset()  # Warning!!: NOT MABE-reset(), but soft-reset() (keep weights)
        nextState = env.reset()
        states.append(nextState)
        actions.append(0)  # the path is recorded as actions in this visualization
        rewards.append(-1)
        time = 0
        print(env.puzzlesLeftToComplete)
        while True:
            time += 1
            brain.sensoryState = nextState  # SET INPUTS
            brain.plasticUpdate()
            nextState, reward, goal_achieved, _ = env.step(brain.action)  # GET OUTPUTS
            actions.append(brain.action)
            rewards.append(reward)
            states.append(nextState)
            if env.puzzlesLeftToComplete == 0 or time == 600: break
            #if goal_achieved or time == 100: break
            brain.reward = reward
        print(actions)
        print(states)
        plt.figure()
        plt.plot(actions)
        plt.scatter(list(range(len(actions))), actions)
        plt.figure()
        plt.plot(rewards)
        plt.scatter(list(range(len(rewards))), rewards)

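    # Possible usage (illustrative; 'belt' and 'agent' are placeholder names, and the
    # agent is assumed to expose the sensoryState / plasticUpdate / action / reward
    # interface used above):
    #   belt.render(belt, agent)  # prints the trace and builds two matplotlib figures
    #                             # (call plt.show() to display them)
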
    def evaluate(self, ind,
                 num_trials=200,
                 n_actions=4,
                 HARD_TIME_LIMIT=600):
        """
        Given an individual agent's weights, evaluate it and
        return its fitness.
        """
        w = 0.0

        # Need to refactor the following code taken from the
        # Jupyter notebook.

        # domain-specific settings
        #num_trials = 200
        #n_actions = 4
        #(optimal lmbda in the agent is domain dependent - could be evolved)
        #HARD_TIME_LIMIT = 600
        #KILLED_REWARD = -10 # not used here
        #(standard reward) = -1.0 (means agent is potentially wasting time - set internal to agent code)
        #(goal reward) = 1.0 (means the agent achieved something good - set internal to agent code)

        # alpha   # how much to weigh reward surprises that deviate from expectation
        # gamma   # how important expected rewards will be
        # epsilon # fraction of exploration to exploitation (how often to choose a random action)
        # lmbda   # how slowly memory of preceding actions fades away (1=never, 0=immediately)

        agent = self.agentclass(obsSpace=self.observation_space, actSpace=self.action_space, alpha=self.alpha,
                                gamma=self.gamma, epsilon=self.epsilon, lmbda=self.lmbda)

        # Put weights in the Agent
        agent.weights = [x for x in ind]

        time_to_solve_each_trial = []
        rewards = []

        for trialN in range(self.max_training_trials):
            # some output to see it running
            if (trialN % 10) == 0: print('.', end='')
            # initialize the agent, environment, and time for this trial
            agent.reset()  # soft-reset() (keeps learned weights)
            nextState = self.reset()
            time = 0
            while True:
                time += 1
                # set agent senses based on environment and allow agent to determine an action
                agent.sensoryState = nextState
                agent.plasticUpdate()
                # determine effect on environment state & any reward (in standard openAI-gym API format)
                nextState, reward, goal_achieved, _ = self.step(agent.action)
                agent.reward = reward
                if self.puzzlesLeftToComplete == 0 or time == self.max_steps:
                    agent.plasticUpdate()
                    break
                # could have deadly rewards that stop the trial early
                #elif reward <= -10:
                #    agent.sensoryState = nextState
                #    agent.reward = reward
                #    agent.plasticUpdate()
                #    agent.reset()
                #    nextState = self.reset()
            rewards.append(reward)
            time_to_solve_each_trial.append(time)

        # Calculate fitness
        # Rewards are in [-1 .. 1], have to rescale to [0 .. 1]
        #scalerewards = (np.array(rewards) * 0.5) + 0.5
        #w = np.mean(scalerewards)
        w = sum(rewards)

        return w,


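# Possible wiring into an evolutionary run (illustrative and commented out; the
# single-element tuple returned by evaluate() matches DEAP's fitness convention,
# but DEAP is not imported or required by this module, and 'MyAgent' is a
# placeholder for whatever agentclass is used):
#
#   from deap import base, creator, tools
#   creator.create("FitnessMax", base.Fitness, weights=(1.0,))
#   creator.create("Individual", list, fitness=creator.FitnessMax)
#   toolbox = base.Toolbox()
#   belt = ConvBelt(actionSpace=(4,), observationSpace=(4, 2),
#                   maxRewards=[1], agentclass=MyAgent)
#   toolbox.register("evaluate", belt.evaluate)
#   # fitness, = toolbox.evaluate(individual)

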
def getObservationSpace(*items) -> Tuple[int, ...]:
    '''Returns the total feature dimensions over all puzzles, starting from 0.
    Given 1 or more puzzles, finds the union of their observation spaces (features)
    and returns the size of that space.
    Ensures all puzzles have the same feature dimensions, and errors if not.
    Useful when setting up an RL state space for certain feature sizes.
    Features [3,1] would have dimensions [4,2], and [[0,2],[0,1]] would be [1,3].

    >>> p1 = Puzzle(tt=[[]], rewards=[], features=[[0,1],[0,1],[3,1]])
    >>> getObservationSpace(p1)
    (4, 2)
    >>> p2 = Puzzle(tt=[[]], rewards=[], features=[[1,1],[1,1],[2,4]])
    >>> getObservationSpace(p2)
    (3, 5)
    >>> getObservationSpace(p1,p2)
    (4, 5)
    >>> puzzles = [p1,p2]
    >>> getObservationSpace(puzzles)
    (4, 5)
    '''
    if type(items) is tuple and isinstance(items[0], Puzzle):
        # perform union (max) over feature space of all items
        highest = copy.copy(items[0].features[0]) # features is [[int,int,...],...]
        featurelen = len(highest)
        for puzzle in items:
            for featureset in puzzle.features:
                if len(featureset) != featurelen:
                    raise Exception("not all features have the same length")
                for feature_i in range(len(featureset)):
                    highest[feature_i] = max(highest[feature_i], featureset[feature_i])
        return tuple((e+1 for e in highest)) # size is 1+highest due to 0-indexing of features
    elif type(items) is tuple and type(items[0]) in (tuple,list):
        return getObservationSpace(*items[0]) # unpack one layer
    else:
        raise Exception(f"Expected type of Puzzle(s), but got {type(items)}")


def getActionSpace(*items) -> Tuple[int]:
    '''Returns the total action dimensions over all puzzles (number of columns in tt).
    Given 1 or more puzzles.
    Ensures all puzzles have the same dimensions, and errors if not.
    Useful when setting up an RL action space for certain action sizes.

    >>> p1 = Puzzle(tt=[[0,0],[4,2]], rewards=[], features=[[]])
    >>> getActionSpace(p1)
    (2,)
    >>> p2 = Puzzle(tt=[[0,0,1],[1,1,2]], rewards=[], features=[[]])
    >>> getActionSpace(p2)
    (3,)
    >>> getActionSpace(p1,p2)
    Traceback (most recent call last):
    ...
    Exception: not all puzzles (rows) have the same tt col size
    '''

    if type(items) is tuple and isinstance(items[0], Puzzle):
        # check that every puzzle's transition table has the same shape
        nrows, ncols = len(items[0].tt), len(items[0].tt[0])
        for puzzle in items:
            prows = len(puzzle.tt)
            if prows != nrows:
                raise Exception("not all puzzles have the same tt row size")
            samerows = [len(c) == ncols for c in puzzle.tt]
            if not all(samerows):
                raise Exception("not all puzzles (rows) have the same tt col size")
        return (ncols,)
    elif type(items) is tuple and type(items[0]) in (tuple,list):
        return getActionSpace(*items[0]) # unpack one layer
    else:
        raise Exception(f"Expected type of Puzzle(s), but got {type(items)}")


def _test_world():
    '''full test of the conveyorbelt world

    >>> import copy
    >>> maxrewards = [1]
    >>> easy_features = [[0,1],[0,1],[3,1],[0,0]]
    >>> easy_rewards = [-1,-1,-1,1]
    >>> easy_tt = np.array([[0,0,2,3], [0,0,0,0], [2,0,2,3], [3,3,3,3]])
    >>> p1 = Puzzle(tt=easy_tt, features=easy_features, rewards=easy_rewards)
    >>> p2 = copy.deepcopy(p1)
    >>> puzzles = (p1,p2)
    >>> world = ConvBelt(actionSpace = getActionSpace(puzzles), observationSpace = getObservationSpace(puzzles), maxRewards = maxrewards, agentclass = None, randomize = False)
    >>> world.append(p1)
    >>> world.append(p2)
    >>> # trial 1
    >>> world.reset() # reset before first use just to be sure
    [0, 1]
    >>> world.step(Action('investigate'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('pass'))
    ([0, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, False, None)
    >>> world.step(Action('pass'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, True, None)
    >>> world.step(Action('eat')) # try eating again, notice the reward change
    ([0, 0], -1, True, None)
    >>> # trial 2
    >>> world.reset()
    [0, 1]
    >>> world.step(Action('investigate'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('pass'))
    ([0, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, False, None)
    >>> world.step(Action('pass'))
    ([3, 1], -1, False, None)
    >>> world.step(Action('eat'))
    ([0, 0], 1, True, None)
    '''


if __name__ == '__main__':
    '''test important functions and workflows with doctesting:
    run this python file by itself to run these tests, and set
    LOGGING = True near the top of the file to see the environment's log output.'''
    import doctest
    from functools import partial
    test = partial(doctest.run_docstring_examples, globs = globals())
    test(getObservationSpace)
    test(getActionSpace)
    test(_test_world)