# alice/code/curio_evolve_weights.py
#
# 342 lines
# 12 KiB
# Python
# Executable File

"""
ew.py
Evolve Weights

Uses DEAP to evolve a set of weights with mutation and crossover.

Integration with other code happens via programming by contract.
EvolveWeights does not take an 'environ' object; the caller passes in
the pieces that an environment would otherwise provide:

    weights_len : scalar integer giving the 1D vector length for weights
    evaluate    : callable that accepts a weight vector and returns a tuple
                  containing a single fitness value (e.g., (0.5,))
    alpha       : attribute related to reinforcement learning for agents;
                  also the default half-width of the range used to draw
                  initial weights

An environment class that offers get_weights_len(), evaluate() and an
alpha attribute (see MinEnv below) can supply all three.
"""
import sys
# allow importing from the 'code/' dir
sys.path.append("../code")
import os
import platform
import pickle
import json
import traceback
import datetime
import copy
import numpy as np, itertools, copy
import matplotlib.pyplot as plt
from collections import defaultdict
import importlib # module reloading
#import environments
#import agents
# always forces a reload in case you have edited environments or agents
#importlib.reload(environments)
#importlib.reload(agents)
#from environments.gridworld import GridWorld
#import environments.puzzle as pz
#from environments.puzzle import Puzzle, ConvBelt, getActionSpace, getObservationSpace
#from agents.q_agent import EvolvableAgent as Agent
# DEAP imports
import random
from deap import creator, base, tools, algorithms
import multiprocessing
#pool = multiprocessing.Pool()
#toolbox.register("map", pool.map)
# Weight handling
#from mda import MultiDimArray
def isotime():
    """Return the current local date and time as an ISO 8601 string."""
    now = datetime.datetime.now()
    return now.isoformat()
def t2fn(timestamp):
    """Make a timestamp string usable in a file name.

    Every '.' and ':' in ``timestamp`` is replaced by '_'; all other
    characters are returned unchanged.
    """
    return timestamp.translate(str.maketrans({'.': '_', ':': '_'}))
class Holder:
    """Empty class whose instances start with no attributes of their own.

    The constructor takes no arguments and does nothing; attributes can be
    attached to an instance after creation.
    """
