# TriuneCadence/composer_ans/hopfield.py

from __future__ import annotations
from dataclasses import dataclass
import math
import random
from typing import Callable
from .encoding import encode_note_sequence
from .types import NOTE_VOCABULARY_SIZE, SEQUENCE_LENGTH
# Signature of a noise source: (mean, spread) -> one random sample.
NoiseFn = Callable[[float, float], float]
@dataclass(frozen=True)
class HopfieldParams:
    """Constants governing the Hopfield network dynamics and convergence."""

    epsilon: float = 0.005  # convergence threshold on per-cell output change between time slots
    resistance_scale: float = 3.5  # multiplies global_resistance in the activation leak term
    capacitance_scale: float = 10.0  # multiplies global_capacitance in the delta divisor
    weight_scale: float = 1.0  # scales every coupling weight
    input_scale: float = 1.0  # scales the external input current
    iteration_scale: float = 1.0  # Euler step size applied to each activation delta
    global_resistance: float = 1.0  # base resistance in the leak term
    global_capacitance: float = 1.0  # base capacitance; also divides activations in the output squash
@dataclass(frozen=True)
class HopfieldResult:
    """Result of one note-generation pass (produced by generate_next_note)."""

    input_notes: tuple[int, ...]  # encoded note sequence fed to the network
    output_notes: tuple[int, ...]  # per-position winning note after relaxation
    candidate_note: int  # last entry of output_notes: the proposed next note
    iterations: int  # relaxation iterations the network needed to settle
    activations: tuple[tuple[float, ...], ...]  # final activation grid
    outputs: tuple[tuple[float, ...], ...]  # final squashed output grid
@dataclass(frozen=True)
class HopfieldNetworkState:
    """Snapshot of the network grids after relaxation."""

    activations: tuple[tuple[float, ...], ...]  # activations from the last-written time slot
    outputs: tuple[tuple[float, ...], ...]  # outputs from the last-written time slot
    external_inputs: tuple[tuple[float, ...], ...]  # the constant inputs that drove the run
@dataclass(frozen=True)
class HopfieldRunResult:
    """Final state of run_hopfield_network plus the iteration count."""

    state: HopfieldNetworkState  # grids at convergence
    iterations: int  # number of relaxation iterations performed
def make_gaussian_noise(rng: random.Random | None = None) -> NoiseFn:
    """Build a Box-Muller Gaussian sampler backed by *rng*.

    A fresh ``random.Random`` is created when no generator is supplied.
    Note the second argument of the returned function scales the sample
    linearly (it multiplies the unit normal directly), mirroring the
    original Pascal form.
    """
    source = rng if rng is not None else random.Random()

    def sample(mean: float, variance: float) -> float:
        # Box-Muller transform; keep the first uniform away from 0 so
        # log() stays finite.
        first = max(source.random(), 1e-12)
        second = source.random()
        unit_normal = math.sqrt(-2.0 * math.log(first)) * math.cos(2.0 * math.pi * second)
        return variance * unit_normal + mean

    return sample
def tanh_clamped(value: float, exp_max: float = 80.0) -> float:
    """Hyperbolic tangent with the argument clamped to [-exp_max, exp_max].

    The original port computed (e^x - e^-x) / (e^x + e^-x) by hand with
    four exp() calls; ``math.tanh`` is the standard-library equivalent and
    is more accurate for large |x|.  The clamp is kept for interface
    compatibility — tanh saturates to +/-1 long before |x| reaches 80, so
    results are unchanged for callers.
    """
    return math.tanh(max(min(value, exp_max), -exp_max))
