From e4f8b144372f26d3f32aa43c08be118ae8312290 Mon Sep 17 00:00:00 2001 From: welsberr Date: Mon, 27 Apr 2026 15:13:31 -0400 Subject: [PATCH] Add smoke test, enable Ollama discovery in singlebox config, update demo doc scripts/smoke_test.py: end-to-end validation script covering health, cluster state, model catalog, route resolution, non-streaming chat (role + direct asset), streaming chat (SSE validation + reasoning-strip check), embeddings, and Ollama discovery metrics. Auto-detects targets from /v1/models; accepts --chat-role, --chat-asset, --embed-asset overrides. Exit 0 if all pass/skip, exit 1 on any failure. configs/node.singlebox.ollama.example.yaml: add discover_protocol: "ollama" to both services so the config works out of the box for Ollama discovery testing without manual edits. docs/llm_demo.md: update Current Readiness to reflect v1 complete feature set; add Smoke Test section; add New Capabilities section covering streaming, routing strategies, Ollama discovery, and role catalogs. Co-Authored-By: Claude Sonnet 4.6 --- configs/node.singlebox.ollama.example.yaml | 6 + docs/llm_demo.md | 127 +++++- scripts/smoke_test.py | 451 +++++++++++++++++++++ 3 files changed, 572 insertions(+), 12 deletions(-) create mode 100644 scripts/smoke_test.py diff --git a/configs/node.singlebox.ollama.example.yaml b/configs/node.singlebox.ollama.example.yaml index 30b950d..ac3e315 100644 --- a/configs/node.singlebox.ollama.example.yaml +++ b/configs/node.singlebox.ollama.example.yaml @@ -31,6 +31,11 @@ services: runtime: engine: "ollama" launcher: "external" + # discover_protocol: "ollama" queries GET /api/tags and GET /api/ps on each heartbeat. + # Assets listed here serve as a static baseline; discovered models are merged in and + # their loaded state is corrected from /api/ps (VRAM-resident = loaded: true). + # loaded_model_count and vram_used_bytes are populated in observed from /api/ps. 
+ discover_protocol: "ollama" assets: - asset_id: "qwen3" loaded: true @@ -47,6 +52,7 @@ services: runtime: engine: "ollama" launcher: "external" + discover_protocol: "ollama" assets: - asset_id: "nomic-embed-text" loaded: true diff --git a/docs/llm_demo.md b/docs/llm_demo.md index 6798c16..62ee3d0 100644 --- a/docs/llm_demo.md +++ b/docs/llm_demo.md @@ -8,24 +8,127 @@ This runbook covers the first practical GenieHive LLM demo with three roles: ## Current Readiness -GenieHive is ready for a first live chat demo now. +GenieHive v1 is fully implemented and ready for live demo. -What works in GenieHive already: +What works: -- node registration -- heartbeat -- role-aware route resolution -- `GET /v1/models` -- `POST /v1/chat/completions` +- Node registration and heartbeat with auto-re-registration on 404 +- Role-aware route resolution with `fallback_roles` chain and cycle protection +- Three routing strategies: `scored` (default), `round_robin`, `least_loaded` +- `GET /v1/models` — OpenAI-compatible catalog with rich GenieHive metadata +- `POST /v1/chat/completions` — non-streaming and streaming (`stream: true`) - `POST /v1/embeddings` +- `POST /v1/audio/transcriptions` — multipart audio proxy +- Active health probing (`routing.probe_interval_s` in control config) +- Ollama dynamic model discovery: `discover_protocol: "ollama"` in node config + queries `/api/tags` and `/api/ps` each heartbeat; corrects loaded state and + populates `observed.loaded_model_count` and `observed.vram_used_bytes` +- OpenAI-compatible discovery: `discover_protocol: "openai"` queries `/v1/models` +- Reasoning-field stripping (`reasoning_content`, `reasoning`) from both + non-streaming and streaming responses +- Request policy: body defaults, overrides, system prompt injection per asset or role +- Qwen3/Qwen3.5 auto-detection with `enable_thinking: false` applied automatically -What GenieHive does not do yet: +GenieHive does not launch upstream LLM servers for you. 
Treat it as a +metadata-rich router over already-running local servers. -- launch upstream LLM servers for you automatically -- provide `POST /v1/audio/transcriptions` -- maintain advanced benchmark history or queue-aware scheduling +## Smoke Test -For the first demo, treat GenieHive as a metadata-rich router over already-running local servers. +After bringing up control + node + upstream, run: + +```bash +python scripts/smoke_test.py \ + --base-url http://127.0.0.1:8800 \ + --api-key change-me-client-key +``` + +This validates in sequence: health, cluster state, model catalog, route +resolution, non-streaming chat (role and direct asset), streaming chat, +embeddings, Ollama discovery metrics, and reasoning-field stripping. Each +check reports PASS / FAIL / SKIP with a short explanation. + +Optional flags: + +```bash +python scripts/smoke_test.py \ + --base-url http://127.0.0.1:8800 \ + --api-key change-me-client-key \ + --chat-role mentor \ + --chat-asset qwen3 \ + --embed-asset nomic-embed-text +``` + +## New Capabilities Since Initial Demo + +### Streaming chat + +Add `"stream": true` to any chat request. GenieHive returns a standard +`text/event-stream` response with `Cache-Control: no-cache` and +`X-Accel-Buffering: no` headers set for nginx/proxy compatibility: + +```bash +curl -sS http://127.0.0.1:8800/v1/chat/completions \ + -H 'Content-Type: application/json' \ + -H 'X-Api-Key: change-me-client-key' \ + -d '{ + "model": "mentor", + "messages": [{"role":"user","content":"Count to five."}], + "stream": true + }' +``` + +Reasoning fields (`reasoning_content`, `reasoning`) are stripped from every +SSE chunk before forwarding, just as they are for non-streaming responses. + +### Routing strategy + +Set `routing.default_strategy` in your control config: + +```yaml +routing: + default_strategy: "least_loaded" # scored | round_robin | least_loaded +``` + +`least_loaded` picks the service with the lowest `queue_depth + in_flight`. 
+When Ollama discovery is enabled, `loaded_model_count` and `vram_used_bytes` +are available in `observed` and visible via `GET /v1/cluster/services`. + +### Ollama dynamic model discovery + +Add `discover_protocol: "ollama"` to any Ollama-backed service in your node +config. On each heartbeat the node queries `/api/tags` (available models) and +`/api/ps` (VRAM-loaded models) and merges the results into the service's asset +list. Stale `loaded: true` entries in static config are corrected automatically. + +```yaml +services: + - service_id: "singlebox/chat/qwen3" + kind: "chat" + endpoint: "http://127.0.0.1:11434" + discover_protocol: "ollama" + assets: + - asset_id: "qwen3" # static baseline; enriched each heartbeat + loaded: true +``` + +After the first enriched heartbeat, `GET /v1/cluster/services` will show +`observed.loaded_model_count` and `observed.vram_used_bytes` for that service. + +### Role catalogs + +Five role catalog files are now included under `configs/`: + +| File | Framework | Roles | +|---|---|---| +| `roles.surgical-team.example.yaml` | Brooks/Mills surgical team | 9 (`surg_` prefix) | +| `roles.belbin.example.yaml` | Belbin team roles | 9 (`belbin_` prefix) | +| `roles.sixhats.example.yaml` | De Bono Six Thinking Hats | 6 (`sixhats_` prefix) | +| `roles.disney.example.yaml` | Disney creative strategy | 3 (`disney_` prefix) | +| `roles.xp.example.yaml` | XP team roles | 5 (`xp_` prefix) | + +Point `roles_path` in your control config at any of these files to load that +catalog. Multiple catalogs can be merged by listing them in sequence — or +concatenate the `roles:` blocks manually into a single file. ## Topologies diff --git a/scripts/smoke_test.py b/scripts/smoke_test.py new file mode 100644 index 0000000..a4b798c --- /dev/null +++ b/scripts/smoke_test.py @@ -0,0 +1,451 @@ +#!/usr/bin/env python3 +"""GenieHive end-to-end smoke test. 
# ── Result tracking ───────────────────────────────────────────────────────────

