Add notebook promotion pipeline

This commit is contained in:
welsberr 2026-05-11 07:00:03 -04:00
parent 44954a4ec9
commit 6ac95a37f2
3 changed files with 341 additions and 0 deletions

View File

@ -11,6 +11,7 @@ from .augmentation_bundle_probe import write_probe_report
from .archive_phrase_inventory import write_archive_phrase_inventory_report
from .first_ring_batch_promotion import run_first_ring_batch_promotion
from .hub_bundle_rebuild import rebuild_hub_bundle_from_binding
from .notebook_promotion_pipeline import run_notebook_promotion_pipeline
from .notebook_page import export_notebook_page_from_groundrecall_bundle
from .notebook_page import export_notebook_page_from_groundrecall_store
from .review_loader import load_draft_pack
@ -100,6 +101,19 @@ def build_parser() -> argparse.ArgumentParser:
help="Rebuild a hub bundle support layer from the bundle paths listed in a hub binding manifest",
)
hub_rebuild_parser.add_argument("binding_path")
pipeline_parser = subparsers.add_parser(
"notebook-promotion-pipeline",
help="Run the Notebook phrase-inventory, batch-promotion, and hub-rebuild loop and write a comparison report",
)
pipeline_parser.add_argument("binding_path")
pipeline_parser.add_argument("manifest_path")
pipeline_parser.add_argument("canonical_dir")
pipeline_parser.add_argument("output_path")
pipeline_parser.add_argument("--phrase-inventory-output")
pipeline_parser.add_argument("--phrase-input", action="append", default=[])
pipeline_parser.add_argument("--seed-term", action="append", default=[])
pipeline_parser.add_argument("--top-n", type=int, default=50)
return parser
@ -216,4 +230,17 @@ def main() -> None:
summary = rebuild_hub_bundle_from_binding(args.binding_path)
print(summary)
return
if args.command == "notebook-promotion-pipeline":
summary = run_notebook_promotion_pipeline(
binding_path=args.binding_path,
manifest_path=args.manifest_path,
canonical_dir=args.canonical_dir,
output_path=args.output_path,
phrase_inventory_output=args.phrase_inventory_output,
phrase_inputs=args.phrase_input,
seed_terms=args.seed_term,
top_n=args.top_n,
)
print(summary)
return
build_parser().print_help()

View File

@ -0,0 +1,142 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from .archive_phrase_inventory import write_archive_phrase_inventory_report
from .first_ring_batch_promotion import run_first_ring_batch_promotion
from .hub_bundle_rebuild import rebuild_hub_bundle_from_binding
def _load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def _hub_metrics(binding_path: str | Path) -> dict[str, Any]:
    """Snapshot size metrics for the hub bundle and notebook page of a binding.

    Artifact paths come from the binding manifest's ``primary_artifacts``
    table and are resolved relative to the binding file itself; artifacts
    that do not exist on disk are treated as empty documents.
    """
    binding_file = Path(binding_path)
    binding_doc = _load_json(binding_file)
    artifacts = binding_doc["primary_artifacts"]
    hub_file = (binding_file.parent / artifacts["groundrecall_query_bundle"]).resolve()
    page_file = (binding_file.parent / artifacts["notebook_page"]).resolve()
    hub_doc = _load_json(hub_file) if hub_file.exists() else {}
    page_doc = _load_json(page_file) if page_file.exists() else {}

    def list_len(key: str) -> int:
        # Missing keys and explicit JSON nulls both count as zero entries.
        return len(hub_doc.get(key) or [])

    return {
        "hub_bundle_path": str(hub_file),
        "notebook_page_path": str(page_file),
        "hub": {
            "claim_count": list_len("relevant_claims"),
            "supporting_observation_count": list_len("supporting_observations"),
            "related_concept_count": list_len("related_concepts"),
            "source_artifact_count": list_len("source_artifacts"),
            "source_role_summary": hub_doc.get("source_role_summary") or {},
            "distinction_count": list_len("key_distinctions"),
        },
        "page_summary": page_doc.get("summary") or {},
    }
