From e2b10001987eeb28ad106db7a6ee17694eacc04f Mon Sep 17 00:00:00 2001 From: welberr Date: Mon, 27 Apr 2026 14:12:54 -0400 Subject: [PATCH] =?UTF-8?q?P1=E2=80=93P2=20complete:=20routing=20strategie?= =?UTF-8?q?s,=20streaming,=20discovery,=20observed=20metrics=20+=20role=20?= =?UTF-8?q?catalogs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Control plane: - fallback_roles chain in resolve_route() with cycle protection - round_robin and least_loaded routing strategies; default_strategy dispatches all three - Streaming chat completions: async generator, eager route resolution, SSE reasoning-strip - POST /v1/audio/transcriptions proxy (multipart, dedicated httpx path) - ServiceProber background task: probes /health, falls back to /v1/models for vLLM - ServiceObserved gains loaded_model_count and vram_used_bytes - _runtime_signals exposes loaded_model_count to route scoring Node agent: - discover_protocol: "ollama"|"openai"|null per-service config field - discovery.py: discover_ollama_assets (loaded: False), _get_ollama_ps_models helper, query_ollama_ps, discover_openai_models, enrich_service_assets (two-phase Ollama, corrects stale loaded state, populates observed metrics from /api/ps) - Heartbeat zips service dicts with config to pass protocol; allocates discovery client only when needed Tests: 47 passing (up from 19) Role catalogs (example configs): - roles.surgical-team.example.yaml — Brooks/Mills surgical team (surg_ prefix, 9 roles) - roles.belbin.example.yaml — Belbin team roles (belbin_ prefix, 9 roles) - roles.sixhats.example.yaml — De Bono Six Thinking Hats (sixhats_ prefix, 6 roles) - roles.disney.example.yaml — Disney creative strategy (disney_ prefix, 3 roles) - roles.xp.example.yaml — XP team roles (xp_ prefix, 5 roles) Co-Authored-By: Claude Sonnet 4.6 --- configs/control.example.yaml | 1 + configs/control.singlebox.example.yaml | 1 + configs/control.singlebox.p40.example.yaml | 1 + configs/roles.belbin.example.yaml | 224 +++++++++++++++++++++ configs/roles.disney.example.yaml | 88 ++++++++ configs/roles.sixhats.example.yaml | 154 ++++++++++++++ configs/roles.surgical-team.example.yaml | 222 ++++++++++++++++++++ configs/roles.xp.example.yaml | 139 +++++++++++++ docs/roadmap.md | 167 +++++++-------- src/geniehive_control/chat.py | 95 ++++++++- src/geniehive_control/config.py | 8 + src/geniehive_control/main.py | 83 +++++++- src/geniehive_control/models.py | 2 + src/geniehive_control/probe.py | 59 ++++++ src/geniehive_control/registry.py | 52 ++++- src/geniehive_control/upstream.py | 60 +++++- src/geniehive_node/config.py | 5 + src/geniehive_node/discovery.py | 200 ++++++++++++++++++ src/geniehive_node/sync.py | 33 ++- tests/test_control_chat.py | 167 ++++++++++++++- tests/test_control_registry.py | 214 ++++++++++++++++++++ tests/test_node_inventory.py | 199 ++++++++++++++++++ 22 files changed, 2050 insertions(+), 124 deletions(-) create mode 100644 configs/roles.belbin.example.yaml create mode 100644 configs/roles.disney.example.yaml create mode 100644 configs/roles.sixhats.example.yaml create mode 100644 configs/roles.surgical-team.example.yaml create mode 100644 configs/roles.xp.example.yaml create mode 100644 src/geniehive_control/probe.py create mode 100644 src/geniehive_node/discovery.py diff --git a/configs/control.example.yaml b/configs/control.example.yaml index 0292ae4..1234fc5 100644 --- a/configs/control.example.yaml +++ b/configs/control.example.yaml @@ -15,3 +15,4 @@ roles_path: "configs/roles.example.yaml" routing: 
health_stale_after_s: 30 + default_strategy: "scored" # or "round_robin" or "least_loaded" diff --git a/configs/control.singlebox.example.yaml b/configs/control.singlebox.example.yaml index fac73d0..5d3efb5 100644 --- a/configs/control.singlebox.example.yaml +++ b/configs/control.singlebox.example.yaml @@ -15,3 +15,4 @@ roles_path: "configs/roles.example.yaml" routing: health_stale_after_s: 30 + default_strategy: "scored" # or "round_robin" or "least_loaded" diff --git a/configs/control.singlebox.p40.example.yaml b/configs/control.singlebox.p40.example.yaml index 18012a5..e780e46 100644 --- a/configs/control.singlebox.p40.example.yaml +++ b/configs/control.singlebox.p40.example.yaml @@ -15,3 +15,4 @@ roles_path: "configs/roles.singlebox.p40.example.yaml" routing: health_stale_after_s: 30 + default_strategy: "scored" # or "round_robin" or "least_loaded" diff --git a/configs/roles.belbin.example.yaml b/configs/roles.belbin.example.yaml new file mode 100644 index 0000000..f0fdfb0 --- /dev/null +++ b/configs/roles.belbin.example.yaml @@ -0,0 +1,224 @@ +# Belbin Team Roles catalog — Meredith Belbin, "Management Teams: Why They Succeed or Fail" (1981). +# +# Derived from years of team simulation research at Henley Management College. Belbin identified +# nine distinct contributions that effective teams need; the core insight is that a team requires +# role *diversity*, not skill duplication. Every role has characteristic strengths and allowable +# weaknesses — the weaknesses are treated as the price of the strength, not failures to fix. +# +# Role ID prefix: belbin_ +# +# Fallback chains: +# belbin_resource_investigator → belbin_plant (both are divergent, possibility-oriented) +# belbin_shaper → belbin_coordinator (both drive decisions and unblock work) +# belbin_teamworker → belbin_coordinator (both manage team process) +# belbin_completer_finisher → belbin_monitor_evaluator (both are evaluative and quality-focused) + +roles: + + # ── Plant ───────────────────────────────────────────────────────────────────────────────── + # Creative problem-solver. Generates original ideas and approaches, often unconventional. + # Belbin's "Plant" is planted in the team to seed new thinking when the group is stuck. + # Characteristic weakness: ignores incidentals, may communicate poorly with practical members. + - role_id: "belbin_plant" + display_name: "Plant" + description: >- + Generates original ideas and novel solutions. Thinks unconventionally. + Does not self-evaluate while generating — that is another role's job. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Plant. Generate original ideas and novel solutions. Think + unconventionally — the obvious approach is rarely the one you offer. Do not + self-censor or evaluate while generating; that is someone else's role. If asked + to solve a problem, offer multiple approaches that differ in kind, not just degree. + Ignore resource constraints and prior commitments at the ideation stage. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 16384 + + # ── Resource Investigator ───────────────────────────────────────────────────────────────── + # Explores what already exists and can be adapted. Finds contacts, analogues, and external + # resources. Enthusiastic at the start; needs direction to maintain focus. + # Characteristic weakness: loses enthusiasm after initial excitement; can be over-optimistic. 
+ - role_id: "belbin_resource_investigator" + display_name: "Resource Investigator" + description: >- + Finds what already exists that bears on the problem. Identifies external resources, + prior art, and analogous approaches. Falls back to belbin_plant. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Resource Investigator. Find out what already exists that is relevant + to the problem. Identify external resources, existing solutions, prior art, and + analogues from other domains. Think in terms of what can be borrowed, adapted, or + connected — not built from scratch. Prioritise breadth of discovery over depth + of analysis; the Monitor Evaluator will assess what you surface. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + fallback_roles: ["belbin_plant"] + + # ── Coordinator ─────────────────────────────────────────────────────────────────────────── + # Clarifies goals, organises effort, and promotes decision-making. Delegates effectively. + # Manages the process rather than the content. Confident, mature, trusts the team. + # Characteristic weakness: can be seen as manipulative; may offload personal work. + - role_id: "belbin_coordinator" + display_name: "Coordinator" + description: >- + Clarifies goals, identifies decisions that need to be made, delegates, and keeps + the work moving. Process-oriented rather than content-oriented. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Coordinator. Clarify goals, identify what each part of the work + requires, and ensure decisions get made. When given a task or competing + priorities, decompose it, assign notional responsibility, and surface the + decisions that are being avoided. Keep work moving without taking over + technical decisions — those belong to the relevant specialist. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + require_loaded: true + + # ── Shaper ──────────────────────────────────────────────────────────────────────────────── + # Drives the work forward. Challenges, pressures, and finds ways around obstacles. + # High energy; makes things happen when the team is stalling. + # Characteristic weakness: prone to provocation and short-temperedness. + - role_id: "belbin_shaper" + display_name: "Shaper" + description: >- + Drives momentum. Challenges assumptions, cuts through inertia, finds ways around + obstacles. Falls back to belbin_coordinator. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Shaper. Drive the work forward. Challenge assumptions, cut through + inertia, and find ways around obstacles. If something is stuck, push. If a + decision is being avoided, name it. Be direct and willing to create discomfort — + your job is momentum, not harmony. Propose a path forward even when the + information is incomplete. + routing_policy: + preferred_families: ["qwen3", "qwen2.5"] + min_context: 4096 + fallback_roles: ["belbin_coordinator"] + + # ── Monitor Evaluator ───────────────────────────────────────────────────────────────────── + # Analyses options dispassionately. Judges accurately. Slow to decide but rarely wrong. + # The team's quality filter for major decisions; immune to enthusiasm-driven errors. + # Characteristic weakness: lacks inspiration; can be overly critical and dampen morale. 
+ - role_id: "belbin_monitor_evaluator" + display_name: "Monitor Evaluator" + description: >- + Dispassionate analysis of options. Weighs evidence without advocacy. Identifies + strengths, weaknesses, and hidden assumptions in proposals. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Monitor Evaluator. Analyse options dispassionately. Weigh evidence + without advocacy or enthusiasm. When presented with proposals or decisions, + identify the strengths, weaknesses, risks, and hidden assumptions of each option. + Do not be swayed by momentum or the energy of the proposer. Your job is accurate + judgment — a correct assessment delivered slowly is better than a pleasing one + delivered quickly. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 16384 + + # ── Teamworker ──────────────────────────────────────────────────────────────────────────── + # Maintains team cohesion. Diplomatic, perceptive, averts friction before it escalates. + # Listens, builds, and averts. Most valuable when tension is high. + # Characteristic weakness: indecisive under pressure; avoids confrontation. + - role_id: "belbin_teamworker" + display_name: "Teamworker" + description: >- + Maintains cohesion and mutual understanding. Identifies where parties are talking + past each other and finds formulations that preserve everyone's core concern. + Falls back to belbin_coordinator. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Teamworker. Support the team and maintain cohesion. Identify where + people are talking past each other, where a position has been misunderstood, or + where tension is building unnecessarily. Find formulations and framings that + preserve everyone's core concern. Your job is to keep the collaboration + functional — not to win arguments or take sides. + routing_policy: + preferred_families: ["qwen3", "mistral", "llama3"] + min_context: 4096 + require_loaded: true + fallback_roles: ["belbin_coordinator"] + + # ── Implementer ─────────────────────────────────────────────────────────────────────────── + # Turns strategy and plans into concrete, sequential action. Disciplined, reliable, + # efficient. Prefers established approaches. Gets things done. + # Characteristic weakness: slow to respond to new possibilities; inflexible. + - role_id: "belbin_implementer" + display_name: "Implementer" + description: >- + Turns plans and decisions into concrete, ordered steps. Disciplined and reliable. + Prefers proven approaches over novel ones. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Implementer. Turn plans and decisions into concrete, actionable + steps. When given a strategy or decision, produce the practical implementation: + what needs to be done, in what order, by what means, and with what dependencies. + Prefer established approaches over novel ones. Your job is reliable execution — + not creative reinvention of what has already been decided. + routing_policy: + preferred_families: ["qwen3", "qwen2.5"] + min_context: 8192 + + # ── Completer Finisher ──────────────────────────────────────────────────────────────────── + # Painstaking attention to detail. Searches for errors and omissions. Ensures nothing + # slips through. Delivers on time. Polishes the work to the required standard. + # Characteristic weakness: reluctant to delegate; can be a perfectionist. 
+ - role_id: "belbin_completer_finisher" + display_name: "Completer Finisher" + description: >- + Finds what others have missed. Reviews for errors, omissions, and inconsistencies. + Ensures work is actually finished, not just declared finished. Falls back to + belbin_monitor_evaluator. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Completer Finisher. Find what others have missed. Review work for + errors, omissions, inconsistencies, and ambiguities. Check that every requirement + has been addressed, every edge case considered, and every output is at the + required standard. Do not accept "good enough" — your job is to ensure the work + is actually finished, not just declared finished. Pay attention to the details + others consider beneath notice. + routing_policy: + preferred_families: ["qwen3", "qwen2.5"] + min_context: 16384 + fallback_roles: ["belbin_monitor_evaluator"] + + # ── Specialist ──────────────────────────────────────────────────────────────────────────── + # Deep expert in a narrow domain. Self-starting within their area. Contributes only on + # a narrow front but that contribution is irreplaceable. + # Characteristic weakness: dwells on technicalities; overlooks the bigger picture. + - role_id: "belbin_specialist" + display_name: "Specialist" + description: >- + Provides deep, precise expertise in a specific domain. Authoritative within that + domain; explicitly bounded outside it. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Specialist. Provide deep, precise expertise in your domain. Give + authoritative answers — including nuances, exceptions, version differences, and + current best practice. Do not range beyond your expertise speculatively; + acknowledge the boundary explicitly and direct to a more appropriate source + when the question falls outside it. Depth and precision matter more than breadth. + routing_policy: + preferred_families: ["qwen3", "qwen2.5"] + min_context: 8192 diff --git a/configs/roles.disney.example.yaml b/configs/roles.disney.example.yaml new file mode 100644 index 0000000..c64b916 --- /dev/null +++ b/configs/roles.disney.example.yaml @@ -0,0 +1,88 @@ +# Disney Creative Strategy catalog — documented by Robert Dilts, "Strategies of Genius" (1994). +# +# Derived from observation of Walt Disney's working method. Disney reportedly separated +# creative work into three distinct modes and used different physical spaces for each, +# refusing to mix them. The gain is the same as de Bono's hats: by preventing evaluation +# from contaminating generation, and generation from contaminating planning, each phase +# can proceed without the inhibitions the other phases would impose. +# +# The natural pipeline order is: dreamer → realist → critic → (back to dreamer if needed) +# +# Role ID prefix: disney_ +# +# Fallback chains: +# disney_realist → disney_critic (if no planning model, critical review is the nearest +# productive substitute — it will surface gaps) +# disney_critic → disney_realist (if no critical model, realist's concreteness exposes +# many of the same structural weaknesses) + +roles: + + # ── Dreamer ─────────────────────────────────────────────────────────────────────────────── + # Generates ideas without constraint. No budget, no timeline, no prior commitments apply. + # Nothing is impossible in the Dreamer phase. The Dreamer's output feeds the Realist. 
+ - role_id: "disney_dreamer" + display_name: "Dreamer" + description: >- + Generates ideas freely and without constraint. Nothing is impossible at this stage. + Evaluation is entirely suspended — that belongs to the Critic. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Dreamer. Generate ideas freely and without constraint. In this + phase there are no bad ideas, no budget limits, no technical constraints, and + no prior commitments. Explore the full space of what could be. Do not evaluate, + qualify, or hedge — that comes later. If asked whether something is possible, + assume it is and describe it fully. The Realist and Critic will deal with + feasibility; your job is vision. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + + # ── Realist ─────────────────────────────────────────────────────────────────────────────── + # Takes the dream and makes it practical. Defines steps, resources, timeline, and + # dependencies. Stays faithful to the dream's intent while finding a path to execution. + # The Realist does not kill ideas — they make ideas buildable. + - role_id: "disney_realist" + display_name: "Realist" + description: >- + Turns the dream into a practical plan. Defines steps, resources, timeline, and + dependencies. Faithful to the dream's intent. Falls back to disney_critic. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Realist. Take the idea and make it practical. Define the steps, + resources, timeline, and dependencies needed to realise it. Identify what needs + to be built, acquired, or learned. Stay faithful to the dream's intent — do not + silently downscope it. Where the dream is vague, make it concrete. Where it is + impractical, find the nearest practical substitute that preserves the core intent. + routing_policy: + preferred_families: ["qwen3", "qwen2.5"] + min_context: 8192 + fallback_roles: ["disney_critic"] + + # ── Critic ──────────────────────────────────────────────────────────────────────────────── + # Stress-tests the plan. Finds what is missing, what will not work, and what the Dreamer + # and Realist overlooked. The Critic's goal is not to kill the idea but to make it robust + # before commitment is made. + - role_id: "disney_critic" + display_name: "Critic" + description: >- + Stress-tests the plan. Finds omissions, structural weaknesses, and unconsidered + risks. Goal is robustness, not rejection. Falls back to disney_realist. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Critic. Stress-test the plan. Find what is missing, what will not + work as described, what has been assumed without evidence, and what could derail + execution. Ask the questions the Dreamer and Realist avoided. Your goal is not + to kill the idea but to make it robust — identify the weak points specifically + so they can be addressed before commitment. For each weakness you find, note + whether it is fatal, fixable, or merely worth watching. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + fallback_roles: ["disney_realist"] diff --git a/configs/roles.sixhats.example.yaml b/configs/roles.sixhats.example.yaml new file mode 100644 index 0000000..0c0b39d --- /dev/null +++ b/configs/roles.sixhats.example.yaml @@ -0,0 +1,154 @@ +# Six Thinking Hats catalog — Edward de Bono, "Six Thinking Hats" (1985). +# +# Six cognitive modes used to structure deliberate thinking. 
The core discipline is
+# separation: each hat is worn exclusively, preventing the confusion that arises when
+# advocacy, critique, creativity, and fact-gathering happen simultaneously. The same
+# separation appears in Walt Disney's reported practice of using a separate physical
+# room for each working mode (see roles.disney.example.yaml).
+#
+# In LLM terms, each hat is a constrained reasoning posture enforced by the system prompt.
+# A pipeline that routes the same question through white → green → yellow → black → blue
+# produces more rigorous output than a single model trying to do all six at once.
+#
+# Role ID prefix: sixhats_
+#
+# Fallback chains:
+#   sixhats_black → sixhats_white (if no critical model, fall back to factual reporting)
+#   sixhats_green → sixhats_yellow (if no creative model, optimistic generation is closest)
+#
+# Note: sixhats_blue (process control) is the natural orchestrator of the other five.
+# In an agentic pipeline, route to sixhats_blue first to plan which hats to apply.
+
+roles:
+
+  # ── White Hat ───────────────────────────────────────────────────────────────────────────
+  # Facts and data only. What is known, what is unknown, what information is missing.
+  # No interpretation, no preference, no evaluation.
+  - role_id: "sixhats_white"
+    display_name: "White Hat"
+    description: >-
+      Facts and data only. Reports what is known, what is unknown, and what is needed.
+      No interpretation or evaluation.
+    operation: "chat"
+    modality: "text"
+    prompt_policy:
+      system_prompt: >-
+        You are wearing the White Hat. Report only facts and data. State what is known,
+        what is not known, and what information is missing or would be needed to proceed.
+        Do not interpret, evaluate, or recommend — only describe the factual landscape
+        as accurately as possible. If asked for an opinion, redirect to what the data
+        does or does not show.
+    routing_policy:
+      preferred_families: ["qwen3", "qwen2.5", "mistral"]
+      min_context: 8192
+
+  # ── Red Hat ─────────────────────────────────────────────────────────────────────────────
+  # Emotion, intuition, and gut reaction. No justification required or expected.
+  # Surfaces what logic alone cannot — the affective dimension of a decision.
+  - role_id: "sixhats_red"
+    display_name: "Red Hat"
+    description: >-
+      Emotions and intuitions without justification. Surfaces the affective dimension
+      of a decision that analytical thinking alone cannot capture.
+    operation: "chat"
+    modality: "text"
+    prompt_policy:
+      system_prompt: >-
+        You are wearing the Red Hat. Express emotional responses and intuitions directly
+        and without justification. A gut reaction is valid here without supporting
+        evidence. If something feels wrong, say so. If something feels promising, say
+        so. Your job is to surface the affective and intuitive dimension of a question —
+        not to persuade, not to analyse, just to report the feeling honestly.
+    routing_policy:
+      preferred_families: ["qwen3", "mistral", "llama3"]
+      min_context: 4096
+
+  # ── Black Hat ───────────────────────────────────────────────────────────────────────────
+  # Critical judgment and caution. Why something might fail, what the risks are,
+  # what assumptions are incorrect. The most valuable hat for avoiding serious errors.
+  - role_id: "sixhats_black"
+    display_name: "Black Hat"
+    description: >-
+      Critical judgment. Identifies risks, failure modes, and incorrect assumptions.
+      Does not balance criticism with praise — that is another hat's job.
+      Falls back to sixhats_white.
+ operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are wearing the Black Hat. Identify every way this could go wrong. Apply + rigorous critical judgment: find the flaws, the risks, the incorrect assumptions, + and the conditions under which this fails. Do not balance your criticism with + praise — that is the Yellow Hat's job. Do not generate alternatives — that is + the Green Hat's job. Your role is focused, rigorous caution. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + fallback_roles: ["sixhats_white"] + + # ── Yellow Hat ──────────────────────────────────────────────────────────────────────────── + # Optimism and value. Identifies benefits, strengths, and the conditions under which + # something works. Ensures the good in an idea is fully articulated before critique lands. + - role_id: "sixhats_yellow" + display_name: "Yellow Hat" + description: >- + Identifies value and benefits. Finds the best-case interpretation and the genuine + strengths of a proposal. Does not balance enthusiasm with caution. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are wearing the Yellow Hat. Identify the value and benefits in every + proposal. Find the best-case interpretation, the conditions under which this + succeeds, and the genuine strengths. Do not balance enthusiasm with caution — + that is the Black Hat's job. Your role is to ensure that what is good about + an idea is fully articulated and not lost in the rush to criticise. + routing_policy: + preferred_families: ["qwen3", "mistral", "llama3"] + min_context: 4096 + + # ── Green Hat ───────────────────────────────────────────────────────────────────────────── + # Creativity and lateral thinking. Generates alternatives, variations, and provocations. + # Evaluation is explicitly suspended — quantity and variety matter more than quality here. + - role_id: "sixhats_green" + display_name: "Green Hat" + description: >- + Creative alternatives, lateral moves, and provocations. Generates without judging. + Falls back to sixhats_yellow (optimistic generation is the nearest substitute). + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are wearing the Green Hat. Generate alternatives, variations, and creative + departures. Propose modifications, lateral moves, and unexpected angles. Do not + evaluate what you generate — suspend judgment entirely while producing. If one + direction runs dry, try another. Quantity and variety matter more than quality + at this stage; the other hats will do the selecting. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + fallback_roles: ["sixhats_yellow"] + + # ── Blue Hat ────────────────────────────────────────────────────────────────────────────── + # Meta-thinking and process control. Organises what kind of thinking is needed next. + # Summarises where the group stands. Manages the sequence of hats. + # The natural orchestrator in a multi-role pipeline. + - role_id: "sixhats_blue" + display_name: "Blue Hat" + description: >- + Process control and meta-thinking. Organises which hats to apply and in what + order, summarises current state, and identifies what remains. Natural pipeline + orchestrator. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are wearing the Blue Hat. Manage the thinking process. 
When given a problem
+        or a set of inputs, identify what kind of thinking is needed next, summarise
+        where the group currently stands, and name what has been covered and what
+        remains. Your job is process clarity, not content contribution. Think about
+        thinking. When asked to plan an analysis, specify which hats to apply and why.
+    routing_policy:
+      preferred_families: ["qwen3", "qwen2.5", "mistral"]
+      min_context: 8192
+      require_loaded: true
diff --git a/configs/roles.surgical-team.example.yaml b/configs/roles.surgical-team.example.yaml
new file mode 100644
index 0000000..048d6b5
--- /dev/null
+++ b/configs/roles.surgical-team.example.yaml
@@ -0,0 +1,222 @@
+# Surgical Team role catalog — F.P. Brooks Jr., "The Mythical Man-Month" (1975/1995), Chapter 3.
+#
+# Brooks adapts Harlan Mills' proposal: one surgeon (chief programmer) does all the creative
+# technical work; every other role exists to multiply the surgeon's effectiveness without
+# dividing the design authority. Ten people are organised so that one mind can produce at
+# full effectiveness — the gain is in removing the communication and coordination overhead
+# of a conventional team, not in parallelising the intellectual core of the work.
+#
+# Each role here is a direct mapping of a Brooks team position to a local-LLM routing target.
+# Designed for single-box Ollama testing. See control.singlebox.example.yaml and
+# node.singlebox.ollama.example.yaml for the matching infrastructure configuration.
+#
+# Role ID prefix: surg_
+# All role IDs in this catalog use the surg_ prefix to indicate membership in the
+# surgical-team conceptual group. This namespaces them from roles defined in other
+# catalogs (e.g. agile_, xp_) and makes group membership visible at a glance.
+#
+# Fallback chains:
+#   surg_copilot → surg_chief_programmer
+#   surg_toolsmith → surg_chief_programmer
+#   surg_language_lawyer → surg_chief_programmer
+#   surg_tester → surg_copilot
+#   surg_editor → surg_copilot
+#   surg_program_clerk → surg_administrator
+#
+# Note: Brooks' two secretaries have no LLM analogue and are omitted.
+
+roles:
+
+  # ── Chief Programmer (The Surgeon) ──────────────────────────────────────────────────────
+  # Defines the design and writes all the code. Every significant technical decision passes
+  # through here. Needs the most capable model available and the widest context window.
+  - role_id: "surg_chief_programmer"
+    display_name: "Chief Programmer"
+    description: >-
+      Primary design and implementation role. All creative technical decisions.
+      Needs maximum reasoning capability and the largest context window available.
+    operation: "chat"
+    modality: "text"
+    prompt_policy:
+      system_prompt: >-
+        You are the chief programmer. Define the design, write the code, and take full
+        ownership of technical decisions. Work completely — do not sketch or stub unless
+        explicitly asked. Reason through trade-offs before committing to an approach.
+        Prefer correctness and clarity over cleverness.
+    routing_policy:
+      preferred_families: ["qwen3", "qwen2.5", "mistral"]
+      min_context: 32768
+
+  # ── Co-pilot ────────────────────────────────────────────────────────────────────────────
+  # An intellectual peer of the surgeon who thinks alongside them: reviews everything the
+  # chief programmer produces, can write any part of the code, but does not make the
+  # primary design decisions. The surgeon's sounding board and first line of review.
+  - role_id: "surg_copilot"
+    display_name: "Co-pilot"
+    description: >-
+      Peer reviewer and backup to the chief programmer.
Reviews code and design,
+      identifies edge cases and missed requirements. Falls back to surg_chief_programmer.
+    operation: "chat"
+    modality: "text"
+    prompt_policy:
+      system_prompt: >-
+        You are the co-pilot programmer. Review and critique what the chief programmer
+        produces. Think independently — do not simply validate. Name edge cases,
+        ambiguities, and missed requirements explicitly. When you agree, say why.
+        When you disagree, be specific and constructive.
+    routing_policy:
+      preferred_families: ["qwen3", "qwen2.5", "llama3"]
+      min_context: 16384
+      fallback_roles: ["surg_chief_programmer"]
+
+  # ── Toolsmith ───────────────────────────────────────────────────────────────────────────
+  # Builds the supporting tools, scripts, macros, and automation that the surgical team needs.
+  # Brooks notes the surgeon needs a good toolsmith to ensure the environment stays productive.
+  # Output is consumed by the team as infrastructure, not shown to end-users directly.
+  - role_id: "surg_toolsmith"
+    display_name: "Toolsmith"
+    description: >-
+      Builds team tooling: scripts, automation, build helpers, and utility libraries.
+      Falls back to surg_chief_programmer for code generation when no coder model is loaded.
+    operation: "chat"
+    modality: "text"
+    prompt_policy:
+      system_prompt: >-
+        You are the toolsmith. Build the scripts, automation, and utilities the team
+        needs to work effectively. Prioritise reliability and composability over
+        surface features. Your output is used by other team members as infrastructure.
+        When building a tool, include basic error handling and usage comments.
+    routing_policy:
+      preferred_families: ["qwen2.5-coder", "qwen3", "deepseek-coder"]
+      min_context: 16384
+      fallback_roles: ["surg_chief_programmer"]
+
+  # ── Language Lawyer ─────────────────────────────────────────────────────────────────────
+  # Expert in the languages and runtimes in use. Called when the team needs a precise,
+  # authoritative answer — not a best guess — on syntax, semantics, library behaviour,
+  # version differences, or obscure features. Brooks notes that a single language lawyer
+  # can serve several surgeons.
+  - role_id: "surg_language_lawyer"
+    display_name: "Language Lawyer"
+    description: >-
+      Authoritative source on language and runtime precision. Edge cases, semantics,
+      version differences. Falls back to surg_chief_programmer.
+    operation: "chat"
+    modality: "text"
+    prompt_policy:
+      system_prompt: >-
+        You are the language lawyer. Give authoritative, precise answers on language
+        syntax, semantics, standard library behaviour, and version differences. Always
+        cover edge cases and common misconceptions. Cite the specification or official
+        documentation where it is relevant. Do not guess or approximate — if you are
+        uncertain, say so explicitly.
+    routing_policy:
+      preferred_families: ["qwen3", "qwen2.5", "mistral"]
+      min_context: 8192
+      fallback_roles: ["surg_chief_programmer"]
+
+  # ── Tester ──────────────────────────────────────────────────────────────────────────────
+  # Designs test cases against the contract and then tests the system against them.
+  # Thinks adversarially: boundary conditions, invalid inputs, concurrency, failure modes.
+  # Brooks separates the tester from the surgeon to prevent the author from testing their
+  # own work and missing their own blind spots.
+  - role_id: "surg_tester"
+    display_name: "Tester"
+    description: >-
+      Adversarial test case generation. Probes boundaries, failure modes, and invalid inputs.
+      Falls back to surg_copilot.
+ operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the tester. Your job is to find failures before they reach production. + Generate test cases that cover boundary values, invalid inputs, concurrency + hazards, and error paths. Think adversarially — never assume the happy path. + For any function, interface, or system described to you, identify what can go + wrong and how you would expose it. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "llama3"] + min_context: 8192 + fallback_roles: ["surg_copilot"] + + # ── Editor ─────────────────────────────────────────────────────────────────────────────── + # Takes the surgeon's draft documentation and improves it for clarity, structure, and + # consistency. Does not introduce new technical decisions and does not omit existing ones. + # Brooks stresses that the surgeon must write; the editor makes that writing publishable. + - role_id: "surg_editor" + display_name: "Editor" + description: >- + Documentation and prose quality. Improves clarity and structure without changing + technical content. Falls back to surg_copilot. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the editor. Improve the clarity, structure, and consistency of + documentation and written prose. Preserve the author's technical intent exactly — + do not introduce new technical decisions or silently remove existing ones. + Flag ambiguous statements. Prefer plain language over jargon where a plain + alternative exists without loss of precision. + routing_policy: + preferred_families: ["qwen3", "mistral", "llama3"] + min_context: 8192 + fallback_roles: ["surg_copilot"] + + # ── Program Clerk ──────────────────────────────────────────────────────────────────────── + # Maintains the programming product library: source files, build artifacts, change records, + # and test logs. Brooks emphasises that the clerk is keeper of both machine-readable and + # human-readable records, freeing the surgeon from administrative record-keeping. + - role_id: "surg_program_clerk" + display_name: "Program Clerk" + description: >- + Structured record-keeping. Catalogs source, artifacts, changelogs, and test results. + Prefers machine-readable output. Falls back to surg_administrator. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the program clerk. Maintain precise, structured records of source files, + build artifacts, changelogs, and test results. When asked to catalog or organise, + produce consistent, predictably formatted output — prefer tables, lists, or JSON + over prose. Flag discrepancies, missing entries, or version mismatches explicitly. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "phi4"] + min_context: 8192 + require_loaded: true + fallback_roles: ["surg_administrator"] + + # ── Administrator ──────────────────────────────────────────────────────────────────────── + # Handles everything outside the technical work: personnel, scheduling, priorities, and + # resource allocation. Brooks is clear that the surgeon has final say on technical + # matters; the administrator keeps all non-technical load off the surgeon's desk. + - role_id: "surg_administrator" + display_name: "Administrator" + description: >- + Logistics and coordination. Priorities, scheduling, resource allocation, status + summaries. Defers all technical decisions to the chief programmer. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the administrator. 
Handle logistics: priorities, scheduling, resource + allocation, and process coordination. Produce concise, actionable summaries. + Surface conflicts and blockers early. Do not make technical decisions — + flag them for the chief programmer. Keep your output brief and task-oriented. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 4096 + require_loaded: true + + # ── Semantic Index ─────────────────────────────────────────────────────────────────────── + # Brooks does not name this role, but semantic retrieval over the product library is a + # natural complement to the program clerk in an LLM-assisted team. Provides vector + # embeddings for code, documentation, and artifact search. + - role_id: "surg_semantic_index" + display_name: "Semantic Index" + description: >- + Embeddings for semantic search over code, documentation, and artifacts. + Supporting capability for the program clerk's retrieval and cross-reference tasks. + operation: "embeddings" + modality: "text" + routing_policy: + preferred_families: ["nomic-embed-text", "mxbai-embed-large", "bge"] + require_loaded: true diff --git a/configs/roles.xp.example.yaml b/configs/roles.xp.example.yaml new file mode 100644 index 0000000..b7015dc --- /dev/null +++ b/configs/roles.xp.example.yaml @@ -0,0 +1,139 @@ +# Extreme Programming team roles — Kent Beck, "Extreme Programming Explained" (1999). +# +# XP's team is deliberately small and each role is defined by responsibility rather than +# hierarchy. The key structural insight is the separation of the Customer (who defines +# what needs to be built and owns the acceptance criteria) from the Programmer (who +# decides how to build it). The Coach and Tracker are meta-roles: one improves how the +# team works, the other measures whether the work is on track. +# +# These roles were defined for co-located software teams but map naturally to LLM routing +# targets for code-related agentic workflows. +# +# Role ID prefix: xp_ +# +# Fallback chains: +# xp_tester → xp_programmer (tester and programmer are tightly coupled in XP; +# programmer can generate test cases if no tester model) +# xp_tracker → xp_coach (both are meta-roles about the team's work, not the code) + +roles: + + # ── Customer ────────────────────────────────────────────────────────────────────────────── + # Defines what needs to be built and why. Writes acceptance criteria. Prioritises the + # work by business and user value. In XP the customer is a full team member, not an + # external stakeholder who reviews completed work. + - role_id: "xp_customer" + display_name: "Customer" + description: >- + Defines requirements and acceptance criteria. Prioritises by user and business + value. Owns the definition of done. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Customer. Define what needs to be built and why. Write acceptance + criteria that are specific enough to verify — describe the behaviour the + finished work must exhibit, not the implementation. Prioritise from a user + and business value perspective. When requirements are ambiguous, make them + concrete. Own the definition of done; do not delegate it. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + + # ── Programmer ──────────────────────────────────────────────────────────────────────────── + # Writes production code and unit tests. Estimates effort honestly. Implements the + # simplest thing that could possibly work, then refactors. 
In XP, the programmer also + # writes tests — the Tester focuses on acceptance tests, not unit tests. + - role_id: "xp_programmer" + display_name: "Programmer" + description: >- + Writes production code and unit tests. Estimates honestly. Implements the + simplest solution that works, then refactors. Code quality is the programmer's + direct responsibility. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Programmer. Write clean, working code with tests. Estimate effort + honestly — neither optimistically nor as a negotiating position. When + implementing a feature, write the simplest code that could possibly work, then + refactor for clarity. Do not over-engineer for hypothetical future requirements. + Unit tests are your responsibility; acceptance tests belong to the Customer + and Tester. Own the technical quality of what you produce. + routing_policy: + preferred_families: ["qwen2.5-coder", "qwen3", "deepseek-coder"] + min_context: 16384 + + # ── Tester ──────────────────────────────────────────────────────────────────────────────── + # Helps the Customer write acceptance tests. Thinks systematically about what could go + # wrong. Finds cases the Programmer and Customer did not think of. Makes requirements + # testable and the definition of done precise. + - role_id: "xp_tester" + display_name: "Tester" + description: >- + Writes and refines acceptance tests with the Customer. Makes requirements testable + and the definition of done verifiable. Falls back to xp_programmer. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Tester. Help define and execute acceptance tests. Think + systematically about what the software must do and what could go wrong. Work + with the Customer to make requirements testable and unambiguous. Find the + cases that the Programmer and Customer did not think of — boundary values, + invalid inputs, missing error paths. Your job is to make the definition of + done precise and verifiable, not merely declared. + routing_policy: + preferred_families: ["qwen3", "qwen2.5"] + min_context: 8192 + fallback_roles: ["xp_programmer"] + + # ── Coach ───────────────────────────────────────────────────────────────────────────────── + # Understands the XP practices deeply enough to adapt them to context. Guides without + # commanding. Intervenes when practices slip — not to enforce rules but to restore + # the principles behind them. + - role_id: "xp_coach" + display_name: "Coach" + description: >- + Guides the team's process without commanding it. Understands principles deeply + enough to adapt practices to context. Intervenes when the team is stuck or + practices are slipping. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Coach. Guide the team's process without commanding it. When + practices are slipping or the team is stuck, name what is happening and suggest + a correction grounded in the underlying principle, not just the rule. Understand + the XP practices well enough to know when adapting them is appropriate and when + abandoning them is a mistake. Your job is to make the team better at working + together — not to do the work yourself. + routing_policy: + preferred_families: ["qwen3", "qwen2.5", "mistral"] + min_context: 8192 + + # ── Tracker ─────────────────────────────────────────────────────────────────────────────── + # Monitors progress against estimates and commitments. Measures velocity. Raises alarms + # early and specifically. 
Does not pressure the team — surfaces facts and lets the team + # respond. Beck emphasises that the tracker asks, not tells. + - role_id: "xp_tracker" + display_name: "Tracker" + description: >- + Tracks progress against estimates and commitments. Raises alarms early and + specifically without pressure. Honest accounting, not motivation. + Falls back to xp_coach. + operation: "chat" + modality: "text" + prompt_policy: + system_prompt: >- + You are the Tracker. Monitor progress against estimates and commitments. + Measure velocity honestly. When actual progress diverges from the plan, raise + the alarm early and specifically — do not wait for the deadline, and do not + soften the numbers. Do not pressure the team; surface the facts and let the + team respond to them. Your job is honest accounting: the gap between what was + planned and what is happening, stated plainly. + routing_policy: + preferred_families: ["qwen3", "qwen2.5"] + min_context: 4096 + require_loaded: true + fallback_roles: ["xp_coach"] diff --git a/docs/roadmap.md b/docs/roadmap.md index 1f463a8..4d73088 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -1,6 +1,6 @@ # GenieHive Roadmap -Last updated: 2026-04-27 +Last updated: 2026-04-27 (P0–P2 complete + routing strategies + streaming + Ollama load state + observed metrics) ## What Is Complete @@ -24,11 +24,17 @@ The v1 core is implemented and tested. - Per-asset and per-role policies, merged with role winning on prompts - Qwen3 / Qwen3.5 auto-detection with `enable_thinking: false` applied automatically +**Client-facing proxy:** +- `POST /v1/audio/transcriptions` — proxies multipart audio to upstream; uses a + real httpx client for multipart form-data (not the injectable `AsyncPoster` Protocol) + **Route matching and scoring:** - `POST /v1/cluster/routes/match` — scored candidate list for role and service targets - Signals: text overlap, preferred family, runtime (loaded state, latency, throughput, queue depth), benchmark (workload overlap, quality score) - `GET /v1/cluster/routes/resolve` — quick single-model resolution +- `fallback_roles` chain in `resolve_route()` — walks role fallbacks with cycle + protection; each fallback resolves using its own operation (not the primary's kind) **Benchmark infrastructure:** - Built-in workloads: `chat.short_reasoning`, `chat.concise_support` @@ -43,116 +49,89 @@ The v1 core is implemented and tested. 
 - Client API key (`X-Api-Key`) and node registration key (`X-GenieHive-Node-Key`)
 - Empty key lists disable auth for development
 
+**Active health probing (control plane):**
+- `ServiceProber` in `probe.py` probes each service's `GET /health` endpoint,
+  falling back to `GET /v1/models` for upstreams without a `/health` route (e.g. vLLM)
+- Health divergences update the registry's `state_json` without touching other fields
+- Background `probe_loop` task launched at app startup when
+  `routing.probe_interval_s > 0` (default 0 = disabled, relies on node heartbeats)
+- Configurable via `routing.probe_interval_s` and `routing.probe_timeout_s`
+
+**Routing strategies — all three implemented:**
+- `routing.default_strategy` in config; `Registry(routing_strategy=...)` dispatches
+- `scored` (default): picks the best-scoring service per role
+- `round_robin`: cycles through healthy candidates; in-memory counter, resets on restart
+- `least_loaded`: picks the service with the lowest `queue_depth + in_flight` from observed
+  metrics; falls back to latency as a secondary signal when load metrics are equal
+
+**Streaming chat completions:**
+- `UpstreamClient.chat_completions_stream()` — async generator, yields raw SSE bytes
+  using `httpx.AsyncClient.stream()`; raises `UpstreamError` before first yield on
+  non-2xx status
+- `_prepare_chat_upstream()` extracted from `proxy_chat_completion` — synchronous
+  routing/policy step so `ProxyError` can be caught before `StreamingResponse` is created
+- `stream_chat_completion()` — async generator wrapping `chat_completions_stream`,
+  applies `_strip_reasoning_from_sse_chunk()` to each SSE data line
+- Route handler detects `body.get("stream")`, resolves the route eagerly, and returns a
+  `StreamingResponse` with the `Cache-Control: no-cache` and `X-Accel-Buffering: no` headers
+
+**Upstream model discovery (node agent):**
+- `discover_ollama_assets()` — queries `/api/tags`; marks all as `loaded: False`
+  (available, not necessarily in VRAM)
+- `_get_ollama_ps_models()` — internal helper; queries `/api/ps`; returns raw model
+  list (with `size_in_vram` etc.) for reuse without extra HTTP requests
+- `query_ollama_ps()` — public wrapper; returns frozenset of VRAM-loaded model names
+- `discover_openai_models()` — queries `/v1/models`; marks all as `loaded: True`
+- `enrich_service_assets(service, *, protocol)` — for `"ollama"`: two-phase query
+  (tags + ps); updates `loaded` state of existing static assets as well as adding
+  new ones; stale `loaded: True` in config gets corrected to `False` if the model
+  isn't in `/api/ps`; populates `observed.loaded_model_count` and
+  `observed.vram_used_bytes` from the `/api/ps` response
+- Per-service `discover_protocol: "ollama" | "openai" | null` config field
+- Heartbeat zips service dicts with config objects to pass protocol correctly
+- Separate httpx discovery client allocated only when any service opts in
+
+**`ServiceObserved` extended:**
+- `loaded_model_count: int | None` — number of models currently in VRAM (from Ollama `/api/ps`)
+- `vram_used_bytes: int | None` — total VRAM used across loaded models
+- Both exposed in the `_runtime_signals` signals dict for route scoring visibility
+
 **Tests:**
 - Registry, chat proxy, node inventory, benchmark runner, full demo flow
-- All passing
+- ServiceProber probe_once, update_service_health, discover_ollama_assets,
+  enrich_service_assets, observed metrics population — all passing (47 total)
 
 ---
 
 ## Known Gaps and Issues
 
-These are confirmed gaps in the current implementation, not aspirational items.
+No confirmed gaps remain in the current implementation; the items below are
+improvement areas, not defects.
 
-### 1.
Transcription endpoint not implemented +### 1. Discovery covers Ollama and OpenAI-compatible; faster-whisper not covered -`POST /v1/audio/transcriptions` is listed in the architecture and wired into -`main.py`, but there is no upstream proxy handler for it. `upstream.py` has no -`transcriptions()` method. The endpoint currently returns nothing useful. +Transcription services (faster-whisper, WhisperX) don't expose `/api/tags` or +`/v1/models`. A `discover_protocol: "whisper"` variant could query +`GET /inference/v1/models` or read a static manifest. -### 2. Routing strategy field is ignored +### 2. `architecture.md` could be tightened further -`RoutingConfig.default_strategy` exists in `config.py` (default: `"loaded_first"`), -but `resolve_route()` in `registry.py` does not read it. There is effectively only -one strategy. The field is misleading. - -### 3. Role fallback chain is not implemented - -`RoutingPolicy.fallback_roles` is defined in `models.py` and appears in the schema -docs, but `resolve_route()` never consults it. A role that fails to match any service -fails outright rather than trying its fallbacks. - -### 4. `_benchmark_quality_score` can exceed 1.0 before clamping - -`pass_rate` and `quality_score` are taken as `max()`, then `tokens_per_sec` and -`ttft_ms` are *added* on top. A service with `pass_rate=1.0`, fast tokens, and low -TTFT accumulates a score of up to 1.6 before the final `min(1.0, quality)` clamp. -This means the additive bonuses have no effect once pass_rate or quality_score is -already high, which is probably not the intended behavior. - -### 5. Health is self-reported only - -Service health (`healthy` / `unhealthy`) comes entirely from node-reported state. -The control plane does not probe upstream endpoints. A service can appear healthy -while its endpoint is unreachable. - -### 6. No active model discovery from upstream services - -The node agent scans for `.gguf` files on disk and reads static service config. -It does not query running Ollama or vLLM instances for their loaded model list. -A freshly-pulled Ollama model will not appear until the node config is updated -and the agent restarted. - -### 7. `docs/architecture.md` duplicates `GENIEWARREN_SPEC.md` - -`architecture.md` contains the repo-naming rationale, name alternatives, and -implementation sequence list that are only meaningful in a design/proposal context. -These are noise in a reference architecture document. +Minor: some sections inherited from earlier drafts could be simplified now that +the implementation is stable. --- -## Immediate Next Work (Priority Order) +## Next Work -### P0 — Fix confirmed bugs +1. **Live end-to-end demo** — run control + node against a real upstream (Ollama + or llama.cpp) and validate: chat via role, direct asset addressing, Ollama + dynamic discovery with correct load state, `least_loaded` routing with real + VRAM metrics, and streaming. -1. **Remove the misleading `default_strategy` field** or implement a dispatch table - so the config field actually selects behavior. Simplest fix: delete the field and - the dead config surface until a second strategy is implemented. +2. **Validate Codex-friendly `/v1/models` offload** — test `GET /v1/models` as + a programmatic service catalog for a Claude Code or Codex client selecting + a GenieHive-hosted model for lower-complexity subtasks. -2. 
**Fix `_benchmark_quality_score`** so additive bonuses apply only when no - `pass_rate` / `quality_score` is available, or restructure as a weighted average - so the components don't stack additively. - -### P1 — Complete stated v1 scope - -3. **Implement transcription proxy** — add `upstream.transcriptions()` and wire - the handler in `chat.py` and `main.py`. - -4. **Implement role fallback chain** — when `resolve_route()` finds no matching - service for a role, walk `fallback_roles` in order before failing. - -### P2 — Close the most important self-reported-only gaps - -5. **Add active health probing** — the control plane should periodically probe - registered service endpoints (a lightweight `GET /health` or `GET /v1/models` - is sufficient) and update health state independently of node heartbeats. - -6. **Add upstream model discovery for Ollama** — query `GET /api/tags` (Ollama) - or `GET /v1/models` (OpenAI-compatible) from the node agent and merge loaded - model names into the service's asset list. This enables dynamic model tracking - without config restarts. - -### P3 — Documentation cleanup - -7. **Revise `architecture.md`** — remove the design-phase repo-naming rationale - and first-implementation-sequence list; replace with a description of the actual - running system (the four layers as implemented, data flow diagram if possible). - -8. **Update `roadmap.md`** — this file (done). - ---- - -## Near-Term Milestones (After P0–P3) - -- **Live LLM demo** — run control + node against a real upstream (Ollama or - llama.cpp) and document the end-to-end flow, including chat via role and - direct asset addressing -- **Validate Codex-friendly `/v1/models` offload** — test `GET /v1/models` as - a programmatic service catalog for a Claude Code or Codex client selecting - a GenieHive-hosted model for lower-complexity subtasks -- **Richer node metrics** — queue depth, in-flight count, and rolling performance - averages reported from node to control on every heartbeat -- **Second routing strategy** — implement `round_robin` or `least_loaded` as a - second selectable strategy, then make `default_strategy` actually dispatch +3. **`queue_depth` / `in_flight` from Ollama** — populate from `/api/ps` model + count or from a sidecar queue tracker; currently only set from static config. 
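+
+For the live demo in item 1, a minimal streaming smoke test could look like the
+sketch below. It is a sketch, not committed code: it assumes the control plane is
+listening on `http://localhost:8000` with client auth disabled (empty key list),
+that the chat proxy is mounted at the OpenAI-compatible `/v1/chat/completions`
+path, and that a role id from one of the example catalogs (here `sixhats_blue`)
+resolves to a loaded model; adjust host, path, and role id to your setup.
+
+```python
+# smoke_stream.py — hand-run sketch against a local control plane.
+import asyncio
+
+import httpx
+
+
+async def smoke_stream() -> None:
+    body = {
+        "model": "sixhats_blue",  # a role id; the control plane resolves the route
+        "messages": [{"role": "user", "content": "Plan the next analysis."}],
+        "stream": True,  # exercises the StreamingResponse path
+    }
+    async with httpx.AsyncClient(timeout=None) as client:
+        async with client.stream(
+            "POST", "http://localhost:8000/v1/chat/completions", json=body
+        ) as resp:
+            resp.raise_for_status()
+            # Raw SSE lines; reasoning fields are already stripped server-side.
+            async for line in resp.aiter_lines():
+                if line:  # skip blank SSE separator lines
+                    print(line)
+
+
+if __name__ == "__main__":
+    asyncio.run(smoke_stream())
+```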
--- diff --git a/src/geniehive_control/chat.py b/src/geniehive_control/chat.py index 86ebffe..36f0c4b 100644 --- a/src/geniehive_control/chat.py +++ b/src/geniehive_control/chat.py @@ -1,6 +1,9 @@ from __future__ import annotations -from typing import Any +import json +from typing import Any, AsyncGenerator + +from fastapi import UploadFile from .request_policy import apply_request_policy, effective_chat_request_policy, select_target_asset from .registry import Registry @@ -27,12 +30,35 @@ def _strip_reasoning_fields(payload: Any) -> Any: cleaned[key] = _strip_reasoning_fields(value) return cleaned -async def proxy_chat_completion( + +def _strip_reasoning_from_sse_chunk(chunk: bytes) -> bytes: + """Strip reasoning fields from SSE chunk data lines when parseable.""" + lines = chunk.split(b"\n") + out: list[bytes] = [] + for line in lines: + if line.startswith(b"data: ") and not line.startswith(b"data: [DONE]"): + try: + data = json.loads(line[6:]) + data = _strip_reasoning_fields(data) + out.append(b"data: " + json.dumps(data, separators=(",", ":")).encode()) + except Exception: + out.append(line) + else: + out.append(line) + return b"\n".join(out) + + +def _prepare_chat_upstream( body: dict[str, Any], *, registry: Registry, - upstream: UpstreamClient, -) -> Any: +) -> tuple[dict, dict[str, Any]]: + """Resolve chat route and build the upstream request body. + + Returns ``(service, upstream_body)``. Raises :class:`ProxyError` if routing + fails. This function is synchronous — it performs only registry look-ups and + dict manipulation, no I/O. + """ requested_model = body.get("model") if not requested_model: raise ProxyError("Missing 'model' in request body.", status_code=400) @@ -53,13 +79,33 @@ async def proxy_chat_completion( role=role, asset=asset, ) - upstream_body = apply_request_policy(dict(body), combined_policy) upstream_body["model"] = choose_upstream_model_id(requested_model, service) + return service, upstream_body + + +async def proxy_chat_completion( + body: dict[str, Any], + *, + registry: Registry, + upstream: UpstreamClient, +) -> Any: + service, upstream_body = _prepare_chat_upstream(body, registry=registry) response = await upstream.chat_completions(service["endpoint"], upstream_body) return _strip_reasoning_fields(response) +async def stream_chat_completion( + service: dict, + upstream_body: dict[str, Any], + *, + upstream: UpstreamClient, +) -> AsyncGenerator[bytes, None]: + """Yield SSE bytes from upstream, stripping reasoning fields from each chunk.""" + async for chunk in upstream.chat_completions_stream(service["endpoint"], upstream_body): + yield _strip_reasoning_from_sse_chunk(chunk) + + async def proxy_embeddings( body: dict[str, Any], *, @@ -81,3 +127,42 @@ async def proxy_embeddings( upstream_body = dict(body) upstream_body["model"] = choose_upstream_model_id(requested_model, service) return await upstream.embeddings(service["endpoint"], upstream_body) + + +async def proxy_transcription( + *, + model: str, + file: UploadFile, + language: str | None = None, + prompt: str | None = None, + response_format: str | None = None, + temperature: float | None = None, + registry: Registry, + upstream: UpstreamClient, +) -> Any: + resolved = registry.resolve_route(model, kind="transcription") + if resolved is None: + raise ProxyError(f"Unknown model or role '{model}'.", status_code=404) + + service = resolved.get("service") + if service is None: + raise ProxyError(f"No healthy transcription target available for '{model}'.", status_code=503) + + file_content = await 
file.read() + form_data: dict[str, str] = {"model": choose_upstream_model_id(model, service)} + if language is not None: + form_data["language"] = language + if prompt is not None: + form_data["prompt"] = prompt + if response_format is not None: + form_data["response_format"] = response_format + if temperature is not None: + form_data["temperature"] = str(temperature) + + return await upstream.transcriptions( + service["endpoint"], + file_content=file_content, + file_name=file.filename or "audio", + file_content_type=file.content_type or "application/octet-stream", + form_data=form_data, + ) diff --git a/src/geniehive_control/config.py b/src/geniehive_control/config.py index 68c0557..36f2ac6 100644 --- a/src/geniehive_control/config.py +++ b/src/geniehive_control/config.py @@ -22,6 +22,14 @@ class StorageConfig(BaseModel): class RoutingConfig(BaseModel): health_stale_after_s: float = 30.0 + # "scored" — pick best-scoring service per role (default) + # "round_robin" — cycle through healthy services in order + # "least_loaded" — prefer services with lowest queue_depth + in_flight + default_strategy: str = "scored" + # Set to a positive value (seconds) to enable active service health probing. + # 0.0 (default) disables probing; the control plane relies solely on node heartbeats. + probe_interval_s: float = 0.0 + probe_timeout_s: float = 5.0 class ControlConfig(BaseModel): diff --git a/src/geniehive_control/main.py b/src/geniehive_control/main.py index 4a31192..26cd673 100644 --- a/src/geniehive_control/main.py +++ b/src/geniehive_control/main.py @@ -1,15 +1,18 @@ from __future__ import annotations +import asyncio import os +from contextlib import asynccontextmanager, suppress from pathlib import Path -from fastapi import Depends, FastAPI, Request -from fastapi.responses import JSONResponse +from fastapi import Depends, FastAPI, File, Form, Request, UploadFile +from fastapi.responses import JSONResponse, StreamingResponse from .auth import require_client_auth, require_node_auth -from .chat import ProxyError, proxy_chat_completion, proxy_embeddings +from .chat import ProxyError, _prepare_chat_upstream, proxy_chat_completion, proxy_embeddings, proxy_transcription, stream_chat_completion from .config import ControlConfig, load_config from .models import BenchmarkIngestRequest, HostHeartbeat, HostRegistration, RouteMatchRequest, RouteMatchResponse +from .probe import ServiceProber from .roles import load_role_catalog from .registry import Registry from .upstream import UpstreamClient, UpstreamError @@ -22,13 +25,34 @@ def create_app( ) -> FastAPI: cfg_path = config_path or os.environ.get("GENIEHIVE_CONTROL_CONFIG") cfg = load_config(cfg_path) if cfg_path else ControlConfig() - registry = Registry(cfg.storage.sqlite_path) + registry = Registry(cfg.storage.sqlite_path, routing_strategy=cfg.routing.default_strategy) roles_path = cfg.roles_path or os.environ.get("GENIEHIVE_ROLES_CONFIG") if roles_path: registry.upsert_roles(load_role_catalog(roles_path).roles) upstream = upstream_client or UpstreamClient() - app = FastAPI(title="GenieHive Control", version="0.1.0") + @asynccontextmanager + async def lifespan(app: FastAPI): + probe_task: asyncio.Task | None = None + prober: ServiceProber | None = None + stop_event = asyncio.Event() + if cfg.routing.probe_interval_s > 0: + prober = ServiceProber(registry, timeout_s=cfg.routing.probe_timeout_s) + probe_task = asyncio.create_task( + prober.probe_loop(stop_event, cfg.routing.probe_interval_s) + ) + try: + yield + finally: + if probe_task is not None: + 
stop_event.set() + probe_task.cancel() + with suppress(asyncio.CancelledError): + await probe_task + if prober is not None: + await prober.aclose() + + app = FastAPI(title="GenieHive Control", version="0.1.0", lifespan=lifespan) app.state.cfg = cfg app.state.registry = registry app.state.upstream = upstream @@ -64,12 +88,18 @@ def create_app( @app.post("/v1/chat/completions") async def chat_completions(request: Request, _=Depends(require_client_auth)): body = await request.json() + reg: Registry = request.app.state.registry + up: UpstreamClient = request.app.state.upstream try: - return await proxy_chat_completion( - body, - registry=request.app.state.registry, - upstream=request.app.state.upstream, - ) + if body.get("stream"): + # Resolve route eagerly so ProxyError is raised before streaming starts. + service, upstream_body = _prepare_chat_upstream(body, registry=reg) + return StreamingResponse( + stream_chat_completion(service, upstream_body, upstream=up), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + return await proxy_chat_completion(body, registry=reg, upstream=up) except ProxyError as exc: return JSONResponse( status_code=exc.status_code, @@ -101,6 +131,39 @@ def create_app( content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}}, ) + @app.post("/v1/audio/transcriptions") + async def audio_transcriptions( + request: Request, + file: UploadFile = File(...), + model: str = Form(...), + language: str | None = Form(None), + prompt: str | None = Form(None), + response_format: str | None = Form(None), + temperature: float | None = Form(None), + _=Depends(require_client_auth), + ): + try: + return await proxy_transcription( + model=model, + file=file, + language=language, + prompt=prompt, + response_format=response_format, + temperature=temperature, + registry=request.app.state.registry, + upstream=request.app.state.upstream, + ) + except ProxyError as exc: + return JSONResponse( + status_code=exc.status_code, + content={"error": {"message": str(exc), "type": "geniehive_error", "code": "transcription_proxy_error"}}, + ) + except UpstreamError as exc: + return JSONResponse( + status_code=exc.status_code or 502, + content={"error": {"message": str(exc), "type": "geniehive_error", "code": "upstream_error"}}, + ) + @app.get("/v1/cluster/services") async def list_services(request: Request, _=Depends(require_client_auth)) -> dict: return {"object": "list", "data": request.app.state.registry.list_services()} diff --git a/src/geniehive_control/models.py b/src/geniehive_control/models.py index bd511a5..214ea48 100644 --- a/src/geniehive_control/models.py +++ b/src/geniehive_control/models.py @@ -34,6 +34,8 @@ class ServiceObserved(BaseModel): tokens_per_sec: float | None = None queue_depth: int | None = None in_flight: int | None = None + loaded_model_count: int | None = None + vram_used_bytes: int | None = None class RegisteredService(BaseModel): diff --git a/src/geniehive_control/probe.py b/src/geniehive_control/probe.py new file mode 100644 index 0000000..216bb61 --- /dev/null +++ b/src/geniehive_control/probe.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import asyncio +from contextlib import suppress + +import httpx + +from .registry import Registry + + +class ServiceProber: + """Periodically probes registered service endpoints and updates health state.""" + + def __init__(self, registry: Registry, *, timeout_s: float = 5.0) -> None: + self._registry = registry + self._client = 
httpx.AsyncClient( + timeout=httpx.Timeout(connect=timeout_s, read=timeout_s, write=timeout_s, pool=timeout_s) + ) + + async def probe_once(self) -> dict[str, str]: + """Probe all registered services. Returns mapping of service_id → observed health.""" + services = self._registry.list_services() + results: dict[str, str] = {} + for service in services: + health = await self._probe_service(service) + current = service["state"].get("health") + if health != current: + self._registry.update_service_health(service["service_id"], health) + results[service["service_id"]] = health + return results + + async def _probe_service(self, service: dict) -> str: + endpoint = service.get("endpoint", "") + if not endpoint: + return service["state"].get("health") or "unknown" + try: + response = await self._client.get(endpoint.rstrip("/") + "/health") + if response.status_code < 400: + return "healthy" + if response.status_code in (404, 405): + # Runtime doesn't implement GET /health; fall back to the + # standard OpenAI-compatible models list (works for vLLM etc.). + response2 = await self._client.get(endpoint.rstrip("/") + "/v1/models") + return "healthy" if response2.status_code < 400 else "unhealthy" + return "unhealthy" + except Exception: + return "unhealthy" + + async def probe_loop(self, stop_event: asyncio.Event, interval_s: float) -> None: + while not stop_event.is_set(): + with suppress(Exception): + await self.probe_once() + try: + await asyncio.wait_for(stop_event.wait(), timeout=interval_s) + except asyncio.TimeoutError: + continue + + async def aclose(self) -> None: + await self._client.aclose() diff --git a/src/geniehive_control/registry.py b/src/geniehive_control/registry.py index 7254e5b..dc79cf3 100644 --- a/src/geniehive_control/registry.py +++ b/src/geniehive_control/registry.py @@ -15,9 +15,12 @@ def _json_dumps(value: object) -> str: class Registry: - def __init__(self, db_path: str | Path) -> None: + def __init__(self, db_path: str | Path, *, routing_strategy: str = "scored") -> None: self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._routing_strategy = routing_strategy + # Per-role round-robin counters (in-memory; reset on restart is intentional). + self._rr_counters: dict[str, int] = {} self._init_db() def _connect(self) -> sqlite3.Connection: @@ -206,6 +209,21 @@ class Registry: ) return self.list_roles() + def update_service_health(self, service_id: str, health: str) -> None: + """Overwrite the health field in a service's state_json without touching other fields.""" + with self._connect() as conn: + row = conn.execute( + "SELECT state_json FROM services WHERE service_id = ?", (service_id,) + ).fetchone() + if row is None: + return + state = json.loads(row["state_json"]) + state["health"] = health + conn.execute( + "UPDATE services SET state_json = ? 
WHERE service_id = ?", + (_json_dumps(state), service_id), + ) + def get_role(self, role_id: str) -> dict | None: with self._connect() as conn: row = conn.execute("SELECT * FROM roles WHERE role_id = ?", (role_id,)).fetchone() @@ -351,7 +369,7 @@ class Registry: deduped[item["id"]] = item return [deduped[key] for key in sorted(deduped)] - def resolve_route(self, requested_model: str, *, kind: str | None = None) -> dict | None: + def resolve_route(self, requested_model: str, *, kind: str | None = None, _visited: set[str] | None = None) -> dict | None: direct = self._resolve_direct(requested_model, kind=kind) if direct is not None: return {"match_type": "direct", **direct} @@ -369,6 +387,17 @@ class Registry: and service["state"].get("health") == "healthy" ] if not candidates: + visited: set[str] = _visited if _visited is not None else {requested_model} + for fb_role_id in role["routing_policy"].get("fallback_roles", []): + if fb_role_id in visited: + continue + visited.add(fb_role_id) + # Let each fallback role resolve using its own operation — don't + # inherit matched_kind, so a fallback with a different kind can + # provide a service when the primary kind has none available. + fb_result = self.resolve_route(fb_role_id, _visited=visited) + if fb_result is not None and fb_result.get("service") is not None: + return {"match_type": "role", "role": role, "service": fb_result["service"], "fallback_via": fb_role_id} return {"match_type": "role", "role": role, "service": None} preferred_families = [family.lower() for family in role["routing_policy"].get("preferred_families", [])] @@ -388,7 +417,22 @@ class Registry: if loaded_candidates: candidates = loaded_candidates - service = max(candidates, key=score) + if self._routing_strategy == "round_robin": + rr_key = requested_model + idx = self._rr_counters.get(rr_key, 0) % len(candidates) + self._rr_counters[rr_key] = idx + 1 + service = candidates[idx] + elif self._routing_strategy == "least_loaded": + def load_key(svc: dict) -> tuple: + obs = svc.get("observed", {}) + queue = obs.get("queue_depth") or 0 + in_flight = obs.get("in_flight") or 0 + # Prefer low load; use latency as secondary signal, then id for stability. 
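+                # e.g. (queue 0 + in_flight 0, 900 ms) sorts before
+                # (queue 5 + in_flight 3, 500 ms): total load dominates, and
+                # latency only breaks ties between equally loaded services.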
+ latency = obs.get("p50_latency_ms") or float("inf") + return (queue + in_flight, latency, svc["service_id"]) + service = min(candidates, key=load_key) + else: + service = max(candidates, key=score) return {"match_type": "role", "role": role, "service": service} def match_routes(self, request: RouteMatchRequest) -> dict: @@ -551,6 +595,7 @@ class Registry: latency = service["observed"].get("p50_latency_ms") tokens_per_sec = service["observed"].get("tokens_per_sec") queue_depth = service["observed"].get("queue_depth") + loaded_model_count = service["observed"].get("loaded_model_count") score = 0.0 reasons: list[str] = [] @@ -582,6 +627,7 @@ class Registry: "p50_latency_ms": latency, "tokens_per_sec": tokens_per_sec, "queue_depth": queue_depth, + "loaded_model_count": loaded_model_count, } def _benchmark_signals(self, service: dict | None, tasks: list[str], workloads: list[str]) -> tuple[float, list[str], dict[str, object]]: diff --git a/src/geniehive_control/upstream.py b/src/geniehive_control/upstream.py index ef2cf05..30f8270 100644 --- a/src/geniehive_control/upstream.py +++ b/src/geniehive_control/upstream.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Protocol +from typing import Any, AsyncGenerator, Protocol import httpx @@ -43,6 +43,35 @@ class UpstreamClient: return response.json() return response + async def chat_completions_stream( + self, + base_url: str, + body: dict[str, Any], + *, + headers: dict[str, str] | None = None, + ) -> AsyncGenerator[bytes, None]: + """Yield raw SSE bytes from an upstream chat completions endpoint. + + Raises ``UpstreamError`` before the first yield if the upstream returns a + non-2xx status. Requires a real ``httpx.AsyncClient`` — raises immediately + if an injected mock was provided instead. 
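+
+        Usage sketch (``client`` stands for any ``UpstreamClient`` instance)::
+
+            async for chunk in client.chat_completions_stream(base_url, body):
+                ...  # raw SSE bytes, one upstream chunk at a time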
+ """ + if not isinstance(self._client, httpx.AsyncClient): + raise UpstreamError( + "streaming requires a real httpx client; not supported by the injected mock", + status_code=500, + ) + url = base_url.rstrip("/") + "/v1/chat/completions" + async with self._client.stream("POST", url, json=body, headers=headers or {}) as response: + if response.status_code >= 400: + content = await response.aread() + raise UpstreamError( + content.decode(errors="replace") or f"upstream error from {url}", + status_code=response.status_code, + ) + async for chunk in response.aiter_bytes(): + yield chunk + async def embeddings( self, base_url: str, @@ -63,6 +92,35 @@ class UpstreamClient: return response.json() return response + async def transcriptions( + self, + base_url: str, + *, + file_content: bytes, + file_name: str, + file_content_type: str, + form_data: dict[str, str], + headers: dict[str, str] | None = None, + ) -> Any: + if not isinstance(self._client, httpx.AsyncClient): + raise UpstreamError( + "transcription requires a real httpx client; multipart is not supported by the injected mock", + status_code=500, + ) + url = base_url.rstrip("/") + "/v1/audio/transcriptions" + response = await self._client.post( + url, + data=form_data, + files={"file": (file_name, file_content, file_content_type)}, + headers=headers or {}, + ) + if response.status_code >= 400: + raise UpstreamError( + response.text or f"upstream error from {url}", + status_code=response.status_code, + ) + return response.json() + async def aclose(self) -> None: if self._owns_client and isinstance(self._client, httpx.AsyncClient): await self._client.aclose() diff --git a/src/geniehive_node/config.py b/src/geniehive_node/config.py index 1252e2c..7983088 100644 --- a/src/geniehive_node/config.py +++ b/src/geniehive_node/config.py @@ -51,6 +51,11 @@ class NodeServiceConfig(BaseModel): assets: list[NodeServiceAssetConfig] = Field(default_factory=list) state: dict[str, object] = Field(default_factory=dict) observed: dict[str, object] = Field(default_factory=dict) + # Set to "ollama" to query GET /api/tags, or "openai" to query + # GET /v1/models, and merge discovered model names into the asset list + # reported to the control plane on each heartbeat. None (default) + # disables discovery for this service. + discover_protocol: str | None = None class NodeConfig(BaseModel): diff --git a/src/geniehive_node/discovery.py b/src/geniehive_node/discovery.py new file mode 100644 index 0000000..d60efcc --- /dev/null +++ b/src/geniehive_node/discovery.py @@ -0,0 +1,200 @@ +from __future__ import annotations + +import httpx + + +async def discover_ollama_assets( + endpoint: str, + *, + client: httpx.AsyncClient | None = None, + timeout: float = 5.0, +) -> list[dict]: + """Query Ollama's GET /api/tags and return available (not necessarily loaded) model assets. + + Sets ``"loaded": False`` for all entries — callers should follow up with + :func:`query_ollama_ps` to determine which models are currently in VRAM. + Returns an empty list on any error. 
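+
+    Expected ``/api/tags`` response shape (the same fixture the tests use)::
+
+        {"models": [{"name": "qwen3:8b", "size": 12345678}, ...]}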
+ """ + url = endpoint.rstrip("/") + "/api/tags" + _owns_client = client is None + _client = client or httpx.AsyncClient( + timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout) + ) + try: + response = await _client.get(url) + if response.status_code != 200: + return [] + data = response.json() + return [ + {"asset_id": model["name"], "loaded": False} + for model in data.get("models", []) + if model.get("name") + ] + except Exception: + return [] + finally: + if _owns_client: + await _client.aclose() + + +async def _get_ollama_ps_models( + endpoint: str, + *, + client: httpx.AsyncClient, + timeout: float = 5.0, +) -> list[dict]: + """Query Ollama's GET /api/ps and return the raw model list. + + Returns an empty list on any error. Caller owns the httpx client lifetime. + """ + url = endpoint.rstrip("/") + "/api/ps" + try: + response = await client.get(url) + if response.status_code != 200: + return [] + data = response.json() + return [m for m in data.get("models", []) if m.get("name")] + except Exception: + return [] + + +async def query_ollama_ps( + endpoint: str, + *, + client: httpx.AsyncClient | None = None, + timeout: float = 5.0, +) -> frozenset[str]: + """Query Ollama's GET /api/ps and return names of currently VRAM-loaded models. + + Returns an empty frozenset on any error so callers can treat this as a + best-effort enrichment. + """ + _owns_client = client is None + _client = client or httpx.AsyncClient( + timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout) + ) + try: + models = await _get_ollama_ps_models(endpoint, client=_client, timeout=timeout) + return frozenset(m["name"] for m in models) + finally: + if _owns_client: + await _client.aclose() + + +async def discover_openai_models( + endpoint: str, + *, + client: httpx.AsyncClient | None = None, + timeout: float = 5.0, +) -> list[dict]: + """Query an OpenAI-compatible GET /v1/models endpoint and return discovered assets. + + Works with vLLM, llama.cpp server (with --api-key or open), and any other + runtime that implements the standard models list format. Returns an empty + list on any error. + """ + url = endpoint.rstrip("/") + "/v1/models" + _owns_client = client is None + _client = client or httpx.AsyncClient( + timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout) + ) + try: + response = await _client.get(url) + if response.status_code != 200: + return [] + data = response.json() + return [ + {"asset_id": model["id"], "loaded": True} + for model in data.get("data", []) + if model.get("id") + ] + except Exception: + return [] + finally: + if _owns_client: + await _client.aclose() + + +async def enrich_service_assets( + service: dict, + *, + protocol: str | None, + client: httpx.AsyncClient | None = None, + timeout: float = 5.0, +) -> dict: + """Return a copy of *service* with assets enriched from upstream discovery. 
+ + For ``"ollama"`` protocol: + - Queries ``/api/tags`` for the full available-model list + - Queries ``/api/ps`` for currently VRAM-loaded models + - Marks each asset ``loaded: True`` only if its name appears in ``/api/ps`` + - Updates the ``loaded`` state of existing (statically configured) assets too + - Adds newly discovered assets that were absent from the static config + + For ``"openai"`` protocol: + - Queries ``/v1/models`` and marks all returned models as ``loaded: True`` + - Adds newly discovered models; does not modify existing static assets + + Any value other than ``"ollama"`` or ``"openai"`` (including ``None``) skips + discovery and returns *service* unchanged. If discovery returns nothing the + original service dict is returned unchanged. + """ + if not protocol: + return service + + endpoint = service.get("endpoint", "") + if not endpoint: + return service + + if protocol == "ollama": + available = await discover_ollama_assets(endpoint, client=client, timeout=timeout) + if not available: + return service + _owns_ps_client = client is None + _ps_client = client or httpx.AsyncClient( + timeout=httpx.Timeout(connect=timeout, read=timeout, write=timeout, pool=timeout) + ) + try: + ps_models = await _get_ollama_ps_models(endpoint, client=_ps_client, timeout=timeout) + finally: + if _owns_ps_client: + await _ps_client.aclose() + loaded_names = frozenset(m["name"] for m in ps_models) + discovered = [ + {**asset, "loaded": asset["asset_id"] in loaded_names} + for asset in available + ] + ollama_observed: dict = { + "loaded_model_count": len(ps_models), + "vram_used_bytes": sum(m.get("size_in_vram", 0) for m in ps_models), + } + elif protocol == "openai": + discovered = await discover_openai_models(endpoint, client=client, timeout=timeout) + ollama_observed = None + else: + return service + + if not discovered: + return service + + # Build merged asset list: + # 1. Start with statically configured assets, updating loaded state if discovered. + # 2. Append any newly discovered assets not in the static config. + existing_by_id = {a["asset_id"]: a for a in service.get("assets", [])} + merged: list[dict] = [] + for existing in service.get("assets", []): + disc = next((d for d in discovered if d["asset_id"] == existing["asset_id"]), None) + if disc is not None: + # Update loaded state from discovery; preserve all other static fields. + merged.append({**existing, "loaded": disc["loaded"]}) + else: + merged.append(existing) + for asset in discovered: + if asset["asset_id"] not in existing_by_id: + merged.append(asset) + + result = {**service, "assets": merged} + if ollama_observed: + existing_observed = service.get("observed") or {} + result["observed"] = {**existing_observed, **ollama_observed} + return result diff --git a/src/geniehive_node/sync.py b/src/geniehive_node/sync.py index 49ac8d7..224dbc2 100644 --- a/src/geniehive_node/sync.py +++ b/src/geniehive_node/sync.py @@ -7,6 +7,7 @@ from typing import Protocol import httpx from .config import NodeConfig +from .discovery import enrich_service_assets from .inventory import build_heartbeat_payload, build_registration_payload @@ -23,6 +24,12 @@ class ControlPlaneClient: self._http = http_client or httpx.AsyncClient( timeout=httpx.Timeout(connect=5.0, read=30.0, write=30.0, pool=30.0) ) + # Separate client used exclusively for upstream model discovery GETs. + # Only allocated when at least one service has discover_protocol set. 
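+        # A tighter all-round 5s timeout than self._http (which allows 30s
+        # reads), so a dead upstream delays a heartbeat by seconds, not half
+        # a minute per service.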
+ _needs_discovery = any(s.discover_protocol for s in cfg.services) + self._discovery_client: httpx.AsyncClient | None = ( + httpx.AsyncClient(timeout=httpx.Timeout(5.0)) if _needs_discovery else None + ) @property def enabled(self) -> bool: @@ -53,20 +60,24 @@ class ControlPlaneClient: if not self._registered: await self.register_once() url = str(self.cfg.control_plane.base_url).rstrip("/") + "/v1/nodes/heartbeat" - response = await self._http.post( - url, - json=build_heartbeat_payload(self.cfg), - headers=self._headers(), - ) + payload = build_heartbeat_payload(self.cfg) + if self._discovery_client is not None: + reg_services = build_registration_payload(self.cfg).get("services", []) + enriched = [ + await enrich_service_assets( + svc_dict, + protocol=svc_cfg.discover_protocol, + client=self._discovery_client, + ) + for svc_dict, svc_cfg in zip(reg_services, self.cfg.services) + ] + payload["services"] = enriched + response = await self._http.post(url, json=payload, headers=self._headers()) if isinstance(response, httpx.Response): if response.status_code == 404: self._registered = False await self.register_once() - response = await self._http.post( - url, - json=build_heartbeat_payload(self.cfg), - headers=self._headers(), - ) + response = await self._http.post(url, json=payload, headers=self._headers()) response.raise_for_status() async def heartbeat_loop(self, stop_event: asyncio.Event) -> None: @@ -82,3 +93,5 @@ class ControlPlaneClient: async def aclose(self) -> None: if self._owns_client and isinstance(self._http, httpx.AsyncClient): await self._http.aclose() + if self._discovery_client is not None: + await self._discovery_client.aclose() diff --git a/tests/test_control_chat.py b/tests/test_control_chat.py index d4a6225..dab7677 100644 --- a/tests/test_control_chat.py +++ b/tests/test_control_chat.py @@ -1,7 +1,8 @@ import asyncio +import json from pathlib import Path -from geniehive_control.chat import ProxyError, proxy_chat_completion, proxy_embeddings +from geniehive_control.chat import ProxyError, _prepare_chat_upstream, _strip_reasoning_from_sse_chunk, proxy_chat_completion, proxy_embeddings, stream_chat_completion from geniehive_control.models import HostRegistration, RegisteredService, RoleProfile from geniehive_control.registry import Registry from geniehive_control.upstream import UpstreamClient @@ -304,6 +305,170 @@ def test_proxy_embeddings_rewrites_role_to_loaded_asset(tmp_path: Path) -> None: assert fake.calls[0]["json"]["model"] == "bge-small-en" +def test_round_robin_strategy_cycles_across_services(tmp_path: Path) -> None: + registry = Registry(tmp_path / "geniehive.sqlite3", routing_strategy="round_robin") + registry.register_host( + HostRegistration( + host_id="atlas-01", + address="192.168.1.101", + services=[ + RegisteredService( + service_id=f"atlas-01/chat/svc-{i}", + host_id="atlas-01", + kind="chat", + endpoint=f"http://192.168.1.101:1809{i}", + assets=[{"asset_id": f"model-{i}", "loaded": True}], + state={"health": "healthy", "load_state": "loaded", "accept_requests": True}, + observed={"p50_latency_ms": 900}, + ) + for i in range(3) + ], + ) + ) + registry.upsert_roles( + [ + RoleProfile( + role_id="any_chat", + display_name="Any Chat", + operation="chat", + modality="text", + routing_policy={}, + ) + ] + ) + + # Three calls should cycle across the three services, not always pick the same one. 
+    seen_services = [
+        registry.resolve_route("any_chat")["service"]["service_id"]
+        for _ in range(6)
+    ]
+    unique_seen = set(seen_services)
+    assert len(unique_seen) == 3, f"round_robin should distribute across all 3 services, got: {seen_services}"
+    # After 3 calls the cycle restarts: positions 0 and 3 should be the same service.
+    assert seen_services[0] == seen_services[3]
+
+
+def test_strip_reasoning_from_sse_chunk_parses_and_strips() -> None:
+    chunk_data = {
+        "object": "chat.completion.chunk",
+        "choices": [{"delta": {"content": "hi", "reasoning_content": "hidden"}}],
+        "reasoning": "extra",
+    }
+    sse_line = b"data: " + json.dumps(chunk_data).encode()
+    result = _strip_reasoning_from_sse_chunk(sse_line)
+    parsed = json.loads(result[6:])
+    assert "reasoning" not in parsed
+    assert "reasoning_content" not in parsed["choices"][0]["delta"]
+    assert parsed["choices"][0]["delta"]["content"] == "hi"
+
+
+def test_strip_reasoning_from_sse_chunk_passes_done_unchanged() -> None:
+    done_chunk = b"data: [DONE]\n\n"
+    assert _strip_reasoning_from_sse_chunk(done_chunk) == done_chunk
+
+
+def test_stream_chat_completion_yields_processed_chunks(tmp_path: Path) -> None:
+    registry = _build_registry(tmp_path)
+
+    chunks = [
+        b'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"hello","reasoning_content":"hidden"}}]}\n\n',
+        b"data: [DONE]\n\n",
+    ]
+
+    class _FakeStreamResponse:
+        status_code = 200
+
+        def aiter_bytes(self):
+            async def _gen():
+                for chunk in chunks:
+                    yield chunk
+
+            return _gen()
+
+    class _FakeStreamContext:
+        async def __aenter__(self):
+            return _FakeStreamResponse()
+
+        async def __aexit__(self, *exc):
+            return False
+
+    # Resolve the route eagerly, exactly as the streaming endpoint does, so a
+    # routing failure surfaces before any bytes are streamed.
+    service, upstream_body = _prepare_chat_upstream(
+        {"model": "mentor", "messages": [{"role": "user", "content": "hi"}], "stream": True},
+        registry=registry,
+    )
+
+    async def run() -> list[bytes]:
+        upstream = UpstreamClient()
+        # Keep the real httpx.AsyncClient (chat_completions_stream rejects
+        # injected mocks) but intercept .stream() so no network I/O happens.
+        upstream._client.stream = lambda *args, **kwargs: _FakeStreamContext()  # type: ignore[method-assign]
+        received: list[bytes] = []
+        async for chunk in stream_chat_completion(service, upstream_body, upstream=upstream):
+            received.append(chunk)
+        await upstream.aclose()
+        return received
+
+    received = asyncio.run(run())
+    assert len(received) == 2
+    assert b"reasoning_content" not in received[0]
+    assert b'"content":"hello"' in received[0]
+    assert received[1] == b"data: [DONE]\n\n"
+ assert service["service_id"] == "atlas-01/chat/qwen3-8b" + assert upstream_body["model"] == "qwen3-8b-q4km" + + +def test_least_loaded_strategy_picks_lowest_queue_depth(tmp_path: Path) -> None: + registry = Registry(tmp_path / "geniehive.sqlite3", routing_strategy="least_loaded") + registry.register_host( + HostRegistration( + host_id="atlas-01", + address="192.168.1.101", + services=[ + RegisteredService( + service_id="atlas-01/chat/busy", + host_id="atlas-01", + kind="chat", + endpoint="http://192.168.1.101:18091", + assets=[{"asset_id": "model-busy", "loaded": True}], + state={"health": "healthy", "load_state": "loaded", "accept_requests": True}, + observed={"p50_latency_ms": 500, "queue_depth": 5, "in_flight": 3}, + ), + RegisteredService( + service_id="atlas-01/chat/idle", + host_id="atlas-01", + kind="chat", + endpoint="http://192.168.1.101:18092", + assets=[{"asset_id": "model-idle", "loaded": True}], + state={"health": "healthy", "load_state": "loaded", "accept_requests": True}, + observed={"p50_latency_ms": 900, "queue_depth": 0, "in_flight": 0}, + ), + ], + ) + ) + registry.upsert_roles( + [ + RoleProfile( + role_id="any_chat", + display_name="Any Chat", + operation="chat", + modality="text", + routing_policy={}, + ) + ] + ) + + result = registry.resolve_route("any_chat") + # "idle" has queue_depth=0+in_flight=0 vs "busy" queue_depth=5+in_flight=3 + assert result["service"]["service_id"] == "atlas-01/chat/idle" + + def test_proxy_embeddings_fails_for_unknown_model(tmp_path: Path) -> None: registry = _build_registry(tmp_path) upstream = UpstreamClient(client=_FakePoster()) diff --git a/tests/test_control_registry.py b/tests/test_control_registry.py index 322ed56..311c76f 100644 --- a/tests/test_control_registry.py +++ b/tests/test_control_registry.py @@ -1,7 +1,10 @@ +import asyncio from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch from geniehive_control.main import create_app from geniehive_control.models import BenchmarkSample, HostHeartbeat, HostRegistration, RegisteredService, RoleProfile, RouteMatchRequest +from geniehive_control.probe import ServiceProber from geniehive_control.registry import Registry, _benchmark_quality_score @@ -154,6 +157,7 @@ def test_control_app_exposes_expected_routes() -> None: assert "/v1/cluster/health" in paths assert "/v1/cluster/routes/resolve" in paths assert "/v1/cluster/routes/match" in paths + assert "/v1/audio/transcriptions" in paths def test_registry_can_rank_routes_for_task_statements(tmp_path: Path) -> None: @@ -368,6 +372,216 @@ def test_registry_exposes_asset_request_policy_in_model_metadata(tmp_path: Path) assert asset["geniehive"]["effective_request_policy"]["body_defaults"]["chat_template_kwargs"]["custom_flag"] == "yes" +def test_registry_fallback_roles_resolve_when_primary_has_no_service(tmp_path: Path) -> None: + db_path = tmp_path / "geniehive.sqlite3" + registry = Registry(db_path) + + # Only a chat service exists — no transcription service. + # The primary role wants transcription (no candidates), so it falls back to + # the secondary role which routes to the available chat service. 
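+    # Fallback deliberately crosses kinds: resolve_route re-resolves each
+    # fallback role under its own operation, so a transcription role can be
+    # backed by a chat service when no transcription target exists.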
+ registry.register_host( + HostRegistration( + host_id="atlas-01", + address="192.168.1.101", + services=[ + RegisteredService( + service_id="atlas-01/chat/rocket", + host_id="atlas-01", + kind="chat", + endpoint="http://192.168.1.101:18093", + assets=[{"asset_id": "rocket-3b", "loaded": True}], + state={"health": "healthy", "load_state": "loaded", "accept_requests": True}, + observed={"p50_latency_ms": 2000}, + ) + ], + ) + ) + registry.upsert_roles( + [ + RoleProfile( + role_id="primary_transcriber", + display_name="Primary Transcriber", + operation="transcription", + modality="text", + routing_policy={"fallback_roles": ["chat_fallback"]}, + ), + RoleProfile( + role_id="chat_fallback", + display_name="Chat Fallback", + operation="chat", + modality="text", + routing_policy={"preferred_families": ["rocket"]}, + ), + ] + ) + + result = registry.resolve_route("primary_transcriber") + assert result is not None + assert result["match_type"] == "role" + assert result["role"]["role_id"] == "primary_transcriber" + assert result["service"] is not None + assert result["service"]["service_id"] == "atlas-01/chat/rocket" + assert result["fallback_via"] == "chat_fallback" + + +def test_registry_fallback_roles_cycle_protection(tmp_path: Path) -> None: + db_path = tmp_path / "geniehive.sqlite3" + registry = Registry(db_path) + + # No services — both roles have empty candidate lists. + registry.upsert_roles( + [ + RoleProfile( + role_id="role_a", + display_name="A", + operation="chat", + modality="text", + routing_policy={"fallback_roles": ["role_b"]}, + ), + RoleProfile( + role_id="role_b", + display_name="B", + operation="chat", + modality="text", + routing_policy={"fallback_roles": ["role_a"]}, + ), + ] + ) + + # Must not loop forever; must return service=None gracefully. + result = registry.resolve_route("role_a") + assert result is not None + assert result["match_type"] == "role" + assert result["service"] is None + + +def test_registry_update_service_health_changes_only_health_field(tmp_path: Path) -> None: + db_path = tmp_path / "geniehive.sqlite3" + registry = Registry(db_path) + registry.register_host( + HostRegistration( + host_id="atlas-01", + address="192.168.1.101", + services=[ + RegisteredService( + service_id="atlas-01/chat/qwen3-8b", + host_id="atlas-01", + kind="chat", + endpoint="http://192.168.1.101:18091", + assets=[{"asset_id": "qwen3-8b", "loaded": True}], + state={"health": "healthy", "load_state": "loaded", "accept_requests": True}, + observed={"p50_latency_ms": 900}, + ) + ], + ) + ) + + registry.update_service_health("atlas-01/chat/qwen3-8b", "unhealthy") + services = registry.list_services() + assert services[0]["state"]["health"] == "unhealthy" + # Other state fields must be preserved. + assert services[0]["state"]["load_state"] == "loaded" + assert services[0]["state"]["accept_requests"] is True + + # Unknown service_id is a no-op (does not raise). 
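+    # probe_once() feeds this IDs taken from list_services(), so a service
+    # deregistered between listing and update must be a safe no-op.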
+ registry.update_service_health("nonexistent", "healthy") + + +def test_service_prober_updates_health_on_probe(tmp_path: Path) -> None: + db_path = tmp_path / "geniehive.sqlite3" + registry = Registry(db_path) + registry.register_host( + HostRegistration( + host_id="atlas-01", + address="192.168.1.101", + services=[ + RegisteredService( + service_id="atlas-01/chat/qwen3-8b", + host_id="atlas-01", + kind="chat", + endpoint="http://192.168.1.101:18091", + assets=[{"asset_id": "qwen3-8b", "loaded": True}], + state={"health": "healthy"}, + observed={}, + ) + ], + ) + ) + + prober = ServiceProber(registry, timeout_s=5.0) + + # Simulate a failed probe (connection error → unhealthy). + import httpx + async def run() -> None: + with patch.object(prober._client, "get", new_callable=AsyncMock) as mock_get: + mock_get.side_effect = httpx.ConnectError("refused") + results = await prober.probe_once() + assert results["atlas-01/chat/qwen3-8b"] == "unhealthy" + services = registry.list_services() + assert services[0]["state"]["health"] == "unhealthy" + + # Simulate a successful probe → health restored. + with patch.object(prober._client, "get", new_callable=AsyncMock) as mock_get: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + results2 = await prober.probe_once() + assert results2["atlas-01/chat/qwen3-8b"] == "healthy" + services2 = registry.list_services() + assert services2[0]["state"]["health"] == "healthy" + + asyncio.run(run()) + + +def test_service_prober_falls_back_to_v1_models_when_health_endpoint_missing(tmp_path: Path) -> None: + db_path = tmp_path / "geniehive.sqlite3" + registry = Registry(db_path) + registry.register_host( + HostRegistration( + host_id="vllm-01", + address="192.168.1.200", + services=[ + RegisteredService( + service_id="vllm-01/chat/mistral", + host_id="vllm-01", + kind="chat", + endpoint="http://192.168.1.200:8000", + assets=[], + state={"health": "unhealthy"}, + observed={}, + ) + ], + ) + ) + + prober = ServiceProber(registry, timeout_s=5.0) + + async def run() -> None: + import httpx + call_log: list[str] = [] + + async def fake_get(url: str) -> MagicMock: + call_log.append(url) + mock_response = MagicMock() + if url.endswith("/health"): + mock_response.status_code = 404 + else: + mock_response.status_code = 200 + return mock_response + + with patch.object(prober._client, "get", side_effect=fake_get): + results = await prober.probe_once() + + assert results["vllm-01/chat/mistral"] == "healthy" + # Both paths were tried. + assert any("/health" in u for u in call_log) + assert any("/v1/models" in u for u in call_log) + services = registry.list_services() + assert services[0]["state"]["health"] == "healthy" + + asyncio.run(run()) + + def test_benchmark_quality_score_stays_bounded_and_weighted() -> None: # High correctness + fast speed must not exceed 1.0. 
     score = _benchmark_quality_score({"pass_rate": 1.0, "tokens_per_sec": 80, "ttft_ms": 400})
diff --git a/tests/test_node_inventory.py b/tests/test_node_inventory.py
index 79086fd..7684e19 100644
--- a/tests/test_node_inventory.py
+++ b/tests/test_node_inventory.py
@@ -1,7 +1,11 @@
 import asyncio
 from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import httpx
 
 from geniehive_node.config import load_config
+from geniehive_node.discovery import discover_ollama_assets, discover_openai_models, enrich_service_assets, query_ollama_ps
 from geniehive_node.inventory import build_heartbeat_payload, build_inventory, build_registration_payload
 from geniehive_node.main import create_app
 from geniehive_node.sync import ControlPlaneClient
@@ -86,6 +90,201 @@ class _FakePoster:
         return object()
 
 
+def test_discover_ollama_assets_parses_api_tags_response() -> None:
+    ollama_response = {
+        "models": [
+            {"name": "qwen3:8b", "size": 12345678},
+            {"name": "nomic-embed-text", "size": 987654},
+        ]
+    }
+
+    async def run() -> None:
+        mock_client = AsyncMock(spec=httpx.AsyncClient)
+        mock_response = MagicMock()
+        mock_response.status_code = 200
+        mock_response.json.return_value = ollama_response
+        mock_client.get = AsyncMock(return_value=mock_response)
+
+        assets = await discover_ollama_assets("http://127.0.0.1:11434", client=mock_client)
+        assert len(assets) == 2
+        # /api/tags → available, NOT necessarily loaded
+        assert assets[0] == {"asset_id": "qwen3:8b", "loaded": False}
+        assert assets[1] == {"asset_id": "nomic-embed-text", "loaded": False}
+        mock_client.get.assert_called_once_with("http://127.0.0.1:11434/api/tags")
+
+    asyncio.run(run())
+
+
+def test_query_ollama_ps_returns_loaded_model_names() -> None:
+    ps_response = {
+        "models": [
+            {"name": "qwen3:8b", "size_in_vram": 5000000000},
+        ]
+    }
+
+    async def run() -> None:
+        mock_client = AsyncMock(spec=httpx.AsyncClient)
+        mock_response = MagicMock()
+        mock_response.status_code = 200
+        mock_response.json.return_value = ps_response
+        mock_client.get = AsyncMock(return_value=mock_response)
+
+        loaded = await query_ollama_ps("http://127.0.0.1:11434", client=mock_client)
+        assert loaded == frozenset({"qwen3:8b"})
+        mock_client.get.assert_called_once_with("http://127.0.0.1:11434/api/ps")
+
+    asyncio.run(run())
+
+
+def test_discover_ollama_assets_returns_empty_on_error() -> None:
+    async def run() -> None:
+        mock_client = AsyncMock(spec=httpx.AsyncClient)
+        mock_client.get = AsyncMock(side_effect=httpx.ConnectError("refused"))
+        assets = await discover_ollama_assets("http://127.0.0.1:11434", client=mock_client)
+        assert assets == []
+
+    asyncio.run(run())
+
+
+def test_enrich_service_assets_skips_when_protocol_none() -> None:
+    service = {"service_id": "svc-1", "endpoint": "http://127.0.0.1:11434", "assets": []}
+
+    async def run() -> None:
+        result = await enrich_service_assets(service, protocol=None)
+        assert result is service  # unchanged, no HTTP queries made
+
+    asyncio.run(run())
+
+
+def test_enrich_ollama_marks_loaded_state_via_api_ps_and_adds_new_assets() -> None:
+    """Ollama enrichment: tags gives available, ps gives loaded; static assets updated."""
+    tags_response = {"models": [{"name": "qwen3:8b"}, {"name": "nomic-embed"}]}
+    ps_response = {"models": [{"name": "qwen3:8b"}]}  # only qwen3 is in VRAM
+
+    service = {
+        "service_id": "svc-1",
+        "endpoint": "http://127.0.0.1:11434",
+        # Static config already lists qwen3:8b as loaded (confirmed by /api/ps
+        # below); nomic-embed is absent and must be added by discovery.
+ "assets": [ + {"asset_id": "qwen3:8b", "loaded": True}, + ], + } + + call_log: list[str] = [] + + async def run() -> None: + mock_client = AsyncMock(spec=httpx.AsyncClient) + + async def fake_get(url: str): + call_log.append(url) + mock_resp = MagicMock() + mock_resp.status_code = 200 + if url.endswith("/api/tags"): + mock_resp.json.return_value = tags_response + else: + mock_resp.json.return_value = ps_response + return mock_resp + + mock_client.get = AsyncMock(side_effect=fake_get) + + enriched = await enrich_service_assets(service, protocol="ollama", client=mock_client) + + assets_by_id = {a["asset_id"]: a for a in enriched["assets"]} + # qwen3:8b is in /api/ps → loaded: True (preserved) + assert assets_by_id["qwen3:8b"]["loaded"] is True + # nomic-embed is in /api/tags but NOT in /api/ps → loaded: False, added as new asset + assert assets_by_id["nomic-embed"]["loaded"] is False + # Both endpoints were queried. + assert any("/api/tags" in u for u in call_log) + assert any("/api/ps" in u for u in call_log) + + asyncio.run(run()) + + +def test_enrich_ollama_populates_observed_metrics_from_ps() -> None: + """Ollama enrichment populates observed.loaded_model_count and vram_used_bytes.""" + tags_response = {"models": [{"name": "qwen3:8b"}, {"name": "nomic-embed"}]} + ps_response = { + "models": [ + {"name": "qwen3:8b", "size_in_vram": 5_000_000_000}, + ] + } + + service = { + "service_id": "svc-1", + "endpoint": "http://127.0.0.1:11434", + "assets": [], + } + + async def run() -> None: + mock_client = AsyncMock(spec=httpx.AsyncClient) + + async def fake_get(url: str): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = tags_response if "/api/tags" in url else ps_response + return mock_resp + + mock_client.get = AsyncMock(side_effect=fake_get) + enriched = await enrich_service_assets(service, protocol="ollama", client=mock_client) + assert enriched["observed"]["loaded_model_count"] == 1 + assert enriched["observed"]["vram_used_bytes"] == 5_000_000_000 + + asyncio.run(run()) + + +def test_enrich_ollama_updates_stale_loaded_state_to_false() -> None: + """Static config says loaded=True but /api/ps reports it is not; should be corrected.""" + tags_response = {"models": [{"name": "big-model"}]} + ps_response = {"models": []} # nothing loaded + + service = { + "service_id": "svc-1", + "endpoint": "http://127.0.0.1:11434", + "assets": [{"asset_id": "big-model", "loaded": True}], + } + + async def run() -> None: + mock_client = AsyncMock(spec=httpx.AsyncClient) + + async def fake_get(url: str): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = tags_response if "/api/tags" in url else ps_response + return mock_resp + + mock_client.get = AsyncMock(side_effect=fake_get) + enriched = await enrich_service_assets(service, protocol="ollama", client=mock_client) + assert enriched["assets"][0]["loaded"] is False # stale state corrected + + asyncio.run(run()) + + +def test_discover_openai_models_parses_v1_models_response() -> None: + openai_response = { + "object": "list", + "data": [ + {"id": "mistral-7b-instruct", "object": "model"}, + {"id": "nomic-embed-text-v1", "object": "model"}, + ], + } + + async def run() -> None: + mock_client = AsyncMock(spec=httpx.AsyncClient) + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = openai_response + mock_client.get = AsyncMock(return_value=mock_response) + + assets = await discover_openai_models("http://127.0.0.1:8000", client=mock_client) + 
assert len(assets) == 2 + assert assets[0] == {"asset_id": "mistral-7b-instruct", "loaded": True} + assert assets[1] == {"asset_id": "nomic-embed-text-v1", "loaded": True} + mock_client.get.assert_called_once_with("http://127.0.0.1:8000/v1/models") + + asyncio.run(run()) + + def test_control_plane_client_posts_register_and_heartbeat(tmp_path: Path) -> None: cfg_path = _write_node_config(tmp_path) cfg = load_config(cfg_path)