158 lines
5.5 KiB
Python
158 lines
5.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
import time
|
|
|
|
from .beethoven import BeethovenCategorizer, BeethovenResult
|
|
from .hopfield import HopfieldResult, generate_next_note
|
|
from .io.legacy_files import extract_active_hopfield_submatrix, load_hopfield_weight_matrix
|
|
from .salieri import SalieriCritic, SalieriResult
|
|
from .types import CompositionContext, CompositionRecord, LegacyPaths
|
|
|
|
|
|
@dataclass(frozen=True)
class PipelineStep:
    """Immutable record of one pass through the composition pipeline.

    Bundles the raw result of each stage with the decision that was taken
    and the context that the following step should start from.
    """

    # Context to hand to the next step (built from this step's results).
    context: CompositionContext
    # Result returned by the Hopfield note generator for this step.
    hopfield: HopfieldResult
    # Result returned by the Salieri critic for the generated notes.
    salieri: SalieriResult
    # Result returned by the Beethoven categorizer for the generated notes.
    beethoven: BeethovenResult
    # Whether the candidate produced in this step was accepted.
    accepted: bool
    # Whether the pipeline "objected" in this step, i.e. the count of steps
    # without novelty reached the configured object threshold.
    objects: bool
    # Wall-clock duration of the step, measured with time.perf_counter().
    elapsed_seconds: float
