From 7a80bb352a0d68b00c3ac847abb48aaac596d76f Mon Sep 17 00:00:00 2001 From: welsberr Date: Mon, 27 Apr 2026 11:13:28 -0400 Subject: [PATCH] Use graph diagnostics in GroundRecall review queue --- src/groundrecall/groundrecall_review_queue.py | 44 ++++++++++++++++--- tests/test_groundrecall_import.py | 20 +++++++++ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/src/groundrecall/groundrecall_review_queue.py b/src/groundrecall/groundrecall_review_queue.py index 23196c9..e1040e0 100644 --- a/src/groundrecall/groundrecall_review_queue.py +++ b/src/groundrecall/groundrecall_review_queue.py @@ -20,15 +20,19 @@ def _read_jsonl(path: Path) -> list[dict[str, Any]]: return [json.loads(line) for line in text.splitlines()] -def _triage_lane(item: dict[str, Any], finding_codes: set[str]) -> str: +def _triage_lane(item: dict[str, Any], finding_codes: set[str], graph_codes: set[str] | None = None) -> str: + graph_codes = graph_codes or set() if {"claim_ungrounded", "ungrounded_summary"} & finding_codes: return "source_cleanup" + if {"bridge_concept", "isolated_concept", "small_component"} & graph_codes: + return "conflict_resolution" if {"relation_missing_source", "relation_missing_target", "orphan_concept"} & finding_codes: return "conflict_resolution" return "knowledge_capture" -def _priority(item: dict[str, Any], finding_codes: set[str]) -> int: +def _priority(item: dict[str, Any], finding_codes: set[str], graph_codes: set[str] | None = None) -> int: + graph_codes = graph_codes or set() priority = 50 if item.get("grounding_status") == "grounded": priority -= 10 @@ -37,6 +41,12 @@ def _priority(item: dict[str, Any], finding_codes: set[str]) -> int: if any(code.startswith("claim_") or code.startswith("relation_") for code in finding_codes): priority += 20 priority -= min(len(finding_codes) * 2, 10) + if "bridge_concept" in graph_codes: + priority -= 10 + if "isolated_concept" in graph_codes: + priority -= 6 + if "small_component" in graph_codes: + priority -= 4 return max(priority, 1) @@ -44,12 +54,14 @@ def build_review_queue(import_dir: str | Path) -> dict[str, Any]: base = Path(import_dir) manifest = _read_json(base / "manifest.json") lint_payload = _read_json(base / "lint_findings.json") + graph_payload = _read_json(base / "graph_diagnostics.json") claims = _read_jsonl(base / "claims.jsonl") concepts = _read_jsonl(base / "concepts.jsonl") findings_by_target: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) for finding in lint_payload.get("findings", []): findings_by_target[finding["target_id"]].append(finding) + graph_codes_by_concept = _graph_codes_by_concept(graph_payload) queue: list[dict[str, Any]] = [] @@ -74,7 +86,8 @@ def build_review_queue(import_dir: str | Path) -> dict[str, Any]: for concept in concepts: related = findings_by_target.get(concept["concept_id"], []) finding_codes = {item["code"] for item in related} - if not finding_codes: + graph_codes = graph_codes_by_concept.get(concept["concept_id"], set()) + if not finding_codes and not graph_codes: continue queue.append( { @@ -82,12 +95,13 @@ def build_review_queue(import_dir: str | Path) -> dict[str, Any]: "candidate_type": "concept", "candidate_id": concept["concept_id"], "title": concept["title"], - "triage_lane": _triage_lane(concept, finding_codes), - "priority": _priority(concept, finding_codes), + "triage_lane": _triage_lane(concept, finding_codes, graph_codes), + "priority": _priority(concept, finding_codes, graph_codes), "grounding_status": concept.get("grounding_status", "triaged"), "status": "needs_review", - "finding_codes": sorted(finding_codes), + "finding_codes": sorted(finding_codes | graph_codes), "concept_ids": [concept["concept_id"]], + "graph_codes": sorted(graph_codes), } ) @@ -99,6 +113,24 @@ def build_review_queue(import_dir: str | Path) -> dict[str, Any]: } +def _graph_codes_by_concept(graph_payload: dict[str, Any]) -> dict[str, set[str]]: + codes: defaultdict[str, set[str]] = defaultdict(set) + components = graph_payload.get("components", []) + for component in components: + concept_ids = [str(item) for item in component.get("concept_ids", [])] + size = int(component.get("size", len(concept_ids))) + if size == 1 and concept_ids: + codes[concept_ids[0]].add("isolated_concept") + elif 1 < size <= 2: + for concept_id in concept_ids: + codes[concept_id].add("small_component") + for bridge in graph_payload.get("bridge_concepts", []): + concept_id = str(bridge.get("concept_id", "")) + if concept_id: + codes[concept_id].add("bridge_concept") + return codes + + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Build a GroundRecall review queue from import artifacts.") parser.add_argument("import_dir") diff --git a/tests/test_groundrecall_import.py b/tests/test_groundrecall_import.py index 1e278d8..117a591 100644 --- a/tests/test_groundrecall_import.py +++ b/tests/test_groundrecall_import.py @@ -74,6 +74,7 @@ def test_groundrecall_import_emits_normalized_artifacts(tmp_path: Path) -> None: review_queue = json.loads((result.out_dir / "review_queue.json").read_text(encoding="utf-8")) assert review_queue["queue_length"] >= 1 assert any(item["candidate_type"] == "claim" for item in review_queue["items"]) + assert any(item["candidate_type"] == "concept" for item in review_queue["items"]) review_session = json.loads((result.out_dir / "review_session.json").read_text(encoding="utf-8")) assert review_session["reviewer"] == "GroundRecall Import" assert review_session["draft_pack"]["pack"]["source_import_id"] == "import-test" @@ -151,6 +152,25 @@ def test_graph_diagnostics_detect_bridge_concepts() -> None: assert [item["concept_id"] for item in diagnostics["bridge_concepts"]] == ["concept::b", "concept::c"] +def test_review_queue_uses_graph_diagnostics_for_concept_triage(tmp_path: Path) -> None: + root = tmp_path / "llmwiki" + (root / "wiki").mkdir(parents=True) + (root / "wiki" / "a.md").write_text("# A\n\nSee also [[B]].\n", encoding="utf-8") + (root / "wiki" / "b.md").write_text("# B\n\nSee also [[C]].\n", encoding="utf-8") + (root / "wiki" / "c.md").write_text("# C\n", encoding="utf-8") + (root / "wiki" / "isolated.md").write_text("# Isolated\n", encoding="utf-8") + + result = run_groundrecall_import(root, mode="quick", import_id="graph-queue-test") + review_queue = json.loads((result.out_dir / "review_queue.json").read_text(encoding="utf-8")) + concept_items = {item["candidate_id"]: item for item in review_queue["items"] if item["candidate_type"] == "concept"} + + assert concept_items["concept::b"]["triage_lane"] == "conflict_resolution" + assert "bridge_concept" in concept_items["concept::b"]["graph_codes"] + assert concept_items["concept::isolated"]["triage_lane"] == "conflict_resolution" + assert "isolated_concept" in concept_items["concept::isolated"]["graph_codes"] + assert concept_items["concept::b"]["priority"] < concept_items["concept::isolated"]["priority"] + + def test_groundrecall_import_parses_explicit_claim_relations(tmp_path: Path) -> None: root = tmp_path / "llmwiki" (root / "wiki").mkdir(parents=True)