from __future__ import annotations import re from collections import defaultdict from .course_schema import NormalizedDocument, NormalizedCourse, Module, Lesson, TopicBundle, ConceptCandidate def slugify(text: str) -> str: cleaned = re.sub(r"[^a-zA-Z0-9]+", "-", text.strip().lower()).strip("-") return cleaned or "untitled" def extract_key_terms(text: str, min_term_length: int = 4, max_terms: int = 8) -> list[str]: candidates = re.findall(r"\b[A-Z][A-Za-z0-9\-]{%d,}\b" % (min_term_length - 1), text) seen = set() out = [] for term in candidates: if term not in seen: seen.add(term) out.append(term) if len(out) >= max_terms: break return out def document_to_course(doc: NormalizedDocument, course_title: str) -> NormalizedCourse: # Conservative mapping: each section becomes a lesson; all lessons go into one module. lessons = [] for section in doc.sections: body = section.body.strip() lines = body.splitlines() objectives = [] exercises = [] for line in lines: low = line.lower().strip() if low.startswith("objective:"): objectives.append(line.split(":", 1)[1].strip()) if low.startswith("exercise:"): exercises.append(line.split(":", 1)[1].strip()) lessons.append( Lesson( title=section.heading.strip() or "Untitled Lesson", body=body, objectives=objectives, exercises=exercises, key_terms=extract_key_terms(section.heading + "\n" + body), source_refs=[doc.source_path], ) ) module = Module(title=f"Imported from {doc.source_type.upper()}", lessons=lessons) return NormalizedCourse(title=course_title, modules=[module], source_records=[doc]) def build_topic_bundle(topic_title: str, courses: list[NormalizedCourse]) -> TopicBundle: return TopicBundle(topic_title=topic_title, courses=courses) def merge_courses_into_topic_course(topic_bundle: TopicBundle, merge_same_named_lessons: bool = True) -> NormalizedCourse: modules_by_title: dict[str, Module] = {} source_records = [] for course in topic_bundle.courses: source_records.extend(course.source_records) for module in course.modules: target_module = modules_by_title.setdefault(module.title, Module(title=module.title, lessons=[])) if merge_same_named_lessons: lesson_map = {lesson.title: lesson for lesson in target_module.lessons} for lesson in module.lessons: if lesson.title in lesson_map: existing = lesson_map[lesson.title] if lesson.body and lesson.body not in existing.body: existing.body = (existing.body + "\n\n" + lesson.body).strip() for x in lesson.objectives: if x not in existing.objectives: existing.objectives.append(x) for x in lesson.exercises: if x not in existing.exercises: existing.exercises.append(x) for x in lesson.key_terms: if x not in existing.key_terms: existing.key_terms.append(x) for x in lesson.source_refs: if x not in existing.source_refs: existing.source_refs.append(x) else: target_module.lessons.append(lesson) else: target_module.lessons.extend(module.lessons) return NormalizedCourse(title=topic_bundle.topic_title, modules=list(modules_by_title.values()), source_records=source_records) def extract_concept_candidates(course: NormalizedCourse) -> list[ConceptCandidate]: concepts = [] seen_ids = set() for module in course.modules: for lesson in module.lessons: cid = slugify(lesson.title) if cid not in seen_ids: seen_ids.add(cid) concepts.append( ConceptCandidate( id=cid, title=lesson.title, description=lesson.body[:240].strip(), source_modules=[module.title], source_lessons=[lesson.title], source_courses=list(lesson.source_refs), mastery_signals=list(lesson.objectives[:3] or lesson.exercises[:2]), ) ) for term in lesson.key_terms: tid = slugify(term) if tid in seen_ids: continue seen_ids.add(tid) concepts.append( ConceptCandidate( id=tid, title=term, description=f"Candidate concept extracted from lesson '{lesson.title}'.", source_modules=[module.title], source_lessons=[lesson.title], source_courses=list(lesson.source_refs), mastery_signals=list(lesson.objectives[:2]), ) ) return concepts