from __future__ import annotations from dataclasses import dataclass from pathlib import Path import time from .beethoven import BeethovenCategorizer, BeethovenResult from .hopfield import HopfieldResult, generate_next_note from .io.legacy_files import extract_active_hopfield_submatrix, load_hopfield_weight_matrix from .salieri import SalieriCritic, SalieriResult from .types import CompositionContext, CompositionRecord, LegacyPaths @dataclass(frozen=True) class PipelineStep: context: CompositionContext hopfield: HopfieldResult salieri: SalieriResult beethoven: BeethovenResult accepted: bool objects: bool elapsed_seconds: float class CompositionPipeline: def __init__( self, *, hopfield_weights: tuple[tuple[float, ...], ...], salieri: SalieriCritic, beethoven: BeethovenCategorizer, object_threshold: int = 3, ) -> None: self.hopfield_weights = hopfield_weights self.salieri = salieri self.beethoven = beethoven self.object_threshold = object_threshold @classmethod def from_legacy_data(cls, root: str | Path) -> "CompositionPipeline": return cls.from_legacy_data_with_options(root) @classmethod def from_legacy_data_with_options( cls, root: str | Path, *, object_threshold: int = 3, art_vigilance: float = 0.9, art_vigilance_decay: float = 0.99, ) -> "CompositionPipeline": paths = LegacyPaths(root=Path(root)) hopfield_weights = extract_active_hopfield_submatrix( load_hopfield_weight_matrix(paths.hopfield_weights) ) salieri = SalieriCritic.from_legacy_paths(paths.root) beethoven = BeethovenCategorizer.with_params( vigilance=art_vigilance, vigilance_decay=art_vigilance_decay, ) return cls( hopfield_weights=hopfield_weights, salieri=salieri, beethoven=beethoven, object_threshold=object_threshold, ) def neural_step(self, context: CompositionContext) -> PipelineStep: start_time = time.perf_counter() hopfield = generate_next_note(context.notes, self.hopfield_weights) salieri = self.salieri.evaluate_and_train(hopfield.output_notes) beethoven = self.beethoven.categorize( hopfield.output_notes, is_classical=salieri.is_classical, ) since_novelty = 0 if (beethoven.art_result.delta_vigilance or beethoven.art_result.new_category) else context.since_novelty + 1 frustration = context.frustration + 1 if beethoven.art_result.delta_vigilance else context.frustration objects = since_novelty >= self.object_threshold if objects: since_novelty = 0 accepted = not ( (objects and salieri.is_classical) or ((not objects) and (not salieri.is_classical)) ) if accepted: next_context = CompositionContext( notes=hopfield.output_notes, delta_vigilance=beethoven.art_result.delta_vigilance, new_category=beethoven.art_result.new_category, is_classical=salieri.is_classical, candidate_note=hopfield.candidate_note, since_novelty=since_novelty, frustration=0, note_count=context.note_count + 1, ) else: reset_notes = list(hopfield.output_notes) reset_notes[-1] = 0 next_context = CompositionContext( notes=tuple(reset_notes), delta_vigilance=beethoven.art_result.delta_vigilance, new_category=beethoven.art_result.new_category, is_classical=salieri.is_classical, candidate_note=hopfield.candidate_note, since_novelty=since_novelty, frustration=frustration + 1, note_count=context.note_count, ) return PipelineStep( context=next_context, hopfield=hopfield, salieri=salieri, beethoven=beethoven, accepted=accepted, objects=objects, elapsed_seconds=time.perf_counter() - start_time, ) def step_until_accepted( self, context: CompositionContext, *, max_attempts: int = 500, ) -> PipelineStep: current = context for _ in range(max_attempts): step = self.neural_step(current) if step.accepted: return step current = step.context raise RuntimeError("failed to accept a note within max_attempts") def compose( self, *, max_notes: int, initial_context: CompositionContext | None = None, max_attempts_per_note: int = 500, ) -> CompositionRecord: compose_start = time.perf_counter() context = initial_context or CompositionContext() accepted_notes: list[int] = [] per_note_seconds: list[float] = [] for _ in range(max_notes): note_start = time.perf_counter() step = self.step_until_accepted(context, max_attempts=max_attempts_per_note) accepted_notes.append(step.context.notes[-1]) per_note_seconds.append(time.perf_counter() - note_start) context = step.context return CompositionRecord( notes=tuple(accepted_notes), per_note_seconds=tuple(per_note_seconds), total_seconds=time.perf_counter() - compose_start, )