Expose source roles and distinctions in review export

This commit is contained in:
welsberr 2026-05-08 14:26:48 -04:00
parent ddbd5dbf2a
commit 97e27946da
2 changed files with 99 additions and 2 deletions

View File

@ -300,10 +300,20 @@ def _claim_secondary_products(claim: dict[str, Any]) -> dict[str, Any]:
if text:
if re.search(r"\b(is|are|means|refers to|defined as|describes)\b", lowered):
definition_candidate = True
if re.search(r"\b(however|although|but|except|unless|only if|in some cases|under some conditions|may not|does not always)\b", lowered):
if re.search(
r"\b(however|although|but|except|unless|only if|in some cases|under some conditions|may not|does not always|not all|not every|typically|generally|often|sometimes|in general|in most cases|under these conditions|under those conditions|can occur without|may occur without)\b",
lowered,
):
qualification_candidate = True
if re.search(r"\b(must|requires|required|cannot|depends on|limited to|constraint|scope)\b", lowered):
if re.search(
r"\b(must|requires|required|cannot|depends on|limited to|constraint|scope|only when|only if|provided that|without|fails to|will not|does not lead to|does not cause|not sufficient|insufficient)\b",
lowered,
):
constraint_candidate = True
if " if " in lowered and " then " in lowered:
constraint_candidate = True
if " not " in lowered and any(token in lowered for token in ("lead to", "cause", "produce", "result in", "evolutionary change")):
qualification_candidate = True
if claim_kind in {"quote", "quotation"}:
quote_candidate = True
elif re.search(r"[\"“”]", text) and len(text) >= 40:
@ -330,6 +340,52 @@ def _claim_secondary_products(claim: dict[str, Any]) -> dict[str, Any]:
}
def _artifact_source_role(artifact: dict[str, Any]) -> str:
metadata = artifact.get("metadata", {}) if isinstance(artifact.get("metadata"), dict) else {}
explicit = str(metadata.get("source_role", "") or metadata.get("source_role_hint", "")).strip().lower()
if explicit:
return explicit
title = str(artifact.get("title", "")).lower()
path = str(artifact.get("path", "")).lower()
corpus = str(metadata.get("corpus", "")).lower()
document_kind = str(metadata.get("document_kind", "")).lower()
joined = " ".join(part for part in (title, path, corpus, document_kind) if part)
if any(token in joined for token in ("pandasthumb", "indexcc", "talkorigins", "rebuttal", "argument", "critique")):
return "argumentation"
if any(token in joined for token in ("controvers", "debate", "dispute", "polemic")):
return "controversy"
if any(token in joined for token in ("mechanism", "model", "testing", "test", "process", "rate")):
return "mechanism"
if any(token in joined for token in ("plasticity", "epigenetic", "drift", "qualification", "constraint", "nuance")):
return "nuance"
return "overview"
def _claim_distinction_payload(claim: dict[str, Any]) -> dict[str, Any] | None:
text = str(claim.get("claim_text", "")).strip()
lowered = text.lower()
if not text:
return None
patterns = [
("non_implication", r"\bdoes not imply\b", "does not imply"),
("decoupling", r"\b(can|may)\s+occur\s+without\b", "can or may occur without"),
("contrast", r"\bversus\b|\bvs\.\b|\bvs\b", "versus"),
("contrast", r"\brather than\b", "rather than"),
("contrast", r"\bdifferent from\b|\bdistinguish(?:ed)? from\b", "different from"),
("contrast", r"\bnot\b.+\bbut\b", "not ... but"),
]
for distinction_type, pattern, cue in patterns:
if re.search(pattern, lowered):
return {
"claim_id": claim.get("claim_id", ""),
"distinction_type": distinction_type,
"cue": cue,
"text": text,
}
return None
def build_citation_review_entries_from_import(import_dir: str | Path) -> list[CitationReviewEntry]:
base = Path(import_dir)
manifest = _read_json(base / "manifest.json")
@ -437,6 +493,7 @@ def _build_import_review_payload(session: ReviewSession, import_dir: Path) -> di
bibliography_root=resolved_bibliography_root,
)
artifact_by_id = {item["artifact_id"]: item for item in artifacts}
artifact_role_by_id = {item["artifact_id"]: _artifact_source_role(item) for item in artifacts if item.get("artifact_id")}
queue_by_candidate_id = {
str(item.get("candidate_id", "")): item
for item in queue_payload.get("items", [])
@ -455,10 +512,12 @@ def _build_import_review_payload(session: ReviewSession, import_dir: Path) -> di
for claim in concept_claims[:25]:
supporting_observations = [observations_by_id[item] for item in claim.get("source_observation_ids", []) if item in observations_by_id]
artifact_ids = {item["artifact_id"] for item in supporting_observations}
source_roles = sorted({artifact_role_by_id.get(artifact_id, "") for artifact_id in artifact_ids if artifact_role_by_id.get(artifact_id, "")})
citation_support = [artifact_citation_summary.get(artifact_id, {}) for artifact_id in artifact_ids]
has_citation_support = has_citation_support or any(item.get("has_citation_support") for item in citation_support)
analysis = _claim_analysis_metadata(claim)
secondary = _claim_secondary_products(claim)
distinction = _claim_distinction_payload(claim)
lane_counts[analysis["analysis_lane"]] += 1
for label in secondary["secondary_labels"]:
secondary_counts[label] += 1
@ -490,15 +549,19 @@ def _build_import_review_payload(session: ReviewSession, import_dir: Path) -> di
"constraint_candidate": secondary["constraint_candidate"],
"quote_candidate": secondary["quote_candidate"],
"secondary_labels": secondary["secondary_labels"],
"source_roles": source_roles,
"distinction": distinction,
"grounding_status": claim.get("grounding_status", "unknown"),
"supporting_observations": [
{
"observation_id": obs["observation_id"],
"artifact_id": obs.get("artifact_id", ""),
"origin_path": obs.get("origin_path", ""),
"origin_section": obs.get("origin_section", ""),
"text": obs.get("text", ""),
"line_start": obs.get("line_start", 0),
"line_end": obs.get("line_end", 0),
"source_role": artifact_role_by_id.get(obs.get("artifact_id", ""), ""),
}
for obs in supporting_observations
],
@ -529,6 +592,18 @@ def _build_import_review_payload(session: ReviewSession, import_dir: Path) -> di
"finding_codes": list(queue_entry.get("finding_codes", [])),
"graph_codes": list(queue_entry.get("graph_codes", [])),
"analysis_lanes": dict(sorted(lane_counts.items())),
"source_role_summary": dict(
sorted(
(
role,
sum(1 for claim_payload in claim_payloads if role in (claim_payload.get("source_roles", []) or [])),
)
for role in sorted({role for claim_payload in claim_payloads for role in (claim_payload.get("source_roles", []) or [])})
)
),
"key_distinctions": [
item["distinction"] for item in claim_payloads if isinstance(item.get("distinction"), dict)
][:6],
"secondary_products": dict(sorted(secondary_counts.items())),
"top_claims": claim_payloads,
"notes": list(concept.notes),

View File

@ -136,11 +136,33 @@ def test_review_workspace_surfaces_local_bibliography_support_suggestions(tmp_pa
concept_review = next(item for item in payload["concept_reviews"] if item["concept_id"] == "drift")
suggestions = concept_review["top_claims"][0]["support_suggestions"]
assert concept_review["analysis_lanes"]["empirical"] >= 1
assert concept_review["source_role_summary"]
assert suggestions
assert suggestions[0]["citation_key"] == "kimura1968evolutionary"
assert "abstract" in suggestions[0]["reason"].lower() or "title" in suggestions[0]["reason"].lower()
def test_review_workspace_surfaces_source_roles_and_distinctions(tmp_path: Path) -> None:
root = tmp_path / "llmwiki"
(root / "wiki").mkdir(parents=True)
(root / "wiki" / "selection.md").write_text(
"# Selection\n\n"
"- Natural selection does not imply adaptation.\n",
encoding="utf-8",
)
import_result = run_groundrecall_import(root, out_root=tmp_path / "imports", mode="quick", import_id="distinction-review")
workspace = GroundRecallReviewWorkspace(import_result.out_dir)
payload = workspace.load_review_data()
concept_review = next(item for item in payload["concept_reviews"] if item["concept_id"] == "selection")
claim = concept_review["top_claims"][0]
assert claim["source_roles"] == ["overview"]
assert claim["distinction"]["distinction_type"] == "non_implication"
assert claim["supporting_observations"][0]["source_role"] == "overview"
assert concept_review["key_distinctions"][0]["distinction_type"] == "non_implication"
def test_review_workspace_can_use_separate_bibliography_root(tmp_path: Path) -> None:
root = tmp_path / "pilot"
source_root = root / "source"