481 lines
14 KiB
Python
481 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
msauto_derivative.py
|
|
|
|
Across a directory tree, find groups of "related" files that share a common
|
|
basename prefix (original + variants like _tn, _ws, etc.). For each group:
|
|
|
|
- Determine an "original" image file using:
|
|
* File type priority (RAW > standard image > other),
|
|
* EXIF DateTimeOriginal (if present) or filesystem mtime,
|
|
* and a slight penalty for stems that look like variant names.
|
|
|
|
- Run an ImageMagick command on the original to produce a full-image
|
|
derivative with color correction, sigmoidal contrast, and light sharpening.
|
|
|
|
- Save the result as:
|
|
<group-base>-msauto.jpg
|
|
in the same directory as the chosen original, unless that file already exists.
|
|
|
|
The "group base" is the stem with variant suffix markers removed
|
|
(e.g., P1040123 from P1040123_ws-2048, P1040123_tn, etc.).
|
|
|
|
Requires:
|
|
- Python 3.8+
|
|
- Pillow (PIL)
|
|
- ImageMagick ("magick" or "convert" on PATH)
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import subprocess
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple, Optional
|
|
|
|
from PIL import Image
|
|
from PIL.ExifTags import TAGS
|
|
|
|
# Multi-processing
|
|
from functools import partial
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
|
|
# Extension tables used to classify files. Every entry is lowercase and keeps
# its leading dot, so it can be compared directly with Path.suffix.lower().

# Camera RAW formats.
RAW_EXTS = set(
    ".nef .cr2 .cr3 .raf .arw .dng "
    ".orf .rw2 .pef .sr2 .srf .kdc .erf .mrw".split()
)

# Standard still-image formats.
IMAGE_EXTS = set(
    ".jpg .jpeg .tif .tiff .png .heic .heif .webp .bmp".split()
)

# video/audio present in tree but we don't generate derivatives from them
VIDEO_EXTS = set(".mp4 .mov .avi .mkv .mts .m2ts .wmv".split())
AUDIO_EXTS = set(".wav .flac .mp3 .aac .m4a".split())
|
|
|
|
|
|
# --------------------------- EXIF + mtime logic -----------------------------
|
|
|
|
def get_exif_datetime_original(path: Path) -> Optional[datetime]:
    """
    Try to read EXIF DateTimeOriginal from an image.

    The lookup is deliberately best-effort: a file that cannot be opened,
    has no EXIF block, or carries an unparsable value simply yields None so
    that callers can fall back to another timestamp source.

    Args:
        path: Image file to inspect.

    Returns:
        A naive datetime parsed from the "YYYY:MM:DD HH:MM:SS" EXIF value,
        or None if the tag is missing, malformed, or the file is unreadable.
    """
    exif_ifd_pointer = 0x8769    # tag that points at the Exif sub-IFD
    date_time_original = 0x9003  # DateTimeOriginal (36867)

    try:
        with Image.open(path) as img:
            # `_getexif` is a private Pillow helper that not every image
            # class provides; when present it returns a flat dict that
            # already includes the Exif sub-IFD.
            legacy_getter = getattr(img, "_getexif", None)
            if callable(legacy_getter):
                exif = legacy_getter()
                dto = exif.get(date_time_original) if exif else None
            else:
                # Public API: DateTimeOriginal lives in the Exif sub-IFD,
                # not in the top-level IFD returned by getexif().
                exif = img.getexif()
                dto = (
                    exif.get_ifd(exif_ifd_pointer).get(date_time_original)
                    if exif
                    else None
                )
    except Exception:
        # Best-effort by design: unreadable/unsupported files (or an older
        # Pillow without these APIs) must not abort the directory scan.
        return None

    # Some writers store the value as bytes and/or pad it with NULs/spaces.
    if isinstance(dto, bytes):
        dto = dto.decode("ascii", errors="ignore")
    if not isinstance(dto, str):
        return None
    dto = dto.strip().strip("\x00").strip()
    if not dto:
        return None

    # Typical EXIF datetime format: "YYYY:MM:DD HH:MM:SS"
    try:
        return datetime.strptime(dto, "%Y:%m:%d %H:%M:%S")
    except ValueError:
        return None
