fix: tool issues

Timothy
2026-04-08 18:52:34 -07:00
parent dee3980dbe
commit 514d2828fa
6 changed files with 110 additions and 68 deletions
+46 -39
@@ -101,14 +101,16 @@ logger = logging.getLogger(__name__)
# Tags whose content is internal reasoning and must be stripped from
# the user-visible stream. Covers <think> and the 5-pillar character
# assessment tags.
_INTERNAL_TAGS = frozenset({
"think",
"social_distance",
"context",
"mood_filter",
"physical_presence",
"language_engine",
})
_INTERNAL_TAGS = frozenset(
{
"think",
"social_distance",
"context",
"mood_filter",
"physical_presence",
"language_engine",
}
)
_STRIP_RE = re.compile(
r"<(?:" + "|".join(_INTERNAL_TAGS) + r")>"
r".*?"
@@ -117,9 +119,7 @@ _STRIP_RE = re.compile(
)
_INTERNAL_OPEN_RE = re.compile(
r"<(?:" + "|".join(_INTERNAL_TAGS) + r")>"
)
_INTERNAL_OPEN_RE = re.compile(r"<(?:" + "|".join(_INTERNAL_TAGS) + r")>")
# Matches a trailing `<` that could be the start of an internal tag.
# We build a pattern that matches `<` followed by any prefix of any
# internal tag name (e.g. `<so`, `<contex`, `<think`).
@@ -128,25 +128,42 @@ for _tag in _INTERNAL_TAGS:
for _i in range(1, len(_tag) + 1):
_PARTIAL_PREFIXES.add(_tag[:_i])
_PARTIAL_OPEN_RE = re.compile(
r"<(?:" + "|".join(re.escape(p) for p in sorted(_PARTIAL_PREFIXES, key=len, reverse=True)) + r")$"
r"<(?:"
+ "|".join(re.escape(p) for p in sorted(_PARTIAL_PREFIXES, key=len, reverse=True))
+ r")$"
)
_GENERIC_TAG_RE = re.compile(r"</?[a-zA-Z_][\w-]*\s*/?>")
_GENERIC_TAG_OR_PARTIAL_RE = re.compile(r"<[a-zA-Z_]|</[a-zA-Z_]|<$")
def _strip_internal_tags_from_snapshot(snapshot: str) -> str:
"""Remove all internal tag blocks from the full accumulated text.
Also truncates at any unclosed or partially-opened internal tag
so partial tags never leak to the frontend during streaming.
Also truncates at any unclosed or partially-opened tag so partial
tags never leak to the frontend during streaming.
"""
cleaned = _STRIP_RE.sub("", snapshot)
# Truncate at any fully-opened but unclosed internal tag
m = _INTERNAL_OPEN_RE.search(cleaned)
if m:
cleaned = cleaned[:m.start()]
cleaned = cleaned[: m.start()]
# Truncate at any partial opening tag at the end (e.g. `<social` or `<co`)
m2 = _PARTIAL_OPEN_RE.search(cleaned)
if m2:
cleaned = cleaned[:m2.start()]
cleaned = cleaned[: m2.start()]
# Generic pass: strip any remaining XML-like tags the LLM hallucinated
# (e.g. <professional>, <staging>, </neutral>). These are never
# intentional markup — just remove them outright.
cleaned = _GENERIC_TAG_RE.sub("", cleaned)
# Truncate at any remaining `<` that looks like it could be a tag
# start (followed by a letter) or a bare `<` at end of string.
# During streaming this suppresses partial tags until they resolve.
m3 = _GENERIC_TAG_OR_PARTIAL_RE.search(cleaned)
if m3:
cleaned = cleaned[: m3.start()]
return cleaned
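# --- Illustrative sketch (not part of this commit) -------------------------
# A simplified, standalone reproduction of the cleaning pass above, useful
# for eyeballing what a streaming client would see. It assumes the same tag
# set but folds the internal-prefix truncation into the generic partial-tag
# check, so it is a sketch of the behaviour, not the exact implementation.
import re

_TAGS = ("think", "social_distance", "context", "mood_filter",
         "physical_presence", "language_engine")
_block = re.compile(r"<(?:%s)>.*?</(?:%s)>" % ("|".join(_TAGS), "|".join(_TAGS)), re.DOTALL)
_open = re.compile(r"<(?:%s)>" % "|".join(_TAGS))
_generic = re.compile(r"</?[a-zA-Z_][\w-]*\s*/?>")
_partial = re.compile(r"<[a-zA-Z_]|</[a-zA-Z_]|<$")

def clean(snapshot: str) -> str:
    out = _block.sub("", snapshot)   # drop fully closed internal blocks
    m = _open.search(out)            # truncate at an unclosed internal tag
    if m:
        out = out[: m.start()]
    out = _generic.sub("", out)      # strip hallucinated generic markup outright
    m = _partial.search(out)         # suppress partial tags while streaming
    return out[: m.start()] if m else out

assert clean("Hi<think>plan</think> there") == "Hi there"
assert clean("Sure <professional>ok</professional>") == "Sure ok"
assert clean("Working on it <soc") == "Working on it "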
@@ -548,9 +565,7 @@ class AgentLoop(NodeProtocol):
_consecutive_empty_turns: int = 0
# 6. Main loop
logger.debug(
"[AgentLoop.execute] Entering main loop, start_iteration=%d", start_iteration
)
logger.debug("[AgentLoop.execute] Entering main loop, start_iteration=%d", start_iteration)
for iteration in range(start_iteration, self._config.max_iterations):
iter_start = time.time()
logger.debug("[AgentLoop.execute] iteration=%d starting", iteration)
@@ -584,9 +599,7 @@ class AgentLoop(NodeProtocol):
)
# 6b. Drain injection queue
logger.debug(
"[AgentLoop.execute] iteration=%d: draining injection queue...", iteration
)
logger.debug("[AgentLoop.execute] iteration=%d: draining injection queue...", iteration)
drained_injections = await self._drain_injection_queue(conversation, ctx)
logger.debug(
"[AgentLoop.execute] iteration=%d: drained %d injections",
@@ -682,7 +695,9 @@ class AgentLoop(NodeProtocol):
_new_prompt = stamp_prompt_datetime(ctx.dynamic_prompt_provider())
else:
from framework.orchestrator.prompting import build_system_prompt_for_node_context
from framework.orchestrator.prompting import (
build_system_prompt_for_node_context,
)
_new_prompt = build_system_prompt_for_node_context(ctx)
if _new_prompt != conversation.system_prompt:
@@ -755,8 +770,7 @@ class AgentLoop(NodeProtocol):
ctx, conversation, tools, iteration, accumulator
)
logger.debug(
"[AgentLoop.execute] iteration=%d:"
" _run_single_turn completed successfully",
"[AgentLoop.execute] iteration=%d: _run_single_turn completed successfully",
iteration,
)
_turn_ms = int((time.monotonic() - _turn_t0) * 1000)
@@ -830,8 +844,7 @@ class AgentLoop(NodeProtocol):
except Exception as e:
logger.debug(
"[AgentLoop.execute] iteration=%d:"
" Exception in _run_single_turn: %s (%s)",
"[AgentLoop.execute] iteration=%d: Exception in _run_single_turn: %s (%s)",
iteration,
type(e).__name__,
str(e)[:200],
@@ -1006,7 +1019,6 @@ class AgentLoop(NodeProtocol):
and not outputs_set
and not user_input_requested
and not queen_input_requested
)
if truly_empty and accumulator is not None:
missing = self._get_missing_output_keys(
@@ -1265,7 +1277,6 @@ class AgentLoop(NodeProtocol):
_worker_no_tool_turn = (
not real_tool_results
and not outputs_set
and not queen_input_requested
and not user_input_requested
)
@@ -1771,7 +1782,7 @@ class AgentLoop(NodeProtocol):
missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if missing and self._judge is not None :
if missing and self._judge is not None:
hint = (
f"Task incomplete. Required outputs not yet produced: {missing}. "
f"Follow your system prompt instructions to complete the work."
@@ -2216,11 +2227,9 @@ class AgentLoop(NodeProtocol):
# Strip internal reasoning tags from the full
# snapshot, then diff against what we already
# emitted to get the new visible delta.
_new_clean = _strip_internal_tags_from_snapshot(
event.snapshot
)
_new_clean = _strip_internal_tags_from_snapshot(event.snapshot)
if len(_new_clean) > len(_clean_snapshot):
_delta = _new_clean[len(_clean_snapshot):]
_delta = _new_clean[len(_clean_snapshot) :]
_clean_snapshot = _new_clean
await self._publish_text_delta(
stream_id,
@@ -2347,14 +2356,13 @@ class AgentLoop(NodeProtocol):
# execution. The LLM will see it on the next inner turn.
if not self._injection_queue.empty():
while not self._injection_queue.empty():
_inj_content, _inj_client, _inj_images = (
self._injection_queue.get_nowait()
)
_inj_content, _inj_client, _inj_images = self._injection_queue.get_nowait()
if _inj_client:
await conversation.add_user_message(_inj_content)
logger.info(
"[%s] Priority-injected user message mid-turn (%d chars)",
node_id, len(_inj_content),
node_id,
len(_inj_content),
)
else:
await conversation.add_user_message(_inj_content)
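# --- Illustrative sketch (assumed names, not part of this commit) -----------
# The non-blocking drain pattern used above, in isolation: pull everything
# queued so far with get_nowait() so a mid-turn injection never blocks the
# agent loop, and only await when actually appending to the conversation.
import asyncio

async def drain(queue: asyncio.Queue) -> list:
    items = []
    # Safe as long as this coroutine is the only consumer: empty() cannot
    # race with another get between the check and get_nowait().
    while not queue.empty():
        items.append(queue.get_nowait())
    return items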
@@ -3536,4 +3544,3 @@ class AgentLoop(NodeProtocol):
# -------------------------------------------------------------------
# Subagent Execution
# -------------------------------------------------------------------
+3 -9
@@ -961,9 +961,6 @@ def load_agent_config(data: str | dict) -> tuple[GraphSpec, Goal]:
elif nc.tools.policy == "none":
tools_list = []
tool_policy = "none"
elif nc.tools.policy == "all":
tools_list = []
tool_policy = "all"
else:
# Inherit agent-level tool config
if config.tools.policy == "explicit" and config.tools.allowed:
@@ -1037,9 +1034,7 @@ def load_agent_config(data: str | dict) -> tuple[GraphSpec, Goal]:
"max_tokens": config.max_tokens,
"loop_config": dict(config.loop_config),
"conversation_mode": config.conversation_mode,
"identity_prompt": _resolve_template_vars(
config.identity_prompt, tvars
) or "",
"identity_prompt": _resolve_template_vars(config.identity_prompt, tvars) or "",
}
graph = GraphSpec(**graph_kwargs)
@@ -1267,6 +1262,7 @@ class AgentLoader:
os.environ["HIVE_STORAGE_PATH"] = str(self._storage_path)
# MCP tools are loaded by McpRegistryStage in the pipeline during AgentHost.start()
@staticmethod
def _import_agent_module(agent_path: Path):
"""Import an agent package from its directory path.
@@ -2120,9 +2116,7 @@ class AgentLoader:
warnings.append(warning_msg)
except ImportError:
# aden_tools not installed - fall back to direct check
has_llm_nodes = any(
node.node_type == "event_loop" for node in self.graph.nodes
)
has_llm_nodes = any(node.node_type == "event_loop" for node in self.graph.nodes)
if has_llm_nodes:
api_key_env = self._get_api_key_env_var(self.model)
if api_key_env and not os.environ.get(api_key_env):
+5 -1
@@ -283,7 +283,11 @@ def build_node_context_from_graph_context(
gc = graph_context
resolved_override_tools = override_tools
if resolved_override_tools is None and gc.is_continuous and gc.cumulative_tools:
resolved_override_tools = list(gc.cumulative_tools)
if node_spec.tool_access_policy == "explicit" and node_spec.tools:
declared = set(node_spec.tools)
resolved_override_tools = [t for t in gc.cumulative_tools if t.name in declared]
else:
resolved_override_tools = list(gc.cumulative_tools)
resolved_inherited_conversation = inherited_conversation
if resolved_inherited_conversation is None and gc.is_continuous:
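# --- Illustrative sketch (assumed names, not part of this commit) -----------
# Effect of the explicit-policy filter above: a node that declares only two
# tools no longer inherits the full cumulative tool set of a continuous graph.
from dataclasses import dataclass

@dataclass
class _Tool:                                   # stand-in for the real tool type
    name: str

cumulative_tools = [_Tool("read_file"), _Tool("write_file"), _Tool("web_search")]
declared = {"read_file", "write_file"}         # node_spec.tools under policy="explicit"
override = [t for t in cumulative_tools if t.name in declared]
assert [t.name for t in override] == ["read_file", "write_file"]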
+15 -6
@@ -42,6 +42,7 @@ class McpRegistryStage(PipelineStage):
self._tool_registry = ToolRegistry()
from framework.loader.mcp_registry import MCPRegistry
from framework.orchestrator.files import FILES_MCP_SERVER_NAME
registry = MCPRegistry()
mcp_loaded = False
@@ -52,9 +53,7 @@ class McpRegistryStage(PipelineStage):
if names:
configs = registry.resolve_for_agent(include=names)
if configs:
self._tool_registry.load_registry_servers(
[asdict(c) for c in configs]
)
self._tool_registry.load_registry_servers([asdict(c) for c in configs])
mcp_loaded = True
logger.info(
"[pipeline] McpRegistryStage: loaded %d servers: %s",
@@ -73,14 +72,24 @@ class McpRegistryStage(PipelineStage):
if not mcp_loaded:
configs = registry.resolve_for_agent(profile="all")
if configs:
self._tool_registry.load_registry_servers(
[asdict(c) for c in configs]
)
self._tool_registry.load_registry_servers([asdict(c) for c in configs])
logger.info(
"[pipeline] McpRegistryStage: loaded %d servers (fallback)",
len(configs),
)
# 4. Ensure files-tools is always available — agents need file I/O
# for reading skills, writing data, etc. regardless of config.
loaded_names = set(self._tool_registry._mcp_server_tools.keys())
if FILES_MCP_SERVER_NAME not in loaded_names:
files_configs = registry.resolve_for_agent(include=[FILES_MCP_SERVER_NAME])
if files_configs:
self._tool_registry.load_registry_servers([asdict(c) for c in files_configs])
logger.info(
"[pipeline] McpRegistryStage: injected %s",
FILES_MCP_SERVER_NAME,
)
total = len(self._tool_registry.get_tools())
logger.info("[pipeline] McpRegistryStage: %d tools available", total)
+17 -3
@@ -12,7 +12,7 @@ These are resolved at load time from ``AgentConfig.variables``.
from __future__ import annotations
from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field, model_validator
class ToolAccessConfig(BaseModel):
@@ -20,24 +20,38 @@ class ToolAccessConfig(BaseModel):
Controls which tools a node/agent has access to.
* ``all`` -- every tool from the registry.
* ``explicit`` -- only tools listed in ``allowed`` (default; empty = zero tools).
* ``none`` -- no tools at all.
``all`` is not permitted; agents must declare every tool they use.
"""
model_config = ConfigDict(populate_by_name=True)
policy: str = Field(
default="explicit",
description="One of: 'all', 'explicit', 'none'.",
description="One of: 'explicit', 'none'. 'all' is not allowed.",
)
allowed: list[str] = Field(
default_factory=list,
description="Tool names when policy='explicit'.",
alias="tools",
)
denied: list[str] = Field(
default_factory=list,
description="Tool names to deny (applied after allowed).",
)
@model_validator(mode="after")
def _reject_policy_all(self) -> ToolAccessConfig:
if self.policy == "all":
raise ValueError(
"tool policy 'all' is not allowed — "
"list every tool explicitly in 'allowed' instead. "
"This ensures agents only see the tools they need."
)
return self
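# --- Illustrative sketch (not part of this commit) --------------------------
# With the validator above, policy="all" now fails at model construction,
# and the "tools" alias still fills "allowed" thanks to populate_by_name.
# Assumes ToolAccessConfig is importable from this config module.
from pydantic import ValidationError

ok = ToolAccessConfig(policy="explicit", tools=["read_file", "write_file"])
assert ok.allowed == ["read_file", "write_file"]

try:
    ToolAccessConfig(policy="all")
except ValidationError as exc:
    print("rejected:", exc.errors()[0]["msg"])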
class NodeConfig(BaseModel):
"""Declarative node definition."""
+24 -10
@@ -1901,10 +1901,7 @@ def register_queen_lifecycle_tools(
# Create agent folder early so flowchart and agent_path are available
# throughout the entire BUILDING phase.
_agent_name = (
agent_name
or phase_state.draft_graph.get("agent_name", "").strip()
)
_agent_name = agent_name or phase_state.draft_graph.get("agent_name", "").strip()
if _agent_name:
from framework.config import COLONIES_DIR
@@ -1941,9 +1938,7 @@ def register_queen_lifecycle_tools(
# Transition to BUILDING phase
await phase_state.switch_to_building(source="tool")
_update_meta_json(
session_manager, manager_session_id, {"phase": "building"}
)
_update_meta_json(session_manager, manager_session_id, {"phase": "building"})
phase_state.build_confirmed = False
# No injection here -- the return message tells the queen what to do.
@@ -3018,8 +3013,7 @@ def register_queen_lifecycle_tools(
if parent_dir not in _sys.path:
_sys.path.insert(0, parent_dir)
stale = [
n for n in _sys.modules
if n == pkg_name or n.startswith(f"{pkg_name}.")
n for n in _sys.modules if n == pkg_name or n.startswith(f"{pkg_name}.")
]
for n in stale:
del _sys.modules[n]
@@ -3180,7 +3174,9 @@ def register_queen_lifecycle_tools(
"properties": {
"agent_path": {
"type": "string",
"description": ("Path to the agent directory (e.g. '~/.hive/colonies/my_agent')"),
"description": (
"Path to the agent directory (e.g. '~/.hive/colonies/my_agent')"
),
},
},
"required": ["agent_path"],
@@ -3205,6 +3201,24 @@ def register_queen_lifecycle_tools(
if runtime is None:
return json.dumps({"error": "No worker loaded in this session."})
# Guard: refuse to start while an execution is already running.
# Calling again would cancel the active one via the
# "Restarted with new execution" path in ExecutionStream.execute(),
# which is almost never what the queen intends.
for graph_id in runtime.list_graphs():
reg = runtime.get_graph_registration(graph_id)
if reg is None:
continue
for _ep_id, stream in reg.streams.items():
if stream.active_execution_ids:
return json.dumps(
{
"error": "Worker is already running.",
"active_execution_ids": list(stream.active_execution_ids),
"hint": "Wait for the worker to finish (WORKER_TERMINAL event) or call stop_agent() before starting a new run.",
}
)
try:
# Pre-flight: validate credentials and resync MCP servers.
loop = asyncio.get_running_loop()