275 lines
9.1 KiB
Python
275 lines
9.1 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
import math
|
|
import random
|
|
from typing import Callable
|
|
|
|
from .encoding import encode_note_sequence
|
|
from .types import NOTE_VOCABULARY_SIZE, SEQUENCE_LENGTH
|
|
|
|
# Call shape shared by noise sources in this module: noise(mean, variance) -> float.
NoiseFn = Callable[[float, float], float]
|
|
|
|
|
|
@dataclass(frozen=True)
class HopfieldParams:
    """Immutable tuning constants for the Hopfield network update rule.

    Every field has a default, so ``HopfieldParams()`` gives the
    configuration used whenever a caller supplies no parameters.
    """

    # Convergence threshold: a run stops once no cell's output differs
    # between the two buffered time slots by more than this amount.
    epsilon: float = 0.005
    # Multiplies ``global_resistance`` in the divisor of the leak term.
    resistance_scale: float = 3.5
    # Multiplies ``global_capacitance`` in the divisor of the activation delta.
    capacitance_scale: float = 10.0
    # Factor applied to every weight-matrix entry in the feedback sum.
    weight_scale: float = 1.0
    # Factor applied to each neuron's external input.
    input_scale: float = 1.0
    # Step size: the activation delta is multiplied by this before being added.
    iteration_scale: float = 1.0
    # Base resistance used in the leak term (scaled by ``resistance_scale``).
    global_resistance: float = 1.0
    # Base capacitance; also divides an activation before the tanh squashing.
    global_capacitance: float = 1.0
|
|
|
|
|
|
@dataclass(frozen=True)
class HopfieldResult:
    """Immutable record of one ``generate_next_note`` call."""

    # The encoded note sequence that was fed to the network.
    input_notes: tuple[int, ...]
    # One winning note number per sequence position, read from the final outputs.
    output_notes: tuple[int, ...]
    # The last entry of ``output_notes``.
    candidate_note: int
    # Number of update passes the network ran before it stopped.
    iterations: int
    # Final activation grid reported by the network run.
    activations: tuple[tuple[float, ...], ...]
    # Final output grid reported by the network run.
    outputs: tuple[tuple[float, ...], ...]
|
|
|
|
|
|
@dataclass(frozen=True)
class HopfieldNetworkState:
    """Immutable snapshot of the network grids at the end of a run."""

    # Activation value of every cell, as a tuple of rows.
    activations: tuple[tuple[float, ...], ...]
    # Output value of every cell, as a tuple of rows.
    outputs: tuple[tuple[float, ...], ...]
    # The external inputs the run was driven with, as a tuple of rows.
    external_inputs: tuple[tuple[float, ...], ...]
|
|
|
|
|
|
@dataclass(frozen=True)
class HopfieldRunResult:
    """Immutable pairing of a final network state with its iteration count."""

    # Grids captured when the run stopped.
    state: HopfieldNetworkState
    # Number of update passes executed before the run stopped.
    iterations: int
|
|
|
|
|
|
def make_gaussian_noise(rng: random.Random | None = None) -> NoiseFn:
    """Build a Gaussian noise function bound to one random generator.

    Args:
        rng: Generator to draw from. When omitted, a fresh
            ``random.Random()`` is created for the returned function.

    Returns:
        A function ``gaussian_noise(mean, variance)`` that returns one
        sample. Note that ``variance`` multiplies the standard-normal
        sample directly, so it acts as a standard deviation.
    """
    source = rng if rng else random.Random()
    two_pi = 2.0 * math.pi

    def gaussian_noise(mean: float, variance: float) -> float:
        # Both uniforms are drawn up front, always in this order, so a
        # seeded generator yields a reproducible sequence.
        first = source.random()
        second = source.random()
        # Box-Muller in the same form as the Pascal code; the floor keeps
        # log() away from zero.
        radius = math.sqrt(-2.0 * math.log(max(first, 1e-12)))
        standard_sample = radius * math.cos(two_pi * second)
        return variance * standard_sample + mean

    return gaussian_noise
|
|
|
|
|
|
def tanh_clamped(value: float, exp_max: float = 80.0) -> float:
    """Return tanh of ``value`` after limiting it to ``[-exp_max, exp_max]``.

    The hyperbolic tangent is evaluated from its exponential definition;
    the clamp bounds the argument handed to ``math.exp``.

    Args:
        value: Input to squash.
        exp_max: Largest magnitude passed to ``math.exp``. Callers that
            raise it should keep it within the range ``math.exp`` can
            represent.

    Returns:
        A float in ``[-1.0, 1.0]``.
    """
    capped = min(value, exp_max)
    bounded = max(capped, -exp_max)
    # Each exponential is computed once and reused in both halves of the ratio.
    grow = math.exp(bounded)
    decay = math.exp(-bounded)
    return (grow - decay) / (grow + decay)
