P1–P2 complete: routing strategies, streaming, discovery, observed metrics + role catalogs

Control plane:
- fallback_roles chain in resolve_route() with cycle protection
- round_robin and least_loaded routing strategies; default_strategy dispatches all three (least_loaded sketched below)
- Streaming chat completions: async generator, eager route resolution, SSE reasoning-strip
- POST /v1/audio/transcriptions proxy (multipart, dedicated httpx path)
- ServiceProber background task: probes /health, falls back to /v1/models for vLLM
- ServiceObserved gains loaded_model_count and vram_used_bytes
- _runtime_signals exposes loaded_model_count to route scoring
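
A minimal sketch of the least_loaded selection described above, assuming each candidate carries an observed dict with queue_depth, in_flight, and latency_ms (illustrative field names, not the exact registry schema):

```python
from typing import Any

def pick_least_loaded(candidates: list[dict[str, Any]]) -> dict[str, Any]:
    """Pick the candidate with the lowest queue_depth + in_flight.

    Latency breaks ties when load metrics are equal; missing metrics are
    treated as zero load / unknown latency so unreported services neither
    win the tie-break nor crash the sort.
    """
    def sort_key(service: dict[str, Any]) -> tuple[int, float]:
        observed = service.get("observed") or {}
        load = (observed.get("queue_depth") or 0) + (observed.get("in_flight") or 0)
        latency = observed.get("latency_ms")
        return (load, latency if latency is not None else float("inf"))

    return min(candidates, key=sort_key)
```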

Node agent:
- discover_protocol: "ollama"|"openai"|null per-service config field
- discovery.py: discover_ollama_assets (loaded: False), _get_ollama_ps_models helper,
  query_ollama_ps, discover_openai_models, enrich_service_assets (two-phase Ollama,
  corrects stale loaded state, populates observed metrics from /api/ps; see the sketch below)
- Heartbeat zips service dicts with config to pass protocol; allocates discovery client
  only when needed
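
A rough sketch of the two-phase Ollama enrichment above. The /api/tags and /api/ps endpoints are Ollama's real API; the asset-dict shape, helper name, and return type are simplified stand-ins for the actual discovery.py code:

```python
import httpx

async def enrich_ollama_assets(base_url: str, static_assets: list[dict]) -> tuple[list[dict], dict]:
    """Phase 1: /api/tags lists available models. Phase 2: /api/ps shows what is in VRAM."""
    async with httpx.AsyncClient(base_url=base_url, timeout=5.0) as client:
        tags = (await client.get("/api/tags")).json().get("models", [])
        ps = (await client.get("/api/ps")).json().get("models", [])

    loaded = {m["name"] for m in ps}
    merged = {a["name"]: dict(a) for a in static_assets}
    for model in tags:
        merged.setdefault(model["name"], {"name": model["name"]})
    for asset in merged.values():
        # Marks newly loaded models and corrects stale loaded: true carried over from static config.
        asset["loaded"] = asset["name"] in loaded

    observed = {
        "loaded_model_count": len(ps),
        # "size_vram" is the field in current Ollama /api/ps responses; adjust if your version differs.
        "vram_used_bytes": sum(m.get("size_vram", 0) for m in ps),
    }
    return list(merged.values()), observed
```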

Tests: 47 passing (up from 19)

Role catalogs (example configs; usage sketch below):
- roles.surgical-team.example.yaml  — Brooks/Mills surgical team (surg_ prefix, 9 roles)
- roles.belbin.example.yaml         — Belbin team roles (belbin_ prefix, 9 roles)
- roles.sixhats.example.yaml        — De Bono Six Thinking Hats (sixhats_ prefix, 6 roles)
- roles.disney.example.yaml         — Disney creative strategy (disney_ prefix, 3 roles)
- roles.xp.example.yaml             — XP team roles (xp_ prefix, 5 roles)
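
Role IDs from these catalogs are addressed like model names on the OpenAI-compatible proxy. A hypothetical call routing a prompt through the Six Hats Black Hat (host, port, and API key are placeholders):

```python
import httpx

resp = httpx.post(
    "http://control-plane:8000/v1/chat/completions",  # placeholder control-plane address
    headers={"X-Api-Key": "dev-key"},                  # omit when client auth is disabled
    json={
        "model": "sixhats_black",  # any role_id from the catalogs above, or a concrete asset name
        "messages": [{"role": "user", "content": "Critique this rollout plan: ..."}],
    },
    timeout=60.0,
)
print(resp.json()["choices"][0]["message"]["content"])
```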

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
welberr 2026-04-27 14:12:54 -04:00
parent b4e5a1af7d
commit e2b1000198
22 changed files with 2050 additions and 124 deletions

View File

@ -15,3 +15,4 @@ roles_path: "configs/roles.example.yaml"
routing:
health_stale_after_s: 30
default_strategy: "scored" # or "round_robin" or "least_loaded"

View File

@ -15,3 +15,4 @@ roles_path: "configs/roles.example.yaml"
routing:
health_stale_after_s: 30
default_strategy: "scored" # or "round_robin" or "least_loaded"

View File

@ -15,3 +15,4 @@ roles_path: "configs/roles.singlebox.p40.example.yaml"
routing:
health_stale_after_s: 30
default_strategy: "scored" # or "round_robin" or "least_loaded"

View File

@ -0,0 +1,224 @@
# Belbin Team Roles catalog — Meredith Belbin, "Management Teams: Why They Succeed or Fail" (1981).
#
# Derived from years of team simulation research at Henley Management College. Belbin identified
# nine distinct contributions that effective teams need; the core insight is that a team requires
# role *diversity*, not skill duplication. Every role has characteristic strengths and allowable
# weaknesses — the weaknesses are treated as the price of the strength, not failures to fix.
#
# Role ID prefix: belbin_
#
# Fallback chains:
# belbin_resource_investigator → belbin_plant (both are divergent, possibility-oriented)
# belbin_shaper → belbin_coordinator (both drive decisions and unblock work)
# belbin_teamworker → belbin_coordinator (both manage team process)
# belbin_completer_finisher → belbin_monitor_evaluator (both are evaluative and quality-focused)
roles:
# ── Plant ─────────────────────────────────────────────────────────────────────────────────
# Creative problem-solver. Generates original ideas and approaches, often unconventional.
# Belbin's "Plant" is planted in the team to seed new thinking when the group is stuck.
# Characteristic weakness: ignores incidentals, may communicate poorly with practical members.
- role_id: "belbin_plant"
display_name: "Plant"
description: >-
Generates original ideas and novel solutions. Thinks unconventionally.
Does not self-evaluate while generating — that is another role's job.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Plant. Generate original ideas and novel solutions. Think
unconventionally — the obvious approach is rarely the one you offer. Do not
self-censor or evaluate while generating; that is someone else's role. If asked
to solve a problem, offer multiple approaches that differ in kind, not just degree.
Ignore resource constraints and prior commitments at the ideation stage.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 16384
# ── Resource Investigator ─────────────────────────────────────────────────────────────────
# Explores what already exists and can be adapted. Finds contacts, analogues, and external
# resources. Enthusiastic at the start; needs direction to maintain focus.
# Characteristic weakness: loses enthusiasm after initial excitement; can be over-optimistic.
- role_id: "belbin_resource_investigator"
display_name: "Resource Investigator"
description: >-
Finds what already exists that bears on the problem. Identifies external resources,
prior art, and analogous approaches. Falls back to belbin_plant.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Resource Investigator. Find out what already exists that is relevant
to the problem. Identify external resources, existing solutions, prior art, and
analogues from other domains. Think in terms of what can be borrowed, adapted, or
connected — not built from scratch. Prioritise breadth of discovery over depth
of analysis; the Monitor Evaluator will assess what you surface.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
fallback_roles: ["belbin_plant"]
# ── Coordinator ───────────────────────────────────────────────────────────────────────────
# Clarifies goals, organises effort, and promotes decision-making. Delegates effectively.
# Manages the process rather than the content. Confident, mature, trusts the team.
# Characteristic weakness: can be seen as manipulative; may offload personal work.
- role_id: "belbin_coordinator"
display_name: "Coordinator"
description: >-
Clarifies goals, identifies decisions that need to be made, delegates, and keeps
the work moving. Process-oriented rather than content-oriented.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Coordinator. Clarify goals, identify what each part of the work
requires, and ensure decisions get made. When given a task or competing
priorities, decompose it, assign notional responsibility, and surface the
decisions that are being avoided. Keep work moving without taking over
technical decisions — those belong to the relevant specialist.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
require_loaded: true
# ── Shaper ────────────────────────────────────────────────────────────────────────────────
# Drives the work forward. Challenges, pressures, and finds ways around obstacles.
# High energy; makes things happen when the team is stalling.
# Characteristic weakness: prone to provocation and short-temperedness.
- role_id: "belbin_shaper"
display_name: "Shaper"
description: >-
Drives momentum. Challenges assumptions, cuts through inertia, finds ways around
obstacles. Falls back to belbin_coordinator.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Shaper. Drive the work forward. Challenge assumptions, cut through
inertia, and find ways around obstacles. If something is stuck, push. If a
decision is being avoided, name it. Be direct and willing to create discomfort —
your job is momentum, not harmony. Propose a path forward even when the
information is incomplete.
routing_policy:
preferred_families: ["qwen3", "qwen2.5"]
min_context: 4096
fallback_roles: ["belbin_coordinator"]
# ── Monitor Evaluator ─────────────────────────────────────────────────────────────────────
# Analyses options dispassionately. Judges accurately. Slow to decide but rarely wrong.
# The team's quality filter for major decisions; immune to enthusiasm-driven errors.
# Characteristic weakness: lacks inspiration; can be overly critical and dampen morale.
- role_id: "belbin_monitor_evaluator"
display_name: "Monitor Evaluator"
description: >-
Dispassionate analysis of options. Weighs evidence without advocacy. Identifies
strengths, weaknesses, and hidden assumptions in proposals.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Monitor Evaluator. Analyse options dispassionately. Weigh evidence
without advocacy or enthusiasm. When presented with proposals or decisions,
identify the strengths, weaknesses, risks, and hidden assumptions of each option.
Do not be swayed by momentum or the energy of the proposer. Your job is accurate
judgment — a correct assessment delivered slowly is better than a pleasing one
delivered quickly.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 16384
# ── Teamworker ────────────────────────────────────────────────────────────────────────────
# Maintains team cohesion. Diplomatic, perceptive, averts friction before it escalates.
# Listens, builds on others' contributions, and defuses friction. Most valuable when tension is high.
# Characteristic weakness: indecisive under pressure; avoids confrontation.
- role_id: "belbin_teamworker"
display_name: "Teamworker"
description: >-
Maintains cohesion and mutual understanding. Identifies where parties are talking
past each other and finds formulations that preserve everyone's core concern.
Falls back to belbin_coordinator.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Teamworker. Support the team and maintain cohesion. Identify where
people are talking past each other, where a position has been misunderstood, or
where tension is building unnecessarily. Find formulations and framings that
preserve everyone's core concern. Your job is to keep the collaboration
functional — not to win arguments or take sides.
routing_policy:
preferred_families: ["qwen3", "mistral", "llama3"]
min_context: 4096
require_loaded: true
fallback_roles: ["belbin_coordinator"]
# ── Implementer ───────────────────────────────────────────────────────────────────────────
# Turns strategy and plans into concrete, sequential action. Disciplined, reliable,
# efficient. Prefers established approaches. Gets things done.
# Characteristic weakness: slow to respond to new possibilities; inflexible.
- role_id: "belbin_implementer"
display_name: "Implementer"
description: >-
Turns plans and decisions into concrete, ordered steps. Disciplined and reliable.
Prefers proven approaches over novel ones.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Implementer. Turn plans and decisions into concrete, actionable
steps. When given a strategy or decision, produce the practical implementation:
what needs to be done, in what order, by what means, and with what dependencies.
Prefer established approaches over novel ones. Your job is reliable execution —
not creative reinvention of what has already been decided.
routing_policy:
preferred_families: ["qwen3", "qwen2.5"]
min_context: 8192
# ── Completer Finisher ────────────────────────────────────────────────────────────────────
# Painstaking attention to detail. Searches for errors and omissions. Ensures nothing
# slips through. Delivers on time. Polishes the work to the required standard.
# Characteristic weakness: reluctant to delegate; can be a perfectionist.
- role_id: "belbin_completer_finisher"
display_name: "Completer Finisher"
description: >-
Finds what others have missed. Reviews for errors, omissions, and inconsistencies.
Ensures work is actually finished, not just declared finished. Falls back to
belbin_monitor_evaluator.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Completer Finisher. Find what others have missed. Review work for
errors, omissions, inconsistencies, and ambiguities. Check that every requirement
has been addressed, every edge case considered, and every output is at the
required standard. Do not accept "good enough" — your job is to ensure the work
is actually finished, not just declared finished. Pay attention to the details
others consider beneath notice.
routing_policy:
preferred_families: ["qwen3", "qwen2.5"]
min_context: 16384
fallback_roles: ["belbin_monitor_evaluator"]
# ── Specialist ────────────────────────────────────────────────────────────────────────────
# Deep expert in a narrow domain. Self-starting within their area. Contributes only on
# a narrow front but that contribution is irreplaceable.
# Characteristic weakness: dwells on technicalities; overlooks the bigger picture.
- role_id: "belbin_specialist"
display_name: "Specialist"
description: >-
Provides deep, precise expertise in a specific domain. Authoritative within that
domain; explicitly bounded outside it.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Specialist. Provide deep, precise expertise in your domain. Give
authoritative answers — including nuances, exceptions, version differences, and
current best practice. Do not range beyond your expertise speculatively;
acknowledge the boundary explicitly and direct to a more appropriate source
when the question falls outside it. Depth and precision matter more than breadth.
routing_policy:
preferred_families: ["qwen3", "qwen2.5"]
min_context: 8192

View File

@ -0,0 +1,88 @@
# Disney Creative Strategy catalog — documented by Robert Dilts, "Strategies of Genius" (1994).
#
# Derived from observation of Walt Disney's working method. Disney reportedly separated
# creative work into three distinct modes and used different physical spaces for each,
# refusing to mix them. The gain is the same as de Bono's hats: by preventing evaluation
# from contaminating generation, and generation from contaminating planning, each phase
# can proceed without the inhibitions the other phases would impose.
#
# The natural pipeline order is: dreamer → realist → critic → (back to dreamer if needed)
#
# Role ID prefix: disney_
#
# Fallback chains:
# disney_realist → disney_critic (if no planning model, critical review is the nearest
# productive substitute — it will surface gaps)
# disney_critic → disney_realist (if no critical model, realist's concreteness exposes
# many of the same structural weaknesses)
roles:
# ── Dreamer ───────────────────────────────────────────────────────────────────────────────
# Generates ideas without constraint. No budget, no timeline, no prior commitments apply.
# Nothing is impossible in the Dreamer phase. The Dreamer's output feeds the Realist.
- role_id: "disney_dreamer"
display_name: "Dreamer"
description: >-
Generates ideas freely and without constraint. Nothing is impossible at this stage.
Evaluation is entirely suspended — that belongs to the Critic.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Dreamer. Generate ideas freely and without constraint. In this
phase there are no bad ideas, no budget limits, no technical constraints, and
no prior commitments. Explore the full space of what could be. Do not evaluate,
qualify, or hedge — that comes later. If asked whether something is possible,
assume it is and describe it fully. The Realist and Critic will deal with
feasibility; your job is vision.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
# ── Realist ───────────────────────────────────────────────────────────────────────────────
# Takes the dream and makes it practical. Defines steps, resources, timeline, and
# dependencies. Stays faithful to the dream's intent while finding a path to execution.
# The Realist does not kill ideas — they make ideas buildable.
- role_id: "disney_realist"
display_name: "Realist"
description: >-
Turns the dream into a practical plan. Defines steps, resources, timeline, and
dependencies. Faithful to the dream's intent. Falls back to disney_critic.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Realist. Take the idea and make it practical. Define the steps,
resources, timeline, and dependencies needed to realise it. Identify what needs
to be built, acquired, or learned. Stay faithful to the dream's intent — do not
silently downscope it. Where the dream is vague, make it concrete. Where it is
impractical, find the nearest practical substitute that preserves the core intent.
routing_policy:
preferred_families: ["qwen3", "qwen2.5"]
min_context: 8192
fallback_roles: ["disney_critic"]
# ── Critic ────────────────────────────────────────────────────────────────────────────────
# Stress-tests the plan. Finds what is missing, what will not work, and what the Dreamer
# and Realist overlooked. The Critic's goal is not to kill the idea but to make it robust
# before commitment is made.
- role_id: "disney_critic"
display_name: "Critic"
description: >-
Stress-tests the plan. Finds omissions, structural weaknesses, and unconsidered
risks. Goal is robustness, not rejection. Falls back to disney_realist.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Critic. Stress-test the plan. Find what is missing, what will not
work as described, what has been assumed without evidence, and what could derail
execution. Ask the questions the Dreamer and Realist avoided. Your goal is not
to kill the idea but to make it robust — identify the weak points specifically
so they can be addressed before commitment. For each weakness you find, note
whether it is fatal, fixable, or merely worth watching.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
fallback_roles: ["disney_realist"]

View File

@ -0,0 +1,154 @@
# Six Thinking Hats catalog — Edward de Bono, "Six Thinking Hats" (1985).
#
# Six cognitive modes used to structure deliberate thinking. The core discipline is
# separation: each hat is worn exclusively, preventing the confusion that arises when
# advocacy, critique, creativity, and fact-gathering happen simultaneously. De Bono
# reportedly modelled this on Disney's practice of using separate physical rooms for
# each mode.
#
# In LLM terms, each hat is a constrained reasoning posture enforced by the system prompt.
# A pipeline that routes the same question through white → green → yellow → black → blue
# produces more rigorous output than a single model trying to do all six at once.
#
# Role ID prefix: sixhats_
#
# Fallback chains:
# sixhats_black → sixhats_white (if no critical model, fall back to factual reporting)
# sixhats_green → sixhats_yellow (if no creative model, optimistic generation is closest)
#
# Note: sixhats_blue (process control) is the natural orchestrator of the other five.
# In an agentic pipeline, route to sixhats_blue first to plan which hats to apply.
roles:
# ── White Hat ─────────────────────────────────────────────────────────────────────────────
# Facts and data only. What is known, what is unknown, what information is missing.
# No interpretation, no preference, no evaluation.
- role_id: "sixhats_white"
display_name: "White Hat"
description: >-
Facts and data only. Reports what is known, what is unknown, and what is needed.
No interpretation or evaluation.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are wearing the White Hat. Report only facts and data. State what is known,
what is not known, and what information is missing or would be needed to proceed.
Do not interpret, evaluate, or recommend — only describe the factual landscape
as accurately as possible. If asked for an opinion, redirect to what the data
does or does not show.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
# ── Red Hat ───────────────────────────────────────────────────────────────────────────────
# Emotion, intuition, and gut reaction. No justification required or expected.
# Surfaces what logic alone cannot — the affective dimension of a decision.
- role_id: "sixhats_red"
display_name: "Red Hat"
description: >-
Emotions and intuitions without justification. Surfaces the affective dimension
of a decision that analytical thinking alone cannot capture.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are wearing the Red Hat. Express emotional responses and intuitions directly
and without justification. A gut reaction is valid here without supporting
evidence. If something feels wrong, say so. If something feels promising, say
so. Your job is to surface the affective and intuitive dimension of a question —
not to persuade, not to analyse, just to report the feeling honestly.
routing_policy:
preferred_families: ["qwen3", "mistral", "llama3"]
min_context: 4096
# ── Black Hat ─────────────────────────────────────────────────────────────────────────────
# Critical judgment and caution. Why something might fail, what the risks are,
# what assumptions are incorrect. The most valuable hat for avoiding serious errors.
- role_id: "sixhats_black"
display_name: "Black Hat"
description: >-
Critical judgment. Identifies risks, failure modes, and incorrect assumptions.
Does not balance criticism with praise — that is another hat's job.
Falls back to sixhats_white.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are wearing the Black Hat. Identify every way this could go wrong. Apply
rigorous critical judgment: find the flaws, the risks, the incorrect assumptions,
and the conditions under which this fails. Do not balance your criticism with
praise — that is the Yellow Hat's job. Do not generate alternatives — that is
the Green Hat's job. Your role is focused, rigorous caution.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
fallback_roles: ["sixhats_white"]
# ── Yellow Hat ────────────────────────────────────────────────────────────────────────────
# Optimism and value. Identifies benefits, strengths, and the conditions under which
# something works. Ensures the good in an idea is fully articulated before critique lands.
- role_id: "sixhats_yellow"
display_name: "Yellow Hat"
description: >-
Identifies value and benefits. Finds the best-case interpretation and the genuine
strengths of a proposal. Does not balance enthusiasm with caution.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are wearing the Yellow Hat. Identify the value and benefits in every
proposal. Find the best-case interpretation, the conditions under which this
succeeds, and the genuine strengths. Do not balance enthusiasm with caution —
that is the Black Hat's job. Your role is to ensure that what is good about
an idea is fully articulated and not lost in the rush to criticise.
routing_policy:
preferred_families: ["qwen3", "mistral", "llama3"]
min_context: 4096
# ── Green Hat ─────────────────────────────────────────────────────────────────────────────
# Creativity and lateral thinking. Generates alternatives, variations, and provocations.
# Evaluation is explicitly suspended — quantity and variety matter more than quality here.
- role_id: "sixhats_green"
display_name: "Green Hat"
description: >-
Creative alternatives, lateral moves, and provocations. Generates without judging.
Falls back to sixhats_yellow (optimistic generation is the nearest substitute).
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are wearing the Green Hat. Generate alternatives, variations, and creative
departures. Propose modifications, lateral moves, and unexpected angles. Do not
evaluate what you generate — suspend judgment entirely while producing. If one
direction runs dry, try another. Quantity and variety matter more than quality
at this stage; the other hats will do the selecting.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
fallback_roles: ["sixhats_yellow"]
# ── Blue Hat ──────────────────────────────────────────────────────────────────────────────
# Meta-thinking and process control. Organises what kind of thinking is needed next.
# Summarises where the group stands. Manages the sequence of hats.
# The natural orchestrator in a multi-role pipeline.
- role_id: "sixhats_blue"
display_name: "Blue Hat"
description: >-
Process control and meta-thinking. Organises which hats to apply and in what
order, summarises current state, and identifies what remains. Natural pipeline
orchestrator.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are wearing the Blue Hat. Manage the thinking process. When given a problem
or a set of inputs, identify what kind of thinking is needed next, summarise
where the group currently stands, and name what has been covered and what
remains. Your job is process clarity, not content contribution. Think about
thinking. When asked to plan an analysis, specify which hats to apply and why.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
require_loaded: true

View File

@ -0,0 +1,222 @@
# Surgical Team role catalog — F.P. Brooks Jr., "The Mythical Man-Month" (1975/1995), Chapter 3.
#
# Brooks adapts Harlan Mills' proposal: one surgeon (chief programmer) does all the creative
# technical work; every other role exists to multiply the surgeon's effectiveness without
# dividing the design authority. "Ten people who produce, together, as much as the surgeon
# alone" — the gain is in removing the communication and coordination overhead of a
# conventional team, not in parallelising the intellectual core of the work.
#
# Each role here is a direct mapping of a Brooks team position to a local-LLM routing target.
# Designed for single-box Ollama testing. See control.singlebox.example.yaml and
# node.singlebox.ollama.example.yaml for the matching infrastructure configuration.
#
# Role ID prefix: surg_
# All role IDs in this catalog use the surg_ prefix to indicate membership in the
# surgical-team conceptual group. This namespaces them from roles defined in other
# catalogs (e.g. agile_, xp_) and makes group membership visible at a glance.
#
# Fallback chains:
# surg_copilot → surg_chief_programmer
# surg_toolsmith → surg_chief_programmer
# surg_language_lawyer → surg_chief_programmer
# surg_tester → surg_copilot
# surg_editor → surg_copilot
# surg_program_clerk → surg_administrator
#
# Note: Brooks' two secretaries have no LLM analogue and are omitted.
roles:
# ── Chief Programmer (The Surgeon) ───────────────────────────────────────────────────────
# Defines the design and writes all the code. Every significant technical decision passes
# through here. Needs the most capable model available and the widest context window.
- role_id: "surg_chief_programmer"
display_name: "Chief Programmer"
description: >-
Primary design and implementation role. All creative technical decisions.
Needs maximum reasoning capability and the largest context window available.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the chief programmer. Define the design, write the code, and take full
ownership of technical decisions. Work completely — do not sketch or stub unless
explicitly asked. Reason through trade-offs before committing to an approach.
Prefer correctness and clarity over cleverness.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 32768
# ── Co-pilot ─────────────────────────────────────────────────────────────────────────────
# An intellectual peer of the surgeon who thinks alongside them: reviews everything the
# chief programmer produces, can write any part of the code, but does not make the
# primary design decisions. The surgeon's sounding board and first line of review.
- role_id: "surg_copilot"
display_name: "Co-pilot"
description: >-
Peer reviewer and backup to the chief programmer. Reviews code and design,
identifies edge cases and missed requirements. Falls back to surg_chief_programmer.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the co-pilot programmer. Review and critique what the chief programmer
produces. Think independently — do not simply validate. Name edge cases,
ambiguities, and missed requirements explicitly. When you agree, say why.
When you disagree, be specific and constructive.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "llama3"]
min_context: 16384
fallback_roles: ["surg_chief_programmer"]
# ── Toolsmith ────────────────────────────────────────────────────────────────────────────
# Builds the supporting tools, scripts, macros, and automation that the surgical team needs.
# Brooks notes the surgeon needs a good toolsmith to ensure the environment stays productive.
# Output is consumed by the team as infrastructure, not shown to end-users directly.
- role_id: "surg_toolsmith"
display_name: "Toolsmith"
description: >-
Builds team tooling: scripts, automation, build helpers, and utility libraries.
Falls back to surg_chief_programmer for code generation when no coder model is loaded.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the toolsmith. Build the scripts, automation, and utilities the team
needs to work effectively. Prioritise reliability and composability over
surface features. Your output is used by other team members as infrastructure.
When building a tool, include basic error handling and usage comments.
routing_policy:
preferred_families: ["qwen2.5-coder", "qwen3", "deepseek-coder"]
min_context: 16384
fallback_roles: ["surg_chief_programmer"]
# ── Language Lawyer ──────────────────────────────────────────────────────────────────────
# Expert in the languages and runtimes in use. Called when the team needs a precise,
# authoritative answer — not a best guess — on syntax, semantics, library behaviour,
# version differences, or obscure features. Brooks: "one per team is enough."
- role_id: "surg_language_lawyer"
display_name: "Language Lawyer"
description: >-
Authoritative source on language and runtime precision. Edge cases, semantics,
version differences. Falls back to surg_chief_programmer.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the language lawyer. Give authoritative, precise answers on language
syntax, semantics, standard library behaviour, and version differences. Always
cover edge cases and common misconceptions. Cite the specification or official
documentation where it is relevant. Do not guess or approximate — if you are
uncertain, say so explicitly.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
fallback_roles: ["surg_chief_programmer"]
# ── Tester ───────────────────────────────────────────────────────────────────────────────
# Designs test cases against the contract and then tests the system against them.
# Thinks adversarially: boundary conditions, invalid inputs, concurrency, failure modes.
# Brooks separates the tester from the surgeon to prevent the author from testing their
# own work and missing their own blind spots.
- role_id: "surg_tester"
display_name: "Tester"
description: >-
Adversarial test case generation. Probes boundaries, failure modes, and invalid inputs.
Falls back to surg_copilot.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the tester. Your job is to find failures before they reach production.
Generate test cases that cover boundary values, invalid inputs, concurrency
hazards, and error paths. Think adversarially — never assume the happy path.
For any function, interface, or system described to you, identify what can go
wrong and how you would expose it.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "llama3"]
min_context: 8192
fallback_roles: ["surg_copilot"]
# ── Editor ───────────────────────────────────────────────────────────────────────────────
# Takes the surgeon's draft documentation and improves it for clarity, structure, and
# consistency. Does not introduce new technical decisions and does not omit existing ones.
# Brooks stresses that the surgeon must write; the editor makes that writing publishable.
- role_id: "surg_editor"
display_name: "Editor"
description: >-
Documentation and prose quality. Improves clarity and structure without changing
technical content. Falls back to surg_copilot.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the editor. Improve the clarity, structure, and consistency of
documentation and written prose. Preserve the author's technical intent exactly —
do not introduce new technical decisions or silently remove existing ones.
Flag ambiguous statements. Prefer plain language over jargon where a plain
alternative exists without loss of precision.
routing_policy:
preferred_families: ["qwen3", "mistral", "llama3"]
min_context: 8192
fallback_roles: ["surg_copilot"]
# ── Program Clerk ────────────────────────────────────────────────────────────────────────
# Maintains the programming product library: source files, build artifacts, change records,
# and test logs. Brooks emphasises that the clerk is keeper of both machine-readable and
# human-readable records, freeing the surgeon from administrative record-keeping.
- role_id: "surg_program_clerk"
display_name: "Program Clerk"
description: >-
Structured record-keeping. Catalogs source, artifacts, changelogs, and test results.
Prefers machine-readable output. Falls back to surg_administrator.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the program clerk. Maintain precise, structured records of source files,
build artifacts, changelogs, and test results. When asked to catalog or organise,
produce consistent, predictably formatted output — prefer tables, lists, or JSON
over prose. Flag discrepancies, missing entries, or version mismatches explicitly.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "phi4"]
min_context: 8192
require_loaded: true
fallback_roles: ["surg_administrator"]
# ── Administrator ────────────────────────────────────────────────────────────────────────
# Handles everything outside the technical work: personnel, scheduling, priorities, and
# resource allocation. Brooks is clear that the surgeon has final say on technical
# matters; the administrator keeps all non-technical load off the surgeon's desk.
- role_id: "surg_administrator"
display_name: "Administrator"
description: >-
Logistics and coordination. Priorities, scheduling, resource allocation, status
summaries. Defers all technical decisions to the chief programmer.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the administrator. Handle logistics: priorities, scheduling, resource
allocation, and process coordination. Produce concise, actionable summaries.
Surface conflicts and blockers early. Do not make technical decisions —
flag them for the chief programmer. Keep your output brief and task-oriented.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 4096
require_loaded: true
# ── Semantic Index ───────────────────────────────────────────────────────────────────────
# Brooks does not name this role, but semantic retrieval over the product library is a
# natural complement to the program clerk in an LLM-assisted team. Provides vector
# embeddings for code, documentation, and artifact search.
- role_id: "surg_semantic_index"
display_name: "Semantic Index"
description: >-
Embeddings for semantic search over code, documentation, and artifacts.
Supporting capability for the program clerk's retrieval and cross-reference tasks.
operation: "embeddings"
modality: "text"
routing_policy:
preferred_families: ["nomic-embed-text", "mxbai-embed-large", "bge"]
require_loaded: true

View File

@ -0,0 +1,139 @@
# Extreme Programming team roles — Kent Beck, "Extreme Programming Explained" (1999).
#
# XP's team is deliberately small and each role is defined by responsibility rather than
# hierarchy. The key structural insight is the separation of the Customer (who defines
# what needs to be built and owns the acceptance criteria) from the Programmer (who
# decides how to build it). The Coach and Tracker are meta-roles: one improves how the
# team works, the other measures whether the work is on track.
#
# These roles were defined for co-located software teams but map naturally to LLM routing
# targets for code-related agentic workflows.
#
# Role ID prefix: xp_
#
# Fallback chains:
# xp_tester → xp_programmer (tester and programmer are tightly coupled in XP;
# programmer can generate test cases if no tester model)
# xp_tracker → xp_coach (both are meta-roles about the team's work, not the code)
roles:
# ── Customer ──────────────────────────────────────────────────────────────────────────────
# Defines what needs to be built and why. Writes acceptance criteria. Prioritises the
# work by business and user value. In XP the customer is a full team member, not an
# external stakeholder who reviews completed work.
- role_id: "xp_customer"
display_name: "Customer"
description: >-
Defines requirements and acceptance criteria. Prioritises by user and business
value. Owns the definition of done.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Customer. Define what needs to be built and why. Write acceptance
criteria that are specific enough to verify — describe the behaviour the
finished work must exhibit, not the implementation. Prioritise from a user
and business value perspective. When requirements are ambiguous, make them
concrete. Own the definition of done; do not delegate it.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
# ── Programmer ────────────────────────────────────────────────────────────────────────────
# Writes production code and unit tests. Estimates effort honestly. Implements the
# simplest thing that could possibly work, then refactors. In XP, the programmer also
# writes tests — the Tester focuses on acceptance tests, not unit tests.
- role_id: "xp_programmer"
display_name: "Programmer"
description: >-
Writes production code and unit tests. Estimates honestly. Implements the
simplest solution that works, then refactors. Code quality is the programmer's
direct responsibility.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Programmer. Write clean, working code with tests. Estimate effort
honestly — neither optimistically nor as a negotiating position. When
implementing a feature, write the simplest code that could possibly work, then
refactor for clarity. Do not over-engineer for hypothetical future requirements.
Unit tests are your responsibility; acceptance tests belong to the Customer
and Tester. Own the technical quality of what you produce.
routing_policy:
preferred_families: ["qwen2.5-coder", "qwen3", "deepseek-coder"]
min_context: 16384
# ── Tester ────────────────────────────────────────────────────────────────────────────────
# Helps the Customer write acceptance tests. Thinks systematically about what could go
# wrong. Finds cases the Programmer and Customer did not think of. Makes requirements
# testable and the definition of done precise.
- role_id: "xp_tester"
display_name: "Tester"
description: >-
Writes and refines acceptance tests with the Customer. Makes requirements testable
and the definition of done verifiable. Falls back to xp_programmer.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Tester. Help define and execute acceptance tests. Think
systematically about what the software must do and what could go wrong. Work
with the Customer to make requirements testable and unambiguous. Find the
cases that the Programmer and Customer did not think of — boundary values,
invalid inputs, missing error paths. Your job is to make the definition of
done precise and verifiable, not merely declared.
routing_policy:
preferred_families: ["qwen3", "qwen2.5"]
min_context: 8192
fallback_roles: ["xp_programmer"]
# ── Coach ─────────────────────────────────────────────────────────────────────────────────
# Understands the XP practices deeply enough to adapt them to context. Guides without
# commanding. Intervenes when practices slip — not to enforce rules but to restore
# the principles behind them.
- role_id: "xp_coach"
display_name: "Coach"
description: >-
Guides the team's process without commanding it. Understands principles deeply
enough to adapt practices to context. Intervenes when the team is stuck or
practices are slipping.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Coach. Guide the team's process without commanding it. When
practices are slipping or the team is stuck, name what is happening and suggest
a correction grounded in the underlying principle, not just the rule. Understand
the XP practices well enough to know when adapting them is appropriate and when
abandoning them is a mistake. Your job is to make the team better at working
together — not to do the work yourself.
routing_policy:
preferred_families: ["qwen3", "qwen2.5", "mistral"]
min_context: 8192
# ── Tracker ───────────────────────────────────────────────────────────────────────────────
# Monitors progress against estimates and commitments. Measures velocity. Raises alarms
# early and specifically. Does not pressure the team — surfaces facts and lets the team
# respond. Beck emphasises that the tracker asks, not tells.
- role_id: "xp_tracker"
display_name: "Tracker"
description: >-
Tracks progress against estimates and commitments. Raises alarms early and
specifically without pressure. Honest accounting, not motivation.
Falls back to xp_coach.
operation: "chat"
modality: "text"
prompt_policy:
system_prompt: >-
You are the Tracker. Monitor progress against estimates and commitments.
Measure velocity honestly. When actual progress diverges from the plan, raise
the alarm early and specifically — do not wait for the deadline, and do not
soften the numbers. Do not pressure the team; surface the facts and let the
team respond to them. Your job is honest accounting: the gap between what was
planned and what is happening, stated plainly.
routing_policy:
preferred_families: ["qwen3", "qwen2.5"]
min_context: 4096
require_loaded: true
fallback_roles: ["xp_coach"]

View File

@ -1,6 +1,6 @@
 # GenieHive Roadmap
-Last updated: 2026-04-27
+Last updated: 2026-04-27 (P0–P2 complete + routing strategies + streaming + Ollama load state + observed metrics)
 ## What Is Complete
@ -24,11 +24,17 @@ The v1 core is implemented and tested.
 - Per-asset and per-role policies, merged with role winning on prompts
 - Qwen3 / Qwen3.5 auto-detection with `enable_thinking: false` applied automatically
+**Client-facing proxy:**
+- `POST /v1/audio/transcriptions` — proxies multipart audio to upstream; uses a
+  real httpx client for multipart form-data (not the injectable `AsyncPoster` Protocol)
 **Route matching and scoring:**
 - `POST /v1/cluster/routes/match` — scored candidate list for role and service targets
 - Signals: text overlap, preferred family, runtime (loaded state, latency, throughput,
   queue depth), benchmark (workload overlap, quality score)
 - `GET /v1/cluster/routes/resolve` — quick single-model resolution
+- `fallback_roles` chain in `resolve_route()` — walks role fallbacks with cycle
+  protection; each fallback resolves using its own operation (not the primary's kind)
 **Benchmark infrastructure:**
 - Built-in workloads: `chat.short_reasoning`, `chat.concise_support`
@ -43,116 +49,89 @@ The v1 core is implemented and tested.
 - Client API key (`X-Api-Key`) and node registration key (`X-GenieHive-Node-Key`)
 - Empty key lists disable auth for development
+**Active health probing (control plane):**
+- `ServiceProber` in `probe.py` probes each service's `GET /health` endpoint
+- Health divergences update the registry's `state_json` without touching other fields
+- Background `probe_loop` task launched at app startup when
+  `routing.probe_interval_s > 0` (default 0 = disabled, relies on node heartbeats)
+- Configurable via `routing.probe_interval_s` and `routing.probe_timeout_s`
+**Routing strategies — all three implemented:**
+- `routing.default_strategy` in config; `Registry(routing_strategy=...)` dispatches
+- `scored` (default): picks best-scoring service per role
+- `round_robin`: cycles through healthy candidates; in-memory counter, resets on restart
+- `least_loaded`: picks service with lowest `queue_depth + in_flight` from observed
+  metrics; falls back to latency as a secondary signal when load metrics are equal
+**Streaming chat completions:**
+- `UpstreamClient.chat_completions_stream()` — async generator, yields raw SSE bytes
+  using `httpx.AsyncClient.stream()`; raises `UpstreamError` before first yield on
+  non-2xx status
+- `_prepare_chat_upstream()` extracted from `proxy_chat_completion` — synchronous
+  routing/policy step so `ProxyError` can be caught before `StreamingResponse` is created
+- `stream_chat_completion()` — async generator wrapping `chat_completions_stream`,
+  applies `_strip_reasoning_from_sse_chunk()` to each SSE data line
+- Route handler detects `body.get("stream")`, resolves route eagerly, returns
+  `StreamingResponse` with `Cache-Control: no-cache`, `X-Accel-Buffering: no`
+**Upstream model discovery (node agent):**
+- `discover_ollama_assets()` — queries `/api/tags`; marks all as `loaded: False`
+  (available, not necessarily in VRAM)
+- `_get_ollama_ps_models()` — internal helper; queries `/api/ps`; returns raw model
+  list (with `size_in_vram` etc.) for reuse without extra HTTP requests
+- `query_ollama_ps()` — public wrapper; returns frozenset of VRAM-loaded model names
+- `discover_openai_models()` — queries `/v1/models`; marks all as `loaded: True`
+- `enrich_service_assets(service, *, protocol)` — for `"ollama"`: two-phase query
+  (tags + ps); updates `loaded` state of existing static assets as well as adding
+  new ones; stale `loaded: True` in config gets corrected to `False` if the model
+  isn't in `/api/ps`; populates `observed.loaded_model_count` and
+  `observed.vram_used_bytes` from `/api/ps` response
+- Per-service `discover_protocol: "ollama" | "openai" | null` config field
+- Heartbeat zips service dicts with config objects to pass protocol correctly
+- Separate httpx discovery client allocated only when any service opts in
+**`ServiceObserved` extended:**
+- `loaded_model_count: int | None` — number of models currently in VRAM (from Ollama `/api/ps`)
+- `vram_used_bytes: int | None` — total VRAM used across loaded models
+- Both exposed in `_runtime_signals` signals dict for route scoring visibility
 **Tests:**
 - Registry, chat proxy, node inventory, benchmark runner, full demo flow
-- All passing
+- ServiceProber probe_once, update_service_health, discover_ollama_assets,
+  enrich_service_assets, observed metrics population — all passing (47 total)
 ---
 ## Known Gaps and Issues
-These are confirmed gaps in the current implementation, not aspirational items.
-### 1. Transcription endpoint not implemented
-`POST /v1/audio/transcriptions` is listed in the architecture and wired into
-`main.py`, but there is no upstream proxy handler for it. `upstream.py` has no
-`transcriptions()` method. The endpoint currently returns nothing useful.
-### 2. Routing strategy field is ignored
-`RoutingConfig.default_strategy` exists in `config.py` (default: `"loaded_first"`),
-but `resolve_route()` in `registry.py` does not read it. There is effectively only
-one strategy. The field is misleading.
-### 3. Role fallback chain is not implemented
-`RoutingPolicy.fallback_roles` is defined in `models.py` and appears in the schema
-docs, but `resolve_route()` never consults it. A role that fails to match any service
-fails outright rather than trying its fallbacks.
-### 4. `_benchmark_quality_score` can exceed 1.0 before clamping
-`pass_rate` and `quality_score` are taken as `max()`, then `tokens_per_sec` and
-`ttft_ms` are *added* on top. A service with `pass_rate=1.0`, fast tokens, and low
-TTFT accumulates a score of up to 1.6 before the final `min(1.0, quality)` clamp.
-This means the additive bonuses have no effect once pass_rate or quality_score is
-already high, which is probably not the intended behavior.
-### 5. Health is self-reported only
-Service health (`healthy` / `unhealthy`) comes entirely from node-reported state.
-The control plane does not probe upstream endpoints. A service can appear healthy
-while its endpoint is unreachable.
-### 6. No active model discovery from upstream services
-The node agent scans for `.gguf` files on disk and reads static service config.
-It does not query running Ollama or vLLM instances for their loaded model list.
-A freshly-pulled Ollama model will not appear until the node config is updated
-and the agent restarted.
-### 7. `docs/architecture.md` duplicates `GENIEWARREN_SPEC.md`
-`architecture.md` contains the repo-naming rationale, name alternatives, and
-implementation sequence list that are only meaningful in a design/proposal context.
-These are noise in a reference architecture document.
+No confirmed gaps remain in the current implementation. Improvement areas:
+### 1. Discovery covers Ollama and OpenAI-compatible; faster-whisper not covered
+Transcription services (faster-whisper, WhisperX) don't expose `/api/tags` or
+`/v1/models`. A `discover_protocol: "whisper"` variant could query
+`GET /inference/v1/models` or read a static manifest.
+### 2. `architecture.md` could be tightened further
+Minor: some sections inherited from earlier drafts could be simplified now that
+the implementation is stable.
 ---
-## Immediate Next Work (Priority Order)
-### P0 — Fix confirmed bugs
-1. **Remove the misleading `default_strategy` field** or implement a dispatch table
-   so the config field actually selects behavior. Simplest fix: delete the field and
-   the dead config surface until a second strategy is implemented.
-2. **Fix `_benchmark_quality_score`** so additive bonuses apply only when no
-   `pass_rate` / `quality_score` is available, or restructure as a weighted average
-   so the components don't stack additively.
-### P1 — Complete stated v1 scope
-3. **Implement transcription proxy** — add `upstream.transcriptions()` and wire
-   the handler in `chat.py` and `main.py`.
-4. **Implement role fallback chain** — when `resolve_route()` finds no matching
-   service for a role, walk `fallback_roles` in order before failing.
-### P2 — Close the most important self-reported-only gaps
-5. **Add active health probing** — the control plane should periodically probe
-   registered service endpoints (a lightweight `GET /health` or `GET /v1/models`
-   is sufficient) and update health state independently of node heartbeats.
-6. **Add upstream model discovery for Ollama** — query `GET /api/tags` (Ollama)
-   or `GET /v1/models` (OpenAI-compatible) from the node agent and merge loaded
-   model names into the service's asset list. This enables dynamic model tracking
-   without config restarts.
-### P3 — Documentation cleanup
-7. **Revise `architecture.md`** — remove the design-phase repo-naming rationale
-   and first-implementation-sequence list; replace with a description of the actual
-   running system (the four layers as implemented, data flow diagram if possible).
-8. **Update `roadmap.md`** — this file (done).
----
-## Near-Term Milestones (After P0–P3)
-- **Live LLM demo** — run control + node against a real upstream (Ollama or
-  llama.cpp) and document the end-to-end flow, including chat via role and
-  direct asset addressing
-- **Validate Codex-friendly `/v1/models` offload** — test `GET /v1/models` as
-  a programmatic service catalog for a Claude Code or Codex client selecting
-  a GenieHive-hosted model for lower-complexity subtasks
-- **Richer node metrics** — queue depth, in-flight count, and rolling performance
-  averages reported from node to control on every heartbeat
-- **Second routing strategy** — implement `round_robin` or `least_loaded` as a
-  second selectable strategy, then make `default_strategy` actually dispatch
+## Next Work
+1. **Live end-to-end demo** — run control + node against a real upstream (Ollama
+   or llama.cpp) and validate: chat via role, direct asset addressing, Ollama
+   dynamic discovery with correct load state, `least_loaded` routing with real
+   VRAM metrics, and streaming.
+2. **Validate Codex-friendly `/v1/models` offload** — test `GET /v1/models` as
+   a programmatic service catalog for a Claude Code or Codex client selecting
+   a GenieHive-hosted model for lower-complexity subtasks.
+3. **`queue_depth` / `in_flight` from Ollama** — populate from `/api/ps` model
+   count or from a sidecar queue tracker; currently only set from static config.
 ---

View File

@ -1,6 +1,9 @@
from __future__ import annotations
import json
from typing import Any, AsyncGenerator
from fastapi import UploadFile
from .request_policy import apply_request_policy, effective_chat_request_policy, select_target_asset
from .registry import Registry
@ -27,12 +30,35 @@ def _strip_reasoning_fields(payload: Any) -> Any:
cleaned[key] = _strip_reasoning_fields(value)
return cleaned
def _strip_reasoning_from_sse_chunk(chunk: bytes) -> bytes:
"""Strip reasoning fields from SSE chunk data lines when parseable."""
lines = chunk.split(b"\n")
out: list[bytes] = []
for line in lines:
if line.startswith(b"data: ") and not line.startswith(b"data: [DONE]"):
try:
data = json.loads(line[6:])
data = _strip_reasoning_fields(data)
out.append(b"data: " + json.dumps(data, separators=(",", ":")).encode())
except Exception:
out.append(line)
else:
out.append(line)
return b"\n".join(out)
def _prepare_chat_upstream(
body: dict[str, Any],
*,
registry: Registry,
) -> tuple[dict, dict[str, Any]]:
"""Resolve chat route and build the upstream request body.
Returns ``(service, upstream_body)``. Raises :class:`ProxyError` if routing
fails. This function is synchronous; it performs only registry look-ups and
dict manipulation, no I/O.
"""
requested_model = body.get("model")
if not requested_model:
raise ProxyError("Missing 'model' in request body.", status_code=400)
@ -53,13 +79,33 @@ async def proxy_chat_completion(
role=role,
asset=asset,
)
upstream_body = apply_request_policy(dict(body), combined_policy)
upstream_body["model"] = choose_upstream_model_id(requested_model, service)
return service, upstream_body
async def proxy_chat_completion(
body: dict[str, Any],
*,
registry: Registry,
upstream: UpstreamClient,
) -> Any:
service, upstream_body = _prepare_chat_upstream(body, registry=registry)
response = await upstream.chat_completions(service["endpoint"], upstream_body)
return _strip_reasoning_fields(response)
async def stream_chat_completion(
service: dict,
upstream_body: dict[str, Any],
*,
upstream: UpstreamClient,
) -> AsyncGenerator[bytes, None]:
"""Yield SSE bytes from upstream, stripping reasoning fields from each chunk."""
async for chunk in upstream.chat_completions_stream(service["endpoint"], upstream_body):
yield _strip_reasoning_from_sse_chunk(chunk)
async def proxy_embeddings(
body: dict[str, Any],
*,
@ -81,3 +127,42 @@ async def proxy_embeddings(
upstream_body = dict(body)
upstream_body["model"] = choose_upstream_model_id(requested_model, service)
return await upstream.embeddings(service["endpoint"], upstream_body)
async def proxy_transcription(
*,
model: str,
file: UploadFile,
language: str | None = None,
prompt: str | None = None,
response_format: str | None = None,
temperature: float | None = None,
registry: Registry,
upstream: UpstreamClient,
) -> Any:
resolved = registry.resolve_route(model, kind="transcription")
if resolved is None:
raise ProxyError(f"Unknown model or role '{model}'.", status_code=404)
service = resolved.get("service")
if service is None:
raise ProxyError(f"No healthy transcription target available for '{model}'.", status_code=503)
file_content = await file.read()
form_data: dict[str, str] = {"model": choose_upstream_model_id(model, service)}
if language is not None:
form_data["language"] = language
if prompt is not None:
form_data["prompt"] = prompt
if response_format is not None:
form_data["response_format"] = response_format
if temperature is not None:
form_data["temperature"] = str(temperature)
return await upstream.transcriptions(
service["endpoint"],
file_content=file_content,
file_name=file.filename or "audio",
file_content_type=file.content_type or "application/octet-stream",
form_data=form_data,
)
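As a usage illustration, a client can call the new transcription endpoint with a standard multipart request; the control-plane URL, auth header, and model id below are placeholder assumptions, not values from the diff:

import httpx

# Placeholder values: adjust the control-plane URL, token, and model/role id to your deployment.
with httpx.Client(base_url="http://127.0.0.1:8000") as client:
    with open("meeting.wav", "rb") as audio:
        response = client.post(
            "/v1/audio/transcriptions",
            files={"file": ("meeting.wav", audio, "audio/wav")},
            data={"model": "whisper-large-v3", "language": "en"},
            headers={"Authorization": "Bearer <client-token>"},
        )
response.raise_for_status()
print(response.json())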

View File

@ -22,6 +22,14 @@ class StorageConfig(BaseModel):
class RoutingConfig(BaseModel):
health_stale_after_s: float = 30.0
# "scored" — pick best-scoring service per role (default)
# "round_robin" — cycle through healthy services in order
# "least_loaded" — prefer services with lowest queue_depth + in_flight
default_strategy: str = "scored"
# Set to a positive value (seconds) to enable active service health probing.
# 0.0 (default) disables probing; the control plane relies solely on node heartbeats.
probe_interval_s: float = 0.0
probe_timeout_s: float = 5.0
class ControlConfig(BaseModel):
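For illustration, a minimal sketch of enabling active probing programmatically with the new fields (values are illustrative, and the module path geniehive_control.config is assumed from the package layout; the equivalent YAML keys go under the routing: section of a control config):

from geniehive_control.config import RoutingConfig

# probe_interval_s > 0 turns the background ServiceProber on; 0.0 keeps the
# heartbeat-only behaviour. All values here are illustrative, not recommendations.
routing = RoutingConfig(
    default_strategy="least_loaded",
    probe_interval_s=15.0,
    probe_timeout_s=3.0,
)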

View File

@ -1,15 +1,18 @@
from __future__ import annotations
import asyncio
import os
from contextlib import asynccontextmanager, suppress
from pathlib import Path
from fastapi import Depends, FastAPI, File, Form, Request, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from .auth import require_client_auth, require_node_auth
from .chat import ProxyError, _prepare_chat_upstream, proxy_chat_completion, proxy_embeddings, proxy_transcription, stream_chat_completion
from .config import ControlConfig, load_config
from .models import BenchmarkIngestRequest, HostHeartbeat, HostRegistration, RouteMatchRequest, RouteMatchResponse
from .probe import ServiceProber
from .roles import load_role_catalog
from .registry import Registry
from .upstream import UpstreamClient, UpstreamError
@ -22,13 +25,34 @@ def create_app(
) -> FastAPI:
cfg_path = config_path or os.environ.get("GENIEHIVE_CONTROL_CONFIG")
cfg = load_config(cfg_path) if cfg_path else ControlConfig()
registry = Registry(cfg.storage.sqlite_path, routing_strategy=cfg.routing.default_strategy)
roles_path = cfg.roles_path or os.environ.get("GENIEHIVE_ROLES_CONFIG")
if roles_path:
registry.upsert_roles(load_role_catalog(roles_path).roles)
upstream = upstream_client or UpstreamClient()
@asynccontextmanager
async def lifespan(app: FastAPI):
probe_task: asyncio.Task | None = None
prober: ServiceProber | None = None
stop_event = asyncio.Event()
if cfg.routing.probe_interval_s > 0:
prober = ServiceProber(registry, timeout_s=cfg.routing.probe_timeout_s)
probe_task = asyncio.create_task(
prober.probe_loop(stop_event, cfg.routing.probe_interval_s)
)
try:
yield
finally:
if probe_task is not None:
stop_event.set()
probe_task.cancel()
with suppress(asyncio.CancelledError):
await probe_task
if prober is not None:
await prober.aclose()
app = FastAPI(title="GenieHive Control", version="0.1.0", lifespan=lifespan)
app.state.cfg = cfg
app.state.registry = registry
app.state.upstream = upstream
@ -64,12 +88,18 @@ def create_app(
@app.post("/v1/chat/completions")
async def chat_completions(request: Request, _=Depends(require_client_auth)):
body = await request.json()
reg: Registry = request.app.state.registry
up: UpstreamClient = request.app.state.upstream
try:
if body.get("stream"):
# Resolve route eagerly so ProxyError is raised before streaming starts.
service, upstream_body = _prepare_chat_upstream(body, registry=reg)
return StreamingResponse(
stream_chat_completion(service, upstream_body, upstream=up),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
return await proxy_chat_completion(body, registry=reg, upstream=up)
except ProxyError as exc:
return JSONResponse(
status_code=exc.status_code,
@ -101,6 +131,39 @@ def create_app(
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}}, content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}},
) )
@app.post("/v1/audio/transcriptions")
async def audio_transcriptions(
request: Request,
file: UploadFile = File(...),
model: str = Form(...),
language: str | None = Form(None),
prompt: str | None = Form(None),
response_format: str | None = Form(None),
temperature: float | None = Form(None),
_=Depends(require_client_auth),
):
try:
return await proxy_transcription(
model=model,
file=file,
language=language,
prompt=prompt,
response_format=response_format,
temperature=temperature,
registry=request.app.state.registry,
upstream=request.app.state.upstream,
)
except ProxyError as exc:
return JSONResponse(
status_code=exc.status_code,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "transcription_proxy_error"}},
)
except UpstreamError as exc:
return JSONResponse(
status_code=exc.status_code or 502,
content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}},
)
@app.get("/v1/cluster/services") @app.get("/v1/cluster/services")
async def list_services(request: Request, _=Depends(require_client_auth)) -> dict: async def list_services(request: Request, _=Depends(require_client_auth)) -> dict:
return {"object": "list", "data": request.app.state.registry.list_services()} return {"object": "list", "data": request.app.state.registry.list_services()}

View File

@ -34,6 +34,8 @@ class ServiceObserved(BaseModel):
tokens_per_sec: float | None = None
queue_depth: int | None = None
in_flight: int | None = None
loaded_model_count: int | None = None
vram_used_bytes: int | None = None
class RegisteredService(BaseModel):

View File

@ -0,0 +1,59 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
import httpx
from .registry import Registry
class ServiceProber:
"""Periodically probes registered service endpoints and updates health state."""
def __init__(self, registry: Registry, *, timeout_s: float = 5.0) -> None:
self._registry = registry
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=timeout_s, read=timeout_s, write=timeout_s, pool=timeout_s)
)
async def probe_once(self) -> dict[str, str]:
"""Probe all registered services. Returns mapping of service_id → observed health."""
services = self._registry.list_services()
results: dict[str, str] = {}
for service in services:
health = await self._probe_service(service)
current = service["state"].get("health")
if health != current:
self._registry.update_service_health(service["service_id"], health)
results[service["service_id"]] = health
return results
async def _probe_service(self, service: dict) -> str:
endpoint = service.get("endpoint", "")
if not endpoint:
return service["state"].get("health") or "unknown"
try:
response = await self._client.get(endpoint.rstrip("/") + "/health")
if response.status_code < 400:
return "healthy"
if response.status_code in (404, 405):
# Runtime doesn't implement GET /health; fall back to the
# standard OpenAI-compatible models list (works for vLLM etc.).
response2 = await self._client.get(endpoint.rstrip("/") + "/v1/models")
return "healthy" if response2.status_code < 400 else "unhealthy"
return "unhealthy"
except Exception:
return "unhealthy"
async def probe_loop(self, stop_event: asyncio.Event, interval_s: float) -> None:
while not stop_event.is_set():
with suppress(Exception):
await self.probe_once()
try:
await asyncio.wait_for(stop_event.wait(), timeout=interval_s)
except asyncio.TimeoutError:
continue
async def aclose(self) -> None:
await self._client.aclose()
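For illustration, a minimal sketch of driving the prober by hand outside the FastAPI lifespan (the database path and interval below are placeholder assumptions):

import asyncio

from geniehive_control.probe import ServiceProber
from geniehive_control.registry import Registry


async def probe_until_stopped() -> None:
    registry = Registry("data/geniehive.sqlite3")  # placeholder path
    prober = ServiceProber(registry, timeout_s=3.0)
    stop = asyncio.Event()  # set from another task to end the loop
    try:
        await prober.probe_loop(stop, interval_s=30.0)
    finally:
        await prober.aclose()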

View File

@ -15,9 +15,12 @@ def _json_dumps(value: object) -> str:
class Registry:
def __init__(self, db_path: str | Path, *, routing_strategy: str = "scored") -> None:
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._routing_strategy = routing_strategy
# Per-role round-robin counters (in-memory; reset on restart is intentional).
self._rr_counters: dict[str, int] = {}
self._init_db()
def _connect(self) -> sqlite3.Connection:
@ -206,6 +209,21 @@ class Registry:
)
return self.list_roles()
def update_service_health(self, service_id: str, health: str) -> None:
"""Overwrite the health field in a service's state_json without touching other fields."""
with self._connect() as conn:
row = conn.execute(
"SELECT state_json FROM services WHERE service_id = ?", (service_id,)
).fetchone()
if row is None:
return
state = json.loads(row["state_json"])
state["health"] = health
conn.execute(
"UPDATE services SET state_json = ? WHERE service_id = ?",
(_json_dumps(state), service_id),
)
def get_role(self, role_id: str) -> dict | None:
with self._connect() as conn:
row = conn.execute("SELECT * FROM roles WHERE role_id = ?", (role_id,)).fetchone()
@ -351,7 +369,7 @@ class Registry:
deduped[item["id"]] = item
return [deduped[key] for key in sorted(deduped)]
def resolve_route(self, requested_model: str, *, kind: str | None = None, _visited: set[str] | None = None) -> dict | None:
direct = self._resolve_direct(requested_model, kind=kind)
if direct is not None:
return {"match_type": "direct", **direct}
@ -369,6 +387,17 @@ class Registry:
and service["state"].get("health") == "healthy"
]
if not candidates:
visited: set[str] = _visited if _visited is not None else {requested_model}
for fb_role_id in role["routing_policy"].get("fallback_roles", []):
if fb_role_id in visited:
continue
visited.add(fb_role_id)
# Let each fallback role resolve using its own operation — don't
# inherit matched_kind, so a fallback with a different kind can
# provide a service when the primary kind has none available.
fb_result = self.resolve_route(fb_role_id, _visited=visited)
if fb_result is not None and fb_result.get("service") is not None:
return {"match_type": "role", "role": role, "service": fb_result["service"], "fallback_via": fb_role_id}
return {"match_type": "role", "role": role, "service": None} return {"match_type": "role", "role": role, "service": None}
preferred_families = [family.lower() for family in role["routing_policy"].get("preferred_families", [])] preferred_families = [family.lower() for family in role["routing_policy"].get("preferred_families", [])]
@ -388,7 +417,22 @@ class Registry:
if loaded_candidates:
candidates = loaded_candidates
if self._routing_strategy == "round_robin":
rr_key = requested_model
idx = self._rr_counters.get(rr_key, 0) % len(candidates)
self._rr_counters[rr_key] = idx + 1
service = candidates[idx]
elif self._routing_strategy == "least_loaded":
def load_key(svc: dict) -> tuple:
obs = svc.get("observed", {})
queue = obs.get("queue_depth") or 0
in_flight = obs.get("in_flight") or 0
# Prefer low load; use latency as secondary signal, then id for stability.
latency = obs.get("p50_latency_ms") or float("inf")
return (queue + in_flight, latency, svc["service_id"])
service = min(candidates, key=load_key)
else:
service = max(candidates, key=score)
return {"match_type": "role", "role": role, "service": service} return {"match_type": "role", "role": role, "service": service}
def match_routes(self, request: RouteMatchRequest) -> dict: def match_routes(self, request: RouteMatchRequest) -> dict:
@ -551,6 +595,7 @@ class Registry:
latency = service["observed"].get("p50_latency_ms")
tokens_per_sec = service["observed"].get("tokens_per_sec")
queue_depth = service["observed"].get("queue_depth")
loaded_model_count = service["observed"].get("loaded_model_count")
score = 0.0
reasons: list[str] = []
@ -582,6 +627,7 @@ class Registry:
"p50_latency_ms": latency, "p50_latency_ms": latency,
"tokens_per_sec": tokens_per_sec, "tokens_per_sec": tokens_per_sec,
"queue_depth": queue_depth, "queue_depth": queue_depth,
"loaded_model_count": loaded_model_count,
}
def _benchmark_signals(self, service: dict | None, tasks: list[str], workloads: list[str]) -> tuple[float, list[str], dict[str, object]]:

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Any, AsyncGenerator, Protocol
import httpx
@ -43,6 +43,35 @@ class UpstreamClient:
return response.json()
return response
async def chat_completions_stream(
self,
base_url: str,
body: dict[str, Any],
*,
headers: dict[str, str] | None = None,
) -> AsyncGenerator[bytes, None]:
"""Yield raw SSE bytes from an upstream chat completions endpoint.
Raises ``UpstreamError`` before the first yield if the upstream returns a
non-2xx status. Requires a real ``httpx.AsyncClient``; raises immediately
if an injected mock was provided instead.
"""
if not isinstance(self._client, httpx.AsyncClient):
raise UpstreamError(
"streaming requires a real httpx client; not supported by the injected mock",
status_code=500,
)
url = base_url.rstrip("/") + "/v1/chat/completions"
async with self._client.stream("POST", url, json=body, headers=headers or {}) as response:
if response.status_code >= 400:
content = await response.aread()
raise UpstreamError(
content.decode(errors="replace") or f"upstream error from {url}",
status_code=response.status_code,
)
async for chunk in response.aiter_bytes():
yield chunk
async def embeddings(
self,
base_url: str,
@ -63,6 +92,35 @@ class UpstreamClient:
return response.json()
return response
async def transcriptions(
self,
base_url: str,
*,
file_content: bytes,
file_name: str,
file_content_type: str,
form_data: dict[str, str],
headers: dict[str, str] | None = None,
) -> Any:
if not isinstance(self._client, httpx.AsyncClient):
raise UpstreamError(
"transcription requires a real httpx client; multipart is not supported by the injected mock",
status_code=500,
)
url = base_url.rstrip("/") + "/v1/audio/transcriptions"
response = await self._client.post(
url,
data=form_data,
files={"file": (file_name, file_content, file_content_type)},
headers=headers or {},
)
if response.status_code >= 400:
raise UpstreamError(
response.text or f"upstream error from {url}",
status_code=response.status_code,
)
return response.json()
async def aclose(self) -> None:
if self._owns_client and isinstance(self._client, httpx.AsyncClient):
await self._client.aclose()

View File

@ -51,6 +51,11 @@ class NodeServiceConfig(BaseModel):
assets: list[NodeServiceAssetConfig] = Field(default_factory=list)
state: dict[str, object] = Field(default_factory=dict)
observed: dict[str, object] = Field(default_factory=dict)
# Set to "ollama" to query GET /api/tags, or "openai" to query
# GET /v1/models, and merge discovered model names into the asset list
# reported to the control plane on each heartbeat. None (default)
# disables discovery for this service.
discover_protocol: str | None = None
class NodeConfig(BaseModel):

View File

@ -0,0 +1,200 @@
from __future__ import annotations
import httpx
async def discover_ollama_assets(
endpoint: str,
*,
client: httpx.AsyncClient | None = None,
timeout: float = 5.0,
) -> list[dict]:
"""Query Ollama's GET /api/tags and return available (not necessarily loaded) model assets.
Sets ``"loaded": False`` for all entries callers should follow up with
:func:`query_ollama_ps` to determine which models are currently in VRAM.
Returns an empty list on any error.
"""
url = endpoint.rstrip("/") + "/api/tags"
_owns_client = client is None
_client = client or httpx.AsyncClient(
timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout)
)
try:
response = await _client.get(url)
if response.status_code != 200:
return []
data = response.json()
return [
{"asset_id": model["name"], "loaded": False}
for model in data.get("models", [])
if model.get("name")
]
except Exception:
return []
finally:
if _owns_client:
await _client.aclose()
async def _get_ollama_ps_models(
endpoint: str,
*,
client: httpx.AsyncClient,
timeout: float = 5.0,
) -> list[dict]:
"""Query Ollama's GET /api/ps and return the raw model list.
Returns an empty list on any error. Caller owns the httpx client lifetime.
"""
url = endpoint.rstrip("/") + "/api/ps"
try:
response = await client.get(url)
if response.status_code != 200:
return []
data = response.json()
return [m for m in data.get("models", []) if m.get("name")]
except Exception:
return []
async def query_ollama_ps(
endpoint: str,
*,
client: httpx.AsyncClient | None = None,
timeout: float = 5.0,
) -> frozenset[str]:
"""Query Ollama's GET /api/ps and return names of currently VRAM-loaded models.
Returns an empty frozenset on any error so callers can treat this as a
best-effort enrichment.
"""
_owns_client = client is None
_client = client or httpx.AsyncClient(
timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout)
)
try:
models = await _get_ollama_ps_models(endpoint, client=_client, timeout=timeout)
return frozenset(m["name"] for m in models)
finally:
if _owns_client:
await _client.aclose()
async def discover_openai_models(
endpoint: str,
*,
client: httpx.AsyncClient | None = None,
timeout: float = 5.0,
) -> list[dict]:
"""Query an OpenAI-compatible GET /v1/models endpoint and return discovered assets.
Works with vLLM, llama.cpp server (with --api-key or open), and any other
runtime that implements the standard models list format. Returns an empty
list on any error.
"""
url = endpoint.rstrip("/") + "/v1/models"
_owns_client = client is None
_client = client or httpx.AsyncClient(
timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout)
)
try:
response = await _client.get(url)
if response.status_code != 200:
return []
data = response.json()
return [
{"asset_id": model["id"], "loaded": True}
for model in data.get("data", [])
if model.get("id")
]
except Exception:
return []
finally:
if _owns_client:
await _client.aclose()
async def enrich_service_assets(
service: dict,
*,
protocol: str | None,
client: httpx.AsyncClient | None = None,
timeout: float = 5.0,
) -> dict:
"""Return a copy of *service* with assets enriched from upstream discovery.
For ``"ollama"`` protocol:
- Queries ``/api/tags`` for the full available-model list
- Queries ``/api/ps`` for currently VRAM-loaded models
- Marks each asset ``loaded: True`` only if its name appears in ``/api/ps``
- Updates the ``loaded`` state of existing (statically configured) assets too
- Adds newly discovered assets that were absent from the static config
For ``"openai"`` protocol:
- Queries ``/v1/models`` and marks all returned models as ``loaded: True``
- Adds newly discovered models; does not modify existing static assets
Any value other than ``"ollama"`` or ``"openai"`` (including ``None``) skips
discovery and returns *service* unchanged. If discovery returns nothing, the
original service dict is returned unchanged.
"""
if not protocol:
return service
endpoint = service.get("endpoint", "")
if not endpoint:
return service
if protocol == "ollama":
available = await discover_ollama_assets(endpoint, client=client, timeout=timeout)
if not available:
return service
_owns_ps_client = client is None
_ps_client = client or httpx.AsyncClient(
timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout)
)
try:
ps_models = await _get_ollama_ps_models(endpoint, client=_ps_client, timeout=timeout)
finally:
if _owns_ps_client:
await _ps_client.aclose()
loaded_names = frozenset(m["name"] for m in ps_models)
discovered = [
{**asset, "loaded": asset["asset_id"] in loaded_names}
for asset in available
]
ollama_observed: dict = {
"loaded_model_count": len(ps_models),
"vram_used_bytes": sum(m.get("size_in_vram", 0) for m in ps_models),
}
elif protocol == "openai":
discovered = await discover_openai_models(endpoint, client=client, timeout=timeout)
ollama_observed = None
else:
return service
if not discovered:
return service
# Build merged asset list:
# 1. Start with statically configured assets, updating loaded state if discovered.
# 2. Append any newly discovered assets not in the static config.
existing_by_id = {a["asset_id"]: a for a in service.get("assets", [])}
merged: list[dict] = []
for existing in service.get("assets", []):
disc = next((d for d in discovered if d["asset_id"] == existing["asset_id"]), None)
if disc is not None:
# Update loaded state from discovery; preserve all other static fields.
merged.append({**existing, "loaded": disc["loaded"]})
else:
merged.append(existing)
for asset in discovered:
if asset["asset_id"] not in existing_by_id:
merged.append(asset)
result = {**service, "assets": merged}
if ollama_observed:
existing_observed = service.get("observed") or {}
result["observed"] = {**existing_observed, **ollama_observed}
return result
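A minimal usage sketch of the two-phase Ollama enrichment; the endpoint, service id, and asset below are placeholders:

import asyncio

from geniehive_node.discovery import enrich_service_assets


async def main() -> None:
    service = {
        "service_id": "node-01/chat/ollama",  # placeholder id
        "endpoint": "http://127.0.0.1:11434",
        "assets": [{"asset_id": "qwen3:8b", "loaded": True}],  # possibly stale loaded state
    }
    enriched = await enrich_service_assets(service, protocol="ollama")
    # assets now reflect /api/tags + /api/ps; observed carries
    # loaded_model_count and vram_used_bytes from /api/ps.
    print(enriched["assets"], enriched.get("observed"))


asyncio.run(main())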

View File

@ -7,6 +7,7 @@ from typing import Protocol
import httpx
from .config import NodeConfig
from .discovery import enrich_service_assets
from .inventory import build_heartbeat_payload, build_registration_payload
@ -23,6 +24,12 @@ class ControlPlaneClient:
self._http = http_client or httpx.AsyncClient(
timeout=httpx.Timeout(connect=5.0, read=30.0, write=30.0, pool=30.0)
)
# Separate client used exclusively for upstream model discovery GETs.
# Only allocated when at least one service has discover_protocol set.
_needs_discovery = any(s.discover_protocol for s in cfg.services)
self._discovery_client: httpx.AsyncClient | None = (
httpx.AsyncClient(timeout=httpx.Timeout(5.0)) if _needs_discovery else None
)
@property
def enabled(self) -> bool:
@ -53,20 +60,24 @@ class ControlPlaneClient:
if not self._registered:
await self.register_once()
url = str(self.cfg.control_plane.base_url).rstrip("/") + "/v1/nodes/heartbeat"
payload = build_heartbeat_payload(self.cfg)
if self._discovery_client is not None:
reg_services = build_registration_payload(self.cfg).get("services", [])
enriched = [
await enrich_service_assets(
svc_dict,
protocol=svc_cfg.discover_protocol,
client=self._discovery_client,
)
for svc_dict, svc_cfg in zip(reg_services, self.cfg.services)
]
payload["services"] = enriched
response = await self._http.post(url, json=payload, headers=self._headers())
if isinstance(response, httpx.Response):
if response.status_code == 404:
self._registered = False
await self.register_once()
response = await self._http.post(url, json=payload, headers=self._headers())
response.raise_for_status()
async def heartbeat_loop(self, stop_event: asyncio.Event) -> None:
@ -82,3 +93,5 @@ class ControlPlaneClient:
async def aclose(self) -> None:
if self._owns_client and isinstance(self._http, httpx.AsyncClient):
await self._http.aclose()
if self._discovery_client is not None:
await self._discovery_client.aclose()

View File

@ -1,7 +1,8 @@
import asyncio
import json
from pathlib import Path
from geniehive_control.chat import ProxyError, _prepare_chat_upstream, _strip_reasoning_from_sse_chunk, proxy_chat_completion, proxy_embeddings, stream_chat_completion
from geniehive_control.models import HostRegistration, RegisteredService, RoleProfile
from geniehive_control.registry import Registry
from geniehive_control.upstream import UpstreamClient
@ -304,6 +305,170 @@ def test_proxy_embeddings_rewrites_role_to_loaded_asset(tmp_path: Path) -> None:
assert fake.calls[0]["json"]["model"] == "bge-small-en"
def test_round_robin_strategy_cycles_across_services(tmp_path: Path) -> None:
registry = Registry(tmp_path / "geniehive.sqlite3", routing_strategy="round_robin")
registry.register_host(
HostRegistration(
host_id="atlas-01",
address="192.168.1.101",
services=[
RegisteredService(
service_id=f"atlas-01/chat/svc-{i}",
host_id="atlas-01",
kind="chat",
endpoint=f"http://192.168.1.101:1809{i}",
assets=[{"asset_id": f"model-{i}", "loaded": True}],
state={"health": "healthy", "load_state": "loaded", "accept_requests": True},
observed={"p50_latency_ms": 900},
)
for i in range(3)
],
)
)
registry.upsert_roles(
[
RoleProfile(
role_id="any_chat",
display_name="Any Chat",
operation="chat",
modality="text",
routing_policy={},
)
]
)
# Three calls should cycle across the three services, not always pick the same one.
seen_services = [
registry.resolve_route("any_chat")["service"]["service_id"]
for _ in range(6)
]
unique_seen = set(seen_services)
assert len(unique_seen) == 3, f"round_robin should distribute across all 3 services, got: {seen_services}"
# After 3 calls the cycle restarts: positions 0 and 3 should be the same service.
assert seen_services[0] == seen_services[3]
def test_strip_reasoning_from_sse_chunk_parses_and_strips() -> None:
chunk_data = {
"object": "chat.completion.chunk",
"choices": [{"delta": {"content": "hi", "reasoning_content": "hidden"}}],
"reasoning": "extra",
}
sse_line = b"data: " + json.dumps(chunk_data).encode()
result = _strip_reasoning_from_sse_chunk(sse_line)
parsed = json.loads(result[6:])
assert "reasoning" not in parsed
assert "reasoning_content" not in parsed["choices"][0]["delta"]
assert parsed["choices"][0]["delta"]["content"] == "hi"
def test_strip_reasoning_from_sse_chunk_passes_done_unchanged() -> None:
done_chunk = b"data: [DONE]\n\n"
assert _strip_reasoning_from_sse_chunk(done_chunk) == done_chunk
def test_stream_chat_completion_prepares_streaming_route(tmp_path: Path) -> None:
registry = _build_registry(tmp_path)
# Resolve the route eagerly to get service + upstream_body, exactly as the streaming
# branch of /v1/chat/completions does before handing off to stream_chat_completion.
service, upstream_body = _prepare_chat_upstream(
{"model": "mentor", "messages": [{"role": "user", "content": "hi"}], "stream": True},
registry=registry,
)
# The per-chunk SSE reasoning-strip behaviour is covered by the
# _strip_reasoning_from_sse_chunk unit tests above.
assert service["service_id"] == "atlas-01/chat/qwen3-8b"
assert upstream_body["model"] == "qwen3-8b-q4km"
def test_least_loaded_strategy_picks_lowest_queue_depth(tmp_path: Path) -> None:
registry = Registry(tmp_path / "geniehive.sqlite3", routing_strategy="least_loaded")
registry.register_host(
HostRegistration(
host_id="atlas-01",
address="192.168.1.101",
services=[
RegisteredService(
service_id="atlas-01/chat/busy",
host_id="atlas-01",
kind="chat",
endpoint="http://192.168.1.101:18091",
assets=[{"asset_id": "model-busy", "loaded": True}],
state={"health": "healthy", "load_state": "loaded", "accept_requests": True},
observed={"p50_latency_ms": 500, "queue_depth": 5, "in_flight": 3},
),
RegisteredService(
service_id="atlas-01/chat/idle",
host_id="atlas-01",
kind="chat",
endpoint="http://192.168.1.101:18092",
assets=[{"asset_id": "model-idle", "loaded": True}],
state={"health": "healthy", "load_state": "loaded", "accept_requests": True},
observed={"p50_latency_ms": 900, "queue_depth": 0, "in_flight": 0},
),
],
)
)
registry.upsert_roles(
[
RoleProfile(
role_id="any_chat",
display_name="Any Chat",
operation="chat",
modality="text",
routing_policy={},
)
]
)
result = registry.resolve_route("any_chat")
# "idle" has queue_depth=0+in_flight=0 vs "busy" queue_depth=5+in_flight=3
assert result["service"]["service_id"] == "atlas-01/chat/idle"
def test_proxy_embeddings_fails_for_unknown_model(tmp_path: Path) -> None:
registry = _build_registry(tmp_path)
upstream = UpstreamClient(client=_FakePoster())

View File

@ -1,7 +1,10 @@
import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from geniehive_control.main import create_app
from geniehive_control.models import BenchmarkSample, HostHeartbeat, HostRegistration, RegisteredService, RoleProfile, RouteMatchRequest
from geniehive_control.probe import ServiceProber
from geniehive_control.registry import Registry, _benchmark_quality_score
@ -154,6 +157,7 @@ def test_control_app_exposes_expected_routes() -> None:
assert "/v1/cluster/health" in paths assert "/v1/cluster/health" in paths
assert "/v1/cluster/routes/resolve" in paths assert "/v1/cluster/routes/resolve" in paths
assert "/v1/cluster/routes/match" in paths assert "/v1/cluster/routes/match" in paths
assert "/v1/audio/transcriptions" in paths
def test_registry_can_rank_routes_for_task_statements(tmp_path: Path) -> None:
@ -368,6 +372,216 @@ def test_registry_exposes_asset_request_policy_in_model_metadata(tmp_path: Path)
assert asset["geniehive"]["effective_request_policy"]["body_defaults"]["chat_template_kwargs"]["custom_flag"] == "yes" assert asset["geniehive"]["effective_request_policy"]["body_defaults"]["chat_template_kwargs"]["custom_flag"] == "yes"
def test_registry_fallback_roles_resolve_when_primary_has_no_service(tmp_path: Path) -> None:
db_path = tmp_path / "geniehive.sqlite3"
registry = Registry(db_path)
# Only a chat service exists — no transcription service.
# The primary role wants transcription (no candidates), so it falls back to
# the secondary role which routes to the available chat service.
registry.register_host(
HostRegistration(
host_id="atlas-01",
address="192.168.1.101",
services=[
RegisteredService(
service_id="atlas-01/chat/rocket",
host_id="atlas-01",
kind="chat",
endpoint="http://192.168.1.101:18093",
assets=[{"asset_id": "rocket-3b", "loaded": True}],
state={"health": "healthy", "load_state": "loaded", "accept_requests": True},
observed={"p50_latency_ms": 2000},
)
],
)
)
registry.upsert_roles(
[
RoleProfile(
role_id="primary_transcriber",
display_name="Primary Transcriber",
operation="transcription",
modality="text",
routing_policy={"fallback_roles": ["chat_fallback"]},
),
RoleProfile(
role_id="chat_fallback",
display_name="Chat Fallback",
operation="chat",
modality="text",
routing_policy={"preferred_families": ["rocket"]},
),
]
)
result = registry.resolve_route("primary_transcriber")
assert result is not None
assert result["match_type"] == "role"
assert result["role"]["role_id"] == "primary_transcriber"
assert result["service"] is not None
assert result["service"]["service_id"] == "atlas-01/chat/rocket"
assert result["fallback_via"] == "chat_fallback"
def test_registry_fallback_roles_cycle_protection(tmp_path: Path) -> None:
db_path = tmp_path / "geniehive.sqlite3"
registry = Registry(db_path)
# No services — both roles have empty candidate lists.
registry.upsert_roles(
[
RoleProfile(
role_id="role_a",
display_name="A",
operation="chat",
modality="text",
routing_policy={"fallback_roles": ["role_b"]},
),
RoleProfile(
role_id="role_b",
display_name="B",
operation="chat",
modality="text",
routing_policy={"fallback_roles": ["role_a"]},
),
]
)
# Must not loop forever; must return service=None gracefully.
result = registry.resolve_route("role_a")
assert result is not None
assert result["match_type"] == "role"
assert result["service"] is None
def test_registry_update_service_health_changes_only_health_field(tmp_path: Path) -> None:
db_path = tmp_path / "geniehive.sqlite3"
registry = Registry(db_path)
registry.register_host(
HostRegistration(
host_id="atlas-01",
address="192.168.1.101",
services=[
RegisteredService(
service_id="atlas-01/chat/qwen3-8b",
host_id="atlas-01",
kind="chat",
endpoint="http://192.168.1.101:18091",
assets=[{"asset_id": "qwen3-8b", "loaded": True}],
state={"health": "healthy", "load_state": "loaded", "accept_requests": True},
observed={"p50_latency_ms": 900},
)
],
)
)
registry.update_service_health("atlas-01/chat/qwen3-8b", "unhealthy")
services = registry.list_services()
assert services[0]["state"]["health"] == "unhealthy"
# Other state fields must be preserved.
assert services[0]["state"]["load_state"] == "loaded"
assert services[0]["state"]["accept_requests"] is True
# Unknown service_id is a no-op (does not raise).
registry.update_service_health("nonexistent", "healthy")
def test_service_prober_updates_health_on_probe(tmp_path: Path) -> None:
db_path = tmp_path / "geniehive.sqlite3"
registry = Registry(db_path)
registry.register_host(
HostRegistration(
host_id="atlas-01",
address="192.168.1.101",
services=[
RegisteredService(
service_id="atlas-01/chat/qwen3-8b",
host_id="atlas-01",
kind="chat",
endpoint="http://192.168.1.101:18091",
assets=[{"asset_id": "qwen3-8b", "loaded": True}],
state={"health": "healthy"},
observed={},
)
],
)
)
prober = ServiceProber(registry, timeout_s=5.0)
# Simulate a failed probe (connection error → unhealthy).
import httpx
async def run() -> None:
with patch.object(prober._client, "get", new_callable=AsyncMock) as mock_get:
mock_get.side_effect = httpx.ConnectError("refused")
results = await prober.probe_once()
assert results["atlas-01/chat/qwen3-8b"] == "unhealthy"
services = registry.list_services()
assert services[0]["state"]["health"] == "unhealthy"
# Simulate a successful probe → health restored.
with patch.object(prober._client, "get", new_callable=AsyncMock) as mock_get:
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
results2 = await prober.probe_once()
assert results2["atlas-01/chat/qwen3-8b"] == "healthy"
services2 = registry.list_services()
assert services2[0]["state"]["health"] == "healthy"
asyncio.run(run())
def test_service_prober_falls_back_to_v1_models_when_health_endpoint_missing(tmp_path: Path) -> None:
db_path = tmp_path / "geniehive.sqlite3"
registry = Registry(db_path)
registry.register_host(
HostRegistration(
host_id="vllm-01",
address="192.168.1.200",
services=[
RegisteredService(
service_id="vllm-01/chat/mistral",
host_id="vllm-01",
kind="chat",
endpoint="http://192.168.1.200:8000",
assets=[],
state={"health": "unhealthy"},
observed={},
)
],
)
)
prober = ServiceProber(registry, timeout_s=5.0)
async def run() -> None:
import httpx
call_log: list[str] = []
async def fake_get(url: str) -> MagicMock:
call_log.append(url)
mock_response = MagicMock()
if url.endswith("/health"):
mock_response.status_code = 404
else:
mock_response.status_code = 200
return mock_response
with patch.object(prober._client, "get", side_effect=fake_get):
results = await prober.probe_once()
assert results["vllm-01/chat/mistral"] == "healthy"
# Both paths were tried.
assert any("/health" in u for u in call_log)
assert any("/v1/models" in u for u in call_log)
services = registry.list_services()
assert services[0]["state"]["health"] == "healthy"
asyncio.run(run())
def test_benchmark_quality_score_stays_bounded_and_weighted() -> None:
# High correctness + fast speed must not exceed 1.0.
score = _benchmark_quality_score({"pass_rate": 1.0, "tokens_per_sec": 80, "ttft_ms": 400})

View File

@ -1,7 +1,11 @@
import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
from geniehive_node.config import load_config
from geniehive_node.discovery import discover_ollama_assets, discover_openai_models, enrich_service_assets, query_ollama_ps
from geniehive_node.inventory import build_heartbeat_payload, build_inventory, build_registration_payload
from geniehive_node.main import create_app
from geniehive_node.sync import ControlPlaneClient
@ -86,6 +90,201 @@ class _FakePoster:
return object()
def test_discover_ollama_assets_parses_api_tags_response() -> None:
ollama_response = {
"models": [
{"name": "qwen3:8b", "size": 12345678},
{"name": "nomic-embed-text", "size": 987654},
]
}
async def run() -> None:
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = ollama_response
mock_client.get = AsyncMock(return_value=mock_response)
assets = await discover_ollama_assets("http://127.0.0.1:11434", client=mock_client)
assert len(assets) == 2
# /api/tags → available, NOT necessarily loaded
assert assets[0] == {"asset_id": "qwen3:8b", "loaded": False}
assert assets[1] == {"asset_id": "nomic-embed-text", "loaded": False}
mock_client.get.assert_called_once_with("http://127.0.0.1:11434/api/tags")
asyncio.run(run())
def test_query_ollama_ps_returns_loaded_model_names() -> None:
ps_response = {
"models": [
{"name": "qwen3:8b", "size_in_vram": 5000000000},
]
}
async def run() -> None:
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = ps_response
mock_client.get = AsyncMock(return_value=mock_response)
loaded = await query_ollama_ps("http://127.0.0.1:11434", client=mock_client)
assert loaded == frozenset({"qwen3:8b"})
mock_client.get.assert_called_once_with("http://127.0.0.1:11434/api/ps")
asyncio.run(run())
def test_discover_ollama_assets_returns_empty_on_error() -> None:
async def run() -> None:
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.get = AsyncMock(side_effect=httpx.ConnectError("refused"))
assets = await discover_ollama_assets("http://127.0.0.1:11434", client=mock_client)
assert assets == []
asyncio.run(run())
def test_enrich_service_assets_skips_when_protocol_none() -> None:
service = {"service_id": "svc-1", "endpoint": "http://127.0.0.1:11434", "assets": []}
async def run() -> None:
result = await enrich_service_assets(service, protocol=None)
assert result is service # unchanged, no HTTP queries made
asyncio.run(run())
def test_enrich_ollama_marks_loaded_state_via_api_ps_and_adds_new_assets() -> None:
"""Ollama enrichment: tags gives available, ps gives loaded; static assets updated."""
tags_response = {"models": [{"name": "qwen3:8b"}, {"name": "nomic-embed"}]}
ps_response = {"models": [{"name": "qwen3:8b"}]} # only qwen3 is in VRAM
service = {
"service_id": "svc-1",
"endpoint": "http://127.0.0.1:11434",
# Static config has qwen3:8b as loaded (stale info); nomic-embed is not listed at all.
"assets": [
{"asset_id": "qwen3:8b", "loaded": True},
],
}
call_log: list[str] = []
async def run() -> None:
mock_client = AsyncMock(spec=httpx.AsyncClient)
async def fake_get(url: str):
call_log.append(url)
mock_resp = MagicMock()
mock_resp.status_code = 200
if url.endswith("/api/tags"):
mock_resp.json.return_value = tags_response
else:
mock_resp.json.return_value = ps_response
return mock_resp
mock_client.get = AsyncMock(side_effect=fake_get)
enriched = await enrich_service_assets(service, protocol="ollama", client=mock_client)
assets_by_id = {a["asset_id"]: a for a in enriched["assets"]}
# qwen3:8b is in /api/ps → loaded: True (preserved)
assert assets_by_id["qwen3:8b"]["loaded"] is True
# nomic-embed is in /api/tags but NOT in /api/ps → loaded: False, added as new asset
assert assets_by_id["nomic-embed"]["loaded"] is False
# Both endpoints were queried.
assert any("/api/tags" in u for u in call_log)
assert any("/api/ps" in u for u in call_log)
asyncio.run(run())
def test_enrich_ollama_populates_observed_metrics_from_ps() -> None:
"""Ollama enrichment populates observed.loaded_model_count and vram_used_bytes."""
tags_response = {"models": [{"name": "qwen3:8b"}, {"name": "nomic-embed"}]}
ps_response = {
"models": [
{"name": "qwen3:8b", "size_in_vram": 5_000_000_000},
]
}
service = {
"service_id": "svc-1",
"endpoint": "http://127.0.0.1:11434",
"assets": [],
}
async def run() -> None:
mock_client = AsyncMock(spec=httpx.AsyncClient)
async def fake_get(url: str):
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = tags_response if "/api/tags" in url else ps_response
return mock_resp
mock_client.get = AsyncMock(side_effect=fake_get)
enriched = await enrich_service_assets(service, protocol="ollama", client=mock_client)
assert enriched["observed"]["loaded_model_count"] == 1
assert enriched["observed"]["vram_used_bytes"] == 5_000_000_000
asyncio.run(run())
def test_enrich_ollama_updates_stale_loaded_state_to_false() -> None:
"""Static config says loaded=True but /api/ps reports it is not; should be corrected."""
tags_response = {"models": [{"name": "big-model"}]}
ps_response = {"models": []} # nothing loaded
service = {
"service_id": "svc-1",
"endpoint": "http://127.0.0.1:11434",
"assets": [{"asset_id": "big-model", "loaded": True}],
}
async def run() -> None:
mock_client = AsyncMock(spec=httpx.AsyncClient)
async def fake_get(url: str):
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = tags_response if "/api/tags" in url else ps_response
return mock_resp
mock_client.get = AsyncMock(side_effect=fake_get)
enriched = await enrich_service_assets(service, protocol="ollama", client=mock_client)
assert enriched["assets"][0]["loaded"] is False # stale state corrected
asyncio.run(run())
def test_discover_openai_models_parses_v1_models_response() -> None:
openai_response = {
"object": "list",
"data": [
{"id": "mistral-7b-instruct", "object": "model"},
{"id": "nomic-embed-text-v1", "object": "model"},
],
}
async def run() -> None:
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = openai_response
mock_client.get = AsyncMock(return_value=mock_response)
assets = await discover_openai_models("http://127.0.0.1:8000", client=mock_client)
assert len(assets) == 2
assert assets[0] == {"asset_id": "mistral-7b-instruct", "loaded": True}
assert assets[1] == {"asset_id": "nomic-embed-text-v1", "loaded": True}
mock_client.get.assert_called_once_with("http://127.0.0.1:8000/v1/models")
asyncio.run(run())
def test_control_plane_client_posts_register_and_heartbeat(tmp_path: Path) -> None:
cfg_path = _write_node_config(tmp_path)
cfg = load_config(cfg_path)