#!/usr/bin/env python3
"""
msauto_derivative.py

Across a directory tree, find groups of "related" files that share a common
basename prefix (original + variants like _tn, _ws, etc.). For each group:

  - Determine an "original" image file using, in order of precedence:
      * File type priority (RAW > standard image > other),
      * a penalty for stems that look like variant names,
      * EXIF DateTimeOriginal (if present) or filesystem mtime (earlier wins).
  - Run an ImageMagick command on the original to produce a full-image
    derivative with level normalization, sigmoidal contrast, and light
    sharpening.
  - Save the result as:
        <group_base>-msauto.jpg
    in the same directory as the chosen original, unless that file already
    exists.

The "group base" is the stem with variant suffix markers removed
(e.g., P1040123 from P1040123_ws-2048, P1040123_tn, etc.).

Note: groups are keyed by the group base only, not by directory, so files
with the same base in different directories end up in the same group.

Requires:
  - Python 3.8+
  - Pillow (PIL) -- optional; without it EXIF dates are unavailable and the
    filesystem mtime is used instead
  - ImageMagick ("magick" or "convert" on PATH)
"""

import argparse
import os
import shlex
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

try:
    from PIL import Image
    from PIL.ExifTags import TAGS
except ImportError:
    # Pillow is only used for the EXIF lookup; degrade to mtime-only ranking
    # instead of refusing to start.
    Image = None
    TAGS = {}

# Common extensions
RAW_EXTS = {
    ".nef", ".cr2", ".cr3", ".raf", ".arw", ".dng", ".orf",
    ".rw2", ".pef", ".sr2", ".srf", ".kdc", ".erf", ".mrw",
}
IMAGE_EXTS = {
    ".jpg", ".jpeg", ".tif", ".tiff", ".png", ".heic", ".heif",
    ".webp", ".bmp",
}
# video/audio present in tree but we don't generate derivatives from them
VIDEO_EXTS = {
    ".mp4", ".mov", ".avi", ".mkv", ".mts", ".m2ts", ".wmv",
}
AUDIO_EXTS = {
    ".wav", ".flac", ".mp3", ".aac", ".m4a",
}


# --------------------------- EXIF + mtime logic -----------------------------

def get_exif_datetime_original(path: Path) -> Optional[datetime]:
    """
    Try to read EXIF DateTimeOriginal from an image.

    This is strictly best-effort: any failure (Pillow missing, unreadable or
    non-image file, no EXIF block, unparsable value) yields None.

    Args:
        path: File to inspect.

    Returns:
        A naive datetime if the tag is found and parsable; otherwise None.
    """
    if Image is None:
        return None
    try:
        with Image.open(path) as img:
            exif = img._getexif()
            if not exif:
                return None
            exif_data = {TAGS.get(tag_id, tag_id): value for tag_id, value in exif.items()}
            dto = exif_data.get("DateTimeOriginal")
            if not dto:
                return None
            if isinstance(dto, bytes):
                dto = dto.decode("ascii", "ignore")
            if isinstance(dto, str):
                # Some writers pad the value with NULs or spaces.
                dto = dto.strip("\x00 \t\r\n")
            # Typical EXIF datetime format: "YYYY:MM:DD HH:MM:SS"
            try:
                return datetime.strptime(dto, "%Y:%m:%d %H:%M:%S")
            except (ValueError, TypeError):
                return None
    except Exception:
        # Deliberately broad: image objects without _getexif(), corrupt
        # files, and decoder errors must never abort the directory scan.
        return None


def get_logical_timestamp(path: Path) -> float:
    """
    Get the best available "capture" timestamp for the file as a POSIX float.

    Preference:
      1. EXIF DateTimeOriginal (if available and convertible);
      2. Else filesystem modification time (mtime).

    Args:
        path: File to inspect.

    Returns:
        Seconds since the epoch.

    Raises:
        OSError: if the mtime fallback cannot stat the file.
    """
    exif_dt = get_exif_datetime_original(path)
    if exif_dt is not None:
        try:
            return exif_dt.timestamp()
        except (OverflowError, OSError, ValueError):
            # A parsable but out-of-range EXIF date must not abort the run;
            # fall through to mtime.
            pass
    return os.path.getmtime(path)


# --------------------------- Grouping logic ---------------------------------

