"""Tests for the rolemesh node agent: HTTP endpoints, device selection, and queueing."""

from __future__ import annotations

import asyncio
from pathlib import Path

import httpx

from rolemesh_node_agent.adapters.base import DeviceMetrics, DeviceRef
from rolemesh_node_agent.adapters.cuda import CudaAdapter
from rolemesh_node_agent.config import ModelEntry, NodeAgentConfig
from rolemesh_node_agent.main import _merge_scheduler_metrics, _registration_payload, _select_device
from rolemesh_node_agent.scheduler import AdmissionError, DeviceQueue


def _node_config(tmp_path: Path) -> NodeAgentConfig:
    """Build a one-model node config backed by a stub ``model.gguf`` in *tmp_path*.

    The file only contains the bytes ``b"GGUF"``; *tmp_path* is also registered
    as the single model root.
    """
    model_path = tmp_path / "model.gguf"
    model_path.write_bytes(b"GGUF")
    return NodeAgentConfig(
        node_id="node-1",
        model_roots=[tmp_path],
        models=[ModelEntry(model_id="planner-gguf", path=model_path, roles=["planner"])],
    )


async def _request(app, method: str, path: str, **kwargs) -> httpx.Response:
    """Send one request to *app* in-process through ``httpx.ASGITransport``.

    Extra keyword arguments are forwarded to ``AsyncClient.request``. The
    client is closed before the response is returned.
    """
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
        return await client.request(method, path, **kwargs)


def _close_app_http(app) -> None:
    """Await ``app.state.http.aclose()`` to completion via ``asyncio.run``.

    Called from ``finally`` blocks so the resource is released even when a
    test assertion fails.
    """
    asyncio.run(app.state.http.aclose())


def test_inventory_reports_models_metrics_and_discovered_gguf(tmp_path):
    """GET /v1/node/inventory lists the configured model, metrics and GGUF file."""
    from rolemesh_node_agent.main import create_app

    cfg = _node_config(tmp_path)
    app = create_app(cfg)
    try:

        async def fake_discover_devices():
            return [DeviceRef(kind="gpu", backend="cuda", id="gpu:0")]

        async def fake_get_metrics():
            return [
                DeviceMetrics(
                    device=DeviceRef(kind="gpu", backend="cuda", id="gpu:0"),
                    loaded_model_id="planner-gguf",
                    queue_depth=1,
                )
            ]

        # Replace the adapter's probes with canned single-GPU answers.
        app.state.cuda.discover_devices = fake_discover_devices
        app.state.cuda.get_metrics = fake_get_metrics

        response = asyncio.run(_request(app, "GET", "/v1/node/inventory"))
        body = response.json()

        assert response.status_code == 200
        assert body["models"][0]["model_id"] == "planner-gguf"
        assert body["metrics"][0]["loaded_model_id"] == "planner-gguf"
        assert body["discovered_gguf"][0]["name"] == "model.gguf"
    finally:
        _close_app_http(app)


def test_registration_payload_uses_served_models_from_local_catalog(tmp_path):
    """``_registration_payload`` reports base URL, roles and served models.

    The expected payload omits the ``"tutor"`` role, which is not listed in
    ``dispatcher_roles``, and carries the model alias under ``meta``.
    """
    model_path = tmp_path / "model.gguf"
    model_path.write_bytes(b"GGUF")
    cfg = NodeAgentConfig(
        node_id="node-1",
        listen_host="192.168.1.101",
        listen_port=8091,
        dispatcher_roles=["mentor", "code_tutor"],
        models=[
            ModelEntry(
                model_id="qwen3-8b",
                path=model_path,
                roles=["mentor", "tutor"],
                alias="mentor",
            ),
            ModelEntry(
                model_id="qwen2.5-coder-14b",
                path=model_path,
                roles=["code_tutor"],
            ),
        ],
    )

    payload = _registration_payload(cfg)

    assert payload["node_id"] == "node-1"
    assert payload["base_url"] == "http://192.168.1.101:8091"
    assert payload["roles"] == ["mentor", "code_tutor"]
    assert payload["served_models"] == [
        {
            "model_id": "qwen3-8b",
            "roles": ["mentor"],
            "meta": {"alias": "mentor"},
        },
        {
            "model_id": "qwen2.5-coder-14b",
            "roles": ["code_tutor"],
            "meta": {},
        },
    ]


