Updated source adapters.

This commit is contained in:
welsberr 2026-04-30 10:17:21 +00:00
parent 7ef6f4dc3f
commit 88a547463d
7 changed files with 649 additions and 1 deletion

View File

@@ -8,8 +8,10 @@ from .base import get_source_adapter, list_source_adapters
from . import llmwiki  # noqa: F401
from . import polypaper  # noqa: F401
from . import doclift_bundle  # noqa: F401
from . import indexcc  # noqa: F401
from . import markdown_notes  # noqa: F401
from . import transcript  # noqa: F401
from . import didactopus_pack  # noqa: F401
from . import pandasthumb_mt  # noqa: F401

__all__ = ["get_source_adapter", "list_source_adapters"]
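
Registering the two new adapters in this package makes them visible through the exported registry helpers. As a minimal sketch only, assuming list_source_adapters() returns the registered adapter names and get_source_adapter(name) returns the matching adapter instance (neither signature is shown in this diff), lookup might go like this:

# Illustrative only; the package import path and helper signatures are assumptions.
from groundrecall_source_adapters import get_source_adapter, list_source_adapters

def pick_adapter(name: str):
    # Assumed: list_source_adapters() yields the registered names,
    # now including "indexcc" and "pandasthumb_mt".
    available = list_source_adapters()
    if name not in available:
        raise KeyError(f"unknown adapter {name!r}; registered: {sorted(available)}")
    # Assumed: get_source_adapter(name) returns the registered adapter instance.
    return get_source_adapter(name)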

View File

@@ -0,0 +1,199 @@
from __future__ import annotations

from hashlib import sha256
import json
import re
from pathlib import Path
from typing import Any

from .base import DiscoveredImportSource, StructuredImportRows, register_source_adapter

SECTION_RE = re.compile(r"^##\s+(.*)$", re.M)


def _site_root(root: Path) -> Path:
    candidate = root / "site2_src" / "content" / "indexcc"
    if candidate.is_dir():
        return candidate
    if (root / "content" / "indexcc").is_dir():
        return root / "content" / "indexcc"
    if root.name == "indexcc" and root.is_dir():
        return root
    return root


def _discover_md_files(base: Path) -> list[Path]:
    if not base.exists():
        return []
    if base.is_file():
        return [base] if base.suffix.lower() == ".md" else []
    return sorted(path for path in base.rglob("*.md") if path.is_file())


def _read_meta(md_path: Path) -> dict[str, Any]:
    meta_path = md_path.with_suffix(".meta.json")
    if not meta_path.exists():
        return {}
    return json.loads(meta_path.read_text(encoding="utf-8"))


def _split_sections(text: str) -> dict[str, str]:
    lines = text.splitlines()
    current = "Body"
    sections: dict[str, list[str]] = {current: []}
    for line in lines:
        match = SECTION_RE.match(line)
        if match:
            current = match.group(1).strip()
            sections.setdefault(current, [])
            continue
        sections.setdefault(current, []).append(line)
    return {key: "\n".join(value).strip() for key, value in sections.items() if "\n".join(value).strip()}


class IndexCcSourceAdapter:
    name = "indexcc"

    def detect(self, root: str | Path) -> bool:
        base = _site_root(Path(root))
        if not base.is_dir():
            return False
        md_files = _discover_md_files(base)
        if not md_files:
            return False
        return any(str(_read_meta(path).get("page_kind", "")) == "claim_entry" for path in md_files)

    def discover(self, root: str | Path) -> list[DiscoveredImportSource]:
        base = _site_root(Path(root))
        rows: list[DiscoveredImportSource] = []
        for path in _discover_md_files(base):
            rows.append(
                DiscoveredImportSource(
                    path=path,
                    relative_path=path.relative_to(base).as_posix(),
                    source_kind="indexcc",
                    artifact_kind="indexcc_entry",
                    is_text=True,
                    metadata={"corpus": "indexcc"},
                )
            )
        return rows

    def import_intent(self) -> str:
        return "grounded_knowledge"

    def build_rows(self, context, sources: list[DiscoveredImportSource]) -> StructuredImportRows | None:
        artifact_rows: list[dict[str, Any]] = []
        observation_rows: list[dict[str, Any]] = []
        claim_rows: list[dict[str, Any]] = []
        concept_rows: list[dict[str, Any]] = []
        relation_rows: list[dict[str, Any]] = []
        for index, source in enumerate(sources, start=1):
            meta = _read_meta(source.path)
            text = source.path.read_text(encoding="utf-8")
            sections = _split_sections(text)
            title = str(meta.get("title") or source.path.stem)
            claim_text = sections.get("Claim", "")
            response_text = sections.get("Response", "")
            references_text = sections.get("References", "")
            further_text = sections.get("Further Reading", "")
            artifact_id = f"ia_{sha256(source.relative_path.encode('utf-8')).hexdigest()[:12]}"
            artifact_rows.append(
                {
                    "artifact_id": artifact_id,
                    "import_id": context.import_id,
                    "artifact_kind": source.artifact_kind,
                    "path": source.relative_path,
                    "title": title,
                    "sha256": sha256(source.path.read_bytes()).hexdigest(),
                    "created_at": context.imported_at,
                    "metadata": {
                        "corpus": "indexcc",
                        "document_kind": meta.get("page_kind", "claim_entry"),
                        "author": meta.get("author", ""),
                        "legacy_source": meta.get("legacy_source", ""),
                        "section_label": meta.get("section_label", ""),
                        "page_kind": meta.get("page_kind", ""),
                    },
                    "current_status": "draft",
                }
            )
            body_sections = [
                ("Claim", claim_text),
                ("Response", response_text),
                ("References", references_text),
                ("Further Reading", further_text),
            ]
            for sec_index, (section_name, section_text) in enumerate(body_sections, start=1):
                if not section_text:
                    continue
                observation_rows.append(
                    {
                        "observation_id": f"obs_{artifact_id}_{sec_index}",
                        "import_id": context.import_id,
                        "artifact_id": artifact_id,
                        "role": "summary" if section_name != "Claim" else "claim",
                        "text": section_text,
                        "origin_path": source.relative_path,
                        "origin_section": section_name,
                        "line_start": 0,
                        "line_end": 0,
                        "source_url": str(meta.get("legacy_source") or ""),
                        "metadata": {
                            "corpus": "indexcc",
                            "document_kind": meta.get("page_kind", "claim_entry"),
                            "section_name": section_name,
                            "author": meta.get("author", ""),
                        },
                        "grounding_status": "grounded",
                        "support_kind": "direct_source",
                        "confidence_hint": 0.88 if section_name == "Claim" else 0.8,
                        "current_status": "draft",
                    }
                )
            claim_obs_id = f"obs_{artifact_id}_1" if claim_text else ""
            if claim_text:
                claim_rows.append(
                    {
                        "claim_id": f"clm_{artifact_id}",
                        "import_id": context.import_id,
                        "claim_text": claim_text,
                        "claim_kind": "claim_entry",
                        "source_observation_ids": [claim_obs_id],
                        "supporting_fragment_ids": [],
                        "concept_ids": [f"concept::{source.path.stem.lower()}"],
                        "contradicts_claim_ids": [],
                        "supersedes_claim_ids": [],
                        "confidence_hint": 0.88,
                        "grounding_status": "grounded",
                        "current_status": "triaged",
                    }
                )
            concept_rows.append(
                {
                    "concept_id": f"concept::{source.path.stem.lower()}",
                    "import_id": context.import_id,
                    "title": title,
                    "aliases": [source.path.stem.upper()],
                    "description": meta.get("description", "Imported Index to Creationist Claims entry."),
                    "source_artifact_ids": [artifact_id],
                    "current_status": "triaged",
                }
            )
        return StructuredImportRows(
            artifact_rows=artifact_rows,
            fragment_rows=[],
            observation_rows=observation_rows,
            claim_rows=claim_rows,
            concept_rows=concept_rows,
            relation_rows=relation_rows,
        )


register_source_adapter(IndexCcSourceAdapter())
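
For orientation, a minimal sketch of how this adapter could be exercised directly on a throwaway directory. The package path, the SimpleNamespace stand-in for the import context, and attribute access on StructuredImportRows are assumptions for illustration, not part of this commit; the adapter only reads context.import_id and context.imported_at in build_rows above.

# Sketch only, under the assumptions named above.
from pathlib import Path
from types import SimpleNamespace
import json
import tempfile

from groundrecall_source_adapters.indexcc import IndexCcSourceAdapter  # assumed module path

root = Path(tempfile.mkdtemp())
entry_dir = root / "site2_src" / "content" / "indexcc"
entry_dir.mkdir(parents=True)
(entry_dir / "CA100.md").write_text(
    "## Claim\n\nExample claim.\n\n## Response\n\nExample response.\n", encoding="utf-8"
)
(entry_dir / "CA100.meta.json").write_text(
    json.dumps({"title": "CA100", "page_kind": "claim_entry"}), encoding="utf-8"
)

adapter = IndexCcSourceAdapter()
assert adapter.detect(root)                      # page_kind "claim_entry" triggers detection
sources = adapter.discover(root)                 # one DiscoveredImportSource for CA100.md
context = SimpleNamespace(import_id="demo-import", imported_at="2026-04-30T00:00:00Z")
rows = adapter.build_rows(context, sources)
# Expect one artifact, two observations (Claim, Response), one claim, one concept.
print(len(rows.artifact_rows), len(rows.observation_rows), len(rows.claim_rows))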

View File

@@ -0,0 +1,291 @@
from __future__ import annotations

from dataclasses import dataclass, field
from hashlib import sha256
from html.parser import HTMLParser
from pathlib import Path
import re
from typing import Any

from .base import DiscoveredImportSource, StructuredImportRows, register_source_adapter

ARTIFACT_SUFFIXES = {".html", ".htm"}
ARTICLE_TITLE_RE = re.compile(r'<h1 class="post-title">(.*?)</h1>', re.I | re.S)
BYLINE_RE = re.compile(
    r'<p class="post-meta">\s*Posted\s+(?P<date>.*?)\s+by\s+<span class="post-author">(?P<author>.*?)</span>',
    re.I | re.S,
)
COMMENT_META_RE = re.compile(
    r'<p class="comment-meta">\s*<span class="comment-author">(?P<author>.*?)</span>\s*&middot;\s*(?P<date>.*?)</p>',
    re.I | re.S,
)
COMMENTS_SECTION_RE = re.compile(r'<section class="comments-section">(.*?)</section>', re.I | re.S)


def _strip_tags(text: str) -> str:
    return re.sub(r"(?s)<[^>]+>", " ", text)


def _normalize_space(text: str) -> str:
    text = text.replace("\r", "\n")
    text = re.sub(r"[ \t]+\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = re.sub(r"[ \t]{2,}", " ", text)
    return text.strip()


def _fragment_to_text(fragment: str) -> str:
    fragment = re.sub(r"(?is)<script\b.*?</script>", " ", fragment)
    fragment = re.sub(r"(?is)<style\b.*?</style>", " ", fragment)
    fragment = re.sub(
        r'(?is)<a\b[^>]*href=["\']([^"\']+)["\'][^>]*>(.*?)</a>',
        lambda match: f"{_strip_tags(match.group(2)).strip()} ({match.group(1).strip()})".strip(),
        fragment,
    )
    fragment = re.sub(r"(?i)<br\s*/?>", "\n", fragment)
    fragment = re.sub(r"(?i)</p\s*>", "\n\n", fragment)
    fragment = re.sub(r"(?i)<p\b[^>]*>", "", fragment)
    fragment = re.sub(r"(?i)</div\s*>", "\n", fragment)
    fragment = re.sub(r"(?i)<div\b[^>]*>", "", fragment)
    fragment = re.sub(r"(?i)</section\s*>", "\n", fragment)
    fragment = re.sub(r"(?i)<section\b[^>]*>", "", fragment)
    fragment = re.sub(r"(?i)</blockquote\s*>", "\n", fragment)
    fragment = re.sub(r"(?i)<blockquote\b[^>]*>", "\n> ", fragment)
    fragment = re.sub(r"(?i)</li\s*>", "\n", fragment)
    fragment = re.sub(r"(?i)<li\b[^>]*>", "\n- ", fragment)
    fragment = re.sub(r"(?i)<ul\b[^>]*>|</ul\s*>", "\n", fragment)
    fragment = re.sub(r"(?i)<ol\b[^>]*>|</ol\s*>", "\n", fragment)
    fragment = _strip_tags(fragment)
    fragment = re.sub(r"\s*\n\s*", "\n", fragment)
    return _normalize_space(fragment.replace("\xa0", " "))


def _id_from_path(relative_path: str) -> str:
    return f"pt_{sha256(relative_path.encode('utf-8')).hexdigest()[:12]}"


def _site_root(root: Path) -> Path:
    candidate = root / "public_html"
    if (candidate / "archives").is_dir():
        return candidate
    return root


def _discover_html_files(site_root: Path) -> list[Path]:
    archives = site_root / "archives"
    if not archives.is_dir():
        return []
    rows = []
    for path in sorted(archives.rglob("*")):
        if not path.is_file() or path.suffix.lower() not in ARTIFACT_SUFFIXES:
            continue
        if path.name.lower() == "index.html":
            continue
        rows.append(path)
    return rows


def _extract_article(html_text: str, relative_path: str) -> dict[str, Any] | None:
    title_match = ARTICLE_TITLE_RE.search(html_text)
    body_match = re.search(r'<div class="post-body">(.*)', html_text, re.I | re.S)
    if title_match is None or body_match is None:
        return None
    meta_match = BYLINE_RE.search(html_text)
    comments_match = COMMENTS_SECTION_RE.search(html_text)
    body_html = body_match.group(1)
    if comments_match is not None:
        body_html = body_html[: comments_match.start() - body_match.end()]
    body_text = _fragment_to_text(body_html)
    title = _fragment_to_text(title_match.group(1))
    author = _fragment_to_text(meta_match.group("author")) if meta_match else ""
    published_at = _fragment_to_text(meta_match.group("date")) if meta_match else ""
    canonical_url = "/" + relative_path.lstrip("/")
    return {
        "document_id": _id_from_path(relative_path),
        "title": title,
        "author": author,
        "published_at": published_at,
        "canonical_url": canonical_url,
        "body_text": body_text,
    }


def _extract_comments(html_text: str, relative_path: str, parent_document_id: str) -> list[dict[str, Any]]:
    comments_match = COMMENTS_SECTION_RE.search(html_text)
    if comments_match is None:
        return []
    comments_html = comments_match.group(1)
    rows: list[dict[str, Any]] = []
    starts = list(re.finditer(r'<div class="comment" id="comment-(?P<comment_id>\d+)">', comments_html, re.I))
    for index, match in enumerate(starts):
        start = match.end()
        end = starts[index + 1].start() if index + 1 < len(starts) else len(comments_html)
        chunk = comments_html[start:end]
        meta_match = COMMENT_META_RE.search(chunk)
        body_match = re.search(r'<div class="comment-body">(.*)</div>', chunk, re.I | re.S)
        if body_match is None:
            continue
        rows.append(
            {
                "document_id": f"{parent_document_id}__comment_{match.group('comment_id')}",
                "parent_document_id": parent_document_id,
                "comment_id": match.group("comment_id"),
                "document_kind": "comment",
                "comment_author": _fragment_to_text(meta_match.group("author")) if meta_match else "",
                "comment_date": _fragment_to_text(meta_match.group("date")) if meta_match else "",
                "canonical_url": "/" + relative_path.lstrip("/"),
                "body_text": _fragment_to_text(body_match.group(1)),
            }
        )
    return rows


class PandasThumbMtSourceAdapter:
    name = "pandasthumb_mt"

    def detect(self, root: str | Path) -> bool:
        site_root = _site_root(Path(root))
        return (site_root / "archives").is_dir() and (site_root / "index.html").exists()

    def discover(self, root: str | Path) -> list[DiscoveredImportSource]:
        site_root = _site_root(Path(root))
        rows: list[DiscoveredImportSource] = []
        for path in _discover_html_files(site_root):
            rows.append(
                DiscoveredImportSource(
                    path=path,
                    relative_path=path.relative_to(site_root).as_posix(),
                    source_kind="pandasthumb_mt",
                    artifact_kind="pandasthumb_mt_page",
                    is_text=True,
                    metadata={"corpus": "pandasthumb_mt"},
                )
            )
        return rows

    def import_intent(self) -> str:
        return "grounded_knowledge"

    def build_rows(self, context, sources: list[DiscoveredImportSource]) -> StructuredImportRows | None:
        artifact_rows: list[dict[str, Any]] = []
        observation_rows: list[dict[str, Any]] = []
        claim_rows: list[dict[str, Any]] = []
        concept_rows: list[dict[str, Any]] = []
        relation_rows: list[dict[str, Any]] = []
        for source in sources:
            html_text = source.path.read_text(encoding="utf-8", errors="replace")
            article = _extract_article(html_text, source.relative_path)
            if article is None:
                continue
            artifact_id = _id_from_path(source.relative_path)
            artifact_rows.append(
                {
                    "artifact_id": artifact_id,
                    "import_id": context.import_id,
                    "artifact_kind": source.artifact_kind,
                    "path": source.relative_path,
                    "title": article["title"],
                    "sha256": sha256(source.path.read_bytes()).hexdigest(),
                    "created_at": context.imported_at,
                    "metadata": {
                        "corpus": "pandasthumb_mt",
                        "document_kind": "article",
                        "author": article["author"],
                        "published_at": article["published_at"],
                        "canonical_url": article["canonical_url"],
                    },
                    "current_status": "draft",
                }
            )
            observation_rows.append(
                {
                    "observation_id": f"obs_{artifact_id}_body",
                    "import_id": context.import_id,
                    "artifact_id": artifact_id,
                    "role": "summary",
                    "text": article["body_text"],
                    "origin_path": source.relative_path,
                    "origin_section": article["title"],
                    "line_start": 0,
                    "line_end": 0,
                    "source_url": article["canonical_url"],
                    "metadata": {
                        "corpus": "pandasthumb_mt",
                        "document_kind": "article",
                        "author": article["author"],
                        "published_at": article["published_at"],
                    },
                    "grounding_status": "grounded",
                    "support_kind": "direct_source",
                    "confidence_hint": 0.75,
                    "current_status": "draft",
                }
            )
            for comment in _extract_comments(html_text, source.relative_path, artifact_id):
                comment_artifact_id = comment["document_id"]
                artifact_rows.append(
                    {
                        "artifact_id": comment_artifact_id,
                        "import_id": context.import_id,
                        "artifact_kind": "pandasthumb_mt_comment",
                        "path": source.relative_path,
                        "title": f"{article['title']} comment {comment['comment_id']}",
                        "sha256": sha256(
                            f"{source.relative_path}#{comment['comment_id']}".encode("utf-8")
                        ).hexdigest(),
                        "created_at": context.imported_at,
                        "metadata": {
                            "corpus": "pandasthumb_mt",
                            "document_kind": "comment",
                            "parent_document_id": artifact_id,
                            "comment_id": comment["comment_id"],
                            "comment_author": comment["comment_author"],
                            "comment_date": comment["comment_date"],
                            "canonical_url": comment["canonical_url"],
                        },
                        "current_status": "draft",
                    }
                )
                observation_rows.append(
                    {
                        "observation_id": f"obs_{comment_artifact_id}_body",
                        "import_id": context.import_id,
                        "artifact_id": comment_artifact_id,
                        "role": "summary",
                        "text": comment["body_text"],
                        "origin_path": source.relative_path,
                        "origin_section": f"comment {comment['comment_id']}",
                        "line_start": 0,
                        "line_end": 0,
                        "source_url": comment["canonical_url"],
                        "metadata": {
                            "corpus": "pandasthumb_mt",
                            "document_kind": "comment",
                            "parent_document_id": artifact_id,
                            "comment_id": comment["comment_id"],
                            "comment_author": comment["comment_author"],
                            "comment_date": comment["comment_date"],
                        },
                        "grounding_status": "grounded",
                        "support_kind": "direct_source",
                        "confidence_hint": 0.7,
                        "current_status": "draft",
                    }
                )
        return StructuredImportRows(
            artifact_rows=artifact_rows,
            fragment_rows=[],
            observation_rows=observation_rows,
            claim_rows=claim_rows,
            concept_rows=concept_rows,
            relation_rows=relation_rows,
        )


register_source_adapter(PandasThumbMtSourceAdapter())
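
A quick sketch of the HTML-to-text extraction on a toy Movable Type page, using only the helpers defined above. The module path is an assumption for illustration, and the markup mirrors the fixture used in the tests below.

# Sketch only; module path assumed.
from groundrecall_source_adapters.pandasthumb_mt import _extract_article, _extract_comments

page = (
    '<h1 class="post-title">Sample Article</h1>'
    '<p class="post-meta">Posted 2016-01-01 by <span class="post-author">Author Name</span></p>'
    '<div class="post-body"><p>Body with a <a href="https://example.org">link</a>.</p></div>'
    '<section class="comments-section">'
    '<div class="comment" id="comment-1">'
    '<p class="comment-meta"><span class="comment-author">Reader</span> &middot; 2016-01-02</p>'
    '<div class="comment-body">First!</div>'
    '</div>'
    '</section>'
)

article = _extract_article(page, "archives/2016/01/sample.html")
# Expected roughly: title "Sample Article", author "Author Name",
# body_text "Body with a link (https://example.org)." with the comments trimmed off.
comments = _extract_comments(page, "archives/2016/01/sample.html", article["document_id"])
# Expected: one comment row with comment_id "1", author "Reader", body_text "First!".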

View File

@@ -121,6 +121,66 @@ def search_claims(
    }


def _artifact_corpus(artifact) -> str:
    corpus = artifact.metadata.get("corpus") if isinstance(getattr(artifact, "metadata", None), dict) else ""
    return str(corpus or "")


def search_documents(
    store_dir: str | Path,
    text: str,
    corpora: list[str] | None = None,
    include_rejected: bool = False,
    limit: int = 20,
) -> dict[str, Any]:
    store = GroundRecallStore(store_dir)
    artifacts = {item.artifact_id: item for item in store.list_artifacts()}
    observations_by_artifact: dict[str, list[Any]] = {}
    for observation in store.list_observations():
        observations_by_artifact.setdefault(observation.artifact_id, []).append(observation)
    active_corpora = {value for value in (corpora or []) if value}
    matches: list[dict[str, Any]] = []
    for artifact in artifacts.values():
        corpus = _artifact_corpus(artifact)
        if active_corpora and corpus not in active_corpora:
            continue
        if not include_rejected and artifact.current_status == "rejected":
            continue
        artifact_observations = observations_by_artifact.get(artifact.artifact_id, [])
        haystack_parts = [
            artifact.title,
            artifact.path,
            corpus,
            str(artifact.metadata.get("document_kind", "")),
            str(artifact.metadata.get("author", "")),
            str(artifact.metadata.get("canonical_url", "")),
            str(artifact.metadata.get("published_at", "")),
        ]
        haystack_parts.extend(observation.text for observation in artifact_observations)
        haystack = " ".join(part for part in haystack_parts if part)
        if _matches(text, haystack):
            matches.append(
                {
                    "artifact": artifact.model_dump(),
                    "corpus": corpus,
                    "observation_count": len(artifact_observations),
                    "matching_text": haystack[:800],
                }
            )
        if len(matches) >= limit:
            break
    return {
        "query_type": "document_search",
        "query": text,
        "active_corpora": sorted(active_corpora),
        "matches": matches,
    }


def query_provenance(
    store_dir: str | Path,
    origin_path: str | None = None,
@@ -178,12 +238,34 @@ def build_query_bundle_for_concept(store_dir: str | Path, concept_ref: str) -> dict[str, Any]:
    }


def build_search_bundle(
    store_dir: str | Path,
    text: str,
    corpora: list[str] | None = None,
    limit: int = 20,
) -> dict[str, Any]:
    payload = search_documents(store_dir, text=text, corpora=corpora, limit=limit)
    return {
        "bundle_kind": "groundrecall_search_bundle",
        "query_type": "document_search",
        "query": text,
        "active_corpora": payload["active_corpora"],
        "matches": payload["matches"],
        "suggested_next_actions": [
            "Open the matching documents and review the artifact metadata.",
            "Tighten the corpus filter when the result set is too broad.",
            "Use corpus defaults for a site-specific search preset and add others only when needed.",
        ],
    }


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Query canonical GroundRecall objects.")
    parser.add_argument("store_dir")
    parser.add_argument("query")
    parser.add_argument("--kind", choices=["concept", "claim", "provenance", "bundle", "search"], default="concept")
    parser.add_argument("--source-url", default=None)
    parser.add_argument("--corpus", action="append", default=[])
    return parser
@@ -195,6 +277,8 @@ def main() -> None:
        payload = search_claims(args.store_dir, args.query)
    elif args.kind == "provenance":
        payload = query_provenance(args.store_dir, origin_path=args.query, source_url=args.source_url)
    elif args.kind == "search":
        payload = build_search_bundle(args.store_dir, args.query, corpora=list(args.corpus or []))
    else:
        payload = build_query_bundle_for_concept(args.store_dir, args.query)
    print(json.dumps(payload, indent=2))
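
For orientation, a minimal sketch of the new search path. The module name used in the import and in the CLI comment is an assumption (the query script's filename is not visible in this diff); the flags mirror the parser above, with --kind search and a repeatable --corpus filter.

# Sketch only, under the assumptions named above.
from query import build_search_bundle  # assumed module name

# Programmatic form: scope the text search to the two new corpora.
bundle = build_search_bundle(
    "path/to/store",
    "irreducible complexity",
    corpora=["indexcc", "pandasthumb_mt"],
)
for match in bundle["matches"]:
    print(match["corpus"], match["artifact"]["title"], match["observation_count"])

# Roughly equivalent CLI call wired up in build_parser()/main():
#   python -m query path/to/store "irreducible complexity" --kind search \
#       --corpus indexcc --corpus pandasthumb_mt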

View File

@@ -0,0 +1,3 @@
from __future__ import annotations

from ..groundrecall_source_adapters.indexcc import *  # noqa: F403

View File

@@ -0,0 +1,3 @@
from __future__ import annotations

from ..groundrecall_source_adapters.pandasthumb_mt import *  # noqa: F403

View File

@@ -27,6 +27,8 @@ def test_groundrecall_source_adapter_registry_lists_expected_adapters() -> None:
    assert "transcript" in names
    assert "didactopus_pack" in names
    assert "doclift_bundle" in names
    assert "indexcc" in names
    assert "pandasthumb_mt" in names


def test_detect_llmwiki_adapter(tmp_path: Path) -> None:
@@ -75,6 +77,70 @@ def test_markdown_notes_adapter_ingests_tex_files(tmp_path: Path) -> None:
    assert result.claims


def test_plain_markdown_directory_uses_markdown_notes_adapter(tmp_path: Path) -> None:
    (tmp_path / "note.md").write_text("# Operational Note\n\nA plain note.\n", encoding="utf-8")
    adapter = detect_source_adapter(tmp_path)
    assert adapter.name == "markdown_notes"


def test_indexcc_adapter_import_generates_rows(tmp_path: Path) -> None:
    indexcc_dir = tmp_path / "site2_src" / "content" / "indexcc"
    indexcc_dir.mkdir(parents=True)
    (indexcc_dir / "CA100.md").write_text(
        "\n".join(
            [
                "## Claim",
                "",
                "Argument from incredulity claim.",
                "",
                "## Response",
                "",
                "A lack of imagination is not evidence of impossibility.",
            ]
        ),
        encoding="utf-8",
    )
    (indexcc_dir / "CA100.meta.json").write_text(
        '{"title": "CA100: Argument from Incredulity", "page_kind": "claim_entry", "legacy_source": "/indexcc/CA/CA100.html"}\n',
        encoding="utf-8",
    )
    result = run_groundrecall_import(tmp_path, mode="quick", import_id="indexcc-test")
    assert result.manifest["source_adapter"] == "indexcc"
    assert result.manifest["import_intent"] == "grounded_knowledge"
    assert result.manifest["fragment_count"] == 0
    assert result.artifacts[0]["metadata"]["corpus"] == "indexcc"
    assert result.claims[0]["claim_kind"] == "claim_entry"


def test_pandasthumb_mt_adapter_import_generates_article_rows(tmp_path: Path) -> None:
    public_html = tmp_path / "public_html"
    archive_dir = public_html / "archives" / "2016" / "01"
    archive_dir.mkdir(parents=True)
    (public_html / "index.html").write_text("<html><body>PT</body></html>\n", encoding="utf-8")
    (archive_dir / "sample.html").write_text(
        "\n".join(
            [
                '<h1 class="post-title">Sample Article</h1>',
                '<p class="post-meta">Posted 2016-01-01 by <span class="post-author">Author Name</span></p>',
                '<div class="post-body"><p>Article body text.</p></div>',
            ]
        ),
        encoding="utf-8",
    )
    result = run_groundrecall_import(tmp_path, mode="quick", import_id="ptmt-test")
    assert result.manifest["source_adapter"] == "pandasthumb_mt"
    assert result.manifest["import_intent"] == "grounded_knowledge"
    assert result.manifest["fragment_count"] == 0
    assert result.artifacts[0]["metadata"]["corpus"] == "pandasthumb_mt"
    assert result.observations[0]["text"] == "Article body text."


def test_tex_import_uses_pandoc_markdown_when_available(tmp_path: Path, monkeypatch) -> None:
    (tmp_path / "draft.tex").write_text(
        "\\section{Ignored by fallback}\n"