Still trying to get the initial commits done

This commit is contained in:
welsberr 2026-02-06 16:20:34 -05:00
parent b6599a8214
commit 8537c6ef39
18 changed files with 778 additions and 230 deletions

11
.editorconfig Normal file
View File

@ -0,0 +1,11 @@
root = true
[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
trim_trailing_whitespace = true
[*.py]
indent_style = space
indent_size = 4

234
.gitignore vendored
View File

@ -1,229 +1,15 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.venv/
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# ---> Emacs
# -*- mode: gitignore; -*-
*~
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*
# Org-mode
.org-id-locations
*_archive
# flymake-mode
*_flymake.*
# eshell files
/eshell/history
/eshell/lastdir
# elpa packages
/elpa/
# reftex files
*.rel
# AUCTeX auto folder
/auto/
# cask packages
.cask/
.env.*
dist/
# Flycheck
flycheck_*.el
# server auth directory
/server/
# projectiles files
.projectile
# directory configuration
.dir-locals.el
# network security
/network-security.data
# ---> Rust
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
build/
.pytest_cache/
.ruff_cache/
.mypy_cache/
.idea/
.vscode/
state/registry.json
configs/models.yaml

20
LICENSE
View File

@ -1,9 +1,21 @@
MIT License
Copyright (c) 2026 welsberr
Copyright (c) 2026 Wesley R. Elsberry
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,3 +1,66 @@
# RoleMesh-Gateway
# RoleMesh Gateway
Software to run multiple LLMs for agentic tasks over hosts on a defined network.
RoleMesh Gateway is a lightweight **OpenAI-compatible** API gateway for routing chat-completions requests to multiple
locally hosted LLM backends (e.g., `llama.cpp` `llama-server`) **by role** (planner, writer, coder, reviewer, …).
It is designed for **agentic workflows** that benefit from using different models for different steps, and for
deployments where **different machines host different models** (e.g., GPU box for fast inference, big RAM CPU box for large models).
## What you get
- OpenAI-compatible endpoints:
- `GET /v1/models`
- `POST /v1/chat/completions` (streaming and non-streaming)
- `GET /health` and `GET /ready`
- Model registry from `configs/models.yaml`
- Optional **node registration** so remote machines can announce role backends to the gateway
- Robust proxying with **explicit httpx timeouts** (no “hang forever”)
- Structured logging with request IDs
## Quick start (proxy mode)
1. Create a venv and install:
```bash
python -m venv .venv
source .venv/bin/activate
pip install -e .
```
2. Copy the example config:
```bash
cp configs/models.example.yaml configs/models.yaml
```
3. Run the gateway:
```bash
ROLE_MESH_CONFIG=configs/models.yaml uvicorn rolemesh_gateway.main:app --host 0.0.0.0 --port 8000
```
4. Smoke test:
```bash
bash scripts/smoke_test.sh http://127.0.0.1:8000
```
## Multi-host (node registration)
If you want machines to host backends and “register” them dynamically, run a tiny node agent on each backend host
(or just call the registration endpoint from your own tooling).
- Gateway endpoint: `POST /v1/nodes/register`
- Node payload describes which **roles** it serves and the base URL to reach its OpenAI-compatible backend.
See: `docs/DEPLOYMENT.md` and `docs/CONFIG.md`.
## Status
This repository is a **preliminary scaffold**:
- Proxying to OpenAI-compatible upstreams works.
- Registration and load-selection are implemented (basic round-robin), but persistence and auth are TODOs.
## License
MIT. See `LICENSE`.

View File

@ -0,0 +1,40 @@
version: 1
default_model: writer
gateway:
host: 0.0.0.0
port: 8000
# Models may be:
# - type: proxy (static URL to an OpenAI-compatible upstream)
# - type: discovered (resolved from registered nodes by role)
models:
planner:
type: proxy
openai_model_name: planner
proxy_url: http://127.0.0.1:8011
defaults:
temperature: 0.2
writer:
type: proxy
openai_model_name: writer
proxy_url: http://127.0.0.1:8012
defaults:
temperature: 0.6
coder:
type: proxy
openai_model_name: coder
proxy_url: http://127.0.0.1:8013
defaults:
temperature: 0.2
reviewer:
# Example: use discovery instead of hardcoding a proxy_url
type: discovered
openai_model_name: reviewer
role: reviewer
strategy: round_robin
defaults:
temperature: 0.2

19
docker-compose.yml Normal file
View File

@ -0,0 +1,19 @@
services:
rolemesh-gateway:
    # NOTE(review): these relative paths ('..', '../configs') assume this compose
    # file lives inside docker/; if it sits at the repo root, they should be
    # '.' and './configs' — confirm the intended location.
    build:
      context: ..
      dockerfile: docker/Dockerfile
environment:
- ROLE_MESH_CONFIG=/app/configs/models.yaml
- ROLE_MESH_REGISTRY_PATH=/app/state/registry.json
- ROLE_MESH_CONNECT_TIMEOUT_S=10
- ROLE_MESH_READ_TIMEOUT_S=600
volumes:
- ../configs:/app/configs:ro
- ../state:/app/state
# For Linux + upstreams bound to 127.0.0.1 on the host, host networking is simplest:
network_mode: host
# If you can bind upstreams to 0.0.0.0 and reach them via bridge networking,
# remove network_mode and map ports instead:
# ports:
# - "8000:8000"

12
docker/Dockerfile Normal file
View File

@ -0,0 +1,12 @@
FROM python:3.11-slim
WORKDIR /app
COPY pyproject.toml README.md LICENSE /app/
COPY src /app/src
RUN pip install --no-cache-dir -e .
# Config is not baked into the image; mount configs/ at runtime (see docker-compose.yml).
ENV ROLE_MESH_CONFIG=/app/configs/models.yaml
EXPOSE 8000
CMD ["uvicorn", "rolemesh_gateway.main:app", "--host", "0.0.0.0", "--port", "8000"]

40
docs/ARCHITECTURE.md Normal file
View File

@ -0,0 +1,40 @@
# Architecture
RoleMesh Gateway sits between OpenAI-compatible clients and one or more upstream backends.
## Goals
- Present a stable OpenAI-like API surface to tools (agents, IDEs, scripts)
- Route by **role**, not by a specific model binary
- Support both:
- single-host (gateway + backends on the same machine)
- multi-host (different machines serve different roles/models)
## Request flow
1. Client sends `POST /v1/chat/completions` with `model: "<role>"`
2. Gateway resolves the role via:
- `type: proxy` → fixed `proxy_url`, or
- `type: discovered` → pick from registered nodes serving that role
3. Gateway forwards the request (and streams responses if requested)
4. Gateway returns an OpenAI-like response
## Registration model
Registration is optional and minimal:
- A node announces `(node_id, base_url, roles, meta)`
- The gateway stores this in memory (and optionally in `state/registry.json`)
- For `type: discovered`, the gateway picks a node by selection strategy
This is deliberately small so you can swap it out later for something stronger:
- Consul, etcd, Redis
- mDNS discovery on LAN
- static inventory in Ansible/systemd units
## Known limitations (scaffold)
- No auth on registration or inference endpoints
- No TTL/health polling
- Round-robin selection only
These are tracked in `docs/DEPLOYMENT.md` as next steps.

64
docs/CONFIG.md Normal file
View File

@ -0,0 +1,64 @@
# Configuration
RoleMesh Gateway loads configuration from a YAML file (default: `configs/models.yaml`).
Set `ROLE_MESH_CONFIG` to override.
## Top-level schema
```yaml
version: 1
default_model: writer
gateway:
host: 0.0.0.0
port: 8000
models:
<alias>:
type: proxy | discovered
openai_model_name: <string>
...
```
- `<alias>` is what clients pass as `model` in `/v1/chat/completions`.
- `openai_model_name` is the model id returned by `/v1/models` (usually same as alias).
## Proxy models
Route to a fixed upstream (any host reachable from the gateway):
```yaml
models:
writer:
type: proxy
openai_model_name: writer
proxy_url: http://127.0.0.1:8012
defaults:
temperature: 0.6
```
## Discovered models
Route to a dynamically registered node that claims the role:
```yaml
models:
reviewer:
type: discovered
openai_model_name: reviewer
role: reviewer
strategy: round_robin
```
### Registering nodes
Nodes register to `POST /v1/nodes/register`:
```json
{
"node_id": "gpu-box-1",
"base_url": "http://10.0.0.12:8014",
"roles": ["reviewer", "planner"],
"meta": {"gpu": "Tesla P40", "notes": "llama-server on GPU0"}
}
```
Security is intentionally omitted in this scaffold — add API keys or mTLS if the gateway is exposed beyond localhost.

39
docs/DEPLOYMENT.md Normal file
View File

@ -0,0 +1,39 @@
# Deployment
This scaffold supports two patterns.
## Pattern A: Single host, proxy to localhost backends
- Run `llama-server` (or other OpenAI-compatible servers) on the host:
- planner → `http://127.0.0.1:8011`
- writer → `http://127.0.0.1:8012`
- Run gateway:
- either directly on host (recommended for simplicity), or
- in Docker with `network_mode: host` (Linux) if upstream binds to 127.0.0.1
## Pattern B: Multi-host (roles distributed across machines)
- Choose one machine to run the gateway (or run multiple gateways)
- Each backend host exposes an OpenAI-compatible server on LAN, e.g.:
- `http://10.0.0.12:8012` (writer)
- `http://10.0.0.13:8011` (planner)
- Update `proxy_url` entries to those LAN URLs, **or** use discovery:
- Set model to `type: discovered` with `role: writer`, etc.
- Each host registers itself with the gateway.
### Minimal registration call
```bash
curl -sS -X POST http://GATEWAY:8000/v1/nodes/register \
-H 'Content-Type: application/json' \
-d '{"node_id":"gpu-box-1","base_url":"http://10.0.0.12:8012","roles":["writer"]}'
```
### Hardening checklist (recommended)
- Bind gateway to localhost by default, and explicitly expose it when needed
- Add API key checking (FastAPI dependency) for:
- inference endpoints
- registration endpoint
- Add TTL and periodic health checks for registered nodes
- Consider mTLS if registration happens over untrusted networks

32
pyproject.toml Normal file
View File

@ -0,0 +1,32 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "rolemesh-gateway"
version = "0.1.0"
description = "OpenAI-compatible role-based LLM gateway with optional multi-host registration."
readme = "README.md"
requires-python = ">=3.10"
license = {text = "MIT"}
authors = [{name="Wesley R. Elsberry"}]
dependencies = [
"fastapi>=0.110",
"uvicorn[standard]>=0.27",
"httpx>=0.25",
"pydantic>=2.6",
"pyyaml>=6.0",
]
[project.optional-dependencies]
dev = [
"ruff>=0.4",
"pytest>=8.0",
]
[tool.ruff]
line-length = 100
target-version = "py310"
[tool.ruff.lint]
select = ["E","F","I","B","UP"]

20
scripts/smoke_test.sh Executable file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Smoke-test the gateway: each endpoint must answer with valid JSON.
set -euo pipefail

BASE_URL="${1:-http://127.0.0.1:8000}"

# check_get PATH — fetch a GET endpoint and require parseable JSON.
check_get() {
  echo "== $1 =="
  curl -sS "$BASE_URL$1" | python -m json.tool >/dev/null
  echo "ok"
}

check_get "/v1/models"
check_get "/health"

echo "== /v1/chat/completions (non-stream) =="
curl -sS -X POST "$BASE_URL/v1/chat/completions" -H "Content-Type: application/json" -d '{
"model": "planner",
"messages": [{"role":"user","content":"Say hello in 3 words."}],
"max_tokens": 32
}' | python -m json.tool >/dev/null
echo "ok"