def test_registration_endpoint_returns_computed_payload(tmp_path):
    """GET /v1/node/registration returns the node id, base URL and served models."""
    from rolemesh_node_agent.main import create_app

    model_path = tmp_path / "model.gguf"
    model_path.write_bytes(b"GGUF")
    cfg = NodeAgentConfig(
        node_id="node-1",
        listen_host="192.168.1.101",
        listen_port=8091,
        models=[
            ModelEntry(model_id="qwen3-8b", path=model_path, roles=["mentor", "tutor"]),
        ],
    )
    app = create_app(cfg)
    try:
        response = asyncio.run(_request(app, "GET", "/v1/node/registration"))

        assert response.status_code == 200
        body = response.json()
        assert body["node_id"] == "node-1"
        assert body["base_url"] == "http://192.168.1.101:8091"
        assert body["served_models"][0]["model_id"] == "qwen3-8b"
        assert body["served_models"][0]["roles"] == ["mentor", "tutor"]
    finally:
        _close_app_http(app)


def test_chat_completions_routes_to_local_server_and_streams(tmp_path):
    """POST /v1/chat/completions reaches the faked upstream, streamed or not."""
    from rolemesh_node_agent.main import create_app

    cfg = _node_config(tmp_path)
    app = create_app(cfg)
    try:
        # Filled in by the fakes below so the test can check what was called.
        calls = {}

        async def fake_discover_devices():
            return [DeviceRef(kind="gpu", backend="cuda", id="gpu:0")]

        async def fake_ensure_server(device, *, model_path, model_id, server_args):
            calls["device"] = device.id
            calls["model_path"] = model_path
            calls["model_id"] = model_id
            return "http://127.0.0.1:9100"

        async def fake_chat(base_url, payload):
            calls["base_url"] = base_url
            calls["payload"] = payload
            return {"id": "node-cmpl", "choices": [{"message": {"role": "assistant", "content": "ok"}}]}

        async def fake_stream(base_url, payload):
            calls["stream_base_url"] = base_url
            calls["stream_payload"] = payload
            yield b"data: first\n\n"
            yield b"data: [DONE]\n\n"

        app.state.cuda.discover_devices = fake_discover_devices
        app.state.cuda.ensure_server = fake_ensure_server
        app.state.upstream.chat_completions = fake_chat
        app.state.upstream.stream_chat_completions = fake_stream

        response = asyncio.run(
            _request(
                app,
                "POST",
                "/v1/chat/completions",
                json={
                    "model": "planner-gguf",
                    "messages": [{"role": "user", "content": "hello"}],
                },
            )
        )
        stream_response = asyncio.run(
            _request(
                app,
                "POST",
                "/v1/chat/completions",
                json={
                    "model": "planner-gguf",
                    "stream": True,
                    "messages": [{"role": "user", "content": "hello"}],
                },
            )
        )

        assert response.status_code == 200
        assert response.json()["choices"][0]["message"]["content"] == "ok"
        assert calls["device"] == "gpu:0"
        assert calls["base_url"] == "http://127.0.0.1:9100"
        assert "data: first" in stream_response.text
        assert calls["stream_base_url"] == "http://127.0.0.1:9100"
    finally:
        _close_app_http(app)


