Evidence engine update

welsberr 2026-03-12 21:40:34 -04:00
parent 6c04e1e7bf
commit e4d416a48d
18 changed files with 445 additions and 188 deletions

View File

@@ -2,26 +2,26 @@
 **Didactopus** is a local-first AI-assisted autodidactic mastery platform.
-This revision moves the system from simple concept merging toward a true **merged learning graph**.
+This revision adds an evidence-driven mastery engine on top of the adaptive learner model.

 ## Added in this revision

-- merged learning graph builder
-- combined prerequisite DAG across packs
-- merged roadmap stage catalog
-- merged project catalog
-- namespaced concept keys (`pack::concept`)
-- optional concept override support in `pack.yaml`
-- learner-facing roadmap generation from merged packs
-- CLI reporting for merged graph statistics
-- tests for merged learning graph behavior
+- evidence record models
+- rubric-style evidence scoring
+- concept mastery updates from accumulated evidence
+- weak-concept resurfacing
+- automatic learner state updates from evidence bundles
+- project evidence integration
+- CLI demonstration of evidence-driven progression
+- tests for mastery promotion and resurfacing

 ## Why this matters

-Didactopus can now use multiple compatible packs to build one composite domain model rather than treating packs as isolated fragments.
-That enables:
-- foundations + extension pack composition
-- unified learner roadmaps
-- shared project catalogs
-- safe coexistence of overlapping concept IDs via namespacing
-That is a necessary step toward a genuine mastery engine.
+Didactopus no longer needs mastery to be supplied only by hand. It can now begin to infer learner state from observed evidence such as:
+- explanation quality
+- problem-solving performance
+- project completion
+- transfer-task performance

View File

@@ -15,6 +15,7 @@ platform:
   require_learner_explanations: true
   permit_direct_answers: false
   mastery_threshold: 0.8
+  resurfacing_threshold: 0.55
 artifacts:
   local_pack_dirs:

docs/evidence-engine.md (new file, 35 lines)
View File

@@ -0,0 +1,35 @@
# Evidence Engine
## Purpose
The evidence engine updates learner mastery state from observed work rather than from manual declarations alone.
## Evidence types in this revision
- explanation
- problem
- project
- transfer
Each evidence item includes:
- target concept
- evidence type
- score from 0.0 to 1.0
- optional notes
## Current update policy
For each concept:
- maintain a running average evidence score
- mark as mastered when average score meets mastery threshold and at least one evidence item exists
- resurface a mastered concept when the average later drops below the resurfacing threshold
This is intentionally simple and transparent.
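The update policy above can be sketched in a few lines of self-contained Python. This is an illustrative sketch of the described rules only; the function name `mastery_state` is hypothetical and the real implementation lives in `evidence_engine.py` with its own types.

```python
# Illustrative sketch of the documented update policy:
# running-average score, mastery promotion at a threshold,
# resurfacing when a mastered concept's average later drops.
def mastery_state(scores: list[float],
                  mastery_threshold: float = 0.8,
                  resurfacing_threshold: float = 0.55,
                  previously_mastered: bool = False) -> str:
    if not scores:
        return "unknown"  # at least one evidence item is required
    mean = sum(scores) / len(scores)
    if mean >= mastery_threshold:
        return "mastered"
    if previously_mastered and mean < resurfacing_threshold:
        return "resurfaced"
    return "in-progress"

print(mastery_state([0.85, 0.9]))                            # mastered (mean 0.875)
print(mastery_state([0.4, 0.5], previously_mastered=True))   # resurfaced (mean 0.45)
```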
## Future work
- weighted evidence types
- recency decay
- uncertainty estimates
- Bayesian mastery models
- multi-rubric scoring per concept

View File

@@ -0,0 +1,6 @@
concepts:
  - id: model-checking
    title: Model Checking
    prerequisites: []
    mastery_signals:
      - compare model assumptions

View File

@@ -0,0 +1,14 @@
name: applied-inference
display_name: Applied Inference Pack
version: 0.2.0
schema_version: "1"
didactopus_min_version: 0.1.0
didactopus_max_version: 0.9.99
description: Simple applied inference pack.
author: Wesley R. Elsberry
license: MIT
dependencies:
  - name: bayes-extension
    min_version: 0.1.0
    max_version: 1.0.0
overrides: []

View File

@@ -0,0 +1,9 @@
projects:
  - id: inference-project
    title: Applied Inference Project
    difficulty: advanced
    prerequisites:
      - model-checking
    deliverables:
      - memo
      - critique

View File

@@ -0,0 +1,7 @@
stages:
  - id: stage-1
    title: Applied Inference
    concepts:
      - model-checking
    checkpoint:
      - critique a simple model

View File

@@ -0,0 +1,6 @@
rubrics:
  - id: inference-rubric
    title: Inference Rubric
    criteria:
      - correctness
      - critique

View File

@@ -2,13 +2,12 @@ __version__ = "0.1.0"
 __all__ = [
     "__version__",
+    "adaptive_engine",
     "artifact_registry",
     "artifact_schemas",
     "config",
-    "curriculum",
-    "domain_map",
+    "evidence_engine",
     "evaluation",
-    "graph_merge",
     "learning_graph",
     "main",
     "mentor",

View File

@@ -0,0 +1,76 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Literal

import networkx as nx

from .learning_graph import MergedLearningGraph

NodeStatus = Literal["mastered", "ready", "blocked", "hidden"]


@dataclass
class LearnerProfile:
    learner_id: str
    display_name: str = ""
    goals: list[str] = field(default_factory=list)
    mastered_concepts: set[str] = field(default_factory=set)
    hide_mastered: bool = True


@dataclass
class AdaptivePlan:
    node_status: dict[str, NodeStatus] = field(default_factory=dict)
    learner_roadmap: list[dict] = field(default_factory=list)
    next_best_concepts: list[str] = field(default_factory=list)
    eligible_projects: list[dict] = field(default_factory=list)


def classify_node_status(merged: MergedLearningGraph, profile: LearnerProfile) -> dict[str, NodeStatus]:
    status: dict[str, NodeStatus] = {}
    for concept_key in nx.topological_sort(merged.graph):
        if concept_key in profile.mastered_concepts:
            status[concept_key] = "hidden" if profile.hide_mastered else "mastered"
            continue
        prereqs = set(merged.graph.predecessors(concept_key))
        if prereqs.issubset(profile.mastered_concepts):
            status[concept_key] = "ready"
        else:
            status[concept_key] = "blocked"
    return status


def select_next_best_concepts(status: dict[str, NodeStatus], limit: int = 5) -> list[str]:
    return [concept for concept, s in status.items() if s == "ready"][:limit]


def recommend_projects(merged: MergedLearningGraph, profile: LearnerProfile) -> list[dict]:
    eligible = []
    for project in merged.project_catalog:
        if set(project["prerequisites"]).issubset(profile.mastered_concepts):
            eligible.append(project)
    return eligible


def build_adaptive_plan(merged: MergedLearningGraph, profile: LearnerProfile, next_limit: int = 5) -> AdaptivePlan:
    status = classify_node_status(merged, profile)
    roadmap = []
    for concept_key in nx.topological_sort(merged.graph):
        node_state = status[concept_key]
        if node_state == "hidden":
            continue
        concept = merged.concept_data[concept_key]
        roadmap.append({
            "concept_key": concept_key,
            "title": concept["title"],
            "pack": concept["pack"],
            "status": node_state,
            "prerequisites": list(merged.graph.predecessors(concept_key)),
        })
    return AdaptivePlan(
        node_status=status,
        learner_roadmap=roadmap,
        next_best_concepts=select_next_best_concepts(status, limit=next_limit),
        eligible_projects=recommend_projects(merged, profile),
    )
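`classify_node_status` walks the merged graph in topological order and marks each concept mastered/hidden, ready, or blocked. The same readiness rule can be shown without the `networkx` dependency or `MergedLearningGraph`, using a plain prerequisite map. This is a toy sketch; `classify` and the `graph` dict are illustrative, not part of the module's API.

```python
# Toy readiness classification over a prerequisite map; mirrors the
# rule in classify_node_status: a concept is "ready" when all of its
# prerequisites are mastered, otherwise "blocked".
def classify(prereqs: dict[str, list[str]], mastered: set[str]) -> dict[str, str]:
    status = {}
    for concept, deps in prereqs.items():
        if concept in mastered:
            status[concept] = "mastered"
        elif set(deps).issubset(mastered):
            status[concept] = "ready"
        else:
            status[concept] = "blocked"
    return status

graph = {"stats": [], "probability": ["stats"], "bayes": ["probability"]}
print(classify(graph, mastered={"stats"}))
# {'stats': 'mastered', 'probability': 'ready', 'bayes': 'blocked'}
```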

View File

@@ -56,49 +56,34 @@ def _check_duplicate_ids(entries: list[Any], label: str) -> list[str]:
     errors: list[str] = []
     seen: set[str] = set()
     for entry in entries:
-        entry_id = entry.id
-        if entry_id in seen:
-            errors.append(f"duplicate {label} id: {entry_id}")
-        seen.add(entry_id)
+        if entry.id in seen:
+            errors.append(f"duplicate {label} id: {entry.id}")
+        seen.add(entry.id)
     return errors

 def _check_concept_references(concepts_file: ConceptsFile, roadmap_file: RoadmapFile, projects_file: ProjectsFile) -> list[str]:
     errors: list[str] = []
     concept_ids = {c.id for c in concepts_file.concepts}
     for concept in concepts_file.concepts:
         for prereq in concept.prerequisites:
             if prereq not in concept_ids:
-                errors.append(
-                    f"unknown concept prerequisite '{prereq}' referenced by concept '{concept.id}'"
-                )
+                errors.append(f"unknown concept prerequisite '{prereq}' referenced by concept '{concept.id}'")
     for stage in roadmap_file.stages:
         for concept_id in stage.concepts:
             if concept_id not in concept_ids:
-                errors.append(
-                    f"unknown concept '{concept_id}' referenced by roadmap stage '{stage.id}'"
-                )
+                errors.append(f"unknown concept '{concept_id}' referenced by roadmap stage '{stage.id}'")
     for project in projects_file.projects:
         for prereq in project.prerequisites:
             if prereq not in concept_ids:
-                errors.append(
-                    f"unknown concept prerequisite '{prereq}' referenced by project '{project.id}'"
-                )
+                errors.append(f"unknown concept prerequisite '{prereq}' referenced by project '{project.id}'")
     return errors

 def _check_core_compatibility(manifest: PackManifest) -> list[str]:
     if _version_in_range(DIDACTOPUS_VERSION, manifest.didactopus_min_version, manifest.didactopus_max_version):
         return []
-    return [
-        "incompatible with Didactopus core version "
-        f"{DIDACTOPUS_VERSION}; supported range is "
-        f"{manifest.didactopus_min_version}..{manifest.didactopus_max_version}"
-    ]
+    return [f"incompatible with Didactopus core version {DIDACTOPUS_VERSION}; supported range is {manifest.didactopus_min_version}..{manifest.didactopus_max_version}"]

 def validate_pack(pack_dir: str | Path) -> PackValidationResult:
@@ -148,7 +133,6 @@ def validate_pack(pack_dir: str | Path) -> PackValidationResult:
         if concepts_file and roadmap_file and projects_file:
             result.errors.extend(_check_concept_references(concepts_file, roadmap_file, projects_file))
     except Exception as exc:
         result.errors.append(str(exc))
@@ -170,7 +154,6 @@ def discover_domain_packs(base_dirs: list[str | Path]) -> list[PackValidationResult]:

 def check_pack_dependencies(results: list[PackValidationResult]) -> list[str]:
     errors: list[str] = []
     manifest_by_name = {r.manifest.name: r.manifest for r in results if r.manifest is not None}
     for result in results:
         if result.manifest is None:
             continue
@@ -180,19 +163,16 @@ def check_pack_dependencies(results: list[PackValidationResult]) -> list[str]:
             errors.append(f"pack '{result.manifest.name}' depends on missing pack '{dep.name}'")
             continue
         if not _version_in_range(dep_manifest.version, dep.min_version, dep.max_version):
-            errors.append(
-                f"pack '{result.manifest.name}' requires '{dep.name}' version "
-                f"{dep.min_version}..{dep.max_version}, but found {dep_manifest.version}"
-            )
+            errors.append(f"pack '{result.manifest.name}' requires '{dep.name}' version {dep.min_version}..{dep.max_version}, but found {dep_manifest.version}")
     return errors

 def build_dependency_graph(results: list[PackValidationResult]) -> nx.DiGraph:
     graph = nx.DiGraph()
-    valid_results = [r for r in results if r.manifest is not None and r.is_valid]
-    for result in valid_results:
+    valid = [r for r in results if r.manifest is not None and r.is_valid]
+    for result in valid:
         graph.add_node(result.manifest.name)
-    for result in valid_results:
+    for result in valid:
         for dep in result.manifest.dependencies:
             if dep.name in graph:
                 graph.add_edge(dep.name, result.manifest.name)
@@ -200,10 +180,8 @@ def build_dependency_graph(results: list[PackValidationResult]) -> nx.DiGraph:
 def detect_dependency_cycles(results: list[PackValidationResult]) -> list[list[str]]:
-    graph = build_dependency_graph(results)
-    return [cycle for cycle in nx.simple_cycles(graph)]
+    return [cycle for cycle in nx.simple_cycles(build_dependency_graph(results))]

 def topological_pack_order(results: list[PackValidationResult]) -> list[str]:
-    graph = build_dependency_graph(results)
-    return list(nx.topological_sort(graph))
+    return list(nx.topological_sort(build_dependency_graph(results)))

View File

@@ -27,6 +27,7 @@ class PlatformConfig(BaseModel):
     require_learner_explanations: bool = True
     permit_direct_answers: bool = False
     mastery_threshold: float = 0.8
+    resurfacing_threshold: float = 0.55

 class ArtifactConfig(BaseModel):

View File

@@ -1,7 +1,26 @@
-from .model_provider import ModelProvider
+from dataclasses import dataclass

-def generate_rubric(provider: ModelProvider, concept: str) -> str:
-    return provider.generate(
-        f"Create a concise evaluation rubric for mastery of '{concept}'."
-    ).text
+@dataclass
+class RubricScore:
+    correctness: float
+    clarity: float
+    justification: float
+    transfer: float
+
+    def mean(self) -> float:
+        return (self.correctness + self.clarity + self.justification + self.transfer) / 4.0
+
+def score_simple_rubric(
+    correctness: float,
+    clarity: float,
+    justification: float,
+    transfer: float,
+) -> RubricScore:
+    return RubricScore(
+        correctness=correctness,
+        clarity=clarity,
+        justification=justification,
+        transfer=transfer,
+    )
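A quick check of the new rubric helpers: `mean()` is a plain average of the four criteria, so the CLI's demo score of (0.9, 0.85, 0.8, 0.75) works out to 0.825. Sketched standalone below; the dataclass is re-declared here only so the snippet runs on its own.

```python
from dataclasses import dataclass

# Re-declaration of RubricScore for a self-contained demonstration.
@dataclass
class RubricScore:
    correctness: float
    clarity: float
    justification: float
    transfer: float

    def mean(self) -> float:
        # Unweighted average over the four rubric criteria.
        return (self.correctness + self.clarity + self.justification + self.transfer) / 4.0

score = RubricScore(0.9, 0.85, 0.8, 0.75)
print(score.mean())  # ≈ 0.825, above the 0.8 mastery threshold
```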

View File

@@ -0,0 +1,82 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Literal

from .adaptive_engine import LearnerProfile

EvidenceType = Literal["explanation", "problem", "project", "transfer"]


@dataclass
class EvidenceItem:
    concept_key: str
    evidence_type: EvidenceType
    score: float
    notes: str = ""


@dataclass
class ConceptEvidenceSummary:
    concept_key: str
    count: int = 0
    mean_score: float = 0.0


@dataclass
class EvidenceState:
    evidence_by_concept: dict[str, list[EvidenceItem]] = field(default_factory=dict)
    summary_by_concept: dict[str, ConceptEvidenceSummary] = field(default_factory=dict)
    resurfaced_concepts: set[str] = field(default_factory=set)


def clamp_score(score: float) -> float:
    return max(0.0, min(1.0, score))


def add_evidence_item(state: EvidenceState, item: EvidenceItem) -> None:
    item.score = clamp_score(item.score)
    state.evidence_by_concept.setdefault(item.concept_key, []).append(item)
    items = state.evidence_by_concept[item.concept_key]
    mean_score = sum(x.score for x in items) / len(items)
    state.summary_by_concept[item.concept_key] = ConceptEvidenceSummary(
        concept_key=item.concept_key,
        count=len(items),
        mean_score=mean_score,
    )


def update_profile_mastery_from_evidence(
    profile: LearnerProfile,
    state: EvidenceState,
    mastery_threshold: float,
    resurfacing_threshold: float,
) -> None:
    for concept_key, summary in state.summary_by_concept.items():
        if summary.count == 0:
            continue
        if summary.mean_score >= mastery_threshold:
            profile.mastered_concepts.add(concept_key)
            if concept_key in state.resurfaced_concepts:
                state.resurfaced_concepts.remove(concept_key)
        elif concept_key in profile.mastered_concepts and summary.mean_score < resurfacing_threshold:
            profile.mastered_concepts.remove(concept_key)
            state.resurfaced_concepts.add(concept_key)


def ingest_evidence_bundle(
    profile: LearnerProfile,
    items: list[EvidenceItem],
    mastery_threshold: float,
    resurfacing_threshold: float,
) -> EvidenceState:
    state = EvidenceState()
    for item in items:
        add_evidence_item(state, item)
    update_profile_mastery_from_evidence(
        profile=profile,
        state=state,
        mastery_threshold=mastery_threshold,
        resurfacing_threshold=resurfacing_threshold,
    )
    return state
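Taken together, `ingest_evidence_bundle` both promotes and demotes: a concept already in `mastered_concepts` whose running mean falls below the resurfacing threshold is removed and flagged. A minimal standalone illustration of that demotion path follows; `apply_evidence` is a simplified stand-in, not the module's function.

```python
# Simplified resurfacing flow: mutate a mastered-set from per-concept
# score lists, returning the concepts that were demoted ("resurfaced").
def apply_evidence(mastered: set[str], scores: dict[str, list[float]],
                   mastery_threshold: float = 0.8,
                   resurfacing_threshold: float = 0.55) -> set[str]:
    resurfaced = set()
    for concept, values in scores.items():
        mean = sum(values) / len(values)
        if mean >= mastery_threshold:
            mastered.add(concept)          # promotion
        elif concept in mastered and mean < resurfacing_threshold:
            mastered.discard(concept)      # demotion
            resurfaced.add(concept)        # queue for review
    return resurfaced

mastered = {"c1"}
resurfaced = apply_evidence(mastered, {"c1": [0.4, 0.5]})
print(mastered, resurfaced)  # set() {'c1'}
```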

View File

@@ -21,67 +21,27 @@ class MergedLearningGraph:
     load_order: list[str] = field(default_factory=list)

-def _build_override_targets(results: list[PackValidationResult]) -> set[str]:
-    targets: set[str] = set()
-    for result in results:
-        if result.manifest is None or not result.is_valid:
-            continue
-        targets.update(result.manifest.overrides)
-    return targets

 def build_merged_learning_graph(results: list[PackValidationResult]) -> MergedLearningGraph:
     merged = MergedLearningGraph()
-    valid = {
-        r.manifest.name: r
-        for r in results
-        if r.manifest is not None and r.is_valid
-    }
+    valid = {r.manifest.name: r for r in results if r.manifest is not None and r.is_valid}
     merged.load_order = topological_pack_order(results)
-    override_targets = _build_override_targets(results)

-    # Add concepts
     for pack_name in merged.load_order:
         result = valid[pack_name]
         concepts_file = result.loaded_files.get("concepts")
         if concepts_file is None:
             continue
         for concept in concepts_file.concepts:
-            local_key = namespaced_concept(pack_name, concept.id)
-
-            # explicit override support: same local concept ID may replace one namespaced target
-            replaced = False
-            for target in result.manifest.overrides:
-                if target.split("::")[-1] == concept.id and target in merged.concept_data:
-                    merged.concept_data[target] = {
-                        "id": concept.id,
-                        "title": concept.title,
-                        "pack": pack_name,
-                        "prerequisites": [],
-                        "mastery_signals": list(concept.mastery_signals),
-                        "overridden_by": local_key,
-                    }
-                    replaced = True
-                    break
-            if replaced:
-                continue
-            if local_key in merged.concept_data:
-                merged.conflicts.append(f"duplicate namespaced concept key: {local_key}")
-                continue
-            merged.concept_data[local_key] = {
+            key = namespaced_concept(pack_name, concept.id)
+            merged.concept_data[key] = {
                 "id": concept.id,
                 "title": concept.title,
                 "pack": pack_name,
                 "prerequisites": list(concept.prerequisites),
                 "mastery_signals": list(concept.mastery_signals),
             }
-            merged.graph.add_node(local_key)
+            merged.graph.add_node(key)

-    # Add prerequisite edges within each pack
     for pack_name in merged.load_order:
         result = valid[pack_name]
         concepts_file = result.loaded_files.get("concepts")
@@ -89,66 +49,32 @@ def build_merged_learning_graph(results: list[PackValidationResult]) -> MergedLearningGraph:
             continue
         for concept in concepts_file.concepts:
             concept_key = namespaced_concept(pack_name, concept.id)
-            if concept_key not in merged.graph:
-                continue
             for prereq in concept.prerequisites:
                 prereq_key = namespaced_concept(pack_name, prereq)
                 if prereq_key in merged.graph:
                     merged.graph.add_edge(prereq_key, concept_key)
-                else:
-                    merged.conflicts.append(
-                        f"missing namespaced prerequisite '{prereq_key}' for concept '{concept_key}'"
-                    )

-    # Merge stage catalog
-    for pack_name in merged.load_order:
-        result = valid[pack_name]
         roadmap_file = result.loaded_files.get("roadmap")
-        if roadmap_file is None:
-            continue
-        for stage in roadmap_file.stages:
-            merged.stage_catalog.append(
-                {
-                    "id": f"{pack_name}::{stage.id}",
-                    "pack": pack_name,
-                    "title": stage.title,
-                    "concepts": [namespaced_concept(pack_name, c) for c in stage.concepts],
-                    "checkpoint": list(stage.checkpoint),
-                }
-            )
+        if roadmap_file is not None:
+            for stage in roadmap_file.stages:
+                merged.stage_catalog.append({
+                    "id": f"{pack_name}::{stage.id}",
+                    "pack": pack_name,
+                    "title": stage.title,
+                    "concepts": [namespaced_concept(pack_name, c) for c in stage.concepts],
+                    "checkpoint": list(stage.checkpoint),
+                })

-    # Merge project catalog
-    for pack_name in merged.load_order:
-        result = valid[pack_name]
         projects_file = result.loaded_files.get("projects")
-        if projects_file is None:
-            continue
-        for project in projects_file.projects:
-            merged.project_catalog.append(
-                {
-                    "id": f"{pack_name}::{project.id}",
-                    "pack": pack_name,
-                    "title": project.title,
-                    "difficulty": project.difficulty,
-                    "prerequisites": [namespaced_concept(pack_name, p) for p in project.prerequisites],
-                    "deliverables": list(project.deliverables),
-                }
-            )
+        if projects_file is not None:
+            for project in projects_file.projects:
+                merged.project_catalog.append({
+                    "id": f"{pack_name}::{project.id}",
+                    "pack": pack_name,
+                    "title": project.title,
+                    "difficulty": project.difficulty,
+                    "prerequisites": [namespaced_concept(pack_name, p) for p in project.prerequisites],
+                    "deliverables": list(project.deliverables),
+                })

     return merged

-def generate_learner_roadmap(merged: MergedLearningGraph) -> list[dict[str, Any]]:
-    roadmap: list[dict[str, Any]] = []
-    for i, concept_key in enumerate(nx.topological_sort(merged.graph), start=1):
-        data = merged.concept_data[concept_key]
-        roadmap.append(
-            {
-                "stage_number": i,
-                "concept_key": concept_key,
-                "title": data["title"],
-                "pack": data["pack"],
-                "prerequisites": list(merged.graph.predecessors(concept_key)),
-            }
-        )
-    return roadmap

View File

@@ -2,16 +2,12 @@ import argparse
 import os
 from pathlib import Path

-from .artifact_registry import (
-    check_pack_dependencies,
-    detect_dependency_cycles,
-    discover_domain_packs,
-    topological_pack_order,
-)
+from .adaptive_engine import LearnerProfile, build_adaptive_plan
+from .artifact_registry import check_pack_dependencies, detect_dependency_cycles, discover_domain_packs, topological_pack_order
 from .config import load_config
-from .curriculum import generate_stages_from_learning_graph
-from .evaluation import generate_rubric
-from .learning_graph import build_merged_learning_graph, generate_learner_roadmap
+from .evidence_engine import EvidenceItem, ingest_evidence_bundle
+from .evaluation import score_simple_rubric
+from .learning_graph import build_merged_learning_graph
 from .mentor import generate_socratic_prompt
 from .model_provider import ModelProvider
 from .practice import generate_practice_task
@@ -19,7 +15,7 @@ from .project_advisor import suggest_capstone

 def build_parser() -> argparse.ArgumentParser:
-    parser = argparse.ArgumentParser(description="Didactopus merged learning graph scaffold")
+    parser = argparse.ArgumentParser(description="Didactopus evidence-driven mastery scaffold")
     parser.add_argument("--domain", required=True)
     parser.add_argument("--goal", required=True)
     parser.add_argument(
@@ -40,54 +36,96 @@ def main() -> None:
     print("== Didactopus ==")
     print("Many arms, one goal — mastery.")
     print()
-    print("== Domain Pack Validation ==")
-    for pack in packs:
-        pack_name = pack.manifest.display_name if pack.manifest else pack.pack_dir.name
-        print(f"- {pack_name}: {'valid' if pack.is_valid else 'INVALID'}")
-        for err in pack.errors:
-            print(f"  * {err}")
-    print()
-    print("== Dependency Resolution ==")
     if dependency_errors:
+        print("== Dependency Errors ==")
         for err in dependency_errors:
             print(f"- {err}")
-    else:
-        print("- all resolved")
-    print()
-    print("== Dependency Cycles ==")
+        print()
     if cycles:
+        print("== Dependency Cycles ==")
         for cycle in cycles:
             print(f"- cycle: {' -> '.join(cycle)}")
-        print("- merged learning graph unavailable while cycles exist")
         return
-    print("- none")
-    print()
     print("== Pack Load Order ==")
     for name in topological_pack_order(packs):
         print(f"- {name}")
     print()
     merged = build_merged_learning_graph(packs)
-    learner_roadmap = generate_learner_roadmap(merged)
-    print("== Merged Learning Graph ==")
-    print(f"- concepts: {len(merged.concept_data)}")
-    print(f"- prerequisite edges: {merged.graph.number_of_edges()}")
-    print(f"- roadmap stages: {len(merged.stage_catalog)}")
-    print(f"- projects: {len(merged.project_catalog)}")
-    if merged.conflicts:
-        for conflict in merged.conflicts:
-            print(f"  * conflict: {conflict}")
-    else:
-        print("- no merge conflicts")
-    print()
-    print("== Learner Roadmap ==")
-    for item in learner_roadmap[:8]:
-        print(f"- Stage {item['stage_number']}: {item['concept_key']} ({item['title']})")
-    print()
-    if learner_roadmap:
-        focus_concept = learner_roadmap[0]["concept_key"]
-    else:
-        focus_concept = args.domain
+    profile = LearnerProfile(
+        learner_id="demo-learner",
+        display_name="Demo Learner",
+        goals=[args.goal],
+        mastered_concepts=set(),
+        hide_mastered=True,
+    )
+    demo_score = score_simple_rubric(0.9, 0.85, 0.8, 0.75)
+    evidence_items = [
+        EvidenceItem(
+            concept_key="foundations-statistics::descriptive-statistics",
+            evidence_type="explanation",
+            score=demo_score.mean(),
+            notes="Strong introductory explanation.",
+        ),
+        EvidenceItem(
+            concept_key="foundations-statistics::descriptive-statistics",
+            evidence_type="problem",
+            score=0.88,
+            notes="Solved summary statistics problem correctly.",
+        ),
+        EvidenceItem(
+            concept_key="bayes-extension::prior",
+            evidence_type="explanation",
+            score=0.62,
+            notes="Partial understanding of priors.",
+        ),
+    ]
+    evidence_state = ingest_evidence_bundle(
+        profile=profile,
+        items=evidence_items,
+        mastery_threshold=config.platform.mastery_threshold,
+        resurfacing_threshold=config.platform.resurfacing_threshold,
+    )
+    plan = build_adaptive_plan(merged, profile)
+    print("== Evidence Summary ==")
+    for concept_key, summary in evidence_state.summary_by_concept.items():
+        print(f"- {concept_key}: count={summary.count}, mean={summary.mean_score:.2f}")
     print()
+    print("== Mastered Concepts After Evidence ==")
+    for concept_key in sorted(profile.mastered_concepts):
+        print(f"- {concept_key}")
+    print()
+    print("== Resurfaced Concepts ==")
+    if evidence_state.resurfaced_concepts:
+        for concept_key in sorted(evidence_state.resurfaced_concepts):
+            print(f"- {concept_key}")
+    else:
+        print("- none")
+    print()
+    print("== Adaptive Plan Summary ==")
+    print(f"- roadmap items visible: {len(plan.learner_roadmap)}")
+    print(f"- next-best concepts: {len(plan.next_best_concepts)}")
+    print(f"- eligible projects: {len(plan.eligible_projects)}")
+    print()
+    print("== Next Best Concepts ==")
+    for concept in plan.next_best_concepts:
+        print(f"- {concept}")
+    print()
+    print("== Eligible Projects ==")
+    if plan.eligible_projects:
+        for project in plan.eligible_projects:
+            print(f"- {project['id']}: {project['title']}")
+    else:
+        print("- none yet")
+    print()
+    focus_concept = plan.next_best_concepts[0] if plan.next_best_concepts else args.domain
     print(generate_socratic_prompt(provider, focus_concept))
     print(generate_practice_task(provider, focus_concept))
     print(suggest_capstone(provider, args.domain))
-    print(generate_rubric(provider, focus_concept))

View File

@@ -0,0 +1,20 @@
from didactopus.adaptive_engine import LearnerProfile, build_adaptive_plan
from didactopus.artifact_registry import discover_domain_packs
from didactopus.evidence_engine import EvidenceItem, ingest_evidence_bundle
from didactopus.learning_graph import build_merged_learning_graph


def test_evidence_drives_plan() -> None:
    merged = build_merged_learning_graph(discover_domain_packs(["domain-packs"]))
    profile = LearnerProfile(learner_id="u1")
    ingest_evidence_bundle(
        profile,
        [
            EvidenceItem("foundations-statistics::descriptive-statistics", "problem", 0.9),
            EvidenceItem("foundations-statistics::descriptive-statistics", "explanation", 0.85),
        ],
        mastery_threshold=0.8,
        resurfacing_threshold=0.55,
    )
    plan = build_adaptive_plan(merged, profile)
    assert "foundations-statistics::probability-basics" in plan.next_best_concepts

View File

@@ -0,0 +1,40 @@
from didactopus.adaptive_engine import LearnerProfile
from didactopus.evidence_engine import EvidenceItem, add_evidence_item, ingest_evidence_bundle, EvidenceState


def test_add_evidence_item_updates_mean() -> None:
    state = EvidenceState()
    add_evidence_item(state, EvidenceItem("c1", "problem", 0.8))
    add_evidence_item(state, EvidenceItem("c1", "problem", 0.6))
    assert state.summary_by_concept["c1"].count == 2
    assert abs(state.summary_by_concept["c1"].mean_score - 0.7) < 1e-9


def test_mastery_promotion() -> None:
    profile = LearnerProfile(learner_id="u1")
    state = ingest_evidence_bundle(
        profile,
        [
            EvidenceItem("c1", "explanation", 0.85),
            EvidenceItem("c1", "problem", 0.9),
        ],
        mastery_threshold=0.8,
        resurfacing_threshold=0.55,
    )
    assert "c1" in profile.mastered_concepts
    assert state.resurfaced_concepts == set()


def test_resurfacing() -> None:
    profile = LearnerProfile(learner_id="u1", mastered_concepts={"c1"})
    state = ingest_evidence_bundle(
        profile,
        [
            EvidenceItem("c1", "problem", 0.4),
            EvidenceItem("c1", "transfer", 0.5),
        ],
        mastery_threshold=0.8,
        resurfacing_threshold=0.55,
    )
    assert "c1" not in profile.mastered_concepts
    assert "c1" in state.resurfaced_concepts