def derive_group_key(stem: str, variant_markers: List[str]) -> str:
    """
    Given a stem (basename without extension) and a list of variant markers
    like ["_tn", "_ws"], derive a group key that is the shared "base" prefix.

    We look for the earliest occurrence (by index, case-insensitively) of any
    marker; everything BEFORE that index is considered the group base. If no
    marker matches, or the earliest match is at index 0 (which would leave an
    empty base), the stem is returned unchanged. Empty markers are ignored.

    Examples:
        stem = "P1040123"          -> "P1040123"
        stem = "P1040123_tn"       -> "P1040123"
        stem = "P1040123_ws-2048"  -> "P1040123"
        stem = "DSC0001-preview"   -> "DSC0001" if "-preview" is in markers
    """
    s_lower = stem.lower()
    positions = []
    for marker in variant_markers:
        if not marker:
            # "" is found at index 0 of every string and would mask all
            # other markers.
            continue
        idx = s_lower.find(marker.lower())
        if idx != -1:
            positions.append(idx)

    if not positions:
        return stem

    cutoff = min(positions)
    if cutoff <= 0:
        return stem
    return stem[:cutoff]


def collect_groups(
    root: Path,
    variant_markers: List[str],
    include_video: bool = False,
    include_audio: bool = False,
) -> Dict[str, List[Path]]:
    """
    Walk the directory tree under 'root', grouping files by derived group key.

    Files whose stem ends with "-msauto" are ignored so that re-running the
    script won't treat existing derivatives as new inputs. Only RAW and image
    extensions are grouped, plus video/audio when the matching flag is set.

    Args:
        root: Directory to scan recursively.
        variant_markers: Markers passed to derive_group_key().
        include_video: Also group files with a VIDEO_EXTS extension.
        include_audio: Also group files with an AUDIO_EXTS extension.

    Returns:
        {group_key: [Path, Path, ...], ...}
    """
    accepted_exts = RAW_EXTS | IMAGE_EXTS
    if include_video:
        accepted_exts = accepted_exts | VIDEO_EXTS
    if include_audio:
        accepted_exts = accepted_exts | AUDIO_EXTS

    groups: Dict[str, List[Path]] = {}
    for path in root.rglob("*"):
        if not path.is_file():
            continue

        stem = path.stem
        # Ignore any existing msauto derivatives
        if stem.lower().endswith("-msauto"):
            continue

        # Non-media junk (txt, sidecar, etc.) is skipped from grouping.
        # If you *do* want sidecars in a group, remove this filter.
        if path.suffix.lower() not in accepted_exts:
            continue

        key = derive_group_key(stem, variant_markers)
        groups.setdefault(key, []).append(path)

    return groups


# --------------------------- Original selection -----------------------------

def rank_candidate(
    path: Path,
    variant_markers: List[str],
) -> Tuple[int, int, float]:
    """
    Produce a sort key for selecting the "original" from a set of related files.

    Rank tuple: (type_rank, variant_penalty, logical_timestamp)
    where smaller is better:

      - type_rank: RAW (0) < other image (1) < everything else (2)
      - variant_penalty: 0 if stem doesn't look like a variant,
                         1 if stem includes a (non-empty) variant marker
      - logical_timestamp: earlier capture time preferred
    """
    ext = path.suffix.lower()
    if ext in RAW_EXTS:
        type_rank = 0
    elif ext in IMAGE_EXTS:
        type_rank = 1
    else:
        type_rank = 2

    s_lower = path.stem.lower()
    # Empty markers are skipped: "" is a substring of every stem.
    looks_like_variant = any(
        marker and marker.lower() in s_lower for marker in variant_markers
    )
    variant_penalty = 1 if looks_like_variant else 0

    ts = get_logical_timestamp(path)
    return (type_rank, variant_penalty, ts)


def choose_original(
    group_paths: List[Path],
    variant_markers: List[str],
) -> Optional[Path]:
    """
    Choose the most likely "original" in a group using rank_candidate.

    Returns None for an empty group; ties keep the first path in the list.
    """
    if not group_paths:
        return None
    return min(group_paths, key=lambda p: rank_candidate(p, variant_markers))


# --------------------------- ImageMagick call -------------------------------

