# Synaptopus/src/synaptopus/backprop.py

from __future__ import annotations
from dataclasses import dataclass
import json
import math
import random
from typing import Iterable
def sigmoid(x: float) -> float:
    """Logistic function 1 / (1 + e**-x), with the exponent clamped to
    the range [-80, 80] so math.exp never overflows a double."""
    exponent = -x
    if exponent > 80.0:
        exponent = 80.0
    elif exponent < -80.0:
        exponent = -80.0
    return 1.0 / (1.0 + math.exp(exponent))
@dataclass(frozen=True)
class BackpropLayerState:
    """Immutable snapshot of one non-input layer after a pass."""

    # Post-sigmoid output of every neuron in the layer.
    activations: tuple[float, ...]
    # Backpropagated error terms; all zeros for a pure forward pass.
    deltas: tuple[float, ...]
    # Bias values of the layer at snapshot time.
    biases: tuple[float, ...]
@dataclass(frozen=True)
class BackpropResult:
    """Immutable result of a forward pass or a single training step."""

    # Activations of the final (output) layer.
    outputs: tuple[float, ...]
    # Sum of 0.5 * error**2 over output neurons; 0.0 for predict().
    loss: float
    # One snapshot per non-input layer, in input-to-output order.
    layer_states: tuple[BackpropLayerState, ...]