View File

@ -0,0 +1,2 @@
# Public package API: only the version marker.
__all__ = ["__version__"]
# Keep in sync with the `version` field in pyproject.toml.
__version__ = "0.1.0"

View File

@ -0,0 +1,147 @@
from __future__ import annotations

import json
import os
import time
import uuid
from typing import Any, Dict

from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse, StreamingResponse

from rolemesh_gateway.config import Config, DiscoveredModel, ProxyModel
from rolemesh_gateway.registry import NodeRegistration, Registry
from rolemesh_gateway.upstream import UpstreamClient, UpstreamError
router = APIRouter()
def _openai_error(message: str, code: str = "upstream_error", status_code: int = 502) -> JSONResponse:
    """Build an OpenAI-style error envelope as a JSON response."""
    error_body = {
        "message": message,
        "type": "gateway_error",
        "param": None,
        "code": code,
    }
    return JSONResponse(status_code=status_code, content={"error": error_body})
@router.get("/v1/models")
async def list_models(request: Request) -> Dict[str, Any]:
    """OpenAI-compatible model listing, annotated with rolemesh routing info."""
    cfg: Config = request.app.state.cfg
    registry: Registry = request.app.state.registry

    def _routing(entry) -> Dict[str, Any]:
        # Proxy entries expose their fixed URL; discovered entries their role/strategy.
        if isinstance(entry, ProxyModel):
            return {"type": "proxy", "proxy_url": str(entry.proxy_url)}
        return {"type": "discovered", "role": entry.role, "strategy": entry.strategy}

    data = [
        {
            "id": entry.openai_model_name,
            "object": "model",
            "owned_by": "local",
            "rolemesh": _routing(entry),
        }
        for entry in cfg.models.values()
    ]
    # Expose currently registered nodes (informational).
    nodes = [n.model_dump(mode="json") for n in registry.list_nodes()]
    return {"object": "list", "data": data, "rolemesh": {"registered_nodes": nodes}}
