Apply ZIP update: 105-didactopus-coverage-alignment-update.zip [2026-03-14T13:18:56]

This commit is contained in:
welsberr 2026-03-14 13:29:55 -04:00
parent f3ea3da848
commit ac4c975100
29 changed files with 385 additions and 580 deletions

View File

@ -1,7 +1,25 @@
concepts: concepts:
- id: duplicate - id: c1
title: First title: Foundations
description: Tiny. description: Broad foundations topic with many ideas.
- id: duplicate prerequisites: []
title: Second mastery_signals:
description: Tiny. - Explain core foundations.
- id: c2
title: Methods
description: Methods concept with sparse explicit assessment.
prerequisites: [c1]
mastery_signals:
- Use methods appropriately.
- id: c3
title: Advanced Inference
description: Advanced inference topic.
prerequisites: [c1, c2]
mastery_signals:
- Critique advanced inference.
- id: c4
title: Detached Topic
description: Detached topic with no assessment coverage.
prerequisites: []
mastery_signals:
- Explain detached topic.

View File

@ -1,5 +1,6 @@
projects: projects:
- id: bad-project - id: narrow-project
title: Bad Project title: Final Memo
prerequisites: prerequisites: [c1]
- missing-concept deliverables:
- brief memo

View File

@ -1,5 +1,9 @@
stages: stages:
- id: stage-1 - id: stage-1
title: Bad Stage title: Start
concepts: concepts: [c1, c2, c3]
- missing-concept checkpoint: []
- id: stage-2
title: Tiny Bridge
concepts: [c4]
checkpoint: []

View File

@ -1,4 +1,4 @@
rubrics: rubrics:
- id: - id: r1
title: Broken Rubric title: Basic
criteria: invalid criteria: [style, formatting]

View File

@ -1,6 +1,5 @@
review: review:
default_reviewer: "Wesley R. Elsberry" default_reviewer: "Wesley R. Elsberry"
write_promoted_pack: true
bridge: bridge:
host: "127.0.0.1" host: "127.0.0.1"
port: 8765 port: 8765

View File

@ -1,32 +1,3 @@
# FAQ # FAQ
## Why add a full pack validator? This layer does not prove pedagogical adequacy. It is a heuristic signal layer for likely misalignments.
Because import safety is not only about whether files exist. It is also about
whether the pack makes sense as a Didactopus artifact set.
## How does this help with the activation-energy problem?
It reduces uncertainty at a crucial point. Users can see whether a generated pack
is coherent enough to work with before losing momentum in manual debugging.
## What does it validate?
In this scaffold it validates:
- required files
- YAML parsing
- metadata presence
- duplicate concept ids
- roadmap references
- project prerequisite references
- rubric structure
- weak concept entries
## Does validation guarantee quality?
No. It checks structural coherence, not whether the pack is the best possible
representation of a domain.
## Where are validation results shown?
They are included in import preview results and surfaced in the UI.

View File

@ -1,14 +1,19 @@
concepts: concepts:
- id: bayes-prior - id: bayes-prior
title: Bayes Prior title: Bayes Prior
description: Prior beliefs before evidence. description: Prior beliefs before evidence in a probabilistic model.
prerequisites: [] prerequisites: []
mastery_signals: mastery_signals:
- Explain a prior distribution. - Explain a prior distribution clearly.
- id: bayes-posterior - id: bayes-posterior
title: Bayes Posterior title: Bayes Posterior
description: Updated beliefs after evidence. description: Updated beliefs after evidence in a probabilistic model.
prerequisites: prerequisites: [bayes-prior]
- bayes-prior
mastery_signals: mastery_signals:
- Compare prior and posterior beliefs. - Compare prior and posterior beliefs.
- id: model-checking
title: Model Checking
description: Evaluate whether model assumptions and fit remain plausible.
prerequisites: [bayes-posterior]
mastery_signals:
- Critique a model fit.

View File

@ -1,8 +1,7 @@
projects: projects:
- id: compare-beliefs - id: culminating-analysis
title: Compare Prior and Posterior title: Final Model Critique
prerequisites: prerequisites: [bayes-prior, bayes-posterior, model-checking]
- bayes-prior
- bayes-posterior
deliverables: deliverables:
- short report - short critique report
- explanation of prior and posterior updates

View File

@ -1,8 +1,16 @@
stages: stages:
- id: stage-1 - id: stage-1
title: Bayes Basics title: Prior Beliefs
concepts: concepts: [bayes-prior]
- bayes-prior checkpoint:
- bayes-posterior - Explain a prior distribution.
- id: stage-2
title: Posterior Updating
concepts: [bayes-posterior]
checkpoint: checkpoint:
- Compare prior and posterior beliefs. - Compare prior and posterior beliefs.
- id: stage-3
title: Model Checking
concepts: [model-checking]
checkpoint:
- Critique a model fit.

View File

@ -1,6 +1,4 @@
rubrics: rubrics:
- id: basic-rubric - id: r1
title: Basic Rubric title: Basic
criteria: criteria: [correctness, explanation, critique]
- correctness
- explanation

View File

@ -5,15 +5,13 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "didactopus" name = "didactopus"
version = "0.1.0" version = "0.1.0"
description = "Didactopus: full pack validation layer" description = "Didactopus: coverage and alignment analysis"
readme = "README.md" readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.10"
license = {text = "MIT"}
authors = [{name = "Wesley R. Elsberry"}]
dependencies = ["pydantic>=2.7", "pyyaml>=6.0"] dependencies = ["pydantic>=2.7", "pyyaml>=6.0"]
[project.optional-dependencies] [project.optional-dependencies]
dev = ["pytest>=8.0", "ruff>=0.6"] dev = ["pytest>=8.0"]
[project.scripts] [project.scripts]
didactopus-review-bridge = "didactopus.review_bridge_server:main" didactopus-review-bridge = "didactopus.review_bridge_server:main"

View File

@ -1 +1 @@
__version__ = "0.1.0" __version__ = '0.1.0'

View File

@ -4,7 +4,6 @@ import yaml
class ReviewConfig(BaseModel): class ReviewConfig(BaseModel):
default_reviewer: str = "Unknown Reviewer" default_reviewer: str = "Unknown Reviewer"
write_promoted_pack: bool = True
class BridgeConfig(BaseModel): class BridgeConfig(BaseModel):
host: str = "127.0.0.1" host: str = "127.0.0.1"
@ -18,5 +17,4 @@ class AppConfig(BaseModel):
def load_config(path: str | Path) -> AppConfig: def load_config(path: str | Path) -> AppConfig:
with open(path, "r", encoding="utf-8") as handle: with open(path, "r", encoding="utf-8") as handle:
data = yaml.safe_load(handle) or {} return AppConfig.model_validate(yaml.safe_load(handle) or {})
return AppConfig.model_validate(data)

View File

