258 lines
7.4 KiB
Python
258 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
Group files in a directory by "capture time" and move them into
timestamped subdirectories.

"Capture time" is determined as:
  1. EXIF DateTimeOriginal (if available; for image files)
  2. Otherwise, filesystem modification time (mtime)

Subdirectory naming scheme:
    YYYY_mdd-hhmmss-[camera tag]-grouped

Where:
  - YYYY is the 4-digit year
  - m is month as a hexadecimal digit: 1..9,a,b,c (Jan..Dec)
  - dd is 2-digit day of month
  - hhmmss is the time of the earliest file in the group (24h)
  - [camera tag] is taken from the input directory name pattern:
        YYYY_mdd-[camera tag]-<text>
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import shutil
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Optional
|
|
|
|
from PIL import Image
|
|
from PIL.ExifTags import TAGS
|
|
|
|
|
|
def month_to_hex_digit(month: int) -> str:
    """
    Encode a calendar month as a single hexadecimal-style character.

    January..September become "1".."9"; October, November and December
    become "a", "b" and "c".

    Raises ValueError when month is outside 1..12.
    """
    if month < 1 or month > 12:
        raise ValueError(f"Month out of range: {month}")
    # Months after September continue with letters instead of two digits.
    return "abc"[month - 10] if month > 9 else str(month)
|
|
|
|
|
|
def parse_camera_tag_from_dirname(dirname: str) -> str:
    """
    Pull the camera tag out of a directory name shaped like:
        YYYY_mdd-[camera tag]-<text>

    The tag is the run of non-hyphen characters that follows the date
    code and its hyphen. Returns 'unknown' when the name does not start
    with that shape.
    """
    # Example: "2025_415-d600-trip-to-park" -> "d600"
    found = re.match(r"^\d{4}_[0-9a-cA-C]\d{2}-([^-]+)", dirname)
    return found.group(1) if found else "unknown"
|
|
|
|
|
|
def get_exif_datetime_original(path: Path) -> Optional[datetime]:
    """
    Try to read EXIF DateTimeOriginal from an image.

    The tag is looked up through Pillow's public ``getexif()`` API, where
    it lives in the Exif sub-IFD. If that yields nothing (for example on a
    Pillow version without ``Exif.get_ifd``), the private ``_getexif()``
    helper is tried as a fallback when the image class provides it.

    Returns a naive datetime if the tag is found and parsable, otherwise
    None. Any failure while opening or reading the file (non-image,
    corrupt data, unreadable path, ...) also yields None so the caller
    can fall back to another time source.
    """
    exif_ifd_tag = 0x8769  # IFD0 entry pointing at the Exif sub-IFD
    date_time_original_tag = 0x9003  # DateTimeOriginal

    try:
        with Image.open(path) as img:
            raw = None

            # Preferred: public API, available on every image class.
            exif = img.getexif() if hasattr(img, "getexif") else None
            if exif is not None and hasattr(exif, "get_ifd"):
                raw = exif.get_ifd(exif_ifd_tag).get(date_time_original_tag)

            # Fallback: legacy flattened dict (only some formats have it).
            if raw is None and hasattr(img, "_getexif"):
                merged = img._getexif()
                if merged:
                    raw = merged.get(date_time_original_tag)
    except Exception:
        # Deliberate best effort: any error means "no EXIF time".
        return None

    if isinstance(raw, bytes):
        raw = raw.decode("ascii", errors="ignore")
    if not isinstance(raw, str):
        return None

    # Some writers pad the value with NULs or whitespace.
    text = raw.strip("\x00 \t\r\n")
    if not text:
        return None

    # Typical format: "YYYY:MM:DD HH:MM:SS"
    try:
        return datetime.strptime(text, "%Y:%m:%d %H:%M:%S")
    except ValueError:
        return None
|
|
|
|
|
|
def get_logical_timestamp(path: Path) -> float:
    """
    Resolve a file's "capture time" as a POSIX timestamp.

    The EXIF DateTimeOriginal value wins when the file has one; every
    other file is stamped with its filesystem modification time (mtime).
    """
    captured = get_exif_datetime_original(path)
    if captured is None:
        # No usable EXIF time: fall back to mtime.
        return os.path.getmtime(path)
    return captured.timestamp()
|
|
|
|
|
|
def group_files_by_logical_time(
    files: List[Path], threshold_minutes: float
) -> List[List[Path]]:
    """
    Split files into runs of temporally adjacent items.

    Every file is stamped with its "logical time" (EXIF DateTimeOriginal
    or mtime) and the files are ordered by that stamp. Walking the
    ordered list, a file joins the current group unless the gap to the
    previous file is larger than threshold_minutes, in which case it
    opens a new group.

    Returns the groups in ascending time order; files inside a group are
    in ascending time order too. An empty input gives an empty list.
    """
    if not files:
        return []

    stamped: List[Tuple[Path, float]] = sorted(
        ((item, get_logical_timestamp(item)) for item in files),
        key=lambda pair: pair[1],
    )

    first_item, previous_stamp = stamped[0]
    clusters: List[List[Path]] = [[first_item]]

    for item, stamp in stamped[1:]:
        if (stamp - previous_stamp) / 60.0 > threshold_minutes:
            # Gap too large: this file opens a new group.
            clusters.append([item])
        else:
            clusters[-1].append(item)
        previous_stamp = stamp

    return clusters
