Add admin endpoints for named client keys

This commit is contained in:
welsberr 2026-04-29 14:45:45 -04:00
parent 04b9bec83a
commit 960f12f92b
4 changed files with 172 additions and 5 deletions

View File

@ -55,11 +55,11 @@ python -m pytest -q tests
Expected current result at baseline: all tests pass.
Current verification result after adding the Foundation roadmap and config
profile scaffold:
Current verification result after adding the Foundation roadmap, config profile
scaffold, named client key storage, opt-in named auth, and admin key endpoints:
```text
50 passed
58 passed
```
## Known Constraints

View File

@ -92,3 +92,19 @@ def require_client_auth(request: Request) -> ClientContext:
def require_node_auth(request: Request) -> None:
cfg = request.app.state.cfg
_check_key(request, cfg.auth.node_api_keys, "X-GenieHive-Node-Key")
def require_admin_auth(request: Request) -> ClientContext:
cfg = request.app.state.cfg
if not cfg.admin_api.enabled:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="not found",
)
context = require_client_auth(request)
if context.auth_kind == "static" or context.role == "admin":
return context
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="admin access required",
)

View File

@ -2,15 +2,17 @@ from __future__ import annotations
import asyncio
import os
import uuid
from contextlib import asynccontextmanager, suppress
from pathlib import Path
from fastapi import Depends, FastAPI, File, Form, Request, UploadFile
from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status
from fastapi.responses import JSONResponse, StreamingResponse
from .auth import require_client_auth, require_node_auth
from .auth import require_admin_auth, require_client_auth, require_node_auth
from .chat import ProxyError, _prepare_chat_upstream, proxy_chat_completion, proxy_embeddings, proxy_transcription, stream_chat_completion
from .config import ControlConfig, load_config
from .keys import generate_api_key, hash_api_key
from .models import BenchmarkIngestRequest, HostHeartbeat, HostRegistration, RouteMatchRequest, RouteMatchResponse
from .probe import ServiceProber
from .roles import load_role_catalog
@ -61,6 +63,69 @@ def create_app(
async def health() -> dict[str, str]:
return {"status": "ok"}
def _public_client_key(row: dict) -> dict:
return {
key: value
for key, value in row.items()
if key != "key_hash"
}
if cfg.admin_api.enabled:
@app.post("/v1/admin/client-keys")
async def create_client_key(request: Request, _=Depends(require_admin_auth)) -> dict:
if not cfg.auth.enable_named_client_keys:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="named client keys are not enabled",
)
secret = os.environ.get(cfg.auth.key_hash_secret_env)
if not secret:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"{cfg.auth.key_hash_secret_env} is required for named client keys",
)
payload = await request.json()
raw_key = generate_api_key()
key_id = payload.get("key_id") or f"ck_{uuid.uuid4().hex}"
created = request.app.state.registry.create_client_key(
key_id=key_id,
key_hash=hash_api_key(raw_key, secret=secret),
display_name=payload["display_name"],
principal_type=payload["principal_type"],
principal_ref=payload["principal_ref"],
role=payload.get("role"),
allowed_models=payload.get("allowed_models") or [],
allowed_operations=payload.get("allowed_operations") or [],
monthly_budget_cents=payload.get("monthly_budget_cents"),
monthly_token_limit=payload.get("monthly_token_limit"),
enabled=payload.get("enabled", True),
notes=payload.get("notes"),
)
return {
"status": "ok",
"api_key": raw_key,
"client_key": _public_client_key(created),
}
@app.get("/v1/admin/client-keys")
async def list_client_keys(request: Request, _=Depends(require_admin_auth)) -> dict:
rows = request.app.state.registry.list_client_keys()
return {"object": "list", "data": [_public_client_key(row) for row in rows]}
@app.post("/v1/admin/client-keys/{key_id}/disable")
async def disable_client_key(key_id: str, request: Request, _=Depends(require_admin_auth)) -> dict:
updated = request.app.state.registry.set_client_key_enabled(key_id, False)
if updated is None:
return JSONResponse(status_code=404, content={"error": "unknown_client_key", "key_id": key_id})
return {"status": "ok", "client_key": _public_client_key(updated)}
@app.post("/v1/admin/client-keys/{key_id}/enable")
async def enable_client_key(key_id: str, request: Request, _=Depends(require_admin_auth)) -> dict:
updated = request.app.state.registry.set_client_key_enabled(key_id, True)
if updated is None:
return JSONResponse(status_code=404, content={"error": "unknown_client_key", "key_id": key_id})
return {"status": "ok", "client_key": _public_client_key(updated)}
@app.post("/v1/nodes/register")
async def register_node(request: Request, _=Depends(require_node_auth)) -> dict:
payload = await request.json()

View File

@ -133,3 +133,89 @@ storage:
response = client.get("/v1/models", headers={"X-Api-Key": raw_key})
assert response.status_code == 401
def test_admin_client_key_endpoints_are_hidden_by_default() -> None:
app = create_app()
paths = {route.path for route in app.routes}
assert "/v1/admin/client-keys" not in paths
def test_admin_can_create_list_disable_and_enable_named_keys(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
db_path = tmp_path / "geniehive.sqlite3"
config_path = _write_config(
tmp_path,
f"""
auth:
client_api_keys:
- admin-static-key
enable_named_client_keys: true
admin_api:
enabled: true
storage:
sqlite_path: "{db_path}"
""",
)
app = create_app(config_path)
client = TestClient(app)
denied = client.get("/v1/admin/client-keys")
assert denied.status_code == 401
created = client.post(
"/v1/admin/client-keys",
headers={"X-Api-Key": "admin-static-key"},
json={
"key_id": "ck_created",
"display_name": "Archive Migration",
"principal_type": "person",
"principal_ref": "wesley",
"role": "developer",
"allowed_models": ["archive_migrator"],
"allowed_operations": ["chat"],
},
)
assert created.status_code == 200
created_body = created.json()
assert created_body["api_key"].startswith("gh_")
assert created_body["client_key"]["key_id"] == "ck_created"
assert "key_hash" not in created_body["client_key"]
listed = client.get(
"/v1/admin/client-keys",
headers={"X-Api-Key": "admin-static-key"},
)
assert listed.status_code == 200
assert listed.json()["data"][0]["key_id"] == "ck_created"
assert "key_hash" not in listed.json()["data"][0]
disabled = client.post(
"/v1/admin/client-keys/ck_created/disable",
headers={"X-Api-Key": "admin-static-key"},
)
assert disabled.status_code == 200
assert disabled.json()["client_key"]["enabled"] is False
named_denied = client.get(
"/v1/models",
headers={"X-Api-Key": created_body["api_key"]},
)
assert named_denied.status_code == 401
enabled = client.post(
"/v1/admin/client-keys/ck_created/enable",
headers={"X-Api-Key": "admin-static-key"},
)
assert enabled.status_code == 200
assert enabled.json()["client_key"]["enabled"] is True
named_ok = client.get(
"/v1/models",
headers={"X-Api-Key": created_body["api_key"]},
)
assert named_ok.status_code == 200