@ -1,2 +1,78 @@
import re
from .pack_validator import load_pack_artifacts
def tokenize(text: str) -> set[str]:
return {t for t in re.sub(r"[^a-z0-9]+", " ", str(text).lower()).split() if t}
def _concept_title_tokens(title: str) -> set[str]:
stop = {"the","of","and","to","for","in","on","a","an"}
return {t for t in tokenize(title) if t not in stop}
def coverage_alignment_for_pack(source_dir): def coverage_alignment_for_pack(source_dir):
return {'warnings': [], 'summary': {'coverage_warning_count': 0}} loaded = load_pack_artifacts(source_dir)
if not loaded["ok"]:
return {"warnings": [], "summary": {"coverage_warning_count": 0}}
concepts = loaded["artifacts"]["concepts"].get("concepts", []) or []
roadmap = loaded["artifacts"]["roadmap"].get("stages", []) or []
projects = loaded["artifacts"]["projects"].get("projects", []) or []
rubrics = loaded["artifacts"]["rubrics"].get("rubrics", []) or []
concept_by_id = {c.get("id"): c for c in concepts if c.get("id")}
roadmap_ids = {cid for stage in roadmap for cid in (stage.get("concepts", []) or [])}
checkpoint_tokens = tokenize(" ".join(str(item) for stage in roadmap for item in (stage.get("checkpoint", []) or [])))
project_ids = {cid for project in projects for cid in (project.get("prerequisites", []) or [])}
deliverable_tokens = tokenize(" ".join(str(item) for project in projects for item in (project.get("deliverables", []) or [])))
checkpoint_ids = set()
assessed_ids = set(project_ids)
warnings = []
for cid, concept in concept_by_id.items():
title_tokens = _concept_title_tokens(concept.get("title", ""))
if cid not in roadmap_ids:
warnings.append(f"Concept '{cid}' does not appear in any roadmap stage.")
if title_tokens and (title_tokens & checkpoint_tokens):
checkpoint_ids.add(cid)
else:
warnings.append(f"Concept '{cid}' is not reflected in checkpoint language.")
if cid not in project_ids:
warnings.append(f"Concept '{cid}' is not referenced by any project prerequisites.")
if cid in project_ids or cid in checkpoint_ids:
assessed_ids.add(cid)
else:
warnings.append(f"Concept '{cid}' is never covered by checkpoints or projects.")
for cid, concept in concept_by_id.items():
for signal in concept.get("mastery_signals", []) or []:
signal_tokens = tokenize(signal)
if signal_tokens and not ((signal_tokens & checkpoint_tokens) or (signal_tokens & deliverable_tokens)):
warnings.append(f"Mastery signal for concept '{cid}' is not reflected in checkpoints or project deliverables.")
rubric_tokens = set()
for rubric in rubrics:
for criterion in rubric.get("criteria", []) or []:
rubric_tokens |= tokenize(criterion)
project_and_signal_tokens = set(deliverable_tokens)
for concept in concept_by_id.values():
for signal in concept.get("mastery_signals", []) or []:
project_and_signal_tokens |= tokenize(signal)
if rubric_tokens and len(rubric_tokens & project_and_signal_tokens) == 0:
warnings.append("Rubric criteria show weak lexical overlap with mastery signals and project deliverables.")
concept_count = max(1, len(concept_by_id))
if projects and len(project_ids) <= max(1, concept_count // 4):
warnings.append("Projects appear to cover only a narrow subset of the concept set.")
return {
"warnings": warnings,
"summary": {
"coverage_warning_count": len(warnings),
"concept_count": len(concept_by_id),
"roadmap_covered_count": len(roadmap_ids & set(concept_by_id)),
"checkpoint_covered_count": len(checkpoint_ids),
"project_covered_count": len(project_ids & set(concept_by_id)),
"assessed_concept_count": len(assessed_ids),
},
}

View File

@ -1,91 +1,4 @@
from __future__ import annotations
from collections import defaultdict, deque
from .pack_validator import load_pack_artifacts from .pack_validator import load_pack_artifacts
def graph_qa_for_pack(source_dir):
def graph_qa_for_pack(source_dir) -> dict:
loaded = load_pack_artifacts(source_dir) loaded = load_pack_artifacts(source_dir)
if not loaded["ok"]: return {"warnings": [], "summary": {"graph_warning_count": 0}} if loaded["ok"] else {"warnings": [], "summary": {"graph_warning_count": 0}}
return {"warnings": [], "summary": {"graph_warning_count": 0}}
concepts = loaded["artifacts"]["concepts"].get("concepts", []) or []
concept_ids = [c.get("id") for c in concepts if c.get("id")]
prereqs = {c.get("id"): list(c.get("prerequisites", []) or []) for c in concepts if c.get("id")}
incoming = defaultdict(set)
outgoing = defaultdict(set)
for cid, pres in prereqs.items():
for p in pres:
outgoing[p].add(cid)
incoming[cid].add(p)
warnings = []
# Cycle detection
WHITE, GRAY, BLACK = 0, 1, 2
color = {cid: WHITE for cid in concept_ids}
stack = []
found_cycles = []
def dfs(node):
color[node] = GRAY
stack.append(node)
for nxt in outgoing.get(node, []):
if color.get(nxt, WHITE) == WHITE:
dfs(nxt)
elif color.get(nxt) == GRAY:
if nxt in stack:
idx = stack.index(nxt)
found_cycles.append(stack[idx:] + [nxt])
stack.pop()
color[node] = BLACK
for cid in concept_ids:
if color[cid] == WHITE:
dfs(cid)
for cyc in found_cycles:
warnings.append("Prerequisite cycle detected: " + " -> ".join(cyc))
# Isolated concepts
for cid in concept_ids:
if len(incoming[cid]) == 0 and len(outgoing[cid]) == 0:
warnings.append(f"Concept '{cid}' is isolated from the prerequisite graph.")
# Bottlenecks
threshold = 3
for cid in concept_ids:
if len(outgoing[cid]) >= threshold:
warnings.append(f"Concept '{cid}' is a bottleneck with {len(outgoing[cid])} downstream dependents.")
# Flatness
edge_count = sum(len(v) for v in prereqs.values())
if len(concept_ids) >= 4 and edge_count <= max(1, len(concept_ids) // 4):
warnings.append("Pack appears suspiciously flat: very few prerequisite edges relative to concept count.")
# Deep chains
indegree = {cid: len(incoming[cid]) for cid in concept_ids}
q = deque([cid for cid in concept_ids if indegree[cid] == 0])
longest = {cid: 1 for cid in concept_ids}
visited = 0
while q:
node = q.popleft()
visited += 1
for nxt in outgoing.get(node, []):
longest[nxt] = max(longest.get(nxt, 1), longest[node] + 1)
indegree[nxt] -= 1
if indegree[nxt] == 0:
q.append(nxt)
max_chain = max(longest.values()) if longest else 0
if max_chain >= 6:
warnings.append(f"Pack has a deep prerequisite chain of length {max_chain}, which may indicate over-fragmentation.")
summary = {
"graph_warning_count": len(warnings),
"concept_count": len(concept_ids),
"edge_count": edge_count,
"max_chain_length": max_chain,
"cycle_count": len(found_cycles),
"isolated_count": sum(1 for cid in concept_ids if len(incoming[cid]) == 0 and len(outgoing[cid]) == 0),
}
return {"warnings": warnings, "summary": summary}

View File

@ -1,11 +1,18 @@
from __future__ import annotations
from pathlib import Path from pathlib import Path
from .review_schema import ImportPreview from .review_schema import ImportPreview
from .pack_validator import validate_pack_directory from .pack_validator import validate_pack_directory
from .semantic_qa import semantic_qa_for_pack
from .graph_qa import graph_qa_for_pack
from .path_quality_qa import path_quality_for_pack
from .coverage_alignment_qa import coverage_alignment_for_pack
def preview_draft_pack_import(source_dir: str | Path, workspace_id: str, overwrite_required: bool = False) -> ImportPreview: def preview_draft_pack_import(source_dir, workspace_id, overwrite_required=False):
result = validate_pack_directory(source_dir) result = validate_pack_directory(source_dir)
preview = ImportPreview( semantic = semantic_qa_for_pack(source_dir) if result["ok"] else {"warnings": []}
graph = graph_qa_for_pack(source_dir) if result["ok"] else {"warnings": []}
pathq = path_quality_for_pack(source_dir) if result["ok"] else {"warnings": []}
coverage = coverage_alignment_for_pack(source_dir) if result["ok"] else {"warnings": []}
return ImportPreview(
source_dir=str(Path(source_dir)), source_dir=str(Path(source_dir)),
workspace_id=workspace_id, workspace_id=workspace_id,
overwrite_required=overwrite_required, overwrite_required=overwrite_required,
@ -13,5 +20,8 @@ def preview_draft_pack_import(source_dir: str | Path, workspace_id: str, overwri
errors=list(result["errors"]), errors=list(result["errors"]),
warnings=list(result["warnings"]), warnings=list(result["warnings"]),
summary=dict(result["summary"]), summary=dict(result["summary"]),
semantic_warnings=list(semantic["warnings"]),
graph_warnings=list(graph["warnings"]),
path_warnings=list(pathq["warnings"]),
coverage_warnings=list(coverage["warnings"]),
) )
return preview

View File

@ -1,8 +1,7 @@
from __future__ import annotations
from pathlib import Path from pathlib import Path
import yaml import yaml
REQUIRED_FILES = ["pack.yaml", "concepts.yaml", "roadmap.yaml", "projects.yaml", "rubrics.yaml"] REQUIRED_FILES = ["pack.yaml","concepts.yaml","roadmap.yaml","projects.yaml","rubrics.yaml"]
def _safe_load_yaml(path: Path, errors: list[str], label: str): def _safe_load_yaml(path: Path, errors: list[str], label: str):
try: try:
@ -11,111 +10,73 @@ def _safe_load_yaml(path: Path, errors: list[str], label: str):
errors.append(f"Could not parse {label}: {exc}") errors.append(f"Could not parse {label}: {exc}")
return {} return {}
def validate_pack_directory(source_dir: str | Path) -> dict: def load_pack_artifacts(source_dir):
source = Path(source_dir) source = Path(source_dir)
errors: list[str] = [] errors = []
warnings: list[str] = []
summary: dict = {}
if not source.exists(): if not source.exists():
return {"ok": False, "errors": [f"Source directory does not exist: {source}"], "warnings": [], "summary": {}} return {"ok": False, "errors": [f"Source directory does not exist: {source}"], "warnings": [], "summary": {}, "artifacts": {}}
if not source.is_dir(): if not source.is_dir():
return {"ok": False, "errors": [f"Source path is not a directory: {source}"], "warnings": [], "summary": {}} return {"ok": False, "errors": [f"Source path is not a directory: {source}"], "warnings": [], "summary": {}, "artifacts": {}}
for fn in REQUIRED_FILES:
for filename in REQUIRED_FILES: if not (source/fn).exists():
if not (source / filename).exists(): errors.append(f"Missing required file: {fn}")
errors.append(f"Missing required file: {filename}")
if errors: if errors:
return {"ok": False, "errors": errors, "warnings": warnings, "summary": summary} return {"ok": False, "errors": errors, "warnings": [], "summary": {}, "artifacts": {}}
return {
pack_data = _safe_load_yaml(source / "pack.yaml", errors, "pack.yaml") "ok": True, "errors": [], "warnings": [], "summary": {},
concepts_data = _safe_load_yaml(source / "concepts.yaml", errors, "concepts.yaml") "artifacts": {
roadmap_data = _safe_load_yaml(source / "roadmap.yaml", errors, "roadmap.yaml") "pack": _safe_load_yaml(source/"pack.yaml", errors, "pack.yaml"),
projects_data = _safe_load_yaml(source / "projects.yaml", errors, "projects.yaml") "concepts": _safe_load_yaml(source/"concepts.yaml", errors, "concepts.yaml"),
rubrics_data = _safe_load_yaml(source / "rubrics.yaml", errors, "rubrics.yaml") "roadmap": _safe_load_yaml(source/"roadmap.yaml", errors, "roadmap.yaml"),
"projects": _safe_load_yaml(source/"projects.yaml", errors, "projects.yaml"),
if errors: "rubrics": _safe_load_yaml(source/"rubrics.yaml", errors, "rubrics.yaml"),
return {"ok": False, "errors": errors, "warnings": warnings, "summary": summary}
for field in ["name", "display_name", "version"]:
if field not in pack_data:
warnings.append(f"pack.yaml has no '{field}' field.")
concepts = concepts_data.get("concepts", [])
roadmap_stages = roadmap_data.get("stages", [])
projects = projects_data.get("projects", [])
rubrics = rubrics_data.get("rubrics", [])
if not isinstance(concepts, list):
errors.append("concepts.yaml top-level 'concepts' is not a list.")
concepts = []
if not isinstance(roadmap_stages, list):
errors.append("roadmap.yaml top-level 'stages' is not a list.")
roadmap_stages = []
if not isinstance(projects, list):
errors.append("projects.yaml top-level 'projects' is not a list.")
projects = []
if not isinstance(rubrics, list):
errors.append("rubrics.yaml top-level 'rubrics' is not a list.")
rubrics = []
concept_ids = []
for idx, concept in enumerate(concepts):
cid = concept.get("id", "")
if not cid:
errors.append(f"Concept at index {idx} has no id.")
else:
concept_ids.append(cid)
if not concept.get("title"):
warnings.append(f"Concept '{cid or idx}' has no title.")
desc = str(concept.get("description", "") or "")
if len(desc.strip()) < 12:
warnings.append(f"Concept '{cid or idx}' has a very thin description.")
seen = set()
dups = set()
for cid in concept_ids:
if cid in seen:
dups.add(cid)
seen.add(cid)
for cid in sorted(dups):
errors.append(f"Duplicate concept id: {cid}")
concept_id_set = set(concept_ids)
for stage in roadmap_stages:
for cid in stage.get("concepts", []) or []:
if cid not in concept_id_set:
errors.append(f"roadmap.yaml references missing concept id: {cid}")
for project in projects:
if not project.get("id"):
warnings.append("A project entry has no id.")
for cid in project.get("prerequisites", []) or []:
if cid not in concept_id_set:
errors.append(f"projects.yaml references missing prerequisite concept id: {cid}")
for idx, rubric in enumerate(rubrics):
if not rubric.get("id"):
warnings.append(f"Rubric at index {idx} has no id.")
criteria = rubric.get("criteria", [])
if criteria is None:
warnings.append(f"Rubric '{rubric.get('id', idx)}' has null criteria.")
elif isinstance(criteria, list) and len(criteria) == 0:
warnings.append(f"Rubric '{rubric.get('id', idx)}' has empty criteria.")
elif not isinstance(criteria, list):
errors.append(f"Rubric '{rubric.get('id', idx)}' criteria is not a list.")
summary = {
"pack_name": pack_data.get("name", ""),
"display_name": pack_data.get("display_name", ""),
"version": pack_data.get("version", ""),
"concept_count": len(concepts),
"roadmap_stage_count": len(roadmap_stages),
"project_count": len(projects),
"rubric_count": len(rubrics),
"error_count": len(errors),
"warning_count": len(warnings),
} }
return {"ok": len(errors) == 0, "errors": errors, "warnings": warnings, "summary": summary} }
def validate_pack_directory(source_dir):
loaded = load_pack_artifacts(source_dir)
errors = list(loaded["errors"]); warnings = list(loaded["warnings"]); summary = dict(loaded["summary"])
if not loaded["ok"]:
return {"ok": False, "errors": errors, "warnings": warnings, "summary": summary}
pack = loaded["artifacts"]["pack"]; concepts = loaded["artifacts"]["concepts"].get("concepts", []) or []
roadmap = loaded["artifacts"]["roadmap"].get("stages", []) or []
projects = loaded["artifacts"]["projects"].get("projects", []) or []
rubrics = loaded["artifacts"]["rubrics"].get("rubrics", []) or []
for field in ["name","display_name","version"]:
if field not in pack:
warnings.append(f"pack.yaml has no '{field}' field.")
ids = []
for i, c in enumerate(concepts):
cid = c.get("id","")
if not cid:
errors.append(f"Concept at index {i} has no id.")
else:
ids.append(cid)
if len(str(c.get("description","")).strip()) < 12:
warnings.append(f"Concept '{cid or i}' has a very thin description.")
seen = set()
for cid in ids:
if cid in seen:
errors.append(f"Duplicate concept id: {cid}")
seen.add(cid)
idset = set(ids)
for stage in roadmap:
for cid in stage.get("concepts", []) or []:
if cid not in idset:
errors.append(f"roadmap.yaml references missing concept id: {cid}")
for project in projects:
for cid in project.get("prerequisites", []) or []:
if cid not in idset:
errors.append(f"projects.yaml references missing prerequisite concept id: {cid}")
for i, rubric in enumerate(rubrics):
crit = rubric.get("criteria", [])
if not rubric.get("id"):
warnings.append(f"Rubric at index {i} has no id.")
if crit is None:
warnings.append(f"Rubric '{rubric.get('id', i)}' has null criteria.")
elif isinstance(crit, list) and len(crit) == 0:
warnings.append(f"Rubric '{rubric.get('id', i)}' has empty criteria.")
elif not isinstance(crit, list):
errors.append(f"Rubric '{rubric.get('id', i)}' criteria is not a list.")
summary = {"pack_name": pack.get("name",""), "display_name": pack.get("display_name",""), "version": pack.get("version",""), "concept_count": len(concepts), "roadmap_stage_count": len(roadmap), "project_count": len(projects), "rubric_count": len(rubrics)}
return {"ok": len(errors)==0, "errors": errors, "warnings": warnings, "summary": summary}

View File

@ -1,2 +1,50 @@
import re
from statistics import mean
from .pack_validator import load_pack_artifacts
CAPSTONE_HINTS = {"capstone","final","comprehensive","culminating"}
def tokenize(text: str) -> set[str]:
return {t for t in re.sub(r"[^a-z0-9]+", " ", str(text).lower()).split() if t}
def path_quality_for_pack(source_dir): def path_quality_for_pack(source_dir):
return {'warnings': [], 'summary': {'path_warning_count': 0}} loaded = load_pack_artifacts(source_dir)
if not loaded["ok"]:
return {"warnings": [], "summary": {"path_warning_count": 0}}
concepts = loaded["artifacts"]["concepts"].get("concepts", []) or []
roadmap = loaded["artifacts"]["roadmap"].get("stages", []) or []
projects = loaded["artifacts"]["projects"].get("projects", []) or []
concept_by_id = {c.get("id"): c for c in concepts if c.get("id")}
project_prereq_ids = {cid for p in projects for cid in (p.get("prerequisites", []) or [])}
warnings = []; stage_sizes = []; stage_prereq_loads = []; assessed = set(project_prereq_ids)
for idx, stage in enumerate(roadmap):
sc = stage.get("concepts", []) or []; cp = stage.get("checkpoint", []) or []
stage_sizes.append(len(sc))
if len(sc) == 0: warnings.append(f"Roadmap stage '{stage.get('title', idx)}' has no concepts.")
if len(cp) == 0: warnings.append(f"Roadmap stage '{stage.get('title', idx)}' has no checkpoint activity.")
cp_tokens = tokenize(' '.join(str(x) for x in cp))
for cid in sc:
if tokenize(concept_by_id.get(cid, {}).get("title","")) & cp_tokens:
assessed.add(cid)
stage_prereq_loads.append(sum(len(concept_by_id.get(cid, {}).get("prerequisites", []) or []) for cid in sc))
for cid in concept_by_id:
if cid not in assessed: warnings.append(f"Concept '{cid}' is not visibly assessed by checkpoints or project prerequisites.")
for idx, project in enumerate(projects):
if tokenize(project.get("title","")) & CAPSTONE_HINTS and len(roadmap) >= 3 and idx == 0:
warnings.append(f"Project '{project.get('title')}' looks capstone-like but appears very early in the project list.")
if roadmap:
for idx in range(max(0, len(roadmap)-2), len(roadmap)):
stage = roadmap[idx]; sc = stage.get("concepts", []) or []; cp = stage.get("checkpoint", []) or []
linked = any(cid in project_prereq_ids for cid in sc)
if sc and len(cp) == 0 and not linked:
warnings.append(f"Late roadmap stage '{stage.get('title', idx)}' may be a dead end: no checkpoints and no project linkage.")
if stage_sizes:
avg = mean(stage_sizes)
for idx, size in enumerate(stage_sizes):
title = roadmap[idx].get("title", idx)
if avg > 0 and size >= max(4, 2.5 * avg): warnings.append(f"Roadmap stage '{title}' is unusually large relative to other stages.")
if len(roadmap) >= 3 and size == 1: warnings.append(f"Roadmap stage '{title}' is unusually small and may need merging or support concepts.")
for idx in range(1, len(stage_prereq_loads)):
if stage_prereq_loads[idx] >= stage_prereq_loads[idx-1] + 3:
warnings.append(f"Roadmap stage '{roadmap[idx].get('title', idx)}' shows an abrupt prerequisite-load jump from the prior stage.")
return {"warnings": warnings, "summary": {"path_warning_count": len(warnings)}}

View File

@ -1,4 +1,3 @@
from __future__ import annotations
from .review_schema import ReviewAction, ReviewLedgerEntry, ReviewSession from .review_schema import ReviewAction, ReviewLedgerEntry, ReviewSession
def _find_concept(session: ReviewSession, concept_id: str): def _find_concept(session: ReviewSession, concept_id: str):

View File

@ -1,4 +1,3 @@
from __future__ import annotations
from pathlib import Path from pathlib import Path
import json import json
from .review_loader import load_draft_pack from .review_loader import load_draft_pack
@ -7,44 +6,36 @@ from .review_actions import apply_action
from .review_export import export_review_state_json, export_promoted_pack from .review_export import export_review_state_json, export_promoted_pack
class ReviewWorkspaceBridge: class ReviewWorkspaceBridge:
def __init__(self, workspace_dir: str | Path, reviewer: str = "Unknown Reviewer") -> None: def __init__(self, workspace_dir, reviewer="Unknown Reviewer"):
self.workspace_dir = Path(workspace_dir) self.workspace_dir = Path(workspace_dir)
self.reviewer = reviewer self.reviewer = reviewer
self.workspace_dir.mkdir(parents=True, exist_ok=True) self.workspace_dir.mkdir(parents=True, exist_ok=True)
@property @property
def draft_pack_dir(self) -> Path: def draft_pack_dir(self): return self.workspace_dir / "draft_pack"
return self.workspace_dir / "draft_pack"
@property @property
def review_session_path(self) -> Path: def review_session_path(self): return self.workspace_dir / "review_session.json"
return self.workspace_dir / "review_session.json"
@property @property
def promoted_pack_dir(self) -> Path: def promoted_pack_dir(self): return self.workspace_dir / "promoted_pack"
return self.workspace_dir / "promoted_pack"
def load_session(self) -> ReviewSession: def load_session(self):
if self.review_session_path.exists(): if self.review_session_path.exists():
data = json.loads(self.review_session_path.read_text(encoding="utf-8")) return ReviewSession.model_validate(json.loads(self.review_session_path.read_text(encoding="utf-8")))
return ReviewSession.model_validate(data)
draft = load_draft_pack(self.draft_pack_dir) draft = load_draft_pack(self.draft_pack_dir)
session = ReviewSession(reviewer=self.reviewer, draft_pack=draft) session = ReviewSession(reviewer=self.reviewer, draft_pack=draft)
export_review_state_json(session, self.review_session_path) export_review_state_json(session, self.review_session_path)
return session return session
def save_session(self, session: ReviewSession) -> None: def save_session(self, session): export_review_state_json(session, self.review_session_path)
export_review_state_json(session, self.review_session_path)
def apply_actions(self, actions: list[dict]) -> ReviewSession: def apply_actions(self, actions):
session = self.load_session() session = self.load_session()
for action_dict in actions: for action_dict in actions:
action = ReviewAction.model_validate(action_dict) apply_action(session, session.reviewer, ReviewAction.model_validate(action_dict))
apply_action(session, session.reviewer, action)
self.save_session(session) self.save_session(session)
return session return session
def export_promoted(self) -> ReviewSession: def export_promoted(self):
session = self.load_session() session = self.load_session()
export_promoted_pack(session, self.promoted_pack_dir) export_promoted_pack(session, self.promoted_pack_dir)
return session return session

View File

@ -1,13 +1,11 @@
from __future__ import annotations import argparse, json
import argparse
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from pathlib import Path from pathlib import Path
from .config import load_config from .config import load_config
from .review_bridge import ReviewWorkspaceBridge from .review_bridge import ReviewWorkspaceBridge
from .workspace_manager import WorkspaceManager from .workspace_manager import WorkspaceManager
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict) -> None: def json_response(handler, status, payload):
body = json.dumps(payload, indent=2).encode("utf-8") body = json.dumps(payload, indent=2).encode("utf-8")
handler.send_response(status) handler.send_response(status)
handler.send_header("Content-Type", "application/json") handler.send_header("Content-Type", "application/json")
@ -19,119 +17,71 @@ def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict) -
handler.wfile.write(body) handler.wfile.write(body)
class ReviewBridgeHandler(BaseHTTPRequestHandler): class ReviewBridgeHandler(BaseHTTPRequestHandler):
reviewer: str = "Unknown Reviewer" reviewer = "Unknown Reviewer"
workspace_manager: WorkspaceManager = None # type: ignore workspace_manager = None
active_bridge: ReviewWorkspaceBridge | None = None active_bridge = None
active_workspace_id: str | None = None active_workspace_id = None
@classmethod @classmethod
def set_active_workspace(cls, workspace_id: str) -> bool: def set_active_workspace(cls, workspace_id):
meta = cls.workspace_manager.touch_recent(workspace_id) meta = cls.workspace_manager.touch_recent(workspace_id)
if meta is None: if meta is None: return False
return False
cls.active_workspace_id = workspace_id cls.active_workspace_id = workspace_id
cls.active_bridge = ReviewWorkspaceBridge(meta.path, reviewer=cls.reviewer) cls.active_bridge = ReviewWorkspaceBridge(meta.path, reviewer=cls.reviewer)
return True return True
def do_OPTIONS(self): def do_OPTIONS(self): json_response(self, 200, {"ok": True})
json_response(self, 200, {"ok": True})
def do_GET(self): def do_GET(self):
if self.path == "/api/workspaces": if self.path == "/api/workspaces":
reg = self.workspace_manager.list_workspaces() return json_response(self, 200, self.workspace_manager.list_workspaces().model_dump())
json_response(self, 200, reg.model_dump())
return
if self.path == "/api/load": if self.path == "/api/load":
if self.active_bridge is None: if self.active_bridge is None: return json_response(self, 400, {"error": "no active workspace"})
json_response(self, 400, {"error": "no active workspace"}) return json_response(self, 200, {"workspace_id": self.active_workspace_id, "session": self.active_bridge.load_session().model_dump()})
return return json_response(self, 404, {"error": "not found"})
session = self.active_bridge.load_session()
json_response(self, 200, {"workspace_id": self.active_workspace_id, "session": session.model_dump()})
return
json_response(self, 404, {"error": "not found"})
def do_POST(self): def do_POST(self):
length = int(self.headers.get("Content-Length", "0")) length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length) if length else b"{}" payload = json.loads((self.rfile.read(length) if length else b"{}").decode("utf-8") or "{}")
payload = json.loads(raw.decode("utf-8") or "{}")
if self.path == "/api/workspaces/create": if self.path == "/api/workspaces/create":
meta = self.workspace_manager.create_workspace( meta = self.workspace_manager.create_workspace(payload["workspace_id"], payload["title"], notes=payload.get("notes", ""))
workspace_id=payload["workspace_id"],
title=payload["title"],
notes=payload.get("notes", "")
)
self.set_active_workspace(meta.workspace_id) self.set_active_workspace(meta.workspace_id)
json_response(self, 200, {"ok": True, "workspace": meta.model_dump()}) return json_response(self, 200, {"ok": True, "workspace": meta.model_dump()})
return
if self.path == "/api/workspaces/open": if self.path == "/api/workspaces/open":
ok = self.set_active_workspace(payload["workspace_id"]) ok = self.set_active_workspace(payload["workspace_id"])
if not ok: return json_response(self, 200 if ok else 404, {"ok": ok, "workspace_id": self.active_workspace_id} if ok else {"error": "workspace not found"})
json_response(self, 404, {"error": "workspace not found"})
return
json_response(self, 200, {"ok": True, "workspace_id": self.active_workspace_id})
return
if self.path == "/api/workspaces/import-preview": if self.path == "/api/workspaces/import-preview":
preview = self.workspace_manager.preview_import( return json_response(self, 200, self.workspace_manager.preview_import(payload["source_dir"], payload["workspace_id"]).model_dump())
source_dir=payload["source_dir"],
workspace_id=payload["workspace_id"]
)
json_response(self, 200, preview.model_dump())
return
if self.path == "/api/workspaces/import": if self.path == "/api/workspaces/import":
try: try:
meta = self.workspace_manager.import_draft_pack( meta = self.workspace_manager.import_draft_pack(payload["source_dir"], payload["workspace_id"], title=payload.get("title"), notes=payload.get("notes",""), allow_overwrite=bool(payload.get("allow_overwrite", False)))
source_dir=payload["source_dir"],
workspace_id=payload["workspace_id"],
title=payload.get("title"),
notes=payload.get("notes", ""),
allow_overwrite=bool(payload.get("allow_overwrite", False)),
)
except FileNotFoundError as exc: except FileNotFoundError as exc:
json_response(self, 404, {"ok": False, "error": str(exc)}) return json_response(self, 404, {"ok": False, "error": str(exc)})
return
except FileExistsError as exc: except FileExistsError as exc:
json_response(self, 409, {"ok": False, "error": str(exc)}) return json_response(self, 409, {"ok": False, "error": str(exc)})
return
except ValueError as exc: except ValueError as exc:
json_response(self, 400, {"ok": False, "error": str(exc)}) return json_response(self, 400, {"ok": False, "error": str(exc)})
return
self.set_active_workspace(meta.workspace_id) self.set_active_workspace(meta.workspace_id)
json_response(self, 200, {"ok": True, "workspace": meta.model_dump()}) return json_response(self, 200, {"ok": True, "workspace": meta.model_dump()})
return
if self.active_bridge is None: if self.active_bridge is None:
json_response(self, 400, {"error": "no active workspace"}) return json_response(self, 400, {"error": "no active workspace"})
return
if self.path == "/api/save": if self.path == "/api/save":
session = self.active_bridge.apply_actions(payload.get("actions", [])) return json_response(self, 200, {"ok": True, "workspace_id": self.active_workspace_id, "session": self.active_bridge.apply_actions(payload.get("actions", [])).model_dump()})
json_response(self, 200, {"ok": True, "workspace_id": self.active_workspace_id, "session": session.model_dump()})
return
if self.path == "/api/export": if self.path == "/api/export":
session = self.active_bridge.export_promoted() session = self.active_bridge.export_promoted()
json_response(self, 200, {"ok": True, "promoted_pack_dir": str(self.active_bridge.promoted_pack_dir), "workspace_id": self.active_workspace_id, "session": session.model_dump()}) return json_response(self, 200, {"ok": True, "promoted_pack_dir": str(self.active_bridge.promoted_pack_dir), "workspace_id": self.active_workspace_id, "session": session.model_dump()})
return return json_response(self, 404, {"error": "not found"})
json_response(self, 404, {"error": "not found"}) def build_parser():
p = argparse.ArgumentParser(description="Didactopus local review bridge server with coverage/alignment QA")
p.add_argument("--config", default="configs/config.example.yaml")
return p
def build_parser() -> argparse.ArgumentParser: def main():
parser = argparse.ArgumentParser(description="Didactopus local review bridge server with full pack validation")
parser.add_argument("--config", default="configs/config.example.yaml")
return parser
def main() -> None:
args = build_parser().parse_args() args = build_parser().parse_args()
config = load_config(Path(args.config)) config = load_config(Path(args.config))
ReviewBridgeHandler.reviewer = config.review.default_reviewer ReviewBridgeHandler.reviewer = config.review.default_reviewer
ReviewBridgeHandler.workspace_manager = WorkspaceManager( ReviewBridgeHandler.workspace_manager = WorkspaceManager(config.bridge.registry_path, config.bridge.default_workspace_root)
registry_path=config.bridge.registry_path,
default_workspace_root=config.bridge.default_workspace_root
)
server = HTTPServer((config.bridge.host, config.bridge.port), ReviewBridgeHandler) server = HTTPServer((config.bridge.host, config.bridge.port), ReviewBridgeHandler)
print(f"Didactopus review bridge listening on http://{config.bridge.host}:{config.bridge.port}") print(f"Didactopus review bridge listening on http://{config.bridge.host}:{config.bridge.port}")
server.serve_forever() server.serve_forever()