@router.post("/v1/chat/completions")
async def chat_completions(request: Request) -> Any:
    """Proxy an OpenAI-style chat-completions request to the upstream that
    serves the requested role (the request's "model" field).

    Returns the upstream JSON body, a StreamingResponse of SSE bytes when
    `stream` is requested, or an OpenAI-style error envelope.
    """
    cfg: Config = request.app.state.cfg
    upstream: UpstreamClient = request.app.state.upstream
    registry: Registry = request.app.state.registry
    req_id = str(uuid.uuid4())
    started = time.time()
    body = await request.json()
    model = body.get("model") or cfg.default_model
    stream = bool(body.get("stream", False))
    if model not in cfg.models:
        return _openai_error(f"Unknown model '{model}'. Check GET /v1/models.", code="unknown_model", status_code=400)
    entry = cfg.models[model]
    # Apply per-model defaults for keys the client did not set.
    # (Both ProxyModel and DiscoveredModel define `defaults`, so getattr's
    # fallback is just defensive.)
    for k, v in getattr(entry, "defaults", {}).items():
        body.setdefault(k, v)
    # Resolve upstream base URL
    if isinstance(entry, ProxyModel):
        base_url = str(entry.proxy_url).rstrip("/")
    elif isinstance(entry, DiscoveredModel):
        node = registry.pick_node_for_role(entry.role)
        if not node:
            return _openai_error(
                f"No registered nodes available for role '{entry.role}'. "
                f"Register a node via POST /v1/nodes/register, or use proxy mode.",
                code="no_upstream",
                status_code=503,
            )
        base_url = str(node.base_url).rstrip("/")
    else:
        return _openai_error("Invalid model configuration.", code="bad_config", status_code=500)
    if not stream:
        try:
            out = await upstream.chat_completions(base_url, body)
        except UpstreamError as e:
            request.app.logger.warning(
                "chat_completions upstream error",
                extra={"req_id": req_id, "model": model, "upstream": base_url, "err": str(e)},
            )
            return _openai_error(str(e), code="upstream_error", status_code=e.status_code or 502)
        request.app.logger.info(
            "chat_completions ok",
            extra={"req_id": req_id, "model": model, "upstream": base_url, "ms": int(1000*(time.time()-started))},
        )
        return out
    # Streaming passthrough (SSE).
    # FIX: a try/except wrapped around `return StreamingResponse(...)` can
    # never catch errors raised while the generator runs, because the
    # generator only executes after this handler returns. Upstream failures
    # during streaming therefore surfaced as abruptly broken connections.
    # Handle UpstreamError inside the generator and emit a terminal SSE
    # error event instead (status code is already sent, so an error body is
    # the best we can do mid-stream).
    async def gen():
        try:
            async for chunk in upstream.chat_completions_stream(base_url, body):
                yield chunk
        except UpstreamError as e:
            request.app.logger.warning(
                "chat_completions upstream error",
                extra={"req_id": req_id, "model": model, "upstream": base_url, "err": str(e)},
            )
            err = {"error": {"message": str(e), "type": "gateway_error", "param": None, "code": "upstream_error"}}
            yield f"data: {json.dumps(err)}\n\n".encode("utf-8")
            yield b"data: [DONE]\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")
