Add topic review workflow and expansion tooling

welsberr 2026-03-20 07:42:49 -04:00
parent 0491b435a1
commit b74582b72f
22 changed files with 7365 additions and 69 deletions


@@ -1,7 +1,7 @@
PYTHONPATH_SRC=PYTHONPATH=src
VENV_PYTHON=.venv/bin/python
.PHONY: test test-live live-smoke
.PHONY: test test-live live-smoke validate-talkorigins
test:
$(PYTHONPATH_SRC) $(VENV_PYTHON) -m pytest -q
@@ -11,3 +11,6 @@ test-live:
live-smoke:
CITEGEIST_SOURCE_CACHE=.cache/citegeist $(PYTHONPATH_SRC) $(VENV_PYTHON) scripts/live_smoke.py
validate-talkorigins:
$(PYTHONPATH_SRC) $(VENV_PYTHON) -m citegeist validate-talkorigins talkorigins-out/talkorigins_manifest.json

README.md

@@ -46,12 +46,17 @@ The initial repo includes:
- `pybtex`-backed BibTeX parsing and export in a repo-local virtual environment;
- a SQLite-backed bibliography store;
- a small CLI for ingest, search, inspection, and export;
- review-state tracking on entries and per-field ingest provenance;
- review-state tracking on entries, per-field ingest provenance, and field-level conflict review;
- plaintext reference extraction into draft BibTeX for numbered, APA-like, wrapped-line, and simple book-style references;
- identifier-first metadata resolution for DOI, OpenAlex, DBLP, and arXiv-backed entries, with OpenAlex title-search fallback;
- identifier-first metadata resolution for DOI, OpenAlex, DBLP, arXiv, and DataCite-backed entries, with OpenAlex/DataCite title-search fallback;
- local citation-graph traversal over stored `cites`, `cited_by`, and `crossref` edges;
- Crossref- and OpenAlex-backed graph expansion that materializes draft related works and edge provenance;
- a dedicated source-client layer with fixture/cache support for live-source development;
- OAI-PMH Dublin Core harvesting for institutional repositories and thesis/dissertation sources;
- OAI-PMH repository discovery via `Identify`, `ListSets`, and `ListMetadataFormats` to target harvests more precisely;
- bibliography bootstrap workflows that can start from a seed `.bib`, a topic phrase, or both;
- batch bootstrap orchestration from JSON job files containing seed BibTeX paths, topic phrases, or both;
- a TalkOrigins scraper that fixes repeated-author plaintext references, emits per-topic seed BibTeX files, and writes a batch JSON specification;
- normalized tables for entries, creators, identifiers, and citation relations;
- full-text-search-ready indexing over title, abstract, and fulltext when SQLite FTS5 is available;
- tests covering parsing, ingestion, relation storage, and search.
@@ -113,18 +118,107 @@ Or use the CLI directly:
cd citegeist
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 ingest references.bib
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 search "semantic search"
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 show --provenance smith2024graphs
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 search "origin" --topic abiogenesis
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 show --provenance --conflicts smith2024graphs
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 set-status smith2024graphs reviewed
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 resolve-conflicts smith2024graphs title accepted
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 apply-conflict smith2024graphs title
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 bootstrap --seed-bib seed.bib --topic "bayesian nonparametrics"
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 bootstrap --topic "bayesian nonparametrics" --preview --topic-commit-limit 5
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 scrape-talkorigins talkorigins-out --limit-topics 5 --limit-entries-per-topic 20
PYTHONPATH=src .venv/bin/python -m citegeist extract references.txt --output draft.bib
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 resolve smith2024graphs
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 topics
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 topic-entries abiogenesis
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 export-topic abiogenesis --output abiogenesis.bib
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 graph smith2024graphs --relation cites --depth 2 --missing-only
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 expand smith2024graphs --source crossref
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 expand smith2024graphs --source openalex --relation cited_by --limit 10
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 expand-topic abiogenesis --topic-phrase "abiogenesis origin chemistry" --source openalex --relation cites --seed-key seed2024 --min-relevance 0.3 --preview
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 set-topic-phrase abiogenesis "abiogenesis origin chemistry prebiotic"
PYTHONPATH=src .venv/bin/python -m citegeist discover-oai https://example.edu/oai
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 harvest-oai https://example.edu/oai --metadata-prefix mods --from 2024-01-01 --until 2024-12-31 --limit 10
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 export --output reviewed.bib
```
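The `discover-oai` and `harvest-oai` commands wrap the new `OaiPmhHarvester`, which can also be driven directly from Python. A minimal library-level sketch (same placeholder endpoint as above; requests go over live HTTP unless the underlying source client is cache-backed):
```python
from citegeist import OaiPmhHarvester

harvester = OaiPmhHarvester()  # defaults to a plain SourceClient
base_url = "https://example.edu/oai"

print(harvester.identify(base_url))  # repositoryName, granularity, ...
for fmt in harvester.list_metadata_formats(base_url):
    print(fmt.metadata_prefix, fmt.schema)
for result in harvester.list_records(
    base_url,
    metadata_prefix="oai_dc",
    date_from="2024-01-01",
    date_until="2024-12-31",
    limit=10,
):
    print(result.identifier, result.entry.citation_key)
```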
For live-source development, prefer fixture-backed or cache-backed source clients so resolver and expansion work can be exercised repeatedly without re-hitting upstream APIs on every run.
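A sketch of one way to do that with the pieces in this commit, assuming the resolver's internal `SourceClient` honors `CITEGEIST_SOURCE_CACHE` the same way the `live-smoke` Make target implies (that wiring is not shown in this diff):
```python
import os
from citegeist import MetadataResolver

# Assumption: CITEGEIST_SOURCE_CACHE is read when the client is built,
# mirroring how scripts/live_smoke.py is invoked from the Makefile.
os.environ["CITEGEIST_SOURCE_CACHE"] = ".cache/citegeist"

resolver = MetadataResolver()
for entry in resolver.search_openalex("abiogenesis", limit=3):
    print(entry.citation_key, entry.fields.get("title"))
```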
For large legacy plaintext corpora such as the TalkOrigins bibliography, prefer a two-step workflow:
1. `scrape-talkorigins` to generate cleaned per-topic `seed_bib` files plus a `talkorigins_jobs.json` batch spec.
2. `bootstrap-batch` on that JSON file when you want to ingest, resolve, and expand from the generated seeds (a minimal jobs file is sketched below).
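A hypothetical jobs file and the normalization `load_batch_jobs` applies to it (the job keys come from `src/citegeist/batch.py` below; the topic values are illustrative):
```python
import json
from pathlib import Path

from citegeist import load_batch_jobs

spec = {
    "jobs": [
        {
            "name": "abiogenesis",
            "seed_bib": "seeds/abiogenesis.bib",  # relative to the JSON file
            "topic": "abiogenesis",
            "topic_slug": "abiogenesis",
            "topic_phrase": "abiogenesis origin chemistry",
            "expand": True,
        },
        {"name": "hominids", "topic": "fossil hominids", "topic_commit_limit": 5, "preview": True},
    ]
}
Path("talkorigins_jobs.json").write_text(json.dumps(spec, indent=2), encoding="utf-8")

jobs = load_batch_jobs("talkorigins_jobs.json")
print(jobs[0]["seed_bib"])  # relative seed paths resolve against the JSON file's directory
```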
The TalkOrigins scrape output now includes:
- `seeds/*.bib` per-topic seed BibTeX files for `bootstrap-batch`
- `plaintext/*.txt` per-topic cleaned GSA-style plaintext with repeated authors expanded
- `site/topics/*.html` reconstructed topic pages with hide/show BibTeX blocks
- `talkorigins_full.txt` and `talkorigins_full.bib` aggregate downloads
- `snapshots/*.json` cached topic payloads so reruns can resume without re-fetching already scraped topics
After a full scrape, run:
```bash
PYTHONPATH=src .venv/bin/python -m citegeist validate-talkorigins talkorigins-out/talkorigins_manifest.json
PYTHONPATH=src .venv/bin/python -m citegeist duplicates-talkorigins talkorigins-out/talkorigins_manifest.json --limit 20
PYTHONPATH=src .venv/bin/python -m citegeist duplicates-talkorigins talkorigins-out/talkorigins_manifest.json --limit 20 --preview --weak-only
PYTHONPATH=src .venv/bin/python -m citegeist suggest-talkorigins-phrases talkorigins-out/talkorigins_manifest.json --output topic-phrases.json
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 stage-topic-phrases topic-phrases.json
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 review-topic-phrase abiogenesis accepted --notes "curated from local corpus"
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 apply-topic-phrases topic-phrases.json
PYTHONPATH=src .venv/bin/python -m citegeist --db talkorigins.sqlite3 enrich-talkorigins talkorigins-out/talkorigins_manifest.json --limit 20
PYTHONPATH=src .venv/bin/python -m citegeist --db talkorigins-copy.sqlite3 enrich-talkorigins talkorigins-out/talkorigins_manifest.json --limit 5 --apply --allow-unsafe-search-matches
PYTHONPATH=src .venv/bin/python -m citegeist --db talkorigins.sqlite3 review-talkorigins talkorigins-out/talkorigins_manifest.json --output talkorigins-review.json
PYTHONPATH=src .venv/bin/python -m citegeist --db talkorigins.sqlite3 apply-talkorigins-corrections talkorigins-out/talkorigins_manifest.json talkorigins-corrections.json
```
The `validate-talkorigins` report summarizes parse coverage and flags suspicious entry-type/venue combinations for manual cleanup.
It also reports duplicate clusters across topic seed files so you can gauge how much deduplication pressure to expect before ingestion.
Use `duplicates-talkorigins` when you want to inspect specific clusters, filter by text, restrict the audit to one topic slug, or preview only weak canonicalization outcomes before importing.
Use `suggest-talkorigins-phrases` to derive candidate stored expansion phrases from the existing TalkOrigins topic corpus itself. The output is deterministic JSON keyed by topic slug, with a suggested phrase plus the extracted keywords that drove it. This is a useful first pass before setting topic phrases in the database or editing generated batch jobs.
Use `stage-topic-phrases` to load those suggestions into the database as review items. Staging stores the candidate in `suggested_phrase` and marks the topic `pending` without changing the active `expansion_phrase`.
Use `review-topic-phrase` to accept or reject one staged suggestion in place. Accepting a suggestion copies it into `expansion_phrase`; rejecting it records the decision without changing the live phrase.
Use `apply-topic-phrases` when you want a direct patch path instead of the staged review flow. It accepts either the raw suggestion list or an object with a `topics` list, and will apply `suggested_phrase` or `phrase` to matching topic slugs immediately.
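For orientation, both accepted shapes might look like the sketch below; only `suggested_phrase`, `phrase`, and the `topics` wrapper are named in this commit, so the `slug` and `keywords` field names here are assumptions for illustration:
```python
import json
from pathlib import Path

suggestion = {
    "slug": "abiogenesis",  # assumed key name
    "suggested_phrase": "abiogenesis origin chemistry prebiotic",
    "keywords": ["abiogenesis", "prebiotic", "chemistry"],  # assumed key name
}

# apply-topic-phrases accepts either the raw suggestion list ...
Path("topic-phrases.json").write_text(json.dumps([suggestion], indent=2), encoding="utf-8")
# ... or an object wrapping the same items in a "topics" list.
Path("topic-phrases.json").write_text(json.dumps({"topics": [suggestion]}, indent=2), encoding="utf-8")
```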
Use `enrich-talkorigins` when you want to target those weak canonical entries for resolver-based metadata upgrades before retrying graph expansion on imported topic slices.
Use `review-talkorigins` when you want one JSON review artifact that combines weak canonical clusters with dry-run enrichment outcomes for manual cleanup.
Use `expand-topic` when you already have both a topic phrase and a curated topic seed set in the database: it expands outward from the topic's existing entries, then assigns discovered works back to that topic only if they clear a topic-relevance threshold. Write-enabled assignment is stricter than preview ranking: a candidate must clear the score threshold and show a non-generic title anchor to the topic phrase, so broad methods papers do not get attached just because their abstracts or related terms overlap. On large noisy topics, prefer `--seed-key` to restrict the run to the trusted seed entries you want to expand from, and use `--preview` first to inspect discovered candidates and relevance scores before writing anything.
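A sketch of that gate using the private helpers this commit adds to `src/citegeist/expand.py` (internal names, so subject to change; the candidate entry is hand-made):
```python
from citegeist.expand import _meets_topic_assignment_threshold, _topic_relevance_score

phrase = "abiogenesis origin chemistry prebiotic"
candidate = {
    "title": "Prebiotic chemistry and the abiogenesis of nucleotides",
    "abstract": "Pathways for prebiotic nucleotide synthesis.",
}

score = _topic_relevance_score(phrase, candidate)
# The score threshold alone is not enough for write-enabled assignment:
# the title must also anchor to non-generic topic terms.
ok = _meets_topic_assignment_threshold(phrase, candidate, min_relevance=0.3, relevance_score=score)
print(round(score, 2), ok)
```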
Use `set-topic-phrase` to store a curated expansion phrase on the topic itself. When a stored phrase exists, `expand-topic` will use it automatically if you do not pass `--topic-phrase`. Batch bootstrap jobs can also set `topic_slug`, `topic_name`, and `topic_phrase` so curated topic metadata is created as part of the run.
Use `topics --phrase-review-status pending` when you want to audit only topics whose staged phrase suggestions still need review.
`--allow-unsafe-search-matches` exists only for bounded experiments on copied databases when you explicitly want to relax trust to exercise downstream expansion behavior.
Correction files are simple JSON:
```json
{
"corrections": [
{
"key": "smith jane|1999|weak duplicate",
"entry_type": "article",
"review_status": "reviewed",
"fields": {
"journal": "Journal of Better Metadata",
"doi": "10.1000/weak",
"note": null
}
}
]
}
```
`fields` values overwrite the canonical entry for that duplicate-cluster key. Set a field to `null` to remove it.
To import the reconstructed corpus into SQLite while collapsing duplicate works across topics into canonical entries:
```bash
PYTHONPATH=src .venv/bin/python -m citegeist --db talkorigins.sqlite3 ingest-talkorigins talkorigins-out/talkorigins_manifest.json
```
That import preserves many-to-many topic membership through the `topics` and `entry_topics` tables.
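At the SQL level, topic slices come straight out of that join; for example (schema as added in `storage.py` in this commit):
```python
import sqlite3

con = sqlite3.connect("talkorigins.sqlite3")
con.row_factory = sqlite3.Row

# Entries attached to one topic, highest-confidence assignments first.
rows = con.execute(
    """
    SELECT e.citation_key, e.title, et.confidence
    FROM entries e
    JOIN entry_topics et ON et.entry_id = e.id
    JOIN topics t ON t.id = et.topic_id
    WHERE t.slug = ?
    ORDER BY et.confidence DESC
    """,
    ("abiogenesis",),
).fetchall()
for row in rows:
    print(row["citation_key"], row["title"])
```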
After import, use `topics`, `topic-entries`, `search --topic`, and `export-topic` to inspect or export topic slices from the consolidated database.
Live-source workflow:
```bash
@@ -147,7 +241,7 @@ make live-smoke
## Near-Term Priorities
- additional resolvers and expansion paths for non-DOI scholarly ecosystems.
- source adapters beyond OAI-PMH for additional non-DOI scholarly ecosystems.
See [ROADMAP.md](./ROADMAP.md) for the prioritized phase plan and rationale.


@@ -1,18 +1,52 @@
from .batch import BatchBootstrapRunner, BatchJobResult, load_batch_jobs
from .bibtex import BibEntry, parse_bibtex
from .bootstrap import BootstrapResult, Bootstrapper
from .expand import CrossrefExpander, OpenAlexExpander
from .extract import extract_references
from .resolve import MetadataResolver, merge_entries
from .harvest import OaiMetadataFormat, OaiPmhHarvester, OaiSet
from .resolve import MetadataResolver, merge_entries, merge_entries_with_conflicts
from .sources import SourceClient
from .storage import BibliographyStore
from .talkorigins import (
TalkOriginsBatchExport,
TalkOriginsDuplicateCluster,
TalkOriginsEnrichmentResult,
TalkOriginsIngestReport,
TalkOriginsReviewExport,
TalkOriginsScraper,
TalkOriginsSeedSet,
TalkOriginsTopicPhraseSuggestion,
TalkOriginsTopic,
TalkOriginsValidationReport,
)
__all__ = [
"BibEntry",
"BatchBootstrapRunner",
"BatchJobResult",
"BibliographyStore",
"BootstrapResult",
"Bootstrapper",
"CrossrefExpander",
"MetadataResolver",
"OpenAlexExpander",
"OaiPmhHarvester",
"OaiMetadataFormat",
"OaiSet",
"SourceClient",
"TalkOriginsBatchExport",
"TalkOriginsDuplicateCluster",
"TalkOriginsEnrichmentResult",
"TalkOriginsIngestReport",
"TalkOriginsReviewExport",
"TalkOriginsScraper",
"TalkOriginsSeedSet",
"TalkOriginsTopicPhraseSuggestion",
"TalkOriginsTopic",
"TalkOriginsValidationReport",
"extract_references",
"load_batch_jobs",
"merge_entries",
"merge_entries_with_conflicts",
"parse_bibtex",
]

src/citegeist/batch.py

@@ -0,0 +1,78 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from .bootstrap import BootstrapResult, Bootstrapper
from .storage import BibliographyStore
@dataclass(slots=True)
class BatchJobResult:
job_name: str
result_count: int
results: list[BootstrapResult]
def load_batch_jobs(path: str | Path) -> list[dict]:
path = Path(path)
payload = json.loads(path.read_text(encoding="utf-8"))
if isinstance(payload, dict):
jobs = payload.get("jobs", [])
else:
jobs = payload
if not isinstance(jobs, list):
raise ValueError("Batch JSON must be a list of jobs or an object with a 'jobs' list")
normalized_jobs: list[dict] = []
for job in jobs:
if not isinstance(job, dict):
raise ValueError("Each batch job must be an object")
normalized = dict(job)
seed_bib = normalized.get("seed_bib")
if isinstance(seed_bib, str) and seed_bib:
seed_path = Path(seed_bib)
if not seed_path.is_absolute():
normalized["seed_bib"] = str((path.parent / seed_path).resolve())
normalized_jobs.append(normalized)
return normalized_jobs
class BatchBootstrapRunner:
def __init__(self, bootstrapper: Bootstrapper | None = None) -> None:
self.bootstrapper = bootstrapper or Bootstrapper()
def run(self, store: BibliographyStore, jobs: list[dict]) -> list[BatchJobResult]:
results: list[BatchJobResult] = []
for index, job in enumerate(jobs, start=1):
seed_bib = job.get("seed_bib")
topic = job.get("topic")
topic_limit = int(job.get("topic_limit", 5))
topic_commit_limit = job.get("topic_commit_limit")
expand = bool(job.get("expand", True))
review_status = str(job.get("status", "draft"))
preview = bool(job.get("preview", False))
name = str(job.get("name") or f"job_{index}")
topic_slug = job.get("topic_slug")
topic_name = job.get("topic_name")
topic_phrase = job.get("topic_phrase")
seed_bibtex = None
if seed_bib:
seed_bibtex = Path(seed_bib).read_text(encoding="utf-8")
job_results = self.bootstrapper.bootstrap(
store,
seed_bibtex=seed_bibtex,
topic=topic,
topic_limit=topic_limit,
topic_commit_limit=int(topic_commit_limit) if topic_commit_limit is not None else None,
expand=expand,
review_status=review_status,
preview_only=preview,
topic_slug=str(topic_slug) if topic_slug else None,
topic_name=str(topic_name) if topic_name else None,
topic_phrase=str(topic_phrase) if topic_phrase else None,
)
results.append(BatchJobResult(name, len(job_results), job_results))
return results


@@ -5,8 +5,10 @@ from io import StringIO
try:
from pybtex.database import BibliographyData, Entry, Person, parse_string
from pybtex.bibtex.exceptions import BibTeXError
from pybtex.database.output.bibtex import Writer
except ImportError: # pragma: no cover - exercised only outside the configured venv
BibTeXError = None
BibliographyData = Entry = Person = Writer = None
parse_string = None
@@ -40,7 +42,11 @@ def render_bibtex(entries: list[BibEntry]) -> str:
_require_pybtex()
bibliography_entries = {}
for entry in entries:
fields = {key: value for key, value in entry.fields.items() if key not in {"author", "editor"}}
fields = {
key: _sanitize_bibtex_value(value)
for key, value in entry.fields.items()
if key not in {"author", "editor"}
}
persons = {}
for role in ("author", "editor"):
raw_names = entry.fields.get(role)
@@ -49,7 +55,24 @@
bibliography_entries[entry.citation_key] = Entry(entry.entry_type, fields=fields, persons=persons)
buffer = StringIO()
try:
Writer().write_stream(BibliographyData(entries=bibliography_entries), buffer)
except BibTeXError:
conservative_entries = {}
for entry in entries:
fields = {
key: _flatten_bibtex_braces(value)
for key, value in entry.fields.items()
if key not in {"author", "editor"}
}
persons = {}
for role in ("author", "editor"):
raw_names = entry.fields.get(role)
if raw_names:
persons[role] = [Person(name.strip()) for name in raw_names.split(" and ") if name.strip()]
conservative_entries[entry.citation_key] = Entry(entry.entry_type, fields=fields, persons=persons)
buffer = StringIO()
Writer().write_stream(BibliographyData(entries=conservative_entries), buffer)
return buffer.getvalue().strip()
@@ -58,3 +81,36 @@ def _require_pybtex() -> None:
raise RuntimeError(
"pybtex is required. Use the repo-local virtual environment under .venv/ for citegeist commands."
)
def _sanitize_bibtex_value(value: str) -> str:
    # Rewrite unbalanced braces as parentheses so pybtex's writer does not
    # raise on malformed field values: unmatched "}" become ")" and the
    # specific unmatched "{" (tracked by position) become "(".
    chars = list(value)
    open_positions: list[int] = []
    for index, char in enumerate(chars):
        if char == "{":
            open_positions.append(index)
        elif char == "}":
            if open_positions:
                open_positions.pop()
            else:
                chars[index] = ")"
    for index in open_positions:
        chars[index] = "("
    return "".join(chars)
def _flatten_bibtex_braces(value: str) -> str:
return value.replace("{", "(").replace("}", ")")
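# Example behavior of the sanitizers above:
#   _sanitize_bibtex_value("Models {of {life")  -> "Models (of (life"
#   _sanitize_bibtex_value("life} models")      -> "life) models"
#   _flatten_bibtex_braces("{DNA} repair")      -> "(DNA) repair"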

src/citegeist/bootstrap.py

@@ -0,0 +1,145 @@
from __future__ import annotations
from dataclasses import dataclass
import re
from .bibtex import BibEntry, parse_bibtex
from .expand import CrossrefExpander, OpenAlexExpander
from .resolve import MetadataResolver
from .storage import BibliographyStore
@dataclass(slots=True)
class BootstrapResult:
citation_key: str
origin: str
created: bool
score: float = 0.0
class Bootstrapper:
def __init__(
self,
resolver: MetadataResolver | None = None,
crossref_expander: CrossrefExpander | None = None,
openalex_expander: OpenAlexExpander | None = None,
) -> None:
self.resolver = resolver or MetadataResolver()
self.crossref_expander = crossref_expander or CrossrefExpander(self.resolver)
self.openalex_expander = openalex_expander or OpenAlexExpander(self.resolver)
def bootstrap(
self,
store: BibliographyStore,
seed_bibtex: str | None = None,
topic: str | None = None,
topic_limit: int = 5,
topic_commit_limit: int | None = None,
expand: bool = True,
review_status: str = "draft",
preview_only: bool = False,
topic_slug: str | None = None,
topic_name: str | None = None,
topic_phrase: str | None = None,
) -> list[BootstrapResult]:
results: list[BootstrapResult] = []
seed_keys: list[str] = []
if seed_bibtex:
for entry in parse_bibtex(seed_bibtex):
created = store.get_entry(entry.citation_key) is None
if not preview_only:
store.upsert_entry(
entry,
raw_bibtex=None,
source_type="bootstrap",
source_label="seed_bibtex",
review_status=review_status,
)
seed_keys.append(entry.citation_key)
results.append(BootstrapResult(entry.citation_key, "seed_bibtex", created))
if topic:
if not preview_only and (topic_slug or topic_name or topic_phrase):
store.ensure_topic(
slug=topic_slug or _slugify(topic),
name=topic_name or topic,
source_type="bootstrap",
expansion_phrase=topic_phrase or topic,
)
ranked_candidates = self._topic_candidates(topic, seed_keys, topic_limit)
if topic_commit_limit is not None:
ranked_candidates = ranked_candidates[:topic_commit_limit]
for entry, score in ranked_candidates:
created = store.get_entry(entry.citation_key) is None
if not preview_only:
store.upsert_entry(
entry,
raw_bibtex=None,
source_type="bootstrap",
source_label=f"topic:{topic}",
review_status=review_status,
)
seed_keys.append(entry.citation_key)
results.append(BootstrapResult(entry.citation_key, "topic", created, score=score))
if expand and not preview_only:
expanded_keys = list(dict.fromkeys(seed_keys))
for citation_key in expanded_keys:
for item in self.crossref_expander.expand_entry_references(store, citation_key):
results.append(BootstrapResult(item.discovered_citation_key, "crossref_expand", item.created_entry))
for item in self.openalex_expander.expand_entry(store, citation_key, relation_type="cites", limit=topic_limit):
results.append(BootstrapResult(item.discovered_citation_key, "openalex_expand", item.created_entry))
store.connection.commit()
return results
def _topic_candidates(self, topic: str, seed_keys: list[str], limit: int) -> list[tuple[BibEntry, float]]:
scored: dict[str, tuple[BibEntry, float]] = {}
for source_name, base_score, entries in (
("openalex", 3.0, self.resolver.search_openalex(topic, limit=limit)),
("crossref", 2.0, self.resolver.search_crossref(topic, limit=limit)),
("datacite", 1.5, self.resolver.search_datacite(topic, limit=limit)),
):
for entry in entries:
score = base_score + _topic_relevance_score(entry, topic) + _seed_overlap_score(entry, seed_keys)
existing = scored.get(entry.citation_key)
if existing is None or score > existing[1]:
scored[entry.citation_key] = (entry, score)
ranked = sorted(
scored.values(),
key=lambda item: (-item[1], item[0].citation_key),
)
return ranked[:limit]
def _topic_relevance_score(entry: BibEntry, topic: str) -> float:
topic_terms = _tokenize(topic)
title_terms = _tokenize(entry.fields.get("title", ""))
abstract_terms = _tokenize(entry.fields.get("abstract", ""))
overlap = len(topic_terms & (title_terms | abstract_terms))
return float(overlap)
def _seed_overlap_score(entry: BibEntry, seed_keys: list[str]) -> float:
if not seed_keys:
return 0.0
title_terms = _tokenize(entry.fields.get("title", ""))
score = 0.0
for seed_key in seed_keys:
seed_terms = _tokenize(seed_key)
if seed_terms & title_terms:
score += 0.25
return score
def _tokenize(value: str) -> set[str]:
return {token for token in re.split(r"\W+", value.lower()) if token}
def _slugify(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
return slug or "topic"
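# Quick check of the module-level scorer on a hand-made entry: both topic
# tokens appear in the title, so the token-overlap score is 2.0.
#   entry = BibEntry(entry_type="article", citation_key="smith2024origins",
#                    fields={"title": "Origins of prebiotic chemistry"})
#   _topic_relevance_score(entry, "prebiotic chemistry")  # -> 2.0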

File diff suppressed because it is too large


@@ -18,6 +18,20 @@ class ExpansionResult:
source_label: str
@dataclass(slots=True)
class TopicExpansionResult:
topic_slug: str
source_citation_key: str
discovered_citation_key: str
discovered_title: str
created_entry: bool
relation_type: str
source_label: str
relevance_score: float
meets_relevance_threshold: bool
assigned_to_topic: bool
class CrossrefExpander:
def __init__(self, resolver: MetadataResolver | None = None) -> None:
self.resolver = resolver or MetadataResolver()
@@ -163,6 +177,192 @@ class OpenAlexExpander:
return _normalize_openalex_id(results[0].get("id", ""))
class TopicExpander:
def __init__(
self,
crossref_expander: CrossrefExpander | None = None,
openalex_expander: OpenAlexExpander | None = None,
) -> None:
self.crossref_expander = crossref_expander or CrossrefExpander()
self.openalex_expander = openalex_expander or OpenAlexExpander()
def expand_topic(
self,
store: BibliographyStore,
topic_slug: str,
topic_phrase: str | None = None,
source: str = "openalex",
relation_type: str = "cites",
seed_limit: int = 25,
per_seed_limit: int = 25,
min_relevance: float = 0.2,
seed_keys: list[str] | None = None,
preview_only: bool = False,
) -> list[TopicExpansionResult]:
topic = store.get_topic(topic_slug)
if topic is None:
return []
phrase = (topic_phrase or str(topic.get("name") or topic_slug)).strip()
seeds = store.list_topic_entries(topic_slug, limit=seed_limit)
if seed_keys:
allowed = set(seed_keys)
seeds = [seed for seed in seeds if str(seed["citation_key"]) in allowed]
results: list[TopicExpansionResult] = []
for seed in seeds:
seed_key = str(seed["citation_key"])
if preview_only:
discovered_rows = self._preview_discoveries(
store,
seed_key,
source=source,
relation_type=relation_type,
limit=per_seed_limit,
)
else:
discovered_rows = self._materialized_discoveries(
store,
seed_key,
source=source,
relation_type=relation_type,
limit=per_seed_limit,
)
for row, target_entry in discovered_rows:
score = _topic_relevance_score(phrase, target_entry)
meets_threshold = _meets_topic_assignment_threshold(
phrase,
target_entry,
min_relevance=min_relevance,
relevance_score=score,
)
assigned = False
if not preview_only and meets_threshold and target_entry is not None:
assigned = store.add_entry_topic(
row.discovered_citation_key,
topic_slug=topic_slug,
topic_name=str(topic.get("name") or topic_slug),
source_type="topic_expand",
source_url=str(topic.get("source_url") or ""),
source_label=f"{source}:{relation_type}:{seed_key}",
confidence=score,
)
results.append(
TopicExpansionResult(
topic_slug=topic_slug,
source_citation_key=row.source_citation_key,
discovered_citation_key=row.discovered_citation_key,
discovered_title=str(target_entry.get("title") or ""),
created_entry=row.created_entry,
relation_type=row.relation_type,
source_label=row.source_label,
relevance_score=score,
meets_relevance_threshold=meets_threshold,
assigned_to_topic=assigned,
)
)
store.connection.commit()
return results
def _materialized_discoveries(
self,
store: BibliographyStore,
citation_key: str,
source: str,
relation_type: str,
limit: int,
) -> list[tuple[ExpansionResult, dict[str, object] | None]]:
if source == "crossref":
expansion_rows = self.crossref_expander.expand_entry_references(store, citation_key)
else:
expansion_rows = self.openalex_expander.expand_entry(
store,
citation_key,
relation_type=relation_type,
limit=limit,
)
return [(row, store.get_entry(row.discovered_citation_key)) for row in expansion_rows]
def _preview_discoveries(
self,
store: BibliographyStore,
citation_key: str,
source: str,
relation_type: str,
limit: int,
) -> list[tuple[ExpansionResult, dict[str, object]]]:
if source == "crossref":
return self._preview_crossref_discoveries(store, citation_key, limit)
return self._preview_openalex_discoveries(store, citation_key, relation_type, limit)
def _preview_crossref_discoveries(
self,
store: BibliographyStore,
citation_key: str,
limit: int,
) -> list[tuple[ExpansionResult, dict[str, object]]]:
entry = store.get_entry(citation_key)
if entry is None or not entry.get("doi"):
return []
doi = str(entry["doi"])
payload = self.crossref_expander.resolver.source_client.get_json(
f"https://api.crossref.org/works/{doi}?mailto=welsberr@gmail.com"
)
references = payload.get("message", {}).get("reference", [])[:limit]
rows: list[tuple[ExpansionResult, dict[str, object]]] = []
for index, reference in enumerate(references, start=1):
discovered = _crossref_reference_to_entry(reference, citation_key, index)
rows.append(
(
ExpansionResult(
source_citation_key=citation_key,
discovered_citation_key=discovered.citation_key,
created_entry=store.get_entry(discovered.citation_key) is None,
relation_type="cites",
source_label=f"crossref:references:{doi}",
),
dict(discovered.fields),
)
)
return rows
def _preview_openalex_discoveries(
self,
store: BibliographyStore,
citation_key: str,
relation_type: str,
limit: int,
) -> list[tuple[ExpansionResult, dict[str, object]]]:
entry = store.get_entry(citation_key)
if entry is None:
return []
openalex_id = entry.get("openalex") or self.openalex_expander._lookup_openalex_id(entry)
if not openalex_id:
return []
filter_name = "cited_by" if relation_type == "cites" else "cites"
query = urlencode({"filter": f"{filter_name}:{openalex_id}", "per-page": limit})
payload = self.openalex_expander.resolver.source_client.get_json(f"https://api.openalex.org/works?{query}")
works = payload.get("results", [])
rows: list[tuple[ExpansionResult, dict[str, object]]] = []
for work in works:
discovered = _openalex_work_to_entry(work)
source_key = citation_key if relation_type == "cites" else discovered.citation_key
rows.append(
(
ExpansionResult(
source_citation_key=source_key,
discovered_citation_key=discovered.citation_key,
created_entry=store.get_entry(discovered.citation_key) is None,
relation_type=relation_type,
source_label=f"openalex:{relation_type}:{openalex_id}",
),
dict(discovered.fields),
)
)
return rows
def _crossref_reference_to_entry(reference: dict, source_citation_key: str, ordinal: int) -> BibEntry:
title = (
reference.get("article-title")
@@ -211,6 +411,115 @@ def _normalize_text(value: str) -> str:
return " ".join(value.split())
def _topic_relevance_score(topic_phrase: str, entry: dict[str, object] | None) -> float:
if entry is None:
return 0.0
topic_terms = _expanded_keyword_terms(topic_phrase)
if not topic_terms:
return 0.0
title_terms = _expanded_keyword_terms(str(entry.get("title") or ""))
abstract_terms = _expanded_keyword_terms(str(entry.get("abstract") or ""))
keyword_terms = _expanded_keyword_terms(str(entry.get("keywords") or ""))
venue_terms = _expanded_keyword_terms(" ".join(str(entry.get(field) or "") for field in ("journal", "booktitle")))
score = 0.0
score += 0.6 * _term_overlap_ratio(topic_terms, title_terms)
score += 0.25 * _term_overlap_ratio(topic_terms, abstract_terms)
score += 0.1 * _term_overlap_ratio(topic_terms, keyword_terms)
score += 0.05 * _term_overlap_ratio(topic_terms, venue_terms)
phrase = _normalize_text(topic_phrase.casefold())
title = _normalize_text(str(entry.get("title") or "").casefold())
if phrase and title and phrase in title:
score = max(score, 0.75)
return min(score, 1.0)
def _meets_topic_assignment_threshold(
topic_phrase: str,
entry: dict[str, object] | None,
min_relevance: float,
relevance_score: float | None = None,
) -> bool:
if entry is None:
return False
score = relevance_score if relevance_score is not None else _topic_relevance_score(topic_phrase, entry)
if score < min_relevance:
return False
title_anchor = _title_topic_anchor_ratio(topic_phrase, str(entry.get("title") or ""))
return title_anchor >= 0.2
def _keyword_terms(text: str) -> set[str]:
return {
_normalize_keyword(term)
for term in re.findall(r"[A-Za-z0-9]+", text.casefold())
if len(term) >= 4
}
def _expanded_keyword_terms(text: str) -> set[str]:
terms = _keyword_terms(text)
expanded = set(terms)
for term in terms:
expanded.update(_related_topic_terms(term))
return expanded
def _title_topic_anchor_ratio(topic_phrase: str, title: str) -> float:
normalized_phrase = _normalize_text(topic_phrase.casefold())
normalized_title = _normalize_text(title.casefold())
if normalized_phrase and normalized_title and normalized_phrase in normalized_title:
return 1.0
topic_terms = _core_topic_terms(topic_phrase)
title_terms = _keyword_terms(title)
if not topic_terms or not title_terms:
return 0.0
overlap = topic_terms & title_terms
if overlap:
return max(0.25, len(overlap) / len(topic_terms))
return 0.0
def _core_topic_terms(topic_phrase: str) -> set[str]:
generic_terms = {"evolution", "origin", "origins", "science", "study", "studies"}
return {term for term in _keyword_terms(topic_phrase) if term not in generic_terms}
def _term_overlap_ratio(topic_terms: set[str], candidate_terms: set[str]) -> float:
if not topic_terms or not candidate_terms:
return 0.0
return len(topic_terms & candidate_terms) / len(topic_terms)
def _normalize_keyword(term: str) -> str:
normalized = term.casefold()
for suffix in ("isms", "ists", "ation", "ment", "ings", "ness", "isms", "ism", "ist", "ing", "ers", "er", "ies", "ied", "ed", "es", "s"):
if len(normalized) > len(suffix) + 3 and normalized.endswith(suffix):
if suffix in {"ies", "ied"}:
return normalized[: -len(suffix)] + "y"
return normalized[: -len(suffix)]
return normalized
def _related_topic_terms(term: str) -> set[str]:
related_groups = (
{"human", "hominid", "hominin", "homo"},
{"chimpanzee", "chimp", "pan", "ape", "apes", "primate"},
{"primate", "primate", "ape", "apes", "hominid", "hominin"},
{"evolution", "evolutionary", "phylogeny", "phylogen", "ancestor", "ancestral"},
{"origin", "origins", "abiogenesis", "prebiotic"},
{"morphometry", "morphology", "cranial", "dental", "skeletal", "body"},
{"paleolithic", "paleoanthropology", "paleoanthropological", "pleistocene", "fossil"},
)
for group in related_groups:
if term in group:
return group - {term}
return set()
def _openalex_work_to_entry(work: dict) -> BibEntry:
title = _normalize_text(work.get("display_name", "") or "Untitled work")
year = str(work.get("publication_year") or "")

src/citegeist/harvest.py

@@ -0,0 +1,317 @@
from __future__ import annotations
from dataclasses import dataclass
from urllib.parse import urlencode
import xml.etree.ElementTree as ET
from .bibtex import BibEntry
from .sources import SourceClient
NS = {
"oai": "http://www.openarchives.org/OAI/2.0/",
"oai_dc": "http://www.openarchives.org/OAI/2.0/oai_dc/",
"dc": "http://purl.org/dc/elements/1.1/",
"mods": "http://www.loc.gov/mods/v3",
}
@dataclass(slots=True)
class HarvestResult:
base_url: str
identifier: str
entry: BibEntry
@dataclass(slots=True)
class OaiSet:
set_spec: str
set_name: str
set_description: str = ""
@dataclass(slots=True)
class OaiMetadataFormat:
metadata_prefix: str
schema: str
metadata_namespace: str
class OaiPmhHarvester:
def __init__(self, source_client: SourceClient | None = None) -> None:
self.source_client = source_client or SourceClient()
def identify(self, base_url: str) -> dict[str, str]:
root = self.source_client.get_xml(f"{base_url}?{urlencode({'verb': 'Identify'})}")
identify = root.find(".//oai:Identify", NS)
if identify is None:
return {}
payload: dict[str, str] = {}
for field_name in (
"repositoryName",
"baseURL",
"protocolVersion",
"adminEmail",
"earliestDatestamp",
"deletedRecord",
"granularity",
):
payload[field_name] = _node_text(identify.find(f"oai:{field_name}", NS))
return payload
def list_sets(self, base_url: str) -> list[OaiSet]:
root = self.source_client.get_xml(f"{base_url}?{urlencode({'verb': 'ListSets'})}")
sets = root.findall(".//oai:set", NS)
results: list[OaiSet] = []
for node in sets:
results.append(
OaiSet(
set_spec=_node_text(node.find("oai:setSpec", NS)),
set_name=_node_text(node.find("oai:setName", NS)),
set_description=_flatten_set_description(node.find("oai:setDescription", NS)),
)
)
return results
def list_metadata_formats(self, base_url: str, identifier: str | None = None) -> list[OaiMetadataFormat]:
params = {"verb": "ListMetadataFormats"}
if identifier:
params["identifier"] = identifier
root = self.source_client.get_xml(f"{base_url}?{urlencode(params)}")
formats = root.findall(".//oai:metadataFormat", NS)
results: list[OaiMetadataFormat] = []
for node in formats:
results.append(
OaiMetadataFormat(
metadata_prefix=_node_text(node.find("oai:metadataPrefix", NS)),
schema=_node_text(node.find("oai:schema", NS)),
metadata_namespace=_node_text(node.find("oai:metadataNamespace", NS)),
)
)
return results
def list_records(
self,
base_url: str,
metadata_prefix: str = "oai_dc",
set_spec: str | None = None,
date_from: str | None = None,
date_until: str | None = None,
limit: int | None = None,
) -> list[HarvestResult]:
results: list[HarvestResult] = []
params = {"verb": "ListRecords", "metadataPrefix": metadata_prefix}
if set_spec:
params["set"] = set_spec
if date_from:
params["from"] = date_from
if date_until:
params["until"] = date_until
ordinal = 1
next_url = f"{base_url}?{urlencode(params)}"
while next_url:
root = self.source_client.get_xml(next_url)
records = root.findall(".//oai:record", NS)
for record in records:
parsed = self._record_to_result(base_url, record, ordinal)
ordinal += 1
if parsed is not None:
results.append(parsed)
if limit is not None and len(results) >= limit:
return results
next_url = self._resumption_url(base_url, root)
return results
def get_record(
self,
base_url: str,
identifier: str,
metadata_prefix: str = "oai_dc",
) -> HarvestResult | None:
params = {
"verb": "GetRecord",
"metadataPrefix": metadata_prefix,
"identifier": identifier,
}
root = self.source_client.get_xml(f"{base_url}?{urlencode(params)}")
record = root.find(".//oai:record", NS)
if record is None:
return None
return self._record_to_result(base_url, record, 1)
def _record_to_result(self, base_url: str, record: ET.Element, ordinal: int) -> HarvestResult | None:
identifier = _node_text(record.find("./oai:header/oai:identifier", NS))
metadata_node = record.find("./oai:metadata/*", NS)
if metadata_node is None or not identifier:
return None
entry = _metadata_node_to_entry(base_url, identifier, metadata_node, ordinal)
return HarvestResult(base_url=base_url, identifier=identifier, entry=entry)
def _resumption_url(self, base_url: str, root: ET.Element) -> str | None:
token = _node_text(root.find(".//oai:resumptionToken", NS))
if not token:
return None
return f"{base_url}?{urlencode({'verb': 'ListRecords', 'resumptionToken': token})}"
def _oai_dc_to_entry(base_url: str, identifier: str, metadata: ET.Element, ordinal: int) -> BibEntry:
titles = _all_text(metadata.findall("dc:title", NS))
creators = _all_text(metadata.findall("dc:creator", NS))
dates = _all_text(metadata.findall("dc:date", NS))
descriptions = _all_text(metadata.findall("dc:description", NS))
identifiers = _all_text(metadata.findall("dc:identifier", NS))
publishers = _all_text(metadata.findall("dc:publisher", NS))
types = [value.lower() for value in _all_text(metadata.findall("dc:type", NS))]
title = titles[0] if titles else "Untitled record"
year = _first_year(dates)
entry_type = _guess_oai_entry_type(types)
fields: dict[str, str] = {
"title": title,
"oai": identifier,
"url": _best_identifier_url(identifiers) or f"{base_url}?verb=GetRecord&identifier={identifier}&metadataPrefix=oai_dc",
"note": "harvested_from = {oai_pmh}",
}
if creators:
fields["author"] = " and ".join(creators)
if year:
fields["year"] = year
if descriptions:
fields["abstract"] = descriptions[0]
if publishers:
fields["publisher"] = publishers[0]
citation_key = _oai_citation_key(creators, year, title, ordinal)
return BibEntry(entry_type=entry_type, citation_key=citation_key, fields=fields)
def _mods_to_entry(base_url: str, identifier: str, metadata: ET.Element, ordinal: int) -> BibEntry:
title = _node_text(metadata.find(".//mods:titleInfo/mods:title", NS)) or "Untitled record"
sub_title = _node_text(metadata.find(".//mods:titleInfo/mods:subTitle", NS))
if sub_title:
title = f"{title}: {sub_title}"
creators: list[str] = []
for name in metadata.findall(".//mods:name", NS):
role_terms = [term.text or "" for term in name.findall(".//mods:roleTerm", NS)]
if role_terms and not any(term.lower() == "author" for term in role_terms):
continue
parts = [_node_text(part) for part in name.findall("./mods:namePart", NS)]
parts = [part for part in parts if part]
if parts:
creators.append(", ".join(parts) if len(parts) == 2 else " ".join(parts))
year = ""
for date_node in metadata.findall(".//mods:originInfo/mods:dateIssued", NS):
text = _node_text(date_node)
if len(text) >= 4 and text[:4].isdigit():
year = text[:4]
break
publisher = _node_text(metadata.find(".//mods:originInfo/mods:publisher", NS))
abstract = _node_text(metadata.find(".//mods:abstract", NS))
genre = _node_text(metadata.find(".//mods:genre", NS)).lower()
related_title = _node_text(metadata.find(".//mods:relatedItem/mods:titleInfo/mods:title", NS))
url = _node_text(metadata.find(".//mods:location/mods:url", NS))
entry_type = "phdthesis" if "thesis" in genre or "dissertation" in genre else "misc"
if entry_type != "phdthesis" and related_title:
    entry_type = "article"
fields: dict[str, str] = {
"title": title,
"oai": identifier,
"url": url or f"{base_url}?verb=GetRecord&identifier={identifier}&metadataPrefix=mods",
"note": "harvested_from = {oai_pmh_mods}",
}
if creators:
fields["author"] = " and ".join(creators)
if year:
fields["year"] = year
if publisher:
fields["publisher"] = publisher
if abstract:
fields["abstract"] = abstract
if related_title:
fields["journal"] = related_title
citation_key = _oai_citation_key(creators, year, title, ordinal)
return BibEntry(entry_type=entry_type, citation_key=citation_key, fields=fields)
def _metadata_node_to_entry(base_url: str, identifier: str, metadata: ET.Element, ordinal: int) -> BibEntry:
if metadata.tag.endswith("dc"):
return _oai_dc_to_entry(base_url, identifier, metadata, ordinal)
if metadata.tag.endswith("mods"):
return _mods_to_entry(base_url, identifier, metadata, ordinal)
return BibEntry(
entry_type="misc",
citation_key=_oai_citation_key([], "", identifier, ordinal),
fields={
"title": identifier,
"oai": identifier,
"url": f"{base_url}?verb=GetRecord&identifier={identifier}",
"note": f"unsupported_oai_metadata = {{{metadata.tag}}}",
},
)
def _node_text(node: ET.Element | None) -> str:
if node is None or node.text is None:
return ""
return " ".join(node.text.split())
def _all_text(nodes: list[ET.Element]) -> list[str]:
values = []
for node in nodes:
value = _node_text(node)
if value:
values.append(value)
return values
def _first_year(dates: list[str]) -> str:
for date in dates:
if len(date) >= 4 and date[:4].isdigit():
return date[:4]
return ""
def _guess_oai_entry_type(types: list[str]) -> str:
joined = " ".join(types)
if "thesis" in joined or "dissertation" in joined:
return "phdthesis"
if "article" in joined:
return "article"
if "book" in joined:
return "book"
return "misc"
def _best_identifier_url(identifiers: list[str]) -> str:
for identifier in identifiers:
if identifier.startswith("http://") or identifier.startswith("https://"):
return identifier
return ""
def _oai_citation_key(creators: list[str], year: str, title: str, ordinal: int) -> str:
author = creators[0] if creators else "oai"
family = author.split(",")[0] if "," in author else author.split()[-1]
family = "".join(ch for ch in family.lower() if ch.isalnum()) or "oai"
first_word = "".join(ch for ch in title.split()[0].lower() if ch.isalnum()) if title.split() else "untitled"
return f"{family}{year or 'nd'}{first_word}{ordinal}"
def _flatten_set_description(node: ET.Element | None) -> str:
if node is None:
return ""
parts = []
for child in node.iter():
if child.text and child.text.strip():
parts.append(" ".join(child.text.split()))
return " ".join(parts)


@@ -30,6 +30,9 @@ class MetadataResolver:
resolved = self.resolve_doi(doi)
if resolved is not None:
return resolved
resolved = self.resolve_datacite_doi(doi)
if resolved is not None:
return resolved
if openalex_id := entry.fields.get("openalex"):
resolved = self.resolve_openalex(openalex_id)
@@ -47,6 +50,20 @@
return resolved
if title := entry.fields.get("title"):
resolved = self.search_crossref_best_match(
title=title,
author_text=entry.fields.get("author", ""),
year=entry.fields.get("year", ""),
)
if resolved is not None:
return resolved
resolved = self.search_datacite_best_match(
title=title,
author_text=entry.fields.get("author", ""),
year=entry.fields.get("year", ""),
)
if resolved is not None:
return resolved
resolved = self.search_openalex_best_match(
title=title,
author_text=entry.fields.get("author", ""),
@@ -75,6 +92,26 @@
items = payload.get("message", {}).get("items", [])
return [_crossref_message_to_entry(item) for item in items]
def search_crossref_best_match(
self,
title: str,
author_text: str = "",
year: str = "",
) -> Resolution | None:
candidate = _select_best_title_match(
self.search_crossref(title, limit=5),
title=title,
author_text=author_text,
year=year,
)
if candidate is None:
return None
return Resolution(
entry=candidate,
source_type="resolver",
source_label=f"crossref:search:{title}",
)
def resolve_dblp(self, dblp_key: str) -> Resolution | None:
encoded_key = urllib.parse.quote(dblp_key, safe="/:")
text = self.source_client.get_text(f"https://dblp.org/rec/{encoded_key}.bib")
@@ -128,6 +165,43 @@
source_label=f"openalex:id:{normalized_id}",
)
def resolve_datacite_doi(self, doi: str) -> Resolution | None:
encoded = urllib.parse.quote(doi, safe="")
payload = self.source_client.get_json(f"https://api.datacite.org/dois/{encoded}")
data = payload.get("data", {})
if not data:
return None
return Resolution(
entry=_datacite_work_to_entry(data),
source_type="resolver",
source_label=f"datacite:doi:{doi}",
)
def search_datacite(self, title: str, limit: int = 5) -> list[BibEntry]:
query = urllib.parse.urlencode({"query": title, "page[size]": limit})
payload = self.source_client.get_json(f"https://api.datacite.org/dois?{query}")
return [_datacite_work_to_entry(item) for item in payload.get("data", [])]
def search_datacite_best_match(
self,
title: str,
author_text: str = "",
year: str = "",
) -> Resolution | None:
candidate = _select_best_title_match(
self.search_datacite(title, limit=5),
title=title,
author_text=author_text,
year=year,
)
if candidate is None:
return None
return Resolution(
entry=candidate,
source_type="resolver",
source_label=f"datacite:search:{title}",
)
def search_openalex(self, title: str, limit: int = 5) -> list[BibEntry]:
query = urllib.parse.urlencode({"search": title, "per-page": limit})
payload = self.source_client.get_json(f"https://api.openalex.org/works?{query}")
@@ -139,42 +213,50 @@
author_text: str = "",
year: str = "",
) -> Resolution | None:
candidates = self.search_openalex(title, limit=5)
if not candidates:
candidate = _select_best_title_match(
self.search_openalex(title, limit=5),
title=title,
author_text=author_text,
year=year,
)
if candidate is None:
return None
title_norm = _normalize_match_text(title)
author_norm = _normalize_match_text(author_text)
for candidate in candidates:
candidate_title = _normalize_match_text(candidate.fields.get("title", ""))
candidate_author = _normalize_match_text(candidate.fields.get("author", ""))
candidate_year = candidate.fields.get("year", "")
if candidate_title == title_norm:
if author_norm and candidate_author and author_norm.split(" and ")[0] not in candidate_author:
continue
if year and candidate_year and year != candidate_year:
continue
return Resolution(
entry=candidate,
source_type="resolver",
source_label=f"openalex:search:{title}",
)
return Resolution(
entry=candidates[0],
source_type="resolver",
source_label=f"openalex:search:{title}",
)
def merge_entries(base: BibEntry, resolved: BibEntry) -> BibEntry:
merged, _ = merge_entries_with_conflicts(base, resolved)
return merged
def merge_entries_with_conflicts(base: BibEntry, resolved: BibEntry) -> tuple[BibEntry, list[dict[str, str]]]:
merged_fields = dict(base.fields)
conflicts: list[dict[str, str]] = []
for key, value in resolved.fields.items():
if value and (key not in merged_fields or not merged_fields[key]):
if not value:
continue
current_value = merged_fields.get(key, "")
if current_value and current_value != value:
conflicts.append(
{
"field_name": key,
"current_value": current_value,
"proposed_value": value,
}
)
continue
if key not in merged_fields or not merged_fields[key]:
merged_fields[key] = value
return BibEntry(
return (
BibEntry(
entry_type=base.entry_type or resolved.entry_type,
citation_key=base.citation_key,
fields=merged_fields,
),
conflicts,
)
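# Usage sketch: resolver values fill gaps, while disagreements are reported
# as conflicts instead of silently overwriting existing fields.
#   base = BibEntry(entry_type="article", citation_key="smith2024graphs",
#                   fields={"title": "Graphs", "year": "2023"})
#   resolved = BibEntry(entry_type="article", citation_key="smith2024graphs",
#                       fields={"title": "Graphs", "year": "2024", "doi": "10.1000/x"})
#   merged, conflicts = merge_entries_with_conflicts(base, resolved)
#   merged.fields["doi"]  -> "10.1000/x" (gap filled)
#   conflicts             -> [{"field_name": "year",
#                              "current_value": "2023",
#                              "proposed_value": "2024"}]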
@@ -363,3 +445,123 @@ def _normalize_match_text(value: str) -> str:
lowered = value.lower()
lowered = re.sub(r"\W+", " ", lowered)
return " ".join(lowered.split())
def _select_best_title_match(
candidates: list[BibEntry],
title: str,
author_text: str = "",
year: str = "",
) -> BibEntry | None:
if not candidates:
return None
title_norm = _normalize_match_text(title)
author_tokens = _author_match_tokens(author_text)
year_text = str(year or "").strip()
for candidate in candidates:
candidate_title = _normalize_match_text(candidate.fields.get("title", ""))
if candidate_title != title_norm:
continue
candidate_year = str(candidate.fields.get("year", "") or "").strip()
if year_text and candidate_year and year_text != candidate_year:
continue
if author_tokens and not _candidate_matches_author_tokens(candidate, author_tokens):
continue
return candidate
return None
def _author_match_tokens(author_text: str) -> set[str]:
normalized = _normalize_match_text(author_text)
if not normalized:
return set()
tokens = {
token
for token in re.findall(r"[a-z0-9]+", normalized)
if len(token) >= 2 and token not in {"and", "et", "al"}
}
return tokens
def _candidate_matches_author_tokens(candidate: BibEntry, author_tokens: set[str]) -> bool:
candidate_author = _normalize_match_text(candidate.fields.get("author", ""))
if not candidate_author:
return False
candidate_tokens = set(re.findall(r"[a-z0-9]+", candidate_author))
return bool(author_tokens & candidate_tokens)
def _datacite_work_to_entry(data: dict) -> BibEntry:
attributes = data.get("attributes", {})
doi = str(attributes.get("doi") or "")
titles = attributes.get("titles") or []
creators = attributes.get("creators") or []
descriptions = attributes.get("descriptions") or []
publisher = str(attributes.get("publisher") or "")
year = str(attributes.get("publicationYear") or "")
url = str(attributes.get("url") or "")
types = attributes.get("types") or {}
title = titles[0].get("title", "") if titles else ""
author_names = " and ".join(_datacite_creator_name(creator) for creator in creators if _datacite_creator_name(creator))
abstract = _datacite_abstract(descriptions)
entry_type = _datacite_type_to_bibtype(str(types.get("resourceTypeGeneral") or ""))
fields: dict[str, str] = {}
if title:
fields["title"] = title
if author_names:
fields["author"] = author_names
if year:
fields["year"] = year
if doi:
fields["doi"] = doi
if url:
fields["url"] = url
elif doi:
fields["url"] = f"https://doi.org/{doi}"
if publisher:
fields["publisher"] = publisher
if abstract:
fields["abstract"] = abstract
citation_key = _make_resolution_key(author_names or "datacite", year or "n.d.", title or doi or "untitled")
return BibEntry(entry_type=entry_type, citation_key=citation_key, fields=fields)
def _datacite_creator_name(creator: dict) -> str:
family = str(creator.get("familyName") or "")
given = str(creator.get("givenName") or "")
if family and given:
return f"{family}, {given}"
return str(creator.get("name") or family or given)
def _datacite_abstract(descriptions: list[dict]) -> str:
for description in descriptions:
if str(description.get("descriptionType") or "").lower() == "abstract":
return str(description.get("description") or "")
return ""
def _datacite_type_to_bibtype(resource_type: str) -> str:
lowered = resource_type.lower()
mapping = {
"audiovisual": "misc",
"book": "book",
"bookchapter": "incollection",
"collection": "misc",
"computationalnotebook": "misc",
"conferencepaper": "inproceedings",
"dataset": "misc",
"dissertation": "phdthesis",
"image": "misc",
"journalarticle": "article",
"model": "misc",
"report": "techreport",
"software": "misc",
"text": "misc",
}
return mapping.get(lowered, "misc")
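# Sketch: DataCite now participates in both DOI resolution and title-search
# fallback; the returned Resolution carries the mapped BibTeX fields.
# (Live HTTP unless the resolver's SourceClient is cache-backed; the title
# below is illustrative.)
#   resolver = MetadataResolver()
#   resolution = resolver.search_datacite_best_match(
#       title="A dataset of hominid cranial measurements",
#       author_text="Smith, Jane",
#       year="2021",
#   )
#   if resolution is not None:
#       print(resolution.entry.fields.get("doi"), resolution.source_label)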


@@ -30,11 +30,11 @@ class SourceClient:
def get_text(self, url: str) -> str:
cached = self._read_cached(url, "txt")
if cached is not None:
return cached.decode("utf-8")
return self._decode_text(cached)
payload = self._fetch_bytes(url)
self._write_cache(url, "txt", payload)
return payload.decode("utf-8")
return self._decode_text(payload)
def get_xml(self, url: str) -> ET.Element:
cached = self._read_cached(url, "xml")
@@ -76,3 +76,11 @@
self.cache_dir.mkdir(parents=True, exist_ok=True)
path = self.cache_dir / self._cache_key(url, suffix)
path.write_bytes(payload)
def _decode_text(self, payload: bytes) -> str:
for encoding in ("utf-8", "utf-8-sig", "iso-8859-1", "latin-1"):
try:
return payload.decode(encoding)
except UnicodeDecodeError:
continue
return payload.decode("utf-8", errors="replace")

View File

@@ -95,6 +95,29 @@ class BibliographyStore:
PRIMARY KEY (source_entry_id, target_citation_key, relation_type)
);
CREATE TABLE IF NOT EXISTS topics (
id INTEGER PRIMARY KEY,
slug TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
source_type TEXT NOT NULL,
source_url TEXT,
expansion_phrase TEXT,
suggested_phrase TEXT,
phrase_review_status TEXT NOT NULL DEFAULT 'unreviewed',
phrase_review_notes TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS entry_topics (
entry_id INTEGER NOT NULL REFERENCES entries(id) ON DELETE CASCADE,
topic_id INTEGER NOT NULL REFERENCES topics(id) ON DELETE CASCADE,
source_label TEXT NOT NULL,
confidence REAL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (entry_id, topic_id)
);
CREATE TABLE IF NOT EXISTS field_provenance (
id INTEGER PRIMARY KEY,
entry_id INTEGER NOT NULL REFERENCES entries(id) ON DELETE CASCADE,
@@ -117,10 +140,23 @@
confidence REAL,
recorded_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS field_conflicts (
id INTEGER PRIMARY KEY,
entry_id INTEGER NOT NULL REFERENCES entries(id) ON DELETE CASCADE,
field_name TEXT NOT NULL,
current_value TEXT,
proposed_value TEXT,
source_type TEXT NOT NULL,
source_label TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'open',
recorded_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""
)
self._ensure_entry_columns()
self._ensure_topic_columns()
if self._fts5_enabled:
self.connection.execute(
@@ -177,6 +213,7 @@
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(citation_key) DO UPDATE SET
entry_type = excluded.entry_type,
review_status = excluded.review_status,
title = excluded.title,
year = excluded.year,
journal = excluded.journal,
@@ -280,8 +317,23 @@
return entry_id
def search_text(self, query: str, limit: int = 10) -> list[dict[str, object]]:
def search_text(self, query: str, limit: int = 10, topic_slug: str | None = None) -> list[dict[str, object]]:
if self._fts5_enabled:
if topic_slug:
rows = self.connection.execute(
"""
SELECT DISTINCT e.citation_key, e.title, e.year, bm25(entry_text_fts) AS score
FROM entry_text_fts
JOIN entries e ON e.citation_key = entry_text_fts.citation_key
JOIN entry_topics et ON et.entry_id = e.id
JOIN topics t ON t.id = et.topic_id
WHERE entry_text_fts MATCH ? AND t.slug = ?
ORDER BY score
LIMIT ?
""",
(query, topic_slug, limit),
).fetchall()
else:
rows = self.connection.execute(
"""
SELECT e.citation_key, e.title, e.year, bm25(entry_text_fts) AS score
@@ -295,6 +347,19 @@
).fetchall()
else:
pattern = f"%{query}%"
if topic_slug:
rows = self.connection.execute(
"""
SELECT DISTINCT e.citation_key, e.title, e.year, 0.0 AS score
FROM entries e
JOIN entry_topics et ON et.entry_id = e.id
JOIN topics t ON t.id = et.topic_id
WHERE t.slug = ? AND (e.title LIKE ? OR e.abstract LIKE ? OR e.fulltext LIKE ?)
LIMIT ?
""",
(topic_slug, pattern, pattern, pattern, limit),
).fetchall()
else:
rows = self.connection.execute(
"""
SELECT citation_key, title, year, 0.0 AS score
@@ -383,7 +448,11 @@
"SELECT * FROM entries WHERE citation_key = ?",
(citation_key,),
).fetchone()
return self._row_to_entry_dict(row) if row else None
if row is None:
return None
payload = self._row_to_entry_dict(row)
payload["topics"] = self.get_entry_topics(citation_key)
return payload
def list_entries(self, limit: int = 50) -> list[dict[str, object]]:
rows = self.connection.execute(
@@ -397,6 +466,227 @@
).fetchall()
return [dict(row) for row in rows]
def ensure_topic(
self,
slug: str,
name: str,
source_type: str = "manual",
source_url: str | None = None,
expansion_phrase: str | None = None,
suggested_phrase: str | None = None,
phrase_review_status: str | None = None,
phrase_review_notes: str | None = None,
) -> int:
row = self.connection.execute(
"""
INSERT INTO topics (
slug, name, source_type, source_url, expansion_phrase,
suggested_phrase, phrase_review_status, phrase_review_notes
)
VALUES (?, ?, ?, ?, ?, ?, COALESCE(?, 'unreviewed'), ?)
ON CONFLICT(slug) DO UPDATE SET
name = excluded.name,
source_type = excluded.source_type,
source_url = COALESCE(excluded.source_url, topics.source_url),
expansion_phrase = COALESCE(excluded.expansion_phrase, topics.expansion_phrase),
suggested_phrase = COALESCE(excluded.suggested_phrase, topics.suggested_phrase),
phrase_review_status = COALESCE(excluded.phrase_review_status, topics.phrase_review_status),
phrase_review_notes = COALESCE(excluded.phrase_review_notes, topics.phrase_review_notes),
updated_at = CURRENT_TIMESTAMP
RETURNING id
""",
(
slug,
name,
source_type,
source_url,
expansion_phrase,
suggested_phrase,
phrase_review_status,
phrase_review_notes,
),
).fetchone()
return int(row["id"])
def add_entry_topic(
self,
citation_key: str,
topic_slug: str,
topic_name: str,
source_type: str = "manual",
source_url: str | None = None,
source_label: str = "manual",
confidence: float = 1.0,
expansion_phrase: str | None = None,
) -> bool:
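# Link an entry to a topic, creating the topic row on demand; returns False when the citation key is unknown.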
entry_row = self.connection.execute(
"SELECT id FROM entries WHERE citation_key = ?",
(citation_key,),
).fetchone()
if entry_row is None:
return False
topic_id = self.ensure_topic(
topic_slug,
topic_name,
source_type=source_type,
source_url=source_url,
expansion_phrase=expansion_phrase,
)
self.connection.execute(
"""
INSERT INTO entry_topics (entry_id, topic_id, source_label, confidence)
VALUES (?, ?, ?, ?)
ON CONFLICT(entry_id, topic_id) DO UPDATE SET
source_label = excluded.source_label,
confidence = excluded.confidence
""",
(int(entry_row["id"]), topic_id, source_label, confidence),
)
return True
def get_entry_topics(self, citation_key: str) -> list[dict[str, object]]:
rows = self.connection.execute(
"""
SELECT t.slug, t.name, t.source_type, t.source_url, et.source_label, et.confidence
FROM entry_topics et
JOIN entries e ON e.id = et.entry_id
JOIN topics t ON t.id = et.topic_id
WHERE e.citation_key = ?
ORDER BY t.name, t.slug
""",
(citation_key,),
).fetchall()
return [dict(row) for row in rows]
def list_topics(
self,
limit: int = 100,
phrase_review_status: str | None = None,
) -> list[dict[str, object]]:
where = ""
params: list[object] = []
if phrase_review_status is not None:
where = "WHERE t.phrase_review_status = ?"
params.append(phrase_review_status)
params.append(limit)
rows = self.connection.execute(
f"""
SELECT t.slug, t.name, t.source_type, t.source_url, t.expansion_phrase,
t.suggested_phrase, t.phrase_review_status, t.phrase_review_notes,
COUNT(et.entry_id) AS entry_count
FROM topics t
LEFT JOIN entry_topics et ON et.topic_id = t.id
{where}
GROUP BY t.id, t.slug, t.name, t.source_type, t.source_url, t.expansion_phrase,
t.suggested_phrase, t.phrase_review_status, t.phrase_review_notes
ORDER BY t.name, t.slug
LIMIT ?
""",
params,
).fetchall()
return [dict(row) for row in rows]
def get_topic(self, slug: str) -> dict[str, object] | None:
row = self.connection.execute(
"""
SELECT t.slug, t.name, t.source_type, t.source_url, t.expansion_phrase,
t.suggested_phrase, t.phrase_review_status, t.phrase_review_notes,
COUNT(et.entry_id) AS entry_count
FROM topics t
LEFT JOIN entry_topics et ON et.topic_id = t.id
WHERE t.slug = ?
GROUP BY t.id, t.slug, t.name, t.source_type, t.source_url, t.expansion_phrase,
t.suggested_phrase, t.phrase_review_status, t.phrase_review_notes
""",
(slug,),
).fetchone()
return dict(row) if row else None
def set_topic_expansion_phrase(self, slug: str, expansion_phrase: str | None) -> bool:
row = self.connection.execute(
"""
UPDATE topics
SET expansion_phrase = ?, updated_at = CURRENT_TIMESTAMP
WHERE slug = ?
RETURNING id
""",
(expansion_phrase, slug),
).fetchone()
self.connection.commit()
return row is not None
def stage_topic_phrase_suggestion(
self,
slug: str,
suggested_phrase: str | None,
review_status: str = "pending",
review_notes: str | None = None,
) -> bool:
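# Stage a suggested phrase for review without touching the active expansion_phrase.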
row = self.connection.execute(
"""
UPDATE topics
SET suggested_phrase = ?,
phrase_review_status = ?,
phrase_review_notes = ?,
updated_at = CURRENT_TIMESTAMP
WHERE slug = ?
RETURNING id
""",
(suggested_phrase, review_status, review_notes, slug),
).fetchone()
self.connection.commit()
return row is not None
def review_topic_phrase_suggestion(
self,
slug: str,
review_status: str,
review_notes: str | None = None,
applied_phrase: str | None = None,
) -> bool:
topic = self.get_topic(slug)
if topic is None:
return False
suggested_phrase = topic.get("suggested_phrase")
expansion_phrase = topic.get("expansion_phrase")
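# An accepted review promotes the suggestion (or an explicit applied_phrase) to the active expansion phrase.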
if review_status == "accepted":
expansion_phrase = applied_phrase if applied_phrase is not None else suggested_phrase
elif applied_phrase is not None:
expansion_phrase = applied_phrase
row = self.connection.execute(
"""
UPDATE topics
SET expansion_phrase = ?,
phrase_review_status = ?,
phrase_review_notes = ?,
updated_at = CURRENT_TIMESTAMP
WHERE slug = ?
RETURNING id
""",
(expansion_phrase, review_status, review_notes, slug),
).fetchone()
self.connection.commit()
return row is not None
def list_topic_entries(self, topic_slug: str, limit: int = 100) -> list[dict[str, object]]:
rows = self.connection.execute(
"""
SELECT e.citation_key, e.entry_type, e.review_status, e.title, e.year,
t.slug AS topic_slug, t.name AS topic_name, et.source_label, et.confidence
FROM entry_topics et
JOIN topics t ON t.id = et.topic_id
JOIN entries e ON e.id = et.entry_id
WHERE t.slug = ?
ORDER BY COALESCE(e.year, ''), e.citation_key
LIMIT ?
""",
(topic_slug, limit),
).fetchall()
return [dict(row) for row in rows]
def set_entry_status(self, citation_key: str, review_status: str) -> bool:
row = self.connection.execute(
"""
@ -437,6 +727,114 @@ class BibliographyStore:
self.connection.commit()
return True
def record_conflicts(
self,
citation_key: str,
conflicts: list[dict[str, str]],
source_type: str,
source_label: str,
) -> bool:
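# Persist each current-vs-proposed disagreement as an open conflict row attributed to its source.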
row = self.connection.execute(
"SELECT id FROM entries WHERE citation_key = ?",
(citation_key,),
).fetchone()
if row is None:
return False
entry_id = int(row["id"])
for conflict in conflicts:
self.connection.execute(
"""
INSERT INTO field_conflicts (
entry_id, field_name, current_value, proposed_value, source_type, source_label, status
) VALUES (?, ?, ?, ?, ?, ?, 'open')
""",
(
entry_id,
conflict["field_name"],
conflict.get("current_value"),
conflict.get("proposed_value"),
source_type,
source_label,
),
)
self.connection.commit()
return True
def get_field_conflicts(self, citation_key: str, status: str | None = None) -> list[dict[str, object]]:
where = ""
params: list[object] = [citation_key]
if status is not None:
where = " AND fc.status = ?"
params.append(status)
rows = self.connection.execute(
f"""
SELECT fc.field_name, fc.current_value, fc.proposed_value, fc.source_type,
fc.source_label, fc.status, fc.recorded_at
FROM field_conflicts fc
JOIN entries e ON e.id = fc.entry_id
WHERE e.citation_key = ?{where}
ORDER BY fc.recorded_at, fc.id
""",
params,
).fetchall()
return [dict(row) for row in rows]
def set_conflict_status(self, citation_key: str, field_name: str, status: str) -> int:
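# Update every open conflict on the field to the given status; returns the number of rows changed.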
row = self.connection.execute(
"SELECT id FROM entries WHERE citation_key = ?",
(citation_key,),
).fetchone()
if row is None:
return 0
entry_id = int(row["id"])
result = self.connection.execute(
"""
UPDATE field_conflicts
SET status = ?
WHERE entry_id = ? AND field_name = ? AND status = 'open'
""",
(status, entry_id, field_name),
)
self.connection.commit()
return result.rowcount
def apply_conflict_value(self, citation_key: str, field_name: str) -> bool:
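# Apply the newest open conflict for the field: rewrite the entry with the proposed value and mark the row accepted.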
row = self.connection.execute(
"""
SELECT fc.id, fc.proposed_value, e.review_status
FROM field_conflicts fc
JOIN entries e ON e.id = fc.entry_id
WHERE e.citation_key = ? AND fc.field_name = ? AND fc.status = 'open'
ORDER BY fc.recorded_at DESC, fc.id DESC
LIMIT 1
""",
(citation_key, field_name),
).fetchone()
if row is None:
return False
entry = self._load_bib_entry(citation_key)
if entry is None:
return False
proposed_value = str(row["proposed_value"] or "")
entry.fields[field_name] = proposed_value
self.upsert_entry(
entry,
raw_bibtex=_entry_to_bibtex(entry),
source_type="manual_review",
source_label=f"conflict_accept:{field_name}",
review_status=str(row["review_status"] or "draft"),
)
self.connection.execute(
"UPDATE field_conflicts SET status = 'accepted' WHERE id = ?",
(int(row["id"]),),
)
self.connection.commit()
return True
def add_relation(
self,
source_citation_key: str,
@ -651,6 +1049,37 @@ class BibliographyStore:
"ALTER TABLE entries ADD COLUMN review_status TEXT NOT NULL DEFAULT 'draft'"
)
def _ensure_topic_columns(self) -> None:
columns = {
row["name"] for row in self.connection.execute("PRAGMA table_info(topics)").fetchall()
}
migrations = {
"expansion_phrase": "ALTER TABLE topics ADD COLUMN expansion_phrase TEXT",
"suggested_phrase": "ALTER TABLE topics ADD COLUMN suggested_phrase TEXT",
"phrase_review_status": (
"ALTER TABLE topics ADD COLUMN phrase_review_status TEXT NOT NULL DEFAULT 'unreviewed'"
),
"phrase_review_notes": "ALTER TABLE topics ADD COLUMN phrase_review_notes TEXT",
}
for column_name, statement in migrations.items():
if column_name in columns:
continue
try:
self.connection.execute(statement)
except sqlite3.OperationalError as exc:
# Ignore "duplicate column name" errors so repeated migrations stay idempotent.
if "duplicate column name" not in str(exc).lower():
raise
def _record_field_provenance(
self,
entry_id: int,

1485
src/citegeist/talkorigins.py Normal file

File diff suppressed because it is too large

129
tests/test_batch.py Normal file
View File

@ -0,0 +1,129 @@
from pathlib import Path
from citegeist.batch import BatchBootstrapRunner, load_batch_jobs
from citegeist.cli import main
from citegeist.storage import BibliographyStore
def test_load_batch_jobs_accepts_object_with_jobs(tmp_path: Path):
path = tmp_path / "jobs.json"
path.write_text(
"""
{
"jobs": [
{"name": "topic-only", "topic": "graph topic"},
{"name": "seed-only", "seed_bib": "seed.bib"}
]
}
""",
encoding="utf-8",
)
jobs = load_batch_jobs(path)
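# Relative seed_bib paths are resolved against the directory that contains the jobs file.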
assert jobs[0]["name"] == "topic-only"
assert jobs[1]["seed_bib"] == str((tmp_path / "seed.bib").resolve())
def test_batch_runner_executes_multiple_jobs(tmp_path: Path):
seed_bib = tmp_path / "seed.bib"
seed_bib.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
jobs = [
{"name": "seed-job", "seed_bib": str(seed_bib), "expand": False},
{"name": "topic-job", "topic": "graph topic", "expand": False, "preview": True},
]
runner = BatchBootstrapRunner()
from citegeist import BibEntry
runner.bootstrapper.resolver.search_openalex = lambda topic, limit=5: [ # type: ignore[method-assign]
BibEntry(entry_type="article", citation_key="topic2024graph", fields={"title": "Graph Topic Result"})
]
runner.bootstrapper.resolver.search_crossref = lambda topic, limit=5: [] # type: ignore[method-assign]
runner.bootstrapper.resolver.search_datacite = lambda topic, limit=5: [] # type: ignore[method-assign]
store = BibliographyStore()
try:
results = runner.run(store, jobs)
assert [job.job_name for job in results] == ["seed-job", "topic-job"]
assert results[0].result_count == 1
assert results[1].results[0].citation_key == "topic2024graph"
assert store.get_entry("seed2024") is not None
assert store.get_entry("topic2024graph") is None
finally:
store.close()
def test_batch_runner_can_store_topic_phrase_metadata():
jobs = [
{
"name": "topic-job",
"topic": "graph topic",
"topic_slug": "graph-methods",
"topic_name": "Graph Methods",
"topic_phrase": "graph networks biology",
"expand": False,
"preview": False,
}
]
runner = BatchBootstrapRunner()
from citegeist import BibEntry
runner.bootstrapper.resolver.search_openalex = lambda topic, limit=5: [ # type: ignore[method-assign]
BibEntry(entry_type="article", citation_key="topic2024graph", fields={"title": "Graph Topic Result"})
]
runner.bootstrapper.resolver.search_crossref = lambda topic, limit=5: [] # type: ignore[method-assign]
runner.bootstrapper.resolver.search_datacite = lambda topic, limit=5: [] # type: ignore[method-assign]
store = BibliographyStore()
try:
runner.run(store, jobs)
topic = store.get_topic("graph-methods")
assert topic is not None
assert topic["name"] == "Graph Methods"
assert topic["expansion_phrase"] == "graph networks biology"
finally:
store.close()
def test_bootstrap_batch_cli_runs_json_jobs(tmp_path: Path):
seed_bib = tmp_path / "seed.bib"
seed_bib.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
batch_json = tmp_path / "jobs.json"
batch_json.write_text(
f"""
[
{{"name": "seed-job", "seed_bib": "{seed_bib}", "expand": false}},
{{"name": "topic-job", "topic": "graph topic", "expand": false, "preview": true}}
]
""",
encoding="utf-8",
)
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
with patch("citegeist.cli.BatchBootstrapRunner.run") as mocked_run:
mocked_run.return_value = []
exit_code = main(["--db", str(database), "bootstrap-batch", str(batch_json)])
assert exit_code == 0

175
tests/test_bootstrap.py Normal file
View File

@ -0,0 +1,175 @@
from citegeist import BibliographyStore
from citegeist.bootstrap import Bootstrapper
from citegeist.cli import main
def test_bootstrap_from_seed_bib_only():
store = BibliographyStore()
try:
bootstrapper = Bootstrapper()
bootstrapper.crossref_expander.expand_entry_references = lambda _store, _key: [] # type: ignore[method-assign]
bootstrapper.openalex_expander.expand_entry = lambda _store, _key, relation_type="cites", limit=5: [] # type: ignore[method-assign]
results = bootstrapper.bootstrap(
store,
seed_bibtex="""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
expand=False,
)
assert [item.citation_key for item in results] == ["seed2024"]
assert store.get_entry("seed2024") is not None
finally:
store.close()
def test_bootstrap_from_topic_only():
store = BibliographyStore()
try:
bootstrapper = Bootstrapper()
bootstrapper.resolver.search_openalex = lambda topic, limit=5: [] # type: ignore[method-assign]
bootstrapper.resolver.search_crossref = lambda topic, limit=5: [] # type: ignore[method-assign]
from citegeist import BibEntry
bootstrapper.resolver.search_datacite = lambda topic, limit=5: [ # type: ignore[method-assign]
BibEntry(
entry_type="article",
citation_key="topic2024graph",
fields={"title": "Graph Topic Result", "year": "2024"},
)
]
bootstrapper.crossref_expander.expand_entry_references = lambda _store, _key: [] # type: ignore[method-assign]
bootstrapper.openalex_expander.expand_entry = lambda _store, _key, relation_type="cites", limit=5: [] # type: ignore[method-assign]
results = bootstrapper.bootstrap(store, topic="graph topic", expand=False)
assert [item.citation_key for item in results] == ["topic2024graph"]
assert store.get_entry("topic2024graph") is not None
assert results[0].score > 0
finally:
store.close()
def test_bootstrap_cli_accepts_seed_and_topic(tmp_path):
seed_bib = tmp_path / "seed.bib"
seed_bib.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
with patch("citegeist.cli.Bootstrapper.bootstrap") as mocked_bootstrap:
mocked_bootstrap.return_value = []
exit_code = main(
[
"--db",
str(database),
"bootstrap",
"--seed-bib",
str(seed_bib),
"--topic",
"graph topic",
"--no-expand",
]
)
assert exit_code == 0
def test_bootstrap_ranks_and_deduplicates_topic_candidates():
store = BibliographyStore()
try:
bootstrapper = Bootstrapper()
from citegeist import BibEntry
bootstrapper.resolver.search_openalex = lambda topic, limit=5: [ # type: ignore[method-assign]
BibEntry(
entry_type="article",
citation_key="shared2024graph",
fields={"title": "Graph Topic Ranking", "abstract": "graph topic graph"},
)
]
bootstrapper.resolver.search_crossref = lambda topic, limit=5: [ # type: ignore[method-assign]
BibEntry(
entry_type="article",
citation_key="shared2024graph",
fields={"title": "Graph Topic Ranking", "abstract": "graph"},
),
BibEntry(
entry_type="article",
citation_key="crossref2024other",
fields={"title": "Less relevant paper"},
),
]
bootstrapper.resolver.search_datacite = lambda topic, limit=5: [] # type: ignore[method-assign]
bootstrapper.crossref_expander.expand_entry_references = lambda _store, _key: [] # type: ignore[method-assign]
bootstrapper.openalex_expander.expand_entry = lambda _store, _key, relation_type="cites", limit=5: [] # type: ignore[method-assign]
results = bootstrapper.bootstrap(store, topic="graph topic", expand=False, topic_limit=5)
topic_results = [item for item in results if item.origin == "topic"]
assert [item.citation_key for item in topic_results] == ["shared2024graph", "crossref2024other"]
assert topic_results[0].score > topic_results[1].score
finally:
store.close()
def test_bootstrap_preview_does_not_write_to_database():
store = BibliographyStore()
try:
bootstrapper = Bootstrapper()
from citegeist import BibEntry
bootstrapper.resolver.search_openalex = lambda topic, limit=5: [ # type: ignore[method-assign]
BibEntry(entry_type="article", citation_key="preview2024graph", fields={"title": "Preview Graph Topic"})
]
bootstrapper.resolver.search_crossref = lambda topic, limit=5: [] # type: ignore[method-assign]
bootstrapper.resolver.search_datacite = lambda topic, limit=5: [] # type: ignore[method-assign]
results = bootstrapper.bootstrap(store, topic="graph topic", expand=False, preview_only=True)
assert [item.citation_key for item in results] == ["preview2024graph"]
assert store.get_entry("preview2024graph") is None
finally:
store.close()
def test_bootstrap_topic_commit_limit_restricts_persisted_candidates():
store = BibliographyStore()
try:
bootstrapper = Bootstrapper()
from citegeist import BibEntry
bootstrapper.resolver.search_openalex = lambda topic, limit=5: [ # type: ignore[method-assign]
BibEntry(entry_type="article", citation_key="rank1", fields={"title": "Graph Topic One"}),
BibEntry(entry_type="article", citation_key="rank2", fields={"title": "Graph Topic Two"}),
]
bootstrapper.resolver.search_crossref = lambda topic, limit=5: [] # type: ignore[method-assign]
bootstrapper.resolver.search_datacite = lambda topic, limit=5: [] # type: ignore[method-assign]
bootstrapper.crossref_expander.expand_entry_references = lambda _store, _key: [] # type: ignore[method-assign]
bootstrapper.openalex_expander.expand_entry = lambda _store, _key, relation_type="cites", limit=5: [] # type: ignore[method-assign]
results = bootstrapper.bootstrap(
store,
topic="graph topic",
expand=False,
topic_limit=5,
topic_commit_limit=1,
)
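# Only the single highest-ranked topic candidate may be committed; the runner-up stays unpersisted.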
assert [item.citation_key for item in results if item.origin == "topic"] == ["rank1"]
assert store.get_entry("rank1") is not None
assert store.get_entry("rank2") is None
finally:
store.close()

View File

@ -119,7 +119,7 @@ def test_cli_resolve_updates_entry(tmp_path: Path):
citation_key="resolvedkey",
fields={
"author": "Smith, Jane",
"title": "Graph-first bibliography augmentation",
"title": "Resolved Graph-first bibliography augmentation",
"year": "2024",
"doi": "10.1000/example-doi",
"journal": "Journal of Graph Studies",
@ -138,6 +138,803 @@ def test_cli_resolve_updates_entry(tmp_path: Path):
)
assert exit_code == 0
show = run_cli(tmp_path, "show", "--conflicts", "smith2024graphs")
assert show.returncode == 0
payload = json.loads(show.stdout)
assert payload["field_conflicts"][0]["field_name"] == "title"
def test_cli_resolve_conflicts_updates_status(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{smith2024graphs,
author = {Smith, Jane},
title = {Graph-first bibliography augmentation},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.record_conflicts(
"smith2024graphs",
[
{
"field_name": "title",
"current_value": "Graph-first bibliography augmentation",
"proposed_value": "Resolved title",
}
],
source_type="resolver",
source_label="openalex:search:Graph-first bibliography augmentation",
)
finally:
store.close()
result = run_cli(tmp_path, "resolve-conflicts", "smith2024graphs", "title", "accepted")
assert result.returncode == 0
assert "accepted" in result.stdout
def test_cli_apply_conflict_updates_entry_value(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{smith2024graphs,
author = {Smith, Jane},
title = {Graph-first bibliography augmentation},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.record_conflicts(
"smith2024graphs",
[
{
"field_name": "title",
"current_value": "Graph-first bibliography augmentation",
"proposed_value": "Resolved Graph-first bibliography augmentation",
}
],
source_type="resolver",
source_label="openalex:search:Graph-first bibliography augmentation",
)
finally:
store.close()
result = run_cli(tmp_path, "apply-conflict", "smith2024graphs", "title")
assert result.returncode == 0
assert "applied" in result.stdout
show = run_cli(tmp_path, "show", "smith2024graphs")
payload = json.loads(show.stdout)
assert payload["title"] == "Resolved Graph-first bibliography augmentation"
def test_cli_discover_oai_outputs_identity_and_sets():
from unittest.mock import patch
from citegeist.harvest import OaiMetadataFormat, OaiSet
with patch("citegeist.cli.OaiPmhHarvester.identify") as mocked_identify, patch(
"citegeist.cli.OaiPmhHarvester.list_sets"
) as mocked_sets, patch("citegeist.cli.OaiPmhHarvester.list_metadata_formats") as mocked_formats:
mocked_identify.return_value = {
"repositoryName": "Example Repository",
"granularity": "YYYY-MM-DD",
}
mocked_formats.return_value = [
OaiMetadataFormat(
metadata_prefix="oai_dc",
schema="http://www.openarchives.org/OAI/2.0/oai_dc.xsd",
metadata_namespace="http://www.openarchives.org/OAI/2.0/oai_dc/",
)
]
mocked_sets.return_value = [
OaiSet(set_spec="theses", set_name="Theses", set_description="Graduate theses")
]
exit_code = main(["discover-oai", "https://example.edu/oai"])
assert exit_code == 0
def test_cli_bootstrap_preview_mode(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
with patch("citegeist.cli.Bootstrapper.bootstrap") as mocked_bootstrap:
mocked_bootstrap.return_value = []
exit_code = main(
[
"--db",
str(database),
"bootstrap",
"--topic",
"graph topic",
"--preview",
"--topic-commit-limit",
"2",
]
)
assert exit_code == 0
_, kwargs = mocked_bootstrap.call_args
assert kwargs["preview_only"] is True
assert kwargs["topic_commit_limit"] == 2
def test_cli_bootstrap_accepts_stored_topic_metadata(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
with patch("citegeist.cli.Bootstrapper.bootstrap") as mocked_bootstrap:
mocked_bootstrap.return_value = []
exit_code = main(
[
"--db",
str(database),
"bootstrap",
"--topic",
"graph topic",
"--topic-slug",
"graph-methods",
"--topic-name",
"Graph Methods",
"--store-topic-phrase",
"graph networks biology",
]
)
assert exit_code == 0
_, kwargs = mocked_bootstrap.call_args
assert kwargs["topic_slug"] == "graph-methods"
assert kwargs["topic_name"] == "Graph Methods"
assert kwargs["topic_phrase"] == "graph networks biology"
def test_cli_scrape_talkorigins_accepts_output_dir(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
with patch("citegeist.cli.TalkOriginsScraper.scrape_to_directory") as mocked_scrape:
from citegeist import TalkOriginsBatchExport
mocked_scrape.return_value = TalkOriginsBatchExport(
base_url="https://www.talkorigins.org/origins/biblio/",
output_dir=str(tmp_path),
topic_count=1,
entry_count=2,
jobs_path=str(tmp_path / "jobs.json"),
manifest_path=str(tmp_path / "manifest.json"),
seed_sets=[],
)
exit_code = main(
[
"--db",
str(database),
"scrape-talkorigins",
str(tmp_path / "talkorigins-out"),
"--limit-topics",
"3",
"--limit-entries-per-topic",
"10",
"--no-resume",
"--no-expand",
]
)
assert exit_code == 0
def test_cli_validate_talkorigins_accepts_manifest(tmp_path):
from unittest.mock import patch
manifest = tmp_path / "talkorigins_manifest.json"
manifest.write_text("{}", encoding="utf-8")
with patch("citegeist.cli.TalkOriginsScraper.validate_export") as mocked_validate:
from citegeist import TalkOriginsValidationReport
mocked_validate.return_value = TalkOriginsValidationReport(
manifest_path=str(manifest),
topic_count=1,
entry_count=2,
parsed_ratio=1.0,
missing_author_count=0,
missing_title_count=0,
missing_year_count=0,
suspicious_entry_type_count=0,
suspicious_examples=[],
duplicate_cluster_count=0,
duplicate_entry_count=0,
duplicate_examples=[],
)
exit_code = main(["validate-talkorigins", str(manifest)])
assert exit_code == 0
def test_cli_suggest_talkorigins_phrases_writes_output(tmp_path):
from unittest.mock import patch
manifest = tmp_path / "talkorigins_manifest.json"
manifest.write_text("{}", encoding="utf-8")
output = tmp_path / "phrases.json"
with patch("citegeist.cli.TalkOriginsScraper.suggest_topic_phrases") as mocked_suggest:
from citegeist import TalkOriginsTopicPhraseSuggestion
mocked_suggest.return_value = [
TalkOriginsTopicPhraseSuggestion(
slug="abiogenesis",
topic="Abiogenesis",
entry_count=2,
suggested_phrase="Abiogenesis prebiotic chemistry ribozyme",
keywords=["prebiotic", "chemistry", "ribozyme"],
review_required=True,
review_reasons=["small_topic"],
)
]
exit_code = main(
[
"suggest-talkorigins-phrases",
str(manifest),
"--topic",
"abiogenesis",
"--output",
str(output),
]
)
assert exit_code == 0
payload = json.loads(output.read_text(encoding="utf-8"))
assert payload[0]["slug"] == "abiogenesis"
def test_cli_duplicates_talkorigins_accepts_manifest(tmp_path):
from unittest.mock import patch
manifest = tmp_path / "talkorigins_manifest.json"
manifest.write_text("{}", encoding="utf-8")
with patch("citegeist.cli.TalkOriginsScraper.inspect_duplicate_clusters") as mocked_duplicates:
from citegeist.talkorigins import TalkOriginsDuplicateCluster
mocked_duplicates.return_value = [
TalkOriginsDuplicateCluster(
key="smith|1999|duplicate paper",
count=2,
items=[
{
"citation_key": "dup1",
"title": "Duplicate Paper",
"author": "Smith, Jane",
"year": "1999",
"seed_bib": "a.bib",
"topic": "Abiogenesis",
"topic_slug": "abiogenesis",
}
],
canonical={
"citation_key": "dup1",
"entry_type": "article",
"field_count": 3,
"fields": {"title": "Duplicate Paper"},
"weak_reasons": [],
},
)
]
exit_code = main(
[
"duplicates-talkorigins",
str(manifest),
"--topic",
"abiogenesis",
"--match",
"duplicate",
"--preview",
"--weak-only",
]
)
assert exit_code == 0
def test_cli_ingest_talkorigins_accepts_manifest(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
manifest = tmp_path / "talkorigins_manifest.json"
manifest.write_text("{}", encoding="utf-8")
with patch("citegeist.cli.TalkOriginsScraper.ingest_export") as mocked_ingest:
from citegeist import TalkOriginsIngestReport
mocked_ingest.return_value = TalkOriginsIngestReport(
manifest_path=str(manifest),
topic_count=1,
raw_entry_count=2,
stored_entry_count=1,
duplicate_cluster_count=1,
duplicate_entry_count=2,
canonicalized_count=1,
)
exit_code = main(["--db", str(database), "ingest-talkorigins", str(manifest)])
assert exit_code == 0
def test_cli_enrich_talkorigins_accepts_manifest(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
manifest = tmp_path / "talkorigins_manifest.json"
manifest.write_text("{}", encoding="utf-8")
with patch("citegeist.cli.TalkOriginsScraper.enrich_weak_canonicals") as mocked_enrich:
from citegeist.talkorigins import TalkOriginsEnrichmentResult
mocked_enrich.return_value = [
TalkOriginsEnrichmentResult(
key="smith|1999|duplicate paper",
citation_key="dup1",
weak_reasons_before=["missing:doi"],
resolved=True,
applied=False,
source_label="crossref:search:Duplicate Paper",
weak_reasons_after=[],
conflicts=[],
error="",
)
]
exit_code = main(
[
"--db",
str(database),
"enrich-talkorigins",
str(manifest),
"--limit",
"5",
"--apply",
"--allow-unsafe-search-matches",
]
)
assert exit_code == 0
def test_cli_review_talkorigins_writes_output(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
manifest = tmp_path / "talkorigins_manifest.json"
manifest.write_text("{}", encoding="utf-8")
output = tmp_path / "review.json"
with patch("citegeist.cli.TalkOriginsScraper.build_review_export") as mocked_review:
from citegeist.talkorigins import TalkOriginsReviewExport
mocked_review.return_value = TalkOriginsReviewExport(
manifest_path=str(manifest),
item_count=1,
items=[{"key": "smith|1999|duplicate paper", "canonical": {}, "enrichment": {}}],
)
exit_code = main(
[
"--db",
str(database),
"review-talkorigins",
str(manifest),
"--output",
str(output),
]
)
assert exit_code == 0
assert output.exists()
def test_cli_apply_talkorigins_corrections_accepts_files(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
manifest = tmp_path / "talkorigins_manifest.json"
corrections = tmp_path / "corrections.json"
manifest.write_text("{}", encoding="utf-8")
corrections.write_text('{"corrections": []}', encoding="utf-8")
with patch("citegeist.cli.TalkOriginsScraper.apply_review_corrections") as mocked_apply:
from citegeist.talkorigins import TalkOriginsCorrectionResult
mocked_apply.return_value = [
TalkOriginsCorrectionResult(
key="smith|1999|duplicate paper",
citation_key="dup1",
applied=True,
error="",
)
]
exit_code = main(
[
"--db",
str(database),
"apply-talkorigins-corrections",
str(manifest),
str(corrections),
]
)
assert exit_code == 0
def test_cli_topics_and_topic_entries(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
store.connection.commit()
finally:
store.close()
topics = run_cli(tmp_path, "topics")
assert topics.returncode == 0
topics_payload = json.loads(topics.stdout)
assert topics_payload[0]["slug"] == "graph-methods"
topic_entries = run_cli(tmp_path, "topic-entries", "graph-methods")
assert topic_entries.returncode == 0
topic_payload = json.loads(topic_entries.stdout)
assert topic_payload["topic"]["slug"] == "graph-methods"
assert topic_payload["entries"][0]["citation_key"] == "seed2024"
def test_cli_can_set_topic_phrase(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
store.connection.commit()
finally:
store.close()
result = run_cli(tmp_path, "set-topic-phrase", "graph-methods", "graph networks biology")
assert result.returncode == 0
payload = json.loads(result.stdout)
assert payload["expansion_phrase"] == "graph networks biology"
def test_cli_can_apply_topic_phrases_from_json(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
store.connection.commit()
finally:
store.close()
phrases_path = tmp_path / "phrases.json"
phrases_path.write_text(
json.dumps(
[
{
"slug": "graph-methods",
"suggested_phrase": "graph networks biology",
}
]
),
encoding="utf-8",
)
result = run_cli(tmp_path, "apply-topic-phrases", str(phrases_path))
assert result.returncode == 0
payload = json.loads(result.stdout)
assert payload[0]["applied"] is True
check = run_cli(tmp_path, "topics")
topics_payload = json.loads(check.stdout)
assert topics_payload[0]["expansion_phrase"] == "graph networks biology"
def test_cli_can_stage_topic_phrases_from_json(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
store.connection.commit()
finally:
store.close()
phrases_path = tmp_path / "phrases.json"
phrases_path.write_text(
json.dumps(
[
{
"slug": "graph-methods",
"suggested_phrase": "graph networks biology",
}
]
),
encoding="utf-8",
)
result = run_cli(tmp_path, "stage-topic-phrases", str(phrases_path))
assert result.returncode == 0
payload = json.loads(result.stdout)
assert payload[0]["staged"] is True
assert payload[0]["phrase_review_status"] == "pending"
check = run_cli(tmp_path, "topics")
topics_payload = json.loads(check.stdout)
assert topics_payload[0]["suggested_phrase"] == "graph networks biology"
assert topics_payload[0]["expansion_phrase"] is None
assert topics_payload[0]["phrase_review_status"] == "pending"
def test_cli_can_review_topic_phrase(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
store.stage_topic_phrase_suggestion("graph-methods", "graph networks biology")
finally:
store.close()
result = run_cli(
tmp_path,
"review-topic-phrase",
"graph-methods",
"accepted",
"--notes",
"curated and approved",
)
assert result.returncode == 0
payload = json.loads(result.stdout)
assert payload["suggested_phrase"] == "graph networks biology"
assert payload["expansion_phrase"] == "graph networks biology"
assert payload["phrase_review_status"] == "accepted"
assert payload["phrase_review_notes"] == "curated and approved"
def test_cli_topics_can_filter_by_phrase_review_status(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
store.ensure_topic("abiogenesis", "Abiogenesis")
store.stage_topic_phrase_suggestion("graph-methods", "graph networks biology")
store.stage_topic_phrase_suggestion("abiogenesis", "abiogenesis life origin")
store.review_topic_phrase_suggestion("abiogenesis", "accepted")
finally:
store.close()
result = run_cli(tmp_path, "topics", "--phrase-review-status", "pending")
assert result.returncode == 0
payload = json.loads(result.stdout)
assert [topic["slug"] for topic in payload] == ["graph-methods"]
def test_cli_export_topic(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
store.connection.commit()
finally:
store.close()
export_path = tmp_path / "graph-methods.bib"
result = run_cli(tmp_path, "export-topic", "graph-methods", "--output", str(export_path))
assert result.returncode == 0
exported = export_path.read_text(encoding="utf-8")
assert "@article{seed2024," in exported
def test_cli_search_can_filter_by_topic(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Graph Methods for Biology},
year = {2024},
abstract = {A graph methods paper.}
}
@article{other2023,
author = {Other, Bob},
title = {Graph Methods for Chemistry},
year = {2023},
abstract = {Another graph methods paper.}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.storage import BibliographyStore
database = tmp_path / "library.sqlite3"
store = BibliographyStore(database)
try:
store.add_entry_topic(
"seed2024",
topic_slug="biology",
topic_name="Biology",
source_type="talkorigins",
source_url="https://example.org/topics/biology",
source_label="topic-seed",
)
store.add_entry_topic(
"other2023",
topic_slug="chemistry",
topic_name="Chemistry",
source_type="talkorigins",
source_url="https://example.org/topics/chemistry",
source_label="topic-seed",
)
store.connection.commit()
finally:
store.close()
search = run_cli(tmp_path, "search", "graph", "--topic", "biology")
assert search.returncode == 0
assert "seed2024" in search.stdout
assert "other2023" not in search.stdout
def test_cli_graph_outputs_missing_targets(tmp_path: Path):
@ -239,3 +1036,43 @@ def test_cli_expand_with_mocked_openalex(tmp_path: Path):
)
assert exit_code == 0
def test_cli_expand_topic_with_mocked_expander(tmp_path: Path):
from citegeist.expand import TopicExpansionResult
with patch("citegeist.cli.TopicExpander.expand_topic") as mocked_expand:
mocked_expand.return_value = [
TopicExpansionResult(
topic_slug="abiogenesis",
source_citation_key="seed2024",
discovered_citation_key="discovered1",
discovered_title="Abiogenesis origin chemistry",
created_entry=True,
relation_type="cites",
source_label="openalex:cites:seed2024",
relevance_score=0.67,
meets_relevance_threshold=True,
assigned_to_topic=True,
)
]
database = tmp_path / "library.sqlite3"
exit_code = main(
[
"--db",
str(database),
"expand-topic",
"abiogenesis",
"--topic-phrase",
"abiogenesis origin chemistry",
"--seed-key",
"seed2024",
"--min-relevance",
"0.3",
"--preview",
]
)
assert exit_code == 0
_, kwargs = mocked_expand.call_args
assert kwargs["preview_only"] is True

293
tests/test_harvest.py Normal file
View File

@ -0,0 +1,293 @@
from citegeist import OaiPmhHarvester, parse_bibtex
from citegeist.cli import main
OAI_XML = """<?xml version="1.0" encoding="UTF-8"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/"
xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/"
xmlns:dc="http://purl.org/dc/elements/1.1/">
<ListRecords>
<record>
<header>
<identifier>oai:example.edu:123</identifier>
</header>
<metadata>
<oai_dc:dc>
<dc:title>Thesis Metadata Harvesting</dc:title>
<dc:creator>Doe, Jane</dc:creator>
<dc:date>2023-05-01</dc:date>
<dc:description>A dissertation about repository harvesting.</dc:description>
<dc:identifier>https://example.edu/items/123</dc:identifier>
<dc:publisher>Example University</dc:publisher>
<dc:type>Text</dc:type>
<dc:type>Dissertation</dc:type>
</oai_dc:dc>
</metadata>
</record>
</ListRecords>
</OAI-PMH>
"""
OAI_XML_PAGE_1 = """<?xml version="1.0" encoding="UTF-8"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/"
xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/"
xmlns:dc="http://purl.org/dc/elements/1.1/">
<ListRecords>
<record>
<header>
<identifier>oai:example.edu:123</identifier>
</header>
<metadata>
<oai_dc:dc>
<dc:title>First Harvested Thesis</dc:title>
<dc:creator>Doe, Jane</dc:creator>
<dc:date>2023-05-01</dc:date>
<dc:type>Dissertation</dc:type>
</oai_dc:dc>
</metadata>
</record>
<resumptionToken>TOKEN123</resumptionToken>
</ListRecords>
</OAI-PMH>
"""
OAI_XML_PAGE_2 = """<?xml version="1.0" encoding="UTF-8"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/"
xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/"
xmlns:dc="http://purl.org/dc/elements/1.1/">
<ListRecords>
<record>
<header>
<identifier>oai:example.edu:456</identifier>
</header>
<metadata>
<oai_dc:dc>
<dc:title>Second Harvested Thesis</dc:title>
<dc:creator>Smith, John</dc:creator>
<dc:date>2022-05-01</dc:date>
<dc:type>Dissertation</dc:type>
</oai_dc:dc>
</metadata>
</record>
</ListRecords>
</OAI-PMH>
"""
OAI_IDENTIFY_XML = """<?xml version="1.0" encoding="UTF-8"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/">
<Identify>
<repositoryName>Example Repository</repositoryName>
<baseURL>https://example.edu/oai</baseURL>
<protocolVersion>2.0</protocolVersion>
<adminEmail>repo@example.edu</adminEmail>
<earliestDatestamp>2001-01-01</earliestDatestamp>
<deletedRecord>persistent</deletedRecord>
<granularity>YYYY-MM-DD</granularity>
</Identify>
</OAI-PMH>
"""
OAI_LISTSETS_XML = """<?xml version="1.0" encoding="UTF-8"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/">
<ListSets>
<set>
<setSpec>theses</setSpec>
<setName>Theses and Dissertations</setName>
<setDescription>
<description>This set contains graduate theses.</description>
</setDescription>
</set>
</ListSets>
</OAI-PMH>
"""
OAI_METADATA_FORMATS_XML = """<?xml version="1.0" encoding="UTF-8"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/">
<ListMetadataFormats>
<metadataFormat>
<metadataPrefix>oai_dc</metadataPrefix>
<schema>http://www.openarchives.org/OAI/2.0/oai_dc.xsd</schema>
<metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
</metadataFormat>
<metadataFormat>
<metadataPrefix>mods</metadataPrefix>
<schema>http://www.loc.gov/standards/mods/v3/mods-3-7.xsd</schema>
<metadataNamespace>http://www.loc.gov/mods/v3</metadataNamespace>
</metadataFormat>
</ListMetadataFormats>
</OAI-PMH>
"""
OAI_MODS_XML = """<?xml version="1.0" encoding="UTF-8"?>
<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/"
xmlns:mods="http://www.loc.gov/mods/v3">
<ListRecords>
<record>
<header>
<identifier>oai:example.edu:mods123</identifier>
</header>
<metadata>
<mods:mods>
<mods:titleInfo>
<mods:title>MODS Thesis Title</mods:title>
</mods:titleInfo>
<mods:name>
<mods:namePart>Doe</mods:namePart>
<mods:namePart>Jane</mods:namePart>
<mods:role>
<mods:roleTerm>author</mods:roleTerm>
</mods:role>
</mods:name>
<mods:originInfo>
<mods:publisher>Example University</mods:publisher>
<mods:dateIssued>2022</mods:dateIssued>
</mods:originInfo>
<mods:genre>dissertation</mods:genre>
<mods:abstract>MODS abstract text.</mods:abstract>
<mods:location>
<mods:url>https://example.edu/mods123</mods:url>
</mods:location>
</mods:mods>
</metadata>
</record>
</ListRecords>
</OAI-PMH>
"""
def test_oai_harvester_maps_dublin_core_to_bibentry():
harvester = OaiPmhHarvester()
from xml.etree import ElementTree as ET
harvester.source_client.get_xml = lambda _url: ET.fromstring(OAI_XML) # type: ignore[method-assign]
results = harvester.list_records("https://example.edu/oai")
assert len(results) == 1
entry = results[0].entry
assert entry.entry_type == "phdthesis"
assert entry.fields["title"] == "Thesis Metadata Harvesting"
assert entry.fields["author"] == "Doe, Jane"
assert entry.fields["oai"] == "oai:example.edu:123"
def test_oai_harvester_follows_resumption_tokens():
harvester = OaiPmhHarvester()
from xml.etree import ElementTree as ET
payloads = iter([ET.fromstring(OAI_XML_PAGE_1), ET.fromstring(OAI_XML_PAGE_2)])
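# Each get_xml call consumes the next page, standing in for resumptionToken paging.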
harvester.source_client.get_xml = lambda _url: next(payloads) # type: ignore[method-assign]
results = harvester.list_records("https://example.edu/oai")
assert [result.identifier for result in results] == [
"oai:example.edu:123",
"oai:example.edu:456",
]
assert [result.entry.citation_key for result in results] == [
"doe2023first1",
"smith2022second2",
]
def test_oai_harvester_passes_date_filters():
harvester = OaiPmhHarvester()
seen_urls: list[str] = []
from xml.etree import ElementTree as ET
def fake_get_xml(url: str):
seen_urls.append(url)
return ET.fromstring(OAI_XML)
harvester.source_client.get_xml = fake_get_xml # type: ignore[method-assign]
harvester.list_records(
"https://example.edu/oai",
date_from="2023-01-01",
date_until="2023-12-31",
limit=1,
)
assert "from=2023-01-01" in seen_urls[0]
assert "until=2023-12-31" in seen_urls[0]
def test_oai_harvester_maps_mods_records():
harvester = OaiPmhHarvester()
from xml.etree import ElementTree as ET
harvester.source_client.get_xml = lambda _url: ET.fromstring(OAI_MODS_XML) # type: ignore[method-assign]
results = harvester.list_records("https://example.edu/oai", metadata_prefix="mods")
assert len(results) == 1
entry = results[0].entry
assert entry.entry_type == "phdthesis"
assert entry.fields["title"] == "MODS Thesis Title"
assert entry.fields["author"] == "Doe, Jane"
assert entry.fields["publisher"] == "Example University"
assert entry.fields["abstract"] == "MODS abstract text."
def test_oai_harvester_can_identify_repository_and_list_sets():
harvester = OaiPmhHarvester()
from xml.etree import ElementTree as ET
payloads = iter(
[ET.fromstring(OAI_IDENTIFY_XML), ET.fromstring(OAI_LISTSETS_XML), ET.fromstring(OAI_METADATA_FORMATS_XML)]
)
harvester.source_client.get_xml = lambda _url: next(payloads) # type: ignore[method-assign]
identify = harvester.identify("https://example.edu/oai")
sets = harvester.list_sets("https://example.edu/oai")
formats = harvester.list_metadata_formats("https://example.edu/oai")
assert identify["repositoryName"] == "Example Repository"
assert identify["granularity"] == "YYYY-MM-DD"
assert sets[0].set_spec == "theses"
assert sets[0].set_name == "Theses and Dissertations"
assert "graduate theses" in sets[0].set_description
assert [item.metadata_prefix for item in formats] == ["oai_dc", "mods"]
def test_harvest_oai_cli_ingests_records(tmp_path):
from unittest.mock import patch
database = tmp_path / "library.sqlite3"
harvester = OaiPmhHarvester()
from xml.etree import ElementTree as ET
harvester.source_client.get_xml = lambda _url: ET.fromstring(OAI_XML) # type: ignore[method-assign]
harvested = harvester.list_records("https://example.edu/oai")
with patch("citegeist.cli.OaiPmhHarvester.list_records") as mocked_list:
mocked_list.return_value = harvested
exit_code = main(
[
"--db",
str(database),
"harvest-oai",
"https://example.edu/oai",
"--metadata-prefix",
"oai_dc",
"--from",
"2023-01-01",
"--until",
"2023-12-31",
"--limit",
"5",
]
)
assert exit_code == 0
from citegeist.storage import BibliographyStore
store = BibliographyStore(database)
try:
entry = store.list_entries(limit=10)[0]
assert entry["citation_key"] == "doe2023thesis1"
bibtex = store.get_entry_bibtex("doe2023thesis1")
parsed = parse_bibtex(bibtex or "")
assert parsed[0].fields["oai"] == "oai:example.edu:123"
finally:
store.close()

View File

@ -1,11 +1,13 @@
from xml.etree import ElementTree as ET
from citegeist.bibtex import BibEntry
from citegeist.bibtex import BibEntry, render_bibtex
from citegeist.resolve import (
MetadataResolver,
_arxiv_atom_entry_to_bib,
_crossref_message_to_entry,
_datacite_work_to_entry,
_openalex_work_to_entry,
merge_entries_with_conflicts,
merge_entries,
)
@ -65,6 +67,31 @@ def test_merge_entries_prefers_existing_values_and_adds_missing_fields():
assert merged.fields["journal"] == "Journal of Graph Studies"
def test_merge_entries_with_conflicts_records_disagreements():
base = BibEntry(
entry_type="article",
citation_key="smith2024graphs",
fields={"title": "Existing Title", "journal": "Current Journal"},
)
resolved = BibEntry(
entry_type="article",
citation_key="resolved",
fields={"title": "Resolved Title", "journal": "Current Journal", "year": "2024"},
)
merged, conflicts = merge_entries_with_conflicts(base, resolved)
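# Existing field values win the merge; disagreements are reported as conflicts instead of being overwritten.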
assert merged.fields["title"] == "Existing Title"
assert merged.fields["year"] == "2024"
assert conflicts == [
{
"field_name": "title",
"current_value": "Existing Title",
"proposed_value": "Resolved Title",
}
]
def test_resolver_tries_doi_before_dblp():
resolver = MetadataResolver()
calls: list[tuple[str, str]] = []
@ -77,7 +104,12 @@ def test_resolver_tries_doi_before_dblp():
calls.append(("dblp", value))
return None
def fake_datacite(value: str):
calls.append(("datacite", value))
return None
resolver.resolve_doi = fake_doi # type: ignore[method-assign]
resolver.resolve_datacite_doi = fake_datacite # type: ignore[method-assign]
resolver.resolve_dblp = fake_dblp # type: ignore[method-assign]
resolver.resolve_entry(
@ -88,7 +120,11 @@ def test_resolver_tries_doi_before_dblp():
)
)
assert calls == [("doi", "10.1000/example-doi"), ("dblp", "conf/test/Smith24")]
assert calls == [
("doi", "10.1000/example-doi"),
("datacite", "10.1000/example-doi"),
("dblp", "conf/test/Smith24"),
]
def test_openalex_work_to_entry_maps_basic_fields():
@ -131,6 +167,8 @@ def test_resolver_can_resolve_openalex_id():
def test_resolver_falls_back_to_openalex_title_search():
resolver = MetadataResolver()
resolver.search_datacite = lambda title, limit=5: [] # type: ignore[method-assign]
resolver.search_crossref = lambda title, limit=5: [] # type: ignore[method-assign]
resolver.search_openalex = lambda title, limit=5: [ # type: ignore[method-assign]
_openalex_work_to_entry(
{
@ -154,3 +192,212 @@ def test_resolver_falls_back_to_openalex_title_search():
assert resolution is not None
assert resolution.source_label == "openalex:search:OpenAlex Resolved Work"
assert resolution.entry.fields["openalex"] == "W12345"
def test_resolver_prefers_exact_crossref_title_match_before_datacite():
resolver = MetadataResolver()
resolver.search_crossref = lambda title, limit=5: [ # type: ignore[method-assign]
_crossref_message_to_entry(
{
"type": "journal-article",
"title": [title],
"DOI": "10.1126/science.1090005",
"container-title": ["Science"],
"author": [
{"family": "King", "given": "Mary-Claire"},
{"family": "Wilson", "given": "A. C."},
],
"issued": {"date-parts": [[1975, 4, 11]]},
}
)
]
resolver.search_datacite = lambda title, limit=5: [ # type: ignore[method-assign]
_datacite_work_to_entry(
{
"attributes": {
"doi": "10.5061/dryad.v6wwpzh17",
"titles": [
{
"title": "Conserved patterns and locomotor-related evolutionary constraints in the hominoid vertebral column"
}
],
"creators": [
{"familyName": "Villamil", "givenName": "Catalina I."},
{"familyName": "Middleton", "givenName": "Emily R."},
],
"publicationYear": 2024,
"types": {"resourceTypeGeneral": "Dataset"},
}
}
)
]
resolution = resolver.resolve_entry(
BibEntry(
entry_type="article",
citation_key="king1975evolution2",
fields={
"title": "Evolution at two levels in humans and chimpanzees",
"author": "King, M. C. and Wilson, A. C.",
"year": "1975",
},
)
)
assert resolution is not None
assert resolution.source_label == "crossref:search:Evolution at two levels in humans and chimpanzees"
assert resolution.entry.fields["doi"] == "10.1126/science.1090005"
def test_resolver_rejects_mismatched_title_search_candidates():
resolver = MetadataResolver()
resolver.search_crossref = lambda title, limit=5: [] # type: ignore[method-assign]
resolver.search_datacite = lambda title, limit=5: [ # type: ignore[method-assign]
_datacite_work_to_entry(
{
"attributes": {
"doi": "10.5061/dryad.v6wwpzh17",
"titles": [
{
"title": "Conserved patterns and locomotor-related evolutionary constraints in the hominoid vertebral column"
}
],
"creators": [
{"familyName": "Villamil", "givenName": "Catalina I."},
],
"publicationYear": 2024,
"types": {"resourceTypeGeneral": "Dataset"},
}
}
)
]
resolver.search_openalex = lambda title, limit=5: [ # type: ignore[method-assign]
_openalex_work_to_entry(
{
"id": "https://openalex.org/W2033360601",
"display_name": "Immunological relatedness of glucose 6-phosphate dehydrogenases from vertebrate and invertebrate species.",
"publication_year": 1978,
"type": "article",
"authorships": [
{"author": {"display_name": "Yoshikazu Sado"}},
{"author": {"display_name": "Samuel H. Hori"}},
],
"doi": "https://doi.org/10.1266/jjg.53.91",
}
)
]
resolution = resolver.resolve_entry(
BibEntry(
entry_type="article",
citation_key="sarich1967immunological1",
fields={
"title": "Immunological Time Scale for Homonid Evolution",
"author": "Sarich, V. and Wilson, A.",
"year": "1967",
},
)
)
assert resolution is None
def test_datacite_work_to_entry_maps_basic_fields():
entry = _datacite_work_to_entry(
{
"attributes": {
"doi": "10.1000/datacite-example",
"titles": [{"title": "Repository Dissertation Record"}],
"creators": [{"familyName": "Doe", "givenName": "Jane"}],
"publicationYear": 2021,
"publisher": "Example University",
"url": "https://example.edu/record/123",
"types": {"resourceTypeGeneral": "Dissertation"},
"descriptions": [
{
"descriptionType": "Abstract",
"description": "An abstract from DataCite.",
}
],
}
}
)
assert entry.entry_type == "phdthesis"
assert entry.fields["doi"] == "10.1000/datacite-example"
assert entry.fields["author"] == "Doe, Jane"
assert entry.fields["publisher"] == "Example University"
assert entry.fields["abstract"] == "An abstract from DataCite."
def test_resolver_can_resolve_datacite_doi():
resolver = MetadataResolver()
resolver.source_client.get_json = lambda _url: { # type: ignore[method-assign]
"data": {
"attributes": {
"doi": "10.1000/datacite-example",
"titles": [{"title": "Repository Dissertation Record"}],
"creators": [{"familyName": "Doe", "givenName": "Jane"}],
"publicationYear": 2021,
"types": {"resourceTypeGeneral": "Dissertation"},
}
}
}
resolution = resolver.resolve_datacite_doi("10.1000/datacite-example")
assert resolution is not None
assert resolution.source_label == "datacite:doi:10.1000/datacite-example"
assert resolution.entry.entry_type == "phdthesis"
def test_resolver_can_fall_back_to_datacite_title_search():
resolver = MetadataResolver()
resolver.search_crossref = lambda title, limit=5: [] # type: ignore[method-assign]
resolver.search_datacite = lambda title, limit=5: [ # type: ignore[method-assign]
_datacite_work_to_entry(
{
"attributes": {
"doi": "10.1000/datacite-example",
"titles": [{"title": title}],
"creators": [{"familyName": "Doe", "givenName": "Jane"}],
"publicationYear": 2021,
"types": {"resourceTypeGeneral": "Dissertation"},
}
}
)
]
resolver.search_openalex = lambda title, limit=5: [] # type: ignore[method-assign]
resolution = resolver.resolve_entry(
BibEntry(
entry_type="misc",
citation_key="draft1",
fields={"title": "Repository Dissertation Record", "author": "Doe, Jane", "year": "2021"},
)
)
assert resolution is not None
assert resolution.source_label == "datacite:search:Repository Dissertation Record"
assert resolution.entry.fields["doi"] == "10.1000/datacite-example"
def test_render_bibtex_tolerates_unmatched_braces_in_field_values():
rendered = render_bibtex(
[
BibEntry(
entry_type="misc",
citation_key="broken2026",
fields={
"author": "Broken, Example",
"title": "Unmatched { braces } example } tail",
"year": "2026",
"note": "Open { brace only",
},
)
]
)
assert "@misc{broken2026," in rendered
assert "Unmatched { braces } example ) tail" in rendered
assert "Open ( brace only" in rendered

View File

@ -28,3 +28,14 @@ def test_source_client_writes_cache_after_fetch(tmp_path: Path):
assert payload["ok"] is True
assert any(cache_dir.iterdir())
def test_source_client_falls_back_to_latin1_for_text(tmp_path: Path):
client = SourceClient(cache_dir=tmp_path / "cache")
url = "https://example.org/latin1"
client._fetch_bytes = lambda _url: "café".encode("iso-8859-1") # type: ignore[method-assign]
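# "café" encoded as ISO-8859-1 is not valid UTF-8, so get_text must fall back to Latin-1.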
payload = client.get_text(url)
assert payload == "café"

View File

@ -130,3 +130,250 @@ def test_store_traverses_graph_and_surfaces_missing_targets():
assert rows[2]["depth"] == 2
finally:
store.close()
def test_store_records_and_updates_field_conflicts():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
"""
)
ok = store.record_conflicts(
"seed2024",
[
{
"field_name": "title",
"current_value": "Seed Paper",
"proposed_value": "Resolved Seed Paper",
}
],
source_type="resolver",
source_label="crossref:doi:10.1000/seed",
)
assert ok is True
conflicts = store.get_field_conflicts("seed2024")
assert conflicts[0]["field_name"] == "title"
assert conflicts[0]["status"] == "open"
assert store.set_conflict_status("seed2024", "title", "accepted") == 1
updated = store.get_field_conflicts("seed2024", status="accepted")
assert len(updated) == 1
finally:
store.close()
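# apply_conflict_value should write the proposed value into the entry and mark the conflict accepted.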
def test_store_can_apply_latest_conflict_value():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
"""
)
store.record_conflicts(
"seed2024",
[
{
"field_name": "title",
"current_value": "Seed Paper",
"proposed_value": "Resolved Seed Paper",
}
],
source_type="resolver",
source_label="crossref:doi:10.1000/seed",
)
assert store.apply_conflict_value("seed2024", "title") is True
entry = store.get_entry("seed2024")
assert entry is not None
assert entry["title"] == "Resolved Seed Paper"
accepted = store.get_field_conflicts("seed2024", status="accepted")
assert len(accepted) == 1
finally:
store.close()
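# Entries can belong to multiple topics; topic listings should report per-topic entry counts and membership.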
def test_store_supports_entry_topic_membership():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
"""
)
assert store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
) is True
assert store.add_entry_topic(
"seed2024",
topic_slug="semantic-search",
topic_name="Semantic Search",
source_type="talkorigins",
source_url="https://example.org/topics/semantic-search",
source_label="topic-seed",
) is True
entry = store.get_entry("seed2024")
assert entry is not None
assert [topic["slug"] for topic in entry["topics"]] == ["graph-methods", "semantic-search"]
topics = store.list_topics()
assert [topic["slug"] for topic in topics] == ["graph-methods", "semantic-search"]
assert topics[0]["entry_count"] == 1
topic = store.get_topic("graph-methods")
assert topic is not None
assert topic["name"] == "Graph Methods"
assert topic["expansion_phrase"] is None
topic_entries = store.list_topic_entries("graph-methods")
assert topic_entries[0]["citation_key"] == "seed2024"
finally:
store.close()
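# Setting an expansion phrase directly should leave its review status as "unreviewed".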
def test_store_can_set_topic_expansion_phrase():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024}
}
"""
)
store.add_entry_topic(
"seed2024",
topic_slug="graph-methods",
topic_name="Graph Methods",
source_type="talkorigins",
source_url="https://example.org/topics/graph-methods",
source_label="topic-seed",
)
assert store.set_topic_expansion_phrase("graph-methods", "graph networks biology") is True
topic = store.get_topic("graph-methods")
assert topic is not None
assert topic["expansion_phrase"] == "graph networks biology"
assert topic["phrase_review_status"] == "unreviewed"
topics = store.list_topics()
assert topics[0]["expansion_phrase"] == "graph networks biology"
finally:
store.close()
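# Staged phrase suggestions stay pending, and separate from the active phrase, until reviewed; accepting should promote the suggestion.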
def test_store_can_stage_and_review_topic_phrase_suggestion():
store = BibliographyStore()
try:
store.ensure_topic("graph-methods", "Graph Methods")
assert store.stage_topic_phrase_suggestion(
"graph-methods",
"graph networks biology",
review_notes="generated from local titles",
) is True
staged = store.get_topic("graph-methods")
assert staged is not None
assert staged["suggested_phrase"] == "graph networks biology"
assert staged["expansion_phrase"] is None
assert staged["phrase_review_status"] == "pending"
assert staged["phrase_review_notes"] == "generated from local titles"
assert store.review_topic_phrase_suggestion(
"graph-methods",
"accepted",
review_notes="looks good",
) is True
reviewed = store.get_topic("graph-methods")
assert reviewed is not None
assert reviewed["suggested_phrase"] == "graph networks biology"
assert reviewed["expansion_phrase"] == "graph networks biology"
assert reviewed["phrase_review_status"] == "accepted"
assert reviewed["phrase_review_notes"] == "looks good"
finally:
store.close()
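# list_topics should filter on phrase_review_status so pending and accepted suggestions can be triaged separately.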
def test_store_can_filter_topics_by_phrase_review_status():
store = BibliographyStore()
try:
store.ensure_topic("graph-methods", "Graph Methods")
store.ensure_topic("abiogenesis", "Abiogenesis")
store.stage_topic_phrase_suggestion("graph-methods", "graph networks biology")
store.stage_topic_phrase_suggestion("abiogenesis", "abiogenesis life origin")
store.review_topic_phrase_suggestion("abiogenesis", "accepted")
pending_topics = store.list_topics(phrase_review_status="pending")
accepted_topics = store.list_topics(phrase_review_status="accepted")
assert [topic["slug"] for topic in pending_topics] == ["graph-methods"]
assert [topic["slug"] for topic in accepted_topics] == ["abiogenesis"]
finally:
store.close()
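# Full-text search should honor a topic_slug filter and exclude matching entries outside the topic.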
def test_store_search_text_can_filter_by_topic():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Graph Methods for Biology},
year = {2024},
abstract = {A graph methods paper.}
}
@article{other2023,
author = {Other, Bob},
title = {Graph Methods for Chemistry},
year = {2023},
abstract = {Another graph methods paper.}
}
"""
)
store.add_entry_topic(
"seed2024",
topic_slug="biology",
topic_name="Biology",
source_type="talkorigins",
source_url="https://example.org/topics/biology",
source_label="topic-seed",
)
store.add_entry_topic(
"other2023",
topic_slug="chemistry",
topic_name="Chemistry",
source_type="talkorigins",
source_url="https://example.org/topics/chemistry",
source_label="topic-seed",
)
store.connection.commit()
results = store.search_text("graph", topic_slug="biology")
assert [row["citation_key"] for row in results] == ["seed2024"]
finally:
store.close()

1024
tests/test_talkorigins.py Normal file

File diff suppressed because it is too large

242
tests/test_topic_expand.py Normal file
View File

@@ -0,0 +1,242 @@
from citegeist.bibtex import BibEntry
from citegeist.expand import (
ExpansionResult,
TopicExpander,
_meets_topic_assignment_threshold,
_topic_relevance_score,
)
from citegeist.storage import BibliographyStore
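# Minimal stand-in for the OpenAlex expander: returns canned ExpansionResults, optionally keyed by seed citation key.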
class FakeOpenAlexExpander:
def __init__(self, results: list[ExpansionResult] | dict[str, list[ExpansionResult]]) -> None:
self.results = results
def expand_entry(self, store, citation_key, relation_type="cites", limit=25):
if isinstance(self.results, dict):
return list(self.results.get(citation_key, []))
return list(self.results)
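# Discoveries scoring at or above min_relevance against the topic phrase should be assigned back to the topic; off-topic ones should not.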
def test_topic_expander_assigns_relevant_discoveries_back_to_topic():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Abiogenesis Seed Paper},
year = {2024}
}
"""
)
store.add_entry_topic(
"seed2024",
topic_slug="abiogenesis",
topic_name="Abiogenesis",
source_type="talkorigins",
source_url="https://example.org/topics/abiogenesis",
source_label="seed",
)
store.upsert_entry(
BibEntry(
entry_type="article",
citation_key="discovered1",
fields={
"title": "Abiogenesis and origin chemistry",
"abstract": "A study of abiogenesis pathways.",
"year": "2025",
},
),
source_type="graph_expand",
source_label="test",
review_status="draft",
)
store.upsert_entry(
BibEntry(
entry_type="article",
citation_key="discovered2",
fields={
"title": "Galaxy formation dynamics",
"abstract": "Nothing about the topic.",
"year": "2025",
},
),
source_type="graph_expand",
source_label="test",
review_status="draft",
)
store.connection.commit()
expander = TopicExpander(
openalex_expander=FakeOpenAlexExpander(
[
ExpansionResult("seed2024", "discovered1", False, "cites", "openalex:cites:seed2024"),
ExpansionResult("seed2024", "discovered2", False, "cites", "openalex:cites:seed2024"),
]
)
)
results = expander.expand_topic(
store,
"abiogenesis",
topic_phrase="abiogenesis origin chemistry",
min_relevance=0.34,
)
assert len(results) == 2
assigned = {item.discovered_citation_key: item.assigned_to_topic for item in results}
assert assigned["discovered1"] is True
assert assigned["discovered2"] is False
topics = store.get_entry_topics("discovered1")
assert topics[0]["slug"] == "abiogenesis"
assert store.get_entry_topics("discovered2") == []
finally:
store.close()
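# seed_keys should restrict expansion to the listed seeds, so edges hanging off other topic members are ignored.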
def test_topic_expander_can_restrict_to_allowed_seed_keys():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Abiogenesis Seed Paper},
year = {2024}
}
@article{seed2023,
author = {Seed, Bob},
title = {Abiogenesis Historical Seed},
year = {2023}
}
"""
)
for citation_key in ("seed2024", "seed2023"):
store.add_entry_topic(
citation_key,
topic_slug="abiogenesis",
topic_name="Abiogenesis",
source_type="talkorigins",
source_url="https://example.org/topics/abiogenesis",
source_label="seed",
)
store.upsert_entry(
BibEntry(
entry_type="article",
citation_key="discovered1",
fields={
"title": "Abiogenesis origin chemistry",
"abstract": "A study of abiogenesis chemistry.",
"year": "2025",
},
),
source_type="graph_expand",
source_label="test",
review_status="draft",
)
store.connection.commit()
expander = TopicExpander(
openalex_expander=FakeOpenAlexExpander(
{"seed2023": [ExpansionResult("seed2023", "discovered1", False, "cites", "openalex:cites:seed2023")]}
)
)
results = expander.expand_topic(
store,
"abiogenesis",
topic_phrase="abiogenesis origin chemistry",
seed_keys=["seed2024"],
)
assert results == []
assert store.get_entry_topics("discovered1") == []
finally:
store.close()
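# preview_only should report would-be discoveries without persisting entries or topic assignments.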
def test_topic_expander_preview_discovers_without_writing():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Abiogenesis Seed Paper},
year = {2024}
}
"""
)
store.add_entry_topic(
"seed2024",
topic_slug="abiogenesis",
topic_name="Abiogenesis",
source_type="talkorigins",
source_url="https://example.org/topics/abiogenesis",
source_label="seed",
)
store.connection.commit()
expander = TopicExpander()
expander._preview_discoveries = lambda *_args, **_kwargs: [ # type: ignore[method-assign]
(
ExpansionResult(
"seed2024",
"preview1",
True,
"cites",
"openalex:cites:seed2024",
),
{
"title": "Abiogenesis origin chemistry",
"abstract": "A study of abiogenesis chemistry.",
"year": "2025",
},
)
]
results = expander.expand_topic(
store,
"abiogenesis",
topic_phrase="abiogenesis origin chemistry",
min_relevance=0.3,
preview_only=True,
)
assert len(results) == 1
assert results[0].discovered_citation_key == "preview1"
assert results[0].meets_relevance_threshold is True
assert results[0].assigned_to_topic is False
assert results[0].created_entry is True
assert store.get_entry("preview1") is None
assert store.get_entry_topics("preview1") == []
finally:
store.close()
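# The scorer is expected to expand "human evolution" into related terms (hominids, fossil, primate ancestry), so this record should score at least 0.15.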
def test_topic_relevance_score_expands_human_evolution_terms():
score = _topic_relevance_score(
"human evolution",
{
"title": "Body size and proportions in early hominids",
"abstract": "A fossil and paleolithic perspective on primate ancestry.",
"journal": "Science",
},
)
assert score >= 0.15
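# A sufficient relevance score alone should not trigger assignment: the title here carries no human-evolution anchor term, so the threshold check must reject it.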
def test_topic_assignment_requires_title_anchor():
entry = {
"title": "Phylogenies and the Comparative Method",
"abstract": "A comparative framework for primate and hominid evolution.",
"journal": "Systematic Zoology",
}
score = _topic_relevance_score("human evolution", entry)
assert score >= 0.15
assert _meets_topic_assignment_threshold("human evolution", entry, min_relevance=0.15, relevance_score=score) is False