|
|
|
|
|
|
def run_hopfield_network(
    external_inputs: tuple[tuple[float, ...], ...],
    weight_matrix: tuple[tuple[float, ...], ...],
    *,
    params: HopfieldParams | None = None,
    initial_activations: tuple[tuple[float, ...], ...] | None = None,
) -> HopfieldRunResult:
    """Iterate the network until its outputs stop changing.

    Activations and outputs are kept in two alternating slots. Each pass
    recomputes the current slot's outputs from its activations, then
    writes ``activation + iteration_scale * delta`` for every cell into
    the other slot. The loop ends when no cell's output differs between
    the two slots by more than ``params.epsilon``. There is no iteration
    cap: the call returns only once that condition holds.

    Args:
        external_inputs: Rectangular, non-empty grid of external inputs
            (rows of equal, non-zero length).
        weight_matrix: Square-addressable weights with at least
            ``rows * columns`` rows and columns; cells are addressed
            through ``_weight_coord``.
        params: Model constants; defaults to ``HopfieldParams()``.
        initial_activations: Optional starting activations with the same
            shape as ``external_inputs``. When omitted every cell starts
            at 0.5.

    Returns:
        A ``HopfieldRunResult`` holding the final state and the number of
        passes executed. The state is read from the slot the last pass
        wrote activations into; its outputs are the ones most recently
        computed for that slot, which agree with the other slot to within
        ``epsilon``.

    Raises:
        ValueError: If ``external_inputs`` is empty, ragged, or has
            zero-length rows; if ``weight_matrix`` is too small; or if
            ``initial_activations`` does not match the input shape.
    """
    model = HopfieldParams() if params is None else params

    row_count = len(external_inputs)
    if row_count == 0:
        raise ValueError("external_inputs cannot be empty")
    column_count = len(external_inputs[0])
    if any(len(row) != column_count for row in external_inputs):
        raise ValueError("external_inputs rows must be the same length")
    if column_count == 0:
        # Without this guard the sentinel write below would fail with a
        # bare IndexError instead of a shape error.
        raise ValueError("external_inputs rows cannot be empty")
    _validate_weight_matrix(weight_matrix, active_size=row_count * column_count)

    # Test against None explicitly so an empty tuple is rejected by the
    # shape check instead of silently falling back to the default grid.
    if initial_activations is None:
        base_activations = tuple(
            tuple(0.5 for _ in range(column_count)) for _ in range(row_count)
        )
    else:
        base_activations = initial_activations
    if len(base_activations) != row_count or any(
        len(row) != column_count for row in base_activations
    ):
        raise ValueError("initial_activations shape must match external_inputs")

    # Two independent copies of every grid: slot 0 and slot 1.
    activations = [[list(row) for row in base_activations] for _ in range(2)]
    outputs = [
        [[0.0 for _ in range(column_count)] for _ in range(row_count)]
        for _ in range(2)
    ]
    inputs = [[list(row) for row in external_inputs] for _ in range(2)]

    # Sentinel: makes the two output slots differ so the first convergence
    # check fails and at least one pass runs (for any ordinary epsilon).
    outputs[1][0][0] = 20.0
    time_step = 0
    _update_outputs(activations, outputs, time_step, model, row_count, column_count)
    iterations = 0

    while not _done(outputs, model.epsilon, row_count, column_count):
        time_step = time_step % 2
        next_time = (time_step + 1) % 2
        _update_outputs(activations, outputs, time_step, model, row_count, column_count)
        for row_index in range(row_count):
            for column_index in range(column_count):
                delta = _delta_neuron_activation(
                    row_index=row_index,
                    column_index=column_index,
                    row_count=row_count,
                    column_count=column_count,
                    time_step=time_step,
                    activations=activations,
                    outputs=outputs,
                    inputs=inputs,
                    weight_matrix=weight_matrix,
                    params=model,
                )
                # Read from the current slot, write to the other one, so
                # every cell in a pass sees the same previous state.
                activations[next_time][row_index][column_index] = (
                    activations[time_step][row_index][column_index]
                    + model.iteration_scale * delta
                )
        time_step += 1
        iterations += 1

    final_slot = time_step % 2
    state = HopfieldNetworkState(
        activations=tuple(tuple(row) for row in activations[final_slot]),
        outputs=tuple(tuple(row) for row in outputs[final_slot]),
        external_inputs=tuple(tuple(row) for row in external_inputs),
    )
    return HopfieldRunResult(state=state, iterations=iterations)
