""" q_agent.py This submodule contains the Agent class, which implements a Q-learning agent with eligibility traces (TD-lambda). The agent learns to make decisions based on its sensory state and rewards received from the environment. The agent uses an epsilon-greedy action-selection strategy. Usage: import q_agent Class: Agent(obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96) Attributes: obsSpace (tuple): The shape of the observation space. actSpace (tuple): The shape of the action space. ftrSpace (tuple): The shape of the feature space. n_features (int): The total number of features. n_actions (int): The total number of actions. weights (numpy.ndarray): The Q-function weights. trace (numpy.ndarray): The eligibility trace for each feature. featureToIndexMap (numpy.ndarray): A mapping from feature indices to the corresponding weights. allActions (list): A list of all possible actions. alpha (float): The learning rate for updating weights. gamma (float): The discount factor for future rewards. epsilon (float): The exploration rate for epsilon-greedy action selection. lmbda (float): The decay factor for eligibility traces. sensoryState (numpy.ndarray): The current sensory state of the agent. previousSensoryState (numpy.ndarray): The previous sensory state of the agent. action (int): The current action taken by the agent. previousAction (int): The previous action taken by the agent. episoden (int): The episode number the agent is in. recentReset (bool): Indicates if the agent was recently reset. Methods: reset(): Resets the agent's traces, sensory states, and actions. predictPayoffsForAllActions() -> List[float]: Predicts the expected payoffs for all possible actions given the current sensory state. plasticUpdate(): Updates the agent's Q-function weights and eligibility traces based on the current sensory state, action, and received reward. Uses epsilon-greedy action selection. staticUpdate(): Updates the agent's action based on the current sensory state without updating weights or traces. Uses greedy action selection. Examples: >>> from q_agent import Agent >>> obsSpace, actSpace = (2, 2), (3,) >>> agent = Agent(obsSpace=obsSpace, actSpace=actSpace) """ import traceback import numpy as np from collections import defaultdict from typing import List, Tuple, Union from deap import creator, base, tools, algorithms LOGGING = False import logging, sys logging.basicConfig(stream=sys.stdout,level=logging.INFO) log = logging.getLogger() if not LOGGING: # remove all logging functionality for handler in log.handlers.copy(): try: log.removeHandler(handler) except ValueError: # in case another thread has already removed it pass log.addHandler(logging.NullHandler()) log.propagate = False # The Agent class, similar to what # is used in MABE. Note: this is unlike # how standard RLML folks structure these # algorithms. Here, we separate out concerns # for modularity. A side-effect is that the # update() (one cognitive step) receives the reward # for the previous update-action. This means 1 extra # update must be called if terminating. 
class Agent():

    def __init__(i, obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96):
        i.obsSpace = np.array(obsSpace)
        i.actSpace = np.array(actSpace)
        i.ftrSpace = tuple(obsSpace) + tuple(actSpace)
        i.n_features = np.prod(i.ftrSpace)
        i.n_actions = actSpace[0]  # not general
        i.weights = np.zeros(i.n_features)
        i.trace = np.zeros(i.n_features)
        i.featureToIndexMap = np.arange(i.n_features).reshape(i.ftrSpace)
        i.allActions = list(range(i.n_actions))  # new
        i.alpha = alpha      # how much to weigh reward surprises that deviate from expectation
        i.gamma = gamma      # how important expected future rewards are
        i.epsilon = epsilon  # fraction of exploration to exploitation (how often to choose a random action)
        i.lmbda = lmbda      # how important preceding actions are in learning adaptation
        i.sensoryState = np.zeros(len(i.obsSpace), dtype=np.int32)
        i.previousSensoryState = np.zeros(len(i.obsSpace), dtype=np.int32)
        i.action = 0
        i.previousAction = 0
        i.reward = -1  # reward from the previous step, set by the environment (mirrors reset())
        i.episoden = 0
        i.recentReset = True

    def reset(i):
        # resets traces, sensory state, and action memory; the learned weights are kept
        log.info("resetting agent")
        i.trace = np.zeros(i.n_features)
        i.sensoryState = np.zeros(len(i.obsSpace), dtype=np.int32)
        i.previousSensoryState = np.zeros(len(i.obsSpace), dtype=np.int32)
        i.action = 0
        i.previousAction = 0
        i.reward = -1
        i.recentReset = True

    def predictPayoffsForAllActions(i) -> List[float]:
        '''combines current sensoryState and all possible actions to return
        all possible payoffs by action

        >>> obsSpace, actSpace, ftrSpace = (2,2), (3,), (2,2)+(3,)
        >>> i = Agent(obsSpace=obsSpace, actSpace=actSpace)
        >>> (i.featureToIndexMap == np.arange(i.n_features).reshape((2,2,3))).all()
        True
        >>> i.sensoryState[:] = [1,0]
        >>> i.weights = np.zeros(12)
        >>> i.weights[6:9] = [1.,2.,3.]  # weights associated with features (1,0,) with actions 0,1,2
        >>> i.predictPayoffsForAllActions()
        [1.0, 2.0, 3.0]
        '''
        try:
            featureKeys = [tuple(i.sensoryState) + (action,) for action in i.allActions]
            featuresForEachAction = [i.featureToIndexMap[fki] for fki in featureKeys]
            return [i.weights[features].sum() for features in featuresForEachAction]
        except Exception:
            estr = f"Error: {traceback.format_exc()}"
            print(estr)
            print('featureToIndexMap', i.featureToIndexMap)
            print('featureKeys', featureKeys)
            print('sensoryState', i.sensoryState, 'allActions', i.allActions)
            return [np.nan for x in range(len(i.allActions))]

    def plasticUpdate(i):
        # This is a TD-lambda algorithm with epsilon-greedy action selection
        # (annealing of epsilon could be used; it was removed again).

        # determine predicted payoff
        nextActionPredictedPayoff = 0.0  # used to find surprise between expected and received payoff
        nextAction = 0

        # epsilon-greedy action selection
        if np.random.random() < i.epsilon:
            # choose a random action
            nextAction = np.random.choice(i.n_actions)
        else:
            # choose the best action
            try:
                q_vals = i.predictPayoffsForAllActions()
                nextAction = np.argmax(q_vals)
                if i.reward >= 0.0:  # goal achieved
                    nextActionPredictedPayoff = 0.0
                else:
                    nextActionPredictedPayoff = q_vals[nextAction]
            except Exception:
                estr = f"Error: {traceback.format_exc()}"
                print(estr)
                print("q_vals", q_vals)

        # only update weights if at least 1 experience has been accumulated
        if not i.recentReset:
            # determine the corrected payoff given the reward actually received
            previousActionCorrectedPayoff = i.reward + (nextActionPredictedPayoff * i.gamma)
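            # previousActionCorrectedPayoff is the TD target: the received reward plus the
            # discounted payoff predicted for the next action (zeroed above when exploring
            # or when the reward signals the goal). The "surprise" computed below is the
            # TD error, and the eligibility trace spreads that error back over recently
            # used features (replacing traces, decayed by lambda each step).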
            # use this information to update the weights for the last action
            # selection, based on how surprised we were
            features = i.featureToIndexMap[tuple(i.previousSensoryState) + (i.action,)]
            previousActionPredictedPayoff = i.weights[features].sum()
            surprise = previousActionCorrectedPayoff - previousActionPredictedPayoff
            # trace update: mark the feature just used (replacing traces)
            i.trace[features] = 1.0
            # weight update, scaled by each feature's eligibility
            i.weights += i.alpha * surprise * i.trace
            # decay the traces
            i.trace *= i.lmbda

        # keep track of state and action at t and t-1
        i.previousSensoryState = i.sensoryState.copy()  # copy (not a view) so in-place writes to sensoryState don't overwrite it
        i.action = nextAction
        i.recentReset = False

    def staticUpdate(i):
        # same as plasticUpdate, but without learning (a.k.a. 'deployment')

        # determine predicted payoff
        nextActionPredictedPayoff = 0.0  # used to find surprise between expected and received payoff
        nextAction = 0

        # greedy action selection
        q_vals = i.predictPayoffsForAllActions()
        nextAction = np.argmax(q_vals)

        # step the storage of state and action in memory
        i.previousSensoryState = i.sensoryState.copy()  # copy (not a view); see plasticUpdate
        i.action = nextAction


def tuple_shape(input_tuple):
    if not isinstance(input_tuple, tuple):
        try:
            return input_tuple.shape
        except AttributeError:
            raise TypeError("Input must be a tuple")
    # Check if the tuple is nested (i.e., if it's a multidimensional tuple)
    if any(isinstance(item, tuple) for item in input_tuple):
        shape = []
        while isinstance(input_tuple, tuple):
            shape.append(len(input_tuple))
            input_tuple = input_tuple[0]
        return tuple(shape)
    else:
        return (len(input_tuple),)


class Holder(object):
    def __init__(self):
        pass


# The derived class below adds a mutation_rate attribute, as well as methods for
# mutation, crossover, and fitness handling. An evolutionary algorithm can then
# evolve a population of EvolvableAgent instances by applying selection,
# crossover, and mutation operations based on the agents' fitness values.
class EvolvableAgent(Agent):
    """
    EvolvableAgent

    This class extends the Agent class from q_agent.py, adding functionality for
    evolutionary computation. The EvolvableAgent class can be used with
    evolutionary algorithms to optimize the agent's performance through mutation,
    crossover, and selection based on fitness values.

    Usage:
        from q_agent import EvolvableAgent

    Class:
        EvolvableAgent(obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01,
                       lmbda=0.96, mutation_rate=0.05)

    Attributes (in addition to Agent attributes):
        mutation_rate (float): The probability of each weight being mutated during mutation.
        fitness (float): The fitness value of the agent, used for evaluation and
            selection in an evolutionary algorithm.

    Methods (in addition to Agent methods):
        mutate():
            Mutates the agent's weights by adding small random values drawn from
            a normal distribution. The mutation_rate attribute determines the
            probability of each weight being mutated.
        crossover(other: 'EvolvableAgent') -> 'EvolvableAgent':
            Performs uniform crossover between this agent and another agent,
            creating a new offspring agent.
            Args:
                other (EvolvableAgent): The other agent to perform crossover with.
            Returns:
                EvolvableAgent: The offspring agent resulting from the crossover.
        set_fitness(fitness: float):
            Sets the fitness value for the agent.
            Args:
                fitness (float): The fitness value to be set.
        get_fitness() -> float:
            Gets the fitness value of the agent.
            Returns:
                float: The fitness value of the agent.
    Examples:
        >>> from q_agent import EvolvableAgent
        >>> obsSpace, actSpace = (2, 2), (3,)
        >>> agent = EvolvableAgent(obsSpace=obsSpace, actSpace=actSpace, mutation_rate=0.05)
    """

    def __init__(self, obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96,
                 mutation_rate=0.05, crossover_rate=0.01, fitness=None):
        super().__init__(obsSpace, actSpace, alpha, gamma, epsilon, lmbda)
        self.germline = self.weights
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.wfitness = None
        self.fitness = fitness
        self.init_fitness = fitness

    def mutate(self):
        """
        Mutate the agent's weights by adding small random values drawn from a
        normal distribution. The mutation_rate attribute determines the
        probability of each weight being mutated.
        """
        wtshape = self.weights.shape
        glshape = self.germline.shape
        mutation_mask = np.random.random(self.germline.shape) < self.mutation_rate
        self.germline[mutation_mask] += np.random.normal(loc=0, scale=0.01, size=np.sum(mutation_mask))
        self.weights = self.germline
        assert glshape == self.germline.shape, "Error: mutate() germline shape has changed"
        assert wtshape == self.weights.shape, "Error: mutate() weights shape has changed"

    def crossover(self, other: 'EvolvableAgent') -> 'EvolvableAgent':
        """
        Perform uniform crossover between this agent and another agent, creating
        a new offspring agent.

        Args:
            other (EvolvableAgent): The other agent to perform crossover with.

        Returns:
            EvolvableAgent: The offspring agent resulting from the crossover.
        """
        wtshape = self.weights.shape
        glshape = self.germline.shape
        offspring = EvolvableAgent(self.obsSpace, self.actSpace, self.alpha, self.gamma,
                                   self.epsilon, self.lmbda, self.mutation_rate,
                                   self.crossover_rate, self.init_fitness)
        if np.random.random() <= self.crossover_rate:
            crossover_mask = np.random.randint(0, 2, size=self.germline.shape, dtype=bool)
            offspring.germline = np.where(crossover_mask, self.germline, other.germline)
        else:
            offspring.germline = self.germline.copy()  # copy so later mutations of the offspring don't alter the parent's germline
        offspring.weights = offspring.germline
        assert self.obsSpace.shape == offspring.obsSpace.shape, f"Error: offspring has different obsSpace {offspring.obsSpace} != {self.obsSpace}"
        assert self.actSpace.shape == offspring.actSpace.shape, f"Error: offspring has different actSpace {offspring.actSpace} != {self.actSpace}"
        assert tuple_shape(self.ftrSpace) == tuple_shape(offspring.ftrSpace), f"Error: offspring has different ftrSpace {offspring.ftrSpace} {offspring.obsSpace} {offspring.actSpace} != {self.ftrSpace} {self.obsSpace} {self.actSpace}"
        assert glshape == offspring.germline.shape, "Error: offspring germline shape has changed"
        assert wtshape == offspring.weights.shape, "Error: offspring weights shape has changed"
        return offspring

    def set_wfitness(self, fitness: float):
        """
        Set the fitness value for the agent.

        Args:
            fitness (float): The fitness value to be set.
        """
        self.wfitness = fitness

    def get_wfitness(self) -> float:
        """
        Get the fitness value of the agent.

        Returns:
            float: The fitness value of the agent.
        """
        return self.wfitness

    def set_fitness(self, fitness: float):
        """
        Set the fitness value for the agent.

        Args:
            fitness (float): The fitness value to be set.
        """
        self.fitness.values = (fitness,)

    def get_fitness(self) -> float:
        """
        Get the fitness value of the agent.

        Returns:
            float: The fitness value of the agent.
        """
        return self.fitness.values[0]
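
# A minimal, illustrative sketch of how a population of EvolvableAgent instances
# might be evolved with the methods above. It is not used elsewhere in this
# module; `evaluate` is a hypothetical, caller-supplied callable mapping an
# agent to a scalar fitness.
def example_evolution(evaluate, obsSpace=(2, 2), actSpace=(3,), pop_size=10, generations=5):
    """Sketch of an evolutionary loop over EvolvableAgent instances (illustrative only)."""
    population = [EvolvableAgent(obsSpace=obsSpace, actSpace=actSpace) for _ in range(pop_size)]
    for _ in range(generations):
        # evaluate every agent and rank the population (higher fitness is better)
        for agent in population:
            agent.set_wfitness(evaluate(agent))
        population.sort(key=lambda a: a.get_wfitness(), reverse=True)
        # keep the top half as parents and refill with mutated crossover offspring
        parents = population[: pop_size // 2]
        children = []
        while len(parents) + len(children) < pop_size:
            mom_idx, dad_idx = np.random.randint(0, len(parents), size=2)
            child = parents[mom_idx].crossover(parents[dad_idx])
            child.mutate()
            children.append(child)
        population = parents + children
    return population
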
""" return self.fitness.values[0] if __name__ == '__main__': '''test important functions and workflows with doctesting run this python file by itself to run these tests, and set LOGGING=True near top of file.''' import doctest from functools import partial #doctest.testmod() test = partial(doctest.run_docstring_examples, globs=globals()) test(Agent.predictPayoffsForAllActions)