Migrate Track 1 threshold layer into renunney

This commit is contained in:
Codex 2026-04-11 06:31:39 -04:00
parent acbb90f452
commit 83350e68c8
8 changed files with 375 additions and 11 deletions

View File

@ -23,6 +23,7 @@ plane and the Track 1 runner/API boundary are now local to `renunney`.
- local paper-scale Figure 1 submission configs,
- a local Track 1 runner and config/API layer,
- a local Track 1 analysis layer for tracking summaries and loci-regression,
- a local Track 1 threshold/search layer for Nunney-style threshold checks,
- a Makefile for common tasks,
- migration notes for pulling code into this repo in stages.
@ -84,6 +85,7 @@ The current state is split:
- orchestration control plane: local to `renunney`
- Track 1 runner and config/API layer: local to `renunney`
- Track 1 analysis layer: local to `renunney`
- Track 1 threshold/search layer: local to `renunney`
- Track 1 simulation backend: still in the older `cost_of_substitution`
directory and imported through the local compatibility layer

View File

@ -29,17 +29,19 @@ Operational code still lives in:
- `src/renunney/track1_api.py`
4. Track 1 analysis boundary has been migrated locally:
- `src/renunney/track1_analysis.py`
5. Keep the Track 1 simulation backend in the legacy path until real multi-host runs are stable.
6. Migrate the Track 1 simulation core after the runner path is stable:
5. Track 1 threshold/search boundary has been migrated locally:
- `src/renunney/track1_threshold.py`
6. Keep the Track 1 simulation backend in the legacy path until real multi-host runs are stable.
7. Migrate the Track 1 simulation core after the runner path is stable:
- `python/track1_reference.py`
- `python/track1_threshold.py`
- `python/track1_analysis.py`
7. Migrate report, dataset, fit, and orchestration-adjacent Track 1 modules after the kernel boundary is stable:
8. Migrate report, dataset, fit, and orchestration-adjacent Track 1 modules after the kernel boundary is stable:
- `python/track1_report.py`
- `python/track1_dataset.py`
- `python/track1_fit.py`
- `python/track1_extinction.py`
8. Migrate docs and example configs last, after path references are updated.
9. Migrate docs and example configs last, after path references are updated.
## Constraint

View File

@ -47,8 +47,8 @@ make status
## Current Assumption
The Makefile now drives the local orchestration code in `renunney`, while the
Track 1 runner/API boundary and analysis layer are also local to `renunney`.
The simulation kernel is still imported from the legacy `cost_of_substitution`
directory through the compatibility layer in `src/renunney/legacy.py`. The
paper-scale Figure 1 configs used for submission are now local to
`renunney/config`.
Track 1 runner/API boundary, analysis layer, and threshold/search layer are
also local to `renunney`. The simulation kernel is still imported from the
legacy `cost_of_substitution` directory through the compatibility layer in
`src/renunney/legacy.py`. The paper-scale Figure 1 configs used for submission
are now local to `renunney/config`.

View File