|
|
|
|
|
|
def generate_next_note(
    notes: list[int] | tuple[int, ...],
    weight_matrix: tuple[tuple[float, ...], ...],
    *,
    params: HopfieldParams | None = None,
    noise: NoiseFn | None = None,
) -> HopfieldResult:
    """Run the network on an encoded note sequence and read back the notes.

    The notes are encoded with ``encode_note_sequence`` and turned into a
    ``NOTE_VOCABULARY_SIZE`` x ``SEQUENCE_LENGTH`` grid of noisy inputs:
    a cell gets about 0.67 when its row's note number matches the note at
    that position, about 0.33 when it does not, and a ``noise(0.5, 0.25)``
    sample when the encoded note is 0. After the network settles, the
    strongest row in each column gives that position's output note.

    Args:
        notes: Note sequence to encode.
        weight_matrix: Weights passed through to ``run_hopfield_network``.
        params: Model constants; defaults to ``HopfieldParams()``.
        noise: Noise source called as ``noise(mean, variance)``; defaults
            to ``make_gaussian_noise()``.

    Returns:
        A ``HopfieldResult`` whose ``candidate_note`` is the last entry of
        ``output_notes``.
    """
    model = params if params else HopfieldParams()
    draw_noise = noise if noise else make_gaussian_noise()
    input_notes = encode_note_sequence(notes)

    def noisy_input(note_number: int, position: int) -> float:
        # Exactly one noise draw per cell, whichever branch is taken.
        encoded = input_notes[position]
        if encoded == 0:
            return draw_noise(0.5, 0.25)
        if encoded == note_number:
            return 0.67 + draw_noise(0.0, 0.1)
        return 0.33 + draw_noise(0.0, 0.1)

    # Rows are filled one after another, each left to right, which fixes
    # the order in which the noise source is consumed.
    input_grid = tuple(
        tuple(noisy_input(note_index + 1, position) for position in range(SEQUENCE_LENGTH))
        for note_index in range(NOTE_VOCABULARY_SIZE)
    )

    output_notes = list(input_notes)
    run_result = run_hopfield_network(input_grid, weight_matrix, params=model)
    final_state = run_result.state
    for position in range(SEQUENCE_LENGTH):
        output_notes[position] = _max_cell_in_column(final_state.outputs, position)

    return HopfieldResult(
        input_notes=input_notes,
        output_notes=tuple(output_notes),
        candidate_note=output_notes[-1],
        iterations=run_result.iterations,
        activations=final_state.activations,
        outputs=final_state.outputs,
    )
|
|
|
|
|
|
def _validate_weight_matrix(
    weight_matrix: tuple[tuple[float, ...], ...],
    *,
    active_size: int,
) -> None:
    """Check that the weight matrix covers ``active_size`` x ``active_size``.

    Only the first ``active_size`` rows are inspected; extra rows and
    extra columns are allowed.

    Args:
        weight_matrix: Matrix to check.
        active_size: Number of rows and columns that must be present.

    Raises:
        ValueError: If there are too few rows, or one of the inspected
            rows has too few columns.
    """
    if not len(weight_matrix) >= active_size:
        raise ValueError(f"weight matrix needs at least {active_size} rows")
    for row in weight_matrix[:active_size]:
        if len(row) < active_size:
            raise ValueError(f"weight matrix needs at least {active_size} columns")
|
|
|
|
|
|
def _update_outputs(
    activations: list[list[list[float]]],
    outputs: list[list[list[float]]],
    time_step: int,
    params: HopfieldParams,
    row_count: int,
    column_count: int,
) -> None:
    """Recompute one slot of ``outputs`` in place from its activations.

    Each cell becomes ``0.5 * (1 + tanh_clamped(activation / C))`` where
    ``C`` is ``params.global_capacitance``.

    Args:
        activations: Two-slot activation buffers, indexed ``[slot][row][column]``.
        outputs: Two-slot output buffers with the same layout; mutated.
        time_step: Slot index to read from and write to.
        params: Supplies ``global_capacitance``.
        row_count: Number of rows to update.
        column_count: Number of columns to update.
    """
    gain_divisor = params.global_capacitance
    source_grid = activations[time_step]
    target_grid = outputs[time_step]
    for row_index in range(row_count):
        for column_index in range(column_count):
            squashed = tanh_clamped(source_grid[row_index][column_index] / gain_divisor)
            # Shift tanh's [-1, 1] range onto [0, 1].
            target_grid[row_index][column_index] = 0.5 * (1.0 + squashed)
