#!/usr/bin/env python3
"""
Validate a Tool Request against schemas/tool-request.schema.md (schema_version=1).

Usage:
  validate_tool_request.py /path/to/request.md

Exit codes:
  0 = valid
  2 = invalid
  3 = error
"""
|
|
|
|
from __future__ import annotations

import re
import sys
from typing import List

from validate_common import (
    ValidationResult,
    extract_front_matter,
    find_forbidden,
    read_text,
    require_keys,
    require_sections_in_order,
)
|
|
|
|
# Front matter keys that every Tool Request must define; validate() reports
# any that are absent.
REQUIRED_KEYS = [
    # Identity of the request
    "request_type",
    "schema_version",
    "request_id",
    "created_utc",
    "requested_by",
    # Human approval gate
    "approved_by",
    "approved_utc",
    # What is to be run, and under which limits
    "purpose",
    "language",
    "network",
    "cpu_limit",
    "memory_limit_mb",
    "time_limit_sec",
]
|
|
|
|
# Level-2 Markdown headings the request body must contain; validate() passes
# this list to require_sections_in_order().
REQUIRED_H2 = [
    "## Command",
    "## Input Files",
    "## Output Expectations",
    "## Risk Assessment",
]
|
|
|
|
# Strong rules: command must be a single line and must not contain shell
# chaining/pipes/redirection.
#
# The pattern matches any of:
#   - one of the characters ; & | > < `
#   - the substring "$("  (command substitution)
#   - a closing parenthesis, optionally followed by whitespace
# The last alternative means that *any* ")" in the command is rejected.
DANGEROUS_CMD_TOKENS = re.compile(r"[;&|><`]|(\$\()|(\)\s*)", re.IGNORECASE)
|
|
|
|
|
|
def extract_command(body: str) -> str:
    """Return the command line found under the ``## Command`` heading.

    The command is the first non-empty line that follows the heading and
    precedes the next ``## `` heading. Surrounding whitespace is stripped
    from the returned line.

    Heading lines are compared after stripping whitespace, the same way the
    following lines are, so a heading written with trailing spaces is still
    recognised.

    Args:
        body: Markdown body of the request (the text after the front matter).

    Returns:
        The stripped command line, or ``""`` when there is no ``## Command``
        heading or the section has no non-empty line.
    """
    stripped_lines = [raw.strip() for raw in body.splitlines()]
    try:
        heading_at = stripped_lines.index("## Command")
    except ValueError:
        return ""

    for line in stripped_lines[heading_at + 1:]:
        if line.startswith("## "):
            # Reached the next section without finding a command.
            break
        if line:
            return line
    return ""
|
|
|
|
|
|
# Package-manager invocations that are forbidden in a command. Word boundaries
# keep ordinary words that merely contain "apt" (e.g. "capture", "adapt") from
# being rejected, and \s+ / the optional version suffix catch spellings such as
# "pip3 install" or "pip  install".
_INSTALL_CMD = re.compile(
    r"\bpip[\d.]*\s+install\b|\bapt(?:-get|itude)?\b|\bp?npm\s+install\b",
    re.IGNORECASE,
)

# Values of the "language" key that denote a shell rather than a language runtime.
_SHELL_LANGUAGES = ("shell", "bash", "sh", "zsh", "powershell", "pwsh", "cmd")


def _command_section_lines(body: str) -> List[str]:
    """Return the stripped, non-empty lines of the ``## Command`` section."""
    section: List[str] = []
    inside = False
    for raw in body.splitlines():
        line = raw.strip()
        if not inside:
            inside = line == "## Command"
            continue
        if line.startswith("## "):
            break
        if line:
            section.append(line)
    return section