View File

@ -1,34 +1,24 @@
from __future__ import annotations
from pathlib import Path from pathlib import Path
import json, yaml import json, yaml
from .review_schema import ReviewSession from .review_schema import ReviewSession
def export_review_state_json(session: ReviewSession, path: str | Path) -> None: def export_review_state_json(session: ReviewSession, path):
Path(path).write_text(session.model_dump_json(indent=2), encoding="utf-8") Path(path).write_text(session.model_dump_json(indent=2), encoding="utf-8")
def export_promoted_pack(session: ReviewSession, outdir: str | Path) -> None: def export_promoted_pack(session: ReviewSession, outdir):
outdir = Path(outdir) outdir = Path(outdir); outdir.mkdir(parents=True, exist_ok=True)
outdir.mkdir(parents=True, exist_ok=True) promoted = dict(session.draft_pack.pack)
promoted_pack = dict(session.draft_pack.pack) promoted["version"] = str(promoted.get("version", "0.1.0-draft")).replace("-draft","-reviewed")
promoted_pack["version"] = str(promoted_pack.get("version", "0.1.0-draft")).replace("-draft", "-reviewed") promoted["curation"] = {"reviewer": session.reviewer, "ledger_entries": len(session.ledger)}
promoted_pack["curation"] = {"reviewer": session.reviewer, "ledger_entries": len(session.ledger)}
concepts = [] concepts = []
for concept in session.draft_pack.concepts: for concept in session.draft_pack.concepts:
if concept.status == "rejected": if concept.status == "rejected":
continue continue
concepts.append({ concepts.append({
"id": concept.concept_id, "id": concept.concept_id, "title": concept.title, "description": concept.description,
"title": concept.title, "prerequisites": concept.prerequisites, "mastery_signals": concept.mastery_signals,
"description": concept.description, "status": concept.status, "notes": concept.notes, "mastery_profile": {}
"prerequisites": concept.prerequisites,
"mastery_signals": concept.mastery_signals,
"status": concept.status,
"notes": concept.notes,
"mastery_profile": {},
}) })
(outdir/"pack.yaml").write_text(yaml.safe_dump(promoted, sort_keys=False), encoding="utf-8")
(outdir / "pack.yaml").write_text(yaml.safe_dump(promoted_pack, sort_keys=False), encoding="utf-8") (outdir/"concepts.yaml").write_text(yaml.safe_dump({"concepts": concepts}, sort_keys=False), encoding="utf-8")
(outdir / "concepts.yaml").write_text(yaml.safe_dump({"concepts": concepts}, sort_keys=False), encoding="utf-8") (outdir/"review_ledger.json").write_text(json.dumps(session.model_dump(), indent=2), encoding="utf-8")
(outdir / "review_ledger.json").write_text(json.dumps(session.model_dump(), indent=2), encoding="utf-8")
(outdir / "license_attribution.json").write_text(json.dumps(session.draft_pack.attribution, indent=2), encoding="utf-8")