def test_chat_completions_returns_503_when_server_startup_fails(tmp_path):
    """A ``ServerStartupError`` from ``ensure_server`` yields a 503 error body."""
    from rolemesh_node_agent.main import create_app
    from rolemesh_node_agent.adapters.cuda import ServerStartupError

    cfg = _node_config(tmp_path)
    app = create_app(cfg)
    try:

        async def fake_discover_devices():
            return [DeviceRef(kind="gpu", backend="cuda", id="gpu:0")]

        async def fake_ensure_server(device, *, model_path, model_id, server_args):
            raise ServerStartupError("Timed out waiting for model load")

        app.state.cuda.discover_devices = fake_discover_devices
        app.state.cuda.ensure_server = fake_ensure_server

        response = asyncio.run(
            _request(
                app,
                "POST",
                "/v1/chat/completions",
                json={
                    "model": "planner-gguf",
                    "messages": [{"role": "user", "content": "hello"}],
                },
            )
        )

        assert response.status_code == 503
        assert response.json()["error"]["code"] == "server_startup_error"
    finally:
        _close_app_http(app)


def test_select_device_prefers_already_loaded_model():
    """``_select_device`` picks the device whose metrics report the target model.

    The chosen device (gpu:1) has more memory in use than gpu:0, so the pick
    is not explained by free memory.
    """
    devices = [
        DeviceRef(kind="gpu", backend="cuda", id="gpu:0"),
        DeviceRef(kind="gpu", backend="cuda", id="gpu:1"),
    ]
    metrics = [
        DeviceMetrics(
            device=devices[0],
            loaded_model_id="other-model",
            mem_total_gb=24.0,
            mem_used_gb=4.0,
        ),
        DeviceMetrics(
            device=devices[1],
            loaded_model_id="target-model",
            mem_total_gb=24.0,
            mem_used_gb=20.0,
        ),
    ]

    picked = _select_device(devices, metrics, model_id="target-model")

    assert picked == devices[1]


def test_select_device_prefers_more_free_memory_and_lower_pressure():
    """With no model loaded anywhere, the device with less memory in use is picked.

    gpu:1 wins despite reporting the higher ``utilization_pct`` of the two.
    """
    devices = [
        DeviceRef(kind="gpu", backend="cuda", id="gpu:0"),
        DeviceRef(kind="gpu", backend="cuda", id="gpu:1"),
    ]
    metrics = [
        DeviceMetrics(
            device=devices[0],
            mem_total_gb=24.0,
            mem_used_gb=18.0,
            queue_depth=0,
            in_flight_jobs=0,
            utilization_pct=5.0,
        ),
        DeviceMetrics(
            device=devices[1],
            mem_total_gb=24.0,
            mem_used_gb=4.0,
            queue_depth=0,
            in_flight_jobs=0,
            utilization_pct=10.0,
        ),
    ]

    picked = _select_device(devices, metrics, model_id="target-model")

    assert picked == devices[1]


def test_chat_completions_uses_selected_device_not_first_device(tmp_path):
    """The chat endpoint starts the server on gpu:1, not on the first listed device."""
    from rolemesh_node_agent.main import create_app

    cfg = _node_config(tmp_path)
    app = create_app(cfg)
    try:
        calls = {}
        devices = [
            DeviceRef(kind="gpu", backend="cuda", id="gpu:0"),
            DeviceRef(kind="gpu", backend="cuda", id="gpu:1"),
        ]

        async def fake_discover_devices():
            return devices

        async def fake_get_metrics():
            # gpu:1 reports far less memory in use than gpu:0.
            return [
                DeviceMetrics(
                    device=devices[0],
                    mem_total_gb=24.0,
                    mem_used_gb=20.0,
                ),
                DeviceMetrics(
                    device=devices[1],
                    mem_total_gb=24.0,
                    mem_used_gb=2.0,
                ),
            ]

        async def fake_ensure_server(device, *, model_path, model_id, server_args):
            calls["device"] = device.id
            return "http://127.0.0.1:9100"

        async def fake_chat(base_url, payload):
            return {"id": "node-cmpl", "choices": [{"message": {"role": "assistant", "content": "ok"}}]}

        app.state.cuda.discover_devices = fake_discover_devices
        app.state.cuda.get_metrics = fake_get_metrics
        app.state.cuda.ensure_server = fake_ensure_server
        app.state.upstream.chat_completions = fake_chat

        response = asyncio.run(
            _request(
                app,
                "POST",
                "/v1/chat/completions",
                json={
                    "model": "planner-gguf",
                    "messages": [{"role": "user", "content": "hello"}],
                },
            )
        )

        assert response.status_code == 200
        assert calls["device"] == "gpu:1"
    finally:
        _close_app_http(app)