PASS = "PASS"
FAIL = "FAIL"
SKIP = "SKIP"

# Response fields that must be absent from chat output; both the streaming and
# the non-streaming checks look for the same two names.
_REASONING_KEYS = ("reasoning_content", "reasoning")


@dataclass
class Check:
    """Outcome of one smoke-test check."""

    name: str         # human-readable check label
    status: str       # one of PASS / FAIL / SKIP
    detail: str = ""  # optional explanation printed under the label


@dataclass
class Suite:
    """Collects check outcomes and prints each one as it is recorded."""

    checks: list[Check] = field(default_factory=list)

    def record(self, name: str, status: str, detail: str = "") -> Check:
        """Store a check result, print it, and return the stored record."""
        c = Check(name, status, detail)
        self.checks.append(c)
        symbol = {"PASS": "✓", "FAIL": "✗", "SKIP": "–"}.get(status, "?")
        line = f" [{symbol}] {name}"
        if detail:
            line += f"\n {detail}"
        print(line)
        return c

    def ok(self, name: str, detail: str = "") -> Check:
        """Record a passing check."""
        return self.record(name, PASS, detail)

    def fail(self, name: str, detail: str = "") -> Check:
        """Record a failing check."""
        return self.record(name, FAIL, detail)

    def skip(self, name: str, reason: str = "") -> Check:
        """Record a check that was not run, with the reason."""
        return self.record(name, SKIP, reason)

    @property
    def failed(self) -> list[Check]:
        """All checks recorded with status FAIL, in recording order."""
        return [c for c in self.checks if c.status == FAIL]

    def summary(self) -> str:
        """Return a one-line ``N passed, N failed, N skipped`` tally."""
        passed = sum(1 for c in self.checks if c.status == PASS)
        failed = len(self.failed)
        skipped = sum(1 for c in self.checks if c.status == SKIP)
        return f"{passed} passed, {failed} failed, {skipped} skipped"


