RoleMesh-Gateway/tests/test_node_agent.py

433 lines
13 KiB
Python

from __future__ import annotations
import asyncio
from pathlib import Path
import httpx
from rolemesh_node_agent.adapters.base import DeviceMetrics, DeviceRef
from rolemesh_node_agent.adapters.cuda import CudaAdapter
from rolemesh_node_agent.config import ModelEntry, NodeAgentConfig
from rolemesh_node_agent.main import _merge_scheduler_metrics, _registration_payload, _select_device
from rolemesh_node_agent.scheduler import AdmissionError, DeviceQueue
def _node_config(tmp_path: Path) -> NodeAgentConfig:
    """Build a minimal single-model NodeAgentConfig rooted at *tmp_path*."""
    gguf = tmp_path / "model.gguf"
    gguf.write_bytes(b"GGUF")
    planner = ModelEntry(model_id="planner-gguf", path=gguf, roles=["planner"])
    return NodeAgentConfig(
        node_id="node-1",
        model_roots=[tmp_path],
        models=[planner],
    )
async def _request(app, method: str, path: str, **kwargs) -> httpx.Response:
    """Issue one in-process ASGI request against *app* and return the response."""
    asgi = httpx.ASGITransport(app=app)
    client = httpx.AsyncClient(transport=asgi, base_url="http://testserver")
    async with client:
        return await client.request(method, path, **kwargs)
def test_inventory_reports_models_metrics_and_discovered_gguf(tmp_path):
    """/v1/node/inventory reports configured models, live metrics, and scanned GGUF files."""
    from rolemesh_node_agent.main import create_app

    app = create_app(_node_config(tmp_path))
    gpu0 = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")

    async def fake_discover_devices():
        return [gpu0]

    async def fake_get_metrics():
        metric = DeviceMetrics(
            device=gpu0,
            loaded_model_id="planner-gguf",
            queue_depth=1,
        )
        return [metric]

    app.state.cuda.discover_devices = fake_discover_devices
    app.state.cuda.get_metrics = fake_get_metrics

    response = asyncio.run(_request(app, "GET", "/v1/node/inventory"))

    assert response.status_code == 200
    body = response.json()
    assert body["models"][0]["model_id"] == "planner-gguf"
    assert body["metrics"][0]["loaded_model_id"] == "planner-gguf"
    assert body["discovered_gguf"][0]["name"] == "model.gguf"
    asyncio.run(app.state.http.aclose())
def test_registration_payload_uses_served_models_from_local_catalog(tmp_path):
    """_registration_payload derives served_models, roles, and base_url from config."""
    gguf = tmp_path / "model.gguf"
    gguf.write_bytes(b"GGUF")
    mentor_entry = ModelEntry(
        model_id="qwen3-8b",
        path=gguf,
        roles=["mentor", "tutor"],
        alias="mentor",
    )
    coder_entry = ModelEntry(
        model_id="qwen2.5-coder-14b",
        path=gguf,
        roles=["code_tutor"],
    )
    cfg = NodeAgentConfig(
        node_id="node-1",
        listen_host="192.168.1.101",
        listen_port=8091,
        dispatcher_roles=["mentor", "code_tutor"],
        models=[mentor_entry, coder_entry],
    )

    payload = _registration_payload(cfg)

    assert payload["node_id"] == "node-1"
    assert payload["base_url"] == "http://192.168.1.101:8091"
    assert payload["roles"] == ["mentor", "code_tutor"]
    # Roles are filtered to the dispatcher-facing set; the alias lands in meta.
    expected_models = [
        {"model_id": "qwen3-8b", "roles": ["mentor"], "meta": {"alias": "mentor"}},
        {"model_id": "qwen2.5-coder-14b", "roles": ["code_tutor"], "meta": {}},
    ]
    assert payload["served_models"] == expected_models
def test_registration_endpoint_returns_computed_payload(tmp_path):
    """GET /v1/node/registration serves the same payload _registration_payload builds."""
    from rolemesh_node_agent.main import create_app

    gguf = tmp_path / "model.gguf"
    gguf.write_bytes(b"GGUF")
    entry = ModelEntry(model_id="qwen3-8b", path=gguf, roles=["mentor", "tutor"])
    cfg = NodeAgentConfig(
        node_id="node-1",
        listen_host="192.168.1.101",
        listen_port=8091,
        models=[entry],
    )
    app = create_app(cfg)

    response = asyncio.run(_request(app, "GET", "/v1/node/registration"))

    assert response.status_code == 200
    body = response.json()
    assert body["node_id"] == "node-1"
    assert body["base_url"] == "http://192.168.1.101:8091"
    first_model = body["served_models"][0]
    assert first_model["model_id"] == "qwen3-8b"
    assert first_model["roles"] == ["mentor", "tutor"]
    asyncio.run(app.state.http.aclose())
