Compare commits

..

10 Commits

31 changed files with 3610 additions and 41 deletions

View File

@ -1,27 +1,29 @@
PYTHON := python3
REPO_ROOT := $(abspath .)
LEGACY_ROOT := $(REPO_ROOT)/../collaborations/to_ptbc/evc/cost_of_substitution
ORCH := $(REPO_ROOT)/scripts/run_orchestration.py
TRACK1 := $(REPO_ROOT)/scripts/run_track1.py
DB := $(REPO_ROOT)/runs/state/cos-orch.sqlite
RESULT_ROOT := $(REPO_ROOT)/runs/results
SCRATCH_ROOT := $(REPO_ROOT)/runs/scratch
MPLCONFIGDIR := $(SCRATCH_ROOT)/matplotlib
FIG1_M005 := $(LEGACY_ROOT)/examples/track1_figure1_paper_M_0_05.json
FIG1_M025 := $(LEGACY_ROOT)/examples/track1_figure1_paper_M_0_25.json
FIG1_M05 := $(LEGACY_ROOT)/examples/track1_figure1_paper_M_0_5.json
FIG1_M10 := $(LEGACY_ROOT)/examples/track1_figure1_paper_M_1_0.json
FIG1_M100 := $(LEGACY_ROOT)/examples/track1_figure1_paper_M_10_0.json
FIG1_M005 := $(REPO_ROOT)/config/track1_figure1_paper_M_0_05.json
FIG1_M025 := $(REPO_ROOT)/config/track1_figure1_paper_M_0_25.json
FIG1_M05 := $(REPO_ROOT)/config/track1_figure1_paper_M_0_5.json
FIG1_M10 := $(REPO_ROOT)/config/track1_figure1_paper_M_1_0.json
FIG1_M100 := $(REPO_ROOT)/config/track1_figure1_paper_M_10_0.json
.PHONY: help init doctor list-jobs run-one run-loop run-loop-one collate-figure1 \
.PHONY: help init doctor list-jobs run-one run-loop run-loop-one collate-figure1 track1-sim-smoke \
submit-figure1-m005 submit-figure1-m025 submit-figure1-m05 submit-figure1-m10 submit-figure1-m100 \
submit-all-figure1 status results-tree
help:
@echo "Targets:"
@echo " init Create run directories and initialize the SQLite registry"
@echo " doctor Show key paths and verify local orchestration and legacy backend paths"
@echo " doctor Show key paths and verify local orchestration and Track 1 paths"
@echo " list-jobs List jobs in the local registry"
@echo " track1-sim-smoke Run one local Track 1 simulation through renunney runner"
@echo " run-one Claim and run one queued job"
@echo " run-loop Run worker loop until queue empty"
@echo " run-loop-one Run exactly one queued job through the worker loop"
@ -36,33 +38,42 @@ help:
@echo " results-tree List the current result files"
init:
mkdir -p $(REPO_ROOT)/runs/state $(REPO_ROOT)/runs/results $(REPO_ROOT)/runs/scratch
mkdir -p $(REPO_ROOT)/runs/state $(REPO_ROOT)/runs/results $(REPO_ROOT)/runs/scratch $(MPLCONFIGDIR)
$(PYTHON) $(ORCH) init-db --db $(DB)
doctor:
@echo "REPO_ROOT=$(REPO_ROOT)"
@echo "LEGACY_ROOT=$(LEGACY_ROOT)"
@echo "ORCH=$(ORCH)"
@echo "TRACK1=$(TRACK1)"
@echo "DB=$(DB)"
@echo "RESULT_ROOT=$(RESULT_ROOT)"
@echo "SCRATCH_ROOT=$(SCRATCH_ROOT)"
@echo "MPLCONFIGDIR=$(MPLCONFIGDIR)"
test -f $(ORCH)
test -d $(LEGACY_ROOT)/python
test -f $(TRACK1)
list-jobs:
$(PYTHON) $(ORCH) list --db $(DB)
track1-sim-smoke:
mkdir -p $(MPLCONFIGDIR)
MPLCONFIGDIR=$(MPLCONFIGDIR) $(PYTHON) $(TRACK1) --mode simulate --K 5000 --N0 50 --n 1 --u 5e-6 --R 10 --T 40 --epochs 8 --seed 1
run-one:
$(PYTHON) $(ORCH) run-one --db $(DB) --result-root $(RESULT_ROOT) --scratch-root $(SCRATCH_ROOT)
mkdir -p $(MPLCONFIGDIR)
MPLCONFIGDIR=$(MPLCONFIGDIR) $(PYTHON) $(ORCH) run-one --db $(DB) --result-root $(RESULT_ROOT) --scratch-root $(SCRATCH_ROOT)
run-loop:
$(PYTHON) $(ORCH) run-loop --db $(DB) --result-root $(RESULT_ROOT) --scratch-root $(SCRATCH_ROOT)
mkdir -p $(MPLCONFIGDIR)
MPLCONFIGDIR=$(MPLCONFIGDIR) $(PYTHON) $(ORCH) run-loop --db $(DB) --result-root $(RESULT_ROOT) --scratch-root $(SCRATCH_ROOT)
run-loop-one:
$(PYTHON) $(ORCH) run-loop --db $(DB) --result-root $(RESULT_ROOT) --scratch-root $(SCRATCH_ROOT) --max-jobs 1
mkdir -p $(MPLCONFIGDIR)
MPLCONFIGDIR=$(MPLCONFIGDIR) $(PYTHON) $(ORCH) run-loop --db $(DB) --result-root $(RESULT_ROOT) --scratch-root $(SCRATCH_ROOT) --max-jobs 1
collate-figure1:
$(PYTHON) $(ORCH) collate-figure1 --db $(DB) --output $(RESULT_ROOT)/figure1-collated.json
mkdir -p $(MPLCONFIGDIR)
MPLCONFIGDIR=$(MPLCONFIGDIR) $(PYTHON) $(ORCH) collate-figure1 --db $(DB) --output $(RESULT_ROOT)/figure1-collated.json
submit-figure1-m005:
$(PYTHON) $(ORCH) submit-figure1 --db $(DB) --config $(FIG1_M005) --job-prefix fig1-m005 --created-by make

View File

@ -8,18 +8,27 @@ Clean working repository for:
## Current Scope
This repository is the clean operational wrapper around the current work in:
This repository was bootstrapped from earlier work in:
- [`../collaborations/to_ptbc/evc/cost_of_substitution`](/mnt/CIFS/pengolodh/Docs/Projects/collaborations/to_ptbc/evc/cost_of_substitution)
The Track 1 simulation backend still lives there. The orchestration control
plane is now local to `renunney`.
That earlier tree remains useful as provenance and historical context. The
Track 1 runtime and orchestration stack now live in `renunney`.
`renunney` provides:
- a clean git repo,
- a stable working directory layout,
- a local orchestration CLI and library,
- local paper-scale Figure 1 submission configs,
- a local Track 1 runner and config/API layer,
- a local Track 1 analysis layer for tracking summaries and loci-regression,
- a local Track 1 threshold/search layer for Nunney-style threshold checks,
- a local Track 1 simulation kernel,
- a local Track 1 report generator,
- a local Track 1 extinction-model data layer,
- a local Track 1 dataset generator,
- a local Track 1 fit layer,
- a Makefile for common tasks,
- migration notes for pulling code into this repo in stages.
@ -37,6 +46,10 @@ plane is now local to `renunney`.
- local worker scratch and cache files
- `src/renunney/`
- future in-repo Python package and migration target
- `scripts/`
- local CLI entrypoints
- `tests/`
- local verification for migrated boundaries
## Start
@ -52,6 +65,12 @@ Submit a paper-scale Figure 1 treatment:
make submit-figure1-m10
```
Run one local Track 1 simulation through the migrated runner/API boundary:
```bash
make track1-sim-smoke
```
Run one worker loop locally:
```bash
@ -69,8 +88,14 @@ make collate-figure1
The current state is split:
- orchestration control plane: local to `renunney`
- Track 1 simulation backend: still in the older `cost_of_substitution`
directory
- Track 1 runner and config/API layer: local to `renunney`
- Track 1 analysis layer: local to `renunney`
- Track 1 threshold/search layer: local to `renunney`
- Track 1 simulation kernel: local to `renunney`
- Track 1 report generator: local to `renunney`
- Track 1 extinction-model data layer: local to `renunney`
- Track 1 dataset generator: local to `renunney`
- Track 1 fit layer: local to `renunney`
This repo is now the clean operational entry point while the simulation code is
migrated in later stages.
This repo is now the clean operational entry point for the Track 1 runtime and
its orchestration stack.
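As a quick orientation to the layers listed above, here is a minimal sketch of driving the in-repo Track 1 kernel directly from Python. It assumes the repository root as the working directory with `src/` placed on `sys.path` (mirroring `scripts/run_track1.py`); the parameter values are simply the Makefile's smoke-test settings.

```python
import sys

sys.path.insert(0, "src")  # package is not installed; mirrors the path handling in scripts/run_track1.py

from renunney import Track1Parameters, simulate_run, summarize_tracking

# Smoke-test scale parameters, matching the track1-sim-smoke Makefile target.
params = Track1Parameters(K=5000, N0=50, n=1, u=5e-6, R=10.0, T=40, epochs=8)
summaries = simulate_run(params, seed=1)
print(summarize_tracking(summaries))
```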

View File

@ -6,5 +6,6 @@ Configuration templates and future environment-specific settings for:
- result roots
- scratch roots
- host-specific worker defaults
- paper-scale Figure 1 submission configs
This directory is intentionally small for now.

View File

@ -0,0 +1,17 @@
{
"mode": "loci_regression",
"K": 5000,
"N0": 5000,
"n": 1,
"u": 5e-06,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4, 6, 8, 10, 15, 20, 30, 40, 50, 75, 100, 125, 150, 175, 200, 250, 300, 350, 400, 450, 500],
"loci_values": [1, 2, 3, 4, 5, 6, 7],
"seed": 10,
"cache_path": "/tmp/track1-figure1-paper-m005-cache.json"
}

View File

@ -0,0 +1,17 @@
{
"mode": "loci_regression",
"K": 5000,
"N0": 5000,
"n": 1,
"u": 2.5e-05,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4, 6, 8, 10, 15, 20, 30, 40, 50, 75, 100, 125, 150, 175, 200, 250, 300, 350, 400, 450, 500],
"loci_values": [1, 2, 3, 4, 5, 6, 7],
"seed": 10,
"cache_path": "/tmp/track1-figure1-paper-m025-cache.json"
}

View File

@ -0,0 +1,17 @@
{
"mode": "loci_regression",
"K": 5000,
"N0": 5000,
"n": 1,
"u": 5e-05,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4, 6, 8, 10, 15, 20, 30, 40, 50, 75, 100, 125, 150, 175, 200, 250, 300, 350, 400, 450, 500],
"loci_values": [1, 2, 3, 4, 5, 6, 7],
"seed": 10,
"cache_path": "/tmp/track1-figure1-paper-m05-cache.json"
}

View File

@ -0,0 +1,17 @@
{
"mode": "loci_regression",
"K": 5000,
"N0": 5000,
"n": 1,
"u": 0.001,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4, 6, 8, 10, 15, 20, 30, 40, 50, 75, 100, 125, 150, 175, 200, 250, 300, 350, 400, 450, 500],
"loci_values": [1, 2, 3, 4, 5, 6, 7],
"seed": 10,
"cache_path": "/tmp/track1-figure1-paper-m100-cache.json"
}

View File

@ -0,0 +1,17 @@
{
"mode": "loci_regression",
"K": 5000,
"N0": 5000,
"n": 1,
"u": 0.0001,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4, 6, 8, 10, 15, 20, 30, 40, 50, 75, 100, 125, 150, 175, 200, 250, 300, 350, 400, 450, 500],
"loci_values": [1, 2, 3, 4, 5, 6, 7],
"seed": 10,
"cache_path": "/tmp/track1-figure1-paper-m10-cache.json"
}

View File

@ -23,15 +23,27 @@ Operational code still lives in:
1. Orchestration control plane has been migrated locally:
- `src/renunney/orchestration.py`
- `scripts/run_orchestration.py`
2. Keep the Track 1 simulation backend in the legacy path until real multi-host runs are stable.
3. Migrate Track 1 runner and API next:
- `python/run_track1.py`
- `python/track1_api.py`
4. Migrate the Track 1 simulation core after the runner path is stable:
- `python/track1_reference.py`
- `python/track1_threshold.py`
- `python/track1_analysis.py`
5. Migrate docs and example configs last, after path references are updated.
2. Paper-scale Figure 1 submission configs have been copied locally into `config/`.
3. Track 1 runner and API boundary have been migrated locally:
- `scripts/run_track1.py`
- `src/renunney/track1_api.py`
4. Track 1 analysis boundary has been migrated locally:
- `src/renunney/track1_analysis.py`
5. Track 1 threshold/search boundary has been migrated locally:
- `src/renunney/track1_threshold.py`
6. Track 1 simulation kernel has been migrated locally:
- `src/renunney/track1_reference.py`
7. Track 1 report layer has been migrated locally:
- `src/renunney/track1_report.py`
8. Track 1 extinction-model data layer has been migrated locally:
- `src/renunney/track1_extinction.py`
9. Track 1 dataset generator has been migrated locally:
- `src/renunney/track1_dataset.py`
10. Track 1 fit layer has been migrated locally:
- `src/renunney/track1_fit.py`
11. Track 1 runtime path is now fully local to `renunney`.
12. Reduce or remove any remaining compatibility-layer imports outside the Track 1 runtime path.
13. Migrate docs and example configs last, after path references are updated.
## Constraint

View File

@ -14,6 +14,12 @@ Check paths:
make doctor
```
Run one local Track 1 simulation through the in-repo runner:
```bash
make track1-sim-smoke
```
Submit a Figure 1 treatment:
```bash
@ -41,5 +47,8 @@ make status
## Current Assumption
The Makefile now drives the local orchestration code in `renunney`, while the
simulation backend is still imported from the legacy `cost_of_substitution`
directory.
The Track 1 runner/API boundary, analysis layer, threshold/search layer,
simulation kernel, report generator, extinction-model data layer, dataset
generator, and fit layer are all local to `renunney`. The paper-scale Figure 1
configs used for submission are now local to `renunney/config`.
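For example, one of the local submission configs can be exercised directly through the in-repo API. This is only a sketch: the output path below is hypothetical, and this particular config is a paper-scale sweep, so it is long-running.

```python
import sys
from pathlib import Path

sys.path.insert(0, "src")  # repo root as CWD; mirrors scripts/run_track1.py

from renunney import load_config, run_config, save_payload

config = load_config("config/track1_figure1_paper_M_0_05.json")
payload = run_config(config)
save_payload(payload, Path("runs/results/fig1-m005-local.json"))  # hypothetical output path
```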

scripts/run_track1.py (new file, 105 lines)
View File

@ -0,0 +1,105 @@
"""
run_track1.py
Local Track 1 runner entrypoint for renunney.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
REPO_ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = REPO_ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
from renunney.track1_api import Track1RunConfig, load_config, run_config, save_payload
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Run Track 1 cost-of-substitution experiments.")
parser.add_argument("--mode", choices=["simulate", "report", "threshold", "search", "loci_regression", "extinction_dataset", "extinction_fit"], default="simulate")
parser.add_argument("--config", type=str, default=None)
parser.add_argument("--K", type=int, default=5000)
parser.add_argument("--N0", type=int, default=500)
parser.add_argument("--n", type=int, default=1)
parser.add_argument("--u", type=float, default=5.0e-6)
parser.add_argument("--M", type=float, default=None)
parser.add_argument("--R", type=float, default=10.0)
parser.add_argument("--T", type=int, default=300)
parser.add_argument("--epochs", type=int, default=8)
parser.add_argument("--p", type=float, default=0.5)
parser.add_argument("--a-max", dest="a_max", type=int, default=None)
parser.add_argument("--seed", type=int, default=0)
parser.add_argument("--runs", type=int, default=20)
parser.add_argument("--t-start", type=int, default=50)
parser.add_argument("--t-stop", type=int, default=500)
parser.add_argument("--t-step", type=int, default=10)
parser.add_argument("--t-values", type=str, default=None)
parser.add_argument("--loci-values", type=str, default=None)
parser.add_argument("--cache-path", type=str, default=None)
parser.add_argument("--jobs", type=int, default=1)
parser.add_argument("--report-dir", type=str, default=None)
parser.add_argument("--dataset-dir", type=str, default=None)
parser.add_argument("--run-rows-path", type=str, default=None)
parser.add_argument("--output", type=str, default=None)
return parser
def config_from_args(args: argparse.Namespace) -> Track1RunConfig:
loci_values = None
if args.loci_values:
loci_values = [int(part.strip()) for part in args.loci_values.split(",") if part.strip()]
t_values = None
if args.t_values:
t_values = [float(part.strip()) for part in args.t_values.split(",") if part.strip()]
return Track1RunConfig(
mode=args.mode,
K=args.K,
N0=args.N0,
n=args.n,
u=args.u if args.M is None else None,
M=args.M,
R=args.R,
T=args.T,
epochs=args.epochs,
p=args.p,
a_max=args.a_max,
seed=args.seed,
runs=args.runs,
t_start=args.t_start,
t_stop=args.t_stop,
t_step=args.t_step,
t_values=t_values,
loci_values=loci_values,
cache_path=args.cache_path,
jobs=args.jobs,
report_dir=args.report_dir,
dataset_dir=args.dataset_dir,
run_rows_path=args.run_rows_path,
)
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.config:
config = load_config(args.config)
else:
config = config_from_args(args)
payload = run_config(config)
rendered = json.dumps(payload, indent=2, sort_keys=True)
if args.output:
save_payload(payload, Path(args.output))
print(rendered)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -18,6 +18,80 @@ from .orchestration import (
submit_job_manifest,
submit_track1_figure1_jobs,
)
from .track1_analysis import (
LinearCostFit,
LocusThresholdRow,
NumberOfLociSweep,
TrackingSummary,
fit_linear_cost_by_loci,
summarize_tracking,
sweep_number_of_loci,
)
from .track1_api import Track1RunConfig, config_from_mapping, load_config, run_config, save_payload
from .track1_dataset import GRID_KEYS, generate_extinction_dataset
from .track1_extinction import (
ExtinctionGenerationRow,
ExtinctionRunRow,
build_extinction_generation_rows,
build_extinction_run_row,
save_jsonl,
)
from .track1_fit import (
DEFAULT_RUN_FEATURES,
ExtinctionClassBalance,
ExtinctionFitSummary,
ExtinctionLogitModel,
class_balance,
design_matrix_from_run_rows,
fit_extinction_run_model,
fit_extinction_run_model_from_jsonl,
fit_logistic_regression,
fit_payload_from_jsonl,
load_jsonl,
predict_probabilities,
standardize_design_matrix,
summarize_fit,
)
from .track1_reference import (
GenerationSummary,
PopulationState,
Track1Parameters,
allele_tracking_metrics,
approximate_ne,
expected_female_productivity,
expected_mutations_for_population,
female_fecundity,
female_fraction,
generation_metrics,
genotype_fitness,
initialize_population,
is_extinct,
paper_mutation_supply_M,
realize_birth_counts,
simulate_one_generation,
simulate_run,
summarize_generation,
)
from .track1_report import (
SeriesCI,
aggregate_derived_series,
aggregate_series,
confidence_interval,
generate_report_bundle,
plot_mean_allele_vs_target,
plot_series,
plot_series_with_reference,
render_markdown_report,
)
from .track1_threshold import (
ThresholdCheck,
ThresholdSearchResult,
evaluate_threshold_candidate,
nunney_threshold_accepts,
published_threshold_accepts,
run_extinction_check,
search_threshold_over_candidates,
)
__all__ = [
"ClaimedJob",
@ -34,9 +108,76 @@ __all__ = [
"list_job_results",
"list_jobs",
"load_job_manifest",
"LinearCostFit",
"LocusThresholdRow",
"NumberOfLociSweep",
"GenerationSummary",
"PopulationState",
"repo_root",
"run_one_job",
"run_worker_loop",
"SeriesCI",
"submit_job_manifest",
"submit_track1_figure1_jobs",
"ThresholdCheck",
"ThresholdSearchResult",
"TrackingSummary",
"Track1Parameters",
"Track1RunConfig",
"allele_tracking_metrics",
"approximate_ne",
"class_balance",
"config_from_mapping",
"DEFAULT_RUN_FEATURES",
"design_matrix_from_run_rows",
"expected_female_productivity",
"expected_mutations_for_population",
"evaluate_threshold_candidate",
"generate_extinction_dataset",
"ExtinctionClassBalance",
"ExtinctionFitSummary",
"ExtinctionGenerationRow",
"ExtinctionLogitModel",
"ExtinctionRunRow",
"female_fecundity",
"female_fraction",
"fit_extinction_run_model",
"fit_extinction_run_model_from_jsonl",
"fit_linear_cost_by_loci",
"fit_logistic_regression",
"fit_payload_from_jsonl",
"generation_metrics",
"genotype_fitness",
"generate_report_bundle",
"GRID_KEYS",
"initialize_population",
"is_extinct",
"build_extinction_generation_rows",
"build_extinction_run_row",
"load_config",
"load_jsonl",
"nunney_threshold_accepts",
"paper_mutation_supply_M",
"published_threshold_accepts",
"predict_probabilities",
"plot_mean_allele_vs_target",
"plot_series",
"plot_series_with_reference",
"realize_birth_counts",
"render_markdown_report",
"run_config",
"simulate_one_generation",
"simulate_run",
"run_extinction_check",
"save_payload",
"save_jsonl",
"search_threshold_over_candidates",
"standardize_design_matrix",
"summarize_tracking",
"summarize_generation",
"summarize_fit",
"sweep_number_of_loci",
"aggregate_derived_series",
"aggregate_series",
"confidence_interval",
]

View File

@ -17,12 +17,8 @@ from hashlib import sha256
from pathlib import Path
from typing import Any, Optional
from .legacy import ensure_legacy_python_path
ensure_legacy_python_path()
from track1_analysis import LocusThresholdRow, fit_linear_cost_by_loci
from track1_api import config_from_mapping, run_config, save_payload
from .track1_analysis import LocusThresholdRow, fit_linear_cost_by_loci
from .track1_api import config_from_mapping, run_config, save_payload
def utc_now_iso() -> str:
@ -43,7 +39,7 @@ def default_worker_host() -> str:
def code_identity(cwd: str | Path | None = None) -> dict[str, Any]:
identity: dict[str, Any] = {
"runner": "renunney orchestration + legacy Track 1 backend",
"runner": "renunney orchestration + local Track 1 backend",
"python_version": platform.python_version(),
"git_commit": None,
}
@ -280,7 +276,7 @@ def expand_track1_figure1_manifest(
manifests.append(
{
"job_id": job_id,
"project": "cost_of_substitution",
"project": "renunney",
"track": "track1",
"job_kind": "track1_locus_threshold",
"priority": int(priority),

View File

@ -0,0 +1,163 @@
"""
track1_analysis.py
Local Track 1 analysis helpers for renunney.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Optional
import numpy as np
from .track1_reference import GenerationSummary, Track1Parameters
from .track1_threshold import ThresholdSearchResult, search_threshold_over_candidates
@dataclass(frozen=True)
class LocusThresholdRow:
"""Threshold result for one number-of-loci setting."""
n: int
threshold_T: Optional[float]
accepted: bool
@dataclass(frozen=True)
class LinearCostFit:
"""Linear regression C = C0 + n*C1."""
intercept_c0: float
slope_c1: float
r_squared: float
points_used: int
@dataclass(frozen=True)
class NumberOfLociSweep:
"""Combined threshold rows and fitted regression."""
rows: list[LocusThresholdRow]
fit: Optional[LinearCostFit]
@dataclass(frozen=True)
class TrackingSummary:
"""Condensed allele-tracking diagnostics for one simulation run."""
extinction_occurred: bool
first_extinction_t: Optional[int]
first_nonzero_allele_t: Optional[int]
last_nonzero_allele_t: Optional[int]
stayed_zero_after_initialization: bool
max_abs_tracking_gap: float
final_tracking_gap: float
mean_abs_tracking_gap: float
final_mean_allele_value: float
final_target_value: float
def summarize_tracking(
summaries: Iterable[GenerationSummary],
zero_tol: float = 1.0e-15,
) -> TrackingSummary:
summary_list = list(summaries)
if not summary_list:
return TrackingSummary(
extinction_occurred=False,
first_extinction_t=None,
first_nonzero_allele_t=None,
last_nonzero_allele_t=None,
stayed_zero_after_initialization=True,
max_abs_tracking_gap=0.0,
final_tracking_gap=0.0,
mean_abs_tracking_gap=0.0,
final_mean_allele_value=0.0,
final_target_value=0.0,
)
nonzero = [summary for summary in summary_list if abs(summary.mean_allele_value) > zero_tol]
extinct_rows = [summary for summary in summary_list if summary.extinct]
first_nonzero_t = None if not nonzero else int(nonzero[0].t)
last_nonzero_t = None if not nonzero else int(nonzero[-1].t)
first_extinction_t = None if not extinct_rows else int(extinct_rows[0].t)
post_init = summary_list[1:]
stayed_zero_after_initialization = all(
abs(summary.mean_allele_value) <= zero_tol for summary in post_init
)
abs_gaps = np.array([abs(summary.mean_tracking_gap) for summary in summary_list], dtype=float)
final = summary_list[-1]
return TrackingSummary(
extinction_occurred=bool(extinct_rows),
first_extinction_t=first_extinction_t,
first_nonzero_allele_t=first_nonzero_t,
last_nonzero_allele_t=last_nonzero_t,
stayed_zero_after_initialization=stayed_zero_after_initialization,
max_abs_tracking_gap=float(np.max(abs_gaps)),
final_tracking_gap=float(final.mean_tracking_gap),
mean_abs_tracking_gap=float(np.mean(abs_gaps)),
final_mean_allele_value=float(final.mean_allele_value),
final_target_value=float(final.target_value),
)
def fit_linear_cost_by_loci(rows: Iterable[LocusThresholdRow]) -> Optional[LinearCostFit]:
usable = [row for row in rows if row.accepted and row.threshold_T is not None]
if len(usable) < 2:
return None
x = np.array([row.n for row in usable], dtype=float)
y = np.array([row.threshold_T for row in usable], dtype=float)
slope, intercept = np.polyfit(x, y, 1)
yhat = intercept + slope * x
ss_res = float(np.sum((y - yhat) ** 2))
ss_tot = float(np.sum((y - np.mean(y)) ** 2))
r_squared = 1.0 if ss_tot == 0.0 else 1.0 - (ss_res / ss_tot)
return LinearCostFit(
intercept_c0=float(intercept),
slope_c1=float(slope),
r_squared=float(r_squared),
points_used=len(usable),
)
def sweep_number_of_loci(
params: Track1Parameters,
loci_values: Iterable[int],
candidate_T_values: Iterable[float],
runs: int = 20,
seed_start: int = 0,
cache_path: str | None = None,
jobs: int = 1,
) -> NumberOfLociSweep:
rows: list[LocusThresholdRow] = []
candidate_list = list(candidate_T_values)
for index, n_value in enumerate(loci_values):
run_params = Track1Parameters(
K=params.K,
N0=params.N0,
n=n_value,
u=params.u,
R=params.R,
T=params.T,
epochs=params.epochs,
p=params.p,
a_max=params.a_max,
)
result: Optional[ThresholdSearchResult] = search_threshold_over_candidates(
params=run_params,
candidate_T_values=candidate_list,
runs=runs,
seed_start=seed_start + (index * 100000),
cache_path=cache_path,
jobs=jobs,
)
rows.append(
LocusThresholdRow(
n=n_value,
threshold_T=None if result is None else float(result.threshold_T),
accepted=result is not None,
)
)
return NumberOfLociSweep(rows=rows, fit=fit_linear_cost_by_loci(rows))
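A minimal sketch of the regression step in isolation, assuming `src/` is importable. The threshold rows below are made-up numbers for illustration; rows that were not accepted are ignored by the fit.

```python
from renunney.track1_analysis import LocusThresholdRow, fit_linear_cost_by_loci

rows = [
    LocusThresholdRow(n=1, threshold_T=110.0, accepted=True),   # hypothetical values
    LocusThresholdRow(n=2, threshold_T=190.0, accepted=True),
    LocusThresholdRow(n=3, threshold_T=None, accepted=False),   # rejected row, skipped by the fit
    LocusThresholdRow(n=4, threshold_T=350.0, accepted=True),
]

fit = fit_linear_cost_by_loci(rows)  # returns None when fewer than two usable rows exist
if fit is not None:
    print(fit.intercept_c0, fit.slope_c1, fit.r_squared, fit.points_used)
```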

src/renunney/track1_api.py (new file, 282 lines)
View File

@ -0,0 +1,282 @@
"""
track1_api.py
Local Track 1 API boundary for renunney.
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Optional
from .track1_analysis import summarize_tracking, sweep_number_of_loci
from .track1_dataset import generate_extinction_dataset
from .track1_fit import class_balance, fit_payload_from_jsonl, load_jsonl
from .track1_reference import Track1Parameters, simulate_run
from .track1_report import generate_report_bundle
from .track1_threshold import evaluate_threshold_candidate, search_threshold_over_candidates
@dataclass(frozen=True, init=False)
class Track1RunConfig:
mode: str = "simulate"
K: int = 5000
N0: int = 500
n: int = 1
u: float = 5.0e-6
R: float = 10.0
T: int = 300
epochs: int = 8
p: float = 0.5
a_max: Optional[int] = None
seed: int = 0
runs: int = 20
t_start: int = 50
t_stop: int = 500
t_step: int = 10
t_values: Optional[list[float]] = None
loci_values: Optional[list[int]] = None
cache_path: Optional[str] = None
jobs: int = 1
report_dir: Optional[str] = None
dataset_dir: Optional[str] = None
run_rows_path: Optional[str] = None
grid: Optional[dict[str, list[Any]]] = None
def __init__(
self,
mode: str = "simulate",
K: int = 5000,
N0: int = 500,
n: int = 1,
u: float | None = 5.0e-6,
R: float = 10.0,
T: int = 300,
epochs: int = 8,
p: float = 0.5,
a_max: Optional[int] = None,
seed: int = 0,
runs: int = 20,
t_start: int = 50,
t_stop: int = 500,
t_step: int = 10,
t_values: Optional[list[float]] = None,
loci_values: Optional[list[int]] = None,
cache_path: Optional[str] = None,
jobs: int = 1,
report_dir: Optional[str] = None,
dataset_dir: Optional[str] = None,
run_rows_path: Optional[str] = None,
grid: Optional[dict[str, list[Any]]] = None,
M: float | None = None,
) -> None:
if u is None:
if M is None:
raise ValueError("Track1RunConfig requires u, or convenience M to derive u.")
u = float(M / (2.0 * K))
object.__setattr__(self, "mode", mode)
object.__setattr__(self, "K", int(K))
object.__setattr__(self, "N0", int(N0))
object.__setattr__(self, "n", int(n))
object.__setattr__(self, "u", float(u))
object.__setattr__(self, "R", float(R))
object.__setattr__(self, "T", int(T))
object.__setattr__(self, "epochs", int(epochs))
object.__setattr__(self, "p", float(p))
object.__setattr__(self, "a_max", a_max)
object.__setattr__(self, "seed", int(seed))
object.__setattr__(self, "runs", int(runs))
object.__setattr__(self, "t_start", int(t_start))
object.__setattr__(self, "t_stop", int(t_stop))
object.__setattr__(self, "t_step", int(t_step))
object.__setattr__(self, "t_values", t_values)
object.__setattr__(self, "loci_values", loci_values)
object.__setattr__(self, "cache_path", cache_path)
object.__setattr__(self, "jobs", int(jobs))
object.__setattr__(self, "report_dir", report_dir)
object.__setattr__(self, "dataset_dir", dataset_dir)
object.__setattr__(self, "run_rows_path", run_rows_path)
object.__setattr__(self, "grid", grid)
@property
def M(self) -> float:
return float(2.0 * self.K * self.u)
def to_parameters(self) -> Track1Parameters:
return Track1Parameters(
K=self.K,
N0=self.N0,
n=self.n,
u=self.u,
R=self.R,
T=self.T,
epochs=self.epochs,
p=self.p,
a_max=self.a_max,
)
def parameter_payload(params: Track1Parameters) -> dict[str, Any]:
payload = asdict(params)
payload["M"] = params.M
return payload
def config_from_mapping(mapping: dict[str, Any]) -> Track1RunConfig:
allowed = set(Track1RunConfig.__dataclass_fields__.keys())
filtered = {key: value for key, value in mapping.items() if key in allowed}
if "M" in mapping and "u" not in filtered:
filtered["M"] = mapping["M"]
return Track1RunConfig(**filtered)
def load_config(path: str | Path) -> Track1RunConfig:
raw = json.loads(Path(path).read_text(encoding="utf-8"))
return config_from_mapping(raw)
def save_payload(payload: dict[str, Any], path: str | Path) -> None:
out = Path(path)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def run_config(config: Track1RunConfig) -> dict[str, Any]:
params = config.to_parameters()
candidate_values = (
[float(value) for value in config.t_values]
if config.t_values is not None
else list(range(config.t_start, config.t_stop + 1, config.t_step))
)
if config.mode == "simulate":
summaries = simulate_run(params, seed=config.seed)
return {
"mode": "simulate",
"parameters": parameter_payload(params),
"generations_recorded": len(summaries),
"extinct": bool(summaries[-1].extinct) if summaries else True,
"tracking_summary": asdict(summarize_tracking(summaries)),
"final_summary": asdict(summaries[-1]) if summaries else None,
}
if config.mode == "report":
report_dir = config.report_dir or "/tmp/track1-report"
return generate_report_bundle(
params=params,
runs=config.runs,
seed_start=config.seed,
report_dir=report_dir,
)
if config.mode == "extinction_dataset":
dataset_dir = config.dataset_dir or "/tmp/track1-extinction-dataset"
metadata = generate_extinction_dataset(
params=params,
runs=config.runs,
seed_start=config.seed,
dataset_dir=dataset_dir,
grid=config.grid,
)
return {
"mode": "extinction_dataset",
"parameters": parameter_payload(params),
**metadata,
}
if config.mode == "extinction_fit":
run_rows_path = config.run_rows_path
if run_rows_path is None:
dataset_dir = config.dataset_dir or "/tmp/track1-extinction-dataset"
run_rows_path = str(Path(dataset_dir) / "run_rows.jsonl")
try:
payload = fit_payload_from_jsonl(run_rows_path)
fit_status = "ok"
fit_error = None
except ValueError as exc:
rows = load_jsonl(run_rows_path)
payload = {
"run_rows_path": str(Path(run_rows_path)),
"model": None,
"summary": asdict(class_balance(rows)),
}
fit_status = "insufficient_outcome_variation"
fit_error = str(exc)
return {
"mode": "extinction_fit",
"parameters": parameter_payload(params),
"fit_status": fit_status,
"fit_error": fit_error,
**payload,
}
if config.mode == "threshold":
result = evaluate_threshold_candidate(
params=params,
T_value=float(config.T),
runs=config.runs,
seed_start=config.seed,
cache_path=config.cache_path,
jobs=config.jobs,
)
return {
"mode": "threshold",
"parameters": parameter_payload(params),
"result": {
"threshold_T": result.threshold_T,
"baseline_check": asdict(result.baseline_check),
"check_1_02": asdict(result.check_1_02),
"check_1_05": asdict(result.check_1_05),
"check_1_10": asdict(result.check_1_10),
"retest_check": asdict(result.retest_check) if result.retest_check else None,
},
}
if config.mode == "search":
result = search_threshold_over_candidates(
params=params,
candidate_T_values=candidate_values,
runs=config.runs,
seed_start=config.seed,
cache_path=config.cache_path,
jobs=config.jobs,
)
return {
"mode": "search",
"parameters": parameter_payload(params),
"candidates": candidate_values,
"result": None
if result is None
else {
"threshold_T": result.threshold_T,
"baseline_check": asdict(result.baseline_check),
"check_1_02": asdict(result.check_1_02),
"check_1_05": asdict(result.check_1_05),
"check_1_10": asdict(result.check_1_10),
"retest_check": asdict(result.retest_check) if result.retest_check else None,
},
}
if config.mode == "loci_regression":
loci_values = config.loci_values if config.loci_values is not None else [1, 2, 3, 4, 5, 6, 7]
sweep = sweep_number_of_loci(
params=params,
loci_values=loci_values,
candidate_T_values=candidate_values,
runs=config.runs,
seed_start=config.seed,
cache_path=config.cache_path,
jobs=config.jobs,
)
return {
"mode": "loci_regression",
"parameters": parameter_payload(params),
"loci_values": loci_values,
"candidates": candidate_values,
"rows": [asdict(row) for row in sweep.rows],
"fit": None if sweep.fit is None else asdict(sweep.fit),
}
raise ValueError(f"Unsupported Track 1 mode: {config.mode}")

View File

@ -0,0 +1,107 @@
"""
track1_dataset.py
Dataset generation for extinction-risk analysis on top of Track 1 simulations.
"""
from __future__ import annotations
from dataclasses import asdict
import itertools
import json
from pathlib import Path
from typing import Any
from .track1_extinction import build_extinction_generation_rows, build_extinction_run_row, save_jsonl
from .track1_reference import Track1Parameters, simulate_run
GRID_KEYS = ("K", "N0", "n", "u", "R", "T", "epochs", "p", "a_max")
def _grid_axes_from_config(params: Track1Parameters, grid: dict[str, list[Any]] | None) -> dict[str, list[Any]]:
base = {
"K": [params.K],
"N0": [params.N0],
"n": [params.n],
"u": [params.u],
"R": [params.R],
"T": [params.T],
"epochs": [params.epochs],
"p": [params.p],
"a_max": [params.a_max],
}
if not grid:
return base
for key, values in grid.items():
if key not in GRID_KEYS:
raise ValueError(f"Unsupported extinction dataset grid key: {key}")
if not isinstance(values, list) or len(values) == 0:
raise ValueError(f"Grid key {key} must map to a non-empty list.")
base[key] = values
return base
def generate_extinction_dataset(
params: Track1Parameters,
runs: int,
seed_start: int,
dataset_dir: str | Path,
grid: dict[str, list[Any]] | None = None,
) -> dict[str, Any]:
outdir = Path(dataset_dir)
outdir.mkdir(parents=True, exist_ok=True)
axes = _grid_axes_from_config(params, grid)
combinations = list(itertools.product(*(axes[key] for key in GRID_KEYS)))
generation_rows = []
run_rows = []
treatment_rows = []
for treatment_index, values in enumerate(combinations):
combo = dict(zip(GRID_KEYS, values))
run_params = Track1Parameters(
K=int(combo["K"]),
N0=int(combo["N0"]),
n=int(combo["n"]),
u=float(combo["u"]),
R=float(combo["R"]),
T=int(combo["T"]),
epochs=int(combo["epochs"]),
p=float(combo["p"]),
a_max=None if combo["a_max"] is None else int(combo["a_max"]),
)
treatment_rows.append(
{
"treatment_index": treatment_index,
**asdict(run_params),
"M": run_params.M,
"runs": runs,
}
)
for run_offset in range(runs):
seed = seed_start + (treatment_index * runs) + run_offset
summaries = simulate_run(run_params, seed=seed)
generation_rows.extend(build_extinction_generation_rows(run_params, summaries, seed=seed))
run_rows.append(build_extinction_run_row(run_params, summaries, seed=seed))
save_jsonl(generation_rows, outdir / "generation_rows.jsonl")
save_jsonl(run_rows, outdir / "run_rows.jsonl")
(outdir / "treatments.json").write_text(
json.dumps(treatment_rows, indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
metadata = {
"dataset_dir": str(outdir),
"generation_rows_path": str(outdir / "generation_rows.jsonl"),
"run_rows_path": str(outdir / "run_rows.jsonl"),
"treatments_path": str(outdir / "treatments.json"),
"treatment_count": len(treatment_rows),
"run_row_count": len(run_rows),
"generation_row_count": len(generation_rows),
"runs_per_treatment": runs,
"seed_start": seed_start,
"grid": axes,
}
(outdir / "metadata.json").write_text(json.dumps(metadata, indent=2, sort_keys=True) + "\n", encoding="utf-8")
return metadata
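A sketch of a deliberately tiny dataset generation run, assuming `src/` is importable. The grid and `/tmp` directory below are hypothetical; each grid key must be one of `GRID_KEYS` and map to a non-empty list.

```python
from renunney import Track1Parameters, generate_extinction_dataset

params = Track1Parameters(K=5000, N0=50, n=1, u=5e-6, R=10.0, T=40, epochs=8)

metadata = generate_extinction_dataset(
    params=params,
    runs=3,
    seed_start=0,
    dataset_dir="/tmp/track1-extinction-demo",  # hypothetical output directory
    grid={"N0": [25, 50], "T": [20, 40]},       # 2 x 2 = 4 treatments, 3 runs each
)
print(metadata["treatment_count"], metadata["run_row_count"])
```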

View File

@ -0,0 +1,259 @@
"""
track1_extinction.py
Dataset builders and analysis scaffolding for extinction-risk modeling on top
of the Track 1 simulation output.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass
import json
from pathlib import Path
from typing import Iterable
from .track1_analysis import TrackingSummary, summarize_tracking
from .track1_reference import GenerationSummary, Track1Parameters
@dataclass(frozen=True)
class ExtinctionGenerationRow:
"""Per-generation covariate row for discrete-time extinction modeling."""
seed: int
t: int
K: int
N0: int
n: int
u: float
M: float
R: float
T: int
epochs: int
p: float
N: int
female_count: int
male_count: int
female_fraction: float
fecundity: float
mean_fitness: float
mean_expected_female_productivity: float
target_value: float
mean_allele_value: float
mean_genotype_value: float
mean_tracking_gap: float
abs_tracking_gap: float
birth_count: int
surviving_offspring_count: int
survival_fraction: float
replacement_deficit: float
expected_mutation_count: float
realized_mutation_count: int
mutation_shortfall: float
realized_mutation_rate_per_allele: float
ne_approx: float
extinction_occurred: bool
extinction_next_step: bool
@dataclass(frozen=True)
class ExtinctionRunRow:
"""Run-level summary row for finite-horizon extinction modeling."""
seed: int
K: int
N0: int
n: int
u: float
M: float
R: float
T: int
epochs: int
p: float
generations_recorded: int
extinction_occurred: bool
first_extinction_t: int | None
final_extinct: bool
final_N: int
min_N: int
max_N: int
mean_N: float
final_mean_allele_value: float
final_target_value: float
final_tracking_gap: float
mean_abs_tracking_gap: float
max_abs_tracking_gap: float
first_nonzero_allele_t: int | None
last_nonzero_allele_t: int | None
stayed_zero_after_initialization: bool
first_productivity_below_replacement_t: int | None
fraction_generations_below_replacement: float
longest_zero_mutation_streak: int
cumulative_expected_mutations: float
cumulative_realized_mutations: int
cumulative_mutation_shortfall: float
def _survival_fraction(summary: GenerationSummary) -> float:
if summary.birth_count <= 0:
return 0.0
return float(summary.surviving_offspring_count / summary.birth_count)
def _replacement_deficit(summary: GenerationSummary) -> float:
return float(max(0.0, 2.0 - summary.mean_expected_female_productivity))
def _mutation_shortfall(summary: GenerationSummary) -> float:
return float(max(0.0, summary.expected_mutations_current_N - summary.realized_mutation_count))
def build_extinction_generation_rows(
params: Track1Parameters,
summaries: Iterable[GenerationSummary],
seed: int,
) -> list[ExtinctionGenerationRow]:
summary_list = list(summaries)
rows: list[ExtinctionGenerationRow] = []
for index, summary in enumerate(summary_list):
next_extinct = bool(summary_list[index + 1].extinct) if index + 1 < len(summary_list) else bool(summary.extinct)
rows.append(
ExtinctionGenerationRow(
seed=seed,
t=int(summary.t),
K=params.K,
N0=params.N0,
n=params.n,
u=params.u,
M=params.M,
R=params.R,
T=params.T,
epochs=params.epochs,
p=params.p,
N=int(summary.N),
female_count=int(summary.female_count),
male_count=int(summary.male_count),
female_fraction=float(summary.female_fraction),
fecundity=float(summary.fecundity),
mean_fitness=float(summary.mean_fitness),
mean_expected_female_productivity=float(summary.mean_expected_female_productivity),
target_value=float(summary.target_value),
mean_allele_value=float(summary.mean_allele_value),
mean_genotype_value=float(summary.mean_genotype_value),
mean_tracking_gap=float(summary.mean_tracking_gap),
abs_tracking_gap=float(abs(summary.mean_tracking_gap)),
birth_count=int(summary.birth_count),
surviving_offspring_count=int(summary.surviving_offspring_count),
survival_fraction=_survival_fraction(summary),
replacement_deficit=_replacement_deficit(summary),
expected_mutation_count=float(summary.expected_mutations_current_N),
realized_mutation_count=int(summary.realized_mutation_count),
mutation_shortfall=_mutation_shortfall(summary),
realized_mutation_rate_per_allele=float(summary.realized_mutation_rate_per_allele),
ne_approx=float(summary.ne_approx),
extinction_occurred=bool(summary.extinct),
extinction_next_step=next_extinct,
)
)
return rows
def build_extinction_run_row(
params: Track1Parameters,
summaries: Iterable[GenerationSummary],
seed: int,
) -> ExtinctionRunRow:
summary_list = list(summaries)
tracking: TrackingSummary = summarize_tracking(summary_list)
if not summary_list:
return ExtinctionRunRow(
seed=seed,
K=params.K,
N0=params.N0,
n=params.n,
u=params.u,
M=params.M,
R=params.R,
T=params.T,
epochs=params.epochs,
p=params.p,
generations_recorded=0,
extinction_occurred=False,
first_extinction_t=None,
final_extinct=True,
final_N=0,
min_N=0,
max_N=0,
mean_N=0.0,
final_mean_allele_value=0.0,
final_target_value=0.0,
final_tracking_gap=0.0,
mean_abs_tracking_gap=0.0,
max_abs_tracking_gap=0.0,
first_nonzero_allele_t=None,
last_nonzero_allele_t=None,
stayed_zero_after_initialization=True,
first_productivity_below_replacement_t=None,
fraction_generations_below_replacement=0.0,
longest_zero_mutation_streak=0,
cumulative_expected_mutations=0.0,
cumulative_realized_mutations=0,
cumulative_mutation_shortfall=0.0,
)
below_replacement = [s for s in summary_list if s.mean_expected_female_productivity < 2.0]
first_below_replacement_t = None if not below_replacement else int(below_replacement[0].t)
zero_mutation_streak = 0
longest_zero_mutation_streak = 0
for summary in summary_list:
if summary.realized_mutation_count == 0:
zero_mutation_streak += 1
longest_zero_mutation_streak = max(longest_zero_mutation_streak, zero_mutation_streak)
else:
zero_mutation_streak = 0
cumulative_expected = float(sum(s.expected_mutations_current_N for s in summary_list))
cumulative_realized = int(sum(s.realized_mutation_count for s in summary_list))
cumulative_shortfall = float(sum(_mutation_shortfall(s) for s in summary_list))
final = summary_list[-1]
return ExtinctionRunRow(
seed=seed,
K=params.K,
N0=params.N0,
n=params.n,
u=params.u,
M=params.M,
R=params.R,
T=params.T,
epochs=params.epochs,
p=params.p,
generations_recorded=len(summary_list),
extinction_occurred=tracking.extinction_occurred,
first_extinction_t=tracking.first_extinction_t,
final_extinct=bool(final.extinct),
final_N=int(final.N),
min_N=min(int(s.N) for s in summary_list),
max_N=max(int(s.N) for s in summary_list),
mean_N=float(sum(s.N for s in summary_list) / len(summary_list)),
final_mean_allele_value=float(tracking.final_mean_allele_value),
final_target_value=float(tracking.final_target_value),
final_tracking_gap=float(tracking.final_tracking_gap),
mean_abs_tracking_gap=float(tracking.mean_abs_tracking_gap),
max_abs_tracking_gap=float(tracking.max_abs_tracking_gap),
first_nonzero_allele_t=tracking.first_nonzero_allele_t,
last_nonzero_allele_t=tracking.last_nonzero_allele_t,
stayed_zero_after_initialization=bool(tracking.stayed_zero_after_initialization),
first_productivity_below_replacement_t=first_below_replacement_t,
fraction_generations_below_replacement=float(len(below_replacement) / len(summary_list)),
longest_zero_mutation_streak=longest_zero_mutation_streak,
cumulative_expected_mutations=cumulative_expected,
cumulative_realized_mutations=cumulative_realized,
cumulative_mutation_shortfall=cumulative_shortfall,
)
def save_jsonl(rows: Iterable[object], path: str | Path) -> None:
out = Path(path)
out.parent.mkdir(parents=True, exist_ok=True)
with out.open("w", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(asdict(row), sort_keys=True) + "\n")
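A sketch of building both row types for a single run, assuming `src/` is importable; the parameters are the smoke-test settings and the JSONL paths are hypothetical.

```python
from renunney import (
    Track1Parameters,
    build_extinction_generation_rows,
    build_extinction_run_row,
    save_jsonl,
    simulate_run,
)

params = Track1Parameters(K=5000, N0=50, n=1, u=5e-6, R=10.0, T=40, epochs=8)
summaries = simulate_run(params, seed=1)

generation_rows = build_extinction_generation_rows(params, summaries, seed=1)
run_row = build_extinction_run_row(params, summaries, seed=1)
save_jsonl(generation_rows, "/tmp/track1-demo-generation_rows.jsonl")  # hypothetical path
save_jsonl([run_row], "/tmp/track1-demo-run_rows.jsonl")               # hypothetical path
```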

src/renunney/track1_fit.py (new file, 250 lines)
View File

@ -0,0 +1,250 @@
"""
track1_fit.py
Simple dependency-free baseline fitting for extinction-risk models using
run-level Track 1 extinction datasets.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass
import json
from math import log
from pathlib import Path
from typing import Iterable
import numpy as np
DEFAULT_RUN_FEATURES = (
"log_M",
"inv_T",
"n",
"log_K",
"log_N0_over_K",
"mean_abs_tracking_gap",
"fraction_generations_below_replacement",
"longest_zero_mutation_streak",
"cumulative_mutation_shortfall_per_generation",
)
@dataclass(frozen=True)
class ExtinctionLogitModel:
"""Baseline run-level logistic regression model."""
feature_names: list[str]
feature_means: list[float]
feature_scales: list[float]
coefficients: list[float]
intercept: float
iterations: int
converged: bool
@dataclass(frozen=True)
class ExtinctionFitSummary:
"""Training-set summary for the baseline extinction fit."""
sample_count: int
extinction_count: int
non_extinction_count: int
brier_score: float
log_loss: float
mean_predicted_probability: float
@dataclass(frozen=True)
class ExtinctionClassBalance:
"""Outcome-class counts for a run-level extinction dataset."""
sample_count: int
extinction_count: int
non_extinction_count: int
def load_jsonl(path: str | Path) -> list[dict]:
rows: list[dict] = []
with Path(path).open("r", encoding="utf-8") as handle:
for line in handle:
text = line.strip()
if text:
rows.append(json.loads(text))
return rows
def class_balance(rows: Iterable[dict]) -> ExtinctionClassBalance:
row_list = list(rows)
extinction_count = sum(1 for row in row_list if bool(row["extinction_occurred"]))
sample_count = len(row_list)
return ExtinctionClassBalance(
sample_count=sample_count,
extinction_count=extinction_count,
non_extinction_count=sample_count - extinction_count,
)
def _safe_log(value: float, eps: float = 1.0e-12) -> float:
return float(log(max(value, eps)))
def feature_value(row: dict, feature_name: str) -> float:
if feature_name == "log_M":
return _safe_log(float(row["M"]))
if feature_name == "inv_T":
return 1.0 / float(row["T"])
if feature_name == "n":
return float(row["n"])
if feature_name == "log_K":
return _safe_log(float(row["K"]))
if feature_name == "log_N0_over_K":
return _safe_log(float(row["N0"]) / float(row["K"]))
if feature_name == "mean_abs_tracking_gap":
return float(row["mean_abs_tracking_gap"])
if feature_name == "fraction_generations_below_replacement":
return float(row["fraction_generations_below_replacement"])
if feature_name == "longest_zero_mutation_streak":
return float(row["longest_zero_mutation_streak"])
if feature_name == "cumulative_mutation_shortfall_per_generation":
generations = max(1.0, float(row["generations_recorded"]))
return float(row["cumulative_mutation_shortfall"]) / generations
raise ValueError(f"Unsupported extinction-fit feature: {feature_name}")
def design_matrix_from_run_rows(
rows: Iterable[dict],
feature_names: Iterable[str] = DEFAULT_RUN_FEATURES,
) -> tuple[np.ndarray, np.ndarray, list[str]]:
feature_list = list(feature_names)
row_list = list(rows)
x = np.array(
[[feature_value(row, name) for name in feature_list] for row in row_list],
dtype=float,
)
y = np.array([1.0 if bool(row["extinction_occurred"]) else 0.0 for row in row_list], dtype=float)
return x, y, feature_list
def standardize_design_matrix(x: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
means = np.mean(x, axis=0)
scales = np.std(x, axis=0, ddof=0)
scales = np.where(scales <= 1.0e-12, 1.0, scales)
x_std = (x - means) / scales
return x_std, means, scales
def _sigmoid(z: np.ndarray) -> np.ndarray:
clipped = np.clip(z, -30.0, 30.0)
return 1.0 / (1.0 + np.exp(-clipped))
def fit_logistic_regression(
x: np.ndarray,
y: np.ndarray,
l2_penalty: float = 1.0e-6,
max_iter: int = 200,
tol: float = 1.0e-8,
) -> tuple[np.ndarray, int, bool]:
if x.ndim != 2:
raise ValueError("x must be a 2D array.")
if y.ndim != 1 or y.shape[0] != x.shape[0]:
raise ValueError("y must be a 1D array aligned with x.")
if x.shape[0] == 0:
raise ValueError("Cannot fit logistic regression with zero rows.")
if np.all(y == y[0]):
raise ValueError("Extinction fit requires both extinct and non-extinct runs.")
x1 = np.column_stack([np.ones(x.shape[0], dtype=float), x])
beta = np.zeros(x1.shape[1], dtype=float)
converged = False
for iteration in range(1, max_iter + 1):
eta = x1 @ beta
p = _sigmoid(eta)
w = np.clip(p * (1.0 - p), 1.0e-8, None)
z = eta + (y - p) / w
wx = x1 * w[:, None]
xtwx = x1.T @ wx
penalty = np.eye(x1.shape[1], dtype=float) * l2_penalty
penalty[0, 0] = 0.0
xtwz = x1.T @ (w * z)
beta_new = np.linalg.solve(xtwx + penalty, xtwz)
if np.max(np.abs(beta_new - beta)) < tol:
beta = beta_new
converged = True
return beta, iteration, converged
beta = beta_new
return beta, max_iter, converged
def predict_probabilities(model: ExtinctionLogitModel, rows: Iterable[dict]) -> np.ndarray:
row_list = list(rows)
if not row_list:
return np.array([], dtype=float)
x = np.array(
[[feature_value(row, name) for name in model.feature_names] for row in row_list],
dtype=float,
)
means = np.array(model.feature_means, dtype=float)
scales = np.array(model.feature_scales, dtype=float)
x_std = (x - means) / scales
eta = model.intercept + x_std @ np.array(model.coefficients, dtype=float)
return _sigmoid(eta)
def summarize_fit(y: np.ndarray, p: np.ndarray) -> ExtinctionFitSummary:
p_clip = np.clip(p, 1.0e-12, 1.0 - 1.0e-12)
brier = float(np.mean((p - y) ** 2))
log_loss = float(-np.mean(y * np.log(p_clip) + (1.0 - y) * np.log(1.0 - p_clip)))
extinction_count = int(np.sum(y))
sample_count = int(y.shape[0])
return ExtinctionFitSummary(
sample_count=sample_count,
extinction_count=extinction_count,
non_extinction_count=sample_count - extinction_count,
brier_score=brier,
log_loss=log_loss,
mean_predicted_probability=float(np.mean(p)),
)
def fit_extinction_run_model(
run_rows: Iterable[dict],
feature_names: Iterable[str] = DEFAULT_RUN_FEATURES,
) -> tuple[ExtinctionLogitModel, ExtinctionFitSummary]:
row_list = list(run_rows)
x, y, feature_list = design_matrix_from_run_rows(row_list, feature_names=feature_names)
x_std, means, scales = standardize_design_matrix(x)
beta, iterations, converged = fit_logistic_regression(x_std, y)
model = ExtinctionLogitModel(
feature_names=feature_list,
feature_means=[float(value) for value in means],
feature_scales=[float(value) for value in scales],
intercept=float(beta[0]),
coefficients=[float(value) for value in beta[1:]],
iterations=iterations,
converged=converged,
)
p = predict_probabilities(model, row_list)
return model, summarize_fit(y, p)
def fit_extinction_run_model_from_jsonl(
run_rows_path: str | Path,
feature_names: Iterable[str] = DEFAULT_RUN_FEATURES,
) -> tuple[ExtinctionLogitModel, ExtinctionFitSummary]:
rows = load_jsonl(run_rows_path)
return fit_extinction_run_model(rows, feature_names=feature_names)
def fit_payload_from_jsonl(
run_rows_path: str | Path,
feature_names: Iterable[str] = DEFAULT_RUN_FEATURES,
) -> dict:
model, summary = fit_extinction_run_model_from_jsonl(run_rows_path, feature_names=feature_names)
return {
"run_rows_path": str(Path(run_rows_path)),
"model": asdict(model),
"summary": asdict(summary),
}
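A sketch of fitting the baseline model from a run-level JSONL file, assuming `src/` is importable. The path is hypothetical, and the `ValueError` fallback mirrors how the `extinction_fit` mode in `track1_api.run_config` handles datasets without outcome variation.

```python
from renunney import class_balance, fit_payload_from_jsonl, load_jsonl

run_rows_path = "/tmp/track1-extinction-demo/run_rows.jsonl"  # hypothetical dataset from an earlier run

try:
    payload = fit_payload_from_jsonl(run_rows_path)
    print(payload["summary"])
except ValueError as exc:
    # Same fallback as track1_api.run_config: report the class balance when the fit cannot run.
    print("insufficient outcome variation:", exc)
    print(class_balance(load_jsonl(run_rows_path)))
```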

View File

@ -0,0 +1,428 @@
"""
track1_reference.py
Local Track 1 reference module for renunney.
This is the historically faithful Nunney-style simulation kernel used by the
local analysis, threshold, and orchestration layers.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import numpy as np
@dataclass(frozen=True, init=False)
class Track1Parameters:
"""Reference parameters for the Track 1 baseline."""
K: int
N0: int
n: int
u: float
R: float
T: int
epochs: int = 8
p: float = 0.5
a_max: Optional[int] = None
def __init__(
self,
K: int,
N0: int,
n: int,
u: float | None = None,
R: float = 10.0,
T: int = 300,
epochs: int = 8,
p: float = 0.5,
a_max: Optional[int] = None,
M: float | None = None,
) -> None:
if u is None:
if M is None:
raise ValueError("Track1Parameters requires u, or convenience M to derive u.")
u = float(M / (2.0 * K))
object.__setattr__(self, "K", int(K))
object.__setattr__(self, "N0", int(N0))
object.__setattr__(self, "n", int(n))
object.__setattr__(self, "u", float(u))
object.__setattr__(self, "R", float(R))
object.__setattr__(self, "T", int(T))
object.__setattr__(self, "epochs", int(epochs))
object.__setattr__(self, "p", float(p))
object.__setattr__(self, "a_max", a_max)
@property
def M(self) -> float:
return float(2.0 * self.K * self.u)
def resolved_a_max(self) -> int:
if self.a_max is not None:
return self.a_max
return self.epochs
def intrinsic_growth_rate(self) -> float:
return float(np.log(self.R / 2.0))
def total_generations(self) -> int:
return self.epochs * self.T + (self.T // 2)
@dataclass
class PopulationState:
"""Diploid population state for one generation."""
genomes: np.ndarray
sexes: np.ndarray
@property
def size(self) -> int:
return int(self.genomes.shape[0])
@dataclass(frozen=True)
class GenerationSummary:
"""Per-generation diagnostics for Track 1."""
t: int
N: int
female_fraction: float
male_count: int
female_count: int
fecundity: float
mean_fitness: float
mean_expected_female_productivity: float
target_value: float
mean_allele_value: float
mean_genotype_value: float
mean_tracking_gap: float
paper_M: float
expected_mutations_current_N: float
realized_mutation_count: int
realized_mutation_rate_per_allele: float
birth_count: int
surviving_offspring_count: int
ne_approx: float
extinct: bool
def initialize_population(params: Track1Parameters, rng: np.random.Generator) -> PopulationState:
genomes = np.zeros((params.N0, 2, params.n), dtype=np.int16)
sexes = rng.binomial(1, 1.0 - params.p, size=params.N0).astype(np.int8)
return PopulationState(genomes=genomes, sexes=sexes)
def female_fecundity(r: float, N: int, K: int) -> float:
return float(2.0 * np.exp(r * (1.0 - (N / K) ** (1.0 / r))))
def genotype_fitness(genomes: np.ndarray, r: float, T: int, t: int) -> np.ndarray:
target = t / T
locus_means = 0.5 * (genomes[:, 0, :] + genomes[:, 1, :])
squared_distance = np.square(locus_means - target, dtype=np.float64)
return np.exp(-(r / genomes.shape[2]) * np.sum(squared_distance, axis=1))
def expected_female_productivity(fecundity: float, fitness: np.ndarray) -> np.ndarray:
return fecundity * fitness
def paper_mutation_supply_M(K: int, u: float) -> float:
return float(2.0 * K * u)
def expected_mutations_for_population(N: int, n: int, u: float) -> float:
return float(2.0 * N * n * u)
def allele_tracking_metrics(genomes: np.ndarray, T: int, t: int) -> tuple[float, float, float, float]:
target_value = float(t / T)
if genomes.size == 0:
return target_value, 0.0, 0.0, -target_value
mean_allele_value = float(np.mean(genomes))
genotype_values = 0.5 * (genomes[:, 0, :] + genomes[:, 1, :])
mean_genotype_value = float(np.mean(genotype_values))
mean_tracking_gap = mean_genotype_value - target_value
return target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap
def approximate_ne(N: int, female_fraction: float) -> float:
male_fraction = 1.0 - female_fraction
denom = female_fraction + male_fraction
if N <= 0 or female_fraction <= 0.0 or male_fraction <= 0.0 or denom == 0.0:
return 0.0
return float((2.0 * N * female_fraction * male_fraction) / denom)
def female_fraction(sexes: np.ndarray) -> float:
if sexes.size == 0:
return 0.0
return float(np.sum(sexes == 0) / sexes.size)
def is_extinct(state: PopulationState) -> bool:
if state.size == 0:
return True
female_count = int(np.sum(state.sexes == 0))
male_count = int(np.sum(state.sexes == 1))
return female_count == 0 or male_count == 0
def choose_gamete(parent: np.ndarray, rng: np.random.Generator) -> np.ndarray:
n = parent.shape[1]
picks = rng.integers(0, 2, size=n)
gamete = np.empty(n, dtype=parent.dtype)
gamete[:] = parent[picks, np.arange(n)]
return gamete
def choose_mutant_allele(current: int, a_max: int, rng: np.random.Generator) -> int:
if a_max <= 0:
return current
draw = int(rng.integers(0, a_max))
return draw if draw < current else draw + 1
def mutate_zygote(zygote: np.ndarray, params: Track1Parameters, rng: np.random.Generator) -> np.ndarray:
a_max = params.resolved_a_max()
out = zygote.copy()
for chrom in range(out.shape[0]):
for locus in range(out.shape[1]):
if rng.random() <= params.u:
out[chrom, locus] = choose_mutant_allele(int(out[chrom, locus]), a_max, rng)
return out
def produce_offspring(
mother: np.ndarray,
father: np.ndarray,
params: Track1Parameters,
rng: np.random.Generator,
locus_indices: np.ndarray,
) -> tuple[np.ndarray, int]:
n = params.n
offspring = np.empty((2, n), dtype=mother.dtype)
maternal_picks = rng.integers(0, 2, size=n)
paternal_picks = rng.integers(0, 2, size=n)
offspring[0, :] = mother[maternal_picks, locus_indices]
offspring[1, :] = father[paternal_picks, locus_indices]
a_max = params.resolved_a_max()
mutation_count = 0
for chrom in range(2):
for locus in range(n):
if rng.random() <= params.u:
offspring[chrom, locus] = choose_mutant_allele(int(offspring[chrom, locus]), a_max, rng)
mutation_count += 1
return offspring, mutation_count
def realize_birth_counts(fecundity: float, sexes: np.ndarray, rng: np.random.Generator) -> np.ndarray:
female_mask = sexes == 0
counts = np.zeros(sexes.shape[0], dtype=np.int32)
counts[female_mask] = rng.poisson(fecundity, size=int(np.sum(female_mask)))
return counts
def summarize_generation(state: PopulationState, params: Track1Parameters, t: int) -> GenerationSummary:
if state.size == 0:
return GenerationSummary(
t=t,
N=0,
female_fraction=0.0,
male_count=0,
female_count=0,
fecundity=0.0,
mean_fitness=0.0,
mean_expected_female_productivity=0.0,
target_value=float(t / params.T),
mean_allele_value=0.0,
mean_genotype_value=0.0,
mean_tracking_gap=float(-(t / params.T)),
paper_M=params.M,
expected_mutations_current_N=0.0,
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=0,
surviving_offspring_count=0,
ne_approx=0.0,
extinct=True,
)
fit, fec, exp_fp, ff, female_count, male_count = generation_metrics(state, params, t)
mean_expected_fp = float(np.mean(exp_fp[state.sexes == 0])) if female_count > 0 else 0.0
target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap = allele_tracking_metrics(
state.genomes,
T=params.T,
t=t,
)
return GenerationSummary(
t=t,
N=state.size,
female_fraction=ff,
male_count=male_count,
female_count=female_count,
fecundity=fec,
mean_fitness=float(np.mean(fit)),
mean_expected_female_productivity=mean_expected_fp,
target_value=target_value,
mean_allele_value=mean_allele_value,
mean_genotype_value=mean_genotype_value,
mean_tracking_gap=mean_tracking_gap,
paper_M=params.M,
expected_mutations_current_N=expected_mutations_for_population(state.size, params.n, params.u),
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=0,
surviving_offspring_count=0,
ne_approx=approximate_ne(state.size, ff),
extinct=is_extinct(state),
)
def generation_metrics(
state: PopulationState,
params: Track1Parameters,
t: int,
) -> tuple[np.ndarray, float, np.ndarray, float, int, int]:
r = params.intrinsic_growth_rate()
fit = genotype_fitness(state.genomes, r=r, T=params.T, t=t)
fec = female_fecundity(r=r, N=state.size, K=params.K)
exp_fp = expected_female_productivity(fec, fit)
female_count = int(np.sum(state.sexes == 0))
male_count = int(state.size - female_count)
ff = float(female_count / state.size) if state.size > 0 else 0.0
return fit, fec, exp_fp, ff, female_count, male_count
def simulate_one_generation(
state: PopulationState,
params: Track1Parameters,
t: int,
rng: np.random.Generator,
) -> tuple[PopulationState, GenerationSummary]:
if state.size == 0:
summary = summarize_generation(state, params, t)
return state, summary
fit, fec, exp_fp, ff, female_count, male_count = generation_metrics(state, params, t)
extinct = female_count == 0 or male_count == 0
target_value, mean_allele_value, mean_genotype_value, mean_tracking_gap = allele_tracking_metrics(
state.genomes,
T=params.T,
t=t,
)
expected_mutations_current_N = expected_mutations_for_population(state.size, params.n, params.u)
summary = GenerationSummary(
t=t,
N=state.size,
female_fraction=ff,
male_count=male_count,
female_count=female_count,
fecundity=fec,
mean_fitness=float(np.mean(fit)),
mean_expected_female_productivity=float(np.mean(exp_fp[state.sexes == 0])) if female_count > 0 else 0.0,
target_value=target_value,
mean_allele_value=mean_allele_value,
mean_genotype_value=mean_genotype_value,
mean_tracking_gap=mean_tracking_gap,
paper_M=paper_mutation_supply_M(params.K, params.u),
expected_mutations_current_N=expected_mutations_current_N,
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=0,
surviving_offspring_count=0,
ne_approx=approximate_ne(state.size, ff),
extinct=extinct,
)
if extinct:
return state, summary
birth_counts = realize_birth_counts(fec, state.sexes, rng)
female_indices = np.flatnonzero(state.sexes == 0)
male_indices = np.flatnonzero(state.sexes == 1)
total_births = int(np.sum(birth_counts[female_indices]))
new_genomes = np.zeros((total_births, 2, params.n), dtype=np.int16)
new_sexes = np.zeros(total_births, dtype=np.int8)
locus_indices = np.arange(params.n)
r = params.intrinsic_growth_rate()
offspring_t = t + 1
survivor_cursor = 0
realized_mutation_count = 0
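# Each reproducing female draws one father uniformly at random; all of her offspring
# this generation share that father. Each offspring is then kept with probability
# equal to its genotype fitness.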
for mother_index in female_indices:
count = int(birth_counts[mother_index])
if count == 0:
continue
father_index = int(male_indices[rng.integers(0, male_indices.size)])
for _ in range(count):
offspring, offspring_mutations = produce_offspring(
state.genomes[mother_index],
state.genomes[father_index],
params,
rng,
locus_indices,
)
realized_mutation_count += offspring_mutations
offspring_fitness = float(
genotype_fitness(offspring[np.newaxis, :, :], r=r, T=params.T, t=offspring_t)[0]
)
if rng.random() < offspring_fitness:  # survive with probability equal to fitness (strict < so fitness 0 never survives)
new_genomes[survivor_cursor] = offspring
new_sexes[survivor_cursor] = int(rng.binomial(1, 1.0 - params.p))
survivor_cursor += 1
allele_exposures = 2 * total_births * params.n
next_state = PopulationState(genomes=new_genomes[:survivor_cursor], sexes=new_sexes[:survivor_cursor])
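# Rebuild the summary so it carries the realized birth, survivor, and mutation counts from this generation.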
summary = GenerationSummary(
t=summary.t,
N=summary.N,
female_fraction=summary.female_fraction,
male_count=summary.male_count,
female_count=summary.female_count,
fecundity=summary.fecundity,
mean_fitness=summary.mean_fitness,
mean_expected_female_productivity=summary.mean_expected_female_productivity,
target_value=summary.target_value,
mean_allele_value=summary.mean_allele_value,
mean_genotype_value=summary.mean_genotype_value,
mean_tracking_gap=summary.mean_tracking_gap,
paper_M=summary.paper_M,
expected_mutations_current_N=summary.expected_mutations_current_N,
realized_mutation_count=realized_mutation_count,
realized_mutation_rate_per_allele=0.0
if allele_exposures == 0
else float(realized_mutation_count / allele_exposures),
birth_count=total_births,
surviving_offspring_count=survivor_cursor,
ne_approx=summary.ne_approx,
extinct=summary.extinct,
)
return next_state, summary
def simulate_run(
params: Track1Parameters,
seed: Optional[int] = None,
) -> list[GenerationSummary]:
rng = np.random.default_rng(seed)
state = initialize_population(params, rng)
t = -(params.T // 2)
summaries: list[GenerationSummary] = []
for _ in range(params.total_generations()):
state, summary = simulate_one_generation(state, params, t, rng)
summaries.append(summary)
if is_extinct(state):
terminal_t = t if summary.extinct else t + 1
terminal_summary = summarize_generation(state, params, terminal_t)
if summaries[-1] != terminal_summary:
summaries.append(terminal_summary)
break
t += 1
return summaries
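For orientation, a minimal, hedged usage sketch of the reference simulator defined above; the parameter values mirror the local smoke run and are illustrative only:

from renunney.track1_reference import Track1Parameters, simulate_run

# Mutation supply at carrying capacity: M = 2*K*u = 2 * 5000 * 5e-6 = 0.05 per generation.
params = Track1Parameters(K=5000, N0=50, n=1, u=5e-6, R=10.0, T=40, epochs=8)
summaries = simulate_run(params, seed=1)

# summaries holds one GenerationSummary per simulated generation, starting at t = -T//2;
# the run stops early once either sex is lost.
final = summaries[-1]
print(final.t, final.N, final.mean_tracking_gap, final.extinct)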

View File

@ -0,0 +1,294 @@
"""
track1_report.py
Local report generation for Track 1 simulation runs.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass
import json
from pathlib import Path
from typing import Iterable
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
from .track1_analysis import summarize_tracking
from .track1_reference import GenerationSummary, Track1Parameters, simulate_run
@dataclass(frozen=True)
class SeriesCI:
t: int
count: int
mean: float
ci_low: float
ci_high: float
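# confidence_interval returns (count, mean, low, high) using a normal-approximation
# 95% interval, mean +/- 1.96 * SEM. Worked example with hypothetical values
# [1.0, 2.0, 3.0]: mean = 2.0, sample sd = 1.0, SEM = 1/sqrt(3) ~= 0.577,
# so the interval is roughly (0.87, 3.13).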
def confidence_interval(values: Iterable[float]) -> tuple[int, float, float, float]:
arr = np.array(list(values), dtype=float)
count = int(arr.size)
if count == 0:
return 0, 0.0, 0.0, 0.0
mean = float(np.mean(arr))
if count == 1:
return count, mean, mean, mean
sem = float(np.std(arr, ddof=1) / np.sqrt(count))
margin = 1.96 * sem
return count, mean, mean - margin, mean + margin
def aggregate_series(runs: list[list[GenerationSummary]], attr: str) -> list[SeriesCI]:
by_t: dict[int, list[float]] = {}
for run in runs:
for summary in run:
by_t.setdefault(int(summary.t), []).append(float(getattr(summary, attr)))
rows: list[SeriesCI] = []
for t in sorted(by_t):
count, mean, low, high = confidence_interval(by_t[t])
rows.append(SeriesCI(t=t, count=count, mean=mean, ci_low=low, ci_high=high))
return rows
def aggregate_derived_series(
runs: list[list[GenerationSummary]],
fn,
) -> list[SeriesCI]:
by_t: dict[int, list[float]] = {}
for run in runs:
for summary in run:
by_t.setdefault(int(summary.t), []).append(float(fn(summary)))
rows: list[SeriesCI] = []
for t in sorted(by_t):
count, mean, low, high = confidence_interval(by_t[t])
rows.append(SeriesCI(t=t, count=count, mean=mean, ci_low=low, ci_high=high))
return rows
def plot_series(series: list[SeriesCI], ylabel: str, title: str, outpath: Path) -> None:
x = np.array([row.t for row in series], dtype=float)
y = np.array([row.mean for row in series], dtype=float)
low = np.array([row.ci_low for row in series], dtype=float)
high = np.array([row.ci_high for row in series], dtype=float)
plt.figure(figsize=(9, 4.8))
plt.plot(x, y, linewidth=2)
plt.fill_between(x, low, high, alpha=0.25)
plt.xlabel("Generation t")
plt.ylabel(ylabel)
plt.title(title)
plt.tight_layout()
plt.savefig(outpath, dpi=150)
plt.close()
def plot_series_with_reference(
series: list[SeriesCI],
ylabel: str,
title: str,
outpath: Path,
ref_value: float,
ref_label: str,
) -> None:
x = np.array([row.t for row in series], dtype=float)
y = np.array([row.mean for row in series], dtype=float)
low = np.array([row.ci_low for row in series], dtype=float)
high = np.array([row.ci_high for row in series], dtype=float)
plt.figure(figsize=(9, 4.8))
plt.plot(x, y, linewidth=2, label="Observed mean")
plt.fill_between(x, low, high, alpha=0.25)
plt.axhline(ref_value, linestyle="--", linewidth=2, label=ref_label)
plt.xlabel("Generation t")
plt.ylabel(ylabel)
plt.title(title)
plt.legend()
plt.tight_layout()
plt.savefig(outpath, dpi=150)
plt.close()
def plot_mean_allele_vs_target(
allele_series: list[SeriesCI],
target_series: list[SeriesCI],
outpath: Path,
) -> None:
x = np.array([row.t for row in allele_series], dtype=float)
allele_mean = np.array([row.mean for row in allele_series], dtype=float)
allele_low = np.array([row.ci_low for row in allele_series], dtype=float)
allele_high = np.array([row.ci_high for row in allele_series], dtype=float)
target_mean = np.array([row.mean for row in target_series], dtype=float)
plt.figure(figsize=(9, 4.8))
plt.plot(x, allele_mean, linewidth=2, label="Mean allele value")
plt.fill_between(x, allele_low, allele_high, alpha=0.25)
plt.plot(x, target_mean, linewidth=2, linestyle="--", label="Selective target t/T")
plt.xlabel("Generation t")
plt.ylabel("Allele-value scale")
plt.title("Mean Allele Value vs Selective Target")
plt.legend()
plt.tight_layout()
plt.savefig(outpath, dpi=150)
plt.close()
def render_markdown_report(
params: Track1Parameters,
runs: int,
seeds: list[int],
tracking_rows: list[dict],
figures: dict[str, str],
) -> str:
lines = [
"# Track 1 Run Report",
"",
"## Parameters",
"",
]
param_items = list(asdict(params).items())
insertion_index = next((idx + 1 for idx, (key, _) in enumerate(param_items) if key == "u"), len(param_items))
param_items.insert(insertion_index, ("M", params.M))
for key, value in param_items:
lines.append(f"- `{key}`: `{value}`")
lines.extend(
[
f"- `sim_runs`: `{runs}`",
f"- `seed_start`: `{seeds[0] if seeds else 0}`",
"",
"## Tracking Summary By Run",
"",
"| Run Seed | Extinction Occurred | First Extinction t | First Nonzero Allele t | Last Nonzero Allele t | Stayed Zero After Init | Final Mean Allele | Final Target | Final Gap | Mean |gap| | Max |gap| |",
"| --- | --- | ---: | ---: | ---: | --- | ---: | ---: | ---: | ---: | ---: |",
]
)
for row in tracking_rows:
lines.append(
f"| `{row['seed']}` | `{row['extinction_occurred']}` | `{row['first_extinction_t']}` | "
f"`{row['first_nonzero_allele_t']}` | `{row['last_nonzero_allele_t']}` | "
f"`{row['stayed_zero_after_initialization']}` | `{row['final_mean_allele_value']:.6f}` | "
f"`{row['final_target_value']:.6f}` | `{row['final_tracking_gap']:.6f}` | "
f"`{row['mean_abs_tracking_gap']:.6f}` | `{row['max_abs_tracking_gap']:.6f}` |"
)
lines.extend(["", "## Figures", ""])
for label, relpath in figures.items():
lines.extend([f"### {label}", "", f"![{label}]({relpath})", ""])
return "\n".join(lines) + "\n"
def generate_report_bundle(
params: Track1Parameters,
runs: int,
seed_start: int,
report_dir: str | Path,
) -> dict:
outdir = Path(report_dir)
outdir.mkdir(parents=True, exist_ok=True)
run_summaries: list[list[GenerationSummary]] = []
tracking_rows: list[dict] = []
seeds = [seed_start + idx for idx in range(runs)]
for seed in seeds:
summaries = simulate_run(params, seed=seed)
run_summaries.append(summaries)
tracking = summarize_tracking(summaries)
tracking_rows.append({"seed": seed, **asdict(tracking)})
figure_specs = {
"Female Fecundity f": ("fecundity", "Female fecundity f", "figure_fecundity.png"),
"Mean Fitness w": ("mean_fitness", "Mean offspring survival w", "figure_fitness.png"),
"Expected Female Productivity f*w": (
"mean_expected_female_productivity",
"Mean expected female productivity",
"figure_expected_productivity.png",
),
"Birth Count": ("birth_count", "Birth count", "figure_birth_count.png"),
"Surviving Offspring Count": (
"surviving_offspring_count",
"Surviving offspring count",
"figure_survivor_count.png",
),
"Population Size N": ("N", "Population size N", "figure_population_size.png"),
"Tracking Gap": ("mean_tracking_gap", "Mean tracking gap", "figure_tracking_gap.png"),
"Selective Target": ("target_value", "Selective target t/T", "figure_target_value.png"),
}
derived_specs = {
"Survival Fraction": (
lambda s: 0.0 if s.birth_count == 0 else s.surviving_offspring_count / s.birth_count,
"Survivors / births",
"figure_survival_fraction.png",
),
"Fecundity Excess f - 2": (
lambda s: s.fecundity - 2.0,
"Female fecundity excess over replacement",
"figure_fecundity_excess.png",
),
}
aggregate_payload: dict[str, object] = {}
figure_paths: dict[str, str] = {}
for label, (attr, ylabel, filename) in figure_specs.items():
series = aggregate_series(run_summaries, attr)
aggregate_payload[attr] = [asdict(row) for row in series]
plot_series(series, ylabel=ylabel, title=label, outpath=outdir / filename)
figure_paths[label] = filename
for label, (fn, ylabel, filename) in derived_specs.items():
series = aggregate_derived_series(run_summaries, fn)
aggregate_payload[filename.removesuffix(".png")] = [asdict(row) for row in series]
plot_series(series, ylabel=ylabel, title=label, outpath=outdir / filename)
figure_paths[label] = filename
allele_series = aggregate_series(run_summaries, "mean_allele_value")
target_series = aggregate_series(run_summaries, "target_value")
aggregate_payload["mean_allele_overlay"] = {
"mean_allele_value": [asdict(row) for row in allele_series],
"target_value": [asdict(row) for row in target_series],
}
plot_mean_allele_vs_target(
allele_series=allele_series,
target_series=target_series,
outpath=outdir / "figure_mean_allele_vs_target.png",
)
figure_paths["Mean Allele Value vs Selective Target"] = "figure_mean_allele_vs_target.png"
realized_m_series = aggregate_series(run_summaries, "realized_mutation_count")
aggregate_payload["realized_M"] = [asdict(row) for row in realized_m_series]
plot_series_with_reference(
realized_m_series,
ylabel="Mutation count per generation",
title="Realized M vs Expected M",
outpath=outdir / "figure_realized_M.png",
ref_value=2.0 * params.K * params.u,
ref_label="Expected M = 2Ku",
)
figure_paths["Realized M vs Expected M"] = "figure_realized_M.png"
(outdir / "aggregate_series.json").write_text(
json.dumps(aggregate_payload, indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
(outdir / "tracking_summary.json").write_text(
json.dumps(tracking_rows, indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
report_text = render_markdown_report(
params=params,
runs=runs,
seeds=seeds,
tracking_rows=tracking_rows,
figures=figure_paths,
)
report_path = outdir / "report.md"
report_path.write_text(report_text, encoding="utf-8")
return {
"mode": "report",
"parameters": asdict(params),
"derived_M": params.M,
"runs": runs,
"seed_start": seed_start,
"report_dir": str(outdir),
"report_path": str(report_path),
"figures": figure_paths,
"tracking_summary": tracking_rows,
}
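A short sketch of driving the report builder above; the report directory and parameter values are placeholders:

from renunney.track1_reference import Track1Parameters
from renunney.track1_report import generate_report_bundle

params = Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
# Simulates `runs` replicates with seeds seed_start .. seed_start + runs - 1, then writes
# report.md, aggregate_series.json, tracking_summary.json, and the figure PNGs into report_dir.
payload = generate_report_bundle(params, runs=2, seed_start=1, report_dir="runs/scratch/report-demo")
print(payload["report_path"], sorted(payload["figures"]))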

View File

@ -0,0 +1,255 @@
"""
track1_threshold.py
Local Track 1 threshold-search layer for renunney.
"""
from __future__ import annotations
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
import json
from pathlib import Path
from typing import Iterable, Optional
from .track1_reference import Track1Parameters, simulate_run
@dataclass(frozen=True)
class ThresholdCheck:
"""Summary for one T value under the Track 1 heuristic."""
T: float
runs: int
extinctions: int
@property
def survived_all(self) -> bool:
return self.extinctions == 0
@dataclass(frozen=True)
class ThresholdSearchResult:
"""Result of Nunney-style threshold search."""
threshold_T: float
baseline_check: ThresholdCheck
check_1_02: ThresholdCheck
check_1_05: ThresholdCheck
check_1_10: ThresholdCheck
retest_check: Optional[ThresholdCheck]
def _cache_key(params: Track1Parameters, T_value: float, runs: int, seed_start: int) -> str:
return json.dumps(
{
"K": params.K,
"N0": params.N0,
"n": params.n,
"u": params.u,
"R": params.R,
"T_value": T_value,
"epochs": params.epochs,
"p": params.p,
"a_max": params.a_max,
"runs": runs,
"seed_start": seed_start,
},
sort_keys=True,
)
def _load_cache(path: str | Path | None) -> dict:
if path is None:
return {}
cache_path = Path(path)
if not cache_path.exists():
return {}
return json.loads(cache_path.read_text(encoding="utf-8"))
def _save_cache(path: str | Path | None, cache: dict) -> None:
if path is None:
return
cache_path = Path(path)
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_text(json.dumps(cache, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def _single_run_extinct(params: Track1Parameters, sim_T: int, seed: int) -> bool:
run_params = Track1Parameters(
K=params.K,
N0=params.N0,
n=params.n,
u=params.u,
R=params.R,
T=sim_T,
epochs=params.epochs,
p=params.p,
a_max=params.a_max,
)
summaries = simulate_run(run_params, seed=seed)
return bool(summaries and summaries[-1].extinct)
def run_extinction_check(
params: Track1Parameters,
T_value: float,
runs: int = 20,
seed_start: int = 0,
cache_path: str | Path | None = None,
jobs: int = 1,
) -> ThresholdCheck:
"""
Run repeated simulations at one T value.
reconstruction_choice:
T is rounded to the nearest integer generation count for the simulator.
"""
cache = _load_cache(cache_path)
key = _cache_key(params, T_value, runs, seed_start)
if key in cache:
cached = cache[key]
return ThresholdCheck(T=cached["T"], runs=cached["runs"], extinctions=cached["extinctions"])
sim_T = int(round(T_value))
seeds = [seed_start + run_index for run_index in range(runs)]
if jobs <= 1 or runs <= 1:
extinctions = sum(1 for seed in seeds if _single_run_extinct(params, sim_T, seed))
else:
worker_count = min(jobs, runs)
with ProcessPoolExecutor(max_workers=worker_count) as executor:
extinctions = sum(executor.map(_single_run_extinct, [params] * runs, [sim_T] * runs, seeds))
result = ThresholdCheck(T=T_value, runs=runs, extinctions=extinctions)
cache[key] = {"T": result.T, "runs": result.runs, "extinctions": result.extinctions}
_save_cache(cache_path, cache)
return result
def nunney_threshold_accepts(
baseline_check: ThresholdCheck,
check_1_02: ThresholdCheck,
check_1_05: ThresholdCheck,
check_1_10: ThresholdCheck,
) -> tuple[bool, int]:
"""Return acceptance and the number of failed higher checks."""
if not baseline_check.survived_all:
return False, 3
higher_checks = [check_1_02, check_1_05, check_1_10]
failures = sum(0 if check.survived_all else 1 for check in higher_checks)
return failures == 0, failures
def evaluate_threshold_candidate(
params: Track1Parameters,
T_value: float,
runs: int = 20,
seed_start: int = 0,
cache_path: str | Path | None = None,
jobs: int = 1,
) -> ThresholdSearchResult:
"""Evaluate one candidate threshold T using Nunney's published checks."""
baseline_check = run_extinction_check(
params,
T_value,
runs=runs,
seed_start=seed_start,
cache_path=cache_path,
jobs=jobs,
)
check_1_02 = run_extinction_check(
params,
1.02 * T_value,
runs=runs,
seed_start=seed_start + 1000,
cache_path=cache_path,
jobs=jobs,
)
check_1_05 = run_extinction_check(
params,
1.05 * T_value,
runs=runs,
seed_start=seed_start + 2000,
cache_path=cache_path,
jobs=jobs,
)
check_1_10 = run_extinction_check(
params,
1.10 * T_value,
runs=runs,
seed_start=seed_start + 3000,
cache_path=cache_path,
jobs=jobs,
)
_, failures = nunney_threshold_accepts(
baseline_check=baseline_check,
check_1_02=check_1_02,
check_1_05=check_1_05,
check_1_10=check_1_10,
)
retest_check: Optional[ThresholdCheck] = None
if failures == 1:
failed = [check for check in (check_1_02, check_1_05, check_1_10) if not check.survived_all][0]
retest_check = run_extinction_check(
params,
failed.T,
runs=runs,
seed_start=seed_start + 4000,
cache_path=cache_path,
jobs=jobs,
)
return ThresholdSearchResult(
threshold_T=T_value,
baseline_check=baseline_check,
check_1_02=check_1_02,
check_1_05=check_1_05,
check_1_10=check_1_10,
retest_check=retest_check,
)
def published_threshold_accepts(result: ThresholdSearchResult) -> bool:
"""True if the candidate passes Nunney's published criterion."""
if not result.baseline_check.survived_all:
return False
higher_checks = [result.check_1_02, result.check_1_05, result.check_1_10]
failures = [check for check in higher_checks if not check.survived_all]
if len(failures) == 0:
return True
if len(failures) > 1:
return False
if result.retest_check is None:
return False
return result.retest_check.survived_all
def search_threshold_over_candidates(
params: Track1Parameters,
candidate_T_values: Iterable[float],
runs: int = 20,
seed_start: int = 0,
cache_path: str | Path | None = None,
jobs: int = 1,
) -> Optional[ThresholdSearchResult]:
"""Search candidate T values from below and return the first accepted threshold."""
for index, T_value in enumerate(candidate_T_values):
result = evaluate_threshold_candidate(
params=params,
T_value=T_value,
runs=runs,
seed_start=seed_start + (index * 10000),
cache_path=cache_path,
jobs=jobs,
)
if published_threshold_accepts(result):
return result
return None
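A minimal sketch of the threshold search above; the candidate T grid, run count, cache path, and worker count are illustrative (each candidate T is rounded to an integer generation count before simulation, and params.T is superseded by the candidate value during the search):

from renunney.track1_reference import Track1Parameters
from renunney.track1_threshold import search_threshold_over_candidates

params = Track1Parameters(K=5000, N0=5000, n=1, u=1e-4, R=10.0, T=500, epochs=8)
result = search_threshold_over_candidates(
    params,
    candidate_T_values=[100, 200, 300, 400, 500],
    runs=20,
    seed_start=0,
    cache_path="runs/scratch/threshold_cache.json",  # optional on-disk memoization of checks
    jobs=4,  # extinction checks fan out over a ProcessPoolExecutor when jobs > 1
)
if result is not None:
    print(result.threshold_T, result.baseline_check.extinctions)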

296
tests/test_orchestration.py Normal file
View File

@ -0,0 +1,296 @@
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.orchestration as orch
def test_registry_submit_claim_and_complete(tmp_path: Path):
db_path = tmp_path / "jobs.sqlite"
orch.initialize_registry(db_path)
manifest = {
"job_id": "job-1",
"project": "renunney",
"track": "track1",
"job_kind": "track1_locus_threshold",
"priority": 5,
"created_at": "2026-04-10T00:00:00Z",
"created_by": "test",
"worker_backend": "python-track1",
"config": {
"mode": "search",
"K": 500,
"N0": 500,
"n": 1,
"u": 0.001,
"R": 10.0,
"T": 10,
"epochs": 1,
"runs": 1,
"seed": 1,
"t_values": [5, 10],
},
"resources": {"cpu_cores": 1},
"result_paths": {"payload_json": "payloads/job-1.json"},
"retry": {"max_attempts": 2, "idempotent": True},
"notes": "",
}
orch.submit_job_manifest(db_path, manifest)
claimed = orch.claim_next_job(db_path, worker_backend="python-track1", worker_host="worker-a")
assert claimed is not None
assert claimed.job_id == "job-1"
result = {
"job_id": "job-1",
"status": "succeeded",
"worker_backend": "python-track1",
"worker_host": "worker-a",
"started_at": "2026-04-10T00:00:10Z",
"finished_at": "2026-04-10T00:00:20Z",
"wall_seconds": 10.0,
"exit_code": 0,
"config_hash": orch.hash_config(manifest["config"]),
"code_identity": {"git_commit": None},
"artifacts": {},
"summary": {"threshold_T": 10.0},
"error": None,
}
orch.complete_job(db_path, result)
jobs = orch.list_jobs(db_path)
assert jobs[0]["status"] == "succeeded"
def test_run_one_job_executes_python_track1_backend(tmp_path: Path):
db_path = tmp_path / "jobs.sqlite"
result_root = tmp_path / "results"
scratch_root = tmp_path / "scratch"
orch.initialize_registry(db_path)
manifest = {
"job_id": "job-sim-1",
"project": "renunney",
"track": "track1",
"job_kind": "track1_simulate",
"priority": 1,
"created_at": "2026-04-10T00:00:00Z",
"created_by": "test",
"worker_backend": "python-track1",
"config": {
"mode": "simulate",
"K": 500,
"N0": 500,
"n": 1,
"u": 0.001,
"R": 10.0,
"T": 10,
"epochs": 1,
"seed": 1,
},
"resources": {"cpu_cores": 1},
"result_paths": {
"payload_json": "payloads/job-sim-1.json",
"log_txt": "logs/job-sim-1.log",
},
"retry": {"max_attempts": 1, "idempotent": True},
"notes": "",
}
orch.submit_job_manifest(db_path, manifest)
result = orch.run_one_job(
db_path=db_path,
result_root=result_root,
worker_backend="python-track1",
worker_host="worker-a",
scratch_root=scratch_root,
cwd=ROOT,
)
assert result is not None
assert result["status"] == "succeeded"
payload_path = result_root / "payloads" / "job-sim-1.json"
assert payload_path.exists()
payload = json.loads(payload_path.read_text(encoding="utf-8"))
assert payload["mode"] == "simulate"
jobs = orch.list_jobs(db_path)
assert jobs[0]["status"] == "succeeded"
def test_collate_track1_figure1_groups_rows_and_fits(tmp_path: Path):
db_path = tmp_path / "jobs.sqlite"
orch.initialize_registry(db_path)
for n_value, threshold in [(1, 50.0), (2, 75.0), (3, 100.0)]:
manifest = {
"job_id": f"job-m1-n{n_value}",
"project": "renunney",
"track": "track1",
"job_kind": "track1_locus_threshold",
"priority": 1,
"created_at": "2026-04-10T00:00:00Z",
"created_by": "test",
"worker_backend": "python-track1",
"config": {
"mode": "search",
"K": 5000,
"N0": 5000,
"n": n_value,
"u": 0.0001,
},
"resources": {"cpu_cores": 1},
"result_paths": {"payload_json": f"payloads/job-m1-n{n_value}.json"},
"retry": {"max_attempts": 1, "idempotent": True},
"notes": "",
}
orch.submit_job_manifest(db_path, manifest)
result = {
"job_id": manifest["job_id"],
"status": "succeeded",
"worker_backend": "python-track1",
"worker_host": "worker-a",
"started_at": "2026-04-10T00:00:10Z",
"finished_at": "2026-04-10T00:00:20Z",
"wall_seconds": 10.0,
"exit_code": 0,
"config_hash": orch.hash_config(manifest["config"]),
"code_identity": {"git_commit": None},
"artifacts": {},
"summary": {
"mode": "search",
"M": 1.0,
"u": 0.0001,
"n": n_value,
"accepted": True,
"threshold_T": threshold,
"baseline_extinctions": 0,
"check_1_02_extinctions": 0,
"check_1_05_extinctions": 0,
"check_1_10_extinctions": 0,
},
"error": None,
}
orch.complete_job(db_path, result)
payload = orch.collate_track1_figure1(db_path)
assert payload["treatment_count"] == 1
treatment = payload["treatments"][0]
assert treatment["M"] == 1.0
assert [row["n"] for row in treatment["rows"]] == [1, 2, 3]
assert treatment["fit"] is not None
assert treatment["fit"]["points_used"] == 3
def test_expand_track1_figure1_manifest_splits_by_locus():
config = {
"mode": "loci_regression",
"K": 5000,
"N0": 5000,
"n": 1,
"u": 0.0001,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4, 6],
"loci_values": [1, 3, 5],
"seed": 10,
}
manifests = orch.expand_track1_figure1_manifest(
base_job_id_prefix="fig1-m10",
config=config,
priority=7,
created_by="test",
)
assert [manifest["job_id"] for manifest in manifests] == [
"fig1-m10-n1",
"fig1-m10-n3",
"fig1-m10-n5",
]
assert all(manifest["job_kind"] == "track1_locus_threshold" for manifest in manifests)
assert all(manifest["config"]["mode"] == "search" for manifest in manifests)
assert [manifest["config"]["n"] for manifest in manifests] == [1, 3, 5]
assert all("loci_values" not in manifest["config"] for manifest in manifests)
assert all(manifest["project"] == "renunney" for manifest in manifests)
def test_submit_track1_figure1_jobs_registers_expanded_jobs(tmp_path: Path):
db_path = tmp_path / "jobs.sqlite"
orch.initialize_registry(db_path)
config = {
"mode": "loci_regression",
"K": 5000,
"N0": 5000,
"n": 1,
"u": 0.0001,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4],
"loci_values": [2, 4],
"seed": 10,
}
job_ids = orch.submit_track1_figure1_jobs(
db_path=db_path,
base_job_id_prefix="fig1-m10",
config=config,
)
assert job_ids == ["fig1-m10-n2", "fig1-m10-n4"]
jobs = orch.list_jobs(db_path)
assert len(jobs) == 2
assert {job["job_id"] for job in jobs} == set(job_ids)
def test_run_worker_loop_processes_until_queue_empty(tmp_path: Path):
db_path = tmp_path / "jobs.sqlite"
result_root = tmp_path / "results"
scratch_root = tmp_path / "scratch"
orch.initialize_registry(db_path)
for idx in range(2):
manifest = {
"job_id": f"job-sim-{idx}",
"project": "renunney",
"track": "track1",
"job_kind": "track1_simulate",
"priority": 1,
"created_at": "2026-04-10T00:00:00Z",
"created_by": "test",
"worker_backend": "python-track1",
"config": {
"mode": "simulate",
"K": 500,
"N0": 500,
"n": 1,
"u": 0.001,
"R": 10.0,
"T": 10,
"epochs": 1,
"seed": idx + 1,
},
"resources": {"cpu_cores": 1},
"result_paths": {
"payload_json": f"payloads/job-sim-{idx}.json",
"log_txt": f"logs/job-sim-{idx}.log",
},
"retry": {"max_attempts": 1, "idempotent": True},
"notes": "",
}
orch.submit_job_manifest(db_path, manifest)
payload = orch.run_worker_loop(
db_path=db_path,
result_root=result_root,
worker_backend="python-track1",
worker_host="worker-a",
scratch_root=scratch_root,
cwd=ROOT,
)
assert payload["attempted_jobs"] == 2
assert payload["succeeded_jobs"] == 2
assert payload["failed_jobs"] == 0
assert payload["stopped_because"] == "queue_empty"
jobs = orch.list_jobs(db_path)
assert all(job["status"] == "succeeded" for job in jobs)

View File

@ -0,0 +1,179 @@
import sys
from dataclasses import dataclass
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_analysis as analysis
import renunney.track1_api as api
from renunney.track1_reference import GenerationSummary, Track1Parameters
def test_fit_linear_cost_by_loci_recovers_line():
rows = [
analysis.LocusThresholdRow(n=1, threshold_T=12.0, accepted=True),
analysis.LocusThresholdRow(n=2, threshold_T=14.0, accepted=True),
analysis.LocusThresholdRow(n=3, threshold_T=16.0, accepted=True),
]
fit = analysis.fit_linear_cost_by_loci(rows)
assert fit is not None
assert abs(fit.intercept_c0 - 10.0) < 1e-9
assert abs(fit.slope_c1 - 2.0) < 1e-9
def test_sweep_number_of_loci_uses_search_results(monkeypatch):
params = Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
class Dummy:
def __init__(self, threshold_T):
self.threshold_T = threshold_T
def fake_search(params, candidate_T_values, runs=20, seed_start=0, cache_path=None, jobs=1):
return Dummy(threshold_T=10.0 + params.n)
monkeypatch.setattr(analysis, "search_threshold_over_candidates", fake_search)
sweep = analysis.sweep_number_of_loci(params, [1, 2, 3], [10, 20, 30], runs=2, seed_start=1, jobs=3)
assert [row.threshold_T for row in sweep.rows] == [11.0, 12.0, 13.0]
assert sweep.fit is not None
def test_run_config_loci_regression_mode(monkeypatch):
@dataclass(frozen=True)
class DummyFit:
intercept_c0: float = 5.0
slope_c1: float = 2.0
r_squared: float = 1.0
points_used: int = 3
class DummySweep:
rows = [
analysis.LocusThresholdRow(n=1, threshold_T=7.0, accepted=True),
analysis.LocusThresholdRow(n=2, threshold_T=9.0, accepted=True),
analysis.LocusThresholdRow(n=3, threshold_T=11.0, accepted=True),
]
fit = DummyFit()
monkeypatch.setattr(api, "sweep_number_of_loci", lambda *args, **kwargs: DummySweep())
config = api.Track1RunConfig(
mode="loci_regression",
loci_values=[1, 2, 3],
t_start=10,
t_stop=30,
t_step=10,
runs=2,
)
payload = api.run_config(config)
assert payload["mode"] == "loci_regression"
assert payload["loci_values"] == [1, 2, 3]
assert payload["fit"]["slope_c1"] == 2.0
def test_summarize_tracking_detects_post_initial_nonzero_alleles():
summaries = [
GenerationSummary(
t=-2,
N=10,
female_fraction=0.5,
male_count=5,
female_count=5,
fecundity=1.0,
mean_fitness=1.0,
mean_expected_female_productivity=1.0,
target_value=-0.1,
mean_allele_value=0.0,
mean_genotype_value=0.0,
mean_tracking_gap=0.1,
paper_M=0.05,
expected_mutations_current_N=0.0001,
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=0,
surviving_offspring_count=0,
ne_approx=5.0,
extinct=False,
),
GenerationSummary(
t=-1,
N=10,
female_fraction=0.5,
male_count=5,
female_count=5,
fecundity=1.0,
mean_fitness=1.0,
mean_expected_female_productivity=1.0,
target_value=-0.05,
mean_allele_value=0.2,
mean_genotype_value=0.2,
mean_tracking_gap=0.25,
paper_M=0.05,
expected_mutations_current_N=0.0001,
realized_mutation_count=1,
realized_mutation_rate_per_allele=0.05,
birth_count=2,
surviving_offspring_count=1,
ne_approx=5.0,
extinct=False,
),
]
tracking = analysis.summarize_tracking(summaries)
assert tracking.extinction_occurred is False
assert tracking.first_extinction_t is None
assert tracking.first_nonzero_allele_t == -1
assert tracking.last_nonzero_allele_t == -1
assert tracking.stayed_zero_after_initialization is False
def test_summarize_tracking_detects_extinction_time():
summaries = [
GenerationSummary(
t=0,
N=10,
female_fraction=0.5,
male_count=5,
female_count=5,
fecundity=1.0,
mean_fitness=1.0,
mean_expected_female_productivity=1.0,
target_value=0.0,
mean_allele_value=0.0,
mean_genotype_value=0.0,
mean_tracking_gap=0.0,
paper_M=0.05,
expected_mutations_current_N=0.0001,
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=0,
surviving_offspring_count=0,
ne_approx=5.0,
extinct=False,
),
GenerationSummary(
t=1,
N=0,
female_fraction=0.0,
male_count=0,
female_count=0,
fecundity=0.0,
mean_fitness=0.0,
mean_expected_female_productivity=0.0,
target_value=0.1,
mean_allele_value=0.0,
mean_genotype_value=0.0,
mean_tracking_gap=-0.1,
paper_M=0.05,
expected_mutations_current_N=0.0,
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=0,
surviving_offspring_count=0,
ne_approx=0.0,
extinct=True,
),
]
tracking = analysis.summarize_tracking(summaries)
assert tracking.extinction_occurred is True
assert tracking.first_extinction_t == 1

40
tests/test_track1_api.py Normal file
View File

@ -0,0 +1,40 @@
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_api as api
def test_run_config_simulate_mode_returns_contract():
config = api.Track1RunConfig(mode="simulate", K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20, seed=1)
payload = api.run_config(config)
assert payload["mode"] == "simulate"
assert payload["parameters"]["u"] == 5e-6
assert payload["parameters"]["M"] == 0.05
assert "final_summary" in payload
assert "tracking_summary" in payload
def test_config_roundtrip_from_mapping_and_file(tmp_path: Path):
raw = {
"mode": "search",
"K": 5000,
"N0": 20,
"n": 1,
"u": 5e-6,
"R": 10.0,
"T": 20,
"runs": 1,
"jobs": 1,
"t_values": [5, 10],
}
cfg = api.config_from_mapping(raw)
path = tmp_path / "track1.json"
path.write_text(json.dumps(raw), encoding="utf-8")
loaded = api.load_config(path)
assert loaded == cfg

View File

@ -0,0 +1,56 @@
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_api as api
import renunney.track1_dataset as ds
import renunney.track1_reference as ref
def test_generate_extinction_dataset_writes_expected_files(tmp_path: Path):
params = ref.Track1Parameters(K=500, N0=20, n=1, u=0.001, R=10.0, T=10, epochs=1)
dataset_dir = tmp_path / "dataset"
payload = ds.generate_extinction_dataset(
params=params,
runs=1,
seed_start=1,
dataset_dir=dataset_dir,
grid={"N0": [20, 500], "u": [0.001, 0.005]},
)
assert payload["treatment_count"] == 4
assert payload["run_row_count"] == 4
assert Path(payload["generation_rows_path"]).exists()
assert Path(payload["run_rows_path"]).exists()
assert Path(payload["treatments_path"]).exists()
metadata = json.loads((dataset_dir / "metadata.json").read_text(encoding="utf-8"))
assert metadata["treatment_count"] == 4
def test_run_config_extinction_dataset_mode(tmp_path: Path):
dataset_dir = tmp_path / "dataset"
config = api.Track1RunConfig(
mode="extinction_dataset",
K=500,
N0=20,
n=1,
u=0.001,
R=10.0,
T=10,
epochs=1,
runs=1,
seed=1,
dataset_dir=str(dataset_dir),
grid={"u": [0.001, 0.005]},
)
payload = api.run_config(config)
assert payload["mode"] == "extinction_dataset"
assert payload["parameters"]["u"] == 0.001
assert payload["parameters"]["M"] == 1.0
assert payload["treatment_count"] == 2
assert (dataset_dir / "run_rows.jsonl").exists()
assert (dataset_dir / "generation_rows.jsonl").exists()

View File

@ -0,0 +1,95 @@
import sys
from pathlib import Path
import numpy as np
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_reference as ref
def test_zero_mutation_preserves_zero_alleles_in_one_generation():
params = ref.Track1Parameters(K=5000, N0=2, n=2, u=0.0, R=10.0, T=20)
state = ref.PopulationState(
genomes=np.zeros((2, 2, 2), dtype=np.int16),
sexes=np.array([0, 1], dtype=np.int8),
)
next_state, summary = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(1))
assert summary.mean_allele_value == 0.0
assert int(next_state.genomes.sum()) == 0
def test_zero_offspring_fitness_prevents_survival(monkeypatch):
params = ref.Track1Parameters(K=5000, N0=2, n=1, u=0.0, R=10.0, T=20)
state = ref.PopulationState(
genomes=np.zeros((2, 2, 1), dtype=np.int16),
sexes=np.array([0, 1], dtype=np.int8),
)
def fake_fitness(genomes, r, T, t):
if genomes.shape[0] == 1:
return np.zeros(1, dtype=float)
return np.ones(genomes.shape[0], dtype=float)
monkeypatch.setattr(ref, "genotype_fitness", fake_fitness)
monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0)
monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0], dtype=np.int32))
next_state, _ = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(2))
assert next_state.size == 0
def test_unit_offspring_fitness_keeps_all_births(monkeypatch):
params = ref.Track1Parameters(K=5000, N0=2, n=1, u=0.0, R=10.0, T=20)
state = ref.PopulationState(
genomes=np.zeros((2, 2, 1), dtype=np.int16),
sexes=np.array([0, 1], dtype=np.int8),
)
monkeypatch.setattr(ref, "genotype_fitness", lambda genomes, r, T, t: np.ones(genomes.shape[0], dtype=float))
monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0)
monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0], dtype=np.int32))
next_state, summary = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(2))
assert summary.female_count == 1
assert next_state.size == 3
def test_one_father_is_used_per_female_reproductive_event(monkeypatch):
params = ref.Track1Parameters(K=5000, N0=3, n=2, u=0.0, R=10.0, T=20)
state = ref.PopulationState(
genomes=np.array(
[
[[0, 0], [0, 0]],
[[1, 1], [1, 1]],
[[2, 2], [2, 2]],
],
dtype=np.int16,
),
sexes=np.array([0, 1, 1], dtype=np.int8),
)
monkeypatch.setattr(ref, "genotype_fitness", lambda genomes, r, T, t: np.ones(genomes.shape[0], dtype=float))
monkeypatch.setattr(ref, "female_fecundity", lambda r, N, K: 4.0)
monkeypatch.setattr(ref, "realize_birth_counts", lambda fecundity, sexes, rng: np.array([3, 0, 0], dtype=np.int32))
next_state, _ = ref.simulate_one_generation(state, params, t=0, rng=np.random.default_rng(3))
paternal_alleles = next_state.genomes[:, 1, :]
assert next_state.size == 3
assert np.unique(paternal_alleles).size == 1
def test_absence_of_males_or_females_counts_as_extinction():
female_only = ref.PopulationState(
genomes=np.zeros((2, 2, 1), dtype=np.int16),
sexes=np.array([0, 0], dtype=np.int8),
)
male_only = ref.PopulationState(
genomes=np.zeros((2, 2, 1), dtype=np.int16),
sexes=np.array([1, 1], dtype=np.int8),
)
assert ref.is_extinct(female_only) is True
assert ref.is_extinct(male_only) is True

View File

@ -0,0 +1,95 @@
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_extinction as ext
import renunney.track1_reference as ref
def _sample_summaries():
return [
ref.GenerationSummary(
t=0,
N=10,
female_fraction=0.5,
male_count=5,
female_count=5,
fecundity=3.0,
mean_fitness=0.8,
mean_expected_female_productivity=2.4,
target_value=0.0,
mean_allele_value=0.2,
mean_genotype_value=0.2,
mean_tracking_gap=0.2,
paper_M=0.05,
expected_mutations_current_N=0.1,
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=6,
surviving_offspring_count=4,
ne_approx=5.0,
extinct=False,
),
ref.GenerationSummary(
t=1,
N=0,
female_fraction=0.0,
male_count=0,
female_count=0,
fecundity=0.0,
mean_fitness=0.0,
mean_expected_female_productivity=0.0,
target_value=0.1,
mean_allele_value=0.0,
mean_genotype_value=0.0,
mean_tracking_gap=-0.1,
paper_M=0.05,
expected_mutations_current_N=0.0,
realized_mutation_count=0,
realized_mutation_rate_per_allele=0.0,
birth_count=0,
surviving_offspring_count=0,
ne_approx=0.0,
extinct=True,
),
]
def test_build_extinction_generation_rows_derives_expected_fields():
params = ref.Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
rows = ext.build_extinction_generation_rows(params, _sample_summaries(), seed=7)
assert len(rows) == 2
assert rows[0].M == 0.05
assert rows[0].survival_fraction == 4 / 6
assert rows[0].replacement_deficit == 0.0
assert rows[0].mutation_shortfall == 0.1
assert rows[0].extinction_next_step is True
assert rows[1].extinction_occurred is True
def test_build_extinction_run_row_summarizes_extinction_and_mutation_streaks():
params = ref.Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
row = ext.build_extinction_run_row(params, _sample_summaries(), seed=7)
assert row.extinction_occurred is True
assert row.first_extinction_t == 1
assert row.first_productivity_below_replacement_t == 1
assert row.longest_zero_mutation_streak == 2
assert row.cumulative_expected_mutations == 0.1
assert row.cumulative_realized_mutations == 0
def test_save_jsonl_writes_row_data(tmp_path: Path):
params = ref.Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
row = ext.build_extinction_run_row(params, _sample_summaries(), seed=7)
path = tmp_path / "runs.jsonl"
ext.save_jsonl([row], path)
lines = path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
parsed = json.loads(lines[0])
assert parsed["seed"] == 7
assert parsed["extinction_occurred"] is True

200
tests/test_track1_fit.py Normal file
View File

@ -0,0 +1,200 @@
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_api as api
import renunney.track1_fit as fit
def _synthetic_run_rows():
return [
{
"seed": 1,
"K": 500,
"N0": 500,
"n": 1,
"u": 0.005,
"M": 5.0,
"R": 10.0,
"T": 20,
"epochs": 2,
"p": 0.5,
"generations_recorded": 25,
"extinction_occurred": False,
"first_extinction_t": None,
"final_extinct": False,
"final_N": 300,
"min_N": 200,
"max_N": 520,
"mean_N": 360.0,
"final_mean_allele_value": 1.8,
"final_target_value": 1.9,
"final_tracking_gap": -0.1,
"mean_abs_tracking_gap": 0.2,
"max_abs_tracking_gap": 0.4,
"first_nonzero_allele_t": -3,
"last_nonzero_allele_t": 19,
"stayed_zero_after_initialization": False,
"first_productivity_below_replacement_t": None,
"fraction_generations_below_replacement": 0.1,
"longest_zero_mutation_streak": 0,
"cumulative_expected_mutations": 90.0,
"cumulative_realized_mutations": 110,
"cumulative_mutation_shortfall": 6.0,
},
{
"seed": 2,
"K": 500,
"N0": 500,
"n": 1,
"u": 0.001,
"M": 1.0,
"R": 10.0,
"T": 10,
"epochs": 2,
"p": 0.5,
"generations_recorded": 25,
"extinction_occurred": True,
"first_extinction_t": 15,
"final_extinct": True,
"final_N": 0,
"min_N": 0,
"max_N": 500,
"mean_N": 140.0,
"final_mean_allele_value": 0.2,
"final_target_value": 1.9,
"final_tracking_gap": -1.7,
"mean_abs_tracking_gap": 1.1,
"max_abs_tracking_gap": 1.8,
"first_nonzero_allele_t": 2,
"last_nonzero_allele_t": 8,
"stayed_zero_after_initialization": False,
"first_productivity_below_replacement_t": -1,
"fraction_generations_below_replacement": 0.9,
"longest_zero_mutation_streak": 7,
"cumulative_expected_mutations": 12.0,
"cumulative_realized_mutations": 3,
"cumulative_mutation_shortfall": 9.0,
},
{
"seed": 3,
"K": 500,
"N0": 20,
"n": 1,
"u": 0.001,
"M": 1.0,
"R": 10.0,
"T": 10,
"epochs": 2,
"p": 0.5,
"generations_recorded": 25,
"extinction_occurred": True,
"first_extinction_t": 12,
"final_extinct": True,
"final_N": 0,
"min_N": 0,
"max_N": 200,
"mean_N": 70.0,
"final_mean_allele_value": 0.0,
"final_target_value": 1.9,
"final_tracking_gap": -1.9,
"mean_abs_tracking_gap": 1.3,
"max_abs_tracking_gap": 2.0,
"first_nonzero_allele_t": None,
"last_nonzero_allele_t": None,
"stayed_zero_after_initialization": True,
"first_productivity_below_replacement_t": -3,
"fraction_generations_below_replacement": 1.0,
"longest_zero_mutation_streak": 12,
"cumulative_expected_mutations": 8.0,
"cumulative_realized_mutations": 1,
"cumulative_mutation_shortfall": 7.0,
},
{
"seed": 4,
"K": 500,
"N0": 500,
"n": 1,
"u": 0.005,
"M": 5.0,
"R": 10.0,
"T": 10,
"epochs": 2,
"p": 0.5,
"generations_recorded": 25,
"extinction_occurred": False,
"first_extinction_t": None,
"final_extinct": False,
"final_N": 380,
"min_N": 180,
"max_N": 520,
"mean_N": 350.0,
"final_mean_allele_value": 1.7,
"final_target_value": 1.9,
"final_tracking_gap": -0.2,
"mean_abs_tracking_gap": 0.25,
"max_abs_tracking_gap": 0.6,
"first_nonzero_allele_t": -2,
"last_nonzero_allele_t": 19,
"stayed_zero_after_initialization": False,
"first_productivity_below_replacement_t": 1,
"fraction_generations_below_replacement": 0.2,
"longest_zero_mutation_streak": 1,
"cumulative_expected_mutations": 80.0,
"cumulative_realized_mutations": 90,
"cumulative_mutation_shortfall": 10.0,
},
]
def test_fit_extinction_run_model_returns_model_and_summary():
model, summary = fit.fit_extinction_run_model(_synthetic_run_rows())
assert model.converged is True
assert model.feature_names == list(fit.DEFAULT_RUN_FEATURES)
assert len(model.feature_means) == len(model.feature_names)
assert len(model.feature_scales) == len(model.feature_names)
assert len(model.coefficients) == len(model.feature_names)
assert summary.sample_count == 4
assert summary.extinction_count == 2
assert 0.0 <= summary.brier_score <= 1.0
def test_fit_payload_from_jsonl_and_api_mode(tmp_path: Path):
path = tmp_path / "run_rows.jsonl"
with path.open("w", encoding="utf-8") as handle:
for row in _synthetic_run_rows():
handle.write(json.dumps(row, sort_keys=True) + "\n")
payload = fit.fit_payload_from_jsonl(path)
assert payload["summary"]["sample_count"] == 4
assert payload["model"]["converged"] is True
config = api.Track1RunConfig(mode="extinction_fit", run_rows_path=str(path))
api_payload = api.run_config(config)
assert api_payload["mode"] == "extinction_fit"
assert api_payload["fit_status"] == "ok"
assert api_payload["summary"]["extinction_count"] == 2
def test_api_extinction_fit_reports_insufficient_outcome_variation(tmp_path: Path):
path = tmp_path / "run_rows.jsonl"
only_survivors = _synthetic_run_rows()[:1] + [_synthetic_run_rows()[3]]
with path.open("w", encoding="utf-8") as handle:
for row in only_survivors:
row = dict(row)
row["extinction_occurred"] = False
row["final_extinct"] = False
row["first_extinction_t"] = None
handle.write(json.dumps(row, sort_keys=True) + "\n")
config = api.Track1RunConfig(mode="extinction_fit", run_rows_path=str(path))
payload = api.run_config(config)
assert payload["mode"] == "extinction_fit"
assert payload["fit_status"] == "insufficient_outcome_variation"
assert payload["model"] is None
assert payload["summary"]["sample_count"] == 2

View File

@ -0,0 +1,59 @@
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import pytest
import renunney.track1_reference as ref
def test_fecundity_matches_r_definition():
params = ref.Track1Parameters(K=5000, N0=50, n=1, u=5e-6, R=10.0, T=40)
r = params.intrinsic_growth_rate()
assert ref.female_fecundity(r, N=0, K=params.K) == pytest.approx(10.0)
def test_initialize_population_shapes_and_allele_zero_fixation():
params = ref.Track1Parameters(K=5000, N0=12, n=3, u=5e-6, R=10.0, T=40)
state = ref.initialize_population(params, ref.np.random.default_rng(1))
assert state.genomes.shape == (12, 2, 3)
assert state.sexes.shape == (12,)
assert int(state.genomes.sum()) == 0
def test_simulate_run_returns_generation_summaries():
params = ref.Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
out = ref.simulate_run(params, seed=2)
assert out
assert out[0].N == 20
assert out[0].t == -(params.T // 2)
def test_generation_summary_reports_tracking_fields_for_initial_population():
params = ref.Track1Parameters(K=5000, N0=20, n=2, u=5e-6, R=10.0, T=20)
out = ref.simulate_run(params, seed=2)
first = out[0]
assert first.mean_allele_value == pytest.approx(0.0)
assert first.mean_genotype_value == pytest.approx(0.0)
assert first.target_value == pytest.approx(-0.5)
assert first.mean_tracking_gap == pytest.approx(0.5)
assert first.paper_M == pytest.approx(0.05)
assert first.expected_mutations_current_N == pytest.approx(0.0004)
def test_generation_summary_reports_realized_mutations_for_high_u():
params = ref.Track1Parameters(K=5000, N0=20, n=1, u=0.5, R=10.0, T=20)
out = ref.simulate_run(params, seed=2)
first = out[0]
assert first.realized_mutation_count >= 0
assert 0.0 <= first.realized_mutation_rate_per_allele <= 1.0

View File

@ -0,0 +1,44 @@
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_api as api
def test_run_config_report_mode_writes_report_bundle(tmp_path: Path):
report_dir = tmp_path / "report"
config = api.Track1RunConfig(
mode="report",
K=5000,
N0=20,
n=1,
u=5e-6,
R=10.0,
T=20,
runs=2,
seed=1,
report_dir=str(report_dir),
)
payload = api.run_config(config)
assert payload["mode"] == "report"
report_path = Path(payload["report_path"])
assert report_path.exists()
report_text = report_path.read_text(encoding="utf-8")
assert "- `K`: `5000`" in report_text
assert "- `u`: `5e-06`" in report_text
assert "- `M`: `0.05`" in report_text
assert "Extinction Occurred" in report_text
assert "First Extinction t" in report_text
assert (report_dir / "aggregate_series.json").exists()
assert (report_dir / "tracking_summary.json").exists()
assert (report_dir / "figure_fecundity.png").exists()
assert (report_dir / "figure_fitness.png").exists()
assert (report_dir / "figure_expected_productivity.png").exists()
assert (report_dir / "figure_realized_M.png").exists()
assert (report_dir / "figure_survival_fraction.png").exists()
assert (report_dir / "figure_fecundity_excess.png").exists()
assert (report_dir / "figure_mean_allele_vs_target.png").exists()

View File

@ -0,0 +1,82 @@
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
import renunney.track1_threshold as thr
from renunney.track1_reference import Track1Parameters
def test_published_threshold_accepts_when_all_checks_survive():
result = thr.ThresholdSearchResult(
threshold_T=100.0,
baseline_check=thr.ThresholdCheck(T=100.0, runs=20, extinctions=0),
check_1_02=thr.ThresholdCheck(T=102.0, runs=20, extinctions=0),
check_1_05=thr.ThresholdCheck(T=105.0, runs=20, extinctions=0),
check_1_10=thr.ThresholdCheck(T=110.0, runs=20, extinctions=0),
retest_check=None,
)
assert thr.published_threshold_accepts(result) is True
def test_published_threshold_accepts_single_failure_if_retest_survives():
result = thr.ThresholdSearchResult(
threshold_T=100.0,
baseline_check=thr.ThresholdCheck(T=100.0, runs=20, extinctions=0),
check_1_02=thr.ThresholdCheck(T=102.0, runs=20, extinctions=1),
check_1_05=thr.ThresholdCheck(T=105.0, runs=20, extinctions=0),
check_1_10=thr.ThresholdCheck(T=110.0, runs=20, extinctions=0),
retest_check=thr.ThresholdCheck(T=102.0, runs=20, extinctions=0),
)
assert thr.published_threshold_accepts(result) is True
def test_published_threshold_rejects_multiple_failures():
result = thr.ThresholdSearchResult(
threshold_T=100.0,
baseline_check=thr.ThresholdCheck(T=100.0, runs=20, extinctions=0),
check_1_02=thr.ThresholdCheck(T=102.0, runs=20, extinctions=1),
check_1_05=thr.ThresholdCheck(T=105.0, runs=20, extinctions=1),
check_1_10=thr.ThresholdCheck(T=110.0, runs=20, extinctions=0),
retest_check=None,
)
assert thr.published_threshold_accepts(result) is False
def test_search_threshold_over_candidates_uses_first_accepted(monkeypatch):
params = Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
def fake_eval(params, T_value, runs=20, seed_start=0, cache_path=None, jobs=1):
if T_value == 60:
return thr.ThresholdSearchResult(
threshold_T=60.0,
baseline_check=thr.ThresholdCheck(T=60.0, runs=runs, extinctions=1),
check_1_02=thr.ThresholdCheck(T=61.2, runs=runs, extinctions=0),
check_1_05=thr.ThresholdCheck(T=63.0, runs=runs, extinctions=0),
check_1_10=thr.ThresholdCheck(T=66.0, runs=runs, extinctions=0),
retest_check=None,
)
return thr.ThresholdSearchResult(
threshold_T=float(T_value),
baseline_check=thr.ThresholdCheck(T=float(T_value), runs=runs, extinctions=0),
check_1_02=thr.ThresholdCheck(T=1.02 * T_value, runs=runs, extinctions=0),
check_1_05=thr.ThresholdCheck(T=1.05 * T_value, runs=runs, extinctions=0),
check_1_10=thr.ThresholdCheck(T=1.10 * T_value, runs=runs, extinctions=0),
retest_check=None,
)
monkeypatch.setattr(thr, "evaluate_threshold_candidate", fake_eval)
result = thr.search_threshold_over_candidates(params, [60, 80, 100], runs=3, seed_start=10)
assert result is not None
assert result.threshold_T == 80.0
def test_run_extinction_check_parallel_matches_serial():
params = Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
serial = thr.run_extinction_check(params, T_value=20.0, runs=2, seed_start=5, jobs=1)
parallel = thr.run_extinction_check(params, T_value=20.0, runs=2, seed_start=5, jobs=2)
assert parallel == serial