"""multigwe.py -- Multi GridWorlds Evolving Bringing together an Agent acting in one of multiple GridWorlds with DEAP evolutionary computation. Notion: Set up for being able to call an Agent with a provided set of weights and run their training in one of multiple Gridworld environments. DEAP keeps a population of weights and handles the evolutionary computation. Save the best instantiated Agent per each generation for later review and analysis. """ import sys # allow importing from the 'code/' dir sys.path.append("../code") import os import platform import pickle import json import traceback import datetime import numpy as np, itertools, copy import matplotlib.pyplot as plt from collections import defaultdict import importlib # module reloading import environments import agents # always forces a reload in case you have edited environments or agents importlib.reload(environments) importlib.reload(agents) from environments.gridworld import GridWorld from agents.q_agent import EvolvableAgent as Agent # DEAP imports import random from deap import creator, base, tools, algorithms import multiprocessing #pool = multiprocessing.Pool() #toolbox.register("map", pool.map) # Weight handling from mda import MultiDimArray def isotime(): return datetime.datetime.now().isoformat() def t2fn(timestamp): timestamp = timestamp.replace('.','_') timestamp = timestamp.replace(':','_') return timestamp class Holder(object): def __init__(self): pass class GoalsAndHolesWorld(object): """ Class for making and using a 2D GridWorld based on setting goals and holes (hazards) for an RL Agent to explore. Modifications for multiple maps... Need a 'maps' array """ def __init__(self, obsSpace, actSpace, goals, holes, startstate, agentclass, killed_reward=-10.0, max_training_trials=50, max_steps=32, alpha=0.005, gamma=0.95, epsilon=0.01, lmbda=0.42 ): self.maps = [] mymap = Holder() self.add_map(obsSpace, actSpace, goals, holes, startstate) # Instance now has the initial map in place self.agentclass = agentclass self.killed_reward = killed_reward self.max_training_trials = max_training_trials self.max_steps = max_steps self.alpha = alpha self.gamma = gamma self.epsilon = epsilon self.lmbda = lmbda print("Goals from initial env", self.maps[0].env.goals) pass def get_weights_len(self): mywl = np.prod(tuple(self.maps[0].obsSpace) + tuple(self.maps[0].actSpace)) return mywl def add_map(self, obsSpace, actSpace, goals, holes, startstate): mymap = Holder() mymap.obsSpace = tuple(obsSpace) mymap.actSpace = tuple(actSpace) mymap.goals = list(goals) mymap.holes = tuple(holes) mymap.startState = tuple(startstate) mymap.env = self.make_env(mymap.startState, mymap.obsSpace, mymap.goals, mymap.holes) self.maps.append(mymap) def make_env(self, startstate=None, dims=None, goals=None, holes=None): # Default: the first map in the list. 
    def make_env(self, startstate=None, dims=None, goals=None, holes=None):
        # Default: use the first map in the list.
        if startstate in [None] and 0 < len(self.maps):
            startstate = self.maps[0].startState
        if dims in [None] and 0 < len(self.maps):
            dims = self.maps[0].obsSpace
        if goals in [None] and 0 < len(self.maps):
            goals = list(self.maps[0].goals)
        if holes in [None] and 0 < len(self.maps):
            holes = self.maps[0].holes
        print(startstate, dims, goals, holes)
        myenv = GridWorld(dims=dims, startState=startstate)
        myenv.goals.append(goals)
        for ii in range(holes[0][0], holes[0][1] + 1):
            for jj in range(holes[1][0], holes[1][1] + 1):
                print("adding hole at ", ii, jj)
                myenv.holes.append([ii, jj])
        return myenv

    def run_trial(self, agent, env=None):
        if env in [None]:
            # Choose an environment
            """
            if 1 == len(self.maps):
                mymap = self.maps[0]
            else:
                mymap = random.choice(self.maps)
            """
            mymap = self.choose_map()
            env = mymap.env
        agent.reset()  # soft reset (keeps learned weights)
        nextState = env.reset()
        lastState = nextState
        runtime = 0
        while True:
            runtime += 1
            status = 'alive'
            # set agent senses based on environment and allow agent to determine an action
            agent.sensoryState = nextState
            agent.plasticUpdate()
            # determine effect on environment state & any reward (standard OpenAI Gym API format)
            nextState, reward, goal_achieved, _ = env.step(agent.action)
            #if (tuple(lastState) == tuple(self.env.goals)) or (tuple(nextState) == tuple(self.env.goals)):
            #    print(agent.action, lastState, reward, goal_achieved, nextState)
            lastState = nextState
            agent.reward = reward
            if goal_achieved or (runtime >= self.max_steps):
                break
            # stop trial if agent explicitly failed early
            elif reward <= self.killed_reward:
                agent.sensoryState = nextState
                agent.reward = reward
                agent.plasticUpdate()  # allow 1 more update to 'learn' the bad reward
                agent.reset()
                nextState = env.reset()
                status = 'killed'
                runtime = self.max_steps
                break
            # print(time, agent.action, agent.reward, status)
        #print("  runtime", runtime)
        #if goal_achieved:
        #    print("  Goal Achieved!!!")
        return agent, runtime

    def choose_map(self, map_index=None):
        """
        If map_index is a valid index into self.maps, return that map.
        Otherwise return one chosen at random.
        """
        # print("self.maps", self.maps)
        if map_index in [None]:
            # Random choice of map from the alternatives
            if 1 == len(self.maps):
                # There can only be one
                mymap = self.maps[0]
            else:
                # Choose one of them
                mymap = random.choice(self.maps)
        elif 0 <= map_index and map_index < len(self.maps):
            mymap = self.maps[map_index]
        else:
            mymap = random.choice(self.maps)
        return mymap
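    # Fitness used by evaluate()/multi_evaluate() below:
    #   w = max(0, 1 - mean(tts) / max_steps)
    # where tts is the list of per-trial solve times. Worked example with the
    # drivers' max_steps=200: an agent averaging 50 steps per solved trial scores
    # 1 - 50/200 = 0.75, while one that always times out at 200 steps scores 0.0.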
    def evaluate(self, ind, return_agent=False):
        """
        Evaluate one individual (a flat weight vector) by training an Agent in
        one chosen environment and scoring how quickly it learns to solve it.
        """
        latest = 20
        # Pull weights from ind
        # Choose an environment
        """
        if 1 == len(self.maps):
            mymap = self.maps[0]
        else:
            mymap = random.choice(self.maps)
        """
        # New way
        mymap = self.choose_map()
        myenv = mymap.env
        # Instantiate an Agent
        myagent = Agent(obsSpace=mymap.obsSpace, actSpace=mymap.actSpace,
                        alpha=self.alpha, gamma=self.gamma,
                        epsilon=self.epsilon, lmbda=self.lmbda)
        # Should consider one round of single trial to get the performance due to
        # inheritance, then proceed with full trials to 'develop' the agent,
        # and get its trained performance.
        # Put weights in the Agent
        myagent.weights = [x for x in ind]
        #print("  myagent.weights", myagent.weights)
        # run_trial calls
        time_to_solve_each_trial = []  # lower is better
        for trialN in range(self.max_training_trials):
            # some output to see it running
            # if (trialN % 10) == 0: print('.', end='')
            myagent, runtime = self.run_trial(myagent, env=myenv)
            # record trial results
            time_to_solve_each_trial.append(runtime)
        #print("  tts", time_to_solve_each_trial)
        # calculate fitness
        # Fitness is 1 - (avg. tts / max. time)
        # w = max(0.0, 1.0 - (np.mean(time_to_solve_each_trial) / self.max_steps))
        ltts = len(time_to_solve_each_trial)
        latest = ltts // 2
        # Latter half of trials
        #w = max(0.0, 1.0 - (np.mean(time_to_solve_each_trial[-latest:]) / self.max_steps))
        # First half of trials
        w = max(0.0, 1.0 - (np.mean(time_to_solve_each_trial[:-latest]) / self.max_steps))
        # return the fitness
        #print("  fitness", "%3.2f" % w)
        #print("  myagent.weights after", myagent.weights)
        if return_agent:
            return myagent, w, time_to_solve_each_trial
        else:
            return w,

    def multi_evaluate(self, ind, return_agent=False):
        """
        Like 'evaluate', but when multiple maps exist, evaluate on each map,
        collect performance, and return fitness as the mean performance
        across all maps.
        """
        latest = 20
        # Pull weights from ind
        # Info across all maps/environments
        time_to_solve_each_trial = []  # lower is better
        for mymap in self.maps:
            myenv = mymap.env
            # Instantiate an Agent
            myagent = Agent(obsSpace=mymap.obsSpace, actSpace=mymap.actSpace,
                            alpha=self.alpha, gamma=self.gamma,
                            epsilon=self.epsilon, lmbda=self.lmbda)
            # Put weights in the Agent
            myagent.weights = [x for x in ind]
            #print("  myagent.weights", myagent.weights)
            # run_trial calls
            for trialN in range(self.max_training_trials):
                # some output to see it running
                # if (trialN % 10) == 0: print('.', end='')
                myagent, runtime = self.run_trial(myagent, env=myenv)
                # record trial results
                time_to_solve_each_trial.append(runtime)
        # calculate fitness
        # Fitness is 1 - (avg. tts / max. time)
        w = max(0.0, 1.0 - (np.mean(time_to_solve_each_trial) / self.max_steps))
        # return the fitness
        if return_agent:
            return myagent, w, time_to_solve_each_trial
        else:
            return w,


class MaxAve(object):
    def __init__(self, alpha=0.1):
        self.alpha = alpha

    def get_weights_len(self, wl=100):
        return wl

    def evaluate(self, ind):
        npwts = np.array([x for x in ind])
        wtmax = np.max(np.abs(npwts))
        wtmean = np.mean(np.abs(npwts))
        if 0.0 != wtmax:
            w = wtmean / wtmax
        else:
            w = 0.0
        return w,

    # EvolveWeights.setup() registers the problem's multi_evaluate, so alias it
    # here to keep this single-map surrogate problem usable with the same driver.
    multi_evaluate = evaluate
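# Quick sanity check for the MaxAve surrogate problem. Illustrative only: the
# helper name and the sample vectors are made up here, not part of the original design.
def _maxave_sanity_check():
    ma = MaxAve()
    print(ma.evaluate(np.ones(10)))        # mean(|w|) == max(|w|), so fitness is (1.0,)
    print(ma.evaluate([0.0, 0.0, 2.0]))    # mean well below max, fitness about (0.33,)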
class EvolveWeights(object):
    """
    Class to apply DEAP to evolve a population consisting of sets of weights.
    """

    def __init__(self, gahw, popsize=100, maxgenerations=10000,
                 cxpb=0.5, mtpb=0.05,
                 wmin=-20.0, wmax=20.0,
                 mut_center=0.0, mut_sigma=0.1, mut_indpb=0.05,
                 tournsize=5, tournk=2,
                 normalize_fitness=True,
                 tag='gahw'):
        self.tag = tag
        self.starttime = isotime()
        self.logbase = tag + "_" + t2fn(self.starttime)
        self.gahw = gahw
        self.weights_len = gahw.get_weights_len()
        self.popsize = popsize
        self.maxgenerations = maxgenerations
        self.cxpb = cxpb
        self.mtpb = mtpb
        self.wmin = wmin
        self.wmax = wmax
        self.mut_center = mut_center
        self.mut_sigma = mut_sigma
        self.mut_indpb = mut_indpb
        self.tournsize = tournsize
        self.tournk = tournk
        self.normalize_fitness = normalize_fitness

    def masv(self, pop):
        # per-individual mean |weight|, scaled by the population-wide max |weight|
        mav = []
        maxs = []
        for ind in pop:
            wts = [x for x in ind]
            mav.append(np.mean(np.abs(wts)))
            maxs.append(np.max(np.abs(wts)))
        allmax = np.max(maxs)
        mymasv = [x / allmax for x in mav]
        return mymasv

    def cxTwoPointCopy(self, ind1, ind2):
        """Execute a two-point crossover with copy on the input individuals.

        The copy is required because slicing in numpy returns a view of the
        data, which leads to a self-overwriting in the swap operation. It
        prevents ::

            >>> import numpy as np
            >>> a = np.array((1,2,3,4))
            >>> b = np.array((5,6,7,8))
            >>> a[1:3], b[1:3] = b[1:3], a[1:3]
            >>> print(a)
            [1 6 7 4]
            >>> print(b)
            [5 6 7 8]
        """
        size = len(ind1)
        cxpoint1 = random.randint(1, size)
        cxpoint2 = random.randint(1, size - 1)
        if cxpoint2 >= cxpoint1:
            cxpoint2 += 1
        else:
            # Swap the two cx points
            cxpoint1, cxpoint2 = cxpoint2, cxpoint1
        ind1[cxpoint1:cxpoint2], ind2[cxpoint1:cxpoint2] \
            = ind2[cxpoint1:cxpoint2].copy(), ind1[cxpoint1:cxpoint2].copy()
        return ind1, ind2

    def zero(self):
        return 0.0

    def smallrandom(self, eps=None):
        """
        Produce a small random number in [-eps .. eps].

        A random variate in [-1 .. 1] is produced, then multiplied by eps,
        so the final range is [-eps .. eps].
        """
        if eps in [None]:
            eps = self.gahw.alpha
        rv = ((2.0 * random.random()) - 1.0) * eps
        return rv

    def setup(self):
        creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        creator.create("Individual", np.ndarray, fitness=creator.FitnessMax)

        self.toolbox = base.Toolbox()
        self.pool = multiprocessing.Pool()
        self.toolbox.register("map", self.pool.map)

        #toolbox.register("attr_bool", random.randint, 0, 1)  # non-numpy non-float version
        # self.toolbox.register("attr_float", random.random)
        #self.toolbox.register("attr_float", self.zero)
        self.toolbox.register("attr_float", self.smallrandom)
        self.toolbox.register("individual", tools.initRepeat, creator.Individual,
                              self.toolbox.attr_float, n=self.weights_len)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)

        # self.toolbox.register("evaluate", self.gahw.evaluate)
        self.toolbox.register("evaluate", self.gahw.multi_evaluate)
        #toolbox.register("mate", tools.cxTwoPoint)  # non-numpy non-float version
        self.toolbox.register("mate", self.cxTwoPointCopy)
        #toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)  # non-numpy non-float version
        self.toolbox.register("mutate", tools.mutGaussian,
                              mu=self.mut_center, sigma=self.mut_sigma, indpb=self.mut_indpb)
        self.toolbox.register("select", tools.selTournament,
                              tournsize=self.tournsize, k=self.tournk)

    def normalize_fitnesses(self, fitnesses):
        #print("fitnesses", ["%3.2f" % x[0] for x in fitnesses])
        maxfitness = np.max([x[0] for x in fitnesses])
        #print("maxfitness", maxfitness)
        if 0.0 == maxfitness:
            # Nothing scored above zero; return the fitnesses unchanged rather than divide by zero.
            return fitnesses
        listfit = [x[0] for x in fitnesses]
        #print("listfit", listfit)
        normfit = [x / maxfitness for x in listfit]
        #print("normfit", normfit)
        fitnesses = [tuple([x]) for x in normfit]
        #print("normed fitnesses", ["%3.2f" % x[0] for x in fitnesses])
        return fitnesses

    def log_it(self, generation):
        pool = self.pool
        toolbox = self.toolbox
        self.pool = None
        self.toolbox = None
        pklfn = f"{self.logbase}__{generation+1}-{self.maxgenerations}.pkl"
        pickle.dump(self, open(pklfn, "wb"))
        self.pool = pool
        self.toolbox = toolbox
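    # Checkpointing note: log_it() temporarily detaches self.pool and self.toolbox
    # before pickling the whole EvolveWeights instance (a multiprocessing Pool cannot
    # be pickled), then restores them. Reloading a checkpoint in a fresh session
    # generally requires the DEAP creator classes to exist again, e.g. via the two
    # creator.create(...) calls in setup(), before pickle.load is attempted.
    #
    # The generational loop below follows the standard DEAP varAnd pattern:
    # crossover/mutation produce offspring, only individuals whose fitness was
    # invalidated by variation are re-evaluated, and tournament selection fills
    # the next population; a checkpoint is written every 100 generations.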
    def loop(self):
        self.population = self.toolbox.population(n=self.popsize)
        #print(self.masv(self.population))
        NGEN = self.maxgenerations
        for gen in range(NGEN):
            print("generation", gen)
            offspring = algorithms.varAnd(self.population, self.toolbox,
                                          cxpb=self.cxpb, mutpb=self.mtpb)
            # print("offspring", offspring)
            # constrain genome values to [wmin, wmax]
            for offspring_i, individual in enumerate(offspring):
                offspring[offspring_i][:] = np.clip(individual, self.wmin, self.wmax)
            # print("clipped offspring", offspring)
            # Evaluate the individuals with an invalid fitness (not yet evaluated)
            # print("check fitness.valid")
            invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
            # print("invalid_ind", len(invalid_ind))
            #print("setting fitness")
            fitnesses = self.toolbox.map(self.toolbox.evaluate, invalid_ind)
            if self.normalize_fitness:
                fitnesses = self.normalize_fitnesses(fitnesses)
            """
            #print("fitnesses", ["%3.2f" % x[0] for x in fitnesses])
            maxfitness = np.max([x[0] for x in fitnesses])
            #print("maxfitness", maxfitness)
            listfit = [x[0] for x in fitnesses]
            #print("listfit", listfit)
            normfit = [x/maxfitness for x in listfit]
            #print("normfit", normfit)
            fitnesses = [tuple([x]) for x in normfit]
            #print("normed fitnesses", ["%3.2f" % x[0] for x in fitnesses])
            """
            # print("fitnesses", ["%3.2f" % x[0] for x in fitnesses])
            self.fitness_dist(fitnesses)
            # print("update ind fitness")
            for ind, fit in zip(invalid_ind, fitnesses):
                ind.fitness.values = fit
            #print("selection")
            #print("offspring\n", self.masv(offspring))
            self.offspring = offspring
            self.population = self.toolbox.select(offspring, k=len(self.population))
            if 0 == gen % 100:
                self.log_it(gen)
            #print("population after selection\n", self.masv(self.population))
            #print("Report for generation", gen)
            self.report()

    def report(self):
        # post-evolution analysis
        fitnesses = self.toolbox.map(self.toolbox.evaluate, self.population)
        if self.normalize_fitness:
            fitnesses = self.normalize_fitnesses(fitnesses)
        self.fitnesses = fitnesses
        self.sortedFitnesses = sorted(fitnesses)
        self.sortedFitnesses.reverse()
        self.fitness_dist(fitnesses)
        self.bestFitness, self.worstFitness = self.sortedFitnesses[0], self.sortedFitnesses[-1]
        print("best/worst w", self.bestFitness, self.worstFitness)
        self.bestGenome = tools.selBest(self.population, k=1)
        # print(self.bestGenome)

    def ffmt(self, value, fmt="%3.2f"):
        return fmt % value

    def fitness_dist(self, fitnesses):
        listfit = [x[0] for x in fitnesses]
        # np.percentile expects percentages in [0, 100]
        pct05, pct25, pct50, pct75, pct95 = np.percentile(listfit, [5, 25, 50, 75, 95])
        print(f"fitness dist: {self.ffmt(np.min(listfit))} {self.ffmt(pct05)} "
              f"{self.ffmt(pct25)} {self.ffmt(pct50)} {self.ffmt(pct75)} "
              f"{self.ffmt(pct95)} {self.ffmt(np.max(listfit))}")

    def driver(self):
        # Initialize
        self.setup()
        # Generation loop
        self.loop()
        # Report
        self.report()
        self.log_it(self.maxgenerations)
        print(self.masv(self.population))


def holes_block_direct_route():
    # GridWorld as in 'gridworld.ipynb'
    gahw = GoalsAndHolesWorld((4, 12), (4,), (3, 11), [[3, 3], [1, 10]], (3, 0), Agent,
                              max_steps=200)
    ew = EvolveWeights(gahw, popsize=100, maxgenerations=10000,
                       tournsize=75, tournk=3, normalize_fitness=False)
    ew.driver()


def holes_block_direct_route_two_goals():
    # GridWorld as in 'gridworld.ipynb', with a second map whose goal is on the left
    gahw = GoalsAndHolesWorld((4, 13), (4,), (3, 12), [[3, 3], [1, 11]], (2, 6), Agent,
                              max_steps=200)
    gahw.add_map((4, 13), (4,), (3, 0), [[3, 3], [1, 11]], (2, 6))
    ew = EvolveWeights(gahw, popsize=100, maxgenerations=100,
                       tournsize=75, tournk=3, normalize_fitness=False)
    ew.driver()


def holes_block_direct_route_two_goals_left():
    # Two maps, both with the goal on the left
    gahw = GoalsAndHolesWorld((4, 13), (4,), (3, 0), [[3, 3], [1, 11]], (2, 6), Agent,
                              max_steps=200)
    gahw.add_map((4, 13), (4,), (3, 0), [[3, 3], [1, 11]], (2, 6))
    ew = EvolveWeights(gahw, popsize=100, maxgenerations=100,
                       tournsize=75, tournk=3, normalize_fitness=False)
    ew.driver()


def holes_block_direct_route_two_goals_right():
    # Two maps, both with the goal on the right
    gahw = GoalsAndHolesWorld((4, 13), (4,), (3, 12), [[3, 3], [1, 11]], (2, 6), Agent,
                              max_steps=200)
    gahw.add_map((4, 13), (4,), (3, 12), [[3, 3], [1, 11]], (2, 6))
    ew = EvolveWeights(gahw, popsize=100, maxgenerations=100,
                       tournsize=75, tournk=3, normalize_fitness=False)
    ew.driver()


def maxave():
    ma = MaxAve()
    ew = EvolveWeights(ma, popsize=100, maxgenerations=500)
    ew.driver()
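# Post-hoc analysis sketch. Illustrative only: the helper name is made up, and it
# assumes a finished EvolveWeights run whose problem object is still attached as
# .gahw (as set up by the drivers above).
def _review_best_agent(ew):
    best = tools.selBest(ew.population, k=1)[0]
    agent, fitness, tts = ew.gahw.evaluate(best, return_agent=True)
    print("best genome fitness", fitness)
    plt.plot(tts)
    plt.xlabel("trial")
    plt.ylabel("steps to solve")
    plt.show()
    return agent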
print("Two different goals") holes_block_direct_route_two_goals() print("Two environments, both have goal on left.") holes_block_direct_route_two_goals_left() print("Two environments, both have goal on right.") holes_block_direct_route_two_goals_right() # maxave() pass