diff --git a/code/example.org b/code/example.org
new file mode 100755
index 0000000..8e57738
--- /dev/null
+++ b/code/example.org
@@ -0,0 +1,27 @@
+* Main Title
+** Introduction
+Some intro text.
+
+#+NAME: intro-para
+This is a named paragraph that will be replaced.
+It continues until the first blank line.
+
+** Data
+#+NAME: mytable
+| Item | Value |
+|------+-------|
+| A | 1 |
+| B | 2 |
+
+** Code
+#+NAME: code-snippet
+#+BEGIN_SRC python
+print("hello world")
+#+END_SRC
+
+** My Section
+This section body will be replaced.
+It ends at the next heading of level ** or *.
+
+** Another Section
+Content of another section.
diff --git a/code/mapping.json b/code/mapping.json
new file mode 100755
index 0000000..856e717
--- /dev/null
+++ b/code/mapping.json
@@ -0,0 +1,18 @@
+[
+  {
+    "name": "code-snippet",
+    "file": "/mnt/data/orgpatch/code.py"
+  },
+  {
+    "name": "mytable",
+    "file": "/mnt/data/orgpatch/tbl.org"
+  },
+  {
+    "section": "** My Section",
+    "file": "/mnt/data/orgpatch/my_section.txt"
+  },
+  {
+    "name": "intro-para",
+    "file": "/mnt/data/orgpatch/intro.txt"
+  }
+]
\ No newline at end of file
diff --git a/code/mapping_out.json b/code/mapping_out.json
new file mode 100755
index 0000000..5ec4df3
--- /dev/null
+++ b/code/mapping_out.json
@@ -0,0 +1,18 @@
+[
+  {
+    "name": "code-snippet",
+    "file": "/mnt/data/orgpatch/exported/code.py"
+  },
+  {
+    "name": "mytable",
+    "file": "/mnt/data/orgpatch/exported/table.org"
+  },
+  {
+    "section": "** My Section",
+    "file": "/mnt/data/orgpatch/exported/section.txt"
+  },
+  {
+    "name": "intro-para",
+    "file": "/mnt/data/orgpatch/exported/intro.txt"
+  }
+]
\ No newline at end of file
diff --git a/code/orgpatch.py b/code/orgpatch.py
new file mode 100755
index 0000000..ade5ed6
--- /dev/null
+++ b/code/orgpatch.py
@@ -0,0 +1,385 @@
+#!/usr/bin/env python3
+# orgpatch.py
+"""
+Replace parts of Org-mode files from the command line.
+
+Supported targets:
+  1) Sections by exact heading text (e.g., "** My Section")
+  2) '#+NAME:'-labeled elements:
+     - Named blocks (#+BEGIN_... / #+END_...)
+     - Named tables (consecutive lines beginning with '|')
+     - Named paragraphs/lists immediately following '#+NAME:' (until blank line)
+
+Also supports JSON "sync" to batch update/export:
+  - direction=in  : update Org from external files
+  - direction=out : export from Org into external files (like tangle, but no noweb)
+
+Usage:
+  List replaceable regions:
+    orgpatch.py list FILE.org
+
+  Replace a section body with stdin:
+    orgpatch.py replace FILE.org --section "** My Section" --stdin
+
+  Replace a named element with a file:
+    orgpatch.py replace FILE.org --name foo --from path.txt
+
+  Sync via JSON mapping (in: files → Org, out: Org → files):
+    orgpatch.py sync FILE.org --map mapping.json --direction in --backup
+    orgpatch.py sync FILE.org --map mapping.json --direction out --mkdirs
+
+Mapping JSON format:
+  Either a top-level list:
+    [
+      {"name": "code-snippet", "file": "code.py"},
+      {"name": "mytable", "file": "table.org"},
+      {"section": "** My Section", "file": "section.txt"}
+    ]
+  or an object with "entries": [ ...same as above... ]
+"""
+
+from __future__ import annotations
+import argparse
+import re
+import sys
+import json
+import shutil
+import os
+from typing import List, Tuple, Optional, Dict, Any
+
+Heading = Dict[str, Any]
+NamedElem = Dict[str, Any]
+
+HEADING_RE = re.compile(r'^(\*+)\s+(.*)\s*$')
+NAME_RE = re.compile(r'^\s*#\+NAME:\s*(\S+)\s*$', re.IGNORECASE)
+BEGIN_RE = re.compile(r'^\s*#\+BEGIN_([A-Z0-9_]+)\b.*$', re.IGNORECASE)
+END_RE_FMT = r'^\s*#\+END_{kind}\b.*$'
+TABLE_RE = re.compile(r'^\s*\|')
+
+def read_lines(path: str) -> List[str]:
+    with open(path, 'r', encoding='utf-8') as f:
+        return f.readlines()
+
+def write_lines(path: str, lines: List[str]) -> None:
+    with open(path, 'w', encoding='utf-8') as f:
+        f.writelines(lines)
+
+def parse_headings(lines: List[str]) -> List[Heading]:
+    heads: List[Heading] = []
+    for i, line in enumerate(lines):
+        m = HEADING_RE.match(line)
+        if m:
+            level = len(m.group(1))
+            title = m.group(2)
+            heads.append({'i': i, 'level': level, 'title': title})
+    return heads
+
+def section_bounds(lines: List[str], headings: List[Heading], exact_heading: str) -> Optional[Tuple[int, int]]:
+    exact_heading = exact_heading.rstrip('\n')
+    start_idx = None
+    level = None
+    for h in headings:
+        heading_line_text = "{} {}".format('*' * h['level'], h['title'])
+        if heading_line_text == exact_heading:
+            start_idx = h['i']
+            level = h['level']
+            break
+    if start_idx is None:
+        return None
+    body_start = start_idx + 1
+    for h in headings:
+        if h['i'] <= start_idx:
+            continue
+        if h['level'] <= level:
+            return (body_start, h['i'])
+    return (body_start, len(lines))
+
+def _scan_named_paragraph(lines: List[str], j: int) -> Optional[Tuple[int, int]]:
+    n = len(lines)
+    if j >= n:
+        return None
+    if BEGIN_RE.match(lines[j]) or TABLE_RE.match(lines[j]):
+        return None
+    if lines[j].strip() == "":
+        return None
+    k = j + 1
+    while k < n and lines[k].strip() != "":
+        k += 1
+    return (j, k)
+
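+# A '#+NAME:' line is resolved to the first non-blank line that follows it:
+# a '#+BEGIN_...' line starts a named block (closed by the matching '#+END_...'),
+# a line starting with '|' starts a named table (all consecutive '|' lines),
+# and anything else is treated as a named paragraph running to the next blank line.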
+def parse_named_elements(lines: List[str]) -> List[NamedElem]:
+    named: List[NamedElem] = []
+    i = 0
+    n = len(lines)
+    while i < n:
+        m = NAME_RE.match(lines[i])
+        if not m:
+            i += 1
+            continue
+        name = m.group(1)
+        j = i + 1
+        while j < n and lines[j].strip() == '':
+            j += 1
+        if j >= n:
+            i += 1
+            continue
+        mbeg = BEGIN_RE.match(lines[j])
+        if mbeg:
+            kind = mbeg.group(1).upper()
+            end_re = re.compile(END_RE_FMT.format(kind=re.escape(kind)), re.IGNORECASE)
+            k = j + 1
+            while k < n and not end_re.match(lines[k]):
+                k += 1
+            if k >= n:
+                content_start = j + 1
+                content_end = n
+                named.append({
+                    'type': 'block',
+                    'name': name,
+                    'kind': kind,
+                    'begin_i': j,
+                    'end_i': n - 1,
+                    'content_start': content_start,
+                    'content_end': content_end
+                })
+                i = n
+                continue
+            named.append({
+                'type': 'block',
+                'name': name,
+                'kind': kind,
+                'begin_i': j,
+                'end_i': k,
+                'content_start': j + 1,
+                'content_end': k
+            })
+            i = k + 1
+            continue
+        if TABLE_RE.match(lines[j]):
+            k = j
+            while k < n and TABLE_RE.match(lines[k]):
+                k += 1
+            named.append({
+                'type': 'table',
+                'name': name,
+                'start_i': j,
+                'end_i': k
+            })
+            i = k
+            continue
+        para_bounds = _scan_named_paragraph(lines, j)
+        if para_bounds is not None:
+            s, e = para_bounds
+            named.append({
+                'type': 'para',
+                'name': name,
+                'start_i': s,
+                'end_i': e
+            })
+            i = e
+            continue
+        i = j
+    return named
+
+def list_targets(lines: List[str]) -> None:
+    heads = parse_headings(lines)
+    print("== Sections ==")
+    for h in heads:
+        print(f"  L{h['i']+1:>4} | level {h['level']} | {('*'*h['level'])} {h['title']}")
+    print("\n== #+NAME elements ==")
+    named = parse_named_elements(lines)
+    for e in named:
+        if e['type'] == 'block':
+            beg = e['begin_i'] + 1
+            end = e['end_i'] + 1
+            print(f"  name={e['name']} | block {e['kind']} | lines {beg}-{end} (content {e['content_start']+1}-{e['content_end']})")
+        elif e['type'] == 'table':
+            print(f"  name={e['name']} | table | lines {e['start_i']+1}-{e['end_i']}")
+        else:
+            print(f"  name={e['name']} | paragraph | lines {e['start_i']+1}-{e['end_i']}")
+
+def load_replacement(from_file: Optional[str], use_stdin: bool) -> str:
+    if from_file and use_stdin:
+        raise SystemExit("Choose either --from FILE or --stdin, not both.")
+    if from_file:
+        with open(from_file, 'r', encoding='utf-8') as f:
+            return f.read()
+    if use_stdin:
+        return sys.stdin.read()
+    raise SystemExit("You must provide replacement text with --from FILE or --stdin.")
+
+def replace_section(lines: List[str], heading_text: str, new_body: str) -> List[str]:
+    heads = parse_headings(lines)
+    bounds = section_bounds(lines, heads, heading_text)
+    if not bounds:
+        raise SystemExit(f"Section not found: {heading_text!r}")
+    start, end = bounds
+    new_lines = [ln if ln.endswith('\n') else (ln + '\n') for ln in new_body.splitlines()]
+    return lines[:start] + new_lines + lines[end:]
+
+def replace_named(lines: List[str], name: str, new_content: str) -> List[str]:
+    elems = parse_named_elements(lines)
+    target = next((e for e in elems if e['name'] == name), None)
+    if not target:
+        raise SystemExit(f"No '#+NAME: {name}' element found (supported: named blocks, tables, and paragraphs).")
+    if target['type'] == 'block':
+        c0, c1 = target['content_start'], target['content_end']
+        new_lines = [ln if ln.endswith('\n') else (ln + '\n') for ln in new_content.splitlines()]
+        return lines[:c0] + new_lines + lines[c1:]
+    elif target['type'] == 'table':
+        s, e = target['start_i'], target['end_i']
+        new_lines = [ln if ln.endswith('\n') else (ln + '\n') for ln in new_content.splitlines()]
+        for idx, ln in enumerate(new_lines, 1):
+            if ln.strip() and not ln.lstrip().startswith('|'):
+                print(f"Warning: replacement line {idx} for table '{name}' does not begin with '|'", file=sys.stderr)
+        return lines[:s] + new_lines + lines[e:]
+    else:
+        s, e = target['start_i'], target['end_i']
+        new_lines = [ln if ln.endswith('\n') else (ln + '\n') for ln in new_content.splitlines()]
+        if not new_lines or new_lines[-1].strip() != "":
+            new_lines.append("\n")
+        return lines[:s] + new_lines + lines[e:]
+
+# ===== Sync helpers =====
+
+def extract_named_content(lines: List[str], name: str) -> str:
+    elems = parse_named_elements(lines)
+    e = next((x for x in elems if x['name'] == name), None)
+    if not e:
+        raise SystemExit(f"No '#+NAME: {name}' element found.")
+    if e['type'] == 'block':
+        c0, c1 = e['content_start'], e['content_end']
+        return ''.join(lines[c0:c1])
+    elif e['type'] == 'table':
+        s, eidx = e['start_i'], e['end_i']
+        return ''.join(lines[s:eidx])
+    else:
+        s, eidx = e['start_i'], e['end_i']
+        return ''.join(lines[s:eidx])
+
+def extract_section_body(lines: List[str], heading_text: str) -> str:
+    heads = parse_headings(lines)
+    bounds = section_bounds(lines, heads, heading_text)
+    if not bounds:
+        raise SystemExit(f"Section not found: {heading_text!r}")
+    s, e = bounds
+    return ''.join(lines[s:e])
+
+def sync_apply_in(lines: List[str], mapping: list) -> List[str]:
+    """Apply updates from files into the Org according to mapping entries."""
+    for idx, entry in enumerate(mapping, 1):
+        if 'file' not in entry:
+            raise SystemExit(f"Mapping entry #{idx} is missing 'file' field.")
+        fpath = entry['file']
+        try:
+            with open(fpath, 'r', encoding='utf-8') as fh:
+                content = fh.read()
+        except FileNotFoundError:
+            raise SystemExit(f"Mapping entry #{idx}: source file not found: {fpath!r}")
+        if 'section' in entry:
+            lines = replace_section(lines, entry['section'], content)
+        elif 'name' in entry:
+            lines = replace_named(lines, entry['name'], content)
+        else:
+            raise SystemExit(f"Mapping entry #{idx} must have either 'name' or 'section'.")
+    return lines
+
+def sync_apply_out(lines: List[str], mapping: list, mkdirs: bool = False, overwrite: bool = True) -> None:
+    """Extract content from Org into files according to mapping entries (does not alter Org)."""
+    for idx, entry in enumerate(mapping, 1):
+        if 'file' not in entry:
+            raise SystemExit(f"Mapping entry #{idx} is missing 'file' field.")
+        fpath = entry['file']
+        if mkdirs:
+            os.makedirs(os.path.dirname(fpath) or '.', exist_ok=True)
+        if (not overwrite) and os.path.exists(fpath):
+            print(f"Skipping existing file (overwrite disabled): {fpath}", file=sys.stderr)
+            continue
+        if 'section' in entry:
+            text = extract_section_body(lines, entry['section'])
+        elif 'name' in entry:
+            text = extract_named_content(lines, entry['name'])
+        else:
+            raise SystemExit(f"Mapping entry #{idx} must have either 'name' or 'section'.")
+        with open(fpath, 'w', encoding='utf-8') as fh:
+            fh.write(text)
+
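+# Mapping files may be either a bare JSON list of entries (as in mapping.json)
+# or an object of the form {"entries": [...]}; load_mapping() normalizes both
+# to a plain list of entries.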
+def load_mapping(path: str) -> list:
+    with open(path, 'r', encoding='utf-8') as fh:
+        data = json.load(fh)
+    if isinstance(data, dict) and 'entries' in data:
+        return data['entries']
+    if not isinstance(data, list):
+        raise SystemExit("Mapping JSON must be a list of entries or an object with an 'entries' list.")
+    return data
+
+def main(argv: Optional[List[str]] = None) -> None:
+    p = argparse.ArgumentParser(description="Replace parts of Org files and sync via JSON map.")
+    sub = p.add_subparsers(dest='cmd', required=True)
+
+    p_list = sub.add_parser('list', help="List replaceable sections and #+NAME elements")
+    p_list.add_argument('file', help="Org file")
+
+    p_rep = sub.add_parser('replace', help="Replace a section body or a named element's contents")
+    p_rep.add_argument('file', help="Org file")
+    target = p_rep.add_mutually_exclusive_group(required=True)
+    target.add_argument('--section', help='Exact heading text to match (e.g., "** My Section")')
+    target.add_argument('--name', help="Name after '#+NAME:' to match")
+    src = p_rep.add_mutually_exclusive_group(required=True)
+    src.add_argument('--from', dest='from_file', metavar='FILE', help="Take replacement text from FILE")
+    src.add_argument('--stdin', action='store_true', help="Read replacement text from stdin")
+    p_rep.add_argument('--dry-run', action='store_true', help="Show the result to stdout without writing the file")
+    p_rep.add_argument('--backup', action='store_true', help="Write FILE.org.bak before modifying")
+
+    p_sync = sub.add_parser('sync', help="Sync blocks/sections using a JSON mapping (in: files → Org, out: Org → files)")
+    p_sync.add_argument('file', help='Org file')
+    p_sync.add_argument('--map', required=True, help='Path to JSON mapping file')
+    p_sync.add_argument('--direction', choices=['in', 'out'], required=True, help="'in' updates Org from files; 'out' exports from Org to files")
+    p_sync.add_argument('--dry-run', action='store_true', help='Preview Org result (direction=in) without writing file')
+    p_sync.add_argument('--backup', action='store_true', help='Write FILE.org.bak before modifying (direction=in only)')
+    p_sync.add_argument('--mkdirs', action='store_true', help='Create parent directories when exporting (direction=out)')
+    p_sync.add_argument('--no-overwrite', action='store_true', help='When exporting, do not overwrite existing files')
+
+    args = p.parse_args(argv)
+    lines = read_lines(args.file)
+
+    if args.cmd == 'list':
+        list_targets(lines)
+        return
+
+    if args.cmd == 'sync':
+        mapping = load_mapping(args.map)
+        if args.direction == 'in':
+            new_lines = sync_apply_in(lines, mapping)
+            if args.dry_run:
+                sys.stdout.write(''.join(new_lines))
+                return
+            if args.backup:
+                shutil.copyfile(args.file, args.file + '.bak')
+            write_lines(args.file, new_lines)
+            print(f"Updated {args.file} from {args.map}.")
+        else:
+            sync_apply_out(lines, mapping, mkdirs=args.mkdirs, overwrite=not args.no_overwrite)
+            print(f"Exported content from {args.file} to files per {args.map}.")
+        return
+
+    # replace
+    replacement = load_replacement(getattr(args, 'from_file', None), getattr(args, 'stdin', False))
+    if args.section:
+        new_lines = replace_section(lines, args.section, replacement)
+    else:
+        new_lines = replace_named(lines, args.name, replacement)
+
+    if args.dry_run:
+        sys.stdout.write(''.join(new_lines))
+    else:
+        if args.backup:
+            shutil.copyfile(args.file, args.file + ".bak")
+        write_lines(args.file, new_lines)
+        if args.section:
+            print(f"Replaced body of section {args.section!r} in {args.file}")
+        else:
+            print(f"Replaced contents of '#+NAME: {args.name}' in {args.file}")
+
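+# Note: main() accepts an explicit argv list, so the CLI can also be driven
+# from scripts or tests, e.g. main(['list', 'example.org']).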
+if __name__ == '__main__':
+    main()
diff --git a/code/test_orgpatch.py b/code/test_orgpatch.py
new file mode 100755
index 0000000..0051733
--- /dev/null
+++ b/code/test_orgpatch.py
@@ -0,0 +1,130 @@
+import sys
+from pathlib import Path
+
+# Import module from same folder
+sys.path.insert(0, str(Path(__file__).parent))
+import orgpatch as op
+
+EXAMPLE_ORG = """* Main Title
+** Introduction
+Some intro text.
+
+#+NAME: intro-para
+This is a named paragraph that will be replaced.
+It continues until the first blank line.
+
+** Data
+#+NAME: mytable
+| Item | Value |
+|------+-------|
+| A | 1 |
+| B | 2 |
+
+** Code
+#+NAME: code-snippet
+#+BEGIN_SRC python
+print("hello world")
+#+END_SRC
+
+** My Section
+This section body will be replaced.
+It ends at the next heading of level ** or *.
+
+** Another Section
+Content of another section.
+"""
+
+def write(p: Path, text: str):
+    p.write_text(text, encoding="utf-8")
+
+def test_parse_headings_and_sections(tmp_path: Path):
+    org = tmp_path / "example.org"
+    write(org, EXAMPLE_ORG)
+    lines = op.read_lines(str(org))
+    heads = op.parse_headings(lines)
+    titles = [h['title'] for h in heads]
+    assert "Introduction" in titles
+    assert "My Section" in titles
+    b = op.section_bounds(lines, heads, "** My Section")
+    assert b is not None
+    s, e = b
+    body = ''.join(lines[s:e])
+    assert "This section body will be replaced." in body
+
+def test_parse_named_elements(tmp_path: Path):
+    org = tmp_path / "example.org"
+    write(org, EXAMPLE_ORG)
+    lines = op.read_lines(str(org))
+    elems = op.parse_named_elements(lines)
+    names = {e['name']: e for e in elems}
+    assert names['intro-para']['type'] == 'para'
+    assert names['mytable']['type'] == 'table'
+    assert names['code-snippet']['type'] == 'block'
+
+def test_replace_section(tmp_path: Path):
+    org = tmp_path / "example.org"
+    write(org, EXAMPLE_ORG)
+    lines = op.read_lines(str(org))
+    out = op.replace_section(lines, "** My Section", "New body\nMore\n")
+    joined = ''.join(out)
+    assert "New body" in joined
+    assert "This section body will be replaced." not in joined
+
+def test_replace_named_block_table_para(tmp_path: Path):
+    org = tmp_path / "example.org"
+    write(org, EXAMPLE_ORG)
+    lines = op.read_lines(str(org))
+
+    out = op.replace_named(lines, "code-snippet", "print('updated')\n")
+    assert "print('updated')" in ''.join(out)
+
+    out2 = op.replace_named(out, "mytable", "| X | 9 |\n")
+    assert "| X | 9 |" in ''.join(out2)
+
+    out3 = op.replace_named(out2, "intro-para", "New intro\nSecond\n")
+    j3 = ''.join(out3)
+    assert "New intro" in j3
+    assert "named paragraph that will be replaced" not in j3
+
+def test_sync_in_and_out(tmp_path: Path):
+    org = tmp_path / "example.org"
+    write(org, EXAMPLE_ORG)
+
+    code_src = tmp_path / "code.py"
+    tbl_src = tmp_path / "tbl.org"
+    sec_src = tmp_path / "section.txt"
+    para_src = tmp_path / "intro.txt"
+    write(code_src, "print('via in')\n")
+    write(tbl_src, "| Col | Val |\n|-----+-----|\n| A | 1 |\n")
+    write(sec_src, "Replaced via sync in.\nSecond line.\n")
+    write(para_src, "Intro via sync in.\n\n")
+
+    mapping = [
+        {"name": "code-snippet", "file": str(code_src)},
+        {"name": "mytable", "file": str(tbl_src)},
+        {"section": "** My Section", "file": str(sec_src)},
+        {"name": "intro-para", "file": str(para_src)},
+    ]
+
+    lines = op.read_lines(str(org))
+    new_lines = op.sync_apply_in(lines, mapping)
+    out = ''.join(new_lines)
+    assert "print('via in')" in out
+    assert "| Col | Val |" in out
+    assert "Replaced via sync in." in out
+    assert "Intro via sync in." in out
+
+    export_dir = tmp_path / "exported"
+    export_dir.mkdir()
+    m_out = [
+        {"name": "code-snippet", "file": str(export_dir / "code.py")},
+        {"name": "mytable", "file": str(export_dir / "table.org")},
+        {"section": "** My Section", "file": str(export_dir / "section.txt")},
+        {"name": "intro-para", "file": str(export_dir / "intro.txt")},
+    ]
+    op.sync_apply_out(new_lines, m_out, mkdirs=True, overwrite=True)
+
+    assert (export_dir / "code.py").read_text(encoding="utf-8").strip() == "print('via in')"
+    assert "| Col | Val |" in (export_dir / "table.org").read_text(encoding="utf-8")
+    assert "Replaced via sync in." in (export_dir / "section.txt").read_text(encoding="utf-8")
+    assert "Intro via sync in." in (export_dir / "intro.txt").read_text(encoding="utf-8")
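
As a quick orientation, here is a minimal sketch of driving the new module from Python rather than through the shell; it assumes it is run from the code/ directory next to example.org, and the replacement table text is purely illustrative:

    import orgpatch as op

    # Print every section and #+NAME element that can be targeted.
    op.main(['list', 'example.org'])

    # Preview a replacement without writing anything: swap the named table
    # and dump the resulting Org text to stdout.
    lines = op.read_lines('example.org')
    preview = op.replace_named(lines, 'mytable', '| Item | Value |\n| C | 3 |\n')
    print(''.join(preview), end='')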