# ── Helpers ───────────────────────────────────────────────────────────────────

def _headers(api_key: str) -> dict[str, str]:
    """Headers for authenticated requests without a body."""
    return {"X-Api-Key": api_key}


def _json_headers(api_key: str) -> dict[str, str]:
    """Headers for authenticated requests that send a JSON body."""
    return {"X-Api-Key": api_key, "Content-Type": "application/json"}


def _short(text: str, max_len: int = 120) -> str:
    """Flatten newlines and truncate *text* to *max_len* characters plus an ellipsis."""
    text = text.replace("\n", " ").strip()
    return text if len(text) <= max_len else text[:max_len] + "…"


def _first_model(models: list[dict], *, operation: str,
                 route_type: str | None = None) -> str | None:
    """Return the id of the first catalog entry matching the given filters.

    Entries are matched on their ``geniehive`` metadata. Entries with no
    metadata or no ``id`` are skipped instead of raising.
    """
    for entry in models:
        meta = entry.get("geniehive") or {}
        if meta.get("operation") != operation:
            continue
        if route_type is not None and meta.get("route_type") != route_type:
            continue
        model_id = entry.get("id")
        if model_id:
            return model_id
    return None


def _first_chat_role(models: list[dict]) -> str | None:
    """First catalog id that is a role route for chat, or None."""
    return _first_model(models, operation="chat", route_type="role")


def _first_chat_asset(models: list[dict]) -> str | None:
    """First catalog id that is an asset route for chat, or None."""
    return _first_model(models, operation="chat", route_type="asset")


def _first_embed_asset(models: list[dict]) -> str | None:
    """First embeddings entry, preferring asset routes.

    Falls back to any embeddings entry so catalogs that previously
    auto-detected a target keep doing so.
    """
    return (_first_model(models, operation="embeddings", route_type="asset")
            or _first_model(models, operation="embeddings"))


def _has_reasoning_field(*containers: Any) -> bool:
    """Return True if any dict among *containers* has a reasoning key."""
    return any(
        key in container
        for container in containers
        if isinstance(container, dict)
        for key in _REASONING_KEYS
    )


# ── Individual checks ─────────────────────────────────────────────────────────
# Each check records its own outcome on the Suite. Transport and decoding
# errors are caught broadly on purpose: in a smoke test any exception is simply
# a failed (or skipped) check, never a crash of the whole run.

def check_health(client: httpx.Client, base: str, s: Suite) -> bool:
    """GET ``/health`` and require HTTP 200 with ``{"status": "ok"}``."""
    try:
        r = client.get(f"{base}/health")
        if r.status_code == 200 and r.json().get("status") == "ok":
            s.ok("control plane health")
            return True
        s.fail("control plane health", f"status={r.status_code} body={_short(r.text)}")
    except Exception as exc:
        s.fail("control plane health", str(exc))
    return False