def run_hopfield_network(
    external_inputs: tuple[tuple[float, ...], ...],
    weight_matrix: tuple[tuple[float, ...], ...],
    *,
    params: HopfieldParams | None = None,
    initial_activations: tuple[tuple[float, ...], ...] | None = None,
) -> HopfieldRunResult:
    """Relax a continuous Hopfield network until its outputs stabilise.

    The neuron grid has the same shape as ``external_inputs`` (rows x
    columns).  State is double-buffered across two "time slots" (indices
    0 and 1): each iteration reads slot ``t`` and writes slot
    ``(t + 1) % 2``, and relaxation stops when both output slots agree to
    within ``params.epsilon`` everywhere (see ``_done``).

    Args:
        external_inputs: constant input current per neuron; must be
            non-empty and rectangular.
        weight_matrix: coupling weights covering at least
            (rows * cols) x (rows * cols) entries, indexed via
            ``_weight_coord``.
        params: dynamics constants; defaults to ``HopfieldParams()``.
        initial_activations: optional starting activations with the same
            shape as ``external_inputs``; defaults to 0.5 everywhere.

    Returns:
        A ``HopfieldRunResult`` holding the final grids and the number of
        iterations performed.

    Raises:
        ValueError: on empty or ragged ``external_inputs``, a too-small
            weight matrix, or an ``initial_activations`` shape mismatch.
    """
    model = params or HopfieldParams()
    row_count = len(external_inputs)
    if row_count == 0:
        raise ValueError("external_inputs cannot be empty")
    column_count = len(external_inputs[0])
    if any(len(row) != column_count for row in external_inputs):
        raise ValueError("external_inputs rows must be the same length")
    _validate_weight_matrix(weight_matrix, active_size=row_count * column_count)
    # Default starting activation is 0.5 in every cell.
    base_activations = initial_activations or tuple(
        tuple(0.5 for _ in range(column_count)) for _ in range(row_count)
    )
    if len(base_activations) != row_count or any(len(row) != column_count for row in base_activations):
        raise ValueError("initial_activations shape must match external_inputs")
    # Two mutable copies of each grid: indices 0 and 1 are the time slots.
    activations = [
        [list(row) for row in base_activations],
        [list(row) for row in base_activations],
    ]
    outputs = [
        [[0.0 for _ in range(column_count)] for _ in range(row_count)],
        [[0.0 for _ in range(column_count)] for _ in range(row_count)],
    ]
    inputs = [
        [list(row) for row in external_inputs],
        [list(row) for row in external_inputs],
    ]
    # Sentinel: guarantees the first _done() comparison fails so the loop
    # body runs at least once; it is overwritten during iteration.
    outputs[1][0][0] = 20.0
    time_step = 0
    _update_outputs(activations, outputs, time_step, model, row_count, column_count)
    iterations = 0
    while not _done(outputs, model.epsilon, row_count, column_count):
        time_step = time_step % 2
        next_time = (time_step + 1) % 2
        _update_outputs(activations, outputs, time_step, model, row_count, column_count)
        for row_index in range(row_count):
            for column_index in range(column_count):
                delta = _delta_neuron_activation(
                    row_index=row_index,
                    column_index=column_index,
                    row_count=row_count,
                    column_count=column_count,
                    time_step=time_step,
                    activations=activations,
                    outputs=outputs,
                    inputs=inputs,
                    weight_matrix=weight_matrix,
                    params=model,
                )
                # Forward-Euler step, scaled by iteration_scale.
                activations[next_time][row_index][column_index] = (
                    activations[time_step][row_index][column_index]
                    + model.iteration_scale * delta
                )
        time_step += 1
        iterations += 1
    # After the loop, time_step % 2 is the slot that was written last.
    final_slot = time_step % 2
    state = HopfieldNetworkState(
        activations=tuple(tuple(row) for row in activations[final_slot]),
        outputs=tuple(tuple(row) for row in outputs[final_slot]),
        external_inputs=tuple(tuple(row) for row in external_inputs),
    )
    return HopfieldRunResult(state=state, iterations=iterations)