@router.get("/health")
async def health() -> Dict[str, str]:
    """Liveness probe: always OK while the process is serving requests."""
    return dict(status="ok")
@router.get("/ready")
async def ready(request: Request) -> JSONResponse:
    """Readiness probe.

    Currently only proves the configuration was loaded at startup; it does
    not ping upstream backends.
    """
    cfg: Config = request.app.state.cfg
    payload = {"status": "ready", "default_model": cfg.default_model}
    return JSONResponse(status_code=200, content=payload)
@router.post("/v1/nodes/register")
async def register_node(request: Request) -> Dict[str, Any]:
    """Let a remote machine announce which roles it serves.

    SECURITY NOTE: This endpoint is unauthenticated in the scaffold. If the
    gateway is reachable by untrusted clients, add API-key gating or mTLS.
    """
    registry: Registry = request.app.state.registry
    reg = NodeRegistration.model_validate(await request.json())
    stored = registry.register(reg)
    return {"status": "ok", "node": stored.model_dump(mode="json")}

View File

@ -0,0 +1,50 @@
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Literal, Optional
import yaml
from pydantic import BaseModel, Field, HttpUrl
class GatewayConfig(BaseModel):
    """Bind address/port for the gateway itself (overridable in the config file)."""
    # Defaults to loopback; the shipped example config widens this to 0.0.0.0.
    host: str = "127.0.0.1"
    port: int = 8000