def check_cluster_state(client: httpx.Client, base: str, api_key: str, s: Suite) -> dict:
    """Fetch the hosts, services and roles listings.

    Returns {'hosts': [...], 'services': [...], 'roles': [...]}; a key is
    absent when its endpoint failed.
    """
    result: dict[str, list] = {}
    for name, path in [("hosts", "/v1/cluster/hosts"),
                       ("services", "/v1/cluster/services"),
                       ("roles", "/v1/cluster/roles")]:
        try:
            r = client.get(f"{base}{path}", headers=_headers(api_key))
            if r.status_code == 200:
                data = r.json()
                # Accept either a key named after the resource or a "data" list.
                items = data.get(name, data.get("data", []))
                result[name] = items
                s.ok(f"cluster {name} registered", f"{len(items)} {name}")
            else:
                s.fail(f"cluster {name} registered",
                       f"status={r.status_code} body={_short(r.text)}")
        except Exception as exc:
            s.fail(f"cluster {name} registered", str(exc))
    return result


def check_model_catalog(client: httpx.Client, base: str, api_key: str,
                        s: Suite) -> list[dict]:
    """GET ``/v1/models`` and return its ``data`` list ([] on any failure)."""
    try:
        r = client.get(f"{base}/v1/models", headers=_headers(api_key))
        if r.status_code != 200:
            s.fail("model catalog GET /v1/models",
                   f"status={r.status_code} body={_short(r.text)}")
            return []
        models = r.json().get("data", [])
        role_count = sum(1 for m in models
                         if (m.get("geniehive") or {}).get("route_type") == "role")
        asset_count = sum(1 for m in models
                          if (m.get("geniehive") or {}).get("route_type") == "asset")
        s.ok("model catalog GET /v1/models",
             f"{len(models)} total ({role_count} roles, {asset_count} assets)")
        return models
    except Exception as exc:
        s.fail("model catalog GET /v1/models", str(exc))
        return []


def check_route_resolve(client: httpx.Client, base: str, api_key: str,
                        role: str, s: Suite) -> bool:
    """Resolve *role* via ``/v1/cluster/routes/resolve`` and report the service."""
    try:
        r = client.get(f"{base}/v1/cluster/routes/resolve",
                       params={"model": role},
                       headers=_headers(api_key))
        if r.status_code == 200:
            data = r.json()
            svc_id = (data.get("service") or {}).get("service_id", "?")
            s.ok(f"route resolve '{role}'", f"→ {svc_id}")
            return True
        s.fail(f"route resolve '{role}'",
               f"status={r.status_code} body={_short(r.text)}")
    except Exception as exc:
        s.fail(f"route resolve '{role}'", str(exc))
    return False


def check_chat_nonstreaming(client: httpx.Client, base: str, api_key: str,
                            model: str, label: str, s: Suite) -> bool:
    """POST a short non-streaming chat request and require a usable reply.

    A 200 response with no choices is a failure with an explicit message,
    rather than an opaque IndexError.
    """
    body: dict[str, Any] = {
        "model": model,
        "messages": [{"role": "user",
                      "content": "Reply with exactly the word: ready"}],
        "max_tokens": 16,
    }
    name = f"chat non-streaming [{label}]"
    try:
        r = client.post(f"{base}/v1/chat/completions",
                        headers=_json_headers(api_key),
                        json=body,
                        timeout=120.0)
        if r.status_code != 200:
            s.fail(name, f"status={r.status_code} body={_short(r.text)}")
            return False
        data = r.json()
        choices = data.get("choices") or []
        if not choices:
            s.fail(name, "response contained no choices")
            return False
        # "content" can be null, so coerce it before shortening.
        content = (choices[0].get("message") or {}).get("content") or ""
        s.ok(name, f"model={data.get('model')} "
                   f"reply={_short(content, 60)!r}")
        return True
    except Exception as exc:
        s.fail(name, str(exc))
    return False


