Weighted evidence update

This commit is contained in:
welsberr 2026-03-12 21:42:40 -04:00
parent e4d416a48d
commit d682d2774f
13 changed files with 345 additions and 220 deletions

View File

@ -2,26 +2,21 @@
**Didactopus** is a local-first AI-assisted autodidactic mastery platform.
This revision adds an evidence-driven mastery engine on top of the adaptive learner model.
This revision upgrades the evidence layer from simple averaging to a more realistic weighted and recency-aware mastery model.
## Added in this revision
- evidence record models
- rubric-style evidence scoring
- concept mastery updates from accumulated evidence
- weak-concept resurfacing
- automatic learner state updates from evidence bundles
- project evidence integration
- CLI demonstration of evidence-driven progression
- tests for mastery promotion and resurfacing
- evidence-type weighting
- recency weighting
- confidence estimation from weighted evidence mass
- dimension-level rubric storage
- weighted concept summaries
- mastery decisions using weighted score and confidence
- resurfacing from recent weak evidence
- tests for weighted scoring and recency behavior
## Why this matters
Didactopus no longer needs mastery to be supplied only by hand. It can now begin to infer learner state from observed evidence such as:
Not all evidence should count equally.
- explanation quality
- problem-solving performance
- project completion
- transfer-task performance
That is a necessary step toward a genuine mastery engine.
A capstone project or transfer task should usually matter more than a short explanation, and recent poor performance should sometimes matter more than older success. This revision begins to model that explicitly.

View File

@ -16,6 +16,13 @@ platform:
permit_direct_answers: false
mastery_threshold: 0.8
resurfacing_threshold: 0.55
confidence_threshold: 0.8
evidence_weights:
explanation: 1.0
problem: 1.5
project: 2.5
transfer: 2.0
recent_evidence_multiplier: 1.35
artifacts:
local_pack_dirs:

47
docs/weighted-evidence.md Normal file
View File

@ -0,0 +1,47 @@
# Weighted Evidence Model
## Purpose
The earlier evidence engine treated all evidence items equally. This revision adds a more realistic model with:
- evidence-type weights
- recency weighting
- dimension-level rubric storage
- confidence estimates based on weighted support
## Evidence weighting
Default weights:
- explanation: 1.0
- problem: 1.5
- transfer: 2.0
- project: 2.5
## Recency policy
Each evidence item can be marked `is_recent`. Recent items receive a multiplier. This allows weak recent performance to matter more than stale success, which is useful for resurfacing fragile concepts.
## Confidence
Confidence is currently derived from total weighted evidence mass using a saturating function:
`confidence = total_weight / (total_weight + 1.0)`
This is simple, monotonic, and interpretable.
## Current mastery rule
A concept is mastered if:
- weighted mean score >= mastery threshold
- confidence >= confidence threshold
A previously mastered concept resurfaces if:
- weighted mean score < resurfacing threshold
- and recent weak evidence drags its summary downward enough
## Future work
- per-dimension mastery thresholds
- decay by timestamp instead of a boolean recent flag
- Bayesian knowledge tracing
- separate competence vs fluency models

View File

@ -1,17 +1 @@
__version__ = "0.1.0"
__all__ = [
"__version__",
"adaptive_engine",
"artifact_registry",
"artifact_schemas",
"config",
"evidence_engine",
"evaluation",
"learning_graph",
"main",
"mentor",
"model_provider",
"practice",
"project_advisor",
]

View File