|
|
|
|
|
|
def get_logical_timestamp(path: Path) -> float:
    """
    Return the best available "capture" time of a file as a POSIX timestamp.

    The EXIF DateTimeOriginal value is used when get_exif_datetime_original
    can supply one; otherwise the filesystem modification time (mtime) is
    returned.
    """
    captured_at = get_exif_datetime_original(path)
    if captured_at is None:
        # No usable EXIF capture time: fall back to the filesystem mtime.
        return os.path.getmtime(path)
    return captured_at.timestamp()
|
|
|
|
|
|
# --------------------------- Grouping logic ---------------------------------
|
|
|
|
def derive_group_key(stem: str, variant_markers: List[str]) -> str:
    """
    Derive the shared "base" prefix of a stem by cutting at the first marker.

    Given a stem (basename without extension) and a list of variant markers
    like ["_tn", "_ws"], the earliest occurrence (by index) of any marker is
    located case-insensitively; everything BEFORE that index is the group
    key. Matching is done on the lowercased stem, the slice on the original,
    so the key keeps the stem's original casing.

    Empty markers are ignored, and a marker is only honoured when it leaves a
    non-empty base (i.e. it matches at index >= 1). A match at index 0 can
    therefore no longer hide a valid marker further along the stem.

    Args:
        stem: Basename without extension.
        variant_markers: Substrings that introduce a variant suffix.

    Returns:
        The group key, or the unchanged stem if no marker applies.

    Examples:
        stem = "P1040123"          -> "P1040123"
        stem = "P1040123_tn"       -> "P1040123"
        stem = "P1040123_ws-2048"  -> "P1040123"
        stem = "DSC0001-preview"   -> "DSC0001" if "-preview" is in markers
    """
    s_lower = stem.lower()

    # Search from index 1 so that a hit can never produce an empty base.
    hits = (
        s_lower.find(marker.lower(), 1)
        for marker in variant_markers
        if marker  # an empty marker would "match" everywhere
    )
    cutoffs = [idx for idx in hits if idx != -1]

    if not cutoffs:
        return stem
    return stem[:min(cutoffs)]
|
|
|
|
|
|
def collect_groups(
    root: Path,
    variant_markers: List[str],
    include_video: bool = False,
    include_audio: bool = False,
) -> Dict[str, List[Path]]:
    """
    Recursively scan 'root' and bucket media files by their derived group key.

    Only regular files with a RAW or standard image extension are grouped;
    video and audio extensions are admitted only when the matching
    include_* flag is set. Anything else (txt, sidecars, ...) is left out.
    Files whose stem ends with "-msauto" are skipped so that a re-run does
    not treat previously generated derivatives as inputs.

    The key comes from the file stem alone (see derive_group_key), so files
    in different directories that share a base name land in the same group.

    Returns:
        {group_key: [Path, Path, ...], ...}
    """
    # Work out the accepted extensions once instead of per file.
    accepted_exts = RAW_EXTS | IMAGE_EXTS
    if include_video:
        accepted_exts |= VIDEO_EXTS
    if include_audio:
        accepted_exts |= AUDIO_EXTS

    grouped: Dict[str, List[Path]] = {}

    for entry in root.rglob("*"):
        if not entry.is_file():
            continue

        # Existing msauto derivatives are never inputs.
        if entry.stem.lower().endswith("-msauto"):
            continue

        # Non-media files are not part of any group.
        if entry.suffix.lower() not in accepted_exts:
            continue

        bucket = grouped.setdefault(
            derive_group_key(entry.stem, variant_markers), []
        )
        bucket.append(entry)

    return grouped