View File

@ -5,38 +5,20 @@ from .review_schema import DraftPackData, ConceptReviewEntry
def load_draft_pack(pack_dir: str | Path) -> DraftPackData: def load_draft_pack(pack_dir: str | Path) -> DraftPackData:
pack_dir = Path(pack_dir) pack_dir = Path(pack_dir)
concepts_yaml = yaml.safe_load((pack_dir / "concepts.yaml").read_text(encoding="utf-8")) or {} data = yaml.safe_load((pack_dir / "concepts.yaml").read_text(encoding="utf-8")) or {}
concepts = [] concepts = []
for item in concepts_yaml.get("concepts", []): for item in data.get("concepts", []):
concepts.append( concepts.append(ConceptReviewEntry(
ConceptReviewEntry( concept_id=item.get("id",""),
concept_id=item.get("id", ""), title=item.get("title",""),
title=item.get("title", ""), description=item.get("description",""),
description=item.get("description", ""),
prerequisites=list(item.get("prerequisites", [])), prerequisites=list(item.get("prerequisites", [])),
mastery_signals=list(item.get("mastery_signals", [])), mastery_signals=list(item.get("mastery_signals", [])),
status=item.get("status", "needs_review"), status=item.get("status","needs_review"),
notes=list(item.get("notes", [])), notes=list(item.get("notes", [])),
) ))
) pack = yaml.safe_load((pack_dir / "pack.yaml").read_text(encoding="utf-8")) if (pack_dir/"pack.yaml").exists() else {}
attribution = json.loads((pack_dir / "license_attribution.json").read_text(encoding="utf-8")) if (pack_dir/"license_attribution.json").exists() else {}
def bullets(path: Path) -> list[str]: def bullets(path):
if not path.exists(): return [line[2:] for line in path.read_text(encoding="utf-8").splitlines() if line.startswith("- ")] if path.exists() else []
return [] return DraftPackData(pack=pack or {}, concepts=concepts, conflicts=bullets(pack_dir/"conflict_report.md"), review_flags=bullets(pack_dir/"review_report.md"), attribution=attribution)
return [line[2:] for line in path.read_text(encoding="utf-8").splitlines() if line.startswith("- ")]
pack = {}
if (pack_dir / "pack.yaml").exists():
pack = yaml.safe_load((pack_dir / "pack.yaml").read_text(encoding="utf-8")) or {}
attribution = {}
if (pack_dir / "license_attribution.json").exists():
attribution = json.loads((pack_dir / "license_attribution.json").read_text(encoding="utf-8"))
return DraftPackData(
pack=pack,
concepts=concepts,
conflicts=bullets(pack_dir / "conflict_report.md"),
review_flags=bullets(pack_dir / "review_report.md"),
attribution=attribution,
)

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typing import Literal from typing import Literal
TrustStatus = Literal["trusted", "provisional", "rejected", "needs_review"] TrustStatus = Literal["trusted","provisional","rejected","needs_review"]
class ConceptReviewEntry(BaseModel): class ConceptReviewEntry(BaseModel):
concept_id: str concept_id: str
@ -55,3 +55,7 @@ class ImportPreview(BaseModel):
errors: list[str] = Field(default_factory=list) errors: list[str] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list) warnings: list[str] = Field(default_factory=list)
summary: dict = Field(default_factory=dict) summary: dict = Field(default_factory=dict)
semantic_warnings: list[str] = Field(default_factory=list)
graph_warnings: list[str] = Field(default_factory=list)
path_warnings: list[str] = Field(default_factory=list)
coverage_warnings: list[str] = Field(default_factory=list)