class BackpropNetwork:
    """Fully connected feed-forward network of sigmoid neurons.

    Trained by per-sample backpropagation with momentum: each parameter
    update is ``learning_rate * gradient`` plus ``momentum`` times the
    previous update for that same parameter, applied in place.
    """

    def __init__(
        self,
        *,
        layer_sizes: tuple[int, ...],
        learning_rate: float,
        momentum: float,
        weights: list[list[list[float]]],
        biases: list[list[float]],
    ) -> None:
        """Adopt pre-built parameters after validating the outer structure.

        ``weights[i][j][k]`` is the weight from neuron ``k`` of layer ``i``
        to neuron ``j`` of layer ``i + 1``; ``biases[i][j]`` is the bias of
        neuron ``j`` of layer ``i + 1``.

        Raises:
            ValueError: if there are fewer than two layers, a layer size is
                non-positive, or the weight/bias list length does not match
                the number of layer transitions.

        NOTE(review): only the outer list lengths are validated; inner row
        lengths are assumed to agree with ``layer_sizes`` — confirm callers.
        """
        if len(layer_sizes) < 2:
            raise ValueError("layer_sizes must include at least input and output layers")
        if any(size <= 0 for size in layer_sizes):
            raise ValueError("all layer sizes must be positive")
        if len(weights) != len(layer_sizes) - 1:
            raise ValueError("weights must connect each adjacent layer")
        if len(biases) != len(layer_sizes) - 1:
            raise ValueError("biases must match the number of non-input layers")
        self.layer_sizes = layer_sizes
        self.learning_rate = learning_rate
        self.momentum = momentum
        # Stored by reference, not copied: train_step mutates these in place.
        self.weights = weights
        self.biases = biases
        # Momentum buffers holding the previous update per weight/bias,
        # zeroed so the first training step is a plain gradient step.
        self.last_weight_updates = [
            [[0.0 for _ in neuron] for neuron in layer]
            for layer in weights
        ]
        self.last_bias_updates = [
            [0.0 for _ in layer]
            for layer in biases
        ]

    @property
    def input_size(self) -> int:
        """Number of input neurons (first entry of layer_sizes)."""
        return self.layer_sizes[0]

    @property
    def output_size(self) -> int:
        """Number of output neurons (last entry of layer_sizes)."""
        return self.layer_sizes[-1]

    @property
    def hidden_layers(self) -> tuple[int, ...]:
        """Sizes of the hidden layers (may be empty)."""
        return self.layer_sizes[1:-1]

    @classmethod
    def random(
        cls,
        *,
        input_size: int,
        hidden_layers: tuple[int, ...],
        output_size: int,
        learning_rate: float = 0.5,
        momentum: float = 0.1,
        rng: random.Random | None = None,
    ) -> "BackpropNetwork":
        """Build a network with uniform random parameters.

        Weights are drawn from U(-1, 1) and biases from U(-0.25, 0.25),
        using ``rng`` when given (for reproducibility) or a fresh
        ``random.Random`` otherwise.
        """
        generator = rng or random.Random()
        layer_sizes = (input_size, *hidden_layers, output_size)
        weights: list[list[list[float]]] = []
        biases: list[list[float]] = []
        for left_size, right_size in zip(layer_sizes[:-1], layer_sizes[1:]):
            # One row per destination neuron, one column per source neuron.
            weights.append(
                [
                    [generator.uniform(-1.0, 1.0) for _ in range(left_size)]
                    for _ in range(right_size)
                ]
            )
            biases.append([generator.uniform(-0.25, 0.25) for _ in range(right_size)])
        return cls(
            layer_sizes=layer_sizes,
            learning_rate=learning_rate,
            momentum=momentum,
            weights=weights,
            biases=biases,
        )

    def predict(self, inputs: Iterable[float]) -> BackpropResult:
        """Run a forward pass without learning.

        Returns a BackpropResult whose ``loss`` is 0.0 and whose layer
        deltas are all zeros (no targets, hence no error signal).
        """
        activations = self._forward(inputs)
        layer_states = tuple(
            BackpropLayerState(
                activations=tuple(layer_activation),
                deltas=tuple(0.0 for _ in layer_activation),
                # biases[layer_index - 1] because activations[0] is the
                # input layer, which has no biases.
                biases=tuple(self.biases[layer_index - 1]),
            )
            for layer_index, layer_activation in enumerate(activations[1:], start=1)
        )
        return BackpropResult(
            outputs=tuple(activations[-1]),
            loss=0.0,
            layer_states=layer_states,
        )

    def train_step(self, inputs: Iterable[float], targets: Iterable[float]) -> BackpropResult:
        """Run one forward/backward pass and update parameters in place.

        Loss is the summed squared error ``sum(0.5 * (target - output)**2)``.
        Deltas use the sigmoid derivative ``a * (1 - a)``. Each weight and
        bias update is ``learning_rate * delta * source_activation`` plus
        ``momentum`` times that parameter's previous update; the momentum
        buffers are then refreshed with the update just applied.

        Raises:
            ValueError: if the input or target length does not match the
                network's input/output size.
        """
        input_values = tuple(float(value) for value in inputs)
        target_values = tuple(float(value) for value in targets)
        if len(input_values) != self.input_size:
            raise ValueError(f"expected {self.input_size} inputs, got {len(input_values)}")
        if len(target_values) != self.output_size:
            raise ValueError(f"expected {self.output_size} targets, got {len(target_values)}")
        activations = self._forward(input_values)
        # deltas[i] holds the error terms for layer i + 1 (non-input layers).
        deltas: list[list[float]] = [
            [0.0 for _ in range(size)]
            for size in self.layer_sizes[1:]
        ]
        # Output layer: delta = (target - a) * a * (1 - a).
        output_activations = activations[-1]
        output_deltas: list[float] = []
        losses: list[float] = []
        for activation, target in zip(output_activations, target_values):
            error = target - activation
            losses.append(0.5 * error * error)
            output_deltas.append(error * activation * (1.0 - activation))
        deltas[-1] = output_deltas
        # Hidden layers, walked output-to-input: each delta is the sigmoid
        # derivative times the weighted sum of the downstream deltas.
        for layer_index in range(len(deltas) - 2, -1, -1):
            current_activations = activations[layer_index + 1]
            next_weights = self.weights[layer_index + 1]
            next_deltas = deltas[layer_index + 1]
            current_deltas: list[float] = []
            for neuron_index, activation in enumerate(current_activations):
                downstream = 0.0
                for next_neuron_index, next_delta in enumerate(next_deltas):
                    downstream += next_delta * next_weights[next_neuron_index][neuron_index]
                current_deltas.append(activation * (1.0 - activation) * downstream)
            deltas[layer_index] = current_deltas
        # Apply the updates (gradient step plus momentum) in place.
        for layer_index, (layer_weights, layer_biases) in enumerate(zip(self.weights, self.biases)):
            source_activations = activations[layer_index]
            layer_deltas = deltas[layer_index]
            for neuron_index in range(len(layer_weights)):
                bias_update = (
                    self.learning_rate * layer_deltas[neuron_index]
                    + self.momentum * self.last_bias_updates[layer_index][neuron_index]
                )
                self.last_bias_updates[layer_index][neuron_index] = bias_update
                layer_biases[neuron_index] += bias_update
                for source_index in range(len(source_activations)):
                    update = (
                        self.learning_rate
                        * layer_deltas[neuron_index]
                        * source_activations[source_index]
                    )
                    update += (
                        self.momentum
                        * self.last_weight_updates[layer_index][neuron_index][source_index]
                    )
                    self.last_weight_updates[layer_index][neuron_index][source_index] = update
                    layer_weights[neuron_index][source_index] += update
        # Snapshot the post-update state for the caller.
        layer_states = tuple(
            BackpropLayerState(
                activations=tuple(activations[layer_index + 1]),
                deltas=tuple(deltas[layer_index]),
                biases=tuple(self.biases[layer_index]),
            )
            for layer_index in range(len(deltas))
        )
        return BackpropResult(
            outputs=tuple(activations[-1]),
            loss=sum(losses),
            layer_states=layer_states,
        )

    def _forward(self, inputs: Iterable[float]) -> list[list[float]]:
        """Forward-propagate and return the activations of every layer.

        Element 0 is the input layer itself; each subsequent element is
        ``sigmoid(weights . previous + bias)`` for that layer.

        Raises:
            ValueError: if ``inputs`` does not contain ``input_size`` values.
        """
        input_values = tuple(float(value) for value in inputs)
        if len(input_values) != self.input_size:
            raise ValueError(f"expected {self.input_size} inputs, got {len(input_values)}")
        activations: list[list[float]] = [list(input_values)]
        current = list(input_values)
        for layer_weights, layer_biases in zip(self.weights, self.biases):
            next_values: list[float] = []
            for neuron_weights, bias in zip(layer_weights, layer_biases):
                total = sum(weight * value for weight, value in zip(neuron_weights, current)) + bias
                next_values.append(sigmoid(total))
            activations.append(next_values)
            current = next_values
        return activations

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable view of the full network state.

        NOTE(review): the nested lists are live references, not copies —
        mutating the returned structure mutates the network.
        """
        return {
            "layer_sizes": list(self.layer_sizes),
            "learning_rate": self.learning_rate,
            "momentum": self.momentum,
            "weights": self.weights,
            "biases": self.biases,
            "last_weight_updates": self.last_weight_updates,
            "last_bias_updates": self.last_bias_updates,
        }

    @classmethod
    def from_dict(cls, data: dict[str, object]) -> "BackpropNetwork":
        """Rebuild a network (including momentum buffers) from to_dict output.

        Momentum buffers default to the zeros created by ``__init__`` when
        the corresponding keys are absent from ``data``.
        """
        network = cls(
            layer_sizes=tuple(int(value) for value in data["layer_sizes"]),  # type: ignore[index]
            learning_rate=float(data["learning_rate"]),
            momentum=float(data["momentum"]),
            weights=[
                [
                    [float(weight) for weight in neuron]
                    for neuron in layer
                ]
                for layer in data["weights"]  # type: ignore[index]
            ],
            biases=[
                [float(bias) for bias in layer]
                for layer in data["biases"]  # type: ignore[index]
            ],
        )
        network.last_weight_updates = [
            [
                [float(weight) for weight in neuron]
                for neuron in layer
            ]
            for layer in data.get("last_weight_updates", network.last_weight_updates)  # type: ignore[arg-type]
        ]
        network.last_bias_updates = [
            [float(bias) for bias in layer]
            for layer in data.get("last_bias_updates", network.last_bias_updates)  # type: ignore[arg-type]
        ]
        return network

    def save_json(self, path: str) -> None:
        """Serialize the network state to ``path`` as indented JSON."""
        with open(path, "w", encoding="utf-8") as handle:
            json.dump(self.to_dict(), handle, indent=2)

    @classmethod
    def load_json(cls, path: str) -> "BackpropNetwork":
        """Load a network previously written by save_json."""
        with open(path, "r", encoding="utf-8") as handle:
            return cls.from_dict(json.load(handle))