def validate(path: str) -> ValidationResult:
    """Validate the Tool Request document stored at *path*.

    The file is read with ``read_text`` and split into front matter and body
    with ``extract_front_matter``. The following checks are then applied and
    every failure is collected as an error message:

    * all ``REQUIRED_KEYS`` are present in the front matter;
    * ``request_type`` is ``"tool_request"`` and ``schema_version`` is ``"1"``;
    * ``approved_by`` and ``approved_utc`` are both non-empty;
    * ``language`` is not a shell;
    * ``network`` is ``"none"`` or ``"allowlist"``;
    * the body passes ``require_sections_in_order`` for ``REQUIRED_H2``;
    * the ``## Command`` section holds exactly one command line that is not a
      fenced code block, contains no token matched by
      ``DANGEROUS_CMD_TOKENS`` and does not invoke a package installer;
    * ``find_forbidden`` reports nothing for the whole document.

    Args:
        path: Path of the request file to validate.

    Returns:
        A ``ValidationResult`` whose ``ok`` is true only when no error was
        collected. No check in this function produces warnings, so
        ``warnings`` is always empty.

    Raises:
        Whatever the ``validate_common`` helpers raise (for example when the
        file cannot be read); nothing is caught here.
    """
    errors: List[str] = []
    warnings: List[str] = []

    md = read_text(path)
    fm, body = extract_front_matter(md)

    # --- Front matter -----------------------------------------------------
    missing = require_keys(fm, REQUIRED_KEYS)
    if missing:
        errors.append(f"Missing required front matter keys: {', '.join(missing)}")

    if fm.get("request_type") != "tool_request":
        errors.append(f"request_type must be 'tool_request' (got: {fm.get('request_type')!r})")

    if fm.get("schema_version") != "1":
        errors.append(f"schema_version must be '1' (got: {fm.get('schema_version')!r})")

    # Approval gate: require approved_by and approved_utc
    if not fm.get("approved_by") or not fm.get("approved_utc"):
        errors.append("Tool Request must include approved_by and approved_utc (human approval gate).")

    # language must not be shell
    if fm.get("language", "").strip().lower() in _SHELL_LANGUAGES:
        errors.append("language must not be a shell. Use a supported language runtime only.")

    # network defaults: none or allowlist
    net = fm.get("network", "").strip().lower()
    if net not in ("none", "allowlist"):
        errors.append("network must be 'none' or 'allowlist'.")

    # --- Body structure ---------------------------------------------------
    errors.extend(require_sections_in_order(body, REQUIRED_H2))

    # --- Command rules ----------------------------------------------------
    cmd = extract_command(body)
    if not cmd:
        errors.append("## Command must contain a single command line.")
    else:
        section_lines = _command_section_lines(body)
        if cmd.startswith("```") or cmd.endswith("```"):
            errors.append("Command must be plain text, not a fenced code block.")
        elif len(section_lines) > 1:
            # Only the first line is inspected below, so any further line
            # would otherwise escape every command check.
            errors.append(
                "## Command must contain a single command line "
                f"(found {len(section_lines)} non-empty lines)."
            )
        if DANGEROUS_CMD_TOKENS.search(cmd):
            errors.append("Command contains forbidden shell metacharacters (chaining/pipes/redirection/subshell).")
        if _INSTALL_CMD.search(cmd):
            errors.append("Command appears to install packages; installs are forbidden in TOOL-EXEC.")

    # --- Forbidden content scan (whole doc) -------------------------------
    forbidden_hits = find_forbidden(md)
    if forbidden_hits:
        errors.extend(forbidden_hits)

    return ValidationResult(ok=(len(errors) == 0), errors=errors, warnings=warnings)
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point.

    Expects exactly one argument, the path of the request file. ``ACCEPT`` or
    ``REJECT`` is printed to stdout; usage text, errors and warnings go to
    stderr.

    Returns:
        0 when the request is valid, 2 when it is invalid, 3 on a usage error
        or when validation raises an exception.
    """
    if len(sys.argv) != 2:
        # __doc__ is None when docstrings are stripped (python -OO); fall back
        # to a short usage line instead of failing on None.strip().
        usage = __doc__ or "Usage: validate_tool_request.py /path/to/request.md"
        print(usage.strip(), file=sys.stderr)
        return 3

    try:
        res = validate(sys.argv[1])
    except Exception as exc:
        # Top-level boundary: any failure during validation maps to exit code 3.
        print(f"ERROR: {exc}", file=sys.stderr)
        return 3

    if not res.ok:
        for err in res.errors:
            print(f"ERROR: {err}", file=sys.stderr)
    for warning in res.warnings:
        print(f"WARNING: {warning}", file=sys.stderr)

    print("ACCEPT" if res.ok else "REJECT")
    return 0 if res.ok else 2
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|
|
|