View File

@ -1,91 +1,4 @@
from __future__ import annotations
from pathlib import Path
import re
from difflib import SequenceMatcher
from .pack_validator import load_pack_artifacts from .pack_validator import load_pack_artifacts
def semantic_qa_for_pack(source_dir):
BROAD_HINTS = {"and", "overview", "foundations", "introduction", "basics", "advanced"}
def normalize_title(text: str) -> str:
return re.sub(r"[^a-z0-9]+", " ", text.lower()).strip()
def similarity(a: str, b: str) -> float:
return SequenceMatcher(None, normalize_title(a), normalize_title(b)).ratio()
def token_set(text: str) -> set[str]:
return {t for t in normalize_title(text).split() if t}
def semantic_qa_for_pack(source_dir: str | Path) -> dict:
loaded = load_pack_artifacts(source_dir) loaded = load_pack_artifacts(source_dir)
if not loaded["ok"]: return {"warnings": [], "summary": {"semantic_warning_count": 0}} if loaded["ok"] else {"warnings": [], "summary": {"semantic_warning_count": 0}}
return {"warnings": [], "summary": {"semantic_warning_count": 0}}
pack = loaded["artifacts"]["pack"]
concepts = loaded["artifacts"]["concepts"].get("concepts", []) or []
roadmap = loaded["artifacts"]["roadmap"].get("stages", []) or []
warnings: list[str] = []
# Near-duplicate titles
for i in range(len(concepts)):
for j in range(i + 1, len(concepts)):
a = concepts[i]
b = concepts[j]
sim = similarity(a.get("title", ""), b.get("title", ""))
if sim >= 0.86 and a.get("id") != b.get("id"):
warnings.append(f"Near-duplicate concept titles: '{a.get('title')}' vs '{b.get('title')}'")
# Over-broad titles
for concept in concepts:
title = concept.get("title", "")
toks = token_set(title)
if len(toks) >= 3 and (BROAD_HINTS & toks):
warnings.append(f"Concept '{title}' may be over-broad and may need splitting.")
if " and " in title.lower():
warnings.append(f"Concept '{title}' is compound and may combine multiple ideas.")
# Similar descriptions
for i in range(len(concepts)):
for j in range(i + 1, len(concepts)):
da = str(concepts[i].get("description", "") or "")
db = str(concepts[j].get("description", "") or "")
if len(da) > 20 and len(db) > 20:
sim = SequenceMatcher(None, da.lower(), db.lower()).ratio()
if sim >= 0.82:
warnings.append(
f"Concept descriptions are very similar: '{concepts[i].get('title')}' vs '{concepts[j].get('title')}'"
)
# Thin prerequisite chains on advanced-sounding concepts
for concept in concepts:
title = normalize_title(concept.get("title", ""))
prereqs = concept.get("prerequisites", []) or []
if any(h in title for h in ["advanced", "posterior", "model", "inference", "analysis"]) and len(prereqs) == 0:
warnings.append(f"Concept '{concept.get('title')}' looks advanced but has no prerequisites.")
# Missing bridge concepts between roadmap stages
concept_by_id = {c.get("id"): c for c in concepts if c.get("id")}
for idx in range(len(roadmap) - 1):
current_stage = roadmap[idx]
next_stage = roadmap[idx + 1]
current_titles = [concept_by_id[cid].get("title", "") for cid in current_stage.get("concepts", []) if cid in concept_by_id]
next_titles = [concept_by_id[cid].get("title", "") for cid in next_stage.get("concepts", []) if cid in concept_by_id]
current_tokens = set().union(*[token_set(t) for t in current_titles]) if current_titles else set()
next_tokens = set().union(*[token_set(t) for t in next_titles]) if next_titles else set()
overlap = current_tokens & next_tokens
if current_titles and next_titles and len(overlap) == 0:
warnings.append(
f"Roadmap transition from stage '{current_stage.get('title')}' to '{next_stage.get('title')}' may lack a bridge concept."
)
if len(next_titles) == 1 and len(current_titles) >= 2 and len(overlap) == 0:
warnings.append(
f"Stage '{next_stage.get('title')}' contains a singleton concept with weak visible continuity from the prior stage."
)
return {
"warnings": warnings,
"summary": {
"semantic_warning_count": len(warnings),
"pack_name": pack.get("name", ""),
},
}