def test_chat_completions_routes_to_local_server_and_streams(tmp_path):
    """Chat requests ensure a local server, then proxy JSON and SSE responses to it."""
    from rolemesh_node_agent.main import create_app

    app = create_app(_node_config(tmp_path))
    seen = {}

    async def fake_discover_devices():
        return [DeviceRef(kind="gpu", backend="cuda", id="gpu:0")]

    async def fake_ensure_server(device, *, model_path, model_id, server_args):
        seen["device"] = device.id
        seen["model_path"] = model_path
        seen["model_id"] = model_id
        return "http://127.0.0.1:9100"

    async def fake_chat(base_url, payload):
        seen["base_url"] = base_url
        seen["payload"] = payload
        return {"id": "node-cmpl", "choices": [{"message": {"role": "assistant", "content": "ok"}}]}

    async def fake_stream(base_url, payload):
        seen["stream_base_url"] = base_url
        seen["stream_payload"] = payload
        yield b"data: first\n\n"
        yield b"data: [DONE]\n\n"

    app.state.cuda.discover_devices = fake_discover_devices
    app.state.cuda.ensure_server = fake_ensure_server
    app.state.upstream.chat_completions = fake_chat
    app.state.upstream.stream_chat_completions = fake_stream

    base_body = {
        "model": "planner-gguf",
        "messages": [{"role": "user", "content": "hello"}],
    }
    response = asyncio.run(
        _request(app, "POST", "/v1/chat/completions", json=base_body)
    )
    stream_response = asyncio.run(
        _request(app, "POST", "/v1/chat/completions", json=dict(base_body, stream=True))
    )

    assert response.status_code == 200
    assert response.json()["choices"][0]["message"]["content"] == "ok"
    assert seen["device"] == "gpu:0"
    assert seen["base_url"] == "http://127.0.0.1:9100"
    # Streaming replies are forwarded as raw SSE bytes from the upstream generator.
    assert "data: first" in stream_response.text
    assert seen["stream_base_url"] == "http://127.0.0.1:9100"
    asyncio.run(app.state.http.aclose())
def test_chat_completions_returns_503_when_server_startup_fails(tmp_path):
    """A ServerStartupError from ensure_server surfaces as a 503 with a typed error code."""
    from rolemesh_node_agent.main import create_app
    from rolemesh_node_agent.adapters.cuda import ServerStartupError

    app = create_app(_node_config(tmp_path))

    async def fake_discover_devices():
        return [DeviceRef(kind="gpu", backend="cuda", id="gpu:0")]

    async def fake_ensure_server(device, *, model_path, model_id, server_args):
        raise ServerStartupError("Timed out waiting for model load")

    app.state.cuda.discover_devices = fake_discover_devices
    app.state.cuda.ensure_server = fake_ensure_server

    request_body = {
        "model": "planner-gguf",
        "messages": [{"role": "user", "content": "hello"}],
    }
    response = asyncio.run(
        _request(app, "POST", "/v1/chat/completions", json=request_body)
    )

    assert response.status_code == 503
    assert response.json()["error"]["code"] == "server_startup_error"
    asyncio.run(app.state.http.aclose())
def test_select_device_prefers_already_loaded_model():
    """A device already holding the target model wins even when it has less free memory."""
    gpu0 = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")
    gpu1 = DeviceRef(kind="gpu", backend="cuda", id="gpu:1")
    metrics = [
        DeviceMetrics(
            device=gpu0,
            loaded_model_id="other-model",
            mem_total_gb=24.0,
            mem_used_gb=4.0,
        ),
        DeviceMetrics(
            device=gpu1,
            loaded_model_id="target-model",
            mem_total_gb=24.0,
            mem_used_gb=20.0,
        ),
    ]

    chosen = _select_device([gpu0, gpu1], metrics, model_id="target-model")

    assert chosen == gpu1
def test_select_device_prefers_more_free_memory_and_lower_pressure():
    """With no model affinity, selection favours the device with more free memory."""
    gpu0 = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")
    gpu1 = DeviceRef(kind="gpu", backend="cuda", id="gpu:1")
    metrics = [
        DeviceMetrics(
            device=gpu0,
            mem_total_gb=24.0,
            mem_used_gb=18.0,
            queue_depth=0,
            in_flight_jobs=0,
            utilization_pct=5.0,
        ),
        DeviceMetrics(
            device=gpu1,
            mem_total_gb=24.0,
            mem_used_gb=4.0,
            queue_depth=0,
            in_flight_jobs=0,
            utilization_pct=10.0,
        ),
    ]

    chosen = _select_device([gpu0, gpu1], metrics, model_id="target-model")

    # gpu:1 has slightly higher utilization but far more free memory, so it wins.
    assert chosen == gpu1