def check_chat_streaming(base: str, api_key: str, model: str, s: Suite) -> bool:
    """Send a streaming chat request and validate the SSE response.

    Requires HTTP 200, a ``text/event-stream`` content type, a terminating
    ``[DONE]`` sentinel, and no reasoning field on any chunk.
    """
    body: dict[str, Any] = {
        "model": model,
        "messages": [{"role": "user",
                      "content": "Reply with exactly the word: streaming"}],
        "max_tokens": 16,
        "stream": True,
    }
    url = f"{base.rstrip('/')}/v1/chat/completions"
    try:
        chunk_count = 0
        content_parts: list[str] = []
        got_done = False
        with httpx.stream("POST", url,
                          headers=_json_headers(api_key),
                          json=body,
                          timeout=120.0) as resp:
            if resp.status_code != 200:
                body_text = resp.read().decode(errors="replace")
                s.fail("chat streaming", f"status={resp.status_code} {_short(body_text)}")
                return False
            ct = resp.headers.get("content-type", "")
            if "text/event-stream" not in ct:
                s.fail("chat streaming",
                       f"expected text/event-stream content-type, got: {ct!r}")
                return False
            for line in resp.iter_lines():
                if not line.startswith("data:"):
                    continue
                payload = line[5:].strip()
                if payload == "[DONE]":
                    got_done = True
                    break
                try:
                    chunk = json.loads(payload)
                except json.JSONDecodeError:
                    # Non-JSON data lines are ignored rather than counted.
                    continue
                chunk_count += 1
                # A chunk with an empty "choices" list must not abort the check.
                choice = (chunk.get("choices") or [{}])[0]
                delta_obj = choice.get("delta") or {}
                delta = delta_obj.get("content") or ""
                if delta:
                    content_parts.append(delta)
                # Both reasoning keys are looked up in the delta (where the
                # original only looked for one of them) as well as in the
                # enclosing choice and chunk.
                if _has_reasoning_field(delta_obj, choice, chunk):
                    s.fail("chat streaming",
                           "reasoning fields not stripped from SSE chunk")
                    return False

        if not got_done:
            s.fail("chat streaming", "stream ended without [DONE] sentinel")
            return False
        reply = "".join(content_parts)
        s.ok("chat streaming",
             f"{chunk_count} data chunks, reply={_short(reply, 60)!r}")
        return True
    except Exception as exc:
        s.fail("chat streaming", str(exc))
        return False


def check_embeddings(client: httpx.Client, base: str, api_key: str,
                     model: str, s: Suite) -> bool:
    """POST one embedding request and require a non-empty vector back."""
    body = {"model": model, "input": "GenieHive smoke test embedding probe."}
    try:
        r = client.post(f"{base}/v1/embeddings",
                        headers=_json_headers(api_key),
                        json=body,
                        timeout=60.0)
        if r.status_code != 200:
            s.fail("embeddings", f"status={r.status_code} body={_short(r.text)}")
            return False
        data = r.json()
        rows = data.get("data") or [{}]
        vec = rows[0].get("embedding") or []
        if not vec:
            # A 200 without a vector is not a working embeddings path.
            s.fail("embeddings", "response contained no embedding vector")
            return False
        s.ok("embeddings", f"model={data.get('model')} dims={len(vec)}")
        return True
    except Exception as exc:
        s.fail("embeddings", str(exc))
    return False


def check_ollama_discovery_metrics(services: list[dict], s: Suite) -> None:
    """Check that at least one Ollama-backed service has loaded_model_count populated.

    A service counts as Ollama-backed when ``runtime.engine`` is ``"ollama"``
    or its ``service_id`` contains "ollama". Missing or null ``runtime``,
    ``observed`` and ``service_id`` values are tolerated, because this
    function runs outside any try/except.
    """
    def is_ollama(svc: dict) -> bool:
        engine = (svc.get("runtime") or {}).get("engine")
        return engine == "ollama" or "ollama" in (svc.get("service_id") or "").lower()

    def loaded_count(svc: dict) -> Any:
        return (svc.get("observed") or {}).get("loaded_model_count")

    ollama_services = [svc for svc in services if is_ollama(svc)]
    if not ollama_services:
        s.skip("Ollama discovery metrics",
               "no Ollama-backed services registered — "
               "set discover_protocol: ollama in node config to enable")
        return
    populated = [svc for svc in ollama_services if loaded_count(svc) is not None]
    if populated:
        examples = ", ".join(
            f"{svc.get('service_id', '?')}:"
            f"loaded_model_count={loaded_count(svc)}"
            for svc in populated[:2]
        )
        s.ok("Ollama discovery metrics", examples)
    else:
        s.fail("Ollama discovery metrics",
               f"{len(ollama_services)} Ollama service(s) registered but "
               "observed.loaded_model_count is null — "
               "check that discover_protocol: ollama is set and a heartbeat has completed")


def check_reasoning_stripped(client: httpx.Client, base: str, api_key: str,
                             model: str, s: Suite) -> None:
    """Check that reasoning fields are absent from a non-streaming response.

    Request-level problems (non-200 status, transport errors) are recorded as
    SKIP, not FAIL, because the chat path itself has its own check.
    """
    body: dict[str, Any] = {
        "model": model,
        "messages": [{"role": "user", "content": "Reply with exactly: ok"}],
        "max_tokens": 8,
    }
    try:
        r = client.post(f"{base}/v1/chat/completions",
                        headers=_json_headers(api_key),
                        json=body,
                        timeout=60.0)
        if r.status_code != 200:
            s.skip("reasoning fields stripped",
                   f"chat returned {r.status_code} — skipping strip check")
            return
        data = r.json()
        choice = (data.get("choices") or [{}])[0]
        msg = choice.get("message") or {}
        # Both keys are looked up in the message as well as in the choice; the
        # original looked for "reasoning" only on the choice.
        if _has_reasoning_field(msg, choice):
            s.fail("reasoning fields stripped",
                   "reasoning_content or reasoning present in response")
        else:
            s.ok("reasoning fields stripped")
    except Exception as exc:
        s.skip("reasoning fields stripped", str(exc))


