merge: pull upstream/main into feat/colony-session-transfer

This commit is contained in:
Vincent Jiang
2026-04-24 09:28:46 -07:00
43 changed files with 2135 additions and 249 deletions
+191 -14
@@ -85,7 +85,12 @@ from framework.agent_loop.internals.types import (
JudgeVerdict,
TriggerEvent,
)
from framework.agent_loop.internals.vision_fallback import (
caption_tool_image,
extract_intent_for_tool,
)
from framework.agent_loop.types import AgentContext, AgentProtocol, AgentResult
from framework.config import get_vision_fallback_model
from framework.host.event_bus import EventBus
from framework.llm.capabilities import filter_tools_for_model, supports_image_tool_results
from framework.llm.provider import Tool, ToolResult, ToolUse
@@ -219,6 +224,52 @@ async def _describe_images_as_text(image_content: list[dict[str, Any]]) -> str |
return None
def _vision_fallback_active(model: str | None) -> bool:
"""Return True if tool-result images for *model* should be routed
through the vision-fallback chain rather than sent to the model.
Trigger: the model appears in Hive's curated text-only deny list
(``capabilities.supports_image_tool_results`` returns False).
That list is the only reliable signal: LiteLLM's
``supports_vision`` returns False for any unknown model
(including custom-served vision-capable models like Jackrong/Qwopus3.5),
so it cannot be used as a gate; and LiteLLM's openai chat
transformer doesn't strip image blocks anyway, so passing them
through to a vision-capable but litellm-unrecognised model still
works end-to-end.
The ``vision_fallback`` config block is the *substitution* model;
it doesn't widen the trigger. To force fallback for a model the
deny list doesn't cover yet, add it to
``capabilities._TEXT_ONLY_MODEL_BARE_PREFIXES`` /
``_TEXT_ONLY_PROVIDER_PREFIXES`` rather than relying on a runtime
config.
"""
if not model:
return False
return not supports_image_tool_results(model)
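# A minimal sketch of the prefix-based deny list the docstring points
# at. The real tuples and matching live in framework.llm.capabilities;
# the prefix values below are purely illustrative, not the shipped list.
_TEXT_ONLY_PROVIDER_PREFIXES = ("groq/",)          # illustrative only
_TEXT_ONLY_MODEL_BARE_PREFIXES = ("deepseek-r1",)  # illustrative only

def _supports_image_tool_results_sketch(model: str) -> bool:
    bare = model.rsplit("/", 1)[-1]
    if any(model.startswith(p) for p in _TEXT_ONLY_PROVIDER_PREFIXES):
        return False
    return not any(bare.startswith(p) for p in _TEXT_ONLY_MODEL_BARE_PREFIXES)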
async def _captioning_chain(
intent: str,
image_content: list[dict[str, Any]],
) -> str | None:
"""Two-stage caption chain used by the agent-loop tool-result hook.
Stage 1: configured ``vision_fallback`` model with intent + images.
Stage 2: generic-caption rotation (gpt-4o-mini, claude-3-haiku,
gemini-flash) when stage 1 is unconfigured or fails.
Returns the caption text or None if both stages fail. Caller is
responsible for the placeholder-on-None and the splice into the
persisted tool-result content.
"""
caption = await caption_tool_image(intent, image_content)
if not caption:
caption = await _describe_images_as_text(image_content)
return caption
# Pattern for detecting context-window-exceeded errors across LLM providers.
_CONTEXT_TOO_LARGE_RE = re.compile(
r"context.{0,20}(length|window|limit|size)|"
@@ -575,6 +626,7 @@ class AgentLoop(AgentProtocol):
store=self._conversation_store,
run_id=ctx.effective_run_id,
compaction_buffer_tokens=self._config.compaction_buffer_tokens,
compaction_buffer_ratio=self._config.compaction_buffer_ratio,
compaction_warning_buffer_tokens=(self._config.compaction_warning_buffer_tokens),
)
accumulator = OutputAccumulator(
@@ -587,7 +639,12 @@ class AgentLoop(AgentProtocol):
initial_message = self._build_initial_message(ctx)
if initial_message:
await conversation.add_user_message(initial_message)
# Stamp with arrival time so the conversation has a
# temporal anchor for the first turn, matching the
# stamping done by drain_injection_queue for every
# subsequent event.
_stamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M %Z")
await conversation.add_user_message(f"[{_stamp}] {initial_message}")
await self._run_hooks("session_start", conversation, trigger=initial_message)
@@ -599,7 +656,8 @@ class AgentLoop(AgentProtocol):
initial_message = self._build_initial_message(ctx)
if not initial_message:
initial_message = "Hello"
await conversation.add_user_message(initial_message)
_stamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M %Z")
await conversation.add_user_message(f"[{_stamp}] {initial_message}")
# 2b. Restore spill counter from existing files (resume safety)
self._restore_spill_counter()
@@ -619,8 +677,23 @@ class AgentLoop(AgentProtocol):
# Hide image-producing tools from text-only models so they never try
# to call them. Avoids wasted turns + "screenshot failed" lessons
# getting saved to memory. See framework.llm.capabilities.
# EXCEPTION: when the model IS on the text-only deny list AND
# a vision_fallback subagent is configured, leave image tools
# visible. The post-execution hook in the inner tool loop
# will route each image_content through the fallback VLM and
# replace it with a text caption before the main agent sees
# the result — so the main agent gets captions instead of
# raw images, rather than losing the tool entirely. We DON'T
# bypass the filter for vision-capable models (that would be
# a no-op anyway — the filter doesn't fire for them) and we
# DON'T bypass it without a configured fallback (the agent
# would just see raw stripped tool results with no caption).
_llm_model = ctx.llm.model if ctx.llm else ""
tools, _hidden_image_tools = filter_tools_for_model(tools, _llm_model)
_text_only_main = _llm_model and not supports_image_tool_results(_llm_model)
if _text_only_main and get_vision_fallback_model() is not None:
_hidden_image_tools: list[str] = []
else:
tools, _hidden_image_tools = filter_tools_for_model(tools, _llm_model)
logger.info(
"[%s] Tools available (%d): %s | direct_user_io=%s | judge=%s | hidden_image_tools=%s",
@@ -799,14 +872,50 @@ class AgentLoop(AgentProtocol):
or ctx.dynamic_skills_catalog_provider is not None
):
if ctx.dynamic_prompt_provider is not None:
_new_prompt = stamp_prompt_datetime(ctx.dynamic_prompt_provider())
_new_prompt = ctx.dynamic_prompt_provider()
# When a suffix provider is also wired (Queen's
# static/dynamic split), keep the two pieces separate
# so the LLM wrapper can emit them as two system
# content blocks with a cache breakpoint between them.
# The timestamp used to be stamped here via
# stamp_prompt_datetime on every iteration — it now
# lives inside the frozen dynamic suffix and is only
# refreshed at user-turn boundaries, so per-iteration
# stamping would both double-stamp and bust the cache.
_new_suffix: str | None = None
if ctx.dynamic_prompt_suffix_provider is not None:
try:
_new_suffix = ctx.dynamic_prompt_suffix_provider() or ""
except Exception:
logger.debug(
"[%s] dynamic_prompt_suffix_provider raised — falling back to legacy stamp",
node_id,
exc_info=True,
)
_new_suffix = None
if _new_suffix is None:
# Legacy / fallback path: no split in use (or the
# suffix provider raised). Stamp the timestamp at
# the end of the single-string prompt so the model
# still sees a current "now".
_new_prompt = stamp_prompt_datetime(_new_prompt)
else:
# build_system_prompt_for_context reads dynamic_skills_catalog_provider
# directly; no separate branch needed.
_new_prompt = build_system_prompt_for_context(ctx)
if _new_prompt != conversation.system_prompt:
conversation.update_system_prompt(_new_prompt)
logger.info("[%s] Dynamic prompt updated", node_id)
_new_suffix = None
if _new_suffix is not None:
_combined_for_compare = f"{_new_prompt}\n\n{_new_suffix}" if _new_suffix else _new_prompt
if (
_combined_for_compare != conversation.system_prompt
or _new_suffix != conversation.system_prompt_dynamic_suffix
):
conversation.update_system_prompt(_new_prompt, dynamic_suffix=_new_suffix)
logger.info("[%s] Dynamic prompt updated (split)", node_id)
else:
if _new_prompt != conversation.system_prompt:
conversation.update_system_prompt(_new_prompt)
logger.info("[%s] Dynamic prompt updated", node_id)
# 6c. Publish iteration event (with per-iteration metadata when available)
_iter_meta = None
@@ -896,6 +1005,8 @@ class AgentLoop(AgentProtocol):
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
cached_tokens=turn_tokens.get("cached", 0),
cache_creation_tokens=turn_tokens.get("cache_creation", 0),
cost_usd=float(turn_tokens.get("cost", 0.0) or 0.0),
execution_id=execution_id,
iteration=iteration,
)
@@ -2296,7 +2407,9 @@ class AgentLoop(AgentProtocol):
stream_id = ctx.stream_id or ctx.agent_id
node_id = ctx.agent_id
execution_id = ctx.execution_id or ""
token_counts: dict[str, int] = {"input": 0, "output": 0, "cached": 0}
# Mixed-type dict: int token counts + str stop_reason/model + float cost.
# Typed loosely to avoid churn in the many call sites that read from it.
token_counts: dict[str, Any] = {"input": 0, "output": 0, "cached": 0, "cache_creation": 0, "cost": 0.0}
tool_call_count = 0
final_text = ""
final_system_prompt = conversation.system_prompt
@@ -2437,9 +2550,16 @@ class AgentLoop(AgentProtocol):
nonlocal _first_event_at
_clean_snapshot = "" # visible-only text for the frontend
# Split-prompt path: pass STATIC and DYNAMIC tail separately
# so the LLM wrapper can emit them as two Anthropic system
# content blocks with a cache breakpoint between them. When
# no split is in use, ``system_prompt_static`` equals the
# full prompt and the suffix is empty — identical to the
# legacy single-block request.
async for event in ctx.llm.stream(
messages=_msgs,
system=conversation.system_prompt,
system=conversation.system_prompt_static,
system_dynamic_suffix=(conversation.system_prompt_dynamic_suffix or None),
tools=tools if tools else None,
max_tokens=ctx.max_tokens,
):
@@ -2520,6 +2640,8 @@ class AgentLoop(AgentProtocol):
token_counts["input"] += event.input_tokens
token_counts["output"] += event.output_tokens
token_counts["cached"] += event.cached_tokens
token_counts["cache_creation"] += event.cache_creation_tokens
token_counts["cost"] = token_counts.get("cost", 0.0) + event.cost_usd
token_counts["stop_reason"] = event.stop_reason
token_counts["model"] = event.model
@@ -3312,6 +3434,32 @@ class AgentLoop(AgentProtocol):
# Phase 3: record results into conversation in original order,
# build logged/real lists, and publish completed events.
#
# Vision-fallback prefetch: a single turn may fire several
# image-producing tools in parallel (e.g. one screenshot
# per tab). Captioning each one takes a vision LLM round
trip (1-30 s). Doing them sequentially in this loop
# would serialise that latency per image. Instead, kick
# off all caption tasks concurrently NOW, and await each
# one just-in-time inside the per-tc body. If only a
# single image needs captioning, this collapses to a
# single await with no overhead.
_model_text_only = ctx.llm and _vision_fallback_active(ctx.llm.model)
caption_tasks: dict[str, asyncio.Task[str | None]] = {}
if _model_text_only:
for tc in tool_calls[:executed_in_batch]:
res = results_by_id.get(tc.tool_use_id)
if not res or not res.image_content:
continue
intent = extract_intent_for_tool(
conversation,
tc.tool_name,
tc.tool_input or {},
)
caption_tasks[tc.tool_use_id] = asyncio.create_task(
_captioning_chain(intent, res.image_content)
)
for tc in tool_calls[:executed_in_batch]:
result = results_by_id.get(tc.tool_use_id)
if result is None:
@@ -3334,11 +3482,31 @@ class AgentLoop(AgentProtocol):
logged_tool_calls.append(tool_entry)
image_content = result.image_content
if image_content and ctx.llm and not supports_image_tool_results(ctx.llm.model):
logger.info(
"Stripping image_content from tool result; model '%s' does not support images in tool results",
ctx.llm.model,
)
# Vision-fallback marker spliced into the persisted text
# below. None when no captioning ran (vision-capable
# main model, no images, or no fallback chain reached
# this tool).
vision_fallback_marker: str | None = None
if image_content and tc.tool_use_id in caption_tasks:
caption = await caption_tasks.pop(tc.tool_use_id)
if caption:
vision_fallback_marker = f"[vision-fallback caption]\n{caption}"
logger.info(
"vision_fallback: captioned %d image(s) for tool '%s' "
"(model '%s' routed through fallback)",
len(image_content),
tc.tool_name,
ctx.llm.model if ctx.llm else "?",
)
else:
vision_fallback_marker = "[image stripped — vision fallback exhausted]"
logger.info(
"vision_fallback: exhausted; stripping %d image(s) from "
"tool '%s' result without caption (model '%s')",
len(image_content),
tc.tool_name,
ctx.llm.model if ctx.llm else "?",
)
image_content = None
# Apply replay-detector steer prefix if this call matched a
@@ -3350,6 +3518,11 @@ class AgentLoop(AgentProtocol):
if _prefix:
stored_content = f"{_prefix}{stored_content or ''}"
# Splice the vision-fallback caption / placeholder into
# the persisted text after any prefix has been applied.
if vision_fallback_marker:
stored_content = f"{stored_content or ''}\n\n{vision_fallback_marker}"
await conversation.add_tool_result(
tool_use_id=tc.tool_use_id,
content=stored_content,
@@ -4101,6 +4274,8 @@ class AgentLoop(AgentProtocol):
input_tokens: int,
output_tokens: int,
cached_tokens: int = 0,
cache_creation_tokens: int = 0,
cost_usd: float = 0.0,
execution_id: str = "",
iteration: int | None = None,
) -> None:
@@ -4113,6 +4288,8 @@ class AgentLoop(AgentProtocol):
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
cost_usd=cost_usd,
execution_id=execution_id,
iteration=iteration,
)
+81 -11
@@ -427,9 +427,20 @@ class NodeConversation:
store: ConversationStore | None = None,
run_id: str | None = None,
compaction_buffer_tokens: int | None = None,
compaction_buffer_ratio: float | None = None,
compaction_warning_buffer_tokens: int | None = None,
) -> None:
self._system_prompt = system_prompt
# Optional split: when a caller updates the prompt with a
# ``dynamic_suffix`` argument, we remember the static prefix and
# suffix separately so the LLM wrapper can emit them as two
# Anthropic system content blocks with a cache breakpoint between
# them. ``_system_prompt`` stays as the concatenated form used for
# persistence and for the legacy single-block LLM path.
# On restore, these default to the concat/empty pair; the next
# AgentLoop iteration's dynamic-prompt refresh step repopulates them.
self._system_prompt_static: str = system_prompt
self._system_prompt_dynamic_suffix: str = ""
self._max_context_tokens = max_context_tokens
self._compaction_threshold = compaction_threshold
# Buffer-based compaction trigger (Gap 7). When set, takes
@@ -439,6 +450,11 @@ class NodeConversation:
# limit. If left as None the legacy threshold-based rule is
# used, keeping old call sites behaving identically.
self._compaction_buffer_tokens = compaction_buffer_tokens
# Ratio component of the hybrid buffer. Combines additively with
# _compaction_buffer_tokens so callers can express "reserve N tokens
# plus M% of the window" — the absolute floor matters on tiny
# windows, the ratio matters on large ones.
self._compaction_buffer_ratio = compaction_buffer_ratio
self._compaction_warning_buffer_tokens = compaction_warning_buffer_tokens
self._output_keys = output_keys
self._store = store
@@ -453,15 +469,56 @@ class NodeConversation:
@property
def system_prompt(self) -> str:
"""Full concatenated system prompt (static + dynamic suffix, if any).
This is the canonical form used for persistence and for the legacy
single-block LLM path. Split-prompt callers should read
``system_prompt_static`` and ``system_prompt_dynamic_suffix`` instead.
"""
return self._system_prompt
def update_system_prompt(self, new_prompt: str) -> None:
@property
def system_prompt_static(self) -> str:
"""Static prefix of the system prompt (cache-stable).
Equals ``system_prompt`` when no split is in use. When the AgentLoop
calls ``update_system_prompt(static, dynamic_suffix=...)``, this is
the piece sent as the cache-controlled first block.
"""
return self._system_prompt_static
@property
def system_prompt_dynamic_suffix(self) -> str:
"""Dynamic tail of the system prompt (not cached).
Empty unless the consumer splits its prompt. The LLM wrapper uses a
non-empty suffix to emit a two-block system content list with a
cache breakpoint between the static prefix and this tail.
"""
return self._system_prompt_dynamic_suffix
def update_system_prompt(self, new_prompt: str, dynamic_suffix: str | None = None) -> None:
"""Update the system prompt.
Used in continuous conversation mode at phase transitions to swap
Layer 3 (focus) while preserving the conversation history.
When ``dynamic_suffix`` is provided, ``new_prompt`` is interpreted as
the STATIC prefix and ``dynamic_suffix`` as the per-turn tail; they
travel to the LLM as two separate cache-controlled blocks but are
persisted as a single concatenated string for backward-compat
restore. ``new_prompt`` alone (suffix left None) keeps the legacy
single-string behavior.
"""
self._system_prompt = new_prompt
if dynamic_suffix is None:
# Legacy single-string path — static == full, no suffix split.
self._system_prompt = new_prompt
self._system_prompt_static = new_prompt
self._system_prompt_dynamic_suffix = ""
else:
self._system_prompt_static = new_prompt
self._system_prompt_dynamic_suffix = dynamic_suffix
self._system_prompt = f"{new_prompt}\n\n{dynamic_suffix}" if dynamic_suffix else new_prompt
self._meta_persisted = False # re-persist with new prompt
def set_current_phase(self, phase_id: str) -> None:
@@ -847,19 +904,30 @@ class NodeConversation:
"""True when the conversation should be compacted before the
next LLM call.
Buffer-based rule (Gap 7): trigger when the current estimate
plus the configured buffer would exceed the hard context limit.
Prevents compaction from firing only AFTER we're already over
the wire and forced into a reactive binary-split pass.
When no buffer is configured, falls back to the multiplicative
threshold the old callers were built around.
Hybrid buffer rule: the headroom reserved before compaction fires
is the SUM of an absolute fixed component and a ratio of the hard
context limit:
effective_buffer = compaction_buffer_tokens
+ compaction_buffer_ratio * max_context_tokens
The fixed component gives a floor on tiny windows; the ratio
keeps the trigger meaningful on large windows where any constant
buffer becomes a rounding error (an 8k buffer is a 75% trigger on a
32k window but 96% on a 200k one). Compaction fires when the
current estimate would consume more than (limit - effective_buffer).
When neither component is configured, falls back to the legacy
multiplicative threshold so old callers keep behaving identically.
"""
if self._max_context_tokens <= 0:
return False
if self._compaction_buffer_tokens is not None:
budget = self._max_context_tokens - self._compaction_buffer_tokens
return self.estimate_tokens() >= max(0, budget)
fixed = self._compaction_buffer_tokens
ratio = self._compaction_buffer_ratio
if fixed is not None or ratio is not None:
effective_buffer = (fixed or 0) + (ratio or 0.0) * self._max_context_tokens
budget = self._max_context_tokens - effective_buffer
return self.estimate_tokens() >= max(0.0, budget)
return self.estimate_tokens() >= self._max_context_tokens * self._compaction_threshold
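# Worked example with the LoopConfig defaults below (fixed=8_000,
# ratio=0.15); the numbers come from the config comment, not new policy:
#   32k window:  effective_buffer = 8_000 + 0.15 * 32_000  = 12_800
#                budget = 32_000 - 12_800 = 19_200  -> fires at ~60%
#   200k window: effective_buffer = 8_000 + 0.15 * 200_000 = 38_000
#                budget = 200_000 - 38_000 = 162_000 -> fires at ~81%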
def compaction_warning(self) -> bool:
@@ -1516,6 +1584,7 @@ class NodeConversation:
"max_context_tokens": self._max_context_tokens,
"compaction_threshold": self._compaction_threshold,
"compaction_buffer_tokens": self._compaction_buffer_tokens,
"compaction_buffer_ratio": self._compaction_buffer_ratio,
"compaction_warning_buffer_tokens": (self._compaction_warning_buffer_tokens),
"output_keys": self._output_keys,
}
@@ -1565,6 +1634,7 @@ class NodeConversation:
store=store,
run_id=run_id,
compaction_buffer_tokens=meta.get("compaction_buffer_tokens"),
compaction_buffer_ratio=meta.get("compaction_buffer_ratio"),
compaction_warning_buffer_tokens=meta.get("compaction_warning_buffer_tokens"),
)
conv._meta_persisted = True
@@ -12,6 +12,7 @@ import json
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from framework.agent_loop.conversation import ConversationStore, NodeConversation
@@ -191,15 +192,21 @@ async def drain_injection_queue(
else:
logger.info("[drain] no vision fallback available; images dropped")
image_content = None
# Real user input is stored as-is; external events get a prefix
# Stamp every injected event with its arrival time so the model
# has a consistent temporal log to reason over (and so the
# stamp lives inside byte-stable conversation history instead
# of a per-turn system-prompt tail). Minute precision is what
# the queen needs for conversational / scheduling context.
stamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M %Z")
if is_client_input:
stamped = f"[{stamp}] {content}" if content else f"[{stamp}]"
await conversation.add_user_message(
content,
stamped,
is_client_input=True,
image_content=image_content,
)
else:
await conversation.add_user_message(f"[External event]: {content}")
await conversation.add_user_message(f"[{stamp}] [External event] {content}")
count += 1
except asyncio.QueueEmpty:
break
@@ -232,7 +239,8 @@ async def drain_trigger_queue(
payload_str = json.dumps(t.payload, default=str)
parts.append(f"[TRIGGER: {t.trigger_type}/{t.source_id}]{task_line}\n{payload_str}")
combined = "\n\n".join(parts)
stamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M %Z")
combined = f"[{stamp}]\n" + "\n\n".join(parts)
logger.info("[drain] %d trigger(s): %s", len(triggers), combined[:200])
# Tag the message so the UI can render a banner instead of the raw
# `[TRIGGER: ...]` text. The LLM still sees `combined` verbatim.
@@ -108,6 +108,8 @@ async def publish_llm_turn_complete(
input_tokens: int,
output_tokens: int,
cached_tokens: int = 0,
cache_creation_tokens: int = 0,
cost_usd: float = 0.0,
execution_id: str = "",
iteration: int | None = None,
) -> None:
@@ -120,6 +122,8 @@ async def publish_llm_turn_complete(
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
cost_usd=cost_usd,
execution_id=execution_id,
iteration=iteration,
)
@@ -69,6 +69,20 @@ class LoopConfig:
# and less tight than Anthropic's own counting. Override via
# LoopConfig for larger windows.
compaction_buffer_tokens: int = 8_000
# Ratio-based component of the hybrid compaction buffer. Effective
# headroom reserved before compaction fires is
# compaction_buffer_tokens + compaction_buffer_ratio * max_context_tokens
# The ratio scales with the model's window where the absolute fixed
# component does not (an 8k absolute buffer is 75% trigger on a 32k
# window but 96% on a 200k window). Combining them gives an absolute
# floor sized for the worst-case single tool result (one un-spilled
# max_tool_result_chars payload ≈ 30k chars ≈ 7.5k tokens, rounded to
# 8k) plus a fractional headroom that keeps the trigger meaningful on
# large windows, so the inner tool loop always has room to grow
# without tripping the mid-turn pre-send guard. Defaults: 8k + 15%.
# On 32k that's a 12.8k buffer (~60% trigger); on 200k it's 38k
# (~81% trigger); on 1M it's 158k (~84% trigger).
compaction_buffer_ratio: float = 0.15
# Warning is emitted one buffer earlier so the user/telemetry gets
# a "we're close" signal without triggering a compaction pass.
compaction_warning_buffer_tokens: int = 12_000
@@ -0,0 +1,250 @@
"""Vision-fallback subagent for tool-result images on text-only LLMs.
When a tool returns image content but the main agent's model can't
accept image blocks (per ``supports_image_tool_results``), the framework
strips the images before they ever reach the LLM. Without this module,
the agent then sees only the tool's text envelope (URL, dimensions,
size) and is blind to whatever the image actually shows.
This module provides:
* ``caption_tool_image()``: a direct LiteLLM call to a configured
vision model (``vision_fallback`` block in ``~/.hive/configuration.json``)
that takes the agent's intent + the image(s) and returns a textual
description tailored to that intent.
* ``extract_intent_for_tool()``: pulls the most recent assistant text
+ the tool call descriptor and concatenates them into a 4KB intent
string the vision subagent can reason against.
Both helpers degrade silently (return ``None`` / a placeholder rather
than raise) so a vision-fallback failure can never kill the main
agent's run. The agent-loop call site is responsible for chaining
through to the existing generic-caption rotation
(``_describe_images_as_text``) on a None return.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Any
from framework.config import (
get_vision_fallback_api_base,
get_vision_fallback_api_key,
get_vision_fallback_model,
)
if TYPE_CHECKING:
from ..conversation import NodeConversation
logger = logging.getLogger(__name__)
# Hard cap on the intent string handed to the vision subagent. The
# subagent only needs the agent's recent reasoning + the tool descriptor;
# anything longer is wasted tokens (and risks pushing past the vision
# model's context with the image attached).
_INTENT_MAX_CHARS = 4096
# Cap on the tool args JSON snippet inside the intent. Some tool inputs
# (large strings, file contents) would dominate the intent if uncapped.
_TOOL_ARGS_MAX_CHARS = 4096
# Subagent system prompt — kept short so it fits within any provider's
# system-prompt budget alongside the user message + image. Tells the
# subagent its role and constrains output format.
#
# Coordinate labeling: the main agent's browser tools
# (browser_click_coordinate / browser_hover_coordinate / browser_press_at)
# accept VIEWPORT FRACTIONS (x, y) in [0..1] where (0,0) is the top-left
# and (1,1) is the bottom-right of the screenshot. Without coordinates
# the text-only agent has no way to act on what we describe — it can
# read the caption but cannot point. So for every interactive element
# we name (button, link, input, icon, tab, menu item, dialog control),
# include its approximate viewport-fraction centre as ``(fx, fy)``
# right after the element's name, e.g. ``"Submit" button (0.83, 0.92)``.
# Three rules: (1) coordinates only for things plausibly clickable /
# hoverable / typeable — don't tag pure body text or decorative
# graphics. (2) Eyeball to two decimal places; precision beyond that
# is false confidence. (3) Never invent — if an element is partly
# off-screen or you can't locate it, omit the coordinate rather than
# guessing.
_VISION_SUBAGENT_SYSTEM = (
"You are a vision subagent for a text-only main agent. The main "
"agent invoked a tool that returned the image(s) attached. Their "
"intent (their reasoning + the tool call) is below. Describe what "
"the image shows in service of their intent — concrete, factual, "
"no speculation. If their intent asks a yes/no question, answer it "
"directly first.\n\n"
"Coordinate labeling: the main agent uses fractional viewport "
"coordinates (x, y) in [0..1] — (0, 0) is the top-left of the "
"image, (1, 1) is the bottom-right — to drive its click / hover / "
"key-press tools. For every interactive element you mention "
"(button, link, input, checkbox, radio, dropdown, tab, menu item, "
"dialog control, icon), append its approximate centre as "
"``(fx, fy)`` immediately after the element's name or label, e.g. "
'``"Submit" button (0.83, 0.92)`` or ``profile avatar icon '
"(0.05, 0.07)``. Use two decimal places — more is false precision. "
"Skip coordinates for pure body text and decorative elements that "
"aren't clickable. If an element is partially off-screen or you "
"cannot reliably locate its centre, omit the coordinate rather "
"than guessing.\n\n"
"Output plain text, no markdown, ≤ 600 words."
)
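# An illustrative caption that follows the prompt's rules (invented
# content, not from a real run):
#   Sign-in page. Email input (0.50, 0.38); password input (0.50, 0.46);
#   "Sign in" button (0.50, 0.55); "Forgot password?" link (0.50, 0.61).
#   Red banner reads "Invalid credentials" (body text, no coordinate).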
def extract_intent_for_tool(
conversation: NodeConversation,
tool_name: str,
tool_args: dict[str, Any] | None,
) -> str:
"""Build the intent string passed to the vision subagent.
Combines the most recent assistant text (the LLM's reasoning right
before invoking the tool) with a structured tool-call descriptor.
Truncates to ``_INTENT_MAX_CHARS`` total, favouring the head of the
assistant text where goal-stating sentences usually live.
If no preceding assistant text exists (rare first turn), falls
back to ``"<no preceding reasoning>"`` so the subagent still gets
the tool descriptor.
"""
args_json: str
try:
args_json = json.dumps(tool_args or {}, default=str)
except Exception:
args_json = repr(tool_args)
if len(args_json) > _TOOL_ARGS_MAX_CHARS:
args_json = args_json[:_TOOL_ARGS_MAX_CHARS] + "…"
tool_line = f"Called: {tool_name}({args_json})"
# Walk newest → oldest, take the first assistant message with text.
assistant_text = ""
try:
messages = getattr(conversation, "_messages", []) or []
for msg in reversed(messages):
if getattr(msg, "role", None) != "assistant":
continue
content = getattr(msg, "content", "") or ""
if isinstance(content, str) and content.strip():
assistant_text = content.strip()
break
except Exception:
# Defensive — the agent loop must keep running even if the
# conversation structure changes shape.
assistant_text = ""
if not assistant_text:
assistant_text = "<no preceding reasoning>"
# Intent = tool descriptor (always intact) + reasoning (truncated).
head = f"{tool_line}\n\nReasoning before call:\n"
budget = _INTENT_MAX_CHARS - len(head)
if budget < 100:
# Tool descriptor is huge somehow — truncate it.
return head[:_INTENT_MAX_CHARS]
if len(assistant_text) > budget:
assistant_text = assistant_text[: budget - 1] + "…"
return head + assistant_text
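# Hypothetical call, shape only (tool name and args invented):
#   extract_intent_for_tool(conv, "browser_screenshot", {"full_page": True})
#   -> 'Called: browser_screenshot({"full_page": true})\n\n'
#      'Reasoning before call:\nI need to confirm the banner rendered...'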
async def caption_tool_image(
intent: str,
image_content: list[dict[str, Any]],
*,
timeout_s: float = 30.0,
) -> str | None:
"""Caption the given images using the configured ``vision_fallback`` model.
Returns the model's text response on success, or ``None`` on any
failure (no config, no API key, timeout, exception, empty
response). Callers chain to the next stage of the fallback on None.
Logs each call to ``~/.hive/llm_logs`` via ``log_llm_turn`` so the
cost / latency / quality are auditable post-hoc, tagged with
``execution_id="vision_fallback_subagent"``.
"""
model = get_vision_fallback_model()
if not model:
return None
api_key = get_vision_fallback_api_key()
api_base = get_vision_fallback_api_base()
if not api_key:
logger.debug("vision_fallback configured but no API key resolved; skipping")
return None
try:
import litellm
except ImportError:
return None
user_blocks: list[dict[str, Any]] = [{"type": "text", "text": intent}]
user_blocks.extend(image_content)
messages = [
{"role": "system", "content": _VISION_SUBAGENT_SYSTEM},
{"role": "user", "content": user_blocks},
]
kwargs: dict[str, Any] = {
"model": model,
"messages": messages,
"max_tokens": 1024,
"timeout": timeout_s,
"api_key": api_key,
}
if api_base:
kwargs["api_base"] = api_base
started = datetime.now()
caption: str | None = None
error_text: str | None = None
try:
response = await litellm.acompletion(**kwargs)
text = (response.choices[0].message.content or "").strip()
if text:
caption = text
except Exception as exc:
error_text = f"{type(exc).__name__}: {exc}"
logger.debug("vision_fallback model '%s' failed: %s", model, exc)
# Best-effort audit log so users can grep ~/.hive/llm_logs/ for
# vision-fallback subagent calls. Failures here must not bubble.
try:
from framework.tracker.llm_debug_logger import log_llm_turn
# Don't dump the base64 image data into the log file — that
# would balloon the jsonl with mostly-binary noise.
elided_blocks: list[dict[str, Any]] = [{"type": "text", "text": intent}]
elided_blocks.extend(
{"type": "image_url", "image_url": {"url": "<elided>"}}
for _ in range(len(image_content))
)
log_llm_turn(
node_id="vision_fallback_subagent",
stream_id="vision_fallback",
execution_id="vision_fallback_subagent",
iteration=0,
system_prompt=_VISION_SUBAGENT_SYSTEM,
messages=[{"role": "user", "content": elided_blocks}],
assistant_text=caption or "",
tool_calls=[],
tool_results=[],
token_counts={
"model": model,
"elapsed_s": (datetime.now() - started).total_seconds(),
"error": error_text,
"num_images": len(image_content),
"intent_chars": len(intent),
},
)
except Exception:
pass
return caption
__all__ = ["caption_tool_image", "extract_intent_for_tool"]
+10
@@ -182,6 +182,16 @@ class AgentContext:
dynamic_tools_provider: Any = None
dynamic_prompt_provider: Any = None
# Optional Callable[[], str]: when set alongside ``dynamic_prompt_provider``,
# the AgentLoop sends the system prompt as two pieces — the result of
# ``dynamic_prompt_provider`` is the STATIC block (cached), and this
# provider returns the DYNAMIC suffix (not cached). The LLM wrapper
# emits them as two Anthropic system content blocks with a cache
# breakpoint between them for providers that honor ``cache_control``.
# For providers that don't, the two strings are concatenated. Used by
# the Queen to keep her persona/role/tools block warm across iterations
# while the recall + timestamp tail refreshes per user turn.
dynamic_prompt_suffix_provider: Any = None
dynamic_memory_provider: Any = None
# Optional Callable[[], str]: when set, the current skills-catalog
# prompt is sourced from this provider each iteration. Lets workers
+51
@@ -155,6 +155,57 @@ def get_preferred_worker_model() -> str | None:
return None
def get_vision_fallback_model() -> str | None:
"""Return the configured vision-fallback model, or None if not configured.
Reads from the ``vision_fallback`` section of ~/.hive/configuration.json.
Used by the agent-loop hook that captions tool-result images when the
main agent's model cannot accept image content (text-only LLMs).
When this returns None the fallback chain skips the configured-subagent
stage and proceeds straight to the generic caption rotation
(``_describe_images_as_text``).
"""
vision = get_hive_config().get("vision_fallback", {})
if vision.get("provider") and vision.get("model"):
provider = str(vision["provider"])
model = str(vision["model"]).strip()
if provider.lower() == "openrouter" and model.lower().startswith("openrouter/"):
model = model[len("openrouter/") :]
if model:
return f"{provider}/{model}"
return None
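# Illustrative resolutions (model values hypothetical): the openrouter
# branch above dedupes an already-prefixed model string.
#   {"provider": "anthropic", "model": "claude-sonnet-4"}
#       -> "anthropic/claude-sonnet-4"
#   {"provider": "openrouter", "model": "openrouter/qwen/qwen2.5-vl"}
#       -> "openrouter/qwen/qwen2.5-vl"  (single prefix, not doubled)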
def get_vision_fallback_api_key() -> str | None:
"""Return the API key for the vision-fallback model.
Resolution order: ``vision_fallback.api_key_env_var`` from the env,
then the default ``get_api_key()``. No subscription-token branches:
vision fallback is intended for hosted vision models (Anthropic,
OpenAI, Google), not for the subscription-bearer providers.
"""
vision = get_hive_config().get("vision_fallback", {})
if not vision:
return get_api_key()
api_key_env_var = vision.get("api_key_env_var")
if api_key_env_var:
return os.environ.get(api_key_env_var)
return get_api_key()
def get_vision_fallback_api_base() -> str | None:
"""Return the api_base for the vision-fallback model, or None."""
vision = get_hive_config().get("vision_fallback", {})
if not vision:
return None
if vision.get("api_base"):
return vision["api_base"]
if str(vision.get("provider", "")).lower() == "openrouter":
return OPENROUTER_API_BASE
return None
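# A plausible ``vision_fallback`` block in ~/.hive/configuration.json
# tying the three getters above together; the model and env-var values
# are hypothetical, and api_base may be omitted (the openrouter provider
# falls back to OPENROUTER_API_BASE):
#   {
#     "vision_fallback": {
#       "provider": "openrouter",
#       "model": "qwen/qwen2.5-vl-72b-instruct",
#       "api_key_env_var": "OPENROUTER_API_KEY"
#     }
#   }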
def get_worker_api_key() -> str | None:
"""Return the API key for the worker LLM, falling back to the default key."""
worker_llm = get_hive_config().get("worker_llm", {})
+4 -13
@@ -435,9 +435,7 @@ class ColonyRuntime:
if queen_id:
queen_home = QUEENS_DIR / queen_id
queen_overrides_path = queen_home / "skills_overrides.json"
extras.append(
ExtraScope(directory=queen_home / "skills", label="queen_ui", priority=2)
)
extras.append(ExtraScope(directory=queen_home / "skills", label="queen_ui", priority=2))
colony_overrides_path: Path | None = None
if colony_name:
@@ -533,8 +531,7 @@ class ColonyRuntime:
return [
t
for t in tools
if getattr(t, "name", None) not in self._mcp_tool_names_all
or getattr(t, "name", None) in allowed
if getattr(t, "name", None) not in self._mcp_tool_names_all or getattr(t, "name", None) in allowed
]
# ── Lifecycle ───────────────────────────────────────────────
@@ -906,17 +903,11 @@ class ColonyRuntime:
# pre-activated catalog stays static because its contents are
# built for *this* worker's task (a tombstone toggle from the
# UI should not yank it mid-run).
_db_path_pre_activated = bool(
isinstance(input_data, dict) and input_data.get("db_path")
)
_db_path_pre_activated = bool(isinstance(input_data, dict) and input_data.get("db_path"))
# Default-bind the manager into the closure so each loop iteration
# captures the same manager instance — pyflakes B023 would flag a
# free-variable capture here.
_provider = (
None
if _db_path_pre_activated
else (lambda mgr=self._skills_manager: mgr.skills_catalog_prompt)
)
_provider = None if _db_path_pre_activated else (lambda mgr=self._skills_manager: mgr.skills_catalog_prompt)
agent_context = AgentContext(
runtime=self._make_runtime_adapter(worker_id),
+13 -1
@@ -809,16 +809,28 @@ class EventBus:
input_tokens: int,
output_tokens: int,
cached_tokens: int = 0,
cache_creation_tokens: int = 0,
cost_usd: float = 0.0,
execution_id: str | None = None,
iteration: int | None = None,
) -> None:
"""Emit LLM turn completion with stop reason and model metadata."""
"""Emit LLM turn completion with stop reason and model metadata.
``cached_tokens`` and ``cache_creation_tokens`` are subsets of
``input_tokens`` (already inside provider ``prompt_tokens``).
Subscribers should display them, not add them to a total.
``cost_usd`` is the USD cost for this turn when known (Anthropic,
OpenAI, OpenRouter). 0.0 means unreported (not free).
"""
data: dict = {
"stop_reason": stop_reason,
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cached_tokens": cached_tokens,
"cache_creation_tokens": cache_creation_tokens,
"cost_usd": cost_usd,
}
if iteration is not None:
data["iteration"] = iteration
+7
@@ -653,10 +653,17 @@ class AntigravityProvider(LLMProvider):
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
system_dynamic_suffix: str | None = None,
) -> AsyncIterator[StreamEvent]:
import asyncio # noqa: PLC0415
import concurrent.futures # noqa: PLC0415
# Antigravity (Google's proprietary endpoint) doesn't expose a
# cache_control hook. Concatenate the dynamic suffix so its shape
# matches the legacy single-string call site.
if system_dynamic_suffix:
system = f"{system}\n\n{system_dynamic_suffix}" if system else system_dynamic_suffix
loop = asyncio.get_running_loop()
queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
+298 -28
@@ -213,9 +213,72 @@ _CACHE_CONTROL_PREFIXES = (
"glm-",
)
# OpenRouter sub-provider prefixes whose upstream API honors `cache_control`.
# OpenRouter passes the marker through to the underlying provider for these.
# (See https://openrouter.ai/docs/guides/best-practices/prompt-caching.)
# OpenAI/DeepSeek/Groq/Grok/Moonshot route through OpenRouter but cache
# automatically server-side — sending cache_control there is a no-op, not a
# win, and they need a separate prefix-stability fix to actually get hits.
_OPENROUTER_CACHE_CONTROL_PREFIXES = (
"openrouter/anthropic/",
"openrouter/google/gemini-",
"openrouter/z-ai/glm",
"openrouter/minimax/",
)
def _model_supports_cache_control(model: str) -> bool:
return any(model.startswith(p) for p in _CACHE_CONTROL_PREFIXES)
if any(model.startswith(p) for p in _CACHE_CONTROL_PREFIXES):
return True
return any(model.startswith(p) for p in _OPENROUTER_CACHE_CONTROL_PREFIXES)
def _build_system_message(
system: str,
system_dynamic_suffix: str | None,
model: str,
) -> dict[str, Any] | None:
"""Construct the system-role message for the chat completion.
Returns ``None`` when there is nothing to send.
Two-block split path: used when the caller supplied a non-empty
``system_dynamic_suffix`` AND the provider honors ``cache_control``
(Anthropic, MiniMax, Z-AI/GLM). We emit ``content`` as a list of two
text blocks with an ephemeral ``cache_control`` marker on the first
block only. The prompt cache keeps the static prefix warm across
turns and across iterations within a turn; only the small dynamic
tail is recomputed on every request.
Single-string path: used for every other case (no suffix provided,
or provider doesn't honor ``cache_control``). We concatenate
``system`` + ``\\n\\n`` + ``system_dynamic_suffix`` and attach
``cache_control`` to the whole message when the provider supports
it. This is byte-identical to the pre-split behavior for all
non-cache-control providers (OpenAI, Gemini, Groq, Ollama, etc.).
"""
if not system and not system_dynamic_suffix:
return None
if system_dynamic_suffix and _model_supports_cache_control(model):
content_blocks: list[dict[str, Any]] = []
if system:
content_blocks.append(
{
"type": "text",
"text": system,
"cache_control": {"type": "ephemeral"},
}
)
content_blocks.append({"type": "text", "text": system_dynamic_suffix})
return {"role": "system", "content": content_blocks}
# Single-string path (legacy or no-cache-control provider).
combined = system
if system_dynamic_suffix:
combined = f"{system}\n\n{system_dynamic_suffix}" if system else system_dynamic_suffix
sys_msg: dict[str, Any] = {"role": "system", "content": combined}
if _model_supports_cache_control(model):
sys_msg["cache_control"] = {"type": "ephemeral"}
return sys_msg
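# Shape illustration of the two return paths above:
#   split + cache_control provider:
#     {"role": "system", "content": [
#         {"type": "text", "text": <static>, "cache_control": {"type": "ephemeral"}},
#         {"type": "text", "text": <dynamic suffix>}]}
#   anything else:
#     {"role": "system", "content": "<static>\n\n<dynamic suffix>",
#      "cache_control": {"type": "ephemeral"}}  # marker only if supported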
# Kimi For Coding uses an Anthropic-compatible endpoint (no /v1 suffix).
@@ -297,6 +360,118 @@ FAILED_REQUESTS_DIR = Path.home() / ".hive" / "failed_requests"
MAX_FAILED_REQUEST_DUMPS = 50
def _extract_cost(response: Any, model: str) -> float:
"""Pull the USD cost for a non-streaming completion response.
Sources checked, in priority order:
1. ``usage.cost``: populated when OpenRouter returns native cost via
``usage: {include: true}`` or when ``litellm.include_cost_in_streaming_usage``
is on.
2. ``response._hidden_params["response_cost"]``: set by LiteLLM's
logging layer after most successful completions.
3. ``litellm.completion_cost(...)``: computes from the model pricing
table; works across Anthropic, OpenAI, and OpenRouter as long as the
model is in LiteLLM's catalog.
Returns 0.0 for unpriced models or unexpected response shapes; cost is a
display concern, never let it break the hot path. For streaming paths
where the aggregate response isn't a full ``ModelResponse``, use
:func:`_cost_from_tokens` with the already-extracted token counts.
"""
if response is None:
return 0.0
usage = getattr(response, "usage", None)
usage_cost = getattr(usage, "cost", None) if usage is not None else None
if isinstance(usage_cost, (int, float)) and usage_cost > 0:
return float(usage_cost)
hidden = getattr(response, "_hidden_params", None)
if isinstance(hidden, dict):
hp_cost = hidden.get("response_cost")
if isinstance(hp_cost, (int, float)) and hp_cost > 0:
return float(hp_cost)
try:
import litellm as _litellm
computed = _litellm.completion_cost(completion_response=response, model=model)
if isinstance(computed, (int, float)) and computed > 0:
return float(computed)
except Exception as exc:
logger.debug("[cost] completion_cost failed for %s: %s", model, exc)
return 0.0
def _cost_from_tokens(
model: str,
input_tokens: int,
output_tokens: int,
cached_tokens: int = 0,
cache_creation_tokens: int = 0,
) -> float:
"""Compute USD cost from already-normalized token counts.
Used on streaming paths where the aggregate ``response`` is the stream
wrapper (not a full ``ModelResponse``) and ``litellm.completion_cost`` on
it either no-ops or raises. Calls ``litellm.cost_per_token`` directly
with the cache-aware inputs so Anthropic's 5-min-write / cache-read
multipliers are applied correctly.
"""
if not model or (input_tokens == 0 and output_tokens == 0):
return 0.0
try:
import litellm as _litellm
prompt_cost, completion_cost = _litellm.cost_per_token(
model=model,
prompt_tokens=input_tokens,
completion_tokens=output_tokens,
cache_read_input_tokens=cached_tokens,
cache_creation_input_tokens=cache_creation_tokens,
)
total = (prompt_cost or 0.0) + (completion_cost or 0.0)
return float(total) if total > 0 else 0.0
except Exception as exc:
logger.debug("[cost] cost_per_token failed for %s: %s", model, exc)
return 0.0
def _extract_cache_tokens(usage: Any) -> tuple[int, int]:
"""Pull (cache_read, cache_creation) from a LiteLLM usage object.
Both are subsets of ``prompt_tokens`` already: providers count them
inside the input total. Surface separately for visibility, never sum.
Field names vary by provider/proxy; check the known shapes in priority
order and fall back to 0:
cache_read:
- ``prompt_tokens_details.cached_tokens``: OpenAI-shape; also what
LiteLLM normalizes Anthropic and OpenRouter into.
- ``cache_read_input_tokens``: raw Anthropic field name.
cache_creation:
- ``prompt_tokens_details.cache_write_tokens``: OpenRouter's
normalized field for cache writes (verified empirically against
``openrouter/anthropic/*`` and ``openrouter/z-ai/*`` responses).
- ``cache_creation_input_tokens``: raw Anthropic top-level field.
"""
if not usage:
return 0, 0
_details = getattr(usage, "prompt_tokens_details", None)
cache_read = (
getattr(_details, "cached_tokens", 0) or 0
if _details is not None
else getattr(usage, "cache_read_input_tokens", 0) or 0
)
cache_creation = (
getattr(_details, "cache_write_tokens", 0) or 0
if _details is not None
else 0
) or (getattr(usage, "cache_creation_input_tokens", 0) or 0)
return cache_read, cache_creation
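# e.g. an OpenAI-shape usage with prompt_tokens_details.cached_tokens=512
# yields (512, 0) unless a cache-write field is present; a raw Anthropic
# usage with cache_read_input_tokens=512 and
# cache_creation_input_tokens=2_048 yields (512, 2048).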
def _estimate_tokens(model: str, messages: list[dict]) -> tuple[int, str]:
"""Estimate token count for messages. Returns (token_count, method)."""
# Try litellm's token counter first
@@ -1015,12 +1190,17 @@ class LiteLLMProvider(LLMProvider):
usage = response.usage
input_tokens = usage.prompt_tokens if usage else 0
output_tokens = usage.completion_tokens if usage else 0
cached_tokens, cache_creation_tokens = _extract_cache_tokens(usage)
cost_usd = _extract_cost(response, self.model)
return LLMResponse(
content=content,
model=response.model or self.model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
cost_usd=cost_usd,
stop_reason=response.choices[0].finish_reason or "",
raw_response=response,
)
@@ -1169,8 +1349,16 @@ class LiteLLMProvider(LLMProvider):
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
system_dynamic_suffix: str | None = None,
) -> LLMResponse:
"""Async version of complete(). Uses litellm.acompletion — non-blocking."""
"""Async version of complete(). Uses litellm.acompletion — non-blocking.
``system_dynamic_suffix`` is an optional per-turn tail. When set and
the provider honors ``cache_control``, ``system`` is sent as the
cached prefix and the suffix trails as an uncached second content
block. Otherwise the two strings are concatenated into a single
system message (legacy behavior).
"""
# Codex ChatGPT backend requires streaming — route through stream() which
# already handles Codex quirks and has proper tool call accumulation.
if self._codex_backend:
@@ -1181,6 +1369,7 @@ class LiteLLMProvider(LLMProvider):
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
system_dynamic_suffix=system_dynamic_suffix,
)
return await self._collect_stream_to_response(stream_iter)
@@ -1188,10 +1377,8 @@ class LiteLLMProvider(LLMProvider):
if self._claude_code_oauth:
billing = _claude_code_billing_header(messages)
full_messages.append({"role": "system", "content": billing})
if system:
sys_msg: dict[str, Any] = {"role": "system", "content": system}
if _model_supports_cache_control(self.model):
sys_msg["cache_control"] = {"type": "ephemeral"}
sys_msg = _build_system_message(system, system_dynamic_suffix, self.model)
if sys_msg is not None:
full_messages.append(sys_msg)
full_messages.extend(messages)
@@ -1228,12 +1415,17 @@ class LiteLLMProvider(LLMProvider):
usage = response.usage
input_tokens = usage.prompt_tokens if usage else 0
output_tokens = usage.completion_tokens if usage else 0
cached_tokens, cache_creation_tokens = _extract_cache_tokens(usage)
cost_usd = _extract_cost(response, self.model)
return LLMResponse(
content=content,
model=response.model or self.model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
cost_usd=cost_usd,
stop_reason=response.choices[0].finish_reason or "",
raw_response=response,
)
@@ -1619,6 +1811,7 @@ class LiteLLMProvider(LLMProvider):
messages: list[dict[str, Any]],
system: str,
tools: list[Tool],
system_dynamic_suffix: str | None = None,
) -> list[dict[str, Any]]:
"""Build a JSON-only prompt for models without native tool support."""
tool_specs = [
@@ -1646,7 +1839,19 @@ class LiteLLMProvider(LLMProvider):
)
compat_system = compat_instruction if not system else f"{system}\n\n{compat_instruction}"
full_messages: list[dict[str, Any]] = [{"role": "system", "content": compat_system}]
# If the routed sub-provider honors cache_control (e.g.
# openrouter/anthropic/*), split the static prefix from the dynamic
# suffix so the prefix stays cache-warm across turns. Otherwise fall
# back to a single concatenated string.
system_message = _build_system_message(
compat_system,
system_dynamic_suffix,
self.model,
)
full_messages: list[dict[str, Any]] = []
if system_message is not None:
full_messages.append(system_message)
full_messages.extend(messages)
return [
message
@@ -1660,9 +1865,21 @@ class LiteLLMProvider(LLMProvider):
system: str,
tools: list[Tool],
max_tokens: int,
system_dynamic_suffix: str | None = None,
) -> LLMResponse:
"""Emulate tool calling via JSON when OpenRouter rejects native tools."""
full_messages = self._build_openrouter_tool_compat_messages(messages, system, tools)
"""Emulate tool calling via JSON when OpenRouter rejects native tools.
When the routed sub-provider honors ``cache_control`` (e.g.
``openrouter/anthropic/*``), the message builder splits the static
prefix from the dynamic suffix so the prefix stays cache-warm.
Otherwise the suffix is concatenated into a single system string.
"""
full_messages = self._build_openrouter_tool_compat_messages(
messages,
system,
tools,
system_dynamic_suffix=system_dynamic_suffix,
)
kwargs: dict[str, Any] = {
"model": self.model,
"messages": full_messages,
@@ -1683,6 +1900,8 @@ class LiteLLMProvider(LLMProvider):
usage = response.usage
input_tokens = usage.prompt_tokens if usage else 0
output_tokens = usage.completion_tokens if usage else 0
cached_tokens, cache_creation_tokens = _extract_cache_tokens(usage)
cost_usd = _extract_cost(response, self.model)
stop_reason = "tool_calls" if tool_calls else (response.choices[0].finish_reason or "stop")
return LLMResponse(
@@ -1690,6 +1909,9 @@ class LiteLLMProvider(LLMProvider):
model=response.model or self.model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
cost_usd=cost_usd,
stop_reason=stop_reason,
raw_response={
"compat_mode": "openrouter_tool_emulation",
@@ -1704,6 +1926,7 @@ class LiteLLMProvider(LLMProvider):
system: str,
tools: list[Tool],
max_tokens: int,
system_dynamic_suffix: str | None = None,
) -> AsyncIterator[StreamEvent]:
"""Fallback stream for OpenRouter models without native tool support."""
from framework.llm.stream_events import (
@@ -1724,6 +1947,7 @@ class LiteLLMProvider(LLMProvider):
system=system,
tools=tools,
max_tokens=max_tokens,
system_dynamic_suffix=system_dynamic_suffix,
)
except Exception as e:
yield StreamErrorEvent(error=str(e), recoverable=False)
@@ -1747,6 +1971,9 @@ class LiteLLMProvider(LLMProvider):
stop_reason=response.stop_reason,
input_tokens=response.input_tokens,
output_tokens=response.output_tokens,
cached_tokens=response.cached_tokens,
cache_creation_tokens=response.cache_creation_tokens,
cost_usd=response.cost_usd,
model=response.model,
)
@@ -1758,6 +1985,7 @@ class LiteLLMProvider(LLMProvider):
max_tokens: int,
response_format: dict[str, Any] | None,
json_mode: bool,
system_dynamic_suffix: str | None = None,
) -> AsyncIterator[StreamEvent]:
"""Fallback path: convert non-stream completion to stream events.
@@ -1781,6 +2009,7 @@ class LiteLLMProvider(LLMProvider):
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
system_dynamic_suffix=system_dynamic_suffix,
)
except Exception as e:
yield StreamErrorEvent(error=str(e), recoverable=False)
@@ -1812,6 +2041,9 @@ class LiteLLMProvider(LLMProvider):
stop_reason=response.stop_reason or "stop",
input_tokens=response.input_tokens,
output_tokens=response.output_tokens,
cached_tokens=response.cached_tokens,
cache_creation_tokens=response.cache_creation_tokens,
cost_usd=response.cost_usd,
model=response.model,
)
@@ -1823,6 +2055,7 @@ class LiteLLMProvider(LLMProvider):
max_tokens: int = 4096,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
system_dynamic_suffix: str | None = None,
) -> AsyncIterator[StreamEvent]:
"""Stream a completion via litellm.acompletion(stream=True).
@@ -1833,6 +2066,9 @@ class LiteLLMProvider(LLMProvider):
Empty responses (e.g. Gemini stealth rate-limits that return 200
with no content) are retried with exponential backoff, mirroring
the retry behaviour of ``_completion_with_rate_limit_retry``.
``system_dynamic_suffix`` is an optional per-turn tail. See
``acomplete`` docstring for the two-block split semantics.
"""
from framework.llm.stream_events import (
FinishEvent,
@@ -1852,6 +2088,7 @@ class LiteLLMProvider(LLMProvider):
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
system_dynamic_suffix=system_dynamic_suffix,
):
yield event
return
@@ -1862,6 +2099,7 @@ class LiteLLMProvider(LLMProvider):
system=system,
tools=tools,
max_tokens=max_tokens,
system_dynamic_suffix=system_dynamic_suffix,
):
yield event
return
@@ -1870,10 +2108,8 @@ class LiteLLMProvider(LLMProvider):
if self._claude_code_oauth:
billing = _claude_code_billing_header(messages)
full_messages.append({"role": "system", "content": billing})
if system:
sys_msg: dict[str, Any] = {"role": "system", "content": system}
if _model_supports_cache_control(self.model):
sys_msg["cache_control"] = {"type": "ephemeral"}
sys_msg = _build_system_message(system, system_dynamic_suffix, self.model)
if sys_msg is not None:
full_messages.append(sys_msg)
full_messages.extend(messages)
@@ -2109,37 +2345,46 @@ class LiteLLMProvider(LLMProvider):
type(usage).__name__,
)
cached_tokens = 0
cache_creation_tokens = 0
if usage:
input_tokens = getattr(usage, "prompt_tokens", 0) or 0
output_tokens = getattr(usage, "completion_tokens", 0) or 0
_details = getattr(usage, "prompt_tokens_details", None)
cached_tokens = (
getattr(_details, "cached_tokens", 0) or 0
if _details is not None
else getattr(usage, "cache_read_input_tokens", 0) or 0
)
cached_tokens, cache_creation_tokens = _extract_cache_tokens(usage)
logger.debug(
"[tokens] finish-chunk usage: input=%d output=%d cached=%d model=%s",
"[tokens] finish-chunk usage: input=%d output=%d "
"cached=%d cache_creation=%d model=%s",
input_tokens,
output_tokens,
cached_tokens,
cache_creation_tokens,
self.model,
)
logger.debug(
"[tokens] finish event: input=%d output=%d cached=%d stop=%s model=%s",
"[tokens] finish event: input=%d output=%d cached=%d "
"cache_creation=%d stop=%s model=%s",
input_tokens,
output_tokens,
cached_tokens,
cache_creation_tokens,
choice.finish_reason,
self.model,
)
cost_usd = _cost_from_tokens(
self.model,
input_tokens,
output_tokens,
cached_tokens,
cache_creation_tokens,
)
tail_events.append(
FinishEvent(
stop_reason=choice.finish_reason,
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
cost_usd=cost_usd,
model=self.model,
)
)
@@ -2159,19 +2404,36 @@ class LiteLLMProvider(LLMProvider):
_usage = calculate_total_usage(chunks=_chunks)
input_tokens = _usage.prompt_tokens or 0
output_tokens = _usage.completion_tokens or 0
_details = getattr(_usage, "prompt_tokens_details", None)
cached_tokens = (
getattr(_details, "cached_tokens", 0) or 0
if _details is not None
else getattr(_usage, "cache_read_input_tokens", 0) or 0
)
# `calculate_total_usage` aggregates token totals
# but discards `prompt_tokens_details` — which is
# where OpenRouter puts `cached_tokens` and
# `cache_write_tokens`. Recover them directly
# from the most recent chunk that carries usage.
cached_tokens, cache_creation_tokens = 0, 0
for _raw in reversed(_chunks):
_raw_usage = getattr(_raw, "usage", None)
if _raw_usage is None:
continue
_cr, _cc = _extract_cache_tokens(_raw_usage)
if _cr or _cc:
cached_tokens, cache_creation_tokens = _cr, _cc
break
logger.debug(
"[tokens] post-loop chunks fallback: input=%d output=%d cached=%d model=%s",
"[tokens] post-loop chunks fallback: input=%d output=%d "
"cached=%d cache_creation=%d model=%s",
input_tokens,
output_tokens,
cached_tokens,
cache_creation_tokens,
self.model,
)
cost_usd = _cost_from_tokens(
self.model,
input_tokens,
output_tokens,
cached_tokens,
cache_creation_tokens,
)
# Patch the FinishEvent already queued with 0 tokens
for _i, _ev in enumerate(tail_events):
if isinstance(_ev, FinishEvent) and _ev.input_tokens == 0:
@@ -2180,6 +2442,8 @@ class LiteLLMProvider(LLMProvider):
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
cost_usd=cost_usd,
model=_ev.model,
)
break
@@ -2390,6 +2654,8 @@ class LiteLLMProvider(LLMProvider):
tool_calls: list[dict[str, Any]] = []
input_tokens = 0
output_tokens = 0
cached_tokens = 0
cache_creation_tokens = 0
stop_reason = ""
model = self.model
@@ -2407,6 +2673,8 @@ class LiteLLMProvider(LLMProvider):
elif isinstance(event, FinishEvent):
input_tokens = event.input_tokens
output_tokens = event.output_tokens
cached_tokens = event.cached_tokens
cache_creation_tokens = event.cache_creation_tokens
stop_reason = event.stop_reason
if event.model:
model = event.model
@@ -2419,6 +2687,8 @@ class LiteLLMProvider(LLMProvider):
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
cache_creation_tokens=cache_creation_tokens,
stop_reason=stop_reason,
raw_response={"tool_calls": tool_calls} if tool_calls else None,
)
+6
View File
@@ -155,8 +155,11 @@ class MockLLMProvider(LLMProvider):
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
system_dynamic_suffix: str | None = None,
) -> LLMResponse:
"""Async mock completion (no I/O, returns immediately)."""
if system_dynamic_suffix:
system = f"{system}\n\n{system_dynamic_suffix}" if system else system_dynamic_suffix
return self.complete(
messages=messages,
system=system,
@@ -173,6 +176,7 @@ class MockLLMProvider(LLMProvider):
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
system_dynamic_suffix: str | None = None,
) -> AsyncIterator[StreamEvent]:
"""Stream a mock completion as word-level TextDeltaEvents.
@@ -180,6 +184,8 @@ class MockLLMProvider(LLMProvider):
TextDeltaEvent with an accumulating snapshot, exercising the full
streaming pipeline without any API calls.
"""
if system_dynamic_suffix:
system = f"{system}\n\n{system_dynamic_suffix}" if system else system_dynamic_suffix
content = self._generate_mock_response(system=system, json_mode=False)
words = content.split(" ")
accumulated = ""
+5 -5
View File
@@ -265,8 +265,8 @@
"max_context_tokens": 240000
},
{
"id": "GLM-5",
"label": "GLM-5 - Via Hive",
"id": "glm-5.1",
"label": "GLM-5.1 - Via Hive",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 180000
@@ -352,7 +352,7 @@
"zai_code": {
"provider": "openai",
"api_key_env_var": "ZAI_API_KEY",
"model": "glm-5",
"model": "glm-5.1",
"max_tokens": 32768,
"max_context_tokens": 180000,
"api_base": "https://api.z.ai/api/coding/paas/v4"
@@ -399,8 +399,8 @@
"recommended": false
},
{
"id": "GLM-5",
"label": "GLM-5",
"id": "glm-5.1",
"label": "glm-5.1",
"recommended": false
}
]
+31 -2
View File
@@ -10,12 +10,24 @@ from typing import Any
@dataclass
class LLMResponse:
"""Response from an LLM call."""
"""Response from an LLM call.
``cached_tokens`` and ``cache_creation_tokens`` are subsets of
``input_tokens`` (providers report them inside ``prompt_tokens``).
Surface them for visibility; do not add to a total.
``cost_usd`` is the per-call USD cost when the provider / pricing table
can produce one (Anthropic, OpenAI, OpenRouter are supported). 0.0 when
unknown or unpriced; treat it as "unreported", not "free".
"""
content: str
model: str
input_tokens: int = 0
output_tokens: int = 0
cached_tokens: int = 0
cache_creation_tokens: int = 0
cost_usd: float = 0.0
stop_reason: str = ""
raw_response: Any = None
@@ -110,19 +122,28 @@ class LLMProvider(ABC):
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
system_dynamic_suffix: str | None = None,
) -> "LLMResponse":
"""Async version of complete(). Non-blocking on the event loop.
Default implementation offloads the sync complete() to a thread pool.
Subclasses SHOULD override for native async I/O.
``system_dynamic_suffix`` is an optional per-turn tail for providers
that honor ``cache_control`` (see LiteLLMProvider for semantics).
The default implementation concatenates it onto ``system`` since the
sync ``complete()`` path does not support the split.
"""
combined_system = system
if system_dynamic_suffix:
combined_system = f"{system}\n\n{system_dynamic_suffix}" if system else system_dynamic_suffix
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
partial(
self.complete,
messages=messages,
system=system,
system=combined_system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
@@ -137,6 +158,7 @@ class LLMProvider(ABC):
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
system_dynamic_suffix: str | None = None,
) -> AsyncIterator["StreamEvent"]:
"""
Stream a completion as an async iterator of StreamEvents.
@@ -147,6 +169,9 @@ class LLMProvider(ABC):
Tool orchestration is the CALLER's responsibility:
- Caller detects ToolCallEvent, executes tool, adds result
to messages, calls stream() again.
``system_dynamic_suffix`` is forwarded to ``acomplete``; see its
docstring for the two-block split semantics.
"""
from framework.llm.stream_events import (
FinishEvent,
@@ -159,6 +184,7 @@ class LLMProvider(ABC):
system=system,
tools=tools,
max_tokens=max_tokens,
system_dynamic_suffix=system_dynamic_suffix,
)
yield TextDeltaEvent(content=response.content, snapshot=response.content)
yield TextEndEvent(full_text=response.content)
@@ -166,6 +192,9 @@ class LLMProvider(ABC):
stop_reason=response.stop_reason,
input_tokens=response.input_tokens,
output_tokens=response.output_tokens,
cached_tokens=response.cached_tokens,
cache_creation_tokens=response.cache_creation_tokens,
cost_usd=response.cost_usd,
model=response.model,
)
+11 -1
View File
@@ -65,13 +65,23 @@ class ReasoningDeltaEvent:
@dataclass(frozen=True)
class FinishEvent:
"""The LLM has finished generating."""
"""The LLM has finished generating.
``cached_tokens`` and ``cache_creation_tokens`` are subsets of
``input_tokens``; providers count both inside ``prompt_tokens`` already.
Surface them separately for visibility; never add to a total.
``cost_usd`` is the per-turn USD cost when the provider or LiteLLM's
pricing table supplies one; 0.0 means unreported (not free).
"""
type: Literal["finish"] = "finish"
stop_reason: str = ""
input_tokens: int = 0
output_tokens: int = 0
cached_tokens: int = 0
cache_creation_tokens: int = 0
cost_usd: float = 0.0
model: str = ""
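A short consumption sketch of the accounting rule stated above; `format_finish` is illustrative, not a framework helper:
def format_finish(ev: FinishEvent) -> str:
    # cached/cache_creation are already inside input_tokens: never add them in.
    total = ev.input_tokens + ev.output_tokens
    cost = f"${ev.cost_usd:.4f}" if ev.cost_usd > 0 else "unreported"
    return (
        f"in={ev.input_tokens} (cache read {ev.cached_tokens}, "
        f"cache write {ev.cache_creation_tokens}) "
        f"out={ev.output_tokens} total={total} cost={cost}"
    )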
+22 -3
View File
@@ -140,6 +140,25 @@ async def cors_middleware(request: web.Request, handler):
return response
@web.middleware
async def no_cache_api_middleware(request: web.Request, handler):
"""Prevent browsers from caching API responses.
Without this, a one-off bad response (e.g. the SPA catch-all leaking
index.html for an /api/* URL before a route was registered) can get
pinned in the browser's disk cache and replayed forever, since our
JSON handlers don't emit ETag/Last-Modified and browsers fall back
to heuristic freshness.
"""
try:
response = await handler(request)
except web.HTTPException as exc:
response = exc
if request.path.startswith("/api/"):
response.headers["Cache-Control"] = "no-store"
return response
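A minimal verification sketch in the repo's existing TestClient style (the route and handler names are illustrative):
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer

async def test_api_responses_are_no_store():
    app = web.Application(middlewares=[no_cache_api_middleware])
    async def ping(request: web.Request) -> web.Response:
        return web.json_response({"ok": True})
    app.router.add_get("/api/ping", ping)
    async with TestClient(TestServer(app)) as client:
        resp = await client.get("/api/ping")
        # Every /api/* response carries the no-store directive.
        assert resp.headers["Cache-Control"] == "no-store"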
@web.middleware
async def error_middleware(request: web.Request, handler):
"""Catch exceptions and return JSON error responses.
@@ -268,7 +287,7 @@ def create_app(model: str | None = None) -> web.Application:
Returns:
Configured aiohttp Application ready to run.
"""
app = web.Application(middlewares=[cors_middleware, error_middleware])
app = web.Application(middlewares=[cors_middleware, no_cache_api_middleware, error_middleware])
# Initialize credential store (before SessionManager so it can be shared)
from framework.credentials.store import CredentialStore
@@ -325,16 +344,16 @@ def create_app(model: str | None = None) -> web.Application:
app.router.add_get("/api/browser/status/stream", handle_browser_status_stream)
# Register route modules
from framework.server.routes_colony_tools import register_routes as register_colony_tools_routes
from framework.server.routes_colony_workers import register_routes as register_colony_worker_routes
from framework.server.routes_config import register_routes as register_config_routes
from framework.server.routes_credentials import register_routes as register_credential_routes
from framework.server.routes_events import register_routes as register_event_routes
from framework.server.routes_execution import register_routes as register_execution_routes
from framework.server.routes_logs import register_routes as register_log_routes
from framework.server.routes_mcp import register_routes as register_mcp_routes
from framework.server.routes_messages import register_routes as register_message_routes
from framework.server.routes_prompts import register_routes as register_prompt_routes
from framework.server.routes_colony_tools import register_routes as register_colony_tools_routes
from framework.server.routes_mcp import register_routes as register_mcp_routes
from framework.server.routes_queen_tools import register_routes as register_queen_tools_routes
from framework.server.routes_queens import register_routes as register_queen_routes
from framework.server.routes_sessions import register_routes as register_session_routes
+44 -13
View File
@@ -265,9 +265,10 @@ def build_queen_tool_registry_bare() -> tuple[Any, dict[str, list[dict[str, Any]
backend process and cache the result.
"""
from pathlib import Path
import framework.agents.queen as _queen_pkg
from framework.loader.mcp_registry import MCPRegistry
from framework.loader.tool_registry import ToolRegistry
import framework.agents.queen as _queen_pkg
queen_registry = ToolRegistry()
queen_pkg_dir = Path(_queen_pkg.__file__).parent
@@ -302,9 +303,7 @@ def build_queen_tool_registry_bare() -> tuple[Any, dict[str, list[dict[str, Any]
if extra:
try:
extra_configs = reg.resolve_for_agent(include=extra)
registry_configs = list(registry_configs) + [
reg._server_config_to_dict(c) for c in extra_configs
]
registry_configs = list(registry_configs) + [reg._server_config_to_dict(c) for c in extra_configs]
except Exception:
logger.debug("build_queen_tool_registry_bare: resolve_for_agent(extra) failed", exc_info=True)
@@ -548,9 +547,7 @@ async def create_queen(
# ``QueenPhaseState`` only gates MCP tools (lifecycle and synthetic
# tools always pass through). Then apply the queen profile's stored
# allowlist (if any) and memoize the filtered independent tool list.
mcp_server_tools_map: dict[str, set[str]] = dict(
getattr(queen_registry, "_mcp_server_tools", {})
)
mcp_server_tools_map: dict[str, set[str]] = dict(getattr(queen_registry, "_mcp_server_tools", {}))
phase_state.mcp_tool_names_all = set().union(*mcp_server_tools_map.values()) if mcp_server_tools_map else set()
# The queen's MCP tool allowlist now lives in a dedicated
# ``tools.json`` sidecar next to ``profile.yaml``. ``load_queen_tools_config``
@@ -564,10 +561,11 @@ async def create_queen(
# now so ``@server:NAME`` shorthands in the role-default table can
# expand against the just-loaded MCP servers.
_boot_catalog: dict[str, list[dict]] = {
srv: [{"name": name} for name in sorted(names)]
for srv, names in mcp_server_tools_map.items()
srv: [{"name": name} for name in sorted(names)] for srv, names in mcp_server_tools_map.items()
}
phase_state.enabled_mcp_tools = load_queen_tools_config(queen_dir.name, _boot_catalog)
# ``queen_dir`` is ``queens/<queen_id>/sessions/<session_id>``; the
# allowlist sidecar is keyed by queen_id, not session_id.
phase_state.enabled_mcp_tools = load_queen_tools_config(session.queen_name, _boot_catalog)
phase_state.rebuild_independent_filter()
if phase_state.enabled_mcp_tools is not None:
total_mcp = len(phase_state.mcp_tool_names_all)
@@ -734,8 +732,37 @@ async def create_queen(
# ---- Recall on each real user turn --------------------------------
async def _recall_on_user_input(event: AgentEvent) -> None:
"""Re-select memories when real user input arrives."""
await _refresh_recall_cache((event.data or {}).get("content", ""))
"""On real user input, freeze the dynamic system-prompt suffix and
refresh recall memories in the background.
The EventBus drops handlers that exceed 15s, so we MUST return fast.
Recall selection queries the LLM and can take >15s on slow backends;
we fire it off as a background task and re-stamp the suffix when it
completes. The immediate refresh_dynamic_suffix call stamps a fresh
timestamp using the last-known recall blocks so every iteration of
THIS user turn sees a byte-stable prompt (prompt cache hits on the
static block). Phase-change injections and worker-report injections
go through agent_loop.inject_event() and do NOT publish
CLIENT_INPUT_RECEIVED, so this runs exactly once per real user turn.
"""
query = (event.data or {}).get("content", "")
# Immediate: stamp "now" into the frozen suffix, using whatever
# recall blocks we already cached (from the prior turn or seeding).
phase_state.refresh_dynamic_suffix()
async def _bg_refresh() -> None:
try:
await _refresh_recall_cache(query)
# Re-stamp with the fresh recall blocks. Any iteration that
# read the suffix before this point used the older recall;
# acceptable, since recall was already eventually consistent.
phase_state.refresh_dynamic_suffix()
except Exception:
logger.debug("background recall refresh failed", exc_info=True)
import asyncio as _asyncio
_asyncio.create_task(_bg_refresh())
session.event_bus.subscribe(
[EventType.CLIENT_INPUT_RECEIVED],
@@ -845,6 +872,9 @@ async def create_queen(
except Exception:
logger.debug("recall: initial seeding failed", exc_info=True)
# Freeze the dynamic suffix once so the first real turn sends a
# byte-stable prompt even before CLIENT_INPUT_RECEIVED fires.
phase_state.refresh_dynamic_suffix()
return HookResult(system_prompt=phase_state.get_current_prompt())
# ---- Colony preparation -------------------------------------------
@@ -944,7 +974,8 @@ async def create_queen(
stream_id="queen",
execution_id=session.id,
dynamic_tools_provider=phase_state.get_current_tools,
dynamic_prompt_provider=phase_state.get_current_prompt,
dynamic_prompt_provider=phase_state.get_static_prompt,
dynamic_prompt_suffix_provider=phase_state.get_dynamic_suffix,
iteration_metadata_provider=lambda: {"phase": phase_state.phase},
skills_catalog_prompt=phase_state.skills_catalog_prompt,
protocols_prompt=phase_state.protocols_prompt,
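Roughly, the per-iteration flow the two providers above enable (a sketch; AgentLoop internals are simplified, and `conversation_messages` is an illustrative variable, but the kwarg names match the LLMProvider.stream signature in this diff):
# Frozen at CLIENT_INPUT_RECEIVED (and on phase change), so every
# iteration of one user turn sends byte-identical system content:
static = phase_state.get_static_prompt()
suffix = phase_state.get_dynamic_suffix()
async for event in llm.stream(
    messages=conversation_messages,
    system=static,                    # cacheable block
    system_dynamic_suffix=suffix,     # uncached tail after the breakpoint
    tools=phase_state.get_current_tools(),
):
    ...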
+35 -10
View File
@@ -248,15 +248,22 @@ async def handle_queen_session(request: web.Request) -> web.Response:
# Skip colony sessions: a colony forked from this queen also carries
# queen_name == queen_id, but it has a worker loaded (colony_id /
# worker_path set) and is the colony's chat, not the queen's DM.
for session in manager.list_sessions():
if session.queen_name == queen_id and session.colony_id is None and session.worker_path is None:
return web.json_response(
{
"session_id": session.id,
"queen_id": queen_id,
"status": "live",
}
)
# When multiple DM sessions for this queen are live at once (e.g. the
# user created a new session, then navigated away and back), return
# the most recently loaded one so we don't resurrect a stale older
# session ahead of a freshly created one.
live_matches = [
s for s in manager.list_sessions() if s.queen_name == queen_id and s.colony_id is None and s.worker_path is None
]
if live_matches:
latest = max(live_matches, key=lambda s: s.loaded_at)
return web.json_response(
{
"session_id": latest.id,
"queen_id": queen_id,
"status": "live",
}
)
# 2. Find the most recent cold session for this queen and resume it.
# IMPORTANT: skip sessions that don't belong in the queen DM:
@@ -378,6 +385,8 @@ async def handle_select_queen_session(request: web.Request) -> web.Response:
async def handle_new_queen_session(request: web.Request) -> web.Response:
"""POST /api/queen/{queen_id}/session/new -- create a fresh queen session."""
from framework.tools.queen_lifecycle_tools import QUEEN_PHASES
queen_id = request.match_info["queen_id"]
manager = request.app["manager"]
@@ -387,9 +396,25 @@ async def handle_new_queen_session(request: web.Request) -> web.Response:
except FileNotFoundError:
return web.json_response({"error": f"Queen '{queen_id}' not found"}, status=404)
body = await request.json() if request.can_read_body else {}
if request.can_read_body:
try:
body = await request.json()
except json.JSONDecodeError:
return web.json_response({"error": "Invalid JSON body"}, status=400)
if not isinstance(body, dict):
return web.json_response({"error": "Request body must be a JSON object"}, status=400)
else:
body = {}
initial_prompt = body.get("initial_prompt")
initial_phase = body.get("initial_phase") or "independent"
if initial_phase not in QUEEN_PHASES:
return web.json_response(
{
"error": f"Invalid initial_phase '{initial_phase}'",
"valid": sorted(QUEEN_PHASES),
},
status=400,
)
session = await manager.create_session(
initial_prompt=initial_prompt,
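For illustration, the contract the validation above adds (queen id and prompt text hypothetical):
POST /api/queen/researcher/session/new
{"initial_prompt": "triage my inbox", "initial_phase": "working"}
-> 200 {"session_id": "..."}
{"initial_phase": "hatching"}
-> 400 {"error": "Invalid initial_phase 'hatching'",
        "valid": ["incubating", "independent", "reviewing", "working"]}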
+28 -1
View File
@@ -122,8 +122,19 @@ async def handle_create_session(request: web.Request) -> web.Response:
(equivalent to the old POST /api/agents). Otherwise creates a queen-only
session that can later have a colony loaded via POST /sessions/{id}/colony.
"""
from framework.agents.queen.queen_profiles import ensure_default_queens, load_queen_profile
from framework.tools.queen_lifecycle_tools import QUEEN_PHASES
manager = _get_manager(request)
body = await request.json() if request.can_read_body else {}
if request.can_read_body:
try:
body = await request.json()
except json.JSONDecodeError:
return web.json_response({"error": "Invalid JSON body"}, status=400)
if not isinstance(body, dict):
return web.json_response({"error": "Request body must be a JSON object"}, status=400)
else:
body = {}
agent_path = body.get("agent_path")
agent_id = body.get("agent_id")
session_id = body.get("session_id")
@@ -134,6 +145,21 @@ async def handle_create_session(request: web.Request) -> web.Response:
initial_phase = body.get("initial_phase")
worker_name = body.get("worker_name")
if initial_phase is not None and initial_phase not in QUEEN_PHASES:
return web.json_response(
{
"error": f"Invalid initial_phase '{initial_phase}'",
"valid": sorted(QUEEN_PHASES),
},
status=400,
)
if queen_name:
ensure_default_queens()
try:
load_queen_profile(queen_name)
except FileNotFoundError:
return web.json_response({"error": f"Queen '{queen_name}' not found"}, status=404)
if agent_path:
try:
agent_path = str(validate_agent_path(agent_path))
@@ -160,6 +186,7 @@ async def handle_create_session(request: web.Request) -> web.Response:
model=model,
initial_prompt=initial_prompt,
queen_resume_from=queen_resume_from,
queen_name=queen_name,
initial_phase=initial_phase,
)
except ValueError as e:
+22 -41
View File
@@ -227,9 +227,7 @@ def _build_admin_manager(
if colony_name:
colony_home = COLONIES_DIR / colony_name
colony_overrides_path = colony_home / "skills_overrides.json"
extras.append(
ExtraScope(directory=colony_home / ".hive" / "skills", label="colony_ui", priority=3)
)
extras.append(ExtraScope(directory=colony_home / ".hive" / "skills", label="colony_ui", priority=3))
cfg = SkillsManagerConfig(
queen_id=queen_id,
queen_overrides_path=queen_overrides_path,
@@ -309,14 +307,20 @@ def _effective_enabled(
queen_store: SkillOverrideStore | None,
colony_store: SkillOverrideStore | None,
) -> bool:
# Colony explicit wins over queen explicit; either explicit wins over
# master switch + default. Keeps the UI's enable/disable toggle simple.
# Mirrors ``SkillsManager._apply_overrides`` so the UI's "enabled" column
# matches what the queen actually sees in her prompt. Colony explicit wins
# over queen explicit; either explicit wins over preset-off-by-default and
# over the ``all_defaults_disabled`` master switch.
for store in (colony_store, queen_store):
if store is None:
continue
entry = store.get(skill.name)
if entry is not None and entry.enabled is not None:
return entry.enabled
# Preset-scope capability packs ship OFF; they only appear in the queen's
# catalog after an explicit per-queen/colony opt-in.
if skill.source_scope == "preset":
return False
for store in (colony_store, queen_store):
if store is not None and store.all_defaults_disabled and skill.source_scope == "framework":
return False
@@ -364,8 +368,7 @@ async def handle_list_queen_skills(request: web.Request) -> web.Response:
mgr = scope.manager
assert mgr is not None
skills = [
_serialize_skill(s, queen_store=scope.store, colony_store=None)
for s in mgr.enumerate_skills_with_source()
_serialize_skill(s, queen_store=scope.store, colony_store=None) for s in mgr.enumerate_skills_with_source()
]
skills.sort(key=lambda r: r["name"])
return web.json_response(
@@ -402,10 +405,7 @@ async def handle_list_colony_skills(request: web.Request) -> web.Response:
)
all_skills = mgr.enumerate_skills_with_source()
rows = [
_serialize_skill(s, queen_store=queen_store, colony_store=scope.store)
for s in all_skills
]
rows = [_serialize_skill(s, queen_store=queen_store, colony_store=scope.store) for s in all_skills]
rows.sort(key=lambda r: r["name"])
inherited = [s.name for s in all_skills if s.source_scope == "queen_ui"]
return web.json_response(
@@ -429,9 +429,7 @@ async def handle_list_all_skills(request: web.Request) -> web.Response:
# Enumerate queens and colonies by walking the standard dirs.
_ensure_queens_known()
queen_ids = (
sorted(p.name for p in QUEENS_DIR.glob("*") if (p / "profile.yaml").exists())
if QUEENS_DIR.is_dir()
else []
sorted(p.name for p in QUEENS_DIR.glob("*") if (p / "profile.yaml").exists()) if QUEENS_DIR.is_dir() else []
)
colony_names: list[str] = []
if COLONIES_DIR.is_dir():
@@ -458,16 +456,12 @@ async def handle_list_all_skills(request: web.Request) -> web.Response:
# Raw discovery (no override filtering) — we apply per-scope stores
# below when computing ``visible_to``.
discovery = SkillDiscovery(
DiscoveryConfig(project_root=None, skip_framework_scope=False, extra_scopes=extras)
)
discovery = SkillDiscovery(DiscoveryConfig(project_root=None, skip_framework_scope=False, extra_scopes=extras))
discovered = discovery.discover()
# Load all stores once.
queen_stores: dict[str, SkillOverrideStore] = {
qid: SkillOverrideStore.load(
QUEENS_DIR / qid / "skills_overrides.json", scope_label=f"queen:{qid}"
)
qid: SkillOverrideStore.load(QUEENS_DIR / qid / "skills_overrides.json", scope_label=f"queen:{qid}")
for qid in queen_ids
}
colony_stores: dict[str, SkillOverrideStore] = {}
@@ -484,6 +478,7 @@ async def handle_list_all_skills(request: web.Request) -> web.Response:
colony_queens[cn] = None
rows: list[dict[str, Any]] = []
# Owner mapping for queen_ui / colony_ui scopes: the dir path encodes
# which queen/colony the skill belongs to.
def _owner_for(skill: ParsedSkill) -> dict[str, str] | None:
@@ -637,9 +632,7 @@ async def _handle_create(scope: SkillScope, payload: dict[str, Any], user_id: st
if err or draft is None:
return web.json_response({"error": err}, status=400)
replace_existing = bool(payload.get("replace_existing", False))
installed, wrote_err, replaced = write_skill(
draft, target_root=scope.write_dir, replace_existing=replace_existing
)
installed, wrote_err, replaced = write_skill(draft, target_root=scope.write_dir, replace_existing=replace_existing)
if wrote_err is not None or installed is None:
status = 409 if "already exists" in (wrote_err or "") else 500
return web.json_response({"error": wrote_err}, status=status)
@@ -698,8 +691,6 @@ async def _handle_patch(scope: SkillScope, skill_name: str, payload: dict[str, A
return web.json_response({"name": name, "enabled": existing.enabled, "ok": True})
async def _handle_put_body(scope: SkillScope, skill_name: str, payload: dict[str, Any]) -> web.Response:
name, err = validate_skill_name(skill_name)
if err or name is None:
@@ -707,9 +698,7 @@ async def _handle_put_body(scope: SkillScope, skill_name: str, payload: dict[str
entry = scope.store.get(name)
provenance = entry.provenance if entry else Provenance.FRAMEWORK
if provenance not in _EDITABLE_PROVENANCE:
return web.json_response(
{"error": f"skill '{name}' is not editable (provenance={provenance})"}, status=403
)
return web.json_response({"error": f"skill '{name}' is not editable (provenance={provenance})"}, status=403)
description = payload.get("description")
body = payload.get("body")
if not isinstance(body, str) or not body.strip():
@@ -746,9 +735,7 @@ async def _handle_delete(scope: SkillScope, skill_name: str) -> web.Response:
entry = scope.store.get(name)
provenance = entry.provenance if entry else None
if provenance is not None and provenance not in _EDITABLE_PROVENANCE:
return web.json_response(
{"error": f"skill '{name}' is not deletable (provenance={provenance})"}, status=403
)
return web.json_response({"error": f"skill '{name}' is not deletable (provenance={provenance})"}, status=403)
removed, rerr = authoring_remove_skill(scope.write_dir, name)
if rerr:
return web.json_response({"error": rerr}, status=500)
@@ -915,9 +902,7 @@ async def handle_upload_skill(request: web.Request) -> web.Response:
break
buf.write(chunk)
if buf.tell() > _MAX_UPLOAD_BYTES:
return web.json_response(
{"error": f"upload exceeds {_MAX_UPLOAD_BYTES} bytes"}, status=413
)
return web.json_response({"error": f"upload exceeds {_MAX_UPLOAD_BYTES} bytes"}, status=413)
upload_bytes = buf.getvalue()
upload_filename = part.filename or ""
else:
@@ -962,9 +947,7 @@ async def handle_upload_skill(request: web.Request) -> web.Response:
# Extract into a draft
draft: Any
if upload_bytes.startswith(_ZIP_MAGIC) or (upload_filename or "").endswith(".zip"):
draft_name, draft_desc, draft_body, draft_files, err = _extract_from_zip(
upload_bytes, name_hint=name_override
)
draft_name, draft_desc, draft_body, draft_files, err = _extract_from_zip(upload_bytes, name_hint=name_override)
if err:
return web.json_response({"error": err}, status=400)
else:
@@ -983,9 +966,7 @@ async def handle_upload_skill(request: web.Request) -> web.Response:
if derr or draft is None:
return web.json_response({"error": derr}, status=400)
installed, werr, replaced = write_skill(
draft, target_root=write_dir, replace_existing=replace_existing
)
installed, werr, replaced = write_skill(draft, target_root=write_dir, replace_existing=replace_existing)
if werr or installed is None:
status = 409 if "already exists" in (werr or "") else 500
return web.json_response({"error": werr}, status=status)
@@ -1086,7 +1067,7 @@ def _extract_from_zip(
continue
if not entry_name.startswith(root_prefix):
continue
rel = entry_name[len(root_prefix):]
rel = entry_name[len(root_prefix) :]
if not rel or rel == "SKILL.md":
continue
content_bytes = z.read(entry_name)
+21 -2
View File
@@ -1223,8 +1223,27 @@ class SessionManager:
logger.info("Session '%s': shutdown reflection spawned", session_id)
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
except Exception:
logger.warning("Session '%s': failed to spawn shutdown reflection", session_id, exc_info=True)
except RuntimeError as exc:
# Most common when a session is stopped after the event loop
# has closed (e.g. during server shutdown or from an atexit
# handler). The reflection would have had nothing to write
# anyway — no new turns since the last periodic reflection.
logger.warning(
"Session '%s': shutdown reflection skipped — event loop unavailable (%s). "
"Normal during server shutdown; anything worth persisting was saved by the "
"periodic reflection after the last turn.",
session_id,
exc,
)
except Exception as exc:
logger.warning(
"Session '%s': failed to spawn shutdown reflection: %s: %s. "
"Check that queen_dir exists and session.llm is configured; full traceback follows.",
session_id,
type(exc).__name__,
exc,
exc_info=True,
)
if session.queen_task is not None:
session.queen_task.cancel()
@@ -164,9 +164,7 @@ async def test_patch_persists_and_validates(colony_dir):
metadata_path = colonies_dir / name / "metadata.json"
async with TestClient(TestServer(app)) as client:
resp = await client.patch(
f"/api/colony/{name}/tools", json={"enabled_mcp_tools": ["read_file"]}
)
resp = await client.patch(f"/api/colony/{name}/tools", json={"enabled_mcp_tools": ["read_file"]})
assert resp.status == 200
body = await resp.json()
assert body["enabled_mcp_tools"] == ["read_file"]
@@ -186,9 +184,7 @@ async def test_patch_persists_and_validates(colony_dir):
assert tools["write_file"]["enabled"] is False
# Unknown → 400
resp = await client.patch(
f"/api/colony/{name}/tools", json={"enabled_mcp_tools": ["ghost"]}
)
resp = await client.patch(f"/api/colony/{name}/tools", json={"enabled_mcp_tools": ["ghost"]})
assert resp.status == 400
assert "ghost" in (await resp.json()).get("unknown", [])
@@ -214,9 +210,7 @@ async def test_patch_refreshes_live_runtime(colony_dir):
app = await _app(manager)
async with TestClient(TestServer(app)) as client:
resp = await client.patch(
f"/api/colony/{name}/tools", json={"enabled_mcp_tools": ["read_file"]}
)
resp = await client.patch(f"/api/colony/{name}/tools", json={"enabled_mcp_tools": ["read_file"]})
assert resp.status == 200
body = await resp.json()
assert body["refreshed_runtimes"] == 1
@@ -230,9 +224,7 @@ async def test_404_for_unknown_colony(colony_dir):
async with TestClient(TestServer(app)) as client:
resp = await client.get("/api/colony/unknown/tools")
assert resp.status == 404
resp = await client.patch(
"/api/colony/unknown/tools", json={"enabled_mcp_tools": None}
)
resp = await client.patch("/api/colony/unknown/tools", json={"enabled_mcp_tools": None})
assert resp.status == 404
@@ -8,7 +8,6 @@ or spawning actual subprocesses.
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock
import pytest
from aiohttp import web
@@ -14,7 +14,6 @@ from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
@@ -27,7 +26,6 @@ from framework.llm.provider import Tool
from framework.server import routes_queen_tools
from framework.tools.queen_lifecycle_tools import QueenPhaseState
# ---------------------------------------------------------------------------
# QueenPhaseState filter — pure unit tests
# ---------------------------------------------------------------------------
@@ -157,9 +155,7 @@ async def test_get_tools_default_allows_everything_for_unknown_queen(queen_dir,
# the fallback-to-allow-all path.
custom_id = "queen_custom_unknown"
(queens_dir / custom_id).mkdir()
(queens_dir / custom_id / "profile.yaml").write_text(
yaml.safe_dump({"name": "Custom", "title": "Custom Role"})
)
(queens_dir / custom_id / "profile.yaml").write_text(yaml.safe_dump({"name": "Custom", "title": "Custom Role"}))
manager = _FakeManager()
manager._mcp_tool_catalog = {
@@ -285,9 +281,7 @@ async def test_patch_persists_and_validates(queen_dir, monkeypatch):
assert servers["write_file"]["enabled"] is False
# Null resets
resp = await client.patch(
f"/api/queen/{queen_id}/tools", json={"enabled_mcp_tools": None}
)
resp = await client.patch(f"/api/queen/{queen_id}/tools", json={"enabled_mcp_tools": None})
assert resp.status == 200
body = await resp.json()
assert body["enabled_mcp_tools"] is None
+1 -2
View File
@@ -128,8 +128,7 @@ def build_draft(
body = skill_body if isinstance(skill_body, str) else ""
if not body.strip():
return None, (
"skill_body is required — the operational procedure the "
"colony worker needs to run this job unattended"
"skill_body is required — the operational procedure the colony worker needs to run this job unattended"
)
files, err = validate_files(skill_files)
if err or files is None:
+1 -3
View File
@@ -189,9 +189,7 @@ class SkillsManager:
self._queen_overrides = queen_store
self._colony_overrides = colony_store
self._watched_files = [
str(p)
for p in (self._config.queen_overrides_path, self._config.colony_overrides_path)
if p is not None
str(p) for p in (self._config.queen_overrides_path, self._config.colony_overrides_path) if p is not None
]
# 1c. Apply override filtering. Colony entries take precedence over
+85 -10
View File
@@ -116,6 +116,9 @@ class WorkerSessionAdapter:
worker_path: Path | None = None
QUEEN_PHASES: frozenset[str] = frozenset({"independent", "incubating", "working", "reviewing"})
@dataclass
class QueenPhaseState:
"""Mutable state container for queen operating phase.
@@ -131,7 +134,7 @@ class QueenPhaseState:
that trigger phase transitions.
"""
phase: str = "independent" # "independent", "incubating", "working", or "reviewing"
phase: str = "independent" # one of QUEEN_PHASES
independent_tools: list = field(default_factory=list) # list[Tool]
incubating_tools: list = field(default_factory=list) # list[Tool]
working_tools: list = field(default_factory=list) # list[Tool]
@@ -182,6 +185,11 @@ class QueenPhaseState:
# Cached recall blocks — populated async by recall_selector after each turn.
_cached_global_recall_block: str = ""
_cached_queen_recall_block: str = ""
# Cached dynamic system-prompt suffix — frozen at user-turn boundaries so
# AgentLoop iterations within a single turn send a byte-stable prompt and
# Anthropic's prompt cache keeps the static block warm. Rebuilt by
# refresh_dynamic_suffix() on CLIENT_INPUT_RECEIVED and on phase change.
_cached_dynamic_suffix: str = ""
# Memory directories.
global_memory_dir: Path | None = None
queen_memory_dir: Path | None = None
@@ -235,11 +243,28 @@ class QueenPhaseState:
self._filtered_independent_tools = list(self.independent_tools)
return
allowed = set(self.enabled_mcp_tools)
# If ``mcp_tool_names_all`` is empty, every tool falls through the
# "not in mcp_tool_names_all" branch below and the allowlist is
# silently ignored. That's a fail-open bug (the symptom: a
# role-restricted queen sees every MCP tool). Log a warning so the
# upstream cause is visible next time it happens.
if not self.mcp_tool_names_all:
logger.warning(
"rebuild_independent_filter: mcp_tool_names_all is empty but "
"allowlist has %d entries — allowlist cannot be applied. "
"Check that queen boot populated phase_state.mcp_tool_names_all.",
len(allowed),
)
self._filtered_independent_tools = [
t
for t in self.independent_tools
if t.name not in self.mcp_tool_names_all or t.name in allowed
t for t in self.independent_tools if t.name not in self.mcp_tool_names_all or t.name in allowed
]
logger.info(
"rebuild_independent_filter: allowlist=%d, mcp_names=%d, independent=%d -> filtered=%d",
len(allowed),
len(self.mcp_tool_names_all),
len(self.independent_tools),
len(self._filtered_independent_tools),
)
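A compact illustration of the fail-open case the warning guards (values contrived; `make_tool` is a hypothetical Tool factory, not framework code):
state = QueenPhaseState()
state.independent_tools = [make_tool("mcp_search"), make_tool("shell")]
state.enabled_mcp_tools = ["mcp_search"]   # allowlist present...
state.mcp_tool_names_all = set()           # ...but boot never populated this
state.rebuild_independent_filter()         # warns; BOTH tools pass the filter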
def get_current_tools(self) -> list:
"""Return tools for the current phase."""
@@ -258,8 +283,20 @@ class QueenPhaseState:
self.rebuild_independent_filter()
return self._filtered_independent_tools
def get_current_prompt(self) -> str:
"""Return the system prompt for the current phase."""
def get_static_prompt(self) -> str:
"""Return the stable portion of the system prompt for the current phase.
Includes identity, phase-role prompt, connected-integrations block,
skills catalog, and default skill protocols. These change only on
phase transition, queen identity selection, or when the user adds/
removes an integration (rare events). Designed to be byte-stable
across AgentLoop iterations within a single user turn so that
Anthropic's prompt cache keeps this block warm.
The dynamic tail (recall + timestamp) is returned separately by
``get_dynamic_suffix()``; the LLM wrapper emits them as two system
content blocks with a cache breakpoint between them.
"""
if self.phase == "working":
base = self.prompt_working
elif self.phase == "reviewing":
@@ -293,11 +330,51 @@ class QueenPhaseState:
parts.append(catalog_prompt)
if self.protocols_prompt:
parts.append(self.protocols_prompt)
return "\n\n".join(parts)
def refresh_dynamic_suffix(self) -> str:
"""Rebuild and cache the dynamic system-prompt suffix.
The suffix contains recall blocks only. Called from the
CLIENT_INPUT_RECEIVED subscriber so the suffix is byte-stable across
every AgentLoop iteration within a single user turn.
Timestamps used to live here too; they were moved into the
conversation itself as a ``[YYYY-MM-DD HH:MM TZ]`` prefix on each
injected event (see ``drain_injection_queue``) so they ride on
byte-stable conversation history instead of busting the
per-turn system-prompt cache tail.
"""
parts: list[str] = []
if self._cached_global_recall_block:
parts.append(self._cached_global_recall_block)
if self._cached_queen_recall_block:
parts.append(self._cached_queen_recall_block)
return "\n\n".join(parts)
self._cached_dynamic_suffix = "\n\n".join(parts)
return self._cached_dynamic_suffix
def get_dynamic_suffix(self) -> str:
"""Return the cached dynamic system-prompt suffix.
Lazily populates on first call so callers don't have to know about
the refresh lifecycle. Subsequent calls return the cached string
until ``refresh_dynamic_suffix()`` is invoked again.
"""
if not self._cached_dynamic_suffix:
self.refresh_dynamic_suffix()
return self._cached_dynamic_suffix
def get_current_prompt(self) -> str:
"""Return the concatenated system prompt (static + dynamic).
Retained for backward compatibility and for callers that want one
string (conversation persistence, debug dumps). The AgentLoop sends
the two pieces separately to the LLM so the cache can break between
them; see ``get_static_prompt()`` / ``get_dynamic_suffix()``.
"""
static = self.get_static_prompt()
dynamic = self.get_dynamic_suffix()
return f"{static}\n\n{dynamic}" if dynamic else static
async def _emit_phase_event(self) -> None:
"""Publish a QUEEN_PHASE_CHANGED event so the frontend updates the tag."""
@@ -1632,9 +1709,7 @@ def register_queen_lifecycle_tools(
queen_overrides_path = QUEENS_DIR / queen_id / "skills_overrides.json"
if queen_overrides_path.exists():
queen_store = SkillOverrideStore.load(
queen_overrides_path, scope_label=f"queen:{queen_id}"
)
queen_store = SkillOverrideStore.load(queen_overrides_path, scope_label=f"queen:{queen_id}")
# Shallow clone: queen's explicit toggles + master switch
# become the colony's starting state. Tombstones propagate
# so a queen-deleted UI skill doesn't resurrect here.
+40 -7
View File
@@ -151,8 +151,11 @@ interface ChatPanelProps {
onStartNewSession?: () => void;
/** When true, disable the start-new-session button (request in flight). */
startingNewSession?: boolean;
/** Cumulative LLM token usage for this session */
tokenUsage?: { input: number; output: number };
/** Cumulative LLM token usage for this session.
* `cached` (cache reads) and `cacheCreated` (cache writes) are subsets of
* `input` — providers count both inside prompt_tokens. Display them
* separately; do not add to a total. */
tokenUsage?: { input: number; output: number; cached?: number; cacheCreated?: number; costUsd?: number };
/** Optional action element rendered on the right side of the "Conversation" header */
headerAction?: React.ReactNode;
}
@@ -1482,11 +1485,41 @@ export default function ChatPanel({
Context: {fmt(queenUsage.estimatedTokens)}/{fmt(queenUsage.maxTokens)}
</span>
)}
{hasTokens && (
<span title="LLM tokens used this session (input + output)">
Tokens: {fmt(tokenUsage!.input + tokenUsage!.output)}
</span>
)}
{hasTokens && (() => {
const cached = tokenUsage!.cached ?? 0;
const created = tokenUsage!.cacheCreated ?? 0;
const cost = tokenUsage!.costUsd ?? 0;
// cached/created are subsets of input — never sum; surface separately.
// Cost can be < $0.01; show 4 decimals so small-model sessions aren't "$0.00".
const costStr = cost > 0 ? `$${cost.toFixed(4)}` : "—";
return (
<span className="group relative cursor-help transition-colors hover:text-muted-foreground">
Tokens: {fmt(tokenUsage!.output)}
<span
role="tooltip"
className="pointer-events-none invisible absolute bottom-full right-0 z-50 mb-2 whitespace-nowrap rounded-md border border-border bg-popover px-3 py-2 text-[11px] text-popover-foreground opacity-0 shadow-lg transition-[opacity,transform] duration-150 translate-y-1 group-hover:visible group-hover:opacity-100 group-hover:translate-y-0"
>
<span className="mb-1.5 block text-muted-foreground">
LLM tokens used this session
</span>
<span className="grid grid-cols-[auto_1fr] gap-x-4 gap-y-0.5 tabular-nums">
<span>Input</span>
<span className="text-right">{fmt(tokenUsage!.input)}</span>
<span className="pl-3 text-muted-foreground">cache read</span>
<span className="text-right text-muted-foreground">{fmt(cached)}</span>
<span className="pl-3 text-muted-foreground">cache write</span>
<span className="text-right text-muted-foreground">{fmt(created)}</span>
<span>Output</span>
<span className="text-right">{fmt(tokenUsage!.output)}</span>
<span className="mt-1 border-t border-border/50 pt-1">Cost</span>
<span className="mt-1 border-t border-border/50 pt-1 text-right font-medium">
{costStr}
</span>
</span>
</span>
</span>
);
})()}
</div>
);
})()}
+81 -4
View File
@@ -23,6 +23,32 @@ import { getQueenForAgent, slugToColonyId } from "@/lib/colony-registry";
const makeId = () => Math.random().toString(36).slice(2, 9);
// Remembers the last session the user had open in each queen DM so that
// navigating away (e.g. to another queen) and back lands on the session
// they were just in, instead of whichever session the server picks.
const lastSessionKey = (queenId: string) => `hive:queen:${queenId}:lastSession`;
const readLastSession = (queenId: string): string | null => {
try {
return localStorage.getItem(lastSessionKey(queenId));
} catch {
return null;
}
};
const writeLastSession = (queenId: string, sessionId: string) => {
try {
localStorage.setItem(lastSessionKey(queenId), sessionId);
} catch {
/* storage disabled/full — best-effort */
}
};
const clearLastSession = (queenId: string) => {
try {
localStorage.removeItem(lastSessionKey(queenId));
} catch {
/* ignore */
}
};
export default function QueenDM() {
const { queenId } = useParams<{ queenId: string }>();
const [searchParams, setSearchParams] = useSearchParams();
@@ -45,7 +71,17 @@ export default function QueenDM() {
{ id: string; prompt: string; options?: string[] }[] | null
>(null);
const [awaitingInput, setAwaitingInput] = useState(false);
const [tokenUsage, setTokenUsage] = useState({ input: 0, output: 0 });
// `cached` and `cacheCreated` are subsets of `input` (providers count both
// inside prompt_tokens already) — display them, never add them to a total.
// `costUsd` is the session-total USD cost when the provider supplies one
// (Anthropic, OpenAI, OpenRouter); 0 means unreported, not free.
const [tokenUsage, setTokenUsage] = useState({
input: 0,
output: 0,
cached: 0,
cacheCreated: 0,
costUsd: 0,
});
const [historySessions, setHistorySessions] = useState<HistorySession[]>([]);
const [historyLoading, setHistoryLoading] = useState(false);
const [switchingSessionId, setSwitchingSessionId] = useState<string | null>(
@@ -92,7 +128,7 @@ export default function QueenDM() {
setPendingQuestions(null);
setAwaitingInput(false);
setQueenPhase("independent");
setTokenUsage({ input: 0, output: 0 });
setTokenUsage({ input: 0, output: 0, cached: 0, cacheCreated: 0, costUsd: 0 });
setInitialDraft(null);
setColonySpawned(false);
setSpawnedColonyName(null);
@@ -199,6 +235,19 @@ export default function QueenDM() {
useEffect(() => {
if (!queenId) return;
// If we arrived without an explicit session in the URL and aren't
// bootstrapping a new one, redirect to the last session the user had
// open for this queen. Session IDs are always of the form
// "session_<timestamp>_<hex>", so we gate on that prefix to avoid
// redirecting to anything unexpected that landed in storage.
if (!selectedSessionParam && newSessionFlag !== "1") {
const stored = readLastSession(queenId);
if (stored && stored.startsWith("session_")) {
setSearchParams({ session: stored }, { replace: true });
return;
}
}
resetViewState();
setLoading(true);
@@ -314,7 +363,17 @@ export default function QueenDM() {
await restoreMessages(sid, () => cancelled);
refresh();
} catch {
// Session creation failed
// Session creation/selection failed. If the URL param came from
// our own localStorage restore, the stored session is stale (e.g.
// deleted on disk) — clear it so the next navigation falls
// through to getOrCreate instead of looping on the bad id.
if (
queenId &&
selectedSessionParam &&
selectedSessionParam === readLastSession(queenId)
) {
clearLastSession(queenId);
}
} finally {
if (!cancelled) {
setLoading(false);
@@ -337,6 +396,13 @@ export default function QueenDM() {
setSearchParams,
]);
// Remember the session the user is currently viewing so switching queens
// and coming back lands on it instead of whatever the server picks.
useEffect(() => {
if (!queenId || !sessionId) return;
writeLastSession(queenId, sessionId);
}, [queenId, sessionId]);
useEffect(() => {
if (!queenId) return;
let cancelled = false;
@@ -520,7 +586,18 @@ export default function QueenDM() {
if (event.data) {
const inp = (event.data.input_tokens as number) || 0;
const out = (event.data.output_tokens as number) || 0;
setTokenUsage((prev) => ({ input: prev.input + inp, output: prev.output + out }));
// cached / cache_creation are subsets of input — accumulate
// separately for display, do NOT roll into input/total.
const cached = (event.data.cached_tokens as number) || 0;
const cacheCreated = (event.data.cache_creation_tokens as number) || 0;
const costUsd = (event.data.cost_usd as number) || 0;
setTokenUsage((prev) => ({
input: prev.input + inp,
output: prev.output + out,
cached: prev.cached + cached,
cacheCreated: prev.cacheCreated + cacheCreated,
costUsd: prev.costUsd + costUsd,
}));
}
// Flush one queued message per LLM turn boundary. This is the
// real "turn ended" signal in a queen DM — execution_completed
+12
View File
@@ -11,6 +11,7 @@ needs and run everything against a temp directory.
from __future__ import annotations
import asyncio
import importlib
import json
from pathlib import Path
@@ -278,6 +279,17 @@ async def test_colony_spawn_creates_correct_artifacts(tmp_path, monkeypatch):
assert resp.status == 200, await resp.text()
body = await resp.json()
# fork_session_into_colony schedules the compaction + worker-storage
# copy onto _BACKGROUND_FORK_TASKS and returns. In prod the colony-
# open path blocks on compaction_status.await_completion; the test
# skips that step, so drain the bg tasks here before asserting on
# the artifacts they produce (otherwise the worker-storage check is
# a race that flakes under CI load).
from framework.server.routes_execution import _BACKGROUND_FORK_TASKS
if _BACKGROUND_FORK_TASKS:
await asyncio.gather(*list(_BACKGROUND_FORK_TASKS), return_exceptions=True)
colony_session_id = body["queen_session_id"]
assert body["colony_name"] == "honeycomb"
assert body["is_new"] is True
+2 -1
View File
@@ -63,6 +63,7 @@ class MockStreamingLLM(LLMProvider):
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
**kwargs,
) -> AsyncIterator:
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
@@ -311,7 +312,7 @@ class TestReportToParent:
model: str = "mock"
stream_calls: list[dict] = []
async def stream(self, messages, system="", tools=None, max_tokens=4096):
async def stream(self, messages, system="", tools=None, max_tokens=4096, **kwargs):
self.stream_calls.append({"messages": messages})
raise RuntimeError("boom — simulated LLM crash")
yield # pragma: no cover — make this an async generator
+3 -9
View File
@@ -164,9 +164,7 @@ async def test_happy_path_emits_colony_created_event(patched_home: Path, patched
@pytest.mark.asyncio
async def test_colony_inherits_queen_override_state(
patched_home: Path, patched_fork: list[dict]
) -> None:
async def test_colony_inherits_queen_override_state(patched_home: Path, patched_fork: list[dict]) -> None:
"""Seed the colony's skills_overrides.json from the queen's at fork
time. A queen who enabled a preset (e.g. hive.x-automation) before
calling create_colony must produce a colony that also has it
@@ -204,9 +202,7 @@ async def test_colony_inherits_queen_override_state(
)
assert payload.get("status") == "created", f"Tool error: {payload}"
colony_overrides = (
patched_home / ".hive" / "colonies" / "inheritance_check" / "skills_overrides.json"
)
colony_overrides = patched_home / ".hive" / "colonies" / "inheritance_check" / "skills_overrides.json"
cstore = SkillOverrideStore.load(colony_overrides)
# Inherited entries from the queen:
@@ -268,9 +264,7 @@ async def test_happy_path_materializes_skill_under_colony_dir(patched_home: Path
# display it as queen-authored + editable.
from framework.skills.overrides import Provenance, SkillOverrideStore
overrides_path = (
patched_home / ".hive" / "colonies" / "honeycomb_research" / "skills_overrides.json"
)
overrides_path = patched_home / ".hive" / "colonies" / "honeycomb_research" / "skills_overrides.json"
assert overrides_path.exists(), "create_colony should write a skills_overrides.json ledger"
store = SkillOverrideStore.load(overrides_path)
entry = store.get("honeycomb-api-protocol")
+8
View File
@@ -812,6 +812,9 @@ class TestConveniencePublishers:
model="claude-sonnet-4-20250514",
input_tokens=100,
output_tokens=50,
cached_tokens=30,
cache_creation_tokens=10,
cost_usd=0.0042,
execution_id="exec_1",
iteration=3,
)
@@ -822,6 +825,11 @@ class TestConveniencePublishers:
assert received[0].data["model"] == "claude-sonnet-4-20250514"
assert received[0].data["input_tokens"] == 100
assert received[0].data["output_tokens"] == 50
# cached / cache_creation are subsets of input — propagated for
# display, NOT additive to input_tokens.
assert received[0].data["cached_tokens"] == 30
assert received[0].data["cache_creation_tokens"] == 10
assert received[0].data["cost_usd"] == 0.0042
assert received[0].data["iteration"] == 3
@pytest.mark.asyncio
+5 -4
View File
@@ -53,6 +53,7 @@ class MockStreamingLLM(LLMProvider):
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
**kwargs,
) -> AsyncIterator:
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
if not self.scenarios:
@@ -1079,7 +1080,7 @@ class ErrorThenSuccessLLM(LLMProvider):
self.success_scenario = success_scenario
self._call_index = 0
async def stream(self, messages, system="", tools=None, max_tokens=4096):
async def stream(self, messages, system="", tools=None, max_tokens=4096, **kwargs):
call_num = self._call_index
self._call_index += 1
if call_num < self.fail_count:
@@ -1201,7 +1202,7 @@ class TestTransientErrorRetry:
class StreamErrorThenSuccessLLM(LLMProvider):
model: str = "mock"
async def stream(self, messages, system="", tools=None, max_tokens=4096):
async def stream(self, messages, system="", tools=None, max_tokens=4096, **kwargs):
nonlocal call_index
idx = call_index
call_index += 1
@@ -1390,7 +1391,7 @@ class ToolRepeatLLM(LLMProvider):
self.final_text = final_text
self._call_index = 0
async def stream(self, messages, system="", tools=None, max_tokens=4096):
async def stream(self, messages, system="", tools=None, max_tokens=4096, **kwargs):
idx = self._call_index
self._call_index += 1
# Which outer iteration we're in (2 calls per iteration)
@@ -1999,7 +2000,7 @@ class TestToolConcurrencyPartition:
def __init__(self):
self._calls = 0
async def stream(self, messages, system="", tools=None, max_tokens=4096):
async def stream(self, messages, system="", tools=None, max_tokens=4096, **kwargs):
self._calls += 1
if self._calls == 1:
# Emit the tool call, stall, then finish.
+443
View File
@@ -23,9 +23,14 @@ from framework.llm.anthropic import AnthropicProvider
from framework.llm.litellm import (
OPENROUTER_TOOL_COMPAT_MODEL_CACHE,
LiteLLMProvider,
_build_system_message,
_compute_retry_delay,
_cost_from_tokens,
_ensure_ollama_chat_prefix,
_extract_cache_tokens,
_extract_cost,
_is_ollama_model,
_model_supports_cache_control,
_summarize_request_for_log,
)
from framework.llm.provider import LLMProvider, LLMResponse, Tool
@@ -1192,3 +1197,441 @@ class TestGetLlmExtraKwargsOllama:
with patch("framework.config.get_hive_config", return_value={}):
result = get_llm_extra_kwargs()
assert result == {}
class TestModelSupportsCacheControl:
"""`cache_control` allowlist covers native providers AND OpenRouter sub-providers
whose upstream API honors the marker (Anthropic, Gemini, GLM, MiniMax).
Auto-cache sub-providers (OpenAI, DeepSeek, Grok, Moonshot, Groq) are
intentionally excluded: sending cache_control is a no-op and a false win."""
@pytest.mark.parametrize(
"model",
[
"anthropic/claude-opus-4-5",
"claude-3-5-sonnet-20241022",
"minimax/minimax-text-01",
"MiniMax-Text-01",
"zai-glm-4.6",
"glm-4.6",
"openrouter/anthropic/claude-opus-4.5",
"openrouter/anthropic/claude-sonnet-4.5",
"openrouter/google/gemini-2.5-pro",
"openrouter/google/gemini-2.5-flash",
"openrouter/z-ai/glm-5.1",
"openrouter/z-ai/glm-4.6",
"openrouter/minimax/minimax-text-01",
],
)
def test_supported(self, model):
assert _model_supports_cache_control(model) is True
@pytest.mark.parametrize(
"model",
[
"gpt-4o-mini",
"gemini/gemini-1.5-flash",
"ollama_chat/llama3",
"openrouter/openai/gpt-4o",
"openrouter/deepseek/deepseek-chat",
"openrouter/x-ai/grok-2",
"openrouter/moonshotai/kimi-k2",
"openrouter/liquid/lfm-2.5-1.2b-thinking:free",
],
)
def test_unsupported(self, model):
assert _model_supports_cache_control(model) is False
class TestBuildSystemMessageOpenRouter:
"""`_build_system_message` should split static/dynamic blocks whenever
the model, native OR OpenRouter-routed, supports cache_control."""
def test_openrouter_anthropic_splits_into_two_blocks(self):
msg = _build_system_message(
system="static prefix",
system_dynamic_suffix="dynamic tail",
model="openrouter/anthropic/claude-opus-4.5",
)
assert msg == {
"role": "system",
"content": [
{
"type": "text",
"text": "static prefix",
"cache_control": {"type": "ephemeral"},
},
{"type": "text", "text": "dynamic tail"},
],
}
def test_openrouter_gemini_splits_into_two_blocks(self):
msg = _build_system_message(
system="static prefix",
system_dynamic_suffix="dynamic tail",
model="openrouter/google/gemini-2.5-pro",
)
assert isinstance(msg["content"], list)
assert msg["content"][0]["cache_control"] == {"type": "ephemeral"}
assert msg["content"][1] == {"type": "text", "text": "dynamic tail"}
def test_openrouter_glm_splits_into_two_blocks(self):
msg = _build_system_message(
system="static prefix",
system_dynamic_suffix="dynamic tail",
model="openrouter/z-ai/glm-5.1",
)
assert isinstance(msg["content"], list)
assert msg["content"][0]["cache_control"] == {"type": "ephemeral"}
def test_openrouter_openai_stays_concatenated(self):
"""OpenAI via OpenRouter auto-caches; sending cache_control is a no-op."""
msg = _build_system_message(
system="static prefix",
system_dynamic_suffix="dynamic tail",
model="openrouter/openai/gpt-4o",
)
assert msg == {
"role": "system",
"content": "static prefix\n\ndynamic tail",
}
def test_no_suffix_anthropic_gets_top_level_cache_control(self):
msg = _build_system_message(
system="static prefix",
system_dynamic_suffix=None,
model="openrouter/anthropic/claude-opus-4.5",
)
assert msg == {
"role": "system",
"content": "static prefix",
"cache_control": {"type": "ephemeral"},
}
class TestOpenRouterToolCompatCacheControl:
"""Tool-compat path must pass cache_control through when the routed
sub-provider honors it. Before this, the queen persona+tool-list prefix
was recomputed every turn on Anthropic/GLM via OpenRouter."""
def test_tool_compat_messages_split_for_cache_capable_model(self):
provider = LiteLLMProvider(
model="openrouter/anthropic/claude-opus-4.5",
api_key="test-key",
)
tools = [
Tool(
name="web_search",
description="Search the web",
parameters={"properties": {"query": {"type": "string"}}, "required": ["query"]},
)
]
full_messages = provider._build_openrouter_tool_compat_messages(
messages=[{"role": "user", "content": "hi"}],
system="You are a queen.",
tools=tools,
system_dynamic_suffix="Current time: 2026-04-23T00:00:00Z",
)
system_msg = full_messages[0]
assert system_msg["role"] == "system"
assert isinstance(system_msg["content"], list)
assert len(system_msg["content"]) == 2
static_block = system_msg["content"][0]
assert static_block["cache_control"] == {"type": "ephemeral"}
assert "You are a queen." in static_block["text"]
assert "Tool compatibility mode is active" in static_block["text"]
assert "web_search" in static_block["text"]
assert "2026-04-23" not in static_block["text"]
dynamic_block = system_msg["content"][1]
assert "cache_control" not in dynamic_block
assert dynamic_block["text"] == "Current time: 2026-04-23T00:00:00Z"
def test_tool_compat_messages_stay_concatenated_for_liquid(self):
"""Liquid (and other non-cache-control OR sub-providers) keep legacy behavior."""
provider = LiteLLMProvider(
model="openrouter/liquid/lfm-2.5-1.2b-thinking:free",
api_key="test-key",
)
tools = [
Tool(
name="web_search",
description="Search the web",
parameters={"properties": {"query": {"type": "string"}}, "required": ["query"]},
)
]
full_messages = provider._build_openrouter_tool_compat_messages(
messages=[{"role": "user", "content": "hi"}],
system="You are a queen.",
tools=tools,
system_dynamic_suffix="Current time: 2026-04-23T00:00:00Z",
)
system_msg = full_messages[0]
assert isinstance(system_msg["content"], str)
assert "2026-04-23" in system_msg["content"]
assert "cache_control" not in system_msg
class TestExtractCacheTokens:
"""`_extract_cache_tokens` reads cache_read + cache_creation from the
LiteLLM-normalized usage object. Both fields are subsets of
``prompt_tokens``: the helper surfaces them for display, and the call
sites are responsible for never adding them to a total."""
def test_none_usage_returns_zero(self):
assert _extract_cache_tokens(None) == (0, 0)
def test_openai_shape(self):
"""Pure OpenAI responses expose cached reads via
``prompt_tokens_details.cached_tokens`` and have no cache write
field at all (OpenAI's automatic caching is read-only from the
client's perspective)."""
usage = MagicMock(spec=["prompt_tokens_details", "cache_creation_input_tokens"])
usage.prompt_tokens_details = MagicMock(
spec=["cached_tokens"], cached_tokens=120,
)
usage.cache_creation_input_tokens = 0
cache_read, cache_creation = _extract_cache_tokens(usage)
assert cache_read == 120
assert cache_creation == 0
def test_openrouter_cache_write_tokens_shape(self):
"""OpenRouter normalizes cache writes into
``prompt_tokens_details.cache_write_tokens`` (verified empirically
against openrouter/anthropic and openrouter/z-ai responses). The
legacy ``usage.cache_creation_input_tokens`` field is NOT set on
OpenRouter responses, so this is the path that matters in practice."""
usage = MagicMock()
usage.prompt_tokens_details = MagicMock(
cached_tokens=80, cache_write_tokens=50,
)
# Explicitly set the Anthropic-native field to 0 to prove we don't
# depend on it for OpenRouter responses.
usage.cache_creation_input_tokens = 0
cache_read, cache_creation = _extract_cache_tokens(usage)
assert cache_read == 80
assert cache_creation == 50
def test_anthropic_native_cache_creation_field_still_works(self):
"""Direct Anthropic API responses (not via OpenRouter) put cache
writes on the top-level ``cache_creation_input_tokens`` field. Keep
the fallback so non-OpenRouter Anthropic continues to work."""
usage = MagicMock(spec=["prompt_tokens_details", "cache_creation_input_tokens"])
usage.prompt_tokens_details = MagicMock(
spec=["cached_tokens"], cached_tokens=80,
)
usage.cache_creation_input_tokens = 50
cache_read, cache_creation = _extract_cache_tokens(usage)
assert cache_read == 80
assert cache_creation == 50
def test_raw_anthropic_shape_falls_back(self):
"""Raw Anthropic usage (no prompt_tokens_details) — fall back to
cache_read_input_tokens."""
usage = MagicMock(spec=["cache_read_input_tokens", "cache_creation_input_tokens"])
usage.cache_read_input_tokens = 200
usage.cache_creation_input_tokens = 75
# Force prompt_tokens_details to be missing on the spec'd mock.
cache_read, cache_creation = _extract_cache_tokens(usage)
assert cache_read == 200
assert cache_creation == 75
def test_no_cache_fields_returns_zero(self):
"""A provider that doesn't report cache tokens at all (e.g. Gemini)
returns (0, 0) and never raises."""
usage = MagicMock(spec=["prompt_tokens", "completion_tokens"])
cache_read, cache_creation = _extract_cache_tokens(usage)
assert cache_read == 0
assert cache_creation == 0
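# Reference sketch of the precedence the tests above pin down. Assumed
# shape only: the production helper lives in litellm.py and may differ
# in detail; the names below mirror the fields the tests exercise.
def _extract_cache_tokens_sketch(usage) -> tuple[int, int]:
    if usage is None:
        return (0, 0)
    details = getattr(usage, "prompt_tokens_details", None)
    # Normalized detail fields win; fall back to the Anthropic-native ones.
    read = (getattr(details, "cached_tokens", 0)
            or getattr(usage, "cache_read_input_tokens", 0) or 0)
    write = (getattr(details, "cache_write_tokens", 0)
             or getattr(usage, "cache_creation_input_tokens", 0) or 0)
    return (read, write)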
class TestStreamingChunksFallbackPreservesCacheFields:
"""Regression: when LiteLLM strips usage from yielded streaming chunks,
we fall back to ``response.chunks`` to recover token totals. LiteLLM's
own ``calculate_total_usage()`` aggregates ``prompt_tokens`` /
``completion_tokens`` correctly but DROPS ``prompt_tokens_details``,
which is where OpenRouter places ``cached_tokens`` and
``cache_write_tokens``. The fallback path must walk the raw chunks to
recover those fields, otherwise streaming OpenRouter calls always
report zero cache tokens. (Verified empirically against
openrouter/anthropic/* and openrouter/z-ai/*.)"""
def test_chunks_with_cache_fields_recovered(self):
"""Simulate the chunks-fallback hot path: build raw chunks where the
last one carries cache_write_tokens, run the same recovery loop the
streaming code uses, and assert we surface the cache fields."""
# Three chunks: text deltas, then a final chunk with usage.
empty_usage_chunk = MagicMock()
empty_usage_chunk.usage = None
last_chunk = MagicMock()
last_chunk.usage = MagicMock()
last_chunk.usage.prompt_tokens_details = MagicMock(
cached_tokens=0, cache_write_tokens=5601,
)
last_chunk.usage.cache_creation_input_tokens = 0
chunks = [empty_usage_chunk, empty_usage_chunk, last_chunk]
# Mirror the production loop in litellm.py's chunks-fallback.
cached, creation = 0, 0
for raw in reversed(chunks):
usage = getattr(raw, "usage", None)
if usage is None:
continue
cr, cc = _extract_cache_tokens(usage)
if cr or cc:
cached, creation = cr, cc
break
assert cached == 0
assert creation == 5601, (
"chunks-fallback must recover cache_write_tokens from the raw "
"chunk, not from calculate_total_usage which strips details"
)
def test_chunks_with_cache_read_recovered(self):
"""Same path, but for a cache HIT (cached_tokens populated)."""
last_chunk = MagicMock()
last_chunk.usage = MagicMock()
last_chunk.usage.prompt_tokens_details = MagicMock(
cached_tokens=5601, cache_write_tokens=0,
)
last_chunk.usage.cache_creation_input_tokens = 0
cached, creation = 0, 0
for raw in reversed([last_chunk]):
usage = getattr(raw, "usage", None)
if usage is None:
continue
cr, cc = _extract_cache_tokens(usage)
if cr or cc:
cached, creation = cr, cc
break
assert cached == 5601
assert creation == 0
class TestExtractCost:
"""`_extract_cost` pulls USD cost from three sources in order:
usage.cost (OpenRouter native / include_cost_in_streaming_usage)
response._hidden_params['response_cost'] (LiteLLM logging)
litellm.completion_cost() (pricing-table fallback)."""
def test_none_response_returns_zero(self):
assert _extract_cost(None, "gpt-4o-mini") == 0.0
def test_openrouter_usage_cost_is_preferred(self):
"""OpenRouter returns authoritative per-call cost on usage.cost when
the caller opts in (usage.include=true). That beats LiteLLM's
pricing-table estimate because it reflects promo pricing and BYOK markup."""
response = MagicMock()
response.usage = MagicMock(cost=0.00123)
response._hidden_params = {"response_cost": 99.99} # should be ignored
assert _extract_cost(response, "openrouter/anthropic/claude-opus-4.5") == 0.00123
def test_hidden_params_response_cost_used_when_no_usage_cost(self):
"""LiteLLM's logging layer attaches response_cost after most
completions; this is how OpenAI/Anthropic responses get costed
without going back to the pricing table."""
response = MagicMock()
response.usage = MagicMock(spec=[]) # no .cost attribute
response._hidden_params = {"response_cost": 0.0042}
assert _extract_cost(response, "gpt-4o-mini") == 0.0042
def test_falls_back_to_completion_cost_when_nothing_pre_populated(self):
"""For providers where LiteLLM didn't pre-populate cost, call
litellm.completion_cost() against the pricing table. Mocked here
because we don't want tests depending on the exact price of
claude-sonnet-4.5 in LiteLLM's model map."""
response = MagicMock()
response.usage = MagicMock(spec=[])
response._hidden_params = {}
with patch("litellm.completion_cost", return_value=0.00789):
assert _extract_cost(response, "anthropic/claude-sonnet-4.5") == 0.00789
def test_completion_cost_exception_returns_zero(self):
"""Unpriced models (e.g. new OpenRouter routes not yet in LiteLLM's
catalog) must not crash the hot path."""
response = MagicMock()
response.usage = MagicMock(spec=[])
response._hidden_params = {}
with patch("litellm.completion_cost", side_effect=Exception("no pricing")):
assert _extract_cost(response, "openrouter/mystery/model") == 0.0
def test_zero_cost_falls_through_to_next_source(self):
"""usage.cost == 0 should NOT short-circuit; fall through to
_hidden_params / completion_cost so we don't cement a false zero."""
response = MagicMock()
response.usage = MagicMock(cost=0.0)
response._hidden_params = {"response_cost": 0.0055}
assert _extract_cost(response, "gpt-4o-mini") == 0.0055
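# Reference sketch of the three-source fallthrough described above.
# Assumed shape only; the production helper in litellm.py is
# authoritative. Zero/None at each stage falls through to the next.
def _extract_cost_sketch(response, model: str) -> float:
    if response is None:
        return 0.0
    usage_cost = getattr(getattr(response, "usage", None), "cost", None)
    if usage_cost:  # authoritative OpenRouter per-call cost
        return float(usage_cost)
    hidden = getattr(response, "_hidden_params", None) or {}
    if hidden.get("response_cost"):  # LiteLLM logging layer
        return float(hidden["response_cost"])
    try:
        import litellm
        return float(litellm.completion_cost(
            completion_response=response, model=model) or 0.0)
    except Exception:
        return 0.0  # unpriced models must not crash the hot path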
class TestCostFromTokens:
"""`_cost_from_tokens` is the streaming-path cost helper: stream wrappers
don't expose the full ModelResponse shape that completion_cost() expects,
so we go through cost_per_token() with the already-extracted totals."""
def test_zero_tokens_returns_zero_without_calling_litellm(self):
with patch("litellm.cost_per_token") as mock:
assert _cost_from_tokens("claude-opus-4.5", 0, 0) == 0.0
mock.assert_not_called()
def test_empty_model_returns_zero(self):
assert _cost_from_tokens("", 1000, 500) == 0.0
def test_computes_from_tokens(self):
with patch("litellm.cost_per_token", return_value=(0.001, 0.002)) as mock:
cost = _cost_from_tokens(
"anthropic/claude-opus-4.5",
input_tokens=1000,
output_tokens=500,
cached_tokens=200,
cache_creation_tokens=100,
)
assert cost == pytest.approx(0.003)
# Verify the cache-aware kwargs are threaded through — Anthropic
# needs these to apply the 1.25x write / 0.1x read multipliers.
call_kwargs = mock.call_args.kwargs
assert call_kwargs["prompt_tokens"] == 1000
assert call_kwargs["completion_tokens"] == 500
assert call_kwargs["cache_read_input_tokens"] == 200
assert call_kwargs["cache_creation_input_tokens"] == 100
def test_exception_returns_zero(self):
with patch("litellm.cost_per_token", side_effect=Exception("unpriced")):
assert _cost_from_tokens("mystery/model", 1000, 500) == 0.0
def test_negative_or_none_components_coerce_to_zero(self):
"""LiteLLM returns (None, None) for unknown models in some versions;
treat as 0 rather than crashing on None+None."""
with patch("litellm.cost_per_token", return_value=(None, None)):
assert _cost_from_tokens("some/model", 1, 1) == 0.0
class TestLLMResponseAndFinishEventHaveCostUsd:
"""Regression: both LLMResponse and FinishEvent must carry cost_usd so
the agent loop → event bus → frontend pipeline doesn't lose cost."""
def test_llm_response_defaults_cost_to_zero(self):
from framework.llm.provider import LLMResponse
r = LLMResponse(content="", model="m")
assert r.cost_usd == 0.0
def test_finish_event_defaults_cost_to_zero(self):
from framework.llm.stream_events import FinishEvent
e = FinishEvent()
assert e.cost_usd == 0.0
def test_finish_event_accepts_cost(self):
from framework.llm.stream_events import FinishEvent
e = FinishEvent(cost_usd=0.0123)
assert e.cost_usd == 0.0123
+29
View File
@@ -298,6 +298,35 @@ class TestNodeConversation:
assert conv.compaction_warning() is True
assert conv.needs_compaction() is True
@pytest.mark.asyncio
async def test_needs_compaction_uses_hybrid_buffer(self):
"""Hybrid: effective buffer is fixed_tokens + ratio * max_context.
With max=1000, fixed=200, ratio=0.1 → effective_buffer = 200 + 0.1 * 1000 = 300,
so the trigger threshold is 1000 - 300 = 700.
"""
conv = NodeConversation(
max_context_tokens=1000,
compaction_buffer_tokens=200,
compaction_buffer_ratio=0.1,
)
conv.update_token_count(650)
assert conv.needs_compaction() is False
conv.update_token_count(700)
assert conv.needs_compaction() is True
@pytest.mark.asyncio
async def test_needs_compaction_ratio_only(self):
"""Ratio component alone (without a fixed floor) still works."""
conv = NodeConversation(
max_context_tokens=1000,
compaction_buffer_ratio=0.25,
)
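# Assumed arithmetic: effective buffer = 0.25 * 1000 = 250, so the
# trigger threshold is 1000 - 250 = 750 (740 sits below it, 760 above).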
conv.update_token_count(740)
assert conv.needs_compaction() is False
conv.update_token_count(760)
assert conv.needs_compaction() is True
@pytest.mark.asyncio
async def test_legacy_threshold_rule_still_works_without_buffer(self):
"""Without compaction_buffer_tokens, the old multiplicative rule
+10 -30
View File
@@ -63,9 +63,7 @@ def _seed_queen(tmp_path: Path):
"""Write a queen profile so _queen_scope recognises the id."""
queen_home = Path.home() / ".hive" / "agents" / "queens" / "ops"
queen_home.mkdir(parents=True, exist_ok=True)
(queen_home / "profile.yaml").write_text(
"name: Ops\ntitle: Ops queen\n", encoding="utf-8"
)
(queen_home / "profile.yaml").write_text("name: Ops\ntitle: Ops queen\n", encoding="utf-8")
return queen_home
@@ -76,9 +74,7 @@ def _seed_colony(tmp_path: Path):
return colony_home
async def test_get_queen_skills_returns_empty_for_fresh_queen(
client: TestClient, _seed_queen
) -> None:
async def test_get_queen_skills_returns_empty_for_fresh_queen(client: TestClient, _seed_queen) -> None:
resp = await client.get("/api/queen/ops/skills")
assert resp.status == 200
data = await resp.json()
@@ -88,9 +84,7 @@ async def test_get_queen_skills_returns_empty_for_fresh_queen(
assert isinstance(data["skills"], list)
async def test_create_queen_skill_writes_file_and_override(
client: TestClient, _seed_queen
) -> None:
async def test_create_queen_skill_writes_file_and_override(client: TestClient, _seed_queen) -> None:
payload = {
"name": "ops-runbook",
"description": "Runbook for ops",
@@ -112,9 +106,7 @@ async def test_create_queen_skill_writes_file_and_override(
assert entry.enabled is True
async def test_patch_queen_skill_toggles_enabled(
client: TestClient, _seed_queen
) -> None:
async def test_patch_queen_skill_toggles_enabled(client: TestClient, _seed_queen) -> None:
await client.post(
"/api/queen/ops/skills",
json={"name": "ops-a", "description": "a", "body": "body"},
@@ -128,9 +120,7 @@ async def test_patch_queen_skill_toggles_enabled(
assert store.get("ops-a").enabled is False
async def test_delete_queen_skill_removes_files(
client: TestClient, _seed_queen
) -> None:
async def test_delete_queen_skill_removes_files(client: TestClient, _seed_queen) -> None:
await client.post(
"/api/queen/ops/skills",
json={"name": "tmp-skill", "description": "d", "body": "body"},
@@ -145,9 +135,7 @@ async def test_delete_queen_skill_removes_files(
assert "tmp-skill" in store.deleted_ui_skills
async def test_delete_framework_skill_is_refused(
client: TestClient, _seed_queen
) -> None:
async def test_delete_framework_skill_is_refused(client: TestClient, _seed_queen) -> None:
# Pre-seed an override entry with framework provenance — simulates the
# user toggling a framework default so the override exists on disk.
store = SkillOverrideStore.load(_seed_queen / "skills_overrides.json")
@@ -177,9 +165,7 @@ async def test_upload_markdown_places_in_user_library(client: TestClient) -> Non
assert (Path.home() / ".hive" / "skills" / "from-upload" / "SKILL.md").exists()
async def test_upload_zip_bundle_places_in_queen_scope(
client: TestClient, _seed_queen
) -> None:
async def test_upload_zip_bundle_places_in_queen_scope(client: TestClient, _seed_queen) -> None:
# Build a zip in memory with SKILL.md + a supporting file.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as z:
@@ -203,9 +189,7 @@ async def test_upload_zip_bundle_places_in_queen_scope(
assert (skill_dir / "scripts" / "helper.py").exists()
async def test_patch_does_not_mislabel_legacy_colony_skill_as_framework(
client: TestClient, _seed_colony
) -> None:
async def test_patch_does_not_mislabel_legacy_colony_skill_as_framework(client: TestClient, _seed_colony) -> None:
"""Regression: toggling a legacy colony skill (no ledger entry yet)
must not stamp provenance=FRAMEWORK on the new entry. Before the fix,
the first PATCH wrote FRAMEWORK and the next GET displayed 'Framework'
@@ -230,9 +214,7 @@ async def test_patch_does_not_mislabel_legacy_colony_skill_as_framework(
assert rows["legacy-queen-skill"]["enabled"] is False
async def test_colony_skill_is_editable_even_without_override_entry(
client: TestClient, _seed_colony
) -> None:
async def test_colony_skill_is_editable_even_without_override_entry(client: TestClient, _seed_colony) -> None:
"""Regression: a SKILL.md dropped into a colony's .hive/skills dir
(e.g. from a pre-override-store colony) must still be marked editable
when listed via /api/colonies/{name}/skills. The admin manager used
@@ -261,9 +243,7 @@ async def test_colony_skill_is_editable_even_without_override_entry(
assert rows["legacy-skill"]["provenance"] == "queen_created"
async def test_list_scopes_enumerates_queens_and_colonies(
client: TestClient, _seed_queen, _seed_colony
) -> None:
async def test_list_scopes_enumerates_queens_and_colonies(client: TestClient, _seed_queen, _seed_colony) -> None:
resp = await client.get("/api/skills/scopes")
assert resp.status == 200
data = await resp.json()
@@ -53,6 +53,7 @@ class _ByTaskMockLLM(LLMProvider):
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
**kwargs,
) -> AsyncIterator:
first_user = ""
for m in messages:
@@ -329,6 +330,7 @@ class _SlowLLM(LLMProvider):
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
**kwargs,
) -> AsyncIterator:
self._turn_count += 1
# On the second call (after the watcher's inject), check whether the
+3 -9
View File
@@ -142,9 +142,7 @@ class TestSkillsManagerOverrides:
SkillsManagerConfig(
queen_id="q",
queen_overrides_path=overrides_path,
extra_scope_dirs=[
ExtraScope(directory=fake_fw, label="framework", priority=0)
],
extra_scope_dirs=[ExtraScope(directory=fake_fw, label="framework", priority=0)],
project_root=None,
skip_community_discovery=True,
interactive=False,
@@ -186,9 +184,7 @@ class TestSkillsManagerOverrides:
queen_overrides_path=queen_overrides,
colony_name="c",
colony_overrides_path=colony_overrides,
extra_scope_dirs=[
ExtraScope(directory=queen_skills, label="queen_ui", priority=2)
],
extra_scope_dirs=[ExtraScope(directory=queen_skills, label="queen_ui", priority=2)],
project_root=None,
skip_community_discovery=True,
skills_config=SkillsConfig(),
@@ -224,9 +220,7 @@ class TestSkillsManagerOverrides:
assert "hive.x-automation" in enumerated
assert "hive.browser-automation" in enumerated
def test_preset_skill_enabled_via_explicit_override(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
def test_preset_skill_enabled_via_explicit_override(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(Path, "home", lambda: tmp_path / "home")
fake_presets = tmp_path / "fake_presets"
_write_skill_file(fake_presets, "hive.x-automation")
+2
View File
@@ -241,6 +241,8 @@ class TestEventSerialization:
"input_tokens": 10,
"output_tokens": 20,
"cached_tokens": 0,
"cache_creation_tokens": 0,
"cost_usd": 0.0,
"model": "gpt-4",
}
+77
View File
@@ -0,0 +1,77 @@
# 🐝 Hive Agent v0.10.4: Skill & Tool Library
> Skills and tools move from something the framework hands down into something you curate. Every queen and every colony now has a dedicated allowlist and a UI to manage it, and the system prompt gets smaller and cache-friendlier along the way.
---
## ✨ Highlights
v0.10.4 turns skills and tools into first-class, user-editable surfaces.
Before this release, the skill and tool catalogs were effectively framework defaults: whatever a queen could reach, every queen could reach. Now each queen has her own tool allowlist, her own skills, and a pair of library pages where you can browse, enable, disable, upload, and author them. Colonies inherit their founding queen's configuration at creation time and then drift on their own — flip a tool off for a colony without touching the queen, or the other way around.
A quiet but important second theme: the system prompt is now **static** across a session. Date and time — the main source of per-turn churn — are now injected at turn time instead of baked into the prompt. That keeps the prompt prefix stable so provider-side prompt caching can do its job.
---
## 🆕 What's New
### Skill Library
- **Skill Library page** — browse every skill by scope (queen / colony / framework preset), view SKILL.md inline, toggle per-scope enablement, upload skills as `.md` or `.zip`, and author new skills from the UI.
- **Per-scope overrides** — skill enablement is recorded in `~/.hive/agents/queens/{queen_id}/skills_overrides.json` and `~/.hive/colonies/{colony_name}/skills_overrides.json`; framework presets stay read-only, user-authored skills live under each scope's own skills directory.
- **Skill provenance** — the API and UI now distinguish framework-preset skills, queen-authored skills, and colony-authored skills, so you can tell at a glance who owns a given skill.
- **Skill authoring primitives** — a shared `framework.skills.authoring` module validates names, parses frontmatter, and materializes skill folders for the UI upload path, the `create_colony` tool's inline skills, and future runtime-learned skills.
- **Preset rename** — built-in skills moved from `_default_skills/` to `_preset_skills/` to match the new "preset vs. user" split. Existing browser/linkedin/x automation skills carry over untouched.
### Tool Library
- **Tool Library page** with a shared `ToolsEditor` component used by the queen profile and colony settings panels.
- **Per-queen tool allowlist** at `~/.hive/agents/queens/{queen_id}/tools.json`: `null` = allow all, `[]` = disable all, `["foo", "bar"]` = only these MCP tools pass the filter (see the sketch after this list).
- **Per-colony tool allowlist** at `~/.hive/colonies/{colony_name}/tools.json`, with the same schema, atomic writes, and independent lifecycle.
- **Configurable defaults** — queens now carry a default tool/skill bundle that seeds each new colony, and the bundle itself is editable.
- **Colony inheritance** — when a queen spawns a colony, the colony starts from the queen's tool and skill configuration. After spawn the two diverge freely.
- **Colony sidecar** — `tools.json` lives next to `metadata.json` so identity/provenance (queen, created_at, workers) and tool gating evolve independently.
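A minimal sketch of the allowlist semantics described in the per-queen bullet above; `tool_allowed` is a hypothetical name for illustration, not the framework's actual filter:

```python
def tool_allowed(allowlist: list[str] | None, tool_name: str) -> bool:
    """Hypothetical reading of the tools.json contract."""
    if allowlist is None:          # null in tools.json: every MCP tool passes
        return True
    return tool_name in allowlist  # [] disables all; a list gates by name
```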
### MCP Server Management
- **MCP Servers panel** — dedicated settings UI for browsing, configuring, and enabling bundled and user MCP servers.
- **`/api/mcp` routes** for listing built-in servers, inspecting state, and reporting errors with structured MCP error responses.
- **Tool catalog wiring** — live queen sessions now surface their MCP tool catalog to the queen-tools and colony-tools endpoints, so the UI shows exactly what the running session can see.
### Prompt & Runtime
- **Static system prompt** — the agent loop, conversation, and provider adapters (LiteLLM, Antigravity, Codex, Mock) now build and freeze the system prompt once per session. Per-turn values that used to churn the prompt are gone.
- **Date/time injected at turn time** — today's date and current time move from the system prompt into a turn-level injection path that updates cursor persistence and queen-lifecycle tooling.
- **Queen orchestrator** — refreshed to pair with the static prompt model and the new tool/skill configuration layers.
- **Session manager** — tightened session-creation input validation and reflection/skill edge handling; "create new session and switch branch" is now reliable.
---
## 🐛 Bug Fixes
- **No-cache middleware on `/api/*`** — every API response now carries `Cache-Control: no-store`. Without this, a one-off bad response (e.g. the SPA catch-all leaking `index.html` for an `/api/*` URL before a route was registered) could get pinned in the browser's disk cache and replayed forever, since our JSON handlers don't emit ETag/Last-Modified. A hard refresh is no longer required to recover.
- **Tools & skills registration** — queens and colonies no longer end up with stale or duplicated entries after reloads.
- **Session creation** — invalid inputs are rejected up front with clear errors instead of surfacing later as runtime failures.
- **Skill / reflection edges** — tightened handling so reflection runs no longer see half-built skill state during scope reloads.
- **Create new session + switch branch** flow works end-to-end without orphaning sessions.
- **CI** — broken workflow repaired.
---
## 🧪 Tests
- `test_routes_skills.py`, `test_skill_overrides.py`, `test_colony_tools.py`, `test_queen_tools.py`, `test_mcp_routes.py` — coverage added for every new route group and the override store.
---
## 🚀 Upgrading from v0.10.3
No migration. Pull `main` at `v0.10.4` and restart Hive — existing `~/.hive/` profiles, queens, colonies, and sessions keep working.
Two things to know:
1. **Preset skills directory renamed** from `_default_skills/` to `_preset_skills/` inside the framework. If you had external scripts pointing at that path, update them. User-authored skills under `~/.hive/` are unaffected.
2. **First open of a queen or colony writes a `tools.json` sidecar** the first time you edit its allowlist. If you don't touch the Tool Library, nothing is written and behavior matches v0.10.3 (allow all MCP tools).
Curate your queens. 🐝
+165
View File
@@ -1042,6 +1042,49 @@ print(json.dumps(config, indent=2))
PY
}
save_vision_fallback() {
# Write the `vision_fallback` block to ~/.hive/configuration.json.
# Args: provider_id, model, env_var (api_key_env_var), api_base (optional)
# When provider_id is empty, REMOVE the block entirely (user opted out).
local provider_id="$1"
local model="$2"
local env_var="$3"
local api_base="${4:-}"
uv run python - "$provider_id" "$model" "$env_var" "$api_base" <<'PY'
import json
import sys
from pathlib import Path
provider_id, model, env_var, api_base = sys.argv[1:5]
cfg_path = Path.home() / ".hive" / "configuration.json"
cfg_path.parent.mkdir(parents=True, exist_ok=True)
try:
with open(cfg_path, encoding="utf-8-sig") as f:
config = json.load(f)
except (OSError, json.JSONDecodeError):
config = {}
# Empty provider_id means the user opted out — drop the block.
if not provider_id:
config.pop("vision_fallback", None)
else:
block = {"provider": provider_id, "model": model}
if env_var:
block["api_key_env_var"] = env_var
if api_base:
block["api_base"] = api_base
config["vision_fallback"] = block
tmp_path = cfg_path.with_name(cfg_path.name + ".tmp")
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2)
tmp_path.replace(cfg_path)
PY
}
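# Illustrative invocations (values are hypothetical):
#   save_vision_fallback "openai" "gpt-4o-mini" "OPENAI_API_KEY" ""  # write block
#   save_vision_fallback "" "" "" ""                                 # opt out: remove block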
# Source shell rc file to pick up existing env vars (temporarily disable set -e)
set +e
if [ -f "$SHELL_RC_FILE" ]; then
@@ -1772,6 +1815,128 @@ fi
echo ""
# ============================================================
# Vision Fallback (subagent for tool-result images)
# ============================================================
#
# When a tool returns an image (browser_screenshot, render_image, etc.)
# but the main agent's model is text-only, the framework can route the
# image through a separate VLM subagent that returns a text caption,
# preserving the agent's ability to reason about visual state.
#
# We always offer the prompt — even for vision-capable main models —
# so the user gets a working fallback if they ever swap to a text-only
# model. The block is dormant for vision-capable mains (the gating
# in agent_loop only fires for models on Hive's deny list).
if [ -n "$SELECTED_PROVIDER_ID" ]; then
echo -e "${YELLOW}${NC} ${BLUE}${BOLD}Vision fallback subagent${NC}"
echo ""
echo -e " ${DIM}When a screenshot/image tool is called from a text-only model,${NC}"
echo -e " ${DIM}the framework can route the image through a vision-capable VLM${NC}"
echo -e " ${DIM}and inject the caption into the conversation. Inert when your${NC}"
echo -e " ${DIM}main model already supports vision (most do).${NC}"
echo ""
# Build the candidate list from the same model_catalog.json the main
# LLM step uses — never hardcode model IDs in this script. For each
# provider in the catalogue, take the catalogue's default model and
# the env var name it expects, then keep only providers the user
# already has an API key for. Output one TSV row per candidate:
# provider_id<TAB>model<TAB>env_var<TAB>display_name
VISION_CANDIDATES_TSV=$(uv run python - <<'PY'
import os
from framework.llm.model_catalog import get_default_models, get_models_catalogue
# Map provider_id → the env-var name the framework reads its key from.
# Mirrors PROVIDER_ENV_VARS at the top of quickstart.sh, plus how the
# rest of the script picks an env var per provider.
PROVIDER_KEY_ENV = {
"anthropic": "ANTHROPIC_API_KEY",
"openai": "OPENAI_API_KEY",
"gemini": "GEMINI_API_KEY",
"groq": "GROQ_API_KEY",
"cerebras": "CEREBRAS_API_KEY",
"minimax": "MINIMAX_API_KEY",
"mistral": "MISTRAL_API_KEY",
"together": "TOGETHER_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
"kimi": "KIMI_API_KEY",
"openrouter": "OPENROUTER_API_KEY",
}
defaults = get_default_models()
catalog = get_models_catalogue()
for provider_id, default_model in sorted(defaults.items()):
env = PROVIDER_KEY_ENV.get(provider_id)
if not env:
continue
# Either GEMINI_API_KEY or GOOGLE_API_KEY unlocks gemini.
has_key = bool(os.environ.get(env))
if provider_id == "gemini" and not has_key:
if os.environ.get("GOOGLE_API_KEY"):
has_key = True
env = "GOOGLE_API_KEY"
if not has_key:
continue
# Display name: provider/model from the catalogue verbatim
display = f"{provider_id}/{default_model}"
print(f"{provider_id}\t{default_model}\t{env}\t{display}")
PY
)
if [ -z "$VISION_CANDIDATES_TSV" ]; then
echo -e " ${YELLOW}No matching API keys detected for any catalog provider.${NC}"
echo -e " ${DIM}Set an API key for any provider in model_catalog.json and rerun.${NC}"
echo -e " ${DIM}Skipping for now — text-only models will lose image content silently.${NC}"
else
# Materialise into bash array for selection
VISION_CANDIDATES=()
while IFS= read -r line; do
[ -n "$line" ] && VISION_CANDIDATES+=("$line")
done <<< "$VISION_CANDIDATES_TSV"
echo -e " ${BOLD}Available vision-fallback models${NC} ${DIM}(from model_catalog.json):${NC}"
echo -e " ${DIM}0)${NC} (skip — don't configure vision fallback)"
idx=1
for entry in "${VISION_CANDIDATES[@]}"; do
IFS=$'\t' read -r _vp _vm _vk _vd <<< "$entry"
echo -e " ${DIM}${idx})${NC} ${_vd} ${DIM}[\$${_vk}]${NC}"
idx=$((idx + 1))
done
echo ""
VISION_CHOICE=""
while true; do
read -r -p " Pick a vision-fallback model [1-${#VISION_CANDIDATES[@]}, 0=skip, default=1]: " VISION_CHOICE || VISION_CHOICE=""
VISION_CHOICE="${VISION_CHOICE:-1}"
if [[ "$VISION_CHOICE" =~ ^[0-9]+$ ]] && \
[ "$VISION_CHOICE" -ge 0 ] && \
[ "$VISION_CHOICE" -le "${#VISION_CANDIDATES[@]}" ]; then
break
fi
echo -e " ${YELLOW}Please enter 0 (skip) or 1-${#VISION_CANDIDATES[@]}.${NC}"
done
if [ "$VISION_CHOICE" = "0" ]; then
# Explicit skip — drop any prior block so config stays clean.
save_vision_fallback "" "" "" "" > /dev/null 2>&1 || true
echo -e " ${DIM}skipped — no vision_fallback block written${NC}"
else
chosen="${VISION_CANDIDATES[$((VISION_CHOICE - 1))]}"
IFS=$'\t' read -r vf_provider vf_model vf_env vf_display <<< "$chosen"
echo -n " Saving vision_fallback... "
if save_vision_fallback "$vf_provider" "$vf_model" "$vf_env" "" > /dev/null; then
echo -e "${GREEN}${NC}"
echo -e " ${DIM}vision_fallback: ${vf_display} (key from \$${vf_env})${NC}"
else
echo -e "${RED}failed${NC}"
echo -e " ${YELLOW}Could not write vision_fallback to ~/.hive/configuration.json — non-fatal, edit manually if needed.${NC}"
fi
fi
fi
echo ""
fi
# ============================================================
# Browser Automation (GCU) — always enabled
# ============================================================