From 7c9fcd0dd474a3784c5223d381d1234768565fca Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 11 Apr 2026 06:37:56 -0400 Subject: [PATCH] Migrate Track 1 simulation kernel into renunney --- README.md | 7 +- docs/MIGRATION.md | 10 +- docs/WORKFLOW.md | 7 +- src/renunney/__init__.py | 38 +++ src/renunney/track1_analysis.py | 6 +- src/renunney/track1_api.py | 2 +- src/renunney/track1_reference.py | 428 +++++++++++++++++++++++++++++++ src/renunney/track1_threshold.py | 6 +- tests/test_track1_dynamics.py | 95 +++++++ tests/test_track1_reference.py | 59 +++++ 10 files changed, 636 insertions(+), 22 deletions(-) create mode 100644 src/renunney/track1_reference.py create mode 100644 tests/test_track1_dynamics.py create mode 100644 tests/test_track1_reference.py diff --git a/README.md b/README.md index 283e14c..b54f2df 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ plane and the Track 1 runner/API boundary are now local to `renunney`. - a local Track 1 runner and config/API layer, - a local Track 1 analysis layer for tracking summaries and loci-regression, - a local Track 1 threshold/search layer for Nunney-style threshold checks, +- a local Track 1 simulation kernel, - a Makefile for common tasks, - migration notes for pulling code into this repo in stages. @@ -86,8 +87,10 @@ The current state is split: - Track 1 runner and config/API layer: local to `renunney` - Track 1 analysis layer: local to `renunney` - Track 1 threshold/search layer: local to `renunney` -- Track 1 simulation backend: still in the older `cost_of_substitution` - directory and imported through the local compatibility layer +- Track 1 simulation kernel: local to `renunney` +- Track 1 report, dataset, fit, and extinction-model helpers: still imported + from the older `cost_of_substitution` directory through the local + compatibility layer This repo is now the clean operational entry point while the simulation code is migrated in later stages. diff --git a/docs/MIGRATION.md b/docs/MIGRATION.md index ee884e7..1bb068b 100644 --- a/docs/MIGRATION.md +++ b/docs/MIGRATION.md @@ -31,16 +31,14 @@ Operational code still lives in: - `src/renunney/track1_analysis.py` 5. Track 1 threshold/search boundary has been migrated locally: - `src/renunney/track1_threshold.py` -6. Keep the Track 1 simulation backend in the legacy path until real multi-host runs are stable. -7. Migrate the Track 1 simulation core after the runner path is stable: - - `python/track1_reference.py` - - `python/track1_threshold.py` - - `python/track1_analysis.py` -8. Migrate report, dataset, fit, and orchestration-adjacent Track 1 modules after the kernel boundary is stable: +6. Track 1 simulation kernel has been migrated locally: + - `src/renunney/track1_reference.py` +7. Migrate report, dataset, fit, and orchestration-adjacent Track 1 modules next: - `python/track1_report.py` - `python/track1_dataset.py` - `python/track1_fit.py` - `python/track1_extinction.py` +8. Reduce or remove the remaining compatibility-layer imports after those modules are local. 9. Migrate docs and example configs last, after path references are updated. ## Constraint diff --git a/docs/WORKFLOW.md b/docs/WORKFLOW.md index c3a71d9..549251a 100644 --- a/docs/WORKFLOW.md +++ b/docs/WORKFLOW.md @@ -47,8 +47,9 @@ make status ## Current Assumption The Makefile now drives the local orchestration code in `renunney`, while the -Track 1 runner/API boundary, analysis layer, and threshold/search layer are -also local to `renunney`. The simulation kernel is still imported from the -legacy `cost_of_substitution` directory through the compatibility layer in +Track 1 runner/API boundary, analysis layer, threshold/search layer, and +simulation kernel are also local to `renunney`. The remaining Track 1 +report/dataset/fit helpers are still imported from the legacy +`cost_of_substitution` directory through the compatibility layer in `src/renunney/legacy.py`. The paper-scale Figure 1 configs used for submission are now local to `renunney/config`. diff --git a/src/renunney/__init__.py b/src/renunney/__init__.py index eeca0b9..d851912 100644 --- a/src/renunney/__init__.py +++ b/src/renunney/__init__.py @@ -28,6 +28,26 @@ from .track1_analysis import ( sweep_number_of_loci, ) from .track1_api import Track1RunConfig, config_from_mapping, load_config, run_config, save_payload +from .track1_reference import ( + GenerationSummary, + PopulationState, + Track1Parameters, + allele_tracking_metrics, + approximate_ne, + expected_female_productivity, + expected_mutations_for_population, + female_fecundity, + female_fraction, + generation_metrics, + genotype_fitness, + initialize_population, + is_extinct, + paper_mutation_supply_M, + realize_birth_counts, + simulate_one_generation, + simulate_run, + summarize_generation, +) from .track1_threshold import ( ThresholdCheck, ThresholdSearchResult, @@ -56,6 +76,8 @@ __all__ = [ "LinearCostFit", "LocusThresholdRow", "NumberOfLociSweep", + "GenerationSummary", + "PopulationState", "repo_root", "run_one_job", "run_worker_loop", @@ -64,17 +86,33 @@ __all__ = [ "ThresholdCheck", "ThresholdSearchResult", "TrackingSummary", + "Track1Parameters", "Track1RunConfig", + "allele_tracking_metrics", + "approximate_ne", "config_from_mapping", + "expected_female_productivity", + "expected_mutations_for_population", "evaluate_threshold_candidate", + "female_fecundity", + "female_fraction", "fit_linear_cost_by_loci", + "generation_metrics", + "genotype_fitness", + "initialize_population", + "is_extinct", "load_config", "nunney_threshold_accepts", + "paper_mutation_supply_M", "published_threshold_accepts", + "realize_birth_counts", "run_config", + "simulate_one_generation", + "simulate_run", "run_extinction_check", "save_payload", "search_threshold_over_candidates", "summarize_tracking", + "summarize_generation", "sweep_number_of_loci", ] diff --git a/src/renunney/track1_analysis.py b/src/renunney/track1_analysis.py index ab8d2df..68e3529 100644 --- a/src/renunney/track1_analysis.py +++ b/src/renunney/track1_analysis.py @@ -14,13 +14,9 @@ from typing import Iterable, Optional import numpy as np -from .legacy import ensure_legacy_python_path +from .track1_reference import GenerationSummary, Track1Parameters from .track1_threshold import ThresholdSearchResult, search_threshold_over_candidates -ensure_legacy_python_path() - -from track1_reference import GenerationSummary, Track1Parameters - @dataclass(frozen=True) class LocusThresholdRow: diff --git a/src/renunney/track1_api.py b/src/renunney/track1_api.py index 8fb9a9d..829e22a 100644 --- a/src/renunney/track1_api.py +++ b/src/renunney/track1_api.py @@ -16,13 +16,13 @@ from typing import Any, Optional from .legacy import ensure_legacy_python_path from .track1_analysis import summarize_tracking, sweep_number_of_loci +from .track1_reference import Track1Parameters, simulate_run from .track1_threshold import evaluate_threshold_candidate, search_threshold_over_candidates ensure_legacy_python_path() from track1_dataset import generate_extinction_dataset from track1_fit import class_balance, fit_payload_from_jsonl, load_jsonl -from track1_reference import Track1Parameters, simulate_run from track1_report import generate_report_bundle diff --git a/src/renunney/track1_reference.py b/src/renunney/track1_reference.py new file mode 100644 index 0000000..47ee331 --- /dev/null +++ b/src/renunney/track1_reference.py @@ -0,0 +1,428 @@ +""" +track1_reference.py + +Local Track 1 reference module for renunney. + +This is the historically faithful Nunney-style simulation kernel used by the +local analysis, threshold, and orchestration layers. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +import numpy as np + + +@dataclass(frozen=True, init=False) +class Track1Parameters: + """Reference parameters for the Track 1 baseline.""" + + K: int + N0: int + n: int + u: float + R: float + T: int + epochs: int = 8 + p: float = 0.5 + a_max: Optional[int] = None + + def __init__( + self, + K: int, + N0: int, + n: int, + u: float | None = None, + R: float = 10.0, + T: int = 300, + epochs: int = 8, + p: float = 0.5, + a_max: Optional[int] = None, + M: float | None = None, + ) -> None: + if u is None: + if M is None: + raise ValueError("Track1Parameters requires u, or convenience M to derive u.") + u = float(M / (2.0 * K)) + object.__setattr__(self, "K", int(K)) + object.__setattr__(self, "N0", int(N0)) + object.__setattr__(self, "n", int(n)) + object.__setattr__(self, "u", float(u)) + object.__setattr__(self, "R", float(R)) + object.__setattr__(self, "T", int(T)) + object.__setattr__(self, "epochs", int(epochs)) + object.__setattr__(self, "p", float(p)) + object.__setattr__(self, "a_max", a_max) + + @property + def M(self) -> float: + return float(2.0 * self.K * self.u) + + def resolved_a_max(self) -> int: + if self.a_max is not None: + return self.a_max + return self.epochs + + def intrinsic_growth_rate(self) -> float: + return float(np.log(self.R / 2.0)) + + def total_generations(self) -> int: + return self.epochs * self.T + (self.T // 2) + + +@dataclass +class PopulationState: + """Diploid population state for one generation.""" + + genomes: np.ndarray + sexes: np.ndarray + + @property + def size(self) -> int: + return int(self.genomes.shape[0]) + + +@dataclass(frozen=True) +class GenerationSummary: + """Per-generation diagnostics for Track 1.""" + + t: int + N: int + female_fraction: float + male_count: int + female_count: int + fecundity: float + mean_fitness: float + mean_expected_female_productivity: float + target_value: float + mean_allele_value: float + mean_genotype_value: float + mean_tracking_gap: float + paper_M: float + expected_mutations_current_N: float + realized_mutation_count: int + realized_mutation_rate_per_allele: float + birth_count: int + surviving_offspring_count: int + ne_approx: float + extinct: bool + + +def initialize_population(params: Track1Parameters, rng: np.random.Generator) -> PopulationState: + genomes = np.zeros((params.N0, 2, params.n), dtype=np.int16) + sexes = rng.binomial(1, 1.0 - params.p, size=params.N0).astype(np.int8) + return PopulationState(genomes=genomes, sexes=sexes) + + +def female_fecundity(r: float, N: int, K: int) -> float: + return float(2.0 * np.exp(r * (1.0 - (N / K) ** (1.0 / r)))) + + +def genotype_fitness(genomes: np.ndarray, r: float, T: int, t: int) -> np.ndarray: + target = t / T + locus_means = 0.5 * (genomes[:, 0, :] + genomes[:, 1, :]) + squared_distance = np.square(locus_means - target, dtype=np.float64) + return np.exp(-(r / genomes.shape[2]) * np.sum(squared_distance, axis=1)) + + +def expected_female_productivity(fecundity: float, fitness: np.ndarray) -> np.ndarray: + return fecundity * fitness + + +def paper_mutation_supply_M(K: int, u: float) -> float: + return float(2.0 * K * u) + + +def expected_mutations_for_population(N: int, n: int, u: float) -> float: + return float(2.0 * N * n * u) + + +def allele_tracking_metrics(genomes: np.ndarray, T: int, t: int) -> tuple[float, float, float, float]: + target_value = float(t / T) + if genomes.size == 0: + return target_value, 0.0, 0.0, -target_value + mean_allele_value = float(np.mean(genomes)) + genotype_values = 0.5 * (genomes[:, 0, :] + genomes[:, 1, :]) + mean_genotype_value = float(np.mean(genotype_values)) + mean_tracking_gap = mean_genotype_value - target_value + return target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap + + +def approximate_ne(N: int, female_fraction: float) -> float: + male_fraction = 1.0 - female_fraction + denom = female_fraction + male_fraction + if N <= 0 or female_fraction <= 0.0 or male_fraction <= 0.0 or denom == 0.0: + return 0.0 + return float((2.0 * N * female_fraction * male_fraction) / denom) + + +def female_fraction(sexes: np.ndarray) -> float: + if sexes.size == 0: + return 0.0 + return float(np.sum(sexes == 0) / sexes.size) + + +def is_extinct(state: PopulationState) -> bool: + if state.size == 0: + return True + female_count = int(np.sum(state.sexes == 0)) + male_count = int(np.sum(state.sexes == 1)) + return female_count == 0 or male_count == 0 + + +def choose_gamete(parent: np.ndarray, rng: np.random.Generator) -> np.ndarray: + n = parent.shape[1] + picks = rng.integers(0, 2, size=n) + gamete = np.empty(n, dtype=parent.dtype) + gamete[:] = parent[picks, np.arange(n)] + return gamete + + +def choose_mutant_allele(current: int, a_max: int, rng: np.random.Generator) -> int: + if a_max <= 0: + return current + draw = int(rng.integers(0, a_max)) + return draw if draw < current else draw + 1 + + +def mutate_zygote(zygote: np.ndarray, params: Track1Parameters, rng: np.random.Generator) -> np.ndarray: + a_max = params.resolved_a_max() + out = zygote.copy() + for chrom in range(out.shape[0]): + for locus in range(out.shape[1]): + if rng.random() <= params.u: + out[chrom, locus] = choose_mutant_allele(int(out[chrom, locus]), a_max, rng) + return out + + +def produce_offspring( + mother: np.ndarray, + father: np.ndarray, + params: Track1Parameters, + rng: np.random.Generator, + locus_indices: np.ndarray, +) -> tuple[np.ndarray, int]: + n = params.n + offspring = np.empty((2, n), dtype=mother.dtype) + maternal_picks = rng.integers(0, 2, size=n) + paternal_picks = rng.integers(0, 2, size=n) + offspring[0, :] = mother[maternal_picks, locus_indices] + offspring[1, :] = father[paternal_picks, locus_indices] + a_max = params.resolved_a_max() + mutation_count = 0 + for chrom in range(2): + for locus in range(n): + if rng.random() <= params.u: + offspring[chrom, locus] = choose_mutant_allele(int(offspring[chrom, locus]), a_max, rng) + mutation_count += 1 + return offspring, mutation_count + + +def realize_birth_counts(fecundity: float, sexes: np.ndarray, rng: np.random.Generator) -> np.ndarray: + female_mask = sexes == 0 + counts = np.zeros(sexes.shape[0], dtype=np.int32) + counts[female_mask] = rng.poisson(fecundity, size=int(np.sum(female_mask))) + return counts + + +def summarize_generation(state: PopulationState, params: Track1Parameters, t: int) -> GenerationSummary: + if state.size == 0: + return GenerationSummary( + t=t, + N=0, + female_fraction=0.0, + male_count=0, + female_count=0, + fecundity=0.0, + mean_fitness=0.0, + mean_expected_female_productivity=0.0, + target_value=float(t / params.T), + mean_allele_value=0.0, + mean_genotype_value=0.0, + mean_tracking_gap=float(-(t / params.T)), + paper_M=params.M, + expected_mutations_current_N=0.0, + realized_mutation_count=0, + realized_mutation_rate_per_allele=0.0, + birth_count=0, + surviving_offspring_count=0, + ne_approx=0.0, + extinct=True, + ) + fit, fec, exp_fp, ff, female_count, male_count = generation_metrics(state, params, t) + mean_expected_fp = float(np.mean(exp_fp[state.sexes == 0])) if female_count > 0 else 0.0 + target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap = allele_tracking_metrics( + state.genomes, + T=params.T, + t=t, + ) + return GenerationSummary( + t=t, + N=state.size, + female_fraction=ff, + male_count=male_count, + female_count=female_count, + fecundity=fec, + mean_fitness=float(np.mean(fit)), + mean_expected_female_productivity=mean_expected_fp, + target_value=target_value, + mean_allele_value=mean_allele_value, + mean_genotype_value=mean_genotype_value, + mean_tracking_gap=mean_tracking_gap, + paper_M=params.M, + expected_mutations_current_N=expected_mutations_for_population(state.size, params.n, params.u), + realized_mutation_count=0, + realized_mutation_rate_per_allele=0.0, + birth_count=0, + surviving_offspring_count=0, + ne_approx=approximate_ne(state.size, ff), + extinct=is_extinct(state), + ) + + +def generation_metrics( + state: PopulationState, + params: Track1Parameters, + t: int, +) -> tuple[np.ndarray, float, np.ndarray, float, int, int]: + r = params.intrinsic_growth_rate() + fit = genotype_fitness(state.genomes, r=r, T=params.T, t=t) + fec = female_fecundity(r=r, N=state.size, K=params.K) + exp_fp = expected_female_productivity(fec, fit) + female_count = int(np.sum(state.sexes == 0)) + male_count = int(state.size - female_count) + ff = float(female_count / state.size) if state.size > 0 else 0.0 + return fit, fec, exp_fp, ff, female_count, male_count + + +def simulate_one_generation( + state: PopulationState, + params: Track1Parameters, + t: int, + rng: np.random.Generator, +) -> tuple[PopulationState, GenerationSummary]: + if state.size == 0: + summary = summarize_generation(state, params, t) + return state, summary + + fit, fec, exp_fp, ff, female_count, male_count = generation_metrics(state, params, t) + extinct = female_count == 0 or male_count == 0 + target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap = allele_tracking_metrics( + state.genomes, + T=params.T, + t=t, + ) + expected_mutations_current_N = expected_mutations_for_population(state.size, params.n, params.u) + summary = GenerationSummary( + t=t, + N=state.size, + female_fraction=ff, + male_count=male_count, + female_count=female_count, + fecundity=fec, + mean_fitness=float(np.mean(fit)), + mean_expected_female_productivity=float(np.mean(exp_fp[state.sexes == 0])) if female_count > 0 else 0.0, + target_value=target_value, + mean_allele_value=mean_allele_value, + mean_genotype_value=mean_genotype_value, + mean_tracking_gap=mean_tracking_gap, + paper_M=paper_mutation_supply_M(params.K, params.u), + expected_mutations_current_N=expected_mutations_current_N, + realized_mutation_count=0, + realized_mutation_rate_per_allele=0.0, + birth_count=0, + surviving_offspring_count=0, + ne_approx=approximate_ne(state.size, ff), + extinct=extinct, + ) + if extinct: + return state, summary + + birth_counts = realize_birth_counts(fec, state.sexes, rng) + + female_indices = np.flatnonzero(state.sexes == 0) + male_indices = np.flatnonzero(state.sexes == 1) + total_births = int(np.sum(birth_counts[female_indices])) + + new_genomes = np.zeros((total_births, 2, params.n), dtype=np.int16) + new_sexes = np.zeros(total_births, dtype=np.int8) + locus_indices = np.arange(params.n) + r = params.intrinsic_growth_rate() + offspring_t = t + 1 + + survivor_cursor = 0 + realized_mutation_count = 0 + for mother_index in female_indices: + count = int(birth_counts[mother_index]) + if count == 0: + continue + father_index = int(male_indices[rng.integers(0, male_indices.size)]) + for _ in range(count): + offspring, offspring_mutations = produce_offspring( + state.genomes[mother_index], + state.genomes[father_index], + params, + rng, + locus_indices, + ) + realized_mutation_count += offspring_mutations + offspring_fitness = float( + genotype_fitness(offspring[np.newaxis, :, :], r=r, T=params.T, t=offspring_t)[0] + ) + if rng.random() <= offspring_fitness: + new_genomes[survivor_cursor] = offspring + new_sexes[survivor_cursor] = int(rng.binomial(1, 1.0 - params.p)) + survivor_cursor += 1 + + allele_exposures = 2 * total_births * params.n + next_state = PopulationState(genomes=new_genomes[:survivor_cursor], sexes=new_sexes[:survivor_cursor]) + summary = GenerationSummary( + t=summary.t, + N=summary.N, + female_fraction=summary.female_fraction, + male_count=summary.male_count, + female_count=summary.female_count, + fecundity=summary.fecundity, + mean_fitness=summary.mean_fitness, + mean_expected_female_productivity=summary.mean_expected_female_productivity, + target_value=summary.target_value, + mean_allele_value=summary.mean_allele_value, + mean_genotype_value=summary.mean_genotype_value, + mean_tracking_gap=summary.mean_tracking_gap, + paper_M=summary.paper_M, + expected_mutations_current_N=summary.expected_mutations_current_N, + realized_mutation_count=realized_mutation_count, + realized_mutation_rate_per_allele=0.0 + if allele_exposures == 0 + else float(realized_mutation_count / allele_exposures), + birth_count=total_births, + surviving_offspring_count=survivor_cursor, + ne_approx=summary.ne_approx, + extinct=summary.extinct, + ) + return next_state, summary + + +def simulate_run( + params: Track1Parameters, + seed: Optional[int] = None, +) -> list[GenerationSummary]: + rng = np.random.default_rng(seed) + state = initialize_population(params, rng) + t = -(params.T // 2) + summaries: list[GenerationSummary] = [] + + for _ in range(params.total_generations()): + state, summary = simulate_one_generation(state, params, t, rng) + summaries.append(summary) + if is_extinct(state): + terminal_t = t if summary.extinct else t + 1 + terminal_summary = summarize_generation(state, params, terminal_t) + if summaries[-1] != terminal_summary: + summaries.append(terminal_summary) + break + t += 1 + + return summaries diff --git a/src/renunney/track1_threshold.py b/src/renunney/track1_threshold.py index 31d029d..7d1bc64 100644 --- a/src/renunney/track1_threshold.py +++ b/src/renunney/track1_threshold.py @@ -15,11 +15,7 @@ import json from pathlib import Path from typing import Iterable, Optional -from .legacy import ensure_legacy_python_path - -ensure_legacy_python_path() - -from track1_reference import Track1Parameters, simulate_run +from .track1_reference import Track1Parameters, simulate_run @dataclass(frozen=True) diff --git a/tests/test_track1_dynamics.py b/tests/test_track1_dynamics.py new file mode 100644 index 0000000..76a5737 --- /dev/null +++ b/tests/test_track1_dynamics.py @@ -0,0 +1,95 @@ +import sys +from pathlib import Path + +import numpy as np + +ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = ROOT / "src" +if str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) + +import renunney.track1_reference as ref + + +def test_zero_mutation_preserves_zero_alleles_in_one_generation(): + params = ref.Track1Parameters(K=5000, N0=2, n=2, u=0.0, R=10.0, T=20) + state = ref.PopulationState( + genomes=np.zeros((2, 2, 2), dtype=np.int16), + sexes=np.array([0, 1], dtype=np.int8), + ) + next_state, summary = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(1)) + assert summary.mean_allele_value == 0.0 + assert int(next_state.genomes.sum()) == 0 + + +def test_zero_offspring_fitness_prevents_survival(monkeypatch): + params = ref.Track1Parameters(K=5000, N0=2, n=1, u=0.0, R=10.0, T=20) + state = ref.PopulationState( + genomes=np.zeros((2, 2, 1), dtype=np.int16), + sexes=np.array([0, 1], dtype=np.int8), + ) + + def fake_fitness(genomes, r, T, t): + if genomes.shape[0] == 1: + return np.zeros(1, dtype=float) + return np.ones(genomes.shape[0], dtype=float) + + monkeypatch.setattr(ref, "genotype_fitness", fake_fitness) + monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0) + monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0], dtype=np.int32)) + + next_state, _ = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(2)) + assert next_state.size == 0 + + +def test_unit_offspring_fitness_keeps_all_births(monkeypatch): + params = ref.Track1Parameters(K=5000, N0=2, n=1, u=0.0, R=10.0, T=20) + state = ref.PopulationState( + genomes=np.zeros((2, 2, 1), dtype=np.int16), + sexes=np.array([0, 1], dtype=np.int8), + ) + + monkeypatch.setattr(ref, "genotype_fitness", lambda genomes, r, T, t: np.ones(genomes.shape[0], dtype=float)) + monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0) + monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0], dtype=np.int32)) + + next_state, summary = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(2)) + assert summary.female_count == 1 + assert next_state.size == 3 + + +def test_one_father_is_used_per_female_reproductive_event(monkeypatch): + params = ref.Track1Parameters(K=5000, N0=3, n=2, u=0.0, R=10.0, T=20) + state = ref.PopulationState( + genomes=np.array( + [ + [[0, 0], [0, 0]], + [[1, 1], [1, 1]], + [[2, 2], [2, 2]], + ], + dtype=np.int16, + ), + sexes=np.array([0, 1, 1], dtype=np.int8), + ) + + monkeypatch.setattr(ref, "genotype_fitness", lambda genomes, r, T, t: np.ones(genomes.shape[0], dtype=float)) + monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0) + monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0, 0], dtype=np.int32)) + + next_state, _ = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(3)) + paternal_alleles = next_state.genomes[:, 1, :] + assert next_state.size == 3 + assert np.unique(paternal_alleles).size == 1 + + +def test_absence_of_males_or_females_counts_as_extinction(): + female_only = ref.PopulationState( + genomes=np.zeros((2, 2, 1), dtype=np.int16), + sexes=np.array([0, 0], dtype=np.int8), + ) + male_only = ref.PopulationState( + genomes=np.zeros((2, 2, 1), dtype=np.int16), + sexes=np.array([1, 1], dtype=np.int8), + ) + assert ref.is_extinct(female_only) is True + assert ref.is_extinct(male_only) is True diff --git a/tests/test_track1_reference.py b/tests/test_track1_reference.py new file mode 100644 index 0000000..3a7efba --- /dev/null +++ b/tests/test_track1_reference.py @@ -0,0 +1,59 @@ +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = ROOT / "src" +if str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) + +import renunney.track1_reference as ref + + +def test_fecundity_matches_r_definition(): + params = ref.Track1Parameters(K=5000, N0=50, n=1, u=5e-6, R=10.0, T=40) + r = params.intrinsic_growth_rate() + assert ref.female_fecundity(r, N=0, K=params.K) == pytest_approx(10.0) + + +def test_initialize_population_shapes_and_allele_zero_fixation(): + params = ref.Track1Parameters(K=5000, N0=12, n=3, u=5e-6, R=10.0, T=40) + state = ref.initialize_population(params, ref.np.random.default_rng(1)) + assert state.genomes.shape == (12, 2, 3) + assert state.sexes.shape == (12,) + assert int(state.genomes.sum()) == 0 + + +def test_simulate_run_returns_generation_summaries(): + params = ref.Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20) + out = ref.simulate_run(params, seed=2) + assert out + assert out[0].N == 20 + assert out[0].t == -(params.T // 2) + + +def test_generation_summary_reports_tracking_fields_for_initial_population(): + params = ref.Track1Parameters(K=5000, N0=20, n=2, u=5e-6, R=10.0, T=20) + out = ref.simulate_run(params, seed=2) + first = out[0] + assert first.mean_allele_value == pytest_approx(0.0) + assert first.mean_genotype_value == pytest_approx(0.0) + assert first.target_value == pytest_approx(-0.5) + assert first.mean_tracking_gap == pytest_approx(0.5) + assert first.paper_M == pytest_approx(0.05) + assert first.expected_mutations_current_N == pytest_approx(0.0004) + + +def test_generation_summary_reports_realized_mutations_for_high_u(): + params = ref.Track1Parameters(K=5000, N0=20, n=1, u=0.5, R=10.0, T=20) + out = ref.simulate_run(params, seed=2) + first = out[0] + assert first.realized_mutation_count >= 0 + assert 0.0 <= first.realized_mutation_rate_per_allele <= 1.0 + + +def pytest_approx(value: float, tol: float = 1e-9): + class Approx: + def __eq__(self, other): + return abs(other - value) <= tol + + return Approx()