# ── Main ──────────────────────────────────────────────────────────────────────

def main(argv: list[str] | None = None) -> None:
    """Run every check in order and exit non-zero if any of them failed.

    Args:
        argv: Argument list to parse; ``None`` (the default) uses the
            process's command-line arguments.

    Exits with status 1 when the health check fails (remaining checks are
    not attempted) or when any check is recorded as FAIL; otherwise returns
    normally.
    """
    parser = argparse.ArgumentParser(
        description="GenieHive end-to-end smoke test",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent(__doc__ or ""),
    )
    parser.add_argument("--base-url", default="http://127.0.0.1:8800",
                        help="GenieHive control-plane base URL")
    parser.add_argument("--api-key", default="change-me-client-key",
                        help="GenieHive client API key")
    parser.add_argument("--chat-role",
                        help="Role alias to use for chat tests (auto-detected if omitted)")
    parser.add_argument("--chat-asset",
                        help="Direct asset ID for chat tests (auto-detected if omitted)")
    parser.add_argument("--embed-asset",
                        help="Direct asset ID for embeddings tests (auto-detected if omitted)")
    args = parser.parse_args(argv)

    base = args.base_url.rstrip("/")
    s = Suite()

    print(f"\nGenieHive smoke test → {base}\n")

    with httpx.Client(timeout=30.0) as client:
        # 1. Health — nothing else is meaningful if the control plane is down.
        if not check_health(client, base, s):
            print(f"\nControl plane unreachable — aborting.\n{s.summary()}")
            sys.exit(1)

        # 2. Cluster state
        cluster = check_cluster_state(client, base, args.api_key, s)
        services = cluster.get("services", [])

        # 3. Model catalog
        models = check_model_catalog(client, base, args.api_key, s)

        # 4. Detect targets (explicit flags win over catalog auto-detection)
        chat_role = args.chat_role or _first_chat_role(models)
        chat_asset = args.chat_asset or _first_chat_asset(models)
        embed_asset = args.embed_asset or _first_embed_asset(models)

        # 5. Route resolution
        if chat_role:
            check_route_resolve(client, base, args.api_key, chat_role, s)
        else:
            s.skip("route resolve", "no chat role in catalog")

        # 6. Non-streaming chat via role, then the reasoning-strip check
        if chat_role:
            ok = check_chat_nonstreaming(
                client, base, args.api_key, chat_role, f"role={chat_role}", s)
            if ok:
                check_reasoning_stripped(client, base, args.api_key, chat_role, s)
            else:
                # Keep the check visible in the summary instead of dropping it.
                s.skip("reasoning fields stripped", "role chat check failed")
        else:
            s.skip("chat non-streaming [role]", "no chat role in catalog")
            s.skip("reasoning fields stripped", "no chat role in catalog")

        # 7. Non-streaming chat via direct asset
        if chat_asset:
            check_chat_nonstreaming(
                client, base, args.api_key, chat_asset, f"asset={chat_asset}", s)
        else:
            s.skip("chat non-streaming [direct asset]", "no chat asset in catalog")

    # 8. Streaming chat (opens its own httpx.stream context)
    if chat_role:
        check_chat_streaming(base, args.api_key, chat_role, s)
    else:
        s.skip("chat streaming", "no chat role in catalog")

    # 9. Embeddings
    with httpx.Client(timeout=60.0) as client:
        if embed_asset:
            check_embeddings(client, base, args.api_key, embed_asset, s)
        else:
            s.skip("embeddings", "no embeddings asset in catalog")

    # 10. Ollama discovery metrics (uses the service list fetched in step 2)
    check_ollama_discovery_metrics(services, s)

    # Summary
    print(f"\n{s.summary()}")
    if s.failed:
        print("\nFailed checks:")
        for c in s.failed:
            print(f" • {c.name}: {c.detail}")
        sys.exit(1)


if __name__ == "__main__":
    main()