MemorySharing/ImageEditing/msauto_derivative.py

481 lines
14 KiB
Python

#!/usr/bin/env python3
"""
msauto_derivative.py
Across a directory tree, find groups of "related" files that share a common
basename prefix (original + variants like _tn, _ws, etc.). For each group:
- Determine an "original" image file using:
* File type priority (RAW > standard image > other),
* EXIF DateTimeOriginal (if present) or filesystem mtime,
* and a slight penalty for stems that look like variant names.
- Run an ImageMagick command on the original to produce a full-image
derivative with color correction, sigmoidal contrast, and light sharpening.
- Save the result as:
<group-base>-msauto.jpg
in the same directory as the chosen original, unless that file already exists.
The "group base" is the stem with variant suffix markers removed
(e.g., P1040123 from P1040123_ws-2048, P1040123_tn, etc.).
Requires:
- Python 3.8+
- Pillow (PIL)
- ImageMagick ("magick" or "convert" on PATH)
"""
import argparse
import os
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from PIL import Image
from PIL.ExifTags import TAGS
# Multi-processing
from functools import partial
from concurrent.futures import ProcessPoolExecutor, as_completed
# Common extensions
RAW_EXTS = {
".nef", ".cr2", ".cr3", ".raf", ".arw", ".dng",
".orf", ".rw2", ".pef", ".sr2", ".srf", ".kdc", ".erf", ".mrw",
}
IMAGE_EXTS = {
".jpg", ".jpeg", ".tif", ".tiff", ".png", ".heic", ".heif",
".webp", ".bmp",
}
# video/audio present in tree but we don't generate derivatives from them
VIDEO_EXTS = {
".mp4", ".mov", ".avi", ".mkv", ".mts", ".m2ts", ".wmv",
}
AUDIO_EXTS = {
".wav", ".flac", ".mp3", ".aac", ".m4a",
}
# --------------------------- EXIF + mtime logic -----------------------------
def get_exif_datetime_original(path: Path) -> Optional[datetime]:
"""
Try to read EXIF DateTimeOriginal from an image.
Returns a datetime object if found and parsable; otherwise None.
"""
try:
with Image.open(path) as img:
exif = img._getexif()
if not exif:
return None
exif_data = {TAGS.get(tag_id, tag_id): value for tag_id, value in exif.items()}
dto = exif_data.get("DateTimeOriginal")
if not dto:
return None
# Typical EXIF datetime format: "YYYY:MM:DD HH:MM:SS"
try:
return datetime.strptime(dto, "%Y:%m:%d %H:%M:%S")
except (ValueError, TypeError):
return None
except Exception:
return None
def get_logical_timestamp(path: Path) -> float:
"""
Get the best available "capture" timestamp for the file as a POSIX float.
Preference:
1. EXIF DateTimeOriginal (if available);
2. Else filesystem modification time (mtime).
"""
exif_dt = get_exif_datetime_original(path)
if exif_dt is not None:
return exif_dt.timestamp()
return os.path.getmtime(path)
# --------------------------- Grouping logic ---------------------------------
def derive_group_key(stem: str, variant_markers: List[str]) -> str:
"""
Given a stem (basename without extension) and a list of variant markers
like ["_tn", "_ws"], derive a group key that is the shared "base" prefix.
We look for the earliest occurrence (by index) of any marker; everything
BEFORE that index is considered the group base.
Examples:
stem = "P1040123" -> "P1040123"
stem = "P1040123_tn" -> "P1040123"
stem = "P1040123_ws-2048" -> "P1040123"
stem = "DSC0001-preview" -> "DSC0001" if "-preview" is in markers
"""
s_lower = stem.lower()
positions = []
for marker in variant_markers:
m_lower = marker.lower()
idx = s_lower.find(m_lower)
if idx != -1:
positions.append(idx)
if not positions:
return stem
cutoff = min(positions)
if cutoff <= 0:
return stem
return stem[:cutoff]
def collect_groups(
root: Path,
variant_markers: List[str],
include_video: bool = False,
include_audio: bool = False,
) -> Dict[str, List[Path]]:
"""
Walk the directory tree under 'root', grouping files by derived group key.
Files whose stem ends with "-msauto" are ignored so that re-running the
script won't treat existing derivatives as new inputs.
Returns:
{group_key: [Path, Path, ...], ...}
"""
groups: Dict[str, List[Path]] = {}
for path in root.rglob("*"):
if not path.is_file():
continue
stem = path.stem
# Ignore any existing msauto derivatives
if stem.lower().endswith("-msauto"):
continue
ext = path.suffix.lower()
# We allow all file types into groups, but you'll typically care about
# RAW + image files as candidates for the "original".
if (
ext not in RAW_EXTS
and ext not in IMAGE_EXTS
and not (include_video and ext in VIDEO_EXTS)
and not (include_audio and ext in AUDIO_EXTS)
):
# Non-media junk (txt, sidecar, etc.) — skip from grouping.
# If you *do* want sidecars in a group, remove this filter.
continue
key = derive_group_key(stem, variant_markers)
groups.setdefault(key, []).append(path)
return groups
# --------------------------- Original selection -----------------------------
def rank_candidate(
path: Path,
variant_markers: List[str],
) -> Tuple[int, int, float]:
"""
Produce a sort key for selecting the "original" from a set of related files.
Rank tuple:
(type_rank, variant_penalty, logical_timestamp)
where smaller is better:
- type_rank: RAW (0) < other image (1) < everything else (2)
- variant_penalty: 0 if stem doesn't look like a variant,
1 if stem includes a variant marker
- logical_timestamp: earlier capture time preferred
"""
ext = path.suffix.lower()
if ext in RAW_EXTS:
type_rank = 0
elif ext in IMAGE_EXTS:
type_rank = 1
else:
type_rank = 2
s_lower = path.stem.lower()
variant_penalty = 0
for marker in variant_markers:
if marker.lower() in s_lower:
variant_penalty = 1
break
ts = get_logical_timestamp(path)
return (type_rank, variant_penalty, ts)
def choose_original(
group_paths: List[Path],
variant_markers: List[str],
) -> Optional[Path]:
"""
Choose the most likely "original" in a group using rank_candidate.
"""
if not group_paths:
return None
return min(group_paths, key=lambda p: rank_candidate(p, variant_markers))
# --------------------------- ImageMagick call -------------------------------
def build_imagemagick_cmd(
magick_binary: str,
input_path: Path,
output_path: Path,
) -> List[str]:
"""
Build the ImageMagick command to generate the msauto derivative.
This pipeline:
- normalizes levels/gamma,
- applies sigmoidal contrast,
- adds light sharpening,
- writes JPEG output.
You can tweak these flags as desired.
"""
return [
magick_binary,
str(input_path),
# Basic color / tone normalization:
"-auto-level",
#"-auto-gamma",
# Sigmoidal contrast (moderate):
"-sigmoidal-contrast", "2.0,0.5",
# Light sharpening:
"-unsharp", "0x1+0.5+0.02",
# Output JPEG:
str(output_path),
]
def ensure_imagemagick_available(magick_binary: str) -> None:
"""
Quick check that the ImageMagick binary is runnable.
"""
try:
subprocess.run(
[magick_binary, "-version"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
except Exception as e:
raise SystemExit(
f"Error: Unable to run ImageMagick binary '{magick_binary}'. "
f"Ensure it is installed and on PATH. Details: {e}"
)
# --------------------------- Main routine -----------------------------------
def run_convert_job(cmd: List[str], dry_run: bool) -> Tuple[str, bool, str]:
"""
Worker function executed in parallel.
Returns (job_id, success, message).
"""
job_id = cmd[-1] # output file path as an identifier
if dry_run:
return (job_id, True, "[DRY-RUN] Skipped execution.")
try:
subprocess.run(cmd, check=True)
return (job_id, True, "OK")
except subprocess.CalledProcessError as e:
return (job_id, False, f"Convert failed: {e}")
def process_tree(
root: Path,
magick_binary: str,
variant_markers: List[str],
dry_run: bool = False,
overwrite: bool = False,
threads: int = 2,
) -> None:
groups = collect_groups(root, variant_markers)
total_groups = len(groups)
total_files = sum(len(v) for v in groups.values())
print(f"Found {total_files} files across {total_groups} groups under {root}")
# Build the job list (cmds only)
jobs = []
for group_base, paths in sorted(groups.items(), key=lambda kv: kv[0]):
original = choose_original(paths, variant_markers)
if original is None:
continue
out_name = f"{group_base}-msauto.jpg"
output_path = original.parent / out_name
if output_path.exists() and not overwrite:
print(f"[skip] {output_path} exists.")
continue
cmd = build_imagemagick_cmd(magick_binary, original, output_path)
jobs.append(cmd)
print(f"\nPrepared {len(jobs)} convert jobs.")
if dry_run:
print("Dry-run mode: jobs will not be executed.\n")
# Run a multi-worker pool
print(f"Running with {threads} parallel workers.\n")
worker = partial(run_convert_job, dry_run=dry_run)
with ProcessPoolExecutor(max_workers=threads) as exe:
futures = {exe.submit(worker, cmd): cmd for cmd in jobs}
for fut in as_completed(futures):
cmd = futures[fut]
out_file = cmd[-1]
try:
job_id, ok, msg = fut.result()
except Exception as e:
print(f"[ERROR] Job for {out_file} crashed: {e}")
continue
if ok:
print(f"[OK] {out_file}: {msg}")
else:
print(f"[FAIL] {out_file}: {msg}")
def process_tree_1(
root: Path,
magick_binary: str,
variant_markers: List[str],
dry_run: bool = False,
overwrite: bool = False,
) -> None:
groups = collect_groups(root, variant_markers)
total_groups = len(groups)
total_files = sum(len(v) for v in groups.values())
print(f"Found {total_files} files across {total_groups} groups under {root}")
for group_base, paths in sorted(groups.items(), key=lambda kv: kv[0]):
original = choose_original(paths, variant_markers)
if original is None:
continue
original_ts = datetime.fromtimestamp(get_logical_timestamp(original)).isoformat(sep=" ")
print(f"\nGroup '{group_base}': {len(paths)} file(s)")
print(f" Original: {original} (logical time: {original_ts})")
# Derivative name: <group-base>-msauto.jpg, in original's directory
out_name = f"{group_base}-msauto.jpg"
output_path = original.parent / out_name
if output_path.exists() and not overwrite:
print(f" Skipping: {output_path} already exists (use --overwrite to replace).")
continue
cmd = build_imagemagick_cmd(magick_binary, original, output_path)
print(f" Command: {' '.join(cmd)}")
if dry_run:
print(" [DRY-RUN] Not executing ImageMagick.")
continue
try:
subprocess.run(cmd, check=True)
print(f" Created: {output_path}")
except subprocess.CalledProcessError as e:
print(f" ERROR running ImageMagick for {original}: {e}")
def main():
parser = argparse.ArgumentParser(
description=(
"Across a directory tree, identify groups of related image files by "
"basename (original + variants like _tn, _ws, etc.), determine the "
"most likely 'original' using EXIF DateTimeOriginal + mtime logic, "
"then create a full-image ImageMagick-derived JPEG named "
"<basename>-msauto.jpg for each group."
)
)
parser.add_argument(
"root_dir",
help="Root directory to scan recursively for image groups.",
)
parser.add_argument(
"--magick-binary",
default="magick",
help="ImageMagick executable to use (default: 'magick'; e.g. 'convert' on older installs).",
)
parser.add_argument(
"--variant-markers",
nargs="*",
default=["_tn", "_ws"],
help=(
"Variant markers used to detect thumbnails/websize variants in stems. "
"Default: _tn _ws"
),
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without running ImageMagick.",
)
parser.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing <basename>-msauto.jpg files if they already exist.",
)
parser.add_argument(
"--threads",
type=int,
default=3,
help="Number of parallel ImageMagick conversions to run (default: 2).",
)
args = parser.parse_args()
root = Path(args.root_dir).expanduser().resolve()
if not root.is_dir():
raise SystemExit(f"Not a directory: {root}")
ensure_imagemagick_available(args.magick_binary)
print(f"Scanning: {root}")
print(f"Using ImageMagick binary: {args.magick_binary}")
print(f"Variant markers: {args.variant_markers}")
if args.dry_run:
print("[DRY-RUN MODE] No files will be written.\n")
process_tree(
root=root,
magick_binary=args.magick_binary,
variant_markers=args.variant_markers,
dry_run=args.dry_run,
overwrite=args.overwrite,
threads=args.threads,
)
if __name__ == "__main__":
main()