from __future__ import annotations from dataclasses import dataclass import math import random from typing import Callable from .encoding import encode_note_sequence from .types import NOTE_VOCABULARY_SIZE, SEQUENCE_LENGTH NoiseFn = Callable[[float, float], float] @dataclass(frozen=True) class HopfieldParams: epsilon: float = 0.005 resistance_scale: float = 3.5 capacitance_scale: float = 10.0 weight_scale: float = 1.0 input_scale: float = 1.0 iteration_scale: float = 1.0 global_resistance: float = 1.0 global_capacitance: float = 1.0 @dataclass(frozen=True) class HopfieldResult: input_notes: tuple[int, ...] output_notes: tuple[int, ...] candidate_note: int iterations: int activations: tuple[tuple[float, ...], ...] outputs: tuple[tuple[float, ...], ...] @dataclass(frozen=True) class HopfieldNetworkState: activations: tuple[tuple[float, ...], ...] outputs: tuple[tuple[float, ...], ...] external_inputs: tuple[tuple[float, ...], ...] @dataclass(frozen=True) class HopfieldRunResult: state: HopfieldNetworkState iterations: int def make_gaussian_noise(rng: random.Random | None = None) -> NoiseFn: generator = rng or random.Random() def gaussian_noise(mean: float, variance: float) -> float: u1 = generator.random() u2 = generator.random() # Match the Pascal Box-Muller form and avoid ln(0). u1 = max(u1, 1e-12) x = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2) return variance * x + mean return gaussian_noise def tanh_clamped(value: float, exp_max: float = 80.0) -> float: value = max(min(value, exp_max), -exp_max) return (math.exp(value) - math.exp(-value)) / (math.exp(value) + math.exp(-value)) def run_hopfield_network( external_inputs: tuple[tuple[float, ...], ...], weight_matrix: tuple[tuple[float, ...], ...], *, params: HopfieldParams | None = None, initial_activations: tuple[tuple[float, ...], ...] | None = None, ) -> HopfieldRunResult: model = params or HopfieldParams() row_count = len(external_inputs) if row_count == 0: raise ValueError("external_inputs cannot be empty") column_count = len(external_inputs[0]) if any(len(row) != column_count for row in external_inputs): raise ValueError("external_inputs rows must be the same length") _validate_weight_matrix(weight_matrix, active_size=row_count * column_count) base_activations = initial_activations or tuple( tuple(0.5 for _ in range(column_count)) for _ in range(row_count) ) if len(base_activations) != row_count or any(len(row) != column_count for row in base_activations): raise ValueError("initial_activations shape must match external_inputs") activations = [ [list(row) for row in base_activations], [list(row) for row in base_activations], ] outputs = [ [[0.0 for _ in range(column_count)] for _ in range(row_count)], [[0.0 for _ in range(column_count)] for _ in range(row_count)], ] inputs = [ [list(row) for row in external_inputs], [list(row) for row in external_inputs], ] outputs[1][0][0] = 20.0 time_step = 0 _update_outputs(activations, outputs, time_step, model, row_count, column_count) iterations = 0 while not _done(outputs, model.epsilon, row_count, column_count): time_step = time_step % 2 next_time = (time_step + 1) % 2 _update_outputs(activations, outputs, time_step, model, row_count, column_count) for row_index in range(row_count): for column_index in range(column_count): delta = _delta_neuron_activation( row_index=row_index, column_index=column_index, row_count=row_count, column_count=column_count, time_step=time_step, activations=activations, outputs=outputs, inputs=inputs, weight_matrix=weight_matrix, params=model, ) activations[next_time][row_index][column_index] = ( activations[time_step][row_index][column_index] + model.iteration_scale * delta ) time_step += 1 iterations += 1 final_slot = time_step % 2 state = HopfieldNetworkState( activations=tuple(tuple(row) for row in activations[final_slot]), outputs=tuple(tuple(row) for row in outputs[final_slot]), external_inputs=tuple(tuple(row) for row in external_inputs), ) return HopfieldRunResult(state=state, iterations=iterations) def generate_next_note( notes: list[int] | tuple[int, ...], weight_matrix: tuple[tuple[float, ...], ...], *, params: HopfieldParams | None = None, noise: NoiseFn | None = None, ) -> HopfieldResult: model = params or HopfieldParams() gaussian_noise = noise or make_gaussian_noise() input_notes = encode_note_sequence(notes) inputs = [[0.0 for _ in range(SEQUENCE_LENGTH)] for _ in range(NOTE_VOCABULARY_SIZE)] for note_index in range(NOTE_VOCABULARY_SIZE): for position in range(SEQUENCE_LENGTH): note_value = input_notes[position] if note_value == 0: current_input = gaussian_noise(0.5, 0.25) elif note_value == note_index + 1: current_input = 0.67 + gaussian_noise(0.0, 0.1) else: current_input = 0.33 + gaussian_noise(0.0, 0.1) inputs[note_index][position] = current_input output_notes = list(input_notes) run_result = run_hopfield_network( tuple(tuple(row) for row in inputs), weight_matrix, params=model, ) for position in range(SEQUENCE_LENGTH): output_notes[position] = _max_cell_in_column(run_result.state.outputs, position) return HopfieldResult( input_notes=input_notes, output_notes=tuple(output_notes), candidate_note=output_notes[-1], iterations=run_result.iterations, activations=run_result.state.activations, outputs=run_result.state.outputs, ) def _validate_weight_matrix( weight_matrix: tuple[tuple[float, ...], ...], *, active_size: int, ) -> None: if len(weight_matrix) < active_size: raise ValueError(f"weight matrix needs at least {active_size} rows") if any(len(row) < active_size for row in weight_matrix[:active_size]): raise ValueError(f"weight matrix needs at least {active_size} columns") def _update_outputs( activations: list[list[list[float]]], outputs: list[list[list[float]]], time_step: int, params: HopfieldParams, row_count: int, column_count: int, ) -> None: for row_index in range(row_count): for column_index in range(column_count): outputs[time_step][row_index][column_index] = 0.5 * ( 1.0 + tanh_clamped( activations[time_step][row_index][column_index] / params.global_capacitance ) ) def _done( outputs: list[list[list[float]]], epsilon: float, row_count: int, column_count: int, ) -> bool: for row_index in range(row_count): for column_index in range(column_count): if abs(outputs[0][row_index][column_index] - outputs[1][row_index][column_index]) > epsilon: return False return True def _weight_coord(row_index: int, column_index: int, row_count: int) -> int: return row_count * column_index + row_index def _delta_neuron_activation( *, row_index: int, column_index: int, row_count: int, column_count: int, time_step: int, activations: list[list[list[float]]], outputs: list[list[list[float]]], inputs: list[list[list[float]]], weight_matrix: tuple[tuple[float, ...], ...], params: HopfieldParams, ) -> float: weight_sum = 0.0 current_index = _weight_coord(row_index, column_index, row_count) for other_row in range(row_count): for other_column in range(column_count): other_index = _weight_coord(other_row, other_column, row_count) weight_sum += ( weight_matrix[current_index][other_index] * params.weight_scale * outputs[time_step][other_row][other_column] ) activation = activations[time_step][row_index][column_index] neuron_input = inputs[time_step][row_index][column_index] numerator = ( -(activation / (params.global_resistance * params.resistance_scale)) + (neuron_input * params.input_scale) + weight_sum ) return numerator / (params.global_capacitance * params.capacitance_scale) def _max_cell_in_column( output_grid: list[list[float]] | tuple[tuple[float, ...], ...], position: int, ) -> int: max_value = 0.0 max_note = 1 for note_index in range(NOTE_VOCABULARY_SIZE): output = output_grid[note_index][position] if output > max_value: max_value = output max_note = note_index + 1 return max_note