diff --git a/Makefile b/Makefile
index 8ec5b52..7ffbd04 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
 PYTHON := python3
 REPO_ROOT := $(abspath .)
 LEGACY_ROOT := $(REPO_ROOT)/../collaborations/to_ptbc/evc/cost_of_substitution
-ORCH := $(LEGACY_ROOT)/python/run_orchestration.py
+ORCH := $(REPO_ROOT)/scripts/run_orchestration.py
 DB := $(REPO_ROOT)/runs/state/cos-orch.sqlite
 RESULT_ROOT := $(REPO_ROOT)/runs/results
 
@@ -20,7 +20,7 @@ FIG1_M100 := $(LEGACY_ROOT)/examples/track1_figure1_paper_M_10_0.json
 help:
 	@echo "Targets:"
 	@echo "  init       Create run directories and initialize the SQLite registry"
-	@echo "  doctor     Show key paths and verify legacy orchestration entrypoint"
+	@echo "  doctor     Show key paths and verify local orchestration and legacy backend paths"
 	@echo "  list-jobs  List jobs in the local registry"
 	@echo "  run-one    Claim and run one queued job"
 	@echo "  run-loop   Run worker loop until queue empty"
@@ -42,10 +42,12 @@ init:
 doctor:
 	@echo "REPO_ROOT=$(REPO_ROOT)"
 	@echo "LEGACY_ROOT=$(LEGACY_ROOT)"
+	@echo "ORCH=$(ORCH)"
 	@echo "DB=$(DB)"
 	@echo "RESULT_ROOT=$(RESULT_ROOT)"
 	@echo "SCRATCH_ROOT=$(SCRATCH_ROOT)"
 	test -f $(ORCH)
+	test -d $(LEGACY_ROOT)/python
 
 list-jobs:
 	$(PYTHON) $(ORCH) list --db $(DB)
diff --git a/README.md b/README.md
index 91fb642..abcf74a 100644
--- a/README.md
+++ b/README.md
@@ -12,11 +12,14 @@ This repository is the clean operational wrapper around the current work in:
 
 - [`../collaborations/to_ptbc/evc/cost_of_substitution`](/mnt/CIFS/pengolodh/Docs/Projects/collaborations/to_ptbc/evc/cost_of_substitution)
 
-The current orchestration and Track 1 simulation code still live there.
+The Track 1 simulation backend still lives there. The orchestration control
+plane is now local to `renunney`.
+
 `renunney` provides:
 
 - a clean git repo,
 - a stable working directory layout,
+- a local orchestration CLI and library,
 - a Makefile for common tasks,
 - migration notes for pulling code into this repo in stages.
 
@@ -63,6 +66,11 @@ make collate-figure1
 
 ## Status
 
-The active simulation/orchestration implementation remains in the older
-`cost_of_substitution` directory for now. This repo is the clean shell for
-running, organizing, and gradually migrating that work.
+The current state is split:
+
+- orchestration control plane: local to `renunney`
+- Track 1 simulation backend: still in the older `cost_of_substitution`
+  directory
+
+This repo is now the clean operational entry point while the simulation code is
+migrated in later stages.
diff --git a/docs/MIGRATION.md b/docs/MIGRATION.md
index 1f775ce..d7be0b2 100644
--- a/docs/MIGRATION.md
+++ b/docs/MIGRATION.md
@@ -20,10 +20,10 @@ Operational code still lives in:
 
 ## Recommended Migration Order
 
-1. Keep orchestration running from the legacy path until real multi-host runs are stable.
-2. Migrate the orchestration modules first:
-   - `python/orchestration.py`
-   - `python/run_orchestration.py`
+1. The orchestration control plane has already been migrated locally:
+   - `src/renunney/orchestration.py`
+   - `scripts/run_orchestration.py`
+2. Keep the Track 1 simulation backend in the legacy path until real multi-host runs are stable.
 3. Migrate Track 1 runner and API next:
    - `python/run_track1.py`
    - `python/track1_api.py`
diff --git a/docs/WORKFLOW.md b/docs/WORKFLOW.md
index 2f310d3..7abcf1b 100644
--- a/docs/WORKFLOW.md
+++ b/docs/WORKFLOW.md
@@ -40,5 +40,15 @@ make status
 
 ## Current Assumption
 
-The Makefile drives the orchestration code in the legacy
-`cost_of_substitution` directory until that code is migrated here.
+The Makefile now drives the local orchestration code in `renunney`, while the
+simulation backend is still imported from the legacy `cost_of_substitution`
+directory.
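+
+A minimal sketch of the flow this implies, using targets defined in the
+Makefile (paths are the Makefile defaults):
+
+```sh
+make init       # create run directories and the SQLite registry
+make doctor     # verify the local ORCH script and the legacy python/ backend
+make run-loop   # claim and run queued jobs until the queue is empty
+```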
diff --git a/examples/orchestration_job_track1_locus_threshold.json b/examples/orchestration_job_track1_locus_threshold.json
new file mode 100644
index 0000000..e8849d1
--- /dev/null
+++ b/examples/orchestration_job_track1_locus_threshold.json
@@ -0,0 +1,40 @@
+{
+  "job_id": "track1-figure1-M1.0-n3-v1",
+  "project": "cost_of_substitution",
+  "track": "track1",
+  "job_kind": "track1_locus_threshold",
+  "priority": 10,
+  "created_at": "2026-04-10T21:40:00-04:00",
+  "created_by": "codex",
+  "worker_backend": "python-track1",
+  "config": {
+    "mode": "search",
+    "K": 5000,
+    "N0": 5000,
+    "n": 3,
+    "u": 0.0001,
+    "R": 10.0,
+    "T": 500,
+    "epochs": 8,
+    "p": 0.5,
+    "runs": 20,
+    "jobs": 8,
+    "t_values": [1, 2, 4, 6, 8, 10, 15, 20, 30, 40, 50, 75, 100, 125, 150, 175, 200, 250, 300, 350, 400, 450, 500],
+    "seed": 10
+  },
+  "resources": {
+    "cpu_cores": 8,
+    "ram_gb": 8,
+    "runtime_class": "long",
+    "use_local_scratch": true
+  },
+  "result_paths": {
+    "payload_json": "results/track1/figure1/M1.0/n3/payload.json",
+    "log_txt": "results/track1/figure1/M1.0/n3/run.log"
+  },
+  "retry": {
+    "max_attempts": 2,
+    "idempotent": true
+  },
+  "notes": "One Track 1 Figure 1 threshold search job for M=1.0 and n=3."
+}
diff --git a/examples/orchestration_result_track1_locus_threshold.json b/examples/orchestration_result_track1_locus_threshold.json
new file mode 100644
index 0000000..434e808
--- /dev/null
+++ b/examples/orchestration_result_track1_locus_threshold.json
@@ -0,0 +1,33 @@
+{
+  "job_id": "track1-figure1-M1.0-n3-v1",
+  "status": "succeeded",
+  "worker_backend": "python-track1",
+  "worker_host": "xeon24a",
+  "started_at": "2026-04-10T21:41:00-04:00",
+  "finished_at": "2026-04-10T21:58:30-04:00",
+  "wall_seconds": 1050.0,
+  "exit_code": 0,
+  "config_hash": "sha256:example",
+  "code_identity": {
+    "git_commit": "example",
+    "runner": "python/run_track1.py",
+    "python_version": "3.12"
+  },
+  "artifacts": {
+    "payload_json": "results/track1/figure1/M1.0/n3/payload.json",
+    "log_txt": "results/track1/figure1/M1.0/n3/run.log",
+    "cache_summary_json": "results/track1/figure1/M1.0/n3/cache-summary.json"
+  },
+  "summary": {
+    "M": 1.0,
+    "u": 0.0001,
+    "n": 3,
+    "threshold_T": 75.0,
+    "accepted": true,
+    "baseline_extinctions": 0,
+    "check_1_02_extinctions": 0,
+    "check_1_05_extinctions": 0,
+    "check_1_10_extinctions": 1
+  },
+  "error": null
+}
diff --git a/scripts/run_orchestration.py b/scripts/run_orchestration.py
new file mode 100644
index 0000000..da75473
--- /dev/null
+++ b/scripts/run_orchestration.py
@@ -0,0 +1,179 @@
+"""
+run_orchestration.py
+
+Small CLI for the minimal SQLite-backed orchestration layer.
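+
+Usage sketch (subcommands and flags are defined below; paths are the
+repository defaults):
+
+    python scripts/run_orchestration.py init-db --db runs/state/cos-orch.sqlite
+    python scripts/run_orchestration.py submit --db runs/state/cos-orch.sqlite \
+        --manifest examples/orchestration_job_track1_locus_threshold.json
+    python scripts/run_orchestration.py run-loop --db runs/state/cos-orch.sqlite \
+        --result-root runs/results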
+""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +import sys + +REPO_ROOT = Path(__file__).resolve().parents[1] +SRC_DIR = REPO_ROOT / "src" +if str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) + +from renunney.orchestration import ( + collate_track1_figure1, + expand_track1_figure1_manifest, + initialize_registry, + list_jobs, + run_one_job, + run_worker_loop, + submit_job_manifest, + submit_track1_figure1_jobs, +) + + +def save_payload(payload: dict, path: str | Path) -> None: + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Minimal orchestration CLI for cost-of-substitution sweeps.") + subparsers = parser.add_subparsers(dest="command", required=True) + + init_db = subparsers.add_parser("init-db") + init_db.add_argument("--db", required=True) + + submit = subparsers.add_parser("submit") + submit.add_argument("--db", required=True) + submit.add_argument("--manifest", required=True) + + submit_fig1 = subparsers.add_parser("submit-figure1") + submit_fig1.add_argument("--db", required=True) + submit_fig1.add_argument("--config", required=True) + submit_fig1.add_argument("--job-prefix", required=True) + submit_fig1.add_argument("--priority", type=int, default=0) + submit_fig1.add_argument("--created-by", default="codex") + submit_fig1.add_argument("--result-prefix", default="results/track1/figure1") + + list_cmd = subparsers.add_parser("list") + list_cmd.add_argument("--db", required=True) + + run_one = subparsers.add_parser("run-one") + run_one.add_argument("--db", required=True) + run_one.add_argument("--result-root", required=True) + run_one.add_argument("--worker-backend", default="python-track1") + run_one.add_argument("--worker-host", default=None) + run_one.add_argument("--scratch-root", default=None) + + run_loop = subparsers.add_parser("run-loop") + run_loop.add_argument("--db", required=True) + run_loop.add_argument("--result-root", required=True) + run_loop.add_argument("--worker-backend", default="python-track1") + run_loop.add_argument("--worker-host", default=None) + run_loop.add_argument("--scratch-root", default=None) + run_loop.add_argument("--max-jobs", type=int, default=None) + + collate = subparsers.add_parser("collate-figure1") + collate.add_argument("--db", required=True) + collate.add_argument("--output", default=None) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + + if args.command == "init-db": + initialize_registry(args.db) + print(json.dumps({"db": str(Path(args.db)), "status": "initialized"}, indent=2, sort_keys=True)) + return 0 + + if args.command == "submit": + manifest = json.loads(Path(args.manifest).read_text(encoding="utf-8")) + job_id = submit_job_manifest(args.db, manifest) + print(json.dumps({"db": str(Path(args.db)), "job_id": job_id, "status": "submitted"}, indent=2, sort_keys=True)) + return 0 + + if args.command == "submit-figure1": + config = json.loads(Path(args.config).read_text(encoding="utf-8")) + manifests = expand_track1_figure1_manifest( + base_job_id_prefix=args.job_prefix, + config=config, + priority=args.priority, + created_by=args.created_by, + result_prefix=args.result_prefix, + ) + job_ids = submit_track1_figure1_jobs( + db_path=args.db, + base_job_id_prefix=args.job_prefix, + config=config, + priority=args.priority, 
+        print(
+            json.dumps(
+                {
+                    "db": str(Path(args.db)),
+                    "job_count": len(job_ids),
+                    "job_ids": job_ids,
+                    "status": "submitted",
+                    "preview": manifests,
+                },
+                indent=2,
+                sort_keys=True,
+            )
+        )
+        return 0
+
+    if args.command == "list":
+        print(json.dumps(list_jobs(args.db), indent=2, sort_keys=True))
+        return 0
+
+    if args.command == "run-one":
+        result = run_one_job(
+            db_path=args.db,
+            result_root=args.result_root,
+            worker_backend=args.worker_backend,
+            worker_host=args.worker_host,
+            scratch_root=args.scratch_root,
+            cwd=REPO_ROOT,
+        )
+        print(json.dumps(result, indent=2, sort_keys=True))
+        return 0 if result is None or result["status"] == "succeeded" else 1
+
+    if args.command == "run-loop":
+        payload = run_worker_loop(
+            db_path=args.db,
+            result_root=args.result_root,
+            worker_backend=args.worker_backend,
+            worker_host=args.worker_host,
+            scratch_root=args.scratch_root,
+            cwd=REPO_ROOT,
+            max_jobs=args.max_jobs,
+        )
+        print(json.dumps(payload, indent=2, sort_keys=True))
+        return 0 if payload["failed_jobs"] == 0 else 1
+
+    if args.command == "collate-figure1":
+        payload = collate_track1_figure1(args.db)
+        if args.output:
+            save_payload(payload, args.output)
+        print(json.dumps(payload, indent=2, sort_keys=True))
+        return 0
+
+    raise ValueError(args.command)
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/src/renunney/__init__.py b/src/renunney/__init__.py
index 62671d6..e835411 100644
--- a/src/renunney/__init__.py
+++ b/src/renunney/__init__.py
@@ -1 +1,42 @@
-"""renunney package placeholder."""
+"""renunney package."""
+
+from .legacy import ensure_legacy_python_path, legacy_python_dir, legacy_root, repo_root
+from .orchestration import (
+    ClaimedJob,
+    claim_next_job,
+    collate_track1_figure1,
+    complete_job,
+    execute_job_manifest,
+    expand_track1_figure1_manifest,
+    hash_config,
+    initialize_registry,
+    list_job_results,
+    list_jobs,
+    load_job_manifest,
+    run_one_job,
+    run_worker_loop,
+    submit_job_manifest,
+    submit_track1_figure1_jobs,
+)
+
+__all__ = [
+    "ClaimedJob",
+    "claim_next_job",
+    "collate_track1_figure1",
+    "complete_job",
+    "ensure_legacy_python_path",
+    "execute_job_manifest",
+    "expand_track1_figure1_manifest",
+    "hash_config",
+    "initialize_registry",
+    "legacy_python_dir",
+    "legacy_root",
+    "list_job_results",
+    "list_jobs",
+    "load_job_manifest",
+    "repo_root",
+    "run_one_job",
+    "run_worker_loop",
+    "submit_job_manifest",
+    "submit_track1_figure1_jobs",
+]
diff --git a/src/renunney/legacy.py b/src/renunney/legacy.py
new file mode 100644
index 0000000..5c683c0
--- /dev/null
+++ b/src/renunney/legacy.py
@@ -0,0 +1,25 @@
+"""Legacy path helpers for staged migration."""
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+
+def repo_root() -> Path:
+    return Path(__file__).resolve().parents[2]
+
+
+def legacy_root() -> Path:
+    return repo_root().parent / "collaborations" / "to_ptbc" / "evc" / "cost_of_substitution"
+
+
+def legacy_python_dir() -> Path:
+    return legacy_root() / "python"
+
+
+def ensure_legacy_python_path() -> Path:
+    path = legacy_python_dir()
+    if str(path) not in sys.path:
+        sys.path.insert(0, str(path))
+    return path
diff --git a/src/renunney/orchestration.py b/src/renunney/orchestration.py
new file mode 100644
index 0000000..7515deb
--- /dev/null
+++ b/src/renunney/orchestration.py
@@ -0,0 +1,623 @@
+"""
+orchestration.py
+
+Minimal SQLite-backed orchestration support for LAN-distributed sweep runs.
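+
+Job lifecycle, as encoded in the jobs table below:
+
+    pending -> running -> succeeded | failed
+
+Workers claim pending rows inside a BEGIN IMMEDIATE transaction, run the
+legacy Track 1 backend in process, then write a job_results row and the
+terminal status.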
+""" + +from __future__ import annotations + +import json +import platform +import sqlite3 +import subprocess +import time +from copy import deepcopy +from dataclasses import asdict, dataclass +from hashlib import sha256 +from pathlib import Path +from typing import Any, Optional + +from .legacy import ensure_legacy_python_path + +ensure_legacy_python_path() + +from track1_analysis import LocusThresholdRow, fit_linear_cost_by_loci +from track1_api import config_from_mapping, run_config, save_payload + + +def utc_now_iso() -> str: + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + + +def canonical_json(value: Any) -> str: + return json.dumps(value, sort_keys=True, separators=(",", ":")) + + +def hash_config(config: dict[str, Any]) -> str: + return "sha256:" + sha256(canonical_json(config).encode("utf-8")).hexdigest() + + +def default_worker_host() -> str: + return platform.node() or "unknown-host" + + +def code_identity(cwd: str | Path | None = None) -> dict[str, Any]: + identity: dict[str, Any] = { + "runner": "renunney orchestration + legacy Track 1 backend", + "python_version": platform.python_version(), + "git_commit": None, + } + try: + commit = subprocess.check_output( + ["git", "rev-parse", "HEAD"], + cwd=str(cwd) if cwd is not None else None, + stderr=subprocess.DEVNULL, + text=True, + ).strip() + except Exception: + commit = None + identity["git_commit"] = commit + return identity + + +def _connect(db_path: str | Path) -> sqlite3.Connection: + conn = sqlite3.connect(str(db_path), timeout=30.0) + conn.row_factory = sqlite3.Row + return conn + + +def initialize_registry(db_path: str | Path) -> None: + db_file = Path(db_path) + db_file.parent.mkdir(parents=True, exist_ok=True) + with _connect(db_file) as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS jobs ( + job_id TEXT PRIMARY KEY, + project TEXT NOT NULL, + track TEXT NOT NULL, + job_kind TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL, + created_at TEXT NOT NULL, + created_by TEXT, + worker_backend TEXT NOT NULL, + manifest_json TEXT NOT NULL, + config_hash TEXT NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 1, + claimed_by TEXT, + claimed_at TEXT, + heartbeat_at TEXT, + finished_at TEXT + ); + + CREATE TABLE IF NOT EXISTS job_results ( + job_id TEXT PRIMARY KEY, + status TEXT NOT NULL, + worker_host TEXT NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT, + wall_seconds REAL, + exit_code INTEGER, + result_json TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS job_events ( + event_id INTEGER PRIMARY KEY, + job_id TEXT NOT NULL, + event_time TEXT NOT NULL, + event_kind TEXT NOT NULL, + event_json TEXT NOT NULL + ); + """ + ) + + +def _record_event(conn: sqlite3.Connection, job_id: str, event_kind: str, event: dict[str, Any]) -> None: + conn.execute( + """ + INSERT INTO job_events (job_id, event_time, event_kind, event_json) + VALUES (?, ?, ?, ?) + """, + (job_id, utc_now_iso(), event_kind, json.dumps(event, sort_keys=True)), + ) + + +def submit_job_manifest(db_path: str | Path, manifest: dict[str, Any]) -> str: + initialize_registry(db_path) + config_hash = hash_config(manifest["config"]) + retry = manifest.get("retry", {}) + with _connect(db_path) as conn: + conn.execute( + """ + INSERT INTO jobs ( + job_id, project, track, job_kind, priority, status, created_at, + created_by, worker_backend, manifest_json, config_hash, + attempt_count, max_attempts + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + manifest["job_id"], + manifest["project"], + manifest["track"], + manifest["job_kind"], + int(manifest.get("priority", 0)), + "pending", + manifest.get("created_at", utc_now_iso()), + manifest.get("created_by"), + manifest["worker_backend"], + json.dumps(manifest, sort_keys=True), + config_hash, + 0, + int(retry.get("max_attempts", 1)), + ), + ) + _record_event(conn, manifest["job_id"], "submitted", {"manifest": manifest}) + return manifest["job_id"] + + +@dataclass(frozen=True) +class ClaimedJob: + job_id: str + manifest: dict[str, Any] + claimed_by: str + claimed_at: str + + +def claim_next_job( + db_path: str | Path, + worker_backend: str, + worker_host: str, +) -> Optional[ClaimedJob]: + initialize_registry(db_path) + with _connect(db_path) as conn: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute( + """ + SELECT job_id, manifest_json + FROM jobs + WHERE status = 'pending' AND worker_backend = ? + ORDER BY priority DESC, created_at ASC, job_id ASC + LIMIT 1 + """, + (worker_backend,), + ).fetchone() + if row is None: + conn.commit() + return None + claimed_at = utc_now_iso() + conn.execute( + """ + UPDATE jobs + SET status = 'running', + claimed_by = ?, + claimed_at = ?, + heartbeat_at = ?, + attempt_count = attempt_count + 1 + WHERE job_id = ? + """, + (worker_host, claimed_at, claimed_at, row["job_id"]), + ) + manifest = json.loads(row["manifest_json"]) + _record_event( + conn, + row["job_id"], + "claimed", + {"worker_host": worker_host, "claimed_at": claimed_at}, + ) + conn.commit() + return ClaimedJob( + job_id=row["job_id"], + manifest=manifest, + claimed_by=worker_host, + claimed_at=claimed_at, + ) + + +def load_job_manifest(db_path: str | Path, job_id: str) -> dict[str, Any]: + with _connect(db_path) as conn: + row = conn.execute("SELECT manifest_json FROM jobs WHERE job_id = ?", (job_id,)).fetchone() + if row is None: + raise KeyError(job_id) + return json.loads(row["manifest_json"]) + + +def list_jobs(db_path: str | Path) -> list[dict[str, Any]]: + with _connect(db_path) as conn: + rows = conn.execute( + """ + SELECT job_id, project, track, job_kind, priority, status, created_at, + created_by, worker_backend, config_hash, attempt_count, + max_attempts, claimed_by, claimed_at, heartbeat_at, finished_at + FROM jobs + ORDER BY priority DESC, created_at ASC, job_id ASC + """ + ).fetchall() + return [dict(row) for row in rows] + + +def list_job_results(db_path: str | Path) -> list[dict[str, Any]]: + with _connect(db_path) as conn: + rows = conn.execute( + """ + SELECT job_id, status, worker_host, started_at, finished_at, + wall_seconds, exit_code, result_json + FROM job_results + ORDER BY started_at ASC, job_id ASC + """ + ).fetchall() + results: list[dict[str, Any]] = [] + for row in rows: + item = dict(row) + item["result"] = json.loads(item.pop("result_json")) + results.append(item) + return results + + +def expand_track1_figure1_manifest( + base_job_id_prefix: str, + config: dict[str, Any], + priority: int = 0, + created_by: str = "codex", + result_prefix: str = "results/track1/figure1", +) -> list[dict[str, Any]]: + loci_values = list(config.get("loci_values", [])) + if config.get("mode") != "loci_regression": + raise ValueError("Figure 1 expansion requires a loci_regression config.") + if not loci_values: + raise ValueError("Figure 1 expansion requires non-empty loci_values.") + + M_value = 2.0 * float(config["K"]) * float(config["u"]) + manifests: list[dict[str, Any]] = [] + for n_value in loci_values: + job_id = f"{base_job_id_prefix}-n{int(n_value)}" + 
+    manifests: list[dict[str, Any]] = []
+    for n_value in loci_values:
+        job_id = f"{base_job_id_prefix}-n{int(n_value)}"
+        job_config = deepcopy(config)
+        job_config["mode"] = "search"
+        job_config["n"] = int(n_value)
+        job_config.pop("loci_values", None)
+        result_base = f"{result_prefix}/M{M_value:g}/n{int(n_value)}"
+        manifests.append(
+            {
+                "job_id": job_id,
+                "project": "cost_of_substitution",
+                "track": "track1",
+                "job_kind": "track1_locus_threshold",
+                "priority": int(priority),
+                "created_at": utc_now_iso(),
+                "created_by": created_by,
+                "worker_backend": "python-track1",
+                "config": job_config,
+                "resources": {
+                    "cpu_cores": int(job_config.get("jobs", 1)),
+                    "ram_gb": 8,
+                    "runtime_class": "long",
+                    "use_local_scratch": True,
+                },
+                "result_paths": {
+                    "payload_json": f"{result_base}/payload.json",
+                    "log_txt": f"{result_base}/run.log",
+                },
+                "retry": {"max_attempts": 2, "idempotent": True},
+                "notes": f"Track 1 Figure 1 threshold job for M={M_value:g}, n={int(n_value)}.",
+            }
+        )
+    return manifests
+
+
+def submit_track1_figure1_jobs(
+    db_path: str | Path,
+    base_job_id_prefix: str,
+    config: dict[str, Any],
+    priority: int = 0,
+    created_by: str = "codex",
+    result_prefix: str = "results/track1/figure1",
+) -> list[str]:
+    manifests = expand_track1_figure1_manifest(
+        base_job_id_prefix=base_job_id_prefix,
+        config=config,
+        priority=priority,
+        created_by=created_by,
+        result_prefix=result_prefix,
+    )
+    job_ids: list[str] = []
+    for manifest in manifests:
+        job_ids.append(submit_job_manifest(db_path, manifest))
+    return job_ids
+
+
+def complete_job(db_path: str | Path, result: dict[str, Any]) -> None:
+    finished_at = result.get("finished_at", utc_now_iso())
+    with _connect(db_path) as conn:
+        conn.execute(
+            """
+            UPDATE jobs
+            SET status = ?, heartbeat_at = ?, finished_at = ?
+            WHERE job_id = ?
+            """,
+            (result["status"], finished_at, finished_at, result["job_id"]),
+        )
+        conn.execute(
+            """
+            INSERT OR REPLACE INTO job_results (
+                job_id, status, worker_host, started_at, finished_at,
+                wall_seconds, exit_code, result_json
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+            (
+                result["job_id"],
+                result["status"],
+                result["worker_host"],
+                result["started_at"],
+                finished_at,
+                result.get("wall_seconds"),
+                result.get("exit_code"),
+                json.dumps(result, sort_keys=True),
+            ),
+        )
+        _record_event(conn, result["job_id"], "completed", result)
+
+
+def _resolve_result_path(result_root: str | Path, relative_path: str) -> Path:
+    path = Path(result_root) / relative_path
+    path.parent.mkdir(parents=True, exist_ok=True)
+    return path
+
+
+def _job_summary(manifest: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
+    summary: dict[str, Any] = {
+        "mode": payload.get("mode"),
+    }
+    config = manifest.get("config", {})
+    if "u" in config:
+        summary["u"] = config["u"]
+    if {"K", "u"} <= set(config):
+        summary["M"] = 2.0 * float(config["K"]) * float(config["u"])
+    if "n" in config:
+        summary["n"] = config["n"]
+    if payload.get("mode") == "search":
+        result = payload.get("result")
+        summary["accepted"] = result is not None
+        summary["threshold_T"] = None if result is None else result["threshold_T"]
+        if result is not None:
+            summary["baseline_extinctions"] = result["baseline_check"]["extinctions"]
+            summary["check_1_02_extinctions"] = result["check_1_02"]["extinctions"]
+            summary["check_1_05_extinctions"] = result["check_1_05"]["extinctions"]
+            summary["check_1_10_extinctions"] = result["check_1_10"]["extinctions"]
+    elif payload.get("mode") == "loci_regression":
+        rows = payload.get("rows", [])
+        summary["row_count"] = len(rows)
+        summary["accepted_rows"] = sum(1 for row in rows if row.get("accepted"))
+        summary["fit"] = payload.get("fit")
+    elif payload.get("mode") == "simulate":
+        summary["extinct"] = payload.get("extinct")
+        summary["generations_recorded"] = payload.get("generations_recorded")
+    return summary
+
+
+def execute_job_manifest(
+    manifest: dict[str, Any],
+    result_root: str | Path,
+    scratch_root: str | Path | None = None,
+    worker_host: str | None = None,
+    cwd: str | Path | None = None,
+) -> dict[str, Any]:
+    worker_host = worker_host or default_worker_host()
+    started_at = utc_now_iso()
+    started_clock = time.monotonic()
+
+    config_mapping = deepcopy(manifest["config"])
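+    # When a scratch root is provided and the manifest config does not pin its
+    # own cache_path, give the job a private cache file under local scratch.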
+ """, + ( + result["job_id"], + result["status"], + result["worker_host"], + result["started_at"], + finished_at, + result.get("wall_seconds"), + result.get("exit_code"), + json.dumps(result, sort_keys=True), + ), + ) + _record_event(conn, result["job_id"], "completed", result) + + +def _resolve_result_path(result_root: str | Path, relative_path: str) -> Path: + path = Path(result_root) / relative_path + path.parent.mkdir(parents=True, exist_ok=True) + return path + + +def _job_summary(manifest: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]: + summary: dict[str, Any] = { + "mode": payload.get("mode"), + } + config = manifest.get("config", {}) + if "u" in config: + summary["u"] = config["u"] + if {"K", "u"} <= set(config): + summary["M"] = 2.0 * float(config["K"]) * float(config["u"]) + if "n" in config: + summary["n"] = config["n"] + if payload.get("mode") == "search": + result = payload.get("result") + summary["accepted"] = result is not None + summary["threshold_T"] = None if result is None else result["threshold_T"] + if result is not None: + summary["baseline_extinctions"] = result["baseline_check"]["extinctions"] + summary["check_1_02_extinctions"] = result["check_1_02"]["extinctions"] + summary["check_1_05_extinctions"] = result["check_1_05"]["extinctions"] + summary["check_1_10_extinctions"] = result["check_1_10"]["extinctions"] + elif payload.get("mode") == "loci_regression": + rows = payload.get("rows", []) + summary["row_count"] = len(rows) + summary["accepted_rows"] = sum(1 for row in rows if row.get("accepted")) + summary["fit"] = payload.get("fit") + elif payload.get("mode") == "simulate": + summary["extinct"] = payload.get("extinct") + summary["generations_recorded"] = payload.get("generations_recorded") + return summary + + +def execute_job_manifest( + manifest: dict[str, Any], + result_root: str | Path, + scratch_root: str | Path | None = None, + worker_host: str | None = None, + cwd: str | Path | None = None, +) -> dict[str, Any]: + worker_host = worker_host or default_worker_host() + started_at = utc_now_iso() + started_clock = time.monotonic() + + config_mapping = deepcopy(manifest["config"]) + if scratch_root is not None and "cache_path" not in config_mapping: + cache_path = Path(scratch_root) / "cache" / f"{manifest['job_id']}.json" + cache_path.parent.mkdir(parents=True, exist_ok=True) + config_mapping["cache_path"] = str(cache_path) + + status = "succeeded" + error = None + exit_code = 0 + payload: dict[str, Any] | None = None + try: + if manifest["worker_backend"] != "python-track1": + raise ValueError(f"Unsupported worker backend: {manifest['worker_backend']}") + payload = run_config(config_from_mapping(config_mapping)) + except Exception as exc: # pragma: no cover - failure path still summarized + status = "failed" + exit_code = 1 + error = {"type": type(exc).__name__, "message": str(exc)} + payload = {"job_id": manifest["job_id"], "error": error} + + artifacts: dict[str, str] = {} + result_paths = manifest.get("result_paths", {}) + if "payload_json" in result_paths and payload is not None: + payload_path = _resolve_result_path(result_root, result_paths["payload_json"]) + save_payload(payload, payload_path) + artifacts["payload_json"] = str(payload_path) + if "log_txt" in result_paths: + log_path = _resolve_result_path(result_root, result_paths["log_txt"]) + lines = [ + f"job_id={manifest['job_id']}", + f"worker_backend={manifest['worker_backend']}", + f"status={status}", + f"started_at={started_at}", + f"finished_at={utc_now_iso()}", + ] + if error 
+    finished_at = utc_now_iso()
+    return {
+        "job_id": manifest["job_id"],
+        "status": status,
+        "worker_backend": manifest["worker_backend"],
+        "worker_host": worker_host,
+        "started_at": started_at,
+        "finished_at": finished_at,
+        "wall_seconds": float(time.monotonic() - started_clock),
+        "exit_code": exit_code,
+        "config_hash": hash_config(manifest["config"]),
+        "code_identity": code_identity(cwd=cwd),
+        "artifacts": artifacts,
+        "summary": _job_summary(manifest, payload or {}),
+        "error": error,
+    }
+
+
+def run_one_job(
+    db_path: str | Path,
+    result_root: str | Path,
+    worker_backend: str = "python-track1",
+    worker_host: str | None = None,
+    scratch_root: str | Path | None = None,
+    cwd: str | Path | None = None,
+) -> Optional[dict[str, Any]]:
+    worker_host = worker_host or default_worker_host()
+    claimed = claim_next_job(db_path=db_path, worker_backend=worker_backend, worker_host=worker_host)
+    if claimed is None:
+        return None
+    result = execute_job_manifest(
+        manifest=claimed.manifest,
+        result_root=result_root,
+        scratch_root=scratch_root,
+        worker_host=worker_host,
+        cwd=cwd,
+    )
+    complete_job(db_path, result)
+    return result
+
+
+def run_worker_loop(
+    db_path: str | Path,
+    result_root: str | Path,
+    worker_backend: str = "python-track1",
+    worker_host: str | None = None,
+    scratch_root: str | Path | None = None,
+    cwd: str | Path | None = None,
+    max_jobs: int | None = None,
+) -> dict[str, Any]:
+    worker_host = worker_host or default_worker_host()
+    completed: list[dict[str, Any]] = []
+    attempted = 0
+    succeeded = 0
+    failed = 0
+
+    while max_jobs is None or attempted < max_jobs:
+        result = run_one_job(
+            db_path=db_path,
+            result_root=result_root,
+            worker_backend=worker_backend,
+            worker_host=worker_host,
+            scratch_root=scratch_root,
+            cwd=cwd,
+        )
+        if result is None:
+            break
+        attempted += 1
+        completed.append(result)
+        if result["status"] == "succeeded":
+            succeeded += 1
+        else:
+            failed += 1
+
+    return {
+        "worker_backend": worker_backend,
+        "worker_host": worker_host,
+        "attempted_jobs": attempted,
+        "succeeded_jobs": succeeded,
+        "failed_jobs": failed,
+        "completed_job_ids": [item["job_id"] for item in completed],
+        "stopped_because": "max_jobs_reached" if max_jobs is not None and attempted >= max_jobs else "queue_empty",
+    }
+
+
+def collate_track1_figure1(db_path: str | Path) -> dict[str, Any]:
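+    """Collate succeeded track1_locus_threshold results into Figure 1 treatments.
+
+    Rows are grouped by M (one treatment per M value) and each group is passed
+    to the legacy fit_linear_cost_by_loci to recover the linear T(n) fit.
+    """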
"n": int(n_value), + "threshold_T": summary.get("threshold_T"), + "accepted": bool(summary.get("accepted", False)), + "baseline_extinctions": summary.get("baseline_extinctions"), + "check_1_02_extinctions": summary.get("check_1_02_extinctions"), + "check_1_05_extinctions": summary.get("check_1_05_extinctions"), + "check_1_10_extinctions": summary.get("check_1_10_extinctions"), + } + ) + + treatments: list[dict[str, Any]] = [] + for key in sorted(grouped, key=lambda value: float(value)): + item_rows = sorted(grouped[key], key=lambda item: item["n"]) + fit_rows = [ + LocusThresholdRow( + n=int(item["n"]), + threshold_T=None if item["threshold_T"] is None else float(item["threshold_T"]), + accepted=bool(item["accepted"]), + ) + for item in item_rows + ] + fit = fit_linear_cost_by_loci(fit_rows) + treatments.append( + { + "M": float(key), + "u_values": sorted({float(item["u"]) for item in item_rows}), + "rows": item_rows, + "fit": None if fit is None else asdict(fit), + } + ) + + return { + "job_count": len(rows), + "treatment_count": len(treatments), + "treatments": treatments, + }