#!/usr/bin/env python3
"""Backfill citation enrichment for EcoSpecies species.

Walks the editor species list (or a single ``--slug``), selects the citations
that ``should_backfill`` deems eligible, and calls
``update_species_citation_enrichment`` for each of them.  An optional
``--state-file`` stores the slug of the last species visited so that repeated
runs rotate through the species list instead of always starting at the top.
"""

from __future__ import annotations

import argparse
from pathlib import Path

from ecospecies_api.repository import (
    get_editor_species_citations,
    get_editor_species_list,
    update_species_citation_enrichment,
)

# Source types that mark a citation as hand-picked by an editor.
_EDITOR_CURATED_SOURCE_TYPES = frozenset(
    {"editor_selected_candidate", "editor_added_candidate"}
)
# Source types / enrichment statuses that make a citation eligible on their own.
_BACKFILL_SOURCE_TYPES = frozenset({"document_extract", "editor_review", ""})
_BACKFILL_ENRICHMENT_STATUSES = frozenset({"pending", "unresolved", "error", ""})


def _clean_field(record: dict[str, object], key: str) -> str:
    """Return ``record[key]`` as a stripped string, mapping missing/None to ""."""
    value = record.get(key)
    # str(None) would yield the truthy text "None"; treat None as empty instead.
    return "" if value is None else str(value).strip()


def should_backfill(citation: dict[str, object], include_accepted: bool) -> bool:
    """Decide whether a citation should be sent through enrichment again.

    Args:
        citation: Citation record; the keys ``review_status``, ``source_type``,
            ``enrichment_status``, ``normalized_text`` and ``abstract_text``
            are read.  Missing keys and ``None`` values count as empty.
        include_accepted: When False, citations whose review status is
            ``accepted`` or whose source type is editor-curated are skipped.

    Returns:
        True when the citation is not excluded by the rule above and at least
        one of these holds: its source type is ``document_extract``,
        ``editor_review`` or empty; its enrichment status is ``pending``,
        ``unresolved``, ``error`` or empty; its normalized text is empty; its
        abstract text is empty.
    """
    review_status = _clean_field(citation, "review_status").lower()
    source_type = _clean_field(citation, "source_type").lower()
    enrichment_status = _clean_field(citation, "enrichment_status").lower()
    normalized_text = _clean_field(citation, "normalized_text")
    abstract_text = _clean_field(citation, "abstract_text")

    if not include_accepted:
        if review_status == "accepted":
            return False
        if source_type in _EDITOR_CURATED_SOURCE_TYPES:
            return False

    return (
        source_type in _BACKFILL_SOURCE_TYPES
        or enrichment_status in _BACKFILL_ENRICHMENT_STATUSES
        or not normalized_text
        or not abstract_text
    )


def reorder_species_with_cursor(
    species_items: list[dict[str, object]],
    state_file: Path | None,
) -> list[dict[str, object]]:
    """Rotate the species list so it starts just after the stored cursor slug.

    Args:
        species_items: Species records, each expected to carry a ``slug`` key.
        state_file: File holding the slug written by ``write_cursor``, or None.

    Returns:
        The rotated list when the stored slug is found in ``species_items``.
        The input list unchanged when there is no state file, the list is
        empty, the file does not exist, the file is blank, or the slug is not
        present in the list.
    """
    if not state_file or not species_items:
        return species_items
    try:
        last_slug = state_file.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        # First run: no cursor has been written yet.
        return species_items
    if not last_slug:
        return species_items
    for index, item in enumerate(species_items):
        if str(item.get("slug", "")).strip() == last_slug:
            # Everything after the cursor goes first; the cursor species and
            # everything before it wrap around to the end.
            return species_items[index + 1 :] + species_items[: index + 1]
    return species_items