|
|
|
|
|
|
# --------------------------- Original selection -----------------------------
|
|
|
|
def rank_candidate(
    path: Path,
    variant_markers: List[str],
) -> Tuple[int, int, float]:
    """
    Build the sort key used to pick the "original" among related files.

    The key is (type_rank, variant_penalty, logical_timestamp); tuples
    compare element by element and a smaller key wins:

      - type_rank: 0 for RAW, 1 for a standard image, 2 for anything else
      - variant_penalty: 1 when the stem contains any variant marker
        (case-insensitive), otherwise 0
      - logical_timestamp: capture time from get_logical_timestamp, so the
        earlier file is preferred
    """
    suffix = path.suffix.lower()
    if suffix in RAW_EXTS:
        type_rank = 0
    elif suffix in IMAGE_EXTS:
        type_rank = 1
    else:
        type_rank = 2

    stem_lc = path.stem.lower()
    looks_like_variant = any(
        marker.lower() in stem_lc for marker in variant_markers
    )

    return (type_rank, int(looks_like_variant), get_logical_timestamp(path))
|
|
|
|
|
|
def choose_original(
    group_paths: List[Path],
    variant_markers: List[str],
) -> Optional[Path]:
    """
    Pick the most likely "original" of a group.

    Returns the path with the smallest rank_candidate key, or None when the
    group is empty.
    """
    if not group_paths:
        return None

    def sort_key(candidate: Path):
        return rank_candidate(candidate, variant_markers)

    return min(group_paths, key=sort_key)
|
|
|
|
|
|
# --------------------------- ImageMagick call -------------------------------
|
|
|
|
def build_imagemagick_cmd(
    magick_binary: str,
    input_path: Path,
    output_path: Path,
) -> List[str]:
    """
    Assemble the ImageMagick argument list for one msauto derivative.

    The resulting command is:
        <binary> <input> -auto-level -sigmoidal-contrast 2.0,0.5
                 -unsharp 0x1+0.5+0.02 <output>

    Nothing is executed here; the list is meant to be passed to
    subprocess.run. Adjust the option groups below to tune the look.
    """
    # Basic color / tone normalization ("-auto-gamma" is left disabled).
    tone_ops = ["-auto-level"]
    # Sigmoidal contrast (moderate).
    contrast_ops = ["-sigmoidal-contrast", "2.0,0.5"]
    # Light sharpening.
    sharpen_ops = ["-unsharp", "0x1+0.5+0.02"]

    return [
        magick_binary,
        str(input_path),
        *tone_ops,
        *contrast_ops,
        *sharpen_ops,
        # Output JPEG:
        str(output_path),
    ]