def test_chat_completions_uses_selected_device_not_first_device(tmp_path):
    """The chat route runs device selection on metrics instead of taking devices[0]."""
    from rolemesh_node_agent.main import create_app

    app = create_app(_node_config(tmp_path))
    seen = {}
    gpu0 = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")
    gpu1 = DeviceRef(kind="gpu", backend="cuda", id="gpu:1")

    async def fake_discover_devices():
        return [gpu0, gpu1]

    async def fake_get_metrics():
        # gpu:0 is nearly full; gpu:1 is almost idle and should be picked.
        return [
            DeviceMetrics(device=gpu0, mem_total_gb=24.0, mem_used_gb=20.0),
            DeviceMetrics(device=gpu1, mem_total_gb=24.0, mem_used_gb=2.0),
        ]

    async def fake_ensure_server(device, *, model_path, model_id, server_args):
        seen["device"] = device.id
        return "http://127.0.0.1:9100"

    async def fake_chat(base_url, payload):
        return {"id": "node-cmpl", "choices": [{"message": {"role": "assistant", "content": "ok"}}]}

    app.state.cuda.discover_devices = fake_discover_devices
    app.state.cuda.get_metrics = fake_get_metrics
    app.state.cuda.ensure_server = fake_ensure_server
    app.state.upstream.chat_completions = fake_chat

    request_body = {
        "model": "planner-gguf",
        "messages": [{"role": "user", "content": "hello"}],
    }
    response = asyncio.run(
        _request(app, "POST", "/v1/chat/completions", json=request_body)
    )

    assert response.status_code == 200
    assert seen["device"] == "gpu:1"
    asyncio.run(app.state.http.aclose())
def test_cuda_adapter_build_server_args_from_structured_fields(tmp_path):
    """Structured ModelEntry fields map onto the server's CLI argument names."""
    gguf = tmp_path / "model.gguf"
    gguf.write_bytes(b"GGUF")
    entry = ModelEntry(
        model_id="planner-gguf",
        path=gguf,
        roles=["planner"],
        ctx_size=4096,
        batch_size=1024,
        ubatch_size=256,
        threads=8,
        threads_batch=4,
        gpu_layers=999,
        main_gpu=1,
        tensor_split=[3, 1],
        flash_attn=True,
        alias="planner",
        server_args={"parallel": 2},
    )

    args = CudaAdapter().build_server_args(entry)

    expected = {
        "ctx-size": 4096,
        "batch-size": 1024,
        "ubatch-size": 256,
        "threads": 8,
        "threads-batch": 4,
        "gpu-layers": 999,
        "main-gpu": 1,
        "tensor-split": "3,1",   # list is serialised to a comma-joined string
        "flash-attn": "on",      # bool is serialised to on/off
        "alias": "planner",
        "parallel": 2,           # free-form server_args are passed through
    }
    for flag, value in expected.items():
        assert args[flag] == value
def test_merge_scheduler_metrics_overlays_queue_state():
    """_merge_scheduler_metrics copies per-device queue counters into the metrics."""
    gpu0 = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")
    queue = DeviceQueue(max_pending=2)
    queue._queued = 1
    queue._in_flight = 1

    merged = _merge_scheduler_metrics([DeviceMetrics(device=gpu0)], {"gpu:0": queue})

    assert merged[0].queue_depth == 1
    assert merged[0].in_flight_jobs == 1
def test_device_queue_rejects_when_full():
    """acquire() raises AdmissionError once in-flight work saturates max_pending."""
    saturated = DeviceQueue(max_pending=1)
    saturated._in_flight = 1
    rejected = False
    try:
        asyncio.run(saturated.acquire())
    except AdmissionError as exc:
        rejected = True
        assert "full" in str(exc)
    assert rejected, "expected AdmissionError"
def test_chat_completions_returns_429_when_device_queue_is_full(tmp_path):
    """A saturated per-device queue makes the chat route answer 429 queue_full."""
    from rolemesh_node_agent.main import create_app

    cfg = _node_config(tmp_path)
    cfg.max_pending_requests_per_device = 1
    app = create_app(cfg)
    gpu0 = DeviceRef(kind="gpu", backend="cuda", id="gpu:0")

    async def fake_discover_devices():
        return [gpu0]

    async def fake_get_metrics():
        return [DeviceMetrics(device=DeviceRef(kind="gpu", backend="cuda", id="gpu:0"))]

    app.state.cuda.discover_devices = fake_discover_devices
    app.state.cuda.get_metrics = fake_get_metrics
    # Pre-saturate the queue so admission fails before any server is started.
    saturated = DeviceQueue(max_pending=1)
    saturated._in_flight = 1
    app.state.device_queues["gpu:0"] = saturated

    request_body = {
        "model": "planner-gguf",
        "messages": [{"role": "user", "content": "hello"}],
    }
    response = asyncio.run(
        _request(app, "POST", "/v1/chat/completions", json=request_body)
    )

    assert response.status_code == 429
    assert response.json()["error"]["code"] == "queue_full"
    asyncio.run(app.state.http.aclose())