|
|
|
|
|
|
def _done(
    outputs: list[list[list[float]]],
    epsilon: float,
    row_count: int,
    column_count: int,
) -> bool:
    """Report whether the two output slots agree to within ``epsilon``.

    Args:
        outputs: Two-slot output buffers, indexed ``[slot][row][column]``.
        epsilon: Largest per-cell absolute difference still counted as settled.
        row_count: Number of rows to compare.
        column_count: Number of columns to compare.

    Returns:
        ``False`` as soon as one compared cell differs by more than
        ``epsilon``; ``True`` otherwise, including when nothing is compared.
    """
    first_slot, second_slot = outputs[0], outputs[1]
    # Written as "no cell exceeds epsilon" so a NaN difference, which is
    # never greater than epsilon, does not count as a change.
    return not any(
        abs(first_slot[row_index][column_index] - second_slot[row_index][column_index]) > epsilon
        for row_index in range(row_count)
        for column_index in range(column_count)
    )
|
|
|
|
|
|
def _weight_coord(row_index: int, column_index: int, row_count: int) -> int:
    """Flatten a grid cell to its weight-matrix index, column by column.

    Cells in the same column occupy consecutive indices; each further
    column starts ``row_count`` later.
    """
    column_offset = column_index * row_count
    return column_offset + row_index
|
|
|
|
|
|
def _delta_neuron_activation(
    *,
    row_index: int,
    column_index: int,
    row_count: int,
    column_count: int,
    time_step: int,
    activations: list[list[list[float]]],
    outputs: list[list[list[float]]],
    inputs: list[list[list[float]]],
    weight_matrix: tuple[tuple[float, ...], ...],
    params: HopfieldParams,
) -> float:
    """Compute the activation change for one cell in the current slot.

    The result is::

        (-activation / (global_resistance * resistance_scale)
         + input * input_scale
         + sum(weight * weight_scale * output over every cell))
        / (global_capacitance * capacitance_scale)

    Args:
        row_index: Row of the cell being updated.
        column_index: Column of the cell being updated.
        row_count: Number of grid rows.
        column_count: Number of grid columns.
        time_step: Slot index to read activations, outputs and inputs from.
        activations: Two-slot activation buffers ``[slot][row][column]``.
        outputs: Two-slot output buffers with the same layout.
        inputs: Two-slot external-input buffers with the same layout.
        weight_matrix: Weights addressed by ``_weight_coord`` indices.
        params: Scaling constants.

    Returns:
        The unscaled delta; the caller applies ``iteration_scale``.
    """
    own_index = _weight_coord(row_index, column_index, row_count)

    # Accumulate with a plain running total, row by row, so the floating
    # point result depends only on this fixed order of additions.
    weighted_feedback = 0.0
    for peer_row in range(row_count):
        for peer_column in range(column_count):
            peer_index = _weight_coord(peer_row, peer_column, row_count)
            coupling = weight_matrix[own_index][peer_index] * params.weight_scale
            weighted_feedback += coupling * outputs[time_step][peer_row][peer_column]

    leak = -(
        activations[time_step][row_index][column_index]
        / (params.global_resistance * params.resistance_scale)
    )
    drive = inputs[time_step][row_index][column_index] * params.input_scale
    divisor = params.global_capacitance * params.capacitance_scale
    return (leak + drive + weighted_feedback) / divisor
|
|
|
|
|
|
def _max_cell_in_column(
    output_grid: list[list[float]] | tuple[tuple[float, ...], ...],
    position: int,
) -> int:
    """Return the 1-based note number with the largest output in a column.

    Scans the first ``NOTE_VOCABULARY_SIZE`` rows at ``position``. A row
    wins only by being strictly greater than the best seen so far, which
    starts at 0.0; ties keep the earlier row, and note 1 is returned when
    no value exceeds 0.0.

    Args:
        output_grid: Output values indexed ``[note_row][position]``.
        position: Column to scan.

    Returns:
        The winning row index plus one.
    """
    best_note = 1
    best_output = 0.0
    for note_number in range(1, NOTE_VOCABULARY_SIZE + 1):
        candidate = output_grid[note_number - 1][position]
        if candidate > best_output:
            best_note, best_output = note_number, candidate
    return best_note
|