@ -27,50 +27,35 @@ class AdaptivePlan:
def classify_node_status(merged: MergedLearningGraph, profile: LearnerProfile) -> dict[str, NodeStatus]:
status: dict[str, NodeStatus] = {}
status = {}
for concept_key in nx.topological_sort(merged.graph):
if concept_key in profile.mastered_concepts:
status[concept_key] = "hidden" if profile.hide_mastered else "mastered"
continue
prereqs = set(merged.graph.predecessors(concept_key))
if prereqs.issubset(profile.mastered_concepts):
status[concept_key] = "ready"
else:
status[concept_key] = "blocked"
status[concept_key] = "ready" if prereqs.issubset(profile.mastered_concepts) else "blocked"
return status
def select_next_best_concepts(status: dict[str, NodeStatus], limit: int = 5) -> list[str]:
return [concept for concept, s in status.items() if s == "ready"][:limit]
def recommend_projects(merged: MergedLearningGraph, profile: LearnerProfile) -> list[dict]:
eligible = []
for project in merged.project_catalog:
if set(project["prerequisites"]).issubset(profile.mastered_concepts):
eligible.append(project)
return eligible
def build_adaptive_plan(merged: MergedLearningGraph, profile: LearnerProfile, next_limit: int = 5) -> AdaptivePlan:
status = classify_node_status(merged, profile)
roadmap = []
for concept_key in nx.topological_sort(merged.graph):
node_state = status[concept_key]
if node_state == "hidden":
state = status[concept_key]
if state == "hidden":
continue
concept = merged.concept_data[concept_key]
data = merged.concept_data[concept_key]
roadmap.append({
"concept_key": concept_key,
"title": concept["title"],
"pack": concept["pack"],
"status": node_state,
"title": data["title"],
"pack": data["pack"],
"status": state,
"prerequisites": list(merged.graph.predecessors(concept_key)),
})
return AdaptivePlan(
node_status=status,
learner_roadmap=roadmap,
next_best_concepts=select_next_best_concepts(status, limit=next_limit),
eligible_projects=recommend_projects(merged, profile),
)
eligible = [
p for p in merged.project_catalog
if set(p["prerequisites"]).issubset(profile.mastered_concepts)
]
next_best = [k for k, s in status.items() if s == "ready"][:next_limit]
return AdaptivePlan(status, roadmap, next_best, eligible)

View File

