186 lines
6.1 KiB
Python
186 lines
6.1 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
from ecospecies_api.repository import (
|
|
get_editor_species_citations,
|
|
get_editor_species_list,
|
|
update_species_citation_enrichment,
|
|
)
|
|
|
|
|
|
def should_backfill(citation: dict[str, object], include_accepted: bool) -> bool:
    """Decide whether a citation should be sent through enrichment again.

    Args:
        citation: Citation record. The keys ``review_status``, ``source_type``,
            ``enrichment_status``, ``normalized_text`` and ``abstract_text``
            are read; a missing key or a ``None`` value counts as empty.
        include_accepted: When false, citations whose review status is
            ``accepted`` and citations whose source type is
            ``editor_selected_candidate`` / ``editor_added_candidate`` are
            never backfilled.

    Returns:
        True when the citation is not excluded by ``include_accepted`` and at
        least one of the following holds: its source type is
        ``document_extract``, ``editor_review`` or empty; its enrichment status
        is ``pending``, ``unresolved``, ``error`` or empty; its normalized text
        is empty; its abstract text is empty.
    """

    def field(key: str) -> str:
        # A present-but-None value must count as empty: str(None) would give
        # the truthy string "None" and make missing data look populated.
        value = citation.get(key)
        return "" if value is None else str(value).strip()

    review_status = field("review_status").lower()
    source_type = field("source_type").lower()
    enrichment_status = field("enrichment_status").lower()
    normalized_text = field("normalized_text")
    abstract_text = field("abstract_text")

    if not include_accepted and review_status == "accepted":
        return False
    if source_type in {"editor_selected_candidate", "editor_added_candidate"} and not include_accepted:
        return False

    return (
        source_type in {"document_extract", "editor_review", ""}
        or enrichment_status in {"pending", "unresolved", "error", ""}
        or not normalized_text
        or not abstract_text
    )
|
|
|
|
|
|
def reorder_species_with_cursor(
    species_items: list[dict[str, object]],
    state_file: Path | None,
) -> list[dict[str, object]]:
    """Rotate the species list so it starts just after the saved cursor slug.

    Args:
        species_items: Species records; each record's ``slug`` value is
            compared (stringified and stripped) with the cursor.
        state_file: File holding the slug recorded by a previous run, or
            ``None`` to disable rotation.

    Returns:
        A new list beginning with the item after the first one whose slug
        matches the cursor, followed by the items up to and including that
        match. The input list itself is returned unchanged when no state file
        is given, the list is empty, the file does not exist, the file is
        blank, or no item matches the cursor.
    """
    if not (state_file and species_items):
        return species_items

    try:
        cursor_slug = state_file.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        # No earlier run has written a cursor yet; keep the given order.
        cursor_slug = ""

    if cursor_slug:
        # 1-based position doubles as the split point just past the match.
        for split_at, entry in enumerate(species_items, start=1):
            if str(entry.get("slug", "")).strip() == cursor_slug:
                return species_items[split_at:] + species_items[:split_at]

    return species_items
|
|
|
|
|
|
def write_cursor(state_file: Path | None, slug: str) -> None:
    """Persist ``slug`` as the cursor, followed by a newline.

    Does nothing when ``state_file`` is ``None`` or ``slug`` is empty.
    Missing parent directories of ``state_file`` are created first, and any
    existing file content is replaced.
    """
    if state_file and slug:
        state_file.parent.mkdir(parents=True, exist_ok=True)
        with state_file.open("w", encoding="utf-8") as handle:
            handle.write(f"{slug}\n")