@ -28,6 +28,15 @@ from .track1_analysis import (
sweep_number_of_loci,
)
from .track1_api import Track1RunConfig, config_from_mapping, load_config, run_config, save_payload
from .track1_threshold import (
ThresholdCheck,
ThresholdSearchResult,
evaluate_threshold_candidate,
nunney_threshold_accepts,
published_threshold_accepts,
run_extinction_check,
search_threshold_over_candidates,
)
__all__ = [
"ClaimedJob",
@ -52,13 +61,20 @@ __all__ = [
"run_worker_loop",
"submit_job_manifest",
"submit_track1_figure1_jobs",
"ThresholdCheck",
"ThresholdSearchResult",
"TrackingSummary",
"Track1RunConfig",
"config_from_mapping",
"evaluate_threshold_candidate",
"fit_linear_cost_by_loci",
"load_config",
"nunney_threshold_accepts",
"published_threshold_accepts",
"run_config",
"run_extinction_check",
"save_payload",
"search_threshold_over_candidates",
"summarize_tracking",
"sweep_number_of_loci",
]

View File

@ -15,11 +15,11 @@ from typing import Iterable, Optional
import numpy as np
from .legacy import ensure_legacy_python_path
from .track1_threshold import ThresholdSearchResult, search_threshold_over_candidates
ensure_legacy_python_path()
from track1_reference import GenerationSummary, Track1Parameters
from track1_threshold import ThresholdSearchResult, search_threshold_over_candidates
@dataclass(frozen=True)

View File

@ -16,6 +16,7 @@ from typing import Any, Optional
from .legacy import ensure_legacy_python_path
from .track1_analysis import summarize_tracking, sweep_number_of_loci
from .track1_threshold import evaluate_threshold_candidate, search_threshold_over_candidates
ensure_legacy_python_path()
@ -23,7 +24,6 @@ from track1_dataset import generate_extinction_dataset
from track1_fit import class_balance, fit_payload_from_jsonl, load_jsonl
from track1_reference import Track1Parameters, simulate_run
from track1_report import generate_report_bundle
from track1_threshold import evaluate_threshold_candidate, search_threshold_over_candidates
@dataclass(frozen=True, init=False)

View File

@ -0,0 +1,262 @@
"""
track1_threshold.py
Local Track 1 threshold-search layer for renunney.
This stage keeps the simulation kernel in the legacy tree while moving the
historical threshold heuristic inward.
"""
from __future__ import annotations
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
import json
from pathlib import Path
from typing import Iterable, Optional
from .legacy import ensure_legacy_python_path
ensure_legacy_python_path()
from track1_reference import Track1Parameters, simulate_run
@dataclass(frozen=True)
class ThresholdCheck:
    """Outcome of repeated extinction runs at a single candidate T value."""

    T: float            # the (unrounded) T value that was checked
    runs: int           # number of independent simulations performed
    extinctions: int    # how many of those runs ended in extinction

    @property
    def survived_all(self) -> bool:
        """True when no run went extinct."""
        return not self.extinctions
@dataclass(frozen=True)
class ThresholdSearchResult:
    """Full record of a Nunney-style threshold evaluation for one candidate T.

    Bundles the baseline check at T with the three higher-T checks and,
    when a single higher check failed, the one retest that decides
    acceptance (None when no retest was run).
    """

    threshold_T: float                      # candidate threshold under test
    baseline_check: ThresholdCheck          # check at T itself
    check_1_02: ThresholdCheck              # check at 1.02 * T
    check_1_05: ThresholdCheck              # check at 1.05 * T
    check_1_10: ThresholdCheck              # check at 1.10 * T
    retest_check: Optional[ThresholdCheck]  # retest of the lone failure, if any
def _cache_key(params: Track1Parameters, T_value: float, runs: int, seed_start: int) -> str:
    """Build a stable JSON string identifying one (parameters, T, seeds) combo.

    sort_keys guarantees a canonical ordering, so equal inputs always map
    to the same cache entry.
    """
    payload = dict(
        K=params.K,
        N0=params.N0,
        n=params.n,
        u=params.u,
        R=params.R,
        T_value=T_value,
        epochs=params.epochs,
        p=params.p,
        a_max=params.a_max,
        runs=runs,
        seed_start=seed_start,
    )
    return json.dumps(payload, sort_keys=True)
def _load_cache(path: str | Path | None) -> dict:
    """Load the JSON run cache at *path*.

    Returns an empty dict when *path* is None, the file does not exist, or
    the file is unreadable / holds invalid or non-dict JSON. A corrupt
    cache (e.g. a truncated write) should trigger recomputation, not crash
    the whole threshold search; `_save_cache` will rewrite it.
    """
    if path is None:
        return {}
    cache_path = Path(path)
    if not cache_path.exists():
        return {}
    try:
        cache = json.loads(cache_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        # Unreadable or corrupt cache: fall back to an empty one.
        return {}
    # Guard against a valid-JSON-but-wrong-shape file (e.g. a list).
    return cache if isinstance(cache, dict) else {}
def _save_cache(path: str | Path | None, cache: dict) -> None:
    """Write *cache* as pretty-printed, key-sorted JSON; no-op when *path* is None.

    Parent directories are created on demand so callers can point the cache
    at a path that does not exist yet.
    """
    if path is None:
        return
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(cache, indent=2, sort_keys=True)
    target.write_text(serialized + "\n", encoding="utf-8")
def _single_run_extinct(params: Track1Parameters, sim_T: int, seed: int) -> bool:
    """Run one simulation with T overridden to *sim_T*; True if it ended extinct.

    Rebuilds the (frozen-style) parameter object field by field because only
    T differs from *params*; the final generation summary carries the
    extinction flag.
    """
    overridden = Track1Parameters(
        K=params.K,
        N0=params.N0,
        n=params.n,
        u=params.u,
        R=params.R,
        T=sim_T,
        epochs=params.epochs,
        p=params.p,
        a_max=params.a_max,
    )
    history = simulate_run(overridden, seed=seed)
    if not history:
        # No generations simulated at all counts as "not extinct".
        return False
    return bool(history[-1].extinct)
def run_extinction_check(
    params: Track1Parameters,
    T_value: float,
    runs: int = 20,
    seed_start: int = 0,
    cache_path: str | Path | None = None,
    jobs: int = 1,
) -> ThresholdCheck:
    """Run repeated simulations at one T value and count extinctions.

    Results are memoized in the JSON cache at *cache_path*, keyed on the
    full parameter set plus (T_value, runs, seed_start). T is rounded to
    the nearest integer generation count for the simulator, but the
    unrounded T_value is what gets reported and cached.
    """
    cache = _load_cache(cache_path)
    key = _cache_key(params, T_value, runs, seed_start)
    if key in cache:
        entry = cache[key]
        return ThresholdCheck(T=entry["T"], runs=entry["runs"], extinctions=entry["extinctions"])

    sim_T = int(round(T_value))
    seeds = list(range(seed_start, seed_start + runs))
    if jobs > 1 and runs > 1:
        # Fan the independent runs out across worker processes.
        worker_count = min(jobs, runs)
        with ProcessPoolExecutor(max_workers=worker_count) as pool:
            outcomes = pool.map(_single_run_extinct, [params] * runs, [sim_T] * runs, seeds)
            extinction_count = sum(outcomes)
    else:
        extinction_count = sum(_single_run_extinct(params, sim_T, seed) for seed in seeds)

    checked = ThresholdCheck(T=T_value, runs=runs, extinctions=extinction_count)
    cache[key] = {"T": checked.T, "runs": checked.runs, "extinctions": checked.extinctions}
    _save_cache(cache_path, cache)
    return checked
def nunney_threshold_accepts(
    baseline_check: ThresholdCheck,
    check_1_02: ThresholdCheck,
    check_1_05: ThresholdCheck,
    check_1_10: ThresholdCheck,
) -> tuple[bool, int]:
    """Return (accepted, failed_higher_checks) for one candidate.

    A failed baseline is an immediate rejection, reported with the maximum
    failure count of 3; otherwise acceptance requires every higher check
    to have survived all of its runs.
    """
    if not baseline_check.survived_all:
        return False, 3
    failed_count = sum(
        1
        for higher in (check_1_02, check_1_05, check_1_10)
        if not higher.survived_all
    )
    return failed_count == 0, failed_count
def evaluate_threshold_candidate(
    params: Track1Parameters,
    T_value: float,
    runs: int = 20,
    seed_start: int = 0,
    cache_path: str | Path | None = None,
    jobs: int = 1,
) -> ThresholdSearchResult:
    """Evaluate one candidate threshold T using Nunney's published checks.

    Runs the baseline check at T plus checks at 1.02T, 1.05T and 1.10T,
    each in a disjoint seed block so cached results stay independent.
    When exactly one higher check fails, that T is retested once with
    fresh seeds; the retest outcome feeds `published_threshold_accepts`.
    """

    def scaled_check(multiplier: float, seed_offset: int) -> ThresholdCheck:
        # One extinction check at multiplier * T, in its own seed block.
        return run_extinction_check(
            params,
            multiplier * T_value,
            runs=runs,
            seed_start=seed_start + seed_offset,
            cache_path=cache_path,
            jobs=jobs,
        )

    # Baseline uses T_value unscaled so its cache key matches the raw input.
    baseline_check = run_extinction_check(
        params,
        T_value,
        runs=runs,
        seed_start=seed_start,
        cache_path=cache_path,
        jobs=jobs,
    )
    check_1_02 = scaled_check(1.02, 1000)
    check_1_05 = scaled_check(1.05, 2000)
    check_1_10 = scaled_check(1.10, 3000)

    _, failure_count = nunney_threshold_accepts(
        baseline_check=baseline_check,
        check_1_02=check_1_02,
        check_1_05=check_1_05,
        check_1_10=check_1_10,
    )

    retest_check: Optional[ThresholdCheck] = None
    if failure_count == 1:
        # Exactly one higher check failed: retest that T with fresh seeds.
        failing_check = next(
            check
            for check in (check_1_02, check_1_05, check_1_10)
            if not check.survived_all
        )
        retest_check = run_extinction_check(
            params,
            failing_check.T,
            runs=runs,
            seed_start=seed_start + 4000,
            cache_path=cache_path,
            jobs=jobs,
        )

    return ThresholdSearchResult(
        threshold_T=T_value,
        baseline_check=baseline_check,
        check_1_02=check_1_02,
        check_1_05=check_1_05,
        check_1_10=check_1_10,
        retest_check=retest_check,
    )
def published_threshold_accepts(result: ThresholdSearchResult) -> bool:
    """True if the candidate passes Nunney's published criterion.

    Acceptance requires the baseline to survive, plus either all three
    higher checks surviving, or exactly one failing and its retest
    surviving. More than one failure, or a missing retest, rejects.
    """
    if not result.baseline_check.survived_all:
        return False
    failed = [
        check
        for check in (result.check_1_02, result.check_1_05, result.check_1_10)
        if not check.survived_all
    ]
    if not failed:
        return True
    if len(failed) == 1 and result.retest_check is not None:
        return result.retest_check.survived_all
    return False
def search_threshold_over_candidates(
    params: Track1Parameters,
    candidate_T_values: Iterable[float],
    runs: int = 20,
    seed_start: int = 0,
    cache_path: str | Path | None = None,
    jobs: int = 1,
) -> Optional[ThresholdSearchResult]:
    """Scan candidate T values in order and return the first accepted one.

    Each candidate gets a disjoint 10000-wide seed block so its runs never
    reuse another candidate's seeds. Returns None when no candidate passes
    the published criterion.
    """
    for candidate_index, candidate_T in enumerate(candidate_T_values):
        candidate_result = evaluate_threshold_candidate(
            params=params,
            T_value=candidate_T,
            runs=runs,
            seed_start=seed_start + candidate_index * 10000,
            cache_path=cache_path,
            jobs=jobs,
        )
        if published_threshold_accepts(candidate_result):
            return candidate_result
    return None

View File

@ -0,0 +1,82 @@
import sys
from pathlib import Path

# Make the in-repo `src/` package importable without installation so the
# tests run straight from a checkout.
ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

# Importing renunney.track1_threshold calls ensure_legacy_python_path(),
# which is what makes the bare `track1_reference` import below resolve.
import renunney.track1_threshold as thr
from track1_reference import Track1Parameters
def test_published_threshold_accepts_when_all_checks_survive():
    """All four checks surviving means the candidate is accepted outright."""
    def check(T, extinctions=0):
        return thr.ThresholdCheck(T=T, runs=20, extinctions=extinctions)

    result = thr.ThresholdSearchResult(
        threshold_T=100.0,
        baseline_check=check(100.0),
        check_1_02=check(102.0),
        check_1_05=check(105.0),
        check_1_10=check(110.0),
        retest_check=None,
    )
    assert thr.published_threshold_accepts(result) is True
def test_published_threshold_accepts_single_failure_if_retest_survives():
    """One failed higher check is forgiven when its retest survives."""
    def check(T, extinctions=0):
        return thr.ThresholdCheck(T=T, runs=20, extinctions=extinctions)

    result = thr.ThresholdSearchResult(
        threshold_T=100.0,
        baseline_check=check(100.0),
        check_1_02=check(102.0, extinctions=1),
        check_1_05=check(105.0),
        check_1_10=check(110.0),
        retest_check=check(102.0),
    )
    assert thr.published_threshold_accepts(result) is True
def test_published_threshold_rejects_multiple_failures():
    """Two or more failed higher checks reject regardless of any retest."""
    def check(T, extinctions=0):
        return thr.ThresholdCheck(T=T, runs=20, extinctions=extinctions)

    result = thr.ThresholdSearchResult(
        threshold_T=100.0,
        baseline_check=check(100.0),
        check_1_02=check(102.0, extinctions=1),
        check_1_05=check(105.0, extinctions=1),
        check_1_10=check(110.0),
        retest_check=None,
    )
    assert thr.published_threshold_accepts(result) is False
def test_search_threshold_over_candidates_uses_first_accepted(monkeypatch):
    """The search returns the first candidate that passes (60 is rigged to fail)."""
    params = Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)

    def stub_evaluate(params, T_value, runs=20, seed_start=0, cache_path=None, jobs=1):
        # Candidate 60 fails its baseline; every other candidate passes cleanly.
        baseline_extinctions = 1 if T_value == 60 else 0
        return thr.ThresholdSearchResult(
            threshold_T=float(T_value),
            baseline_check=thr.ThresholdCheck(
                T=float(T_value), runs=runs, extinctions=baseline_extinctions
            ),
            check_1_02=thr.ThresholdCheck(T=1.02 * T_value, runs=runs, extinctions=0),
            check_1_05=thr.ThresholdCheck(T=1.05 * T_value, runs=runs, extinctions=0),
            check_1_10=thr.ThresholdCheck(T=1.10 * T_value, runs=runs, extinctions=0),
            retest_check=None,
        )

    monkeypatch.setattr(thr, "evaluate_threshold_candidate", stub_evaluate)
    found = thr.search_threshold_over_candidates(params, [60, 80, 100], runs=3, seed_start=10)
    assert found is not None
    assert found.threshold_T == 80.0
def test_run_extinction_check_parallel_matches_serial():
    """jobs=2 must reproduce the jobs=1 result for identical seeds."""
    params = Track1Parameters(K=5000, N0=20, n=1, u=5e-6, R=10.0, T=20)
    shared_kwargs = dict(T_value=20.0, runs=2, seed_start=5)
    serial_result = thr.run_extinction_check(params, jobs=1, **shared_kwargs)
    parallel_result = thr.run_extinction_check(params, jobs=2, **shared_kwargs)
    assert parallel_result == serial_result