|
|
|
|
|
|
def ensure_imagemagick_available(magick_binary: str) -> None:
    """
    Verify that the ImageMagick binary can be executed.

    Runs "<binary> -version" with its output discarded.

    Raises:
        SystemExit: if the probe cannot be started or exits non-zero.
    """
    probe = [magick_binary, "-version"]
    try:
        subprocess.run(
            probe,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception as e:
        message = (
            f"Error: Unable to run ImageMagick binary '{magick_binary}'. "
            f"Ensure it is installed and on PATH. Details: {e}"
        )
        raise SystemExit(message)
|
|
|
|
|
|
# --------------------------- Main routine -----------------------------------
|
|
|
|
|
|
def run_convert_job(cmd: List[str], dry_run: bool) -> Tuple[str, bool, str]:
    """
    Worker function executed in parallel; runs one conversion command.

    Args:
        cmd: Full argument list; its last element (the output file path) is
            used as the job identifier.
        dry_run: When True the command is not executed.

    Returns:
        (job_id, success, message). Failures are always reported through
        the tuple, including the case where the binary cannot be launched,
        so the caller never has to unpick a worker exception for them.
    """
    job_id = cmd[-1]  # output file path as an identifier

    if dry_run:
        return (job_id, True, "[DRY-RUN] Skipped execution.")

    try:
        # Capture stderr so diagnostics travel with the result instead of
        # interleaving on the console when several workers run at once.
        completed = subprocess.run(
            cmd,
            check=True,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
        )
    except subprocess.CalledProcessError as e:
        detail = (e.stderr or "").strip()
        message = f"Convert failed: {e}"
        if detail:
            message += f" | stderr: {detail}"
        return (job_id, False, message)
    except OSError as e:
        # Binary missing / not executable: report it as a failed job.
        return (job_id, False, f"Convert failed: could not launch {cmd[0]!r}: {e}")

    # A zero exit status can still come with warnings; keep them visible.
    warnings = (completed.stderr or "").strip()
    if warnings:
        return (job_id, True, f"OK (stderr: {warnings})")
    return (job_id, True, "OK")
|
|
|
|
|
|
def process_tree(
    root: Path,
    magick_binary: str,
    variant_markers: List[str],
    dry_run: bool = False,
    overwrite: bool = False,
    threads: int = 2,
) -> None:
    """
    Generate msauto derivatives for every file group under 'root' in parallel.

    Groups are collected with collect_groups (RAW and image files only, as
    the video/audio flags are not passed). For each group the chosen
    original is converted to "<group-base>-msauto.jpg" in the original's
    directory. One status line is printed per job as it completes, so the
    output order follows completion order rather than group order.

    Args:
        root: Directory to scan recursively.
        magick_binary: ImageMagick executable to invoke.
        variant_markers: Markers used for grouping and original selection.
        dry_run: Prepare and report jobs without executing ImageMagick.
        overwrite: Regenerate derivatives whose output file already exists.
        threads: Number of parallel workers; values below 1 are treated as 1.
    """
    groups = collect_groups(root, variant_markers)
    total_groups = len(groups)
    total_files = sum(len(v) for v in groups.values())

    print(f"Found {total_files} files across {total_groups} groups under {root}")

    # Build the job list (cmds only)
    jobs: List[List[str]] = []

    for group_base, paths in sorted(groups.items(), key=lambda kv: kv[0]):
        original = choose_original(paths, variant_markers)
        if original is None:
            continue

        out_name = f"{group_base}-msauto.jpg"
        output_path = original.parent / out_name

        if output_path.exists() and not overwrite:
            print(f"[skip] {output_path} exists.")
            continue

        jobs.append(build_imagemagick_cmd(magick_binary, original, output_path))

    print(f"\nPrepared {len(jobs)} convert jobs.")
    if dry_run:
        print("Dry-run mode: jobs will not be executed.\n")

    if not jobs:
        # Nothing to do: avoid spawning worker processes for an empty queue.
        return

    # ProcessPoolExecutor raises ValueError for max_workers <= 0; clamp so a
    # bad --threads value cannot abort the run after the scan has finished.
    workers = max(1, threads)
    print(f"Running with {workers} parallel workers.\n")

    worker = partial(run_convert_job, dry_run=dry_run)

    with ProcessPoolExecutor(max_workers=workers) as exe:
        futures = {exe.submit(worker, cmd): cmd for cmd in jobs}

        for fut in as_completed(futures):
            out_file = futures[fut][-1]

            try:
                _, ok, msg = fut.result()
            except Exception as e:
                # The worker itself raised (or the pool broke); keep going
                # so the remaining jobs are still reported.
                print(f"[ERROR] Job for {out_file} crashed: {e}")
                continue

            if ok:
                print(f"[OK] {out_file}: {msg}")
            else:
                print(f"[FAIL] {out_file}: {msg}")
|
|
|
def process_tree_1(
    root: Path,
    magick_binary: str,
    variant_markers: List[str],
    dry_run: bool = False,
    overwrite: bool = False,
) -> None:
    """
    Sequential variant of process_tree: convert one group after another.

    For every group (in key order) this prints the chosen original with its
    logical timestamp and the exact ImageMagick command, then runs it unless
    dry_run is set. A group whose "<group-base>-msauto.jpg" already exists
    in the original's directory is skipped unless overwrite is set. A failed
    conversion is reported and processing continues with the next group.
    """
    groups = collect_groups(root, variant_markers)
    file_count = sum(map(len, groups.values()))

    print(f"Found {file_count} files across {len(groups)} groups under {root}")

    for group_base in sorted(groups):
        members = groups[group_base]

        original = choose_original(members, variant_markers)
        if original is None:
            continue

        logical_time = datetime.fromtimestamp(get_logical_timestamp(original))
        original_ts = logical_time.isoformat(sep=" ")
        print(f"\nGroup '{group_base}': {len(members)} file(s)")
        print(f" Original: {original} (logical time: {original_ts})")

        # Derivative name: <group-base>-msauto.jpg, in original's directory
        output_path = original.parent / f"{group_base}-msauto.jpg"

        if not overwrite and output_path.exists():
            print(f" Skipping: {output_path} already exists (use --overwrite to replace).")
            continue

        cmd = build_imagemagick_cmd(magick_binary, original, output_path)
        print(f" Command: {' '.join(cmd)}")

        if dry_run:
            print(" [DRY-RUN] Not executing ImageMagick.")
            continue

        try:
            subprocess.run(cmd, check=True)
        except subprocess.CalledProcessError as e:
            print(f" ERROR running ImageMagick for {original}: {e}")
        else:
            print(f" Created: {output_path}")
|
|
|
|
|
|
def main():
    """
    Command-line entry point.

    Parses the arguments, validates the root directory and the worker count,
    checks that the ImageMagick binary is runnable, and then hands over to
    process_tree.

    Raises:
        SystemExit: on invalid arguments, when root_dir is not a directory,
            or when the ImageMagick binary cannot be run.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Across a directory tree, identify groups of related image files by "
            "basename (original + variants like _tn, _ws, etc.), determine the "
            "most likely 'original' using EXIF DateTimeOriginal + mtime logic, "
            "then create a full-image ImageMagick-derived JPEG named "
            "<basename>-msauto.jpg for each group."
        )
    )
    parser.add_argument(
        "root_dir",
        help="Root directory to scan recursively for image groups.",
    )
    parser.add_argument(
        "--magick-binary",
        default="magick",
        help="ImageMagick executable to use (default: 'magick'; e.g. 'convert' on older installs).",
    )
    parser.add_argument(
        "--variant-markers",
        nargs="*",
        default=["_tn", "_ws"],
        help=(
            "Variant markers used to detect thumbnails/websize variants in stems. "
            "Default: _tn _ws"
        ),
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without running ImageMagick.",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing <basename>-msauto.jpg files if they already exist.",
    )

    parser.add_argument(
        "--threads",
        type=int,
        default=3,
        # %(default)s keeps the help text in sync with the real default.
        help="Number of parallel ImageMagick conversions to run (default: %(default)s).",
    )

    args = parser.parse_args()

    # Reject a non-positive worker count up front, before any scanning.
    if args.threads < 1:
        parser.error("--threads must be at least 1")

    root = Path(args.root_dir).expanduser().resolve()
    if not root.is_dir():
        raise SystemExit(f"Not a directory: {root}")

    ensure_imagemagick_available(args.magick_binary)

    print(f"Scanning: {root}")
    print(f"Using ImageMagick binary: {args.magick_binary}")
    print(f"Variant markers: {args.variant_markers}")
    if args.dry_run:
        print("[DRY-RUN MODE] No files will be written.\n")

    process_tree(
        root=root,
        magick_binary=args.magick_binary,
        variant_markers=args.variant_markers,
        dry_run=args.dry_run,
        overwrite=args.overwrite,
        threads=args.threads,
    )
|
|
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|