Migrate orchestration control plane into renunney

Codex 2026-04-11 06:17:58 -04:00
parent 61313daee4
commit 1385583efc
10 changed files with 933 additions and 13 deletions

Makefile

@@ -1,7 +1,7 @@
 PYTHON := python3
 REPO_ROOT := $(abspath .)
 LEGACY_ROOT := $(REPO_ROOT)/../collaborations/to_ptbc/evc/cost_of_substitution
-ORCH := $(LEGACY_ROOT)/python/run_orchestration.py
+ORCH := $(REPO_ROOT)/scripts/run_orchestration.py
 DB := $(REPO_ROOT)/runs/state/cos-orch.sqlite
 RESULT_ROOT := $(REPO_ROOT)/runs/results
@@ -20,7 +20,7 @@ FIG1_M100 := $(LEGACY_ROOT)/examples/track1_figure1_paper_M_10_0.json
 help:
 	@echo "Targets:"
 	@echo "  init       Create run directories and initialize the SQLite registry"
-	@echo "  doctor     Show key paths and verify legacy orchestration entrypoint"
+	@echo "  doctor     Show key paths and verify local orchestration and legacy backend paths"
 	@echo "  list-jobs  List jobs in the local registry"
 	@echo "  run-one    Claim and run one queued job"
 	@echo "  run-loop   Run worker loop until queue empty"
@@ -42,10 +42,12 @@ init:
 doctor:
 	@echo "REPO_ROOT=$(REPO_ROOT)"
 	@echo "LEGACY_ROOT=$(LEGACY_ROOT)"
+	@echo "ORCH=$(ORCH)"
 	@echo "DB=$(DB)"
 	@echo "RESULT_ROOT=$(RESULT_ROOT)"
 	@echo "SCRATCH_ROOT=$(SCRATCH_ROOT)"
 	test -f $(ORCH)
+	test -d $(LEGACY_ROOT)/python

 list-jobs:
 	$(PYTHON) $(ORCH) list --db $(DB)
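For reference, the `doctor` checks can be reproduced outside make; a minimal Python sketch, assuming the current directory is the repo root:

from pathlib import Path

repo_root = Path(".").resolve()  # assumes we are in the renunney repo root
orch = repo_root / "scripts" / "run_orchestration.py"
legacy_python = (repo_root.parent / "collaborations" / "to_ptbc" / "evc"
                 / "cost_of_substitution" / "python")

# Mirrors `test -f $(ORCH)` and `test -d $(LEGACY_ROOT)/python` above.
assert orch.is_file(), f"missing local orchestration entrypoint: {orch}"
assert legacy_python.is_dir(), f"missing legacy backend dir: {legacy_python}"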

README.md

@ -12,11 +12,14 @@ This repository is the clean operational wrapper around the current work in:
- [`../collaborations/to_ptbc/evc/cost_of_substitution`](/mnt/CIFS/pengolodh/Docs/Projects/collaborations/to_ptbc/evc/cost_of_substitution) - [`../collaborations/to_ptbc/evc/cost_of_substitution`](/mnt/CIFS/pengolodh/Docs/Projects/collaborations/to_ptbc/evc/cost_of_substitution)
The current orchestration and Track 1 simulation code still live there. The Track 1 simulation backend still lives there. The orchestration control
plane is now local to `renunney`.
`renunney` provides: `renunney` provides:
- a clean git repo, - a clean git repo,
- a stable working directory layout, - a stable working directory layout,
- a local orchestration CLI and library,
- a Makefile for common tasks, - a Makefile for common tasks,
- migration notes for pulling code into this repo in stages. - migration notes for pulling code into this repo in stages.
@@ -63,6 +66,11 @@ make collate-figure1
 ## Status

-The active simulation/orchestration implementation remains in the older
-`cost_of_substitution` directory for now. This repo is the clean shell for
-running, organizing, and gradually migrating that work.
+The current state is split:
+
+- orchestration control plane: local to `renunney`
+- Track 1 simulation backend: still in the older `cost_of_substitution`
+  directory
+
+This repo is now the clean operational entry point while the simulation code
+is migrated in later stages.


@@ -20,10 +20,10 @@ Operational code still lives in:
 ## Recommended Migration Order

-1. Keep orchestration running from the legacy path until real multi-host runs are stable.
-2. Migrate the orchestration modules first:
-   - `python/orchestration.py`
-   - `python/run_orchestration.py`
+1. Orchestration control plane has been migrated locally:
+   - `src/renunney/orchestration.py`
+   - `scripts/run_orchestration.py`
+2. Keep the Track 1 simulation backend in the legacy path until real multi-host runs are stable.
 3. Migrate Track 1 runner and API next:
    - `python/run_track1.py`
    - `python/track1_api.py`


@@ -40,5 +40,6 @@ make status
 ## Current Assumption

-The Makefile drives the orchestration code in the legacy
-`cost_of_substitution` directory until that code is migrated here.
+The Makefile now drives the local orchestration code in `renunney`, while the
+simulation backend is still imported from the legacy `cost_of_substitution`
+directory.


@@ -0,0 +1,40 @@
{
"job_id": "track1-figure1-M1.0-n3-v1",
"project": "cost_of_substitution",
"track": "track1",
"job_kind": "track1_locus_threshold",
"priority": 10,
"created_at": "2026-04-10T21:40:00-04:00",
"created_by": "codex",
"worker_backend": "python-track1",
"config": {
"mode": "search",
"K": 5000,
"N0": 5000,
"n": 3,
"u": 0.0001,
"R": 10.0,
"T": 500,
"epochs": 8,
"p": 0.5,
"runs": 20,
"jobs": 8,
"t_values": [1, 2, 4, 6, 8, 10, 15, 20, 30, 40, 50, 75, 100, 125, 150, 175, 200, 250, 300, 350, 400, 450, 500],
"seed": 10
},
"resources": {
"cpu_cores": 8,
"ram_gb": 8,
"runtime_class": "long",
"use_local_scratch": true
},
"result_paths": {
"payload_json": "results/track1/figure1/M1.0/n3/payload.json",
"log_txt": "results/track1/figure1/M1.0/n3/run.log"
},
"retry": {
"max_attempts": 2,
"idempotent": true
},
"notes": "One Track 1 Figure 1 threshold search job for M=1.0 and n=3."
}
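A manifest like this is submitted through `submit_job_manifest`, which hashes the config and inserts a pending row into the registry. A minimal sketch (the manifest file path here is hypothetical):

import json
from pathlib import Path

from renunney.orchestration import submit_job_manifest

# submit_job_manifest initializes the registry itself if needed.
manifest = json.loads(Path("examples/figure1-M1.0-n3.json").read_text(encoding="utf-8"))
job_id = submit_job_manifest("runs/state/cos-orch.sqlite", manifest)
print(job_id)  # echoes manifest["job_id"], e.g. "track1-figure1-M1.0-n3-v1"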


@@ -0,0 +1,33 @@
{
"job_id": "track1-figure1-M1.0-n3-v1",
"status": "succeeded",
"worker_backend": "python-track1",
"worker_host": "xeon24a",
"started_at": "2026-04-10T21:41:00-04:00",
"finished_at": "2026-04-10T21:58:30-04:00",
"wall_seconds": 1050.0,
"exit_code": 0,
"config_hash": "sha256:example",
"code_identity": {
"git_commit": "example",
"runner": "python/run_track1.py",
"python_version": "3.12"
},
"artifacts": {
"payload_json": "results/track1/figure1/M1.0/n3/payload.json",
"log_txt": "results/track1/figure1/M1.0/n3/run.log",
"cache_summary_json": "results/track1/figure1/M1.0/n3/cache-summary.json"
},
"summary": {
"M": 1.0,
"u": 0.0001,
"n": 3,
"threshold_T": 75.0,
"accepted": true,
"baseline_extinctions": 0,
"check_1_02_extinctions": 0,
"check_1_05_extinctions": 0,
"check_1_10_extinctions": 1
},
"error": null
}
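Completed results like the one above are stored as JSON in the `job_results` table and can be read back with `list_job_results`; a minimal sketch, assuming the default DB path from the Makefile:

from renunney.orchestration import list_job_results

for item in list_job_results("runs/state/cos-orch.sqlite"):
    # item["result"] is the parsed result payload written by the worker
    summary = item["result"].get("summary", {})
    print(item["job_id"], item["status"], summary.get("threshold_T"))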

scripts/run_orchestration.py

@@ -0,0 +1,168 @@
"""
run_orchestration.py
Small CLI for the minimal SQLite-backed orchestration layer.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
REPO_ROOT = Path(__file__).resolve().parents[1]
SRC_DIR = REPO_ROOT / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
from renunney.orchestration import (
collate_track1_figure1,
expand_track1_figure1_manifest,
initialize_registry,
list_jobs,
run_one_job,
run_worker_loop,
submit_job_manifest,
submit_track1_figure1_jobs,
)
def save_payload(payload: dict, path: str | Path) -> None:
out = Path(path)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Minimal orchestration CLI for cost-of-substitution sweeps.")
subparsers = parser.add_subparsers(dest="command", required=True)
init_db = subparsers.add_parser("init-db")
init_db.add_argument("--db", required=True)
submit = subparsers.add_parser("submit")
submit.add_argument("--db", required=True)
submit.add_argument("--manifest", required=True)
submit_fig1 = subparsers.add_parser("submit-figure1")
submit_fig1.add_argument("--db", required=True)
submit_fig1.add_argument("--config", required=True)
submit_fig1.add_argument("--job-prefix", required=True)
submit_fig1.add_argument("--priority", type=int, default=0)
submit_fig1.add_argument("--created-by", default="codex")
submit_fig1.add_argument("--result-prefix", default="results/track1/figure1")
list_cmd = subparsers.add_parser("list")
list_cmd.add_argument("--db", required=True)
run_one = subparsers.add_parser("run-one")
run_one.add_argument("--db", required=True)
run_one.add_argument("--result-root", required=True)
run_one.add_argument("--worker-backend", default="python-track1")
run_one.add_argument("--worker-host", default=None)
run_one.add_argument("--scratch-root", default=None)
run_loop = subparsers.add_parser("run-loop")
run_loop.add_argument("--db", required=True)
run_loop.add_argument("--result-root", required=True)
run_loop.add_argument("--worker-backend", default="python-track1")
run_loop.add_argument("--worker-host", default=None)
run_loop.add_argument("--scratch-root", default=None)
run_loop.add_argument("--max-jobs", type=int, default=None)
collate = subparsers.add_parser("collate-figure1")
collate.add_argument("--db", required=True)
collate.add_argument("--output", default=None)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.command == "init-db":
initialize_registry(args.db)
print(json.dumps({"db": str(Path(args.db)), "status": "initialized"}, indent=2, sort_keys=True))
return 0
if args.command == "submit":
manifest = json.loads(Path(args.manifest).read_text(encoding="utf-8"))
job_id = submit_job_manifest(args.db, manifest)
print(json.dumps({"db": str(Path(args.db)), "job_id": job_id, "status": "submitted"}, indent=2, sort_keys=True))
return 0
if args.command == "submit-figure1":
config = json.loads(Path(args.config).read_text(encoding="utf-8"))
manifests = expand_track1_figure1_manifest(
base_job_id_prefix=args.job_prefix,
config=config,
priority=args.priority,
created_by=args.created_by,
result_prefix=args.result_prefix,
)
job_ids = submit_track1_figure1_jobs(
db_path=args.db,
base_job_id_prefix=args.job_prefix,
config=config,
priority=args.priority,
created_by=args.created_by,
result_prefix=args.result_prefix,
)
print(
json.dumps(
{
"db": str(Path(args.db)),
"job_count": len(job_ids),
"job_ids": job_ids,
"status": "submitted",
"preview": manifests,
},
indent=2,
sort_keys=True,
)
)
return 0
if args.command == "list":
print(json.dumps(list_jobs(args.db), indent=2, sort_keys=True))
return 0
if args.command == "run-one":
result = run_one_job(
db_path=args.db,
result_root=args.result_root,
worker_backend=args.worker_backend,
worker_host=args.worker_host,
scratch_root=args.scratch_root,
cwd=REPO_ROOT,
)
print(json.dumps(result, indent=2, sort_keys=True))
return 0 if result is None or result["status"] == "succeeded" else 1
if args.command == "run-loop":
payload = run_worker_loop(
db_path=args.db,
result_root=args.result_root,
worker_backend=args.worker_backend,
worker_host=args.worker_host,
scratch_root=args.scratch_root,
cwd=REPO_ROOT,
max_jobs=args.max_jobs,
)
print(json.dumps(payload, indent=2, sort_keys=True))
return 0 if payload["failed_jobs"] == 0 else 1
if args.command == "collate-figure1":
payload = collate_track1_figure1(args.db)
if args.output:
save_payload(payload, args.output)
print(json.dumps(payload, indent=2, sort_keys=True))
return 0
raise ValueError(args.command)
if __name__ == "__main__":
raise SystemExit(main())
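The Makefile's `run-loop` target reduces to an invocation like this sketch (paths taken from the Makefile; `--max-jobs 2` is an arbitrary example value):

import subprocess
import sys

subprocess.run(
    [
        sys.executable, "scripts/run_orchestration.py", "run-loop",
        "--db", "runs/state/cos-orch.sqlite",
        "--result-root", "runs/results",
        "--max-jobs", "2",
    ],
    check=False,  # exit code 1 just means at least one job failed
)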

src/renunney/__init__.py

@@ -1 +1,42 @@
-"""renunney package placeholder."""
+"""renunney package."""
from .legacy import ensure_legacy_python_path, legacy_python_dir, legacy_root, repo_root
from .orchestration import (
ClaimedJob,
claim_next_job,
collate_track1_figure1,
complete_job,
execute_job_manifest,
expand_track1_figure1_manifest,
hash_config,
initialize_registry,
list_job_results,
list_jobs,
load_job_manifest,
run_one_job,
run_worker_loop,
submit_job_manifest,
submit_track1_figure1_jobs,
)
__all__ = [
"ClaimedJob",
"claim_next_job",
"collate_track1_figure1",
"complete_job",
"ensure_legacy_python_path",
"execute_job_manifest",
"expand_track1_figure1_manifest",
"hash_config",
"initialize_registry",
"legacy_python_dir",
"legacy_root",
"list_job_results",
"list_jobs",
"load_job_manifest",
"repo_root",
"run_one_job",
"run_worker_loop",
"submit_job_manifest",
"submit_track1_figure1_jobs",
]

src/renunney/legacy.py

@@ -0,0 +1,25 @@
"""Legacy path helpers for staged migration."""
from __future__ import annotations
import sys
from pathlib import Path
def repo_root() -> Path:
return Path(__file__).resolve().parents[2]
def legacy_root() -> Path:
return repo_root().parent / "collaborations" / "to_ptbc" / "evc" / "cost_of_substitution"
def legacy_python_dir() -> Path:
return legacy_root() / "python"
def ensure_legacy_python_path() -> Path:
path = legacy_python_dir()
if str(path) not in sys.path:
sys.path.insert(0, str(path))
return path
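This is the staged-migration import pattern `renunney.orchestration` relies on: put the legacy `python/` directory on `sys.path` first, then import the backend modules by name. A minimal sketch (assumes `src/` is importable; the CLI script does that bootstrap itself):

from renunney.legacy import ensure_legacy_python_path

ensure_legacy_python_path()

# These modules still live in the legacy cost_of_substitution checkout.
from track1_api import config_from_mapping, run_config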

src/renunney/orchestration.py

@@ -0,0 +1,602 @@
"""
orchestration.py
Minimal SQLite-backed orchestration support for LAN-distributed sweep runs.
"""
from __future__ import annotations
import json
import platform
import sqlite3
import subprocess
import time
from copy import deepcopy
from dataclasses import asdict, dataclass
from hashlib import sha256
from pathlib import Path
from typing import Any, Optional
from .legacy import ensure_legacy_python_path
ensure_legacy_python_path()
from track1_analysis import LocusThresholdRow, fit_linear_cost_by_loci
from track1_api import config_from_mapping, run_config, save_payload
def utc_now_iso() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def canonical_json(value: Any) -> str:
return json.dumps(value, sort_keys=True, separators=(",", ":"))
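# Sorted keys and tight separators give a canonical encoding, so equal
# configs hash to the same digest below regardless of key order.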
def hash_config(config: dict[str, Any]) -> str:
return "sha256:" + sha256(canonical_json(config).encode("utf-8")).hexdigest()
def default_worker_host() -> str:
return platform.node() or "unknown-host"
def code_identity(cwd: str | Path | None = None) -> dict[str, Any]:
identity: dict[str, Any] = {
"runner": "renunney orchestration + legacy Track 1 backend",
"python_version": platform.python_version(),
"git_commit": None,
}
try:
commit = subprocess.check_output(
["git", "rev-parse", "HEAD"],
cwd=str(cwd) if cwd is not None else None,
stderr=subprocess.DEVNULL,
text=True,
).strip()
except Exception:
commit = None
identity["git_commit"] = commit
return identity
def _connect(db_path: str | Path) -> sqlite3.Connection:
conn = sqlite3.connect(str(db_path), timeout=30.0)
conn.row_factory = sqlite3.Row
return conn
def initialize_registry(db_path: str | Path) -> None:
db_file = Path(db_path)
db_file.parent.mkdir(parents=True, exist_ok=True)
with _connect(db_file) as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS jobs (
job_id TEXT PRIMARY KEY,
project TEXT NOT NULL,
track TEXT NOT NULL,
job_kind TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
created_by TEXT,
worker_backend TEXT NOT NULL,
manifest_json TEXT NOT NULL,
config_hash TEXT NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 1,
claimed_by TEXT,
claimed_at TEXT,
heartbeat_at TEXT,
finished_at TEXT
);
CREATE TABLE IF NOT EXISTS job_results (
job_id TEXT PRIMARY KEY,
status TEXT NOT NULL,
worker_host TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
wall_seconds REAL,
exit_code INTEGER,
result_json TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS job_events (
event_id INTEGER PRIMARY KEY,
job_id TEXT NOT NULL,
event_time TEXT NOT NULL,
event_kind TEXT NOT NULL,
event_json TEXT NOT NULL
);
"""
)
def _record_event(conn: sqlite3.Connection, job_id: str, event_kind: str, event: dict[str, Any]) -> None:
conn.execute(
"""
INSERT INTO job_events (job_id, event_time, event_kind, event_json)
VALUES (?, ?, ?, ?)
""",
(job_id, utc_now_iso(), event_kind, json.dumps(event, sort_keys=True)),
)
def submit_job_manifest(db_path: str | Path, manifest: dict[str, Any]) -> str:
initialize_registry(db_path)
config_hash = hash_config(manifest["config"])
retry = manifest.get("retry", {})
with _connect(db_path) as conn:
conn.execute(
"""
INSERT INTO jobs (
job_id, project, track, job_kind, priority, status, created_at,
created_by, worker_backend, manifest_json, config_hash,
attempt_count, max_attempts
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
manifest["job_id"],
manifest["project"],
manifest["track"],
manifest["job_kind"],
int(manifest.get("priority", 0)),
"pending",
manifest.get("created_at", utc_now_iso()),
manifest.get("created_by"),
manifest["worker_backend"],
json.dumps(manifest, sort_keys=True),
config_hash,
0,
int(retry.get("max_attempts", 1)),
),
)
_record_event(conn, manifest["job_id"], "submitted", {"manifest": manifest})
return manifest["job_id"]
@dataclass(frozen=True)
class ClaimedJob:
job_id: str
manifest: dict[str, Any]
claimed_by: str
claimed_at: str
def claim_next_job(
db_path: str | Path,
worker_backend: str,
worker_host: str,
) -> Optional[ClaimedJob]:
initialize_registry(db_path)
with _connect(db_path) as conn:
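        # BEGIN IMMEDIATE takes SQLite's write lock up front, so two workers
        # polling the same registry cannot both claim the same pending row.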
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"""
SELECT job_id, manifest_json
FROM jobs
WHERE status = 'pending' AND worker_backend = ?
ORDER BY priority DESC, created_at ASC, job_id ASC
LIMIT 1
""",
(worker_backend,),
).fetchone()
if row is None:
conn.commit()
return None
claimed_at = utc_now_iso()
conn.execute(
"""
UPDATE jobs
SET status = 'running',
claimed_by = ?,
claimed_at = ?,
heartbeat_at = ?,
attempt_count = attempt_count + 1
WHERE job_id = ?
""",
(worker_host, claimed_at, claimed_at, row["job_id"]),
)
manifest = json.loads(row["manifest_json"])
_record_event(
conn,
row["job_id"],
"claimed",
{"worker_host": worker_host, "claimed_at": claimed_at},
)
conn.commit()
return ClaimedJob(
job_id=row["job_id"],
manifest=manifest,
claimed_by=worker_host,
claimed_at=claimed_at,
)
def load_job_manifest(db_path: str | Path, job_id: str) -> dict[str, Any]:
with _connect(db_path) as conn:
row = conn.execute("SELECT manifest_json FROM jobs WHERE job_id = ?", (job_id,)).fetchone()
if row is None:
raise KeyError(job_id)
return json.loads(row["manifest_json"])
def list_jobs(db_path: str | Path) -> list[dict[str, Any]]:
with _connect(db_path) as conn:
rows = conn.execute(
"""
SELECT job_id, project, track, job_kind, priority, status, created_at,
created_by, worker_backend, config_hash, attempt_count,
max_attempts, claimed_by, claimed_at, heartbeat_at, finished_at
FROM jobs
ORDER BY priority DESC, created_at ASC, job_id ASC
"""
).fetchall()
return [dict(row) for row in rows]
def list_job_results(db_path: str | Path) -> list[dict[str, Any]]:
with _connect(db_path) as conn:
rows = conn.execute(
"""
SELECT job_id, status, worker_host, started_at, finished_at,
wall_seconds, exit_code, result_json
FROM job_results
ORDER BY started_at ASC, job_id ASC
"""
).fetchall()
results: list[dict[str, Any]] = []
for row in rows:
item = dict(row)
item["result"] = json.loads(item.pop("result_json"))
results.append(item)
return results
def expand_track1_figure1_manifest(
base_job_id_prefix: str,
config: dict[str, Any],
priority: int = 0,
created_by: str = "codex",
result_prefix: str = "results/track1/figure1",
) -> list[dict[str, Any]]:
loci_values = list(config.get("loci_values", []))
if config.get("mode") != "loci_regression":
raise ValueError("Figure 1 expansion requires a loci_regression config.")
if not loci_values:
raise ValueError("Figure 1 expansion requires non-empty loci_values.")
M_value = 2.0 * float(config["K"]) * float(config["u"])
manifests: list[dict[str, Any]] = []
for n_value in loci_values:
job_id = f"{base_job_id_prefix}-n{int(n_value)}"
job_config = deepcopy(config)
job_config["mode"] = "search"
job_config["n"] = int(n_value)
job_config.pop("loci_values", None)
result_base = f"{result_prefix}/M{M_value:g}/n{int(n_value)}"
manifests.append(
{
"job_id": job_id,
"project": "cost_of_substitution",
"track": "track1",
"job_kind": "track1_locus_threshold",
"priority": int(priority),
"created_at": utc_now_iso(),
"created_by": created_by,
"worker_backend": "python-track1",
"config": job_config,
"resources": {
"cpu_cores": int(job_config.get("jobs", 1)),
"ram_gb": 8,
"runtime_class": "long",
"use_local_scratch": True,
},
"result_paths": {
"payload_json": f"{result_base}/payload.json",
"log_txt": f"{result_base}/run.log",
},
"retry": {"max_attempts": 2, "idempotent": True},
"notes": f"Track 1 Figure 1 threshold job for M={M_value:g}, n={int(n_value)}.",
}
)
return manifests
def submit_track1_figure1_jobs(
db_path: str | Path,
base_job_id_prefix: str,
config: dict[str, Any],
priority: int = 0,
created_by: str = "codex",
result_prefix: str = "results/track1/figure1",
) -> list[str]:
manifests = expand_track1_figure1_manifest(
base_job_id_prefix=base_job_id_prefix,
config=config,
priority=priority,
created_by=created_by,
result_prefix=result_prefix,
)
job_ids: list[str] = []
for manifest in manifests:
job_ids.append(submit_job_manifest(db_path, manifest))
return job_ids
def complete_job(db_path: str | Path, result: dict[str, Any]) -> None:
finished_at = result.get("finished_at", utc_now_iso())
with _connect(db_path) as conn:
conn.execute(
"""
UPDATE jobs
SET status = ?, heartbeat_at = ?, finished_at = ?
WHERE job_id = ?
""",
(result["status"], finished_at, finished_at, result["job_id"]),
)
conn.execute(
"""
INSERT OR REPLACE INTO job_results (
job_id, status, worker_host, started_at, finished_at,
wall_seconds, exit_code, result_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
result["job_id"],
result["status"],
result["worker_host"],
result["started_at"],
finished_at,
result.get("wall_seconds"),
result.get("exit_code"),
json.dumps(result, sort_keys=True),
),
)
_record_event(conn, result["job_id"], "completed", result)
def _resolve_result_path(result_root: str | Path, relative_path: str) -> Path:
path = Path(result_root) / relative_path
path.parent.mkdir(parents=True, exist_ok=True)
return path
def _job_summary(manifest: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
summary: dict[str, Any] = {
"mode": payload.get("mode"),
}
config = manifest.get("config", {})
if "u" in config:
summary["u"] = config["u"]
if {"K", "u"} <= set(config):
summary["M"] = 2.0 * float(config["K"]) * float(config["u"])
if "n" in config:
summary["n"] = config["n"]
if payload.get("mode") == "search":
result = payload.get("result")
summary["accepted"] = result is not None
summary["threshold_T"] = None if result is None else result["threshold_T"]
if result is not None:
summary["baseline_extinctions"] = result["baseline_check"]["extinctions"]
summary["check_1_02_extinctions"] = result["check_1_02"]["extinctions"]
summary["check_1_05_extinctions"] = result["check_1_05"]["extinctions"]
summary["check_1_10_extinctions"] = result["check_1_10"]["extinctions"]
elif payload.get("mode") == "loci_regression":
rows = payload.get("rows", [])
summary["row_count"] = len(rows)
summary["accepted_rows"] = sum(1 for row in rows if row.get("accepted"))
summary["fit"] = payload.get("fit")
elif payload.get("mode") == "simulate":
summary["extinct"] = payload.get("extinct")
summary["generations_recorded"] = payload.get("generations_recorded")
return summary
def execute_job_manifest(
manifest: dict[str, Any],
result_root: str | Path,
scratch_root: str | Path | None = None,
worker_host: str | None = None,
cwd: str | Path | None = None,
) -> dict[str, Any]:
worker_host = worker_host or default_worker_host()
started_at = utc_now_iso()
started_clock = time.monotonic()
config_mapping = deepcopy(manifest["config"])
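    # Point the backend's cache at per-job local scratch unless the manifest
    # already pins an explicit cache_path.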
if scratch_root is not None and "cache_path" not in config_mapping:
cache_path = Path(scratch_root) / "cache" / f"{manifest['job_id']}.json"
cache_path.parent.mkdir(parents=True, exist_ok=True)
config_mapping["cache_path"] = str(cache_path)
status = "succeeded"
error = None
exit_code = 0
payload: dict[str, Any] | None = None
try:
if manifest["worker_backend"] != "python-track1":
raise ValueError(f"Unsupported worker backend: {manifest['worker_backend']}")
payload = run_config(config_from_mapping(config_mapping))
except Exception as exc: # pragma: no cover - failure path still summarized
status = "failed"
exit_code = 1
error = {"type": type(exc).__name__, "message": str(exc)}
payload = {"job_id": manifest["job_id"], "error": error}
artifacts: dict[str, str] = {}
result_paths = manifest.get("result_paths", {})
if "payload_json" in result_paths and payload is not None:
payload_path = _resolve_result_path(result_root, result_paths["payload_json"])
save_payload(payload, payload_path)
artifacts["payload_json"] = str(payload_path)
if "log_txt" in result_paths:
log_path = _resolve_result_path(result_root, result_paths["log_txt"])
lines = [
f"job_id={manifest['job_id']}",
f"worker_backend={manifest['worker_backend']}",
f"status={status}",
f"started_at={started_at}",
f"finished_at={utc_now_iso()}",
]
if error is not None:
lines.append(f"error={error['type']}: {error['message']}")
log_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
artifacts["log_txt"] = str(log_path)
finished_at = utc_now_iso()
return {
"job_id": manifest["job_id"],
"status": status,
"worker_backend": manifest["worker_backend"],
"worker_host": worker_host,
"started_at": started_at,
"finished_at": finished_at,
"wall_seconds": float(time.monotonic() - started_clock),
"exit_code": exit_code,
"config_hash": hash_config(manifest["config"]),
"code_identity": code_identity(cwd=cwd),
"artifacts": artifacts,
"summary": _job_summary(manifest, payload or {}),
"error": error,
}
def run_one_job(
db_path: str | Path,
result_root: str | Path,
worker_backend: str = "python-track1",
worker_host: str | None = None,
scratch_root: str | Path | None = None,
cwd: str | Path | None = None,
) -> Optional[dict[str, Any]]:
worker_host = worker_host or default_worker_host()
claimed = claim_next_job(db_path=db_path, worker_backend=worker_backend, worker_host=worker_host)
if claimed is None:
return None
result = execute_job_manifest(
manifest=claimed.manifest,
result_root=result_root,
scratch_root=scratch_root,
worker_host=worker_host,
cwd=cwd,
)
complete_job(db_path, result)
return result
def run_worker_loop(
db_path: str | Path,
result_root: str | Path,
worker_backend: str = "python-track1",
worker_host: str | None = None,
scratch_root: str | Path | None = None,
cwd: str | Path | None = None,
max_jobs: int | None = None,
) -> dict[str, Any]:
worker_host = worker_host or default_worker_host()
completed: list[dict[str, Any]] = []
attempted = 0
succeeded = 0
failed = 0
while max_jobs is None or attempted < max_jobs:
result = run_one_job(
db_path=db_path,
result_root=result_root,
worker_backend=worker_backend,
worker_host=worker_host,
scratch_root=scratch_root,
cwd=cwd,
)
if result is None:
break
attempted += 1
completed.append(result)
if result["status"] == "succeeded":
succeeded += 1
else:
failed += 1
return {
"worker_backend": worker_backend,
"worker_host": worker_host,
"attempted_jobs": attempted,
"succeeded_jobs": succeeded,
"failed_jobs": failed,
"completed_job_ids": [item["job_id"] for item in completed],
"stopped_because": "max_jobs_reached" if max_jobs is not None and attempted >= max_jobs else "queue_empty",
}
def collate_track1_figure1(db_path: str | Path) -> dict[str, Any]:
initialize_registry(db_path)
with _connect(db_path) as conn:
rows = conn.execute(
"""
SELECT j.job_id, j.job_kind, j.manifest_json, r.result_json
FROM jobs AS j
JOIN job_results AS r ON j.job_id = r.job_id
WHERE r.status = 'succeeded'
ORDER BY j.job_id ASC
"""
).fetchall()
grouped: dict[str, list[dict[str, Any]]] = {}
for row in rows:
manifest = json.loads(row["manifest_json"])
result = json.loads(row["result_json"])
summary = result.get("summary", {})
config = manifest.get("config", {})
if manifest.get("job_kind") != "track1_locus_threshold":
continue
if summary.get("threshold_T") is None and not summary.get("accepted", False):
continue
M_value = summary.get("M")
n_value = summary.get("n", config.get("n"))
if M_value is None or n_value is None:
continue
group_key = f"{float(M_value):g}"
grouped.setdefault(group_key, []).append(
{
"job_id": manifest["job_id"],
"M": float(M_value),
"u": float(summary.get("u", config.get("u"))),
"n": int(n_value),
"threshold_T": summary.get("threshold_T"),
"accepted": bool(summary.get("accepted", False)),
"baseline_extinctions": summary.get("baseline_extinctions"),
"check_1_02_extinctions": summary.get("check_1_02_extinctions"),
"check_1_05_extinctions": summary.get("check_1_05_extinctions"),
"check_1_10_extinctions": summary.get("check_1_10_extinctions"),
}
)
treatments: list[dict[str, Any]] = []
for key in sorted(grouped, key=lambda value: float(value)):
item_rows = sorted(grouped[key], key=lambda item: item["n"])
fit_rows = [
LocusThresholdRow(
n=int(item["n"]),
threshold_T=None if item["threshold_T"] is None else float(item["threshold_T"]),
accepted=bool(item["accepted"]),
)
for item in item_rows
]
fit = fit_linear_cost_by_loci(fit_rows)
treatments.append(
{
"M": float(key),
"u_values": sorted({float(item["u"]) for item in item_rows}),
"rows": item_rows,
"fit": None if fit is None else asdict(fit),
}
)
return {
"job_count": len(rows),
"treatment_count": len(treatments),
"treatments": treatments,
}
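Putting the pieces together, a minimal end-to-end sketch of the intended flow. The config values echo the Figure 1 manifest example above but are illustrative, and `t_values` is truncated:

from renunney.orchestration import (
    collate_track1_figure1,
    initialize_registry,
    run_worker_loop,
    submit_track1_figure1_jobs,
)

db = "runs/state/cos-orch.sqlite"
initialize_registry(db)

config = {
    "mode": "loci_regression",  # required by the Figure 1 expansion
    "K": 5000, "N0": 5000, "u": 0.0001, "R": 10.0, "T": 500,
    "epochs": 8, "p": 0.5, "runs": 20, "jobs": 8,
    "t_values": [1, 2, 4], "seed": 10,
    "loci_values": [3, 5, 8],  # expands into one search job per n value
}
job_ids = submit_track1_figure1_jobs(
    db_path=db,
    base_job_id_prefix="track1-figure1-M1.0",
    config=config,
)

report = run_worker_loop(db_path=db, result_root="runs/results")
figure1 = collate_track1_figure1(db)  # grouped by M, with a linear fit per treatment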