def _delta(before: dict[str, Any], after: dict[str, Any]) -> dict[str, Any]:
out: dict[str, Any] = {}
for key in (
"claim_count",
"supporting_observation_count",
"related_concept_count",
"source_artifact_count",
"distinction_count",
):
out[key] = (after.get(key) or 0) - (before.get(key) or 0)
return out
def run_notebook_promotion_pipeline(
    *,
    binding_path: str | Path,
    manifest_path: str | Path,
    canonical_dir: str | Path,
    output_path: str | Path,
    phrase_inventory_output: str | Path | None = None,
    phrase_inputs: list[str | Path] | None = None,
    seed_terms: list[str] | None = None,
    top_n: int = 50,
) -> dict[str, Any]:
    """Run the phrase-inventory, batch-promotion, and hub-rebuild loop.

    Captures hub/page metrics before and after the run, writes a JSON
    comparison report to *output_path* plus a sibling ``.md`` summary, and
    returns both file paths together with the in-memory report.
    """
    before = _hub_metrics(binding_path)

    # The phrase inventory step is optional: it runs only when both an output
    # target and at least one input are supplied.
    phrase_summary: dict[str, Any] | None = None
    if phrase_inventory_output and phrase_inputs:
        phrase_summary = write_archive_phrase_inventory_report(
            phrase_inputs,
            phrase_inventory_output,
            seed_terms=seed_terms or [],
            top_n=top_n,
        )

    batch_summary = run_first_ring_batch_promotion(manifest_path, canonical_dir)
    rebuild_summary = rebuild_hub_bundle_from_binding(binding_path)
    after = _hub_metrics(binding_path)

    generated = batch_summary.get("generated", []) or []
    # Promoted nodes backed by fewer than two claims are flagged as weak.
    weak_nodes = [node for node in generated if (node.get("claim_count") or 0) < 2]
    strong_node_total = len(generated) - len(weak_nodes)

    report = {
        "binding_path": str(Path(binding_path)),
        "manifest_path": str(Path(manifest_path)),
        "canonical_dir": str(Path(canonical_dir)),
        "phrase_inventory": phrase_summary,
        "batch_promotion": {
            "report_path": batch_summary.get("report_path"),
            "generated_count": batch_summary.get("generated_count", len(generated)),
            "strong_node_count": strong_node_total,
            "weak_node_count": len(weak_nodes),
            "weak_nodes": [
                {
                    "concept": node.get("concept"),
                    "claim_count": node.get("claim_count"),
                    "status": node.get("status"),
                }
                for node in weak_nodes
            ],
        },
        "hub_rebuild": rebuild_summary,
        "before": before,
        "after": after,
        "delta": {
            "hub": _delta(before.get("hub", {}) or {}, after.get("hub", {}) or {}),
        },
    }

    def render_markdown() -> str:
        # Human-readable companion to the JSON report.
        batch = report["batch_promotion"]
        lines = [
            "# Notebook Promotion Pipeline",
            "",
            f"- binding: `{report['binding_path']}`",
            f"- manifest: `{report['manifest_path']}`",
            f"- batch generated: `{batch['generated_count']}`",
            f"- strong nodes: `{batch['strong_node_count']}`",
            f"- weak nodes: `{batch['weak_node_count']}`",
            "",
            "## Hub Delta",
        ]
        for key, value in report["delta"]["hub"].items():
            lines.append(f"- `{key}`: `{value:+d}`")
        if batch["weak_nodes"]:
            lines.extend(["", "## Weak Nodes"])
            for node in batch["weak_nodes"]:
                lines.append(
                    f"- `{node['concept']}` claims=`{node['claim_count']}` status=`{node['status']}`"
                )
        if phrase_summary:
            lines.extend(
                [
                    "",
                    "## Phrase Inventory",
                    f"- report: `{phrase_summary['report_path']}`",
                    f"- documents: `{phrase_summary['summary']['document_count']}`",
                    f"- prioritized concepts: `{phrase_summary['summary']['distinct_phrase_count']}`",
                ]
            )
        return "\n".join(lines) + "\n"

    json_path = Path(output_path)
    json_path.parent.mkdir(parents=True, exist_ok=True)
    json_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
    markdown_path = json_path.with_suffix(".md")
    markdown_path.write_text(render_markdown(), encoding="utf-8")
    return {"report_path": str(json_path), "markdown_path": str(markdown_path), "report": report}