class EvolveWeights(object):
    """
    Class to apply DEAP to evolve a population consisting of a set
    of weights.

    Each individual is a 1D numpy array of ``weights_len`` floats. Call
    ``driver()`` to run setup, the generation loop, a final report and a
    final pickle log.

    Constructor parameters:
    weights_len : length of each individual's weight vector
    alpha : default half-width of the range used to draw initial weights
        (see ``smallrandom``)
    evaluate : callable taking a weight vector and returning a 1-tuple
        holding the fitness, e.g. ``(0.5,)``
    popsize : number of individuals in the population
    maxgenerations : number of generations run by ``loop``
    cxpb, mtpb : crossover / mutation probabilities handed to
        ``algorithms.varAnd``
    wmin, wmax : bounds that offspring weights are clipped to
    mut_center, mut_sigma, mut_indpb : ``mu``, ``sigma`` and ``indpb``
        handed to ``tools.mutGaussian``
    tournsize, tournk : ``tournsize`` and default ``k`` registered for
        ``tools.selTournament`` (``loop`` always overrides ``k`` with the
        population size)
    normalize_fitness : when true, fitnesses are divided by their maximum
    tag : prefix for the pickle log file names
    """

    def __init__(self,
                 weights_len,
                 alpha=0.05,
                 evaluate=None,
                 popsize=100,
                 maxgenerations=10000,
                 cxpb=0.5,
                 mtpb=0.05,
                 wmin=-20.0,
                 wmax=20.0,
                 mut_center=0.0,
                 mut_sigma=0.1,
                 mut_indpb=0.05,
                 tournsize=5,
                 tournk=2,
                 normalize_fitness=True,
                 tag='environ'
                 ):
        self.tag = tag
        self.starttime = isotime()
        # Base name for log files: tag plus a file-name-safe start time.
        self.logbase = tag + "_" + t2fn(self.starttime)
        # No environment object is stored; weights_len, alpha and evaluate
        # are passed in directly instead.
        self.weights_len = weights_len
        self.alpha = alpha
        self.evaluate = evaluate
        self.popsize = popsize
        self.maxgenerations = maxgenerations
        self.cxpb = cxpb
        self.mtpb = mtpb
        self.wmin = wmin
        self.wmax = wmax
        self.mut_center = mut_center
        self.mut_sigma = mut_sigma
        self.mut_indpb = mut_indpb
        self.tournsize = tournsize
        self.tournk = tournk
        self.normalize_fitness = normalize_fitness

    def masv(self, pop):
        """
        Return, for each individual in pop, its mean absolute weight
        divided by the largest absolute weight found anywhere in pop.

        An empty pop gives an empty list. If every weight is zero the
        result is all zeros rather than a division by zero.
        """
        mav = []
        maxs = []
        for ind in pop:
            absolute = np.abs(np.asarray(ind))
            mav.append(np.mean(absolute))
            maxs.append(np.max(absolute))
        if not mav:
            return []
        allmax = np.max(maxs)
        if allmax == 0:
            return [0.0 for _ in mav]
        return [x / allmax for x in mav]

    def cxTwoPointCopy(self, ind1, ind2):
        """Execute a two points crossover with copy on the input individuals. The
        copy is required because the slicing in numpy returns a view of the data,
        which leads to a self overwriting in the swap operation. It prevents
        ::
            >>> import numpy as np
            >>> a = np.array((1,2,3,4))
            >>> b = np.array((5,6,7,8))
            >>> a[1:3], b[1:3] = b[1:3], a[1:3]
            >>> print(a)
            [1 6 7 4]
            >>> print(b)
            [5 6 7 8]

        Both individuals are modified in place and returned. Individuals
        shorter than two elements are returned unchanged, since no pair of
        crossover points can be drawn for them.
        """
        size = len(ind1)
        if size < 2:
            return ind1, ind2
        cxpoint1 = random.randint(1, size)
        cxpoint2 = random.randint(1, size - 1)
        if cxpoint2 >= cxpoint1:
            cxpoint2 += 1
        else:  # Swap the two cx points
            cxpoint1, cxpoint2 = cxpoint2, cxpoint1
        ind1[cxpoint1:cxpoint2], ind2[cxpoint1:cxpoint2] = ind2[cxpoint1:cxpoint2].copy(), ind1[cxpoint1:cxpoint2].copy()
        return ind1, ind2

    def zero(self):
        """Return 0.0 (alternative initial-weight generator)."""
        return 0.0

    def smallrandom(self, eps=None):
        """
        Produce a small random number in [-eps .. eps].
        A random variate in [-1 .. 1] is produced then
        multiplied by eps, so the final range is in [-eps .. eps].
        When eps is None, self.alpha is used.
        """
        if eps is None:
            eps = self.alpha
        return ((2.0 * random.random()) - 1.0) * eps

    def setup(self):
        """
        Create the DEAP classes, the toolbox and the multiprocessing pool.

        Sets self.toolbox and self.pool. The pool's map is registered as
        the toolbox "map", so evaluations are dispatched to worker
        processes.
        """
        # creator.create() adds the class to the creator module; only do
        # it once so repeated setup() calls do not redefine the classes.
        if not hasattr(creator, "FitnessMax"):
            creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        if not hasattr(creator, "Individual"):
            creator.create("Individual", np.ndarray, fitness=creator.FitnessMax)
        self.toolbox = base.Toolbox()
        self.pool = multiprocessing.Pool()
        self.toolbox.register("map", self.pool.map)
        # Initial weights are small random values in [-alpha .. alpha].
        self.toolbox.register("attr_float", self.smallrandom)
        self.toolbox.register("individual", tools.initRepeat, creator.Individual, self.toolbox.attr_float, n=self.weights_len)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
        self.toolbox.register("evaluate", self.evaluate)
        # numpy individuals need the copying crossover defined above.
        self.toolbox.register("mate", self.cxTwoPointCopy)
        self.toolbox.register("mutate", tools.mutGaussian, mu=self.mut_center, sigma=self.mut_sigma, indpb=self.mut_indpb)
        self.toolbox.register("select", tools.selTournament, tournsize=self.tournsize, k=self.tournk)

    def normalize_fitnesses(self, fitnesses):
        """
        Divide every fitness by the largest fitness in the batch.

        fitnesses is an iterable of 1-tuples; a list of 1-tuples is
        returned. An empty batch returns an empty list. When the largest
        fitness is not positive the values are returned unscaled:
        dividing by zero is undefined and dividing by a negative maximum
        would reverse the ranking.
        """
        # Materialize once so a lazy map result is not consumed twice.
        listfit = [x[0] for x in fitnesses]
        if not listfit:
            return []
        maxfitness = np.max(listfit)
        if maxfitness <= 0:
            return [tuple([x]) for x in listfit]
        return [tuple([x / maxfitness]) for x in listfit]

    def log_it(self, generation):
        """
        Pickle this object to "<logbase>__<generation+1>-<maxgenerations>.pkl".

        self.pool and self.toolbox are set to None while pickling
        (presumably because they cannot be pickled) and are restored
        afterwards, even if pickling fails.
        """
        pool = self.pool
        toolbox = self.toolbox
        self.pool = None
        self.toolbox = None
        pklfn = f"{self.logbase}__{generation+1}-{self.maxgenerations}.pkl"
        try:
            with open(pklfn, "wb") as pklfile:
                pickle.dump(self, pklfile)
        finally:
            self.pool = pool
            self.toolbox = toolbox

    def loop(self):
        """
        Run the generation loop for self.maxgenerations generations.

        Each generation: vary the population with crossover and mutation,
        clip weights to [wmin, wmax], evaluate individuals without a valid
        fitness, select the next population by tournament, log every
        100th generation, and report.
        """
        self.population = self.toolbox.population(n=self.popsize)
        NGEN = self.maxgenerations
        for gen in range(NGEN):
            print("generation", gen)
            offspring = algorithms.varAnd(self.population, self.toolbox, cxpb=self.cxpb, mutpb=self.mtpb)
            # Constrain genome values to [wmin, wmax]. The clip must write
            # back into the individual; a clipped copy would be discarded.
            for individual in offspring:
                out_of_range = (individual < self.wmin) | (individual > self.wmax)
                if out_of_range.any():
                    np.clip(individual, self.wmin, self.wmax, out=individual)
                    # The genome changed, so any stored fitness is stale.
                    del individual.fitness.values
            # Evaluate the individuals with an invalid fitness (not yet evaluated)
            invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
            if invalid_ind:
                fitnesses = list(self.toolbox.map(self.toolbox.evaluate, invalid_ind))
                if self.normalize_fitness:
                    fitnesses = self.normalize_fitnesses(fitnesses)
                print("fitnesses", ["%3.2f" % x[0] for x in fitnesses])
                self.fitness_dist(fitnesses)
                for ind, fit in zip(invalid_ind, fitnesses):
                    ind.fitness.values = fit
            self.offspring = offspring
            self.population = self.toolbox.select(offspring, k=len(self.population))
            if 0 == gen % 100:
                self.log_it(gen)
            # NOTE(review): the reviewed copy had lost its indentation;
            # report() is taken to run once per generation - confirm.
            self.report()

    def report(self):
        """
        Re-evaluate the current population and record summary results.

        Sets self.fitnesses, self.sortedFitnesses (best first),
        self.bestFitness, self.worstFitness and self.bestGenome (the list
        returned by tools.selBest with k=1), and prints the fitness
        distribution and the best/worst fitness.
        """
        fitnesses = list(self.toolbox.map(self.toolbox.evaluate, self.population))
        if self.normalize_fitness:
            fitnesses = self.normalize_fitnesses(fitnesses)
        self.fitnesses = fitnesses
        self.sortedFitnesses = sorted(fitnesses)
        self.sortedFitnesses.reverse()
        self.fitness_dist(fitnesses)
        self.bestFitness, self.worstFitness = self.sortedFitnesses[0], self.sortedFitnesses[-1]
        print("best/worst w", self.bestFitness, self.worstFitness)
        self.bestGenome = tools.selBest(self.population, k=1)

    def ffmt(self, value, fmt="%3.2f"):
        """Format value with the printf-style format fmt."""
        return fmt % value

    def fitness_dist(self, fitnesses):
        """
        Print min, 5th, 25th, 50th, 75th and 95th percentiles, and max of
        the fitness values in fitnesses (an iterable of 1-tuples).
        Nothing is printed for an empty batch.
        """
        listfit = [x[0] for x in fitnesses]
        if not listfit:
            return
        # np.percentile takes percentages in [0, 100], not fractions.
        pct05, pct25, pct50, pct75, pct95 = np.percentile(listfit, [5, 25, 50, 75, 95])
        print(f"fitness dist: {self.ffmt(np.min(listfit))} {self.ffmt(pct05)} {self.ffmt(pct25)} {self.ffmt(pct50)} {self.ffmt(pct75)} {self.ffmt(pct95)} {self.ffmt(np.max(listfit))}")

    def driver(self):
        """
        Run a complete evolution: setup, generation loop, final report,
        final pickle log, and a printout of masv() for the final
        population. The worker pool is shut down even if a step fails.
        """
        # Initialize
        self.setup()
        try:
            # Generation loop
            self.loop()
            # Report
            self.report()
            self.log_it(self.maxgenerations)
            print(self.masv(self.population))
        finally:
            self.pool.close()
            self.pool.join()