|
|
|
|
|
|
def build_group_dir_name(
    earliest_ts: float,
    camera_tag: str,
) -> str:
    """
    Compose a group's directory name:
        YYYY_mdd-hhmmss-[camera tag]-grouped

    earliest_ts is a POSIX timestamp; it is converted with
    datetime.fromtimestamp, so the date and time parts are local time.
    m is the month as a hex-style digit (1..9,a,b,c).
    """
    moment = datetime.fromtimestamp(earliest_ts)
    date_part = f"{moment.year}_{month_to_hex_digit(moment.month)}{moment.day:02d}"
    clock_part = moment.strftime("%H%M%S")
    return f"{date_part}-{clock_part}-{camera_tag}-grouped"
|
|
|
|
|
|
def ensure_unique_dir(path: Path) -> Path:
    """
    Return a path that does not exist yet, derived from 'path'.

    If 'path' is free it is returned unchanged. Otherwise _2, _3, ... is
    appended to the final path component until a non-existing path is
    found.

    The counter is appended to the whole name rather than to the stem,
    so a directory name containing a dot (e.g. "a.b") becomes "a.b_2"
    instead of "a_2.b".

    Nothing is created on disk; the caller creates the directory.
    """
    if not path.exists():
        return path

    counter = 2
    while True:
        candidate = path.parent / f"{path.name}_{counter}"
        if not candidate.exists():
            return candidate
        counter += 1
|
|
|
|
|
|
def main():
    """
    Command-line entry point.

    Parses the arguments, groups the regular files found directly inside
    INPUT_DIR by capture time, and moves each group into its own newly
    created subdirectory of OUTPUT_DIR. The camera tag used in the
    subdirectory names is taken from INPUT_DIR's own name.

    With --dry-run the plan is only printed: no directory is created and
    no file is moved.

    Exits via SystemExit when INPUT_DIR is not an existing directory.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Group files in INPUT_DIR by capture time (EXIF DateTimeOriginal or "
            "mtime) and move each group into a new subdirectory in OUTPUT_DIR."
        )
    )
    parser.add_argument("input_dir", help="Input directory containing files")
    parser.add_argument("output_dir", help="Output directory for grouped subdirs")
    parser.add_argument(
        "--threshold-minutes",
        "-t",
        type=float,
        default=60.0,
        help="Time gap threshold in minutes between groups (default: 60)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without moving files",
    )

    args = parser.parse_args()

    input_dir = Path(args.input_dir).expanduser().resolve()
    output_dir = Path(args.output_dir).expanduser().resolve()
    threshold_minutes = args.threshold_minutes
    dry_run = args.dry_run

    if not input_dir.is_dir():
        raise SystemExit(f"Input directory does not exist or is not a directory: {input_dir}")

    # A dry run must leave the filesystem untouched, so the output
    # directory is only created when files will really be moved.
    if not dry_run:
        output_dir.mkdir(parents=True, exist_ok=True)

    # Extract camera tag from input directory name
    camera_tag = parse_camera_tag_from_dirname(input_dir.name)

    # Collect files (non-recursive; adjust if you want recursion)
    files = [p for p in input_dir.iterdir() if p.is_file()]

    if not files:
        print(f"No files found in {input_dir}")
        return

    groups = group_files_by_logical_time(files, threshold_minutes)

    print(
        f"Found {len(files)} files in {input_dir}, "
        f"grouped into {len(groups)} group(s) using threshold "
        f"{threshold_minutes} minute(s)."
    )

    for idx, group in enumerate(groups, start=1):
        # group_files_by_logical_time returns each group sorted by
        # ascending logical time, so the first file is the earliest one.
        # Reading only its timestamp avoids re-parsing EXIF for every file.
        earliest_ts = get_logical_timestamp(group[0])
        group_dir_name = build_group_dir_name(earliest_ts, camera_tag)
        group_dir = ensure_unique_dir(output_dir / group_dir_name)

        print(f"\nGroup {idx}: {len(group)} file(s)")
        print(f"  Target directory: {group_dir}")

        if dry_run:
            for f in group:
                print(f"    (dry-run) would move: {f.name}")
        else:
            # exist_ok=False: ensure_unique_dir just picked a free name, so
            # an existing directory here means something else created it.
            group_dir.mkdir(parents=True, exist_ok=False)
            for f in group:
                dest = group_dir / f.name
                print(f"    Moving: {f.name} -> {dest}")
                shutil.move(str(f), str(dest))


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|