View File

@ -0,0 +1,172 @@
from __future__ import annotations
import json
from pathlib import Path
from didactopus.notebook_promotion_pipeline import run_notebook_promotion_pipeline
def test_notebook_promotion_pipeline_runs_end_to_end(tmp_path: Path) -> None:
    """End-to-end check of ``run_notebook_promotion_pipeline`` on a tiny pilot tree.

    Builds a minimal pilot workspace (one normalized document, a hub bundle,
    a source bundle, a placeholder bundle, a binding manifest, and a promotion
    manifest), runs the pipeline, and asserts on the written JSON report and
    the rebuilt placeholder bundle.
    """
    pilot = tmp_path / "pilot"

    # One normalized source document; it feeds the phrase inventory and
    # matches the observation text in the source bundle below.
    docs_dir = pilot / "normalized" / "seed-bundle" / "documents" / "source-one"
    docs_dir.mkdir(parents=True)
    (docs_dir / "document.md").write_text(
        "# Source One\n\nNatural selection can occur without leading to evolution if traits are not inherited. "
        "Evolution is a change in the gene pool of a population over time.\n",
        encoding="utf-8",
    )

    export_dir = pilot / "groundrecall" / "export" / "canonical"
    export_dir.mkdir(parents=True)
    notebook_dir = pilot / "didactopus" / "notebook-page"
    notebook_dir.mkdir(parents=True)

    # Hub bundle: the concept whose metrics the pipeline snapshots before/after.
    hub = {
        "bundle_kind": "groundrecall_query_bundle",
        "query_type": "concept",
        "concept": {
            "concept_id": "concept::hub",
            "title": "Hub",
            "aliases": [],
            "description": "Hub concept",
            "source_artifact_ids": ["ia_hub"],
            "current_status": "reviewed",
        },
        "relevant_claims": [{"claim_id": "hc1", "claim_text": "Hub claim."}],
        "relations": [],
        "supporting_observations": [],
        "source_artifacts": [],
        "related_concepts": [],
        "review_candidates": [],
        "suggested_next_actions": [],
        "bundle_notes": [],
    }
    # Source bundle: supplies the claims/observations the promotion step
    # composes the new gene-pool node from.
    source_bundle = {
        "bundle_kind": "groundrecall_query_bundle",
        "query_type": "concept",
        "concept": {
            "concept_id": "concept::source",
            "title": "Source Concept",
            "aliases": [],
            "description": "Source concept",
            "source_artifact_ids": ["ia_src"],
            "current_status": "reviewed",
        },
        "relevant_claims": [
            {
                "claim_id": "c1",
                "claim_text": "Evolution is a change in the gene pool of a population over time.",
                "source_observation_ids": ["o1"],
                "metadata": {},
            },
            {
                "claim_id": "c2",
                "claim_text": "Natural selection can occur without leading to evolution if traits are not inherited.",
                "source_observation_ids": ["o2"],
                "metadata": {},
            },
        ],
        "relations": [],
        "supporting_observations": [
            {
                "observation_id": "o1",
                "artifact_id": "ia_src",
                "text": "Evolution is a change in the gene pool of a population over time.",
                "role": "claim",
                "origin_path": "documents/source-one/document.md",
                "grounding_status": "grounded",
            },
            {
                "observation_id": "o2",
                "artifact_id": "ia_src",
                "text": "Natural selection can occur without leading to evolution if traits are not inherited.",
                "role": "claim",
                "origin_path": "documents/source-one/document.md",
                "grounding_status": "grounded",
            },
        ],
        "source_artifacts": [
            {
                "artifact_id": "ia_src",
                "artifact_kind": "doclift_bundle_artifact",
                "title": "document",
                "path": "documents/source-one/document.md",
                "current_status": "reviewed",
            }
        ],
        "related_concepts": [],
    }
    # Placeholder bundle: empty gene-pool node the batch promotion rebuilds.
    placeholder = {
        "bundle_kind": "groundrecall_query_bundle",
        "query_type": "concept",
        "concept": {
            "concept_id": "concept::gene-pool",
            "title": "Gene Pool",
            "aliases": [],
            "description": "Placeholder",
            "source_artifact_ids": [],
            "current_status": "reviewed",
        },
        "relevant_claims": [],
        "relations": [],
        "supporting_observations": [],
        "source_artifacts": [],
        "related_concepts": [],
        "review_candidates": [],
        "suggested_next_actions": [],
        "bundle_notes": [],
    }
    (export_dir / "groundrecall_query_bundle__hub.json").write_text(json.dumps(hub), encoding="utf-8")
    (export_dir / "query_bundle__source.json").write_text(json.dumps(source_bundle), encoding="utf-8")
    (export_dir / "query_bundle__gene-pool.json").write_text(json.dumps(placeholder), encoding="utf-8")
    # FIX: write with an explicit encoding like every other fixture file here,
    # so the test does not depend on the platform's default encoding.
    (notebook_dir / "notebook_page__hub.json").write_text(
        json.dumps({"concept": {"concept_id": "concept::hub"}, "summary": {}}),
        encoding="utf-8",
    )

    # Binding manifest: artifact paths are relative to the binding file itself.
    binding = {
        "primary_artifacts": {
            "groundrecall_query_bundle": "../../groundrecall/export/canonical/groundrecall_query_bundle__hub.json",
            "notebook_page": "./notebook_page__hub.json",
        },
        "supporting_artifacts": {
            "gene_pool_bundle": "../../groundrecall/export/canonical/query_bundle__gene-pool.json",
        },
    }
    binding_path = notebook_dir / "binding.json"
    binding_path.write_text(json.dumps(binding), encoding="utf-8")

    # Promotion manifest: one tier-3 concept composed from the source bundle.
    manifest = pilot / "manifests" / "first-ring-promotion-batch.yaml"
    manifest.parent.mkdir(parents=True)
    manifest.write_text(
        """
promotion_priority:
  tier_3:
    - concept: gene-pool
      label: Gene Pool
      compose_from:
        bundle_refs:
          - query_bundle__source.json
      keyword_phrases:
        - gene pool
""",
        encoding="utf-8",
    )

    report_path = pilot / "reports" / "pipeline.json"
    phrase_path = pilot / "reports" / "phrases.json"
    result = run_notebook_promotion_pipeline(
        binding_path=binding_path,
        manifest_path=manifest,
        canonical_dir=export_dir,
        output_path=report_path,
        phrase_inventory_output=phrase_path,
        phrase_inputs=[pilot / "normalized" / "seed-bundle"],
        seed_terms=["gene pool", "natural selection"],
        top_n=10,
    )

    report = json.loads(report_path.read_text(encoding="utf-8"))
    rebuilt_bundle = json.loads((export_dir / "query_bundle__gene-pool.json").read_text(encoding="utf-8"))
    assert result["report_path"] == str(report_path)
    assert phrase_path.exists()
    # The promoted gene-pool node ends up with a single claim, so it is weak.
    assert report["batch_promotion"]["weak_node_count"] == 1
    assert report["delta"]["hub"]["related_concept_count"] == 1
    assert len(rebuilt_bundle["relevant_claims"]) == 1