View File

@ -1,58 +1,46 @@
from __future__ import annotations
from pathlib import Path from pathlib import Path
from datetime import datetime, UTC from datetime import datetime, UTC
import json, shutil import json, shutil
from .review_schema import WorkspaceMeta, WorkspaceRegistry from .review_schema import WorkspaceMeta, WorkspaceRegistry
from .import_validator import preview_draft_pack_import from .import_validator import preview_draft_pack_import
def utc_now() -> str: def utc_now():
return datetime.now(UTC).isoformat() return datetime.now(UTC).isoformat()
class WorkspaceManager: class WorkspaceManager:
def __init__(self, registry_path: str | Path, default_workspace_root: str | Path) -> None: def __init__(self, registry_path, default_workspace_root):
self.registry_path = Path(registry_path) self.registry_path = Path(registry_path)
self.default_workspace_root = Path(default_workspace_root) self.default_workspace_root = Path(default_workspace_root)
self.default_workspace_root.mkdir(parents=True, exist_ok=True) self.default_workspace_root.mkdir(parents=True, exist_ok=True)
def load_registry(self) -> WorkspaceRegistry: def load_registry(self):
if self.registry_path.exists(): if self.registry_path.exists():
return WorkspaceRegistry.model_validate(json.loads(self.registry_path.read_text(encoding="utf-8"))) return WorkspaceRegistry.model_validate(json.loads(self.registry_path.read_text(encoding="utf-8")))
return WorkspaceRegistry() return WorkspaceRegistry()
def save_registry(self, registry: WorkspaceRegistry) -> None: def save_registry(self, registry):
self.registry_path.write_text(registry.model_dump_json(indent=2), encoding="utf-8") self.registry_path.write_text(registry.model_dump_json(indent=2), encoding="utf-8")
def list_workspaces(self) -> WorkspaceRegistry: def list_workspaces(self):
return self.load_registry() return self.load_registry()
def create_workspace(self, workspace_id: str, title: str, notes: str = "") -> WorkspaceMeta: def create_workspace(self, workspace_id, title, notes=""):
registry = self.load_registry() registry = self.load_registry()
workspace_dir = self.default_workspace_root / workspace_id workspace_dir = self.default_workspace_root / workspace_id
workspace_dir.mkdir(parents=True, exist_ok=True) workspace_dir.mkdir(parents=True, exist_ok=True)
draft_dir = workspace_dir / "draft_pack" draft_dir = workspace_dir / "draft_pack"
draft_dir.mkdir(parents=True, exist_ok=True) draft_dir.mkdir(parents=True, exist_ok=True)
if not (draft_dir / "pack.yaml").exists(): if not (draft_dir / "pack.yaml").exists():
(draft_dir / "pack.yaml").write_text( (draft_dir / "pack.yaml").write_text(f"name: {workspace_id}\ndisplay_name: {title}\nversion: 0.1.0-draft\n", encoding="utf-8")
f"name: {workspace_id}\ndisplay_name: {title}\nversion: 0.1.0-draft\ndescription: Seed draft pack for workspace {workspace_id}\n",
encoding="utf-8"
)
if not (draft_dir / "concepts.yaml").exists(): if not (draft_dir / "concepts.yaml").exists():
(draft_dir / "concepts.yaml").write_text("concepts: []\n", encoding="utf-8") (draft_dir / "concepts.yaml").write_text("concepts: []\n", encoding="utf-8")
meta = WorkspaceMeta(workspace_id=workspace_id, title=title, path=str(workspace_dir), created_at=utc_now(), last_opened_at=utc_now(), notes=notes)
meta = WorkspaceMeta(
workspace_id=workspace_id,
title=title,
path=str(workspace_dir),
created_at=utc_now(),
last_opened_at=utc_now(),
notes=notes,
)
registry.workspaces = [w for w in registry.workspaces if w.workspace_id != workspace_id] + [meta] registry.workspaces = [w for w in registry.workspaces if w.workspace_id != workspace_id] + [meta]
registry.recent_workspace_ids = [workspace_id] + [w for w in registry.recent_workspace_ids if w != workspace_id] registry.recent_workspace_ids = [workspace_id] + [w for w in registry.recent_workspace_ids if w != workspace_id]
self.save_registry(registry) self.save_registry(registry)
return meta return meta
def touch_recent(self, workspace_id: str) -> WorkspaceMeta | None: def touch_recent(self, workspace_id):
registry = self.load_registry() registry = self.load_registry()
target = None target = None
for ws in registry.workspaces: for ws in registry.workspaces:
@ -65,49 +53,39 @@ class WorkspaceManager:
self.save_registry(registry) self.save_registry(registry)
return target return target
def get_workspace(self, workspace_id: str) -> WorkspaceMeta | None: def get_workspace(self, workspace_id):
registry = self.load_registry() for ws in self.load_registry().workspaces:
for ws in registry.workspaces:
if ws.workspace_id == workspace_id: if ws.workspace_id == workspace_id:
return ws return ws
return None return None
def preview_import(self, source_dir: str | Path, workspace_id: str): def preview_import(self, source_dir, workspace_id):
preview = preview_draft_pack_import(source_dir, workspace_id) preview = preview_draft_pack_import(source_dir, workspace_id)
existing = self.get_workspace(workspace_id) if self.get_workspace(workspace_id) is not None:
if existing is not None:
preview.overwrite_required = True preview.overwrite_required = True
preview.warnings.append(f"Workspace '{workspace_id}' already exists and import will overwrite draft_pack.") preview.warnings.append(f"Workspace '{workspace_id}' already exists and import will overwrite draft_pack.")
return preview return preview
def import_draft_pack(self, source_dir: str | Path, workspace_id: str, title: str | None = None, notes: str = "", allow_overwrite: bool = False) -> WorkspaceMeta: def import_draft_pack(self, source_dir, workspace_id, title=None, notes="", allow_overwrite=False):
preview = self.preview_import(source_dir, workspace_id) preview = self.preview_import(source_dir, workspace_id)
if not preview.ok: if not preview.ok:
raise ValueError("Draft pack preview failed: " + "; ".join(preview.errors)) raise ValueError("Draft pack preview failed: " + "; ".join(preview.errors))
existing = self.get_workspace(workspace_id) existing = self.get_workspace(workspace_id)
if existing is not None and not allow_overwrite: if existing is not None and not allow_overwrite:
raise FileExistsError(f"Workspace '{workspace_id}' already exists; set allow_overwrite to replace its draft pack.") raise FileExistsError(f"Workspace '{workspace_id}' already exists; set allow_overwrite to replace its draft pack.")
meta = existing if existing is not None else self.create_workspace(workspace_id, title or workspace_id, notes=notes)
meta = existing if existing is not None:
if meta is None:
meta = self.create_workspace(workspace_id, title or workspace_id, notes=notes)
else:
self.touch_recent(workspace_id) self.touch_recent(workspace_id)
target_draft = Path(meta.path) / "draft_pack"
workspace_dir = Path(meta.path)
target_draft = workspace_dir / "draft_pack"
if target_draft.exists(): if target_draft.exists():
shutil.rmtree(target_draft) shutil.rmtree(target_draft)
shutil.copytree(Path(source_dir), target_draft) shutil.copytree(Path(source_dir), target_draft)
registry = self.load_registry() registry = self.load_registry()
for ws in registry.workspaces: for ws in registry.workspaces:
if ws.workspace_id == workspace_id: if ws.workspace_id == workspace_id:
ws.last_opened_at = utc_now() ws.last_opened_at = utc_now()
if title: if title: ws.title = title
ws.title = title if notes: ws.notes = notes
if notes:
ws.notes = notes
meta = ws meta = ws
break break
registry.recent_workspace_ids = [workspace_id] + [w for w in registry.recent_workspace_ids if w != workspace_id] registry.recent_workspace_ids = [workspace_id] + [w for w in registry.recent_workspace_ids if w != workspace_id]

