PyOrgPatcher/code/orgpatch.py

387 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
# orgpatch.py
"""
Replace parts of Org-mode files from the command line.
Supported targets:
1) Sections by exact heading text (e.g., "** My Section")
2) '#+NAME:'-labeled elements:
- Named blocks (#+BEGIN_... / #+END_...)
- Named tables (consecutive lines beginning with '|')
- Named paragraphs/lists immediately following '#+NAME:' (until blank line)
Also supports JSON "sync" to batch update/export:
- direction=in : update Org from external files
- direction=out : export from Org into external files (like tangle, but no noweb)
Usage:
List replaceable regions:
orgpatch.py list FILE.org
Replace a section body with stdin:
orgpatch.py replace FILE.org --section "** My Section" --stdin
Replace a named element with a file:
orgpatch.py replace FILE.org --name foo --from path.txt
Sync via JSON mapping (in: files → Org, out: Org → files):
orgpatch.py sync FILE.org --map mapping.json --direction in --backup
orgpatch.py sync FILE.org --map mapping.json --direction out --mkdirs
Mapping JSON format:
Either a top-level list:
[
{"name": "code-snippet", "file": "code.py"},
{"name": "mytable", "file": "table.org"},
{"section": "** My Section", "file": "section.txt"}
]
or an object with "entries": [ ...same as above... ]
"""
from __future__ import annotations
import argparse
import re
import sys
import json
import shutil
import os
from typing import List, Tuple, Optional, Dict, Any
import pathlib
# Loose dict aliases for the parser results: a Heading carries the keys
# 'i', 'level' and 'title'; a NamedElem carries 'type', 'name' and the
# index keys specific to its type (block, table or para).
Heading = Dict[str, Any]
NamedElem = Dict[str, Any]
# Heading line: one or more leading stars, whitespace, then the title text.
HEADING_RE = re.compile(r'^(\*+)\s+(.*)\s*$')
# '#+NAME:' keyword followed by a single whitespace-free name (case-insensitive).
NAME_RE = re.compile(r'^\s*#\+NAME:\s*(\S+)\s*$', re.IGNORECASE)
# '#+BEGIN_<KIND>' line; group 1 captures the block kind (case-insensitive).
BEGIN_RE = re.compile(r'^\s*#\+BEGIN_([A-Z0-9_]+)\b.*$', re.IGNORECASE)
# Pattern template for the matching '#+END_<KIND>' line; '{kind}' is filled in
# with str.format once the kind of the opening line is known.
END_RE_FMT = r'^\s*#\+END_{kind}\b.*$'
# Table row: optional leading whitespace followed by '|'.
TABLE_RE = re.compile(r'^\s*\|')
def read_lines(path: str) -> List[str]:
    """Read a UTF-8 text file and return its lines.

    Each returned string keeps its trailing newline when the line had one.

    Args:
        path: Path of the file to read.

    Returns:
        The lines of the file, in order.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        return list(handle)
def write_lines(path: str, lines: List[str]) -> None:
    """Overwrite a file with the given lines, encoded as UTF-8.

    The strings are written back to back; no newline is added between them.

    Args:
        path: Path of the file to (re)create.
        lines: Line strings to write, each carrying its own line ending.
    """
    with open(path, 'w', encoding='utf-8') as handle:
        handle.write(''.join(lines))
def parse_headings(lines: List[str]) -> List[Heading]:
    """Collect every heading line found in ``lines``.

    A heading is any line matched by ``HEADING_RE``.

    Args:
        lines: The Org file content, one string per line.

    Returns:
        One dict per heading, in file order, with the keys ``'i'``
        (0-based line index), ``'level'`` (number of leading stars) and
        ``'title'`` (the text captured after the stars).
    """
    found: List[Heading] = []
    for index, text in enumerate(lines):
        hit = HEADING_RE.match(text)
        if hit is None:
            continue
        stars, title = hit.group(1), hit.group(2)
        found.append({'i': index, 'level': len(stars), 'title': title})
    return found
def section_bounds(lines: List[str], headings: List[Heading], exact_heading: str) -> Optional[Tuple[int, int]]:
    """Locate the body of the first section whose heading matches exactly.

    The heading is rebuilt as ``'*' * level + ' ' + title`` and compared with
    ``exact_heading`` (trailing newlines removed). The body runs from the
    line after the heading up to the next heading of the same or a shallower
    level, or to the end of the file.

    Args:
        lines: The Org file content, one string per line.
        headings: Heading dicts with the keys ``'i'``, ``'level'``, ``'title'``.
        exact_heading: Full heading text including stars, e.g. ``"** Title"``.

    Returns:
        ``(body_start, body_end)`` as a half-open range of 0-based line
        indices, or ``None`` when no heading matches.
    """
    wanted = exact_heading.rstrip('\n')
    owner = next(
        (h for h in headings if f"{'*' * h['level']} {h['title']}" == wanted),
        None,
    )
    if owner is None:
        return None

    first_body_line = owner['i'] + 1
    for later in headings:
        # The section ends at the first following heading that is not deeper.
        if later['i'] > owner['i'] and later['level'] <= owner['level']:
            return (first_body_line, later['i'])
    return (first_body_line, len(lines))
def _scan_named_paragraph(lines: List[str], j: int) -> Optional[Tuple[int, int]]:
    """Find the extent of a plain paragraph starting at line index ``j``.

    Args:
        lines: The Org file content, one string per line.
        j: 0-based index of the candidate first line of the paragraph.

    Returns:
        ``(j, end)`` where ``end`` is the index of the first blank line after
        ``j`` (or ``len(lines)``), or ``None`` when ``j`` is past the end or
        the line at ``j`` is blank, opens a block, or is a table row.
    """
    total = len(lines)
    if j >= total:
        return None

    opener = lines[j]
    if opener.strip() == "" or BEGIN_RE.match(opener) or TABLE_RE.match(opener):
        return None

    # The paragraph extends until the first blank line (exclusive) or EOF.
    stop = next(
        (idx for idx in range(j + 1, total) if lines[idx].strip() == ""),
        total,
    )
    return (j, stop)
def parse_named_elements(lines: List[str]) -> List[NamedElem]:
    """Find every element introduced by a ``#+NAME:`` line.

    After a ``#+NAME:`` line, blank lines are skipped and the first non-blank
    line decides the element type:

    * a ``#+BEGIN_<KIND>`` line starts a block that runs to the matching
      ``#+END_<KIND>`` line, or to the end of the file if none is found;
    * a table row starts a table covering the consecutive table rows;
    * anything else starts a paragraph that runs until the next blank line.

    Args:
        lines: The Org file content, one string per line.

    Returns:
        Element dicts in file order. Blocks carry ``'type'``, ``'name'``,
        ``'kind'``, ``'begin_i'``, ``'end_i'`` (inclusive), ``'content_start'``
        and ``'content_end'`` (exclusive). Tables and paragraphs carry
        ``'type'``, ``'name'``, ``'start_i'`` and ``'end_i'`` (exclusive).
        All indices are 0-based.
    """
    found: List[NamedElem] = []
    total = len(lines)
    pos = 0
    while pos < total:
        name_match = NAME_RE.match(lines[pos])
        if name_match is None:
            pos += 1
            continue
        label = name_match.group(1)

        # Skip blank lines between the NAME keyword and its element.
        first = pos + 1
        while first < total and not lines[first].strip():
            first += 1
        if first >= total:
            pos += 1
            continue

        begin_match = BEGIN_RE.match(lines[first])
        if begin_match is not None:
            kind = begin_match.group(1).upper()
            closer = re.compile(END_RE_FMT.format(kind=re.escape(kind)), re.IGNORECASE)
            stop = next(
                (idx for idx in range(first + 1, total) if closer.match(lines[idx])),
                total,
            )
            terminated = stop < total
            found.append({
                'type': 'block',
                'name': label,
                'kind': kind,
                'begin_i': first,
                # An unterminated block is treated as running to the last line.
                'end_i': stop if terminated else total - 1,
                'content_start': first + 1,
                'content_end': stop,
            })
            pos = stop + 1 if terminated else total
            continue

        if TABLE_RE.match(lines[first]):
            stop = next(
                (idx for idx in range(first, total) if not TABLE_RE.match(lines[idx])),
                total,
            )
            found.append({
                'type': 'table',
                'name': label,
                'start_i': first,
                'end_i': stop,
            })
            pos = stop
            continue

        span = _scan_named_paragraph(lines, first)
        if span is None:
            pos = first
            continue
        para_start, para_end = span
        found.append({
            'type': 'para',
            'name': label,
            'start_i': para_start,
            'end_i': para_end,
        })
        pos = para_end
    return found
def list_targets(lines: List[str]) -> None:
    """Print every replaceable region of the Org content to stdout.

    Headings are listed first, then the ``#+NAME:`` elements. All line
    numbers shown are 1-based and ranges are inclusive.

    Args:
        lines: The Org file content, one string per line.
    """
    print("== Sections ==")
    for h in parse_headings(lines):
        print(f" L{h['i']+1:>4} | level {h['level']} | {('*'*h['level'])} {h['title']}")

    print("\n== #+NAME elements ==")
    for e in parse_named_elements(lines):
        if e['type'] == 'table':
            print(f" name={e['name']} | table | lines {e['start_i']+1}-{e['end_i']}")
            continue
        if e['type'] != 'block':
            print(f" name={e['name']} | paragraph | lines {e['start_i']+1}-{e['end_i']}")
            continue
        # Block indices are 0-based and 'end_i' is inclusive, hence the +1 on both.
        beg, end = e['begin_i'] + 1, e['end_i'] + 1
        print(f" name={e['name']} | block {e['kind']} | lines {beg}-{end} (content {e['content_start']+1}-{e['content_end']})")
def load_replacement(from_file: Optional[str], use_stdin: bool) -> str:
    """Fetch the replacement text from exactly one source.

    Args:
        from_file: Path of a UTF-8 file holding the text, or a falsy value.
        use_stdin: When true, read the text from standard input.

    Returns:
        The complete replacement text.

    Raises:
        SystemExit: If both sources or neither source is given.
    """
    if from_file and use_stdin:
        raise SystemExit("Choose either --from FILE or --stdin, not both.")
    if not from_file and not use_stdin:
        raise SystemExit("You must provide replacement text with --from FILE or --stdin.")
    if use_stdin:
        return sys.stdin.read()
    with open(from_file, 'r', encoding='utf-8') as source:
        return source.read()
def replace_section(lines: List[str], heading_text: str, new_body: str) -> List[str]:
    """Return a copy of ``lines`` with one section's body replaced.

    The heading line itself is kept; everything between it and the next
    heading of the same or a shallower level is replaced by ``new_body``.

    Args:
        lines: The Org file content, one string per line.
        heading_text: Exact heading text including stars, e.g. ``"** Title"``.
        new_body: Replacement text. A trailing newline is added to the last
            line if it lacks one.

    Returns:
        A new list of lines; ``lines`` is not modified.

    Raises:
        SystemExit: If no heading matches ``heading_text``.
    """
    heads = parse_headings(lines)
    bounds = section_bounds(lines, heads, heading_text)
    if not bounds:
        raise SystemExit(f"Section not found: {heading_text!r}")
    start, end = bounds

    # Split on real line endings only. str.splitlines() would also break on
    # form feed, vertical tab, U+2028 and similar characters, silently
    # turning them into newlines in the patched file.
    text = new_body.replace('\r\n', '\n').replace('\r', '\n')
    pieces = text.split('\n')
    if pieces[-1] == '':
        # Drop the empty piece after a final newline (or for empty input).
        pieces.pop()
    new_lines = [piece + '\n' for piece in pieces]
    return lines[:start] + new_lines + lines[end:]
def replace_named(lines: List[str], name: str, new_content: str) -> List[str]:
    """Return a copy of ``lines`` with a ``#+NAME:`` element's content replaced.

    The first element carrying ``name`` is used. For blocks only the lines
    between the BEGIN and END markers are replaced; for tables and paragraphs
    the whole element is replaced. The ``#+NAME:`` line is always kept.

    Args:
        lines: The Org file content, one string per line.
        name: The name given after ``#+NAME:``.
        new_content: Replacement text. A trailing newline is added to the
            last line if it lacks one.

    Returns:
        A new list of lines; ``lines`` is not modified.

    Raises:
        SystemExit: If no element with that name exists.
    """
    elems = parse_named_elements(lines)
    target = next((e for e in elems if e['name'] == name), None)
    if not target:
        raise SystemExit(f"No '#+NAME: {name}' element found (supported: named blocks, tables, and paragraphs).")

    # Split on real line endings only. str.splitlines() would also break on
    # form feed, vertical tab, U+2028 and similar characters, silently
    # turning them into newlines in the patched file.
    text = new_content.replace('\r\n', '\n').replace('\r', '\n')
    pieces = text.split('\n')
    if pieces[-1] == '':
        # Drop the empty piece after a final newline (or for empty input).
        pieces.pop()
    new_lines = [piece + '\n' for piece in pieces]

    if target['type'] == 'block':
        c0, c1 = target['content_start'], target['content_end']
        return lines[:c0] + new_lines + lines[c1:]

    s, e = target['start_i'], target['end_i']
    if target['type'] == 'table':
        for idx, ln in enumerate(new_lines, 1):
            if ln.strip() and not ln.lstrip().startswith('|'):
                print(f"Warning: replacement line {idx} for table '{name}' does not begin with '|'", file=sys.stderr)
        return lines[:s] + new_lines + lines[e:]

    # Paragraph: keep it separated from whatever follows, but only add a
    # blank line when one is not already there. Adding it unconditionally
    # made the file grow by one blank line on every replacement.
    following = lines[e:]
    ends_blank = bool(new_lines) and new_lines[-1].strip() == ""
    if following and following[0].strip() != "" and not ends_blank:
        new_lines.append("\n")
    return lines[:s] + new_lines + following
# ===== Sync helpers =====
def extract_named_content(lines: List[str], name: str) -> str:
    """Return the text of the first ``#+NAME:`` element called ``name``.

    For blocks this is the text between the BEGIN and END markers; for tables
    and paragraphs it is the whole element. The ``#+NAME:`` line is excluded.

    Args:
        lines: The Org file content, one string per line.
        name: The name given after ``#+NAME:``.

    Returns:
        The element's lines joined into a single string.

    Raises:
        SystemExit: If no element with that name exists.
    """
    candidates = (elem for elem in parse_named_elements(lines) if elem['name'] == name)
    match = next(candidates, None)
    if not match:
        raise SystemExit(f"No '#+NAME: {name}' element found.")

    if match['type'] == 'block':
        first, stop = match['content_start'], match['content_end']
    else:
        # Tables and paragraphs share the same start/end keys.
        first, stop = match['start_i'], match['end_i']
    return ''.join(lines[first:stop])
def extract_section_body(lines: List[str], heading_text: str) -> str:
    """Return the body text of the section with the given heading.

    Args:
        lines: The Org file content, one string per line.
        heading_text: Exact heading text including stars, e.g. ``"** Title"``.

    Returns:
        The lines below the heading, up to the end of the section, joined
        into a single string.

    Raises:
        SystemExit: If no heading matches ``heading_text``.
    """
    span = section_bounds(lines, parse_headings(lines), heading_text)
    if span is None:
        raise SystemExit(f"Section not found: {heading_text!r}")
    first, stop = span
    return ''.join(lines[first:stop])
def sync_apply_in(lines: List[str], mapping: list) -> List[str]:
    """Apply updates from external files to the Org content.

    Entries are processed in order, each one operating on the result of the
    previous one. An entry with a ``'section'`` key replaces that section's
    body; otherwise an entry with a ``'name'`` key replaces that named element.

    Args:
        lines: The Org file content, one string per line.
        mapping: Entry dicts, each with ``'file'`` plus ``'section'`` or ``'name'``.

    Returns:
        The updated list of lines.

    Raises:
        SystemExit: If an entry lacks ``'file'``, its file does not exist, or
            it has neither ``'name'`` nor ``'section'``.
    """
    current = lines
    for idx, entry in enumerate(mapping, 1):
        if 'file' not in entry:
            raise SystemExit(f"Mapping entry #{idx} is missing 'file' field.")
        fpath = entry['file']
        try:
            source = open(fpath, 'r', encoding='utf-8')
        except FileNotFoundError:
            raise SystemExit(f"Mapping entry #{idx}: source file not found: {fpath!r}")
        with source:
            content = source.read()

        if 'section' not in entry and 'name' not in entry:
            raise SystemExit(f"Mapping entry #{idx} must have either 'name' or 'section'.")
        # 'section' takes precedence when an entry carries both keys.
        if 'section' in entry:
            current = replace_section(current, entry['section'], content)
        else:
            current = replace_named(current, entry['name'], content)
    return current
def sync_apply_out(lines: List[str], mapping: list, mkdirs: bool = False, overwrite: bool = True) -> None:
    """Export sections and named elements from the Org content into files.

    The Org content itself is not altered. Entries are processed in order;
    files written before a failing entry stay on disk.

    Args:
        lines: The Org file content, one string per line.
        mapping: Entry dicts, each with ``'file'`` plus ``'section'`` or ``'name'``.
        mkdirs: Create the parent directory of each target file first.
        overwrite: When false, existing target files are skipped with a
            message on stderr.

    Raises:
        SystemExit: If an entry lacks ``'file'`` or has neither ``'name'``
            nor ``'section'``, or if the referenced region is not found.
    """
    for idx, entry in enumerate(mapping, 1):
        if 'file' not in entry:
            raise SystemExit(f"Mapping entry #{idx} is missing 'file' field.")
        fpath = entry['file']

        if mkdirs:
            parent_dir = pathlib.Path(fpath).parent
            parent_dir.mkdir(parents=True, exist_ok=True)

        keep_existing = not overwrite
        if keep_existing and os.path.exists(fpath):
            print(f"Skipping existing file (overwrite disabled): {fpath}", file=sys.stderr)
            continue

        if 'section' not in entry and 'name' not in entry:
            raise SystemExit(f"Mapping entry #{idx} must have either 'name' or 'section'.")
        # 'section' takes precedence when an entry carries both keys.
        if 'section' in entry:
            text = extract_section_body(lines, entry['section'])
        else:
            text = extract_named_content(lines, entry['name'])

        with open(fpath, 'w', encoding='utf-8') as sink:
            sink.write(text)
def load_mapping(path: str) -> list:
    """Load and validate the JSON mapping used by the ``sync`` command.

    The file may contain either a top-level list of entries or an object
    whose ``"entries"`` member is that list.

    Args:
        path: Path of the UTF-8 JSON mapping file.

    Returns:
        The list of entry objects (dicts).

    Raises:
        SystemExit: If the document is neither a list nor an object with an
            ``"entries"`` list, or if any entry is not a JSON object.
    """
    with open(path, 'r', encoding='utf-8') as fh:
        data = json.load(fh)

    entries = data['entries'] if isinstance(data, dict) and 'entries' in data else data
    # Validate the unwrapped value too: a non-list "entries" member used to be
    # returned as-is and only failed later with a misleading error.
    if not isinstance(entries, list):
        raise SystemExit("Mapping JSON must be a list of entries or an object with an 'entries' list.")
    for idx, entry in enumerate(entries, 1):
        if not isinstance(entry, dict):
            raise SystemExit(f"Mapping entry #{idx} must be a JSON object, got {type(entry).__name__}.")
    return entries
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point for the ``list``, ``replace`` and ``sync`` commands.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description="Replace parts of Org files and sync via JSON map.")
    commands = parser.add_subparsers(dest='cmd', required=True)

    # list
    list_cmd = commands.add_parser('list', help="List replaceable sections and #+NAME elements")
    list_cmd.add_argument('file', help="Org file")

    # replace: exactly one target and exactly one text source are required.
    replace_cmd = commands.add_parser('replace', help="Replace a section body or a named element's contents")
    replace_cmd.add_argument('file', help="Org file")
    which = replace_cmd.add_mutually_exclusive_group(required=True)
    which.add_argument('--section', help='Exact heading text to match (e.g., "** My Section")')
    which.add_argument('--name', help="Name after '#+NAME:' to match")
    origin = replace_cmd.add_mutually_exclusive_group(required=True)
    origin.add_argument('--from', dest='from_file', metavar='FILE', help="Take replacement text from FILE")
    origin.add_argument('--stdin', action='store_true', help="Read replacement text from stdin")
    replace_cmd.add_argument('--dry-run', action='store_true', help="Show the result to stdout without writing the file")
    replace_cmd.add_argument('--backup', action='store_true', help="Write FILE.org.bak before modifying")

    # sync
    sync_cmd = commands.add_parser('sync', help="Sync blocks/sections using a JSON mapping (in: files → Org, out: Org → files)")
    sync_cmd.add_argument('file', help='Org file')
    sync_cmd.add_argument('--map', required=True, help='Path to JSON mapping file')
    sync_cmd.add_argument('--direction', choices=['in','out'], required=True, help="'in' updates Org from files; 'out' exports from Org to files")
    sync_cmd.add_argument('--dry-run', action='store_true', help='Preview Org result (direction=in) without writing file')
    sync_cmd.add_argument('--backup', action='store_true', help='Write FILE.org.bak before modifying (direction=in only)')
    sync_cmd.add_argument('--mkdirs', action='store_true', help='Create parent directories when exporting (direction=out)')
    sync_cmd.add_argument('--no-overwrite', action='store_true', help='When exporting, do not overwrite existing files')

    args = parser.parse_args(argv)
    lines = read_lines(args.file)

    if args.cmd == 'list':
        list_targets(lines)
        return

    if args.cmd == 'sync':
        mapping = load_mapping(args.map)
        if args.direction == 'out':
            # Export only: the Org file is read but never written.
            sync_apply_out(lines, mapping, mkdirs=args.mkdirs, overwrite=not args.no_overwrite)
            print(f"Exported content from {args.file} to files per {args.map}.")
            return
        new_lines = sync_apply_in(lines, mapping)
        if args.dry_run:
            sys.stdout.write(''.join(new_lines))
            return
        if args.backup:
            shutil.copyfile(args.file, args.file + '.bak')
        write_lines(args.file, new_lines)
        print(f"Updated {args.file} from {args.map}.")
        return

    # replace
    replacement = load_replacement(getattr(args, 'from_file', None), getattr(args, 'stdin', False))
    new_lines = (
        replace_section(lines, args.section, replacement)
        if args.section
        else replace_named(lines, args.name, replacement)
    )
    if args.dry_run:
        sys.stdout.write(''.join(new_lines))
        return
    if args.backup:
        shutil.copyfile(args.file, args.file + ".bak")
    write_lines(args.file, new_lines)
    if args.section:
        print(f"Replaced body of section {args.section!r} in {args.file}")
    else:
        print(f"Replaced contents of '#+NAME: {args.name}' in {args.file}")
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()