ThreeGate/tools/validate_tool_request.py

265 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""
Validate a Tool Request against schemas/tool-request.schema.md (schema_version=1).
Supports two execution backends:
- backend=ERA (default): command execution in microVM lane
- backend=monty: Python-subset execution in TOOL-EXEC-Lite lane
Usage:
validate_tool_request.py /path/to/request.md
Exit codes:
0 = valid
2 = invalid
3 = error
"""
from __future__ import annotations
import json
import re
import sys
from typing import List, Tuple
import keyword
from validate_common import (
ValidationResult,
extract_front_matter,
find_forbidden,
read_text,
require_keys,
require_sections_in_order,
)
# Front matter keys that validate() requires on every Tool Request; they are
# checked before the backend is determined.
REQUIRED_KEYS_BASE = [
    "request_type",
    "schema_version",
    "request_id",
    "created_utc",
    "requested_by",
    "approved_by",
    "approved_utc",
    "purpose",
    "language",
    "network",
    "cpu_limit",
    "memory_limit_mb",
    "time_limit_sec",
]

# Additional optional key (recommended); default is ERA if absent.
OPTIONAL_KEYS = ["backend"]

# ERA required headings
REQUIRED_H2_ERA = [
    "## Command",
    "## Input Files",
    "## Output Expectations",
    "## Risk Assessment",
]

# Monty required headings (Inputs optional)
REQUIRED_H2_MONTY = [
    "## Code",
    "## Output Expectations",
    "## Risk Assessment",
]

# Strong rules for ERA: command must be a single line and must not contain
# shell chaining/pipes/redirection.
# The pattern matches any of the characters ; & | > < `, the sequence "$(",
# and any ")" character.
DANGEROUS_CMD_TOKENS = re.compile(r"[;&|><`]|(\$\()|(\)\s*)", re.IGNORECASE)

# Monty code guardrails (best-effort, not a substitute for Monty itself).
# Matched case-insensitively and only as whole words.
FORBIDDEN_MONTY_CODE_TOKENS = re.compile(
    r"\b(import|open|exec|eval|compile|__import__|globals|locals|vars|dir|getattr|setattr|delattr)\b",
    re.IGNORECASE,
)

# ASCII-only identifier used to vet "## Inputs (JSON)" keys.
# Anchored with \Z rather than $: "$" also matches just before a trailing
# newline, so IDENT_RE.match("name\n") would otherwise succeed.
IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\Z")
def extract_single_line_after_heading(body: str, heading: str) -> str:
    """Return the first non-blank line that follows *heading*, stripped.

    *heading* must equal one of the lines of *body* exactly. The search stops
    at the first line that, once stripped, starts with "## ". An empty string
    is returned when the heading is absent or when no non-blank line is found
    before that point.
    """
    all_lines = body.splitlines()
    if heading not in all_lines:
        return ""

    first_after = all_lines.index(heading) + 1
    for raw in all_lines[first_after:]:
        text = raw.strip()
        if text.startswith("## "):
            # Reached the next H2 without seeing any content.
            return ""
        if text:
            return text
    return ""
def extract_section_text(body: str, heading: str) -> str:
    """Return everything under *heading* up to the next H2, stripped.

    *heading* must equal one of the lines of *body* exactly. The section ends
    at the first following line that begins with "## " (checked on the raw,
    unstripped line) or at the end of *body*. Returns an empty string when
    the heading is not present.
    """
    doc_lines = body.splitlines()
    if heading not in doc_lines:
        return ""

    begin = doc_lines.index(heading) + 1
    # Index of the next H2 line, or the end of the document if there is none.
    end = next(
        (pos for pos in range(begin, len(doc_lines)) if doc_lines[pos].startswith("## ")),
        len(doc_lines),
    )
    return "\n".join(doc_lines[begin:end]).strip()
# Package-manager invocations, matched as whole words so that names which
# merely contain "apt" (e.g. "adapt.py", "capture") are not flagged.
_INSTALL_CMD_RE = re.compile(
    r"\b(?:pip[\d.]*\s+install|npm\s+install|apt(?:-get|itude)?)\b",
    re.IGNORECASE,
)


def validate_era(body: str, errors: List[str]) -> None:
    """Validate the body of a backend=ERA Tool Request.

    Appends one message to *errors* for each problem found:

    - required H2 sections missing or out of order (REQUIRED_H2_ERA),
    - "## Command" empty, or containing more than one non-blank line,
    - the command written as a fenced code block,
    - shell metacharacters matched by DANGEROUS_CMD_TOKENS,
    - a package-install invocation (pip / apt / apt-get / aptitude / npm).

    Args:
        body: Request text following the front matter.
        errors: List that error messages are appended to (mutated in place).
    """
    # Headings
    errors.extend(require_sections_in_order(body, REQUIRED_H2_ERA))

    # Command rules
    cmd = extract_single_line_after_heading(body, "## Command")
    if not cmd:
        errors.append("## Command must contain a single command line.")
        return

    # extract_single_line_after_heading() yields only the first non-blank
    # line, so the checks below never see any further lines. Reject extra
    # lines outright rather than letting them through uninspected.
    command_lines = [
        ln for ln in extract_section_text(body, "## Command").splitlines() if ln.strip()
    ]
    if len(command_lines) > 1:
        errors.append(
            "## Command must contain a single command line "
            f"(found {len(command_lines)} non-empty lines)."
        )

    if cmd.startswith("```") or cmd.endswith("```"):
        errors.append("Command must be plain text, not a fenced code block.")

    if DANGEROUS_CMD_TOKENS.search(cmd):
        errors.append("Command contains forbidden shell metacharacters (chaining/pipes/redirection/subshell).")

    if _INSTALL_CMD_RE.search(cmd):
        errors.append("Command appears to install packages; installs are forbidden in TOOL-EXEC.")
def validate_monty(fm: dict, body: str, errors: List[str], warnings: List[str]) -> None:
    """Validate a backend=monty Tool Request.

    Appends to *errors* when:

    - required H2 sections are missing or out of order (REQUIRED_H2_MONTY),
    - "## Code" is empty or contains a fenced code block,
    - "## Inputs (JSON)" is present but is not valid JSON, is not a JSON
      object, or has keys that are not ASCII identifiers or are Python
      keywords,
    - front matter ``language`` is not "python" or ``network`` is not "none".

    Appends to *warnings* when the code mentions any name matched by
    FORBIDDEN_MONTY_CODE_TOKENS.

    Args:
        fm: Parsed front matter mapping.
        body: Request text following the front matter.
        errors: List that error messages are appended to (mutated in place).
        warnings: List that warning messages are appended to (mutated in place).
    """
    # Headings
    errors.extend(require_sections_in_order(body, REQUIRED_H2_MONTY))

    # Monty requires code block text (not fenced)
    code = extract_section_text(body, "## Code")
    if not code:
        errors.append("## Code must contain Monty-executable Python-subset code.")
        return

    # For safety, discourage fenced blocks (users can still paste raw code)
    if "```" in code:
        errors.append("Monty code must be plain text, not fenced code blocks.")

    # Inputs JSON is optional but if present must parse as object
    inputs_json = extract_section_text(body, "## Inputs (JSON)")
    if inputs_json:
        try:
            obj = json.loads(inputs_json)
        except (ValueError, RecursionError) as e:
            # json.JSONDecodeError is a ValueError; RecursionError covers
            # pathologically nested input. Only the parse is guarded so that
            # a bug in the key checks below cannot be misreported as bad JSON.
            errors.append(f"Invalid JSON in ## Inputs (JSON): {e}")
        else:
            if not isinstance(obj, dict):
                errors.append("## Inputs (JSON) must be a JSON object/dict.")
            else:
                # Keys of a parsed JSON object are always str. fullmatch() is
                # used because a "$"-anchored pattern with match() would also
                # accept a key ending in a newline.
                bad_keys = [
                    k for k in obj
                    if not IDENT_RE.fullmatch(k) or keyword.iskeyword(k)
                ]
                if bad_keys:
                    # Show empty or non-printable keys via repr() so they
                    # remain visible in the message.
                    shown = [k if (k and k.isprintable()) else repr(k) for k in bad_keys]
                    errors.append(
                        "## Inputs (JSON) keys must be valid Python identifiers and not keywords. "
                        f"Invalid keys: {', '.join(shown)}"
                    )

    # Backend-policy constraints
    lang = fm.get("language", "").strip().lower()
    if lang != "python":
        errors.append("backend=monty requires language=python.")

    net = fm.get("network", "").strip().lower()
    if net != "none":
        errors.append("backend=monty requires network=none.")

    # Best-effort code guardrails: these names should not appear in pure-compute Monty lane
    # (Monty itself blocks many capabilities; this is defense-in-depth and discourages risky patterns.)
    if FORBIDDEN_MONTY_CODE_TOKENS.search(code):
        warnings.append(
            "Monty code contains potentially risky builtins/names (import/open/exec/eval/etc). "
            "Monty may block these, but review intent carefully."
        )
def validate(path: str) -> ValidationResult:
    """Validate the Tool Request file at *path* and collect the findings.

    Runs the backend-independent front matter checks, then the checks for the
    selected backend (ERA when ``backend`` is omitted), then the whole-document
    forbidden-content scan. The result is ``ok`` only when no errors were
    recorded; warnings never affect ``ok``.
    """
    problems: List[str] = []
    notes: List[str] = []

    document = read_text(path)
    front, body = extract_front_matter(document)

    # Base required keys
    absent = require_keys(front, REQUIRED_KEYS_BASE)
    if absent:
        problems.append(f"Missing required front matter keys: {', '.join(absent)}")

    request_type = front.get("request_type")
    if request_type != "tool_request":
        problems.append(f"request_type must be 'tool_request' (got: {request_type!r})")

    schema_version = front.get("schema_version")
    if schema_version != "1":
        problems.append(f"schema_version must be '1' (got: {schema_version!r})")

    # Approval gate: both the approver and the approval timestamp must be set.
    if not (front.get("approved_by") and front.get("approved_utc")):
        problems.append("Tool Request must include approved_by and approved_utc (human approval gate).")

    # language must not be shell
    shell_languages = ("shell", "bash", "sh", "zsh", "powershell", "pwsh", "cmd")
    language = front.get("language", "").strip().lower()
    if language in shell_languages:
        problems.append("language must not be a shell. Use a supported language runtime only.")

    # network must be none or allowlist
    network = front.get("network", "").strip().lower()
    if network != "none" and network != "allowlist":
        problems.append("network must be 'none' or 'allowlist'.")

    # Determine backend (default ERA) and run its specific checks; an
    # unrecognised backend is reported and gets no backend-specific checks.
    backend = (front.get("backend") or "ERA").strip().lower()
    if backend == "era":
        validate_era(body, problems)
    elif backend == "monty":
        validate_monty(front, body, problems, notes)
    else:
        problems.append("backend must be 'ERA' or 'monty' (default ERA if omitted).")

    # Forbidden content scan (whole doc, front matter included)
    hits = find_forbidden(document)
    if hits:
        problems.extend(hits)

    return ValidationResult(ok=not problems, errors=problems, warnings=notes)
def main() -> int:
    """Command-line entry point.

    Expects exactly one argument, the path of the request file. Errors and
    warnings go to stderr; the verdict ("ACCEPT" or "REJECT") goes to stdout.

    Returns:
        0 if the request is valid, 2 if it is invalid, 3 on a usage error or
        when validation itself raises.
    """
    if len(sys.argv) != 2:
        # __doc__ is None when docstrings are stripped (python -OO); fall
        # back to a minimal usage line instead of raising AttributeError.
        usage = __doc__ or "Usage: validate_tool_request.py /path/to/request.md"
        print(usage.strip(), file=sys.stderr)
        return 3

    path = sys.argv[1]
    try:
        res = validate(path)
    except Exception as e:
        # Top-level boundary: any failure while validating is reported as
        # exit code 3 rather than a traceback.
        print(f"ERROR: {e}", file=sys.stderr)
        return 3

    # Errors (if any) are printed first, then warnings, then the verdict.
    if not res.ok:
        for err in res.errors:
            print(f"ERROR: {err}", file=sys.stderr)
    for w in res.warnings:
        print(f"WARNING: {w}", file=sys.stderr)

    print("ACCEPT" if res.ok else "REJECT")
    return 0 if res.ok else 2


if __name__ == "__main__":
    raise SystemExit(main())