diff --git a/.gitignore b/.gitignore
index 3ffff1e..03ac4e6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,5 @@ build/
 state/registry.json
 configs/models.yaml
 tmp-codex/
+codex-resume.sh
+codex*
\ No newline at end of file
diff --git a/README.md b/README.md
index 154f97d..be9cd87 100644
--- a/README.md
+++ b/README.md
@@ -93,6 +93,59 @@ curl -sS -X POST http://127.0.0.1:8000/v1/chat/completions \
 
 If you prefer the provided example file, copy `configs/models.example.yaml` and adjust the `proxy_url` values.
 
+## Known Good Inference Backends
+
+The gateway is designed to work with any backend that exposes OpenAI-compatible `GET /v1/models` and
+`POST /v1/chat/completions` endpoints. The following applications have been exercised successfully in this repository.
+
+### Ollama
+
+- Verified directly against `http://127.0.0.1:11434`
+- Verified through RoleMesh Gateway proxy routing
+- Tested with model `dolphin3:latest`
+
+Example upstream:
+
+```yaml
+models:
+  planner:
+    type: proxy
+    openai_model_name: planner
+    proxy_url: http://127.0.0.1:11434
+    defaults:
+      model: dolphin3:latest
+```
+
+Note: when proxying to Ollama's OpenAI-compatible API, the upstream Ollama model name still needs to be supplied.
+One simple pattern is to set it in `defaults.model` and let the gateway inject it.
+
+### Llamafile
+
+- Verified directly with the newer `llamafile` runner in `tmp-codex/llamafile`
+- Verified through RoleMesh Gateway proxy routing
+- Verified role switching between two live backends
+- Tested successfully with:
+  - `phi-2.Q5_K_M.llamafile`
+  - `rocket-3b.Q5_K_M.llamafile`
+
+Example launch:
+
+```bash
+./llamafile --server -m /path/to/model.gguf --host 127.0.0.1 --port 8011 --nobrowser
+```
+
+### llama.cpp / llama-server
+
+- Verified live through the RoleMesh Node Agent on NVIDIA GPUs
+- Tested with `/home/netuser/bin/llama.cpp/build/bin/llama-server`
+- Tested model load and model switching on Tesla P40 GPUs
+- Tested successfully with:
+  - `gemma-2b-it-q8_0.gguf`
+  - `Mistral-7B-Instruct-v0.3-Q5_K_M.gguf`
+
+The node agent now waits for `llama-server` readiness during model load or model switch before proxying the first
+request, which avoids transient "Loading model" failures on cold start.
+
 ## Multi-host (node registration)
 
 If you want machines to host backends and “register” them dynamically, run a tiny node agent on each backend host
@@ -110,6 +163,8 @@ This repository is a **preliminary scaffold**:
 - Registration and load-selection are implemented (basic round-robin).
 - API-key auth for clients and nodes is available.
 - Persistence is basic JSON-backed state, not a full service registry.
+- Gateway proxying has been exercised live with Ollama and `llamafile`.
+- Node-agent managed inference has been exercised live with `llama-server` on CUDA hardware.
 
 ## License
 
diff --git a/configs/node_agent.example.yaml b/configs/node_agent.example.yaml
index 4b90d0c..e16a6c5 100644
--- a/configs/node_agent.example.yaml
+++ b/configs/node_agent.example.yaml
@@ -10,6 +10,8 @@ dispatcher_roles: ["planner", "coder"]
 heartbeat_interval_sec: 5
 
 llama_server_bin: "llama-server"
+llama_server_startup_timeout_s: 30
+llama_server_probe_interval_s: 0.5
 
 model_roots:
   - "/models"
diff --git a/docs/NODE_AGENT.md b/docs/NODE_AGENT.md
index ef45d9b..a9e8511 100644
--- a/docs/NODE_AGENT.md
+++ b/docs/NODE_AGENT.md
@@ -13,6 +13,9 @@
 For each GPU device, the node agent starts a dedicated `llama-server` process, pinned to that GPU via
 environment variables (e.g. `CUDA_VISIBLE_DEVICES=0` for `gpu:0`) and bound to `127.0.0.1:<port>`.
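+
+Roughly, each per-GPU launch is equivalent to the following sketch (illustrative only; the helper name
+`launch_llama_server` and the exact argument list are assumptions, not the adapter's real code):
+
+```python
+import os
+import subprocess
+
+
+def launch_llama_server(gpu_index: int, model_path: str, port: int) -> subprocess.Popen:
+    # Pin the child process to a single GPU: device "gpu:1" maps to index "1".
+    env = dict(os.environ)
+    env["CUDA_VISIBLE_DEVICES"] = str(gpu_index)
+    # Bind to loopback only; the node agent proxies client traffic to this port.
+    return subprocess.Popen(
+        ["llama-server", "-m", model_path, "--host", "127.0.0.1", "--port", str(port)],
+        env=env,
+    )
+```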
 
 Model switching is handled by **restart** in the scaffold.
+The agent now waits for the replacement `llama-server` to report readiness before proxying the first request.
+If startup or switching takes too long, the request fails with a `503` instead of passing through a transient upstream
+"Loading model" error.
 
 ## Backends
@@ -31,6 +34,20 @@ pip install -e .
 rolemesh-node-agent --config configs/node_agent.example.yaml
 ```
 
+### Startup timing guards
+
+Two config knobs control how long the node agent waits for a managed `llama-server` to become ready:
+
+```yaml
+llama_server_startup_timeout_s: 30.0
+llama_server_probe_interval_s: 0.5
+```
+
+- `llama_server_startup_timeout_s`: maximum time to wait for a newly started or switched model
+- `llama_server_probe_interval_s`: polling interval for readiness checks
+
+The readiness probe checks the managed server's local `GET /health` and `GET /v1/models` endpoints.
+
 ## Registering
 
 If `dispatcher_base_url` is set in the node-agent config, the node agent will periodically call:
diff --git a/src/rolemesh_node_agent/adapters/cuda.py b/src/rolemesh_node_agent/adapters/cuda.py
index 46cdf6d..b8f0e7b 100644
--- a/src/rolemesh_node_agent/adapters/cuda.py
+++ b/src/rolemesh_node_agent/adapters/cuda.py
@@ -1,13 +1,15 @@
 from __future__ import annotations
 
 import asyncio
+import json
 import os
-import shlex
 import socket
 import subprocess
 import time
+import urllib.error
+import urllib.request
 from dataclasses import dataclass
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List
 
 from .base import DeviceMetrics, DeviceRef, RuntimeAdapter
 
@@ -56,6 +58,10 @@ class _ServerProc:
     started_at: float
 
 
+class ServerStartupError(RuntimeError):
+    pass
+
+
 class CudaAdapter(RuntimeAdapter):
     """
     CUDA adapter that manages one persistent ggml-org/llama.cpp 'llama-server' per GPU.
@@ -74,8 +80,15 @@ class CudaAdapter(RuntimeAdapter):
 
     backend_name = "cuda"
 
-    def __init__(self, llama_server_bin: str = "llama-server") -> None:
+    def __init__(
+        self,
+        llama_server_bin: str = "llama-server",
+        startup_timeout_s: float = 30.0,
+        probe_interval_s: float = 0.5,
+    ) -> None:
         self._bin = llama_server_bin
+        self._startup_timeout_s = startup_timeout_s
+        self._probe_interval_s = probe_interval_s
         self._servers: Dict[str, _ServerProc] = {}  # key: device.id
         self._lock = asyncio.Lock()
 
@@ -160,9 +173,67 @@ class CudaAdapter(RuntimeAdapter):
             started_at=time.time(),
         )
 
-        # TODO: replace with a real readiness probe (e.g., GET /health on llama-server)
-        await asyncio.sleep(0.25)
-        return f"http://127.0.0.1:{port}"
+        base_url = f"http://127.0.0.1:{port}"
+        try:
+            await self._wait_for_server_ready(base_url, proc=proc)
+        except Exception:
+            if proc.poll() is None:
+                proc.terminate()
+                try:
+                    proc.wait(timeout=5)
+                except Exception:
+                    proc.kill()
+            self._servers.pop(device.id, None)
+            raise
+        return base_url
+
+    async def _wait_for_server_ready(self, base_url: str, *, proc: subprocess.Popen) -> None:
+        deadline = time.monotonic() + self._startup_timeout_s
+        last_error = "server did not become ready"
+        while time.monotonic() < deadline:
+            if proc.poll() is not None:
+                raise ServerStartupError(
+                    f"llama-server exited before becoming ready (exit code {proc.returncode})."
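+                    # ensure_server() catches this, cleans up the server-table
+                    # entry, and re-raises; main.py surfaces it as an HTTP 503.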
+                )
+            try:
+                await asyncio.to_thread(self._probe_server_ready, base_url)
+                return
+            except ServerStartupError as exc:
+                last_error = str(exc)
+            await asyncio.sleep(self._probe_interval_s)
+        raise ServerStartupError(
+            f"Timed out after {self._startup_timeout_s:.1f}s waiting for llama-server readiness: {last_error}"
+        )
+
+    def _probe_server_ready(self, base_url: str) -> None:
+        for path in ("/health", "/v1/models"):
+            url = base_url.rstrip("/") + path
+            try:
+                with urllib.request.urlopen(url, timeout=2.0) as response:
+                    if response.status >= 400:
+                        continue
+                    body = response.read()
+            except (urllib.error.URLError, TimeoutError, OSError):
+                continue
+
+            if path == "/health":
+                try:
+                    payload = json.loads(body.decode("utf-8"))
+                except Exception:
+                    payload = None
+                if isinstance(payload, dict) and payload.get("status") == "ok":
+                    return
+                continue
+
+            if path == "/v1/models":
+                try:
+                    payload = json.loads(body.decode("utf-8"))
+                except Exception:
+                    payload = None
+                if isinstance(payload, dict) and isinstance(payload.get("data"), list):
+                    return
+
+        raise ServerStartupError("llama-server probe did not return a ready response.")
 
     async def shutdown(self) -> None:
         async with self._lock:
diff --git a/src/rolemesh_node_agent/config.py b/src/rolemesh_node_agent/config.py
index 6006a9f..3d454f9 100644
--- a/src/rolemesh_node_agent/config.py
+++ b/src/rolemesh_node_agent/config.py
@@ -38,3 +38,5 @@ class NodeAgentConfig(BaseModel):
 
     # llama-server binary name/path
     llama_server_bin: str = "llama-server"
+    llama_server_startup_timeout_s: float = 30.0
+    llama_server_probe_interval_s: float = 0.5
diff --git a/src/rolemesh_node_agent/main.py b/src/rolemesh_node_agent/main.py
index 3348e69..e8147c9 100644
--- a/src/rolemesh_node_agent/main.py
+++ b/src/rolemesh_node_agent/main.py
@@ -10,7 +10,7 @@ from fastapi import FastAPI, Request
 from fastapi.responses import JSONResponse, StreamingResponse
 from rolemesh_gateway.upstream import UpstreamClient, UpstreamError  # reuse gateway client
 
-from .adapters.cuda import CudaAdapter
+from .adapters.cuda import CudaAdapter, ServerStartupError
 from .adapters.base import DeviceRef
 from .config import NodeAgentConfig
 from .inventory import discover_gguf_models
@@ -28,7 +28,11 @@ def create_app(cfg: NodeAgentConfig) -> FastAPI:
         timeout=httpx.Timeout(connect=5.0, read=3600.0, write=30.0, pool=30.0)
     )
     upstream = UpstreamClient(client=http)
-    cuda = CudaAdapter(llama_server_bin=cfg.llama_server_bin)
+    cuda = CudaAdapter(
+        llama_server_bin=cfg.llama_server_bin,
+        startup_timeout_s=cfg.llama_server_startup_timeout_s,
+        probe_interval_s=cfg.llama_server_probe_interval_s,
+    )
 
     @asynccontextmanager
     async def lifespan(app: FastAPI):
@@ -102,12 +106,15 @@ def create_app(cfg: NodeAgentConfig) -> FastAPI:
             return _error("No CUDA GPUs discovered on this node.", code="no_device", status_code=503)
         device = devices[0]
 
-        base_url = await app.state.cuda.ensure_server(
-            device,
-            model_path=str(model_entry.path),
-            model_id=model_entry.model_id,
-            server_args=model_entry.server_args,
-        )
+        try:
+            base_url = await app.state.cuda.ensure_server(
+                device,
+                model_path=str(model_entry.path),
+                model_id=model_entry.model_id,
+                server_args=model_entry.server_args,
+            )
+        except ServerStartupError as e:
+            return _error(str(e), code="server_startup_error", status_code=503)
 
         upstream = app.state.upstream
         try:
diff --git a/tests/test_cuda_adapter.py b/tests/test_cuda_adapter.py
new file mode 100644
index 0000000..678b6a3
--- /dev/null
+++ b/tests/test_cuda_adapter.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
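+
+# These tests exercise the readiness probe and server bookkeeping directly,
+# monkeypatching urllib and faking the process handle so that no GPU or real
+# llama-server binary is required.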
+
+import asyncio
+
+from rolemesh_node_agent.adapters.base import DeviceRef
+from rolemesh_node_agent.adapters.cuda import CudaAdapter, ServerStartupError
+
+
+def test_probe_server_ready_accepts_health_response(monkeypatch):
+    adapter = CudaAdapter()
+
+    class _Response:
+        def __init__(self, status: int, body: bytes):
+            self.status = status
+            self._body = body
+
+        def read(self):
+            return self._body
+
+        def __enter__(self):
+            return self
+
+        def __exit__(self, exc_type, exc, tb):
+            return None
+
+    def fake_urlopen(url, timeout):
+        return _Response(200, b'{"status":"ok"}')
+
+    monkeypatch.setattr("urllib.request.urlopen", fake_urlopen)
+
+    adapter._probe_server_ready("http://127.0.0.1:1234")
+
+
+def test_wait_for_server_ready_times_out_when_probe_never_succeeds(monkeypatch):
+    adapter = CudaAdapter(startup_timeout_s=0.0, probe_interval_s=0.0)
+
+    class _Proc:
+        returncode = None
+
+        def poll(self):
+            return None
+
+    try:
+        asyncio.run(adapter._wait_for_server_ready("http://127.0.0.1:1234", proc=_Proc()))
+    except ServerStartupError as exc:
+        assert "Timed out" in str(exc)
+    else:
+        raise AssertionError("expected ServerStartupError")
+
+
+def test_ensure_server_reuses_existing_matching_process():
+    adapter = CudaAdapter()
+    device = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")
+
+    class _Proc:
+        def poll(self):
+            return None
+
+    adapter._servers[device.id] = type(
+        "_SP",
+        (),
+        {
+            "device": device,
+            "model_id": "planner",
+            "model_path": "/tmp/model.gguf",
+            "port": 1234,
+            "proc": _Proc(),
+            "started_at": 1.0,
+        },
+    )()
+
+    base_url = asyncio.run(
+        adapter.ensure_server(
+            device,
+            model_path="/tmp/model.gguf",
+            model_id="planner",
+            server_args={},
+        )
+    )
+
+    assert base_url == "http://127.0.0.1:1234"
diff --git a/tests/test_node_agent.py b/tests/test_node_agent.py
index 59dd478..dfd9bf8 100644
--- a/tests/test_node_agent.py
+++ b/tests/test_node_agent.py
@@ -119,3 +119,36 @@ def test_chat_completions_routes_to_local_server_and_streams(tmp_path):
     assert "data: first" in stream_response.text
     assert calls["stream_base_url"] == "http://127.0.0.1:9100"
     asyncio.run(app.state.http.aclose())
+
+
+def test_chat_completions_returns_503_when_server_startup_fails(tmp_path):
+    from rolemesh_node_agent.main import create_app
+    from rolemesh_node_agent.adapters.cuda import ServerStartupError
+
+    cfg = _node_config(tmp_path)
+    app = create_app(cfg)
+
+    async def fake_discover_devices():
+        return [DeviceRef(kind="gpu", backend="cuda", id="gpu:0")]
+
+    async def fake_ensure_server(device, *, model_path, model_id, server_args):
+        raise ServerStartupError("Timed out waiting for model load")
+
+    app.state.cuda.discover_devices = fake_discover_devices
+    app.state.cuda.ensure_server = fake_ensure_server
+
+    response = asyncio.run(
+        _request(
+            app,
+            "POST",
+            "/v1/chat/completions",
+            json={
+                "model": "planner-gguf",
+                "messages": [{"role": "user", "content": "hello"}],
+            },
+        )
+    )
+
+    assert response.status_code == 503
+    assert response.json()["error"]["code"] == "server_startup_error"
+    asyncio.run(app.state.http.aclose())
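+
+# Note: app.state.cuda holds plain instance attributes, so the test can swap in
+# async fakes for discover_devices/ensure_server without monkeypatch; the JSON
+# error envelope asserted above comes from the node agent's shared _error() helper.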