ModelType = Literal["proxy", "discovered"]
class ProxyModel(BaseModel):
    """Model entry routed to a fixed OpenAI-compatible upstream URL."""
    type: Literal["proxy"] = "proxy"
    # Model id reported by GET /v1/models (usually the same as the alias key).
    openai_model_name: str
    proxy_url: HttpUrl
    # Optional default parameters applied if not provided in request
    defaults: dict = Field(default_factory=dict)
class DiscoveredModel(BaseModel):
    """Model entry resolved at request time from nodes registered for `role`."""
    type: Literal["discovered"] = "discovered"
    # Model id reported by GET /v1/models (usually the same as the alias key).
    openai_model_name: str
    role: str  # e.g. planner / writer / coder
    # Strategy for selecting among registered nodes
    strategy: Literal["round_robin", "random"] = "round_robin"
    defaults: dict = Field(default_factory=dict)
ModelEntry = ProxyModel | DiscoveredModel
class Config(BaseModel):
    """Top-level gateway configuration loaded from YAML (see docs/CONFIG.md)."""
    version: int = 1
    # Alias used when an incoming request omits the "model" field.
    default_model: str
    gateway: GatewayConfig = Field(default_factory=GatewayConfig)
    # Keyed by the alias clients pass as "model" in /v1/chat/completions.
    models: Dict[str, ModelEntry] = Field(default_factory=dict)
def load_config(path: str | Path) -> Config:
    """Load and validate gateway configuration from a YAML file.

    Args:
        path: Path to the YAML config (schema documented in docs/CONFIG.md).

    Returns:
        A validated ``Config`` instance.

    Raises:
        FileNotFoundError: if the file does not exist.
        ValueError: if the YAML top level is not a mapping.
        pydantic.ValidationError: if the contents fail schema validation.
    """
    path = Path(path)
    # FIX: read with explicit utf-8 rather than the locale default — the
    # project declares utf-8 for all files (.editorconfig), and the locale
    # encoding varies by platform.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError(f"Invalid config file at {path}: expected a YAML mapping at top level.")
    return Config.model_validate(data)

View File

@ -0,0 +1,55 @@
from __future__ import annotations
import logging
import os
from pathlib import Path
from fastapi import FastAPI
from rolemesh_gateway.config import load_config
from rolemesh_gateway.registry import Registry
from rolemesh_gateway.upstream import UpstreamClient
from rolemesh_gateway.api.openai import router as openai_router
def _get_logger() -> logging.Logger:
    """Return the package logger, attaching a stream handler at most once."""
    logger = logging.getLogger("rolemesh_gateway")
    if not logger.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
        )
        logger.addHandler(stream_handler)
        # LOG_LEVEL env var controls verbosity (default INFO).
        logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())
    return logger
