from __future__ import annotations import asyncio from rolemesh_node_agent.adapters.base import DeviceRef from rolemesh_node_agent.adapters.cuda import CudaAdapter, ServerStartupError def test_probe_server_ready_accepts_health_response(monkeypatch): adapter = CudaAdapter() class _Response: def __init__(self, status: int, body: bytes): self.status = status self._body = body def read(self): return self._body def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return None def fake_urlopen(url, timeout): return _Response(200, b'{"status":"ok"}') monkeypatch.setattr("urllib.request.urlopen", fake_urlopen) adapter._probe_server_ready("http://127.0.0.1:1234") def test_wait_for_server_ready_times_out_when_probe_never_succeeds(monkeypatch): adapter = CudaAdapter(startup_timeout_s=0.0, probe_interval_s=0.0) class _Proc: returncode = None def poll(self): return None try: asyncio.run(adapter._wait_for_server_ready("http://127.0.0.1:1234", proc=_Proc())) except ServerStartupError as exc: assert "Timed out" in str(exc) else: raise AssertionError("expected ServerStartupError") def test_ensure_server_reuses_existing_matching_process(): adapter = CudaAdapter() device = DeviceRef(kind="gpu", backend="cuda", id="gpu:0") class _Proc: def poll(self): return None adapter._servers[device.id] = type( "_SP", (), { "device": device, "model_id": "planner", "model_path": "/tmp/model.gguf", "port": 1234, "proc": _Proc(), "started_at": 1.0, }, )() base_url = asyncio.run( adapter.ensure_server( device, model_path="/tmp/model.gguf", model_id="planner", server_args={}, ) ) assert base_url == "http://127.0.0.1:1234"