from __future__ import annotations

import csv
import hashlib
import os
from dataclasses import asdict, dataclass
from mmap import ACCESS_READ, mmap
from multiprocessing import Pool, cpu_count
from typing import Callable, Dict, Iterator, List, Optional, Sequence, Tuple

from .filter import filter_walk_triplet


@dataclass(frozen=True)
class FileRecord:
    name: str
    relative_path: str
    extension: str
    size: int
    creation_date: float
    modified_date: float
    hash_value: str
    file_type: str
    number_of_files: int
    volume_name: str


def hash_file_mmap(path: str, hash_type: str) -> str:
    """Hash a file by memory-mapping it; fast for large files on local disks."""
    h = hashlib.new(hash_type)
    with open(path, "rb") as f:
        # mmap raises ValueError for zero-length files, so hash those directly.
        if os.fstat(f.fileno()).st_size == 0:
            return h.hexdigest()
        with mmap(f.fileno(), 0, access=ACCESS_READ) as mm:
            h.update(mm)
    return h.hexdigest()


def hash_file_chunked(path: str, hash_type: str, chunk_size: int = 1024 * 1024) -> str:
    """Hash a file by reading fixed-size chunks; handles files of any size."""
    h = hashlib.new(hash_type)
    with open(path, "rb") as f:
        while True:
            buf = f.read(chunk_size)
            if not buf:
                break
            h.update(buf)
    return h.hexdigest()


def _compute_record(args: Tuple[str, str, str, str, str, int, bool]) -> Optional[FileRecord]:
    """
    Worker for multiprocessing.
    args: path, root, directory, volume_name, hash_type, number_of_files, use_mmap
    """
    path, root, directory, volume_name, hash_type, number_of_files, use_mmap = args
    try:
        base = os.path.basename(path)
        stem, ext = os.path.splitext(base)
        rel_root = os.path.relpath(root, directory)
        size = os.path.getsize(path)
        ctime = os.path.getctime(path)
        mtime = os.path.getmtime(path)
        if use_mmap:
            hv = hash_file_mmap(path, hash_type)
        else:
            hv = hash_file_chunked(path, hash_type)
        return FileRecord(
            name=stem,
            relative_path=rel_root,
            extension=ext,
            size=size,
            creation_date=ctime,
            modified_date=mtime,
            hash_value=hv,
            file_type=ext,
            number_of_files=number_of_files,
            volume_name=volume_name,
        )
    except OSError:
        # PermissionError is a subclass of OSError. Optionally log these
        # elsewhere; for now, skip unreadable entries.
        return None


def iter_file_tasks(
    directory: str,
    volume_name: str,
    hash_type: str,
    excluded: Sequence[str] = (".git", ".svn"),
    use_mmap: bool = True,
) -> Iterator[Tuple[str, str, str, str, str, int, bool]]:
    """
    Yield worker tasks from os.walk, pruning excluded directories.
    """
    for root, dirs, files in os.walk(directory):
        root, filtered_dirs, filtered_files = filter_walk_triplet(list(excluded), root, dirs, files)
        dirs[:] = filtered_dirs  # prune descent into excluded directories
        if not filtered_files:
            continue
        nfiles = len(filtered_files)
        for fname in filtered_files:
            path = os.path.join(root, fname)
            yield (path, root, directory, volume_name, hash_type, nfiles, use_mmap)


def collect_file_attributes(
    volume_name: str,
    directory: str,
    hash_type: str = "sha256",
    excluded: Sequence[str] = (".git", ".svn"),
    processes: Optional[int] = None,
    reserve_cores: int = 0,
    chunksize: int = 32,
    use_mmap: bool = True,
    progress_cb: Optional[Callable[[int, Optional[int]], None]] = None,
) -> List[Dict[str, object]]:
    """
    Scan directory, compute file metadata + hash, and return a list of dicts
    suitable for CSV/db output.
    progress_cb(current, total) is called if provided; total may be None if
    the file count was not pre-computed.
    """
    if processes is None:
        processes = max(cpu_count() - int(reserve_cores), 1)
    tasks = iter_file_tasks(
        directory=directory,
        volume_name=volume_name,
        hash_type=hash_type,
        excluded=excluded,
        use_mmap=use_mmap,
    )
    out: List[Dict[str, object]] = []
    completed = 0
    with Pool(processes=processes) as pool:
        for rec in pool.imap(_compute_record, tasks, chunksize=chunksize):
            if rec is None:
                continue
            out.append(asdict(rec))
            completed += 1
            if progress_cb:
                progress_cb(completed, None)
    return out
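

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): one plausible way to consume
# collect_file_attributes() and persist the records with the stdlib csv module
# imported above. The write_records_csv helper, the "example-volume" name, and
# the "scan_results.csv" path are hypothetical and not part of the scanning API.

def write_records_csv(records: List[Dict[str, object]], csv_path: str) -> None:
    """Write scan records to csv_path, one row per file; no-op if records is empty."""
    if not records:
        return
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(records[0].keys()))
        writer.writeheader()
        writer.writerows(records)


if __name__ == "__main__":
    # Because of the relative import above, run this as a module, e.g.:
    #   python -m <package>.<this_module>
    results = collect_file_attributes(
        volume_name="example-volume",
        directory=".",
        hash_type="sha256",
        progress_cb=lambda done, total: print(f"hashed {done} files", end="\r"),
    )
    write_records_csv(results, "scan_results.csv")
    print(f"\nwrote {len(results)} records to scan_results.csv")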