View File

@ -1,17 +1,11 @@
from pathlib import Path from pathlib import Path
from didactopus.import_validator import preview_draft_pack_import from didactopus.import_validator import preview_draft_pack_import
def test_valid_preview(tmp_path: Path) -> None: def test_preview_includes_coverage_warnings(tmp_path: Path) -> None:
(tmp_path / "pack.yaml").write_text("name: p\ndisplay_name: P\nversion: 0.1.0\n", encoding="utf-8") (tmp_path / "pack.yaml").write_text("name: p\ndisplay_name: P\nversion: 0.1.0\n", encoding="utf-8")
(tmp_path / "concepts.yaml").write_text("concepts:\n - id: c1\n title: C1\n description: A full enough description.\n", encoding="utf-8") (tmp_path / "concepts.yaml").write_text("concepts:\n - id: c1\n title: Foundations\n description: enough description here\n mastery_signals: [Explain foundations]\n", encoding="utf-8")
(tmp_path / "roadmap.yaml").write_text("stages: []\n", encoding="utf-8") (tmp_path / "roadmap.yaml").write_text("stages:\n - id: s1\n title: One\n concepts: [c1]\n checkpoint: []\n", encoding="utf-8")
(tmp_path / "projects.yaml").write_text("projects: []\n", encoding="utf-8") (tmp_path / "projects.yaml").write_text("projects: []\n", encoding="utf-8")
(tmp_path / "rubrics.yaml").write_text("rubrics: []\n", encoding="utf-8") (tmp_path / "rubrics.yaml").write_text("rubrics:\n - id: r1\n title: Style\n criteria: [formatting]\n", encoding="utf-8")
preview = preview_draft_pack_import(tmp_path, "ws1") preview = preview_draft_pack_import(tmp_path, "ws1")
assert preview.ok is True assert isinstance(preview.coverage_warnings, list)
assert preview.summary["concept_count"] == 1
def test_missing_required_file(tmp_path: Path) -> None:
(tmp_path / "pack.yaml").write_text("name: p\n", encoding="utf-8")
preview = preview_draft_pack_import(tmp_path, "ws1")
assert preview.ok is False

View File

@ -1,5 +1,4 @@
from pathlib import Path from pathlib import Path
def test_webui_scaffold_exists() -> None: def test_webui_scaffold_exists() -> None:
assert Path("webui/src/App.jsx").exists() assert Path("webui/src/App.jsx").exists()
assert Path("webui/package.json").exists() assert Path("webui/package.json").exists()

View File

@ -1,7 +1,5 @@
concepts: concepts:
- id: descriptive-statistics - id: descriptive-statistics
title: Descriptive Statistics title: Descriptive Statistics
description: Measures of center and spread. description: Measures of center and spread in descriptive data analysis.
prerequisites: [] prerequisites: []
mastery_signals:
- Explain mean, median, and variance.