def write_cursor(state_file: Path | None, slug: str) -> None:
    """Persist ``slug`` to ``state_file``, creating parent directories.

    Does nothing when either ``state_file`` or ``slug`` is falsy.
    """
    if not state_file or not slug:
        return
    state_file.parent.mkdir(parents=True, exist_ok=True)
    state_file.write_text(f"{slug}\n", encoding="utf-8")


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the backfill script."""
    parser = argparse.ArgumentParser(description="Backfill EcoSpecies citation enrichment.")
    parser.add_argument("--slug", help="Limit the backfill to a single species slug.")
    parser.add_argument("--username", default="citation-backfill", help="Audit username to record.")
    parser.add_argument(
        "--include-accepted",
        action="store_true",
        help="Also rerun accepted/editor-curated citations.",
    )
    parser.add_argument(
        "--max-species",
        type=int,
        default=0,
        help="Stop after this many species with eligible citations. 0 means no limit.",
    )
    parser.add_argument(
        "--max-citations",
        type=int,
        default=0,
        help="Stop after this many citations overall. 0 means no limit.",
    )
    parser.add_argument(
        "--state-file",
        help="Optional cursor file used to rotate scheduled runs through the species list.",
    )
    return parser


def _print_summary(counts: dict[str, int]) -> None:
    """Print the one-line run summary from the accumulated counters."""
    print(
        "summary:"
        f" species={counts['species']}"
        f" citations={counts['citations']}"
        f" changed={counts['changed']}"
        f" resolved={counts['resolved']}"
        f" unresolved={counts['unresolved']}"
        f" errors={counts['errors']}",
        flush=True,
    )


def main() -> int:
    """Run the backfill from the command line.

    Returns:
        1 when ``--slug`` names a species that is not in the editor species
        list, otherwise 0 (including when a limit stops the run early).
    """
    args = _build_parser().parse_args()
    state_file = Path(args.state_file).expanduser() if args.state_file else None

    species_items = get_editor_species_list()
    if args.slug:
        species_items = [item for item in species_items if item["slug"] == args.slug]
        if not species_items:
            print(f"Species not found: {args.slug}")
            return 1
    else:
        species_items = reorder_species_with_cursor(species_items, state_file)

    counts = {
        "species": 0,
        "citations": 0,
        "changed": 0,
        "resolved": 0,
        "unresolved": 0,
        "errors": 0,
    }
    last_seen_slug = ""
    citation_limit_reached = False

    for species in species_items:
        if args.max_species and counts["species"] >= args.max_species:
            break
        if args.max_citations and counts["citations"] >= args.max_citations:
            # Budget ran out exactly at a species boundary.  Stop before
            # touching the next species so it is neither counted nor skipped
            # by the cursor without any of its citations being processed.
            citation_limit_reached = True
            break

        slug = str(species["slug"])
        last_seen_slug = slug
        citation_payload = get_editor_species_citations(slug)
        if citation_payload is None:
            continue
        eligible = [
            citation
            for citation in citation_payload["citations"]
            if should_backfill(citation, include_accepted=args.include_accepted)
        ]
        if not eligible:
            continue

        counts["species"] += 1
        print(f"[{slug}] backfilling {len(eligible)} citation(s)", flush=True)

        for citation in eligible:
            if args.max_citations and counts["citations"] >= args.max_citations:
                citation_limit_reached = True
                break
            counts["citations"] += 1
            result = update_species_citation_enrichment(
                slug=slug,
                citation_id=int(citation["id"]),
                username=args.username,
            )
            if result is None:
                print(f" - citation {citation['id']}: skipped (not found)", flush=True)
                continue

            changed_fields = result.get("changed_fields", {})
            status = _clean_field(result["citation"], "enrichment_status").lower()
            if changed_fields:
                counts["changed"] += 1
            if status in ("resolved", "unresolved"):
                counts[status] += 1
            elif status == "error":
                counts["errors"] += 1
            print(
                f" - citation {citation['id']}: {status or 'unknown'}"
                + (f" ({len(changed_fields)} field changes)" if changed_fields else ""),
                flush=True,
            )

        if citation_limit_reached:
            break

    write_cursor(state_file, last_seen_slug)
    if citation_limit_reached:
        print("citation limit reached; stopping early", flush=True)
    _print_summary(counts)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())