def build_imagemagick_cmd(
    magick_binary: str,
    input_path: Path,
    output_path: Path,
) -> List[str]:
    """
    Build the ImageMagick command to generate the msauto derivative.

    This pipeline:
      - normalizes levels (-auto-level; -auto-gamma is currently disabled),
      - applies sigmoidal contrast,
      - adds light sharpening,
      - writes the output (format chosen by ImageMagick from the extension).

    You can tweak these flags as desired.

    Returns:
        The argv list; its last element is always the output path.
    """
    return [
        magick_binary,
        str(input_path),
        # Basic color / tone normalization:
        "-auto-level",
        # Sigmoidal contrast (moderate):
        "-sigmoidal-contrast", "2.0,0.5",
        # Light sharpening:
        "-unsharp", "0x1+0.5+0.02",
        # Output JPEG:
        str(output_path),
    ]


def ensure_imagemagick_available(magick_binary: str) -> None:
    """
    Quick check that the ImageMagick binary is runnable.

    Raises:
        SystemExit: if "<magick_binary> -version" cannot be started or exits
            with a non-zero status.
    """
    try:
        subprocess.run(
            [magick_binary, "-version"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except (OSError, subprocess.SubprocessError) as e:
        raise SystemExit(
            f"Error: Unable to run ImageMagick binary '{magick_binary}'. "
            f"Ensure it is installed and on PATH. Details: {e}"
        ) from e


# --------------------------- Main routine -----------------------------------

def run_convert_job(cmd: List[str], dry_run: bool) -> Tuple[str, bool, str]:
    """
    Worker function executed in parallel.

    Args:
        cmd: Full argv as built by build_imagemagick_cmd().
        dry_run: If True, do not execute anything.

    Returns:
        (job_id, success, message), where job_id is the output file path
        (the last element of cmd).
    """
    job_id = cmd[-1]  # output file path as an identifier

    if dry_run:
        return (job_id, True, "[DRY-RUN] Skipped execution.")

    try:
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a binary that is missing or not executable; report
        # it as an ordinary failed job.
        return (job_id, False, f"Convert failed: {e}")
    return (job_id, True, "OK")


def _report_job(out_file: str, ok: bool, msg: str) -> None:
    """Print the one-line outcome of a finished convert job."""
    if ok:
        print(f"[OK] {out_file}: {msg}")
    else:
        print(f"[FAIL] {out_file}: {msg}")


def process_tree(
    root: Path,
    magick_binary: str,
    variant_markers: List[str],
    dry_run: bool = False,
    overwrite: bool = False,
    threads: int = 2,
) -> None:
    """
    Scan 'root', pick an original per group and convert the originals in
    parallel.

    Args:
        root: Directory to scan recursively.
        magick_binary: ImageMagick executable to invoke.
        variant_markers: Markers used for grouping and variant detection.
        dry_run: Report the jobs without running ImageMagick.
        overwrite: Regenerate derivatives whose output file already exists.
        threads: Maximum number of parallel workers; values below 1 are
            treated as 1.
    """
    groups = collect_groups(root, variant_markers)
    total_groups = len(groups)
    total_files = sum(len(v) for v in groups.values())

    print(f"Found {total_files} files across {total_groups} groups under {root}")

    # Build the job list (cmds only)
    jobs: List[List[str]] = []
    for group_base, paths in sorted(groups.items(), key=lambda kv: kv[0]):
        original = choose_original(paths, variant_markers)
        if original is None:
            continue

        output_path = original.parent / f"{group_base}-msauto.jpg"
        if output_path.exists() and not overwrite:
            print(f"[skip] {output_path} exists.")
            continue

        jobs.append(build_imagemagick_cmd(magick_binary, original, output_path))

    print(f"\nPrepared {len(jobs)} convert jobs.")
    if not jobs:
        return

    if dry_run:
        # Nothing is executed, so there is no point in spawning workers.
        print("Dry-run mode: jobs will not be executed.\n")
        for cmd in jobs:
            _report_job(*run_convert_job(cmd, dry_run=True))
        return

    # ProcessPoolExecutor rejects max_workers <= 0, and more workers than
    # jobs would only sit idle.
    workers = max(1, min(threads, len(jobs)))
    print(f"Running with {workers} parallel workers.\n")

    worker = partial(run_convert_job, dry_run=False)
    with ProcessPoolExecutor(max_workers=workers) as exe:
        futures = {exe.submit(worker, cmd): cmd for cmd in jobs}

        for fut in as_completed(futures):
            out_file = futures[fut][-1]
            try:
                _job_id, ok, msg = fut.result()
            except Exception as e:
                # The worker itself died (e.g. broken pool); keep going.
                print(f"[ERROR] Job for {out_file} crashed: {e}")
                continue
            _report_job(out_file, ok, msg)


def process_tree_1(
    root: Path,
    magick_binary: str,
    variant_markers: List[str],
    dry_run: bool = False,
    overwrite: bool = False,
) -> None:
    """
    Serial, verbose variant of process_tree(): converts one group at a time
    and prints the chosen original and the command for every group.
    """
    groups = collect_groups(root, variant_markers)
    total_groups = len(groups)
    total_files = sum(len(v) for v in groups.values())

    print(f"Found {total_files} files across {total_groups} groups under {root}")

    for group_base, paths in sorted(groups.items(), key=lambda kv: kv[0]):
        original = choose_original(paths, variant_markers)
        if original is None:
            continue

        original_ts = datetime.fromtimestamp(get_logical_timestamp(original)).isoformat(sep=" ")
        print(f"\nGroup '{group_base}': {len(paths)} file(s)")
        print(f" Original: {original} (logical time: {original_ts})")

        # Derivative name: <group_base>-msauto.jpg, in original's directory
        out_name = f"{group_base}-msauto.jpg"
        output_path = original.parent / out_name

        if output_path.exists() and not overwrite:
            print(f" Skipping: {output_path} already exists (use --overwrite to replace).")
            continue

        cmd = build_imagemagick_cmd(magick_binary, original, output_path)
        # shlex.join quotes arguments, so paths with spaces stay unambiguous.
        print(f" Command: {shlex.join(cmd)}")

        if dry_run:
            print(" [DRY-RUN] Not executing ImageMagick.")
            continue

        try:
            subprocess.run(cmd, check=True)
            print(f" Created: {output_path}")
        except subprocess.CalledProcessError as e:
            print(f" ERROR running ImageMagick for {original}: {e}")


def _positive_int(text: str) -> int:
    """argparse type: parse an integer that must be >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def main(argv: Optional[Sequence[str]] = None) -> None:
    """
    Command-line entry point.

    Args:
        argv: Argument list to parse; None (the default) means sys.argv[1:].

    Raises:
        SystemExit: on invalid arguments, if root_dir is not a directory, or
            if the ImageMagick binary cannot be run.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Across a directory tree, identify groups of related image files by "
            "basename (original + variants like _tn, _ws, etc.), determine the "
            "most likely 'original' using EXIF DateTimeOriginal + mtime logic, "
            "then create a full-image ImageMagick-derived JPEG named "
            "<group_base>-msauto.jpg for each group."
        )
    )
    parser.add_argument(
        "root_dir",
        help="Root directory to scan recursively for image groups.",
    )
    parser.add_argument(
        "--magick-binary",
        default="magick",
        help="ImageMagick executable to use (default: 'magick'; e.g. 'convert' on older installs).",
    )
    parser.add_argument(
        "--variant-markers",
        nargs="*",
        default=["_tn", "_ws"],
        help=(
            "Variant markers used to detect thumbnails/websize variants in stems. "
            "Default: _tn _ws"
        ),
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without running ImageMagick.",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing -msauto.jpg files if they already exist.",
    )
    parser.add_argument(
        "--threads",
        type=_positive_int,
        default=3,
        # %(default)s keeps the help text in sync with the actual default.
        help="Number of parallel ImageMagick conversions to run (default: %(default)s).",
    )

    args = parser.parse_args(argv)

    root = Path(args.root_dir).expanduser().resolve()
    if not root.is_dir():
        raise SystemExit(f"Not a directory: {root}")

    ensure_imagemagick_available(args.magick_binary)

    print(f"Scanning: {root}")
    print(f"Using ImageMagick binary: {args.magick_binary}")
    print(f"Variant markers: {args.variant_markers}")
    if Image is None:
        print("Warning: Pillow is not installed; EXIF dates are unavailable, using file mtime only.")
    if args.dry_run:
        print("[DRY-RUN MODE] No files will be written.\n")

    process_tree(
        root=root,
        magick_binary=args.magick_binary,
        variant_markers=args.variant_markers,
        dry_run=args.dry_run,
        overwrite=args.overwrite,
        threads=args.threads,
    )


if __name__ == "__main__":
    main()