Compare commits


No commits in common. "960aa11d93a7b6507ed472752c0beeb64fba6690" and "2355cf8114db5a1ac4630ca22aba63c703553f70" have entirely different histories.

13 changed files with 12 additions and 2092 deletions


@@ -1,59 +0,0 @@
deployment_profile: "foundation_gateway"
server:
  host: "127.0.0.1"
  port: 8800
auth:
  # Keep a break-glass/static admin key only for initial provisioning or recovery.
  client_api_keys:
    - "change-me-foundation-admin-key"
  node_api_keys:
    - "change-me-node-key"
  enable_named_client_keys: true
  key_hash_secret_env: "GENIEHIVE_KEY_HASH_SECRET"
audit:
  enabled: true
admin_api:
  enabled: true
authorization:
  enforce_model_allowlists: true
  enforce_operation_allowlists: true
  empty_allowlist_means_no_access: true
storage:
  sqlite_path: "state/geniehive.foundation.sqlite3"
roles_path: "configs/roles.foundation.archive.yaml"
routing:
  health_stale_after_s: 30
  default_strategy: "scored"
providers:
  # Provider-backed services are optional. Keep API keys in environment variables,
  # not in YAML or client scripts.
  - provider_id: "openai-foundation"
    provider_kind: "openai_compatible"
    base_url: "https://api.openai.com"
    api_key_env: "OPENAI_API_KEY"
    enabled: false
  - provider_id: "anthropic-foundation"
    provider_kind: "anthropic_messages"
    base_url: "https://api.anthropic.com"
    api_key_env: "ANTHROPIC_API_KEY"
    default_headers:
      anthropic-version: "2023-06-01"
    enabled: false
budgeting:
  enabled: false
  reset_day_of_month: 1
  global_monthly_budget_cents: 5000
  provider_monthly_budget_cents:
    openai-foundation: 3000
    anthropic-foundation: 3000
  deny_on_unknown_cost: false


@@ -1,73 +0,0 @@
# Foundation Gateway Baseline
Last updated: 2026-04-29
## Repository State
- Repository: `/home/netuser/bin/geniehive`
- Baseline commit: `2355cf8114db5a1ac4630ca22aba63c703553f70`
- Branch: `main`
## Current Capability Snapshot
GenieHive is currently a local-first control plane for heterogeneous generative
AI services. It already supports:
- OpenAI-compatible `GET /v1/models`
- OpenAI-compatible `POST /v1/chat/completions`
- OpenAI-compatible `POST /v1/embeddings`
- `POST /v1/audio/transcriptions` multipart proxying
- node registration and heartbeat
- SQLite-backed hosts, services, roles, and benchmark samples
- role-based route resolution
- request policy shaping
- benchmark-informed route scoring
- optional active service health probing
- static client and node API keys
## Casual Deployment Behavior To Preserve
- `configs/control.example.yaml` loads without Foundation-specific sections.
- Static `auth.client_api_keys` authorize client requests with `X-Api-Key`.
- Static `auth.node_api_keys` authorize node requests with
`X-GenieHive-Node-Key`.
- Empty client or node key lists disable that auth check for development.
- Local model servers do not require provider credential config.
- Admin endpoints, audit logging, named keys, and budget checks are not required
for a local-only deployment.
## Current Example Ports
- Control plane default: `127.0.0.1:8800`
- Node examples commonly use localhost service endpoints for Ollama,
llama.cpp, llamafile, or vLLM.
- Recent ZeroTier test deployment used control plane binding
`172.24.50.65:8800`, node `127.0.0.1:8891`, and llama.cpp
`127.0.0.1:18091`.
## Baseline Verification
Run from the repository root:
```bash
python -m pytest -q tests
```
Expected current result at baseline: all tests pass.
Current verification result after adding the Foundation roadmap, config profile
scaffold, named client key storage, opt-in named auth, admin key endpoints,
request audit logging, and named-key model/operation authorization:
```text
66 passed
```
## Known Constraints
- Client authentication is static-key based, not named or revocable per user.
- Request attribution is not currently persisted.
- Provider credentials are not modeled as first-class control-plane objects.
- No budget or quota enforcement exists.
- Anthropic Messages API is not natively adapted behind the OpenAI-compatible
facade.


@@ -1,359 +0,0 @@
# Foundation Gateway Roadmap
Last updated: 2026-04-29
## Decision
Do not fork GenieHive for the Foundation AI gateway work. Implement the feature
set as an optional hardening profile on top of the existing local-first control
plane.
The core project should continue to support casual deployment:
- local model services remain first-class
- static `client_api_keys` and `node_api_keys` remain supported
- empty key lists can still disable auth for development
- audit logging, named keys, quotas, provider accounts, and admin endpoints are
opt-in
Foundation deployments should enable stricter controls through config, role
catalogs, and operator documentation.
## Design Principle
Separate mechanism from policy.
Core GenieHive mechanisms:
- authenticate a client and attach a request identity
- route OpenAI-compatible requests through roles and services
- optionally record audit metadata without prompt or completion content
- optionally enforce model and operation scopes
- optionally route to external provider-backed services
- optionally summarize usage and enforce budgets
Foundation policy:
- who may receive a key
- what models and roles are approved
- what budgets apply
- what provider accounts are used
- how requests are reviewed before public publication
- how emergency disable and key rotation are performed
## Compatibility Contract
Every Foundation hardening change must preserve these behaviors unless a config
explicitly opts into stricter operation:
1. Existing `configs/control.example.yaml` continues to load.
2. Existing static `auth.client_api_keys` continues to authorize requests.
3. Existing node registration keys continue to work.
4. Existing role catalogs continue to route without client allowlists.
5. `GET /v1/models`, chat, embeddings, transcription, and cluster inspection
remain available in casual deployments.
6. No provider credentials are required for local-only deployment.
7. Admin endpoints are disabled unless admin authentication is configured.
## Profiles
### Casual Profile
The casual profile is the default shape of GenieHive.
Expected traits:
- local or LAN-bound control plane
- static shared client key, or no auth during isolated development
- no audit log by default
- no budget enforcement
- no provider credential store
- no admin API exposed by default
### Foundation Gateway Profile
The Foundation gateway profile is an opt-in deployment mode for managed access
to local and paid AI services.
Expected traits:
- named, revocable client credentials
- request audit log without prompt or completion content
- model and operation allowlists per key
- Foundation-owned provider account indirection
- optional budget and quota enforcement
- migration-specific role catalogs
- operator and board-readable governance documentation
## Configuration Shape
The final config shape may evolve, but the intended compatibility model is:
```yaml
deployment_profile: "casual"
auth:
  client_api_keys:
    - "change-me-client-key"
  node_api_keys:
    - "change-me-node-key"
  enable_named_client_keys: false
  key_hash_secret_env: "GENIEHIVE_KEY_HASH_SECRET"
audit:
  enabled: false
admin_api:
  enabled: false
authorization:
  enforce_model_allowlists: false
  enforce_operation_allowlists: false
  empty_allowlist_means_no_access: true
providers: []
budgeting:
  enabled: false
```
Foundation example configs can switch these flags on. Casual example configs
should stay short and understandable.
## Revised Milestones
### M0: Baseline and Compatibility Guard
Goal: record the current behavior and make compatibility explicit before adding
governance features.
Tasks:
- Add `docs/foundation_gateway_baseline.md`.
- Record current commit, test command, existing exposed ports, and supported
casual deployment behavior.
- Add or preserve tests proving `configs/control.example.yaml` still loads and
static `X-Api-Key` auth still works.
Acceptance:
- Baseline document exists.
- Current test suite passes or failures are documented.
- Compatibility contract is visible in docs.
### M1: Config Profiles and Feature Flags
Goal: introduce opt-in switches without changing runtime behavior.
Tasks:
- Add config models for `deployment_profile`, `audit`, `admin_api`,
`authorization`, `providers`, and `budgeting`.
- Keep default values equivalent to current casual behavior.
- Add a Foundation example config skeleton.
- Add tests for default values and legacy config loading.
Acceptance:
- Existing configs load unchanged.
- New config sections are accepted.
- No governance feature activates by default.
### M2: Named Client Credentials
Goal: support named, revocable API keys while keeping static keys working.
Tasks:
- Add `ClientContext` with principal metadata.
- Add API key generation, hashing, verification, and redaction helpers.
- Add a `client_keys` SQLite table.
- Add registry methods to create, list, disable, enable, and touch keys.
- Support named keys only when `auth.enable_named_client_keys` is true.
- Preserve static `auth.client_api_keys`.
Acceptance:
- Static keys still work.
- Named keys work through `X-Api-Key` when enabled.
- Disabled named keys fail.
- Raw keys are never stored.
- Request handlers can read authenticated client context.
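The M2 acceptance criteria can be illustrated with a minimal sketch. The HMAC-SHA256 hashing follows the `hash_api_key` helper in this change set; the `KeyStore` class and its method names are hypothetical, an in-memory stand-in for the `client_keys` SQLite table rather than the actual registry API.

```python
from __future__ import annotations

import hashlib
import hmac
import secrets


def hash_api_key(api_key: str, *, secret: str) -> str:
    """HMAC-SHA256 of the raw key; only this digest is persisted."""
    digest = hmac.new(
        secret.encode("utf-8"), api_key.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    return f"hmac-sha256:{digest}"


class KeyStore:
    """Hypothetical in-memory stand-in for the client_keys table."""

    def __init__(self, secret: str) -> None:
        self._secret = secret
        self._rows: dict[str, dict] = {}  # key_hash -> row metadata

    def create(self, key_id: str) -> str:
        raw_key = f"gh_{secrets.token_urlsafe(32)}"
        key_hash = hash_api_key(raw_key, secret=self._secret)
        self._rows[key_hash] = {"key_id": key_id, "enabled": True}
        return raw_key  # returned to the operator once, never stored

    def set_enabled(self, key_id: str, enabled: bool) -> None:
        for row in self._rows.values():
            if row["key_id"] == key_id:
                row["enabled"] = enabled

    def authenticate(self, presented: str) -> str | None:
        """Return the key_id for a valid, enabled key, else None."""
        row = self._rows.get(hash_api_key(presented, secret=self._secret))
        if row is None or not row["enabled"]:
            return None
        return row["key_id"]
```

Because lookup is by digest, disabling a key is a metadata change and does not require the raw key to be recoverable.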
### M3: Request Audit Log
Goal: make production requests attributable without storing prompt or completion
content.
Status: implemented for chat, embeddings, and transcription request wrappers.
Audit logging is disabled by default and enabled by `audit.enabled`. Admin audit
read endpoints are only mounted when `admin_api.enabled` is true.
Tasks:
- Add request ID generation from `X-Request-Id` or UUID.
- Add `request_audit_log` SQLite table.
- Record identity, operation, requested model, resolved service, upstream model,
provider kind, status, duration, token usage when available, estimated cost
when available, and error category.
- Add admin-only query and summary endpoints, disabled unless admin API is
enabled.
Acceptance:
- Chat, embeddings, and transcription requests create audit rows when enabled.
- Prompt and completion content are not logged.
- Failed routing and upstream errors are logged.
- Casual deployments have no audit behavior unless enabled.
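A rough sketch of content-free audit recording follows. The table name `request_audit_log` and the `X-Request-Id` fallback come from this roadmap; the reduced column set and the helper names `open_audit_db` and `record_audit` are assumptions made for illustration.

```python
from __future__ import annotations

import sqlite3
import time
import uuid


def request_id_from(headers: dict[str, str]) -> str:
    """Reuse the caller's X-Request-Id when present, otherwise mint one."""
    return headers.get("X-Request-Id") or f"req_{uuid.uuid4().hex}"


def open_audit_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # Assumed, reduced schema: metadata only, no prompt or completion text.
    conn.execute(
        """
        CREATE TABLE request_audit_log (
            request_id TEXT PRIMARY KEY,
            key_id TEXT,
            operation TEXT NOT NULL,
            requested_model TEXT,
            status_code INTEGER NOT NULL,
            success INTEGER NOT NULL,
            duration_ms REAL NOT NULL,
            total_tokens INTEGER,
            error_type TEXT
        )
        """
    )
    return conn


def record_audit(conn, *, request_id, key_id, operation, body, status_code,
                 started_at, response=None, error_type=None) -> None:
    # Only the model name is read from the request body; messages are ignored.
    usage = (response or {}).get("usage") or {}
    conn.execute(
        "INSERT INTO request_audit_log VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (
            request_id, key_id, operation, body.get("model"), status_code,
            int(status_code < 400), (time.time() - started_at) * 1000.0,
            usage.get("total_tokens"), error_type,
        ),
    )
```

Failed requests go through the same function with a non-2xx `status_code` and an `error_type`, so routing and upstream errors remain attributable.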
### M4: Model and Operation Authorization
Goal: let Foundation keys be limited to approved roles, models, and operations.
Status: implemented for named client keys. Enforcement is controlled by
`authorization.enforce_model_allowlists` and
`authorization.enforce_operation_allowlists`. Static and development auth retain
casual-deployment behavior.
Tasks:
- Add allowed models and allowed operations to named keys.
- Enforce operation scopes only when authorization enforcement is enabled.
- Support exact model IDs and conservative glob patterns such as `local/*`,
`openai/*`, `anthropic/*`, and `role/*`.
- Prefer role IDs for migration workflows.
Acceptance:
- A chat-only key cannot call embeddings when enforcement is enabled.
- A key restricted to `archive_migrator` cannot call unrelated roles.
- Legacy static keys are unaffected unless explicitly mapped into stricter mode.
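The allowlist matching described above can be sketched as follows. It mirrors the `fnmatchcase`-based check in the auth module of this change set, including the treatment of `role/` entries; the function names and the model IDs used in the usage note are illustrative only.

```python
from __future__ import annotations

from fnmatch import fnmatchcase


def pattern_matches(pattern: str, value: str) -> bool:
    # A "role/<name>" entry is compared against the bare role ID.
    if pattern.startswith("role/"):
        pattern = pattern[len("role/"):]
    return fnmatchcase(value, pattern)


def is_allowed(
    value: str,
    allowed: tuple[str, ...],
    *,
    empty_means_no_access: bool = True,
) -> bool:
    """Check a model or operation name against a key's allowlist."""
    if not allowed:
        # Mirrors authorization.empty_allowlist_means_no_access.
        return not empty_means_no_access
    return any(pattern_matches(pattern, value) for pattern in allowed)
```

With this sketch, a key whose allowlist is `("role/archive_migrator",)` is allowed `archive_migrator` but not `archive_copyeditor`, and `("local/*",)` admits any model ID beginning with `local/`. Matching is case-sensitive because `fnmatchcase` does not normalize case.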
### M5: Archive Migration Profile
Goal: support TalkOrigins/SciSiteForge-style migration without direct provider
keys in migration scripts.
Tasks:
- Add `configs/roles.foundation.archive.yaml`.
- Add roles such as `archive_migrator`, `archive_metadata_extractor`,
`archive_link_reviewer`, `archive_copyeditor`, and
`archive_factcheck_assistant`.
- Add `configs/control.foundation.example.yaml`.
- Add `configs/clients/archive_migration.example.env`.
- Add a smoke script that calls `archive_migrator` through the OpenAI-compatible
facade.
Acceptance:
- A migration client only needs `GENIEHIVE_BASE_URL`, `GENIEHIVE_API_KEY`, and
`GENIEHIVE_MODEL`.
- The requested model is a role, not a provider-specific model.
- Local-only provider routing remains possible.
### M6: Provider Credential Indirection
Goal: keep paid provider credentials out of role configs, node configs, and
client scripts.
Tasks:
- Add provider config entries using environment variables first.
- Add external/provider-backed service registration without requiring node
heartbeat.
- Resolve provider headers centrally in the upstream layer.
- Keep provider credential storage optional; encrypted-at-rest credentials can
be deferred.
Acceptance:
- Provider keys are loaded from environment variables, not committed YAML.
- Provider-backed services can be routed like local services.
- Local-only deployments do not need provider sections.
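One way to resolve provider headers centrally is sketched below. The field names follow `ProviderConfig` in this change set; the `resolve_headers` function is hypothetical, and the header choice per `provider_kind` (`x-api-key` for Anthropic, `Authorization: Bearer` otherwise) is an assumption about how the upstream layer would dispatch.

```python
from __future__ import annotations

import os
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Provider:
    provider_id: str
    provider_kind: str
    api_key_env: str | None = None
    default_headers: dict[str, str] = field(default_factory=dict)


def resolve_headers(
    provider: Provider, environ: dict[str, str] | None = None
) -> dict[str, str]:
    """Build upstream headers; the secret is read from the environment only."""
    env = os.environ if environ is None else environ
    headers = dict(provider.default_headers)
    if provider.api_key_env is None:
        return headers  # local services need no credential
    api_key = env.get(provider.api_key_env)
    if not api_key:
        raise RuntimeError(
            f"{provider.api_key_env} is not set for provider {provider.provider_id}"
        )
    if provider.provider_kind == "anthropic_messages":
        headers["x-api-key"] = api_key
    else:
        headers["Authorization"] = f"Bearer {api_key}"
    return headers
```

Role configs, node configs, and client scripts then refer only to a `provider_id`, and the variable name in `api_key_env` is the only credential detail that appears in YAML.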
### M7: Anthropic Messages Adapter
Goal: expose Anthropic models through the existing OpenAI-compatible chat facade.
Tasks:
- Add provider protocol dispatch in `UpstreamClient`.
- Transform OpenAI-shaped messages into Anthropic Messages requests.
- Transform Anthropic responses back to OpenAI-compatible chat completions.
- Reject Anthropic streaming clearly until implemented.
Acceptance:
- A chat request can route to an Anthropic-backed service.
- System messages and usage fields are mapped correctly.
- Unsupported streaming fails with a specific error.
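The request and response mapping might look roughly like the sketch below. It assumes plain string message content, lifts system messages into the top-level `system` field of the Messages API, and supplies a default `max_tokens` because that API requires one. The function names, the `UnsupportedStreaming` exception, and the default of 1024 are hypothetical.

```python
from __future__ import annotations

# Anthropic stop_reason -> OpenAI finish_reason (assumed mapping).
STOP_REASONS = {"end_turn": "stop", "stop_sequence": "stop", "max_tokens": "length"}


class UnsupportedStreaming(ValueError):
    """Raised until Anthropic streaming is implemented."""


def to_anthropic_request(
    body: dict, *, upstream_model: str, default_max_tokens: int = 1024
) -> dict:
    if body.get("stream"):
        raise UnsupportedStreaming(
            "streaming is not supported for Anthropic-backed services"
        )
    msgs = body["messages"]
    system_parts = [m["content"] for m in msgs if m["role"] == "system"]
    request = {
        "model": upstream_model,
        "messages": [
            {"role": m["role"], "content": m["content"]}
            for m in msgs
            if m["role"] != "system"
        ],
        "max_tokens": body.get("max_tokens") or default_max_tokens,
    }
    if system_parts:
        request["system"] = "\n\n".join(system_parts)
    return request


def to_openai_response(resp: dict, *, requested_model: str) -> dict:
    text = "".join(
        block.get("text", "")
        for block in resp.get("content", [])
        if block.get("type") == "text"
    )
    usage = resp.get("usage", {})
    prompt = usage.get("input_tokens", 0)
    completion = usage.get("output_tokens", 0)
    return {
        "id": resp.get("id"),
        "object": "chat.completion",
        "model": requested_model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": text},
            "finish_reason": STOP_REASONS.get(resp.get("stop_reason"), "stop"),
        }],
        "usage": {
            "prompt_tokens": prompt,
            "completion_tokens": completion,
            "total_tokens": prompt + completion,
        },
    }
```

Returning the requested role name in `model`, rather than the provider's model ID, is a design choice in this sketch that keeps clients unaware of which provider served the request.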
### M8: Budget and Quota Enforcement
Goal: prevent accidental provider overspend.
Tasks:
- Add budget config with disabled default.
- Use audit summaries to calculate monthly usage.
- Add request, token, and estimated-cost limits per key, provider, and globally.
- Add configurable price maps.
Acceptance:
- Requests over configured limits are denied before upstream calls.
- Unknown-cost behavior is configurable.
- Casual deployments do not perform budget checks.
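A pre-flight budget check could be sketched as below. The field names follow `BudgetingConfig` in this change set; the `budget_denial` function, its parameters, and the denial reason strings are hypothetical, and the spent totals are assumed to come from audit summaries for the current month.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Budget:
    enabled: bool = False
    global_monthly_budget_cents: int | None = None
    provider_monthly_budget_cents: dict[str, int] = field(default_factory=dict)
    deny_on_unknown_cost: bool = False


def budget_denial(
    budget: Budget,
    *,
    provider_id: str,
    estimated_cost_cents: int | None,
    spent_global_cents: int,
    spent_provider_cents: int,
) -> str | None:
    """Return a denial reason, or None when the request may go upstream."""
    if not budget.enabled:
        return None  # casual deployments skip budget checks entirely
    if estimated_cost_cents is None:
        return "unknown_cost" if budget.deny_on_unknown_cost else None
    cap = budget.global_monthly_budget_cents
    if cap is not None and spent_global_cents + estimated_cost_cents > cap:
        return "global_budget_exceeded"
    provider_cap = budget.provider_monthly_budget_cents.get(provider_id)
    if (
        provider_cap is not None
        and spent_provider_cents + estimated_cost_cents > provider_cap
    ):
        return "provider_budget_exceeded"
    return None
```

The check runs before any upstream call, so a denied request incurs no provider cost.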
### M9: Admin CLI and Operations Docs
Goal: make managed operation scriptable and understandable.
Tasks:
- Add `geniehive-admin` CLI for create/list/disable/enable keys and usage
summaries.
- Add Foundation docs for gateway operation, provider accounts, key management,
archive migration workflow, and emergency disable.
- Document when provider-native seats are needed instead of GenieHive routing.
Acceptance:
- A new operator can provision and revoke a user key without editing SQLite.
- A board-facing control summary explains ownership, auditability, and budget
control.
### M10: Security Review
Goal: make the Foundation profile safe to expose beyond localhost.
Tasks:
- Add a security checklist covering provider keys, admin auth, content logging,
CORS, TLS/reverse proxy, backup/restore, rate limits, and emergency disable.
- Implement critical checklist items or explicitly defer with issue references.
- Keep WAN and zero-trust networking as deployment concerns unless a concrete
need appears.
Acceptance:
- Security checklist exists.
- Critical production risks have implementation or documented mitigations.
## Initial Implementation Order
1. M0: Baseline and compatibility guard.
2. M1: Config profiles and feature flags.
3. M2: Named client credentials.
4. M3: Request audit log.
5. M4: Model and operation authorization.
6. M5: Archive migration profile.
7. M6: Provider credential indirection.
8. M7: Anthropic Messages adapter.
9. M8: Budget and quota enforcement.
10. M9: Admin CLI and operations docs.
11. M10: Security review.
This order lets local-only and TalkOrigins migration pilots start before paid
provider routing and budget controls are complete.


@@ -1,25 +1,7 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from fnmatch import fnmatchcase
from fastapi import HTTPException, Request, status
from .keys import hash_api_key
@dataclass(frozen=True)
class ClientContext:
auth_kind: str
key_id: str | None = None
display_name: str | None = None
principal_type: str | None = None
principal_ref: str | None = None
role: str | None = None
allowed_models: tuple[str, ...] = ()
allowed_operations: tuple[str, ...] = ()
def _check_key(request: Request, allowed_keys: list[str], header_name: str) -> None:
if not allowed_keys:
@@ -33,137 +15,11 @@ def _check_key(request: Request, allowed_keys: list[str], header_name: str) -> N
)
def _set_client_context(request: Request, context: ClientContext) -> None:
request.state.client_context = context
def require_client_auth(request: Request) -> ClientContext:
def require_client_auth(request: Request) -> None:
cfg = request.app.state.cfg
provided = request.headers.get("X-Api-Key")
if cfg.auth.client_api_keys and provided in cfg.auth.client_api_keys:
context = ClientContext(auth_kind="static")
_set_client_context(request, context)
return context
if cfg.auth.enable_named_client_keys:
if not provided:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="unauthorized",
)
secret = os.environ.get(cfg.auth.key_hash_secret_env)
if not secret:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"{cfg.auth.key_hash_secret_env} is required for named client keys",
)
key_hash = hash_api_key(provided, secret=secret)
key_row = request.app.state.registry.get_client_key_by_hash(key_hash)
if key_row is None or not key_row["enabled"]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="unauthorized",
)
request.app.state.registry.touch_client_key(key_row["key_id"])
context = ClientContext(
auth_kind="named",
key_id=key_row["key_id"],
display_name=key_row["display_name"],
principal_type=key_row["principal_type"],
principal_ref=key_row["principal_ref"],
role=key_row["role"],
allowed_models=tuple(key_row["allowed_models"]),
allowed_operations=tuple(key_row["allowed_operations"]),
)
_set_client_context(request, context)
return context
if cfg.auth.client_api_keys:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="unauthorized",
)
context = ClientContext(auth_kind="development")
_set_client_context(request, context)
return context
_check_key(request, cfg.auth.client_api_keys, "X-Api-Key")
def require_node_auth(request: Request) -> None:
cfg = request.app.state.cfg
_check_key(request, cfg.auth.node_api_keys, "X-GenieHive-Node-Key")
def require_admin_auth(request: Request) -> ClientContext:
cfg = request.app.state.cfg
if not cfg.admin_api.enabled:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="not found",
)
context = require_client_auth(request)
if context.auth_kind == "static" or context.role == "admin":
return context
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="admin access required",
)
def authorize_client_request(request: Request, *, operation: str, model: str | None) -> None:
cfg = request.app.state.cfg
context = getattr(request.state, "client_context", None)
if context is None:
return
# Static and development auth preserve casual-deployment behavior. Foundation
# scoped access is enforced for named keys only.
if context.auth_kind != "named":
return
if cfg.authorization.enforce_operation_allowlists:
_authorize_value(
value=operation,
allowed=context.allowed_operations,
empty_means_no_access=cfg.authorization.empty_allowlist_means_no_access,
denied_detail=f"operation '{operation}' is not allowed for this key",
)
if cfg.authorization.enforce_model_allowlists:
if not model:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="model is required for model authorization",
)
_authorize_value(
value=model,
allowed=context.allowed_models,
empty_means_no_access=cfg.authorization.empty_allowlist_means_no_access,
denied_detail=f"model '{model}' is not allowed for this key",
)
def _authorize_value(
*,
value: str,
allowed: tuple[str, ...],
empty_means_no_access: bool,
denied_detail: str,
) -> None:
if not allowed:
if empty_means_no_access:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=denied_detail,
)
return
if any(_allow_pattern_matches(pattern, value) for pattern in allowed):
return
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=denied_detail,
)
def _allow_pattern_matches(pattern: str, value: str) -> bool:
if pattern.startswith("role/"):
pattern = pattern.removeprefix("role/")
return fnmatchcase(value, pattern)


@@ -14,39 +14,6 @@ class ServerConfig(BaseModel):
class AuthConfig(BaseModel):
client_api_keys: list[str] = Field(default_factory=list)
node_api_keys: list[str] = Field(default_factory=list)
enable_named_client_keys: bool = False
key_hash_secret_env: str = "GENIEHIVE_KEY_HASH_SECRET"
class AuditConfig(BaseModel):
enabled: bool = False
class AdminApiConfig(BaseModel):
enabled: bool = False
class AuthorizationConfig(BaseModel):
enforce_model_allowlists: bool = False
enforce_operation_allowlists: bool = False
empty_allowlist_means_no_access: bool = True
class ProviderConfig(BaseModel):
provider_id: str
provider_kind: str
base_url: str
api_key_env: str | None = None
default_headers: dict[str, str] = Field(default_factory=dict)
enabled: bool = True
class BudgetingConfig(BaseModel):
enabled: bool = False
reset_day_of_month: int = 1
global_monthly_budget_cents: int | None = None
provider_monthly_budget_cents: dict[str, int] = Field(default_factory=dict)
deny_on_unknown_cost: bool = False
class StorageConfig(BaseModel):
@@ -66,14 +33,8 @@ class RoutingConfig(BaseModel):
class ControlConfig(BaseModel):
deployment_profile: str = "casual"
server: ServerConfig = Field(default_factory=ServerConfig)
auth: AuthConfig = Field(default_factory=AuthConfig)
audit: AuditConfig = Field(default_factory=AuditConfig)
admin_api: AdminApiConfig = Field(default_factory=AdminApiConfig)
authorization: AuthorizationConfig = Field(default_factory=AuthorizationConfig)
providers: list[ProviderConfig] = Field(default_factory=list)
budgeting: BudgetingConfig = Field(default_factory=BudgetingConfig)
storage: StorageConfig = Field(default_factory=StorageConfig)
routing: RoutingConfig = Field(default_factory=RoutingConfig)
roles_path: str | None = None


@@ -1,39 +0,0 @@
from __future__ import annotations

import hashlib
import hmac
import secrets


DEFAULT_KEY_PREFIX = "gh"


def generate_api_key(*, prefix: str = DEFAULT_KEY_PREFIX, token_bytes: int = 32) -> str:
    """Generate a URL-safe API key. The raw value is only shown once."""
    token = secrets.token_urlsafe(token_bytes)
    return f"{prefix}_{token}"


def hash_api_key(api_key: str, *, secret: str) -> str:
    if not secret:
        raise ValueError("key hash secret must not be empty")
    digest = hmac.new(
        secret.encode("utf-8"),
        api_key.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    return f"hmac-sha256:{digest}"


def verify_api_key(api_key: str, key_hash: str, *, secret: str) -> bool:
    try:
        expected = hash_api_key(api_key, secret=secret)
    except ValueError:
        return False
    return hmac.compare_digest(expected, key_hash)


def redact_api_key(api_key: str) -> str:
    if len(api_key) <= 12:
        return "***"
    return f"{api_key[:6]}...{api_key[-4:]}"


@@ -1,25 +1,20 @@
from __future__ import annotations
import asyncio
import json
import os
import time
import uuid
from contextlib import asynccontextmanager, suppress
from pathlib import Path
from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status
from fastapi import Depends, FastAPI, File, Form, Request, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from .auth import authorize_client_request, require_admin_auth, require_client_auth, require_node_auth
from .auth import require_client_auth, require_node_auth
from .chat import ProxyError, _prepare_chat_upstream, proxy_chat_completion, proxy_embeddings, proxy_transcription, stream_chat_completion
from .config import ControlConfig, load_config
from .keys import generate_api_key, hash_api_key
from .models import BenchmarkIngestRequest, HostHeartbeat, HostRegistration, RouteMatchRequest, RouteMatchResponse
from .probe import ServiceProber
from .roles import load_role_catalog
from .registry import Registry
from .routing import choose_upstream_model_id
from .upstream import UpstreamClient, UpstreamError
@@ -66,184 +61,6 @@ def create_app(
async def health() -> dict[str, str]:
return {"status": "ok"}
def _public_client_key(row: dict) -> dict:
return {
key: value
for key, value in row.items()
if key != "key_hash"
}
def _request_id(request: Request) -> str:
return request.headers.get("X-Request-Id") or f"req_{uuid.uuid4().hex}"
def _client_context(request: Request):
return getattr(request.state, "client_context", None)
def _route_audit_metadata(reg: Registry, requested_model: str | None, *, kind: str) -> dict:
if not requested_model:
return {
"requested_model": None,
"resolved_service_id": None,
"resolved_host_id": None,
"upstream_model": None,
"provider_kind": None,
}
resolved = reg.resolve_route(requested_model, kind=kind)
service = resolved.get("service") if resolved else None
if not service:
return {
"requested_model": requested_model,
"resolved_service_id": None,
"resolved_host_id": None,
"upstream_model": None,
"provider_kind": None,
}
return {
"requested_model": requested_model,
"resolved_service_id": service.get("service_id"),
"resolved_host_id": service.get("host_id"),
"upstream_model": choose_upstream_model_id(requested_model, service),
"provider_kind": service.get("protocol"),
}
def _usage_from_response(response: object) -> dict[str, int | None]:
usage = response.get("usage", {}) if isinstance(response, dict) else {}
return {
"prompt_tokens": usage.get("prompt_tokens") if isinstance(usage, dict) else None,
"completion_tokens": usage.get("completion_tokens") if isinstance(usage, dict) else None,
"total_tokens": usage.get("total_tokens") if isinstance(usage, dict) else None,
}
def _audit_request(
request: Request,
*,
request_id: str,
operation: str,
route_metadata: dict,
started_at: float,
status_code: int,
success: bool,
response: object | None = None,
error_type: str | None = None,
input_bytes: int | None = None,
output_bytes: int | None = None,
) -> None:
if not cfg.audit.enabled:
return
context = _client_context(request)
usage = _usage_from_response(response)
request.app.state.registry.record_request_audit(
request_id=request_id,
key_id=getattr(context, "key_id", None),
principal_type=getattr(context, "principal_type", None),
principal_ref=getattr(context, "principal_ref", None),
operation=operation,
requested_model=route_metadata.get("requested_model"),
resolved_service_id=route_metadata.get("resolved_service_id"),
resolved_host_id=route_metadata.get("resolved_host_id"),
upstream_model=route_metadata.get("upstream_model"),
provider_kind=route_metadata.get("provider_kind"),
started_at=started_at,
finished_at=time.time(),
status_code=status_code,
success=success,
error_type=error_type,
input_bytes=input_bytes,
output_bytes=output_bytes,
**usage,
)
if cfg.admin_api.enabled:
@app.post("/v1/admin/client-keys")
async def create_client_key(request: Request, _=Depends(require_admin_auth)) -> dict:
if not cfg.auth.enable_named_client_keys:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="named client keys are not enabled",
)
secret = os.environ.get(cfg.auth.key_hash_secret_env)
if not secret:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"{cfg.auth.key_hash_secret_env} is required for named client keys",
)
payload = await request.json()
raw_key = generate_api_key()
key_id = payload.get("key_id") or f"ck_{uuid.uuid4().hex}"
created = request.app.state.registry.create_client_key(
key_id=key_id,
key_hash=hash_api_key(raw_key, secret=secret),
display_name=payload["display_name"],
principal_type=payload["principal_type"],
principal_ref=payload["principal_ref"],
role=payload.get("role"),
allowed_models=payload.get("allowed_models") or [],
allowed_operations=payload.get("allowed_operations") or [],
monthly_budget_cents=payload.get("monthly_budget_cents"),
monthly_token_limit=payload.get("monthly_token_limit"),
enabled=payload.get("enabled", True),
notes=payload.get("notes"),
)
return {
"status": "ok",
"api_key": raw_key,
"client_key": _public_client_key(created),
}
@app.get("/v1/admin/client-keys")
async def list_client_keys(request: Request, _=Depends(require_admin_auth)) -> dict:
rows = request.app.state.registry.list_client_keys()
return {"object": "list", "data": [_public_client_key(row) for row in rows]}
@app.post("/v1/admin/client-keys/{key_id}/disable")
async def disable_client_key(key_id: str, request: Request, _=Depends(require_admin_auth)) -> dict:
updated = request.app.state.registry.set_client_key_enabled(key_id, False)
if updated is None:
return JSONResponse(status_code=404, content={"error": "unknown_client_key", "key_id": key_id})
return {"status": "ok", "client_key": _public_client_key(updated)}
@app.post("/v1/admin/client-keys/{key_id}/enable")
async def enable_client_key(key_id: str, request: Request, _=Depends(require_admin_auth)) -> dict:
updated = request.app.state.registry.set_client_key_enabled(key_id, True)
if updated is None:
return JSONResponse(status_code=404, content={"error": "unknown_client_key", "key_id": key_id})
return {"status": "ok", "client_key": _public_client_key(updated)}
@app.get("/v1/admin/audit/requests")
async def list_audit_requests(
request: Request,
key_id: str | None = None,
principal_ref: str | None = None,
operation: str | None = None,
model: str | None = None,
success: bool | None = None,
limit: int = 100,
_=Depends(require_admin_auth),
) -> dict:
if not cfg.audit.enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="audit logging is not enabled",
)
rows = request.app.state.registry.list_request_audit(
key_id=key_id,
principal_ref=principal_ref,
operation=operation,
model=model,
success=success,
limit=limit,
)
return {"object": "list", "data": rows}
@app.get("/v1/admin/audit/summary")
async def audit_summary(request: Request, _=Depends(require_admin_auth)) -> dict:
if not cfg.audit.enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="audit logging is not enabled",
)
return {"object": "list", "data": request.app.state.registry.request_audit_summary()}
@app.post("/v1/nodes/register")
async def register_node(request: Request, _=Depends(require_node_auth)) -> dict:
payload = await request.json()
@@ -273,178 +90,45 @@ def create_app(
body = await request.json()
reg: Registry = request.app.state.registry
up: UpstreamClient = request.app.state.upstream
request_id = _request_id(request)
started_at = time.time()
route_metadata = _route_audit_metadata(reg, body.get("model"), kind="chat")
input_bytes = len(json.dumps(body, separators=(",", ":")).encode("utf-8"))
try:
authorize_client_request(request, operation="chat", model=body.get("model"))
if body.get("stream"):
# Resolve route eagerly so ProxyError is raised before streaming starts.
service, upstream_body = _prepare_chat_upstream(body, registry=reg)
_audit_request(
request,
request_id=request_id,
operation="chat",
route_metadata=route_metadata,
started_at=started_at,
status_code=200,
success=True,
input_bytes=input_bytes,
)
return StreamingResponse(
stream_chat_completion(service, upstream_body, upstream=up),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "X-Request-Id": request_id},
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
response = await proxy_chat_completion(body, registry=reg, upstream=up)
output_bytes = len(json.dumps(response, separators=(",", ":")).encode("utf-8")) if isinstance(response, dict) else None
_audit_request(
request,
request_id=request_id,
operation="chat",
route_metadata=route_metadata,
started_at=started_at,
status_code=200,
success=True,
response=response,
input_bytes=input_bytes,
output_bytes=output_bytes,
)
return JSONResponse(content=response, headers={"X-Request-Id": request_id})
return await proxy_chat_completion(body, registry=reg, upstream=up)
except ProxyError as exc:
_audit_request(
request,
request_id=request_id,
operation="chat",
route_metadata=route_metadata,
started_at=started_at,
status_code=exc.status_code,
success=False,
error_type="proxy_error",
input_bytes=input_bytes,
)
return JSONResponse(
status_code=exc.status_code,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "chat_proxy_error"}},
headers={"X-Request-Id": request_id},
)
except HTTPException as exc:
_audit_request(
request,
request_id=request_id,
operation="chat",
route_metadata=route_metadata,
started_at=started_at,
status_code=exc.status_code,
success=False,
error_type="authorization_error",
input_bytes=input_bytes,
)
return JSONResponse(
status_code=exc.status_code,
content={"error": {"message": str(exc.detail), "type": "geniehive_error", "code": "authorization_error"}},
headers={"X-Request-Id": request_id},
)
except UpstreamError as exc:
status_code = exc.status_code or 502
_audit_request(
request,
request_id=request_id,
operation="chat",
route_metadata=route_metadata,
started_at=started_at,
status_code=status_code,
success=False,
error_type="upstream_error",
input_bytes=input_bytes,
)
return JSONResponse(
-status_code=status_code,
+status_code=exc.status_code or 502,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}},
headers={"X-Request-Id": request_id},
)
@app.post("/v1/embeddings")
async def embeddings(request: Request, _=Depends(require_client_auth)):
body = await request.json()
reg: Registry = request.app.state.registry
request_id = _request_id(request)
started_at = time.time()
route_metadata = _route_audit_metadata(reg, body.get("model"), kind="embeddings")
input_bytes = len(json.dumps(body, separators=(",", ":")).encode("utf-8"))
try:
authorize_client_request(request, operation="embeddings", model=body.get("model"))
-response = await proxy_embeddings(
+return await proxy_embeddings(
body,
-registry=reg,
+registry=request.app.state.registry,
upstream=request.app.state.upstream,
)
output_bytes = len(json.dumps(response, separators=(",", ":")).encode("utf-8")) if isinstance(response, dict) else None
_audit_request(
request,
request_id=request_id,
operation="embeddings",
route_metadata=route_metadata,
started_at=started_at,
status_code=200,
success=True,
response=response,
input_bytes=input_bytes,
output_bytes=output_bytes,
)
return JSONResponse(content=response, headers={"X-Request-Id": request_id})
except ProxyError as exc:
_audit_request(
request,
request_id=request_id,
operation="embeddings",
route_metadata=route_metadata,
started_at=started_at,
status_code=exc.status_code,
success=False,
error_type="proxy_error",
input_bytes=input_bytes,
)
return JSONResponse(
status_code=exc.status_code,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "embeddings_proxy_error"}},
headers={"X-Request-Id": request_id},
)
except HTTPException as exc:
_audit_request(
request,
request_id=request_id,
operation="embeddings",
route_metadata=route_metadata,
started_at=started_at,
status_code=exc.status_code,
success=False,
error_type="authorization_error",
input_bytes=input_bytes,
)
return JSONResponse(
status_code=exc.status_code,
content={"error": {"message": str(exc.detail), "type": "geniehive_error", "code": "authorization_error"}},
headers={"X-Request-Id": request_id},
)
except UpstreamError as exc:
status_code = exc.status_code or 502
_audit_request(
request,
request_id=request_id,
operation="embeddings",
route_metadata=route_metadata,
started_at=started_at,
status_code=status_code,
success=False,
error_type="upstream_error",
input_bytes=input_bytes,
)
return JSONResponse(
-status_code=status_code,
+status_code=exc.status_code or 502,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}},
headers={"X-Request-Id": request_id},
)
@app.post("/v1/audio/transcriptions")
@@ -458,12 +142,8 @@ def create_app(
temperature: float | None = Form(None),
_=Depends(require_client_auth),
):
request_id = _request_id(request)
started_at = time.time()
route_metadata = _route_audit_metadata(request.app.state.registry, model, kind="transcription")
try:
authorize_client_request(request, operation="transcription", model=model)
-response = await proxy_transcription(
+return await proxy_transcription(
model=model,
file=file,
language=language,
@@ -473,67 +153,15 @@ def create_app(
registry=request.app.state.registry,
upstream=request.app.state.upstream,
)
output_bytes = len(json.dumps(response, separators=(",", ":")).encode("utf-8")) if isinstance(response, dict) else None
_audit_request(
request,
request_id=request_id,
operation="transcription",
route_metadata=route_metadata,
started_at=started_at,
status_code=200,
success=True,
response=response,
output_bytes=output_bytes,
)
return JSONResponse(content=response, headers={"X-Request-Id": request_id})
except ProxyError as exc:
_audit_request(
request,
request_id=request_id,
operation="transcription",
route_metadata=route_metadata,
started_at=started_at,
status_code=exc.status_code,
success=False,
error_type="proxy_error",
)
return JSONResponse(
status_code=exc.status_code,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "transcription_proxy_error"}},
headers={"X-Request-Id": request_id},
)
except HTTPException as exc:
_audit_request(
request,
request_id=request_id,
operation="transcription",
route_metadata=route_metadata,
started_at=started_at,
status_code=exc.status_code,
success=False,
error_type="authorization_error",
)
return JSONResponse(
status_code=exc.status_code,
content={"error": {"message": str(exc.detail), "type": "geniehive_error", "code": "authorization_error"}},
headers={"X-Request-Id": request_id},
)
except UpstreamError as exc:
status_code = exc.status_code or 502
_audit_request(
request,
request_id=request_id,
operation="transcription",
route_metadata=route_metadata,
started_at=started_at,
status_code=status_code,
success=False,
error_type="upstream_error",
)
return JSONResponse(
-status_code=status_code,
+status_code=exc.status_code or 502,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}},
headers={"X-Request-Id": request_id},
)
@app.get("/v1/cluster/services")
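
The chat, embeddings, and transcription handlers above repeat two small computations: the compact-JSON byte count stored as `input_bytes` / `output_bytes`, and the `geniehive_error` envelope returned on failure. A minimal sketch of both follows. The helper names `compact_json_size` and `error_envelope` are hypothetical and do not appear in the diff; only the JSON shape and the `separators=(",", ":")` encoding are taken from it.

```python
import json
from typing import Optional


def compact_json_size(payload: object) -> Optional[int]:
    """Byte length of a dict serialized as compact UTF-8 JSON.

    Returns None for non-dict payloads, mirroring the isinstance guard
    the handlers apply before computing output_bytes.
    """
    if not isinstance(payload, dict):
        return None
    return len(json.dumps(payload, separators=(",", ":")).encode("utf-8"))


def error_envelope(message: str, code: str) -> dict:
    """Error body in the shape the handlers pass to JSONResponse (hypothetical helper)."""
    return {"error": {"message": message, "type": "geniehive_error", "code": code}}
```

Sizes are measured on the re-serialized body rather than the raw request bytes, so the figure is an approximation of payload size, not a wire-level count.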

@@ -77,50 +77,6 @@ class Registry:
observed_at REAL NOT NULL,
results_json TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS client_keys (
key_id TEXT PRIMARY KEY,
key_hash TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
principal_type TEXT NOT NULL,
principal_ref TEXT NOT NULL,
role TEXT,
allowed_models_json TEXT NOT NULL DEFAULT '[]',
allowed_operations_json TEXT NOT NULL DEFAULT '[]',
monthly_budget_cents INTEGER,
monthly_token_limit INTEGER,
enabled INTEGER NOT NULL DEFAULT 1,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
last_used_at REAL,
notes TEXT
);
CREATE TABLE IF NOT EXISTS request_audit_log (
request_id TEXT PRIMARY KEY,
key_id TEXT,
principal_type TEXT,
principal_ref TEXT,
operation TEXT NOT NULL,
requested_model TEXT,
resolved_service_id TEXT,
resolved_host_id TEXT,
upstream_model TEXT,
provider_kind TEXT,
started_at REAL NOT NULL,
finished_at REAL NOT NULL,
duration_ms REAL NOT NULL,
status_code INTEGER NOT NULL,
success INTEGER NOT NULL,
error_type TEXT,
prompt_tokens INTEGER,
completion_tokens INTEGER,
total_tokens INTEGER,
estimated_cost_cents REAL,
input_bytes INTEGER,
output_bytes INTEGER,
metadata_json TEXT NOT NULL DEFAULT '{}'
);
"""
)
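
The `client_keys` table stores allowlists as JSON text (`allowed_models_json`, `allowed_operations_json`) and booleans as integers. A minimal sketch of that round trip against an in-memory SQLite database, using a trimmed column set chosen for illustration only (the sample key ID and hash value are made up):

```python
import json
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allows row["column"] access

# Trimmed version of the client_keys schema; the real table has more columns.
conn.execute(
    """
    CREATE TABLE client_keys (
        key_id TEXT PRIMARY KEY,
        key_hash TEXT NOT NULL UNIQUE,
        allowed_models_json TEXT NOT NULL DEFAULT '[]',
        enabled INTEGER NOT NULL DEFAULT 1,
        created_at REAL NOT NULL
    )
    """
)
conn.execute(
    "INSERT INTO client_keys (key_id, key_hash, allowed_models_json, created_at) "
    "VALUES (?, ?, ?, ?)",
    ("ck_demo", "hmac-sha256:demo", json.dumps(["archive_migrator"]), time.time()),
)

row = conn.execute("SELECT * FROM client_keys WHERE key_id = ?", ("ck_demo",)).fetchone()
client_key = {
    "key_id": row["key_id"],
    "allowed_models": json.loads(row["allowed_models_json"]),  # JSON text -> list
    "enabled": bool(row["enabled"]),  # 0/1 -> bool
}
```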
@@ -334,227 +290,6 @@ class Registry:
rows = conn.execute(query, params).fetchall()
return [self._benchmark_row_to_dict(row) for row in rows]
def create_client_key(
self,
*,
key_id: str,
key_hash: str,
display_name: str,
principal_type: str,
principal_ref: str,
role: str | None = None,
allowed_models: list[str] | None = None,
allowed_operations: list[str] | None = None,
monthly_budget_cents: int | None = None,
monthly_token_limit: int | None = None,
enabled: bool = True,
notes: str | None = None,
) -> dict:
now = time.time()
with self._connect() as conn:
conn.execute(
"""
INSERT INTO client_keys (
key_id, key_hash, display_name, principal_type, principal_ref,
role, allowed_models_json, allowed_operations_json,
monthly_budget_cents, monthly_token_limit, enabled,
created_at, updated_at, last_used_at, notes
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, ?)
""",
(
key_id,
key_hash,
display_name,
principal_type,
principal_ref,
role,
_json_dumps(allowed_models or []),
_json_dumps(allowed_operations or []),
monthly_budget_cents,
monthly_token_limit,
1 if enabled else 0,
now,
now,
notes,
),
)
created = self.get_client_key(key_id)
if created is None:
raise RuntimeError(f"created client key {key_id!r} could not be loaded")
return created
def get_client_key(self, key_id: str) -> dict | None:
with self._connect() as conn:
row = conn.execute("SELECT * FROM client_keys WHERE key_id = ?", (key_id,)).fetchone()
return self._client_key_row_to_dict(row) if row is not None else None
def get_client_key_by_hash(self, key_hash: str) -> dict | None:
with self._connect() as conn:
row = conn.execute("SELECT * FROM client_keys WHERE key_hash = ?", (key_hash,)).fetchone()
return self._client_key_row_to_dict(row) if row is not None else None
def list_client_keys(self) -> list[dict]:
with self._connect() as conn:
rows = conn.execute("SELECT * FROM client_keys ORDER BY created_at, key_id").fetchall()
return [self._client_key_row_to_dict(row) for row in rows]
def set_client_key_enabled(self, key_id: str, enabled: bool) -> dict | None:
now = time.time()
with self._connect() as conn:
conn.execute(
"UPDATE client_keys SET enabled = ?, updated_at = ? WHERE key_id = ?",
(1 if enabled else 0, now, key_id),
)
return self.get_client_key(key_id)
def touch_client_key(self, key_id: str) -> None:
now = time.time()
with self._connect() as conn:
conn.execute(
"UPDATE client_keys SET last_used_at = ?, updated_at = ? WHERE key_id = ?",
(now, now, key_id),
)
def record_request_audit(
self,
*,
request_id: str,
key_id: str | None,
principal_type: str | None,
principal_ref: str | None,
operation: str,
requested_model: str | None,
resolved_service_id: str | None,
resolved_host_id: str | None,
upstream_model: str | None,
provider_kind: str | None,
started_at: float,
finished_at: float,
status_code: int,
success: bool,
error_type: str | None = None,
prompt_tokens: int | None = None,
completion_tokens: int | None = None,
total_tokens: int | None = None,
estimated_cost_cents: float | None = None,
input_bytes: int | None = None,
output_bytes: int | None = None,
metadata: dict | None = None,
) -> dict:
duration_ms = max(0.0, (finished_at - started_at) * 1000.0)
with self._connect() as conn:
conn.execute(
"""
INSERT INTO request_audit_log (
request_id, key_id, principal_type, principal_ref,
operation, requested_model, resolved_service_id,
resolved_host_id, upstream_model, provider_kind,
started_at, finished_at, duration_ms, status_code, success,
error_type, prompt_tokens, completion_tokens, total_tokens,
estimated_cost_cents, input_bytes, output_bytes,
metadata_json
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
request_id,
key_id,
principal_type,
principal_ref,
operation,
requested_model,
resolved_service_id,
resolved_host_id,
upstream_model,
provider_kind,
started_at,
finished_at,
duration_ms,
status_code,
1 if success else 0,
error_type,
prompt_tokens,
completion_tokens,
total_tokens,
estimated_cost_cents,
input_bytes,
output_bytes,
_json_dumps(metadata or {}),
),
)
row = self.get_request_audit(request_id)
if row is None:
raise RuntimeError(f"created audit row {request_id!r} could not be loaded")
return row
def get_request_audit(self, request_id: str) -> dict | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM request_audit_log WHERE request_id = ?",
(request_id,),
).fetchone()
return self._request_audit_row_to_dict(row) if row is not None else None
def list_request_audit(
self,
*,
key_id: str | None = None,
principal_ref: str | None = None,
operation: str | None = None,
model: str | None = None,
success: bool | None = None,
limit: int = 100,
) -> list[dict]:
query = "SELECT * FROM request_audit_log"
clauses = []
params: list[object] = []
if key_id:
clauses.append("key_id = ?")
params.append(key_id)
if principal_ref:
clauses.append("principal_ref = ?")
params.append(principal_ref)
if operation:
clauses.append("operation = ?")
params.append(operation)
if model:
clauses.append("requested_model = ?")
params.append(model)
if success is not None:
clauses.append("success = ?")
params.append(1 if success else 0)
if clauses:
query += " WHERE " + " AND ".join(clauses)
query += " ORDER BY started_at DESC LIMIT ?"
params.append(max(1, min(limit, 1000)))
with self._connect() as conn:
rows = conn.execute(query, params).fetchall()
return [self._request_audit_row_to_dict(row) for row in rows]
def request_audit_summary(self) -> list[dict]:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT
key_id,
principal_ref,
operation,
requested_model,
COUNT(*) AS request_count,
SUM(success) AS success_count,
SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failure_count,
SUM(COALESCE(prompt_tokens, 0)) AS prompt_tokens,
SUM(COALESCE(completion_tokens, 0)) AS completion_tokens,
SUM(COALESCE(total_tokens, 0)) AS total_tokens,
SUM(COALESCE(estimated_cost_cents, 0)) AS estimated_cost_cents
FROM request_audit_log
GROUP BY key_id, principal_ref, operation, requested_model
ORDER BY request_count DESC, requested_model
"""
).fetchall()
return [dict(row) for row in rows]
def list_client_models(self) -> list[dict]:
services = self.list_services()
roles = self.list_roles()
@@ -1072,54 +807,6 @@ class Registry:
"results": json.loads(row["results_json"]),
}
@staticmethod
def _client_key_row_to_dict(row: sqlite3.Row) -> dict:
return {
"key_id": row["key_id"],
"key_hash": row["key_hash"],
"display_name": row["display_name"],
"principal_type": row["principal_type"],
"principal_ref": row["principal_ref"],
"role": row["role"],
"allowed_models": json.loads(row["allowed_models_json"]),
"allowed_operations": json.loads(row["allowed_operations_json"]),
"monthly_budget_cents": row["monthly_budget_cents"],
"monthly_token_limit": row["monthly_token_limit"],
"enabled": bool(row["enabled"]),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"last_used_at": row["last_used_at"],
"notes": row["notes"],
}
@staticmethod
def _request_audit_row_to_dict(row: sqlite3.Row) -> dict:
return {
"request_id": row["request_id"],
"key_id": row["key_id"],
"principal_type": row["principal_type"],
"principal_ref": row["principal_ref"],
"operation": row["operation"],
"requested_model": row["requested_model"],
"resolved_service_id": row["resolved_service_id"],
"resolved_host_id": row["resolved_host_id"],
"upstream_model": row["upstream_model"],
"provider_kind": row["provider_kind"],
"started_at": row["started_at"],
"finished_at": row["finished_at"],
"duration_ms": row["duration_ms"],
"status_code": row["status_code"],
"success": bool(row["success"]),
"error_type": row["error_type"],
"prompt_tokens": row["prompt_tokens"],
"completion_tokens": row["completion_tokens"],
"total_tokens": row["total_tokens"],
"estimated_cost_cents": row["estimated_cost_cents"],
"input_bytes": row["input_bytes"],
"output_bytes": row["output_bytes"],
"metadata": json.loads(row["metadata_json"]),
}
def _tokenize_text(value: str) -> set[str]:
return {token for token in re.split(r"[^a-z0-9]+", value.lower()) if token}
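
`list_request_audit` builds its `WHERE` clause from whichever filters are set and clamps `limit` to the range 1..1000. A standalone sketch of that pattern follows, assuming a hypothetical free function `build_audit_query` and a cut-down table with only the columns the example needs:

```python
import sqlite3


def build_audit_query(*, key_id=None, operation=None, success=None, limit=100):
    """Return (sql, params) for a filtered, newest-first audit listing."""
    query = "SELECT * FROM request_audit_log"
    clauses = []
    params = []
    if key_id:
        clauses.append("key_id = ?")
        params.append(key_id)
    if operation:
        clauses.append("operation = ?")
        params.append(operation)
    if success is not None:  # False is a valid filter, so test against None
        clauses.append("success = ?")
        params.append(1 if success else 0)
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    query += " ORDER BY started_at DESC LIMIT ?"
    params.append(max(1, min(limit, 1000)))  # clamp to 1..1000
    return query, params


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE request_audit_log ("
    "request_id TEXT PRIMARY KEY, key_id TEXT, operation TEXT NOT NULL, "
    "started_at REAL NOT NULL, success INTEGER NOT NULL)"
)
conn.executemany(
    "INSERT INTO request_audit_log VALUES (?, ?, ?, ?, ?)",
    [
        ("r1", "ck_a", "chat", 1.0, 1),
        ("r2", "ck_a", "chat", 2.0, 0),
        ("r3", "ck_b", "embeddings", 3.0, 1),
    ],
)
sql, params = build_audit_query(operation="chat")
recent_chat_ids = [row[0] for row in conn.execute(sql, params)]
```

Only fixed clause strings are concatenated into the SQL; every caller-supplied value travels through a `?` placeholder.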

@@ -1,154 +0,0 @@
import json
from pathlib import Path
from fastapi.testclient import TestClient
from geniehive_control.main import create_app
from geniehive_control.models import HostRegistration, RegisteredService
from geniehive_control.upstream import UpstreamClient
class _FakeResponse:
def __init__(self, payload: dict, status_code: int = 200) -> None:
self._payload = payload
self.status_code = status_code
self.text = str(payload)
def json(self) -> dict:
return self._payload
class _UsagePoster:
async def post(self, url: str, *, json: dict, headers: dict[str, str] | None = None) -> _FakeResponse:
return _FakeResponse(
{
"object": "chat.completion",
"model": json["model"],
"choices": [{"index": 0, "message": {"role": "assistant", "content": "done"}}],
"usage": {
"prompt_tokens": 7,
"completion_tokens": 3,
"total_tokens": 10,
},
}
)
def _write_audit_config(tmp_path: Path) -> Path:
config_path = tmp_path / "control.yaml"
config_path.write_text(
f"""
auth:
client_api_keys:
- audit-key
audit:
enabled: true
admin_api:
enabled: true
storage:
sqlite_path: "{tmp_path / 'geniehive.sqlite3'}"
"""
)
return config_path
def _register_chat_service(app) -> None:
app.state.registry.register_host(
HostRegistration(
host_id="atlas-01",
address="127.0.0.1",
services=[
RegisteredService(
service_id="atlas-01/chat/qwen",
host_id="atlas-01",
kind="chat",
protocol="openai",
endpoint="http://127.0.0.1:18091",
assets=[{"asset_id": "qwen-test", "loaded": True}],
state={"health": "healthy", "accept_requests": True},
observed={"p50_latency_ms": 100},
)
],
)
)
def test_successful_chat_request_is_audited_without_prompt_content(tmp_path: Path) -> None:
app = create_app(_write_audit_config(tmp_path), upstream_client=UpstreamClient(client=_UsagePoster()))
_register_chat_service(app)
client = TestClient(app)
response = client.post(
"/v1/chat/completions",
headers={"X-Api-Key": "audit-key", "X-Request-Id": "req-test-success"},
json={
"model": "qwen-test",
"messages": [{"role": "user", "content": "private prompt text"}],
},
)
assert response.status_code == 200
assert response.headers["x-request-id"] == "req-test-success"
row = app.state.registry.get_request_audit("req-test-success")
assert row is not None
assert row["operation"] == "chat"
assert row["requested_model"] == "qwen-test"
assert row["resolved_service_id"] == "atlas-01/chat/qwen"
assert row["upstream_model"] == "qwen-test"
assert row["provider_kind"] == "openai"
assert row["success"] is True
assert row["status_code"] == 200
assert row["prompt_tokens"] == 7
assert row["completion_tokens"] == 3
assert row["total_tokens"] == 10
assert "private prompt text" not in json.dumps(row)
def test_failed_chat_route_is_audited(tmp_path: Path) -> None:
app = create_app(_write_audit_config(tmp_path), upstream_client=UpstreamClient(client=_UsagePoster()))
client = TestClient(app)
response = client.post(
"/v1/chat/completions",
headers={"X-Api-Key": "audit-key", "X-Request-Id": "req-test-failure"},
json={
"model": "missing-model",
"messages": [{"role": "user", "content": "private failure prompt"}],
},
)
assert response.status_code == 404
assert response.headers["x-request-id"] == "req-test-failure"
row = app.state.registry.get_request_audit("req-test-failure")
assert row is not None
assert row["operation"] == "chat"
assert row["requested_model"] == "missing-model"
assert row["success"] is False
assert row["status_code"] == 404
assert row["error_type"] == "proxy_error"
assert "private failure prompt" not in json.dumps(row)
def test_admin_audit_endpoints_list_and_summarize_requests(tmp_path: Path) -> None:
app = create_app(_write_audit_config(tmp_path), upstream_client=UpstreamClient(client=_UsagePoster()))
_register_chat_service(app)
client = TestClient(app)
client.post(
"/v1/chat/completions",
headers={"X-Api-Key": "audit-key"},
json={"model": "qwen-test", "messages": [{"role": "user", "content": "hello"}]},
)
listed = client.get("/v1/admin/audit/requests", headers={"X-Api-Key": "audit-key"})
assert listed.status_code == 200
assert listed.json()["data"][0]["requested_model"] == "qwen-test"
summary = client.get("/v1/admin/audit/summary", headers={"X-Api-Key": "audit-key"})
assert summary.status_code == 200
summary_row = summary.json()["data"][0]
assert summary_row["requested_model"] == "qwen-test"
assert summary_row["request_count"] == 1
assert summary_row["success_count"] == 1
assert summary_row["total_tokens"] == 10
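
These tests expect the audit row to carry the token counts from the upstream `usage` block and never the prompt text. How `_audit_request` derives those counts is not visible in this part of the diff; the sketch below assumes a hypothetical helper, `extract_usage_tokens`, that copies only the three numeric fields.

```python
TOKEN_FIELDS = ("prompt_tokens", "completion_tokens", "total_tokens")


def extract_usage_tokens(response: object) -> dict:
    """Pull token counts out of an OpenAI-style response (hypothetical helper).

    Only the usage counters are copied, so message content cannot leak
    into the audit record through this path.
    """
    usage = response.get("usage") if isinstance(response, dict) else None
    if not isinstance(usage, dict):
        return {field: None for field in TOKEN_FIELDS}
    return {field: usage.get(field) for field in TOKEN_FIELDS}
```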

@@ -1,221 +0,0 @@
from pathlib import Path
import pytest
from fastapi import Depends, Request
from fastapi.testclient import TestClient
from geniehive_control.auth import require_client_auth
from geniehive_control.keys import hash_api_key
from geniehive_control.main import create_app
def _write_config(tmp_path: Path, body: str) -> Path:
config_path = tmp_path / "control.yaml"
config_path.write_text(body)
return config_path
def test_static_client_key_auth_still_works(tmp_path: Path) -> None:
config_path = _write_config(
tmp_path,
f"""
auth:
client_api_keys:
- static-key
storage:
sqlite_path: "{tmp_path / 'geniehive.sqlite3'}"
""",
)
app = create_app(config_path)
client = TestClient(app)
assert client.get("/v1/models").status_code == 401
ok = client.get("/v1/models", headers={"X-Api-Key": "static-key"})
assert ok.status_code == 200
def test_empty_static_keys_still_allow_development_access(tmp_path: Path) -> None:
config_path = _write_config(
tmp_path,
f"""
storage:
sqlite_path: "{tmp_path / 'geniehive.sqlite3'}"
""",
)
app = create_app(config_path)
client = TestClient(app)
response = client.get("/v1/models")
assert response.status_code == 200
def test_named_client_key_auth_when_enabled(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
db_path = tmp_path / "geniehive.sqlite3"
config_path = _write_config(
tmp_path,
f"""
auth:
enable_named_client_keys: true
storage:
sqlite_path: "{db_path}"
""",
)
app = create_app(config_path)
raw_key = "gh_test_named"
app.state.registry.create_client_key(
key_id="ck_named",
key_hash=hash_api_key(raw_key, secret="test-secret"),
display_name="Named User",
principal_type="person",
principal_ref="named-user",
role="developer",
allowed_models=["archive_migrator"],
allowed_operations=["chat"],
)
@app.get("/_test/client-context")
async def client_context(request: Request, _=Depends(require_client_auth)) -> dict:
context = request.state.client_context
return {
"auth_kind": context.auth_kind,
"key_id": context.key_id,
"principal_ref": context.principal_ref,
"allowed_models": list(context.allowed_models),
"allowed_operations": list(context.allowed_operations),
}
client = TestClient(app)
missing = client.get("/_test/client-context")
assert missing.status_code == 401
bad = client.get("/_test/client-context", headers={"X-Api-Key": "wrong"})
assert bad.status_code == 401
ok = client.get("/_test/client-context", headers={"X-Api-Key": raw_key})
assert ok.status_code == 200
assert ok.json() == {
"auth_kind": "named",
"key_id": "ck_named",
"principal_ref": "named-user",
"allowed_models": ["archive_migrator"],
"allowed_operations": ["chat"],
}
touched = app.state.registry.get_client_key("ck_named")
assert touched is not None
assert touched["last_used_at"] is not None
def test_disabled_named_client_key_fails(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
db_path = tmp_path / "geniehive.sqlite3"
config_path = _write_config(
tmp_path,
f"""
auth:
enable_named_client_keys: true
storage:
sqlite_path: "{db_path}"
""",
)
app = create_app(config_path)
raw_key = "gh_test_disabled"
app.state.registry.create_client_key(
key_id="ck_disabled",
key_hash=hash_api_key(raw_key, secret="test-secret"),
display_name="Disabled User",
principal_type="person",
principal_ref="disabled-user",
enabled=False,
)
client = TestClient(app)
response = client.get("/v1/models", headers={"X-Api-Key": raw_key})
assert response.status_code == 401
def test_admin_client_key_endpoints_are_hidden_by_default() -> None:
app = create_app()
paths = {route.path for route in app.routes}
assert "/v1/admin/client-keys" not in paths
def test_admin_can_create_list_disable_and_enable_named_keys(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
db_path = tmp_path / "geniehive.sqlite3"
config_path = _write_config(
tmp_path,
f"""
auth:
client_api_keys:
- admin-static-key
enable_named_client_keys: true
admin_api:
enabled: true
storage:
sqlite_path: "{db_path}"
""",
)
app = create_app(config_path)
client = TestClient(app)
denied = client.get("/v1/admin/client-keys")
assert denied.status_code == 401
created = client.post(
"/v1/admin/client-keys",
headers={"X-Api-Key": "admin-static-key"},
json={
"key_id": "ck_created",
"display_name": "Archive Migration",
"principal_type": "person",
"principal_ref": "wesley",
"role": "developer",
"allowed_models": ["archive_migrator"],
"allowed_operations": ["chat"],
},
)
assert created.status_code == 200
created_body = created.json()
assert created_body["api_key"].startswith("gh_")
assert created_body["client_key"]["key_id"] == "ck_created"
assert "key_hash" not in created_body["client_key"]
listed = client.get(
"/v1/admin/client-keys",
headers={"X-Api-Key": "admin-static-key"},
)
assert listed.status_code == 200
assert listed.json()["data"][0]["key_id"] == "ck_created"
assert "key_hash" not in listed.json()["data"][0]
disabled = client.post(
"/v1/admin/client-keys/ck_created/disable",
headers={"X-Api-Key": "admin-static-key"},
)
assert disabled.status_code == 200
assert disabled.json()["client_key"]["enabled"] is False
named_denied = client.get(
"/v1/models",
headers={"X-Api-Key": created_body["api_key"]},
)
assert named_denied.status_code == 401
enabled = client.post(
"/v1/admin/client-keys/ck_created/enable",
headers={"X-Api-Key": "admin-static-key"},
)
assert enabled.status_code == 200
assert enabled.json()["client_key"]["enabled"] is True
named_ok = client.get(
"/v1/models",
headers={"X-Api-Key": created_body["api_key"]},
)
assert named_ok.status_code == 200
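
Read together, these tests imply a decision order for `X-Api-Key`: open access when nothing is configured, then static keys, then enabled named keys, otherwise 401. The sketch below is inferred from the test expectations only, not from `require_client_auth` itself. The `AuthDecision` type, the `authenticate` function, and the `"open"` and `"static"` labels are assumptions; `"named"` is the value the tests assert.

```python
import hmac
from dataclasses import dataclass
from typing import Callable, Optional, Sequence


@dataclass(frozen=True)
class AuthDecision:
    allowed: bool
    auth_kind: Optional[str] = None
    key_id: Optional[str] = None


def authenticate(
    raw_key: Optional[str],
    *,
    static_keys: Sequence[str],
    named_keys_enabled: bool,
    find_named_key: Callable[[str], Optional[dict]],
) -> AuthDecision:
    """Decide access for one request (illustrative, inferred from the tests)."""
    if not static_keys and not named_keys_enabled:
        return AuthDecision(True, "open")  # development mode: no keys configured
    if not raw_key:
        return AuthDecision(False)
    presented = raw_key.encode("utf-8")
    # Constant-time comparison against each configured static key.
    if any(hmac.compare_digest(presented, key.encode("utf-8")) for key in static_keys):
        return AuthDecision(True, "static")
    if named_keys_enabled:
        # A real lookup would hash the key first; the callable hides that here.
        record = find_named_key(raw_key)
        if record is not None and record["enabled"]:
            return AuthDecision(True, "named", record["key_id"])
    return AuthDecision(False)
```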

@@ -1,202 +0,0 @@
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from geniehive_control.keys import hash_api_key
from geniehive_control.main import create_app
from geniehive_control.models import HostRegistration, RegisteredService
from geniehive_control.upstream import UpstreamClient
class _FakeResponse:
def __init__(self, payload: dict, status_code: int = 200) -> None:
self._payload = payload
self.status_code = status_code
self.text = str(payload)
def json(self) -> dict:
return self._payload
class _FakePoster:
async def post(self, url: str, *, json: dict, headers: dict[str, str] | None = None) -> _FakeResponse:
if url.endswith("/v1/embeddings"):
return _FakeResponse({"object": "list", "data": [{"embedding": [0.1, 0.2]}]})
return _FakeResponse({"object": "chat.completion", "model": json["model"], "choices": []})
def _write_config(tmp_path: Path, *, static_key: bool = False) -> Path:
config_path = tmp_path / "control.yaml"
static_auth = """
client_api_keys:
- static-key
""" if static_key else ""
config_path.write_text(
f"""
auth:
{static_auth} enable_named_client_keys: true
authorization:
enforce_model_allowlists: true
enforce_operation_allowlists: true
empty_allowlist_means_no_access: true
storage:
sqlite_path: "{tmp_path / 'geniehive.sqlite3'}"
"""
)
return config_path
def _register_services(app) -> None:
app.state.registry.register_host(
HostRegistration(
host_id="atlas-01",
address="127.0.0.1",
services=[
RegisteredService(
service_id="atlas-01/chat/qwen",
host_id="atlas-01",
kind="chat",
endpoint="http://127.0.0.1:18091",
assets=[{"asset_id": "archive_migrator", "loaded": True}],
state={"health": "healthy", "accept_requests": True},
observed={"p50_latency_ms": 100},
),
RegisteredService(
service_id="atlas-01/embeddings/bge",
host_id="atlas-01",
kind="embeddings",
endpoint="http://127.0.0.1:18092",
assets=[{"asset_id": "bge-small", "loaded": True}],
state={"health": "healthy", "accept_requests": True},
observed={"p50_latency_ms": 100},
),
],
)
)
def _create_named_key(
app,
raw_key: str,
*,
allowed_models: list[str],
allowed_operations: list[str],
) -> None:
app.state.registry.create_client_key(
key_id=f"ck_{raw_key}",
key_hash=hash_api_key(raw_key, secret="test-secret"),
display_name="Scoped User",
principal_type="person",
principal_ref="scoped-user",
role="developer",
allowed_models=allowed_models,
allowed_operations=allowed_operations,
)
def test_named_key_allows_scoped_chat_request(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
app = create_app(_write_config(tmp_path), upstream_client=UpstreamClient(client=_FakePoster()))
_register_services(app)
_create_named_key(
app,
"gh_allowed",
allowed_models=["archive_migrator"],
allowed_operations=["chat"],
)
client = TestClient(app)
response = client.post(
"/v1/chat/completions",
headers={"X-Api-Key": "gh_allowed"},
json={"model": "archive_migrator", "messages": [{"role": "user", "content": "hello"}]},
)
assert response.status_code == 200
def test_named_key_denies_unlisted_operation(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
app = create_app(_write_config(tmp_path), upstream_client=UpstreamClient(client=_FakePoster()))
_register_services(app)
_create_named_key(
app,
"gh_chat_only",
allowed_models=["*"],
allowed_operations=["chat"],
)
client = TestClient(app)
response = client.post(
"/v1/embeddings",
headers={"X-Api-Key": "gh_chat_only"},
json={"model": "bge-small", "input": "hello"},
)
assert response.status_code == 403
assert response.json()["error"]["code"] == "authorization_error"
def test_named_key_denies_unlisted_model(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
app = create_app(_write_config(tmp_path), upstream_client=UpstreamClient(client=_FakePoster()))
_register_services(app)
_create_named_key(
app,
"gh_archive_only",
allowed_models=["archive_migrator"],
allowed_operations=["chat"],
)
client = TestClient(app)
response = client.post(
"/v1/chat/completions",
headers={"X-Api-Key": "gh_archive_only"},
json={"model": "other_role", "messages": [{"role": "user", "content": "hello"}]},
)
assert response.status_code == 403
assert response.json()["error"]["code"] == "authorization_error"
def test_empty_allowlist_denies_when_configured(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
app = create_app(_write_config(tmp_path), upstream_client=UpstreamClient(client=_FakePoster()))
_register_services(app)
_create_named_key(
app,
"gh_empty",
allowed_models=[],
allowed_operations=[],
)
client = TestClient(app)
response = client.post(
"/v1/chat/completions",
headers={"X-Api-Key": "gh_empty"},
json={"model": "archive_migrator", "messages": [{"role": "user", "content": "hello"}]},
)
assert response.status_code == 403
def test_static_key_is_not_restricted_by_named_key_allowlists(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setenv("GENIEHIVE_KEY_HASH_SECRET", "test-secret")
app = create_app(
_write_config(tmp_path, static_key=True),
upstream_client=UpstreamClient(client=_FakePoster()),
)
_register_services(app)
client = TestClient(app)
response = client.post(
"/v1/embeddings",
headers={"X-Api-Key": "static-key"},
json={"model": "bge-small", "input": "hello"},
)
assert response.status_code == 200
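
The authorization tests exercise three rules: a named key needs both the operation and the model on its allowlists, an empty allowlist denies when `empty_allowlist_means_no_access` is set, and static keys bypass the checks. A minimal sketch of that logic follows. The function names are hypothetical, and treating `"*"` as a match-all wildcard is an assumption suggested by the `allowed_models=["*"]` fixture rather than something the tests prove.

```python
from typing import Sequence


def allowlist_permits(value: str, allowlist: Sequence[str], *, empty_means_no_access: bool = True) -> bool:
    """True if value passes the allowlist; "*" is assumed to match anything."""
    if not allowlist:
        return not empty_means_no_access
    return "*" in allowlist or value in allowlist


def is_authorized(
    *,
    auth_kind: str,
    operation: str,
    model: str,
    allowed_operations: Sequence[str],
    allowed_models: Sequence[str],
    empty_means_no_access: bool = True,
) -> bool:
    """Apply operation and model allowlists to named keys only."""
    if auth_kind != "named":
        return True  # static keys are not restricted by named-key allowlists
    return allowlist_permits(
        operation, allowed_operations, empty_means_no_access=empty_means_no_access
    ) and allowlist_permits(model, allowed_models, empty_means_no_access=empty_means_no_access)
```

A `False` result corresponds to the 403 `authorization_error` responses asserted in the tests.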

@@ -1,45 +0,0 @@
from pathlib import Path

from geniehive_control.config import ControlConfig, load_config


def test_default_control_config_is_casual_and_non_governed() -> None:
    cfg = ControlConfig()
    assert cfg.deployment_profile == "casual"
    assert cfg.auth.client_api_keys == []
    assert cfg.auth.node_api_keys == []
    assert cfg.auth.enable_named_client_keys is False
    assert cfg.audit.enabled is False
    assert cfg.admin_api.enabled is False
    assert cfg.authorization.enforce_model_allowlists is False
    assert cfg.authorization.enforce_operation_allowlists is False
    assert cfg.providers == []
    assert cfg.budgeting.enabled is False


def test_legacy_control_example_loads_without_foundation_sections() -> None:
    cfg = load_config(Path("configs/control.example.yaml"))
    assert cfg.deployment_profile == "casual"
    assert cfg.auth.client_api_keys == ["change-me-client-key"]
    assert cfg.auth.node_api_keys == ["change-me-node-key"]
    assert cfg.auth.enable_named_client_keys is False
    assert cfg.audit.enabled is False
    assert cfg.admin_api.enabled is False
    assert cfg.providers == []


def test_foundation_control_example_loads_as_opt_in_profile() -> None:
    cfg = load_config(Path("configs/control.foundation.example.yaml"))
    assert cfg.deployment_profile == "foundation_gateway"
    assert cfg.auth.enable_named_client_keys is True
    assert cfg.audit.enabled is True
    assert cfg.admin_api.enabled is True
    assert cfg.authorization.enforce_model_allowlists is True
    assert cfg.authorization.enforce_operation_allowlists is True
    assert cfg.providers[0].provider_id == "openai-foundation"
    assert cfg.providers[0].api_key_env == "OPENAI_API_KEY"
    assert cfg.providers[1].provider_kind == "anthropic_messages"
    assert cfg.budgeting.global_monthly_budget_cents == 5000


@ -1,60 +0,0 @@
from pathlib import Path

from geniehive_control.keys import generate_api_key, hash_api_key, redact_api_key, verify_api_key
from geniehive_control.registry import Registry


def test_api_key_hash_verify_and_redact() -> None:
    raw_key = generate_api_key(prefix="gh_test")
    key_hash = hash_api_key(raw_key, secret="test-secret")
    assert raw_key.startswith("gh_test_")
    assert key_hash.startswith("hmac-sha256:")
    assert verify_api_key(raw_key, key_hash, secret="test-secret") is True
    assert verify_api_key(raw_key + "-wrong", key_hash, secret="test-secret") is False
    assert verify_api_key(raw_key, key_hash, secret="other-secret") is False
    assert raw_key not in redact_api_key(raw_key)


def test_registry_client_key_lifecycle(tmp_path: Path) -> None:
    registry = Registry(tmp_path / "geniehive.sqlite3")
    raw_key = "gh_test_secret"
    key_hash = hash_api_key(raw_key, secret="test-secret")
    created = registry.create_client_key(
        key_id="ck_test",
        key_hash=key_hash,
        display_name="Test User",
        principal_type="person",
        principal_ref="test-user",
        role="developer",
        allowed_models=["archive_migrator"],
        allowed_operations=["chat"],
        monthly_budget_cents=1000,
        monthly_token_limit=20000,
        notes="created by test",
    )
    assert created["key_id"] == "ck_test"
    assert created["key_hash"] == key_hash
    assert created["display_name"] == "Test User"
    assert created["allowed_models"] == ["archive_migrator"]
    assert created["allowed_operations"] == ["chat"]
    assert created["enabled"] is True
    assert created["last_used_at"] is None
    listed = registry.list_client_keys()
    assert [item["key_id"] for item in listed] == ["ck_test"]
    by_hash = registry.get_client_key_by_hash(key_hash)
    assert by_hash is not None
    assert by_hash["principal_ref"] == "test-user"
    disabled = registry.set_client_key_enabled("ck_test", False)
    assert disabled is not None
    assert disabled["enabled"] is False
    registry.touch_client_key("ck_test")
    touched = registry.get_client_key("ck_test")
    assert touched is not None
    assert touched["last_used_at"] is not None