|
|
|
|
|
|
class CompositionPipeline:
    """Generate notes by chaining the Hopfield, Salieri and Beethoven stages.

    A single step asks ``generate_next_note`` for a candidate, lets the
    Salieri critic evaluate (and train on) the resulting notes, lets the
    Beethoven categorizer categorize them, and finally decides whether the
    candidate is accepted.
    """

    def __init__(
        self,
        *,
        hopfield_weights: tuple[tuple[float, ...], ...],
        salieri: SalieriCritic,
        beethoven: BeethovenCategorizer,
        object_threshold: int = 3,
    ) -> None:
        """Store the collaborators used by every step.

        Args:
            hopfield_weights: Weight matrix passed to ``generate_next_note``.
            salieri: Critic whose ``evaluate_and_train`` judges each candidate.
            beethoven: Categorizer whose ``categorize`` runs on each candidate.
            object_threshold: Number of consecutive steps without novelty
                after which a step is flagged as ``objects``.
        """
        self.hopfield_weights = hopfield_weights
        self.salieri = salieri
        self.beethoven = beethoven
        self.object_threshold = object_threshold

    @classmethod
    def from_legacy_data(cls, root: str | Path) -> "CompositionPipeline":
        """Build a pipeline from the legacy data under *root* with defaults.

        Equivalent to ``from_legacy_data_with_options(root)``.
        """
        return cls.from_legacy_data_with_options(root)

    @classmethod
    def from_legacy_data_with_options(
        cls,
        root: str | Path,
        *,
        object_threshold: int = 3,
        art_vigilance: float = 0.9,
        art_vigilance_decay: float = 0.99,
    ) -> "CompositionPipeline":
        """Build a pipeline from the legacy data under *root*.

        Args:
            root: Directory used to construct ``LegacyPaths``.
            object_threshold: Forwarded unchanged to the constructor.
            art_vigilance: Passed to ``BeethovenCategorizer.with_params`` as
                ``vigilance``.
            art_vigilance_decay: Passed to
                ``BeethovenCategorizer.with_params`` as ``vigilance_decay``.

        Returns:
            A pipeline whose Hopfield weights are the active submatrix of the
            matrix loaded from ``paths.hopfield_weights``.
        """
        paths = LegacyPaths(root=Path(root))
        full_matrix = load_hopfield_weight_matrix(paths.hopfield_weights)
        return cls(
            hopfield_weights=extract_active_hopfield_submatrix(full_matrix),
            salieri=SalieriCritic.from_legacy_paths(paths.root),
            beethoven=BeethovenCategorizer.with_params(
                vigilance=art_vigilance,
                vigilance_decay=art_vigilance_decay,
            ),
            object_threshold=object_threshold,
        )

    def neural_step(self, context: CompositionContext) -> PipelineStep:
        """Run one generate / evaluate / categorize pass.

        Args:
            context: State produced by the previous step.

        Returns:
            A ``PipelineStep`` holding the stage results, the accept/object
            flags, the elapsed wall-clock time and the context to use for the
            next step.
        """
        start_time = time.perf_counter()

        hopfield = generate_next_note(context.notes, self.hopfield_weights)
        salieri = self.salieri.evaluate_and_train(hopfield.output_notes)
        beethoven = self.beethoven.categorize(
            hopfield.output_notes,
            is_classical=salieri.is_classical,
        )
        art = beethoven.art_result

        # Any vigilance change or new category counts as novelty and restarts
        # the "steps since novelty" counter.
        if art.delta_vigilance or art.new_category:
            since_novelty = 0
        else:
            since_novelty = context.since_novelty + 1

        # A vigilance change adds one to the carried-over frustration.
        if art.delta_vigilance:
            frustration = context.frustration + 1
        else:
            frustration = context.frustration

        # Too many steps without novelty: object, and restart the counter.
        objects = since_novelty >= self.object_threshold
        if objects:
            since_novelty = 0

        # Accept exactly when "objects" and "is classical" disagree (XOR).
        accepted = bool(objects) != bool(salieri.is_classical)

        if accepted:
            next_notes = hopfield.output_notes
            next_frustration = 0
            next_note_count = context.note_count + 1
        else:
            # Rejected: blank the last note so the next attempt regenerates
            # it, and bump frustration once more on top of the value above.
            reset_notes = list(hopfield.output_notes)
            reset_notes[-1] = 0
            next_notes = tuple(reset_notes)
            next_frustration = frustration + 1
            next_note_count = context.note_count

        next_context = CompositionContext(
            notes=next_notes,
            delta_vigilance=art.delta_vigilance,
            new_category=art.new_category,
            is_classical=salieri.is_classical,
            candidate_note=hopfield.candidate_note,
            since_novelty=since_novelty,
            frustration=next_frustration,
            note_count=next_note_count,
        )

        return PipelineStep(
            context=next_context,
            hopfield=hopfield,
            salieri=salieri,
            beethoven=beethoven,
            accepted=accepted,
            objects=objects,
            elapsed_seconds=time.perf_counter() - start_time,
        )

    def step_until_accepted(
        self,
        context: CompositionContext,
        *,
        max_attempts: int = 500,
    ) -> PipelineStep:
        """Repeat ``neural_step`` until a candidate is accepted.

        The context of every rejected step is used as the input of the next
        attempt.

        Args:
            context: Context for the first attempt.
            max_attempts: Upper bound on the number of attempts.

        Returns:
            The first step whose ``accepted`` flag is true.

        Raises:
            RuntimeError: If no attempt is accepted, which includes the case
                where ``max_attempts`` is less than 1.
        """
        current = context
        for _ in range(max_attempts):
            step = self.neural_step(current)
            if step.accepted:
                return step
            current = step.context
        raise RuntimeError("failed to accept a note within max_attempts")

    def compose(
        self,
        *,
        max_notes: int,
        initial_context: CompositionContext | None = None,
        max_attempts_per_note: int = 500,
    ) -> CompositionRecord:
        """Compose up to *max_notes* accepted notes.

        Args:
            max_notes: Number of notes to accept.
            initial_context: Starting context; a default
                ``CompositionContext()`` is created when this is ``None``.
            max_attempts_per_note: Attempt limit passed to
                ``step_until_accepted`` for each note.

        Returns:
            A ``CompositionRecord`` with the last note of each accepted
            context, the wall-clock seconds spent per note, and the total
            wall-clock seconds.

        Raises:
            RuntimeError: Propagated from ``step_until_accepted`` when a note
                cannot be accepted within the attempt limit.
        """
        compose_start = time.perf_counter()
        # Explicit None check so a caller-supplied context is never replaced
        # merely because it evaluates as falsy.
        if initial_context is None:
            context = CompositionContext()
        else:
            context = initial_context

        accepted_notes: list[int] = []
        per_note_seconds: list[float] = []
        for _ in range(max_notes):
            note_start = time.perf_counter()
            step = self.step_until_accepted(
                context,
                max_attempts=max_attempts_per_note,
            )
            accepted_notes.append(step.context.notes[-1])
            per_note_seconds.append(time.perf_counter() - note_start)
            context = step.context

        return CompositionRecord(
            notes=tuple(accepted_notes),
            per_note_seconds=tuple(per_note_seconds),
            total_seconds=time.perf_counter() - compose_start,
        )
|