Migrate Track 1 simulation kernel into renunney
This commit is contained in:
parent
83350e68c8
commit
7c9fcd0dd4
|
|
@ -24,6 +24,7 @@ plane and the Track 1 runner/API boundary are now local to `renunney`.
|
|||
- a local Track 1 runner and config/API layer,
|
||||
- a local Track 1 analysis layer for tracking summaries and loci-regression,
|
||||
- a local Track 1 threshold/search layer for Nunney-style threshold checks,
|
||||
- a local Track 1 simulation kernel,
|
||||
- a Makefile for common tasks,
|
||||
- migration notes for pulling code into this repo in stages.
|
||||
|
||||
|
|
@ -86,8 +87,10 @@ The current state is split:
|
|||
- Track 1 runner and config/API layer: local to `renunney`
|
||||
- Track 1 analysis layer: local to `renunney`
|
||||
- Track 1 threshold/search layer: local to `renunney`
|
||||
- Track 1 simulation backend: still in the older `cost_of_substitution`
|
||||
directory and imported through the local compatibility layer
|
||||
- Track 1 simulation kernel: local to `renunney`
|
||||
- Track 1 report, dataset, fit, and extinction-model helpers: still imported
|
||||
from the older `cost_of_substitution` directory through the local
|
||||
compatibility layer
|
||||
|
||||
This repo is now the clean operational entry point while the simulation code is
|
||||
migrated in later stages.
|
||||
|
|
|
|||
|
|
@ -31,16 +31,14 @@ Operational code still lives in:
|
|||
- `src/renunney/track1_analysis.py`
|
||||
5. Track 1 threshold/search boundary has been migrated locally:
|
||||
- `src/renunney/track1_threshold.py`
|
||||
6. Keep the Track 1 simulation backend in the legacy path until real multi-host runs are stable.
|
||||
7. Migrate the Track 1 simulation core after the runner path is stable:
|
||||
- `python/track1_reference.py`
|
||||
- `python/track1_threshold.py`
|
||||
- `python/track1_analysis.py`
|
||||
8. Migrate report, dataset, fit, and orchestration-adjacent Track 1 modules after the kernel boundary is stable:
|
||||
6. Track 1 simulation kernel has been migrated locally:
|
||||
- `src/renunney/track1_reference.py`
|
||||
7. Migrate report, dataset, fit, and orchestration-adjacent Track 1 modules next:
|
||||
- `python/track1_report.py`
|
||||
- `python/track1_dataset.py`
|
||||
- `python/track1_fit.py`
|
||||
- `python/track1_extinction.py`
|
||||
8. Reduce or remove the remaining compatibility-layer imports after those modules are local.
|
||||
9. Migrate docs and example configs last, after path references are updated.
|
||||
|
||||
## Constraint
|
||||
|
|
|
|||
|
|
@ -47,8 +47,9 @@ make status
|
|||
## Current Assumption
|
||||
|
||||
The Makefile now drives the local orchestration code in `renunney`, while the
|
||||
Track 1 runner/API boundary, analysis layer, and threshold/search layer are
|
||||
also local to `renunney`. The simulation kernel is still imported from the
|
||||
legacy `cost_of_substitution` directory through the compatibility layer in
|
||||
Track 1 runner/API boundary, analysis layer, threshold/search layer, and
|
||||
simulation kernel are also local to `renunney`. The remaining Track 1
|
||||
report/dataset/fit helpers are still imported from the legacy
|
||||
`cost_of_substitution` directory through the compatibility layer in
|
||||
`src/renunney/legacy.py`. The paper-scale Figure 1 configs used for submission
|
||||
are now local to `renunney/config`.
|
||||
|
|
|
|||
|
|
@ -28,6 +28,26 @@ from .track1_analysis import (
|
|||
sweep_number_of_loci,
|
||||
)
|
||||
from .track1_api import Track1RunConfig, config_from_mapping, load_config, run_config, save_payload
|
||||
from .track1_reference import (
|
||||
GenerationSummary,
|
||||
PopulationState,
|
||||
Track1Parameters,
|
||||
allele_tracking_metrics,
|
||||
approximate_ne,
|
||||
expected_female_productivity,
|
||||
expected_mutations_for_population,
|
||||
female_fecundity,
|
||||
female_fraction,
|
||||
generation_metrics,
|
||||
genotype_fitness,
|
||||
initialize_population,
|
||||
is_extinct,
|
||||
paper_mutation_supply_M,
|
||||
realize_birth_counts,
|
||||
simulate_one_generation,
|
||||
simulate_run,
|
||||
summarize_generation,
|
||||
)
|
||||
from .track1_threshold import (
|
||||
ThresholdCheck,
|
||||
ThresholdSearchResult,
|
||||
|
|
@ -56,6 +76,8 @@ __all__ = [
|
|||
"LinearCostFit",
|
||||
"LocusThresholdRow",
|
||||
"NumberOfLociSweep",
|
||||
"GenerationSummary",
|
||||
"PopulationState",
|
||||
"repo_root",
|
||||
"run_one_job",
|
||||
"run_worker_loop",
|
||||
|
|
@ -64,17 +86,33 @@ __all__ = [
|
|||
"ThresholdCheck",
|
||||
"ThresholdSearchResult",
|
||||
"TrackingSummary",
|
||||
"Track1Parameters",
|
||||
"Track1RunConfig",
|
||||
"allele_tracking_metrics",
|
||||
"approximate_ne",
|
||||
"config_from_mapping",
|
||||
"expected_female_productivity",
|
||||
"expected_mutations_for_population",
|
||||
"evaluate_threshold_candidate",
|
||||
"female_fecundity",
|
||||
"female_fraction",
|
||||
"fit_linear_cost_by_loci",
|
||||
"generation_metrics",
|
||||
"genotype_fitness",
|
||||
"initialize_population",
|
||||
"is_extinct",
|
||||
"load_config",
|
||||
"nunney_threshold_accepts",
|
||||
"paper_mutation_supply_M",
|
||||
"published_threshold_accepts",
|
||||
"realize_birth_counts",
|
||||
"run_config",
|
||||
"simulate_one_generation",
|
||||
"simulate_run",
|
||||
"run_extinction_check",
|
||||
"save_payload",
|
||||
"search_threshold_over_candidates",
|
||||
"summarize_tracking",
|
||||
"summarize_generation",
|
||||
"sweep_number_of_loci",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -14,13 +14,9 @@ from typing import Iterable, Optional
|
|||
|
||||
import numpy as np
|
||||
|
||||
from .legacy import ensure_legacy_python_path
|
||||
from .track1_reference import GenerationSummary, Track1Parameters
|
||||
from .track1_threshold import ThresholdSearchResult, search_threshold_over_candidates
|
||||
|
||||
ensure_legacy_python_path()
|
||||
|
||||
from track1_reference import GenerationSummary, Track1Parameters
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LocusThresholdRow:
|
||||
|
|
|
|||
|
|
@ -16,13 +16,13 @@ from typing import Any, Optional
|
|||
|
||||
from .legacy import ensure_legacy_python_path
|
||||
from .track1_analysis import summarize_tracking, sweep_number_of_loci
|
||||
from .track1_reference import Track1Parameters, simulate_run
|
||||
from .track1_threshold import evaluate_threshold_candidate, search_threshold_over_candidates
|
||||
|
||||
ensure_legacy_python_path()
|
||||
|
||||
from track1_dataset import generate_extinction_dataset
|
||||
from track1_fit import class_balance, fit_payload_from_jsonl, load_jsonl
|
||||
from track1_reference import Track1Parameters, simulate_run
|
||||
from track1_report import generate_report_bundle
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,428 @@
|
|||
"""
|
||||
track1_reference.py
|
||||
|
||||
Local Track 1 reference module for renunney.
|
||||
|
||||
This is the historically faithful Nunney-style simulation kernel used by the
|
||||
local analysis, threshold, and orchestration layers.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
@dataclass(frozen=True, init=False)
|
||||
class Track1Parameters:
|
||||
"""Reference parameters for the Track 1 baseline."""
|
||||
|
||||
K: int
|
||||
N0: int
|
||||
n: int
|
||||
u: float
|
||||
R: float
|
||||
T: int
|
||||
epochs: int = 8
|
||||
p: float = 0.5
|
||||
a_max: Optional[int] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
K: int,
|
||||
N0: int,
|
||||
n: int,
|
||||
u: float | None = None,
|
||||
R: float = 10.0,
|
||||
T: int = 300,
|
||||
epochs: int = 8,
|
||||
p: float = 0.5,
|
||||
a_max: Optional[int] = None,
|
||||
M: float | None = None,
|
||||
) -> None:
|
||||
if u is None:
|
||||
if M is None:
|
||||
raise ValueError("Track1Parameters requires u, or convenience M to derive u.")
|
||||
u = float(M / (2.0 * K))
|
||||
object.__setattr__(self, "K", int(K))
|
||||
object.__setattr__(self, "N0", int(N0))
|
||||
object.__setattr__(self, "n", int(n))
|
||||
object.__setattr__(self, "u", float(u))
|
||||
object.__setattr__(self, "R", float(R))
|
||||
object.__setattr__(self, "T", int(T))
|
||||
object.__setattr__(self, "epochs", int(epochs))
|
||||
object.__setattr__(self, "p", float(p))
|
||||
object.__setattr__(self, "a_max", a_max)
|
||||
|
||||
@property
|
||||
def M(self) -> float:
|
||||
return float(2.0 * self.K * self.u)
|
||||
|
||||
def resolved_a_max(self) -> int:
|
||||
if self.a_max is not None:
|
||||
return self.a_max
|
||||
return self.epochs
|
||||
|
||||
def intrinsic_growth_rate(self) -> float:
|
||||
return float(np.log(self.R / 2.0))
|
||||
|
||||
def total_generations(self) -> int:
|
||||
return self.epochs * self.T + (self.T // 2)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PopulationState:
|
||||
"""Diploid population state for one generation."""
|
||||
|
||||
genomes: np.ndarray
|
||||
sexes: np.ndarray
|
||||
|
||||
@property
|
||||
def size(self) -> int:
|
||||
return int(self.genomes.shape[0])
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GenerationSummary:
|
||||
"""Per-generation diagnostics for Track 1."""
|
||||
|
||||
t: int
|
||||
N: int
|
||||
female_fraction: float
|
||||
male_count: int
|
||||
female_count: int
|
||||
fecundity: float
|
||||
mean_fitness: float
|
||||
mean_expected_female_productivity: float
|
||||
target_value: float
|
||||
mean_allele_value: float
|
||||
mean_genotype_value: float
|
||||
mean_tracking_gap: float
|
||||
paper_M: float
|
||||
expected_mutations_current_N: float
|
||||
realized_mutation_count: int
|
||||
realized_mutation_rate_per_allele: float
|
||||
birth_count: int
|
||||
surviving_offspring_count: int
|
||||
ne_approx: float
|
||||
extinct: bool
|
||||
|
||||
|
||||
def initialize_population(params: Track1Parameters, rng: np.random.Generator) -> PopulationState:
|
||||
genomes = np.zeros((params.N0, 2, params.n), dtype=np.int16)
|
||||
sexes = rng.binomial(1, 1.0 - params.p, size=params.N0).astype(np.int8)
|
||||
return PopulationState(genomes=genomes, sexes=sexes)
|
||||
|
||||
|
||||
def female_fecundity(r: float, N: int, K: int) -> float:
|
||||
return float(2.0 * np.exp(r * (1.0 - (N / K) ** (1.0 / r))))
|
||||
|
||||
|
||||
def genotype_fitness(genomes: np.ndarray, r: float, T: int, t: int) -> np.ndarray:
|
||||
target = t / T
|
||||
locus_means = 0.5 * (genomes[:, 0, :] + genomes[:, 1, :])
|
||||
squared_distance = np.square(locus_means - target, dtype=np.float64)
|
||||
return np.exp(-(r / genomes.shape[2]) * np.sum(squared_distance, axis=1))
|
||||
|
||||
|
||||
def expected_female_productivity(fecundity: float, fitness: np.ndarray) -> np.ndarray:
|
||||
return fecundity * fitness
|
||||
|
||||
|
||||
def paper_mutation_supply_M(K: int, u: float) -> float:
|
||||
return float(2.0 * K * u)
|
||||
|
||||
|
||||
def expected_mutations_for_population(N: int, n: int, u: float) -> float:
|
||||
return float(2.0 * N * n * u)
|
||||
|
||||
|
||||
def allele_tracking_metrics(genomes: np.ndarray, T: int, t: int) -> tuple[float, float, float, float]:
|
||||
target_value = float(t / T)
|
||||
if genomes.size == 0:
|
||||
return target_value, 0.0, 0.0, -target_value
|
||||
mean_allele_value = float(np.mean(genomes))
|
||||
genotype_values = 0.5 * (genomes[:, 0, :] + genomes[:, 1, :])
|
||||
mean_genotype_value = float(np.mean(genotype_values))
|
||||
mean_tracking_gap = mean_genotype_value - target_value
|
||||
return target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap
|
||||
|
||||
|
||||
def approximate_ne(N: int, female_fraction: float) -> float:
|
||||
male_fraction = 1.0 - female_fraction
|
||||
denom = female_fraction + male_fraction
|
||||
if N <= 0 or female_fraction <= 0.0 or male_fraction <= 0.0 or denom == 0.0:
|
||||
return 0.0
|
||||
return float((2.0 * N * female_fraction * male_fraction) / denom)
|
||||
|
||||
|
||||
def female_fraction(sexes: np.ndarray) -> float:
|
||||
if sexes.size == 0:
|
||||
return 0.0
|
||||
return float(np.sum(sexes == 0) / sexes.size)
|
||||
|
||||
|
||||
def is_extinct(state: PopulationState) -> bool:
|
||||
if state.size == 0:
|
||||
return True
|
||||
female_count = int(np.sum(state.sexes == 0))
|
||||
male_count = int(np.sum(state.sexes == 1))
|
||||
return female_count == 0 or male_count == 0
|
||||
|
||||
|
||||
def choose_gamete(parent: np.ndarray, rng: np.random.Generator) -> np.ndarray:
|
||||
n = parent.shape[1]
|
||||
picks = rng.integers(0, 2, size=n)
|
||||
gamete = np.empty(n, dtype=parent.dtype)
|
||||
gamete[:] = parent[picks, np.arange(n)]
|
||||
return gamete
|
||||
|
||||
|
||||
def choose_mutant_allele(current: int, a_max: int, rng: np.random.Generator) -> int:
|
||||
if a_max <= 0:
|
||||
return current
|
||||
draw = int(rng.integers(0, a_max))
|
||||
return draw if draw < current else draw + 1
|
||||
|
||||
|
||||
def mutate_zygote(zygote: np.ndarray, params: Track1Parameters, rng: np.random.Generator) -> np.ndarray:
|
||||
a_max = params.resolved_a_max()
|
||||
out = zygote.copy()
|
||||
for chrom in range(out.shape[0]):
|
||||
for locus in range(out.shape[1]):
|
||||
if rng.random() <= params.u:
|
||||
out[chrom, locus] = choose_mutant_allele(int(out[chrom, locus]), a_max, rng)
|
||||
return out
|
||||
|
||||
|
||||
def produce_offspring(
|
||||
mother: np.ndarray,
|
||||
father: np.ndarray,
|
||||
params: Track1Parameters,
|
||||
rng: np.random.Generator,
|
||||
locus_indices: np.ndarray,
|
||||
) -> tuple[np.ndarray, int]:
|
||||
n = params.n
|
||||
offspring = np.empty((2, n), dtype=mother.dtype)
|
||||
maternal_picks = rng.integers(0, 2, size=n)
|
||||
paternal_picks = rng.integers(0, 2, size=n)
|
||||
offspring[0, :] = mother[maternal_picks, locus_indices]
|
||||
offspring[1, :] = father[paternal_picks, locus_indices]
|
||||
a_max = params.resolved_a_max()
|
||||
mutation_count = 0
|
||||
for chrom in range(2):
|
||||
for locus in range(n):
|
||||
if rng.random() <= params.u:
|
||||
offspring[chrom, locus] = choose_mutant_allele(int(offspring[chrom, locus]), a_max, rng)
|
||||
mutation_count += 1
|
||||
return offspring, mutation_count
|
||||
|
||||
|
||||
def realize_birth_counts(fecundity: float, sexes: np.ndarray, rng: np.random.Generator) -> np.ndarray:
|
||||
female_mask = sexes == 0
|
||||
counts = np.zeros(sexes.shape[0], dtype=np.int32)
|
||||
counts[female_mask] = rng.poisson(fecundity, size=int(np.sum(female_mask)))
|
||||
return counts
|
||||
|
||||
|
||||
def summarize_generation(state: PopulationState, params: Track1Parameters, t: int) -> GenerationSummary:
|
||||
if state.size == 0:
|
||||
return GenerationSummary(
|
||||
t=t,
|
||||
N=0,
|
||||
female_fraction=0.0,
|
||||
male_count=0,
|
||||
female_count=0,
|
||||
fecundity=0.0,
|
||||
mean_fitness=0.0,
|
||||
mean_expected_female_productivity=0.0,
|
||||
target_value=float(t / params.T),
|
||||
mean_allele_value=0.0,
|
||||
mean_genotype_value=0.0,
|
||||
mean_tracking_gap=float(-(t / params.T)),
|
||||
paper_M=params.M,
|
||||
expected_mutations_current_N=0.0,
|
||||
realized_mutation_count=0,
|
||||
realized_mutation_rate_per_allele=0.0,
|
||||
birth_count=0,
|
||||
surviving_offspring_count=0,
|
||||
ne_approx=0.0,
|
||||
extinct=True,
|
||||
)
|
||||
fit, fec, exp_fp, ff, female_count, male_count = generation_metrics(state, params, t)
|
||||
mean_expected_fp = float(np.mean(exp_fp[state.sexes == 0])) if female_count > 0 else 0.0
|
||||
target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap = allele_tracking_metrics(
|
||||
state.genomes,
|
||||
T=params.T,
|
||||
t=t,
|
||||
)
|
||||
return GenerationSummary(
|
||||
t=t,
|
||||
N=state.size,
|
||||
female_fraction=ff,
|
||||
male_count=male_count,
|
||||
female_count=female_count,
|
||||
fecundity=fec,
|
||||
mean_fitness=float(np.mean(fit)),
|
||||
mean_expected_female_productivity=mean_expected_fp,
|
||||
target_value=target_value,
|
||||
mean_allele_value=mean_allele_value,
|
||||
mean_genotype_value=mean_genotype_value,
|
||||
mean_tracking_gap=mean_tracking_gap,
|
||||
paper_M=params.M,
|
||||
expected_mutations_current_N=expected_mutations_for_population(state.size, params.n, params.u),
|
||||
realized_mutation_count=0,
|
||||
realized_mutation_rate_per_allele=0.0,
|
||||
birth_count=0,
|
||||
surviving_offspring_count=0,
|
||||
ne_approx=approximate_ne(state.size, ff),
|
||||
extinct=is_extinct(state),
|
||||
)
|
||||
|
||||
|
||||
def generation_metrics(
|
||||
state: PopulationState,
|
||||
params: Track1Parameters,
|
||||
t: int,
|
||||
) -> tuple[np.ndarray, float, np.ndarray, float, int, int]:
|
||||
r = params.intrinsic_growth_rate()
|
||||
fit = genotype_fitness(state.genomes, r=r, T=params.T, t=t)
|
||||
fec = female_fecundity(r=r, N=state.size, K=params.K)
|
||||
exp_fp = expected_female_productivity(fec, fit)
|
||||
female_count = int(np.sum(state.sexes == 0))
|
||||
male_count = int(state.size - female_count)
|
||||
ff = float(female_count / state.size) if state.size > 0 else 0.0
|
||||
return fit, fec, exp_fp, ff, female_count, male_count
|
||||
|
||||
|
||||
def simulate_one_generation(
|
||||
state: PopulationState,
|
||||
params: Track1Parameters,
|
||||
t: int,
|
||||
rng: np.random.Generator,
|
||||
) -> tuple[PopulationState, GenerationSummary]:
|
||||
if state.size == 0:
|
||||
summary = summarize_generation(state, params, t)
|
||||
return state, summary
|
||||
|
||||
fit, fec, exp_fp, ff, female_count, male_count = generation_metrics(state, params, t)
|
||||
extinct = female_count == 0 or male_count == 0
|
||||
target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap = allele_tracking_metrics(
|
||||
state.genomes,
|
||||
T=params.T,
|
||||
t=t,
|
||||
)
|
||||
expected_mutations_current_N = expected_mutations_for_population(state.size, params.n, params.u)
|
||||
summary = GenerationSummary(
|
||||
t=t,
|
||||
N=state.size,
|
||||
female_fraction=ff,
|
||||
male_count=male_count,
|
||||
female_count=female_count,
|
||||
fecundity=fec,
|
||||
mean_fitness=float(np.mean(fit)),
|
||||
mean_expected_female_productivity=float(np.mean(exp_fp[state.sexes == 0])) if female_count > 0 else 0.0,
|
||||
target_value=target_value,
|
||||
mean_allele_value=mean_allele_value,
|
||||
mean_genotype_value=mean_genotype_value,
|
||||
mean_tracking_gap=mean_tracking_gap,
|
||||
paper_M=paper_mutation_supply_M(params.K, params.u),
|
||||
expected_mutations_current_N=expected_mutations_current_N,
|
||||
realized_mutation_count=0,
|
||||
realized_mutation_rate_per_allele=0.0,
|
||||
birth_count=0,
|
||||
surviving_offspring_count=0,
|
||||
ne_approx=approximate_ne(state.size, ff),
|
||||
extinct=extinct,
|
||||
)
|
||||
if extinct:
|
||||
return state, summary
|
||||
|
||||
birth_counts = realize_birth_counts(fec, state.sexes, rng)
|
||||
|
||||
female_indices = np.flatnonzero(state.sexes == 0)
|
||||
male_indices = np.flatnonzero(state.sexes == 1)
|
||||
total_births = int(np.sum(birth_counts[female_indices]))
|
||||
|
||||
new_genomes = np.zeros((total_births, 2, params.n), dtype=np.int16)
|
||||
new_sexes = np.zeros(total_births, dtype=np.int8)
|
||||
locus_indices = np.arange(params.n)
|
||||
r = params.intrinsic_growth_rate()
|
||||
offspring_t = t + 1
|
||||
|
||||
survivor_cursor = 0
|
||||
realized_mutation_count = 0
|
||||
for mother_index in female_indices:
|
||||
count = int(birth_counts[mother_index])
|
||||
if count == 0:
|
||||
continue
|
||||
father_index = int(male_indices[rng.integers(0, male_indices.size)])
|
||||
for _ in range(count):
|
||||
offspring, offspring_mutations = produce_offspring(
|
||||
state.genomes[mother_index],
|
||||
state.genomes[father_index],
|
||||
params,
|
||||
rng,
|
||||
locus_indices,
|
||||
)
|
||||
realized_mutation_count += offspring_mutations
|
||||
offspring_fitness = float(
|
||||
genotype_fitness(offspring[np.newaxis, :, :], r=r, T=params.T, t=offspring_t)[0]
|
||||
)
|
||||
if rng.random() <= offspring_fitness:
|
||||
new_genomes[survivor_cursor] = offspring
|
||||
new_sexes[survivor_cursor] = int(rng.binomial(1, 1.0 - params.p))
|
||||
survivor_cursor += 1
|
||||
|
||||
allele_exposures = 2 * total_births * params.n
|
||||
next_state = PopulationState(genomes=new_genomes[:survivor_cursor], sexes=new_sexes[:survivor_cursor])
|
||||
summary = GenerationSummary(
|
||||
t=summary.t,
|
||||
N=summary.N,
|
||||
female_fraction=summary.female_fraction,
|
||||
male_count=summary.male_count,
|
||||
female_count=summary.female_count,
|
||||
fecundity=summary.fecundity,
|
||||
mean_fitness=summary.mean_fitness,
|
||||
mean_expected_female_productivity=summary.mean_expected_female_productivity,
|
||||
target_value=summary.target_value,
|
||||
mean_allele_value=summary.mean_allele_value,
|
||||
mean_genotype_value=summary.mean_genotype_value,
|
||||
mean_tracking_gap=summary.mean_tracking_gap,
|
||||
paper_M=summary.paper_M,
|
||||
expected_mutations_current_N=summary.expected_mutations_current_N,
|
||||
realized_mutation_count=realized_mutation_count,
|
||||
realized_mutation_rate_per_allele=0.0
|
||||
if allele_exposures == 0
|
||||
else float(realized_mutation_count / allele_exposures),
|
||||
birth_count=total_births,
|
||||
surviving_offspring_count=survivor_cursor,
|
||||
ne_approx=summary.ne_approx,
|
||||
extinct=summary.extinct,
|
||||
)
|
||||
return next_state, summary
|
||||
|
||||
|
||||
def simulate_run(
|
||||
params: Track1Parameters,
|
||||
seed: Optional[int] = None,
|
||||
) -> list[GenerationSummary]:
|
||||
rng = np.random.default_rng(seed)
|
||||
state = initialize_population(params, rng)
|
||||
t = -(params.T // 2)
|
||||
summaries: list[GenerationSummary] = []
|
||||
|
||||
for _ in range(params.total_generations()):
|
||||
state, summary = simulate_one_generation(state, params, t, rng)
|
||||
summaries.append(summary)
|
||||
if is_extinct(state):
|
||||
terminal_t = t if summary.extinct else t + 1
|
||||
terminal_summary = summarize_generation(state, params, terminal_t)
|
||||
if summaries[-1] != terminal_summary:
|
||||
summaries.append(terminal_summary)
|
||||
break
|
||||
t += 1
|
||||
|
||||
return summaries
|
||||
|
|
@ -15,11 +15,7 @@ import json
|
|||
from pathlib import Path
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from .legacy import ensure_legacy_python_path
|
||||
|
||||
ensure_legacy_python_path()
|
||||
|
||||
from track1_reference import Track1Parameters, simulate_run
|
||||
from .track1_reference import Track1Parameters, simulate_run
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,95 @@
|
|||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
SRC_DIR = ROOT / "src"
|
||||
if str(SRC_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(SRC_DIR))
|
||||
|
||||
import renunney.track1_reference as ref
|
||||
|
||||
|
||||
def test_zero_mutation_preserves_zero_alleles_in_one_generation():
|
||||
params = ref.Track1Parameters(K=5000, N0=2, n=2, u=0.0, R=10.0, T=20)
|
||||
state = ref.PopulationState(
|
||||
genomes=np.zeros((2, 2, 2), dtype=np.int16),
|
||||
sexes=np.array([0, 1], dtype=np.int8),
|
||||
)
|
||||
next_state, summary = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(1))
|
||||
assert summary.mean_allele_value == 0.0
|
||||
assert int(next_state.genomes.sum()) == 0
|
||||
|
||||
|
||||
def test_zero_offspring_fitness_prevents_survival(monkeypatch):
|
||||
params = ref.Track1Parameters(K=5000, N0=2, n=1, u=0.0, R=10.0, T=20)
|
||||
state = ref.PopulationState(
|
||||
genomes=np.zeros((2, 2, 1), dtype=np.int16),
|
||||
sexes=np.array([0, 1], dtype=np.int8),
|
||||
)
|
||||
|
||||
def fake_fitness(genomes, r, T, t):
|
||||
if genomes.shape[0] == 1:
|
||||
return np.zeros(1, dtype=float)
|
||||
return np.ones(genomes.shape[0], dtype=float)
|
||||
|
||||
monkeypatch.setattr(ref, "genotype_fitness", fake_fitness)
|
||||
monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0)
|
||||
monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0], dtype=np.int32))
|
||||
|
||||
next_state, _ = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(2))
|
||||
assert next_state.size == 0
|
||||
|
||||
|
||||
def test_unit_offspring_fitness_keeps_all_births(monkeypatch):
|
||||
params = ref.Track1Parameters(K=5000, N0=2, n=1, u=0.0, R=10.0, T=20)
|
||||
state = ref.PopulationState(
|
||||
genomes=np.zeros((2, 2, 1), dtype=np.int16),
|
||||
sexes=np.array([0, 1], dtype=np.int8),
|
||||
)
|
||||
|
||||
monkeypatch.setattr(ref, "genotype_fitness", lambda genomes, r, T, t: np.ones(genomes.shape[0], dtype=float))
|
||||
monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0)
|
||||
monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0], dtype=np.int32))
|
||||
|
||||
next_state, summary = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(2))
|
||||
assert summary.female_count == 1
|
||||
assert next_state.size == 3
|
||||
|
||||
|
||||
def test_one_father_is_used_per_female_reproductive_event(monkeypatch):
|
||||
params = ref.Track1Parameters(K=5000, N0=3, n=2, u=0.0, R=10.0, T=20)
|
||||
state = ref.PopulationState(
|
||||
genomes=np.array(
|
||||
[
|
||||
[[0, 0], [0, 0]],
|
||||
[[1, 1], [1, 1]],
|
||||
[[2, 2], [2, 2]],
|
||||
],
|
||||
dtype=np.int16,
|
||||
),
|
||||
sexes=np.array([0, 1, 1], dtype=np.int8),
|
||||
)
|
||||
|
||||
monkeypatch.setattr(ref, "genotype_fitness", lambda genomes, r, T, t: np.ones(genomes.shape[0], dtype=float))
|
||||
monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0)
|
||||
monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0, 0], dtype=np.int32))
|
||||
|
||||
next_state, _ = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(3))
|
||||
paternal_alleles = next_state.genomes[:, 1, :]
|
||||
assert next_state.size == 3
|
||||
assert np.unique(paternal_alleles).size == 1
|
||||
|
||||
|
||||
def test_absence_of_males_or_females_counts_as_extinction():
|
||||
female_only = ref.PopulationState(
|
||||
genomes=np.zeros((2, 2, 1), dtype=np.int16),
|
||||
sexes=np.array([0, 0], dtype=np.int8),
|
||||
)
|
||||
male_only = ref.PopulationState(
|
||||
genomes=np.zeros((2, 2, 1), dtype=np.int16),
|
||||
sexes=np.array([1, 1], dtype=np.int8),
|
||||
)
|
||||
assert ref.is_extinct(female_only) is True
|
||||
assert ref.is_extinct(male_only) is True
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
SRC_DIR = ROOT / "src"
|
||||
if str(SRC_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(SRC_DIR))
|
||||
|
||||
import renunney.track1_reference as ref
|
||||
|
||||
|
||||
def test_fecundity_matches_r_definition():
|
||||
params = ref.Track1Parameters(K=5000, N0=50, n=1, u=5e-6, R=10.0, T=40)
|
||||
r = params.intrinsic_growth_rate()
|
||||
assert ref.female_fecundity(r, N=0, K=params.K) == pytest_approx(10.0)
|
||||
|
||||
|
||||
def test_initialize_population_shapes_and_allele_zero_fixation():
|
||||
params = ref.Track1Parameters(K=5000, N0=12, n=3, u=5e-6, R=10.0, T=40)
|
||||
state = ref.initialize_population(params, ref.np.random.default_rng(1))
|
||||
assert state.genomes.shape == (12, 2, 3)
|
||||
assert state.sexes.shape == (12,)
|
||||
assert int(state.genomes.sum()) == 0
|
||||
|
||||
|
||||
def test_simulate_run_returns_generation_summaries():
|
||||
params = ref.Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
|
||||
out = ref.simulate_run(params, seed=2)
|
||||
assert out
|
||||
assert out[0].N == 20
|
||||
assert out[0].t == -(params.T // 2)
|
||||
|
||||
|
||||
def test_generation_summary_reports_tracking_fields_for_initial_population():
|
||||
params = ref.Track1Parameters(K=5000, N0=20, n=2, u=5e-6, R=10.0, T=20)
|
||||
out = ref.simulate_run(params, seed=2)
|
||||
first = out[0]
|
||||
assert first.mean_allele_value == pytest_approx(0.0)
|
||||
assert first.mean_genotype_value == pytest_approx(0.0)
|
||||
assert first.target_value == pytest_approx(-0.5)
|
||||
assert first.mean_tracking_gap == pytest_approx(0.5)
|
||||
assert first.paper_M == pytest_approx(0.05)
|
||||
assert first.expected_mutations_current_N == pytest_approx(0.0004)
|
||||
|
||||
|
||||
def test_generation_summary_reports_realized_mutations_for_high_u():
|
||||
params = ref.Track1Parameters(K=5000, N0=20, n=1, u=0.5, R=10.0, T=20)
|
||||
out = ref.simulate_run(params, seed=2)
|
||||
first = out[0]
|
||||
assert first.realized_mutation_count >= 0
|
||||
assert 0.0 <= first.realized_mutation_rate_per_allele <= 1.0
|
||||
|
||||
|
||||
def pytest_approx(value: float, tol: float = 1e-9):
|
||||
class Approx:
|
||||
def __eq__(self, other):
|
||||
return abs(other - value) <= tol
|
||||
|
||||
return Approx()
|
||||
Loading…
Reference in New Issue