@ -20,18 +20,11 @@ REQUIRED_FILES = ["pack.yaml", "concepts.yaml", "roadmap.yaml", "projects.yaml",
def _parse_version(version: str) -> tuple[int, ...]:
parts = []
for chunk in version.split("."):
digits = "".join(ch for ch in chunk if ch.isdigit())
parts.append(int(digits) if digits else 0)
return tuple(parts)
return tuple(int("".join(ch for ch in chunk if ch.isdigit()) or 0) for chunk in version.split("."))
def _version_in_range(version: str, min_version: str, max_version: str) -> bool:
v = _parse_version(version)
vmin = _parse_version(min_version)
vmax = _parse_version(max_version)
return vmin <= v <= vmax
return _parse_version(min_version) <= _parse_version(version) <= _parse_version(max_version)
@dataclass
@ -52,40 +45,6 @@ def _load_yaml(path: Path) -> dict[str, Any]:
return data
def _check_duplicate_ids(entries: list[Any], label: str) -> list[str]:
errors: list[str] = []
seen: set[str] = set()
for entry in entries:
if entry.id in seen:
errors.append(f"duplicate {label} id: {entry.id}")
seen.add(entry.id)
return errors
def _check_concept_references(concepts_file: ConceptsFile, roadmap_file: RoadmapFile, projects_file: ProjectsFile) -> list[str]:
errors: list[str] = []
concept_ids = {c.id for c in concepts_file.concepts}
for concept in concepts_file.concepts:
for prereq in concept.prerequisites:
if prereq not in concept_ids:
errors.append(f"unknown concept prerequisite '{prereq}' referenced by concept '{concept.id}'")
for stage in roadmap_file.stages:
for concept_id in stage.concepts:
if concept_id not in concept_ids:
errors.append(f"unknown concept '{concept_id}' referenced by roadmap stage '{stage.id}'")
for project in projects_file.projects:
for prereq in project.prerequisites:
if prereq not in concept_ids:
errors.append(f"unknown concept prerequisite '{prereq}' referenced by project '{project.id}'")
return errors
def _check_core_compatibility(manifest: PackManifest) -> list[str]:
if _version_in_range(DIDACTOPUS_VERSION, manifest.didactopus_min_version, manifest.didactopus_max_version):
return []
return [f"incompatible with Didactopus core version {DIDACTOPUS_VERSION}; supported range is {manifest.didactopus_min_version}..{manifest.didactopus_max_version}"]
def validate_pack(pack_dir: str | Path) -> PackValidationResult:
pack_path = Path(pack_dir)
result = PackValidationResult(pack_dir=pack_path)
@ -98,41 +57,21 @@ def validate_pack(pack_dir: str | Path) -> PackValidationResult:
try:
result.manifest = PackManifest.model_validate(_load_yaml(pack_path / "pack.yaml"))
result.errors.extend(_check_core_compatibility(result.manifest))
if not _version_in_range(DIDACTOPUS_VERSION, result.manifest.didactopus_min_version, result.manifest.didactopus_max_version):
result.errors.append(
f"incompatible with Didactopus core version {DIDACTOPUS_VERSION}; supported range is "
f"{result.manifest.didactopus_min_version}..{result.manifest.didactopus_max_version}"
)
concepts_data = _load_yaml(pack_path / "concepts.yaml")
result.errors.extend(validate_top_level_key(concepts_data, "concepts"))
concepts_file = None
if "concepts" in concepts_data:
concepts_file = ConceptsFile.model_validate(concepts_data)
result.loaded_files["concepts"] = concepts_file
result.errors.extend(_check_duplicate_ids(concepts_file.concepts, "concept"))
concepts = ConceptsFile.model_validate(_load_yaml(pack_path / "concepts.yaml"))
roadmap = RoadmapFile.model_validate(_load_yaml(pack_path / "roadmap.yaml"))
projects = ProjectsFile.model_validate(_load_yaml(pack_path / "projects.yaml"))
rubrics = RubricsFile.model_validate(_load_yaml(pack_path / "rubrics.yaml"))
roadmap_data = _load_yaml(pack_path / "roadmap.yaml")
result.errors.extend(validate_top_level_key(roadmap_data, "stages"))
roadmap_file = None
if "stages" in roadmap_data:
roadmap_file = RoadmapFile.model_validate(roadmap_data)
result.loaded_files["roadmap"] = roadmap_file
result.errors.extend(_check_duplicate_ids(roadmap_file.stages, "roadmap stage"))
projects_data = _load_yaml(pack_path / "projects.yaml")
result.errors.extend(validate_top_level_key(projects_data, "projects"))
projects_file = None
if "projects" in projects_data:
projects_file = ProjectsFile.model_validate(projects_data)
result.loaded_files["projects"] = projects_file
result.errors.extend(_check_duplicate_ids(projects_file.projects, "project"))
rubrics_data = _load_yaml(pack_path / "rubrics.yaml")
result.errors.extend(validate_top_level_key(rubrics_data, "rubrics"))
if "rubrics" in rubrics_data:
rubrics_file = RubricsFile.model_validate(rubrics_data)
result.loaded_files["rubrics"] = rubrics_file
result.errors.extend(_check_duplicate_ids(rubrics_file.rubrics, "rubric"))
if concepts_file and roadmap_file and projects_file:
result.errors.extend(_check_concept_references(concepts_file, roadmap_file, projects_file))
result.loaded_files["concepts"] = concepts
result.loaded_files["roadmap"] = roadmap
result.loaded_files["projects"] = projects
result.loaded_files["rubrics"] = rubrics
except Exception as exc:
result.errors.append(str(exc))
@ -141,7 +80,7 @@ def validate_pack(pack_dir: str | Path) -> PackValidationResult:
def discover_domain_packs(base_dirs: list[str | Path]) -> list[PackValidationResult]:
results: list[PackValidationResult] = []
results = []
for base_dir in base_dirs:
base = Path(base_dir)
if not base.exists():
@ -152,7 +91,7 @@ def discover_domain_packs(base_dirs: list[str | Path]) -> list[PackValidationRes
def check_pack_dependencies(results: list[PackValidationResult]) -> list[str]:
errors: list[str] = []
errors = []
manifest_by_name = {r.manifest.name: r.manifest for r in results if r.manifest is not None}
for result in results:
if result.manifest is None:
@ -163,7 +102,10 @@ def check_pack_dependencies(results: list[PackValidationResult]) -> list[str]:
errors.append(f"pack '{result.manifest.name}' depends on missing pack '{dep.name}'")
continue
if not _version_in_range(dep_manifest.version, dep.min_version, dep.max_version):
errors.append(f"pack '{result.manifest.name}' requires '{dep.name}' version {dep.min_version}..{dep.max_version}, but found {dep_manifest.version}")
errors.append(
f"pack '{result.manifest.name}' requires '{dep.name}' version "
f"{dep.min_version}..{dep.max_version}, but found {dep_manifest.version}"
)
return errors

View File

@ -28,6 +28,16 @@ class PlatformConfig(BaseModel):
permit_direct_answers: bool = False
mastery_threshold: float = 0.8
resurfacing_threshold: float = 0.55
confidence_threshold: float = 0.8
evidence_weights: dict[str, float] = Field(
default_factory=lambda: {
"explanation": 1.0,
"problem": 1.5,
"project": 2.5,
"transfer": 2.0,
}
)
recent_evidence_multiplier: float = 1.35
class ArtifactConfig(BaseModel):

View File

@ -12,15 +12,5 @@ class RubricScore:
return (self.correctness + self.clarity + self.justification + self.transfer) / 4.0
def score_simple_rubric(
correctness: float,
clarity: float,
justification: float,
transfer: float,
) -> RubricScore:
return RubricScore(
correctness=correctness,
clarity=clarity,
justification=justification,
transfer=transfer,
)
def score_simple_rubric(correctness: float, clarity: float, justification: float, transfer: float) -> RubricScore:
return RubricScore(correctness, clarity, justification, transfer)

View File

@ -14,13 +14,18 @@ class EvidenceItem:
evidence_type: EvidenceType
score: float
notes: str = ""
is_recent: bool = False
rubric_dimensions: dict[str, float] = field(default_factory=dict)
@dataclass
class ConceptEvidenceSummary:
concept_key: str
count: int = 0
mean_score: float = 0.0
weighted_mean_score: float = 0.0
total_weight: float = 0.0
confidence: float = 0.0
dimension_means: dict[str, float] = field(default_factory=dict)
@dataclass
@ -34,15 +39,66 @@ def clamp_score(score: float) -> float:
return max(0.0, min(1.0, score))
def add_evidence_item(state: EvidenceState, item: EvidenceItem) -> None:
def evidence_weight(item: EvidenceItem, type_weights: dict[str, float], recent_multiplier: float) -> float:
base = type_weights.get(item.evidence_type, 1.0)
return base * (recent_multiplier if item.is_recent else 1.0)
def confidence_from_weight(total_weight: float) -> float:
return total_weight / (total_weight + 1.0) if total_weight > 0 else 0.0
def recompute_concept_summary(
concept_key: str,
items: list[EvidenceItem],
type_weights: dict[str, float],
recent_multiplier: float,
) -> ConceptEvidenceSummary:
weighted_score_sum = 0.0
total_weight = 0.0
dimension_totals: dict[str, float] = {}
dimension_weights: dict[str, float] = {}
for item in items:
item.score = clamp_score(item.score)
w = evidence_weight(item, type_weights, recent_multiplier)
weighted_score_sum += item.score * w
total_weight += w
for dim, val in item.rubric_dimensions.items():
v = clamp_score(val)
dimension_totals[dim] = dimension_totals.get(dim, 0.0) + v * w
dimension_weights[dim] = dimension_weights.get(dim, 0.0) + w
dimension_means = {
dim: (dimension_totals[dim] / dimension_weights[dim])
for dim in dimension_totals
if dimension_weights[dim] > 0
}
return ConceptEvidenceSummary(
concept_key=concept_key,
count=len(items),
weighted_mean_score=(weighted_score_sum / total_weight) if total_weight > 0 else 0.0,
total_weight=total_weight,
confidence=confidence_from_weight(total_weight),
dimension_means=dimension_means,
)
def add_evidence_item(
state: EvidenceState,
item: EvidenceItem,
type_weights: dict[str, float],
recent_multiplier: float,
) -> None:
item.score = clamp_score(item.score)
state.evidence_by_concept.setdefault(item.concept_key, []).append(item)
items = state.evidence_by_concept[item.concept_key]
mean_score = sum(x.score for x in items) / len(items)
state.summary_by_concept[item.concept_key] = ConceptEvidenceSummary(
concept_key=item.concept_key,
count=len(items),
mean_score=mean_score,
state.summary_by_concept[item.concept_key] = recompute_concept_summary(
item.concept_key,
state.evidence_by_concept[item.concept_key],
type_weights,
recent_multiplier,
)
@ -51,15 +107,22 @@ def update_profile_mastery_from_evidence(
state: EvidenceState,
mastery_threshold: float,
resurfacing_threshold: float,
confidence_threshold: float,
) -> None:
for concept_key, summary in state.summary_by_concept.items():
if summary.count == 0:
continue
if summary.mean_score >= mastery_threshold:
if (
summary.weighted_mean_score >= mastery_threshold
and summary.confidence >= confidence_threshold
):
profile.mastered_concepts.add(concept_key)
if concept_key in state.resurfaced_concepts:
state.resurfaced_concepts.remove(concept_key)
elif concept_key in profile.mastered_concepts and summary.mean_score < resurfacing_threshold:
state.resurfaced_concepts.discard(concept_key)
elif (
concept_key in profile.mastered_concepts
and summary.weighted_mean_score < resurfacing_threshold
):
profile.mastered_concepts.remove(concept_key)
state.resurfaced_concepts.add(concept_key)
@ -69,14 +132,18 @@ def ingest_evidence_bundle(
items: list[EvidenceItem],
mastery_threshold: float,
resurfacing_threshold: float,
confidence_threshold: float,
type_weights: dict[str, float],
recent_multiplier: float,
) -> EvidenceState:
state = EvidenceState()
for item in items:
add_evidence_item(state, item)
add_evidence_item(state, item, type_weights, recent_multiplier)
update_profile_mastery_from_evidence(
profile=profile,
state=state,
mastery_threshold=mastery_threshold,
resurfacing_threshold=resurfacing_threshold,
confidence_threshold=confidence_threshold,
)
return state

View File

@ -15,9 +15,7 @@ def namespaced_concept(pack_name: str, concept_id: str) -> str:
class MergedLearningGraph:
graph: nx.DiGraph = field(default_factory=nx.DiGraph)
concept_data: dict[str, dict[str, Any]] = field(default_factory=dict)
stage_catalog: list[dict[str, Any]] = field(default_factory=list)
project_catalog: list[dict[str, Any]] = field(default_factory=list)
conflicts: list[str] = field(default_factory=list)
load_order: list[str] = field(default_factory=list)
@ -28,9 +26,7 @@ def build_merged_learning_graph(results: list[PackValidationResult]) -> MergedLe
for pack_name in merged.load_order:
result = valid[pack_name]
concepts_file = result.loaded_files.get("concepts")
if concepts_file is None:
continue
concepts_file = result.loaded_files["concepts"]
for concept in concepts_file.concepts:
key = namespaced_concept(pack_name, concept.id)
merged.concept_data[key] = {
@ -44,9 +40,7 @@ def build_merged_learning_graph(results: list[PackValidationResult]) -> MergedLe
for pack_name in merged.load_order:
result = valid[pack_name]
concepts_file = result.loaded_files.get("concepts")
if concepts_file is None:
continue
concepts_file = result.loaded_files["concepts"]
for concept in concepts_file.concepts:
concept_key = namespaced_concept(pack_name, concept.id)
for prereq in concept.prerequisites:
@ -54,19 +48,7 @@ def build_merged_learning_graph(results: list[PackValidationResult]) -> MergedLe
if prereq_key in merged.graph:
merged.graph.add_edge(prereq_key, concept_key)
roadmap_file = result.loaded_files.get("roadmap")
if roadmap_file is not None:
for stage in roadmap_file.stages:
merged.stage_catalog.append({
"id": f"{pack_name}::{stage.id}",
"pack": pack_name,
"title": stage.title,
"concepts": [namespaced_concept(pack_name, c) for c in stage.concepts],
"checkpoint": list(stage.checkpoint),
})
projects_file = result.loaded_files.get("projects")
if projects_file is not None:
projects_file = result.loaded_files["projects"]
for project in projects_file.projects:
merged.project_catalog.append({
"id": f"{pack_name}::{project.id}",

View File

@ -3,7 +3,12 @@ import os
from pathlib import Path
from .adaptive_engine import LearnerProfile, build_adaptive_plan
from .artifact_registry import check_pack_dependencies, detect_dependency_cycles, discover_domain_packs, topological_pack_order
from .artifact_registry import (
check_pack_dependencies,
detect_dependency_cycles,
discover_domain_packs,
topological_pack_order,
)
from .config import load_config
from .evidence_engine import EvidenceItem, ingest_evidence_bundle
from .evaluation import score_simple_rubric
@ -15,7 +20,7 @@ from .project_advisor import suggest_capstone
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Didactopus evidence-driven mastery scaffold")
parser = argparse.ArgumentParser(description="Didactopus weighted evidence scaffold")
parser.add_argument("--domain", required=True)
parser.add_argument("--goal", required=True)
parser.add_argument(
@ -63,44 +68,76 @@ def main() -> None:
hide_mastered=True,
)
demo_score = score_simple_rubric(0.9, 0.85, 0.8, 0.75)
rubric = score_simple_rubric(0.92, 0.86, 0.82, 0.78)
evidence_items = [
EvidenceItem(
concept_key="foundations-statistics::descriptive-statistics",
evidence_type="explanation",
score=demo_score.mean(),
notes="Strong introductory explanation.",
score=rubric.mean(),
is_recent=False,
rubric_dimensions={
"correctness": rubric.correctness,
"clarity": rubric.clarity,
"justification": rubric.justification,
"transfer": rubric.transfer,
},
notes="Good explanation.",
),
EvidenceItem(
concept_key="foundations-statistics::descriptive-statistics",
evidence_type="problem",
score=0.88,
notes="Solved summary statistics problem correctly.",
evidence_type="project",
score=0.9,
is_recent=True,
rubric_dimensions={
"correctness": 0.9,
"clarity": 0.84,
"justification": 0.88,
"transfer": 0.82,
},
notes="Strong project evidence.",
),
EvidenceItem(
concept_key="bayes-extension::prior",
evidence_type="explanation",
score=0.62,
notes="Partial understanding of priors.",
evidence_type="problem",
score=0.58,
is_recent=True,
rubric_dimensions={
"correctness": 0.6,
"clarity": 0.55,
},
notes="Recent weak but informative performance.",
),
]
evidence_state = ingest_evidence_bundle(
profile=profile,
items=evidence_items,
mastery_threshold=config.platform.mastery_threshold,
resurfacing_threshold=config.platform.resurfacing_threshold,
confidence_threshold=config.platform.confidence_threshold,
type_weights=config.platform.evidence_weights,
recent_multiplier=config.platform.recent_evidence_multiplier,
)
plan = build_adaptive_plan(merged, profile)
print("== Evidence Summary ==")
print("== Weighted Evidence Summary ==")
for concept_key, summary in evidence_state.summary_by_concept.items():
print(f"- {concept_key}: count={summary.count}, mean={summary.mean_score:.2f}")
print(
f"- {concept_key}: count={summary.count}, "
f"weighted_mean={summary.weighted_mean_score:.2f}, "
f"confidence={summary.confidence:.2f}, "
f"total_weight={summary.total_weight:.2f}"
)
if summary.dimension_means:
dims = ", ".join(f"{k}={v:.2f}" for k, v in sorted(summary.dimension_means.items()))
print(f" * dimensions: {dims}")
print()
print("== Mastered Concepts After Evidence ==")
print("== Mastered Concepts After Weighted Evidence ==")
for concept_key in sorted(profile.mastered_concepts):
print(f"- {concept_key}")
print()
print("== Resurfaced Concepts ==")
if evidence_state.resurfaced_concepts:
for concept_key in sorted(evidence_state.resurfaced_concepts):
@ -108,22 +145,17 @@ def main() -> None:
else:
print("- none")
print()
print("== Adaptive Plan Summary ==")
print(f"- roadmap items visible: {len(plan.learner_roadmap)}")
print(f"- next-best concepts: {len(plan.next_best_concepts)}")
print(f"- eligible projects: {len(plan.eligible_projects)}")
print()
print("== Next Best Concepts ==")
for concept in plan.next_best_concepts:
print(f"- {concept}")
print()
print("== Eligible Projects ==")
if plan.eligible_projects:
for project in plan.eligible_projects:
print(f"- {project['id']}: {project['title']}")
else:
print("- none yet")
print()
focus_concept = plan.next_best_concepts[0] if plan.next_best_concepts else args.domain
print(generate_socratic_prompt(provider, focus_concept))

View File

@ -4,5 +4,5 @@ from didactopus.config import load_config
def test_load_example_config() -> None:
config = load_config(Path("configs/config.example.yaml"))
assert config.model_provider.mode == "local_first"
assert "domain-packs" in config.artifacts.local_pack_dirs
assert config.platform.evidence_weights["project"] == 2.5
assert config.platform.recent_evidence_multiplier == 1.35

View File

@ -0,0 +1,84 @@
from didactopus.adaptive_engine import LearnerProfile
from didactopus.evidence_engine import (
EvidenceItem,
EvidenceState,
add_evidence_item,
confidence_from_weight,
evidence_weight,
ingest_evidence_bundle,
)
def test_evidence_weighting_by_type_and_recency() -> None:
item = EvidenceItem("c1", "project", 0.9, is_recent=True)
w = evidence_weight(
item,
{"explanation": 1.0, "problem": 1.5, "project": 2.5, "transfer": 2.0},
1.35,
)
assert abs(w - 3.375) < 1e-9
def test_confidence_increases_with_weight() -> None:
assert confidence_from_weight(0.0) == 0.0
assert confidence_from_weight(1.0) < confidence_from_weight(3.0)
def test_weighted_summary_promotes_mastery() -> None:
profile = LearnerProfile(learner_id="u1")
state = ingest_evidence_bundle(
profile,
[
EvidenceItem("c1", "project", 0.9, is_recent=True),
EvidenceItem("c1", "problem", 0.85, is_recent=False),
],
mastery_threshold=0.8,
resurfacing_threshold=0.55,
confidence_threshold=0.75,
type_weights={"explanation": 1.0, "problem": 1.5, "project": 2.5, "transfer": 2.0},
recent_multiplier=1.35,
)
assert "c1" in profile.mastered_concepts
assert state.summary_by_concept["c1"].weighted_mean_score >= 0.8
assert state.summary_by_concept["c1"].confidence >= 0.75
def test_recent_weak_evidence_can_resurface() -> None:
profile = LearnerProfile(learner_id="u1", mastered_concepts={"c1"})
state = ingest_evidence_bundle(
profile,
[
EvidenceItem("c1", "project", 0.3, is_recent=True),
EvidenceItem("c1", "explanation", 0.5, is_recent=True),
],
mastery_threshold=0.8,
resurfacing_threshold=0.55,
confidence_threshold=0.75,
type_weights={"explanation": 1.0, "problem": 1.5, "project": 2.5, "transfer": 2.0},
recent_multiplier=1.35,
)
assert "c1" not in profile.mastered_concepts
assert "c1" in state.resurfaced_concepts
def test_dimension_means_present() -> None:
profile = LearnerProfile(learner_id="u1")
state = ingest_evidence_bundle(
profile,
[
EvidenceItem(
"c1",
"problem",
0.8,
rubric_dimensions={"correctness": 0.9, "clarity": 0.7},
)
],
mastery_threshold=0.8,
resurfacing_threshold=0.55,
confidence_threshold=0.1,
type_weights={"explanation": 1.0, "problem": 1.5, "project": 2.5, "transfer": 2.0},
recent_multiplier=1.35,
)
summary = state.summary_by_concept["c1"]
assert abs(summary.dimension_means["correctness"] - 0.9) < 1e-9
assert abs(summary.dimension_means["clarity"] - 0.7) < 1e-9