import json import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] SRC_DIR = ROOT / "src" if str(SRC_DIR) not in sys.path: sys.path.insert(0, str(SRC_DIR)) import renunney.orchestration as orch def test_registry_submit_claim_and_complete(tmp_path: Path): db_path = tmp_path / "jobs.sqlite" orch.initialize_registry(db_path) manifest = { "job_id": "job-1", "project": "renunney", "track": "track1", "job_kind": "track1_locus_threshold", "priority": 5, "created_at": "2026-04-10T00:00:00Z", "created_by": "test", "worker_backend": "python-track1", "config": { "mode": "search", "K": 500, "N0": 500, "n": 1, "u": 0.001, "R": 10.0, "T": 10, "epochs": 1, "runs": 1, "seed": 1, "t_values": [5, 10], }, "resources": {"cpu_cores": 1}, "result_paths": {"payload_json": "payloads/job-1.json"}, "retry": {"max_attempts": 2, "idempotent": True}, "notes": "", } orch.submit_job_manifest(db_path, manifest) claimed = orch.claim_next_job(db_path, worker_backend="python-track1", worker_host="worker-a") assert claimed is not None assert claimed.job_id == "job-1" result = { "job_id": "job-1", "status": "succeeded", "worker_backend": "python-track1", "worker_host": "worker-a", "started_at": "2026-04-10T00:00:10Z", "finished_at": "2026-04-10T00:00:20Z", "wall_seconds": 10.0, "exit_code": 0, "config_hash": orch.hash_config(manifest["config"]), "code_identity": {"git_commit": None}, "artifacts": {}, "summary": {"threshold_T": 10.0}, "error": None, } orch.complete_job(db_path, result) jobs = orch.list_jobs(db_path) assert jobs[0]["status"] == "succeeded" def test_run_one_job_executes_python_track1_backend(tmp_path: Path): db_path = tmp_path / "jobs.sqlite" result_root = tmp_path / "results" scratch_root = tmp_path / "scratch" orch.initialize_registry(db_path) manifest = { "job_id": "job-sim-1", "project": "renunney", "track": "track1", "job_kind": "track1_simulate", "priority": 1, "created_at": "2026-04-10T00:00:00Z", "created_by": "test", "worker_backend": "python-track1", "config": { "mode": "simulate", "K": 500, "N0": 500, "n": 1, "u": 0.001, "R": 10.0, "T": 10, "epochs": 1, "seed": 1, }, "resources": {"cpu_cores": 1}, "result_paths": { "payload_json": "payloads/job-sim-1.json", "log_txt": "logs/job-sim-1.log", }, "retry": {"max_attempts": 1, "idempotent": True}, "notes": "", } orch.submit_job_manifest(db_path, manifest) result = orch.run_one_job( db_path=db_path, result_root=result_root, worker_backend="python-track1", worker_host="worker-a", scratch_root=scratch_root, cwd=ROOT, ) assert result is not None assert result["status"] == "succeeded" payload_path = result_root / "payloads" / "job-sim-1.json" assert payload_path.exists() payload = json.loads(payload_path.read_text(encoding="utf-8")) assert payload["mode"] == "simulate" jobs = orch.list_jobs(db_path) assert jobs[0]["status"] == "succeeded" def test_collate_track1_figure1_groups_rows_and_fits(tmp_path: Path): db_path = tmp_path / "jobs.sqlite" orch.initialize_registry(db_path) for n_value, threshold in [(1, 50.0), (2, 75.0), (3, 100.0)]: manifest = { "job_id": f"job-m1-n{n_value}", "project": "renunney", "track": "track1", "job_kind": "track1_locus_threshold", "priority": 1, "created_at": "2026-04-10T00:00:00Z", "created_by": "test", "worker_backend": "python-track1", "config": { "mode": "search", "K": 5000, "N0": 5000, "n": n_value, "u": 0.0001, }, "resources": {"cpu_cores": 1}, "result_paths": {"payload_json": f"payloads/job-m1-n{n_value}.json"}, "retry": {"max_attempts": 1, "idempotent": True}, "notes": "", } orch.submit_job_manifest(db_path, manifest) result = { "job_id": manifest["job_id"], "status": "succeeded", "worker_backend": "python-track1", "worker_host": "worker-a", "started_at": "2026-04-10T00:00:10Z", "finished_at": "2026-04-10T00:00:20Z", "wall_seconds": 10.0, "exit_code": 0, "config_hash": orch.hash_config(manifest["config"]), "code_identity": {"git_commit": None}, "artifacts": {}, "summary": { "mode": "search", "M": 1.0, "u": 0.0001, "n": n_value, "accepted": True, "threshold_T": threshold, "baseline_extinctions": 0, "check_1_02_extinctions": 0, "check_1_05_extinctions": 0, "check_1_10_extinctions": 0, }, "error": None, } orch.complete_job(db_path, result) payload = orch.collate_track1_figure1(db_path) assert payload["treatment_count"] == 1 treatment = payload["treatments"][0] assert treatment["M"] == 1.0 assert [row["n"] for row in treatment["rows"]] == [1, 2, 3] assert treatment["fit"] is not None assert treatment["fit"]["points_used"] == 3 def test_expand_track1_figure1_manifest_splits_by_locus(): config = { "mode": "loci_regression", "K": 5000, "N0": 5000, "n": 1, "u": 0.0001, "R": 10.0, "T": 500, "epochs": 8, "p": 0.5, "runs": 20, "jobs": 8, "t_values": [1, 2, 4, 6], "loci_values": [1, 3, 5], "seed": 10, } manifests = orch.expand_track1_figure1_manifest( base_job_id_prefix="fig1-m10", config=config, priority=7, created_by="test", ) assert [manifest["job_id"] for manifest in manifests] == [ "fig1-m10-n1", "fig1-m10-n3", "fig1-m10-n5", ] assert all(manifest["job_kind"] == "track1_locus_threshold" for manifest in manifests) assert all(manifest["config"]["mode"] == "search" for manifest in manifests) assert [manifest["config"]["n"] for manifest in manifests] == [1, 3, 5] assert all("loci_values" not in manifest["config"] for manifest in manifests) assert all(manifest["project"] == "renunney" for manifest in manifests) def test_submit_track1_figure1_jobs_registers_expanded_jobs(tmp_path: Path): db_path = tmp_path / "jobs.sqlite" orch.initialize_registry(db_path) config = { "mode": "loci_regression", "K": 5000, "N0": 5000, "n": 1, "u": 0.0001, "R": 10.0, "T": 500, "epochs": 8, "p": 0.5, "runs": 20, "jobs": 8, "t_values": [1, 2, 4], "loci_values": [2, 4], "seed": 10, } job_ids = orch.submit_track1_figure1_jobs( db_path=db_path, base_job_id_prefix="fig1-m10", config=config, ) assert job_ids == ["fig1-m10-n2", "fig1-m10-n4"] jobs = orch.list_jobs(db_path) assert len(jobs) == 2 assert {job["job_id"] for job in jobs} == set(job_ids) def test_run_worker_loop_processes_until_queue_empty(tmp_path: Path): db_path = tmp_path / "jobs.sqlite" result_root = tmp_path / "results" scratch_root = tmp_path / "scratch" orch.initialize_registry(db_path) for idx in range(2): manifest = { "job_id": f"job-sim-{idx}", "project": "renunney", "track": "track1", "job_kind": "track1_simulate", "priority": 1, "created_at": "2026-04-10T00:00:00Z", "created_by": "test", "worker_backend": "python-track1", "config": { "mode": "simulate", "K": 500, "N0": 500, "n": 1, "u": 0.001, "R": 10.0, "T": 10, "epochs": 1, "seed": idx + 1, }, "resources": {"cpu_cores": 1}, "result_paths": { "payload_json": f"payloads/job-sim-{idx}.json", "log_txt": f"logs/job-sim-{idx}.log", }, "retry": {"max_attempts": 1, "idempotent": True}, "notes": "", } orch.submit_job_manifest(db_path, manifest) payload = orch.run_worker_loop( db_path=db_path, result_root=result_root, worker_backend="python-track1", worker_host="worker-a", scratch_root=scratch_root, cwd=ROOT, ) assert payload["attempted_jobs"] == 2 assert payload["succeeded_jobs"] == 2 assert payload["failed_jobs"] == 0 assert payload["stopped_because"] == "queue_empty" jobs = orch.list_jobs(db_path) assert all(job["status"] == "succeeded" for job in jobs)