Add citation graph expansion workflow

welsberr 2026-03-19 21:06:12 -04:00
parent ac405943fb
commit 10280a6229
12 changed files with 1339 additions and 16 deletions

View File

@ -46,6 +46,11 @@ The initial repo includes:
- `pybtex`-backed BibTeX parsing and export in a repo-local virtual environment;
- a SQLite-backed bibliography store;
- a small CLI for ingest, search, inspection, and export;
- review-state tracking on entries and per-field ingest provenance;
- first-pass plaintext reference extraction into draft BibTeX;
- identifier-first metadata resolution for DOI, DBLP, and arXiv-backed entries;
- local citation-graph traversal over stored `cites`, `cited_by`, and `crossref` edges;
- Crossref-backed graph expansion that materializes draft referenced works and edge provenance;
- normalized tables for entries, creators, identifiers, and citation relations;
- full-text-search-ready indexing over title, abstract, and fulltext when SQLite FTS5 is available;
- tests covering parsing, ingestion, relation storage, and search.
@ -106,15 +111,19 @@ Or use the CLI directly:
cd citegeist
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 ingest references.bib
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 search "semantic search"
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 show --provenance smith2024graphs
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 set-status smith2024graphs reviewed
PYTHONPATH=src .venv/bin/python -m citegeist extract references.txt --output draft.bib
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 resolve smith2024graphs
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 graph smith2024graphs --relation cites --depth 2 --missing-only
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 expand smith2024graphs --source crossref
PYTHONPATH=src .venv/bin/python -m citegeist --db library.sqlite3 export --output reviewed.bib
```
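
For scripting, the same workflow can be driven from Python. A minimal sketch follows; the database path, input file, and citation key are illustrative, and graph expansion reaches the Crossref API over the network:

```
from pathlib import Path

from citegeist import BibliographyStore, CrossrefExpander, extract_references
from citegeist.bibtex import render_bibtex

store = BibliographyStore("library.sqlite3")
try:
    # Turn a plaintext reference list into draft BibTeX and ingest it with provenance.
    drafts = extract_references(Path("references.txt").read_text(encoding="utf-8"))
    store.ingest_bibtex(render_bibtex(drafts), source_label="references.txt", review_status="draft")

    # Materialize referenced works and "cites" edges for one seed entry via Crossref.
    for result in CrossrefExpander().expand_entry_references(store, "smith2024graphs"):
        print(result.discovered_citation_key, result.created_entry)

    # Walk the stored citation graph up to two hops from the seed.
    for edge in store.traverse_graph(["smith2024graphs"], relation_types=["cites"], max_depth=2):
        print(edge["target_citation_key"], edge["target_exists"])
finally:
    store.close()
```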
## Near-Term Priorities
- stronger plaintext extraction coverage for more citation styles;
- richer graph expansion from additional external citation sources.
See [ROADMAP.md](./ROADMAP.md) for the prioritized phase plan and rationale.

View File

@ -1,4 +1,15 @@
from .bibtex import BibEntry, parse_bibtex
from .expand import CrossrefExpander
from .extract import extract_references
from .resolve import MetadataResolver, merge_entries
from .storage import BibliographyStore
__all__ = [
"BibEntry",
"BibliographyStore",
"CrossrefExpander",
"MetadataResolver",
"extract_references",
"merge_entries",
"parse_bibtex",
]

View File

@ -1,10 +1,15 @@
from __future__ import annotations
import argparse
from dataclasses import asdict
import json
import sys
from pathlib import Path
from .bibtex import parse_bibtex, render_bibtex
from .expand import CrossrefExpander
from .extract import extract_references
from .resolve import MetadataResolver, merge_entries
from .storage import BibliographyStore
@ -16,6 +21,8 @@ def build_parser() -> argparse.ArgumentParser:
ingest_parser = subparsers.add_parser("ingest", help="Ingest BibTeX into the database")
ingest_parser.add_argument("input", help="BibTeX file to ingest")
ingest_parser.add_argument("--status", default="draft", help="Initial review status")
ingest_parser.add_argument("--source-label", help="Provenance label for this ingest run")
search_parser = subparsers.add_parser("search", help="Search titles, abstracts, and fulltext")
search_parser.add_argument("query", help="Search query")
@ -24,11 +31,49 @@ def build_parser() -> argparse.ArgumentParser:
show_parser = subparsers.add_parser("show", help="Show one entry or list entries")
show_parser.add_argument("citation_key", nargs="?", help="Citation key to show")
show_parser.add_argument("--limit", type=int, default=20, help="Maximum entries when listing")
show_parser.add_argument("--provenance", action="store_true", help="Include field provenance")
export_parser = subparsers.add_parser("export", help="Export entries as BibTeX")
export_parser.add_argument("citation_keys", nargs="*", help="Optional citation keys to export")
export_parser.add_argument("--output", help="Write BibTeX to a file instead of stdout")
status_parser = subparsers.add_parser("set-status", help="Set the review status for one entry")
status_parser.add_argument("citation_key", help="Citation key to update")
status_parser.add_argument("review_status", help="New review status")
extract_parser = subparsers.add_parser("extract", help="Extract draft BibTeX from plaintext references")
extract_parser.add_argument("input", help="Plaintext file containing bibliography-style references")
extract_parser.add_argument("--output", help="Write extracted BibTeX to a file instead of stdout")
resolve_parser = subparsers.add_parser("resolve", help="Enrich stored entries from external metadata sources")
resolve_parser.add_argument("citation_keys", nargs="+", help="Citation keys to enrich")
graph_parser = subparsers.add_parser("graph", help="Traverse citation relations from one or more seed entries")
graph_parser.add_argument("citation_keys", nargs="+", help="Seed citation keys")
graph_parser.add_argument(
"--relation",
action="append",
dest="relations",
choices=["cites", "cited_by", "crossref"],
help="Relation type to traverse; may be passed multiple times",
)
graph_parser.add_argument("--depth", type=int, default=1, help="Maximum traversal depth")
graph_parser.add_argument("--review-status", help="Filter results by target review status")
graph_parser.add_argument(
"--missing-only",
action="store_true",
help="Show only unresolved target nodes that are not yet present in the database",
)
expand_parser = subparsers.add_parser("expand", help="Expand graph edges from external metadata sources")
expand_parser.add_argument("citation_keys", nargs="+", help="Seed citation keys to expand")
expand_parser.add_argument(
"--source",
choices=["crossref"],
default="crossref",
help="External source used for graph expansion",
)
return parser
@ -39,13 +84,30 @@ def main(argv: list[str] | None = None) -> int:
store = BibliographyStore(args.db)
try:
if args.command == "ingest":
return _run_ingest(store, Path(args.input), args.status, args.source_label)
if args.command == "search":
return _run_search(store, args.query, args.limit)
if args.command == "show":
return _run_show(store, args.citation_key, args.limit, args.provenance)
if args.command == "export":
return _run_export(store, args.citation_keys, args.output)
if args.command == "set-status":
return _run_set_status(store, args.citation_key, args.review_status)
if args.command == "extract":
return _run_extract(Path(args.input), args.output)
if args.command == "resolve":
return _run_resolve(store, args.citation_keys)
if args.command == "graph":
return _run_graph(
store,
args.citation_keys,
args.relations,
args.depth,
args.review_status,
args.missing_only,
)
if args.command == "expand":
return _run_expand(store, args.citation_keys, args.source)
finally:
store.close()
@ -53,9 +115,18 @@ def main(argv: list[str] | None = None) -> int:
return 2
def _run_ingest(
store: BibliographyStore,
input_path: Path,
review_status: str,
source_label: str | None,
) -> int:
text = input_path.read_text(encoding="utf-8")
keys = store.ingest_bibtex(
text,
source_label=source_label or str(input_path),
review_status=review_status,
)
for key in keys:
print(key)
return 0
@ -68,12 +139,14 @@ def _run_search(store: BibliographyStore, query: str, limit: int) -> int:
return 0
def _run_show(store: BibliographyStore, citation_key: str | None, limit: int, provenance: bool) -> int:
if citation_key:
entry = store.get_entry(citation_key)
if entry is None:
print(f"Entry not found: {citation_key}", file=sys.stderr)
return 1
if provenance:
entry["field_provenance"] = store.get_field_provenance(citation_key)
print(json.dumps(entry, indent=2, sort_keys=True))
return 0
@ -89,3 +162,89 @@ def _run_export(store: BibliographyStore, citation_keys: list[str], output: str
if rendered:
print(rendered)
return 0
def _run_set_status(store: BibliographyStore, citation_key: str, review_status: str) -> int:
if not store.set_entry_status(citation_key, review_status):
print(f"Entry not found: {citation_key}", file=sys.stderr)
return 1
print(f"{citation_key}\t{review_status}")
return 0
def _run_extract(input_path: Path, output: str | None) -> int:
text = input_path.read_text(encoding="utf-8")
entries = extract_references(text)
rendered = render_bibtex(entries)
if output:
Path(output).write_text(rendered + ("\n" if rendered else ""), encoding="utf-8")
else:
if rendered:
print(rendered)
return 0
def _run_resolve(store: BibliographyStore, citation_keys: list[str]) -> int:
resolver = MetadataResolver()
exit_code = 0
for citation_key in citation_keys:
existing = store.get_entry(citation_key)
if existing is None:
print(f"Entry not found: {citation_key}", file=sys.stderr)
exit_code = 1
continue
bibtex = store.get_entry_bibtex(citation_key)
if not bibtex:
print(f"Entry not renderable: {citation_key}", file=sys.stderr)
exit_code = 1
continue
current_entry = parse_bibtex(bibtex)[0]
resolution = resolver.resolve_entry(current_entry)
if resolution is None:
print(f"No resolver match: {citation_key}", file=sys.stderr)
exit_code = 1
continue
merged = merge_entries(current_entry, resolution.entry)
store.replace_entry(
citation_key,
merged,
source_type=resolution.source_type,
source_label=resolution.source_label,
review_status="enriched",
)
print(f"{citation_key}\t{resolution.source_label}")
return exit_code
def _run_graph(
store: BibliographyStore,
citation_keys: list[str],
relations: list[str] | None,
depth: int,
review_status: str | None,
missing_only: bool,
) -> int:
rows = store.traverse_graph(
citation_keys,
relation_types=relations or ["cites"],
max_depth=depth,
review_status=review_status,
include_missing=True,
)
if missing_only:
rows = [row for row in rows if not row["target_exists"]]
print(json.dumps(rows, indent=2))
return 0
def _run_expand(store: BibliographyStore, citation_keys: list[str], source: str) -> int:
if source != "crossref":
print(f"Unsupported expansion source: {source}", file=sys.stderr)
return 1
expander = CrossrefExpander()
all_results = []
for citation_key in citation_keys:
all_results.extend(expander.expand_entry_references(store, citation_key))
print(json.dumps([asdict(result) for result in all_results], indent=2))
return 0

121
src/citegeist/expand.py Normal file
View File

@ -0,0 +1,121 @@
from __future__ import annotations
import re
from dataclasses import dataclass
from .bibtex import BibEntry
from .resolve import MetadataResolver
from .storage import BibliographyStore
@dataclass(slots=True)
class ExpansionResult:
source_citation_key: str
discovered_citation_key: str
created_entry: bool
relation_type: str
source_label: str
class CrossrefExpander:
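"""Expand stored entries into draft referenced works and 'cites' edges using Crossref reference metadata."""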
def __init__(self, resolver: MetadataResolver | None = None) -> None:
self.resolver = resolver or MetadataResolver()
def expand_entry_references(
self,
store: BibliographyStore,
citation_key: str,
) -> list[ExpansionResult]:
entry = store.get_entry(citation_key)
if entry is None:
return []
doi = entry.get("doi")
if not doi:
return []
payload = self.resolver._get_json( # noqa: SLF001
f"https://api.crossref.org/works/{doi}?mailto=welsberr@gmail.com"
)
references = payload.get("message", {}).get("reference", [])
results: list[ExpansionResult] = []
for index, reference in enumerate(references, start=1):
discovered = _crossref_reference_to_entry(reference, citation_key, index)
created = False
if store.get_entry(discovered.citation_key) is None:
store.upsert_entry(
discovered,
raw_bibtex=None,
source_type="graph_expand",
source_label=f"crossref:references:{doi}",
review_status="draft",
)
store.connection.commit()
created = True
store.add_relation(
citation_key,
discovered.citation_key,
"cites",
source_type="graph_expand",
source_label=f"crossref:references:{doi}",
confidence=1.0 if reference.get("DOI") else 0.6,
)
results.append(
ExpansionResult(
source_citation_key=citation_key,
discovered_citation_key=discovered.citation_key,
created_entry=created,
relation_type="cites",
source_label=f"crossref:references:{doi}",
)
)
return results
def _crossref_reference_to_entry(reference: dict, source_citation_key: str, ordinal: int) -> BibEntry:
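"""Build a draft BibEntry from one Crossref reference record, falling back to a placeholder title."""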
title = (
reference.get("article-title")
or reference.get("volume-title")
or reference.get("journal-title")
or reference.get("unstructured")
or f"Referenced work {ordinal}"
)
year = str(reference.get("year") or "")
author = reference.get("author") or ""
doi = reference.get("DOI") or ""
journal_title = reference.get("journal-title") or ""
fields: dict[str, str] = {
"title": _normalize_text(title),
"note": f"discovered_from = {{{source_citation_key}}}",
}
if year:
fields["year"] = year
if author:
fields["author"] = _normalize_text(author)
if doi:
fields["doi"] = doi
fields["url"] = f"https://doi.org/{doi}"
if journal_title:
fields["journal"] = _normalize_text(journal_title)
citation_key = _reference_citation_key(reference, title, year, ordinal)
entry_type = "article" if journal_title else "misc"
return BibEntry(entry_type=entry_type, citation_key=citation_key, fields=fields)
def _reference_citation_key(reference: dict, title: str, year: str, ordinal: int) -> str:
if doi := reference.get("DOI"):
suffix = re.sub(r"[^A-Za-z0-9]+", "", doi).lower()
return f"doi{suffix}"
author = reference.get("author") or "ref"
family = author.split(",")[0].split()[-1]
family = re.sub(r"[^A-Za-z0-9]+", "", family).lower() or "ref"
first_word = re.sub(r"[^A-Za-z0-9]+", "", title.split()[0]).lower() if title.split() else "untitled"
return f"{family}{year or 'nd'}{first_word}{ordinal}"
def _normalize_text(value: str) -> str:
return " ".join(value.split())

102
src/citegeist/extract.py Normal file
View File

@ -0,0 +1,102 @@
from __future__ import annotations
import re
from .bibtex import BibEntry
YEAR_PATTERN = re.compile(r"\b(19|20)\d{2}\b")
def extract_references(text: str) -> list[BibEntry]:
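"""Parse bibliography-style plaintext lines into draft BibEntry records using lightweight heuristics."""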
entries: list[BibEntry] = []
for index, line in enumerate(_iter_reference_lines(text), start=1):
parsed = _parse_reference_line(line, index)
if parsed is not None:
entries.append(parsed)
return entries
def render_extracted_bibtex(text: str) -> str:
from .bibtex import render_bibtex
return render_bibtex(extract_references(text))
def _iter_reference_lines(text: str) -> list[str]:
lines: list[str] = []
for raw_line in text.splitlines():
line = raw_line.strip()
if not line:
continue
line = re.sub(r"^\[\d+\]\s*", "", line)
line = re.sub(r"^\d+\.\s*", "", line)
line = re.sub(r"^\(\d+\)\s*", "", line)
if len(line) < 20:
continue
lines.append(" ".join(line.split()))
return lines
def _parse_reference_line(line: str, ordinal: int) -> BibEntry | None:
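"""Split a reference line at its first plausible year: authors before it, title then venue after it."""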
year_match = YEAR_PATTERN.search(line)
if year_match is None:
return None
year = year_match.group(0)
author_part = line[: year_match.start()].strip(" .")
remainder = line[year_match.end() :].strip(" .")
if not author_part or not remainder:
return None
segments = [segment.strip(" .") for segment in remainder.split(".") if segment.strip(" .")]
if not segments:
return None
title = segments[0]
venue = segments[1] if len(segments) > 1 else ""
authors = _normalize_authors(author_part)
citation_key = _make_citation_key(authors, year, title, ordinal)
entry_type = _guess_entry_type(venue)
fields: dict[str, str] = {
"author": authors,
"year": year,
"title": title,
"note": f"extracted_reference = {{true}}; raw_reference = {{{line}}}",
}
if venue:
if entry_type == "article":
fields["journal"] = venue
else:
fields["booktitle"] = venue
return BibEntry(entry_type=entry_type, citation_key=citation_key, fields=fields)
def _normalize_authors(author_part: str) -> str:
normalized = author_part.replace(" & ", " and ")
normalized = re.sub(r"\bet al\.$", "and others", normalized)
normalized = re.sub(r"\s+and\s+", " and ", normalized)
normalized = re.sub(r"\s*,\s*", ", ", normalized)
return normalized.strip(" .")
def _make_citation_key(authors: str, year: str, title: str, ordinal: int) -> str:
first_author = authors.split(" and ")[0]
family_name = first_author.split(",")[0] if "," in first_author else first_author.split()[-1]
family_name = re.sub(r"[^A-Za-z0-9]+", "", family_name).lower() or "ref"
first_word = re.sub(r"[^A-Za-z0-9]+", "", title.split()[0]).lower() if title.split() else "untitled"
if not first_word:
first_word = "untitled"
return f"{family_name}{year}{first_word}{ordinal}"
def _guess_entry_type(venue: str) -> str:
lowered = venue.lower()
if any(token in lowered for token in ("journal", "transactions", "review", "letters")):
return "article"
if any(token in lowered for token in ("proceedings", "conference", "workshop", "symposium")):
return "inproceedings"
return "misc"

240
src/citegeist/resolve.py Normal file
View File

@ -0,0 +1,240 @@
from __future__ import annotations
import json
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from .bibtex import BibEntry, parse_bibtex
@dataclass(slots=True)
class Resolution:
entry: BibEntry
source_type: str
source_label: str
class MetadataResolver:
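"""Fetch entry metadata from external sources; resolve_entry tries DOI (Crossref), then DBLP, then arXiv."""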
def __init__(self, user_agent: str = "citegeist/0.1 (local research tool)") -> None:
self.user_agent = user_agent
def resolve_entry(self, entry: BibEntry) -> Resolution | None:
if doi := entry.fields.get("doi"):
resolved = self.resolve_doi(doi)
if resolved is not None:
return resolved
if dblp_key := entry.fields.get("dblp"):
resolved = self.resolve_dblp(dblp_key)
if resolved is not None:
return resolved
if arxiv_id := entry.fields.get("arxiv"):
resolved = self.resolve_arxiv(arxiv_id)
if resolved is not None:
return resolved
return None
def resolve_doi(self, doi: str) -> Resolution | None:
encoded = urllib.parse.quote(doi, safe="")
payload = self._get_json(f"https://api.crossref.org/works/{encoded}")
message = payload.get("message", {})
if not message:
return None
return Resolution(
entry=_crossref_message_to_entry(message),
source_type="resolver",
source_label=f"crossref:doi:{doi}",
)
def search_crossref(self, title: str, limit: int = 5) -> list[BibEntry]:
query = urllib.parse.urlencode({"query.title": title, "rows": limit})
payload = self._get_json(f"https://api.crossref.org/works?{query}")
items = payload.get("message", {}).get("items", [])
return [_crossref_message_to_entry(item) for item in items]
def resolve_dblp(self, dblp_key: str) -> Resolution | None:
encoded_key = urllib.parse.quote(dblp_key, safe="/:")
text = self._get_text(f"https://dblp.org/rec/{encoded_key}.bib")
entries = parse_bibtex(text)
if not entries:
return None
return Resolution(
entry=entries[0],
source_type="resolver",
source_label=f"dblp:key:{dblp_key}",
)
def search_dblp(self, query_text: str, limit: int = 5) -> list[BibEntry]:
query = urllib.parse.urlencode({"q": query_text, "format": "json", "h": limit})
payload = self._get_json(f"https://dblp.org/search/publ/api?{query}")
hits = payload.get("result", {}).get("hits", {}).get("hit", [])
if isinstance(hits, dict):
hits = [hits]
results: list[BibEntry] = []
for hit in hits:
info = hit.get("info", {})
dblp_key = info.get("key")
if dblp_key:
resolved = self.resolve_dblp(dblp_key)
if resolved is not None:
results.append(resolved.entry)
return results
def resolve_arxiv(self, arxiv_id: str) -> Resolution | None:
query = urllib.parse.urlencode({"id_list": arxiv_id})
root = self._get_xml(f"https://export.arxiv.org/api/query?{query}")
namespace = {"atom": "http://www.w3.org/2005/Atom"}
entry = root.find("atom:entry", namespace)
if entry is None:
return None
return Resolution(
entry=_arxiv_atom_entry_to_bib(entry, arxiv_id),
source_type="resolver",
source_label=f"arxiv:id:{arxiv_id}",
)
def _get_json(self, url: str) -> dict:
with urllib.request.urlopen(self._request(url)) as response:
return json.load(response)
def _get_text(self, url: str) -> str:
with urllib.request.urlopen(self._request(url)) as response:
return response.read().decode("utf-8")
def _get_xml(self, url: str) -> ET.Element:
with urllib.request.urlopen(self._request(url)) as response:
return ET.fromstring(response.read())
def _request(self, url: str) -> urllib.request.Request:
return urllib.request.Request(
url,
headers={
"User-Agent": self.user_agent,
},
)
def merge_entries(base: BibEntry, resolved: BibEntry) -> BibEntry:
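"""Keep the base entry's existing field values and fill in only fields the resolved entry adds."""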
merged_fields = dict(base.fields)
for key, value in resolved.fields.items():
if value and (key not in merged_fields or not merged_fields[key]):
merged_fields[key] = value
return BibEntry(
entry_type=base.entry_type or resolved.entry_type,
citation_key=base.citation_key,
fields=merged_fields,
)
def _crossref_message_to_entry(message: dict) -> BibEntry:
entry_type = _crossref_type_to_bibtype(message.get("type", "article"))
title_values = message.get("title", [])
title = title_values[0] if title_values else ""
year = _extract_crossref_year(message)
authors = " and ".join(_crossref_person_to_name(person) for person in message.get("author", []))
venue = ""
if container_title := message.get("container-title", []):
venue = container_title[0]
fields: dict[str, str] = {}
if authors:
fields["author"] = authors
if title:
fields["title"] = title
if year:
fields["year"] = year
if doi := message.get("DOI"):
fields["doi"] = doi
if url := message.get("URL"):
fields["url"] = url
if abstract := message.get("abstract"):
fields["abstract"] = abstract
if venue:
if entry_type == "article":
fields["journal"] = venue
else:
fields["booktitle"] = venue
if volume := message.get("volume"):
fields["volume"] = str(volume)
if issue := message.get("issue"):
fields["number"] = str(issue)
if pages := message.get("page"):
fields["pages"] = str(pages)
citation_key = _make_resolution_key(fields.get("author", "crossref"), year or "n.d.", title or "untitled")
return BibEntry(entry_type=entry_type, citation_key=citation_key, fields=fields)
def _arxiv_atom_entry_to_bib(node: ET.Element, arxiv_id: str) -> BibEntry:
ns = {
"atom": "http://www.w3.org/2005/Atom",
"arxiv": "http://arxiv.org/schemas/atom",
}
title = _node_text(node.find("atom:title", ns))
summary = _node_text(node.find("atom:summary", ns))
published = _node_text(node.find("atom:published", ns))
year = published[:4] if published else ""
authors = " and ".join(
_node_text(author.find("atom:name", ns)) for author in node.findall("atom:author", ns)
)
doi = _node_text(node.find("arxiv:doi", ns))
fields: dict[str, str] = {
"title": title,
"author": authors,
"year": year,
"arxiv": arxiv_id,
"url": f"https://arxiv.org/abs/{arxiv_id}",
"pdf": f"https://arxiv.org/pdf/{arxiv_id}.pdf",
}
if summary:
fields["abstract"] = summary
if doi:
fields["doi"] = doi
return BibEntry(entry_type="article", citation_key=f"arxiv{arxiv_id.replace('.', '').replace('/', '')}", fields=fields)
def _crossref_type_to_bibtype(crossref_type: str) -> str:
mapping = {
"journal-article": "article",
"proceedings-article": "inproceedings",
"book-chapter": "incollection",
"book": "book",
"proceedings": "proceedings",
}
return mapping.get(crossref_type, "misc")
def _extract_crossref_year(message: dict) -> str:
for field_name in ("published-print", "published-online", "issued", "created"):
date_parts = message.get(field_name, {}).get("date-parts", [])
if date_parts and date_parts[0]:
return str(date_parts[0][0])
return ""
def _crossref_person_to_name(person: dict) -> str:
family = person.get("family", "")
given = person.get("given", "")
if family and given:
return f"{family}, {given}"
return family or given
def _node_text(node: ET.Element | None) -> str:
if node is None or node.text is None:
return ""
return " ".join(node.text.split())
def _make_resolution_key(author_text: str, year: str, title: str) -> str:
first_author = author_text.split(" and ")[0]
family_name = first_author.split(",")[0] if "," in first_author else first_author.split()[-1]
family_name = "".join(ch for ch in family_name.lower() if ch.isalnum()) or "ref"
first_word = "".join(ch for ch in title.split()[0].lower() if ch.isalnum()) if title.split() else "untitled"
return f"{family_name}{year}{first_word}"

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import json
import sqlite3
from collections import deque
from collections import OrderedDict
from pathlib import Path
@ -47,6 +48,7 @@ class BibliographyStore:
id INTEGER PRIMARY KEY,
citation_key TEXT NOT NULL UNIQUE,
entry_type TEXT NOT NULL,
review_status TEXT NOT NULL DEFAULT 'draft',
title TEXT,
year TEXT,
journal TEXT,
@ -92,9 +94,34 @@ class BibliographyStore:
relation_type TEXT NOT NULL,
PRIMARY KEY (source_entry_id, target_citation_key, relation_type)
);
CREATE TABLE IF NOT EXISTS field_provenance (
id INTEGER PRIMARY KEY,
entry_id INTEGER NOT NULL REFERENCES entries(id) ON DELETE CASCADE,
field_name TEXT NOT NULL,
field_value TEXT,
source_type TEXT NOT NULL,
source_label TEXT NOT NULL,
operation TEXT NOT NULL,
confidence REAL,
recorded_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS relation_provenance (
id INTEGER PRIMARY KEY,
source_entry_id INTEGER NOT NULL REFERENCES entries(id) ON DELETE CASCADE,
target_citation_key TEXT NOT NULL,
relation_type TEXT NOT NULL,
source_type TEXT NOT NULL,
source_label TEXT NOT NULL,
confidence REAL,
recorded_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
""" """
) )
self._ensure_entry_columns()
if self._fts5_enabled:
self.connection.execute(
"""
@ -109,24 +136,45 @@ class BibliographyStore:
)
self.connection.commit()
def ingest_bibtex(
self,
text: str,
fulltext_by_key: dict[str, str] | None = None,
source_label: str = "bibtex_import",
review_status: str = "draft",
) -> list[str]:
fulltext_by_key = fulltext_by_key or {}
entries = parse_bibtex(text)
keys: list[str] = []
for entry in entries:
fulltext = fulltext_by_key.get(entry.citation_key)
self.upsert_entry(
entry,
fulltext=fulltext,
raw_bibtex=_entry_to_bibtex(entry),
source_type="bibtex",
source_label=source_label,
review_status=review_status,
)
keys.append(entry.citation_key)
self.connection.commit()
return keys
def upsert_entry(
self,
entry: BibEntry,
fulltext: str | None = None,
raw_bibtex: str | None = None,
source_type: str = "manual",
source_label: str = "manual",
review_status: str = "draft",
) -> int:
row = self.connection.execute(
"""
INSERT INTO entries (
citation_key, entry_type, review_status, title, year, journal, booktitle, publisher,
abstract, keywords, url, doi, isbn, fulltext, raw_bibtex, extra_fields_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(citation_key) DO UPDATE SET
entry_type = excluded.entry_type,
title = excluded.title,
@ -148,6 +196,7 @@ class BibliographyStore:
(
entry.citation_key,
entry.entry_type,
review_status,
entry.fields.get("title"), entry.fields.get("title"),
entry.fields.get("year"), entry.fields.get("year"),
entry.fields.get("journal"), entry.fields.get("journal"),
@ -165,6 +214,15 @@ class BibliographyStore:
).fetchone()
entry_id = int(row["id"])
self._record_field_provenance(
entry_id=entry_id,
entry=entry,
source_type=source_type,
source_label=source_label,
operation="upsert",
fulltext=fulltext,
)
self.connection.execute("DELETE FROM entry_creators WHERE entry_id = ?", (entry_id,)) self.connection.execute("DELETE FROM entry_creators WHERE entry_id = ?", (entry_id,))
for role in ("author", "editor"): for role in ("author", "editor"):
names = _split_names(entry.fields.get(role, "")) names = _split_names(entry.fields.get(role, ""))
@ -262,6 +320,64 @@ class BibliographyStore:
).fetchall()
return [str(row["target_citation_key"]) for row in rows]
def traverse_graph(
self,
seed_keys: list[str],
relation_types: list[str] | None = None,
max_depth: int = 1,
review_status: str | None = None,
include_missing: bool = True,
) -> list[dict[str, object]]:
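"""Breadth-first walk over stored relations from the seed keys, reporting each edge up to max_depth hops."""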
relation_types = relation_types or ["cites"]
allowed_relations = set(relation_types)
visited: dict[str, int] = {}
queue: deque[tuple[str, int]] = deque()
for seed_key in seed_keys:
queue.append((seed_key, 0))
visited[seed_key] = 0
results: list[dict[str, object]] = []
while queue:
citation_key, depth = queue.popleft()
if depth >= max_depth:
continue
for edge in self._iter_graph_edges(citation_key, allowed_relations):
target_key = str(edge["target_citation_key"])
target_entry = self.get_entry(target_key)
target_status = target_entry.get("review_status") if target_entry else None
if review_status is not None and target_status != review_status:
if target_entry is not None or not include_missing:
continue
next_depth = depth + 1
result = {
"source_citation_key": citation_key,
"target_citation_key": target_key,
"relation_type": str(edge["relation_type"]),
"depth": next_depth,
"target_exists": target_entry is not None,
"target_review_status": target_status,
"target_title": target_entry.get("title") if target_entry else None,
}
results.append(result)
if target_entry is not None and (target_key not in visited or next_depth < visited[target_key]):
visited[target_key] = next_depth
queue.append((target_key, next_depth))
results.sort(
key=lambda row: (
int(row["depth"]),
str(row["relation_type"]),
str(row["source_citation_key"]),
str(row["target_citation_key"]),
)
)
return results
def get_entry(self, citation_key: str) -> dict[str, object] | None:
row = self.connection.execute(
"SELECT * FROM entries WHERE citation_key = ?",
@ -272,7 +388,7 @@ class BibliographyStore:
def list_entries(self, limit: int = 50) -> list[dict[str, object]]:
rows = self.connection.execute(
"""
SELECT citation_key, entry_type, review_status, title, year
FROM entries
ORDER BY COALESCE(year, ''), citation_key
LIMIT ?
@ -281,6 +397,109 @@ class BibliographyStore:
).fetchall()
return [dict(row) for row in rows]
def set_entry_status(self, citation_key: str, review_status: str) -> bool:
row = self.connection.execute(
"""
UPDATE entries
SET review_status = ?, updated_at = CURRENT_TIMESTAMP
WHERE citation_key = ?
RETURNING id
""",
(review_status, citation_key),
).fetchone()
self.connection.commit()
return row is not None
def replace_entry(
self,
citation_key: str,
entry: BibEntry,
source_type: str,
source_label: str,
review_status: str = "enriched",
) -> bool:
existing = self.get_entry(citation_key)
if existing is None:
return False
replacement = BibEntry(
entry_type=entry.entry_type,
citation_key=citation_key,
fields=entry.fields,
)
self.upsert_entry(
replacement,
fulltext=existing.get("fulltext"),
raw_bibtex=_entry_to_bibtex(replacement),
source_type=source_type,
source_label=source_label,
review_status=review_status,
)
self.connection.commit()
return True
def add_relation(
self,
source_citation_key: str,
target_citation_key: str,
relation_type: str,
source_type: str,
source_label: str,
confidence: float = 1.0,
) -> bool:
row = self.connection.execute(
"SELECT id FROM entries WHERE citation_key = ?",
(source_citation_key,),
).fetchone()
if row is None:
return False
source_entry_id = int(row["id"])
self.connection.execute(
"""
INSERT OR IGNORE INTO relations (source_entry_id, target_citation_key, relation_type)
VALUES (?, ?, ?)
""",
(source_entry_id, target_citation_key, relation_type),
)
self.connection.execute(
"""
INSERT INTO relation_provenance (
source_entry_id, target_citation_key, relation_type, source_type, source_label, confidence
) VALUES (?, ?, ?, ?, ?, ?)
""",
(source_entry_id, target_citation_key, relation_type, source_type, source_label, confidence),
)
self.connection.commit()
return True
def get_field_provenance(self, citation_key: str) -> list[dict[str, object]]:
rows = self.connection.execute(
"""
SELECT fp.field_name, fp.field_value, fp.source_type, fp.source_label,
fp.operation, fp.confidence, fp.recorded_at
FROM field_provenance fp
JOIN entries e ON e.id = fp.entry_id
WHERE e.citation_key = ?
ORDER BY fp.recorded_at, fp.id
""",
(citation_key,),
).fetchall()
return [dict(row) for row in rows]
def get_relation_provenance(self, citation_key: str) -> list[dict[str, object]]:
rows = self.connection.execute(
"""
SELECT rp.target_citation_key, rp.relation_type, rp.source_type, rp.source_label,
rp.confidence, rp.recorded_at
FROM relation_provenance rp
JOIN entries e ON e.id = rp.source_entry_id
WHERE e.citation_key = ?
ORDER BY rp.recorded_at, rp.id
""",
(citation_key,),
).fetchall()
return [dict(row) for row in rows]
def get_entry_bibtex(self, citation_key: str) -> str | None:
entry = self._load_bib_entry(citation_key)
if entry is None:
@ -382,6 +601,72 @@ class BibliographyStore:
).fetchall()
return [str(row["full_name"]) for row in rows]
def _iter_graph_edges(self, citation_key: str, allowed_relations: set[str]) -> list[sqlite3.Row]:
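"""Collect outgoing edges for one entry, deriving reverse 'cited_by' edges from stored 'cites' rows."""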
rows = self.connection.execute(
"""
SELECT e.citation_key AS source_citation_key, r.target_citation_key, r.relation_type
FROM relations r
JOIN entries e ON e.id = r.source_entry_id
WHERE e.citation_key = ? AND r.relation_type IN ({placeholders})
ORDER BY r.relation_type, r.target_citation_key
""".format(placeholders=",".join("?" for _ in allowed_relations)),
(citation_key, *sorted(allowed_relations)),
).fetchall()
reverse_rows = []
if "cited_by" in allowed_relations:
reverse_rows = self.connection.execute(
"""
SELECT ? AS source_citation_key, e.citation_key AS target_citation_key, 'cited_by' AS relation_type
FROM relations r
JOIN entries e ON e.id = r.source_entry_id
WHERE r.target_citation_key = ? AND r.relation_type = 'cites'
ORDER BY e.citation_key
""",
(citation_key, citation_key),
).fetchall()
seen: set[tuple[str, str]] = set()
merged: list[sqlite3.Row] = []
for row in list(rows) + list(reverse_rows):
key = (str(row["relation_type"]), str(row["target_citation_key"]))
if key not in seen:
seen.add(key)
merged.append(row)
return merged
def _ensure_entry_columns(self) -> None:
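"""Lightweight migration: add the review_status column to databases created before it existed."""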
columns = {
row["name"] for row in self.connection.execute("PRAGMA table_info(entries)").fetchall()
}
if "review_status" not in columns:
self.connection.execute(
"ALTER TABLE entries ADD COLUMN review_status TEXT NOT NULL DEFAULT 'draft'"
)
def _record_field_provenance(
self,
entry_id: int,
entry: BibEntry,
source_type: str,
source_label: str,
operation: str,
fulltext: str | None,
) -> None:
field_items = list(entry.fields.items())
if fulltext:
field_items.append(("fulltext", fulltext))
for field_name, field_value in field_items:
self.connection.execute(
"""
INSERT INTO field_provenance (
entry_id, field_name, field_value, source_type, source_label, operation, confidence
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(entry_id, field_name, field_value, source_type, source_label, operation, 1.0),
)
def _split_names(value: str) -> list[str]:
if not value:

View File

@ -4,6 +4,9 @@ import json
import subprocess
import sys
from pathlib import Path
from unittest.mock import patch
from citegeist.cli import main
SAMPLE_BIB = """ SAMPLE_BIB = """
@ -59,3 +62,144 @@ def test_cli_ingest_show_search_and_export(tmp_path: Path):
assert export_result.returncode == 0
exported = export_path.read_text(encoding="utf-8")
assert "@article{smith2024graphs," in exported
def test_cli_provenance_and_status_updates(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(SAMPLE_BIB, encoding="utf-8")
ingest = run_cli(
tmp_path,
"ingest",
"--status",
"draft",
"--source-label",
"tests/input.bib",
str(bib_path),
)
assert ingest.returncode == 0
show = run_cli(tmp_path, "show", "--provenance", "smith2024graphs")
assert show.returncode == 0
payload = json.loads(show.stdout)
assert payload["review_status"] == "draft"
assert payload["field_provenance"][0]["source_label"] == "tests/input.bib"
status = run_cli(tmp_path, "set-status", "smith2024graphs", "reviewed")
assert status.returncode == 0
assert "reviewed" in status.stdout
def test_cli_resolve_updates_entry(tmp_path: Path):
bib_path = tmp_path / "input.bib"
bib_path.write_text(
"""
@article{smith2024graphs,
author = {Smith, Jane},
title = {Graph-first bibliography augmentation},
year = {2024},
doi = {10.1000/example-doi}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.bibtex import BibEntry
from citegeist.resolve import Resolution
database = tmp_path / "library.sqlite3"
with patch("citegeist.cli.MetadataResolver.resolve_entry") as mocked_resolve:
mocked_resolve.return_value = Resolution(
entry=BibEntry(
entry_type="article",
citation_key="resolvedkey",
fields={
"author": "Smith, Jane",
"title": "Graph-first bibliography augmentation",
"year": "2024",
"doi": "10.1000/example-doi",
"journal": "Journal of Graph Studies",
},
),
source_type="resolver",
source_label="crossref:doi:10.1000/example-doi",
)
exit_code = main(
[
"--db",
str(database),
"resolve",
"smith2024graphs",
]
)
assert exit_code == 0
def test_cli_graph_outputs_missing_targets(tmp_path: Path):
bib_path = tmp_path / "graph.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024},
references = {known2023, missing2022}
}
@article{known2023,
author = {Known, Bob},
title = {Known Paper},
year = {2023}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
graph = run_cli(tmp_path, "graph", "seed2024", "--missing-only")
assert graph.returncode == 0
payload = json.loads(graph.stdout)
assert len(payload) == 1
assert payload[0]["target_citation_key"] == "missing2022"
assert payload[0]["target_exists"] is False
def test_cli_expand_with_mocked_crossref(tmp_path: Path):
bib_path = tmp_path / "expand.bib"
bib_path.write_text(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024},
doi = {10.1000/seed-doi}
}
""",
encoding="utf-8",
)
ingest = run_cli(tmp_path, "ingest", str(bib_path))
assert ingest.returncode == 0
from citegeist.expand import ExpansionResult
with patch("citegeist.cli.CrossrefExpander.expand_entry_references") as mocked_expand:
mocked_expand.return_value = [
ExpansionResult(
source_citation_key="seed2024",
discovered_citation_key="doi101000exampleref",
created_entry=True,
relation_type="cites",
source_label="crossref:references:10.1000/seed-doi",
)
]
database = tmp_path / "library.sqlite3"
exit_code = main(["--db", str(database), "expand", "seed2024"])
assert exit_code == 0

69
tests/test_expand.py Normal file
View File

@ -0,0 +1,69 @@
from citegeist.expand import CrossrefExpander, _crossref_reference_to_entry
from citegeist.storage import BibliographyStore
def test_crossref_reference_to_entry_prefers_doi_key():
entry = _crossref_reference_to_entry(
{
"DOI": "10.1000/example-ref",
"article-title": "Discovered Reference",
"author": "Doe, Alex",
"year": "2022",
"journal-title": "Journal of Discovery",
},
"seed2024",
1,
)
assert entry.citation_key == "doi101000exampleref"
assert entry.fields["doi"] == "10.1000/example-ref"
assert entry.fields["journal"] == "Journal of Discovery"
def test_crossref_expander_creates_draft_nodes_and_relations():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024},
doi = {10.1000/seed-doi}
}
"""
)
expander = CrossrefExpander()
expander.resolver._get_json = lambda _url: { # type: ignore[method-assign]
"message": {
"reference": [
{
"DOI": "10.1000/example-ref",
"article-title": "Discovered Reference",
"author": "Doe, Alex",
"year": "2022",
"journal-title": "Journal of Discovery",
},
{
"unstructured": "Unstructured reference string",
"year": "2021",
},
]
}
}
results = expander.expand_entry_references(store, "seed2024")
assert [result.discovered_citation_key for result in results] == [
"doi101000exampleref",
"ref2021unstructured2",
]
discovered = store.get_entry("doi101000exampleref")
assert discovered is not None
assert discovered["review_status"] == "draft"
assert store.get_relations("seed2024") == ["doi101000exampleref", "ref2021unstructured2"]
relation_provenance = store.get_relation_provenance("seed2024")
assert relation_provenance[0]["source_type"] == "graph_expand"
finally:
store.close()

35
tests/test_extract.py Normal file
View File

@ -0,0 +1,35 @@
from citegeist import extract_references, parse_bibtex
from citegeist.cli import main
SAMPLE_REFERENCES = """
[1] Smith, Jane and Doe, Alex. 2024. Graph-first bibliography augmentation. Journal of Research Systems.
[2] Miller, Sam. 2023. Semantic search for research corpora. Proceedings of the Retrieval Workshop.
"""
def test_extract_references_builds_draft_entries():
entries = extract_references(SAMPLE_REFERENCES)
assert [entry.citation_key for entry in entries] == [
"smith2024graphfirst1",
"miller2023semantic2",
]
assert entries[0].entry_type == "article"
assert entries[0].fields["journal"] == "Journal of Research Systems"
assert entries[1].entry_type == "inproceedings"
assert entries[1].fields["booktitle"] == "Proceedings of the Retrieval Workshop"
def test_extract_cli_writes_bibtex(tmp_path):
input_path = tmp_path / "references.txt"
output_path = tmp_path / "draft.bib"
input_path.write_text(SAMPLE_REFERENCES, encoding="utf-8")
exit_code = main(["extract", str(input_path), "--output", str(output_path)])
assert exit_code == 0
exported = output_path.read_text(encoding="utf-8")
parsed = {entry.citation_key: entry for entry in parse_bibtex(exported)}
assert parsed["smith2024graphfirst1"].fields["journal"] == "Journal of Research Systems"
assert parsed["miller2023semantic2"].fields["booktitle"] == "Proceedings of the Retrieval Workshop"

85
tests/test_resolve.py Normal file
View File

@ -0,0 +1,85 @@
from xml.etree import ElementTree as ET
from citegeist.bibtex import BibEntry
from citegeist.resolve import MetadataResolver, _arxiv_atom_entry_to_bib, _crossref_message_to_entry, merge_entries
def test_crossref_message_to_entry_maps_basic_fields():
entry = _crossref_message_to_entry(
{
"type": "journal-article",
"title": ["Graph-first bibliography augmentation"],
"DOI": "10.1000/example-doi",
"URL": "https://doi.org/10.1000/example-doi",
"container-title": ["Journal of Graph Studies"],
"author": [{"family": "Smith", "given": "Jane"}],
"issued": {"date-parts": [[2024, 5, 1]]},
}
)
assert entry.entry_type == "article"
assert entry.fields["author"] == "Smith, Jane"
assert entry.fields["journal"] == "Journal of Graph Studies"
assert entry.fields["year"] == "2024"
def test_arxiv_atom_entry_to_bib_maps_basic_fields():
xml = ET.fromstring(
"""
<entry xmlns="http://www.w3.org/2005/Atom" xmlns:arxiv="http://arxiv.org/schemas/atom">
<title>Semantic search for research corpora</title>
<summary>Dense retrieval improves recall.</summary>
<published>2023-01-15T00:00:00Z</published>
<author><name>Miller, Sam</name></author>
<arxiv:doi>10.1000/arxiv-example</arxiv:doi>
</entry>
"""
)
entry = _arxiv_atom_entry_to_bib(xml, "2301.12345")
assert entry.fields["author"] == "Miller, Sam"
assert entry.fields["arxiv"] == "2301.12345"
assert entry.fields["doi"] == "10.1000/arxiv-example"
def test_merge_entries_prefers_existing_values_and_adds_missing_fields():
base = BibEntry(
entry_type="article",
citation_key="smith2024graphs",
fields={"title": "Graph-first bibliography augmentation", "doi": "10.1000/example-doi"},
)
resolved = BibEntry(
entry_type="article",
citation_key="otherkey",
fields={"title": "Different title", "journal": "Journal of Graph Studies"},
)
merged = merge_entries(base, resolved)
assert merged.fields["title"] == "Graph-first bibliography augmentation"
assert merged.fields["journal"] == "Journal of Graph Studies"
def test_resolver_tries_doi_before_dblp():
resolver = MetadataResolver()
calls: list[tuple[str, str]] = []
def fake_doi(value: str):
calls.append(("doi", value))
return None
def fake_dblp(value: str):
calls.append(("dblp", value))
return None
resolver.resolve_doi = fake_doi # type: ignore[method-assign]
resolver.resolve_dblp = fake_dblp # type: ignore[method-assign]
resolver.resolve_entry(
BibEntry(
entry_type="article",
citation_key="smith2024graphs",
fields={"doi": "10.1000/example-doi", "dblp": "conf/test/Smith24"},
)
)
assert calls == [("doi", "10.1000/example-doi"), ("dblp", "conf/test/Smith24")]

View File

@ -67,3 +67,66 @@ def test_store_exports_bibtex_from_normalized_rows():
assert parsed["smith2024graphs"].fields["references"] == "miller2023search" assert parsed["smith2024graphs"].fields["references"] == "miller2023search"
finally: finally:
store.close() store.close()
def test_store_records_provenance_and_review_status():
store = BibliographyStore()
try:
store.ingest_bibtex(SAMPLE_BIB, source_label="fixtures/sample.bib", review_status="draft")
entry = store.get_entry("smith2024graphs")
assert entry is not None
assert entry["review_status"] == "draft"
provenance = store.get_field_provenance("smith2024graphs")
assert provenance
assert provenance[0]["source_type"] == "bibtex"
assert provenance[0]["source_label"] == "fixtures/sample.bib"
assert store.set_entry_status("smith2024graphs", "reviewed") is True
updated = store.get_entry("smith2024graphs")
assert updated is not None
assert updated["review_status"] == "reviewed"
finally:
store.close()
def test_store_traverses_graph_and_surfaces_missing_targets():
store = BibliographyStore()
try:
store.ingest_bibtex(
"""
@article{seed2024,
author = {Seed, Alice},
title = {Seed Paper},
year = {2024},
references = {known2023, missing2022}
}
@article{known2023,
author = {Known, Bob},
title = {Known Paper},
year = {2023},
references = {leaf2021}
}
@article{leaf2021,
author = {Leaf, Carol},
title = {Leaf Paper},
year = {2021}
}
""",
review_status="reviewed",
)
rows = store.traverse_graph(["seed2024"], relation_types=["cites"], max_depth=2)
assert [row["target_citation_key"] for row in rows] == [
"known2023",
"missing2022",
"leaf2021",
]
assert rows[1]["target_exists"] is False
assert rows[2]["depth"] == 2
finally:
store.close()