Merge branch 'feature/colony-creation' into feature/new-colony

This commit is contained in:
Timothy
2026-04-14 16:34:30 -07:00
65 changed files with 6190 additions and 3502 deletions
+115 -14
View File
@@ -61,6 +61,33 @@ class ToolRegistry:
# and auto-injected at call time for tools that accept them.
CONTEXT_PARAMS = frozenset({"agent_id", "data_dir", "profile"})
# Tools that perform no filesystem/process/network writes and are safe
# to run concurrently with other safe tools in the same assistant turn.
# Unknown tools default to unsafe (serialized) - adding a name here is
# an explicit promise about that tool's side effects. Keep this list
# conservative: anything that mutates state, writes to disk, issues
# POST/PUT/DELETE requests, or drives a browser MUST NOT be listed.
CONCURRENCY_SAFE_TOOLS = frozenset(
{
# File system reads
"read_file",
"list_directory",
"grep",
"glob",
# Web reads
"web_search",
"web_fetch",
# Browser read-only snapshots (mutate-free observations)
"browser_screenshot",
"browser_snapshot",
"browser_console",
"browser_get_text",
# Background bash polling - reads output buffers only, does
# not touch the subprocess itself.
"bash_output",
}
)
# Credential directory used for change detection
_CREDENTIAL_DIR = Path("~/.hive/credentials/credentials").expanduser()
@@ -77,9 +104,24 @@ class ToolRegistry:
self._mcp_cred_snapshot: set[str] = set() # Credential filenames at MCP load time
self._mcp_aden_key_snapshot: str | None = None # ADEN_API_KEY value at MCP load time
self._mcp_server_tools: dict[str, set[str]] = {} # server name -> tool names
# tool name -> owning MCPClient (for force-kill on timeout)
self._mcp_tool_clients: dict[str, Any] = {}
# Per-agent env injected into every MCP server config.env. Kept
# here (not on the process-wide os.environ) so parallel workers
# in the same interpreter don't clobber each other's identity.
self._mcp_extra_env: dict[str, str] = {}
# Agent dir for re-loading registry MCP after credential resync.
self._mcp_registry_agent_path: Path | None = None
def set_mcp_extra_env(self, env: dict[str, str]) -> None:
"""Attach per-agent env vars to every MCPServerConfig this registry builds.
Use this instead of mutating ``os.environ`` the global env dict
is shared across all workers in a single interpreter, so writes
from one worker race with MCP spawns from another.
"""
self._mcp_extra_env = dict(env)
def register(
self,
name: str,
@@ -148,6 +190,7 @@ class ToolRegistry:
"properties": properties,
"required": required,
},
concurrency_safe=tool_name in self.CONCURRENCY_SAFE_TOOLS,
)
def executor(inputs: dict) -> Any:
@@ -337,6 +380,9 @@ class ToolRegistry:
is_error=True,
)
# Expose force-kill hook so the timeout handler can tear down a
# hung MCP subprocess (asyncio.wait_for alone cannot).
executor.kill_for_tool = registry_ref.kill_mcp_for_tool # type: ignore[attr-defined]
return executor
def get_registered_names(self) -> list[str]:
@@ -657,13 +703,17 @@ class ToolRegistry:
from framework.loader.mcp_client import MCPClient, MCPServerConfig
from framework.loader.mcp_connection_manager import MCPConnectionManager
# Build config object
# Build config object. Merge per-agent env on top of the
# server's own env so MCP subprocesses receive the identity
# of the worker that spawned them (instead of whichever
# worker most recently wrote to os.environ).
merged_env = {**self._mcp_extra_env, **(server_config.get("env") or {})}
config = MCPServerConfig(
name=server_config["name"],
transport=server_config["transport"],
command=server_config.get("command"),
args=server_config.get("args", []),
env=server_config.get("env", {}),
env=merged_env,
cwd=server_config.get("cwd"),
url=server_config.get("url"),
headers=server_config.get("headers", {}),
@@ -781,6 +831,7 @@ class ToolRegistry:
make_mcp_executor(client, mcp_tool.name, self, tool_params),
)
self._mcp_tool_names.add(mcp_tool.name)
self._mcp_tool_clients[mcp_tool.name] = client
self._mcp_server_tools[server_name].add(mcp_tool.name)
admitted_names.append(mcp_tool.name)
count += 1
@@ -863,20 +914,42 @@ class ToolRegistry:
"""
verified_names: set[str] = set()
manifest_present = False
# Only probe the sentinel when the server actually advertises it.
# Calling ``__aden_verified_manifest`` unconditionally on every
# MCP server at registration time (a) causes a bogus tool call
# round-trip to every third-party server, (b) pollutes any
# call-capturing fakes in tests, and (c) risks side effects on
# servers that eagerly execute unknown tool names. Listing is
# cheap and cached by the client; this keeps the manifest gate
# active for aden-flavoured servers without penalising others.
sentinel_advertised = False
try:
raw = client.call_tool(self._MCP_VERIFIED_MANIFEST_TOOL, {})
parsed: Any = raw
if isinstance(raw, str):
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
parsed = None
if isinstance(parsed, list):
verified_names = {str(n) for n in parsed}
manifest_present = True
for t in client.list_tools():
if getattr(t, "name", None) == self._MCP_VERIFIED_MANIFEST_TOOL:
sentinel_advertised = True
break
except Exception:
# Server doesn't expose the manifest — no verified gate applies.
pass
sentinel_advertised = False
if sentinel_advertised:
try:
raw = client.call_tool(self._MCP_VERIFIED_MANIFEST_TOOL, {})
parsed: Any = raw
if isinstance(raw, str):
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
parsed = None
# Only treat the response as a manifest when it's a list
# of strings. A malformed response shouldn't flip the gate
# on and silently hide every real tool from the server.
if isinstance(parsed, list) and all(isinstance(n, str) for n in parsed):
verified_names = set(parsed)
manifest_present = True
except Exception:
# Server advertised the sentinel but errored when called
# — treat as no manifest; fall back to third-party bypass.
pass
tool_provider_map: dict[str, str] = {}
live_providers: set[str] = set()
@@ -937,6 +1010,7 @@ class ToolRegistry:
"required": required,
},
produces_image=bool(_IMAGE_TOOL_NAME_RE.search(mcp_tool.name or "")),
concurrency_safe=mcp_tool.name in self.CONCURRENCY_SAFE_TOOLS,
)
return tool
@@ -1111,6 +1185,33 @@ class ToolRegistry:
self._mcp_clients.clear()
self._mcp_client_servers.clear()
self._mcp_managed_clients.clear()
self._mcp_tool_clients.clear()
def kill_mcp_for_tool(self, tool_name: str) -> bool:
"""Force-disconnect the MCP client that owns *tool_name*.
Called from the timeout handler in ``execute_tool`` when a tool
call hangs. Plain ``asyncio.wait_for`` cancellation cannot stop
a sync executor running inside a thread pool (and therefore
cannot stop the MCP subprocess), so we reach through to the
client here and tear it down. The next ``call_tool`` triggers
an automatic reconnect.
Returns True if a client was found and disconnect was attempted.
"""
client = self._mcp_tool_clients.get(tool_name)
if client is None:
return False
try:
logger.warning(
"Force-disconnecting MCP client for hung tool '%s' on server '%s'",
tool_name,
getattr(client.config, "name", "?"),
)
client.disconnect()
except Exception as exc:
logger.warning("Error force-disconnecting MCP client for '%s': %s", tool_name, exc)
return True
def __del__(self):
"""Destructor to ensure cleanup."""