def test_cuda_adapter_build_server_args_from_structured_fields(tmp_path):
    """``CudaAdapter.build_server_args`` maps ``ModelEntry`` fields to server args.

    Also checks that the free-form ``server_args`` entry (``parallel``) appears
    in the result next to the structured fields.
    """
    model_path = tmp_path / "model.gguf"
    model_path.write_bytes(b"GGUF")
    adapter = CudaAdapter()
    model = ModelEntry(
        model_id="planner-gguf",
        path=model_path,
        roles=["planner"],
        ctx_size=4096,
        batch_size=1024,
        ubatch_size=256,
        threads=8,
        threads_batch=4,
        gpu_layers=999,
        main_gpu=1,
        tensor_split=[3, 1],
        flash_attn=True,
        alias="planner",
        server_args={"parallel": 2},
    )

    args = adapter.build_server_args(model)

    assert args["ctx-size"] == 4096
    assert args["batch-size"] == 1024
    assert args["ubatch-size"] == 256
    assert args["threads"] == 8
    assert args["threads-batch"] == 4
    assert args["gpu-layers"] == 999
    assert args["main-gpu"] == 1
    assert args["tensor-split"] == "3,1"
    assert args["flash-attn"] == "on"
    assert args["alias"] == "planner"
    assert args["parallel"] == 2


def test_merge_scheduler_metrics_overlays_queue_state():
    """``_merge_scheduler_metrics`` copies queue counters onto the device metric."""
    device = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")
    metric = DeviceMetrics(device=device)
    queue = DeviceQueue(max_pending=2)
    # Set the private counters directly instead of driving real jobs.
    queue._queued = 1
    queue._in_flight = 1

    merged = _merge_scheduler_metrics([metric], {"gpu:0": queue})

    assert merged[0].queue_depth == 1
    assert merged[0].in_flight_jobs == 1


def test_device_queue_rejects_when_full():
    """``DeviceQueue.acquire`` raises ``AdmissionError`` mentioning "full"."""
    queue = DeviceQueue(max_pending=1)
    queue._in_flight = 1

    try:
        asyncio.run(queue.acquire())
    except AdmissionError as exc:
        assert "full" in str(exc)
    else:
        # Reaching here means acquire() succeeded, which the test forbids.
        raise AssertionError("expected AdmissionError")


def test_chat_completions_returns_429_when_device_queue_is_full(tmp_path):
    """A saturated queue for gpu:0 makes the chat endpoint answer 429 ``queue_full``."""
    from rolemesh_node_agent.main import create_app

    cfg = _node_config(tmp_path)
    cfg.max_pending_requests_per_device = 1
    app = create_app(cfg)
    try:

        async def fake_discover_devices():
            return [DeviceRef(kind="gpu", backend="cuda", id="gpu:0")]

        async def fake_get_metrics():
            return [DeviceMetrics(device=DeviceRef(kind="gpu", backend="cuda", id="gpu:0"))]

        app.state.cuda.discover_devices = fake_discover_devices
        app.state.cuda.get_metrics = fake_get_metrics

        # Pre-install a queue for gpu:0 whose in-flight count equals max_pending.
        saturated = DeviceQueue(max_pending=1)
        saturated._in_flight = 1
        app.state.device_queues["gpu:0"] = saturated

        response = asyncio.run(
            _request(
                app,
                "POST",
                "/v1/chat/completions",
                json={
                    "model": "planner-gguf",
                    "messages": [{"role": "user", "content": "hello"}],
                },
            )
        )

        assert response.status_code == 429
        assert response.json()["error"]["code"] == "queue_full"
    finally:
        _close_app_http(app)