def normalized(a, axis=-1, order=2):
    """Scale ``a`` so that its vectors along ``axis`` have unit norm.

    The norm of the given ``order`` is taken along ``axis``. Vectors whose
    norm is zero are divided by 1 instead, so they come back unchanged
    rather than as NaN.
    """
    norms = np.linalg.norm(a, order, axis)
    norms = np.atleast_1d(norms)
    zero_mask = norms == 0
    norms[zero_mask] = 1
    divisor = np.expand_dims(norms, axis)
    return a / divisor
def normalize(v):
    """Return ``v`` divided by its norm as computed by ``np.linalg.norm``.

    An empty ``v`` returns ``np.nan``. A ``v`` whose norm is zero is
    returned as zeros (divided by 1) instead of producing NaNs from a
    division by zero, matching how ``normalized`` treats zero-norm
    vectors.
    """
    if 0 == len(v):
        return np.nan
    norm = np.linalg.norm(v)
    # Guard the all-zero vector: 0/0 would give NaN for every element.
    divisor = norm if norm != 0 else np.float64(1.0)
    return np.asanyarray(v) / divisor
class MinEnv(object):
    """Minimal environment used to exercise EvolveWeights.

    Stores ``alpha``, the weight-vector length ``wt_len`` and a value ``w``
    (``w`` is stored but not read by any method of this class).
    """

    def __init__(self, wt_len=12, alpha=0.01, w=0.5):
        self.wt_len = wt_len
        self.alpha = alpha
        self.w = w

    def get_weights_len(self):
        """Return the weight-vector length given at construction."""
        return self.wt_len

    def evaluate(self, wts):
        """Return a 1-tuple holding the fitness of the weight vector ``wts``.

        The weights are converted to floats and passed through
        ``normalize``; the fitness is the standard deviation of the result
        divided by 0.30.
        """
        as_floats = np.array(list(map(float, wts)))
        # Original author's note on this measure: "Max entropy".
        spread = np.std(normalize(as_floats))
        return (spread / 0.30,)
def test_ew():
    """Smoke-test EvolveWeights with a MinEnv for ten generations.

    EvolveWeights takes the weight length, alpha and the evaluation
    callable as separate arguments, so they are read off the environment
    here instead of passing the environment object itself.
    """
    env1 = MinEnv()
    ew = EvolveWeights(
        env1.get_weights_len(),
        alpha=env1.alpha,
        evaluate=env1.evaluate,
        popsize=100,
        maxgenerations=10,
        tournsize=75,
        tournk=3,
        normalize_fitness=False,
    )
    ew.driver()
if __name__ == "__main__":
    # Script entry point: run the smoke test between two progress messages.
    print("ew.py start...")
    test_ew()
    print("ew.py done.")