diff --git a/pyproject.toml b/pyproject.toml
index 1fcb487..b6b46ba 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,7 +6,19 @@ build-backend = "setuptools.build_meta"
name = "didactopus"
version = "0.1.0"
requires-python = ">=3.10"
-dependencies = ["pydantic>=2.7", "pyyaml>=6.0"]
+dependencies = [
+ "pydantic>=2.7",
+ "fastapi>=0.115",
+ "uvicorn>=0.30",
+ "sqlalchemy>=2.0",
+ "passlib[bcrypt]>=1.7",
+ "python-jose[cryptography]>=3.3"
+]
+
+[project.scripts]
+didactopus-api = "didactopus.api:main"
+didactopus-export-svg = "didactopus.export_svg:main"
+didactopus-render-bundle = "didactopus.render_bundle:main"
[tool.setuptools.packages.find]
where = ["src"]
diff --git a/src/didactopus/api.py b/src/didactopus/api.py
index 4c91a9b..8f42cf9 100644
--- a/src/didactopus/api.py
+++ b/src/didactopus/api.py
@@ -1,103 +1,47 @@
from __future__ import annotations
-import json
-from fastapi import FastAPI, HTTPException, Header, Depends, BackgroundTasks
+from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
-import uvicorn
-from .config import load_settings
+import uvicorn, json, tempfile
+from pathlib import Path
from .db import Base, engine
-from .models import (
- LoginRequest, ServiceAccountLoginRequest, ServiceAccountCreateRequest, ServiceAccountRotateRequest,
- ServiceAccountStateRequest, ServiceToken, RefreshRequest, TokenPair, CreateLearnerRequest, LearnerState,
- EvidenceEvent, EvaluatorSubmission, EvaluatorJobStatus, CreatePackRequest, AgentCapabilityManifest,
- AgentLearnerPlanRequest, AgentLearnerPlanResponse, LearnerRunCreateRequest, WorkflowEventCreateRequest
-)
-from .repository import (
- authenticate_user, get_user_by_id, create_service_account, list_service_accounts, authenticate_service_account,
- rotate_service_account_secret, set_service_account_active, add_agent_audit_log, list_agent_audit_logs,
- create_learner_run, end_learner_run, list_learner_runs, add_workflow_event, list_workflow_events,
- store_refresh_token, refresh_token_active, revoke_refresh_token, deployment_policy_profile, list_packs_for_user,
- get_pack, get_pack_row, upsert_pack, create_learner, learner_owned_by_user, load_learner_state,
- save_learner_state, create_evaluator_job, get_evaluator_job, list_evaluator_jobs_for_learner
-)
-from .engine import apply_evidence, recommend_next, build_animation_frames
-from .auth import issue_access_token, issue_refresh_token, issue_service_access_token, decode_token, new_token_id, new_secret
-from .worker import process_job
+from .models import LoginRequest, RefreshRequest, TokenPair, CreateLearnerRequest, LearnerState, MediaRenderRequest
+from .repository import authenticate_user, get_user_by_id, store_refresh_token, refresh_token_active, revoke_refresh_token, list_packs_for_user, get_pack, get_pack_row, create_learner, learner_owned_by_user, load_learner_state, save_learner_state
+from .auth import issue_access_token, issue_refresh_token, decode_token, new_token_id
+from .engine import build_graph_frames, stable_layout
+from .render_bundle import make_render_bundle
-settings = load_settings()
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Didactopus API Prototype")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
-def current_actor(authorization: str = Header(default="")):
+def current_user(authorization: str = Header(default="")):
token = authorization.removeprefix("Bearer ").strip()
payload = decode_token(token) if token else None
- if not payload:
+ if not payload or payload.get("kind") != "access":
raise HTTPException(status_code=401, detail="Unauthorized")
- if payload.get("kind") == "access":
- user = get_user_by_id(int(payload["sub"]))
- if user is None or not user.is_active:
- raise HTTPException(status_code=401, detail="Unauthorized")
- return {"actor_type": "user", "user": user, "scopes": None}
- if payload.get("kind") == "service":
- return {
- "actor_type": "service",
- "service_account_id": int(payload["sub"]),
- "service_account_name": payload.get("service_account_name"),
- "scopes": payload.get("scopes", []),
- }
- raise HTTPException(status_code=401, detail="Unauthorized")
+ user = get_user_by_id(int(payload["sub"]))
+ if user is None or not user.is_active:
+ raise HTTPException(status_code=401, detail="Unauthorized")
+ return user
-def require_admin(actor = Depends(current_actor)):
- if actor["actor_type"] != "user" or actor["user"].role != "admin":
- raise HTTPException(status_code=403, detail="Admin role required")
- return actor["user"]
-
-def audit_service_action(actor, action: str, target: str, outcome: str = "ok", detail: dict | None = None):
- if actor["actor_type"] == "service":
- add_agent_audit_log(
- actor["service_account_id"],
- actor["service_account_name"],
- action,
- target,
- outcome,
- detail or {},
- )
-
-def require_scope(scope: str):
- def inner(actor = Depends(current_actor)):
- if actor["actor_type"] == "user":
- return actor
- scopes = set(actor.get("scopes") or [])
- if scope not in scopes:
- audit_service_action(actor, f"scope_denied:{scope}", "", "denied", {"scope": scope})
- raise HTTPException(status_code=403, detail=f"Missing scope: {scope}")
- return actor
- return inner
-
-def ensure_learner_access(actor, learner_id: str):
- if actor["actor_type"] == "service":
- return
- user = actor["user"]
+def ensure_learner_access(user, learner_id: str):
if user.role == "admin":
return
if not learner_owned_by_user(user.id, learner_id):
- raise HTTPException(status_code=403, detail="Learner not accessible by this actor")
+ raise HTTPException(status_code=403, detail="Learner not accessible by this user")
-def ensure_pack_access(actor, pack_id: str):
+def ensure_pack_access(user, pack_id: str):
row = get_pack_row(pack_id)
if row is None:
raise HTTPException(status_code=404, detail="Pack not found")
- if actor["actor_type"] == "service":
- return row
- user = actor["user"]
if user.role == "admin":
return row
if row.policy_lane == "community":
return row
if row.owner_user_id == user.id:
return row
- raise HTTPException(status_code=403, detail="Pack not accessible by this actor")
+ raise HTTPException(status_code=403, detail="Pack not accessible by this user")
@app.post("/api/login", response_model=TokenPair)
def login(payload: LoginRequest):
@@ -108,14 +52,6 @@ def login(payload: LoginRequest):
store_refresh_token(user.id, token_id)
return TokenPair(access_token=issue_access_token(user.id, user.username, user.role), refresh_token=issue_refresh_token(user.id, user.username, user.role, token_id), username=user.username, role=user.role)
-@app.post("/api/service-accounts/login", response_model=ServiceToken)
-def service_login(payload: ServiceAccountLoginRequest):
- sa = authenticate_service_account(payload.name, payload.secret)
- if sa is None:
- raise HTTPException(status_code=401, detail="Invalid service account credentials")
- scopes = json.loads(sa.scopes_json or "[]")
- return ServiceToken(access_token=issue_service_access_token(sa.id, sa.name, scopes), service_account_name=sa.name, scopes=scopes)
-
@app.post("/api/refresh", response_model=TokenPair)
def refresh(payload: RefreshRequest):
data = decode_token(payload.refresh_token)
@@ -132,189 +68,73 @@ def refresh(payload: RefreshRequest):
store_refresh_token(user.id, new_jti)
return TokenPair(access_token=issue_access_token(user.id, user.username, user.role), refresh_token=issue_refresh_token(user.id, user.username, user.role, new_jti), username=user.username, role=user.role)
-@app.get("/api/deployment-policy")
-def api_deployment_policy(actor = Depends(current_actor)):
- return deployment_policy_profile().model_dump()
-
-@app.get("/api/agent/capabilities", response_model=AgentCapabilityManifest)
-def api_agent_capabilities(actor = Depends(current_actor)):
- return AgentCapabilityManifest()
-
-@app.post("/api/agent/learner-plan", response_model=AgentLearnerPlanResponse)
-def api_agent_learner_plan(payload: AgentLearnerPlanRequest, actor = Depends(require_scope("recommendations:read"))):
- ensure_learner_access(actor, payload.learner_id)
- ensure_pack_access(actor, payload.pack_id)
- state = load_learner_state(payload.learner_id)
- pack = get_pack(payload.pack_id)
- if pack is None:
- raise HTTPException(status_code=404, detail="Pack not found")
- cards = recommend_next(state, pack)
- audit_service_action(actor, "agent_learner_plan", f"{payload.learner_id}:{payload.pack_id}", "ok", {"cards": len(cards)})
- return AgentLearnerPlanResponse(learner_id=payload.learner_id, pack_id=payload.pack_id, next_cards=cards, suggested_actions=["Read learner state", "Choose next card", "Submit evidence", "Refresh recommendations"])
-
-@app.post("/api/admin/service-accounts")
-def api_create_service_account(payload: ServiceAccountCreateRequest, user = Depends(require_admin)):
- secret = new_secret()
- sa = create_service_account(payload.name, user.id, payload.description, payload.scopes, secret)
- return {"id": sa.id, "name": sa.name, "scopes": payload.scopes, "secret": secret}
-
-@app.get("/api/admin/service-accounts")
-def api_list_service_accounts(user = Depends(require_admin)):
- return list_service_accounts()
-
-@app.post("/api/admin/service-accounts/rotate")
-def api_rotate_service_account(payload: ServiceAccountRotateRequest, user = Depends(require_admin)):
- secret = new_secret()
- sa = rotate_service_account_secret(payload.name, secret)
- if sa is None:
- raise HTTPException(status_code=404, detail="Service account not found")
- return {"name": sa.name, "secret": secret}
-
-@app.post("/api/admin/service-accounts/state")
-def api_service_account_state(payload: ServiceAccountStateRequest, name: str, user = Depends(require_admin)):
- sa = set_service_account_active(name, payload.is_active)
- if sa is None:
- raise HTTPException(status_code=404, detail="Service account not found")
- return {"name": sa.name, "is_active": sa.is_active}
-
-@app.get("/api/admin/agent-audit-logs")
-def api_agent_audit_logs(user = Depends(require_admin)):
- return list_agent_audit_logs()
-
@app.get("/api/packs")
-def api_list_packs(actor = Depends(require_scope("packs:read"))):
- user_id = actor["user"].id if actor["actor_type"] == "user" else None
- packs = [p.model_dump() for p in list_packs_for_user(user_id, include_unpublished=(actor["actor_type"] == "user" and actor["user"].role == "admin"))]
- audit_service_action(actor, "packs_list", "packs", "ok", {"count": len(packs)})
- return packs
-
-@app.post("/api/packs")
-def api_upsert_personal_pack(payload: CreatePackRequest, actor = Depends(require_scope("packs:write_personal"))):
- if payload.policy_lane != "personal":
- raise HTTPException(status_code=403, detail="This endpoint is for personal-lane write access")
- if actor["actor_type"] != "user":
- raise HTTPException(status_code=403, detail="Service accounts may not own personal packs in this scaffold")
- upsert_pack(payload.pack, submitted_by_user_id=actor["user"].id, policy_lane="personal", is_published=payload.is_published, change_summary=payload.change_summary)
- return {"ok": True, "pack_id": payload.pack.id, "policy_lane": "personal"}
+def api_list_packs(user = Depends(current_user)):
+ return [p.model_dump() for p in list_packs_for_user(user.id, include_unpublished=(user.role == "admin"))]
@app.post("/api/learners")
-def api_create_learner(payload: CreateLearnerRequest, actor = Depends(require_scope("learners:write"))):
- if actor["actor_type"] != "user":
- raise HTTPException(status_code=403, detail="Service accounts do not create learners in this scaffold")
- create_learner(actor["user"].id, payload.learner_id, payload.display_name)
+def api_create_learner(payload: CreateLearnerRequest, user = Depends(current_user)):
+ create_learner(user.id, payload.learner_id, payload.display_name)
return {"ok": True, "learner_id": payload.learner_id}
@app.get("/api/learners/{learner_id}/state")
-def api_get_learner_state(learner_id: str, actor = Depends(require_scope("learners:read"))):
- ensure_learner_access(actor, learner_id)
- state = load_learner_state(learner_id).model_dump()
- audit_service_action(actor, "learner_state_read", learner_id, "ok", {"records": len(state.get("records", []))})
- return state
+def api_get_learner_state(learner_id: str, user = Depends(current_user)):
+ ensure_learner_access(user, learner_id)
+ return load_learner_state(learner_id).model_dump()
@app.put("/api/learners/{learner_id}/state")
-def api_put_learner_state(learner_id: str, state: LearnerState, actor = Depends(require_scope("learners:write"))):
- ensure_learner_access(actor, learner_id)
+def api_put_learner_state(learner_id: str, state: LearnerState, user = Depends(current_user)):
+ ensure_learner_access(user, learner_id)
if learner_id != state.learner_id:
raise HTTPException(status_code=400, detail="Learner ID mismatch")
- result = save_learner_state(state).model_dump()
- audit_service_action(actor, "learner_state_write", learner_id, "ok", {"records": len(result.get("records", []))})
- return result
+ return save_learner_state(state).model_dump()
-@app.post("/api/learners/{learner_id}/evidence")
-def api_post_evidence(learner_id: str, event: EvidenceEvent, actor = Depends(require_scope("learners:write"))):
- ensure_learner_access(actor, learner_id)
- state = load_learner_state(learner_id)
- state = apply_evidence(state, event)
- result = save_learner_state(state).model_dump()
- audit_service_action(actor, "learner_evidence_post", learner_id, "ok", {"concept_id": event.concept_id})
- return result
-
-@app.get("/api/learners/{learner_id}/recommendations/{pack_id}")
-def api_get_recommendations(learner_id: str, pack_id: str, actor = Depends(require_scope("recommendations:read"))):
- ensure_learner_access(actor, learner_id)
- ensure_pack_access(actor, pack_id)
- state = load_learner_state(learner_id)
+@app.get("/api/packs/{pack_id}/layout")
+def api_pack_layout(pack_id: str, user = Depends(current_user)):
+ ensure_pack_access(user, pack_id)
pack = get_pack(pack_id)
- if pack is None:
- raise HTTPException(status_code=404, detail="Pack not found")
- cards = recommend_next(state, pack)
- audit_service_action(actor, "recommendations_read", f"{learner_id}:{pack_id}", "ok", {"cards": len(cards)})
- return {"cards": cards}
+ return {"pack_id": pack_id, "layout": stable_layout(pack)} if pack else {"pack_id": pack_id, "layout": {}}
-@app.post("/api/learners/{learner_id}/evaluator-jobs", response_model=EvaluatorJobStatus)
-def api_submit_evaluator_job(learner_id: str, payload: EvaluatorSubmission, background_tasks: BackgroundTasks, actor = Depends(require_scope("evaluators:submit"))):
- ensure_learner_access(actor, learner_id)
- ensure_pack_access(actor, payload.pack_id)
- job_id = create_evaluator_job(learner_id, payload.pack_id, payload.concept_id, payload.submitted_text)
- background_tasks.add_task(process_job, job_id)
- audit_service_action(actor, "evaluator_job_submit", str(job_id), "ok", {"learner_id": learner_id, "pack_id": payload.pack_id})
- return EvaluatorJobStatus(job_id=job_id, status="queued")
-
-@app.get("/api/evaluator-jobs/{job_id}", response_model=EvaluatorJobStatus)
-def api_get_evaluator_job(job_id: int, actor = Depends(require_scope("evaluators:read"))):
- job = get_evaluator_job(job_id)
- if job is None:
- raise HTTPException(status_code=404, detail="Job not found")
- audit_service_action(actor, "evaluator_job_read", str(job_id), "ok", {})
- return EvaluatorJobStatus(job_id=job.id, status=job.status, result_score=job.result_score, result_confidence_hint=job.result_confidence_hint, result_notes=job.result_notes)
-
-@app.get("/api/learners/{learner_id}/evaluator-history")
-def api_get_evaluator_history(learner_id: str, actor = Depends(require_scope("evaluators:read"))):
- ensure_learner_access(actor, learner_id)
- jobs = list_evaluator_jobs_for_learner(learner_id)
- audit_service_action(actor, "evaluator_history_read", learner_id, "ok", {"jobs": len(jobs)})
- return [{"job_id": j.id, "status": j.status, "concept_id": j.concept_id, "result_score": j.result_score, "result_confidence_hint": j.result_confidence_hint, "result_notes": j.result_notes} for j in jobs]
-
-@app.post("/api/learner-runs")
-def api_create_learner_run(payload: LearnerRunCreateRequest, actor = Depends(current_actor)):
- ensure_learner_access(actor, payload.learner_id)
- ensure_pack_access(actor, payload.pack_id)
- actor_name = actor["service_account_name"] if actor["actor_type"] == "service" else actor["user"].username
- run_id = create_learner_run(payload.learner_id, payload.pack_id, payload.actor_kind, actor_name, payload.title)
- audit_service_action(actor, "learner_run_create", str(run_id), "ok", {"learner_id": payload.learner_id, "pack_id": payload.pack_id})
- return {"run_id": run_id}
-
-@app.post("/api/learner-runs/{run_id}/complete")
-def api_complete_learner_run(run_id: int, actor = Depends(current_actor)):
- row = end_learner_run(run_id)
- if row is None:
- raise HTTPException(status_code=404, detail="Run not found")
- audit_service_action(actor, "learner_run_complete", str(run_id), "ok", {})
- return {"run_id": run_id, "status": row.status}
-
-@app.get("/api/learners/{learner_id}/runs")
-def api_list_learner_runs(learner_id: str, actor = Depends(current_actor)):
- ensure_learner_access(actor, learner_id)
- return list_learner_runs(learner_id)
-
-@app.post("/api/workflow-events")
-def api_add_workflow_event(payload: WorkflowEventCreateRequest, actor = Depends(current_actor)):
- ensure_learner_access(actor, payload.learner_id)
- event_id = add_workflow_event(payload.run_id, payload.learner_id, payload.event_type, payload.concept_id, payload.timestamp, payload.detail)
- audit_service_action(actor, "workflow_event_add", str(event_id), "ok", {"run_id": payload.run_id, "event_type": payload.event_type})
- return {"event_id": event_id}
-
-@app.get("/api/learner-runs/{run_id}/workflow-events")
-def api_list_workflow_events(run_id: int, actor = Depends(current_actor)):
- events = list_workflow_events(run_id)
- audit_service_action(actor, "workflow_events_read", str(run_id), "ok", {"count": len(events)})
- return events
-
-@app.get("/api/learners/{learner_id}/animation/{pack_id}")
-def api_learning_animation(learner_id: str, pack_id: str, actor = Depends(current_actor)):
- ensure_learner_access(actor, learner_id)
- ensure_pack_access(actor, pack_id)
+@app.get("/api/learners/{learner_id}/graph-animation/{pack_id}")
+def api_graph_animation(learner_id: str, pack_id: str, user = Depends(current_user)):
+ ensure_learner_access(user, learner_id)
+ ensure_pack_access(user, pack_id)
pack = get_pack(pack_id)
state = load_learner_state(learner_id)
- frames = build_animation_frames(state)
- audit_service_action(actor, "learning_animation_read", f"{learner_id}:{pack_id}", "ok", {"frames": len(frames)})
+ frames = build_graph_frames(state, pack)
return {
"learner_id": learner_id,
"pack_id": pack_id,
"pack_title": pack.title if pack else "",
"frames": frames,
- "concepts": [c.id for c in pack.concepts] if pack else [],
+ "concepts": [{"id": c.id, "title": c.title, "prerequisites": c.prerequisites, "cross_pack_links": [l.model_dump() for l in c.cross_pack_links]} for c in pack.concepts] if pack else [],
+ }
+
+@app.post("/api/learners/{learner_id}/render-bundle/{pack_id}")
+def api_render_bundle(learner_id: str, pack_id: str, payload: MediaRenderRequest, user = Depends(current_user)):
+ ensure_learner_access(user, learner_id)
+ ensure_pack_access(user, pack_id)
+ pack = get_pack(pack_id)
+ state = load_learner_state(learner_id)
+ animation = {
+ "learner_id": learner_id,
+ "pack_id": pack_id,
+ "pack_title": pack.title if pack else "",
+ "frames": build_graph_frames(state, pack),
+ }
+ base = Path(tempfile.mkdtemp(prefix="didactopus_render_"))
+ payload_json = base / "animation_payload.json"
+ payload_json.write_text(json.dumps(animation, indent=2), encoding="utf-8")
+ out_dir = base / "bundle"
+ make_render_bundle(str(payload_json), str(out_dir), fps=payload.fps, fmt=payload.format)
+ return {
+ "bundle_dir": str(out_dir),
+ "payload_json": str(payload_json),
+ "manifest": str(out_dir / "render_manifest.json"),
+ "script": str(out_dir / "render.sh"),
+ "format": payload.format,
+ "fps": payload.fps,
}
def main():
- uvicorn.run(app, host=settings.host, port=settings.port)
+ uvicorn.run(app, host="127.0.0.1", port=8011)
diff --git a/src/didactopus/auth.py b/src/didactopus/auth.py
index c7b3117..54745ac 100644
--- a/src/didactopus/auth.py
+++ b/src/didactopus/auth.py
@@ -25,9 +25,6 @@ def issue_access_token(user_id: int, username: str, role: str) -> str:
def issue_refresh_token(user_id: int, username: str, role: str, token_id: str) -> str:
return _encode_token({"sub": str(user_id), "username": username, "role": role, "kind": "refresh", "jti": token_id}, timedelta(days=14))
-def issue_service_access_token(service_account_id: int, name: str, scopes: list[str]) -> str:
- return _encode_token({"sub": str(service_account_id), "service_account_name": name, "kind": "service", "scopes": scopes}, timedelta(hours=8))
-
def decode_token(token: str) -> dict | None:
try:
return jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])
@@ -36,6 +33,3 @@ def decode_token(token: str) -> dict | None:
def new_token_id() -> str:
return secrets.token_urlsafe(24)
-
-def new_secret() -> str:
- return secrets.token_urlsafe(24)
diff --git a/src/didactopus/config.py b/src/didactopus/config.py
index 88e5551..1f71733 100644
--- a/src/didactopus/config.py
+++ b/src/didactopus/config.py
@@ -8,7 +8,6 @@ class Settings(BaseModel):
port: int = int(os.getenv("DIDACTOPUS_PORT", "8011"))
jwt_secret: str = os.getenv("DIDACTOPUS_JWT_SECRET", "change-me")
jwt_algorithm: str = "HS256"
- deployment_policy_profile: str = os.getenv("DIDACTOPUS_POLICY_PROFILE", "single_user")
def load_settings() -> Settings:
return Settings()
diff --git a/src/didactopus/engine.py b/src/didactopus/engine.py
index 1531303..6b7c236 100644
--- a/src/didactopus/engine.py
+++ b/src/didactopus/engine.py
@@ -1,76 +1,110 @@
from __future__ import annotations
-from .models import LearnerState, EvidenceEvent, MasteryRecord, PackData
+from collections import defaultdict
+from .models import LearnerState, PackData
-def get_record(state: LearnerState, concept_id: str, dimension: str = "mastery") -> MasteryRecord | None:
- for rec in state.records:
- if rec.concept_id == concept_id and rec.dimension == dimension:
- return rec
- return None
+def concept_depths(pack: PackData) -> dict[str, int]:
+ concept_map = {c.id: c for c in pack.concepts}
+ memo = {}
+ def depth(cid: str) -> int:
+ if cid in memo:
+ return memo[cid]
+ c = concept_map[cid]
+ if not c.prerequisites:
+ memo[cid] = 0
+ else:
+ memo[cid] = 1 + max(depth(pid) for pid in c.prerequisites if pid in concept_map)
+ return memo[cid]
+ for cid in concept_map:
+ depth(cid)
+ return memo
-def apply_evidence(state: LearnerState, event: EvidenceEvent, decay: float = 0.05, reinforcement: float = 0.25) -> LearnerState:
- rec = get_record(state, event.concept_id, event.dimension)
- if rec is None:
- rec = MasteryRecord(concept_id=event.concept_id, dimension=event.dimension, score=0.0, confidence=0.0, evidence_count=0, last_updated=event.timestamp)
- state.records.append(rec)
- weight = max(0.05, min(1.0, event.confidence_hint))
- rec.score = ((rec.score * rec.evidence_count) + (event.score * weight)) / max(1, rec.evidence_count + 1)
- rec.confidence = min(1.0, max(0.0, rec.confidence * (1.0 - decay) + reinforcement * weight + 0.10 * max(0.0, min(1.0, event.score))))
- rec.evidence_count += 1
- rec.last_updated = event.timestamp
- state.history.append(event)
- return state
+def stable_layout(pack: PackData, width: int = 900, height: int = 520):
+ depths = concept_depths(pack)
+ layers = defaultdict(list)
+ for c in pack.concepts:
+ layers[depths.get(c.id, 0)].append(c)
+ positions = {}
+ max_depth = max(layers.keys()) if layers else 0
+ for d in sorted(layers):
+ nodes = sorted(layers[d], key=lambda c: c.id)
+ y = 90 + d * ((height - 160) / max(1, max_depth))
+ for idx, node in enumerate(nodes):
+ if node.position is not None:
+ positions[node.id] = {"x": node.position.x, "y": node.position.y, "source": "pack_authored"}
+ else:
+ spacing = width / (len(nodes) + 1)
+ x = spacing * (idx + 1)
+ positions[node.id] = {"x": x, "y": y, "source": "auto_layered"}
+ return positions
-def prereqs_satisfied(state: LearnerState, concept, min_score: float = 0.65, min_confidence: float = 0.45) -> bool:
+def prereqs_satisfied(scores: dict[str, float], concept, min_score: float = 0.65) -> bool:
for pid in concept.prerequisites:
- rec = get_record(state, pid, concept.masteryDimension)
- if rec is None or rec.score < min_score or rec.confidence < min_confidence:
+ if scores.get(pid, 0.0) < min_score:
return False
return True
-def concept_status(state: LearnerState, concept, min_score: float = 0.65, min_confidence: float = 0.45) -> str:
- rec = get_record(state, concept.id, concept.masteryDimension)
- if rec and rec.score >= min_score and rec.confidence >= min_confidence:
+def concept_status(scores: dict[str, float], concept, min_score: float = 0.65) -> str:
+ score = scores.get(concept.id, 0.0)
+ if score >= min_score:
return "mastered"
- if prereqs_satisfied(state, concept, min_score, min_confidence):
- return "active" if rec else "available"
+ if prereqs_satisfied(scores, concept, min_score):
+ return "active" if score > 0 else "available"
return "locked"
-def recommend_next(state: LearnerState, pack: PackData) -> list[dict]:
- cards = []
- for concept in pack.concepts:
- status = concept_status(state, concept)
- rec = get_record(state, concept.id, concept.masteryDimension)
- if status in {"available", "active"}:
- cards.append({
- "id": concept.id,
- "title": f"Work on {concept.title}",
- "minutes": 15 if status == "available" else 10,
- "reason": "Prerequisites are satisfied, so this is the best next unlock." if status == "available" else "You have started this concept, but mastery is not yet secure.",
- "why": [
- "Prerequisite check passed",
- f"Current score: {rec.score:.2f}" if rec else "No evidence recorded yet",
- f"Current confidence: {rec.confidence:.2f}" if rec else "Confidence starts after your first exercise",
- ],
- "reward": concept.exerciseReward or f"{concept.title} progress recorded",
- "conceptId": concept.id,
- "scoreHint": 0.82 if status == "available" else 0.76,
- "confidenceHint": 0.72 if status == "available" else 0.55,
- })
- return cards[:4]
-
-def build_animation_frames(state: LearnerState):
- concepts = sorted({ev.concept_id for ev in state.history} | {r.concept_id for r in state.records})
+def build_graph_frames(state: LearnerState, pack: PackData):
+ concepts = {c.id: c for c in pack.concepts}
+ layout = stable_layout(pack)
+ scores = {c.id: 0.0 for c in pack.concepts}
frames = []
- running = {c: 0.0 for c in concepts}
- for idx, ev in enumerate(sorted(state.history, key=lambda x: x.timestamp)):
- running[ev.concept_id] = ev.score
+ history = sorted(state.history, key=lambda x: x.timestamp)
+ static_edges = [{"source": pre, "target": c.id, "kind": "prerequisite"} for c in pack.concepts for pre in c.prerequisites]
+ static_cross = [{
+ "source": c.id,
+ "target_pack_id": link.target_pack_id,
+ "target_concept_id": link.target_concept_id,
+ "relationship": link.relationship,
+ "kind": "cross_pack"
+ } for c in pack.concepts for link in c.cross_pack_links]
+ for idx, ev in enumerate(history):
+ if ev.concept_id in scores:
+ scores[ev.concept_id] = ev.score
+ nodes = []
+ for cid, concept in concepts.items():
+ score = scores.get(cid, 0.0)
+ status = concept_status(scores, concept)
+ pos = layout[cid]
+ nodes.append({
+ "id": cid,
+ "title": concept.title,
+ "score": score,
+ "status": status,
+ "size": 20 + int(score * 30),
+ "x": pos["x"],
+ "y": pos["y"],
+ "layout_source": pos["source"],
+ })
frames.append({
"index": idx,
"timestamp": ev.timestamp,
"event_kind": ev.kind,
- "concept_id": ev.concept_id,
- "scores": dict(running),
+ "focus_concept_id": ev.concept_id,
+ "nodes": nodes,
+ "edges": static_edges,
+ "cross_pack_links": static_cross,
})
if not frames:
- frames.append({"index": 0, "timestamp": "", "event_kind": "empty", "concept_id": "", "scores": dict(running)})
+ nodes = []
+ for c in pack.concepts:
+ pos = layout[c.id]
+ nodes.append({
+ "id": c.id,
+ "title": c.title,
+ "score": 0.0,
+ "status": "available" if not c.prerequisites else "locked",
+ "size": 20,
+ "x": pos["x"],
+ "y": pos["y"],
+ "layout_source": pos["source"],
+ })
+ frames.append({"index": 0, "timestamp": "", "event_kind": "empty", "focus_concept_id": "", "nodes": nodes, "edges": static_edges, "cross_pack_links": static_cross})
return frames
diff --git a/src/didactopus/export_svg.py b/src/didactopus/export_svg.py
index e3e4a68..1bb04a0 100644
--- a/src/didactopus/export_svg.py
+++ b/src/didactopus/export_svg.py
@@ -18,6 +18,10 @@ def frame_to_svg(frame: dict, width: int = 960, height: int = 560) -> str:
dst = next((n for n in frame["nodes"] if n["id"] == edge["target"]), None)
if src and dst:
parts.append(f'