alice/code/agents/q_agent.py

"""
q_agent.py
This submodule contains the Agent class, which implements a Q-learning agent with eligibility traces (TD-lambda). The agent learns to make decisions based on its sensory state and rewards received from the environment. The agent uses an epsilon-greedy action-selection strategy.
Usage:
import q_agent
Class:
Agent(obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96)
Attributes:
obsSpace (tuple): The shape of the observation space.
actSpace (tuple): The shape of the action space.
ftrSpace (tuple): The shape of the feature space.
n_features (int): The total number of features.
n_actions (int): The total number of actions.
weights (numpy.ndarray): The Q-function weights.
trace (numpy.ndarray): The eligibility trace for each feature.
featureToIndexMap (numpy.ndarray): Maps each (observation..., action) coordinate in the feature space to its index into the flat weight vector.
allActions (list): A list of all possible actions.
alpha (float): The learning rate for updating weights.
gamma (float): The discount factor for future rewards.
epsilon (float): The exploration rate for epsilon-greedy action selection.
lmbda (float): The decay factor for eligibility traces.
sensoryState (numpy.ndarray): The current sensory state of the agent.
previousSensoryState (numpy.ndarray): The previous sensory state of the agent.
action (int): The current action taken by the agent.
previousAction (int): The previous action taken by the agent.
episoden (int): The episode number the agent is in.
recentReset (bool): Indicates if the agent was recently reset.
Methods:
reset():
Resets the agent's traces, sensory states, and actions.
predictPayoffsForAllActions() -> List[float]:
Predicts the expected payoffs for all possible actions given the current sensory state.
plasticUpdate():
Updates the agent's Q-function weights and eligibility traces based on the current sensory state, action, and received reward. Uses epsilon-greedy action selection.
staticUpdate():
Updates the agent's action based on the current sensory state without updating weights or traces. Uses greedy action selection.
Examples:
>>> from q_agent import Agent
>>> obsSpace, actSpace = (2, 2), (3,)
>>> agent = Agent(obsSpace=obsSpace, actSpace=actSpace)
"""
import traceback
import numpy as np
from collections import defaultdict
from typing import List, Tuple, Union
from deap import creator, base, tools, algorithms
LOGGING = False
import logging, sys
logging.basicConfig(stream=sys.stdout,level=logging.INFO)
log = logging.getLogger()
if not LOGGING:
# remove all logging functionality
for handler in log.handlers.copy():
try:
log.removeHandler(handler)
except ValueError: # in case another thread has already removed it
pass
log.addHandler(logging.NullHandler())
log.propagate = False
# The Agent class, similar to what is used in MABE.
# Note: this is structured differently from how standard RL/ML
# implementations organize these algorithms. Here, we separate out
# concerns for modularity. A side-effect is that update() (one
# cognitive step) receives the reward for the *previous* update-action.
# This means one extra update must be called when an episode terminates
# (see the worked example sketches after the Agent class below).
class Agent():
def __init__(i, obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96):
i.obsSpace = np.array(obsSpace)
i.actSpace = np.array(actSpace)
i.ftrSpace = tuple(obsSpace)+tuple(actSpace)
i.n_features = np.prod(i.ftrSpace)
i.n_actions = actSpace[0] # not general
i.weights = np.zeros(i.n_features)
i.trace = np.zeros(i.n_features)
i.featureToIndexMap = np.arange(i.n_features).reshape(i.ftrSpace)
i.allActions = list(range(i.n_actions))
# new
i.alpha = alpha # learning rate: how strongly to update on reward surprises that deviate from expectation
i.gamma = gamma # discount factor: how important expected future rewards are
i.epsilon = epsilon # exploration rate: probability of choosing a random action instead of the greedy one
i.lmbda = lmbda # trace-decay rate: how much credit preceding actions receive during learning
i.sensoryState = np.zeros(len(i.obsSpace),dtype=np.int32)
i.previousSensoryState = np.zeros(len(i.obsSpace),dtype=np.int32)
i.action = 0
i.previousAction = 0
i.episoden = 0
i.reward = -1 # reward received for the previous action (read by plasticUpdate); negative means the goal has not been reached
i.recentReset = True
def reset(i): # resets traces, sensory state, actions, and reward, but keeps the learned weights
log.info("resetting agent")
i.trace = np.zeros(i.n_features)
i.sensoryState = np.zeros(len(i.obsSpace),dtype=np.int32)
i.previousSensoryState = np.zeros(len(i.obsSpace),dtype=np.int32)
i.action = 0
i.previousAction = 0
i.reward = -1
i.recentReset = True
def predictPayoffsForAllActions(i) -> List[float]:
'''combines current sensoryState and all possible actions to return all possible payoffs by action
>>> obsSpace, actSpace, ftrSpace = (2,2), (3,), (2,2)+(3,)
>>> i = Agent(obsSpace=obsSpace, actSpace=actSpace)
>>> (i.featureToIndexMap == np.arange(i.n_features).reshape((2,2,3))).all()
True
>>> i.sensoryState[:] = [1,0]
>>> i.weights = np.zeros(12)
>>> i.weights[6:9] = [1.,2.,3.] # weights associated with features (1,0,<action>) with actions 0,1,2
>>> i.predictPayoffsForAllActions()
[1.0, 2.0, 3.0]
'''
#print(i.sensoryState, i.allActions)
try:
featureKeys = [tuple(i.sensoryState)+(action,) for action in i.allActions]
# featuresForEachAction = [i.featureToIndexMap[tuple(i.sensoryState)+(action,)] for action in i.allActions]
featuresForEachAction = [i.featureToIndexMap[fki] for fki in featureKeys]
#print('featureToIndexMap', i.featureToIndexMap)
#print('featureKeys', featureKeys)
#print('sensoryState', i.sensoryState, 'allActions', i.allActions)
return [i.weights[features].sum() for features in featuresForEachAction]
except Exception:
estr = f"Error: {traceback.format_exc()}"
print(estr)
print('featureToIndexMap', i.featureToIndexMap)
print('featureKeys', featureKeys)
print('sensoryState', i.sensoryState, 'allActions', i.allActions)
return [np.nan for x in range(len(i.allActions))]
def plasticUpdate(i):
# This algorithm is a TD-lambda algorithm
# with epsilon-greedy action-selection
# (could use annealing of the epsilon - I removed it again)
# determine predicted payoff
nextActionPredictedPayoff = 0.0 # used to find surprise between expected and received payoff
nextAction = 0
# epsilon-greedy action-selection
# choose random
if np.random.random() < i.epsilon: # random
nextAction = np.random.choice(i.n_actions)
else: # choose best
try:
q_vals = i.predictPayoffsForAllActions()
nextAction = np.argmax(q_vals)
if i.reward >= 0.0: # goal achieved
nextActionPredictedPayoff = 0.0
else:
nextActionPredictedPayoff = q_vals[nextAction]
except Exception:
estr = f"Error: {traceback.format_exc()}"
print(estr)
print("q_vals", q_vals)
# only update weights if accumulated at least 1 experience
if not i.recentReset:
# determine the corrected payoff version given the reward actually received
previousActionCorrectedPayoff = i.reward + (nextActionPredictedPayoff * i.gamma)
# use this information to update weights for last action-selection based on how surprised we were
features = i.featureToIndexMap[tuple(i.previousSensoryState)+(i.action,)]
previousActionPredictedPayoff = i.weights[features].sum()
surprise = previousActionCorrectedPayoff - previousActionPredictedPayoff
# update the eligibility trace: mark the feature just used (replacing traces)
i.trace[features] = 1.0
# update the weights toward the corrected payoff, scaled by the traces
i.weights += i.alpha * surprise * i.trace
# decay all traces
i.trace *= i.lmbda
# keep track of state and action t, t-1
i.previousSensoryState = i.sensoryState.copy() # copy (not a view), so later in-place writes to sensoryState don't clobber it
i.action = nextAction
i.recentReset = False
def staticUpdate(i):
# same as plasticUpdate, but without learning
# (a.k.a. 'deployment')
# greedy action-selection
q_vals = i.predictPayoffsForAllActions()
nextAction = np.argmax(q_vals)
# step the storage of state and action in memory
i.previousSensoryState = i.sensoryState.copy() # copy (not a view)
i.action = nextAction
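# ---------------------------------------------------------------------------
# Worked examples (added sketches, not part of the original Agent API).
# _td_lambda_example traces a single TD(lambda) weight update by hand, and
# _example_episode_loop shows how a caller is expected to drive plasticUpdate():
# because the reward applies to the *previous* update-action, the caller sets
# agent.reward and agent.sensoryState before each call and makes one extra call
# when an episode terminates. The environment object used in the loop sketch is
# hypothetical.
# ---------------------------------------------------------------------------
def _td_lambda_example():
    '''One TD(lambda) step traced by hand, with exploration disabled (epsilon=0).

    A minimal sketch; the constructor arguments are chosen to make the arithmetic easy.

    >>> a = Agent(obsSpace=(2,), actSpace=(2,), alpha=0.5, gamma=0.5, epsilon=0.0, lmbda=0.5)
    >>> a.reward = -1                # -1 means the goal has not been reached
    >>> a.sensoryState[:] = [1]
    >>> a.plasticUpdate()            # first call: picks a greedy action, no weight update yet
    >>> a.plasticUpdate()            # second call: surprise = (-1 + 0.5*0) - 0 = -1
    >>> a.weights.tolist()           # feature (obs=1, action=0) moved by alpha*surprise = -0.5
    [0.0, 0.0, -0.5, 0.0]
    >>> a.trace.tolist()             # its trace was set to 1.0, then decayed by lambda
    [0.0, 0.0, 0.5, 0.0]
    '''
    pass

def _example_episode_loop(env, agent, n_steps=100):
    '''Sketch of an episode loop. `env` here is a hypothetical object exposing
    reset() -> observation and step(action) -> (observation, reward, done);
    this module does not provide such an environment.'''
    agent.reset()
    agent.reward = -1                      # no reward received yet
    agent.sensoryState[:] = env.reset()    # assumes observations fit obsSpace
    for _ in range(n_steps):
        agent.plasticUpdate()              # picks agent.action, learns from agent.reward
        observation, reward, done = env.step(agent.action)
        agent.reward = reward              # reward for the action just taken...
        agent.sensoryState[:] = observation
        if done:
            agent.plasticUpdate()          # ...so one extra update folds it into the weights
            break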
"""
This derived class adds a mutation_rate attribute, as well as methods for mutation, crossover, and fitness handling. You can then use an evolutionary algorithm to evolve a population of EvolvableAgent instances by applying selection, crossover, and mutation operations based on the agents' fitness values.
"""
def tuple_shape(input_tuple):
if not isinstance(input_tuple, tuple):
try:
return input_tuple.shape # e.g. numpy arrays expose .shape directly
except AttributeError:
raise TypeError("Input must be a tuple or expose a .shape attribute")
# Check if the tuple is nested (i.e., if it's a multidimensional tuple)
if any(isinstance(item, tuple) for item in input_tuple):
shape = []
while isinstance(input_tuple, tuple):
shape.append(len(input_tuple))
input_tuple = input_tuple[0]
return tuple(shape)
else:
return (len(input_tuple),)
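# A few illustrative cases for tuple_shape (an added sketch; the values below are
# arbitrary). Flat tuples report their length, nested tuples are walked along
# their first elements, and objects exposing .shape (e.g. numpy arrays) are
# passed through.
def _tuple_shape_examples():
    '''
    >>> tuple_shape((2, 2))
    (2,)
    >>> tuple_shape(((1, 2, 3), (4, 5, 6)))
    (2, 3)
    >>> tuple_shape(np.zeros((2, 3)))
    (2, 3)
    '''
    pass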
class Holder(object):
def __init__(self):
pass
class EvolvableAgent(Agent):
""" EvolvableAgent
This class extends the Agent class from q_agent.py, adding functionality for evolutionary computation. The EvolvableAgent class can be used with evolutionary algorithms to optimize the agent's performance through mutation, crossover, and selection based on fitness values.
Usage:
from q_agent import EvolvableAgent
Class:
EvolvableAgent(obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96, mutation_rate=0.05)
Attributes (in addition to Agent attributes):
mutation_rate (float): The probability of each weight being mutated during mutation.
fitness (float): The fitness value of the agent, used for evaluation and selection in an evolutionary algorithm.
Methods (in addition to Agent methods):
mutate():
Mutates the agent's weights by adding small random values, drawn from a normal distribution. The mutation_rate attribute determines the probability of each weight being mutated.
crossover(other: 'EvolvableAgent') -> 'EvolvableAgent':
Performs uniform crossover between this agent and another agent, creating a new offspring agent.
Args:
other (EvolvableAgent): The other agent to perform crossover with.
Returns:
EvolvableAgent: The offspring agent resulting from the crossover.
set_fitness(fitness: float):
Sets the fitness value for the agent.
Args:
fitness (float): The fitness value to be set.
get_fitness() -> float:
Gets the fitness value of the agent.
Returns:
float: The fitness value of the agent.
Examples:
>>> from q_agent import EvolvableAgent
>>> obsSpace, actSpace = (2, 2), (3,)
>>> agent = EvolvableAgent(obsSpace=obsSpace, actSpace=actSpace, mutation_rate=0.05)
"""
def __init__(self, obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96, \
mutation_rate=0.05, crossover_rate=0.01, fitness=None):
# obsSpace, actSpace, alpha=0.1, gamma=0.95, epsilon=0.01, lmbda=0.96
super().__init__(obsSpace, actSpace, alpha, gamma, epsilon, lmbda)
self.germline = self.weights
self.mutation_rate = mutation_rate
self.crossover_rate = crossover_rate
self.wfitness = None
self.fitness = fitness
self.init_fitness = fitness
def mutate(self):
"""
Mutate the agent's weights by adding small random values, drawn from a normal distribution.
The mutation_rate attribute determines the probability of each weight being mutated.
"""
wtshape = self.weights.shape
glshape = self.germline.shape
mutation_mask = np.random.random(self.germline.shape) < self.mutation_rate
self.germline[mutation_mask] += np.random.normal(loc=0, scale=0.01, size=np.sum(mutation_mask))
self.weights = self.germline
assert glshape == self.germline.shape, "Error: mutate() germline shape has changed"
assert wtshape == self.weights.shape, "Error: mutate() weights shape has changed"
def crossover(self, other: 'EvolvableAgent') -> 'EvolvableAgent':
"""
Perform uniform crossover between this agent and another agent, creating a new offspring agent.
Args:
other (EvolvableAgent): The other agent to perform crossover with.
Returns:
EvolvableAgent: The offspring agent resulting from the crossover.
"""
wtshape = self.weights.shape
glshape = self.germline.shape
offspring = EvolvableAgent(self.obsSpace, self.actSpace, self.alpha, self.gamma, self.epsilon, self.lmbda, self.mutation_rate, self.crossover_rate, self.init_fitness)
if np.random.random() <= self.crossover_rate:
crossover_mask = np.random.randint(0, 2, size=self.germline.shape, dtype=bool)
offspring.germline = np.where(crossover_mask, self.germline, other.germline)
else:
offspring.germline = self.germline.copy() # copy, so mutating the offspring later does not also change this parent's germline
offspring.weights = offspring.germline
assert self.obsSpace.shape == offspring.obsSpace.shape, f"Error: offspring has different obsSpace {offspring.obsSpace} != {self.obsSpace}"
assert self.actSpace.shape == offspring.actSpace.shape, f"Error: offspring has different actSpace {offspring.actSpace} != {self.actSpace}"
assert tuple_shape(self.ftrSpace) == tuple_shape(offspring.ftrSpace), f"Error: offspring has different ftrSpace {offspring.ftrSpace} {offspring.obsSpace} {offspring.actSpace} != {self.ftrSpace} {self.obsSpace} {self.actSpace}"
assert glshape == offspring.germline.shape, "Error: offspring germline shape has changed"
assert wtshape == offspring.weights.shape, "Error: offspring weights shape has changed"
return offspring
def set_wfitness(self, fitness: float):
"""
Set the wfitness value for the agent.
Args:
fitness (float): The fitness value to be set.
"""
self.wfitness = fitness
def get_wfitness(self) -> float:
"""
Get the wfitness value of the agent.
Returns:
float: The fitness value of the agent.
"""
return self.wfitness
def set_fitness(self, fitness: float):
"""
Set the fitness value for the agent.
Args:
fitness (float): The fitness value to be set.
"""
self.fitness.values = (fitness,)
def get_fitness(self) -> float:
"""
Get the fitness value of the agent.
Returns:
float: The fitness value of the agent.
"""
return self.fitness.values[0]
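# ---------------------------------------------------------------------------
# Usage sketch for EvolvableAgent (an added example, not part of the original
# API). It shows one hand-rolled generation step: crossover two parents, mutate
# the child, and record a fitness. The mutation_rate of 1.0 and the fitness
# value 1.5 are arbitrary choices for the example. Note that set_fitness() and
# get_fitness() assign to fitness.values and therefore expect a DEAP
# base.Fitness instance to have been passed as the `fitness` argument;
# set_wfitness()/get_wfitness() work with plain floats, as shown here.
# ---------------------------------------------------------------------------
def _example_evolvable_usage():
    '''
    >>> p1 = EvolvableAgent(obsSpace=(2, 2), actSpace=(3,), mutation_rate=1.0)
    >>> p2 = EvolvableAgent(obsSpace=(2, 2), actSpace=(3,), mutation_rate=1.0)
    >>> child = p1.crossover(p2)         # uniform crossover (rate-gated) on the germline
    >>> child.mutate()                   # perturb the germline weights in place
    >>> child.weights.shape == p1.weights.shape
    True
    >>> child.set_wfitness(1.5)
    >>> child.get_wfitness()
    1.5
    '''
    pass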
if __name__ == '__main__':
'''Test important functions and workflows with doctests.
Run this Python file by itself to run these tests, and set
LOGGING=True near the top of the file.'''
import doctest
from functools import partial
#doctest.testmod()
test = partial(doctest.run_docstring_examples, globs=globals())
test(Agent.predictPayoffsForAllActions)