def create_app() -> FastAPI:
    """Build the FastAPI app: load config, wire registry/upstream, mount routes.

    Environment variables:
        ROLE_MESH_CONFIG: path to the YAML model config (default configs/models.yaml)
        ROLE_MESH_REGISTRY_PATH: JSON persistence file for node registrations
        ROLE_MESH_CONNECT_TIMEOUT_S / ROLE_MESH_READ_TIMEOUT_S: upstream httpx timeouts

    Returns:
        A configured FastAPI application with cfg/registry/upstream on app.state.
    """
    from contextlib import asynccontextmanager  # local: only used for lifespan

    cfg_path = os.environ.get("ROLE_MESH_CONFIG", "configs/models.yaml")
    cfg = load_config(cfg_path)
    registry_path = os.environ.get("ROLE_MESH_REGISTRY_PATH", "state/registry.json")
    registry = Registry(persist_path=Path(registry_path))
    upstream = UpstreamClient(
        connect_timeout_s=float(os.environ.get("ROLE_MESH_CONNECT_TIMEOUT_S", "10")),
        read_timeout_s=float(os.environ.get("ROLE_MESH_READ_TIMEOUT_S", "600")),
    )

    # FIX: @app.on_event("shutdown") is deprecated in current FastAPI; use a
    # lifespan context so the shared httpx client is closed on shutdown.
    @asynccontextmanager
    async def lifespan(_: FastAPI):
        yield
        await upstream.close()

    app = FastAPI(title="RoleMesh Gateway", version="0.1.0", lifespan=lifespan)
    app.state.cfg = cfg
    app.state.registry = registry
    app.state.upstream = upstream
    app.logger = _get_logger()
    app.include_router(openai_router)
    return app
app = create_app()

View File

@ -0,0 +1,96 @@
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from pydantic import BaseModel, Field, HttpUrl
class NodeRegistration(BaseModel):
    """Payload a node POSTs to /v1/nodes/register to announce its roles."""
    node_id: str
    base_url: HttpUrl  # OpenAI-compatible upstream base, e.g. http://10.0.0.12:8011
    roles: List[str]  # roles served by this node, e.g. ["planner", "writer"]
    # Free-form descriptive metadata (hardware, notes, ...).
    meta: Dict[str, str] = Field(default_factory=dict)
class RegisteredNode(BaseModel):
    """A node accepted into the registry, with bookkeeping timestamps."""
    node_id: str
    base_url: HttpUrl
    roles: List[str]
    meta: Dict[str, str] = Field(default_factory=dict)
    # Unix timestamps (seconds); last_seen is refreshed by heartbeat().
    registered_at: float = Field(default_factory=lambda: time.time())
    last_seen: float = Field(default_factory=lambda: time.time())
class Registry:
    """
    Minimal in-memory registry with optional JSON persistence.

    NOTE: This is intentionally simple. For real deployments you likely want:
    - persistence (sqlite/redis)
    - auth (API key or mTLS)
    - TTL + health checks
    """

    def __init__(self, persist_path: Optional[Path] = None) -> None:
        # node_id -> RegisteredNode
        self._nodes: Dict[str, RegisteredNode] = {}
        # role -> next round-robin counter
        self._rr_counters: Dict[str, int] = {}
        self._persist_path = persist_path
        if self._persist_path:
            self._load()

    def _load(self) -> None:
        """Restore nodes and counters from the persistence file, if present."""
        if not self._persist_path or not self._persist_path.exists():
            return
        try:
            raw = json.loads(self._persist_path.read_text())
            for node_id, node_data in raw.get("nodes", {}).items():
                self._nodes[node_id] = RegisteredNode.model_validate(node_data)
            self._rr_counters = dict(raw.get("rr_counters", {}))
        except Exception:
            # If persistence is corrupted, start empty (do not crash the gateway).
            self._nodes = {}
            self._rr_counters = {}

    def _save(self) -> None:
        """Write the current registry state to disk (no-op without a path)."""
        if not self._persist_path:
            return
        payload = {
            "nodes": {k: v.model_dump(mode="json") for k, v in self._nodes.items()},
            "rr_counters": self._rr_counters,
        }
        self._persist_path.parent.mkdir(parents=True, exist_ok=True)
        self._persist_path.write_text(json.dumps(payload, indent=2, sort_keys=True))

    def register(self, reg: NodeRegistration) -> RegisteredNode:
        """Insert or replace a node registration and persist the change."""
        node = RegisteredNode(
            node_id=reg.node_id,
            base_url=reg.base_url,
            roles=reg.roles,
            meta=reg.meta,
            last_seen=time.time(),
        )
        self._nodes[reg.node_id] = node
        self._save()
        return node

    def heartbeat(self, node_id: str) -> None:
        """Refresh last_seen for a known node (silently ignores unknown ids)."""
        n = self._nodes.get(node_id)
        if n:
            n.last_seen = time.time()
            self._save()

    def list_nodes(self) -> List[RegisteredNode]:
        """Return all registered nodes (snapshot copy of the values)."""
        return list(self._nodes.values())

    def pick_node_for_role(self, role: str, strategy: str = "round_robin") -> Optional[RegisteredNode]:
        """Select a node that serves *role*, or None if there are no candidates.

        FIX: DiscoveredModel declares a "random" strategy, but the original
        implementation always did round-robin. `strategy` is now honored;
        the default preserves the previous behavior for existing callers.
        """
        candidates = [n for n in self._nodes.values() if role in n.roles]
        if not candidates:
            return None
        if strategy == "random":
            import random  # local: only needed for this strategy
            return random.choice(candidates)
        idx = self._rr_counters.get(role, 0) % len(candidates)
        self._rr_counters[role] = idx + 1
        self._save()
        return candidates[idx]