|
|
|
|
|
|
def main() -> int:
    """Run the citation-enrichment backfill from the command line.

    Walks the editor species list (or only the species named by ``--slug``),
    selects the citations for which ``should_backfill`` is true, and calls
    ``update_species_citation_enrichment`` on each one. Progress lines and a
    final ``summary:`` line are printed to stdout.

    Options:
        --slug: limit the run to one species.
        --username: audit username passed to the update call.
        --include-accepted: forwarded to ``should_backfill``.
        --max-species: stop after this many species that had eligible
            citations (0 = unlimited).
        --max-citations: stop after this many citations overall
            (0 = unlimited).
        --state-file: cursor file used to rotate successive runs through the
            species list; ignored when ``--slug`` is given.

    Returns:
        1 when ``--slug`` names a species that is not in the list, otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Backfill EcoSpecies citation enrichment.")
    parser.add_argument("--slug", help="Limit the backfill to a single species slug.")
    parser.add_argument("--username", default="citation-backfill", help="Audit username to record.")
    parser.add_argument(
        "--include-accepted",
        action="store_true",
        help="Also rerun accepted/editor-curated citations.",
    )
    parser.add_argument(
        "--max-species",
        type=int,
        default=0,
        help="Stop after this many species with eligible citations. 0 means no limit.",
    )
    parser.add_argument(
        "--max-citations",
        type=int,
        default=0,
        help="Stop after this many citations overall. 0 means no limit.",
    )
    parser.add_argument(
        "--state-file",
        help="Optional cursor file used to rotate scheduled runs through the species list.",
    )
    args = parser.parse_args()

    # The cursor tracks rotation through the full list. A one-off --slug run
    # neither reads nor writes it, so a manual run cannot make the next
    # scheduled run jump to a different position in the list.
    state_file = Path(args.state_file).expanduser() if args.state_file and not args.slug else None

    all_species = get_editor_species_list()
    if args.slug:
        species_items = [item for item in all_species if item["slug"] == args.slug]
        if not species_items:
            print(f"Species not found: {args.slug}")
            return 1
    else:
        species_items = reorder_species_with_cursor(all_species, state_file)

    species_count = 0
    citation_count = 0
    changed_count = 0
    resolved_count = 0
    unresolved_count = 0
    error_count = 0
    last_seen_slug = ""
    citation_limit_reached = False

    for species in species_items:
        if args.max_species and species_count >= args.max_species:
            break
        # Check the citation budget before marking this species as seen.
        # Otherwise a species reached after the budget is spent would be
        # counted and stored as the cursor without any of its citations being
        # processed, and the next rotated run would skip it.
        if args.max_citations and citation_count >= args.max_citations:
            citation_limit_reached = True
            break

        slug = str(species["slug"])
        last_seen_slug = slug
        citation_payload = get_editor_species_citations(slug)
        if citation_payload is None:
            continue

        eligible = [
            citation
            for citation in citation_payload["citations"]
            if should_backfill(citation, include_accepted=args.include_accepted)
        ]
        if not eligible:
            continue

        species_count += 1
        print(f"[{slug}] backfilling {len(eligible)} citation(s)", flush=True)

        for citation in eligible:
            if args.max_citations and citation_count >= args.max_citations:
                # Budget exhausted part-way through this species. The cursor
                # still advances past it, so its remaining citations wait
                # until the rotation comes round again.
                citation_limit_reached = True
                break

            citation_count += 1
            result = update_species_citation_enrichment(
                slug=slug,
                citation_id=int(citation["id"]),
                username=args.username,
            )
            if result is None:
                print(f"  - citation {citation['id']}: skipped (not found)", flush=True)
                continue

            changed_fields = result.get("changed_fields", {})
            status = str(result["citation"].get("enrichment_status", "")).strip().lower()
            if changed_fields:
                changed_count += 1
            if status == "resolved":
                resolved_count += 1
            elif status == "unresolved":
                unresolved_count += 1
            elif status == "error":
                error_count += 1
            print(
                f"  - citation {citation['id']}: {status or 'unknown'}"
                + (f" ({len(changed_fields)} field changes)" if changed_fields else ""),
                flush=True,
            )

        if citation_limit_reached:
            break

    # Single exit path: persist the cursor, then report.
    write_cursor(state_file, last_seen_slug)
    if citation_limit_reached:
        print("citation limit reached; stopping early", flush=True)
    print(
        "summary:"
        f" species={species_count}"
        f" citations={citation_count}"
        f" changed={changed_count}"
        f" resolved={resolved_count}"
        f" unresolved={unresolved_count}"
        f" errors={error_count}",
        flush=True,
    )
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)
|