def generate_next_note(
    notes: list[int] | tuple[int, ...],
    weight_matrix: tuple[tuple[float, ...], ...],
    *,
    params: HopfieldParams | None = None,
    noise: NoiseFn | None = None,
) -> HopfieldResult:
    """Propose the next note by relaxing the Hopfield network.

    The note sequence is encoded, each grid cell is seeded with a noisy
    stimulus (high baseline 0.67 for the matching note row, low baseline
    0.33 otherwise, pure noise around 0.5 for unknown positions), the
    network is run to convergence, and each position is decoded as the
    note row with the strongest output.  The candidate next note is the
    decoded value at the final position.
    """
    hopfield_params = HopfieldParams() if params is None else params
    sample_noise = make_gaussian_noise() if noise is None else noise
    encoded = encode_note_sequence(notes)

    def noisy_cell(note_row: int, position: int) -> float:
        # One noise draw per cell, in row-major order, matching the
        # deterministic call sequence expected from a seeded generator.
        note_value = encoded[position]
        if note_value == 0:
            return sample_noise(0.5, 0.25)
        if note_value == note_row + 1:
            return 0.67 + sample_noise(0.0, 0.1)
        return 0.33 + sample_noise(0.0, 0.1)

    stimulus = tuple(
        tuple(noisy_cell(note_row, position) for position in range(SEQUENCE_LENGTH))
        for note_row in range(NOTE_VOCABULARY_SIZE)
    )
    run_result = run_hopfield_network(
        stimulus,
        weight_matrix,
        params=hopfield_params,
    )
    decoded = list(encoded)
    for position in range(SEQUENCE_LENGTH):
        decoded[position] = _max_cell_in_column(run_result.state.outputs, position)
    return HopfieldResult(
        input_notes=encoded,
        output_notes=tuple(decoded),
        candidate_note=decoded[-1],
        iterations=run_result.iterations,
        activations=run_result.state.activations,
        outputs=run_result.state.outputs,
    )
def _validate_weight_matrix(
weight_matrix: tuple[tuple[float, ...], ...],
*,
active_size: int,
) -> None:
if len(weight_matrix) < active_size:
raise ValueError(f"weight matrix needs at least {active_size} rows")
if any(len(row) < active_size for row in weight_matrix[:active_size]):
raise ValueError(f"weight matrix needs at least {active_size} columns")
def _update_outputs(
    activations: list[list[list[float]]],
    outputs: list[list[list[float]]],
    time_step: int,
    params: HopfieldParams,
    row_count: int,
    column_count: int,
) -> None:
    """Recompute one time slot's outputs from its activations in place.

    Each output cell is 0.5 * (1 + tanh(activation / global_capacitance)),
    squashing activations into the open interval (0, 1).
    """
    activation_grid = activations[time_step]
    output_grid = outputs[time_step]
    capacitance = params.global_capacitance
    for row in range(row_count):
        for column in range(column_count):
            squashed = tanh_clamped(activation_grid[row][column] / capacitance)
            output_grid[row][column] = 0.5 * (1.0 + squashed)
def _done(
outputs: list[list[list[float]]],
epsilon: float,
row_count: int,
column_count: int,
) -> bool:
for row_index in range(row_count):
for column_index in range(column_count):
if abs(outputs[0][row_index][column_index] - outputs[1][row_index][column_index]) > epsilon:
return False
return True
def _weight_coord(row_index: int, column_index: int, row_count: int) -> int:
return row_count * column_index + row_index
def _delta_neuron_activation(
    *,
    row_index: int,
    column_index: int,
    row_count: int,
    column_count: int,
    time_step: int,
    activations: list[list[list[float]]],
    outputs: list[list[list[float]]],
    inputs: list[list[list[float]]],
    weight_matrix: tuple[tuple[float, ...], ...],
    params: HopfieldParams,
) -> float:
    """Continuous Hopfield update for one neuron: (-A/R + I + sum W*V) / C.

    The coupling term sums this neuron's weight row against every
    neuron's current output, with weights scaled by ``weight_scale``.
    """
    weight_row = weight_matrix[_weight_coord(row_index, column_index, row_count)]
    coupling = sum(
        weight_row[_weight_coord(other_row, other_column, row_count)]
        * params.weight_scale
        * outputs[time_step][other_row][other_column]
        for other_row in range(row_count)
        for other_column in range(column_count)
    )
    leak = activations[time_step][row_index][column_index] / (
        params.global_resistance * params.resistance_scale
    )
    drive = inputs[time_step][row_index][column_index] * params.input_scale
    return (-leak + drive + coupling) / (
        params.global_capacitance * params.capacitance_scale
    )
def _max_cell_in_column(
    output_grid: list[list[float]] | tuple[tuple[float, ...], ...],
    position: int,
) -> int:
    """Return the 1-based note index with the strongest output at *position*.

    Defaults to note 1 when no cell exceeds 0.0; on ties the earlier note
    wins, since only a strictly larger value replaces the current best.
    """
    best_value = 0.0
    best_note = 1
    for row in range(NOTE_VOCABULARY_SIZE):
        cell = output_grid[row][position]
        if cell > best_value:
            best_value, best_note = cell, row + 1
    return best_note