View File

@ -0,0 +1,60 @@
from __future__ import annotations
import json
from typing import Any, AsyncIterator, Dict, Optional
import httpx
class UpstreamError(RuntimeError):
def __init__(self, message: str, status_code: int | None = None) -> None:
super().__init__(message)
self.status_code = status_code
def _timeout(connect_s: float, read_s: float) -> httpx.Timeout:
    """Build an httpx.Timeout covering all four phases.

    Only connect/read are tunable; write/pool are fixed conservatively.
    """
    phases = {"connect": connect_s, "read": read_s, "write": 30.0, "pool": 30.0}
    return httpx.Timeout(**phases)
class UpstreamClient:
    """Shared async HTTP client for OpenAI-compatible upstream backends.

    All failures are normalized to UpstreamError so callers need one handler.
    """

    def __init__(self, connect_timeout_s: float = 10.0, read_timeout_s: float = 600.0) -> None:
        self._client = httpx.AsyncClient(timeout=_timeout(connect_timeout_s, read_timeout_s))

    async def close(self) -> None:
        """Release the underlying connection pool."""
        await self._client.aclose()

    @staticmethod
    def _url(base_url: str, path: str) -> str:
        # Tolerate trailing slashes in configured/registered base URLs.
        return base_url.rstrip("/") + path

    async def get_models(self, base_url: str) -> Dict[str, Any]:
        """GET /v1/models from the upstream; raise UpstreamError on failure."""
        try:
            resp = await self._client.get(self._url(base_url, "/v1/models"))
        except httpx.RequestError as e:
            raise UpstreamError(f"Upstream unreachable: {e!s}") from e
        if resp.status_code >= 400:
            raise UpstreamError(f"Upstream error {resp.status_code}: {resp.text}", status_code=resp.status_code)
        return resp.json()

    async def chat_completions(self, base_url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST a non-streaming chat completion; raise UpstreamError on failure."""
        try:
            resp = await self._client.post(self._url(base_url, "/v1/chat/completions"), json=payload)
        except httpx.RequestError as e:
            raise UpstreamError(f"Upstream unreachable: {e!s}") from e
        if resp.status_code >= 400:
            raise UpstreamError(f"Upstream error {resp.status_code}: {resp.text}", status_code=resp.status_code)
        return resp.json()

    async def chat_completions_stream(self, base_url: str, payload: Dict[str, Any]) -> AsyncIterator[bytes]:
        """POST a streaming chat completion, yielding raw SSE bytes.

        Transport failures — including mid-stream — surface as UpstreamError
        raised from this generator.
        """
        try:
            async with self._client.stream("POST", self._url(base_url, "/v1/chat/completions"), json=payload) as resp:
                if resp.status_code >= 400:
                    # Drain the error body before raising so the message is useful.
                    body = await resp.aread()
                    raise UpstreamError(
                        f"Upstream error {resp.status_code}: {body.decode('utf-8','replace')}",
                        status_code=resp.status_code,
                    )
                async for chunk in resp.aiter_bytes():
                    yield chunk
        except httpx.RequestError as e:
            raise UpstreamError(f"Upstream unreachable: {e!s}") from e